convert to lib

This commit is contained in:
afrokick 2019-04-10 23:44:15 +03:00
parent 438d512557
commit b31d2b881d
16 changed files with 3232 additions and 368 deletions

96
bin/peerjs Executable file
View File

@ -0,0 +1,96 @@
#!/usr/bin/env node
const path = require('path');
const pkg = require('../package.json');
const fs = require('fs');
const version = pkg.version;
const PeerServer = require('../src').PeerServer;
const opts = require('optimist')
.usage('Usage: $0')
.options({
debug: {
demand: false,
alias: 'd',
description: 'debug',
default: false
},
timeout: {
demand: false,
alias: 't',
description: 'timeout (milliseconds)',
default: 5000
},
ip_limit: {
demand: false,
alias: 'i',
description: 'IP limit',
default: 5000
},
concurrent_limit: {
demand: false,
alias: 'c',
description: 'concurrent limit',
default: 5000
},
key: {
demand: false,
alias: 'k',
description: 'connection key',
default: 'peerjs'
},
sslkey: {
demand: false,
description: 'path to SSL key'
},
sslcert: {
demand: false,
description: 'path to SSL certificate'
},
port: {
demand: true,
alias: 'p',
description: 'port'
},
path: {
demand: false,
description: 'custom path',
default: '/'
},
allow_discovery: {
demand: false,
description: 'allow discovery of peers'
}
})
.boolean('allow_discovery')
.argv;
process.on('uncaughtException', function (e) {
console.error('Error: ' + e);
});
if (opts.sslkey || opts.sslcert) {
if (opts.sslkey && opts.sslcert) {
opts.ssl = {
key: fs.readFileSync(path.resolve(opts.sslkey)),
cert: fs.readFileSync(path.resolve(opts.sslcert))
};
delete opts.sslkey;
delete opts.sslcert;
} else {
console.error('Warning: PeerServer will not run because either ' +
'the key or the certificate has not been provided.');
process.exit(1);
}
}
const userPath = opts.path;
const server = PeerServer(opts, (server) => {
var host = server.address().address;
var port = server.address().port;
console.log(
'Started PeerServer on %s, port: %s, path: %s (v. %s)',
host, port, userPath || '/', version
);
});

View File

@ -1,5 +1,15 @@
const config = require('./schema'); module.exports = {
host: '0.0.0.0',
config.validate({ allowed: 'strict' }); port: 9000,
expire_timeout: 5000,
module.exports = config; key: 'peerjs',
path: '/myapp',
concurrent_limit: 5000,
allow_discovery: false,
proxied: false,
cleanup_out_msgs: 1000,
ssl: {
key: '',
cert: ''
}
};

View File

@ -1,101 +0,0 @@
const convict = require('convict');
module.exports = convict({
logger: {
level: {
doc: 'The log level. See log4js',
format: [
'ALL',
'MARK',
'TRACE',
'DEBUG',
'INFO',
'WARN',
'ERROR',
'FATAL',
'OFF'
],
default: 'ERROR',
env: 'LOG_LEVEL',
arg: 'logLevel'
}
},
env: {
doc: 'The application environment.',
format: ['prod', 'dev', 'test'],
default: 'dev',
env: 'NODE_ENV'
},
host: {
doc: 'The host to bind.',
format: '*',
default: '0.0.0.0',
env: 'HOST',
arg: 'host'
},
port: {
doc: 'The port to bind.',
format: 'port',
default: 9000,
env: 'PORT',
arg: 'port'
},
expire_timeout: {
doc: 'The timeout before EXPIRE message send',
format: 'duration',
default: 5000,
arg: 'expireTimeout'
},
key: {
doc: 'The key to check incoming clients',
format: String,
default: 'peerjs',
env: 'APP_KEY',
arg: 'key'
},
path: {
doc: '',
format: String,
default: '/myapp',
env: 'APP_PATH',
arg: 'path'
},
concurrent_limit: {
doc: 'Max connections',
format: 'duration',
default: 5000,
arg: 'concurrentLimit'
},
allow_discovery: {
doc: 'Allow discovery of peers',
format: Boolean,
default: false,
arg: 'allowDiscovery'
},
proxied: {
doc: 'Set true if server running behind proxy',
format: Boolean,
default: false,
env: 'APP_PROXIED',
arg: 'proxied'
},
cleanup_out_msgs: {
doc: 'The period in ms to check expired messages',
format: 'duration',
default: 1000
},
ssl: {
key_path: {
doc: 'The path to the private key file',
format: String,
default: '',
arg: 'sslKeyPath'
},
cert_path: {
doc: 'The path to the cert file',
format: String,
default: '',
arg: 'sslCertPath'
}
}
});

2838
package-lock.json generated Normal file

File diff suppressed because it is too large Load Diff

View File

@ -3,6 +3,9 @@
"version": "0.2.9", "version": "0.2.9",
"description": "PeerJS server component", "description": "PeerJS server component",
"main": "src/index.js", "main": "src/index.js",
"bin": {
"peerjs": "./bin/peerjs"
},
"repository": { "repository": {
"type": "git", "type": "git",
"url": "git://github.com/peers/peerjs-server.git" "url": "git://github.com/peers/peerjs-server.git"
@ -11,19 +14,18 @@
"license": "MIT", "license": "MIT",
"scripts": { "scripts": {
"test": "mocha test/**/*.js", "test": "mocha test/**/*.js",
"start": "node ./src/index.js" "start": "bin/peerjs --port ${PORT:=9000}"
}, },
"dependencies": { "dependencies": {
"body-parser": "^1.18.3", "body-parser": "^1.18.3",
"convict": "^4.4.1",
"cors": "~2.8.4", "cors": "~2.8.4",
"express": "^4.16.3", "express": "^4.16.3",
"log4js": "^4.1.0", "ws": "6.0.0",
"ws": "6.0.0" "optimist": "~0.6.1"
}, },
"devDependencies": { "devDependencies": {
"mocha": "^6.0.2",
"chai": "^4.2.0", "chai": "^4.2.0",
"mocha": "^6.1.2",
"semistandard": "^13.0.1", "semistandard": "^13.0.1",
"sinon": "^7.3.1" "sinon": "^7.3.1"
}, },

View File

@ -1,10 +1,12 @@
const express = require('express'); const express = require('express');
const cors = require('cors'); const cors = require('cors');
const bodyParser = require('body-parser'); const bodyParser = require('body-parser');
const authMiddleware = require('./middleware/auth');
const publicContent = require('../../app.json'); const publicContent = require('../../app.json');
const app = module.exports = express.Router(); module.exports = ({ config, realm, messageHandler }) => {
const authMiddleware = require('./middleware/auth')({ config, realm });
const app = express.Router();
const jsonParser = bodyParser.json(); const jsonParser = bodyParser.json();
@ -14,5 +16,8 @@ app.get('/', (req, res, next) => {
res.send(publicContent); res.send(publicContent);
}); });
app.use('/:key', require('./v1/public')); app.use('/:key', require('./v1/public')({ config, realm }));
app.use('/:key/:id/:token', authMiddleware, jsonParser, require('./v1/calls')); app.use('/:key/:id/:token', authMiddleware, jsonParser, require('./v1/calls')({ realm, messageHandler }));
return app;
};

View File

@ -1,11 +1,9 @@
const config = require('../../../../config');
const realm = require('../../../services/realm');
const { Errors } = require('../../../enums'); const { Errors } = require('../../../enums');
module.exports = (req, res, next) => { module.exports = ({ config, realm }) => (req, res, next) => {
const { id, token, key } = req.params; const { id, token, key } = req.params;
if (key !== config.get('key')) { if (key !== config.key) {
return res.status(401).send(Errors.INVALID_KEY); return res.status(401).send(Errors.INVALID_KEY);
} }

View File

@ -1,8 +1,7 @@
const express = require('express'); const express = require('express');
const realm = require('../../../services/realm');
const messageHandler = require('../../../messageHandler');
const app = module.exports = express.Router(); module.exports = ({ realm, messageHandler }) => {
const app = express.Router();
const handle = (req, res, next) => { const handle = (req, res, next) => {
const { id } = req.params; const { id } = req.params;
@ -32,3 +31,6 @@ app.post('/candidate', handle);
app.post('/answer', handle); app.post('/answer', handle);
app.post('/leave', handle); app.post('/leave', handle);
return app;
};

View File

@ -1,8 +1,7 @@
const express = require('express'); const express = require('express');
const realm = require('../../../services/realm');
const config = require('../../../../config');
const app = module.exports = express.Router(); module.exports = ({ config, realm }) => {
const app = express.Router();
// Retrieve guaranteed random ID. // Retrieve guaranteed random ID.
app.get('/id', (req, res, next) => { app.get('/id', (req, res, next) => {
@ -12,7 +11,7 @@ app.get('/id', (req, res, next) => {
// Get a list of all peers for a key, enabled by the `allowDiscovery` flag. // Get a list of all peers for a key, enabled by the `allowDiscovery` flag.
app.get('/peers', (req, res, next) => { app.get('/peers', (req, res, next) => {
if (config.get('allow_discovery')) { if (config.allow_discovery) {
const clientsIds = realm.getClientsIds(); const clientsIds = realm.getClientsIds();
return res.send(clientsIds); return res.send(clientsIds);
@ -20,3 +19,6 @@ app.get('/peers', (req, res, next) => {
res.sendStatus(401); res.sendStatus(401);
}); });
return app;
};

View File

@ -1,57 +1,29 @@
const express = require('express'); const express = require('express');
const http = require('http'); const http = require('http');
const https = require('https'); const https = require('https');
const fs = require('fs');
const config = require('../config'); const config = require('../config');
const WebSocketServer = require('./services/webSocketServer'); const WebSocketServer = require('./services/webSocketServer');
const logger = require('./services/logger'); const Realm = require('./models/realm');
const realm = require('./services/realm');
const { startMessagesExpiration } = require('./services/messagesExpire');
const api = require('./api');
const messageHandler = require('./messageHandler');
process.on('uncaughtException', (e) => { const init = ({ app, server, options }) => {
logger.error('Error: ' + e); const config = options;
const realm = new Realm();
const messageHandler = require('./messageHandler')({ realm });
const api = require('./api')({ config, realm, messageHandler });
const { startMessagesExpiration } = require('./services/messagesExpire')({ realm, config });
app.use(options.path, api);
const wss = new WebSocketServer({
server,
realm,
config: {
...config,
path: app.mountpath
}
}); });
// parse config
let path = config.get('path');
if (path[0] !== '/') {
path = '/' + path;
}
if (path[path.length - 1] !== '/') {
path += '/';
}
const app = express();
if (config.get('proxied')) {
app.set('trust proxy', config.get('proxied'));
}
let server;
if (config.get('ssl.key_path') && config.get('ssl.cert_path')) {
const keyPath = config.get('ssl.key_path');
const certPath = config.get('ssl.cert_path');
const opts = {
key: fs.readFileSync(path.resolve(keyPath)),
cert: fs.readFileSync(path.resolve(certPath))
};
server = https.createServer(opts, app);
} else {
server = http.createServer(app);
}
app.use(path, api);
const wss = new WebSocketServer(server, app.mountpath);
wss.on('connection', client => { wss.on('connection', client => {
const messageQueue = realm.getMessageQueueById(client.getId()); const messageQueue = realm.getMessageQueueById(client.getId());
@ -63,32 +35,94 @@ wss.on('connection', client => {
realm.clearMessageQueue(client.getId()); realm.clearMessageQueue(client.getId());
} }
logger.info(`client ${client.getId()} was connected`); app.emit('connection', client);
}); });
wss.on('message', (client, message) => { wss.on('message', (client, message) => {
app.emit('message', client, message);
messageHandler(client, message); messageHandler(client, message);
}); });
wss.on('close', client => { wss.on('close', client => {
logger.info(`client ${client.getId()} was disconnected`); app.emit('disconnected', client);
}); });
wss.on('error', error => { wss.on('error', error => {
logger.error(error); app.emit('error', error);
}); });
const port = config.get('port');
const host = config.get('host');
server.listen(port, host, () => {
const host = server.address().address;
const port = server.address().port;
logger.info(
'Started PeerServer on %s, port: %s',
host, port
);
startMessagesExpiration(); startMessagesExpiration();
};
function ExpressPeerServer (server, options) {
const app = express();
options = {
...config,
...options
};
if (options.proxied) {
app.set('trust proxy', options.proxied);
}
app.on('mount', () => {
if (!server) {
throw new Error('Server is not passed to constructor - ' +
'can\'t start PeerServer');
}
init({ app, server, options });
}); });
return app;
}
function PeerServer (options = {}, callback) {
const app = express();
options = {
...config,
...options
};
let path = options.path;
const port = options.port;
delete options.path;
if (path[0] !== '/') {
path = '/' + path;
}
if (path[path.length - 1] !== '/') {
path += '/';
}
let server;
if (options.ssl && options.ssl.key && options.ssl.cert) {
server = https.createServer(options.ssl, app);
delete options.ssl;
} else {
server = http.createServer(app);
}
const peerjs = ExpressPeerServer(server, options);
app.use(path, peerjs);
if (callback) {
server.listen(port, () => {
callback(server);
});
} else {
server.listen(port);
}
return peerjs;
}
exports = module.exports = {
ExpressPeerServer: ExpressPeerServer,
PeerServer: PeerServer
};

View File

@ -1,8 +1,6 @@
const realm = require('../../../services/realm');
const logger = require('../../../services/logger');
const { MessageType } = require('../../../enums'); const { MessageType } = require('../../../enums');
const handler = (client, message) => { module.exports = ({ realm }) => (client, message) => {
const type = message.type; const type = message.type;
const srcId = message.src; const srcId = message.src;
const dstId = message.dst; const dstId = message.dst;
@ -12,8 +10,6 @@ const handler = (client, message) => {
// User is connected! // User is connected!
if (destinationClient) { if (destinationClient) {
try { try {
logger.debug(type, 'from', srcId, 'to', dstId);
if (destinationClient.socket) { if (destinationClient.socket) {
const data = JSON.stringify(message); const data = JSON.stringify(message);
@ -23,7 +19,6 @@ const handler = (client, message) => {
throw new Error('Peer dead'); throw new Error('Peer dead');
} }
} catch (e) { } catch (e) {
logger.error(e);
// This happens when a peer disconnects without closing connections and // This happens when a peer disconnects without closing connections and
// the associated WebSocket has not closed. // the associated WebSocket has not closed.
// Tell other side to stop trying. // Tell other side to stop trying.
@ -33,7 +28,7 @@ const handler = (client, message) => {
realm.removeClientById(destinationClient.getId()); realm.removeClientById(destinationClient.getId());
} }
handler(client, { module.exports({ realm })(client, {
type: MessageType.LEAVE, type: MessageType.LEAVE,
src: dstId, src: dstId,
dst: srcId dst: srcId
@ -43,10 +38,8 @@ const handler = (client, message) => {
// Wait for this client to connect/reconnect (XHR) for important // Wait for this client to connect/reconnect (XHR) for important
// messages. // messages.
if (type !== MessageType.LEAVE && type !== MessageType.EXPIRE && dstId) { if (type !== MessageType.LEAVE && type !== MessageType.EXPIRE && dstId) {
logger.debug(`[transmission] dst client ${dstId} not found, add msg ${type} to queue`);
realm.addMessageToQueue(dstId, message); realm.addMessageToQueue(dstId, message);
} else if (type === MessageType.LEAVE && !dstId) { } else if (type === MessageType.LEAVE && !dstId) {
logger.debug(`[transmission] remove client ${srcId}`);
realm.removeClientById(srcId); realm.removeClientById(srcId);
} else { } else {
// Unavailable destination specified with message LEAVE or EXPIRE // Unavailable destination specified with message LEAVE or EXPIRE
@ -54,5 +47,3 @@ const handler = (client, message) => {
} }
} }
}; };
module.exports = handler;

View File

@ -1,25 +1,30 @@
const logger = require('../services/logger');
const { MessageType } = require('../enums'); const { MessageType } = require('../enums');
const transmissionHandler = require('./handlers/transmission');
const handlers = {}; class MessageHandlers {
constructor ({ realm }) {
this.handlers = {};
}
const registerHandler = (messageType, handler) => { registerHandler (messageType, handler) {
logger.debug(`[MSGHANDLER] register handler for ${messageType}`); this.handlers[messageType] = handler;
handlers[messageType] = handler; }
};
module.exports = (client, message) => { handle (client, message) {
const { type } = message; const { type } = message;
const handler = handlers[type]; const handler = this.handlers[type];
if (!handler) { if (!handler) {
return logger.error(`[MSGHANDLER] Message unrecognized:${type}`); return;
} }
handler(client, message); handler(client, message);
}; }
}
module.exports = ({ realm }) => {
const transmissionHandler = require('./handlers/transmission')({ realm });
const messageHandlers = new MessageHandlers({ realm });
const handleTransmission = (client, message) => { const handleTransmission = (client, message) => {
transmissionHandler(client, { transmissionHandler(client, {
@ -34,9 +39,12 @@ const handleHeartbeat = (client, message) => {
}; };
registerHandler(MessageType.HEARTBEAT, handleHeartbeat); messageHandlers.registerHandler(MessageType.HEARTBEAT, handleHeartbeat);
registerHandler(MessageType.OFFER, handleTransmission); messageHandlers.registerHandler(MessageType.OFFER, handleTransmission);
registerHandler(MessageType.ANSWER, handleTransmission); messageHandlers.registerHandler(MessageType.ANSWER, handleTransmission);
registerHandler(MessageType.CANDIDATE, handleTransmission); messageHandlers.registerHandler(MessageType.CANDIDATE, handleTransmission);
registerHandler(MessageType.LEAVE, handleTransmission); messageHandlers.registerHandler(MessageType.LEAVE, handleTransmission);
registerHandler(MessageType.EXPIRE, handleTransmission); messageHandlers.registerHandler(MessageType.EXPIRE, handleTransmission);
return (client, message) => messageHandlers.handle(client, message);
};

View File

@ -1,8 +0,0 @@
const log4js = require('log4js');
const config = require('../../../config');
const logger = log4js.getLogger();
logger.level = config.get('logger.level');
module.exports = logger;

View File

@ -1,14 +1,12 @@
const config = require('../../../config');
const messageHandler = require('../../messageHandler'); const messageHandler = require('../../messageHandler');
const { MessageType } = require('../../enums'); const { MessageType } = require('../../enums');
const realm = require('../realm');
const logger = require('../logger');
module.exports = ({ realm, config }) => {
const pruneOutstanding = () => { const pruneOutstanding = () => {
const destinationClientsIds = realm._messageQueues.keys(); const destinationClientsIds = realm._messageQueues.keys();
const now = new Date().getTime(); const now = new Date().getTime();
const maxDiff = config.get('expire_timeout'); const maxDiff = config.expire_timeout;
const seen = {}; const seen = {};
@ -32,8 +30,6 @@ const pruneOutstanding = () => {
} }
realm.clearMessageQueue(destinationClientId); realm.clearMessageQueue(destinationClientId);
logger.trace(`[MSGSEXPIRE] mq ${destinationClientId} was cleared`);
} }
}; };
@ -51,7 +47,7 @@ const startMessagesExpiration = () => {
timeoutId = null; timeoutId = null;
startMessagesExpiration(); startMessagesExpiration();
}, config.get('cleanup_out_msgs')); }, config.cleanup_out_msgs);
}; };
const stopMessagesExpiration = () => { const stopMessagesExpiration = () => {
@ -61,7 +57,8 @@ const stopMessagesExpiration = () => {
} }
}; };
module.exports = { return {
startMessagesExpiration, startMessagesExpiration,
stopMessagesExpiration stopMessagesExpiration
}; };
};

View File

@ -1,3 +0,0 @@
const Realm = require('../../models/realm');
module.exports = new Realm();

View File

@ -1,22 +1,19 @@
const WSS = require('ws').Server; const WSS = require('ws').Server;
const url = require('url'); const url = require('url');
const EventEmitter = require('events'); const EventEmitter = require('events');
const logger = require('../logger');
const { MessageType, Errors } = require('../../enums'); const { MessageType, Errors } = require('../../enums');
const config = require('../../../config');
const realm = require('../realm');
const Client = require('../../models/client'); const Client = require('../../models/client');
class WebSocketServer extends EventEmitter { class WebSocketServer extends EventEmitter {
constructor (server) { constructor ({ server, realm, config }) {
super(); super();
this.setMaxListeners(0); this.setMaxListeners(0);
this.realm = realm;
this.config = config;
let path = config.get('path'); let path = this.config.path;
path = path + (path[path.length - 1] !== '/' ? '/' : '') + 'peerjs'; path = path + (path[path.length - 1] !== '/' ? '/' : '') + 'peerjs';
logger.info(`ws opened on path:${path}`);
this._wss = new WSS({ path, server }); this._wss = new WSS({ path, server });
this._wss.on('connection', (socket, req) => this._onSocketConnection(socket, req)); this._wss.on('connection', (socket, req) => this._onSocketConnection(socket, req));
@ -32,11 +29,11 @@ class WebSocketServer extends EventEmitter {
return this._sendErrorAndClose(socket, Errors.INVALID_WS_PARAMETERS); return this._sendErrorAndClose(socket, Errors.INVALID_WS_PARAMETERS);
} }
if (key !== config.get('key')) { if (key !== this.config.key) {
return this._sendErrorAndClose(socket, Errors.INVALID_KEY); return this._sendErrorAndClose(socket, Errors.INVALID_KEY);
} }
const client = realm.getClientById(id); const client = this.realm.getClientById(id);
if (client) { if (client) {
if (token !== client.getToken()) { if (token !== client.getToken()) {
@ -56,21 +53,20 @@ class WebSocketServer extends EventEmitter {
} }
_onSocketError (error) { _onSocketError (error) {
logger.debug(`[WSS] on error:${error}`);
// handle error // handle error
this.emit('error', error); this.emit('error', error);
} }
_registerClient ({ socket, id, token }) { _registerClient ({ socket, id, token }) {
// Check concurrent limit // Check concurrent limit
const clientsCount = realm.getClientsIds().length; const clientsCount = this.realm.getClientsIds().length;
if (clientsCount >= config.get('concurrent_limit')) { if (clientsCount >= this.config.concurrent_limit) {
return this._sendErrorAndClose(socket, Errors.CONNECTION_LIMIT_EXCEED); return this._sendErrorAndClose(socket, Errors.CONNECTION_LIMIT_EXCEED);
} }
const newClient = new Client({ id, token }); const newClient = new Client({ id, token });
realm.setClient(newClient, id); this.realm.setClient(newClient, id);
socket.send(JSON.stringify({ type: MessageType.OPEN })); socket.send(JSON.stringify({ type: MessageType.OPEN }));
this._configureWS(socket, newClient); this._configureWS(socket, newClient);
@ -81,10 +77,8 @@ class WebSocketServer extends EventEmitter {
// Cleanup after a socket closes. // Cleanup after a socket closes.
socket.on('close', () => { socket.on('close', () => {
logger.info('Socket closed:', client.getId());
if (client.socket === socket) { if (client.socket === socket) {
realm.removeClientById(client.getId()); this.realm.removeClientById(client.getId());
this.emit('close', client); this.emit('close', client);
} }
}); });
@ -98,7 +92,6 @@ class WebSocketServer extends EventEmitter {
this.emit('message', client, message); this.emit('message', client, message);
} catch (e) { } catch (e) {
logger.error('Invalid message', data);
throw e; throw e;
} }
}); });