Skip to content

Commit

Permalink
extract out assignPairedItems. add tests, and add docs
Browse files Browse the repository at this point in the history
  • Loading branch information
netroy committed Dec 27, 2024
1 parent 4d45409 commit 96bb42a
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 50 deletions.
122 changes: 75 additions & 47 deletions packages/core/src/WorkflowExecute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1533,53 +1533,7 @@ export class WorkflowExecute {
workflowId: workflow.id,
});

if (nodeSuccessData?.length) {
// Check if the output data contains pairedItem data and if not try
// to automatically fix it

const isSingleInputAndOutput =
executionData.data.main.length === 1 && executionData.data.main[0]?.length === 1;

const isSameNumberOfItems =
nodeSuccessData.length === 1 &&
executionData.data.main.length === 1 &&
executionData.data.main[0]?.length === nodeSuccessData[0].length;

checkOutputData: for (const outputData of nodeSuccessData) {
if (outputData === null) {
continue;
}
for (const [index, item] of outputData.entries()) {
if (item.pairedItem === undefined) {
// The pairedItem data is missing, so check if it can get automatically fixed
if (isSingleInputAndOutput) {
// The node has one input and one incoming item, so we know
// that all items must originate from that single
item.pairedItem = {
item: 0,
};
} else if (isSameNumberOfItems) {
// The number of oncoming and outcoming items is identical so we can
// make the reasonable assumption that each of the input items
// is the origin of the corresponding output items
item.pairedItem = {
item: index,
};
} else {
// In all other cases autofixing is not possible
break checkOutputData;
}
}
}
}
}

if (nodeSuccessData === undefined) {
// Node did not get executed
nodeSuccessData = null;
} else {
this.runExecutionData.resultData.lastNodeExecuted = executionData.node.name;
}
nodeSuccessData = this.assignPairedItems(nodeSuccessData, executionData);

if (nodeSuccessData === null || nodeSuccessData[0][0] === undefined) {
if (executionData.node.alwaysOutputData === true) {
Expand Down Expand Up @@ -2287,6 +2241,80 @@ export class WorkflowExecute {
nodeSuccessData[mainOutputTypes.length - 1] = errorItems;
}

/**
* Assigns pairedItem information to node output items by matching them with input items.
* PairedItem data is used to track which output items were derived from which input items.
*
* @param nodeSuccessData - The output data from a node execution
* @param executionData - The execution data containing input information
*
* @returns The node output data with pairedItem information assigned where possible
*
* @remarks
* Auto-assignment of pairedItem happens in two scenarios:
* 1. Single input/output: When node has exactly one input item and produces output(s),
* all outputs are marked as derived from that single input (item: 0)
* 2. Matching items count: When number of input and output items match exactly,
* each output item is paired with the input item at the same index
*
* In all other cases, if pairedItem is missing, it remains undefined as automatic
* assignment cannot be done reliably.
*/
assignPairedItems(
nodeSuccessData: INodeExecutionData[][] | null | undefined,
executionData: IExecuteData,
) {
if (nodeSuccessData?.length) {
// Check if the output data contains pairedItem data and if not try
// to automatically fix it

const isSingleInputAndOutput =
executionData.data.main.length === 1 && executionData.data.main[0]?.length === 1;

const isSameNumberOfItems =
nodeSuccessData.length === 1 &&
executionData.data.main.length === 1 &&
executionData.data.main[0]?.length === nodeSuccessData[0].length;

checkOutputData: for (const outputData of nodeSuccessData) {
if (outputData === null) {
continue;
}
for (const [index, item] of outputData.entries()) {
if (item.pairedItem === undefined) {
// The pairedItem data is missing, so check if it can get automatically fixed
if (isSingleInputAndOutput) {
// The node has one input and one incoming item, so we know
// that all items must originate from that single
item.pairedItem = {
item: 0,
};
} else if (isSameNumberOfItems) {
// The number of oncoming and outcoming items is identical so we can
// make the reasonable assumption that each of the input items
// is the origin of the corresponding output items
item.pairedItem = {
item: index,
};
} else {
// In all other cases autofixing is not possible
break checkOutputData;
}
}
}
}
}

if (nodeSuccessData === undefined) {
// Node did not get executed
nodeSuccessData = null;
} else {
this.runExecutionData.resultData.lastNodeExecuted = executionData.node.name;
}

return nodeSuccessData;
}

private get isCancelled() {
return this.abortController.signal.aborted;
}
Expand Down
68 changes: 65 additions & 3 deletions packages/core/test/WorkflowExecute.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ import type {
ITriggerResponse,
IWorkflowExecuteAdditionalData,
WorkflowTestData,
RelatedExecution,
} from 'n8n-workflow';
import {
ApplicationError,
createDeferredPromise,
NodeConnectionType,
NodeExecutionOutput,
NodeHelpers,
RelatedExecution,
Workflow,
} from 'n8n-workflow';

Expand Down Expand Up @@ -1180,7 +1180,9 @@ describe('WorkflowExecute', () => {

const stoppedAt = new Date('2023-01-01T00:00:10.000Z');
jest.setSystemTime(stoppedAt);
workflowExecute['status'] = 'running';
// @ts-expect-error read-only property
workflowExecute.status = 'running';

const result2 = workflowExecute.getFullRunData(startedAt);

expect(result2).toEqual({
Expand Down Expand Up @@ -1240,13 +1242,16 @@ describe('WorkflowExecute', () => {
const testError = new Error('Test error') as ExecutionBaseError;

// Reset the status since it was changed by previous tests
workflowExecute['status'] = 'new';
// @ts-expect-error read-only property
workflowExecute.status = 'new';
runExecutionData.waitTill = undefined;

const errorResult = await workflowExecute.processSuccessExecution(
startedAt,
workflow,
testError,
);

expect(errorResult.data.resultData.error).toBeDefined();
expect(errorResult.data.resultData.error?.message).toBe('Test error');

Expand Down Expand Up @@ -1295,4 +1300,61 @@ describe('WorkflowExecute', () => {
expect(cleanupCalled).toBe(true);
});
});

describe('assignPairedItems', () => {
let workflowExecute: WorkflowExecute;

beforeEach(() => {
workflowExecute = new WorkflowExecute(mock(), 'manual');
});

test('should handle undefined node output', () => {
const result = workflowExecute.assignPairedItems(
undefined,
mock<IExecuteData>({ data: { main: [] } }),
);
expect(result).toBeNull();
});

test('should auto-fix pairedItem for single input/output scenario', () => {
const nodeOutput = [[{ json: { test: true } }]];
const executionData = mock<IExecuteData>({ data: { main: [[{ json: { input: true } }]] } });

const result = workflowExecute.assignPairedItems(nodeOutput, executionData);

expect(result?.[0][0].pairedItem).toEqual({ item: 0 });
});

test('should auto-fix pairedItem when number of items match', () => {
const nodeOutput = [[{ json: { test: 1 } }, { json: { test: 2 } }]];
const executionData = mock<IExecuteData>({
data: { main: [[{ json: { input: 1 } }, { json: { input: 2 } }]] },
});

const result = workflowExecute.assignPairedItems(nodeOutput, executionData);

expect(result?.[0][0].pairedItem).toEqual({ item: 0 });
expect(result?.[0][1].pairedItem).toEqual({ item: 1 });
});

test('should not modify existing pairedItem data', () => {
const existingPairedItem = { item: 5, input: 2 };
const nodeOutput = [[{ json: { test: true }, pairedItem: existingPairedItem }]];
const executionData = mock<IExecuteData>({ data: { main: [[{ json: { input: true } }]] } });

const result = workflowExecute.assignPairedItems(nodeOutput, executionData);

expect(result?.[0][0].pairedItem).toEqual(existingPairedItem);
});

test('should process multiple output branches correctly', () => {
const nodeOutput = [[{ json: { test: 1 } }], [{ json: { test: 2 } }]];
const executionData = mock<IExecuteData>({ data: { main: [[{ json: { input: true } }]] } });

const result = workflowExecute.assignPairedItems(nodeOutput, executionData);

expect(result?.[0][0].pairedItem).toEqual({ item: 0 });
expect(result?.[1][0].pairedItem).toEqual({ item: 0 });
});
});
});

0 comments on commit 96bb42a

Please sign in to comment.