Skip to content

Commit

Permalink
feat: support events and std streams from an abort handler
Browse files Browse the repository at this point in the history
  • Loading branch information
joshLong145 authored Dec 12, 2024
1 parent 6bf949e commit ea38666
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 11 deletions.
25 changes: 17 additions & 8 deletions src/WorkerHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -220,13 +220,11 @@ function objectToError (obj) {

function handleEmittedStdPayload(handler, payload) {
// TODO: refactor if parallel task execution gets added
if (Object.keys(handler.processing).length !== 1) {
return;
}
var task = Object.values(handler.processing)[0]
if (task.options && typeof task.options.on === 'function') {
task.options.on(payload);
}
Object.values(handler.processing)
.forEach(task => task?.options?.on(payload));

Object.values(handler.tracking)
.forEach(task => task?.options?.on(payload));
}

/**
Expand Down Expand Up @@ -299,6 +297,16 @@ function WorkerHandler(script, _options) {
task.resolver.resolve(response.result);
}
}
} else {
// if the task is not the current, it might be tracked for cleanup
var task = me.tracking[id];
if (task !== undefined) {
if (response.isEvent) {
if (task.options && typeof task.options.on === 'function') {
task.options.on(response.payload);
}
}
}
}

if (response.method === CLEANUP_METHOD_ID) {
Expand Down Expand Up @@ -422,7 +430,8 @@ WorkerHandler.prototype.exec = function(method, params, resolver, options) {
if (error instanceof Promise.CancellationError || error instanceof Promise.TimeoutError) {
me.tracking[id] = {
id,
resolver: Promise.defer()
resolver: Promise.defer(),
options: options,
};

// remove this task from the queue. It is already rejected (hence this
Expand Down
36 changes: 35 additions & 1 deletion test/Pool.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1356,7 +1356,7 @@ describe('Pool', function () {
maxWorkers: 1,
onCreateWorker: () => {
workerCount += 1;
}
},
});

let task = pool.exec('asyncTimeout', [], {});
Expand Down Expand Up @@ -1561,6 +1561,40 @@ describe('Pool', function () {
});
});
});

it('should trigger event stdout in abort handler', function (done) {
var pool = createPool(__dirname + '/workers/cleanup-abort.js', {
maxWorkers: 1,
workerType: 'process',
emitStdStreams: true,
workerTerminateTimeout: 1000,
});

pool.exec('stdoutStreamOnAbort', [], {
on: function (payload) {
assert.strictEqual(payload.stdout.trim(), "Hello, world!");
pool.terminate();
done();
}
}).timeout(50);
});

it('should trigger event in abort handler', function (done) {
var pool = createPool(__dirname + '/workers/cleanup-abort.js', {
maxWorkers: 1,
workerType: 'process',
emitStdStreams: true,
workerTerminateTimeout: 1000,
});

pool.exec('eventEmitOnAbort', [], {
on: function (payload) {
assert.strictEqual(payload.status, 'cleanup_success');
pool.terminate();
done();
}
}).timeout(50);
});
});

describe('validate', () => {
Expand Down
28 changes: 26 additions & 2 deletions test/workers/cleanup-abort.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ function asyncTimeout() {
return new Promise(function (resolve) {
let timeout = setTimeout(() => {
resolve();
}, 5000);

}, 5000);
me.worker.addAbortListener(async function () {
clearTimeout(timeout);
resolve();
Expand Down Expand Up @@ -34,11 +33,36 @@ function asyncAbortHandlerNeverResolves() {
});
}

function stdoutStreamOnAbort() {
var me = this;
return new Promise(function (resolve) {
me.worker.addAbortListener(async function () {
console.log("Hello, world!");
resolve();
});
});
}

function eventEmitOnAbort() {
var me = this;
return new Promise(function (resolve) {
me.worker.addAbortListener(async function () {
workerpool.workerEmit({
status: 'cleanup_success',
});
resolve();
});
});
}


// create a worker and register public functions
workerpool.worker(
{
asyncTimeout: asyncTimeout,
asyncAbortHandlerNeverResolves: asyncAbortHandlerNeverResolves,
stdoutStreamOnAbort: stdoutStreamOnAbort,
eventEmitOnAbort: eventEmitOnAbort,
},
{
abortListenerTimeout: 1000
Expand Down

0 comments on commit ea38666

Please sign in to comment.