Skip to content

Commit

Permalink
feat(repeater): replace rmq with web-sockets
Browse files Browse the repository at this point in the history
closes #196
  • Loading branch information
ostridm committed May 20, 2024
1 parent fd2e5cd commit 589c90e
Show file tree
Hide file tree
Showing 25 changed files with 1,302 additions and 544 deletions.
360 changes: 317 additions & 43 deletions package-lock.json

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,20 @@
"@har-sdk/core": "^1.4.3",
"amqp-connection-manager": "^4.1.13",
"amqplib": "^0.10.3",
"arch": "^3.0.0",
"axios": "^0.26.1",
"axios-rate-limit": "^1.3.0",
"chalk": "^4.1.2",
"ci-info": "^3.3.0",
"content-type": "^1.0.4",
"find-up": "^5.0.0",
"form-data": "^4.0.0",
"http-proxy-agent": "^7.0.2",
"https-proxy-agent": "^7.0.4",
"reflect-metadata": "^0.1.13",
"semver": "^7.5.2",
"socket.io-client": "^4.7.5",
"socket.io-msgpack-parser": "^3.0.2",
"socks-proxy-agent": "^6.2.0-beta.0",
"tslib": "~2.3.1",
"tsyringe": "^4.6.0",
Expand Down
8 changes: 6 additions & 2 deletions packages/core/src/logger/Logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,12 @@ export class Logger {
this._logLevel = logLevel;
}

public error(message: string, ...args: any[]): void {
this.write(message, LogLevel.ERROR, ...args);
public error(errorOrMessage: string | Error, ...args: any[]): void {
if (typeof errorOrMessage === 'string') {
this.write(errorOrMessage, LogLevel.ERROR, ...args);
} else {
this.write(errorOrMessage.message, LogLevel.ERROR, ...args);
}
}

public warn(message: string, ...args: any[]): void {
Expand Down
32 changes: 1 addition & 31 deletions packages/repeater/src/api/DefaultRepeatersManager.spec.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
import 'reflect-metadata';
import {
CreateRepeaterRequest,
DeleteRepeaterRequest,
GetRepeaterRequest
} from './commands';
import { CreateRepeaterRequest, DeleteRepeaterRequest } from './commands';
import { DefaultRepeatersManager } from './DefaultRepeatersManager';
import { RepeatersManager } from './RepeatersManager';
import { CommandDispatcher } from '@sectester/core';
Expand Down Expand Up @@ -67,32 +63,6 @@ describe('DefaultRepeatersManager', () => {
});
});

describe('getRepeater', () => {
it('should return repeater', async () => {
const repeaterId = '142';
when(
mockedCommandDispatcher.execute(anyOfClass(GetRepeaterRequest))
).thenResolve({ id: repeaterId });

const result = await manager.getRepeater(repeaterId);

verify(
mockedCommandDispatcher.execute(anyOfClass(GetRepeaterRequest))
).once();
expect(result).toMatchObject({ repeaterId });
});

it('should throw an error if cannot find repeater', async () => {
when(
mockedCommandDispatcher.execute(anyOfClass(GetRepeaterRequest))
).thenResolve();

const act = manager.getRepeater('123');

await expect(act).rejects.toThrow('Cannot find repeater');
});
});

describe('deleteRepeater', () => {
it('should remove repeater', async () => {
when(
Expand Down
20 changes: 1 addition & 19 deletions packages/repeater/src/api/DefaultRepeatersManager.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
import { RepeatersManager } from './RepeatersManager';
import {
CreateRepeaterRequest,
DeleteRepeaterRequest,
GetRepeaterRequest
} from './commands';
import { CreateRepeaterRequest, DeleteRepeaterRequest } from './commands';
import { inject, injectable } from 'tsyringe';
import { CommandDispatcher } from '@sectester/core';

Expand All @@ -14,20 +10,6 @@ export class DefaultRepeatersManager implements RepeatersManager {
private readonly commandDispatcher: CommandDispatcher
) {}

public async getRepeater(
repeaterId: string
): Promise<{ repeaterId: string }> {
const repeater = await this.commandDispatcher.execute(
new GetRepeaterRequest(repeaterId)
);

if (!repeater?.id) {
throw new Error('Cannot find repeater');
}

return { repeaterId: repeater.id };
}

public async createRepeater({
projectId,
...options
Expand Down
2 changes: 0 additions & 2 deletions packages/repeater/src/api/RepeatersManager.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
export interface RepeatersManager {
getRepeater(repeaterId: string): Promise<{ repeaterId: string }>;

createRepeater(options: {
name: string;
projectId?: string;
Expand Down
20 changes: 0 additions & 20 deletions packages/repeater/src/api/commands/GetRepeaterRequest.ts

This file was deleted.

1 change: 0 additions & 1 deletion packages/repeater/src/api/commands/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
export { CreateRepeaterRequest } from './CreateRepeaterRequest';
export { DeleteRepeaterRequest } from './DeleteRepeaterRequest';
export { GetRepeaterRequest } from './GetRepeaterRequest';
export {
RegisterRepeaterCommand,
RegisterRepeaterCommandPayload,
Expand Down
157 changes: 157 additions & 0 deletions packages/repeater/src/bus/DefaultRepeaterBus.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
import { RepeaterBus } from './RepeaterBus';
import { RepeaterCommandHub } from './RepeaterCommandHub';
import {
RepeaterErrorCodes,
RepeaterServer,
RepeaterServerErrorEvent,
RepeaterServerEvents,
RepeaterServerReconnectionFailedEvent,
RepeaterServerRequestEvent
} from './RepeaterServer';
import { Request } from '../request-runner/Request';
import { Logger } from '@sectester/core';
import chalk from 'chalk';

export class DefaultRepeaterBus implements RepeaterBus {
private repeaterRunning: boolean = false;

constructor(
private readonly repeaterId: string,
private readonly logger: Logger,
private readonly repeaterServer: RepeaterServer,
private readonly commandHub: RepeaterCommandHub
) {}

public close() {
this.repeaterRunning = false;

this.repeaterServer.disconnect();

return Promise.resolve();
}

public async connect(): Promise<void> {
if (this.repeaterRunning) {
return;
}

this.repeaterRunning = true;

this.logger.log('Connecting the Repeater (%s)...', this.repeaterId);

this.subscribeToEvents();

await this.repeaterServer.connect(this.repeaterId);

this.logger.log('Deploying the Repeater (%s)...', this.repeaterId);

await this.repeaterServer.deploy({
repeaterId: this.repeaterId
});

this.logger.log('The Repeater (%s) started', this.repeaterId);
}

private subscribeToEvents() {
this.repeaterServer.on(RepeaterServerEvents.ERROR, this.handleError);
this.repeaterServer.on(
RepeaterServerEvents.RECONNECTION_FAILED,
this.reconnectionFailed
);
this.repeaterServer.on(RepeaterServerEvents.REQUEST, this.requestReceived);
this.repeaterServer.on(RepeaterServerEvents.UPDATE_AVAILABLE, payload =>
this.logger.warn(
'%s: A new Repeater version (%s) is available, for update instruction visit https://docs.brightsec.com/docs/installation-options',
chalk.yellow('(!) IMPORTANT'),
payload.version
)
);
this.repeaterServer.on(
RepeaterServerEvents.RECONNECT_ATTEMPT,
({ attempt, maxAttempts }) =>
this.logger.warn(
'Failed to connect to Bright cloud (attempt %d/%d)',
attempt,
maxAttempts
)
);
this.repeaterServer.on(RepeaterServerEvents.RECONNECTION_SUCCEEDED, () =>
this.logger.log('The Repeater (%s) connected', this.repeaterId)
);
}

private handleError = ({
code,
message,
remediation
}: RepeaterServerErrorEvent) => {
const normalizedMessage = this.normalizeMessage(message);
const normalizedRemediation = this.normalizeMessage(remediation ?? '');

if (this.isCriticalError(code)) {
this.handleCriticalError(normalizedMessage, normalizedRemediation);
} else {
this.logger.error(normalizedMessage);
}
};

private normalizeMessage(message: string): string {
return message.replace(/\.$/, '');
}

private isCriticalError(code: RepeaterErrorCodes): boolean {
return [
RepeaterErrorCodes.REPEATER_DEACTIVATED,
RepeaterErrorCodes.REPEATER_NO_LONGER_SUPPORTED,
RepeaterErrorCodes.REPEATER_UNAUTHORIZED,
RepeaterErrorCodes.REPEATER_ALREADY_STARTED,
RepeaterErrorCodes.REPEATER_NOT_PERMITTED,
RepeaterErrorCodes.UNEXPECTED_ERROR
].includes(code);
}

private handleCriticalError(message: string, remediation: string): void {
this.logger.error(
'%s: %s. %s',
chalk.red('(!) CRITICAL'),
message,
remediation
);
this.close().catch(this.logger.error);
process.exitCode = 1;
}

private reconnectionFailed = ({
error
}: RepeaterServerReconnectionFailedEvent) => {
this.logger.error(error);
this.close().catch(this.logger.error);
process.exitCode = 1;
};

private requestReceived = async (event: RepeaterServerRequestEvent) => {
const response = await this.commandHub.sendRequest(
new Request({ ...event })
);

const {
statusCode,
message,
errorCode,
body,
headers,
protocol,
encoding
} = response;

return {
protocol,
body,
headers,
statusCode,
errorCode,
message,
encoding
};
};
}
33 changes: 33 additions & 0 deletions packages/repeater/src/bus/DefaultRepeaterBusFactory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { RepeaterBus } from './RepeaterBus';
import { DefaultRepeaterBus } from './DefaultRepeaterBus';
import { RepeaterBusFactory } from './RepeaterBusFactory';
import { RepeaterCommandHub } from './RepeaterCommandHub';
import { RepeaterServer } from './RepeaterServer';
import { Configuration, Logger } from '@sectester/core';
import { inject, injectable } from 'tsyringe';

@injectable()
export class DefaultRepeaterBusFactory implements RepeaterBusFactory {
constructor(
private readonly logger: Logger,
private readonly configuration: Configuration,
@inject(RepeaterServer) private readonly repeaterServer: RepeaterServer,
@inject(RepeaterCommandHub)
private readonly commandHub: RepeaterCommandHub
) {}

public create(repeaterId: string): RepeaterBus {
this.logger.log(
'Creating the repeater (%s, %s)...',
repeaterId,
this.configuration.version
);

return new DefaultRepeaterBus(
repeaterId,
this.logger,
this.repeaterServer,
this.commandHub
);
}
}
25 changes: 25 additions & 0 deletions packages/repeater/src/bus/DefaultRepeaterCommandHub.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { RepeaterCommandHub } from './RepeaterCommandHub';
import { Request, Response, RequestRunner } from '../request-runner';
import { injectable, injectAll } from 'tsyringe';

@injectable()
export class DefaultRepeaterCommandHub implements RepeaterCommandHub {
constructor(
@injectAll(RequestRunner)
private readonly requestRunners: RequestRunner[]
) {}

public sendRequest(request: Request): Promise<Response> {
const { protocol } = request;

const requestRunner = this.requestRunners.find(
x => x.protocol === protocol
);

if (!requestRunner) {
throw new Error(`Unsupported protocol "${protocol}"`);
}

return requestRunner.run(request);
}
}
Loading

0 comments on commit 589c90e

Please sign in to comment.