Resolved merge conflicts, updated README.

This commit is contained in:
Eden Tyler-Moss 2020-01-31 11:39:19 +00:00
commit ef651a9c7e
86 changed files with 5326 additions and 1784 deletions

2
.eslintignore Normal file
View File

@ -0,0 +1,2 @@
src/
dist/

View File

@ -1,6 +1,9 @@
{ {
"parser": "@typescript-eslint/parser",
"extends": [ "extends": [
"eslint:recommended" "eslint:recommended",
"plugin:@typescript-eslint/eslint-recommended",
"plugin:@typescript-eslint/recommended"
], ],
"env": { "env": {
"node": true, "node": true,
@ -9,11 +12,25 @@
}, },
"parserOptions": { "parserOptions": {
"ecmaVersion": 2018, "ecmaVersion": 2018,
"sourceType": "module", "sourceType": "module"
"allowImportExportEverywhere": true
}, },
"rules": { "rules": {
"no-var": "error", "no-var": "error",
"no-console": "off" "no-console": "off",
"@typescript-eslint/camelcase": "off",
"@typescript-eslint/interface-name-prefix": "off",
"@typescript-eslint/member-delimiter-style": [
"error",
{
"multiline": {
"delimiter": "semi",
"requireLast": true
},
"singleline": {
"delimiter": "semi",
"requireLast": true
}
}
]
} }
} }

20
.github/workflows/dockerimage.yml vendored Normal file
View File

@ -0,0 +1,20 @@
name: Docker Image CI
on:
push:
branches:
- master
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- name: Build the Docker image
run: docker build . --file Dockerfile --tag peerjs/peerjs-server:latest
- name: Publish to Registry
uses: elgohr/Publish-Docker-Github-Action@master
with:
name: peerjs/peerjs-server:latest
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}

4
.gitignore vendored
View File

@ -14,4 +14,6 @@ results
node_modules node_modules
npm-debug.log npm-debug.log
.idea .idea
.cache
.vscode

7
.gitpod.yml Normal file
View File

@ -0,0 +1,7 @@
tasks:
- init: npm i
command: npm start
ports:
- port: 9000
onOpen: open-preview

View File

@ -2,10 +2,8 @@ FROM node:alpine
RUN mkdir /peer-server RUN mkdir /peer-server
WORKDIR /peer-server WORKDIR /peer-server
COPY bin ./bin COPY bin ./bin
COPY dist ./dist
COPY package.json . COPY package.json .
COPY src ./src
COPY config ./config
COPY app.json .
RUN npm install RUN npm install
EXPOSE 9000 EXPOSE 9000
ENTRYPOINT ["node", "bin/peerjs"] ENTRYPOINT ["node", "bin/peerjs"]

View File

@ -1,26 +1,34 @@
[![Build Status](https://travis-ci.org/peers/peerjs-server.png?branch=master)](https://travis-ci.org/peers/peerjs-server) [![Build Status](https://travis-ci.org/peers/peerjs-server.png?branch=master)](https://travis-ci.org/peers/peerjs-server)
[![npm version](https://badge.fury.io/js/peer.svg)](https://www.npmjs.com/package/peer)
[![Downloads](https://img.shields.io/npm/dm/peer.svg)](https://www.npmjs.com/package/peer)
# PeerServer: A server for PeerJS # # PeerServer: A server for PeerJS #
This fork of peerjs-server adds functionality to set a custom ID generation fucntion. [Commit](https://github.com/ajmar/peerjs-server/commit/2552e9d)
PeerServer helps broker connections between PeerJS clients. Data is not proxied through the server. PeerServer helps broker connections between PeerJS clients. Data is not proxied through the server.
Run your own server on Gitpod!
[![Open in Gitpod](https://gitpod.io/button/open-in-gitpod.svg)](https://gitpod.io/#https://github.com/peers/peerjs-server)
## [https://peerjs.com](https://peerjs.com) ## [https://peerjs.com](https://peerjs.com)
### Run PeerServer ### Run PeerServer
1. Clone app: 1. Install PeerServer from npm or github:
#### NPM
```bash ```bash
git clone https://github.com/peers/peerjs-server.git npm install peer
``` ```
2. Install dependencies: #### github
```bash ```bash
git clone https://github.com/peers/peerjs-server.git#master
npm install npm install
``` ```
3. Run the server: 2. Run the server:
```bash ```bash
$> peerjs --port 9000 --key peerjs --path /myapp $> peerjs --port 9000 --key peerjs --path /myapp
@ -38,7 +46,9 @@ import {PeerServer} from 'peerjs-server';
const server = PeerServer({port: 9000, path: '/myapp'}); const server = PeerServer({port: 9000, path: '/myapp'});
``` ```
Connecting to the server from PeerJS: 3. Check that server works: open browser with [http://localhost:9000/myapp](http://localhost:9000/myapp) It should returns JSON with name, description and website fields.
### Connecting to the server from PeerJS:
```html ```html
<script> <script>
@ -46,7 +56,7 @@ Connecting to the server from PeerJS:
</script> </script>
``` ```
Using HTTPS: Simply pass in PEM-encoded certificate and key. ### Using HTTPS: Simply pass in PEM-encoded certificate and key.
```javascript ```javascript
import fs from 'fs'; import fs from 'fs';
@ -61,7 +71,7 @@ const server = PeerServer({
}); });
``` ```
#### Running PeerServer behind a reverse proxy ### Running PeerServer behind a reverse proxy
Make sure to set the `proxied` option, otherwise IP based limiting will fail. Make sure to set the `proxied` option, otherwise IP based limiting will fail.
The option is passed verbatim to the The option is passed verbatim to the
@ -74,6 +84,20 @@ import {PeerServer} from 'peerjs-server';
const server = PeerServer({port: 9000, path: '/myapp', proxied: true}); const server = PeerServer({port: 9000, path: '/myapp', proxied: true});
``` ```
### Custom client ID generation
You can specify a custom function to use to generate client IDs.
```javascript
const genRandomId = () => {
// Original generation algorithm
return (Math.random().toString(36) + '0000000000000000000').substr(2, 16);
}
const server = PeerServer({port: 9000, path: '/myapp', proxied: true, genRandomId: genRandomId });
```
### Combining with existing express app ### Combining with existing express app
```javascript ```javascript
@ -146,7 +170,7 @@ This will start a peerjs server on port 9000 exposed on port 9000.
## Problems? ## Problems?
Discuss PeerJS on our Google Group: Discuss PeerJS on our Telegram chat:
https://groups.google.com/forum/?fromgroups#!forum/peerjs https://t.me/joinchat/ENhPuhTvhm8WlIxTjQf7Og
Please post any bugs as a Github issue. Please post any bugs as a Github issue.

View File

@ -1,69 +1,70 @@
#!/usr/bin/env node #!/usr/bin/env node
// tslint:disable
const path = require('path'); const path = require("path");
const pkg = require('../package.json'); const pkg = require("../package.json");
const fs = require('fs'); const fs = require("fs");
const version = pkg.version; const version = pkg.version;
const PeerServer = require('../src').PeerServer; const { PeerServer } = require("../dist/src");
const opts = require('optimist') const opts = require("optimist")
.usage('Usage: $0') .usage("Usage: $0")
.options({ .options({
expire_timeout: { expire_timeout: {
demand: false, demand: false,
alias: 't', alias: "t",
description: 'timeout (milliseconds)', description: "timeout (milliseconds)",
default: 5000 default: 5000
}, },
concurrent_limit: { concurrent_limit: {
demand: false, demand: false,
alias: 'c', alias: "c",
description: 'concurrent limit', description: "concurrent limit",
default: 5000 default: 5000
}, },
alive_timeout: { alive_timeout: {
demand: false, demand: false,
description: 'broken connection check timeout (milliseconds)', description: "broken connection check timeout (milliseconds)",
default: 60000 default: 60000
}, },
key: { key: {
demand: false, demand: false,
alias: 'k', alias: "k",
description: 'connection key', description: "connection key",
default: 'peerjs' default: "peerjs"
}, },
sslkey: { sslkey: {
demand: false, demand: false,
description: 'path to SSL key' description: "path to SSL key"
}, },
sslcert: { sslcert: {
demand: false, demand: false,
description: 'path to SSL certificate' description: "path to SSL certificate"
}, },
port: { port: {
demand: true, demand: true,
alias: 'p', alias: "p",
description: 'port' description: "port"
}, },
path: { path: {
demand: false, demand: false,
description: 'custom path', description: "custom path",
default: '/' default: "/"
}, },
allow_discovery: { allow_discovery: {
demand: false, demand: false,
description: 'allow discovery of peers' description: "allow discovery of peers"
}, },
proxied: { proxied: {
demand: false, demand: false,
description: 'Set true if PeerServer stays behind a reverse proxy', description: "Set true if PeerServer stays behind a reverse proxy",
default: false default: false
} }
}) })
.boolean('allow_discovery') .boolean("allow_discovery")
.argv; .argv;
process.on('uncaughtException', function (e) { process.on("uncaughtException", function (e) {
console.error('Error: ' + e); console.error("Error: " + e);
}); });
if (opts.sslkey || opts.sslcert) { if (opts.sslkey || opts.sslcert) {
@ -76,8 +77,8 @@ if (opts.sslkey || opts.sslcert) {
delete opts.sslkey; delete opts.sslkey;
delete opts.sslcert; delete opts.sslcert;
} else { } else {
console.error('Warning: PeerServer will not run because either ' + console.error("Warning: PeerServer will not run because either " +
'the key or the certificate has not been provided.'); "the key or the certificate has not been provided.");
process.exit(1); process.exit(1);
} }
} }
@ -88,15 +89,15 @@ const server = PeerServer(opts, server => {
const port = server.address().port; const port = server.address().port;
console.log( console.log(
'Started PeerServer on %s, port: %s, path: %s (v. %s)', "Started PeerServer on %s, port: %s, path: %s (v. %s)",
host, port, userPath || '/', version host, port, userPath || "/", version
); );
}); });
server.on('connection', client => { server.on("connection", client => {
console.log(`Client connected: ${client.getId()}`); console.log(`Client connected: ${client.getId()}`);
}); });
server.on('disconnect', client => { server.on("disconnect", client => {
console.log(`Client disconnected: ${client.getId()}`); console.log(`Client disconnected: ${client.getId()}`);
}); });

View File

@ -2,8 +2,10 @@
### vNEXT 0.3.0 ### vNEXT 0.3.0
* refactoring (add ESLint, split code into small unit) Thanks to @d07RiV @zhou-yg * Convert project to TypeScript 3.7.3.
* update deps * Use UUID when generate client id - #152
* Refactoring (add ESLint, split code into small unit) Thanks to @d07RiV @zhou-yg
* Update deps.
### 0.2.6 ### 0.2.6

View File

@ -1,15 +0,0 @@
module.exports = {
port: 9000,
expire_timeout: 5000,
alive_timeout: 60000,
key: 'peerjs',
path: '/myapp',
concurrent_limit: 5000,
allow_discovery: false,
proxied: false,
cleanup_out_msgs: 1000,
ssl: {
key: '',
cert: ''
}
};

5
dist/app.json vendored Normal file
View File

@ -0,0 +1,5 @@
{
"name": "PeerJS Server",
"description": "A server side element to broker connections between PeerJS clients.",
"website": "http://peerjs.com/"
}

18
dist/config/index.js vendored Normal file
View File

@ -0,0 +1,18 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const defaultConfig = {
port: 9000,
expire_timeout: 5000,
alive_timeout: 60000,
key: "peerjs",
path: "/myapp",
concurrent_limit: 5000,
allow_discovery: false,
proxied: false,
cleanup_out_msgs: 1000,
ssl: {
key: "",
cert: ""
}
};
exports.default = defaultConfig;

24
dist/src/api/index.js vendored Normal file
View File

@ -0,0 +1,24 @@
"use strict";
var __importDefault = (this && this.__importDefault) || function (mod) {
return (mod && mod.__esModule) ? mod : { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true });
const body_parser_1 = __importDefault(require("body-parser"));
const cors_1 = __importDefault(require("cors"));
const express_1 = __importDefault(require("express"));
const app_json_1 = __importDefault(require("../../app.json"));
const auth_1 = require("./middleware/auth");
const calls_1 = __importDefault(require("./v1/calls"));
const public_1 = __importDefault(require("./v1/public"));
exports.Api = ({ config, realm, messageHandler }) => {
const authMiddleware = new auth_1.AuthMiddleware(config, realm);
const app = express_1.default.Router();
const jsonParser = body_parser_1.default.json();
app.use(cors_1.default());
app.get("/", (_, res) => {
res.send(app_json_1.default);
});
app.use("/:key", public_1.default({ config, realm }));
app.use("/:key/:id/:token", authMiddleware.handle, jsonParser, calls_1.default({ realm, messageHandler }));
return app;
};

27
dist/src/api/middleware/auth/index.js vendored Normal file
View File

@ -0,0 +1,27 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const enums_1 = require("../../../enums");
class AuthMiddleware {
constructor(config, realm) {
this.config = config;
this.realm = realm;
}
handle(req, res, next) {
const { id, token, key } = req.params;
if (key !== this.config.key) {
return res.status(401).send(enums_1.Errors.INVALID_KEY);
}
if (!id) {
return res.sendStatus(401);
}
const client = this.realm.getClientById(id);
if (!client) {
return res.sendStatus(401);
}
if (client.getToken() && token !== client.getToken()) {
return res.status(401).send(enums_1.Errors.INVALID_TOKEN);
}
next();
}
}
exports.AuthMiddleware = AuthMiddleware;

2
dist/src/api/middleware/middleware.js vendored Normal file
View File

@ -0,0 +1,2 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });

32
dist/src/api/v1/calls/index.js vendored Normal file
View File

@ -0,0 +1,32 @@
"use strict";
var __importDefault = (this && this.__importDefault) || function (mod) {
return (mod && mod.__esModule) ? mod : { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true });
const express_1 = __importDefault(require("express"));
exports.default = ({ realm, messageHandler }) => {
const app = express_1.default.Router();
const handle = (req, res, next) => {
const { id } = req.params;
if (!id)
return next();
const client = realm.getClientById(id);
if (!client) {
throw new Error(`client not found:${id}`);
}
const { type, dst, payload } = req.body;
const message = {
type,
src: id,
dst,
payload
};
messageHandler.handle(client, message);
res.sendStatus(200);
};
app.post("/offer", handle);
app.post("/candidate", handle);
app.post("/answer", handle);
app.post("/leave", handle);
return app;
};

23
dist/src/api/v1/public/index.js vendored Normal file
View File

@ -0,0 +1,23 @@
"use strict";
var __importDefault = (this && this.__importDefault) || function (mod) {
return (mod && mod.__esModule) ? mod : { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true });
const express_1 = __importDefault(require("express"));
exports.default = ({ config, realm }) => {
const app = express_1.default.Router();
// Retrieve guaranteed random ID.
app.get("/id", (_, res) => {
res.contentType("html");
res.send(realm.generateClientId());
});
// Get a list of all peers for a key, enabled by the `allowDiscovery` flag.
app.get("/peers", (_, res) => {
if (config.allow_discovery) {
const clientsIds = realm.getClientsIds();
return res.send(clientsIds);
}
res.sendStatus(401);
});
return app;
};

18
dist/src/config/index.js vendored Normal file
View File

@ -0,0 +1,18 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const defaultConfig = {
port: 9000,
expire_timeout: 5000,
alive_timeout: 60000,
key: "peerjs",
path: "/myapp",
concurrent_limit: 5000,
allow_discovery: false,
proxied: false,
cleanup_out_msgs: 1000,
ssl: {
key: "",
cert: ""
}
};
exports.default = defaultConfig;

21
dist/src/enums.js vendored Normal file
View File

@ -0,0 +1,21 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
var Errors;
(function (Errors) {
Errors["INVALID_KEY"] = "Invalid key provided";
Errors["INVALID_TOKEN"] = "Invalid token provided";
Errors["INVALID_WS_PARAMETERS"] = "No id, token, or key supplied to websocket server";
Errors["CONNECTION_LIMIT_EXCEED"] = "Server has reached its concurrent user limit";
})(Errors = exports.Errors || (exports.Errors = {}));
var MessageType;
(function (MessageType) {
MessageType["OPEN"] = "OPEN";
MessageType["LEAVE"] = "LEAVE";
MessageType["CANDIDATE"] = "CANDIDATE";
MessageType["OFFER"] = "OFFER";
MessageType["ANSWER"] = "ANSWER";
MessageType["EXPIRE"] = "EXPIRE";
MessageType["HEARTBEAT"] = "HEARTBEAT";
MessageType["ID_TAKEN"] = "ID-TAKEN";
MessageType["ERROR"] = "ERROR";
})(MessageType = exports.MessageType || (exports.MessageType = {}));

52
dist/src/index.js vendored Normal file
View File

@ -0,0 +1,52 @@
"use strict";
var __importDefault = (this && this.__importDefault) || function (mod) {
return (mod && mod.__esModule) ? mod : { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true });
const express_1 = __importDefault(require("express"));
const http_1 = __importDefault(require("http"));
const https_1 = __importDefault(require("https"));
const config_1 = __importDefault(require("./config"));
const instance_1 = require("./instance");
function ExpressPeerServer(server, options) {
const app = express_1.default();
const newOptions = Object.assign(Object.assign({}, config_1.default), options);
if (newOptions.proxied) {
app.set("trust proxy", newOptions.proxied === "false" ? false : !!newOptions.proxied);
}
app.on("mount", () => {
if (!server) {
throw new Error("Server is not passed to constructor - " +
"can't start PeerServer");
}
instance_1.createInstance({ app, server, options: newOptions });
});
return app;
}
exports.ExpressPeerServer = ExpressPeerServer;
function PeerServer(options = {}, callback) {
const app = express_1.default();
const newOptions = Object.assign(Object.assign({}, config_1.default), options);
let path = newOptions.path;
const port = newOptions.port;
if (!path.startsWith('/')) {
path = "/" + path;
}
if (!path.endsWith('/')) {
path += "/";
}
let server;
if (newOptions.ssl && newOptions.ssl.key && newOptions.ssl.cert) {
server = https_1.default.createServer(options.ssl, app);
// @ts-ignore
delete newOptions.ssl;
}
else {
server = http_1.default.createServer(app);
}
const peerjs = ExpressPeerServer(server, newOptions);
app.use(peerjs);
server.listen(port, () => { var _a; return (_a = callback) === null || _a === void 0 ? void 0 : _a(server); });
return peerjs;
}
exports.PeerServer = PeerServer;

51
dist/src/instance.js vendored Normal file
View File

@ -0,0 +1,51 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const realm_1 = require("./models/realm");
const checkBrokenConnections_1 = require("./services/checkBrokenConnections");
const messagesExpire_1 = require("./services/messagesExpire");
const webSocketServer_1 = require("./services/webSocketServer");
const messageHandler_1 = require("./messageHandler");
const api_1 = require("./api");
exports.createInstance = ({ app, server, options }) => {
const config = options;
const realm = new realm_1.Realm();
const messageHandler = new messageHandler_1.MessageHandler(realm);
const api = api_1.Api({ config, realm, messageHandler });
const messagesExpire = new messagesExpire_1.MessagesExpire({ realm, config, messageHandler });
const checkBrokenConnections = new checkBrokenConnections_1.CheckBrokenConnections({
realm,
config,
onClose: client => {
app.emit("disconnect", client);
}
});
app.use(options.path, api);
const wss = new webSocketServer_1.WebSocketServer({
server,
realm,
config
});
wss.on("connection", (client) => {
const messageQueue = realm.getMessageQueueById(client.getId());
if (messageQueue) {
let message;
while (message = messageQueue.readMessage()) {
messageHandler.handle(client, message);
}
realm.clearMessageQueue(client.getId());
}
app.emit("connection", client);
});
wss.on("message", (client, message) => {
app.emit("message", client, message);
messageHandler.handle(client, message);
});
wss.on("close", (client) => {
app.emit("disconnect", client);
});
wss.on("error", (error) => {
app.emit("error", error);
});
messagesExpire.startMessagesExpiration();
checkBrokenConnections.start();
};

2
dist/src/messageHandler/handler.js vendored Normal file
View File

@ -0,0 +1,2 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });

View File

@ -0,0 +1,9 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.HeartbeatHandler = (client) => {
if (client) {
const nowTime = new Date().getTime();
client.setLastPing(nowTime);
}
return true;
};

View File

@ -0,0 +1,6 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
var heartbeat_1 = require("./heartbeat");
exports.HeartbeatHandler = heartbeat_1.HeartbeatHandler;
var transmission_1 = require("./transmission");
exports.TransmissionHandler = transmission_1.TransmissionHandler;

View File

@ -0,0 +1,58 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const enums_1 = require("../../../enums");
exports.TransmissionHandler = ({ realm }) => {
const handle = (client, message) => {
const type = message.type;
const srcId = message.src;
const dstId = message.dst;
const destinationClient = realm.getClientById(dstId);
// User is connected!
if (destinationClient) {
const socket = destinationClient.getSocket();
try {
if (socket) {
const data = JSON.stringify(message);
socket.send(data);
}
else {
// Neither socket no res available. Peer dead?
throw new Error("Peer dead");
}
}
catch (e) {
// This happens when a peer disconnects without closing connections and
// the associated WebSocket has not closed.
// Tell other side to stop trying.
if (socket) {
socket.close();
}
else {
realm.removeClientById(destinationClient.getId());
}
handle(client, {
type: enums_1.MessageType.LEAVE,
src: dstId,
dst: srcId
});
}
}
else {
// Wait for this client to connect/reconnect (XHR) for important
// messages.
const ignoredTypes = [enums_1.MessageType.LEAVE, enums_1.MessageType.EXPIRE];
if (!ignoredTypes.includes(type) && dstId) {
realm.addMessageToQueue(dstId, message);
}
else if (type === enums_1.MessageType.LEAVE && !dstId) {
realm.removeClientById(srcId);
}
else {
// Unavailable destination specified with message LEAVE or EXPIRE
// Ignore
}
}
return true;
};
return handle;
};

View File

@ -0,0 +1,20 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
class HandlersRegistry {
constructor() {
this.handlers = new Map();
}
registerHandler(messageType, handler) {
if (this.handlers.has(messageType))
return;
this.handlers.set(messageType, handler);
}
handle(client, message) {
const { type } = message;
const handler = this.handlers.get(type);
if (!handler)
return false;
return handler(client, message);
}
}
exports.HandlersRegistry = HandlersRegistry;

31
dist/src/messageHandler/index.js vendored Normal file
View File

@ -0,0 +1,31 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const enums_1 = require("../enums");
const handlers_1 = require("./handlers");
const handlersRegistry_1 = require("./handlersRegistry");
class MessageHandler {
constructor(realm, handlersRegistry = new handlersRegistry_1.HandlersRegistry()) {
this.handlersRegistry = handlersRegistry;
const transmissionHandler = handlers_1.TransmissionHandler({ realm });
const heartbeatHandler = handlers_1.HeartbeatHandler;
const handleTransmission = (client, { type, src, dst, payload }) => {
return transmissionHandler(client, {
type,
src,
dst,
payload,
});
};
const handleHeartbeat = (client, message) => heartbeatHandler(client, message);
this.handlersRegistry.registerHandler(enums_1.MessageType.HEARTBEAT, handleHeartbeat);
this.handlersRegistry.registerHandler(enums_1.MessageType.OFFER, handleTransmission);
this.handlersRegistry.registerHandler(enums_1.MessageType.ANSWER, handleTransmission);
this.handlersRegistry.registerHandler(enums_1.MessageType.CANDIDATE, handleTransmission);
this.handlersRegistry.registerHandler(enums_1.MessageType.LEAVE, handleTransmission);
this.handlersRegistry.registerHandler(enums_1.MessageType.EXPIRE, handleTransmission);
}
handle(client, message) {
return this.handlersRegistry.handle(client, message);
}
}
exports.MessageHandler = MessageHandler;

View File

@ -0,0 +1,20 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
class MessageHandlers {
constructor() {
this.handlers = new Map();
}
registerHandler(messageType, handler) {
if (this.handlers.has(messageType))
return;
this.handlers.set(messageType, handler);
}
handle(client, message) {
const { type } = message;
const handler = this.handlers.get(type);
if (!handler)
return false;
return handler(client, message);
}
}
exports.MessageHandlers = MessageHandlers;

33
dist/src/models/client.js vendored Normal file
View File

@ -0,0 +1,33 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
class Client {
constructor({ id, token }) {
this.socket = null;
this.lastPing = new Date().getTime();
this.id = id;
this.token = token;
}
getId() {
return this.id;
}
getToken() {
return this.token;
}
getSocket() {
return this.socket;
}
setSocket(socket) {
this.socket = socket;
}
getLastPing() {
return this.lastPing;
}
setLastPing(lastPing) {
this.lastPing = lastPing;
}
send(data) {
var _a;
(_a = this.socket) === null || _a === void 0 ? void 0 : _a.send(JSON.stringify(data));
}
}
exports.Client = Client;

2
dist/src/models/message.js vendored Normal file
View File

@ -0,0 +1,2 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });

25
dist/src/models/messageQueue.js vendored Normal file
View File

@ -0,0 +1,25 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
class MessageQueue {
constructor() {
this.lastReadAt = new Date().getTime();
this.messages = [];
}
getLastReadAt() {
return this.lastReadAt;
}
addMessage(message) {
this.messages.push(message);
}
readMessage() {
if (this.messages.length > 0) {
this.lastReadAt = new Date().getTime();
return this.messages.shift();
}
return undefined;
}
getMessages() {
return this.messages;
}
}
exports.MessageQueue = MessageQueue;

52
dist/src/models/realm.js vendored Normal file
View File

@ -0,0 +1,52 @@
"use strict";
var __importDefault = (this && this.__importDefault) || function (mod) {
return (mod && mod.__esModule) ? mod : { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true });
const v4_1 = __importDefault(require("uuid/v4"));
const messageQueue_1 = require("./messageQueue");
class Realm {
constructor() {
this.clients = new Map();
this.messageQueues = new Map();
}
getClientsIds() {
return [...this.clients.keys()];
}
getClientById(clientId) {
return this.clients.get(clientId);
}
getClientsIdsWithQueue() {
return [...this.messageQueues.keys()];
}
setClient(client, id) {
this.clients.set(id, client);
}
removeClientById(id) {
const client = this.getClientById(id);
if (!client)
return false;
this.clients.delete(id);
return true;
}
getMessageQueueById(id) {
return this.messageQueues.get(id);
}
addMessageToQueue(id, message) {
if (!this.getMessageQueueById(id)) {
this.messageQueues.set(id, new messageQueue_1.MessageQueue());
}
this.getMessageQueueById(id).addMessage(message);
}
clearMessageQueue(id) {
this.messageQueues.delete(id);
}
generateClientId() {
let clientId = v4_1.default();
while (this.getClientById(clientId)) {
clientId = v4_1.default();
}
return clientId;
}
}
exports.Realm = Realm;

View File

@ -0,0 +1,50 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const DEFAULT_CHECK_INTERVAL = 300;
class CheckBrokenConnections {
constructor({ realm, config, checkInterval = DEFAULT_CHECK_INTERVAL, onClose }) {
this.timeoutId = null;
this.realm = realm;
this.config = config;
this.onClose = onClose;
this.checkInterval = checkInterval;
}
start() {
if (this.timeoutId) {
clearTimeout(this.timeoutId);
}
this.timeoutId = setTimeout(() => {
this.checkConnections();
this.timeoutId = null;
this.start();
}, this.checkInterval);
}
stop() {
if (this.timeoutId) {
clearTimeout(this.timeoutId);
this.timeoutId = null;
}
}
checkConnections() {
var _a, _b, _c;
const clientsIds = this.realm.getClientsIds();
const now = new Date().getTime();
const { alive_timeout: aliveTimeout } = this.config;
for (const clientId of clientsIds) {
const client = this.realm.getClientById(clientId);
const timeSinceLastPing = now - client.getLastPing();
if (timeSinceLastPing < aliveTimeout)
continue;
try {
(_a = client.getSocket()) === null || _a === void 0 ? void 0 : _a.close();
}
finally {
this.realm.clearMessageQueue(clientId);
this.realm.removeClientById(clientId);
client.setSocket(null);
(_c = (_b = this).onClose) === null || _c === void 0 ? void 0 : _c.call(_b, client);
}
}
}
}
exports.CheckBrokenConnections = CheckBrokenConnections;

View File

@ -0,0 +1,53 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const enums_1 = require("../../enums");
class MessagesExpire {
constructor({ realm, config, messageHandler }) {
this.timeoutId = null;
this.realm = realm;
this.config = config;
this.messageHandler = messageHandler;
}
startMessagesExpiration() {
if (this.timeoutId) {
clearTimeout(this.timeoutId);
}
// Clean up outstanding messages
this.timeoutId = setTimeout(() => {
this.pruneOutstanding();
this.timeoutId = null;
this.startMessagesExpiration();
}, this.config.cleanup_out_msgs);
}
stopMessagesExpiration() {
if (this.timeoutId) {
clearTimeout(this.timeoutId);
this.timeoutId = null;
}
}
pruneOutstanding() {
const destinationClientsIds = this.realm.getClientsIdsWithQueue();
const now = new Date().getTime();
const maxDiff = this.config.expire_timeout;
const seen = {};
for (const destinationClientId of destinationClientsIds) {
const messageQueue = this.realm.getMessageQueueById(destinationClientId);
const lastReadDiff = now - messageQueue.getLastReadAt();
if (lastReadDiff < maxDiff)
continue;
const messages = messageQueue.getMessages();
for (const message of messages) {
if (!seen[message.src]) {
this.messageHandler.handle(undefined, {
type: enums_1.MessageType.EXPIRE,
src: message.dst,
dst: message.src
});
seen[message.src] = true;
}
}
this.realm.clearMessageQueue(destinationClientId);
}
}
}
exports.MessagesExpire = MessagesExpire;

View File

@ -0,0 +1,92 @@
"use strict";
var __importDefault = (this && this.__importDefault) || function (mod) {
return (mod && mod.__esModule) ? mod : { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true });
const events_1 = __importDefault(require("events"));
const url_1 = __importDefault(require("url"));
const ws_1 = __importDefault(require("ws"));
const enums_1 = require("../../enums");
const client_1 = require("../../models/client");
const WS_PATH = 'peerjs';
class WebSocketServer extends events_1.default {
constructor({ server, realm, config }) {
super();
this.setMaxListeners(0);
this.realm = realm;
this.config = config;
const path = this.config.path;
this.path = `${path}${path.endsWith('/') ? "" : "/"}${WS_PATH}`;
this.socketServer = new ws_1.default.Server({ path: this.path, server });
this.socketServer.on("connection", (socket, req) => this._onSocketConnection(socket, req));
this.socketServer.on("error", (error) => this._onSocketError(error));
}
_onSocketConnection(socket, req) {
const { query = {} } = url_1.default.parse(req.url, true);
const { id, token, key } = query;
if (!id || !token || !key) {
return this._sendErrorAndClose(socket, enums_1.Errors.INVALID_WS_PARAMETERS);
}
if (key !== this.config.key) {
return this._sendErrorAndClose(socket, enums_1.Errors.INVALID_KEY);
}
const client = this.realm.getClientById(id);
if (client) {
if (token !== client.getToken()) {
// ID-taken, invalid token
socket.send(JSON.stringify({
type: enums_1.MessageType.ID_TAKEN,
payload: { msg: "ID is taken" }
}));
return socket.close();
}
return this._configureWS(socket, client);
}
this._registerClient({ socket, id, token });
}
_onSocketError(error) {
// handle error
this.emit("error", error);
}
_registerClient({ socket, id, token }) {
// Check concurrent limit
const clientsCount = this.realm.getClientsIds().length;
if (clientsCount >= this.config.concurrent_limit) {
return this._sendErrorAndClose(socket, enums_1.Errors.CONNECTION_LIMIT_EXCEED);
}
const newClient = new client_1.Client({ id, token });
this.realm.setClient(newClient, id);
socket.send(JSON.stringify({ type: enums_1.MessageType.OPEN }));
this._configureWS(socket, newClient);
}
_configureWS(socket, client) {
client.setSocket(socket);
// Cleanup after a socket closes.
socket.on("close", () => {
if (client.getSocket() === socket) {
this.realm.removeClientById(client.getId());
this.emit("close", client);
}
});
// Handle messages from peers.
socket.on("message", (data) => {
try {
const message = JSON.parse(data);
message.src = client.getId();
this.emit("message", client, message);
}
catch (e) {
this.emit("error", e);
}
});
this.emit("connection", client);
}
_sendErrorAndClose(socket, msg) {
socket.send(JSON.stringify({
type: enums_1.MessageType.ERROR,
payload: { msg }
}));
socket.close();
}
}
exports.WebSocketServer = WebSocketServer;

View File

@ -0,0 +1,2 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });

3888
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -2,7 +2,7 @@
"name": "peer", "name": "peer",
"version": "0.3.0", "version": "0.3.0",
"description": "PeerJS server component", "description": "PeerJS server component",
"main": "src/index.js", "main": "dist/peerjs.server.min.js",
"bin": { "bin": {
"peerjs": "./bin/peerjs" "peerjs": "./bin/peerjs"
}, },
@ -13,22 +13,47 @@
"author": "Michelle Bu, Eric Zhang", "author": "Michelle Bu, Eric Zhang",
"license": "MIT", "license": "MIT",
"scripts": { "scripts": {
"test": "eslint . && mocha \"test/**/*.js\"", "build": "tsc",
"start": "bin/peerjs --port ${PORT:=9000}" "clean": "rimraf ./dist",
"lint": "eslint --ext .js,.ts .",
"tsc": "tsc",
"prebuild": "npm run lint",
"test": "npm run lint && mocha -r ts-node/register \"test/**/*\"",
"start": "bin/peerjs --port ${PORT:=9000}",
"dev:start": "npm-run-all build start",
"dev": "nodemon --watch src -e ts --exec npm run dev:start"
},
"release": {
"branch": "master"
}, },
"dependencies": { "dependencies": {
"body-parser": "^1.19.0", "@types/cors": "2.8.6",
"cors": "~2.8.4", "@types/express": "4.17.1",
"express": "^4.17.1", "@types/ws": "6.0.4",
"optimist": "~0.6.1", "body-parser": "1.19.0",
"ws": "^7.1.2" "cors": "2.8.4",
"express": "4.17.1",
"optimist": "0.6.1",
"uuid": "3.3.3",
"ws": "7.1.2"
}, },
"devDependencies": { "devDependencies": {
"@types/chai": "^4.1.7",
"@types/mocha": "^5.2.7",
"@types/node": "^10.14.16",
"@types/uuid": "3.4.6",
"@typescript-eslint/eslint-plugin": "^2.11.0",
"@typescript-eslint/parser": "^2.11.0",
"chai": "^4.2.0", "chai": "^4.2.0",
"eslint": "^6.2.1", "eslint": "^6.7.2",
"mocha": "^6.2.0", "mocha": "^6.2.2",
"semistandard": "^14.0.1", "mock-socket": "8.0.5",
"sinon": "^7.4.1" "nodemon": "1.19.1",
"npm-run-all": "4.1.5",
"rimraf": "3.0.0",
"sinon": "7.5.0",
"ts-node": "8.5.4",
"typescript": "3.7.3"
}, },
"engines": { "engines": {
"node": ">=10" "node": ">=10"

View File

@ -1,23 +0,0 @@
const express = require('express');
const cors = require('cors');
const bodyParser = require('body-parser');
const publicContent = require('../../app.json');
module.exports = ({ config, realm, messageHandler }) => {
const authMiddleware = require('./middleware/auth')({ config, realm });
const app = express.Router();
const jsonParser = bodyParser.json();
app.use(cors());
app.get('/', (req, res) => {
res.send(publicContent);
});
app.use('/:key', require('./v1/public')({ config, realm }));
app.use('/:key/:id/:token', authMiddleware, jsonParser, require('./v1/calls')({ realm, messageHandler }));
return app;
};

33
src/api/index.ts Normal file
View File

@ -0,0 +1,33 @@
import bodyParser from "body-parser";
import cors from "cors";
import express from "express";
import publicContent from "../../app.json";
import { IConfig } from "../config";
import { IMessageHandler } from "../messageHandler";
import { IRealm } from "../models/realm";
import { AuthMiddleware } from "./middleware/auth";
import CallsApi from "./v1/calls";
import PublicApi from "./v1/public";
export const Api = ({ config, realm, messageHandler }: {
config: IConfig;
realm: IRealm;
messageHandler: IMessageHandler;
}): express.Router => {
const authMiddleware = new AuthMiddleware(config, realm);
const app = express.Router();
const jsonParser = bodyParser.json();
app.use(cors());
app.get("/", (_, res) => {
res.send(publicContent);
});
app.use("/:key", PublicApi({ config, realm }));
app.use("/:key/:id/:token", authMiddleware.handle, jsonParser, CallsApi({ realm, messageHandler }));
return app;
};

View File

@ -1,25 +0,0 @@
const { Errors } = require('../../../enums');
module.exports = ({ config, realm }) => (req, res, next) => {
const { id, token, key } = req.params;
if (key !== config.key) {
return res.status(401).send(Errors.INVALID_KEY);
}
if (!id) {
return res.sendStatus(401);
}
const client = realm.getClientById(id);
if (!client) {
return res.sendStatus(401);
}
if (client.getToken() && token !== client.getToken()) {
return res.status(401).send(Errors.INVALID_TOKEN);
}
next();
};

View File

@ -0,0 +1,35 @@
import express from "express";
import { IConfig } from "../../../config";
import { Errors } from "../../../enums";
import { IRealm } from "../../../models/realm";
import { IMiddleware } from "../middleware";
export class AuthMiddleware implements IMiddleware {
constructor(private readonly config: IConfig, private readonly realm: IRealm) { }
public handle(req: express.Request, res: express.Response, next: express.NextFunction): any {
const { id, token, key } = req.params;
if (key !== this.config.key) {
return res.status(401).send(Errors.INVALID_KEY);
}
if (!id) {
return res.sendStatus(401);
}
const client = this.realm.getClientById(id);
if (!client) {
return res.sendStatus(401);
}
if (client.getToken() && token !== client.getToken()) {
return res.status(401).send(Errors.INVALID_TOKEN);
}
next();
}
}

View File

@ -0,0 +1,5 @@
import express from "express";
export interface IMiddleware {
handle(req: express.Request, res: express.Response, next: express.NextFunction): any;
}

View File

@ -1,36 +0,0 @@
const express = require('express');
module.exports = ({ realm, messageHandler }) => {
const app = express.Router();
const handle = (req, res, next) => {
const { id } = req.params;
if (!id) return next();
const client = realm.getClientById(id);
const { type, dst, payload } = req.body;
const message = {
type,
src: id,
dst,
payload
};
messageHandler(client, message);
res.sendStatus(200);
};
app.post('/offer', handle);
app.post('/candidate', handle);
app.post('/answer', handle);
app.post('/leave', handle);
return app;
};

40
src/api/v1/calls/index.ts Normal file
View File

@ -0,0 +1,40 @@
import express from "express";
import { IMessageHandler } from "../../../messageHandler";
import { IMessage } from "../../../models/message";
import { IRealm } from "../../../models/realm";
export default ({ realm, messageHandler }: { realm: IRealm, messageHandler: IMessageHandler; }): express.Router => {
const app = express.Router();
const handle = (req: express.Request, res: express.Response, next: express.NextFunction): any => {
const { id } = req.params;
if (!id) return next();
const client = realm.getClientById(id);
if (!client) {
throw new Error(`client not found:${id}`);
}
const { type, dst, payload } = req.body;
const message: IMessage = {
type,
src: id,
dst,
payload
};
messageHandler.handle(client, message);
res.sendStatus(200);
};
app.post("/offer", handle);
app.post("/candidate", handle);
app.post("/answer", handle);
app.post("/leave", handle);
return app;
};

View File

@ -1,16 +1,20 @@
const express = require('express'); import express from "express";
import { IConfig } from "../../../config";
import { IRealm } from "../../../models/realm";
module.exports = ({ config, realm }) => { export default ({ config, realm }: {
config: IConfig, realm: IRealm
}): express.Router => {
const app = express.Router(); const app = express.Router();
// Retrieve guaranteed random ID. // Retrieve guaranteed random ID.
app.get('/id', (req, res) => { app.get("/id", (_, res: express.Response) => {
res.contentType = 'text/html'; res.contentType("html");
res.send(realm.generateClientId(config.genRandomId)); res.send(realm.generateClientId(config.genRandomId));
}); });
// Get a list of all peers for a key, enabled by the `allowDiscovery` flag. // Get a list of all peers for a key, enabled by the `allowDiscovery` flag.
app.get('/peers', (req, res) => { app.get("/peers", (_, res: express.Response) => {
if (config.allow_discovery) { if (config.allow_discovery) {
const clientsIds = realm.getClientsIds(); const clientsIds = realm.getClientsIds();

33
src/config/index.ts Normal file
View File

@ -0,0 +1,33 @@
export interface IConfig {
readonly port: number;
readonly expire_timeout: number;
readonly alive_timeout: number;
readonly key: string;
readonly path: string;
readonly concurrent_limit: number;
readonly allow_discovery: boolean;
readonly proxied: boolean | string;
readonly cleanup_out_msgs: number;
readonly ssl?: {
key: string;
cert: string;
};
}
const defaultConfig: IConfig = {
port: 9000,
expire_timeout: 5000,
alive_timeout: 60000,
key: "peerjs",
path: "/myapp",
concurrent_limit: 5000,
allow_discovery: false,
proxied: false,
cleanup_out_msgs: 1000,
ssl: {
key: "",
cert: ""
}
};
export default defaultConfig;

View File

@ -1,18 +0,0 @@
module.exports.Errors = {
INVALID_KEY: 'Invalid key provided',
INVALID_TOKEN: 'Invalid token provided',
INVALID_WS_PARAMETERS: 'No id, token, or key supplied to websocket server',
CONNECTION_LIMIT_EXCEED: 'Server has reached its concurrent user limit'
};
module.exports.MessageType = {
OPEN: 'OPEN',
LEAVE: 'LEAVE',
CANDIDATE: 'CANDIDATE',
OFFER: 'OFFER',
ANSWER: 'ANSWER',
EXPIRE: 'EXPIRE',
HEARTBEAT: 'HEARTBEAT',
ID_TAKEN: 'ID-TAKEN',
ERROR: 'ERROR'
};

18
src/enums.ts Normal file
View File

@ -0,0 +1,18 @@
export enum Errors {
INVALID_KEY = "Invalid key provided",
INVALID_TOKEN = "Invalid token provided",
INVALID_WS_PARAMETERS = "No id, token, or key supplied to websocket server",
CONNECTION_LIMIT_EXCEED = "Server has reached its concurrent user limit"
}
export enum MessageType {
OPEN = "OPEN",
LEAVE = "LEAVE",
CANDIDATE = "CANDIDATE",
OFFER = "OFFER",
ANSWER = "ANSWER",
EXPIRE = "EXPIRE",
HEARTBEAT = "HEARTBEAT",
ID_TAKEN = "ID-TAKEN",
ERROR = "ERROR"
}

View File

@ -1,134 +0,0 @@
const express = require('express');
const http = require('http');
const https = require('https');
const defaultConfig = require('../config');
const WebSocketServer = require('./services/webSocketServer');
const Realm = require('./models/realm');
const init = ({ app, server, options }) => {
const config = options;
const realm = new Realm();
const messageHandler = require('./messageHandler')({ realm });
const api = require('./api')({ config, realm, messageHandler });
const { startMessagesExpiration } = require('./services/messagesExpire')({ realm, config, messageHandler });
const checkBrokenConnections = require('./services/checkBrokenConnections')({
realm, config, onClose: (client) => {
app.emit('disconnect', client);
}
});
app.use(options.path, api);
const wss = new WebSocketServer({
server,
realm,
config: {
...config,
}
});
wss.on('connection', client => {
const messageQueue = realm.getMessageQueueById(client.getId());
if (messageQueue) {
let message;
// eslint-disable-next-line no-cond-assign
while (message = messageQueue.readMessage()) {
messageHandler(client, message);
}
realm.clearMessageQueue(client.getId());
}
app.emit('connection', client);
});
wss.on('message', (client, message) => {
app.emit('message', client, message);
messageHandler(client, message);
});
wss.on('close', client => {
app.emit('disconnect', client);
});
wss.on('error', error => {
app.emit('error', error);
});
startMessagesExpiration();
checkBrokenConnections.start();
};
function ExpressPeerServer(server, options) {
const app = express();
options = {
...defaultConfig,
...options
};
if (options.proxied) {
app.set('trust proxy', options.proxied === 'false' ? false : options.proxied);
}
app.on('mount', () => {
if (!server) {
throw new Error('Server is not passed to constructor - ' +
'can\'t start PeerServer');
}
init({ app, server, options });
});
return app;
}
function PeerServer(options = {}, callback) {
const app = express();
options = {
...defaultConfig,
...options
};
let path = options.path;
const port = options.port;
if (path[0] !== '/') {
path = '/' + path;
}
if (path[path.length - 1] !== '/') {
path += '/';
}
let server;
if (options.ssl && options.ssl.key && options.ssl.cert) {
server = https.createServer(options.ssl, app);
delete options.ssl;
} else {
server = http.createServer(app);
}
const peerjs = ExpressPeerServer(server, options);
app.use(peerjs);
if (callback) {
server.listen(port, () => {
callback(server);
});
} else {
server.listen(port);
}
return peerjs;
}
exports = module.exports = {
ExpressPeerServer: ExpressPeerServer,
PeerServer: PeerServer
};

77
src/index.ts Normal file
View File

@ -0,0 +1,77 @@
import express from "express";
import http from "http";
import https from "https";
import { Server } from "net";
import defaultConfig, { IConfig } from "./config";
import { createInstance } from "./instance";
type Optional<T> = {
[P in keyof T]?: (T[P] | undefined);
};
function ExpressPeerServer(server: Server, options?: IConfig) {
const app = express();
const newOptions: IConfig = {
...defaultConfig,
...options
};
if (newOptions.proxied) {
app.set("trust proxy", newOptions.proxied === "false" ? false : !!newOptions.proxied);
}
app.on("mount", () => {
if (!server) {
throw new Error("Server is not passed to constructor - " +
"can't start PeerServer");
}
createInstance({ app, server, options: newOptions });
});
return app;
}
function PeerServer(options: Optional<IConfig> = {}, callback?: (server: Server) => void) {
const app = express();
const newOptions: IConfig = {
...defaultConfig,
...options
};
let path = newOptions.path;
const port = newOptions.port;
if (!path.startsWith('/')) {
path = "/" + path;
}
if (!path.endsWith('/')) {
path += "/";
}
let server: Server;
if (newOptions.ssl && newOptions.ssl.key && newOptions.ssl.cert) {
server = https.createServer(options.ssl!, app);
// @ts-ignore
delete newOptions.ssl;
} else {
server = http.createServer(app);
}
const peerjs = ExpressPeerServer(server, newOptions);
app.use(peerjs);
server.listen(port, () => callback?.(server));
return peerjs;
}
export {
ExpressPeerServer,
PeerServer
};

71
src/instance.ts Normal file
View File

@ -0,0 +1,71 @@
import express from "express";
import { Server } from "net";
import { IClient } from "./models/client";
import { IMessage } from "./models/message";
import { Realm } from "./models/realm";
import { IRealm } from "./models/realm";
import { CheckBrokenConnections } from "./services/checkBrokenConnections";
import { IMessagesExpire, MessagesExpire } from "./services/messagesExpire";
import { IWebSocketServer, WebSocketServer } from "./services/webSocketServer";
import { MessageHandler } from "./messageHandler";
import { Api } from "./api";
import { IConfig } from "./config";
export const createInstance = ({ app, server, options }: {
app: express.Application,
server: Server,
options: IConfig;
}): void => {
const config = options;
const realm: IRealm = new Realm();
const messageHandler = new MessageHandler(realm);
const api = Api({ config, realm, messageHandler });
const messagesExpire: IMessagesExpire = new MessagesExpire({ realm, config, messageHandler });
const checkBrokenConnections = new CheckBrokenConnections({
realm,
config,
onClose: client => {
app.emit("disconnect", client);
}
});
app.use(options.path, api);
const wss: IWebSocketServer = new WebSocketServer({
server,
realm,
config
});
wss.on("connection", (client: IClient) => {
const messageQueue = realm.getMessageQueueById(client.getId());
if (messageQueue) {
let message: IMessage | undefined;
while (message = messageQueue.readMessage()) {
messageHandler.handle(client, message);
}
realm.clearMessageQueue(client.getId());
}
app.emit("connection", client);
});
wss.on("message", (client: IClient, message: IMessage) => {
app.emit("message", client, message);
messageHandler.handle(client, message);
});
wss.on("close", (client: IClient) => {
app.emit("disconnect", client);
});
wss.on("error", (error: Error) => {
app.emit("error", error);
});
messagesExpire.startMessagesExpiration();
checkBrokenConnections.start();
};

View File

@ -0,0 +1,4 @@
import { IClient } from "../models/client";
import { IMessage } from "../models/message";
export type Handler = (client: IClient | undefined, message: IMessage) => boolean;

View File

@ -1,4 +0,0 @@
module.exports = (client) => {
const nowTime = new Date().getTime();
client.setLastPing(nowTime);
};

View File

@ -0,0 +1,10 @@
import { IClient } from "../../../models/client";
export const HeartbeatHandler = (client: IClient | undefined): boolean => {
if (client) {
const nowTime = new Date().getTime();
client.setLastPing(nowTime);
}
return true;
};

View File

@ -0,0 +1,2 @@
export { HeartbeatHandler } from "./heartbeat";
export { TransmissionHandler } from "./transmission";

View File

@ -1,49 +0,0 @@
const { MessageType } = require('../../../enums');
module.exports = ({ realm }) => (client, message) => {
const type = message.type;
const srcId = message.src;
const dstId = message.dst;
const destinationClient = realm.getClientById(dstId);
// User is connected!
if (destinationClient) {
try {
if (destinationClient.socket) {
const data = JSON.stringify(message);
destinationClient.socket.send(data);
} else {
// Neither socket no res available. Peer dead?
throw new Error('Peer dead');
}
} catch (e) {
// This happens when a peer disconnects without closing connections and
// the associated WebSocket has not closed.
// Tell other side to stop trying.
if (destinationClient.socket) {
destinationClient.socket.close();
} else {
realm.removeClientById(destinationClient.getId());
}
module.exports({ realm })(client, {
type: MessageType.LEAVE,
src: dstId,
dst: srcId
});
}
} else {
// Wait for this client to connect/reconnect (XHR) for important
// messages.
if (type !== MessageType.LEAVE && type !== MessageType.EXPIRE && dstId) {
realm.addMessageToQueue(dstId, message);
} else if (type === MessageType.LEAVE && !dstId) {
realm.removeClientById(srcId);
} else {
// Unavailable destination specified with message LEAVE or EXPIRE
// Ignore
}
}
};

View File

@ -0,0 +1,61 @@
import { MessageType } from "../../../enums";
import { IClient } from "../../../models/client";
import { IMessage } from "../../../models/message";
import { IRealm } from "../../../models/realm";
export const TransmissionHandler = ({ realm }: { realm: IRealm; }): (client: IClient | undefined, message: IMessage) => boolean => {
const handle = (client: IClient | undefined, message: IMessage) => {
const type = message.type;
const srcId = message.src;
const dstId = message.dst;
const destinationClient = realm.getClientById(dstId);
// User is connected!
if (destinationClient) {
const socket = destinationClient.getSocket();
try {
if (socket) {
const data = JSON.stringify(message);
socket.send(data);
} else {
// Neither socket no res available. Peer dead?
throw new Error("Peer dead");
}
} catch (e) {
// This happens when a peer disconnects without closing connections and
// the associated WebSocket has not closed.
// Tell other side to stop trying.
if (socket) {
socket.close();
} else {
realm.removeClientById(destinationClient.getId());
}
handle(client, {
type: MessageType.LEAVE,
src: dstId,
dst: srcId
});
}
} else {
// Wait for this client to connect/reconnect (XHR) for important
// messages.
const ignoredTypes = [MessageType.LEAVE, MessageType.EXPIRE];
if (!ignoredTypes.includes(type) && dstId) {
realm.addMessageToQueue(dstId, message);
} else if (type === MessageType.LEAVE && !dstId) {
realm.removeClientById(srcId);
} else {
// Unavailable destination specified with message LEAVE or EXPIRE
// Ignore
}
}
return true;
};
return handle;
};

View File

@ -0,0 +1,29 @@
import { MessageType } from "../enums";
import { IClient } from "../models/client";
import { IMessage } from "../models/message";
import { Handler } from "./handler";
export interface IHandlersRegistry {
registerHandler(messageType: MessageType, handler: Handler): void;
handle(client: IClient | undefined, message: IMessage): boolean;
}
export class HandlersRegistry implements IHandlersRegistry {
private readonly handlers: Map<MessageType, Handler> = new Map();
public registerHandler(messageType: MessageType, handler: Handler): void {
if (this.handlers.has(messageType)) return;
this.handlers.set(messageType, handler);
}
public handle(client: IClient | undefined, message: IMessage): boolean {
const { type } = message;
const handler = this.handlers.get(type);
if (!handler) return false;
return handler(client, message);
}
}

View File

@ -1,49 +0,0 @@
const { MessageType } = require('../enums');
class MessageHandlers {
constructor() {
this.handlers = {};
}
registerHandler(messageType, handler) {
this.handlers[messageType] = handler;
}
handle(client, message) {
const { type } = message;
const handler = this.handlers[type];
if (!handler) {
return;
}
handler(client, message);
}
}
module.exports = ({ realm }) => {
const transmissionHandler = require('./handlers/transmission')({ realm });
const heartbeatHandler = require('./handlers/heartbeat');
const messageHandlers = new MessageHandlers();
const handleTransmission = (client, message) => {
transmissionHandler(client, {
type: message.type,
src: message.src,
dst: message.dst,
payload: message.payload
});
};
const handleHeartbeat = (client) => heartbeatHandler(client);
messageHandlers.registerHandler(MessageType.HEARTBEAT, handleHeartbeat);
messageHandlers.registerHandler(MessageType.OFFER, handleTransmission);
messageHandlers.registerHandler(MessageType.ANSWER, handleTransmission);
messageHandlers.registerHandler(MessageType.CANDIDATE, handleTransmission);
messageHandlers.registerHandler(MessageType.LEAVE, handleTransmission);
messageHandlers.registerHandler(MessageType.EXPIRE, handleTransmission);
return (client, message) => messageHandlers.handle(client, message);
};

View File

@ -0,0 +1,40 @@
import { MessageType } from "../enums";
import { IClient } from "../models/client";
import { IMessage } from "../models/message";
import { IRealm } from "../models/realm";
import { Handler } from "./handler";
import { HeartbeatHandler, TransmissionHandler } from "./handlers";
import { IHandlersRegistry, HandlersRegistry } from "./handlersRegistry";
export interface IMessageHandler {
handle(client: IClient | undefined, message: IMessage): boolean;
}
export class MessageHandler implements IMessageHandler {
constructor(realm: IRealm, private readonly handlersRegistry: IHandlersRegistry = new HandlersRegistry()) {
const transmissionHandler: Handler = TransmissionHandler({ realm });
const heartbeatHandler: Handler = HeartbeatHandler;
const handleTransmission: Handler = (client: IClient | undefined, { type, src, dst, payload }: IMessage): boolean => {
return transmissionHandler(client, {
type,
src,
dst,
payload,
});
};
const handleHeartbeat = (client: IClient | undefined, message: IMessage) => heartbeatHandler(client, message);
this.handlersRegistry.registerHandler(MessageType.HEARTBEAT, handleHeartbeat);
this.handlersRegistry.registerHandler(MessageType.OFFER, handleTransmission);
this.handlersRegistry.registerHandler(MessageType.ANSWER, handleTransmission);
this.handlersRegistry.registerHandler(MessageType.CANDIDATE, handleTransmission);
this.handlersRegistry.registerHandler(MessageType.LEAVE, handleTransmission);
this.handlersRegistry.registerHandler(MessageType.EXPIRE, handleTransmission);
}
public handle(client: IClient | undefined, message: IMessage): boolean {
return this.handlersRegistry.handle(client, message);
}
}

View File

@ -1,38 +0,0 @@
class Client {
constructor({ id, token }) {
this.id = id;
this.token = token;
this.socket = null;
this.lastPing = new Date().getTime();
}
getId() {
return this.id;
}
getToken() {
return this.token;
}
getSocket() {
return this.socket;
}
setSocket(socket) {
this.socket = socket;
}
getLastPing() {
return this.lastPing;
}
setLastPing(lastPing) {
this.lastPing = lastPing;
}
send(data) {
this.socket.send(JSON.stringify(data));
}
}
module.exports = Client;

57
src/models/client.ts Normal file
View File

@ -0,0 +1,57 @@
import { MyWebSocket } from "../services/webSocketServer/webSocket";
export interface IClient {
getId(): string;
getToken(): string;
getSocket(): MyWebSocket | null;
setSocket(socket: MyWebSocket | null): void;
getLastPing(): number;
setLastPing(lastPing: number): void;
send(data: any): void;
}
export class Client implements IClient {
private readonly id: string;
private readonly token: string;
private socket: MyWebSocket | null = null;
private lastPing: number = new Date().getTime();
constructor({ id, token }: { id: string, token: string; }) {
this.id = id;
this.token = token;
}
public getId(): string {
return this.id;
}
public getToken(): string {
return this.token;
}
public getSocket(): MyWebSocket | null {
return this.socket;
}
public setSocket(socket: MyWebSocket | null): void {
this.socket = socket;
}
public getLastPing(): number {
return this.lastPing;
}
public setLastPing(lastPing: number): void {
this.lastPing = lastPing;
}
public send(data: any): void {
this.socket?.send(JSON.stringify(data));
}
}

8
src/models/message.ts Normal file
View File

@ -0,0 +1,8 @@
import { MessageType } from "../enums";
export interface IMessage {
readonly type: MessageType;
readonly src: string;
readonly dst: string;
readonly payload?: any;
}

View File

@ -1,30 +0,0 @@
class MessageQueue {
constructor (id) {
this._id = id;
this._lastReadAt = new Date().getTime();
this._messages = [];
}
getLastReadAt () {
return this._lastReadAt;
}
addMessage (message) {
this._messages.push(message);
}
readMessage () {
if (this._messages.length > 0) {
this._lastReadAt = new Date().getTime();
return this._messages.shift();
}
return null;
}
getMessages () {
return this._messages;
}
}
module.exports = MessageQueue;

View File

@ -0,0 +1,37 @@
import { IMessage } from "./message";
export interface IMessageQueue {
getLastReadAt(): number;
addMessage(message: IMessage): void;
readMessage(): IMessage | undefined;
getMessages(): IMessage[];
}
export class MessageQueue implements IMessageQueue {
private lastReadAt: number = new Date().getTime();
private readonly messages: IMessage[] = [];
public getLastReadAt(): number {
return this.lastReadAt;
}
public addMessage(message: IMessage): void {
this.messages.push(message);
}
public readMessage(): IMessage | undefined {
if (this.messages.length > 0) {
this.lastReadAt = new Date().getTime();
return this.messages.shift()!;
}
return undefined;
}
public getMessages(): IMessage[] {
return this.messages;
}
}

View File

@ -1,64 +0,0 @@
const MessageQueue = require('./messageQueue');
class Realm {
constructor () {
this._clients = new Map();
this._messageQueues = new Map();
}
getClientsIds () {
return [...this._clients.keys()];
}
getClientById (clientId) {
return this._clients.get(clientId);
}
setClient (client, id) {
this._clients.set(id, client);
}
removeClientById (id) {
const client = this.getClientById(id);
if (!client) return false;
this._clients.delete(id);
}
getMessageQueueById (id) {
return this._messageQueues.get(id);
}
addMessageToQueue (id, message) {
if (!this.getMessageQueueById(id)) {
this._messageQueues.set(id, new MessageQueue(id));
}
this.getMessageQueueById(id).addMessage(message);
}
clearMessageQueue (id) {
this._messageQueues.delete(id);
}
generateClientId (_genRandomId) {
const originalGenRandomId = () => {
return (Math.random().toString(36) + '0000000000000000000').substr(2, 16);
}
const genRandomId = _genRandomId && typeof _genRandomId === 'function' ?
_genRandomId :
originalGenRandomId;
let clientId = genRandomId();
while (this.getClientById(clientId)) {
clientId = genRandomId();
}
return clientId;
}
}
module.exports = Realm;

84
src/models/realm.ts Normal file
View File

@ -0,0 +1,84 @@
import uuidv4 from "uuid/v4";
import { IClient } from "./client";
import { IMessage } from "./message";
import { IMessageQueue, MessageQueue } from "./messageQueue";
export interface IRealm {
getClientsIds(): string[];
getClientById(clientId: string): IClient | undefined;
getClientsIdsWithQueue(): string[];
setClient(client: IClient, id: string): void;
removeClientById(id: string): boolean;
getMessageQueueById(id: string): IMessageQueue | undefined;
addMessageToQueue(id: string, message: IMessage): void;
clearMessageQueue(id: string): void;
generateClientId(genRandomId: () => string): string;
}
export class Realm implements IRealm {
private readonly clients: Map<string, IClient> = new Map();
private readonly messageQueues: Map<string, IMessageQueue> = new Map();
public getClientsIds(): string[] {
return [...this.clients.keys()];
}
public getClientById(clientId: string): IClient | undefined {
return this.clients.get(clientId);
}
public getClientsIdsWithQueue(): string[] {
return [...this.messageQueues.keys()];
}
public setClient(client: IClient, id: string): void {
this.clients.set(id, client);
}
public removeClientById(id: string): boolean {
const client = this.getClientById(id);
if (!client) return false;
this.clients.delete(id);
return true;
}
public getMessageQueueById(id: string): IMessageQueue | undefined {
return this.messageQueues.get(id);
}
public addMessageToQueue(id: string, message: IMessage): void {
if (!this.getMessageQueueById(id)) {
this.messageQueues.set(id, new MessageQueue());
}
this.getMessageQueueById(id)!.addMessage(message);
}
public clearMessageQueue(id: string): void {
this.messageQueues.delete(id);
}
public generateClientId(genRandomId: () => string): string {
const _genRandomId = genRandomId ? genRandomId : uuidv4;
let clientId = _genRandomId();
while (this.getClientById(clientId)) {
clientId = _genRandomId();
}
return clientId;
}
}

View File

@ -1,57 +0,0 @@
const DEFAULT_CHECK_INTERVAL = 300;
module.exports = ({ realm, config, checkInterval = DEFAULT_CHECK_INTERVAL, onClose = () => { } }) => {
const checkConnections = () => {
const clientsIds = realm.getClientsIds();
const now = new Date().getTime();
const aliveTimeout = config.alive_timeout;
for (const clientId of clientsIds) {
const client = realm.getClientById(clientId);
const timeSinceLastPing = now - client.getLastPing();
if (timeSinceLastPing < aliveTimeout) continue;
try {
client.getSocket().close();
// eslint-disable-next-line no-empty
} catch (e) { } finally {
realm.clearMessageQueue(clientId);
realm.removeClientById(clientId);
client.setSocket(null);
if (onClose) onClose(client);
}
}
};
let timeoutId;
const start = () => {
if (timeoutId) {
clearTimeout(timeoutId);
}
timeoutId = setTimeout(() => {
checkConnections();
timeoutId = null;
start();
}, checkInterval);
};
const stop = () => {
if (timeoutId) {
clearTimeout(timeoutId);
timeoutId = null;
}
};
return {
start,
stop,
CHECK_INTERVAL: checkInterval
};
};

View File

@ -0,0 +1,74 @@
import { IConfig } from "../../config";
import { IClient } from "../../models/client";
import { IRealm } from "../../models/realm";
const DEFAULT_CHECK_INTERVAL = 300;
type CustomConfig = Pick<IConfig, 'alive_timeout'>;
export class CheckBrokenConnections {
public readonly checkInterval: number;
private timeoutId: NodeJS.Timeout | null = null;
private readonly realm: IRealm;
private readonly config: CustomConfig;
private readonly onClose?: (client: IClient) => void;
constructor({ realm, config, checkInterval = DEFAULT_CHECK_INTERVAL, onClose }: {
realm: IRealm,
config: CustomConfig,
checkInterval?: number,
onClose?: (client: IClient) => void;
}) {
this.realm = realm;
this.config = config;
this.onClose = onClose;
this.checkInterval = checkInterval;
}
public start(): void {
if (this.timeoutId) {
clearTimeout(this.timeoutId);
}
this.timeoutId = setTimeout(() => {
this.checkConnections();
this.timeoutId = null;
this.start();
}, this.checkInterval);
}
public stop(): void {
if (this.timeoutId) {
clearTimeout(this.timeoutId);
this.timeoutId = null;
}
}
private checkConnections(): void {
const clientsIds = this.realm.getClientsIds();
const now = new Date().getTime();
const { alive_timeout: aliveTimeout } = this.config;
for (const clientId of clientsIds) {
const client = this.realm.getClientById(clientId)!;
const timeSinceLastPing = now - client.getLastPing();
if (timeSinceLastPing < aliveTimeout) continue;
try {
client.getSocket()?.close();
} finally {
this.realm.clearMessageQueue(clientId);
this.realm.removeClientById(clientId);
client.setSocket(null);
this.onClose?.(client);
}
}
}
}

View File

@ -1,63 +0,0 @@
const { MessageType } = require('../../enums');
module.exports = ({ realm, config, messageHandler }) => {
const pruneOutstanding = () => {
const destinationClientsIds = realm._messageQueues.keys();
const now = new Date().getTime();
const maxDiff = config.expire_timeout;
const seen = {};
for (const destinationClientId of destinationClientsIds) {
const messageQueue = realm.getMessageQueueById(destinationClientId);
const lastReadDiff = now - messageQueue.getLastReadAt();
if (lastReadDiff < maxDiff) continue;
const messages = messageQueue.getMessages();
for (const message of messages) {
if (!seen[message.src]) {
messageHandler(null, {
type: MessageType.EXPIRE,
src: message.dst,
dst: message.src
});
seen[message.src] = true;
}
}
realm.clearMessageQueue(destinationClientId);
}
};
let timeoutId;
const startMessagesExpiration = () => {
if (timeoutId) {
clearTimeout(timeoutId);
}
// Clean up outstanding messages
timeoutId = setTimeout(() => {
pruneOutstanding();
timeoutId = null;
startMessagesExpiration();
}, config.cleanup_out_msgs);
};
const stopMessagesExpiration = () => {
if (timeoutId) {
clearTimeout(timeoutId);
timeoutId = null;
}
};
return {
startMessagesExpiration,
stopMessagesExpiration
};
};

View File

@ -0,0 +1,83 @@
import { IConfig } from "../../config";
import { MessageType } from "../../enums";
import { IMessageHandler } from "../../messageHandler";
import { IRealm } from "../../models/realm";
export interface IMessagesExpire {
startMessagesExpiration(): void;
stopMessagesExpiration(): void;
}
type CustomConfig = Pick<IConfig, 'cleanup_out_msgs' | 'expire_timeout'>;
export class MessagesExpire implements IMessagesExpire {
private readonly realm: IRealm;
private readonly config: CustomConfig;
private readonly messageHandler: IMessageHandler;
private timeoutId: NodeJS.Timeout | null = null;
constructor({ realm, config, messageHandler }: {
realm: IRealm;
config: CustomConfig;
messageHandler: IMessageHandler;
}) {
this.realm = realm;
this.config = config;
this.messageHandler = messageHandler;
}
public startMessagesExpiration(): void {
if (this.timeoutId) {
clearTimeout(this.timeoutId);
}
// Clean up outstanding messages
this.timeoutId = setTimeout(() => {
this.pruneOutstanding();
this.timeoutId = null;
this.startMessagesExpiration();
}, this.config.cleanup_out_msgs);
}
public stopMessagesExpiration(): void {
if (this.timeoutId) {
clearTimeout(this.timeoutId);
this.timeoutId = null;
}
}
private pruneOutstanding(): void {
const destinationClientsIds = this.realm.getClientsIdsWithQueue();
const now = new Date().getTime();
const maxDiff = this.config.expire_timeout;
const seen: Record<string, boolean> = {};
for (const destinationClientId of destinationClientsIds) {
const messageQueue = this.realm.getMessageQueueById(destinationClientId)!;
const lastReadDiff = now - messageQueue.getLastReadAt();
if (lastReadDiff < maxDiff) continue;
const messages = messageQueue.getMessages();
for (const message of messages) {
if (!seen[message.src]) {
this.messageHandler.handle(undefined, {
type: MessageType.EXPIRE,
src: message.dst,
dst: message.src
});
seen[message.src] = true;
}
}
this.realm.clearMessageQueue(destinationClientId);
}
}
}

View File

@ -1,114 +0,0 @@
const WSS = require('ws').Server;
const url = require('url');
const EventEmitter = require('events');
const { MessageType, Errors } = require('../../enums');
const Client = require('../../models/client');
class WebSocketServer extends EventEmitter {
constructor({ server, realm, config }) {
super();
this.setMaxListeners(0);
this.realm = realm;
this.config = config;
let path = this.config.path;
path = path + (path[path.length - 1] !== '/' ? '/' : '') + 'peerjs';
this._wss = new WSS({ path, server });
this._wss.on('connection', (socket, req) => this._onSocketConnection(socket, req));
this._wss.on('error', (error) => this._onSocketError(error));
}
_onSocketConnection(socket, req) {
const { query = {} } = url.parse(req.url, true);
const { id, token, key } = query;
if (!id || !token || !key) {
return this._sendErrorAndClose(socket, Errors.INVALID_WS_PARAMETERS);
}
if (key !== this.config.key) {
return this._sendErrorAndClose(socket, Errors.INVALID_KEY);
}
const client = this.realm.getClientById(id);
if (client) {
if (token !== client.getToken()) {
// ID-taken, invalid token
socket.send(JSON.stringify({
type: MessageType.ID_TAKEN,
payload: { msg: 'ID is taken' }
}));
return socket.close();
}
return this._configureWS(socket, client);
}
this._registerClient({ socket, id, token });
}
_onSocketError(error) {
// handle error
this.emit('error', error);
}
_registerClient({ socket, id, token }) {
// Check concurrent limit
const clientsCount = this.realm.getClientsIds().length;
if (clientsCount >= this.config.concurrent_limit) {
return this._sendErrorAndClose(socket, Errors.CONNECTION_LIMIT_EXCEED);
}
const newClient = new Client({ id, token });
this.realm.setClient(newClient, id);
socket.send(JSON.stringify({ type: MessageType.OPEN }));
this._configureWS(socket, newClient);
}
_configureWS(socket, client) {
client.setSocket(socket);
// Cleanup after a socket closes.
socket.on('close', () => {
if (client.socket === socket) {
this.realm.removeClientById(client.getId());
this.emit('close', client);
}
});
// Handle messages from peers.
socket.on('message', (data) => {
try {
const message = JSON.parse(data);
message.src = client.getId();
this.emit('message', client, message);
} catch (e) {
this.emit('error', e);
}
});
this.emit('connection', client);
}
_sendErrorAndClose(socket, msg) {
socket.send(
JSON.stringify({
type: MessageType.ERROR,
payload: { msg }
})
);
socket.close();
}
}
module.exports = WebSocketServer;

View File

@ -0,0 +1,143 @@
import EventEmitter from "events";
import { IncomingMessage } from "http";
import url from "url";
import WebSocketLib from "ws";
import { IConfig } from "../../config";
import { Errors, MessageType } from "../../enums";
import { Client, IClient } from "../../models/client";
import { IRealm } from "../../models/realm";
import { MyWebSocket } from "./webSocket";
export interface IWebSocketServer extends EventEmitter {
readonly path: string;
}
interface IAuthParams {
id?: string;
token?: string;
key?: string;
}
type CustomConfig = Pick<IConfig, 'path' | 'key' | 'concurrent_limit'>;
const WS_PATH = 'peerjs';
export class WebSocketServer extends EventEmitter implements IWebSocketServer {
public readonly path: string;
private readonly realm: IRealm;
private readonly config: CustomConfig;
public readonly socketServer: WebSocketLib.Server;
constructor({ server, realm, config }: { server: any, realm: IRealm, config: CustomConfig; }) {
super();
this.setMaxListeners(0);
this.realm = realm;
this.config = config;
const path = this.config.path;
this.path = `${path}${path.endsWith('/') ? "" : "/"}${WS_PATH}`;
this.socketServer = new WebSocketLib.Server({ path: this.path, server });
this.socketServer.on("connection", (socket: MyWebSocket, req) => this._onSocketConnection(socket, req));
this.socketServer.on("error", (error: Error) => this._onSocketError(error));
}
private _onSocketConnection(socket: MyWebSocket, req: IncomingMessage): void {
const { query = {} } = url.parse(req.url!, true);
const { id, token, key }: IAuthParams = query;
if (!id || !token || !key) {
return this._sendErrorAndClose(socket, Errors.INVALID_WS_PARAMETERS);
}
if (key !== this.config.key) {
return this._sendErrorAndClose(socket, Errors.INVALID_KEY);
}
const client = this.realm.getClientById(id);
if (client) {
if (token !== client.getToken()) {
// ID-taken, invalid token
socket.send(JSON.stringify({
type: MessageType.ID_TAKEN,
payload: { msg: "ID is taken" }
}));
return socket.close();
}
return this._configureWS(socket, client);
}
this._registerClient({ socket, id, token });
}
private _onSocketError(error: Error): void {
// handle error
this.emit("error", error);
}
private _registerClient({ socket, id, token }:
{
socket: MyWebSocket;
id: string;
token: string;
}): void {
// Check concurrent limit
const clientsCount = this.realm.getClientsIds().length;
if (clientsCount >= this.config.concurrent_limit) {
return this._sendErrorAndClose(socket, Errors.CONNECTION_LIMIT_EXCEED);
}
const newClient: IClient = new Client({ id, token });
this.realm.setClient(newClient, id);
socket.send(JSON.stringify({ type: MessageType.OPEN }));
this._configureWS(socket, newClient);
}
private _configureWS(socket: MyWebSocket, client: IClient): void {
client.setSocket(socket);
// Cleanup after a socket closes.
socket.on("close", () => {
if (client.getSocket() === socket) {
this.realm.removeClientById(client.getId());
this.emit("close", client);
}
});
// Handle messages from peers.
socket.on("message", (data: WebSocketLib.Data) => {
try {
const message = JSON.parse(data as string);
message.src = client.getId();
this.emit("message", client, message);
} catch (e) {
this.emit("error", e);
}
});
this.emit("connection", client);
}
private _sendErrorAndClose(socket: MyWebSocket, msg: Errors): void {
socket.send(
JSON.stringify({
type: MessageType.ERROR,
payload: { msg }
})
);
socket.close();
}
}

View File

@ -0,0 +1,4 @@
import EventEmitter from "events";
import WebSocketLib from "ws";
export type MyWebSocket = WebSocketLib & EventEmitter;

View File

@ -1,6 +1,6 @@
const { expect } = require('chai'); import { expect } from 'chai';
const Client = require('../../../../src/models/client'); import { Client } from '../../../../src/models/client';
const heartbeatHandler = require('../../../../src/messageHandler/handlers/heartbeat'); import { HeartbeatHandler } from '../../../../src/messageHandler/handlers';
describe('Heartbeat handler', () => { describe('Heartbeat handler', () => {
it('should update last ping time', () => { it('should update last ping time', () => {
@ -9,7 +9,7 @@ describe('Heartbeat handler', () => {
const nowTime = new Date().getTime(); const nowTime = new Date().getTime();
heartbeatHandler(client); HeartbeatHandler(client);
expect(client.getLastPing()).to.be.closeTo(nowTime, 2); expect(client.getLastPing()).to.be.closeTo(nowTime, 2);
}); });

View File

@ -0,0 +1,96 @@
import { expect } from 'chai';
import { Client } from '../../../../src/models/client';
import { TransmissionHandler } from '../../../../src/messageHandler/handlers';
import { Realm } from '../../../../src/models/realm';
import { MessageType } from '../../../../src/enums';
import { MyWebSocket } from '../../../../src/services/webSocketServer/webSocket';
const createFakeSocket = (): MyWebSocket => {
/* eslint-disable @typescript-eslint/no-empty-function */
const sock = {
send: (): void => { },
close: (): void => { },
on: (): void => { },
};
/* eslint-enable @typescript-eslint/no-empty-function */
return (sock as unknown as MyWebSocket);
};
describe('Transmission handler', () => {
it('should save message in queue when destination client not connected', () => {
const realm = new Realm();
const handleTransmission = TransmissionHandler({ realm });
const clientFrom = new Client({ id: 'id1', token: '' });
const idTo = 'id2';
realm.setClient(clientFrom, clientFrom.getId());
handleTransmission(clientFrom, { type: MessageType.OFFER, src: clientFrom.getId(), dst: idTo });
expect(realm.getMessageQueueById(idTo)?.getMessages().length).to.be.eq(1);
});
it('should not save LEAVE and EXPIRE messages in queue when destination client not connected', () => {
const realm = new Realm();
const handleTransmission = TransmissionHandler({ realm });
const clientFrom = new Client({ id: 'id1', token: '' });
const idTo = 'id2';
realm.setClient(clientFrom, clientFrom.getId());
handleTransmission(clientFrom, { type: MessageType.LEAVE, src: clientFrom.getId(), dst: idTo });
handleTransmission(clientFrom, { type: MessageType.EXPIRE, src: clientFrom.getId(), dst: idTo });
expect(realm.getMessageQueueById(idTo)).to.be.undefined;
});
it('should send message to destination client when destination client connected', () => {
const realm = new Realm();
const handleTransmission = TransmissionHandler({ realm });
const clientFrom = new Client({ id: 'id1', token: '' });
const clientTo = new Client({ id: 'id2', token: '' });
const socketTo = createFakeSocket();
clientTo.setSocket(socketTo);
realm.setClient(clientTo, clientTo.getId());
let sent = false;
socketTo.send = (): void => {
sent = true;
};
handleTransmission(clientFrom, { type: MessageType.OFFER, src: clientFrom.getId(), dst: clientTo.getId() });
expect(sent).to.be.true;
});
it('should send LEAVE message to source client when sending to destination client failed', () => {
const realm = new Realm();
const handleTransmission = TransmissionHandler({ realm });
const clientFrom = new Client({ id: 'id1', token: '' });
const clientTo = new Client({ id: 'id2', token: '' });
const socketFrom = createFakeSocket();
const socketTo = createFakeSocket();
clientFrom.setSocket(socketFrom);
clientTo.setSocket(socketTo);
realm.setClient(clientFrom, clientFrom.getId());
realm.setClient(clientTo, clientTo.getId());
let sent = false;
socketFrom.send = (data: string): void => {
if (JSON.parse(data)?.type === MessageType.LEAVE) {
sent = true;
}
};
socketTo.send = (): void => {
throw Error();
};
handleTransmission(clientFrom, { type: MessageType.OFFER, src: clientFrom.getId(), dst: clientTo.getId() });
expect(sent).to.be.true;
});
});

View File

@ -0,0 +1,23 @@
import { expect } from 'chai';
import { HandlersRegistry } from '../../src/messageHandler/handlersRegistry';
import { Handler } from '../../src/messageHandler/handler';
import { MessageType } from '../../src/enums';
describe('HandlersRegistry', () => {
it('should execute handler for message type', () => {
const handlersRegistry = new HandlersRegistry();
let handled = false;
const handler: Handler = (): boolean => {
handled = true;
return true;
};
handlersRegistry.registerHandler(MessageType.OPEN, handler);
handlersRegistry.handle(undefined, { type: MessageType.OPEN, src: 'src', dst: 'dst' });
expect(handled).to.be.true;
});
});

View File

@ -0,0 +1,62 @@
import { expect } from 'chai';
import { MessageQueue } from '../../src/models/messageQueue';
import { MessageType } from '../../src/enums';
import { IMessage } from '../../src/models/message';
import { wait } from '../utils';
describe('MessageQueue', () => {
const createTestMessage = (): IMessage => {
return {
type: MessageType.OPEN,
src: 'src',
dst: 'dst'
};
};
describe('#addMessage', () => {
it('should add message to queue', () => {
const queue = new MessageQueue();
queue.addMessage(createTestMessage());
expect(queue.getMessages().length).to.eq(1);
});
});
describe('#readMessage', () => {
it('should return undefined for empty queue', () => {
const queue = new MessageQueue();
expect(queue.readMessage()).to.be.undefined;
});
it('should return message if any exists in queue', () => {
const queue = new MessageQueue();
const message = createTestMessage();
queue.addMessage(message);
expect(queue.readMessage()).to.deep.eq(message);
expect(queue.readMessage()).to.be.undefined;
});
});
describe('#getLastReadAt', () => {
it('should not be changed if no messages when read', () => {
const queue = new MessageQueue();
const lastReadAt = queue.getLastReadAt();
queue.readMessage();
expect(queue.getLastReadAt()).to.be.eq(lastReadAt);
});
it('should be changed when read message', async () => {
const queue = new MessageQueue();
const lastReadAt = queue.getLastReadAt();
queue.addMessage(createTestMessage());
await wait(10);
expect(queue.getLastReadAt()).to.be.eq(lastReadAt);
queue.readMessage();
expect(queue.getLastReadAt()).to.be.greaterThan(lastReadAt + 10);
});
});
});

View File

@ -1,12 +1,12 @@
const { expect } = require('chai'); import { expect } from 'chai';
const Realm = require('../../src/models/realm'); import { Realm } from '../../src/models/realm';
const Client = require('../../src/models/client'); import { Client } from '../../src/models/client';
describe('Realm', () => { describe('Realm', () => {
describe('#generateClientId', () => { describe('#generateClientId', () => {
it('should generate a 16-character ID', () => { it('should generate a 36-character UUID', () => {
const realm = new Realm(); const realm = new Realm();
expect(realm.generateClientId().length).to.eq(16); expect(realm.generateClientId().length).to.eq(36);
expect(realm.generateClientId(() => 'abcd')).to.eq('abcd'); expect(realm.generateClientId(() => 'abcd')).to.eq('abcd');
}); });
}); });

View File

@ -1,43 +0,0 @@
const { expect } = require('chai');
const Client = require('../../../src/models/client');
const Realm = require('../../../src/models/realm');
const checkBrokenConnectionsBuilder = require('../../../src/services/checkBrokenConnections');
describe('checkBrokenConnections service', () => {
it('should remove client after 2 checks', (done) => {
const realm = new Realm();
const doubleCheckTime = 55;//~ equals to checkBrokenConnections.CHECK_INTERVAL * 2
const checkBrokenConnections = checkBrokenConnectionsBuilder({ realm, config: { alive_timeout: doubleCheckTime }, checkInterval: 30 });
const client = new Client({ id: 'id', token: '' });
realm.setClient(client, 'id');
checkBrokenConnections.start();
setTimeout(() => {
expect(realm.getClientById('id')).to.be.undefined;
checkBrokenConnections.stop();
done();
}, checkBrokenConnections.CHECK_INTERVAL * 2 + 3);
});
it('should remove client after 1 ping', (done) => {
const realm = new Realm();
const doubleCheckTime = 55;//~ equals to checkBrokenConnections.CHECK_INTERVAL * 2
const checkBrokenConnections = checkBrokenConnectionsBuilder({ realm, config: { alive_timeout: doubleCheckTime }, checkInterval: 30 });
const client = new Client({ id: 'id', token: '' });
realm.setClient(client, 'id');
checkBrokenConnections.start();
//set ping after first check
setTimeout(() => {
client.setLastPing(new Date().getTime());
setTimeout(() => {
expect(realm.getClientById('id')).to.be.undefined;
checkBrokenConnections.stop();
done();
}, checkBrokenConnections.CHECK_INTERVAL * 2 + 10);
}, checkBrokenConnections.CHECK_INTERVAL);
});
});

View File

@ -0,0 +1,44 @@
import { expect } from 'chai';
import { Client } from '../../../src/models/client';
import { Realm } from '../../../src/models/realm';
import { CheckBrokenConnections } from '../../../src/services/checkBrokenConnections';
import { wait } from '../../utils';
describe('CheckBrokenConnections', () => {
it('should remove client after 2 checks', async () => {
const realm = new Realm();
const doubleCheckTime = 55;//~ equals to checkBrokenConnections.checkInterval * 2
const checkBrokenConnections = new CheckBrokenConnections({ realm, config: { alive_timeout: doubleCheckTime }, checkInterval: 30 });
const client = new Client({ id: 'id', token: '' });
realm.setClient(client, 'id');
checkBrokenConnections.start();
await wait(checkBrokenConnections.checkInterval * 2 + 30);
expect(realm.getClientById('id')).to.be.undefined;
checkBrokenConnections.stop();
});
it('should remove client after 1 ping', async () => {
const realm = new Realm();
const doubleCheckTime = 55;//~ equals to checkBrokenConnections.checkInterval * 2
const checkBrokenConnections = new CheckBrokenConnections({ realm, config: { alive_timeout: doubleCheckTime }, checkInterval: 30 });
const client = new Client({ id: 'id', token: '' });
realm.setClient(client, 'id');
checkBrokenConnections.start();
//set ping after first check
await wait(checkBrokenConnections.checkInterval);
client.setLastPing(new Date().getTime());
await wait(checkBrokenConnections.checkInterval * 2 + 10);
expect(realm.getClientById('id')).to.be.undefined;
checkBrokenConnections.stop();
});
});

View File

@ -0,0 +1,78 @@
import { expect } from 'chai';
import { Client } from '../../../src/models/client';
import { Realm } from '../../../src/models/realm';
import { IMessage } from '../../../src/models/message';
import { MessagesExpire } from '../../../src/services/messagesExpire';
import { MessageHandler } from '../../../src/messageHandler';
import { MessageType } from '../../../src/enums';
import { wait } from '../../utils';
describe('MessagesExpire', () => {
const createTestMessage = (): IMessage => {
return {
type: MessageType.OPEN,
src: 'src',
dst: 'dst'
};
};
it('should remove client if no read from queue', async () => {
const realm = new Realm();
const messageHandler = new MessageHandler(realm);
const checkInterval = 10;
const expireTimeout = 50;
const config = { cleanup_out_msgs: checkInterval, expire_timeout: expireTimeout };
const messagesExpire = new MessagesExpire({ realm, config, messageHandler });
const client = new Client({ id: 'id', token: '' });
realm.setClient(client, 'id');
realm.addMessageToQueue(client.getId(), createTestMessage());
messagesExpire.startMessagesExpiration();
await wait(checkInterval * 2);
expect(realm.getMessageQueueById(client.getId())?.getMessages().length).to.be.eq(1);
await wait(expireTimeout);
expect(realm.getMessageQueueById(client.getId())).to.be.undefined;
messagesExpire.stopMessagesExpiration();
});
it('should fire EXPIRE message', async () => {
const realm = new Realm();
const messageHandler = new MessageHandler(realm);
const checkInterval = 10;
const expireTimeout = 50;
const config = { cleanup_out_msgs: checkInterval, expire_timeout: expireTimeout };
const messagesExpire = new MessagesExpire({ realm, config, messageHandler });
const client = new Client({ id: 'id', token: '' });
realm.setClient(client, 'id');
realm.addMessageToQueue(client.getId(), createTestMessage());
let handled = false;
messageHandler.handle = (client, message): boolean => {
expect(client).to.be.undefined;
expect(message.type).to.be.eq(MessageType.EXPIRE);
handled = true;
return true;
};
messagesExpire.startMessagesExpiration();
await wait(checkInterval * 2);
await wait(expireTimeout);
expect(handled).to.be.true;
messagesExpire.stopMessagesExpiration();
});
});

View File

@ -0,0 +1,195 @@
import { expect } from 'chai';
import { Server, WebSocket } from 'mock-socket';
import { Realm } from '../../../src/models/realm';
import { WebSocketServer } from '../../../src/services/webSocketServer';
import { Errors, MessageType } from '../../../src/enums';
import { wait } from '../../utils';
type Destroyable<T> = T & { destroy?: () => Promise<void>; };
const checkOpen = async (c: WebSocket): Promise<boolean> => {
return new Promise(resolve => {
c.onmessage = (event: object & { data?: string; }): void => {
c.onmessage = null;
const message = JSON.parse(event.data as string);
resolve(message.type === MessageType.OPEN);
};
});
};
const checkSequence = async (c: WebSocket, msgs: { type: MessageType; error?: Errors; }[]): Promise<boolean> => {
return new Promise(resolve => {
const restMessages = [...msgs];
const finish = (success = false): void => {
c.onmessage = null;
resolve(success);
};
c.onmessage = (event: object & { data?: string; }): void => {
const [mes] = restMessages;
if (!mes) {
return finish();
}
restMessages.shift();
const message = JSON.parse(event.data as string);
if (message.type !== mes.type) {
return finish();
}
const isOk = !mes.error || message.payload?.msg === mes.error;
if (!isOk) {
return finish();
}
if (restMessages.length === 0) {
finish(true);
}
};
});
};
const createTestServer = ({ realm, config, url }: { realm: Realm; config: { path: string; key: string; concurrent_limit: number; }; url: string; }): Destroyable<WebSocketServer> => {
const server = new Server(url);
const webSocketServer: Destroyable<WebSocketServer> = new WebSocketServer({ server, realm, config });
server.on('connection', (socket: WebSocket & { on?: (eventName: string, callback: () => void) => void; }) => {
const s = webSocketServer.socketServer;
s.emit('connection', socket, { url: socket.url });
socket.onclose = (): void => {
const userId = socket.url.split('?')[1]?.split('&').find(p => p.startsWith('id'))?.split('=')[1];
if (!userId) return;
const client = realm.getClientById(userId);
const clientSocket = client?.getSocket();
if (!clientSocket) return;
(clientSocket as unknown as WebSocket).listeners['server::close']?.forEach((s: () => void) => s());
};
socket.onmessage = (event: object & { data?: string; }): void => {
const userId = socket.url.split('?')[1]?.split('&').find(p => p.startsWith('id'))?.split('=')[1];
if (!userId) return;
const client = realm.getClientById(userId);
const clientSocket = client?.getSocket();
if (!clientSocket) return;
(clientSocket as unknown as WebSocket).listeners['server::message']?.forEach((s: (data: object) => void) => s(event));
};
});
webSocketServer.destroy = async (): Promise<void> => {
server.close();
};
return webSocketServer;
};
describe('WebSocketServer', () => {
it('should return valid path', () => {
const realm = new Realm();
const config = { path: '/', key: 'testKey', concurrent_limit: 1 };
const config2 = { ...config, path: 'path' };
const server = new Server('path1');
const server2 = new Server('path2');
const webSocketServer = new WebSocketServer({ server, realm, config });
expect(webSocketServer.path).to.be.eq('/peerjs');
const webSocketServer2 = new WebSocketServer({ server: server2, realm, config: config2 });
expect(webSocketServer2.path).to.be.eq('path/peerjs');
server.stop();
server2.stop();
});
it(`should check client's params`, async () => {
const realm = new Realm();
const config = { path: '/', key: 'testKey', concurrent_limit: 1 };
const fakeURL = 'ws://localhost:8080/peerjs';
const getError = async (url: string, validError: Errors = Errors.INVALID_WS_PARAMETERS): Promise<boolean> => {
const webSocketServer = createTestServer({ url, realm, config });
const ws = new WebSocket(url);
const errorSent = await checkSequence(ws, [{ type: MessageType.ERROR, error: validError }]);
ws.close();
await webSocketServer.destroy?.();
return errorSent;
};
expect(await getError(fakeURL)).to.be.true;
expect(await getError(`${fakeURL}?key=${config.key}`)).to.be.true;
expect(await getError(`${fakeURL}?key=${config.key}&id=1`)).to.be.true;
expect(await getError(`${fakeURL}?key=notValidKey&id=userId&token=userToken`, Errors.INVALID_KEY)).to.be.true;
});
it(`should check concurrent limit`, async () => {
const realm = new Realm();
const config = { path: '/', key: 'testKey', concurrent_limit: 1 };
const fakeURL = 'ws://localhost:8080/peerjs';
const createClient = (id: string): Destroyable<WebSocket> => {
const url = `${fakeURL}?key=${config.key}&id=${id}&token=${id}`;
const webSocketServer = createTestServer({ url, realm, config });
const ws: Destroyable<WebSocket> = new WebSocket(url);
ws.destroy = async (): Promise<void> => {
ws.close();
wait(10);
webSocketServer.destroy?.();
wait(10);
ws.destroy = undefined;
};
return ws;
};
const c1 = createClient('1');
expect(await checkOpen(c1)).to.be.true;
const c2 = createClient('2');
expect(await checkSequence(c2, [
{ type: MessageType.ERROR, error: Errors.CONNECTION_LIMIT_EXCEED }
])).to.be.true;
await c1.destroy?.();
await c2.destroy?.();
await wait(10);
expect(realm.getClientsIds().length).to.be.eq(0);
const c3 = createClient('3');
expect(await checkOpen(c3)).to.be.true;
await c3.destroy?.();
});
});

1
test/utils.ts Normal file
View File

@ -0,0 +1 @@
export const wait = (ms: number): Promise<void> => new Promise(resolve => setTimeout(resolve, ms));

27
tsconfig.json Normal file
View File

@ -0,0 +1,27 @@
{
"compilerOptions": {
"lib": [
"esnext"
],
"target": "es2016",
"module": "commonjs",
"strict": true,
"esModuleInterop": true,
"downlevelIteration": true,
"moduleResolution": "node",
"noImplicitAny": true,
"noUnusedLocals": true,
"noUnusedParameters": true,
"resolveJsonModule": true,
"skipLibCheck": true,
"sourceMap": false,
"outDir": "dist"
},
"include": [
"./src/**/*",
],
"exclude": [
"test",
"bin",
]
}