refactoring index.js

This commit is contained in:
afrokick 2019-12-16 11:16:43 +03:00
parent c4f04b2ff8
commit 3398d65d2d
14 changed files with 314 additions and 169 deletions

56
dist/src/index.js vendored
View File

@ -6,56 +6,8 @@ Object.defineProperty(exports, "__esModule", { value: true });
const express_1 = __importDefault(require("express"));
const http_1 = __importDefault(require("http"));
const https_1 = __importDefault(require("https"));
const api_1 = require("./api");
const config_1 = __importDefault(require("./config"));
const messageHandler_1 = require("./messageHandler");
const realm_1 = require("./models/realm");
const checkBrokenConnections_1 = require("./services/checkBrokenConnections");
const messagesExpire_1 = require("./services/messagesExpire");
const webSocketServer_1 = require("./services/webSocketServer");
const init = ({ app, server, options }) => {
const config = options;
const realm = new realm_1.Realm();
const messageHandler = new messageHandler_1.MessageHandler(realm);
const api = api_1.Api({ config, realm, messageHandler });
const messagesExpire = new messagesExpire_1.MessagesExpire({ realm, config, messageHandler });
const checkBrokenConnections = new checkBrokenConnections_1.CheckBrokenConnections({
realm,
config,
onClose: (client) => {
app.emit("disconnect", client);
}
});
app.use(options.path, api);
const wss = new webSocketServer_1.WebSocketServer({
server,
realm,
config
});
wss.on("connection", (client) => {
const messageQueue = realm.getMessageQueueById(client.getId());
if (messageQueue) {
let message;
while (message = messageQueue.readMessage()) {
messageHandler.handle(client, message);
}
realm.clearMessageQueue(client.getId());
}
app.emit("connection", client);
});
wss.on("message", (client, message) => {
app.emit("message", client, message);
messageHandler.handle(client, message);
});
wss.on("close", (client) => {
app.emit("disconnect", client);
});
wss.on("error", (error) => {
app.emit("error", error);
});
messagesExpire.startMessagesExpiration();
checkBrokenConnections.start();
};
const instance_1 = require("./instance");
function ExpressPeerServer(server, options) {
const app = express_1.default();
const newOptions = Object.assign(Object.assign({}, config_1.default), options);
@ -67,7 +19,7 @@ function ExpressPeerServer(server, options) {
throw new Error("Server is not passed to constructor - " +
"can't start PeerServer");
}
init({ app, server, options: newOptions });
instance_1.createInstance({ app, server, options: newOptions });
});
return app;
}
@ -77,10 +29,10 @@ function PeerServer(options = {}, callback) {
const newOptions = Object.assign(Object.assign({}, config_1.default), options);
let path = newOptions.path;
const port = newOptions.port;
if (path[0] !== "/") {
if (!path.startsWith('/')) {
path = "/" + path;
}
if (path[path.length - 1] !== "/") {
if (!path.endsWith('/')) {
path += "/";
}
let server;

51
dist/src/instance.js vendored Normal file
View File

@ -0,0 +1,51 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const realm_1 = require("./models/realm");
const checkBrokenConnections_1 = require("./services/checkBrokenConnections");
const messagesExpire_1 = require("./services/messagesExpire");
const webSocketServer_1 = require("./services/webSocketServer");
const messageHandler_1 = require("./messageHandler");
const api_1 = require("./api");
exports.createInstance = ({ app, server, options }) => {
const config = options;
const realm = new realm_1.Realm();
const messageHandler = new messageHandler_1.MessageHandler(realm);
const api = api_1.Api({ config, realm, messageHandler });
const messagesExpire = new messagesExpire_1.MessagesExpire({ realm, config, messageHandler });
const checkBrokenConnections = new checkBrokenConnections_1.CheckBrokenConnections({
realm,
config,
onClose: client => {
app.emit("disconnect", client);
}
});
app.use(options.path, api);
const wss = new webSocketServer_1.WebSocketServer({
server,
realm,
config
});
wss.on("connection", (client) => {
const messageQueue = realm.getMessageQueueById(client.getId());
if (messageQueue) {
let message;
while (message = messageQueue.readMessage()) {
messageHandler.handle(client, message);
}
realm.clearMessageQueue(client.getId());
}
app.emit("connection", client);
});
wss.on("message", (client, message) => {
app.emit("message", client, message);
messageHandler.handle(client, message);
});
wss.on("close", (client) => {
app.emit("disconnect", client);
});
wss.on("error", (error) => {
app.emit("error", error);
});
messagesExpire.startMessagesExpiration();
checkBrokenConnections.start();
};

View File

@ -40,7 +40,8 @@ exports.TransmissionHandler = ({ realm }) => {
else {
// Wait for this client to connect/reconnect (XHR) for important
// messages.
if (type !== enums_1.MessageType.LEAVE && type !== enums_1.MessageType.EXPIRE && dstId) {
const ignoredTypes = [enums_1.MessageType.LEAVE, enums_1.MessageType.EXPIRE];
if (!ignoredTypes.includes(type) && dstId) {
realm.addMessageToQueue(dstId, message);
}
else if (type === enums_1.MessageType.LEAVE && !dstId) {

View File

@ -0,0 +1,20 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
class HandlersRegistry {
constructor() {
this.handlers = new Map();
}
registerHandler(messageType, handler) {
if (this.handlers.has(messageType))
return;
this.handlers.set(messageType, handler);
}
handle(client, message) {
const { type } = message;
const handler = this.handlers.get(type);
if (!handler)
return false;
return handler(client, message);
}
}
exports.HandlersRegistry = HandlersRegistry;

View File

@ -2,30 +2,30 @@
Object.defineProperty(exports, "__esModule", { value: true });
const enums_1 = require("../enums");
const handlers_1 = require("./handlers");
const messageHandlers_1 = require("./messageHandlers");
const handlersRegistry_1 = require("./handlersRegistry");
class MessageHandler {
constructor(realm) {
this.messageHandlers = new messageHandlers_1.MessageHandlers();
constructor(realm, handlersRegistry = new handlersRegistry_1.HandlersRegistry()) {
this.handlersRegistry = handlersRegistry;
const transmissionHandler = handlers_1.TransmissionHandler({ realm });
const heartbeatHandler = handlers_1.HeartbeatHandler;
const handleTransmission = (client, message) => {
const handleTransmission = (client, { type, src, dst, payload }) => {
return transmissionHandler(client, {
type: message.type,
src: message.src,
dst: message.dst,
payload: message.payload
type,
src,
dst,
payload,
});
};
const handleHeartbeat = (client, message) => heartbeatHandler(client, message);
this.messageHandlers.registerHandler(enums_1.MessageType.HEARTBEAT, handleHeartbeat);
this.messageHandlers.registerHandler(enums_1.MessageType.OFFER, handleTransmission);
this.messageHandlers.registerHandler(enums_1.MessageType.ANSWER, handleTransmission);
this.messageHandlers.registerHandler(enums_1.MessageType.CANDIDATE, handleTransmission);
this.messageHandlers.registerHandler(enums_1.MessageType.LEAVE, handleTransmission);
this.messageHandlers.registerHandler(enums_1.MessageType.EXPIRE, handleTransmission);
this.handlersRegistry.registerHandler(enums_1.MessageType.HEARTBEAT, handleHeartbeat);
this.handlersRegistry.registerHandler(enums_1.MessageType.OFFER, handleTransmission);
this.handlersRegistry.registerHandler(enums_1.MessageType.ANSWER, handleTransmission);
this.handlersRegistry.registerHandler(enums_1.MessageType.CANDIDATE, handleTransmission);
this.handlersRegistry.registerHandler(enums_1.MessageType.LEAVE, handleTransmission);
this.handlersRegistry.registerHandler(enums_1.MessageType.EXPIRE, handleTransmission);
}
handle(client, message) {
return this.messageHandlers.handle(client, message);
return this.handlersRegistry.handle(client, message);
}
}
exports.MessageHandler = MessageHandler;

View File

@ -8,6 +8,7 @@ const url_1 = __importDefault(require("url"));
const ws_1 = __importDefault(require("ws"));
const enums_1 = require("../../enums");
const client_1 = require("../../models/client");
const WS_PATH = 'peerjs';
class WebSocketServer extends events_1.default {
constructor({ server, realm, config }) {
super();
@ -15,10 +16,10 @@ class WebSocketServer extends events_1.default {
this.realm = realm;
this.config = config;
const path = this.config.path;
this.path = path + (path[path.length - 1] !== "/" ? "/" : "") + "peerjs";
this.webSocketServer = new ws_1.default.Server({ path, server });
this.webSocketServer.on("connection", (socket, req) => this._onSocketConnection(socket, req));
this.webSocketServer.on("error", (error) => this._onSocketError(error));
this.path = `${path}${path.endsWith('/') ? "" : "/"}${WS_PATH}`;
this.socketServer = new ws_1.default.Server({ path: this.path, server });
this.socketServer.on("connection", (socket, req) => this._onSocketConnection(socket, req));
this.socketServer.on("error", (error) => this._onSocketError(error));
}
_onSocketConnection(socket, req) {
const { query = {} } = url_1.default.parse(req.url, true);

View File

@ -10,9 +10,9 @@ import CallsApi from "./v1/calls";
import PublicApi from "./v1/public";
export const Api = ({ config, realm, messageHandler }: {
config: IConfig,
realm: IRealm,
messageHandler: IMessageHandler
config: IConfig;
realm: IRealm;
messageHandler: IMessageHandler;
}): express.Router => {
const authMiddleware = new AuthMiddleware(config, realm);

View File

@ -1,77 +1,13 @@
import { IRealm } from "./models/realm";
import express from "express";
import http from "http";
import https from "https";
import { Server } from "net";
import { Api } from "./api";
import defaultConfig, { IConfig } from "./config";
import { MessageHandler } from "./messageHandler";
import { IClient } from "./models/client";
import { IMessage } from "./models/message";
import { Realm } from "./models/realm";
import { CheckBrokenConnections } from "./services/checkBrokenConnections";
import { IMessagesExpire, MessagesExpire } from "./services/messagesExpire";
import { IWebSocketServer, WebSocketServer } from "./services/webSocketServer";
import { createInstance } from "./instance";
const init = ({ app, server, options }: {
app: express.Express,
server: Server,
options: IConfig;
}) => {
const config = options;
const realm: IRealm = new Realm();
const messageHandler = new MessageHandler(realm);
const api = Api({ config, realm, messageHandler });
const messagesExpire: IMessagesExpire = new MessagesExpire({ realm, config, messageHandler });
const checkBrokenConnections = new CheckBrokenConnections({
realm,
config,
onClose: (client: IClient) => {
app.emit("disconnect", client);
}
});
app.use(options.path, api);
const wss: IWebSocketServer = new WebSocketServer({
server,
realm,
config
});
wss.on("connection", (client: IClient) => {
const messageQueue = realm.getMessageQueueById(client.getId());
if (messageQueue) {
let message: IMessage | undefined;
while (message = messageQueue.readMessage()) {
messageHandler.handle(client, message);
}
realm.clearMessageQueue(client.getId());
}
app.emit("connection", client);
});
wss.on("message", (client: IClient, message: IMessage) => {
app.emit("message", client, message);
messageHandler.handle(client, message);
});
wss.on("close", (client: IClient) => {
app.emit("disconnect", client);
});
wss.on("error", (error: Error) => {
app.emit("error", error);
});
messagesExpire.startMessagesExpiration();
checkBrokenConnections.start();
type Optional<T> = {
[P in keyof T]?: (T[P] | undefined);
};
function ExpressPeerServer(server: Server, options?: IConfig) {
@ -92,16 +28,12 @@ function ExpressPeerServer(server: Server, options?: IConfig) {
"can't start PeerServer");
}
init({ app, server, options: newOptions });
createInstance({ app, server, options: newOptions });
});
return app;
}
type Optional<T> = {
[P in keyof T]?: (T[P] | undefined);
};
function PeerServer(options: Optional<IConfig> = {}, callback?: (server: Server) => void) {
const app = express();
@ -113,11 +45,11 @@ function PeerServer(options: Optional<IConfig> = {}, callback?: (server: Server)
let path = newOptions.path;
const port = newOptions.port;
if (path[0] !== "/") {
if (!path.startsWith('/')) {
path = "/" + path;
}
if (path[path.length - 1] !== "/") {
if (!path.endsWith('/')) {
path += "/";
}

71
src/instance.ts Normal file
View File

@ -0,0 +1,71 @@
import express from "express";
import { Server } from "net";
import { IClient } from "./models/client";
import { IMessage } from "./models/message";
import { Realm } from "./models/realm";
import { IRealm } from "./models/realm";
import { CheckBrokenConnections } from "./services/checkBrokenConnections";
import { IMessagesExpire, MessagesExpire } from "./services/messagesExpire";
import { IWebSocketServer, WebSocketServer } from "./services/webSocketServer";
import { MessageHandler } from "./messageHandler";
import { Api } from "./api";
import { IConfig } from "./config";
export const createInstance = ({ app, server, options }: {
app: express.Application,
server: Server,
options: IConfig;
}): void => {
const config = options;
const realm: IRealm = new Realm();
const messageHandler = new MessageHandler(realm);
const api = Api({ config, realm, messageHandler });
const messagesExpire: IMessagesExpire = new MessagesExpire({ realm, config, messageHandler });
const checkBrokenConnections = new CheckBrokenConnections({
realm,
config,
onClose: client => {
app.emit("disconnect", client);
}
});
app.use(options.path, api);
const wss: IWebSocketServer = new WebSocketServer({
server,
realm,
config
});
wss.on("connection", (client: IClient) => {
const messageQueue = realm.getMessageQueueById(client.getId());
if (messageQueue) {
let message: IMessage | undefined;
while (message = messageQueue.readMessage()) {
messageHandler.handle(client, message);
}
realm.clearMessageQueue(client.getId());
}
app.emit("connection", client);
});
wss.on("message", (client: IClient, message: IMessage) => {
app.emit("message", client, message);
messageHandler.handle(client, message);
});
wss.on("close", (client: IClient) => {
app.emit("disconnect", client);
});
wss.on("error", (error: Error) => {
app.emit("error", error);
});
messagesExpire.startMessagesExpiration();
checkBrokenConnections.start();
};

View File

@ -3,12 +3,12 @@ import { IClient } from "../models/client";
import { IMessage } from "../models/message";
import { Handler } from "./handler";
export interface IMessageHandlers {
export interface IHandlersRegistry {
registerHandler(messageType: MessageType, handler: Handler): void;
handle(client: IClient | undefined, message: IMessage): boolean;
}
export class MessageHandlers implements IMessageHandlers {
export class HandlersRegistry implements IHandlersRegistry {
private readonly handlers: Map<MessageType, Handler> = new Map();
public registerHandler(messageType: MessageType, handler: Handler): void {

View File

@ -4,39 +4,37 @@ import { IMessage } from "../models/message";
import { IRealm } from "../models/realm";
import { Handler } from "./handler";
import { HeartbeatHandler, TransmissionHandler } from "./handlers";
import { IMessageHandlers, MessageHandlers } from "./messageHandlers";
import { IHandlersRegistry, HandlersRegistry } from "./handlersRegistry";
export interface IMessageHandler {
handle(client: IClient | undefined, message: IMessage): boolean;
}
export class MessageHandler implements IMessageHandler {
private readonly messageHandlers: IMessageHandlers = new MessageHandlers();
constructor(realm: IRealm) {
constructor(realm: IRealm, private readonly handlersRegistry: IHandlersRegistry = new HandlersRegistry()) {
const transmissionHandler: Handler = TransmissionHandler({ realm });
const heartbeatHandler: Handler = HeartbeatHandler;
const handleTransmission: Handler = (client: IClient | undefined, message: IMessage): boolean => {
const handleTransmission: Handler = (client: IClient | undefined, { type, src, dst, payload }: IMessage): boolean => {
return transmissionHandler(client, {
type: message.type,
src: message.src,
dst: message.dst,
payload: message.payload
type,
src,
dst,
payload,
});
};
const handleHeartbeat = (client: IClient | undefined, message: IMessage) => heartbeatHandler(client, message);
this.messageHandlers.registerHandler(MessageType.HEARTBEAT, handleHeartbeat);
this.messageHandlers.registerHandler(MessageType.OFFER, handleTransmission);
this.messageHandlers.registerHandler(MessageType.ANSWER, handleTransmission);
this.messageHandlers.registerHandler(MessageType.CANDIDATE, handleTransmission);
this.messageHandlers.registerHandler(MessageType.LEAVE, handleTransmission);
this.messageHandlers.registerHandler(MessageType.EXPIRE, handleTransmission);
this.handlersRegistry.registerHandler(MessageType.HEARTBEAT, handleHeartbeat);
this.handlersRegistry.registerHandler(MessageType.OFFER, handleTransmission);
this.handlersRegistry.registerHandler(MessageType.ANSWER, handleTransmission);
this.handlersRegistry.registerHandler(MessageType.CANDIDATE, handleTransmission);
this.handlersRegistry.registerHandler(MessageType.LEAVE, handleTransmission);
this.handlersRegistry.registerHandler(MessageType.EXPIRE, handleTransmission);
}
public handle(client: IClient | undefined, message: IMessage): boolean {
return this.messageHandlers.handle(client, message);
return this.handlersRegistry.handle(client, message);
}
}

View File

@ -40,7 +40,7 @@ export class WebSocketServer extends EventEmitter implements IWebSocketServer {
const path = this.config.path;
this.path = `${path}${path.endsWith('/') ? "" : "/"}${WS_PATH}`;
this.socketServer = new WebSocketLib.Server({ path, server });
this.socketServer = new WebSocketLib.Server({ path: this.path, server });
this.socketServer.on("connection", (socket: MyWebSocket, req) => this._onSocketConnection(socket, req));
this.socketServer.on("error", (error: Error) => this._onSocketError(error));

View File

@ -0,0 +1,96 @@
import { expect } from 'chai';
import { Client } from '../../../../src/models/client';
import { TransmissionHandler } from '../../../../src/messageHandler/handlers';
import { Realm } from '../../../../src/models/realm';
import { MessageType } from '../../../../src/enums';
import { MyWebSocket } from '../../../../src/services/webSocketServer/webSocket';
const createFakeSocket = (): MyWebSocket => {
/* eslint-disable @typescript-eslint/no-empty-function */
const sock = {
send: (): void => { },
close: (): void => { },
on: (): void => { },
};
/* eslint-enable @typescript-eslint/no-empty-function */
return (sock as unknown as MyWebSocket);
};
describe('Transmission handler', () => {
it('should save message in queue when destination client not connected', () => {
const realm = new Realm();
const handleTransmission = TransmissionHandler({ realm });
const clientFrom = new Client({ id: 'id1', token: '' });
const idTo = 'id2';
realm.setClient(clientFrom, clientFrom.getId());
handleTransmission(clientFrom, { type: MessageType.OFFER, src: clientFrom.getId(), dst: idTo });
expect(realm.getMessageQueueById(idTo)?.getMessages().length).to.be.eq(1);
});
it('should not save LEAVE and EXPIRE messages in queue when destination client not connected', () => {
const realm = new Realm();
const handleTransmission = TransmissionHandler({ realm });
const clientFrom = new Client({ id: 'id1', token: '' });
const idTo = 'id2';
realm.setClient(clientFrom, clientFrom.getId());
handleTransmission(clientFrom, { type: MessageType.LEAVE, src: clientFrom.getId(), dst: idTo });
handleTransmission(clientFrom, { type: MessageType.EXPIRE, src: clientFrom.getId(), dst: idTo });
expect(realm.getMessageQueueById(idTo)).to.be.undefined;
});
it('should send message to destination client when destination client connected', () => {
const realm = new Realm();
const handleTransmission = TransmissionHandler({ realm });
const clientFrom = new Client({ id: 'id1', token: '' });
const clientTo = new Client({ id: 'id2', token: '' });
const socketTo = createFakeSocket();
clientTo.setSocket(socketTo);
realm.setClient(clientTo, clientTo.getId());
let sent = false;
socketTo.send = (): void => {
sent = true;
};
handleTransmission(clientFrom, { type: MessageType.OFFER, src: clientFrom.getId(), dst: clientTo.getId() });
expect(sent).to.be.true;
});
it('should send LEAVE message to source client when sending to destination client failed', () => {
const realm = new Realm();
const handleTransmission = TransmissionHandler({ realm });
const clientFrom = new Client({ id: 'id1', token: '' });
const clientTo = new Client({ id: 'id2', token: '' });
const socketFrom = createFakeSocket();
const socketTo = createFakeSocket();
clientFrom.setSocket(socketFrom);
clientTo.setSocket(socketTo);
realm.setClient(clientFrom, clientFrom.getId());
realm.setClient(clientTo, clientTo.getId());
let sent = false;
socketFrom.send = (data: string): void => {
if (JSON.parse(data)?.type === MessageType.LEAVE) {
sent = true;
}
};
socketTo.send = (): void => {
throw Error();
};
handleTransmission(clientFrom, { type: MessageType.OFFER, src: clientFrom.getId(), dst: clientTo.getId() });
expect(sent).to.be.true;
});
});

View File

@ -0,0 +1,23 @@
import { expect } from 'chai';
import { HandlersRegistry } from '../../src/messageHandler/handlersRegistry';
import { Handler } from '../../src/messageHandler/handler';
import { MessageType } from '../../src/enums';
describe('HandlersRegistry', () => {
it('should execute handler for message type', () => {
const handlersRegistry = new HandlersRegistry();
let handled = false;
const handler: Handler = (): boolean => {
handled = true;
return true;
};
handlersRegistry.registerHandler(MessageType.OPEN, handler);
handlersRegistry.handle(undefined, { type: MessageType.OPEN, src: 'src', dst: 'dst' });
expect(handled).to.be.true;
});
});