From d4ccb5d7df618eb7b956b5bebf3642d8a4927447 Mon Sep 17 00:00:00 2001 From: ericz Date: Tue, 12 Feb 2013 16:30:39 -0800 Subject: [PATCH] many updates --- lib/server.js | 302 ++++++++++++++++++++++++++------------------------ lib/util.js | 7 ++ 2 files changed, 162 insertions(+), 147 deletions(-) diff --git a/lib/server.js b/lib/server.js index 13e53d2..d4700ce 100644 --- a/lib/server.js +++ b/lib/server.js @@ -13,8 +13,6 @@ function PeerServer(options) { this._app = express(); this._httpServer = http.createServer(this._app); - this._app.use(express.bodyParser()); - this._app.use(this._allowCrossDomain); this._options = util.extend({ port: 80, @@ -24,17 +22,11 @@ function PeerServer(options) { util.debug = this._options.debug; - // Listen on user-specified port and create WebSocket server as well. - this._httpServer.listen(this._options.port); - this._wss = new WebSocketServer({ path: '/ws', server: this._httpServer }); - - // WebSockets that are opened or HTTP responses (which are paired with - // something in timeouts. + // Connected clients this._clients = {}; - // Timeouts for HTTP responses. - this._timeouts = {}; - // Connections waiting for another peer. - this._outstandingOffers = {}; + + // Messages waiting for another peer. + this._outstanding = {}; // Initailize WebSocket server handlers. this._initializeWSS(); @@ -46,46 +38,53 @@ function PeerServer(options) { util.inherits(PeerServer, EventEmitter); -/** Handle CORS */ -PeerServer.prototype._allowCrossDomain = function(req, res, next) { - res.setHeader('Access-Control-Allow-Origin', '*'); - res.setHeader('Access-Control-Allow-Methods', 'GET,PUT,POST,DELETE'); - res.setHeader('Access-Control-Allow-Headers', 'Content-Type'); - - next(); -} /** Initialize WebSocket server. */ PeerServer.prototype._initializeWSS = function() { var self = this; + + // Create WebSocket server as well. + this._wss = new WebSocketServer({ path: '/ws', server: this._httpServer }); + + this._wss.on('connection', function(socket) { - var id = url.parse(socket.upgradeReq.url, true).query.id; - if (!!id && !!self._clients[id]) { - // If response client and timeout exist, overwrite and clear. - if (!!self._timeouts[id]) { - clearTimeout(self._timeouts[id]); - delete self._timeouts[id]; - self._clients[id].end(JSON.stringify({ type: 'HTTP-SOCKET' })); - } else { - socket.send(JSON.stringify({ type: 'ID-TAKEN', msg: 'ID is taken' })); - socket.close(); - return; + var query = url.parse(socket.upgradeReq.url, true).query; + var id = query.id || self.generateClientId(); + var token = query.token; + + var client = self._clients[id]; + + if (!client) { + // New client, save info + client = { token: token }; + self._clients[id] = client; + socket.send(JSON.stringify({ type: 'OPEN', id: id })); + } + + if (token === client.token) { + // res 'close' event will delete client.res for us + client.socket = socket; + // Client already exists + if (client.res) { + client.res.end(); } - } else if (id === undefined) { - id = self._generateClientId(); + } else { + // ID-taken, invalid token + socket.send(JSON.stringify({ type: 'ID-TAKEN', payload: { msg: 'ID is taken' } })); + socket.close(); + return; } - socket.send(JSON.stringify({ type: 'OPEN', id: id })); - // Save the socket for this id. - self._clients[id] = socket; - - self._processOutstandingOffers(id); + self._processOutstanding(id); // Cleanup after a socket closes. socket.on('close', function() { util.log('Socket closed:', id); - self._removePeer(id); + if (client.socket == socket) { + self._removePeer(id); + } }); + // Handle messages from peers. socket.on('message', function(data) { try { @@ -105,10 +104,14 @@ PeerServer.prototype._initializeWSS = function() { case 'OFFER': case 'ANSWER': // Firefoxism (connectDataConnection ports) - case 'PORT': + // case 'PORT': // Use the ID we know to be correct to prevent spoofing. - message.src = id; - self._handleTransmission(message); + self._handleTransmission({ + type: message.type, + src: id, + dst: message.dst, + payload: msg.payload + }); break; default: util.prettyError('Message unrecognized'); @@ -121,26 +124,13 @@ PeerServer.prototype._initializeWSS = function() { }; -/** Process outstanding peer offers. */ -PeerServer.prototype._processOutstandingOffers = function(id) { - var offers = this._outstandingOffers[id]; - if (offers === undefined) { - return; - } - var sources = Object.keys(offers); - for (var i = 0, ii = sources.length; i < ii; i += 1) { - var messages = offers[sources[i]]; - for (var j = 0, jj = messages.length; j < jj; j += 1) { - this._handleTransmission.apply(this, messages[j]); - } - delete this._outstandingOffers[id][sources[i]]; - } -}; - /** Initialize HTTP server routes. */ PeerServer.prototype._initializeHTTP = function() { var self = this; - + + this._app.use(express.bodyParser()); + this._app.use(util.allowCrossDomain); + this._app.options('/*', function(req, res, next) { res.send(200); }); @@ -148,82 +138,122 @@ PeerServer.prototype._initializeHTTP = function() { // Server sets up HTTP streaming whether you get or post an ID. // Retrieve guaranteed random ID. this._app.get('/id', function(req, res) { - var clientId = util.randomId(); - while (!!self._clients[clientId]) { - clientId = util.randomId(); - } - self._startStreaming(res, clientId, function() { - // Chrome hacks. - res.write('{"id":"' + clientId + '"}\n'); - }); + var id = self.generateClientId(); + var token = req.query.token; + self._startStreaming(res, id, token); }); this._app.post('/id', function(req, res) { - var id = req.body.id; - self._startStreaming(res, id); + self._startStreaming(res, req.body.id, req.body.token); }); - this._app.post('/offer', function(req, res) { - self._handleTransmission(req.body, res); - }); + + var handle = function(req, res){ + var client = self._clients[req.body.id]; + // Auth the req + if (!client || req.body.token !== client.token) { + res.send(401); + return; + } else { + self._handleTransmission({ + type: req.body.type, + src: client.id, + dst: req.body.dst, + payload: req.body.payload + }); + res.send(200); + } + }; + + this._app.post('/offer', handle); - this._app.post('/candidate', function(req, res) { - self._handleTransmission(req.body, res); - }); + this._app.post('/candidate', handle); - this._app.post('/answer', function(req, res) { - self._handleTransmission(req.body, res); - }); + this._app.post('/answer', handle); - this._app.post('/leave', function(req, res) { - self._handleTransmission(req.body, res); - }); + this._app.post('/leave', handle); - this._app.post('/port', function(req, res) { - self._handleTransmission(req.body, res); - }); -}; - -PeerServer.prototype._removePeer = function(id) { - delete this._clients[id]; - if (this._timeouts[id]) { - clearTimeout(this._timeouts[id]); - delete this._timeouts[id]; - } + this._app.post('/port', handle); + + // Listen on user-specified port and + this._httpServer.listen(this._options.port); + }; /** Saves a streaming response and takes care of timeouts and headers. */ -PeerServer.prototype._startStreaming = function(res, id, write) { +PeerServer.prototype._startStreaming = function(res, id, token) { res.writeHead(200, {'Content-Type': 'application/octet-stream'}); - if (!!write) { - write(); - } + var client = this._clients[id]; + + // Save res so we can write to it. + if (!client) { + // New client, save info + client = { token: token }; + this._clients[id] = client; + res.write(JSON.stringify({ type: 'OPEN', id: id }) + '\n'); + } + var pad = '00'; - var iterations = 10; - for (var i = 0; i < iterations; i++) { + for (var i = 0; i < 10; i++) { pad += pad; } res.write(pad + '\n'); - // Save res so we can write to it. - if (!this._clients[id]) { - this._clients[id] = res; - // Set timeout to expire. - var self = this; - this._timeouts[id] = setTimeout(function() { - self._removePeer(id); - res.end(JSON.stringify({ type: 'HTTP-END' })); - }, 30000); - this._processOutstandingOffers(id); + if (token === client.token) { + // Client already exists + res.on('close', function(){ + if (client.res === res) { + if (!client.socket) { + // No new request yet, peer dead + self._remotePeer(id); + return; + } + delete client.res; + } + }); + client.res = res; + this._processOutstanding(id); } else { + // ID-taken, invalid token res.end(JSON.stringify({ type: 'HTTP-ERROR' })); } +}; +PeerServer.prototype._pruneOutstanding = function() { + var dsts = Object.keys(this._outstanding); + for (var i = 0, ii = dsts.length; i < ii; i++) { + var offers = this._outstanding[dsts[i]]; + var seen = {}; + for (var j = 0, jj = offers.length; j < jj; j++) { + var message = offers[j]; + if (!seen[message.src]) { + this._handleTransmission({ type: 'EXPIRE', src: message.dst, dst: message.src }); + seen[message.src] = true; + } + } + } + this._outstanding = []; +} + +/** Process outstanding peer offers. */ +PeerServer.prototype._processOutstanding = function(id) { + var offers = this._outstanding[id]; + if (!offers) { + return; + } + for (var j = 0, jj = offers.length; j < jj; j += 1) { + this._handleTransmission(offers[j]); + } + delete this._outstanding[id]; +}; + +PeerServer.prototype._removePeer = function(id) { + delete this._clients[id]; }; /** Handles passing on a message. */ -PeerServer.prototype._handleTransmission = function(message, res) { +PeerServer.prototype._handleTransmission = function(message) { var type = message.type; var src = message.src; var dst = message.dst; @@ -231,16 +261,17 @@ PeerServer.prototype._handleTransmission = function(message, res) { var destination = this._clients[dst]; - if (!!destination) { + // User is connected! + if (destination) { try { - if (this._timeouts[dst]) { + if (destination.socket) { + destination.socket.send(data); + } else if (destination.res) { data += '\n'; - destination.write(data); + destination.res.write(data); } else { - destination.send(data); - } - if (!!res) { - res.send(200); + // Neither socket no res available. Peer dead? + throw "Peer dead" } } catch (e) { // This happens when a peer disconnects without closing connections and @@ -253,44 +284,21 @@ PeerServer.prototype._handleTransmission = function(message, res) { src: dst, dst: src }); - if (!!res) res.send(501); } } else { - // Wait 5 seconds for this client to connect/reconnect (XHR) for important + // Wait for this client to connect/reconnect (XHR) for important // messages. - if (type !== 'LEAVE') { + if (type !== 'LEAVE' && type !== 'EXPIRE' && !!dst) { var self = this; - if (!this._outstandingOffers[dst]) { - this._outstandingOffers[dst] = {}; + if (!this._outstanding[dst]) { + this._outstanding[dst] = []; } - if (!this._outstandingOffers[dst][src]) { - this._outstandingOffers[dst][src] = []; - setTimeout(function() { - if(!!self._outstandingOffers[dst][src]) { - delete self._outstandingOffers[dst][src]; - self._handleTransmission({ - type: 'EXPIRE', - src: dst, - dst: src - }); - } - }, this._options.timeout); - } - this._outstandingOffers[dst][src].push([message]); - if (!!res) res.send(200); + this._outstanding[dst].push(message); } else if (type === 'LEAVE' && !dst) { this._removePeer(src); - if (!!res) res.send(200); - } else if (src) { - // Assume a disconnect if the client no longer exists. - // Unless it's a message from the server. - this._handleTransmission({ - type: 'EXPIRE', - src: dst, - dst: src - }); - // 410: Resource not available. - if (!!res) res.send(410); + } else { + // Unavailable destination specified with message LEAVE or EXPIRE + // Ignore } } }; diff --git a/lib/util.js b/lib/util.js index 98842a8..c6ec76e 100644 --- a/lib/util.js +++ b/lib/util.js @@ -36,6 +36,13 @@ var util = { } console.log.apply(console, copy); } + }, + _allowCrossDomain: function(req, res, next) { + res.setHeader('Access-Control-Allow-Origin', '*'); + res.setHeader('Access-Control-Allow-Methods', 'GET,PUT,POST,DELETE'); + res.setHeader('Access-Control-Allow-Headers', 'Content-Type'); + + next(); } };