From 9e0d8617152e2167884b0b13c456605a0a021b3c Mon Sep 17 00:00:00 2001 From: Vsevolod Strukchinsky Date: Thu, 3 Apr 2014 21:23:30 +0600 Subject: [PATCH] Migrate to express from restify PeerServer is now instance of express application - this allows to combine it with other express applications. Listen is deferred - because it now method from express (you can call it later and supply callback for start event). Constructor now not using `new` (because we mixing in methods to express app) Closes #36 --- README.md | 5 +- bin/peerjs | 125 ++++----- lib/index.js | 62 +++++ lib/server.js | 729 ++++++++++++++++++++++--------------------------- package.json | 9 +- test/server.js | 333 +++++++++++----------- 6 files changed, 609 insertions(+), 654 deletions(-) create mode 100644 lib/index.js diff --git a/README.md b/README.md index a0c03da..2e8c8b1 100644 --- a/README.md +++ b/README.md @@ -46,13 +46,14 @@ Using HTTPS: Simply pass in PEM-encoded certificate and key. var fs = require('fs'); var PeerServer = require('peer').PeerServer; -var server = new PeerServer({ - port: 9000, +var server = PeerServer({ ssl: { key: fs.readFileSync('/path/to/your/ssl/key/here.key'), certificate: fs.readFileSync('/path/to/your/ssl/certificate/here.crt') } }); + +server.listen(9000); ``` ### Events diff --git a/bin/peerjs b/bin/peerjs index d246622..4d31846 100755 --- a/bin/peerjs +++ b/bin/peerjs @@ -1,81 +1,82 @@ #!/usr/bin/env node var path = require('path') - , pkg = require('../package.json') - , fs = require('fs') - , version = pkg.version - , PeerServer = require('../lib/server').PeerServer - , opts = require('optimist') - .usage('Usage: $0') - .options({ - debug: { - demand: false, - alias: 'd', - description: 'debug', - default: false - }, - timeout: { - demand: false, - alias: 't', - description: 'timeout (milliseconds)', + , pkg = require('../package.json') + , fs = require('fs') + , version = pkg.version + , PeerServer = require('../lib')() + , opts = require('optimist') + .usage('Usage: $0') + .options({ + debug: { + demand: false, + alias: 'd', + description: 'debug', + default: false + }, + timeout: { + demand: false, + alias: 't', + description: 'timeout (milliseconds)', default: 5000 - }, - ip_limit: { - demand: false, - alias: 'i', - description: 'IP limit', + }, + ip_limit: { + demand: false, + alias: 'i', + description: 'IP limit', default: 5000 - }, - concurrent_limit: { - demand: false, - alias: 'c', - description: 'concurrent limit', + }, + concurrent_limit: { + demand: false, + alias: 'c', + description: 'concurrent limit', default: 5000 - }, - key: { - demand: false, - alias: 'k', - description: 'connection key', + }, + key: { + demand: false, + alias: 'k', + description: 'connection key', default: 'peerjs' - }, - sslkey: { - demand: false, - description: 'path to SSL key' - }, - sslcert: { - demand: false, - description: 'path to SSL certificate' - }, - port: { - demand: true, - alias: 'p', - description: 'port' - }, - path: { - demand: false, - description: 'custom path' - }, - allow_discovery: { - demand: false, - description: 'allow discovery of peers' - } - }) - .boolean('allow_discovery') - .argv; + }, + sslkey: { + demand: false, + description: 'path to SSL key' + }, + sslcert: { + demand: false, + description: 'path to SSL certificate' + }, + port: { + demand: true, + alias: 'p', + description: 'port' + }, + path: { + demand: false, + description: 'custom path' + }, + allow_discovery: { + demand: false, + description: 'allow discovery of peers' + } + }) + .boolean('allow_discovery') + .argv; opts.version = version; if (opts.sslkey && opts.sslcert) { - opts['ssl'] = {}; - opts.ssl['key'] = fs.readFileSync(path.resolve(opts.sslkey)); - opts.ssl['certificate'] = fs.readFileSync(path.resolve(opts.sslcert)); + opts['ssl'] = {}; + opts.ssl['key'] = fs.readFileSync(path.resolve(opts.sslkey)); + opts.ssl['certificate'] = fs.readFileSync(path.resolve(opts.sslcert)); } process.on('uncaughtException', function(e) { - console.error('Error: ' + e); + console.error('Error: ' + e); }); var server = new PeerServer(opts); +server.listen(opts.port); console.log( - 'Started PeerServer, port: ' + opts.port + ', path: ' + (opts.path || '/') + (" (v. %s)"), version + 'Started PeerServer, port: ' + opts.port + ', path: ' + (opts.path || '/') + (" (v. %s)"), version ); diff --git a/lib/index.js b/lib/index.js new file mode 100644 index 0000000..aafcbc0 --- /dev/null +++ b/lib/index.js @@ -0,0 +1,62 @@ +var express = require('express'); +var mixin = require('utils-merge'); +var proto = require('./server'); + +exports = module.exports = { + PeerServer: createPeerServer +}; + +function createPeerServer(options) { + + var app = express(); + + mixin(app, proto); + + app.options = { + debug: false, + timeout: 5000, + key: 'peerjs', + ip_limit: 5000, + concurrent_limit: 5000, + ssl: {}, + path: '/', + allow_discovery: false + }; + + mixin(app.options, options); + + // Print warning if only one of the two is given. + if (Object.keys(app.options.ssl).length === 1) { + util.prettyError('Warning: PeerServer will not run on an HTTPS server' + + ' because either the key or the certificate has not been provided.'); + } + + app.options.ssl.name = 'PeerServer'; + + if (app.options.path[0] !== '/') { + app.options.path = '/' + app.options.path; + } + if (app.options.path[app.options.path.length - 1] !== '/') { + app.options.path += '/'; + } + + // Connected clients + app._clients = {}; + + // Messages waiting for another peer. + app._outstanding = {}; + + // Initailize WebSocket server handlers. + app._initializeWSS(); + + // Initialize HTTP routes. This is only used for the first few milliseconds + // before a socket is opened for a Peer. + app._initializeHTTP(); + + // Mark concurrent users per ip + app._ips = {}; + + app._setCleanupIntervals(); + + return app; +} diff --git a/lib/server.js b/lib/server.js index 8b5ae47..93995a8 100644 --- a/lib/server.js +++ b/lib/server.js @@ -1,475 +1,388 @@ var util = require('./util'); -var restify = require('restify'); -var EventEmitter = require('events').EventEmitter; +var express = require('express'); var WebSocketServer = require('ws').Server; var url = require('url'); -function PeerServer(options) { - if (!(this instanceof PeerServer)) return new PeerServer(options); - EventEmitter.call(this); - - this._options = util.extend({ - port: 80, - debug: false, - timeout: 5000, - key: 'peerjs', - ip_limit: 5000, - concurrent_limit: 5000, - ssl: {}, - path: '/', - allow_discovery: false - }, options); - - util.debug = this._options.debug; - - // Print warning if only one of the two is given. - if (Object.keys(this._options.ssl).length === 1) { - util.prettyError('Warning: PeerServer will not run on an HTTPS server' - + ' because either the key or the certificate has not been provided.'); - } - - this._options.ssl['name'] = 'PeerServer'; - - if (this._options.path[0] !== '/') { - this._options.path = '/' + this._options.path; - } - if (this._options.path[this._options.path.length - 1] !== '/') { - this._options.path += '/'; - } - - this._app = restify.createServer(this._options.ssl); - - // Connected clients - this._clients = {}; - - // Messages waiting for another peer. - this._outstanding = {}; - - // Initailize WebSocket server handlers. - this._initializeWSS(); - - // Initialize HTTP routes. This is only used for the first few milliseconds - // before a socket is opened for a Peer. - this._initializeHTTP(); - - // Mark concurrent users per ip - this._ips = {}; - - this._setCleanupIntervals(); -} - -util.inherits(PeerServer, EventEmitter); - +var app = exports = module.exports = {}; /** Initialize WebSocket server. */ -PeerServer.prototype._initializeWSS = function() { - var self = this; +app._initializeWSS = function() { + var self = this; - // Create WebSocket server as well. - this._wss = new WebSocketServer({ path: this._options.path + 'peerjs', server: this._app}); + // Create WebSocket server as well. + this._wss = new WebSocketServer({ path: this.options.path + 'peerjs', server: this}); - this._wss.on('connection', function(socket) { - var query = url.parse(socket.upgradeReq.url, true).query; - var id = query.id; - var token = query.token; - var key = query.key; - var ip = socket.upgradeReq.socket.remoteAddress; + this._wss.on('connection', function(socket) { + var query = url.parse(socket.upgradeReq.url, true).query; + var id = query.id; + var token = query.token; + var key = query.key; + var ip = socket.upgradeReq.socket.remoteAddress; - if (!id || !token || !key) { - socket.send(JSON.stringify({ type: 'ERROR', payload: { msg: 'No id, token, or key supplied to websocket server' } })); - socket.close(); - return; - } - - if (!self._clients[key] || !self._clients[key][id]) { - self._checkKey(key, ip, function(err) { - if (!err) { - if (!self._clients[key][id]) { - self._clients[key][id] = { token: token, ip: ip }; - self._ips[ip]++; - socket.send(JSON.stringify({ type: 'OPEN' })); - } - self._configureWS(socket, key, id, token); - } else { - socket.send(JSON.stringify({ type: 'ERROR', payload: { msg: err } })); + if (!id || !token || !key) { + socket.send(JSON.stringify({ type: 'ERROR', payload: { msg: 'No id, token, or key supplied to websocket server' } })); + socket.close(); + return; + } + + if (!self._clients[key] || !self._clients[key][id]) { + self._checkKey(key, ip, function(err) { + if (!err) { + if (!self._clients[key][id]) { + self._clients[key][id] = { token: token, ip: ip }; + self._ips[ip]++; + socket.send(JSON.stringify({ type: 'OPEN' })); + } + self._configureWS(socket, key, id, token); + } else { + socket.send(JSON.stringify({ type: 'ERROR', payload: { msg: err } })); + } + }); + } else { + self._configureWS(socket, key, id, token); + } + }); +}; + +app._configureWS = function(socket, key, id, token) { + var self = this; + var client = this._clients[key][id]; + + if (token === client.token) { + // res 'close' event will delete client.res for us + client.socket = socket; + // Client already exists + if (client.res) { + client.res.end(); } - }); } else { - self._configureWS(socket, key, id, token); + // ID-taken, invalid token + socket.send(JSON.stringify({ type: 'ID-TAKEN', payload: { msg: 'ID is taken' } })); + socket.close(); + return; } - }); + + this._processOutstanding(key, id); + + // Cleanup after a socket closes. + socket.on('close', function() { + util.log('Socket closed:', id); + if (client.socket == socket) { + self._removePeer(key, id); + } + }); + + // Handle messages from peers. + socket.on('message', function(data) { + try { + var message = JSON.parse(data); + + if (['LEAVE', 'CANDIDATE', 'OFFER', 'ANSWER'].indexOf(message.type) !== -1) { + self._handleTransmission(key, { + type: message.type, + src: id, + dst: message.dst, + payload: message.payload + }); + } else { + util.prettyError('Message unrecognized'); + } + } catch(e) { + util.log('Invalid message', data); + throw e; + } + }); + + // We're going to emit here, because for XHR we don't *know* when someone + // disconnects. + this.emit('connection', id); }; -PeerServer.prototype._configureWS = function(socket, key, id, token) { - var self = this; - var client = this._clients[key][id]; - - if (token === client.token) { - // res 'close' event will delete client.res for us - client.socket = socket; - // Client already exists - if (client.res) { - client.res.end(); - } - } else { - // ID-taken, invalid token - socket.send(JSON.stringify({ type: 'ID-TAKEN', payload: { msg: 'ID is taken' } })); - socket.close(); - return; - } - - this._processOutstanding(key, id); - - // Cleanup after a socket closes. - socket.on('close', function() { - util.log('Socket closed:', id); - if (client.socket == socket) { - self._removePeer(key, id); - } - }); - - // Handle messages from peers. - socket.on('message', function(data) { - try { - var message = JSON.parse(data); - - if (['LEAVE', 'CANDIDATE', 'OFFER', 'ANSWER'].indexOf(message.type) !== -1) { - self._handleTransmission(key, { - type: message.type, - src: id, - dst: message.dst, - payload: message.payload - }); - } else { - util.prettyError('Message unrecognized'); - } - } catch(e) { - util.log('Invalid message', data); - throw e; - } - }); - - // We're going to emit here, because for XHR we don't *know* when someone - // disconnects. - this.emit('connection', id); +app._checkAllowsDiscovery = function(key, cb) { + cb(this.options.allow_discovery); }; -PeerServer.prototype._checkAllowsDiscovery = function(key, cb) { - cb(this._options.allow_discovery); -}; - -PeerServer.prototype._checkKey = function(key, ip, cb) { - if (key == this._options.key) { - if (!this._clients[key]) { - this._clients[key] = {}; +app._checkKey = function(key, ip, cb) { + if (key == this.options.key) { + if (!this._clients[key]) { + this._clients[key] = {}; + } + if (!this._outstanding[key]) { + this._outstanding[key] = {}; + } + if (!this._ips[ip]) { + this._ips[ip] = 0; + } + // Check concurrent limit + if (Object.keys(this._clients[key]).length >= this.options.concurrent_limit) { + cb('Server has reached its concurrent user limit'); + return; + } + if (this._ips[ip] >= this.options.ip_limit) { + cb(ip + ' has reached its concurrent user limit'); + return; + } + cb(null); + } else { + cb('Invalid key provided'); } - if (!this._outstanding[key]) { - this._outstanding[key] = {}; - } - if (!this._ips[ip]) { - this._ips[ip] = 0; - } - // Check concurrent limit - if (Object.keys(this._clients[key]).length >= this._options.concurrent_limit) { - cb('Server has reached its concurrent user limit'); - return; - } - if (this._ips[ip] >= this._options.ip_limit) { - cb(ip + ' has reached its concurrent user limit'); - return; - } - cb(null); - } else { - cb('Invalid key provided'); - } }; /** Initialize HTTP server routes. */ -PeerServer.prototype._initializeHTTP = function() { - var self = this; +app._initializeHTTP = function() { + var self = this; - this._app.use(restify.bodyParser({ mapParams: false })); - this._app.use(restify.queryParser()); - this._app.use(util.allowCrossDomain); + this.use(express.bodyParser({ mapParams: false })); + this.use(util.allowCrossDomain); - /** Hack from https://github.com/mcavage/node-restify/issues/284, until we switch to express */ - function unknownMethodHandler(req, res) { - if (req.method.toLowerCase() === 'options') { - var allowHeaders = ['Accept', 'Accept-Version', 'Content-Type', 'Api-Version']; + // Retrieve guaranteed random ID. + this.get(this.options.path + ':key/id', function(req, res, next) { + res.contentType = 'text/html'; + res.send(self._generateClientId(req.params.key)); + return next(); + }); - if (res.methods.indexOf('OPTIONS') === -1) res.methods.push('OPTIONS'); + // Server sets up HTTP streaming when you get post an ID. + this.post(this.options.path + ':key/:id/:token/id', function(req, res, next) { + var id = req.params.id; + var token = req.params.token; + var key = req.params.key; + var ip = req.connection.remoteAddress; - res.header('Access-Control-Allow-Credentials', true); - res.header('Access-Control-Allow-Headers', allowHeaders.join(', ')); - res.header('Access-Control-Allow-Methods', res.methods.join(', ')); - res.header('Access-Control-Allow-Origin', req.headers.origin); - - return res.send(204); - } else { - return res.send(new restify.MethodNotAllowedError()); - } - } - - this._app.on('MethodNotAllowed', unknownMethodHandler); - - // Retrieve guaranteed random ID. - this._app.get(this._options.path + ':key/id', function(req, res, next) { - res.contentType = 'text/html'; - res.send(self._generateClientId(req.params.key)); - return next(); - }); - - // Server sets up HTTP streaming when you get post an ID. - this._app.post(this._options.path + ':key/:id/:token/id', function(req, res, next) { - var id = req.params.id; - var token = req.params.token; - var key = req.params.key; - var ip = req.connection.remoteAddress; - - if (!self._clients[key] || !self._clients[key][id]) { - self._checkKey(key, ip, function(err) { - if (!err && !self._clients[key][id]) { - self._clients[key][id] = { token: token, ip: ip }; - self._ips[ip]++; - self._startStreaming(res, key, id, token, true); + if (!self._clients[key] || !self._clients[key][id]) { + self._checkKey(key, ip, function(err) { + if (!err && !self._clients[key][id]) { + self._clients[key][id] = { token: token, ip: ip }; + self._ips[ip]++; + self._startStreaming(res, key, id, token, true); + } else { + res.send(JSON.stringify({ type: 'HTTP-ERROR' })); + } + }); } else { - res.send(JSON.stringify({ type: 'HTTP-ERROR' })); - } - }); - } else { - self._startStreaming(res, key, id, token); - } - return next(); - }); - - // Get a list of all peers for a key, enabled by the `allowDiscovery` flag. - this._app.get(this._options.path + ':key/peers', function(req, res, next) { - var key = req.params.key; - if (self._clients[key]) { - self._checkAllowsDiscovery(key, function(isAllowed) { - if (isAllowed) { - res.send(Object.keys(self._clients[key])); - } else { - res.send(401); + self._startStreaming(res, key, id, token); } return next(); - }); - } else { - res.send(404); - return next(); - } - }); + }); - var handle = function(req, res, next) { - var key = req.params.key; - var id = req.params.id; + // Get a list of all peers for a key, enabled by the `allowDiscovery` flag. + this.get(this.options.path + ':key/peers', function(req, res, next) { + var key = req.params.key; + if (self._clients[key]) { + self._checkAllowsDiscovery(key, function(isAllowed) { + if (isAllowed) { + res.send(Object.keys(self._clients[key])); + } else { + res.send(401); + } + return next(); + }); + } else { + res.send(404); + return next(); + } + }); - var client; - if (!self._clients[key] || !(client = self._clients[key][id])) { - if (req.params.retry) { - res.send(401); + var handle = function(req, res, next) { + var key = req.params.key; + var id = req.params.id; + + var client; + if (!self._clients[key] || !(client = self._clients[key][id])) { + if (req.params.retry) { + res.send(401); + return next(); + } else { + // Retry this request + req.params.retry = true; + setTimeout(handle, 25, req, res); + return; + } + } + + // Auth the req + if (req.params.token !== client.token) { + res.send(401); + return; + } else { + self._handleTransmission(key, { + type: req.body.type, + src: id, + dst: req.body.dst, + payload: req.body.payload + }); + res.send(200); + } return next(); - } else { - // Retry this request - req.params.retry = true; - setTimeout(handle, 25, req, res); - return; - } - } + }; - // Auth the req - if (req.params.token !== client.token) { - res.send(401); - return; - } else { - self._handleTransmission(key, { - type: req.body.type, - src: id, - dst: req.body.dst, - payload: req.body.payload - }); - res.send(200); - } - return next(); - }; + this.post(this.options.path + ':key/:id/:token/offer', handle); - this._app.post(this._options.path + ':key/:id/:token/offer', handle); + this.post(this.options.path + ':key/:id/:token/candidate', handle); - this._app.post(this._options.path + ':key/:id/:token/candidate', handle); - - this._app.post(this._options.path + ':key/:id/:token/answer', handle); - - this._app.post(this._options.path + ':key/:id/:token/leave', handle); - - // Listen on user-specified port and IP address. - if (this._options.ip) { - this._app.listen(this._options.port, this._options.ip); - } else { - this._app.listen(this._options.port); - } + this.post(this.options.path + ':key/:id/:token/answer', handle); + this.post(this.options.path + ':key/:id/:token/leave', handle); }; /** Saves a streaming response and takes care of timeouts and headers. */ -PeerServer.prototype._startStreaming = function(res, key, id, token, open) { - var self = this; +app._startStreaming = function(res, key, id, token, open) { + var self = this; - res.writeHead(200, {'Content-Type': 'application/octet-stream'}); + res.writeHead(200, {'Content-Type': 'application/octet-stream'}); - var pad = '00'; - for (var i = 0; i < 10; i++) { - pad += pad; - } - res.write(pad + '\n'); + var pad = '00'; + for (var i = 0; i < 10; i++) { + pad += pad; + } + res.write(pad + '\n'); - if (open) { - res.write(JSON.stringify({ type: 'OPEN' }) + '\n'); - } + if (open) { + res.write(JSON.stringify({ type: 'OPEN' }) + '\n'); + } - var client = this._clients[key][id]; + var client = this._clients[key][id]; - if (token === client.token) { - // Client already exists - res.on('close', function() { - if (client.res === res) { - if (!client.socket) { - // No new request yet, peer dead - self._removePeer(key, id); - return; - } - delete client.res; - } - }); - client.res = res; - this._processOutstanding(key, id); - } else { - // ID-taken, invalid token - res.end(JSON.stringify({ type: 'HTTP-ERROR' })); - } + if (token === client.token) { + // Client already exists + res.on('close', function() { + if (client.res === res) { + if (!client.socket) { + // No new request yet, peer dead + self._removePeer(key, id); + return; + } + delete client.res; + } + }); + client.res = res; + this._processOutstanding(key, id); + } else { + // ID-taken, invalid token + res.end(JSON.stringify({ type: 'HTTP-ERROR' })); + } }; -PeerServer.prototype._pruneOutstanding = function() { - var keys = Object.keys(this._outstanding); - for (var k = 0, kk = keys.length; k < kk; k += 1) { - var key = keys[k]; - var dsts = Object.keys(this._outstanding[key]); - for (var i = 0, ii = dsts.length; i < ii; i += 1) { - var offers = this._outstanding[key][dsts[i]]; - var seen = {}; - for (var j = 0, jj = offers.length; j < jj; j += 1) { - var message = offers[j]; - if (!seen[message.src]) { - this._handleTransmission(key, { type: 'EXPIRE', src: message.dst, dst: message.src }); - seen[message.src] = true; +app._pruneOutstanding = function() { + var keys = Object.keys(this._outstanding); + for (var k = 0, kk = keys.length; k < kk; k += 1) { + var key = keys[k]; + var dsts = Object.keys(this._outstanding[key]); + for (var i = 0, ii = dsts.length; i < ii; i += 1) { + var offers = this._outstanding[key][dsts[i]]; + var seen = {}; + for (var j = 0, jj = offers.length; j < jj; j += 1) { + var message = offers[j]; + if (!seen[message.src]) { + this._handleTransmission(key, { type: 'EXPIRE', src: message.dst, dst: message.src }); + seen[message.src] = true; + } + } } - } + this._outstanding[key] = {}; } - this._outstanding[key] = {}; - } }; /** Cleanup */ -PeerServer.prototype._setCleanupIntervals = function() { - var self = this; +app._setCleanupIntervals = function() { + var self = this; - // Clean up ips every 10 minutes - setInterval(function() { - var keys = Object.keys(self._ips); - for (var i = 0, ii = keys.length; i < ii; i += 1) { - var key = keys[i]; - if (self._ips[key] === 0) { - delete self._ips[key]; - } - } - }, 600000); + // Clean up ips every 10 minutes + setInterval(function() { + var keys = Object.keys(self._ips); + for (var i = 0, ii = keys.length; i < ii; i += 1) { + var key = keys[i]; + if (self._ips[key] === 0) { + delete self._ips[key]; + } + } + }, 600000); - // Clean up outstanding messages every 5 seconds - setInterval(function() { - self._pruneOutstanding(); - }, 5000); + // Clean up outstanding messages every 5 seconds + setInterval(function() { + self._pruneOutstanding(); + }, 5000); }; /** Process outstanding peer offers. */ -PeerServer.prototype._processOutstanding = function(key, id) { - var offers = this._outstanding[key][id]; - if (!offers) { - return; - } - for (var j = 0, jj = offers.length; j < jj; j += 1) { - this._handleTransmission(key, offers[j]); - } - delete this._outstanding[key][id]; +app._processOutstanding = function(key, id) { + var offers = this._outstanding[key][id]; + if (!offers) { + return; + } + for (var j = 0, jj = offers.length; j < jj; j += 1) { + this._handleTransmission(key, offers[j]); + } + delete this._outstanding[key][id]; }; -PeerServer.prototype._removePeer = function(key, id) { - if (this._clients[key] && this._clients[key][id]) { - this._ips[this._clients[key][id].ip]--; - delete this._clients[key][id]; - this.emit('disconnect', id); - } +app._removePeer = function(key, id) { + if (this._clients[key] && this._clients[key][id]) { + this._ips[this._clients[key][id].ip]--; + delete this._clients[key][id]; + this.emit('disconnect', id); + } }; /** Handles passing on a message. */ -PeerServer.prototype._handleTransmission = function(key, message) { - var type = message.type; - var src = message.src; - var dst = message.dst; - var data = JSON.stringify(message); +app._handleTransmission = function(key, message) { + var type = message.type; + var src = message.src; + var dst = message.dst; + var data = JSON.stringify(message); - var destination = this._clients[key][dst]; + var destination = this._clients[key][dst]; - // User is connected! - if (destination) { - try { - util.log(type, 'from', src, 'to', dst); - if (destination.socket) { - destination.socket.send(data); - } else if (destination.res) { - data += '\n'; - destination.res.write(data); - } else { - // Neither socket no res available. Peer dead? - throw "Peer dead"; - } - } catch (e) { - // This happens when a peer disconnects without closing connections and - // the associated WebSocket has not closed. - util.prettyError(e); - // Tell other side to stop trying. - this._removePeer(key, dst); - this._handleTransmission(key, { - type: 'LEAVE', - src: dst, - dst: src - }); - } - } else { - // Wait for this client to connect/reconnect (XHR) for important - // messages. - if (type !== 'LEAVE' && type !== 'EXPIRE' && dst) { - var self = this; - if (!this._outstanding[key][dst]) { - this._outstanding[key][dst] = []; - } - this._outstanding[key][dst].push(message); - } else if (type === 'LEAVE' && !dst) { - this._removePeer(key, src); + // User is connected! + if (destination) { + try { + util.log(type, 'from', src, 'to', dst); + if (destination.socket) { + destination.socket.send(data); + } else if (destination.res) { + data += '\n'; + destination.res.write(data); + } else { + // Neither socket no res available. Peer dead? + throw "Peer dead"; + } + } catch (e) { + // This happens when a peer disconnects without closing connections and + // the associated WebSocket has not closed. + util.prettyError(e); + // Tell other side to stop trying. + this._removePeer(key, dst); + this._handleTransmission(key, { + type: 'LEAVE', + src: dst, + dst: src + }); + } } else { - // Unavailable destination specified with message LEAVE or EXPIRE - // Ignore + // Wait for this client to connect/reconnect (XHR) for important + // messages. + if (type !== 'LEAVE' && type !== 'EXPIRE' && dst) { + var self = this; + if (!this._outstanding[key][dst]) { + this._outstanding[key][dst] = []; + } + this._outstanding[key][dst].push(message); + } else if (type === 'LEAVE' && !dst) { + this._removePeer(key, src); + } else { + // Unavailable destination specified with message LEAVE or EXPIRE + // Ignore + } } - } }; -PeerServer.prototype._generateClientId = function(key) { - var clientId = util.randomId(); - if (!this._clients[key]) { +app._generateClientId = function(key) { + var clientId = util.randomId(); + if (!this._clients[key]) { + return clientId; + } + while (!!this._clients[key][clientId]) { + clientId = util.randomId(); + } return clientId; - } - while (!!this._clients[key][clientId]) { - clientId = util.randomId(); - } - return clientId; }; - -exports.PeerServer = PeerServer; diff --git a/package.json b/package.json index 3cd6bc2..5a81c9b 100644 --- a/package.json +++ b/package.json @@ -2,7 +2,7 @@ "name": "peer", "version": "0.2.7", "description": "PeerJS server component", - "main": "lib/server.js", + "main": "lib/index.js", "bin": { "peerjs": "./bin/peerjs" }, @@ -13,9 +13,10 @@ "author": "Michelle Bu, Eric Zhang", "license": "MIT", "dependencies": { - "restify": "~2.6.0", - "ws": "~0.4.25", - "optimist": "*" + "express": "~3.5.1", + "optimist": "~0.6.1", + "utils-merge": "~1.0.0", + "ws": "~0.4.25" }, "devDependencies": { "expect.js": "*", diff --git a/test/server.js b/test/server.js index e40d858..c74f831 100644 --- a/test/server.js +++ b/test/server.js @@ -3,210 +3,187 @@ var expect = require('expect.js'); var sinon = require('sinon'); describe('PeerServer', function() { - describe('constructor', function() { - before(function() { - PeerServer.prototype._initializeWSS = sinon.stub(); - PeerServer.prototype._initializeHTTP = sinon.stub(); + describe('#_initializeWSS', function() { + WebSocketServer = sinon.stub(); + }); - it('should be able to be created without the `new` keyword', function() { - var p = PeerServer(); - expect(p.constructor).to.be(PeerServer); + describe('#_configureWS', function() { + }); - it('should default to port 80, key `peerjs`', function() { - var p = new PeerServer(); - expect(p._options.key).to.be('peerjs'); - expect(p._options.port).to.be(80); + describe('#_checkKey', function() { + var p; + before(function() { + PeerServer.prototype._initializeHTTP = sinon.stub(); + p = PeerServer({ port: 8000 }); + p._checkKey('peerjs', 'myip', function() {}); + }); + + it('should reject keys that are not the default', function(done) { + p._checkKey('bad key', null, function(response) { + expect(response).to.be('Invalid key provided'); + done(); + }); + }); + + it('should accept valid key/ip pairs', function(done) { + p._checkKey('peerjs', 'myip', function(response) { + expect(response).to.be(null); + done(); + }); + }); + + it('should reject ips that are at their limit', function(done) { + p.options.ip_limit = 0; + p._checkKey('peerjs', 'myip', function(response) { + expect(response).to.be('myip has reached its concurrent user limit'); + done(); + }); + }); + + it('should reject when the server is at its limit', function(done) { + p.options.concurrent_limit = 0; + p._checkKey('peerjs', 'myip', function(response) { + expect(response).to.be('Server has reached its concurrent user limit'); + done(); + }); + }); + }); - it('should accept a custom port', function() { - var p = new PeerServer({ port: 8000 }); - expect(p._options.port).to.be(8000); - }); - }); + describe('#_initializeHTTP', function() { - describe('#_initializeWSS', function() { - WebSocketServer = sinon.stub(); - - }); - - describe('#_configureWS', function() { - - }); - - describe('#_checkKey', function() { - var p; - before(function() { - PeerServer.prototype._initializeHTTP = sinon.stub(); - p = new PeerServer({ port: 8000 }); - p._checkKey('peerjs', 'myip', function() {}); }); - it('should reject keys that are not the default', function(done) { - p._checkKey('bad key', null, function(response) { - expect(response).to.be('Invalid key provided'); - done(); - }); + describe('#_startStreaming', function() { + }); - it('should accept valid key/ip pairs', function(done) { - p._checkKey('peerjs', 'myip', function(response) { - expect(response).to.be(null); - done(); - }); + describe('#_pruneOutstanding', function() { + }); - it('should reject ips that are at their limit', function(done) { - p._options.ip_limit = 0; - p._checkKey('peerjs', 'myip', function(response) { - expect(response).to.be('myip has reached its concurrent user limit'); - done(); - }); + describe('#_processOutstanding', function() { + }); - it('should reject when the server is at its limit', function(done) { - p._options.concurrent_limit = 0; - p._checkKey('peerjs', 'myip', function(response) { - expect(response).to.be('Server has reached its concurrent user limit'); - done(); - }); + describe('#_removePeer', function() { + var p; + before(function() { + PeerServer.prototype._initializeHTTP = sinon.stub(); + p = PeerServer({ port: 8000 }); + + var fake = {ip: '0.0.0.0'}; + p._ips[fake.ip] = 1; + p._clients['peerjs'] = {}; + p._clients['peerjs']['test'] = fake; + }); + + it('should decrement the number of ips being used and remove the connection', function() { + expect(p._ips['0.0.0.0']).to.be(1); + p._removePeer('peerjs', 'test'); + expect(p._ips['0.0.0.0']).to.be(0); + expect(p._clients['peerjs']['test']).to.be(undefined); + }); }); - }); + describe('#_handleTransmission', function() { + var p; + var KEY = 'peerjs'; + var ID = 'test'; + before(function() { + PeerServer.prototype._initializeHTTP = sinon.stub(); + p = PeerServer({ port: 8000 }); + p._clients[KEY] = {}; + }); - describe('#_initializeHTTP', function() { + it('should send to the socket when appropriate', function() { + var send = sinon.spy(); + var write = sinon.spy(); + var message = {dst: ID}; + p._clients[KEY][ID] = { + socket: { + send: send + }, + res: { + write: write + } + } + p._handleTransmission(KEY, message); + expect(send.calledWith(JSON.stringify(message))).to.be(true); + expect(write.calledWith(JSON.stringify(message))).to.be(false); + }); - }); + it('should write to the response with a newline when appropriate', function() { + var write = sinon.spy(); + var message = {dst: ID}; + p._clients[KEY][ID] = { + res: { + write: write + } + } + p._handleTransmission(KEY, message); + expect(write.calledWith(JSON.stringify(message) + '\n')).to.be(true); + }); - describe('#_startStreaming', function() { + // no destination. + it('should push to outstanding messages if the destination is not found', function() { + var message = {dst: ID}; + p._outstanding[KEY] = {}; + p._clients[KEY] = {}; + p._handleTransmission(KEY, message); + expect(p._outstanding[KEY][ID][0]).to.be(message); + }); - }); + it('should not push to outstanding messages if the message is a LEAVE or EXPIRE', function() { + var message = {dst: ID, type: 'LEAVE'}; + p._outstanding[KEY] = {}; + p._clients[KEY] = {}; + p._handleTransmission(KEY, message); + expect(p._outstanding[KEY][ID]).to.be(undefined); - describe('#_pruneOutstanding', function() { + message = {dst: ID, type: 'EXPIRE'}; + p._handleTransmission(KEY, message); + expect(p._outstanding[KEY][ID]).to.be(undefined); + }); - }); + it('should remove the peer if there is no dst in the message', function() { + var message = {type: 'LEAVE'}; + p._removePeer = sinon.spy(); + p._outstanding[KEY] = {}; + p._handleTransmission(KEY, message); + expect(p._removePeer.calledWith(KEY, undefined)).to.be(true); + }); - describe('#_processOutstanding', function() { - - }); - - describe('#_removePeer', function() { - var p; - before(function() { - PeerServer.prototype._initializeHTTP = sinon.stub(); - p = new PeerServer({ port: 8000 }); - - var fake = {ip: '0.0.0.0'}; - p._ips[fake.ip] = 1; - p._clients['peerjs'] = {}; - p._clients['peerjs']['test'] = fake; + it('should remove the peer and send a LEAVE message if the socket appears to be closed', function() { + var send = sinon.stub().throws(); + var message = {dst: ID}; + var leaveMessage = {type: 'LEAVE', dst: undefined, src: ID}; + var oldHandleTransmission = p._handleTransmission; + p._removePeer = function() { + // Hacks! + p._handleTransmission = sinon.spy(); + }; + p._clients[KEY][ID] = { + socket: { + send: send + } + } + p._handleTransmission(KEY, message); + expect(p._handleTransmission.calledWith(KEY, leaveMessage)).to.be(true); + }); }); - it('should decrement the number of ips being used and remove the connection', function() { - expect(p._ips['0.0.0.0']).to.be(1); - p._removePeer('peerjs', 'test'); - expect(p._ips['0.0.0.0']).to.be(0); - expect(p._clients['peerjs']['test']).to.be(undefined); - }); - }); + describe('#_generateClientId', function() { + var p; + before(function() { + PeerServer.prototype._initializeHTTP = sinon.stub(); + p = PeerServer({ port: 8000 }); + }); - describe('#_handleTransmission', function() { - var p; - var KEY = 'peerjs'; - var ID = 'test'; - before(function() { - PeerServer.prototype._initializeHTTP = sinon.stub(); - p = new PeerServer({ port: 8000 }); - p._clients[KEY] = {}; + it('should generate a 16-character ID', function() { + expect(p._generateClientId('anykey').length).to.be.within(15, 16); + }); }); - - it('should send to the socket when appropriate', function() { - var send = sinon.spy(); - var write = sinon.spy(); - var message = {dst: ID}; - p._clients[KEY][ID] = { - socket: { - send: send - }, - res: { - write: write - } - } - p._handleTransmission(KEY, message); - expect(send.calledWith(JSON.stringify(message))).to.be(true); - expect(write.calledWith(JSON.stringify(message))).to.be(false); - }); - - it('should write to the response with a newline when appropriate', function() { - var write = sinon.spy(); - var message = {dst: ID}; - p._clients[KEY][ID] = { - res: { - write: write - } - } - p._handleTransmission(KEY, message); - expect(write.calledWith(JSON.stringify(message) + '\n')).to.be(true); - }); - - // no destination. - it('should push to outstanding messages if the destination is not found', function() { - var message = {dst: ID}; - p._outstanding[KEY] = {}; - p._clients[KEY] = {}; - p._handleTransmission(KEY, message); - expect(p._outstanding[KEY][ID][0]).to.be(message); - }); - - it('should not push to outstanding messages if the message is a LEAVE or EXPIRE', function() { - var message = {dst: ID, type: 'LEAVE'}; - p._outstanding[KEY] = {}; - p._clients[KEY] = {}; - p._handleTransmission(KEY, message); - expect(p._outstanding[KEY][ID]).to.be(undefined); - - message = {dst: ID, type: 'EXPIRE'}; - p._handleTransmission(KEY, message); - expect(p._outstanding[KEY][ID]).to.be(undefined); - }); - - it('should remove the peer if there is no dst in the message', function() { - var message = {type: 'LEAVE'}; - p._removePeer = sinon.spy(); - p._outstanding[KEY] = {}; - p._handleTransmission(KEY, message); - expect(p._removePeer.calledWith(KEY, undefined)).to.be(true); - }); - - it('should remove the peer and send a LEAVE message if the socket appears to be closed', function() { - var send = sinon.stub().throws(); - var message = {dst: ID}; - var leaveMessage = {type: 'LEAVE', dst: undefined, src: ID}; - var oldHandleTransmission = p._handleTransmission; - p._removePeer = function() { - // Hacks! - p._handleTransmission = sinon.spy(); - }; - p._clients[KEY][ID] = { - socket: { - send: send - } - } - p._handleTransmission(KEY, message); - expect(p._handleTransmission.calledWith(KEY, leaveMessage)).to.be(true); - }); - }); - - describe('#_generateClientId', function() { - var p; - before(function() { - PeerServer.prototype._initializeHTTP = sinon.stub(); - p = new PeerServer({ port: 8000 }); - }); - - it('should generate a 16-character ID', function() { - expect(p._generateClientId('anykey').length).to.be(16); - }); - }); });