diff --git a/.eslintrc.json b/.eslintrc.json index 000419f..0471021 100644 --- a/.eslintrc.json +++ b/.eslintrc.json @@ -18,6 +18,19 @@ "no-var": "error", "no-console": "off", "@typescript-eslint/camelcase": "off", - "@typescript-eslint/interface-name-prefix": "off" + "@typescript-eslint/interface-name-prefix": "off", + "@typescript-eslint/member-delimiter-style": [ + "error", + { + "multiline": { + "delimiter": "semi", + "requireLast": true + }, + "singleline": { + "delimiter": "semi", + "requireLast": true + } + } + ] } } \ No newline at end of file diff --git a/package-lock.json b/package-lock.json index 7c57bca..3f43aaa 100644 --- a/package-lock.json +++ b/package-lock.json @@ -3123,6 +3123,15 @@ } } }, + "mock-socket": { + "version": "8.0.5", + "resolved": "https://registry.npmjs.org/mock-socket/-/mock-socket-8.0.5.tgz", + "integrity": "sha512-dE2EbcxJKQCeYLZSsI7BAiMZCe/bHbJ2LHb5aGwUuDmfoOINEJ8QI6qYJ85NHsSNkNa90F3s6onZcmt/+MppFA==", + "dev": true, + "requires": { + "url-parse": "^1.2.0" + } + }, "ms": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/ms/-/ms-2.0.0.tgz", @@ -3700,6 +3709,12 @@ "resolved": "https://registry.npmjs.org/qs/-/qs-6.7.0.tgz", "integrity": "sha512-VCdBRNFTX1fyE7Nb6FYoURo/SPe62QCaAyzJvUjwRaIsc+NePBEniHlvxFmmX56+HZphIGtV0XeCirBtpDrTyQ==" }, + "querystringify": { + "version": "2.1.1", + "resolved": "https://registry.npmjs.org/querystringify/-/querystringify-2.1.1.tgz", + "integrity": "sha512-w7fLxIRCRT7U8Qu53jQnJyPkYZIaR4n5151KMfcJlO/A9397Wxb1amJvROTK6TOnp7PfoAmg/qXiNHI+08jRfA==", + "dev": true + }, "range-parser": { "version": "1.2.1", "resolved": "https://registry.npmjs.org/range-parser/-/range-parser-1.2.1.tgz", @@ -3848,6 +3863,12 @@ "integrity": "sha512-NKN5kMDylKuldxYLSUfrbo5Tuzh4hd+2E8NPPX02mZtn1VuREQToYe/ZdlJy+J3uCpfaiGF05e7B8W0iXbQHmg==", "dev": true }, + "requires-port": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/requires-port/-/requires-port-1.0.0.tgz", + "integrity": "sha1-kl0mAdOaxIXgkc8NpcbmlNw9yv8=", + "dev": true + }, "resolve": { "version": "1.12.0", "resolved": "https://registry.npmjs.org/resolve/-/resolve-1.12.0.tgz", @@ -4776,6 +4797,16 @@ "integrity": "sha1-2pN/emLiH+wf0Y1Js1wpNQZ6bHI=", "dev": true }, + "url-parse": { + "version": "1.4.7", + "resolved": "https://registry.npmjs.org/url-parse/-/url-parse-1.4.7.tgz", + "integrity": "sha512-d3uaVyzDB9tQoSXFvuSUNFibTd9zxd2bkVrDRvF5TmvWWQwqE4lgYJ5m+x1DbecWkw+LK4RNl2CU1hHuOKPVlg==", + "dev": true, + "requires": { + "querystringify": "^2.1.1", + "requires-port": "^1.0.0" + } + }, "url-parse-lax": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/url-parse-lax/-/url-parse-lax-1.0.0.tgz", diff --git a/package.json b/package.json index f4f1b03..65b2d29 100644 --- a/package.json +++ b/package.json @@ -47,6 +47,7 @@ "chai": "^4.2.0", "eslint": "^6.7.2", "mocha": "^6.2.2", + "mock-socket": "8.0.5", "nodemon": "1.19.1", "npm-run-all": "4.1.5", "rimraf": "3.0.0", diff --git a/src/messageHandler/handlers/transmission/index.ts b/src/messageHandler/handlers/transmission/index.ts index defa00f..bd31c70 100644 --- a/src/messageHandler/handlers/transmission/index.ts +++ b/src/messageHandler/handlers/transmission/index.ts @@ -42,7 +42,9 @@ export const TransmissionHandler = ({ realm }: { realm: IRealm; }): (client: ICl } else { // Wait for this client to connect/reconnect (XHR) for important // messages. - if (type !== MessageType.LEAVE && type !== MessageType.EXPIRE && dstId) { + const ignoredTypes = [MessageType.LEAVE, MessageType.EXPIRE]; + + if (!ignoredTypes.includes(type) && dstId) { realm.addMessageToQueue(dstId, message); } else if (type === MessageType.LEAVE && !dstId) { realm.removeClientById(srcId); diff --git a/src/services/messagesExpire/index.ts b/src/services/messagesExpire/index.ts index 9c3722b..cf94a38 100644 --- a/src/services/messagesExpire/index.ts +++ b/src/services/messagesExpire/index.ts @@ -8,16 +8,18 @@ export interface IMessagesExpire { stopMessagesExpiration(): void; } +type CustomConfig = Pick; + export class MessagesExpire implements IMessagesExpire { private readonly realm: IRealm; - private readonly config: IConfig; + private readonly config: CustomConfig; private readonly messageHandler: IMessageHandler; private timeoutId: NodeJS.Timeout | null = null; constructor({ realm, config, messageHandler }: { realm: IRealm; - config: IConfig; + config: CustomConfig; messageHandler: IMessageHandler; }) { this.realm = realm; diff --git a/src/services/webSocketServer/index.ts b/src/services/webSocketServer/index.ts index d345a31..de96049 100644 --- a/src/services/webSocketServer/index.ts +++ b/src/services/webSocketServer/index.ts @@ -18,26 +18,32 @@ interface IAuthParams { key?: string; } +type CustomConfig = Pick; + +const WS_PATH = 'peerjs'; + export class WebSocketServer extends EventEmitter implements IWebSocketServer { public readonly path: string; private readonly realm: IRealm; - private readonly config: IConfig; - private readonly webSocketServer: WebSocketLib.Server; + private readonly config: CustomConfig; + public readonly socketServer: WebSocketLib.Server; - constructor({ server, realm, config }: { server: any, realm: IRealm, config: IConfig; }) { + constructor({ server, realm, config }: { server: any, realm: IRealm, config: CustomConfig; }) { super(); + this.setMaxListeners(0); + this.realm = realm; this.config = config; const path = this.config.path; - this.path = path + (path[path.length - 1] !== "/" ? "/" : "") + "peerjs"; + this.path = `${path}${path.endsWith('/') ? "" : "/"}${WS_PATH}`; - this.webSocketServer = new WebSocketLib.Server({ path, server }); + this.socketServer = new WebSocketLib.Server({ path, server }); - this.webSocketServer.on("connection", (socket: MyWebSocket, req) => this._onSocketConnection(socket, req)); - this.webSocketServer.on("error", (error: Error) => this._onSocketError(error)); + this.socketServer.on("connection", (socket: MyWebSocket, req) => this._onSocketConnection(socket, req)); + this.socketServer.on("error", (error: Error) => this._onSocketError(error)); } private _onSocketConnection(socket: MyWebSocket, req: IncomingMessage): void { diff --git a/test/services/checkBrokenConnections/index.ts b/test/services/checkBrokenConnections/index.ts index a4b8ef4..437f054 100644 --- a/test/services/checkBrokenConnections/index.ts +++ b/test/services/checkBrokenConnections/index.ts @@ -4,7 +4,7 @@ import { Realm } from '../../../src/models/realm'; import { CheckBrokenConnections } from '../../../src/services/checkBrokenConnections'; import { wait } from '../../utils'; -describe('checkBrokenConnections service', () => { +describe('CheckBrokenConnections', () => { it('should remove client after 2 checks', async () => { const realm = new Realm(); const doubleCheckTime = 55;//~ equals to checkBrokenConnections.checkInterval * 2 diff --git a/test/services/messagesExpire/index.ts b/test/services/messagesExpire/index.ts new file mode 100644 index 0000000..eac362b --- /dev/null +++ b/test/services/messagesExpire/index.ts @@ -0,0 +1,78 @@ +import { expect } from 'chai'; +import { Client } from '../../../src/models/client'; +import { Realm } from '../../../src/models/realm'; +import { IMessage } from '../../../src/models/message'; +import { MessagesExpire } from '../../../src/services/messagesExpire'; +import { MessageHandler } from '../../../src/messageHandler'; +import { MessageType } from '../../../src/enums'; +import { wait } from '../../utils'; + +describe('MessagesExpire', () => { + const createTestMessage = (): IMessage => { + return { + type: MessageType.OPEN, + src: 'src', + dst: 'dst' + }; + }; + + it('should remove client if no read from queue', async () => { + const realm = new Realm(); + const messageHandler = new MessageHandler(realm); + const checkInterval = 10; + const expireTimeout = 50; + const config = { cleanup_out_msgs: checkInterval, expire_timeout: expireTimeout }; + + const messagesExpire = new MessagesExpire({ realm, config, messageHandler }); + + const client = new Client({ id: 'id', token: '' }); + realm.setClient(client, 'id'); + realm.addMessageToQueue(client.getId(), createTestMessage()); + + messagesExpire.startMessagesExpiration(); + + await wait(checkInterval * 2); + + expect(realm.getMessageQueueById(client.getId())?.getMessages().length).to.be.eq(1); + + await wait(expireTimeout); + + expect(realm.getMessageQueueById(client.getId())).to.be.undefined; + + messagesExpire.stopMessagesExpiration(); + }); + + it('should fire EXPIRE message', async () => { + const realm = new Realm(); + const messageHandler = new MessageHandler(realm); + const checkInterval = 10; + const expireTimeout = 50; + const config = { cleanup_out_msgs: checkInterval, expire_timeout: expireTimeout }; + + const messagesExpire = new MessagesExpire({ realm, config, messageHandler }); + + const client = new Client({ id: 'id', token: '' }); + realm.setClient(client, 'id'); + realm.addMessageToQueue(client.getId(), createTestMessage()); + + let handled = false; + + messageHandler.handle = (client, message): boolean => { + expect(client).to.be.undefined; + expect(message.type).to.be.eq(MessageType.EXPIRE); + + handled = true; + + return true; + }; + + messagesExpire.startMessagesExpiration(); + + await wait(checkInterval * 2); + await wait(expireTimeout); + + expect(handled).to.be.true; + + messagesExpire.stopMessagesExpiration(); + }); +}); diff --git a/test/services/webSocketServer/index.ts b/test/services/webSocketServer/index.ts new file mode 100644 index 0000000..705b2a6 --- /dev/null +++ b/test/services/webSocketServer/index.ts @@ -0,0 +1,195 @@ +import { expect } from 'chai'; +import { Server, WebSocket } from 'mock-socket'; +import { Realm } from '../../../src/models/realm'; +import { WebSocketServer } from '../../../src/services/webSocketServer'; +import { Errors, MessageType } from '../../../src/enums'; +import { wait } from '../../utils'; + +type Destroyable = T & { destroy?: () => Promise; }; + +const checkOpen = async (c: WebSocket): Promise => { + return new Promise(resolve => { + c.onmessage = (event: object & { data?: string; }): void => { + c.onmessage = null; + const message = JSON.parse(event.data as string); + resolve(message.type === MessageType.OPEN); + }; + }); +}; + +const checkSequence = async (c: WebSocket, msgs: { type: MessageType; error?: Errors; }[]): Promise => { + return new Promise(resolve => { + const restMessages = [...msgs]; + + const finish = (success = false): void => { + c.onmessage = null; + resolve(success); + }; + + c.onmessage = (event: object & { data?: string; }): void => { + const [mes] = restMessages; + + if (!mes) { + return finish(); + } + + restMessages.shift(); + + const message = JSON.parse(event.data as string); + if (message.type !== mes.type) { + return finish(); + } + + const isOk = !mes.error || message.payload?.msg === mes.error; + + if (!isOk) { + return finish(); + } + + if (restMessages.length === 0) { + finish(true); + } + }; + }); +}; + +const createTestServer = ({ realm, config, url }: { realm: Realm; config: { path: string; key: string; concurrent_limit: number; }; url: string; }): Destroyable => { + const server = new Server(url); + const webSocketServer: Destroyable = new WebSocketServer({ server, realm, config }); + + server.on('connection', (socket: WebSocket & { on?: (eventName: string, callback: () => void) => void; }) => { + const s = webSocketServer.socketServer; + s.emit('connection', socket, { url: socket.url }); + + socket.onclose = (): void => { + const userId = socket.url.split('?')[1]?.split('&').find(p => p.startsWith('id'))?.split('=')[1]; + + if (!userId) return; + + const client = realm.getClientById(userId); + + const clientSocket = client?.getSocket(); + + if (!clientSocket) return; + + (clientSocket as unknown as WebSocket).listeners['server::close']?.forEach((s: () => void) => s()); + }; + + socket.onmessage = (event: object & { data?: string; }): void => { + const userId = socket.url.split('?')[1]?.split('&').find(p => p.startsWith('id'))?.split('=')[1]; + + if (!userId) return; + + const client = realm.getClientById(userId); + + const clientSocket = client?.getSocket(); + + if (!clientSocket) return; + + (clientSocket as unknown as WebSocket).listeners['server::message']?.forEach((s: (data: object) => void) => s(event)); + }; + }); + + webSocketServer.destroy = async (): Promise => { + server.close(); + }; + + return webSocketServer; +}; + +describe('WebSocketServer', () => { + + it('should return valid path', () => { + const realm = new Realm(); + const config = { path: '/', key: 'testKey', concurrent_limit: 1 }; + const config2 = { ...config, path: 'path' }; + const server = new Server('path1'); + const server2 = new Server('path2'); + + const webSocketServer = new WebSocketServer({ server, realm, config }); + + expect(webSocketServer.path).to.be.eq('/peerjs'); + + const webSocketServer2 = new WebSocketServer({ server: server2, realm, config: config2 }); + + expect(webSocketServer2.path).to.be.eq('path/peerjs'); + + server.stop(); + server2.stop(); + }); + + it(`should check client's params`, async () => { + const realm = new Realm(); + const config = { path: '/', key: 'testKey', concurrent_limit: 1 }; + const fakeURL = 'ws://localhost:8080/peerjs'; + + const getError = async (url: string, validError: Errors = Errors.INVALID_WS_PARAMETERS): Promise => { + const webSocketServer = createTestServer({ url, realm, config }); + + const ws = new WebSocket(url); + + const errorSent = await checkSequence(ws, [{ type: MessageType.ERROR, error: validError }]); + + ws.close(); + + await webSocketServer.destroy?.(); + + return errorSent; + }; + + expect(await getError(fakeURL)).to.be.true; + expect(await getError(`${fakeURL}?key=${config.key}`)).to.be.true; + expect(await getError(`${fakeURL}?key=${config.key}&id=1`)).to.be.true; + expect(await getError(`${fakeURL}?key=notValidKey&id=userId&token=userToken`, Errors.INVALID_KEY)).to.be.true; + }); + + it(`should check concurrent limit`, async () => { + const realm = new Realm(); + const config = { path: '/', key: 'testKey', concurrent_limit: 1 }; + const fakeURL = 'ws://localhost:8080/peerjs'; + + const createClient = (id: string): Destroyable => { + const url = `${fakeURL}?key=${config.key}&id=${id}&token=${id}`; + const webSocketServer = createTestServer({ url, realm, config }); + const ws: Destroyable = new WebSocket(url); + + ws.destroy = async (): Promise => { + ws.close(); + + wait(10); + + webSocketServer.destroy?.(); + + wait(10); + + ws.destroy = undefined; + }; + + return ws; + }; + + + const c1 = createClient('1'); + + expect(await checkOpen(c1)).to.be.true; + + const c2 = createClient('2'); + + expect(await checkSequence(c2, [ + { type: MessageType.ERROR, error: Errors.CONNECTION_LIMIT_EXCEED } + ])).to.be.true; + + await c1.destroy?.(); + await c2.destroy?.(); + + await wait(10); + + expect(realm.getClientsIds().length).to.be.eq(0); + + const c3 = createClient('3'); + + expect(await checkOpen(c3)).to.be.true; + + await c3.destroy?.(); + }); +});