Skip to content

Commit

Permalink
refactor: Encapsulate task runner startup to module (#11531)
Browse files Browse the repository at this point in the history
  • Loading branch information
tomi authored Nov 4, 2024
1 parent d49686c commit 9355fc3
Show file tree
Hide file tree
Showing 13 changed files with 224 additions and 50 deletions.
18 changes: 3 additions & 15 deletions packages/cli/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus'
import { EventService } from '@/events/event.service';
import { ExecutionService } from '@/executions/execution.service';
import { License } from '@/license';
import { LocalTaskManager } from '@/runners/task-managers/local-task-manager';
import { TaskManager } from '@/runners/task-managers/task-manager';
import { PubSubHandler } from '@/scaling/pubsub/pubsub-handler';
import { Subscriber } from '@/scaling/pubsub/subscriber.service';
import { Server } from '@/server';
Expand Down Expand Up @@ -224,19 +222,9 @@ export class Start extends BaseCommand {

const { taskRunners: taskRunnerConfig } = this.globalConfig;
if (!taskRunnerConfig.disabled) {
Container.set(TaskManager, new LocalTaskManager());
const { TaskRunnerServer } = await import('@/runners/task-runner-server');
const taskRunnerServer = Container.get(TaskRunnerServer);
await taskRunnerServer.start();

if (
taskRunnerConfig.mode === 'internal_childprocess' ||
taskRunnerConfig.mode === 'internal_launcher'
) {
const { TaskRunnerProcess } = await import('@/runners/task-runner-process');
const runnerProcess = Container.get(TaskRunnerProcess);
await runnerProcess.start();
}
const { TaskRunnerModule } = await import('@/runners/task-runner-module');
const taskRunnerModule = Container.get(TaskRunnerModule);
await taskRunnerModule.start();
}
}

Expand Down
18 changes: 3 additions & 15 deletions packages/cli/src/commands/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import { EventMessageGeneric } from '@/eventbus/event-message-classes/event-mess
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { LogStreamingEventRelay } from '@/events/relays/log-streaming.event-relay';
import { Logger } from '@/logging/logger.service';
import { LocalTaskManager } from '@/runners/task-managers/local-task-manager';
import { TaskManager } from '@/runners/task-managers/task-manager';
import { PubSubHandler } from '@/scaling/pubsub/pubsub-handler';
import { Subscriber } from '@/scaling/pubsub/subscriber.service';
import type { ScalingService } from '@/scaling/scaling.service';
Expand Down Expand Up @@ -116,19 +114,9 @@ export class Worker extends BaseCommand {

const { taskRunners: taskRunnerConfig } = this.globalConfig;
if (!taskRunnerConfig.disabled) {
Container.set(TaskManager, new LocalTaskManager());
const { TaskRunnerServer } = await import('@/runners/task-runner-server');
const taskRunnerServer = Container.get(TaskRunnerServer);
await taskRunnerServer.start();

if (
taskRunnerConfig.mode === 'internal_childprocess' ||
taskRunnerConfig.mode === 'internal_launcher'
) {
const { TaskRunnerProcess } = await import('@/runners/task-runner-process');
const runnerProcess = Container.get(TaskRunnerProcess);
await runnerProcess.start();
}
const { TaskRunnerModule } = await import('@/runners/task-runner-module');
const taskRunnerModule = Container.get(TaskRunnerModule);
await taskRunnerModule.start();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ describe('TaskRunnerProcess', () => {
});

describe('constructor', () => {
it('should not throw if runner mode is external', () => {
it('should throw if runner mode is external', () => {
runnerConfig.mode = 'external';

expect(() => new TaskRunnerProcess(logger, runnerConfig, authService)).not.toThrow();
expect(() => new TaskRunnerProcess(logger, runnerConfig, authService)).toThrow();

runnerConfig.mode = 'internal_childprocess';
});
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { Service } from 'typedi';

import { TaskRunnerDisconnectedError } from './errors/task-runner-disconnected-error';
import type { DisconnectAnalyzer } from './runner-types';
import type { TaskRunner } from './task-broker.service';

/**
* Analyzes the disconnect reason of a task runner to provide a more
* meaningful error message to the user.
*/
@Service()
export class DefaultTaskRunnerDisconnectAnalyzer implements DisconnectAnalyzer {
async determineDisconnectReason(runnerId: TaskRunner['id']): Promise<Error> {
return new TaskRunnerDisconnectedError(runnerId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Service } from 'typedi';

import config from '@/config';

import { TaskRunnerDisconnectedError } from './errors/task-runner-disconnected-error';
import { DefaultTaskRunnerDisconnectAnalyzer } from './default-task-runner-disconnect-analyzer';
import { TaskRunnerOomError } from './errors/task-runner-oom-error';
import { SlidingWindowSignal } from './sliding-window-signal';
import type { TaskRunner } from './task-broker.service';
Expand All @@ -15,13 +15,19 @@ import { TaskRunnerProcess } from './task-runner-process';
* meaningful error message to the user.
*/
@Service()
export class TaskRunnerDisconnectAnalyzer {
export class InternalTaskRunnerDisconnectAnalyzer extends DefaultTaskRunnerDisconnectAnalyzer {
private get isCloudDeployment() {
return config.get('deployment.type') === 'cloud';
}

private readonly exitReasonSignal: SlidingWindowSignal<TaskRunnerProcessEventMap, 'exit'>;

constructor(
private readonly runnerConfig: TaskRunnersConfig,
private readonly taskRunnerProcess: TaskRunnerProcess,
) {
super();

// When the task runner process is running as a child process, there's
// no determinate time when it exits compared to when the runner disconnects
// (i.e. it's a race condition). Hence we use a sliding window to determine
Expand All @@ -32,17 +38,13 @@ export class TaskRunnerDisconnectAnalyzer {
});
}

private get isCloudDeployment() {
return config.get('deployment.type') === 'cloud';
}

async determineDisconnectReason(runnerId: TaskRunner['id']): Promise<Error> {
const exitCode = await this.awaitExitSignal();
if (exitCode === 'oom') {
return new TaskRunnerOomError(runnerId, this.isCloudDeployment);
}

return new TaskRunnerDisconnectedError(runnerId);
return await super.determineDisconnectReason(runnerId);
}

private async awaitExitSignal(): Promise<ExitReason> {
Expand Down
6 changes: 6 additions & 0 deletions packages/cli/src/runners/runner-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ export interface TaskDataRequestParams {
env: boolean;
}

export interface DisconnectAnalyzer {
determineDisconnectReason(runnerId: TaskRunner['id']): Promise<Error>;
}

export type DataRequestType = 'input' | 'node' | 'all';

export interface TaskResultData {
result: INodeExecutionData[];
customData?: Record<string, string>;
Expand Down
15 changes: 12 additions & 3 deletions packages/cli/src/runners/runner-ws-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,38 @@ import type WebSocket from 'ws';

import { Logger } from '@/logging/logger.service';

import { DefaultTaskRunnerDisconnectAnalyzer } from './default-task-runner-disconnect-analyzer';
import type {
RunnerMessage,
N8nMessage,
TaskRunnerServerInitRequest,
TaskRunnerServerInitResponse,
DisconnectAnalyzer,
} from './runner-types';
import { TaskBroker, type MessageCallback, type TaskRunner } from './task-broker.service';
import { TaskRunnerDisconnectAnalyzer } from './task-runner-disconnect-analyzer';

function heartbeat(this: WebSocket) {
this.isAlive = true;
}

@Service()
export class TaskRunnerService {
export class TaskRunnerWsServer {
runnerConnections: Map<TaskRunner['id'], WebSocket> = new Map();

constructor(
private readonly logger: Logger,
private readonly taskBroker: TaskBroker,
private readonly disconnectAnalyzer: TaskRunnerDisconnectAnalyzer,
private disconnectAnalyzer: DefaultTaskRunnerDisconnectAnalyzer,
) {}

setDisconnectAnalyzer(disconnectAnalyzer: DisconnectAnalyzer) {
this.disconnectAnalyzer = disconnectAnalyzer;
}

getDisconnectAnalyzer() {
return this.disconnectAnalyzer;
}

sendMessage(id: TaskRunner['id'], message: N8nMessage.ToRunner.All) {
this.runnerConnections.get(id)?.send(JSON.stringify(message));
}
Expand Down
85 changes: 85 additions & 0 deletions packages/cli/src/runners/task-runner-module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import { TaskRunnersConfig } from '@n8n/config';
import * as a from 'node:assert/strict';
import Container, { Service } from 'typedi';

import type { TaskRunnerProcess } from '@/runners/task-runner-process';

import { TaskRunnerWsServer } from './runner-ws-server';
import type { LocalTaskManager } from './task-managers/local-task-manager';
import type { TaskRunnerServer } from './task-runner-server';

/**
* Module responsible for loading and starting task runner. Task runner can be
* run either internally (=launched by n8n as a child process) or externally
* (=launched by some other orchestrator)
*/
@Service()
export class TaskRunnerModule {
private taskRunnerHttpServer: TaskRunnerServer | undefined;

private taskRunnerWsServer: TaskRunnerWsServer | undefined;

private taskManager: LocalTaskManager | undefined;

private taskRunnerProcess: TaskRunnerProcess | undefined;

constructor(private readonly runnerConfig: TaskRunnersConfig) {}

async start() {
a.ok(!this.runnerConfig.disabled, 'Task runner is disabled');

await this.loadTaskManager();
await this.loadTaskRunnerServer();

if (
this.runnerConfig.mode === 'internal_childprocess' ||
this.runnerConfig.mode === 'internal_launcher'
) {
await this.startInternalTaskRunner();
}
}

async stop() {
if (this.taskRunnerProcess) {
await this.taskRunnerProcess.stop();
this.taskRunnerProcess = undefined;
}

if (this.taskRunnerHttpServer) {
await this.taskRunnerHttpServer.stop();
this.taskRunnerHttpServer = undefined;
}
}

private async loadTaskManager() {
const { TaskManager } = await import('@/runners/task-managers/task-manager');
const { LocalTaskManager } = await import('@/runners/task-managers/local-task-manager');
this.taskManager = new LocalTaskManager();
Container.set(TaskManager, this.taskManager);
}

private async loadTaskRunnerServer() {
// These are imported dynamically because we need to set the task manager
// instance before importing them
const { TaskRunnerServer } = await import('@/runners/task-runner-server');
this.taskRunnerHttpServer = Container.get(TaskRunnerServer);
this.taskRunnerWsServer = Container.get(TaskRunnerWsServer);

await this.taskRunnerHttpServer.start();
}

private async startInternalTaskRunner() {
a.ok(this.taskRunnerWsServer, 'Task Runner WS Server not loaded');

const { TaskRunnerProcess } = await import('@/runners/task-runner-process');
this.taskRunnerProcess = Container.get(TaskRunnerProcess);
await this.taskRunnerProcess.start();

const { InternalTaskRunnerDisconnectAnalyzer } = await import(
'@/runners/internal-task-runner-disconnect-analyzer'
);
this.taskRunnerWsServer.setDisconnectAnalyzer(
Container.get(InternalTaskRunnerDisconnectAnalyzer),
);
}
}
9 changes: 5 additions & 4 deletions packages/cli/src/runners/task-runner-process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,15 @@ export class TaskRunnerProcess extends TypedEmitter<TaskRunnerProcessEventMap> {
) {
super();

a.ok(
this.runnerConfig.mode !== 'external',
'Task Runner Process cannot be used in external mode',
);

this.logger = logger.scoped('task-runner');
}

async start() {
a.ok(
this.runnerConfig.mode === 'internal_childprocess' ||
this.runnerConfig.mode === 'internal_launcher',
);
a.ok(!this.process, 'Task Runner Process already running');

const grantToken = await this.authService.createGrantToken();
Expand Down
4 changes: 2 additions & 2 deletions packages/cli/src/runners/task-runner-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import type {
TaskRunnerServerInitRequest,
TaskRunnerServerInitResponse,
} from '@/runners/runner-types';
import { TaskRunnerService } from '@/runners/runner-ws-server';
import { TaskRunnerWsServer } from '@/runners/runner-ws-server';

/**
* Task Runner HTTP & WS server
Expand All @@ -44,7 +44,7 @@ export class TaskRunnerServer {
private readonly logger: Logger,
private readonly globalConfig: GlobalConfig,
private readonly taskRunnerAuthController: TaskRunnerAuthController,
private readonly taskRunnerService: TaskRunnerService,
private readonly taskRunnerService: TaskRunnerWsServer,
) {
this.app = express();
this.app.disable('x-powered-by');
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { TaskRunnersConfig } from '@n8n/config';
import Container from 'typedi';

import { TaskRunnerModule } from '@/runners/task-runner-module';

import { DefaultTaskRunnerDisconnectAnalyzer } from '../../../src/runners/default-task-runner-disconnect-analyzer';
import { TaskRunnerWsServer } from '../../../src/runners/runner-ws-server';

describe('TaskRunnerModule in external mode', () => {
const runnerConfig = Container.get(TaskRunnersConfig);
runnerConfig.mode = 'external';
runnerConfig.port = 0;
const module = Container.get(TaskRunnerModule);

afterEach(async () => {
await module.stop();
});

describe('start', () => {
it('should throw if the task runner is disabled', async () => {
runnerConfig.disabled = true;

// Act
await expect(module.start()).rejects.toThrow('Task runner is disabled');
});

it('should start the task runner', async () => {
runnerConfig.disabled = false;

// Act
await module.start();
});

it('should use DefaultTaskRunnerDisconnectAnalyzer', () => {
const wsServer = Container.get(TaskRunnerWsServer);

expect(wsServer.getDisconnectAnalyzer()).toBeInstanceOf(DefaultTaskRunnerDisconnectAnalyzer);
});
});
});
Loading

0 comments on commit 9355fc3

Please sign in to comment.