Merge pull request #140 from peers/refactoring/ts

[WIP] Convert to TypeScript
This commit is contained in:
Alex Sosnovskiy 2019-12-15 14:20:45 +03:00 committed by GitHub
commit 37ee973e55
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
71 changed files with 4686 additions and 1732 deletions

2
.eslintignore Normal file
View File

@ -0,0 +1,2 @@
src/
dist/

View File

@ -1,6 +1,9 @@
{
"parser": "@typescript-eslint/parser",
"extends": [
"eslint:recommended"
"eslint:recommended",
"plugin:@typescript-eslint/eslint-recommended",
"plugin:@typescript-eslint/recommended"
],
"env": {
"node": true,
@ -9,11 +12,12 @@
},
"parserOptions": {
"ecmaVersion": 2018,
"sourceType": "module",
"allowImportExportEverywhere": true
"sourceType": "module"
},
"rules": {
"no-var": "error",
"no-console": "off"
"no-console": "off",
"@typescript-eslint/camelcase": "off",
"@typescript-eslint/interface-name-prefix": "off"
}
}

4
.gitignore vendored
View File

@ -14,4 +14,6 @@ results
node_modules
npm-debug.log
.idea
.idea
.cache
.vscode

View File

@ -1,69 +1,70 @@
#!/usr/bin/env node
// tslint:disable
const path = require('path');
const pkg = require('../package.json');
const fs = require('fs');
const path = require("path");
const pkg = require("../package.json");
const fs = require("fs");
const version = pkg.version;
const PeerServer = require('../src').PeerServer;
const opts = require('optimist')
.usage('Usage: $0')
const { PeerServer } = require("../dist/src");
const opts = require("optimist")
.usage("Usage: $0")
.options({
expire_timeout: {
demand: false,
alias: 't',
description: 'timeout (milliseconds)',
alias: "t",
description: "timeout (milliseconds)",
default: 5000
},
concurrent_limit: {
demand: false,
alias: 'c',
description: 'concurrent limit',
alias: "c",
description: "concurrent limit",
default: 5000
},
alive_timeout: {
demand: false,
description: 'broken connection check timeout (milliseconds)',
description: "broken connection check timeout (milliseconds)",
default: 60000
},
key: {
demand: false,
alias: 'k',
description: 'connection key',
default: 'peerjs'
alias: "k",
description: "connection key",
default: "peerjs"
},
sslkey: {
demand: false,
description: 'path to SSL key'
description: "path to SSL key"
},
sslcert: {
demand: false,
description: 'path to SSL certificate'
description: "path to SSL certificate"
},
port: {
demand: true,
alias: 'p',
description: 'port'
alias: "p",
description: "port"
},
path: {
demand: false,
description: 'custom path',
default: '/'
description: "custom path",
default: "/"
},
allow_discovery: {
demand: false,
description: 'allow discovery of peers'
description: "allow discovery of peers"
},
proxied: {
demand: false,
description: 'Set true if PeerServer stays behind a reverse proxy',
description: "Set true if PeerServer stays behind a reverse proxy",
default: false
}
})
.boolean('allow_discovery')
.boolean("allow_discovery")
.argv;
process.on('uncaughtException', function (e) {
console.error('Error: ' + e);
process.on("uncaughtException", function (e) {
console.error("Error: " + e);
});
if (opts.sslkey || opts.sslcert) {
@ -76,8 +77,8 @@ if (opts.sslkey || opts.sslcert) {
delete opts.sslkey;
delete opts.sslcert;
} else {
console.error('Warning: PeerServer will not run because either ' +
'the key or the certificate has not been provided.');
console.error("Warning: PeerServer will not run because either " +
"the key or the certificate has not been provided.");
process.exit(1);
}
}
@ -88,15 +89,15 @@ const server = PeerServer(opts, server => {
const port = server.address().port;
console.log(
'Started PeerServer on %s, port: %s, path: %s (v. %s)',
host, port, userPath || '/', version
"Started PeerServer on %s, port: %s, path: %s (v. %s)",
host, port, userPath || "/", version
);
});
server.on('connection', client => {
server.on("connection", client => {
console.log(`Client connected: ${client.getId()}`);
});
server.on('disconnect', client => {
server.on("disconnect", client => {
console.log(`Client disconnected: ${client.getId()}`);
});

View File

@ -1,15 +0,0 @@
module.exports = {
port: 9000,
expire_timeout: 5000,
alive_timeout: 60000,
key: 'peerjs',
path: '/myapp',
concurrent_limit: 5000,
allow_discovery: false,
proxied: false,
cleanup_out_msgs: 1000,
ssl: {
key: '',
cert: ''
}
};

5
dist/app.json vendored Normal file
View File

@ -0,0 +1,5 @@
{
"name": "PeerJS Server",
"description": "A server side element to broker connections between PeerJS clients.",
"website": "http://peerjs.com/"
}

18
dist/config/index.js vendored Normal file
View File

@ -0,0 +1,18 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const defaultConfig = {
port: 9000,
expire_timeout: 5000,
alive_timeout: 60000,
key: "peerjs",
path: "/myapp",
concurrent_limit: 5000,
allow_discovery: false,
proxied: false,
cleanup_out_msgs: 1000,
ssl: {
key: "",
cert: ""
}
};
exports.default = defaultConfig;

24
dist/src/api/index.js vendored Normal file
View File

@ -0,0 +1,24 @@
"use strict";
var __importDefault = (this && this.__importDefault) || function (mod) {
return (mod && mod.__esModule) ? mod : { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true });
const body_parser_1 = __importDefault(require("body-parser"));
const cors_1 = __importDefault(require("cors"));
const express_1 = __importDefault(require("express"));
const app_json_1 = __importDefault(require("../../app.json"));
const auth_1 = require("./middleware/auth");
const calls_1 = __importDefault(require("./v1/calls"));
const public_1 = __importDefault(require("./v1/public"));
exports.Api = ({ config, realm, messageHandler }) => {
const authMiddleware = new auth_1.AuthMiddleware(config, realm);
const app = express_1.default.Router();
const jsonParser = body_parser_1.default.json();
app.use(cors_1.default());
app.get("/", (_, res) => {
res.send(app_json_1.default);
});
app.use("/:key", public_1.default({ config, realm }));
app.use("/:key/:id/:token", authMiddleware.handle, jsonParser, calls_1.default({ realm, messageHandler }));
return app;
};

27
dist/src/api/middleware/auth/index.js vendored Normal file
View File

@ -0,0 +1,27 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const enums_1 = require("../../../enums");
class AuthMiddleware {
constructor(config, realm) {
this.config = config;
this.realm = realm;
}
handle(req, res, next) {
const { id, token, key } = req.params;
if (key !== this.config.key) {
return res.status(401).send(enums_1.Errors.INVALID_KEY);
}
if (!id) {
return res.sendStatus(401);
}
const client = this.realm.getClientById(id);
if (!client) {
return res.sendStatus(401);
}
if (client.getToken() && token !== client.getToken()) {
return res.status(401).send(enums_1.Errors.INVALID_TOKEN);
}
next();
}
}
exports.AuthMiddleware = AuthMiddleware;

2
dist/src/api/middleware/middleware.js vendored Normal file
View File

@ -0,0 +1,2 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });

32
dist/src/api/v1/calls/index.js vendored Normal file
View File

@ -0,0 +1,32 @@
"use strict";
var __importDefault = (this && this.__importDefault) || function (mod) {
return (mod && mod.__esModule) ? mod : { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true });
const express_1 = __importDefault(require("express"));
exports.default = ({ realm, messageHandler }) => {
const app = express_1.default.Router();
const handle = (req, res, next) => {
const { id } = req.params;
if (!id)
return next();
const client = realm.getClientById(id);
if (!client) {
throw new Error(`client not found:${id}`);
}
const { type, dst, payload } = req.body;
const message = {
type,
src: id,
dst,
payload
};
messageHandler.handle(client, message);
res.sendStatus(200);
};
app.post("/offer", handle);
app.post("/candidate", handle);
app.post("/answer", handle);
app.post("/leave", handle);
return app;
};

23
dist/src/api/v1/public/index.js vendored Normal file
View File

@ -0,0 +1,23 @@
"use strict";
var __importDefault = (this && this.__importDefault) || function (mod) {
return (mod && mod.__esModule) ? mod : { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true });
const express_1 = __importDefault(require("express"));
exports.default = ({ config, realm }) => {
const app = express_1.default.Router();
// Retrieve guaranteed random ID.
app.get("/id", (_, res) => {
res.contentType("html");
res.send(realm.generateClientId());
});
// Get a list of all peers for a key, enabled by the `allowDiscovery` flag.
app.get("/peers", (_, res) => {
if (config.allow_discovery) {
const clientsIds = realm.getClientsIds();
return res.send(clientsIds);
}
res.sendStatus(401);
});
return app;
};

18
dist/src/config/index.js vendored Normal file
View File

@ -0,0 +1,18 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const defaultConfig = {
port: 9000,
expire_timeout: 5000,
alive_timeout: 60000,
key: "peerjs",
path: "/myapp",
concurrent_limit: 5000,
allow_discovery: false,
proxied: false,
cleanup_out_msgs: 1000,
ssl: {
key: "",
cert: ""
}
};
exports.default = defaultConfig;

21
dist/src/enums.js vendored Normal file
View File

@ -0,0 +1,21 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
var Errors;
(function (Errors) {
Errors["INVALID_KEY"] = "Invalid key provided";
Errors["INVALID_TOKEN"] = "Invalid token provided";
Errors["INVALID_WS_PARAMETERS"] = "No id, token, or key supplied to websocket server";
Errors["CONNECTION_LIMIT_EXCEED"] = "Server has reached its concurrent user limit";
})(Errors = exports.Errors || (exports.Errors = {}));
var MessageType;
(function (MessageType) {
MessageType["OPEN"] = "OPEN";
MessageType["LEAVE"] = "LEAVE";
MessageType["CANDIDATE"] = "CANDIDATE";
MessageType["OFFER"] = "OFFER";
MessageType["ANSWER"] = "ANSWER";
MessageType["EXPIRE"] = "EXPIRE";
MessageType["HEARTBEAT"] = "HEARTBEAT";
MessageType["ID_TAKEN"] = "ID-TAKEN";
MessageType["ERROR"] = "ERROR";
})(MessageType = exports.MessageType || (exports.MessageType = {}));

101
dist/src/index.js vendored Normal file
View File

@ -0,0 +1,101 @@
"use strict";
var __importDefault = (this && this.__importDefault) || function (mod) {
return (mod && mod.__esModule) ? mod : { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true });
const express_1 = __importDefault(require("express"));
const http_1 = __importDefault(require("http"));
const https_1 = __importDefault(require("https"));
const api_1 = require("./api");
const config_1 = __importDefault(require("./config"));
const messageHandler_1 = require("./messageHandler");
const realm_1 = require("./models/realm");
const checkBrokenConnections_1 = require("./services/checkBrokenConnections");
const messagesExpire_1 = require("./services/messagesExpire");
const webSocketServer_1 = require("./services/webSocketServer");
const init = ({ app, server, options }) => {
const config = options;
const realm = new realm_1.Realm();
const messageHandler = new messageHandler_1.MessageHandler(realm);
const api = api_1.Api({ config, realm, messageHandler });
const messagesExpire = new messagesExpire_1.MessagesExpire({ realm, config, messageHandler });
const checkBrokenConnections = new checkBrokenConnections_1.CheckBrokenConnections({
realm,
config,
onClose: (client) => {
app.emit("disconnect", client);
}
});
app.use(options.path, api);
const wss = new webSocketServer_1.WebSocketServer({
server,
realm,
config
});
wss.on("connection", (client) => {
const messageQueue = realm.getMessageQueueById(client.getId());
if (messageQueue) {
let message;
// tslint:disable
while (message = messageQueue.readMessage()) {
messageHandler.handle(client, message);
}
realm.clearMessageQueue(client.getId());
}
app.emit("connection", client);
});
wss.on("message", (client, message) => {
app.emit("message", client, message);
messageHandler.handle(client, message);
});
wss.on("close", (client) => {
app.emit("disconnect", client);
});
wss.on("error", (error) => {
app.emit("error", error);
});
messagesExpire.startMessagesExpiration();
checkBrokenConnections.start();
};
function ExpressPeerServer(server, options) {
const app = express_1.default();
const newOptions = Object.assign(Object.assign({}, config_1.default), options);
if (newOptions.proxied) {
app.set("trust proxy", newOptions.proxied === "false" ? false : !!newOptions.proxied);
}
app.on("mount", () => {
if (!server) {
throw new Error("Server is not passed to constructor - " +
"can't start PeerServer");
}
init({ app, server, options: newOptions });
});
return app;
}
exports.ExpressPeerServer = ExpressPeerServer;
function PeerServer(options = {}, callback) {
const app = express_1.default();
const newOptions = Object.assign(Object.assign({}, config_1.default), options);
let path = newOptions.path;
const port = newOptions.port;
if (path[0] !== "/") {
path = "/" + path;
}
if (path[path.length - 1] !== "/") {
path += "/";
}
let server;
if (newOptions.ssl && newOptions.ssl.key && newOptions.ssl.cert) {
server = https_1.default.createServer(options.ssl, app);
// @ts-ignore
delete newOptions.ssl;
}
else {
server = http_1.default.createServer(app);
}
const peerjs = ExpressPeerServer(server, newOptions);
app.use(peerjs);
server.listen(port, () => { var _a; return (_a = callback) === null || _a === void 0 ? void 0 : _a(server); });
return peerjs;
}
exports.PeerServer = PeerServer;

2
dist/src/messageHandler/handler.js vendored Normal file
View File

@ -0,0 +1,2 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });

View File

@ -0,0 +1,9 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.HeartbeatHandler = (client) => {
if (client) {
const nowTime = new Date().getTime();
client.setLastPing(nowTime);
}
return true;
};

View File

@ -0,0 +1,6 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
var heartbeat_1 = require("./heartbeat");
exports.HeartbeatHandler = heartbeat_1.HeartbeatHandler;
var transmission_1 = require("./transmission");
exports.TransmissionHandler = transmission_1.TransmissionHandler;

View File

@ -0,0 +1,57 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const enums_1 = require("../../../enums");
exports.TransmissionHandler = ({ realm }) => {
const handle = (client, message) => {
const type = message.type;
const srcId = message.src;
const dstId = message.dst;
const destinationClient = realm.getClientById(dstId);
// User is connected!
if (destinationClient) {
const socket = destinationClient.getSocket();
try {
if (socket) {
const data = JSON.stringify(message);
socket.send(data);
}
else {
// Neither socket no res available. Peer dead?
throw new Error("Peer dead");
}
}
catch (e) {
// This happens when a peer disconnects without closing connections and
// the associated WebSocket has not closed.
// Tell other side to stop trying.
if (socket) {
socket.close();
}
else {
realm.removeClientById(destinationClient.getId());
}
handle(client, {
type: enums_1.MessageType.LEAVE,
src: dstId,
dst: srcId
});
}
}
else {
// Wait for this client to connect/reconnect (XHR) for important
// messages.
if (type !== enums_1.MessageType.LEAVE && type !== enums_1.MessageType.EXPIRE && dstId) {
realm.addMessageToQueue(dstId, message);
}
else if (type === enums_1.MessageType.LEAVE && !dstId) {
realm.removeClientById(srcId);
}
else {
// Unavailable destination specified with message LEAVE or EXPIRE
// Ignore
}
}
return true;
};
return handle;
};

31
dist/src/messageHandler/index.js vendored Normal file
View File

@ -0,0 +1,31 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const enums_1 = require("../enums");
const handlers_1 = require("./handlers");
const messageHandlers_1 = require("./messageHandlers");
class MessageHandler {
constructor(realm) {
this.messageHandlers = new messageHandlers_1.MessageHandlers();
const transmissionHandler = handlers_1.TransmissionHandler({ realm });
const heartbeatHandler = handlers_1.HeartbeatHandler;
const handleTransmission = (client, message) => {
return transmissionHandler(client, {
type: message.type,
src: message.src,
dst: message.dst,
payload: message.payload
});
};
const handleHeartbeat = (client, message) => heartbeatHandler(client, message);
this.messageHandlers.registerHandler(enums_1.MessageType.HEARTBEAT, handleHeartbeat);
this.messageHandlers.registerHandler(enums_1.MessageType.OFFER, handleTransmission);
this.messageHandlers.registerHandler(enums_1.MessageType.ANSWER, handleTransmission);
this.messageHandlers.registerHandler(enums_1.MessageType.CANDIDATE, handleTransmission);
this.messageHandlers.registerHandler(enums_1.MessageType.LEAVE, handleTransmission);
this.messageHandlers.registerHandler(enums_1.MessageType.EXPIRE, handleTransmission);
}
handle(client, message) {
return this.messageHandlers.handle(client, message);
}
}
exports.MessageHandler = MessageHandler;

View File

@ -0,0 +1,20 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
class MessageHandlers {
constructor() {
this.handlers = new Map();
}
registerHandler(messageType, handler) {
if (this.handlers.has(messageType))
return;
this.handlers.set(messageType, handler);
}
handle(client, message) {
const { type } = message;
const handler = this.handlers.get(type);
if (!handler)
return false;
return handler(client, message);
}
}
exports.MessageHandlers = MessageHandlers;

33
dist/src/models/client.js vendored Normal file
View File

@ -0,0 +1,33 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
class Client {
constructor({ id, token }) {
this.socket = null;
this.lastPing = new Date().getTime();
this.id = id;
this.token = token;
}
getId() {
return this.id;
}
getToken() {
return this.token;
}
getSocket() {
return this.socket;
}
setSocket(socket) {
this.socket = socket;
}
getLastPing() {
return this.lastPing;
}
setLastPing(lastPing) {
this.lastPing = lastPing;
}
send(data) {
var _a;
(_a = this.socket) === null || _a === void 0 ? void 0 : _a.send(JSON.stringify(data));
}
}
exports.Client = Client;

2
dist/src/models/message.js vendored Normal file
View File

@ -0,0 +1,2 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });

25
dist/src/models/messageQueue.js vendored Normal file
View File

@ -0,0 +1,25 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
class MessageQueue {
constructor() {
this.lastReadAt = new Date().getTime();
this.messages = [];
}
getLastReadAt() {
return this.lastReadAt;
}
addMessage(message) {
this.messages.push(message);
}
readMessage() {
if (this.messages.length > 0) {
this.lastReadAt = new Date().getTime();
return this.messages.shift();
}
return null;
}
getMessages() {
return this.messages;
}
}
exports.MessageQueue = MessageQueue;

52
dist/src/models/realm.js vendored Normal file
View File

@ -0,0 +1,52 @@
"use strict";
var __importDefault = (this && this.__importDefault) || function (mod) {
return (mod && mod.__esModule) ? mod : { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true });
const v4_1 = __importDefault(require("uuid/v4"));
const messageQueue_1 = require("./messageQueue");
class Realm {
constructor() {
this.clients = new Map();
this.messageQueues = new Map();
}
getClientsIds() {
return [...this.clients.keys()];
}
getClientById(clientId) {
return this.clients.get(clientId);
}
getClientsIdsWithQueue() {
return [...this.messageQueues.keys()];
}
setClient(client, id) {
this.clients.set(id, client);
}
removeClientById(id) {
const client = this.getClientById(id);
if (!client)
return false;
this.clients.delete(id);
return true;
}
getMessageQueueById(id) {
return this.messageQueues.get(id);
}
addMessageToQueue(id, message) {
if (!this.getMessageQueueById(id)) {
this.messageQueues.set(id, new messageQueue_1.MessageQueue());
}
this.getMessageQueueById(id).addMessage(message);
}
clearMessageQueue(id) {
this.messageQueues.delete(id);
}
generateClientId() {
let clientId = v4_1.default();
while (this.getClientById(clientId)) {
clientId = v4_1.default();
}
return clientId;
}
}
exports.Realm = Realm;

View File

@ -0,0 +1,50 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const DEFAULT_CHECK_INTERVAL = 300;
class CheckBrokenConnections {
constructor({ realm, config, checkInterval = DEFAULT_CHECK_INTERVAL, onClose }) {
this.timeoutId = null;
this.realm = realm;
this.config = config;
this.onClose = onClose;
this.checkInterval = checkInterval;
}
start() {
if (this.timeoutId) {
clearTimeout(this.timeoutId);
}
this.timeoutId = setTimeout(() => {
this.checkConnections();
this.timeoutId = null;
this.start();
}, this.checkInterval);
}
stop() {
if (this.timeoutId) {
clearTimeout(this.timeoutId);
this.timeoutId = null;
}
}
checkConnections() {
var _a, _b, _c;
const clientsIds = this.realm.getClientsIds();
const now = new Date().getTime();
const { alive_timeout: aliveTimeout } = this.config;
for (const clientId of clientsIds) {
const client = this.realm.getClientById(clientId);
const timeSinceLastPing = now - client.getLastPing();
if (timeSinceLastPing < aliveTimeout)
continue;
try {
(_a = client.getSocket()) === null || _a === void 0 ? void 0 : _a.close();
}
finally {
this.realm.clearMessageQueue(clientId);
this.realm.removeClientById(clientId);
client.setSocket(null);
(_c = (_b = this).onClose) === null || _c === void 0 ? void 0 : _c.call(_b, client);
}
}
}
}
exports.CheckBrokenConnections = CheckBrokenConnections;

View File

@ -0,0 +1,53 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const enums_1 = require("../../enums");
class MessagesExpire {
constructor({ realm, config, messageHandler }) {
this.timeoutId = null;
this.realm = realm;
this.config = config;
this.messageHandler = messageHandler;
}
startMessagesExpiration() {
if (this.timeoutId) {
clearTimeout(this.timeoutId);
}
// Clean up outstanding messages
this.timeoutId = setTimeout(() => {
this.pruneOutstanding();
this.timeoutId = null;
this.startMessagesExpiration();
}, this.config.cleanup_out_msgs);
}
stopMessagesExpiration() {
if (this.timeoutId) {
clearTimeout(this.timeoutId);
this.timeoutId = null;
}
}
pruneOutstanding() {
const destinationClientsIds = this.realm.getClientsIdsWithQueue();
const now = new Date().getTime();
const maxDiff = this.config.expire_timeout;
const seen = {};
for (const destinationClientId of destinationClientsIds) {
const messageQueue = this.realm.getMessageQueueById(destinationClientId);
const lastReadDiff = now - messageQueue.getLastReadAt();
if (lastReadDiff < maxDiff)
continue;
const messages = messageQueue.getMessages();
for (const message of messages) {
if (!seen[message.src]) {
this.messageHandler.handle(undefined, {
type: enums_1.MessageType.EXPIRE,
src: message.dst,
dst: message.src
});
seen[message.src] = true;
}
}
this.realm.clearMessageQueue(destinationClientId);
}
}
}
exports.MessagesExpire = MessagesExpire;

View File

@ -0,0 +1,91 @@
"use strict";
var __importDefault = (this && this.__importDefault) || function (mod) {
return (mod && mod.__esModule) ? mod : { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true });
const events_1 = __importDefault(require("events"));
const url_1 = __importDefault(require("url"));
const ws_1 = __importDefault(require("ws"));
const enums_1 = require("../../enums");
const client_1 = require("../../models/client");
class WebSocketServer extends events_1.default {
constructor({ server, realm, config }) {
super();
this.setMaxListeners(0);
this.realm = realm;
this.config = config;
const path = this.config.path;
this.path = path + (path[path.length - 1] !== "/" ? "/" : "") + "peerjs";
this.webSocketServer = new ws_1.default.Server({ path, server });
this.webSocketServer.on("connection", (socket, req) => this._onSocketConnection(socket, req));
this.webSocketServer.on("error", (error) => this._onSocketError(error));
}
_onSocketConnection(socket, req) {
const { query = {} } = url_1.default.parse(req.url, true);
const { id, token, key } = query;
if (!id || !token || !key) {
return this._sendErrorAndClose(socket, enums_1.Errors.INVALID_WS_PARAMETERS);
}
if (key !== this.config.key) {
return this._sendErrorAndClose(socket, enums_1.Errors.INVALID_KEY);
}
const client = this.realm.getClientById(id);
if (client) {
if (token !== client.getToken()) {
// ID-taken, invalid token
socket.send(JSON.stringify({
type: enums_1.MessageType.ID_TAKEN,
payload: { msg: "ID is taken" }
}));
return socket.close();
}
return this._configureWS(socket, client);
}
this._registerClient({ socket, id, token });
}
_onSocketError(error) {
// handle error
this.emit("error", error);
}
_registerClient({ socket, id, token }) {
// Check concurrent limit
const clientsCount = this.realm.getClientsIds().length;
if (clientsCount >= this.config.concurrent_limit) {
return this._sendErrorAndClose(socket, enums_1.Errors.CONNECTION_LIMIT_EXCEED);
}
const newClient = new client_1.Client({ id, token });
this.realm.setClient(newClient, id);
socket.send(JSON.stringify({ type: enums_1.MessageType.OPEN }));
this._configureWS(socket, newClient);
}
_configureWS(socket, client) {
client.setSocket(socket);
// Cleanup after a socket closes.
socket.on("close", () => {
if (client.getSocket() === socket) {
this.realm.removeClientById(client.getId());
this.emit("close", client);
}
});
// Handle messages from peers.
socket.on("message", (data) => {
try {
const message = JSON.parse(data);
message.src = client.getId();
this.emit("message", client, message);
}
catch (e) {
this.emit("error", e);
}
});
this.emit("connection", client);
}
_sendErrorAndClose(socket, msg) {
socket.send(JSON.stringify({
type: enums_1.MessageType.ERROR,
payload: { msg }
}));
socket.close();
}
}
exports.WebSocketServer = WebSocketServer;

View File

@ -0,0 +1,2 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });

3860
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -2,7 +2,7 @@
"name": "peer",
"version": "0.3.0",
"description": "PeerJS server component",
"main": "src/index.js",
"main": "dist/peerjs.server.min.js",
"bin": {
"peerjs": "./bin/peerjs"
},
@ -13,23 +13,46 @@
"author": "Michelle Bu, Eric Zhang",
"license": "MIT",
"scripts": {
"test": "eslint . && mocha \"test/**/*.js\"",
"start": "bin/peerjs --port ${PORT:=9000}"
"build": "tsc",
"clean": "rimraf ./dist",
"lint": "eslint --ext .js,.ts .",
"tsc": "tsc",
"prebuild": "npm run lint",
"test": "npm run lint && mocha -r ts-node/register \"test/**/*\"",
"start": "bin/peerjs --port ${PORT:=9000}",
"dev:start": "npm-run-all build start",
"dev": "nodemon --watch src -e ts --exec npm run dev:start"
},
"release": {
"branch": "master"
},
"dependencies": {
"body-parser": "^1.19.0",
"cors": "~2.8.4",
"express": "^4.17.1",
"optimist": "~0.6.1",
"uuid4": "^1.1.4",
"ws": "^7.1.2"
"@types/cors": "2.8.6",
"@types/express": "4.17.1",
"@types/ws": "6.0.4",
"body-parser": "1.19.0",
"cors": "2.8.4",
"express": "4.17.1",
"optimist": "0.6.1",
"uuid": "3.3.3",
"ws": "7.1.2"
},
"devDependencies": {
"@types/chai": "^4.1.7",
"@types/mocha": "^5.2.7",
"@types/node": "^10.14.16",
"@types/uuid": "3.4.6",
"@typescript-eslint/eslint-plugin": "^2.11.0",
"@typescript-eslint/parser": "^2.11.0",
"chai": "^4.2.0",
"eslint": "^6.2.1",
"mocha": "^6.2.0",
"semistandard": "^14.0.1",
"sinon": "^7.4.1"
"eslint": "^6.7.2",
"mocha": "^6.2.2",
"nodemon": "1.19.1",
"npm-run-all": "4.1.5",
"rimraf": "3.0.0",
"sinon": "7.5.0",
"ts-node": "8.5.4",
"typescript": "3.7.3"
},
"engines": {
"node": ">=10"

View File

@ -1,23 +0,0 @@
const express = require('express');
const cors = require('cors');
const bodyParser = require('body-parser');
const publicContent = require('../../app.json');
module.exports = ({ config, realm, messageHandler }) => {
const authMiddleware = require('./middleware/auth')({ config, realm });
const app = express.Router();
const jsonParser = bodyParser.json();
app.use(cors());
app.get('/', (req, res) => {
res.send(publicContent);
});
app.use('/:key', require('./v1/public')({ config, realm }));
app.use('/:key/:id/:token', authMiddleware, jsonParser, require('./v1/calls')({ realm, messageHandler }));
return app;
};

33
src/api/index.ts Normal file
View File

@ -0,0 +1,33 @@
import bodyParser from "body-parser";
import cors from "cors";
import express from "express";
import publicContent from "../../app.json";
import { IConfig } from "../config";
import { IMessageHandler } from "../messageHandler";
import { IRealm } from "../models/realm";
import { AuthMiddleware } from "./middleware/auth";
import CallsApi from "./v1/calls";
import PublicApi from "./v1/public";
export const Api = ({ config, realm, messageHandler }: {
config: IConfig,
realm: IRealm,
messageHandler: IMessageHandler
}): express.Router => {
const authMiddleware = new AuthMiddleware(config, realm);
const app = express.Router();
const jsonParser = bodyParser.json();
app.use(cors());
app.get("/", (_, res) => {
res.send(publicContent);
});
app.use("/:key", PublicApi({ config, realm }));
app.use("/:key/:id/:token", authMiddleware.handle, jsonParser, CallsApi({ realm, messageHandler }));
return app;
};

View File

@ -1,25 +0,0 @@
const { Errors } = require('../../../enums');
module.exports = ({ config, realm }) => (req, res, next) => {
const { id, token, key } = req.params;
if (key !== config.key) {
return res.status(401).send(Errors.INVALID_KEY);
}
if (!id) {
return res.sendStatus(401);
}
const client = realm.getClientById(id);
if (!client) {
return res.sendStatus(401);
}
if (client.getToken() && token !== client.getToken()) {
return res.status(401).send(Errors.INVALID_TOKEN);
}
next();
};

View File

@ -0,0 +1,35 @@
import express from "express";
import { IConfig } from "../../../config";
import { Errors } from "../../../enums";
import { IRealm } from "../../../models/realm";
import { IMiddleware } from "../middleware";
export class AuthMiddleware implements IMiddleware {
constructor(private readonly config: IConfig, private readonly realm: IRealm) { }
public handle(req: express.Request, res: express.Response, next: express.NextFunction): any {
const { id, token, key } = req.params;
if (key !== this.config.key) {
return res.status(401).send(Errors.INVALID_KEY);
}
if (!id) {
return res.sendStatus(401);
}
const client = this.realm.getClientById(id);
if (!client) {
return res.sendStatus(401);
}
if (client.getToken() && token !== client.getToken()) {
return res.status(401).send(Errors.INVALID_TOKEN);
}
next();
}
}

View File

@ -0,0 +1,5 @@
import express from "express";
export interface IMiddleware {
handle(req: express.Request, res: express.Response, next: express.NextFunction): any;
}

View File

@ -1,36 +0,0 @@
const express = require('express');
module.exports = ({ realm, messageHandler }) => {
const app = express.Router();
const handle = (req, res, next) => {
const { id } = req.params;
if (!id) return next();
const client = realm.getClientById(id);
const { type, dst, payload } = req.body;
const message = {
type,
src: id,
dst,
payload
};
messageHandler(client, message);
res.sendStatus(200);
};
app.post('/offer', handle);
app.post('/candidate', handle);
app.post('/answer', handle);
app.post('/leave', handle);
return app;
};

40
src/api/v1/calls/index.ts Normal file
View File

@ -0,0 +1,40 @@
import express from "express";
import { IMessageHandler } from "../../../messageHandler";
import { IMessage } from "../../../models/message";
import { IRealm } from "../../../models/realm";
export default ({ realm, messageHandler }: { realm: IRealm, messageHandler: IMessageHandler; }): express.Router => {
const app = express.Router();
const handle = (req: express.Request, res: express.Response, next: express.NextFunction): any => {
const { id } = req.params;
if (!id) return next();
const client = realm.getClientById(id);
if (!client) {
throw new Error(`client not found:${id}`);
}
const { type, dst, payload } = req.body;
const message: IMessage = {
type,
src: id,
dst,
payload
};
messageHandler.handle(client, message);
res.sendStatus(200);
};
app.post("/offer", handle);
app.post("/candidate", handle);
app.post("/answer", handle);
app.post("/leave", handle);
return app;
};

View File

@ -1,16 +1,20 @@
const express = require('express');
import express from "express";
import { IConfig } from "../../../config";
import { IRealm } from "../../../models/realm";
module.exports = ({ config, realm }) => {
export default ({ config, realm }: {
config: IConfig, realm: IRealm
}): express.Router => {
const app = express.Router();
// Retrieve guaranteed random ID.
app.get('/id', (req, res) => {
res.contentType = 'text/html';
app.get("/id", (_, res: express.Response) => {
res.contentType("html");
res.send(realm.generateClientId());
});
// Get a list of all peers for a key, enabled by the `allowDiscovery` flag.
app.get('/peers', (req, res) => {
app.get("/peers", (_, res: express.Response) => {
if (config.allow_discovery) {
const clientsIds = realm.getClientsIds();

33
src/config/index.ts Normal file
View File

@ -0,0 +1,33 @@
export interface IConfig {
readonly port: number;
readonly expire_timeout: number;
readonly alive_timeout: number;
readonly key: string;
readonly path: string;
readonly concurrent_limit: number;
readonly allow_discovery: boolean;
readonly proxied: boolean | string;
readonly cleanup_out_msgs: number;
readonly ssl?: {
key: string;
cert: string;
};
}
const defaultConfig: IConfig = {
port: 9000,
expire_timeout: 5000,
alive_timeout: 60000,
key: "peerjs",
path: "/myapp",
concurrent_limit: 5000,
allow_discovery: false,
proxied: false,
cleanup_out_msgs: 1000,
ssl: {
key: "",
cert: ""
}
};
export default defaultConfig;

View File

@ -1,18 +0,0 @@
module.exports.Errors = {
INVALID_KEY: 'Invalid key provided',
INVALID_TOKEN: 'Invalid token provided',
INVALID_WS_PARAMETERS: 'No id, token, or key supplied to websocket server',
CONNECTION_LIMIT_EXCEED: 'Server has reached its concurrent user limit'
};
module.exports.MessageType = {
OPEN: 'OPEN',
LEAVE: 'LEAVE',
CANDIDATE: 'CANDIDATE',
OFFER: 'OFFER',
ANSWER: 'ANSWER',
EXPIRE: 'EXPIRE',
HEARTBEAT: 'HEARTBEAT',
ID_TAKEN: 'ID-TAKEN',
ERROR: 'ERROR'
};

18
src/enums.ts Normal file
View File

@ -0,0 +1,18 @@
export enum Errors {
INVALID_KEY = "Invalid key provided",
INVALID_TOKEN = "Invalid token provided",
INVALID_WS_PARAMETERS = "No id, token, or key supplied to websocket server",
CONNECTION_LIMIT_EXCEED = "Server has reached its concurrent user limit"
}
export enum MessageType {
OPEN = "OPEN",
LEAVE = "LEAVE",
CANDIDATE = "CANDIDATE",
OFFER = "OFFER",
ANSWER = "ANSWER",
EXPIRE = "EXPIRE",
HEARTBEAT = "HEARTBEAT",
ID_TAKEN = "ID-TAKEN",
ERROR = "ERROR"
}

View File

@ -1,134 +0,0 @@
const express = require('express');
const http = require('http');
const https = require('https');
const defaultConfig = require('../config');
const WebSocketServer = require('./services/webSocketServer');
const Realm = require('./models/realm');
const init = ({ app, server, options }) => {
const config = options;
const realm = new Realm();
const messageHandler = require('./messageHandler')({ realm });
const api = require('./api')({ config, realm, messageHandler });
const { startMessagesExpiration } = require('./services/messagesExpire')({ realm, config, messageHandler });
const checkBrokenConnections = require('./services/checkBrokenConnections')({
realm, config, onClose: (client) => {
app.emit('disconnect', client);
}
});
app.use(options.path, api);
const wss = new WebSocketServer({
server,
realm,
config: {
...config,
}
});
wss.on('connection', client => {
const messageQueue = realm.getMessageQueueById(client.getId());
if (messageQueue) {
let message;
// eslint-disable-next-line no-cond-assign
while (message = messageQueue.readMessage()) {
messageHandler(client, message);
}
realm.clearMessageQueue(client.getId());
}
app.emit('connection', client);
});
wss.on('message', (client, message) => {
app.emit('message', client, message);
messageHandler(client, message);
});
wss.on('close', client => {
app.emit('disconnect', client);
});
wss.on('error', error => {
app.emit('error', error);
});
startMessagesExpiration();
checkBrokenConnections.start();
};
function ExpressPeerServer(server, options) {
const app = express();
options = {
...defaultConfig,
...options
};
if (options.proxied) {
app.set('trust proxy', options.proxied === 'false' ? false : options.proxied);
}
app.on('mount', () => {
if (!server) {
throw new Error('Server is not passed to constructor - ' +
'can\'t start PeerServer');
}
init({ app, server, options });
});
return app;
}
function PeerServer(options = {}, callback) {
const app = express();
options = {
...defaultConfig,
...options
};
let path = options.path;
const port = options.port;
if (path[0] !== '/') {
path = '/' + path;
}
if (path[path.length - 1] !== '/') {
path += '/';
}
let server;
if (options.ssl && options.ssl.key && options.ssl.cert) {
server = https.createServer(options.ssl, app);
delete options.ssl;
} else {
server = http.createServer(app);
}
const peerjs = ExpressPeerServer(server, options);
app.use(peerjs);
if (callback) {
server.listen(port, () => {
callback(server);
});
} else {
server.listen(port);
}
return peerjs;
}
exports = module.exports = {
ExpressPeerServer: ExpressPeerServer,
PeerServer: PeerServer
};

146
src/index.ts Normal file
View File

@ -0,0 +1,146 @@
import { IRealm } from "./models/realm";
import express from "express";
import http from "http";
import https from "https";
import { Server } from "net";
import { Api } from "./api";
import defaultConfig, { IConfig } from "./config";
import { MessageHandler } from "./messageHandler";
import { IClient } from "./models/client";
import { IMessage } from "./models/message";
import { Realm } from "./models/realm";
import { CheckBrokenConnections } from "./services/checkBrokenConnections";
import { IMessagesExpire, MessagesExpire } from "./services/messagesExpire";
import { IWebSocketServer, WebSocketServer } from "./services/webSocketServer";
const init = ({ app, server, options }: {
app: express.Express,
server: Server,
options: IConfig;
}) => {
const config = options;
const realm: IRealm = new Realm();
const messageHandler = new MessageHandler(realm);
const api = Api({ config, realm, messageHandler });
const messagesExpire: IMessagesExpire = new MessagesExpire({ realm, config, messageHandler });
const checkBrokenConnections = new CheckBrokenConnections({
realm,
config,
onClose: (client: IClient) => {
app.emit("disconnect", client);
}
});
app.use(options.path, api);
const wss: IWebSocketServer = new WebSocketServer({
server,
realm,
config
});
wss.on("connection", (client: IClient) => {
const messageQueue = realm.getMessageQueueById(client.getId());
if (messageQueue) {
let message: IMessage | null;
// tslint:disable
while (message = messageQueue.readMessage()) {
messageHandler.handle(client, message);
}
realm.clearMessageQueue(client.getId());
}
app.emit("connection", client);
});
wss.on("message", (client: IClient, message: IMessage) => {
app.emit("message", client, message);
messageHandler.handle(client, message);
});
wss.on("close", (client: IClient) => {
app.emit("disconnect", client);
});
wss.on("error", (error: Error) => {
app.emit("error", error);
});
messagesExpire.startMessagesExpiration();
checkBrokenConnections.start();
};
function ExpressPeerServer(server: Server, options?: IConfig) {
const app = express();
const newOptions: IConfig = {
...defaultConfig,
...options
};
if (newOptions.proxied) {
app.set("trust proxy", newOptions.proxied === "false" ? false : !!newOptions.proxied);
}
app.on("mount", () => {
if (!server) {
throw new Error("Server is not passed to constructor - " +
"can't start PeerServer");
}
init({ app, server, options: newOptions });
});
return app;
}
type Optional<T> = {
[P in keyof T]?: (T[P] | undefined);
};
function PeerServer(options: Optional<IConfig> = {}, callback?: (server: Server) => void) {
const app = express();
const newOptions: IConfig = {
...defaultConfig,
...options
};
let path = newOptions.path;
const port = newOptions.port;
if (path[0] !== "/") {
path = "/" + path;
}
if (path[path.length - 1] !== "/") {
path += "/";
}
let server: Server;
if (newOptions.ssl && newOptions.ssl.key && newOptions.ssl.cert) {
server = https.createServer(options.ssl!, app);
// @ts-ignore
delete newOptions.ssl;
} else {
server = http.createServer(app);
}
const peerjs = ExpressPeerServer(server, newOptions);
app.use(peerjs);
server.listen(port, () => callback?.(server));
return peerjs;
}
export {
ExpressPeerServer,
PeerServer
};

View File

@ -0,0 +1,4 @@
import { IClient } from "../models/client";
import { IMessage } from "../models/message";
export type Handler = (client: IClient | undefined, message: IMessage) => boolean;

View File

@ -1,4 +0,0 @@
module.exports = (client) => {
const nowTime = new Date().getTime();
client.setLastPing(nowTime);
};

View File

@ -0,0 +1,10 @@
import { IClient } from "../../../models/client";
export const HeartbeatHandler = (client: IClient | undefined): boolean => {
if (client) {
const nowTime = new Date().getTime();
client.setLastPing(nowTime);
}
return true;
};

View File

@ -0,0 +1,2 @@
export { HeartbeatHandler } from "./heartbeat";
export { TransmissionHandler } from "./transmission";

View File

@ -1,49 +0,0 @@
const { MessageType } = require('../../../enums');
module.exports = ({ realm }) => (client, message) => {
const type = message.type;
const srcId = message.src;
const dstId = message.dst;
const destinationClient = realm.getClientById(dstId);
// User is connected!
if (destinationClient) {
try {
if (destinationClient.socket) {
const data = JSON.stringify(message);
destinationClient.socket.send(data);
} else {
// Neither socket no res available. Peer dead?
throw new Error('Peer dead');
}
} catch (e) {
// This happens when a peer disconnects without closing connections and
// the associated WebSocket has not closed.
// Tell other side to stop trying.
if (destinationClient.socket) {
destinationClient.socket.close();
} else {
realm.removeClientById(destinationClient.getId());
}
module.exports({ realm })(client, {
type: MessageType.LEAVE,
src: dstId,
dst: srcId
});
}
} else {
// Wait for this client to connect/reconnect (XHR) for important
// messages.
if (type !== MessageType.LEAVE && type !== MessageType.EXPIRE && dstId) {
realm.addMessageToQueue(dstId, message);
} else if (type === MessageType.LEAVE && !dstId) {
realm.removeClientById(srcId);
} else {
// Unavailable destination specified with message LEAVE or EXPIRE
// Ignore
}
}
};

View File

@ -0,0 +1,59 @@
import { MessageType } from "../../../enums";
import { IClient } from "../../../models/client";
import { IMessage } from "../../../models/message";
import { IRealm } from "../../../models/realm";
export const TransmissionHandler = ({ realm }: { realm: IRealm; }): (client: IClient | undefined, message: IMessage) => boolean => {
const handle = (client: IClient | undefined, message: IMessage) => {
const type = message.type;
const srcId = message.src;
const dstId = message.dst;
const destinationClient = realm.getClientById(dstId);
// User is connected!
if (destinationClient) {
const socket = destinationClient.getSocket();
try {
if (socket) {
const data = JSON.stringify(message);
socket.send(data);
} else {
// Neither socket no res available. Peer dead?
throw new Error("Peer dead");
}
} catch (e) {
// This happens when a peer disconnects without closing connections and
// the associated WebSocket has not closed.
// Tell other side to stop trying.
if (socket) {
socket.close();
} else {
realm.removeClientById(destinationClient.getId());
}
handle(client, {
type: MessageType.LEAVE,
src: dstId,
dst: srcId
});
}
} else {
// Wait for this client to connect/reconnect (XHR) for important
// messages.
if (type !== MessageType.LEAVE && type !== MessageType.EXPIRE && dstId) {
realm.addMessageToQueue(dstId, message);
} else if (type === MessageType.LEAVE && !dstId) {
realm.removeClientById(srcId);
} else {
// Unavailable destination specified with message LEAVE or EXPIRE
// Ignore
}
}
return true;
};
return handle;
};

View File

@ -1,49 +0,0 @@
const { MessageType } = require('../enums');
class MessageHandlers {
constructor() {
this.handlers = {};
}
registerHandler(messageType, handler) {
this.handlers[messageType] = handler;
}
handle(client, message) {
const { type } = message;
const handler = this.handlers[type];
if (!handler) {
return;
}
handler(client, message);
}
}
module.exports = ({ realm }) => {
const transmissionHandler = require('./handlers/transmission')({ realm });
const heartbeatHandler = require('./handlers/heartbeat');
const messageHandlers = new MessageHandlers();
const handleTransmission = (client, message) => {
transmissionHandler(client, {
type: message.type,
src: message.src,
dst: message.dst,
payload: message.payload
});
};
const handleHeartbeat = (client) => heartbeatHandler(client);
messageHandlers.registerHandler(MessageType.HEARTBEAT, handleHeartbeat);
messageHandlers.registerHandler(MessageType.OFFER, handleTransmission);
messageHandlers.registerHandler(MessageType.ANSWER, handleTransmission);
messageHandlers.registerHandler(MessageType.CANDIDATE, handleTransmission);
messageHandlers.registerHandler(MessageType.LEAVE, handleTransmission);
messageHandlers.registerHandler(MessageType.EXPIRE, handleTransmission);
return (client, message) => messageHandlers.handle(client, message);
};

View File

@ -0,0 +1,42 @@
import { MessageType } from "../enums";
import { IClient } from "../models/client";
import { IMessage } from "../models/message";
import { IRealm } from "../models/realm";
import { Handler } from "./handler";
import { HeartbeatHandler, TransmissionHandler } from "./handlers";
import { IMessageHandlers, MessageHandlers } from "./messageHandlers";
export interface IMessageHandler {
handle(client: IClient | undefined, message: IMessage): boolean;
}
export class MessageHandler implements IMessageHandler {
private readonly messageHandlers: IMessageHandlers = new MessageHandlers();
constructor(realm: IRealm) {
const transmissionHandler: Handler = TransmissionHandler({ realm });
const heartbeatHandler: Handler = HeartbeatHandler;
const handleTransmission: Handler = (client: IClient | undefined, message: IMessage): boolean => {
return transmissionHandler(client, {
type: message.type,
src: message.src,
dst: message.dst,
payload: message.payload
});
};
const handleHeartbeat = (client: IClient | undefined, message: IMessage) => heartbeatHandler(client, message);
this.messageHandlers.registerHandler(MessageType.HEARTBEAT, handleHeartbeat);
this.messageHandlers.registerHandler(MessageType.OFFER, handleTransmission);
this.messageHandlers.registerHandler(MessageType.ANSWER, handleTransmission);
this.messageHandlers.registerHandler(MessageType.CANDIDATE, handleTransmission);
this.messageHandlers.registerHandler(MessageType.LEAVE, handleTransmission);
this.messageHandlers.registerHandler(MessageType.EXPIRE, handleTransmission);
}
public handle(client: IClient | undefined, message: IMessage): boolean {
return this.messageHandlers.handle(client, message);
}
}

View File

@ -0,0 +1,29 @@
import { MessageType } from "../enums";
import { IClient } from "../models/client";
import { IMessage } from "../models/message";
import { Handler } from "./handler";
export interface IMessageHandlers {
registerHandler(messageType: MessageType, handler: Handler): void;
handle(client: IClient | undefined, message: IMessage): boolean;
}
export class MessageHandlers implements IMessageHandlers {
private readonly handlers: Map<MessageType, Handler> = new Map();
public registerHandler(messageType: MessageType, handler: Handler): void {
if (this.handlers.has(messageType)) return;
this.handlers.set(messageType, handler);
}
public handle(client: IClient | undefined, message: IMessage): boolean {
const { type } = message;
const handler = this.handlers.get(type);
if (!handler) return false;
return handler(client, message);
}
}

View File

@ -1,38 +0,0 @@
class Client {
constructor({ id, token }) {
this.id = id;
this.token = token;
this.socket = null;
this.lastPing = new Date().getTime();
}
getId() {
return this.id;
}
getToken() {
return this.token;
}
getSocket() {
return this.socket;
}
setSocket(socket) {
this.socket = socket;
}
getLastPing() {
return this.lastPing;
}
setLastPing(lastPing) {
this.lastPing = lastPing;
}
send(data) {
this.socket.send(JSON.stringify(data));
}
}
module.exports = Client;

57
src/models/client.ts Normal file
View File

@ -0,0 +1,57 @@
import { MyWebSocket } from "../services/webSocketServer/webSocket";
export interface IClient {
getId(): string;
getToken(): string;
getSocket(): MyWebSocket | null;
setSocket(socket: MyWebSocket | null): void;
getLastPing(): number;
setLastPing(lastPing: number): void;
send(data: any): void;
}
export class Client implements IClient {
private readonly id: string;
private readonly token: string;
private socket: MyWebSocket | null = null;
private lastPing: number = new Date().getTime();
constructor({ id, token }: { id: string, token: string; }) {
this.id = id;
this.token = token;
}
public getId(): string {
return this.id;
}
public getToken(): string {
return this.token;
}
public getSocket(): MyWebSocket | null {
return this.socket;
}
public setSocket(socket: MyWebSocket | null): void {
this.socket = socket;
}
public getLastPing(): number {
return this.lastPing;
}
public setLastPing(lastPing: number): void {
this.lastPing = lastPing;
}
public send(data: any): void {
this.socket?.send(JSON.stringify(data));
}
}

8
src/models/message.ts Normal file
View File

@ -0,0 +1,8 @@
import { MessageType } from "../enums";
export interface IMessage {
readonly type: MessageType;
readonly src: string;
readonly dst: string;
readonly payload?: any;
}

View File

@ -1,30 +0,0 @@
class MessageQueue {
constructor (id) {
this._id = id;
this._lastReadAt = new Date().getTime();
this._messages = [];
}
getLastReadAt () {
return this._lastReadAt;
}
addMessage (message) {
this._messages.push(message);
}
readMessage () {
if (this._messages.length > 0) {
this._lastReadAt = new Date().getTime();
return this._messages.shift();
}
return null;
}
getMessages () {
return this._messages;
}
}
module.exports = MessageQueue;

View File

@ -0,0 +1,37 @@
import { IMessage } from "./message";
export interface IMessageQueue {
getLastReadAt(): number;
addMessage(message: IMessage): void;
readMessage(): IMessage | null;
getMessages(): IMessage[];
}
export class MessageQueue implements IMessageQueue {
private lastReadAt: number = new Date().getTime();
private readonly messages: IMessage[] = [];
public getLastReadAt(): number {
return this.lastReadAt;
}
public addMessage(message: IMessage): void {
this.messages.push(message);
}
public readMessage(): IMessage | null {
if (this.messages.length > 0) {
this.lastReadAt = new Date().getTime();
return this.messages.shift()!;
}
return null;
}
public getMessages(): IMessage[] {
return this.messages;
}
}

View File

@ -1,57 +0,0 @@
const uuidv4 = require('uuid/v4');
const MessageQueue = require('./messageQueue');
class Realm {
constructor () {
this._clients = new Map();
this._messageQueues = new Map();
}
getClientsIds () {
return [...this._clients.keys()];
}
getClientById (clientId) {
return this._clients.get(clientId);
}
setClient (client, id) {
this._clients.set(id, client);
}
removeClientById (id) {
const client = this.getClientById(id);
if (!client) return false;
this._clients.delete(id);
}
getMessageQueueById (id) {
return this._messageQueues.get(id);
}
addMessageToQueue (id, message) {
if (!this.getMessageQueueById(id)) {
this._messageQueues.set(id, new MessageQueue(id));
}
this.getMessageQueueById(id).addMessage(message);
}
clearMessageQueue (id) {
this._messageQueues.delete(id);
}
generateClientId () {
let clientId = uuidv4();
while (this.getClientById(clientId)) {
clientId = uuidv4();
}
return clientId;
}
}
module.exports = Realm;

81
src/models/realm.ts Normal file
View File

@ -0,0 +1,81 @@
import uuidv4 from "uuid/v4";
import { IClient } from "./client";
import { IMessage } from "./message";
import { IMessageQueue, MessageQueue } from "./messageQueue";
export interface IRealm {
getClientsIds(): string[];
getClientById(clientId: string): IClient | undefined;
getClientsIdsWithQueue(): string[];
setClient(client: IClient, id: string): void;
removeClientById(id: string): boolean;
getMessageQueueById(id: string): IMessageQueue | undefined;
addMessageToQueue(id: string, message: IMessage): void;
clearMessageQueue(id: string): void;
generateClientId(): string;
}
export class Realm implements IRealm {
private readonly clients: Map<string, IClient> = new Map();
private readonly messageQueues: Map<string, IMessageQueue> = new Map();
public getClientsIds(): string[] {
return [...this.clients.keys()];
}
public getClientById(clientId: string): IClient | undefined {
return this.clients.get(clientId);
}
public getClientsIdsWithQueue(): string[] {
return [...this.messageQueues.keys()];
}
public setClient(client: IClient, id: string): void {
this.clients.set(id, client);
}
public removeClientById(id: string): boolean {
const client = this.getClientById(id);
if (!client) return false;
this.clients.delete(id);
return true;
}
public getMessageQueueById(id: string): IMessageQueue | undefined {
return this.messageQueues.get(id);
}
public addMessageToQueue(id: string, message: IMessage): void {
if (!this.getMessageQueueById(id)) {
this.messageQueues.set(id, new MessageQueue());
}
this.getMessageQueueById(id)!.addMessage(message);
}
public clearMessageQueue(id: string): void {
this.messageQueues.delete(id);
}
public generateClientId(): string {
let clientId = uuidv4();
while (this.getClientById(clientId)) {
clientId = uuidv4();
}
return clientId;
}
}

View File

@ -1,57 +0,0 @@
const DEFAULT_CHECK_INTERVAL = 300;
module.exports = ({ realm, config, checkInterval = DEFAULT_CHECK_INTERVAL, onClose = () => { } }) => {
const checkConnections = () => {
const clientsIds = realm.getClientsIds();
const now = new Date().getTime();
const aliveTimeout = config.alive_timeout;
for (const clientId of clientsIds) {
const client = realm.getClientById(clientId);
const timeSinceLastPing = now - client.getLastPing();
if (timeSinceLastPing < aliveTimeout) continue;
try {
client.getSocket().close();
// eslint-disable-next-line no-empty
} catch (e) { } finally {
realm.clearMessageQueue(clientId);
realm.removeClientById(clientId);
client.setSocket(null);
if (onClose) onClose(client);
}
}
};
let timeoutId;
const start = () => {
if (timeoutId) {
clearTimeout(timeoutId);
}
timeoutId = setTimeout(() => {
checkConnections();
timeoutId = null;
start();
}, checkInterval);
};
const stop = () => {
if (timeoutId) {
clearTimeout(timeoutId);
timeoutId = null;
}
};
return {
start,
stop,
CHECK_INTERVAL: checkInterval
};
};

View File

@ -0,0 +1,74 @@
import { IConfig } from "../../config";
import { IClient } from "../../models/client";
import { IRealm } from "../../models/realm";
const DEFAULT_CHECK_INTERVAL = 300;
type CustomConfig = Pick<IConfig, 'alive_timeout'>;
export class CheckBrokenConnections {
public readonly checkInterval: number;
private timeoutId: NodeJS.Timeout | null = null;
private readonly realm: IRealm;
private readonly config: CustomConfig;
private readonly onClose?: (client: IClient) => void;
constructor({ realm, config, checkInterval = DEFAULT_CHECK_INTERVAL, onClose }: {
realm: IRealm,
config: CustomConfig,
checkInterval?: number,
onClose?: (client: IClient) => void;
}) {
this.realm = realm;
this.config = config;
this.onClose = onClose;
this.checkInterval = checkInterval;
}
public start(): void {
if (this.timeoutId) {
clearTimeout(this.timeoutId);
}
this.timeoutId = setTimeout(() => {
this.checkConnections();
this.timeoutId = null;
this.start();
}, this.checkInterval);
}
public stop(): void {
if (this.timeoutId) {
clearTimeout(this.timeoutId);
this.timeoutId = null;
}
}
private checkConnections(): void {
const clientsIds = this.realm.getClientsIds();
const now = new Date().getTime();
const { alive_timeout: aliveTimeout } = this.config;
for (const clientId of clientsIds) {
const client = this.realm.getClientById(clientId)!;
const timeSinceLastPing = now - client.getLastPing();
if (timeSinceLastPing < aliveTimeout) continue;
try {
client.getSocket()?.close();
} finally {
this.realm.clearMessageQueue(clientId);
this.realm.removeClientById(clientId);
client.setSocket(null);
this.onClose?.(client);
}
}
}
}

View File

@ -1,63 +0,0 @@
const { MessageType } = require('../../enums');
module.exports = ({ realm, config, messageHandler }) => {
const pruneOutstanding = () => {
const destinationClientsIds = realm._messageQueues.keys();
const now = new Date().getTime();
const maxDiff = config.expire_timeout;
const seen = {};
for (const destinationClientId of destinationClientsIds) {
const messageQueue = realm.getMessageQueueById(destinationClientId);
const lastReadDiff = now - messageQueue.getLastReadAt();
if (lastReadDiff < maxDiff) continue;
const messages = messageQueue.getMessages();
for (const message of messages) {
if (!seen[message.src]) {
messageHandler(null, {
type: MessageType.EXPIRE,
src: message.dst,
dst: message.src
});
seen[message.src] = true;
}
}
realm.clearMessageQueue(destinationClientId);
}
};
let timeoutId;
const startMessagesExpiration = () => {
if (timeoutId) {
clearTimeout(timeoutId);
}
// Clean up outstanding messages
timeoutId = setTimeout(() => {
pruneOutstanding();
timeoutId = null;
startMessagesExpiration();
}, config.cleanup_out_msgs);
};
const stopMessagesExpiration = () => {
if (timeoutId) {
clearTimeout(timeoutId);
timeoutId = null;
}
};
return {
startMessagesExpiration,
stopMessagesExpiration
};
};

View File

@ -0,0 +1,81 @@
import { IConfig } from "../../config";
import { MessageType } from "../../enums";
import { IMessageHandler } from "../../messageHandler";
import { IRealm } from "../../models/realm";
export interface IMessagesExpire {
startMessagesExpiration(): void;
stopMessagesExpiration(): void;
}
export class MessagesExpire implements IMessagesExpire {
private readonly realm: IRealm;
private readonly config: IConfig;
private readonly messageHandler: IMessageHandler;
private timeoutId: NodeJS.Timeout | null = null;
constructor({ realm, config, messageHandler }: {
realm: IRealm;
config: IConfig;
messageHandler: IMessageHandler;
}) {
this.realm = realm;
this.config = config;
this.messageHandler = messageHandler;
}
public startMessagesExpiration(): void {
if (this.timeoutId) {
clearTimeout(this.timeoutId);
}
// Clean up outstanding messages
this.timeoutId = setTimeout(() => {
this.pruneOutstanding();
this.timeoutId = null;
this.startMessagesExpiration();
}, this.config.cleanup_out_msgs);
}
public stopMessagesExpiration(): void {
if (this.timeoutId) {
clearTimeout(this.timeoutId);
this.timeoutId = null;
}
}
private pruneOutstanding(): void {
const destinationClientsIds = this.realm.getClientsIdsWithQueue();
const now = new Date().getTime();
const maxDiff = this.config.expire_timeout;
const seen: Record<string, boolean> = {};
for (const destinationClientId of destinationClientsIds) {
const messageQueue = this.realm.getMessageQueueById(destinationClientId)!;
const lastReadDiff = now - messageQueue.getLastReadAt();
if (lastReadDiff < maxDiff) continue;
const messages = messageQueue.getMessages();
for (const message of messages) {
if (!seen[message.src]) {
this.messageHandler.handle(undefined, {
type: MessageType.EXPIRE,
src: message.dst,
dst: message.src
});
seen[message.src] = true;
}
}
this.realm.clearMessageQueue(destinationClientId);
}
}
}

View File

@ -1,114 +0,0 @@
const WSS = require('ws').Server;
const url = require('url');
const EventEmitter = require('events');
const { MessageType, Errors } = require('../../enums');
const Client = require('../../models/client');
class WebSocketServer extends EventEmitter {
constructor({ server, realm, config }) {
super();
this.setMaxListeners(0);
this.realm = realm;
this.config = config;
let path = this.config.path;
path = path + (path[path.length - 1] !== '/' ? '/' : '') + 'peerjs';
this._wss = new WSS({ path, server });
this._wss.on('connection', (socket, req) => this._onSocketConnection(socket, req));
this._wss.on('error', (error) => this._onSocketError(error));
}
_onSocketConnection(socket, req) {
const { query = {} } = url.parse(req.url, true);
const { id, token, key } = query;
if (!id || !token || !key) {
return this._sendErrorAndClose(socket, Errors.INVALID_WS_PARAMETERS);
}
if (key !== this.config.key) {
return this._sendErrorAndClose(socket, Errors.INVALID_KEY);
}
const client = this.realm.getClientById(id);
if (client) {
if (token !== client.getToken()) {
// ID-taken, invalid token
socket.send(JSON.stringify({
type: MessageType.ID_TAKEN,
payload: { msg: 'ID is taken' }
}));
return socket.close();
}
return this._configureWS(socket, client);
}
this._registerClient({ socket, id, token });
}
_onSocketError(error) {
// handle error
this.emit('error', error);
}
_registerClient({ socket, id, token }) {
// Check concurrent limit
const clientsCount = this.realm.getClientsIds().length;
if (clientsCount >= this.config.concurrent_limit) {
return this._sendErrorAndClose(socket, Errors.CONNECTION_LIMIT_EXCEED);
}
const newClient = new Client({ id, token });
this.realm.setClient(newClient, id);
socket.send(JSON.stringify({ type: MessageType.OPEN }));
this._configureWS(socket, newClient);
}
_configureWS(socket, client) {
client.setSocket(socket);
// Cleanup after a socket closes.
socket.on('close', () => {
if (client.socket === socket) {
this.realm.removeClientById(client.getId());
this.emit('close', client);
}
});
// Handle messages from peers.
socket.on('message', (data) => {
try {
const message = JSON.parse(data);
message.src = client.getId();
this.emit('message', client, message);
} catch (e) {
this.emit('error', e);
}
});
this.emit('connection', client);
}
_sendErrorAndClose(socket, msg) {
socket.send(
JSON.stringify({
type: MessageType.ERROR,
payload: { msg }
})
);
socket.close();
}
}
module.exports = WebSocketServer;

View File

@ -0,0 +1,137 @@
import EventEmitter from "events";
import { IncomingMessage } from "http";
import url from "url";
import WebSocketLib from "ws";
import { IConfig } from "../../config";
import { Errors, MessageType } from "../../enums";
import { Client, IClient } from "../../models/client";
import { IRealm } from "../../models/realm";
import { MyWebSocket } from "./webSocket";
export interface IWebSocketServer extends EventEmitter {
readonly path: string;
}
interface IAuthParams {
id?: string;
token?: string;
key?: string;
}
export class WebSocketServer extends EventEmitter implements IWebSocketServer {
public readonly path: string;
private readonly realm: IRealm;
private readonly config: IConfig;
private readonly webSocketServer: WebSocketLib.Server;
constructor({ server, realm, config }: { server: any, realm: IRealm, config: IConfig; }) {
super();
this.setMaxListeners(0);
this.realm = realm;
this.config = config;
const path = this.config.path;
this.path = path + (path[path.length - 1] !== "/" ? "/" : "") + "peerjs";
this.webSocketServer = new WebSocketLib.Server({ path, server });
this.webSocketServer.on("connection", (socket: MyWebSocket, req) => this._onSocketConnection(socket, req));
this.webSocketServer.on("error", (error: Error) => this._onSocketError(error));
}
private _onSocketConnection(socket: MyWebSocket, req: IncomingMessage): void {
const { query = {} } = url.parse(req.url!, true);
const { id, token, key }: IAuthParams = query;
if (!id || !token || !key) {
return this._sendErrorAndClose(socket, Errors.INVALID_WS_PARAMETERS);
}
if (key !== this.config.key) {
return this._sendErrorAndClose(socket, Errors.INVALID_KEY);
}
const client = this.realm.getClientById(id);
if (client) {
if (token !== client.getToken()) {
// ID-taken, invalid token
socket.send(JSON.stringify({
type: MessageType.ID_TAKEN,
payload: { msg: "ID is taken" }
}));
return socket.close();
}
return this._configureWS(socket, client);
}
this._registerClient({ socket, id, token });
}
private _onSocketError(error: Error): void {
// handle error
this.emit("error", error);
}
private _registerClient({ socket, id, token }:
{
socket: MyWebSocket;
id: string;
token: string;
}): void {
// Check concurrent limit
const clientsCount = this.realm.getClientsIds().length;
if (clientsCount >= this.config.concurrent_limit) {
return this._sendErrorAndClose(socket, Errors.CONNECTION_LIMIT_EXCEED);
}
const newClient: IClient = new Client({ id, token });
this.realm.setClient(newClient, id);
socket.send(JSON.stringify({ type: MessageType.OPEN }));
this._configureWS(socket, newClient);
}
private _configureWS(socket: MyWebSocket, client: IClient): void {
client.setSocket(socket);
// Cleanup after a socket closes.
socket.on("close", () => {
if (client.getSocket() === socket) {
this.realm.removeClientById(client.getId());
this.emit("close", client);
}
});
// Handle messages from peers.
socket.on("message", (data: WebSocketLib.Data) => {
try {
const message = JSON.parse(data as string);
message.src = client.getId();
this.emit("message", client, message);
} catch (e) {
this.emit("error", e);
}
});
this.emit("connection", client);
}
private _sendErrorAndClose(socket: MyWebSocket, msg: Errors): void {
socket.send(
JSON.stringify({
type: MessageType.ERROR,
payload: { msg }
})
);
socket.close();
}
}

View File

@ -0,0 +1,4 @@
import EventEmitter from "events";
import WebSocketLib from "ws";
export type MyWebSocket = WebSocketLib & EventEmitter;

View File

@ -1,6 +1,6 @@
const { expect } = require('chai');
const Client = require('../../../../src/models/client');
const heartbeatHandler = require('../../../../src/messageHandler/handlers/heartbeat');
import { expect } from 'chai';
import { Client } from '../../../../src/models/client';
import { HeartbeatHandler } from '../../../../src/messageHandler/handlers';
describe('Heartbeat handler', () => {
it('should update last ping time', () => {
@ -9,7 +9,7 @@ describe('Heartbeat handler', () => {
const nowTime = new Date().getTime();
heartbeatHandler(client);
HeartbeatHandler(client);
expect(client.getLastPing()).to.be.closeTo(nowTime, 2);
});

View File

@ -1,6 +1,6 @@
const { expect } = require('chai');
const Realm = require('../../src/models/realm');
const Client = require('../../src/models/client');
import { expect } from 'chai';
import { Realm } from '../../src/models/realm';
import { Client } from '../../src/models/client';
describe('Realm', () => {
describe('#generateClientId', () => {

View File

@ -1,13 +1,13 @@
const { expect } = require('chai');
const Client = require('../../../src/models/client');
const Realm = require('../../../src/models/realm');
const checkBrokenConnectionsBuilder = require('../../../src/services/checkBrokenConnections');
import { expect } from 'chai';
import { Client } from '../../../src/models/client';
import { Realm } from '../../../src/models/realm';
import { CheckBrokenConnections } from '../../../src/services/checkBrokenConnections';
describe('checkBrokenConnections service', () => {
it('should remove client after 2 checks', (done) => {
const realm = new Realm();
const doubleCheckTime = 55;//~ equals to checkBrokenConnections.CHECK_INTERVAL * 2
const checkBrokenConnections = checkBrokenConnectionsBuilder({ realm, config: { alive_timeout: doubleCheckTime }, checkInterval: 30 });
const doubleCheckTime = 55;//~ equals to checkBrokenConnections.checkInterval * 2
const checkBrokenConnections = new CheckBrokenConnections({ realm, config: { alive_timeout: doubleCheckTime }, checkInterval: 30 });
const client = new Client({ id: 'id', token: '' });
realm.setClient(client, 'id');
@ -17,13 +17,13 @@ describe('checkBrokenConnections service', () => {
expect(realm.getClientById('id')).to.be.undefined;
checkBrokenConnections.stop();
done();
}, checkBrokenConnections.CHECK_INTERVAL * 2 + 30);
}, checkBrokenConnections.checkInterval * 2 + 30);
});
it('should remove client after 1 ping', (done) => {
const realm = new Realm();
const doubleCheckTime = 55;//~ equals to checkBrokenConnections.CHECK_INTERVAL * 2
const checkBrokenConnections = checkBrokenConnectionsBuilder({ realm, config: { alive_timeout: doubleCheckTime }, checkInterval: 30 });
const doubleCheckTime = 55;//~ equals to checkBrokenConnections.checkInterval * 2
const checkBrokenConnections = new CheckBrokenConnections({ realm, config: { alive_timeout: doubleCheckTime }, checkInterval: 30 });
const client = new Client({ id: 'id', token: '' });
realm.setClient(client, 'id');
@ -37,7 +37,7 @@ describe('checkBrokenConnections service', () => {
expect(realm.getClientById('id')).to.be.undefined;
checkBrokenConnections.stop();
done();
}, checkBrokenConnections.CHECK_INTERVAL * 2 + 10);
}, checkBrokenConnections.CHECK_INTERVAL);
}, checkBrokenConnections.checkInterval * 2 + 10);
}, checkBrokenConnections.checkInterval);
});
});

27
tsconfig.json Normal file
View File

@ -0,0 +1,27 @@
{
"compilerOptions": {
"lib": [
"esnext"
],
"target": "es2016",
"module": "commonjs",
"strict": true,
"esModuleInterop": true,
"downlevelIteration": true,
"moduleResolution": "node",
"noImplicitAny": true,
"noUnusedLocals": true,
"noUnusedParameters": true,
"resolveJsonModule": true,
"skipLibCheck": true,
"sourceMap": false,
"outDir": "dist"
},
"include": [
"./src/**/*",
],
"exclude": [
"test",
"bin",
]
}