Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add worker, fill out CLI, + various lints #4

Merged
merged 2 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion agents/build.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import dts from 'bun-plugin-dts';

await Bun.build({
entrypoints: ['./src/index.ts', './src/tts/index.ts', './src/stt/index.ts'],
entrypoints: ['./src/index.ts', './src/tts/index.ts', './src/stt/index.ts', './src/cli.ts'],
outdir: './dist',
target: 'bun', // https://github.com/oven-sh/bun/blob/main/src/bundler/bundle_v2.zig#L2667
sourcemap: 'external',
Expand Down
6 changes: 4 additions & 2 deletions agents/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
},
"devDependencies": {
"@types/bun": "latest",
"@types/ws": "^8.5.10",
"bun-plugin-dts": "^0.2.1",
"eslint": "^8.57.0",
"eslint-config-prettier": "^9.1.0",
Expand All @@ -25,10 +26,11 @@
"typescript": "^5.0.0"
},
"dependencies": {
"@livekit/protocol": "^1.12.0",
"@livekit/protocol": "^1.13.0",
"commander": "^12.0.0",
"livekit-server-sdk": "^2.1.2",
"pino": "^8.19.0",
"pino-pretty": "^11.0.0"
"pino-pretty": "^11.0.0",
"ws": "^8.16.0"
}
}
107 changes: 70 additions & 37 deletions agents/src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,77 @@
//
// SPDX-License-Identifier: Apache-2.0

import { version } from './index';
import { version } from '.';
import { Option, Command } from 'commander';
import { WorkerOptions, Worker } from './worker';
import { EventEmitter } from 'events';
import { log } from './log';

const program = new Command();
program
.name('agents')
.description('LiveKit Agents CLI')
.version(version)
.addOption(
new Option('--log-level', 'Set the logging level').choices([
'DEBUG',
'INFO',
'WARNING',
'ERROR',
'CRITICAL',
]),
);

program
.command('start')
.description('Start the worker')
.addOption(
new Option('--url <string>', 'LiveKit server or Cloud project websocket URL')
.makeOptionMandatory(true)
.env('LIVEKIT_URL'),
)
.addOption(
new Option('--api-key <string>', "LiveKit server or Cloud project's API key")
.makeOptionMandatory(true)
.env('LIVEKIT_API_KEY'),
)
.addOption(
new Option('--api-secret <string>', "LiveKit server or Cloud project's API secret")
.makeOptionMandatory(true)
.env('LIVEKIT_API_SECRET'),
)
.action(() => {
return;
type CliArgs = {
opts: WorkerOptions;
logLevel: string;
production: boolean;
watch: boolean;
event?: EventEmitter;
};

const runWorker = async (args: CliArgs) => {
log.level = args.logLevel;
const worker = new Worker(args.opts);

process.on('SIGINT', async () => {
await worker.close();
process.exit(130); // SIGINT exit code
});

program.parse();
try {
await worker.run();
} catch {
log.fatal('worker failed');
}
};

export const runApp = (opts: WorkerOptions) => {
const program = new Command()
.name('agents')
.description('LiveKit Agents CLI')
.version(version)
.addOption(
new Option('--log-level <level>', 'Set the logging level')
.choices(['trace', 'debug', 'info', 'warn', 'error', 'fatal'])
.default('trace'),
)
.addOption(
new Option('--url <string>', 'LiveKit server or Cloud project websocket URL')
.makeOptionMandatory(true)
.env('LIVEKIT_URL'),
)
.addOption(
new Option('--api-key <string>', "LiveKit server or Cloud project's API key")
.makeOptionMandatory(true)
.env('LIVEKIT_API_KEY'),
)
.addOption(
new Option('--api-secret <string>', "LiveKit server or Cloud project's API secret")
.makeOptionMandatory(true)
.env('LIVEKIT_API_SECRET'),
)

program
.command('start')
.description('Start the worker in production mode')
.action(() => {
const options = program.optsWithGlobals()
opts.wsURL = options.url || opts.wsURL;
opts.apiKey = options.apiKey || opts.apiKey;
opts.apiSecret = options.apiSecret || opts.apiSecret;
runWorker({
opts,
production: true,
watch: false,
logLevel: options.logLevel,
});
});

program.parse();
};
3 changes: 3 additions & 0 deletions agents/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,6 @@
export * from './vad';
export * from './plugin';
export * from './version';
export * from './job_context';
export * from './job_request';
export * from './worker';
9 changes: 5 additions & 4 deletions agents/src/ipc/job_process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ import { runJob } from './job_main';
import { EventEmitter } from 'events';
import { log } from '../log';

const START_TIMEOUT = 90;
const PING_INTERVAL = 5;
const PING_TIMEOUT = 90;
const HIGH_PING_THRESHOLD = 10; // milliseconds
const START_TIMEOUT = 90 * 1000;
const PING_INTERVAL = 5 * 1000;
const PING_TIMEOUT = 90 * 1000;
const HIGH_PING_THRESHOLD = 10;

export class JobProcess {
#job: Job;
Expand Down Expand Up @@ -103,6 +103,7 @@ export class JobProcess {
const delay = Date.now() - msg.timestamp;
if (delay > HIGH_PING_THRESHOLD) {
this.logger.warn(`job is unresponsive (${delay}ms delay)`);
// @ts-expect-error: this actually works fine types/bun doesn't have a typedecl for it yet
pongTimeout.refresh();
}
} else if (msg instanceof UserExit || msg instanceof ShutdownResponse) {
Expand Down
2 changes: 1 addition & 1 deletion agents/src/ipc/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export class StartJobRequest implements Message {

export class StartJobResponse implements Message {
static MSG_ID = 1;
err: Error | undefined;
err?: Error;

get MSG_ID(): number {
return StartJobResponse.MSG_ID;
Expand Down
2 changes: 1 addition & 1 deletion agents/src/job_context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { EventEmitter } from 'events';
export class JobContext {
#job: Job;
#room: Room;
#publisher: RemoteParticipant | undefined;
#publisher?: RemoteParticipant;
tx: EventEmitter;

constructor(
Expand Down
8 changes: 2 additions & 6 deletions agents/src/job_request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
// SPDX-License-Identifier: Apache-2.0

import { JobContext } from './job_context';
import { VideoGrant } from 'livekit-server-sdk';
import { Job, ParticipantInfo, Room } from '@livekit/protocol';
import { log } from './log';
import { EventEmitter } from 'events';
Expand Down Expand Up @@ -35,16 +34,15 @@ export type AcceptData = {
entry: AgentEntry;
autoSubscribe: AutoSubscribe;
autoDisconnect: AutoDisconnect;
grants: VideoGrant;
name: string;
identity: string;
metadata: string;
assign: EventEmitter;
};

type AvailRes = {
export type AvailRes = {
avail: boolean;
data: AcceptData | undefined;
data?: AcceptData;
};

export class JobRequest {
Expand Down Expand Up @@ -91,7 +89,6 @@ export class JobRequest {
entry: AgentEntry,
autoSubscribe: AutoSubscribe = AutoSubscribe.SUBSCRIBE_ALL,
autoDisconnect: AutoDisconnect = AutoDisconnect.ROOM_EMPTY,
grants: VideoGrant,
name: string = '',
identity: string = '',
metadata: string = '',
Expand All @@ -110,7 +107,6 @@ export class JobRequest {
entry,
autoSubscribe,
autoDisconnect,
grants,
name,
identity,
metadata,
Expand Down
2 changes: 1 addition & 1 deletion agents/src/stt/stream_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export class StreamAdapterWrapper extends SpeechStream {
stt: STT;
vadStream: VADStream;
eventQueue: (SpeechEvent | undefined)[];
language: string | undefined;
language?: string;
task: {
run: Promise<void>;
cancel: () => void;
Expand Down
2 changes: 1 addition & 1 deletion agents/src/stt/stt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export abstract class STT {
this.#streamingSupported = streamingSupported;
}

abstract recognize(buffer: AudioBuffer, language: string | undefined): Promise<SpeechEvent>;
abstract recognize(buffer: AudioBuffer, language?: string): Promise<SpeechEvent>;

abstract stream(language: string | undefined): SpeechStream;

Expand Down
2 changes: 1 addition & 1 deletion agents/src/tokenize.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ export interface SegmentedSentence {
}

export abstract class SentenceTokenizer {
abstract tokenize(text: string, language: string | undefined): SegmentedSentence[];
abstract tokenize(text: string, language?: string): SegmentedSentence[];
abstract stream(language: string | undefined): SentenceStream;
}

Expand Down
4 changes: 2 additions & 2 deletions agents/src/tts/tts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export enum SynthesisEventType {

export class SynthesisEvent {
type: SynthesisEventType;
audio: SynthesizedAudio | undefined;
audio?: SynthesizedAudio;

constructor(type: SynthesisEventType, audio: SynthesizedAudio | undefined = undefined) {
this.type = type;
Expand All @@ -26,7 +26,7 @@ export class SynthesisEvent {
}

export abstract class SynthesizeStream implements IterableIterator<SynthesisEvent> {
abstract pushText(token: string | undefined): void;
abstract pushText(token?: string): void;

markSegmentEnd() {
this.pushText(undefined);
Expand Down
Loading