summaryrefslogtreecommitdiff
path: root/client/src/database-connection.ts
diff options
context:
space:
mode:
Diffstat (limited to 'client/src/database-connection.ts')
-rw-r--r--client/src/database-connection.ts120
1 files changed, 105 insertions, 15 deletions
diff --git a/client/src/database-connection.ts b/client/src/database-connection.ts
index 3cf733c..f37b298 100644
--- a/client/src/database-connection.ts
+++ b/client/src/database-connection.ts
@@ -1,7 +1,28 @@
import { Config } from './config.model'
+import { getKey } from '../../lib/util'
+import { encodePath } from '../../lib/path'
import { DatabaseConnection } from './database-connection.model'
+import { DatabaseChange } from '../../lib/database-change.model'
+import { Body } from 'node-fetch'
-const IS_NODE = true
+import WebSocket from 'ws'
+
+const objectToQuery = (query: Object): string =>
+ Object.values(query).reduce(
+ (acc: string, [key, val]: [string, string]) => acc + `${key}=${val}&`,
+ '?'
+ )
+
+// each path has one or more callbacks
+// that must be fired with each change
+interface CallbackTable {
+ [path: string]: CallbackRef[]
+}
+
+interface CallbackRef {
+ id: string
+ callback: (e: Object) => void
+}
/**
* Client side implementation is implemented using
@@ -10,31 +31,100 @@ const IS_NODE = true
*/
export const dbFactory = (config: Config): DatabaseConnection => {
// TODO figure out how to patch
- // in native fetch instead
+ // in native fetch/websocke instead
// when this is running in the browser
- const send = require('node-fetch')
+ const nodeFetch = require('node-fetch')
+ const WebSocket = require('ws')
+
+ let ws: WebSocket | null
+ let subCallbacks: CallbackTable = {}
+
+ const send = (method: string) => (
+ route: string,
+ body: Object | void
+ ): Promise<Body> => {
+ const isGet = method === 'GET'
+ if (isGet && body) {
+ route += objectToQuery(body)
+ }
+ return nodeFetch(route, {
+ method,
+ ...(isGet ? undefined : { body: JSON.stringify(body) }),
+ headers: {
+ Accept: 'application/json',
+ 'Content-Type': 'application/json'
+ }
+ })
+ }
+
+ const hget = (route: string, body: Object | void) =>
+ send('GET')(route, body).then((r: any) => r.json())
+ const hpost = send('POST')
+
const write = (path: string, toWrite: Object | null) => {
const { httpUrl } = config
- const body = JSON.stringify({
+ return hpost(`${httpUrl}/write`, {
path,
toWrite
})
- return send(`${httpUrl}/write`, {
- method: 'POST',
- body,
- headers: {
- Accept: 'application/json',
- 'Content-Type': 'application/json'
+ }
+
+ const read = (path: string): Promise<any> => {
+ const { httpUrl } = config
+ return hget(`${httpUrl}/read/${encodePath(path)}`)
+ }
+
+ const socketMessageHandler = (event: {
+ data: any
+ type: string
+ target: WebSocket
+ }) => {
+ const dbChange: DatabaseChange = JSON.parse(event.data)
+ const callbackRefs = subCallbacks[dbChange.path]
+ if (callbackRefs && callbackRefs.length) {
+ for (let ref of callbackRefs) {
+ ref.callback(dbChange.change)
}
- })
+ }
}
+
return {
- async init() {},
- async close() {},
- async subscribe(path: string, callback: (e: any) => Promise<any>) {
- return () => {}
+ init() {
+ return new Promise(resolve => {
+ ws = new WebSocket(config.wsUrl)
+ // @ts-ignore
+ ws.addEventListener('message', socketMessageHandler)
+ // @ts-ignore
+ ws.addEventListener('open', () => resolve())
+ })
+ },
+ close() {
+ if (ws) {
+ ws.removeEventListener('message', socketMessageHandler)
+ ws.close()
+ ws = null
+ }
+ },
+ async subscribe(path: string, callback: (e: Object) => void) {
+ const id = getKey('callbackref')
+ const ref: CallbackRef = {
+ id,
+ callback
+ }
+ if (!subCallbacks[path]) {
+ const { httpUrl } = config
+ subCallbacks[path] = [ref]
+ await hpost(`${httpUrl}/subscribe`, { path })
+ } else subCallbacks[path].push(ref)
+ // return a fucntion
+ // to remove our new sub from
+ // the table
+ return () => {
+ subCallbacks[path] = subCallbacks[path].filter(k => k.id !== id)
+ }
},
write,
+ read,
async remove(path: string) {
write(path, null)
}