Merge pull request #342 from peers/master

chore: release
This commit is contained in:
Jonas Gloning 2023-03-07 20:25:02 +01:00 committed by GitHub
commit ced7d65267
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
74 changed files with 26397 additions and 24194 deletions

View File

@ -0,0 +1,22 @@
// For format details, see https://aka.ms/devcontainer.json. For config options, see the
// README at: https://github.com/devcontainers/templates/tree/main/src/typescript-node
{
"name": "Node.js & TypeScript",
// Or use a Dockerfile or Docker Compose file. More info: https://containers.dev/guide/dockerfile
"image": "mcr.microsoft.com/devcontainers/typescript-node:0-18",
// Features to add to the dev container. More info: https://containers.dev/features.
// "features": {},
// Use 'forwardPorts' to make a list of ports inside the container available locally.
"forwardPorts": [9000],
// Use 'postCreateCommand' to run commands after the container is created.
"postCreateCommand": "npm clean-install"
// Configure tool-specific properties.
// "customizations": {},
// Uncomment to connect as root instead. More info: https://aka.ms/dev-containers-non-root.
// "remoteUser": "root"
}

6
.dockerignore Normal file
View File

@ -0,0 +1,6 @@
.git
.nyc_output
.parcel-cache
coverage
dist
node_modules

View File

@ -1,37 +1,16 @@
{
"parser": "@typescript-eslint/parser",
"extends": [
"eslint:recommended",
"plugin:@typescript-eslint/eslint-recommended",
"plugin:@typescript-eslint/recommended"
],
"env": {
"node": true,
"es6": true,
"mocha": true
},
"parserOptions": {
"ecmaVersion": 2018,
"sourceType": "module"
},
"rules": {
"no-var": "error",
"no-console": "off",
"@typescript-eslint/camelcase": "off",
"@typescript-eslint/interface-name-prefix": "off",
"@typescript-eslint/member-delimiter-style": [
"error",
{
"multiline": {
"delimiter": "semi",
"requireLast": true
},
"singleline": {
"delimiter": "semi",
"requireLast": true
}
}
],
"@typescript-eslint/explicit-function-return-type": "off"
}
}
"parser": "@typescript-eslint/parser",
"extends": [
"eslint:recommended",
"plugin:@typescript-eslint/eslint-recommended",
"plugin:@typescript-eslint/recommended"
],
"env": {
"node": true,
"es6": true
},
"parserOptions": {
"ecmaVersion": 2018,
"sourceType": "module"
}
}

View File

@ -1 +1 @@
blank_issues_enabled: false
blank_issues_enabled: false

View File

@ -1,28 +1,30 @@
---
name: peer template
about: Create a report to help us improve
title: ''
labels: ''
assignees: ''
title: ""
labels: ""
assignees: ""
---
### I'm having an issue:
- Give an expressive description of what is went wrong
- Version of `peer` you're experiencing this issue
- Nodejs version?
- Platform name and its version (Win, Mac, Linux)?
- Nice to have: a repository with code to reproduce the issue
- If you're getting an error or exception, please provide its full stack-trace as plain-text or screenshot
- Give an expressive description of what is went wrong
- Version of `peer` you're experiencing this issue
- Nodejs version?
- Platform name and its version (Win, Mac, Linux)?
- Nice to have: a repository with code to reproduce the issue
- If you're getting an error or exception, please provide its full stack-trace as plain-text or screenshot
### I have a suggestion:
- Describe your feature / request
- How you're going to use it? Give a usage example(s)
- Describe your feature / request
- How you're going to use it? Give a usage example(s)
### Documentation is missing something or incorrect (have typos, etc.):
- Give an expressive description what you have changed/added and why
- Make sure you're using correct markdown markup
- Make sure all code blocks starts with triple ``` (*backtick*) and have a syntax tag, for more read [this docs](https://help.github.com/articles/creating-and-highlighting-code-blocks/#syntax-highlighting)
- Post addition/changes in issue, we will manage it
- Give an expressive description what you have changed/added and why
- Make sure you're using correct markdown markup
- Make sure all code blocks starts with triple ``` (_backtick_) and have a syntax tag, for more read [this docs](https://help.github.com/articles/creating-and-highlighting-code-blocks/#syntax-highlighting)
- Post addition/changes in issue, we will manage it
## Thank you, and do not forget to get rid of this default message

28
.github/workflows/docker.yml vendored Normal file
View File

@ -0,0 +1,28 @@
name: Docker build & publish
on:
push:
branches: ["master"]
pull_request:
branches: ["master"]
jobs:
docker:
runs-on: ubuntu-latest
steps:
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
- name: Login to Docker Hub
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Build
if: ${{ github.event_name == 'pull_request' }}
uses: docker/build-push-action@v4
- name: Build & publish
if: ${{ github.event_name == 'push' }}
uses: docker/build-push-action@v4
with:
push: true
tags: peerjs/peerjs-server-test:nightly

18
.github/workflows/fly.yml vendored Normal file
View File

@ -0,0 +1,18 @@
name: Fly Deploy
on:
push:
branches:
- master
env:
FLY_API_TOKEN: ${{ secrets.FLY_API_TOKEN }}
jobs:
deploy:
name: Deploy app
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: superfly/flyctl-actions/setup-flyctl@master
- run: flyctl deploy --remote-only

35
.github/workflows/node.js.yml vendored Normal file
View File

@ -0,0 +1,35 @@
# This workflow will do a clean installation of node dependencies, cache/restore them, build the source code and run tests across different versions of node
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-nodejs
name: Node.js CI
on:
push:
branches: ["master"]
pull_request:
branches: ["master"]
jobs:
build:
runs-on: ubuntu-latest
strategy:
matrix:
node-version: [14.x, 16.x, 18.x]
# See supported Node.js release schedule at https://nodejs.org/en/about/releases/
steps:
- uses: actions/checkout@v3
- name: Use Node.js ${{ matrix.node-version }}
uses: actions/setup-node@v3
with:
node-version: ${{ matrix.node-version }}
cache: "npm"
- run: npm ci
- run: npm run build
- run: npm run lint
- run: npm run coverage
- name: Publish code coverage to CodeClimate
uses: paambaati/codeclimate-action@v3.2.0
env:
CC_TEST_REPORTER_ID: ${{secrets.CC_TEST_REPORTER_ID}}

23
.github/workflows/prettier.yml vendored Normal file
View File

@ -0,0 +1,23 @@
# From https://til.simonwillison.net/github-actions/prettier-github-actions
name: Check JavaScript for conformance with Prettier
on:
push:
pull_request:
jobs:
prettier:
runs-on: ubuntu-latest
steps:
- name: Check out repo
uses: actions/checkout@v3
- uses: actions/cache@v3
name: Configure npm caching
with:
path: ~/.npm
key: ${{ runner.os }}-npm-${{ hashFiles('**/workflows/prettier.yml') }}
restore-keys: |
${{ runner.os }}-npm-
- name: Run prettier
run: |-
npx prettier --check .

View File

@ -10,18 +10,20 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
uses: actions/checkout@v3
with:
fetch-depth: 0
- name: Setup Node.js
uses: actions/setup-node@v2
uses: actions/setup-node@v3
with:
node-version: "lts/*"
- name: Install dependencies
run: npm ci
- name: Build
run: npm run build
- name: Import GPG key
id: import_gpg
uses: crazy-max/ghaction-import-gpg@v4
uses: crazy-max/ghaction-import-gpg@v5
with:
gpg_private_key: ${{ secrets.GPG_PRIVATE_KEY }}
passphrase: ${{ secrets.GPG_PASSPHRASE }}
@ -33,4 +35,6 @@ jobs:
NPM_TOKEN: ${{ secrets.NPM_TOKEN }}
GIT_COMMITTER_NAME: ${{ steps.import_gpg.outputs.name }}
GIT_COMMITTER_EMAIL: ${{ steps.import_gpg.outputs.email }}
DOCKER_REGISTRY_USER: ${{ secrets.DOCKERHUB_USERNAME }}
DOCKER_REGISTRY_PASSWORD: ${{ secrets.DOCKERHUB_TOKEN }}
run: npx semantic-release

6
.gitignore vendored
View File

@ -1,4 +1,5 @@
lib-cov
.nyc_output
coverage
*.seed
*.log
*.csv
@ -7,6 +8,7 @@ lib-cov
*.pid
*.gz
.parcel-cache
dist
pids
logs
@ -17,4 +19,4 @@ npm-debug.log
.idea
.cache
.vscode
.vscode

5
.prettierignore Normal file
View File

@ -0,0 +1,5 @@
dist
coverage
# semantic-release
CHANGELOG.md

3
.prettierrc.toml Normal file
View File

@ -0,0 +1,3 @@
trailingComma = "all"
semi = true
useTabs = true

View File

@ -1,11 +1,31 @@
{
"branches": ["stable", { "name": "rc", "prerelease": true }],
"branches": [
"stable",
{
"name": "rc",
"prerelease": true
}
],
"plugins": [
"@semantic-release/commit-analyzer",
"@semantic-release/release-notes-generator",
"@semantic-release/changelog",
"@semantic-release/npm",
"@semantic-release/git",
"@semantic-release/github"
"@semantic-release/github",
[
"@codedependant/semantic-release-docker",
{
"dockerTags": [
"{{#if prerelease.[0]}}{{prerelease.[0]}}{{else}}latest{{/if}}",
"{{major}}-{{#if prerelease.[0]}}{{prerelease.[0]}}{{else}}latest{{/if}}",
"{{major}}.{{minor}}-{{#if prerelease.[0]}}{{prerelease.[0]}}{{else}}latest{{/if}}",
"{{version}}"
],
"dockerImage": "peerjs-server",
"dockerFile": "Dockerfile",
"dockerProject": "peerjs"
}
]
]
}

View File

@ -1,7 +0,0 @@
language: node_js
node_js:
- node
- lts/*
- 14
- 12
- 10

14
.whitesource Normal file
View File

@ -0,0 +1,14 @@
{
"scanSettings": {
"baseBranches": []
},
"checkRunSettings": {
"vulnerableCheckRunConclusionLevel": "failure",
"displayMode": "diff",
"useMendCheckNames": true
},
"issueSettings": {
"minSeverityLevel": "LOW",
"issueType": "DEPENDENCY"
}
}

View File

@ -1,10 +1,18 @@
FROM node:alpine
FROM docker.io/library/node:18.14.2 as build
RUN mkdir /peer-server
WORKDIR /peer-server
COPY bin ./bin
COPY dist ./dist
COPY package.json .
RUN npm install --production
EXPOSE 9000
ENTRYPOINT ["node", "bin/peerjs"]
CMD [ "--port", "9000", "--path", "/myapp" ]
COPY package.json package-lock.json ./
RUN npm clean-install
COPY . ./
RUN npm run build
RUN npm run test
FROM docker.io/library/node:18.14.2-alpine as production
RUN mkdir /peer-server
WORKDIR /peer-server
COPY package.json package-lock.json ./
RUN npm clean-install --omit=dev
COPY --from=build /peer-server/dist/bin/peerjs.js ./
ENV PORT 9000
EXPOSE ${PORT}
ENTRYPOINT ["node", "peerjs.js"]

View File

@ -3,8 +3,8 @@
**We do not collect or store any information.**
While you are connected to a PeerJS server, your IP address, randomly-generated
client ID, and signalling data are kept in the server's memory. With default
client ID, and signalling data are kept in the server's memory. With default
settings, the server will remove this information from memory 60 seconds after
you stop communicating with the service. (See the
you stop communicating with the service. (See the
[`alive_timeout`](https://github.com/peers/peerjs-server#config--cli-options)
setting.)

182
README.md
View File

@ -4,7 +4,8 @@
[![npm version](https://badge.fury.io/js/peer.svg)](https://www.npmjs.com/package/peer)
[![Downloads](https://img.shields.io/npm/dm/peer.svg)](https://www.npmjs.com/package/peer)
[![Docker Image Size (latest semver)](https://img.shields.io/docker/image-size/peerjs/peerjs-server)](https://hub.docker.com/r/peerjs/peerjs-server)
# PeerServer: A server for PeerJS #
# PeerServer: A server for PeerJS
PeerServer helps establishing connections between PeerJS clients. Data is not proxied through the server.
@ -23,20 +24,23 @@ Run your own server on Gitpod!
If you don't want to develop anything, just enter few commands below.
1. Install the package globally:
```sh
$ npm install peer -g
```
```sh
$ npm install peer -g
```
2. Run the server:
```sh
$ peerjs --port 9000 --key peerjs --path /myapp
Started PeerServer on ::, port: 9000, path: /myapp (v. 0.3.2)
```
```sh
$ peerjs --port 9000 --key peerjs --path /myapp
Started PeerServer on ::, port: 9000, path: /myapp (v. 0.3.2)
```
3. Check it: http://127.0.0.1:9000/myapp It should returns JSON with name, description and website fields.
#### Docker
Also, you can use Docker image to run a new container:
```sh
$ docker run -p 9000:9000 -d peerjs/peerjs-server
```
@ -48,24 +52,28 @@ $ kubectl run peerjs-server --image=peerjs/peerjs-server --port 9000 --expose --
```
### Create a custom server:
If you have your own server, you can attach PeerServer.
1. Install the package:
```bash
# $ cd your-project-path
# with npm
$ npm install peer
# with yarn
$ yarn add peer
```
2. Use PeerServer object to create a new server:
```javascript
const { PeerServer } = require('peer');
const peerServer = PeerServer({ port: 9000, path: '/myapp' });
```
```bash
# $ cd your-project-path
# with npm
$ npm install peer
# with yarn
$ yarn add peer
```
2. Use PeerServer object to create a new server:
```javascript
const { PeerServer } = require("peer");
const peerServer = PeerServer({ port: 9000, path: "/myapp" });
```
3. Check it: http://127.0.0.1:9000/myapp It should returns JSON with name, description and website fields.
@ -73,64 +81,66 @@ If you have your own server, you can attach PeerServer.
```html
<script>
const peer = new Peer('someid', {
host: 'localhost',
port: 9000,
path: '/myapp'
});
const peer = new Peer("someid", {
host: "localhost",
port: 9000,
path: "/myapp",
});
</script>
```
## Config / CLI options
You can provide config object to `PeerServer` function or specify options for `peerjs` CLI.
| CLI option | JS option | Description | Required | Default |
| -------- | ------- | ------------- | :------: | :---------: |
| `--port, -p` | `port` | Port to listen (number) | **Yes** | |
| `--key, -k` | `key` | Connection key (string). Client must provide it to call API methods | No | `"peerjs"` |
| `--path` | `path` | Path (string). The server responds for requests to the root URL + path. **E.g.** Set the `path` to `/myapp` and run server on 9000 port via `peerjs --port 9000 --path /myapp` Then open http://127.0.0.1:9000/myapp - you should see a JSON reponse. | No | `"/"` |
| `--proxied` | `proxied` | Set `true` if PeerServer stays behind a reverse proxy (boolean) | No | `false` |
| `--expire_timeout, -t` | `expire_timeout` | The amount of time after which a message sent will expire, the sender will then receive a `EXPIRE` message (milliseconds). | No | `5000` |
| `--alive_timeout` | `alive_timeout` | Timeout for broken connection (milliseconds). If the server doesn't receive any data from client (includes `pong` messages), the client's connection will be destroyed. | No | `60000` |
| `--concurrent_limit, -c` | `concurrent_limit` | Maximum number of clients' connections to WebSocket server (number) | No | `5000` |
| `--sslkey` | `sslkey` | Path to SSL key (string) | No | |
| `--sslcert` | `sslcert` | Path to SSL certificate (string) | No | |
| `--allow_discovery` | `allow_discovery` | Allow to use GET `/peers` http API method to get an array of ids of all connected clients (boolean) | No | |
| | `generateClientId` | A function which generate random client IDs when calling `/id` API method (`() => string`) | No | `uuid/v4` |
| CLI option | JS option | Description | Required | Default |
| ------------------------ | ------------------ | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | :------: | :--------: |
| `--port, -p` | `port` | Port to listen (number) | **Yes** | |
| `--key, -k` | `key` | Connection key (string). Client must provide it to call API methods | No | `"peerjs"` |
| `--path` | `path` | Path (string). The server responds for requests to the root URL + path. **E.g.** Set the `path` to `/myapp` and run server on 9000 port via `peerjs --port 9000 --path /myapp` Then open http://127.0.0.1:9000/myapp - you should see a JSON reponse. | No | `"/"` |
| `--proxied` | `proxied` | Set `true` if PeerServer stays behind a reverse proxy (boolean) | No | `false` |
| `--expire_timeout, -t` | `expire_timeout` | The amount of time after which a message sent will expire, the sender will then receive a `EXPIRE` message (milliseconds). | No | `5000` |
| `--alive_timeout` | `alive_timeout` | Timeout for broken connection (milliseconds). If the server doesn't receive any data from client (includes `pong` messages), the client's connection will be destroyed. | No | `60000` |
| `--concurrent_limit, -c` | `concurrent_limit` | Maximum number of clients' connections to WebSocket server (number) | No | `5000` |
| `--sslkey` | `sslkey` | Path to SSL key (string) | No | |
| `--sslcert` | `sslcert` | Path to SSL certificate (string) | No | |
| `--allow_discovery` | `allow_discovery` | Allow to use GET `/peers` http API method to get an array of ids of all connected clients (boolean) | No | |
| `--cors` | `corsOptions` | The CORS origins that can access this server |
| | `generateClientId` | A function which generate random client IDs when calling `/id` API method (`() => string`) | No | `uuid/v4` |
## Using HTTPS
Simply pass in PEM-encoded certificate and key.
```javascript
const fs = require('fs');
const { PeerServer } = require('peer');
const fs = require("fs");
const { PeerServer } = require("peer");
const peerServer = PeerServer({
port: 9000,
ssl: {
key: fs.readFileSync('/path/to/your/ssl/key/here.key'),
cert: fs.readFileSync('/path/to/your/ssl/certificate/here.crt')
}
port: 9000,
ssl: {
key: fs.readFileSync("/path/to/your/ssl/key/here.key"),
cert: fs.readFileSync("/path/to/your/ssl/certificate/here.crt"),
},
});
```
You can also pass any other [SSL options accepted by https.createServer](https://nodejs.org/api/https.html#https_https_createserver_options_requestlistenerfrom), such as `SNICallback:
```javascript
const fs = require('fs');
const { PeerServer } = require('peer');
const fs = require("fs");
const { PeerServer } = require("peer");
const peerServer = PeerServer({
port: 9000,
ssl: {
SNICallback: (servername, cb) => {
// your code here ....
}
}
port: 9000,
ssl: {
SNICallback: (servername, cb) => {
// your code here ....
},
},
});
```
## Running PeerServer behind a reverse proxy
Make sure to set the `proxied` option, otherwise IP based limiting will fail.
@ -139,29 +149,31 @@ The option is passed verbatim to the
if it is truthy.
```javascript
const { PeerServer } = require('peer');
const { PeerServer } = require("peer");
const peerServer = PeerServer({
port: 9000,
path: '/myapp',
proxied: true
port: 9000,
path: "/myapp",
proxied: true,
});
```
## Custom client ID generation
By default, PeerServer uses `uuid/v4` npm package to generate random client IDs.
You can set `generateClientId` option in config to specify a custom function to generate client IDs.
```javascript
const { PeerServer } = require('peer');
const { PeerServer } = require("peer");
const customGenerationFunction = () => (Math.random().toString(36) + '0000000000000000000').substr(2, 16);
const customGenerationFunction = () =>
(Math.random().toString(36) + "0000000000000000000").substr(2, 16);
const peerServer = PeerServer({
port: 9000,
path: '/myapp',
generateClientId: customGenerationFunction
port: 9000,
path: "/myapp",
generateClientId: customGenerationFunction,
});
```
@ -170,34 +182,34 @@ Open http://127.0.0.1:9000/myapp/peerjs/id to see a new random id.
## Combining with existing express app
```javascript
const express = require('express');
const { ExpressPeerServer } = require('peer');
const express = require("express");
const { ExpressPeerServer } = require("peer");
const app = express();
app.get('/', (req, res, next) => res.send('Hello world!'));
app.get("/", (req, res, next) => res.send("Hello world!"));
// =======
const server = app.listen(9000);
const peerServer = ExpressPeerServer(server, {
path: '/myapp'
path: "/myapp",
});
app.use('/peerjs', peerServer);
app.use("/peerjs", peerServer);
// == OR ==
const http = require('http');
const http = require("http");
const server = http.createServer(app);
const peerServer = ExpressPeerServer(server, {
debug: true,
path: '/myapp'
debug: true,
path: "/myapp",
});
app.use('/peerjs', peerServer);
app.use("/peerjs", peerServer);
server.listen(9000);
@ -236,18 +248,20 @@ $ npm test
We have 'ready to use' images on docker hub:
https://hub.docker.com/r/peerjs/peerjs-server
To run the latest image:
To run the latest image:
```sh
$ docker run -p 9000:9000 -d peerjs/peerjs-server
```
You can build a new image simply by calling:
```sh
$ docker build -t myimage https://github.com/peers/peerjs-server.git
```
To run the image execute this:
To run the image execute this:
```sh
$ docker run -p 9000:9000 -d myimage
```
@ -289,29 +303,29 @@ resources:
3. Create `server.js` (which node will run by default for the `start` script):
```js
const express = require('express');
const { ExpressPeerServer } = require('peer');
const express = require("express");
const { ExpressPeerServer } = require("peer");
const app = express();
app.enable('trust proxy');
app.enable("trust proxy");
const PORT = process.env.PORT || 9000;
const server = app.listen(PORT, () => {
console.log(`App listening on port ${PORT}`);
console.log('Press Ctrl+C to quit.');
console.log(`App listening on port ${PORT}`);
console.log("Press Ctrl+C to quit.");
});
const peerServer = ExpressPeerServer(server, {
path: '/'
path: "/",
});
app.use('/', peerServer);
app.use("/", peerServer);
module.exports = app;
```
4. Deploy to an existing GAE project (assuming you are already logged in via
`gcloud`), replacing `YOUR-PROJECT-ID-HERE` with your particular project ID:
`gcloud`), replacing `YOUR-PROJECT-ID-HERE` with your particular project ID:
```sh
gcloud app deploy --project=YOUR-PROJECT-ID-HERE --promote --quiet app.yaml

View File

@ -0,0 +1,17 @@
import { describe, expect, it } from "@jest/globals";
import { Client } from "../../../../src/models/client";
import { HeartbeatHandler } from "../../../../src/messageHandler/handlers";
describe("Heartbeat handler", () => {
it("should update last ping time", () => {
const client = new Client({ id: "id", token: "" });
client.setLastPing(0);
const nowTime = new Date().getTime();
HeartbeatHandler(client);
expect(client.getLastPing()).toBeGreaterThanOrEqual(nowTime - 2);
expect(nowTime).toBeGreaterThanOrEqual(client.getLastPing() - 2);
});
});

View File

@ -0,0 +1,117 @@
import { describe, expect, it } from "@jest/globals";
import { Client } from "../../../../src/models/client";
import { TransmissionHandler } from "../../../../src/messageHandler/handlers";
import { Realm } from "../../../../src/models/realm";
import { MessageType } from "../../../../src/enums";
import type WebSocket from "ws";
const createFakeSocket = (): WebSocket => {
/* eslint-disable @typescript-eslint/no-empty-function */
const sock = {
send: (): void => {},
close: (): void => {},
on: (): void => {},
};
/* eslint-enable @typescript-eslint/no-empty-function */
return sock as unknown as WebSocket;
};
describe("Transmission handler", () => {
it("should save message in queue when destination client not connected", () => {
const realm = new Realm();
const handleTransmission = TransmissionHandler({ realm });
const clientFrom = new Client({ id: "id1", token: "" });
const idTo = "id2";
realm.setClient(clientFrom, clientFrom.getId());
handleTransmission(clientFrom, {
type: MessageType.OFFER,
src: clientFrom.getId(),
dst: idTo,
});
expect(realm.getMessageQueueById(idTo)?.getMessages().length).toBe(1);
});
it("should not save LEAVE and EXPIRE messages in queue when destination client not connected", () => {
const realm = new Realm();
const handleTransmission = TransmissionHandler({ realm });
const clientFrom = new Client({ id: "id1", token: "" });
const idTo = "id2";
realm.setClient(clientFrom, clientFrom.getId());
handleTransmission(clientFrom, {
type: MessageType.LEAVE,
src: clientFrom.getId(),
dst: idTo,
});
handleTransmission(clientFrom, {
type: MessageType.EXPIRE,
src: clientFrom.getId(),
dst: idTo,
});
expect(realm.getMessageQueueById(idTo)).toBeUndefined();
});
it("should send message to destination client when destination client connected", () => {
const realm = new Realm();
const handleTransmission = TransmissionHandler({ realm });
const clientFrom = new Client({ id: "id1", token: "" });
const clientTo = new Client({ id: "id2", token: "" });
const socketTo = createFakeSocket();
clientTo.setSocket(socketTo);
realm.setClient(clientTo, clientTo.getId());
let sent = false;
socketTo.send = (): void => {
sent = true;
};
handleTransmission(clientFrom, {
type: MessageType.OFFER,
src: clientFrom.getId(),
dst: clientTo.getId(),
});
expect(sent).toBe(true);
});
it("should send LEAVE message to source client when sending to destination client failed", () => {
const realm = new Realm();
const handleTransmission = TransmissionHandler({ realm });
const clientFrom = new Client({ id: "id1", token: "" });
const clientTo = new Client({ id: "id2", token: "" });
const socketFrom = createFakeSocket();
const socketTo = createFakeSocket();
clientFrom.setSocket(socketFrom);
clientTo.setSocket(socketTo);
realm.setClient(clientFrom, clientFrom.getId());
realm.setClient(clientTo, clientTo.getId());
let sent = false;
socketFrom.send = (data: string): void => {
if (JSON.parse(data)?.type === MessageType.LEAVE) {
sent = true;
}
};
socketTo.send = (): void => {
throw Error();
};
handleTransmission(clientFrom, {
type: MessageType.OFFER,
src: clientFrom.getId(),
dst: clientTo.getId(),
});
expect(sent).toBe(true);
});
});

View File

@ -0,0 +1,28 @@
import { describe, expect, it } from "@jest/globals";
import { HandlersRegistry } from "../../src/messageHandler/handlersRegistry";
import type { Handler } from "../../src/messageHandler/handler";
import { MessageType } from "../../src/enums";
describe("HandlersRegistry", () => {
it("should execute handler for message type", () => {
const handlersRegistry = new HandlersRegistry();
let handled = false;
const handler: Handler = (): boolean => {
handled = true;
return true;
};
handlersRegistry.registerHandler(MessageType.OPEN, handler);
handlersRegistry.handle(undefined, {
type: MessageType.OPEN,
src: "src",
dst: "dst",
});
expect(handled).toBe(true);
});
});

View File

@ -0,0 +1,63 @@
import { describe, expect, it } from "@jest/globals";
import { MessageQueue } from "../../src/models/messageQueue";
import { MessageType } from "../../src/enums";
import type { IMessage } from "../../src/models/message";
import { wait } from "../utils";
describe("MessageQueue", () => {
const createTestMessage = (): IMessage => {
return {
type: MessageType.OPEN,
src: "src",
dst: "dst",
};
};
describe("#addMessage", () => {
it("should add message to queue", () => {
const queue = new MessageQueue();
queue.addMessage(createTestMessage());
expect(queue.getMessages().length).toBe(1);
});
});
describe("#readMessage", () => {
it("should return undefined for empty queue", () => {
const queue = new MessageQueue();
expect(queue.readMessage()).toBeUndefined();
});
it("should return message if any exists in queue", () => {
const queue = new MessageQueue();
const message = createTestMessage();
queue.addMessage(message);
expect(queue.readMessage()).toEqual(message);
expect(queue.readMessage()).toBeUndefined();
});
});
describe("#getLastReadAt", () => {
it("should not be changed if no messages when read", () => {
const queue = new MessageQueue();
const lastReadAt = queue.getLastReadAt();
queue.readMessage();
expect(queue.getLastReadAt()).toBe(lastReadAt);
});
it("should be changed when read message", async () => {
const queue = new MessageQueue();
const lastReadAt = queue.getLastReadAt();
queue.addMessage(createTestMessage());
await wait(10);
expect(queue.getLastReadAt()).toBe(lastReadAt);
queue.readMessage();
expect(queue.getLastReadAt()).toBeGreaterThanOrEqual(lastReadAt + 10 - 2);
});
});
});

View File

@ -0,0 +1,51 @@
import { describe, expect, it } from "@jest/globals";
import { Realm } from "../../src/models/realm";
import { Client } from "../../src/models/client";
describe("Realm", () => {
describe("#generateClientId", () => {
it("should generate a 36-character UUID, or return function value", () => {
const realm = new Realm();
expect(realm.generateClientId().length).toBe(36);
expect(realm.generateClientId(() => "abcd")).toBe("abcd");
});
});
describe("#setClient", () => {
it("should add client to realm", () => {
const realm = new Realm();
const client = new Client({ id: "id", token: "" });
realm.setClient(client, "id");
expect(realm.getClientsIds()).toEqual(["id"]);
});
});
describe("#removeClientById", () => {
it("should remove client from realm", () => {
const realm = new Realm();
const client = new Client({ id: "id", token: "" });
realm.setClient(client, "id");
realm.removeClientById("id");
expect(realm.getClientById("id")).toBeUndefined();
});
});
describe("#getClientsIds", () => {
it("should reflects on add/remove childs", () => {
const realm = new Realm();
const client = new Client({ id: "id", token: "" });
realm.setClient(client, "id");
expect(realm.getClientsIds()).toEqual(["id"]);
expect(realm.getClientById("id")).toBe(client);
realm.removeClientById("id");
expect(realm.getClientsIds()).toEqual([]);
});
});
});

91
__test__/peerjs.spec.ts Normal file
View File

@ -0,0 +1,91 @@
import { describe, expect, it } from "@jest/globals";
import http from "http";
import expectedJson from "../app.json";
import fetch from "node-fetch";
import * as crypto from "crypto";
import { startServer } from "./utils";
const PORT = "9000";
async function makeRequest() {
return new Promise<object>((resolve, reject) => {
http
.get(`http://localhost:${PORT}/`, (resp) => {
let data = "";
resp.on("data", (chunk) => {
data += chunk;
});
resp.on("end", () => {
resolve(JSON.parse(data));
});
})
.on("error", (err) => {
console.log("Error: " + err.message);
reject(err);
});
});
}
describe("Check bin/peerjs", () => {
it("should return content of app.json file", async () => {
expect.assertions(1);
const ls = await startServer();
try {
const resp = await makeRequest();
expect(resp).toEqual(expectedJson);
} finally {
ls.kill();
}
});
it("should reflect the origin header in CORS by default", async () => {
expect.assertions(1);
const ls = await startServer();
const origin = crypto.randomUUID();
try {
const res = await fetch(`http://localhost:${PORT}/peerjs/id`, {
headers: {
Origin: origin,
},
});
expect(res.headers.get("access-control-allow-origin")).toBe(origin);
} finally {
ls.kill();
}
});
it("should respect the CORS parameters", async () => {
expect.assertions(3);
const origin1 = crypto.randomUUID();
const origin2 = crypto.randomUUID();
const origin3 = crypto.randomUUID();
const ls = await startServer(["--cors", origin1, "--cors", origin2]);
try {
const res1 = await fetch(`http://localhost:${PORT}/peerjs/id`, {
headers: {
Origin: origin1,
},
});
expect(res1.headers.get("access-control-allow-origin")).toBe(origin1);
const res2 = await fetch(`http://localhost:${PORT}/peerjs/id`, {
headers: {
Origin: origin2,
},
});
expect(res2.headers.get("access-control-allow-origin")).toBe(origin2);
const res3 = await fetch(`http://localhost:${PORT}/peerjs/id`, {
headers: {
Origin: origin3,
},
});
expect(res3.headers.get("access-control-allow-origin")).toBe(null);
} finally {
ls.kill();
}
});
});

View File

@ -0,0 +1,53 @@
import { describe, expect, it } from "@jest/globals";
import { Client } from "../../../src/models/client";
import { Realm } from "../../../src/models/realm";
import { CheckBrokenConnections } from "../../../src/services/checkBrokenConnections";
import { wait } from "../../utils";
describe("CheckBrokenConnections", () => {
it("should remove client after 2 checks", async () => {
const realm = new Realm();
const doubleCheckTime = 55; //~ equals to checkBrokenConnections.checkInterval * 2
const checkBrokenConnections = new CheckBrokenConnections({
realm,
config: { alive_timeout: doubleCheckTime },
checkInterval: 30,
});
const client = new Client({ id: "id", token: "" });
realm.setClient(client, "id");
checkBrokenConnections.start();
await wait(checkBrokenConnections.checkInterval * 2 + 30);
expect(realm.getClientById("id")).toBeUndefined();
checkBrokenConnections.stop();
});
it("should remove client after 1 ping", async () => {
const realm = new Realm();
const doubleCheckTime = 55; //~ equals to checkBrokenConnections.checkInterval * 2
const checkBrokenConnections = new CheckBrokenConnections({
realm,
config: { alive_timeout: doubleCheckTime },
checkInterval: 30,
});
const client = new Client({ id: "id", token: "" });
realm.setClient(client, "id");
checkBrokenConnections.start();
//set ping after first check
await wait(checkBrokenConnections.checkInterval);
client.setLastPing(new Date().getTime());
await wait(checkBrokenConnections.checkInterval * 2 + 10);
expect(realm.getClientById("id")).toBeUndefined();
checkBrokenConnections.stop();
});
});

View File

@ -0,0 +1,96 @@
import { describe, expect, it } from "@jest/globals";
import { Client } from "../../../src/models/client";
import { Realm } from "../../../src/models/realm";
import type { IMessage } from "../../../src/models/message";
import { MessagesExpire } from "../../../src/services/messagesExpire";
import { MessageHandler } from "../../../src/messageHandler";
import { MessageType } from "../../../src/enums";
import { wait } from "../../utils";
describe("MessagesExpire", () => {
const createTestMessage = (dst: string): IMessage => {
return {
type: MessageType.OPEN,
src: "src",
dst,
};
};
it("should remove client if no read from queue", async () => {
const realm = new Realm();
const messageHandler = new MessageHandler(realm);
const checkInterval = 10;
const expireTimeout = 50;
const config = {
cleanup_out_msgs: checkInterval,
expire_timeout: expireTimeout,
};
const messagesExpire = new MessagesExpire({
realm,
config,
messageHandler,
});
const client = new Client({ id: "id", token: "" });
realm.setClient(client, "id");
realm.addMessageToQueue(client.getId(), createTestMessage("dst"));
messagesExpire.startMessagesExpiration();
await wait(checkInterval * 2);
expect(
realm.getMessageQueueById(client.getId())?.getMessages().length,
).toBe(1);
await wait(expireTimeout);
expect(realm.getMessageQueueById(client.getId())).toBeUndefined();
messagesExpire.stopMessagesExpiration();
});
it("should fire EXPIRE message", async () => {
const realm = new Realm();
const messageHandler = new MessageHandler(realm);
const checkInterval = 10;
const expireTimeout = 50;
const config = {
cleanup_out_msgs: checkInterval,
expire_timeout: expireTimeout,
};
const messagesExpire = new MessagesExpire({
realm,
config,
messageHandler,
});
const client = new Client({ id: "id", token: "" });
realm.setClient(client, "id");
realm.addMessageToQueue(client.getId(), createTestMessage("dst1"));
realm.addMessageToQueue(client.getId(), createTestMessage("dst2"));
let handledCount = 0;
messageHandler.handle = (client, message): boolean => {
expect(client).toBeUndefined();
expect(message.type).toBe(MessageType.EXPIRE);
handledCount++;
return true;
};
messagesExpire.startMessagesExpiration();
await wait(checkInterval * 2);
await wait(expireTimeout);
expect(handledCount).toBe(2);
messagesExpire.stopMessagesExpiration();
});
});

View File

@ -0,0 +1,244 @@
import { describe, expect, it } from "@jest/globals";
import { Server, WebSocket } from "mock-socket";
import type { Server as HttpServer } from "node:http";
import { Realm } from "../../../src/models/realm";
import { WebSocketServer } from "../../../src/services/webSocketServer";
import { Errors, MessageType } from "../../../src/enums";
import { wait } from "../../utils";
type Destroyable<T> = T & { destroy?: () => Promise<void> };
const checkOpen = async (c: WebSocket): Promise<boolean> => {
return new Promise((resolve) => {
c.onmessage = (event: object & { data?: string }): void => {
const message = JSON.parse(event.data as string);
resolve(message.type === MessageType.OPEN);
};
});
};
const checkSequence = async (
c: WebSocket,
msgs: { type: MessageType; error?: Errors }[],
): Promise<boolean> => {
return new Promise((resolve) => {
const restMessages = [...msgs];
const finish = (success = false): void => {
resolve(success);
};
c.onmessage = (event: object & { data?: string }): void => {
const [mes] = restMessages;
if (!mes) {
return finish();
}
restMessages.shift();
const message = JSON.parse(event.data as string);
if (message.type !== mes.type) {
return finish();
}
const isOk = !mes.error || message.payload?.msg === mes.error;
if (!isOk) {
return finish();
}
if (restMessages.length === 0) {
finish(true);
}
};
});
};
const createTestServer = ({
realm,
config,
url,
}: {
realm: Realm;
config: { path: string; key: string; concurrent_limit: number };
url: string;
}): Destroyable<WebSocketServer> => {
const server = new Server(url) as Server & HttpServer;
const webSocketServer: Destroyable<WebSocketServer> = new WebSocketServer({
server,
realm,
config,
});
server.on(
"connection",
(
socket: WebSocket & {
on?: (eventName: string, callback: () => void) => void;
},
) => {
const s = webSocketServer.socketServer;
s.emit("connection", socket, { url: socket.url });
socket.onclose = (): void => {
const userId = socket.url
.split("?")[1]
?.split("&")
.find((p) => p.startsWith("id"))
?.split("=")[1];
if (!userId) return;
const client = realm.getClientById(userId);
const clientSocket = client?.getSocket();
if (!clientSocket) return;
(clientSocket as unknown as WebSocket).listeners[
"server::close"
]?.forEach((s: () => void) => s());
};
socket.onmessage = (event: object & { data?: string }): void => {
const userId = socket.url
.split("?")[1]
?.split("&")
.find((p) => p.startsWith("id"))
?.split("=")[1];
if (!userId) return;
const client = realm.getClientById(userId);
const clientSocket = client?.getSocket();
if (!clientSocket) return;
(clientSocket as unknown as WebSocket).listeners[
"server::message"
]?.forEach((s: (data: object) => void) => s(event));
};
},
);
webSocketServer.destroy = async (): Promise<void> => {
server.close();
};
return webSocketServer;
};
describe("WebSocketServer", () => {
it("should return valid path", () => {
const realm = new Realm();
const config = { path: "/", key: "testKey", concurrent_limit: 1 };
const config2 = { ...config, path: "path" };
const server = new Server("path1") as Server & HttpServer;
const server2 = new Server("path2") as Server & HttpServer;
const webSocketServer = new WebSocketServer({ server, realm, config });
expect(webSocketServer.path).toBe("/peerjs");
const webSocketServer2 = new WebSocketServer({
server: server2,
realm,
config: config2,
});
expect(webSocketServer2.path).toBe("path/peerjs");
server.stop();
server2.stop();
});
it(`should check client's params`, async () => {
const realm = new Realm();
const config = { path: "/", key: "testKey", concurrent_limit: 1 };
const fakeURL = "ws://localhost:8080/peerjs";
const getError = async (
url: string,
validError: Errors = Errors.INVALID_WS_PARAMETERS,
): Promise<boolean> => {
const webSocketServer = createTestServer({ url, realm, config });
const ws = new WebSocket(url);
const errorSent = await checkSequence(ws, [
{ type: MessageType.ERROR, error: validError },
]);
ws.close();
await webSocketServer.destroy?.();
return errorSent;
};
expect(await getError(fakeURL)).toBe(true);
expect(await getError(`${fakeURL}?key=${config.key}`)).toBe(true);
expect(await getError(`${fakeURL}?key=${config.key}&id=1`)).toBe(true);
expect(
await getError(
`${fakeURL}?key=notValidKey&id=userId&token=userToken`,
Errors.INVALID_KEY,
),
).toBe(true);
});
it(`should check concurrent limit`, async () => {
const realm = new Realm();
const config = { path: "/", key: "testKey", concurrent_limit: 1 };
const fakeURL = "ws://localhost:8080/peerjs";
const createClient = (id: string): Destroyable<WebSocket> => {
// id in the path ensures that all mock servers listen on different urls
const url = `${fakeURL}${id}?key=${config.key}&id=${id}&token=${id}`;
const webSocketServer = createTestServer({ url, realm, config });
const ws: Destroyable<WebSocket> = new WebSocket(url);
ws.destroy = async (): Promise<void> => {
ws.close();
wait(10);
webSocketServer.destroy?.();
wait(10);
ws.destroy = undefined;
};
return ws;
};
const c1 = createClient("1");
expect(await checkOpen(c1)).toBe(true);
const c2 = createClient("2");
expect(
await checkSequence(c2, [
{ type: MessageType.ERROR, error: Errors.CONNECTION_LIMIT_EXCEED },
]),
).toBe(true);
await c1.destroy?.();
await c2.destroy?.();
await wait(10);
expect(realm.getClientsIds().length).toBe(0);
const c3 = createClient("3");
expect(await checkOpen(c3)).toBe(true);
await c3.destroy?.();
});
});

21
__test__/utils.ts Normal file
View File

@ -0,0 +1,21 @@
import { ChildProcessWithoutNullStreams, spawn } from "child_process";
import path from "path";
export const wait = (ms: number): Promise<void> =>
new Promise((resolve) => setTimeout(resolve, ms));
export const startServer = (params: string[] = []) => {
return new Promise<ChildProcessWithoutNullStreams>((resolve, reject) => {
const ls = spawn("node", [
path.join(__dirname, "../", "dist/bin/peerjs.js"),
"--port",
"9000",
...params,
]);
ls.stdout.once("data", () => resolve(ls));
ls.stderr.once("data", () => {
ls.kill();
reject();
});
});
};

View File

@ -1,5 +1,5 @@
{
"name": "PeerJS Server",
"description": "A server side element to broker connections between PeerJS clients.",
"website": "https://peerjs.com/"
"name": "PeerJS Server",
"description": "A server side element to broker connections between PeerJS clients.",
"website": "https://peerjs.com/"
}

View File

@ -1,122 +0,0 @@
#!/usr/bin/env node
// tslint:disable
const path = require("path");
const pkg = require("../package.json");
const fs = require("fs");
const optimistUsageLength = 98;
const yargs = require("yargs");
const version = pkg.version;
const { PeerServer } = require("../dist/src");
const opts = yargs
.usage("Usage: $0")
.wrap(Math.min(optimistUsageLength, yargs.terminalWidth()))
.options({
expire_timeout: {
demandOption: false,
alias: "t",
describe: "timeout (milliseconds)",
default: 5000
},
concurrent_limit: {
demandOption: false,
alias: "c",
describe: "concurrent limit",
default: 5000
},
alive_timeout: {
demandOption: false,
describe: "broken connection check timeout (milliseconds)",
default: 60000
},
key: {
demandOption: false,
alias: "k",
describe: "connection key",
default: "peerjs"
},
sslkey: {
demandOption: false,
describe: "path to SSL key"
},
sslcert: {
demandOption: false,
describe: "path to SSL certificate"
},
host: {
demandOption: false,
alias: "H",
describe: "host"
},
port: {
demandOption: true,
alias: "p",
describe: "port"
},
path: {
demandOption: false,
describe: "custom path",
default: "/"
},
allow_discovery: {
demandOption: false,
describe: "allow discovery of peers"
},
proxied: {
demandOption: false,
describe: "Set true if PeerServer stays behind a reverse proxy",
default: false
}
})
.boolean("allow_discovery")
.argv;
process.on("uncaughtException", function (e) {
console.error("Error: " + e);
});
if (opts.sslkey || opts.sslcert) {
if (opts.sslkey && opts.sslcert) {
opts.ssl = {
key: fs.readFileSync(path.resolve(opts.sslkey)),
cert: fs.readFileSync(path.resolve(opts.sslcert))
};
delete opts.sslkey;
delete opts.sslcert;
} else {
console.error("Warning: PeerServer will not run because either " +
"the key or the certificate has not been provided.");
process.exit(1);
}
}
const userPath = opts.path;
const server = PeerServer(opts, server => {
const host = server.address().address;
const port = server.address().port;
console.log(
"Started PeerServer on %s, port: %s, path: %s (v. %s)",
host, port, userPath || "/", version
);
const shutdownApp = () => {
server.close(() => {
console.log('Http server closed.');
process.exit(0);
});
};
process.on('SIGINT', shutdownApp);
process.on('SIGTERM', shutdownApp);
});
server.on("connection", client => {
console.log(`Client connected: ${client.getId()}`);
});
server.on("disconnect", client => {
console.log(`Client disconnected: ${client.getId()}`);
});

149
bin/peerjs.ts Normal file
View File

@ -0,0 +1,149 @@
#!/usr/bin/env node
import path from "node:path";
import { version } from "../package.json";
import fs from "node:fs";
const optimistUsageLength = 98;
import yargs from "yargs";
import { hideBin } from "yargs/helpers";
import { PeerServer } from "../src";
import type { AddressInfo } from "node:net";
import type { CorsOptions } from "cors";
const y = yargs(hideBin(process.argv));
const portEnvIsSet = !!process.env["PORT"];
const opts = y
.usage("Usage: $0")
.wrap(Math.min(optimistUsageLength, y.terminalWidth()))
.options({
expire_timeout: {
demandOption: false,
alias: "t",
describe: "timeout (milliseconds)",
default: 5000,
},
concurrent_limit: {
demandOption: false,
alias: "c",
describe: "concurrent limit",
default: 5000,
},
alive_timeout: {
demandOption: false,
describe: "broken connection check timeout (milliseconds)",
default: 60000,
},
key: {
demandOption: false,
alias: "k",
describe: "connection key",
default: "peerjs",
},
sslkey: {
type: "string",
demandOption: false,
describe: "path to SSL key",
},
sslcert: {
type: "string",
demandOption: false,
describe: "path to SSL certificate",
},
host: {
type: "string",
demandOption: false,
alias: "H",
describe: "host",
},
port: {
type: "number",
demandOption: !portEnvIsSet,
alias: "p",
describe: "port",
},
path: {
type: "string",
demandOption: false,
describe: "custom path",
default: process.env["PEERSERVER_PATH"] || "/",
},
allow_discovery: {
type: "boolean",
demandOption: false,
describe: "allow discovery of peers",
},
proxied: {
type: "boolean",
demandOption: false,
describe: "Set true if PeerServer stays behind a reverse proxy",
default: false,
},
cors: {
type: "string",
array: true,
describe: "Set the list of CORS origins",
},
})
.boolean("allow_discovery")
.parseSync();
if (!opts.port) {
opts.port = parseInt(process.env["PORT"] as string);
}
if (opts.cors) {
opts["corsOptions"] = {
origin: opts.cors,
} satisfies CorsOptions;
}
process.on("uncaughtException", function (e) {
console.error("Error: " + e);
});
if (opts.sslkey || opts.sslcert) {
if (opts.sslkey && opts.sslcert) {
opts["ssl"] = {
key: fs.readFileSync(path.resolve(opts.sslkey)),
cert: fs.readFileSync(path.resolve(opts.sslcert)),
};
} else {
console.error(
"Warning: PeerServer will not run because either " +
"the key or the certificate has not been provided.",
);
process.exit(1);
}
}
const userPath = opts.path;
const server = PeerServer(opts, (server) => {
const { address: host, port } = server.address() as AddressInfo;
console.log(
"Started PeerServer on %s, port: %s, path: %s (v. %s)",
host,
port,
userPath || "/",
version,
);
const shutdownApp = () => {
server.close(() => {
console.log("Http server closed.");
process.exit(0);
});
};
process.on("SIGINT", shutdownApp);
process.on("SIGTERM", shutdownApp);
});
server.on("connection", (client) => {
console.log(`Client connected: ${client.getId()}`);
});
server.on("disconnect", (client) => {
console.log(`Client disconnected: ${client.getId()}`);
});

View File

@ -1,10 +0,0 @@
version: '3'
services:
peerjs:
build: .
container_name: peerjs_server
expose:
- 9000
ports:
- "9000:9000"

37
fly.toml Normal file
View File

@ -0,0 +1,37 @@
# fly.toml file generated for peerserver on 2023-01-17T16:27:45+01:00
app = "peerserver"
kill_signal = "SIGINT"
kill_timeout = 5
processes = []
[env]
[experimental]
auto_rollback = true
[[services]]
http_checks = []
internal_port = 9000
processes = ["app"]
protocol = "tcp"
script_checks = []
[services.concurrency]
hard_limit = 25
soft_limit = 20
type = "connections"
[[services.ports]]
force_https = true
handlers = ["http"]
port = 80
[[services.ports]]
handlers = ["tls", "http"]
port = 443
[[services.tcp_checks]]
grace_period = "1s"
interval = "15s"
restart_limit = 0
timeout = "2s"

71
index.d.ts vendored
View File

@ -1,71 +0,0 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
/// <reference types="node" />
import { Server } from "net";
import { EventEmitter } from "events";
import WebSocketLib from "ws";
import Express from "express";
declare type MyWebSocket = WebSocketLib & EventEmitter;
declare type Optional<T> = {
[P in keyof T]?: (T[P] | undefined);
};
declare interface IConfig {
readonly port?: number;
readonly expire_timeout?: number;
readonly alive_timeout?: number;
readonly key?: string;
readonly path?: string;
readonly concurrent_limit?: number;
readonly allow_discovery?: boolean;
readonly proxied?: boolean | string;
readonly cleanup_out_msgs?: number;
readonly ssl?: {
key: string;
cert: string;
};
readonly generateClientId?: () => string;
}
declare interface IClient {
getId(): string;
getToken(): string;
getSocket(): MyWebSocket | null;
setSocket(socket: MyWebSocket | null): void;
getLastPing(): number;
setLastPing(lastPing: number): void;
send(data: any): void;
}
declare enum MessageType {
OPEN = "OPEN",
LEAVE = "LEAVE",
CANDIDATE = "CANDIDATE",
OFFER = "OFFER",
ANSWER = "ANSWER",
EXPIRE = "EXPIRE",
HEARTBEAT = "HEARTBEAT",
ID_TAKEN = "ID-TAKEN",
ERROR = "ERROR"
}
declare interface IMessage {
readonly type: MessageType;
readonly src: string;
readonly dst: string;
readonly payload?: any;
}
declare interface CustomExpress extends Express.Express {
on(event: string, callback: (...args: any[]) => void): this;
on(event: 'connection', callback: (client: IClient) => void): this;
on(event: 'disconnect', callback: (client: IClient) => void): this;
on(event: 'message', callback: (client: IClient, message: IMessage) => void): this;
on(event: 'error', callback: (error: Error) => void): this;
}
declare function ExpressPeerServer(server: Server, options?: IConfig): CustomExpress;
declare function PeerServer(options?: Optional<IConfig>, callback?: (server: Server) => void): CustomExpress;
export { ExpressPeerServer, PeerServer };

13
jest.config.js Normal file
View File

@ -0,0 +1,13 @@
/** @type {import('jest').Config} */
const config = {
testEnvironment: "node",
transform: {
"^.+\\.(t|j)sx?$": "@swc/jest",
},
transformIgnorePatterns: [
// "node_modules"
],
collectCoverageFrom: ["./src/**"],
};
export default config;

46576
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -1,78 +1,106 @@
{
"name": "peer",
"version": "0.0.0-development",
"description": "PeerJS server component",
"main": "dist/src/index.js",
"bin": {
"peerjs": "./bin/peerjs"
},
"keywords": [
"peerjs",
"webrtc",
"signaling"
],
"files": [
"bin/",
"dist/",
"index.d.ts"
],
"homepage": "https://github.com/peers/peerjs-server#readme",
"bugs": {
"url": "https://github.com/peers/peerjs-server/issues"
},
"repository": {
"type": "git",
"url": "https://github.com/peers/peerjs-server.git"
},
"author": "Michelle Bu, Eric Zhang, Alex Sosnovskiy",
"license": "MIT",
"scripts": {
"preversion": "npm run clean && npm run build",
"build": "tsc",
"clean": "rimraf ./dist",
"lint": "eslint --ext .js,.ts .",
"tsc": "tsc",
"prebuild": "npm run lint",
"test": "npm run lint && mocha -r ts-node/register \"test/**/*\"",
"start": "bin/peerjs --port ${PORT:=9000}",
"dev:start": "npm-run-all build start",
"dev": "nodemon --watch src -e ts --exec npm run dev:start",
"semantic-release": "semantic-release"
},
"release": {
"branch": "master"
},
"dependencies": {
"@types/cors": "^2.8.6",
"@types/express": "^4.17.3",
"@types/ws": "^7.2.3",
"body-parser": "^1.19.0",
"cors": "^2.8.5",
"express": "^4.17.1",
"uuid": "^3.4.0",
"ws": "^7.2.3",
"yargs": "^15.3.1"
},
"devDependencies": {
"@types/chai": "^4.2.11",
"@types/mocha": "^7.0.2",
"@types/node": "^10.17.17",
"@types/uuid": "^3.4.8",
"@typescript-eslint/eslint-plugin": "^2.24.0",
"@typescript-eslint/parser": "^2.24.0",
"chai": "^4.2.0",
"eslint": "^6.8.0",
"mocha": "^7.1.1",
"mock-socket": "8.0.5",
"nodemon": "^1.19.4",
"npm-run-all": "^4.1.5",
"rimraf": "^3.0.2",
"sinon": "^7.5.0",
"ts-node": "^8.7.0",
"typescript": "^4.1.2",
"semantic-release": "^19.0.5"
},
"engines": {
"node": ">=10"
}
"name": "peer",
"version": "0.0.0-development",
"keywords": [
"peerjs",
"webrtc",
"p2p",
"rtc"
],
"description": "PeerJS server component",
"homepage": "https://peerjs.com",
"bugs": {
"url": "https://github.com/peers/peerjs-server/issues"
},
"repository": {
"type": "git",
"url": "https://github.com/peers/peerjs-server"
},
"license": "MIT",
"contributors": [],
"type": "module",
"exports": {
".": {
"import": {
"types": "./dist/peer.d.ts",
"default": "./dist/module.mjs"
},
"require": {
"types": "./dist/peer.d.ts",
"default": "./dist/index.cjs"
}
}
},
"main": "dist/index.cjs",
"module": "dist/module.mjs",
"source": "src/index.ts",
"binary": "dist/bin/peerjs.js",
"types": "dist/peer.d.ts",
"bin": {
"peerjs": "dist/bin/peerjs.js"
},
"funding": {
"type": "opencollective",
"url": "https://opencollective.com/peer"
},
"collective": {
"type": "opencollective",
"url": "https://opencollective.com/peer"
},
"files": [
"dist/"
],
"engines": {
"node": ">=14"
},
"targets": {
"binary": {
"source": "bin/peerjs.ts"
},
"main": {},
"module": {}
},
"scripts": {
"format": "prettier --write .",
"build": "parcel build",
"lint": "eslint --ext .js,.ts . && npm run check",
"check": "tsc --noEmit",
"test": "npm run lint && jest",
"coverage": "jest --coverage",
"start": "node dist/bin/peerjs.js --port ${PORT:=9000}",
"dev": "nodemon --watch src -e ts --exec 'npm run build && npm run start'",
"semantic-release": "semantic-release"
},
"dependencies": {
"@types/express": "^4.17.3",
"@types/ws": "^7.2.3 || ^8.0.0",
"cors": "^2.8.5",
"express": "^4.17.1",
"node-fetch": "^3.3.0",
"ws": "^7.2.3 || ^8.0.0",
"yargs": "^17.6.2"
},
"devDependencies": {
"@codedependant/semantic-release-docker": "^4.3.0",
"@parcel/packager-ts": "^2.8.2",
"@parcel/transformer-typescript-types": "^2.8.2",
"@semantic-release/changelog": "^6.0.1",
"@semantic-release/git": "^10.0.1",
"@swc/core": "^1.3.35",
"@swc/jest": "^0.2.24",
"@tsconfig/node16-strictest-esm": "^1.0.3",
"@types/cors": "^2.8.6",
"@types/jest": "^29.4.0",
"@types/node": "^14.18.33",
"@types/yargs": "^17.0.19",
"@typescript-eslint/eslint-plugin": "^5.0.0",
"@typescript-eslint/parser": "^5.0.0",
"eslint": "^8.0.0",
"jest": "^29.4.2",
"mock-socket": "^9.1.5",
"parcel": "^2.8.2",
"prettier": "^2.8.4",
"semantic-release": "^20.0.0",
"typescript": "^4.1.2"
}
}

29
renovate.json Normal file
View File

@ -0,0 +1,29 @@
{
"$schema": "https://docs.renovatebot.com/renovate-schema.json",
"extends": ["config:base", ":assignAndReview(jonasgloning)"],
"labels": ["dependencies"],
"assignees": ["jonasgloning"],
"major": {
"dependencyDashboardApproval": true
},
"packageRules": [
{
"matchDepTypes": ["devDependencies"],
"addLabels": ["dev-dependencies"],
"automerge": true,
"automergeType": "branch"
},
{
"matchUpdateTypes": ["minor", "patch"],
"matchCurrentVersion": "!/^0/",
"automerge": true,
"automergeType": "pr",
"platformAutomerge": true
}
],
"lockFileMaintenance": {
"enabled": true,
"automerge": true,
"automergeType": "branch"
}
}

View File

@ -8,14 +8,9 @@ So, the base path should be like `http://127.0.0.1:9000/` or `http://127.0.0.1:9
Endpoints:
* GET `/` - return a JSON to test the server.
- GET `/` - return a JSON to test the server.
This group of methods uses `:key` option from config:
* GET `/:key/id` - return a new user id. required `:key` from config.
* GET `/:key/peers` - return an array of all connected users. required `:key` from config. **IMPORTANT:** You should set `allow_discovery` to `true` in config to enable this method. It disabled by default.
This group of methods uses `:key` option from config, `:userId` and `:userToken` parameters from user.
* POST `/:key/:userId/:userToken/offer`
* POST `/:key/:userId/:userToken/candidate`
* POST `/:key/:userId/:userToken/answer`
* POST `/:key/:userId/:userToken/leave`
- GET `/:key/id` - return a new user id. required `:key` from config.
- GET `/:key/peers` - return an array of all connected users. required `:key` from config. **IMPORTANT:** You should set `allow_discovery` to `true` in config to enable this method. It disabled by default.

View File

@ -1,33 +1,28 @@
import bodyParser from "body-parser";
import cors from "cors";
import cors, { CorsOptions } from "cors";
import express from "express";
import publicContent from "../../app.json";
import { IConfig } from "../config";
import { IMessageHandler } from "../messageHandler";
import { IRealm } from "../models/realm";
import { AuthMiddleware } from "./middleware/auth";
import CallsApi from "./v1/calls";
import PublicApi from "./v1/public";
import type { IConfig } from "../config";
import type { IRealm } from "../models/realm";
export const Api = ({ config, realm, messageHandler }: {
config: IConfig;
realm: IRealm;
messageHandler: IMessageHandler;
export const Api = ({
config,
realm,
corsOptions,
}: {
config: IConfig;
realm: IRealm;
corsOptions: CorsOptions;
}): express.Router => {
const authMiddleware = new AuthMiddleware(config, realm);
const app = express.Router();
const app = express.Router();
app.use(cors(corsOptions));
const jsonParser = bodyParser.json();
app.get("/", (_, res) => {
res.send(publicContent);
});
app.use(cors());
app.use("/:key", PublicApi({ config, realm }));
app.get("/", (_, res) => {
res.send(publicContent);
});
app.use("/:key", PublicApi({ config, realm }));
app.use("/:key/:id/:token", authMiddleware.handle, jsonParser, CallsApi({ realm, messageHandler }));
return app;
return app;
};

View File

@ -1,35 +0,0 @@
import express from "express";
import { IConfig } from "../../../config";
import { Errors } from "../../../enums";
import { IRealm } from "../../../models/realm";
import { IMiddleware } from "../middleware";
export class AuthMiddleware implements IMiddleware {
constructor(private readonly config: IConfig, private readonly realm: IRealm) { }
public handle = (req: express.Request, res: express.Response, next: express.NextFunction) => {
const { id, token, key } = req.params;
if (key !== this.config.key) {
return res.status(401).send(Errors.INVALID_KEY);
}
if (!id) {
return res.sendStatus(401);
}
const client = this.realm.getClientById(id);
if (!client) {
return res.sendStatus(401);
}
if (client.getToken() && token !== client.getToken()) {
return res.status(401).send(Errors.INVALID_TOKEN);
}
next();
};
}

View File

@ -1,5 +0,0 @@
import express from "express";
export interface IMiddleware {
handle(req: express.Request, res: express.Response, next: express.NextFunction): any;
}

View File

@ -1,40 +0,0 @@
import express from "express";
import { IMessageHandler } from "../../../messageHandler";
import { IMessage } from "../../../models/message";
import { IRealm } from "../../../models/realm";
export default ({ realm, messageHandler }: { realm: IRealm; messageHandler: IMessageHandler; }): express.Router => {
const app = express.Router();
const handle = (req: express.Request, res: express.Response, next: express.NextFunction): any => {
const { id } = req.params;
if (!id) return next();
const client = realm.getClientById(id);
if (!client) {
throw new Error(`client not found:${id}`);
}
const { type, dst, payload } = req.body;
const message: IMessage = {
type,
src: id,
dst,
payload
};
messageHandler.handle(client, message);
res.sendStatus(200);
};
app.post("/offer", handle);
app.post("/candidate", handle);
app.post("/answer", handle);
app.post("/leave", handle);
return app;
};

View File

@ -1,28 +1,32 @@
import express from "express";
import { IConfig } from "../../../config";
import { IRealm } from "../../../models/realm";
import type { IConfig } from "../../../config";
import type { IRealm } from "../../../models/realm";
export default ({ config, realm }: {
config: IConfig; realm: IRealm;
export default ({
config,
realm,
}: {
config: IConfig;
realm: IRealm;
}): express.Router => {
const app = express.Router();
const app = express.Router();
// Retrieve guaranteed random ID.
app.get("/id", (_, res: express.Response) => {
res.contentType("html");
res.send(realm.generateClientId(config.generateClientId));
});
// Retrieve guaranteed random ID.
app.get("/id", (_, res: express.Response) => {
res.contentType("html");
res.send(realm.generateClientId(config.generateClientId));
});
// Get a list of all peers for a key, enabled by the `allowDiscovery` flag.
app.get("/peers", (_, res: express.Response) => {
if (config.allow_discovery) {
const clientsIds = realm.getClientsIds();
// Get a list of all peers for a key, enabled by the `allowDiscovery` flag.
app.get("/peers", (_, res: express.Response) => {
if (config.allow_discovery) {
const clientsIds = realm.getClientsIds();
return res.send(clientsIds);
}
return res.send(clientsIds);
}
res.sendStatus(401);
});
return res.sendStatus(401);
});
return app;
return app;
};

View File

@ -1,32 +1,38 @@
import type { WebSocketServer, ServerOptions } from "ws";
import type { CorsOptions } from "cors";
export interface IConfig {
readonly host: string;
readonly port: number;
readonly expire_timeout: number;
readonly alive_timeout: number;
readonly key: string;
readonly path: string;
readonly concurrent_limit: number;
readonly allow_discovery: boolean;
readonly proxied: boolean | string;
readonly cleanup_out_msgs: number;
readonly ssl?: {
key: string;
cert: string;
};
readonly generateClientId?: () => string;
readonly host: string;
readonly port: number;
readonly expire_timeout: number;
readonly alive_timeout: number;
readonly key: string;
readonly path: string;
readonly concurrent_limit: number;
readonly allow_discovery: boolean;
readonly proxied: boolean | string;
readonly cleanup_out_msgs: number;
readonly ssl?: {
key: string;
cert: string;
};
readonly generateClientId?: () => string;
readonly createWebSocketServer?: (options: ServerOptions) => WebSocketServer;
readonly corsOptions: CorsOptions;
}
const defaultConfig: IConfig = {
host: "::",
port: 9000,
expire_timeout: 5000,
alive_timeout: 60000,
key: "peerjs",
path: "/",
concurrent_limit: 5000,
allow_discovery: false,
proxied: false,
cleanup_out_msgs: 1000,
host: "::",
port: 9000,
expire_timeout: 5000,
alive_timeout: 60000,
key: "peerjs",
path: "/",
concurrent_limit: 5000,
allow_discovery: false,
proxied: false,
cleanup_out_msgs: 1000,
corsOptions: { origin: true },
};
export default defaultConfig;

View File

@ -1,18 +1,18 @@
export enum Errors {
INVALID_KEY = "Invalid key provided",
INVALID_TOKEN = "Invalid token provided",
INVALID_WS_PARAMETERS = "No id, token, or key supplied to websocket server",
CONNECTION_LIMIT_EXCEED = "Server has reached its concurrent user limit"
INVALID_KEY = "Invalid key provided",
INVALID_TOKEN = "Invalid token provided",
INVALID_WS_PARAMETERS = "No id, token, or key supplied to websocket server",
CONNECTION_LIMIT_EXCEED = "Server has reached its concurrent user limit",
}
export enum MessageType {
OPEN = "OPEN",
LEAVE = "LEAVE",
CANDIDATE = "CANDIDATE",
OFFER = "OFFER",
ANSWER = "ANSWER",
EXPIRE = "EXPIRE",
HEARTBEAT = "HEARTBEAT",
ID_TAKEN = "ID-TAKEN",
ERROR = "ERROR"
OPEN = "OPEN",
LEAVE = "LEAVE",
CANDIDATE = "CANDIDATE",
OFFER = "OFFER",
ANSWER = "ANSWER",
EXPIRE = "EXPIRE",
HEARTBEAT = "HEARTBEAT",
ID_TAKEN = "ID-TAKEN",
ERROR = "ERROR",
}

View File

@ -1,70 +1,79 @@
import express from "express";
import http from "http";
import https from "https";
import { Server } from "net";
import express, { type Express } from "express";
import http from "node:http";
import https from "node:https";
import defaultConfig, { IConfig } from "./config";
import type { IConfig } from "./config";
import defaultConfig from "./config";
import type { PeerServerEvents } from "./instance";
import { createInstance } from "./instance";
import type { IClient } from "./models/client";
import type { IMessage } from "./models/message";
type Optional<T> = {
[P in keyof T]?: (T[P] | undefined);
};
export type { MessageType } from "./enums";
export type { IConfig, PeerServerEvents, IClient, IMessage };
function ExpressPeerServer(server: Server, options?: IConfig) {
const app = express();
function ExpressPeerServer(
server: https.Server | http.Server,
options?: Partial<IConfig>,
) {
const app = express();
const newOptions: IConfig = {
...defaultConfig,
...options
};
const newOptions: IConfig = {
...defaultConfig,
...options,
};
if (newOptions.proxied) {
app.set("trust proxy", newOptions.proxied === "false" ? false : !!newOptions.proxied);
}
if (newOptions.proxied) {
app.set(
"trust proxy",
newOptions.proxied === "false" ? false : !!newOptions.proxied,
);
}
app.on("mount", () => {
if (!server) {
throw new Error("Server is not passed to constructor - " +
"can't start PeerServer");
}
app.on("mount", () => {
if (!server) {
throw new Error(
"Server is not passed to constructor - " + "can't start PeerServer",
);
}
createInstance({ app, server, options: newOptions });
});
createInstance({ app, server, options: newOptions });
});
return app;
return app as Express & PeerServerEvents;
}
function PeerServer(options: Optional<IConfig> = {}, callback?: (server: Server) => void) {
const app = express();
function PeerServer(
options: Partial<IConfig> = {},
callback?: (server: https.Server | http.Server) => void,
) {
const app = express();
let newOptions: IConfig = {
...defaultConfig,
...options
};
let newOptions: IConfig = {
...defaultConfig,
...options,
};
const port = newOptions.port;
const host = newOptions.host;
const port = newOptions.port;
const host = newOptions.host;
let server: Server;
let server: https.Server | http.Server;
const { ssl, ...restOptions } = newOptions;
if (ssl && Object.keys(ssl).length) {
server = https.createServer(ssl, app);
const { ssl, ...restOptions } = newOptions;
if (ssl && Object.keys(ssl).length) {
server = https.createServer(ssl, app);
newOptions = restOptions;
} else {
server = http.createServer(app);
}
newOptions = restOptions;
} else {
server = http.createServer(app);
}
const peerjs = ExpressPeerServer(server, newOptions);
app.use(peerjs);
const peerjs = ExpressPeerServer(server, newOptions);
app.use(peerjs);
server.listen(port, host, () => callback?.(server));
server.listen(port, host, () => callback?.(server));
return peerjs;
return peerjs;
}
export {
ExpressPeerServer,
PeerServer
};
export { ExpressPeerServer, PeerServer };

View File

@ -1,75 +1,99 @@
import express from "express";
import { Server } from "net";
import path from "path";
import { IClient } from "./models/client";
import { IMessage } from "./models/message";
import type express from "express";
import type { Server as HttpServer } from "node:http";
import type { Server as HttpsServer } from "node:https";
import path from "node:path";
import type { IRealm } from "./models/realm";
import { Realm } from "./models/realm";
import { IRealm } from "./models/realm";
import { CheckBrokenConnections } from "./services/checkBrokenConnections";
import { IMessagesExpire, MessagesExpire } from "./services/messagesExpire";
import { IWebSocketServer, WebSocketServer } from "./services/webSocketServer";
import type { IMessagesExpire } from "./services/messagesExpire";
import { MessagesExpire } from "./services/messagesExpire";
import type { IWebSocketServer } from "./services/webSocketServer";
import { WebSocketServer } from "./services/webSocketServer";
import { MessageHandler } from "./messageHandler";
import { Api } from "./api";
import { IConfig } from "./config";
import type { IClient } from "./models/client";
import type { IMessage } from "./models/message";
import type { IConfig } from "./config";
export const createInstance = ({ app, server, options }: {
app: express.Application;
server: Server;
options: IConfig;
export interface PeerServerEvents {
on(event: "connection", listener: (client: IClient) => void): this;
on(
event: "message",
listener: (client: IClient, message: IMessage) => void,
): this;
on(event: "disconnect", listener: (client: IClient) => void): this;
on(event: "error", listener: (client: Error) => void): this;
}
export const createInstance = ({
app,
server,
options,
}: {
app: express.Application;
server: HttpServer | HttpsServer;
options: IConfig;
}): void => {
const config = options;
const realm: IRealm = new Realm();
const messageHandler = new MessageHandler(realm);
const config = options;
const realm: IRealm = new Realm();
const messageHandler = new MessageHandler(realm);
const api = Api({ config, realm, messageHandler });
const messagesExpire: IMessagesExpire = new MessagesExpire({ realm, config, messageHandler });
const checkBrokenConnections = new CheckBrokenConnections({
realm,
config,
onClose: client => {
app.emit("disconnect", client);
}
});
const api = Api({ config, realm, corsOptions: options.corsOptions });
const messagesExpire: IMessagesExpire = new MessagesExpire({
realm,
config,
messageHandler,
});
const checkBrokenConnections = new CheckBrokenConnections({
realm,
config,
onClose: (client) => {
app.emit("disconnect", client);
},
});
app.use(options.path, api);
app.use(options.path, api);
//use mountpath for WS server
const customConfig = { ...config, path: path.posix.join(app.path(), options.path, '/') };
//use mountpath for WS server
const customConfig = {
...config,
path: path.posix.join(app.path(), options.path, "/"),
};
const wss: IWebSocketServer = new WebSocketServer({
server,
realm,
config: customConfig
});
const wss: IWebSocketServer = new WebSocketServer({
server,
realm,
config: customConfig,
});
wss.on("connection", (client: IClient) => {
const messageQueue = realm.getMessageQueueById(client.getId());
wss.on("connection", (client: IClient) => {
const messageQueue = realm.getMessageQueueById(client.getId());
if (messageQueue) {
let message: IMessage | undefined;
if (messageQueue) {
let message: IMessage | undefined;
while ((message = messageQueue.readMessage())) {
messageHandler.handle(client, message);
}
realm.clearMessageQueue(client.getId());
}
while ((message = messageQueue.readMessage())) {
messageHandler.handle(client, message);
}
realm.clearMessageQueue(client.getId());
}
app.emit("connection", client);
});
app.emit("connection", client);
});
wss.on("message", (client: IClient, message: IMessage) => {
app.emit("message", client, message);
messageHandler.handle(client, message);
});
wss.on("message", (client: IClient, message: IMessage) => {
app.emit("message", client, message);
messageHandler.handle(client, message);
});
wss.on("close", (client: IClient) => {
app.emit("disconnect", client);
});
wss.on("close", (client: IClient) => {
app.emit("disconnect", client);
});
wss.on("error", (error: Error) => {
app.emit("error", error);
});
wss.on("error", (error: Error) => {
app.emit("error", error);
});
messagesExpire.startMessagesExpiration();
checkBrokenConnections.start();
};
messagesExpire.startMessagesExpiration();
checkBrokenConnections.start();
};

View File

@ -1,4 +1,7 @@
import { IClient } from "../models/client";
import { IMessage } from "../models/message";
import type { IClient } from "../models/client";
import type { IMessage } from "../models/message";
export type Handler = (client: IClient | undefined, message: IMessage) => boolean;
export type Handler = (
client: IClient | undefined,
message: IMessage,
) => boolean;

View File

@ -1,10 +1,10 @@
import { IClient } from "../../../models/client";
import type { IClient } from "../../../models/client";
export const HeartbeatHandler = (client: IClient | undefined): boolean => {
if (client) {
const nowTime = new Date().getTime();
client.setLastPing(nowTime);
}
if (client) {
const nowTime = new Date().getTime();
client.setLastPing(nowTime);
}
return true;
return true;
};

View File

@ -1,61 +1,65 @@
import { MessageType } from "../../../enums";
import { IClient } from "../../../models/client";
import { IMessage } from "../../../models/message";
import { IRealm } from "../../../models/realm";
import type { IClient } from "../../../models/client";
import type { IMessage } from "../../../models/message";
import type { IRealm } from "../../../models/realm";
export const TransmissionHandler = ({ realm }: { realm: IRealm; }): (client: IClient | undefined, message: IMessage) => boolean => {
const handle = (client: IClient | undefined, message: IMessage) => {
const type = message.type;
const srcId = message.src;
const dstId = message.dst;
export const TransmissionHandler = ({
realm,
}: {
realm: IRealm;
}): ((client: IClient | undefined, message: IMessage) => boolean) => {
const handle = (client: IClient | undefined, message: IMessage) => {
const type = message.type;
const srcId = message.src;
const dstId = message.dst;
const destinationClient = realm.getClientById(dstId);
const destinationClient = realm.getClientById(dstId);
// User is connected!
if (destinationClient) {
const socket = destinationClient.getSocket();
try {
if (socket) {
const data = JSON.stringify(message);
// User is connected!
if (destinationClient) {
const socket = destinationClient.getSocket();
try {
if (socket) {
const data = JSON.stringify(message);
socket.send(data);
} else {
// Neither socket no res available. Peer dead?
throw new Error("Peer dead");
}
} catch (e) {
// This happens when a peer disconnects without closing connections and
// the associated WebSocket has not closed.
// Tell other side to stop trying.
if (socket) {
socket.close();
} else {
realm.removeClientById(destinationClient.getId());
}
socket.send(data);
} else {
// Neither socket no res available. Peer dead?
throw new Error("Peer dead");
}
} catch (e) {
// This happens when a peer disconnects without closing connections and
// the associated WebSocket has not closed.
// Tell other side to stop trying.
if (socket) {
socket.close();
} else {
realm.removeClientById(destinationClient.getId());
}
handle(client, {
type: MessageType.LEAVE,
src: dstId,
dst: srcId
});
}
} else {
// Wait for this client to connect/reconnect (XHR) for important
// messages.
const ignoredTypes = [MessageType.LEAVE, MessageType.EXPIRE];
handle(client, {
type: MessageType.LEAVE,
src: dstId,
dst: srcId,
});
}
} else {
// Wait for this client to connect/reconnect (XHR) for important
// messages.
const ignoredTypes = [MessageType.LEAVE, MessageType.EXPIRE];
if (!ignoredTypes.includes(type) && dstId) {
realm.addMessageToQueue(dstId, message);
} else if (type === MessageType.LEAVE && !dstId) {
realm.removeClientById(srcId);
} else {
// Unavailable destination specified with message LEAVE or EXPIRE
// Ignore
}
}
if (!ignoredTypes.includes(type) && dstId) {
realm.addMessageToQueue(dstId, message);
} else if (type === MessageType.LEAVE && !dstId) {
realm.removeClientById(srcId);
} else {
// Unavailable destination specified with message LEAVE or EXPIRE
// Ignore
}
}
return true;
};
return true;
};
return handle;
return handle;
};

View File

@ -1,29 +1,29 @@
import { MessageType } from "../enums";
import { IClient } from "../models/client";
import { IMessage } from "../models/message";
import { Handler } from "./handler";
import type { MessageType } from "../enums";
import type { IClient } from "../models/client";
import type { IMessage } from "../models/message";
import type { Handler } from "./handler";
export interface IHandlersRegistry {
registerHandler(messageType: MessageType, handler: Handler): void;
handle(client: IClient | undefined, message: IMessage): boolean;
registerHandler(messageType: MessageType, handler: Handler): void;
handle(client: IClient | undefined, message: IMessage): boolean;
}
export class HandlersRegistry implements IHandlersRegistry {
private readonly handlers: Map<MessageType, Handler> = new Map();
private readonly handlers: Map<MessageType, Handler> = new Map();
public registerHandler(messageType: MessageType, handler: Handler): void {
if (this.handlers.has(messageType)) return;
public registerHandler(messageType: MessageType, handler: Handler): void {
if (this.handlers.has(messageType)) return;
this.handlers.set(messageType, handler);
}
this.handlers.set(messageType, handler);
}
public handle(client: IClient | undefined, message: IMessage): boolean {
const { type } = message;
public handle(client: IClient | undefined, message: IMessage): boolean {
const { type } = message;
const handler = this.handlers.get(type);
const handler = this.handlers.get(type);
if (!handler) return false;
if (!handler) return false;
return handler(client, message);
}
return handler(client, message);
}
}

View File

@ -1,40 +1,66 @@
import { MessageType } from "../enums";
import { IClient } from "../models/client";
import { IMessage } from "../models/message";
import { IRealm } from "../models/realm";
import { Handler } from "./handler";
import { HeartbeatHandler, TransmissionHandler } from "./handlers";
import { IHandlersRegistry, HandlersRegistry } from "./handlersRegistry";
import type { IHandlersRegistry } from "./handlersRegistry";
import { HandlersRegistry } from "./handlersRegistry";
import type { IClient } from "../models/client";
import type { IMessage } from "../models/message";
import type { IRealm } from "../models/realm";
import type { Handler } from "./handler";
export interface IMessageHandler {
handle(client: IClient | undefined, message: IMessage): boolean;
handle(client: IClient | undefined, message: IMessage): boolean;
}
export class MessageHandler implements IMessageHandler {
constructor(realm: IRealm, private readonly handlersRegistry: IHandlersRegistry = new HandlersRegistry()) {
const transmissionHandler: Handler = TransmissionHandler({ realm });
const heartbeatHandler: Handler = HeartbeatHandler;
constructor(
realm: IRealm,
private readonly handlersRegistry: IHandlersRegistry = new HandlersRegistry(),
) {
const transmissionHandler: Handler = TransmissionHandler({ realm });
const heartbeatHandler: Handler = HeartbeatHandler;
const handleTransmission: Handler = (client: IClient | undefined, { type, src, dst, payload }: IMessage): boolean => {
return transmissionHandler(client, {
type,
src,
dst,
payload,
});
};
const handleTransmission: Handler = (
client: IClient | undefined,
{ type, src, dst, payload }: IMessage,
): boolean => {
return transmissionHandler(client, {
type,
src,
dst,
payload,
});
};
const handleHeartbeat = (client: IClient | undefined, message: IMessage) => heartbeatHandler(client, message);
const handleHeartbeat = (client: IClient | undefined, message: IMessage) =>
heartbeatHandler(client, message);
this.handlersRegistry.registerHandler(MessageType.HEARTBEAT, handleHeartbeat);
this.handlersRegistry.registerHandler(MessageType.OFFER, handleTransmission);
this.handlersRegistry.registerHandler(MessageType.ANSWER, handleTransmission);
this.handlersRegistry.registerHandler(MessageType.CANDIDATE, handleTransmission);
this.handlersRegistry.registerHandler(MessageType.LEAVE, handleTransmission);
this.handlersRegistry.registerHandler(MessageType.EXPIRE, handleTransmission);
}
this.handlersRegistry.registerHandler(
MessageType.HEARTBEAT,
handleHeartbeat,
);
this.handlersRegistry.registerHandler(
MessageType.OFFER,
handleTransmission,
);
this.handlersRegistry.registerHandler(
MessageType.ANSWER,
handleTransmission,
);
this.handlersRegistry.registerHandler(
MessageType.CANDIDATE,
handleTransmission,
);
this.handlersRegistry.registerHandler(
MessageType.LEAVE,
handleTransmission,
);
this.handlersRegistry.registerHandler(
MessageType.EXPIRE,
handleTransmission,
);
}
public handle(client: IClient | undefined, message: IMessage): boolean {
return this.handlersRegistry.handle(client, message);
}
public handle(client: IClient | undefined, message: IMessage): boolean {
return this.handlersRegistry.handle(client, message);
}
}

View File

@ -1,57 +1,57 @@
import { MyWebSocket } from "../services/webSocketServer/webSocket";
import type WebSocket from "ws";
export interface IClient {
getId(): string;
getId(): string;
getToken(): string;
getToken(): string;
getSocket(): MyWebSocket | null;
getSocket(): WebSocket | null;
setSocket(socket: MyWebSocket | null): void;
setSocket(socket: WebSocket | null): void;
getLastPing(): number;
getLastPing(): number;
setLastPing(lastPing: number): void;
setLastPing(lastPing: number): void;
send<T>(data: T): void;
send<T>(data: T): void;
}
export class Client implements IClient {
private readonly id: string;
private readonly token: string;
private socket: MyWebSocket | null = null;
private lastPing: number = new Date().getTime();
private readonly id: string;
private readonly token: string;
private socket: WebSocket | null = null;
private lastPing: number = new Date().getTime();
constructor({ id, token }: { id: string; token: string; }) {
this.id = id;
this.token = token;
}
constructor({ id, token }: { id: string; token: string }) {
this.id = id;
this.token = token;
}
public getId(): string {
return this.id;
}
public getId(): string {
return this.id;
}
public getToken(): string {
return this.token;
}
public getToken(): string {
return this.token;
}
public getSocket(): MyWebSocket | null {
return this.socket;
}
public getSocket(): WebSocket | null {
return this.socket;
}
public setSocket(socket: MyWebSocket | null): void {
this.socket = socket;
}
public setSocket(socket: WebSocket | null): void {
this.socket = socket;
}
public getLastPing(): number {
return this.lastPing;
}
public getLastPing(): number {
return this.lastPing;
}
public setLastPing(lastPing: number): void {
this.lastPing = lastPing;
}
public setLastPing(lastPing: number): void {
this.lastPing = lastPing;
}
public send<T>(data: T): void {
this.socket?.send(JSON.stringify(data));
}
public send<T>(data: T): void {
this.socket?.send(JSON.stringify(data));
}
}

View File

@ -1,8 +1,8 @@
import { MessageType } from "../enums";
import type { MessageType } from "../enums";
export interface IMessage {
readonly type: MessageType;
readonly src: string;
readonly dst: string;
readonly payload?: any;
readonly type: MessageType;
readonly src: string;
readonly dst: string;
readonly payload?: string | undefined;
}

View File

@ -1,37 +1,37 @@
import { IMessage } from "./message";
import type { IMessage } from "./message";
export interface IMessageQueue {
getLastReadAt(): number;
getLastReadAt(): number;
addMessage(message: IMessage): void;
addMessage(message: IMessage): void;
readMessage(): IMessage | undefined;
readMessage(): IMessage | undefined;
getMessages(): IMessage[];
getMessages(): IMessage[];
}
export class MessageQueue implements IMessageQueue {
private lastReadAt: number = new Date().getTime();
private readonly messages: IMessage[] = [];
private lastReadAt: number = new Date().getTime();
private readonly messages: IMessage[] = [];
public getLastReadAt(): number {
return this.lastReadAt;
}
public getLastReadAt(): number {
return this.lastReadAt;
}
public addMessage(message: IMessage): void {
this.messages.push(message);
}
public addMessage(message: IMessage): void {
this.messages.push(message);
}
public readMessage(): IMessage | undefined {
if (this.messages.length > 0) {
this.lastReadAt = new Date().getTime();
return this.messages.shift();
}
public readMessage(): IMessage | undefined {
if (this.messages.length > 0) {
this.lastReadAt = new Date().getTime();
return this.messages.shift();
}
return undefined;
}
return undefined;
}
public getMessages(): IMessage[] {
return this.messages;
}
public getMessages(): IMessage[] {
return this.messages;
}
}

View File

@ -1,83 +1,84 @@
import uuidv4 from "uuid/v4";
import { IClient } from "./client";
import { IMessage } from "./message";
import { IMessageQueue, MessageQueue } from "./messageQueue";
import type { IMessageQueue } from "./messageQueue";
import { MessageQueue } from "./messageQueue";
import { randomUUID } from "node:crypto";
import type { IClient } from "./client";
import type { IMessage } from "./message";
export interface IRealm {
getClientsIds(): string[];
getClientsIds(): string[];
getClientById(clientId: string): IClient | undefined;
getClientById(clientId: string): IClient | undefined;
getClientsIdsWithQueue(): string[];
getClientsIdsWithQueue(): string[];
setClient(client: IClient, id: string): void;
setClient(client: IClient, id: string): void;
removeClientById(id: string): boolean;
removeClientById(id: string): boolean;
getMessageQueueById(id: string): IMessageQueue | undefined;
getMessageQueueById(id: string): IMessageQueue | undefined;
addMessageToQueue(id: string, message: IMessage): void;
addMessageToQueue(id: string, message: IMessage): void;
clearMessageQueue(id: string): void;
clearMessageQueue(id: string): void;
generateClientId(generateClientId?: () => string): string;
generateClientId(generateClientId?: () => string): string;
}
export class Realm implements IRealm {
private readonly clients: Map<string, IClient> = new Map();
private readonly messageQueues: Map<string, IMessageQueue> = new Map();
private readonly clients: Map<string, IClient> = new Map();
private readonly messageQueues: Map<string, IMessageQueue> = new Map();
public getClientsIds(): string[] {
return [...this.clients.keys()];
}
public getClientsIds(): string[] {
return [...this.clients.keys()];
}
public getClientById(clientId: string): IClient | undefined {
return this.clients.get(clientId);
}
public getClientById(clientId: string): IClient | undefined {
return this.clients.get(clientId);
}
public getClientsIdsWithQueue(): string[] {
return [...this.messageQueues.keys()];
}
public getClientsIdsWithQueue(): string[] {
return [...this.messageQueues.keys()];
}
public setClient(client: IClient, id: string): void {
this.clients.set(id, client);
}
public setClient(client: IClient, id: string): void {
this.clients.set(id, client);
}
public removeClientById(id: string): boolean {
const client = this.getClientById(id);
public removeClientById(id: string): boolean {
const client = this.getClientById(id);
if (!client) return false;
if (!client) return false;
this.clients.delete(id);
this.clients.delete(id);
return true;
}
return true;
}
public getMessageQueueById(id: string): IMessageQueue | undefined {
return this.messageQueues.get(id);
}
public getMessageQueueById(id: string): IMessageQueue | undefined {
return this.messageQueues.get(id);
}
public addMessageToQueue(id: string, message: IMessage): void {
if (!this.getMessageQueueById(id)) {
this.messageQueues.set(id, new MessageQueue());
}
public addMessageToQueue(id: string, message: IMessage): void {
if (!this.getMessageQueueById(id)) {
this.messageQueues.set(id, new MessageQueue());
}
this.getMessageQueueById(id)?.addMessage(message);
}
this.getMessageQueueById(id)?.addMessage(message);
}
public clearMessageQueue(id: string): void {
this.messageQueues.delete(id);
}
public clearMessageQueue(id: string): void {
this.messageQueues.delete(id);
}
public generateClientId(generateClientId?: () => string): string {
const generateId = generateClientId ? generateClientId : uuidv4;
public generateClientId(generateClientId?: () => string): string {
const generateId = generateClientId ? generateClientId : randomUUID;
let clientId = generateId();
let clientId = generateId();
while (this.getClientById(clientId)) {
clientId = generateId();
}
while (this.getClientById(clientId)) {
clientId = generateId();
}
return clientId;
}
return clientId;
}
}

View File

@ -1,77 +1,81 @@
import { IConfig } from "../../config";
import { IClient } from "../../models/client";
import { IRealm } from "../../models/realm";
import type { IConfig } from "../../config";
import type { IClient } from "../../models/client";
import type { IRealm } from "../../models/realm";
const DEFAULT_CHECK_INTERVAL = 300;
type CustomConfig = Pick<IConfig, 'alive_timeout'>;
type CustomConfig = Pick<IConfig, "alive_timeout">;
export class CheckBrokenConnections {
public readonly checkInterval: number;
private timeoutId: NodeJS.Timeout | null = null;
private readonly realm: IRealm;
private readonly config: CustomConfig;
private readonly onClose?: (client: IClient) => void;
public readonly checkInterval: number;
private timeoutId: NodeJS.Timeout | null = null;
private readonly realm: IRealm;
private readonly config: CustomConfig;
private readonly onClose?: (client: IClient) => void;
constructor({
realm,
config,
checkInterval = DEFAULT_CHECK_INTERVAL,
onClose,
}: {
realm: IRealm;
config: CustomConfig;
checkInterval?: number;
onClose?: (client: IClient) => void;
}) {
this.realm = realm;
this.config = config;
this.onClose = onClose;
this.checkInterval = checkInterval;
}
constructor({ realm, config, checkInterval = DEFAULT_CHECK_INTERVAL, onClose }: {
realm: IRealm;
config: CustomConfig;
checkInterval?: number;
onClose?: (client: IClient) => void;
}) {
this.realm = realm;
this.config = config;
this.onClose = onClose;
this.checkInterval = checkInterval;
}
public start(): void {
if (this.timeoutId) {
clearTimeout(this.timeoutId);
}
public start(): void {
if (this.timeoutId) {
clearTimeout(this.timeoutId);
}
this.timeoutId = setTimeout(() => {
this.checkConnections();
this.timeoutId = setTimeout(() => {
this.checkConnections();
this.timeoutId = null;
this.timeoutId = null;
this.start();
}, this.checkInterval);
}
this.start();
}, this.checkInterval);
}
public stop(): void {
if (this.timeoutId) {
clearTimeout(this.timeoutId);
this.timeoutId = null;
}
}
public stop(): void {
if (this.timeoutId) {
clearTimeout(this.timeoutId);
this.timeoutId = null;
}
}
private checkConnections(): void {
const clientsIds = this.realm.getClientsIds();
private checkConnections(): void {
const clientsIds = this.realm.getClientsIds();
const now = new Date().getTime();
const { alive_timeout: aliveTimeout } = this.config;
const now = new Date().getTime();
const { alive_timeout: aliveTimeout } = this.config;
for (const clientId of clientsIds) {
const client = this.realm.getClientById(clientId);
for (const clientId of clientsIds) {
const client = this.realm.getClientById(clientId);
if (!client) continue;
if (!client) continue;
const timeSinceLastPing = now - client.getLastPing();
const timeSinceLastPing = now - client.getLastPing();
if (timeSinceLastPing < aliveTimeout) continue;
if (timeSinceLastPing < aliveTimeout) continue;
try {
client.getSocket()?.close();
} finally {
this.realm.clearMessageQueue(clientId);
this.realm.removeClientById(clientId);
try {
client.getSocket()?.close();
} finally {
this.realm.clearMessageQueue(clientId);
this.realm.removeClientById(clientId);
client.setSocket(null);
client.setSocket(null);
this.onClose?.(client);
}
}
}
this.onClose?.(client);
}
}
}
}

View File

@ -1,88 +1,92 @@
import { IConfig } from "../../config";
import { MessageType } from "../../enums";
import { IMessageHandler } from "../../messageHandler";
import { IRealm } from "../../models/realm";
import type { IConfig } from "../../config";
import type { IMessageHandler } from "../../messageHandler";
import type { IRealm } from "../../models/realm";
export interface IMessagesExpire {
startMessagesExpiration(): void;
stopMessagesExpiration(): void;
startMessagesExpiration(): void;
stopMessagesExpiration(): void;
}
type CustomConfig = Pick<IConfig, 'cleanup_out_msgs' | 'expire_timeout'>;
type CustomConfig = Pick<IConfig, "cleanup_out_msgs" | "expire_timeout">;
export class MessagesExpire implements IMessagesExpire {
private readonly realm: IRealm;
private readonly config: CustomConfig;
private readonly messageHandler: IMessageHandler;
private readonly realm: IRealm;
private readonly config: CustomConfig;
private readonly messageHandler: IMessageHandler;
private timeoutId: NodeJS.Timeout | null = null;
private timeoutId: NodeJS.Timeout | null = null;
constructor({ realm, config, messageHandler }: {
realm: IRealm;
config: CustomConfig;
messageHandler: IMessageHandler;
}) {
this.realm = realm;
this.config = config;
this.messageHandler = messageHandler;
}
constructor({
realm,
config,
messageHandler,
}: {
realm: IRealm;
config: CustomConfig;
messageHandler: IMessageHandler;
}) {
this.realm = realm;
this.config = config;
this.messageHandler = messageHandler;
}
public startMessagesExpiration(): void {
if (this.timeoutId) {
clearTimeout(this.timeoutId);
}
public startMessagesExpiration(): void {
if (this.timeoutId) {
clearTimeout(this.timeoutId);
}
// Clean up outstanding messages
this.timeoutId = setTimeout(() => {
this.pruneOutstanding();
// Clean up outstanding messages
this.timeoutId = setTimeout(() => {
this.pruneOutstanding();
this.timeoutId = null;
this.timeoutId = null;
this.startMessagesExpiration();
}, this.config.cleanup_out_msgs);
}
this.startMessagesExpiration();
}, this.config.cleanup_out_msgs);
}
public stopMessagesExpiration(): void {
if (this.timeoutId) {
clearTimeout(this.timeoutId);
this.timeoutId = null;
}
}
public stopMessagesExpiration(): void {
if (this.timeoutId) {
clearTimeout(this.timeoutId);
this.timeoutId = null;
}
}
private pruneOutstanding(): void {
const destinationClientsIds = this.realm.getClientsIdsWithQueue();
private pruneOutstanding(): void {
const destinationClientsIds = this.realm.getClientsIdsWithQueue();
const now = new Date().getTime();
const maxDiff = this.config.expire_timeout;
const now = new Date().getTime();
const maxDiff = this.config.expire_timeout;
const seen: Record<string, boolean> = {};
const seen: Record<string, boolean> = {};
for (const destinationClientId of destinationClientsIds) {
const messageQueue = this.realm.getMessageQueueById(destinationClientId);
for (const destinationClientId of destinationClientsIds) {
const messageQueue = this.realm.getMessageQueueById(destinationClientId);
if (!messageQueue) continue;
if (!messageQueue) continue;
const lastReadDiff = now - messageQueue.getLastReadAt();
const lastReadDiff = now - messageQueue.getLastReadAt();
if (lastReadDiff < maxDiff) continue;
if (lastReadDiff < maxDiff) continue;
const messages = messageQueue.getMessages();
const messages = messageQueue.getMessages();
for (const message of messages) {
const seenKey = `${message.src}_${message.dst}`;
for (const message of messages) {
const seenKey = `${message.src}_${message.dst}`;
if (!seen[seenKey]) {
this.messageHandler.handle(undefined, {
type: MessageType.EXPIRE,
src: message.dst,
dst: message.src
});
if (!seen[seenKey]) {
this.messageHandler.handle(undefined, {
type: MessageType.EXPIRE,
src: message.dst,
dst: message.src,
});
seen[seenKey] = true;
}
}
seen[seenKey] = true;
}
}
this.realm.clearMessageQueue(destinationClientId);
}
}
this.realm.clearMessageQueue(destinationClientId);
}
}
}

View File

@ -1,143 +1,173 @@
import EventEmitter from "events";
import { IncomingMessage } from "http";
import url from "url";
import WebSocketLib from "ws";
import { IConfig } from "../../config";
import { EventEmitter } from "node:events";
import type { IncomingMessage } from "node:http";
import url from "node:url";
import type WebSocket from "ws";
import { Errors, MessageType } from "../../enums";
import { Client, IClient } from "../../models/client";
import { IRealm } from "../../models/realm";
import { MyWebSocket } from "./webSocket";
import type { IClient } from "../../models/client";
import { Client } from "../../models/client";
import type { IConfig } from "../../config";
import type { IRealm } from "../../models/realm";
import { WebSocketServer as Server } from "ws";
import type { Server as HttpServer } from "node:http";
import type { Server as HttpsServer } from "node:https";
export interface IWebSocketServer extends EventEmitter {
readonly path: string;
readonly path: string;
}
interface IAuthParams {
id?: string;
token?: string;
key?: string;
id?: string;
token?: string;
key?: string;
}
type CustomConfig = Pick<IConfig, 'path' | 'key' | 'concurrent_limit'>;
type CustomConfig = Pick<
IConfig,
"path" | "key" | "concurrent_limit" | "createWebSocketServer"
>;
const WS_PATH = 'peerjs';
const WS_PATH = "peerjs";
export class WebSocketServer extends EventEmitter implements IWebSocketServer {
public readonly path: string;
private readonly realm: IRealm;
private readonly config: CustomConfig;
public readonly socketServer: Server;
public readonly path: string;
private readonly realm: IRealm;
private readonly config: CustomConfig;
public readonly socketServer: WebSocketLib.Server;
constructor({
server,
realm,
config,
}: {
server: HttpServer | HttpsServer;
realm: IRealm;
config: CustomConfig;
}) {
super();
constructor({ server, realm, config }: { server: any; realm: IRealm; config: CustomConfig; }) {
super();
this.setMaxListeners(0);
this.setMaxListeners(0);
this.realm = realm;
this.config = config;
this.realm = realm;
this.config = config;
const path = this.config.path;
this.path = `${path}${path.endsWith("/") ? "" : "/"}${WS_PATH}`;
const path = this.config.path;
this.path = `${path}${path.endsWith('/') ? "" : "/"}${WS_PATH}`;
const options: WebSocket.ServerOptions = {
path: this.path,
server,
};
this.socketServer = new WebSocketLib.Server({ path: this.path, server });
this.socketServer = config.createWebSocketServer
? config.createWebSocketServer(options)
: new Server(options);
this.socketServer.on("connection", (socket: MyWebSocket, req) => this._onSocketConnection(socket, req));
this.socketServer.on("error", (error: Error) => this._onSocketError(error));
}
this.socketServer.on("connection", (socket, req) =>
this._onSocketConnection(socket, req),
);
this.socketServer.on("error", (error: Error) => this._onSocketError(error));
}
private _onSocketConnection(socket: MyWebSocket, req: IncomingMessage): void {
const { query = {} } = url.parse(req.url ?? '', true);
private _onSocketConnection(socket: WebSocket, req: IncomingMessage): void {
// An unhandled socket error might crash the server. Handle it first.
socket.on("error", (error) => this._onSocketError(error));
const { id, token, key }: IAuthParams = query;
const { query = {} } = url.parse(req.url ?? "", true);
if (!id || !token || !key) {
return this._sendErrorAndClose(socket, Errors.INVALID_WS_PARAMETERS);
}
const { id, token, key }: IAuthParams = query;
if (key !== this.config.key) {
return this._sendErrorAndClose(socket, Errors.INVALID_KEY);
}
if (!id || !token || !key) {
return this._sendErrorAndClose(socket, Errors.INVALID_WS_PARAMETERS);
}
const client = this.realm.getClientById(id);
if (key !== this.config.key) {
return this._sendErrorAndClose(socket, Errors.INVALID_KEY);
}
if (client) {
if (token !== client.getToken()) {
// ID-taken, invalid token
socket.send(JSON.stringify({
type: MessageType.ID_TAKEN,
payload: { msg: "ID is taken" }
}));
const client = this.realm.getClientById(id);
return socket.close();
}
if (client) {
if (token !== client.getToken()) {
// ID-taken, invalid token
socket.send(
JSON.stringify({
type: MessageType.ID_TAKEN,
payload: { msg: "ID is taken" },
}),
);
return this._configureWS(socket, client);
}
return socket.close();
}
this._registerClient({ socket, id, token });
}
return this._configureWS(socket, client);
}
private _onSocketError(error: Error): void {
// handle error
this.emit("error", error);
}
this._registerClient({ socket, id, token });
}
private _registerClient({ socket, id, token }:
{
socket: MyWebSocket;
id: string;
token: string;
}): void {
// Check concurrent limit
const clientsCount = this.realm.getClientsIds().length;
private _onSocketError(error: Error): void {
// handle error
this.emit("error", error);
}
if (clientsCount >= this.config.concurrent_limit) {
return this._sendErrorAndClose(socket, Errors.CONNECTION_LIMIT_EXCEED);
}
private _registerClient({
socket,
id,
token,
}: {
socket: WebSocket;
id: string;
token: string;
}): void {
// Check concurrent limit
const clientsCount = this.realm.getClientsIds().length;
const newClient: IClient = new Client({ id, token });
this.realm.setClient(newClient, id);
socket.send(JSON.stringify({ type: MessageType.OPEN }));
if (clientsCount >= this.config.concurrent_limit) {
return this._sendErrorAndClose(socket, Errors.CONNECTION_LIMIT_EXCEED);
}
this._configureWS(socket, newClient);
}
const newClient: IClient = new Client({ id, token });
this.realm.setClient(newClient, id);
socket.send(JSON.stringify({ type: MessageType.OPEN }));
private _configureWS(socket: MyWebSocket, client: IClient): void {
client.setSocket(socket);
this._configureWS(socket, newClient);
}
// Cleanup after a socket closes.
socket.on("close", () => {
if (client.getSocket() === socket) {
this.realm.removeClientById(client.getId());
this.emit("close", client);
}
});
private _configureWS(socket: WebSocket, client: IClient): void {
client.setSocket(socket);
// Handle messages from peers.
socket.on("message", (data: WebSocketLib.Data) => {
try {
const message = JSON.parse(data as string);
// Cleanup after a socket closes.
socket.on("close", () => {
if (client.getSocket() === socket) {
this.realm.removeClientById(client.getId());
this.emit("close", client);
}
});
message.src = client.getId();
// Handle messages from peers.
socket.on("message", (data) => {
try {
const message = JSON.parse(data.toString());
this.emit("message", client, message);
} catch (e) {
this.emit("error", e);
}
});
message.src = client.getId();
this.emit("connection", client);
}
this.emit("message", client, message);
} catch (e) {
this.emit("error", e);
}
});
private _sendErrorAndClose(socket: MyWebSocket, msg: Errors): void {
socket.send(
JSON.stringify({
type: MessageType.ERROR,
payload: { msg }
})
);
this.emit("connection", client);
}
socket.close();
}
private _sendErrorAndClose(socket: WebSocket, msg: Errors): void {
socket.send(
JSON.stringify({
type: MessageType.ERROR,
payload: { msg },
}),
);
socket.close();
}
}

View File

@ -1,4 +0,0 @@
import EventEmitter from "events";
import WebSocketLib from "ws";
export type MyWebSocket = WebSocketLib & EventEmitter;

View File

@ -1,16 +0,0 @@
import { expect } from 'chai';
import { Client } from '../../../../src/models/client';
import { HeartbeatHandler } from '../../../../src/messageHandler/handlers';
describe('Heartbeat handler', () => {
it('should update last ping time', () => {
const client = new Client({ id: 'id', token: '' });
client.setLastPing(0);
const nowTime = new Date().getTime();
HeartbeatHandler(client);
expect(client.getLastPing()).to.be.closeTo(nowTime, 2);
});
});

View File

@ -1,96 +0,0 @@
import { expect } from 'chai';
import { Client } from '../../../../src/models/client';
import { TransmissionHandler } from '../../../../src/messageHandler/handlers';
import { Realm } from '../../../../src/models/realm';
import { MessageType } from '../../../../src/enums';
import { MyWebSocket } from '../../../../src/services/webSocketServer/webSocket';
const createFakeSocket = (): MyWebSocket => {
/* eslint-disable @typescript-eslint/no-empty-function */
const sock = {
send: (): void => { },
close: (): void => { },
on: (): void => { },
};
/* eslint-enable @typescript-eslint/no-empty-function */
return (sock as unknown as MyWebSocket);
};
describe('Transmission handler', () => {
it('should save message in queue when destination client not connected', () => {
const realm = new Realm();
const handleTransmission = TransmissionHandler({ realm });
const clientFrom = new Client({ id: 'id1', token: '' });
const idTo = 'id2';
realm.setClient(clientFrom, clientFrom.getId());
handleTransmission(clientFrom, { type: MessageType.OFFER, src: clientFrom.getId(), dst: idTo });
expect(realm.getMessageQueueById(idTo)?.getMessages().length).to.be.eq(1);
});
it('should not save LEAVE and EXPIRE messages in queue when destination client not connected', () => {
const realm = new Realm();
const handleTransmission = TransmissionHandler({ realm });
const clientFrom = new Client({ id: 'id1', token: '' });
const idTo = 'id2';
realm.setClient(clientFrom, clientFrom.getId());
handleTransmission(clientFrom, { type: MessageType.LEAVE, src: clientFrom.getId(), dst: idTo });
handleTransmission(clientFrom, { type: MessageType.EXPIRE, src: clientFrom.getId(), dst: idTo });
expect(realm.getMessageQueueById(idTo)).to.be.undefined;
});
it('should send message to destination client when destination client connected', () => {
const realm = new Realm();
const handleTransmission = TransmissionHandler({ realm });
const clientFrom = new Client({ id: 'id1', token: '' });
const clientTo = new Client({ id: 'id2', token: '' });
const socketTo = createFakeSocket();
clientTo.setSocket(socketTo);
realm.setClient(clientTo, clientTo.getId());
let sent = false;
socketTo.send = (): void => {
sent = true;
};
handleTransmission(clientFrom, { type: MessageType.OFFER, src: clientFrom.getId(), dst: clientTo.getId() });
expect(sent).to.be.true;
});
it('should send LEAVE message to source client when sending to destination client failed', () => {
const realm = new Realm();
const handleTransmission = TransmissionHandler({ realm });
const clientFrom = new Client({ id: 'id1', token: '' });
const clientTo = new Client({ id: 'id2', token: '' });
const socketFrom = createFakeSocket();
const socketTo = createFakeSocket();
clientFrom.setSocket(socketFrom);
clientTo.setSocket(socketTo);
realm.setClient(clientFrom, clientFrom.getId());
realm.setClient(clientTo, clientTo.getId());
let sent = false;
socketFrom.send = (data: string): void => {
if (JSON.parse(data)?.type === MessageType.LEAVE) {
sent = true;
}
};
socketTo.send = (): void => {
throw Error();
};
handleTransmission(clientFrom, { type: MessageType.OFFER, src: clientFrom.getId(), dst: clientTo.getId() });
expect(sent).to.be.true;
});
});

View File

@ -1,23 +0,0 @@
import { expect } from 'chai';
import { HandlersRegistry } from '../../src/messageHandler/handlersRegistry';
import { Handler } from '../../src/messageHandler/handler';
import { MessageType } from '../../src/enums';
describe('HandlersRegistry', () => {
it('should execute handler for message type', () => {
const handlersRegistry = new HandlersRegistry();
let handled = false;
const handler: Handler = (): boolean => {
handled = true;
return true;
};
handlersRegistry.registerHandler(MessageType.OPEN, handler);
handlersRegistry.handle(undefined, { type: MessageType.OPEN, src: 'src', dst: 'dst' });
expect(handled).to.be.true;
});
});

View File

@ -1,62 +0,0 @@
import { expect } from 'chai';
import { MessageQueue } from '../../src/models/messageQueue';
import { MessageType } from '../../src/enums';
import { IMessage } from '../../src/models/message';
import { wait } from '../utils';
describe('MessageQueue', () => {
const createTestMessage = (): IMessage => {
return {
type: MessageType.OPEN,
src: 'src',
dst: 'dst'
};
};
describe('#addMessage', () => {
it('should add message to queue', () => {
const queue = new MessageQueue();
queue.addMessage(createTestMessage());
expect(queue.getMessages().length).to.eq(1);
});
});
describe('#readMessage', () => {
it('should return undefined for empty queue', () => {
const queue = new MessageQueue();
expect(queue.readMessage()).to.be.undefined;
});
it('should return message if any exists in queue', () => {
const queue = new MessageQueue();
const message = createTestMessage();
queue.addMessage(message);
expect(queue.readMessage()).to.deep.eq(message);
expect(queue.readMessage()).to.be.undefined;
});
});
describe('#getLastReadAt', () => {
it('should not be changed if no messages when read', () => {
const queue = new MessageQueue();
const lastReadAt = queue.getLastReadAt();
queue.readMessage();
expect(queue.getLastReadAt()).to.be.eq(lastReadAt);
});
it('should be changed when read message', async () => {
const queue = new MessageQueue();
const lastReadAt = queue.getLastReadAt();
queue.addMessage(createTestMessage());
await wait(10);
expect(queue.getLastReadAt()).to.be.eq(lastReadAt);
queue.readMessage();
expect(queue.getLastReadAt()).to.be.gte(lastReadAt + 10);
});
});
});

View File

@ -1,50 +0,0 @@
import { expect } from 'chai';
import { Realm } from '../../src/models/realm';
import { Client } from '../../src/models/client';
describe('Realm', () => {
describe('#generateClientId', () => {
it('should generate a 36-character UUID, or return function value', () => {
const realm = new Realm();
expect(realm.generateClientId().length).to.eq(36);
expect(realm.generateClientId(() => 'abcd')).to.eq('abcd');
});
});
describe('#setClient', () => {
it('should add client to realm', () => {
const realm = new Realm();
const client = new Client({ id: 'id', token: '' });
realm.setClient(client, 'id');
expect(realm.getClientsIds()).to.deep.eq(['id']);
});
});
describe('#removeClientById', () => {
it('should remove client from realm', () => {
const realm = new Realm();
const client = new Client({ id: 'id', token: '' });
realm.setClient(client, 'id');
realm.removeClientById('id');
expect(realm.getClientById('id')).to.be.undefined;
});
});
describe('#getClientsIds', () => {
it('should reflects on add/remove childs', () => {
const realm = new Realm();
const client = new Client({ id: 'id', token: '' });
realm.setClient(client, 'id');
expect(realm.getClientsIds()).to.deep.eq(['id']);
expect(realm.getClientById('id')).to.eq(client);
realm.removeClientById('id');
expect(realm.getClientsIds()).to.deep.eq([]);
});
});
});

View File

@ -1,56 +0,0 @@
import { expect } from 'chai';
import http from 'http';
import expectedJson from '../app.json';
import { spawn } from 'child_process';
import path from 'path';
const PORT = '9000';
async function makeRequest() {
return new Promise<object>((resolve, reject) => {
http.get(`http://localhost:${PORT}/`, resp => {
let data = '';
resp.on('data', chunk => {
data += chunk;
});
resp.on('end', () => {
resolve(JSON.parse(data));
});
}).on("error", err => {
console.log("Error: " + err.message);
reject(err);
});
});
}
describe('Check bin/peerjs', () => {
it('should return content of app.json file', async () => {
let resolver: () => void;
let rejecter: (err: Error) => void;
const promise = new Promise<void>((resolve, reject) => {
resolver = resolve;
rejecter = reject;
});
const ls = spawn('node', [path.join(__dirname, '../', 'bin/peerjs'), '--port', PORT]);
ls.stdout.on('data', async (data: string) => {
if (!data.includes('Started')) return;
try {
const resp = await makeRequest();
expect(resp).to.deep.eq(expectedJson);
resolver();
} catch (error) {
rejecter(error);
} finally {
ls.kill('SIGINT');
}
});
return promise;
});
});

View File

@ -1,44 +0,0 @@
import { expect } from 'chai';
import { Client } from '../../../src/models/client';
import { Realm } from '../../../src/models/realm';
import { CheckBrokenConnections } from '../../../src/services/checkBrokenConnections';
import { wait } from '../../utils';
describe('CheckBrokenConnections', () => {
it('should remove client after 2 checks', async () => {
const realm = new Realm();
const doubleCheckTime = 55;//~ equals to checkBrokenConnections.checkInterval * 2
const checkBrokenConnections = new CheckBrokenConnections({ realm, config: { alive_timeout: doubleCheckTime }, checkInterval: 30 });
const client = new Client({ id: 'id', token: '' });
realm.setClient(client, 'id');
checkBrokenConnections.start();
await wait(checkBrokenConnections.checkInterval * 2 + 30);
expect(realm.getClientById('id')).to.be.undefined;
checkBrokenConnections.stop();
});
it('should remove client after 1 ping', async () => {
const realm = new Realm();
const doubleCheckTime = 55;//~ equals to checkBrokenConnections.checkInterval * 2
const checkBrokenConnections = new CheckBrokenConnections({ realm, config: { alive_timeout: doubleCheckTime }, checkInterval: 30 });
const client = new Client({ id: 'id', token: '' });
realm.setClient(client, 'id');
checkBrokenConnections.start();
//set ping after first check
await wait(checkBrokenConnections.checkInterval);
client.setLastPing(new Date().getTime());
await wait(checkBrokenConnections.checkInterval * 2 + 10);
expect(realm.getClientById('id')).to.be.undefined;
checkBrokenConnections.stop();
});
});

View File

@ -1,79 +0,0 @@
import { expect } from 'chai';
import { Client } from '../../../src/models/client';
import { Realm } from '../../../src/models/realm';
import { IMessage } from '../../../src/models/message';
import { MessagesExpire } from '../../../src/services/messagesExpire';
import { MessageHandler } from '../../../src/messageHandler';
import { MessageType } from '../../../src/enums';
import { wait } from '../../utils';
describe('MessagesExpire', () => {
const createTestMessage = (dst: string): IMessage => {
return {
type: MessageType.OPEN,
src: 'src',
dst,
};
};
it('should remove client if no read from queue', async () => {
const realm = new Realm();
const messageHandler = new MessageHandler(realm);
const checkInterval = 10;
const expireTimeout = 50;
const config = { cleanup_out_msgs: checkInterval, expire_timeout: expireTimeout };
const messagesExpire = new MessagesExpire({ realm, config, messageHandler });
const client = new Client({ id: 'id', token: '' });
realm.setClient(client, 'id');
realm.addMessageToQueue(client.getId(), createTestMessage('dst'));
messagesExpire.startMessagesExpiration();
await wait(checkInterval * 2);
expect(realm.getMessageQueueById(client.getId())?.getMessages().length).to.be.eq(1);
await wait(expireTimeout);
expect(realm.getMessageQueueById(client.getId())).to.be.undefined;
messagesExpire.stopMessagesExpiration();
});
it('should fire EXPIRE message', async () => {
const realm = new Realm();
const messageHandler = new MessageHandler(realm);
const checkInterval = 10;
const expireTimeout = 50;
const config = { cleanup_out_msgs: checkInterval, expire_timeout: expireTimeout };
const messagesExpire = new MessagesExpire({ realm, config, messageHandler });
const client = new Client({ id: 'id', token: '' });
realm.setClient(client, 'id');
realm.addMessageToQueue(client.getId(), createTestMessage('dst1'));
realm.addMessageToQueue(client.getId(), createTestMessage('dst2'));
let handledCount = 0;
messageHandler.handle = (client, message): boolean => {
expect(client).to.be.undefined;
expect(message.type).to.be.eq(MessageType.EXPIRE);
handledCount++;
return true;
};
messagesExpire.startMessagesExpiration();
await wait(checkInterval * 2);
await wait(expireTimeout);
expect(handledCount).to.be.eq(2);
messagesExpire.stopMessagesExpiration();
});
});

View File

@ -1,195 +0,0 @@
import { expect } from 'chai';
import { Server, WebSocket } from 'mock-socket';
import { Realm } from '../../../src/models/realm';
import { WebSocketServer } from '../../../src/services/webSocketServer';
import { Errors, MessageType } from '../../../src/enums';
import { wait } from '../../utils';
type Destroyable<T> = T & { destroy?: () => Promise<void>; };
const checkOpen = async (c: WebSocket): Promise<boolean> => {
return new Promise(resolve => {
c.onmessage = (event: object & { data?: string; }): void => {
c.onmessage = null;
const message = JSON.parse(event.data as string);
resolve(message.type === MessageType.OPEN);
};
});
};
const checkSequence = async (c: WebSocket, msgs: { type: MessageType; error?: Errors; }[]): Promise<boolean> => {
return new Promise(resolve => {
const restMessages = [...msgs];
const finish = (success = false): void => {
c.onmessage = null;
resolve(success);
};
c.onmessage = (event: object & { data?: string; }): void => {
const [mes] = restMessages;
if (!mes) {
return finish();
}
restMessages.shift();
const message = JSON.parse(event.data as string);
if (message.type !== mes.type) {
return finish();
}
const isOk = !mes.error || message.payload?.msg === mes.error;
if (!isOk) {
return finish();
}
if (restMessages.length === 0) {
finish(true);
}
};
});
};
const createTestServer = ({ realm, config, url }: { realm: Realm; config: { path: string; key: string; concurrent_limit: number; }; url: string; }): Destroyable<WebSocketServer> => {
const server = new Server(url);
const webSocketServer: Destroyable<WebSocketServer> = new WebSocketServer({ server, realm, config });
server.on('connection', (socket: WebSocket & { on?: (eventName: string, callback: () => void) => void; }) => {
const s = webSocketServer.socketServer;
s.emit('connection', socket, { url: socket.url });
socket.onclose = (): void => {
const userId = socket.url.split('?')[1]?.split('&').find(p => p.startsWith('id'))?.split('=')[1];
if (!userId) return;
const client = realm.getClientById(userId);
const clientSocket = client?.getSocket();
if (!clientSocket) return;
(clientSocket as unknown as WebSocket).listeners['server::close']?.forEach((s: () => void) => s());
};
socket.onmessage = (event: object & { data?: string; }): void => {
const userId = socket.url.split('?')[1]?.split('&').find(p => p.startsWith('id'))?.split('=')[1];
if (!userId) return;
const client = realm.getClientById(userId);
const clientSocket = client?.getSocket();
if (!clientSocket) return;
(clientSocket as unknown as WebSocket).listeners['server::message']?.forEach((s: (data: object) => void) => s(event));
};
});
webSocketServer.destroy = async (): Promise<void> => {
server.close();
};
return webSocketServer;
};
describe('WebSocketServer', () => {
it('should return valid path', () => {
const realm = new Realm();
const config = { path: '/', key: 'testKey', concurrent_limit: 1 };
const config2 = { ...config, path: 'path' };
const server = new Server('path1');
const server2 = new Server('path2');
const webSocketServer = new WebSocketServer({ server, realm, config });
expect(webSocketServer.path).to.be.eq('/peerjs');
const webSocketServer2 = new WebSocketServer({ server: server2, realm, config: config2 });
expect(webSocketServer2.path).to.be.eq('path/peerjs');
server.stop();
server2.stop();
});
it(`should check client's params`, async () => {
const realm = new Realm();
const config = { path: '/', key: 'testKey', concurrent_limit: 1 };
const fakeURL = 'ws://localhost:8080/peerjs';
const getError = async (url: string, validError: Errors = Errors.INVALID_WS_PARAMETERS): Promise<boolean> => {
const webSocketServer = createTestServer({ url, realm, config });
const ws = new WebSocket(url);
const errorSent = await checkSequence(ws, [{ type: MessageType.ERROR, error: validError }]);
ws.close();
await webSocketServer.destroy?.();
return errorSent;
};
expect(await getError(fakeURL)).to.be.true;
expect(await getError(`${fakeURL}?key=${config.key}`)).to.be.true;
expect(await getError(`${fakeURL}?key=${config.key}&id=1`)).to.be.true;
expect(await getError(`${fakeURL}?key=notValidKey&id=userId&token=userToken`, Errors.INVALID_KEY)).to.be.true;
});
it(`should check concurrent limit`, async () => {
const realm = new Realm();
const config = { path: '/', key: 'testKey', concurrent_limit: 1 };
const fakeURL = 'ws://localhost:8080/peerjs';
const createClient = (id: string): Destroyable<WebSocket> => {
const url = `${fakeURL}?key=${config.key}&id=${id}&token=${id}`;
const webSocketServer = createTestServer({ url, realm, config });
const ws: Destroyable<WebSocket> = new WebSocket(url);
ws.destroy = async (): Promise<void> => {
ws.close();
wait(10);
webSocketServer.destroy?.();
wait(10);
ws.destroy = undefined;
};
return ws;
};
const c1 = createClient('1');
expect(await checkOpen(c1)).to.be.true;
const c2 = createClient('2');
expect(await checkSequence(c2, [
{ type: MessageType.ERROR, error: Errors.CONNECTION_LIMIT_EXCEED }
])).to.be.true;
await c1.destroy?.();
await c2.destroy?.();
await wait(10);
expect(realm.getClientsIds().length).to.be.eq(0);
const c3 = createClient('3');
expect(await checkOpen(c3)).to.be.true;
await c3.destroy?.();
});
});

View File

@ -1 +0,0 @@
export const wait = (ms: number): Promise<void> => new Promise(resolve => setTimeout(resolve, ms));

View File

@ -1,27 +1,11 @@
{
"compilerOptions": {
"lib": [
"esnext"
],
"target": "es2016",
"module": "commonjs",
"strict": true,
"esModuleInterop": true,
"downlevelIteration": true,
"moduleResolution": "node",
"noImplicitAny": true,
"noUnusedLocals": true,
"noUnusedParameters": true,
"resolveJsonModule": true,
"skipLibCheck": true,
"sourceMap": false,
"outDir": "dist"
},
"include": [
"./src/**/*",
],
"exclude": [
"test",
"bin",
]
}
"extends": "@tsconfig/node16-strictest-esm/tsconfig.json",
"compilerOptions": {
"lib": ["esnext"],
"noEmit": true,
"resolveJsonModule": true,
"exactOptionalPropertyTypes": false
},
"include": ["./src/**/*", "__test__/**/*"],
"exclude": ["test", "bin"]
}