diff options
Diffstat (limited to 'client/src/database-connection.ts')
-rw-r--r-- | client/src/database-connection.ts | 120 |
1 files changed, 105 insertions, 15 deletions
diff --git a/client/src/database-connection.ts b/client/src/database-connection.ts index 3cf733c..f37b298 100644 --- a/client/src/database-connection.ts +++ b/client/src/database-connection.ts @@ -1,7 +1,28 @@ import { Config } from './config.model' +import { getKey } from '../../lib/util' +import { encodePath } from '../../lib/path' import { DatabaseConnection } from './database-connection.model' +import { DatabaseChange } from '../../lib/database-change.model' +import { Body } from 'node-fetch' -const IS_NODE = true +import WebSocket from 'ws' + +const objectToQuery = (query: Object): string => + Object.values(query).reduce( + (acc: string, [key, val]: [string, string]) => acc + `${key}=${val}&`, + '?' + ) + +// each path has one or more callbacks +// that must be fired with each change +interface CallbackTable { + [path: string]: CallbackRef[] +} + +interface CallbackRef { + id: string + callback: (e: Object) => void +} /** * Client side implementation is implemented using @@ -10,31 +31,100 @@ const IS_NODE = true */ export const dbFactory = (config: Config): DatabaseConnection => { // TODO figure out how to patch - // in native fetch instead + // in native fetch/websocke instead // when this is running in the browser - const send = require('node-fetch') + const nodeFetch = require('node-fetch') + const WebSocket = require('ws') + + let ws: WebSocket | null + let subCallbacks: CallbackTable = {} + + const send = (method: string) => ( + route: string, + body: Object | void + ): Promise<Body> => { + const isGet = method === 'GET' + if (isGet && body) { + route += objectToQuery(body) + } + return nodeFetch(route, { + method, + ...(isGet ? undefined : { body: JSON.stringify(body) }), + headers: { + Accept: 'application/json', + 'Content-Type': 'application/json' + } + }) + } + + const hget = (route: string, body: Object | void) => + send('GET')(route, body).then((r: any) => r.json()) + const hpost = send('POST') + const write = (path: string, toWrite: Object | null) => { const { httpUrl } = config - const body = JSON.stringify({ + return hpost(`${httpUrl}/write`, { path, toWrite }) - return send(`${httpUrl}/write`, { - method: 'POST', - body, - headers: { - Accept: 'application/json', - 'Content-Type': 'application/json' + } + + const read = (path: string): Promise<any> => { + const { httpUrl } = config + return hget(`${httpUrl}/read/${encodePath(path)}`) + } + + const socketMessageHandler = (event: { + data: any + type: string + target: WebSocket + }) => { + const dbChange: DatabaseChange = JSON.parse(event.data) + const callbackRefs = subCallbacks[dbChange.path] + if (callbackRefs && callbackRefs.length) { + for (let ref of callbackRefs) { + ref.callback(dbChange.change) } - }) + } } + return { - async init() {}, - async close() {}, - async subscribe(path: string, callback: (e: any) => Promise<any>) { - return () => {} + init() { + return new Promise(resolve => { + ws = new WebSocket(config.wsUrl) + // @ts-ignore + ws.addEventListener('message', socketMessageHandler) + // @ts-ignore + ws.addEventListener('open', () => resolve()) + }) + }, + close() { + if (ws) { + ws.removeEventListener('message', socketMessageHandler) + ws.close() + ws = null + } + }, + async subscribe(path: string, callback: (e: Object) => void) { + const id = getKey('callbackref') + const ref: CallbackRef = { + id, + callback + } + if (!subCallbacks[path]) { + const { httpUrl } = config + subCallbacks[path] = [ref] + await hpost(`${httpUrl}/subscribe`, { path }) + } else subCallbacks[path].push(ref) + // return a fucntion + // to remove our new sub from + // the table + return () => { + subCallbacks[path] = subCallbacks[path].filter(k => k.id !== id) + } }, write, + read, async remove(path: string) { write(path, null) } |