From 799299d6697bd67921acbc6271b6c18c58ea47ec Mon Sep 17 00:00:00 2001 From: afrokick Date: Wed, 21 Aug 2019 17:39:00 +0300 Subject: [PATCH] add checkConnections service --- bin/peerjs | 5 ++ config/index.js | 1 + package.json | 2 +- src/index.js | 8 +++ .../handlers/heartbeat/index.js | 4 ++ src/messageHandler/index.js | 11 ++-- src/models/client.js | 23 ++++++-- src/services/checkBrokenConnections/index.js | 57 +++++++++++++++++++ .../handlers/heartbeat/index.js | 16 ++++++ test/services/checkBrokenConnections/index.js | 43 ++++++++++++++ 10 files changed, 158 insertions(+), 12 deletions(-) create mode 100644 src/messageHandler/handlers/heartbeat/index.js create mode 100644 src/services/checkBrokenConnections/index.js create mode 100644 test/messageHandler/handlers/heartbeat/index.js create mode 100644 test/services/checkBrokenConnections/index.js diff --git a/bin/peerjs b/bin/peerjs index 5d43eac..01c4a8f 100755 --- a/bin/peerjs +++ b/bin/peerjs @@ -20,6 +20,11 @@ const opts = require('optimist') description: 'concurrent limit', default: 5000 }, + alive_timeout: { + demand: false, + description: 'broken connection check timeout (milliseconds)', + default: 60000 + }, key: { demand: false, alias: 'k', diff --git a/config/index.js b/config/index.js index efc3705..3b0ecb4 100644 --- a/config/index.js +++ b/config/index.js @@ -1,6 +1,7 @@ module.exports = { port: 9000, expire_timeout: 5000, + alive_timeout: 60000, key: 'peerjs', path: '/myapp', concurrent_limit: 5000, diff --git a/package.json b/package.json index f63d000..dc8076c 100644 --- a/package.json +++ b/package.json @@ -13,7 +13,7 @@ "author": "Michelle Bu, Eric Zhang", "license": "MIT", "scripts": { - "test": "eslint . && mocha test/**/*.js", + "test": "eslint . && mocha \"test/**/*.js\"", "start": "bin/peerjs --port ${PORT:=9000}" }, "dependencies": { diff --git a/src/index.js b/src/index.js index 1e61514..ea3d9e0 100644 --- a/src/index.js +++ b/src/index.js @@ -11,7 +11,13 @@ const init = ({ app, server, options }) => { const realm = new Realm(); const messageHandler = require('./messageHandler')({ realm }); const api = require('./api')({ config, realm, messageHandler }); + const { startMessagesExpiration } = require('./services/messagesExpire')({ realm, config, messageHandler }); + const checkBrokenConnections = require('./services/checkBrokenConnections')({ + realm, config, onClose: (client) => { + app.emit('disconnect', client); + } + }); app.use(options.path, api); @@ -52,6 +58,8 @@ const init = ({ app, server, options }) => { }); startMessagesExpiration(); + + checkBrokenConnections.start(); }; function ExpressPeerServer(server, options) { diff --git a/src/messageHandler/handlers/heartbeat/index.js b/src/messageHandler/handlers/heartbeat/index.js new file mode 100644 index 0000000..a74b99d --- /dev/null +++ b/src/messageHandler/handlers/heartbeat/index.js @@ -0,0 +1,4 @@ +module.exports = (client) => { + const nowTime = new Date().getTime(); + client.setLastPing(nowTime); +}; diff --git a/src/messageHandler/index.js b/src/messageHandler/index.js index 0c3d331..447e949 100644 --- a/src/messageHandler/index.js +++ b/src/messageHandler/index.js @@ -1,15 +1,15 @@ const { MessageType } = require('../enums'); class MessageHandlers { - constructor () { + constructor() { this.handlers = {}; } - registerHandler (messageType, handler) { + registerHandler(messageType, handler) { this.handlers[messageType] = handler; } - handle (client, message) { + handle(client, message) { const { type } = message; const handler = this.handlers[type]; @@ -23,6 +23,7 @@ class MessageHandlers { } module.exports = ({ realm }) => { const transmissionHandler = require('./handlers/transmission')({ realm }); + const heartbeatHandler = require('./handlers/heartbeat'); const messageHandlers = new MessageHandlers(); @@ -35,9 +36,7 @@ module.exports = ({ realm }) => { }); }; - const handleHeartbeat = () => { - - }; + const handleHeartbeat = (client) => heartbeatHandler(client); messageHandlers.registerHandler(MessageType.HEARTBEAT, handleHeartbeat); messageHandlers.registerHandler(MessageType.OFFER, handleTransmission); diff --git a/src/models/client.js b/src/models/client.js index aaf0e5b..50cc5ef 100644 --- a/src/models/client.js +++ b/src/models/client.js @@ -1,23 +1,36 @@ class Client { - constructor ({ id, token }) { + constructor({ id, token }) { this.id = id; this.token = token; this.socket = null; + this.lastPing = new Date().getTime(); } - getId () { + getId() { return this.id; } - getToken () { + getToken() { return this.token; } - setSocket (socket) { + getSocket() { + return this.socket; + } + + setSocket(socket) { this.socket = socket; } - send (data) { + getLastPing() { + return this.lastPing; + } + + setLastPing(lastPing) { + this.lastPing = lastPing; + } + + send(data) { this.socket.send(JSON.stringify(data)); } } diff --git a/src/services/checkBrokenConnections/index.js b/src/services/checkBrokenConnections/index.js new file mode 100644 index 0000000..fe66fef --- /dev/null +++ b/src/services/checkBrokenConnections/index.js @@ -0,0 +1,57 @@ +const DEFAULT_CHECK_INTERVAL = 300; + +module.exports = ({ realm, config, checkInterval = DEFAULT_CHECK_INTERVAL, onClose = () => { } }) => { + const checkConnections = () => { + const clientsIds = realm.getClientsIds(); + + const now = new Date().getTime(); + const aliveTimeout = config.alive_timeout; + + for (const clientId of clientsIds) { + const client = realm.getClientById(clientId); + const timeSinceLastPing = now - client.getLastPing(); + + if (timeSinceLastPing < aliveTimeout) continue; + + try { + client.getSocket().close(); + // eslint-disable-next-line no-empty + } catch (e) { } finally { + realm.clearMessageQueue(clientId); + realm.removeClientById(clientId); + client.setSocket(null); + + if (onClose) onClose(client); + } + } + }; + + let timeoutId; + + const start = () => { + if (timeoutId) { + clearTimeout(timeoutId); + } + + timeoutId = setTimeout(() => { + checkConnections(); + + timeoutId = null; + + start(); + }, checkInterval); + }; + + const stop = () => { + if (timeoutId) { + clearTimeout(timeoutId); + timeoutId = null; + } + }; + + return { + start, + stop, + CHECK_INTERVAL: checkInterval + }; +}; diff --git a/test/messageHandler/handlers/heartbeat/index.js b/test/messageHandler/handlers/heartbeat/index.js new file mode 100644 index 0000000..de76b79 --- /dev/null +++ b/test/messageHandler/handlers/heartbeat/index.js @@ -0,0 +1,16 @@ +const { expect } = require('chai'); +const Client = require('../../../../src/models/client'); +const heartbeatHandler = require('../../../../src/messageHandler/handlers/heartbeat'); + +describe('Heartbeat handler', () => { + it('should update last ping time', () => { + const client = new Client({ id: 'id', token: '' }); + client.setLastPing(0); + + const nowTime = new Date().getTime(); + + heartbeatHandler(client); + + expect(client.getLastPing()).to.be.closeTo(nowTime, 2); + }); +}); diff --git a/test/services/checkBrokenConnections/index.js b/test/services/checkBrokenConnections/index.js new file mode 100644 index 0000000..1d77ad5 --- /dev/null +++ b/test/services/checkBrokenConnections/index.js @@ -0,0 +1,43 @@ +const { expect } = require('chai'); +const Client = require('../../../src/models/client'); +const Realm = require('../../../src/models/realm'); +const checkBrokenConnectionsBuilder = require('../../../src/services/checkBrokenConnections'); + +describe('checkBrokenConnections service', () => { + it('should remove client after 2 checks', (done) => { + const realm = new Realm(); + const doubleCheckTime = 55;//~ equals to checkBrokenConnections.CHECK_INTERVAL * 2 + const checkBrokenConnections = checkBrokenConnectionsBuilder({ realm, config: { alive_timeout: doubleCheckTime }, checkInterval: 30 }); + const client = new Client({ id: 'id', token: '' }); + realm.setClient(client, 'id'); + + checkBrokenConnections.start(); + + setTimeout(() => { + expect(realm.getClientById('id')).to.be.undefined; + checkBrokenConnections.stop(); + done(); + }, checkBrokenConnections.CHECK_INTERVAL * 2 + 3); + }); + + it('should remove client after 1 ping', (done) => { + const realm = new Realm(); + const doubleCheckTime = 55;//~ equals to checkBrokenConnections.CHECK_INTERVAL * 2 + const checkBrokenConnections = checkBrokenConnectionsBuilder({ realm, config: { alive_timeout: doubleCheckTime }, checkInterval: 30 }); + const client = new Client({ id: 'id', token: '' }); + realm.setClient(client, 'id'); + + checkBrokenConnections.start(); + + //set ping after first check + setTimeout(() => { + client.setLastPing(new Date().getTime()); + + setTimeout(() => { + expect(realm.getClientById('id')).to.be.undefined; + checkBrokenConnections.stop(); + done(); + }, checkBrokenConnections.CHECK_INTERVAL * 2 + 10); + }, checkBrokenConnections.CHECK_INTERVAL); + }); +});