Skip to content

Commit db91903

Browse files
authored
Merge pull request #1 from robversluis/feat/workflow-execution-priority
feat(queue): add workflow execution priority (high/medium/low)
2 parents 2b99258 + 6717534 commit db91903

File tree

6 files changed

+162
-1
lines changed

6 files changed

+162
-1
lines changed

packages/cli/src/__tests__/workflow-runner.test.ts

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,100 @@ describe('enqueueExecution', () => {
299299

300300
expect(setupQueue).toHaveBeenCalledTimes(1);
301301
});
302+
303+
it('should map queuePriority high -> priority 1', async () => {
304+
const activeExecutions = Container.get(ActiveExecutions);
305+
jest.spyOn(activeExecutions, 'attachWorkflowExecution').mockReturnValue();
306+
jest.spyOn(runner, 'processError').mockResolvedValue();
307+
const data = mock<IWorkflowExecutionDataProcess>({
308+
workflowData: { nodes: [], settings: { queuePriority: 'high' } },
309+
executionData: undefined,
310+
});
311+
const error = new Error('stop for test purposes');
312+
addJob.mockRejectedValueOnce(error);
313+
// @ts-expect-error Private method
314+
await expect(runner.enqueueExecution('1', 'workflow-xyz', data)).rejects.toThrowError(error);
315+
expect(addJob).toHaveBeenCalledWith(
316+
expect.any(Object),
317+
expect.objectContaining({ priority: 1 }),
318+
);
319+
});
320+
321+
it('should map queuePriority medium -> priority 50', async () => {
322+
const activeExecutions = Container.get(ActiveExecutions);
323+
jest.spyOn(activeExecutions, 'attachWorkflowExecution').mockReturnValue();
324+
jest.spyOn(runner, 'processError').mockResolvedValue();
325+
const data = mock<IWorkflowExecutionDataProcess>({
326+
workflowData: { nodes: [], settings: { queuePriority: 'medium' } },
327+
executionData: undefined,
328+
});
329+
const error = new Error('stop for test purposes');
330+
addJob.mockRejectedValueOnce(error);
331+
// @ts-expect-error Private method
332+
await expect(runner.enqueueExecution('1', 'workflow-xyz', data)).rejects.toThrowError(error);
333+
expect(addJob).toHaveBeenCalledWith(
334+
expect.any(Object),
335+
expect.objectContaining({ priority: 50 }),
336+
);
337+
});
338+
339+
it('should map queuePriority low -> priority 100', async () => {
340+
const activeExecutions = Container.get(ActiveExecutions);
341+
jest.spyOn(activeExecutions, 'attachWorkflowExecution').mockReturnValue();
342+
jest.spyOn(runner, 'processError').mockResolvedValue();
343+
const data = mock<IWorkflowExecutionDataProcess>({
344+
workflowData: { nodes: [], settings: { queuePriority: 'low' } },
345+
executionData: undefined,
346+
});
347+
const error = new Error('stop for test purposes');
348+
addJob.mockRejectedValueOnce(error);
349+
// @ts-expect-error Private method
350+
await expect(runner.enqueueExecution('1', 'workflow-xyz', data)).rejects.toThrowError(error);
351+
expect(addJob).toHaveBeenCalledWith(
352+
expect.any(Object),
353+
expect.objectContaining({ priority: 100 }),
354+
);
355+
});
356+
357+
it('should fallback to realtime true -> priority 50 when queuePriority is unset', async () => {
358+
const activeExecutions = Container.get(ActiveExecutions);
359+
jest.spyOn(activeExecutions, 'attachWorkflowExecution').mockReturnValue();
360+
jest.spyOn(runner, 'processError').mockResolvedValue();
361+
const data = mock<IWorkflowExecutionDataProcess>({
362+
workflowData: { nodes: [] },
363+
executionData: undefined,
364+
});
365+
const error = new Error('stop for test purposes');
366+
addJob.mockRejectedValueOnce(error);
367+
// @ts-expect-error Private method
368+
await expect(
369+
runner.enqueueExecution('1', 'workflow-xyz', data, undefined, true),
370+
).rejects.toThrowError(error);
371+
expect(addJob).toHaveBeenCalledWith(
372+
expect.any(Object),
373+
expect.objectContaining({ priority: 50 }),
374+
);
375+
});
376+
377+
it('should fallback to realtime false -> priority 100 when queuePriority is unset', async () => {
378+
const activeExecutions = Container.get(ActiveExecutions);
379+
jest.spyOn(activeExecutions, 'attachWorkflowExecution').mockReturnValue();
380+
jest.spyOn(runner, 'processError').mockResolvedValue();
381+
const data = mock<IWorkflowExecutionDataProcess>({
382+
workflowData: { nodes: [] },
383+
executionData: undefined,
384+
});
385+
const error = new Error('stop for test purposes');
386+
addJob.mockRejectedValueOnce(error);
387+
// @ts-expect-error Private method
388+
await expect(
389+
runner.enqueueExecution('1', 'workflow-xyz', data, undefined, false),
390+
).rejects.toThrowError(error);
391+
expect(addJob).toHaveBeenCalledWith(
392+
expect.any(Object),
393+
expect.objectContaining({ priority: 100 }),
394+
);
395+
});
302396
});
303397

304398
describe('workflow timeout with startedAt', () => {

packages/cli/src/workflow-runner.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,20 @@ export class WorkflowRunner {
381381
let job: Job;
382382
let lifecycleHooks: ExecutionLifecycleHooks;
383383
try {
384-
job = await this.scalingService.addJob(jobData, { priority: realtime ? 50 : 100 });
384+
// Determine numeric Bull priority. Lower numbers are higher priority.
385+
const qp = data.workflowData?.settings?.queuePriority as
386+
| 'low'
387+
| 'medium'
388+
| 'high'
389+
| undefined;
390+
const priorityMap: Record<'low' | 'medium' | 'high', number> = {
391+
high: 1,
392+
medium: 50,
393+
low: 100,
394+
};
395+
const computedPriority = qp ? priorityMap[qp] : realtime ? 50 : 100;
396+
397+
job = await this.scalingService.addJob(jobData, { priority: computedPriority });
385398

386399
lifecycleHooks = getLifecycleHooksForScalingMain(data, executionId);
387400

packages/frontend/@n8n/i18n/src/locales/en.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2565,6 +2565,11 @@
25652565
"workflowSettings.defaultTimezoneNotValid": "Default Timezone not valid",
25662566
"workflowSettings.errorWorkflow": "Error Workflow (to notify when this one errors)",
25672567
"workflowSettings.executionOrder": "Execution Order",
2568+
"workflowSettings.executionPriority": "Execution priority",
2569+
"workflowSettings.executionPriorityPlaceholder": "Select priority",
2570+
"workflowSettings.executionPriority.options.low": "Low",
2571+
"workflowSettings.executionPriority.options.medium": "Medium",
2572+
"workflowSettings.executionPriority.options.high": "High",
25682573
"workflowSettings.helpTexts.errorWorkflow": "A second workflow to run if the current one fails.<br />The second workflow should have an 'Error Trigger' node.",
25692574
"workflowSettings.helpTexts.executionTimeout": "How long the workflow should wait before timing out",
25702575
"workflowSettings.helpTexts.executionTimeoutToggle": "Whether to cancel workflow execution after a defined time",

packages/frontend/editor-ui/src/Interface.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -568,6 +568,8 @@ export interface IWorkflowSettings extends IWorkflowSettingsWorkflow {
568568
callerIds?: string;
569569
callerPolicy?: WorkflowSettings.CallerPolicy;
570570
executionOrder: NonNullable<IWorkflowSettingsWorkflow['executionOrder']>;
571+
/** Optional queue execution priority for queued workflow runs */
572+
queuePriority?: 'low' | 'medium' | 'high';
571573
}
572574

573575
export interface ITimeoutHMS {

packages/frontend/editor-ui/src/components/WorkflowSettings.vue

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,20 @@ const executionOrderOptions = ref<Array<{ key: string; value: string }>>([
4848
{ key: 'v0', value: 'v0 (legacy)' },
4949
{ key: 'v1', value: 'v1 (recommended)' },
5050
]);
51+
const executionPriorityOptions = ref<Array<{ key: 'low' | 'medium' | 'high'; value: string }>>([
52+
{
53+
key: 'low',
54+
value: i18n.baseText('workflowSettings.executionPriority.options.low'),
55+
},
56+
{
57+
key: 'medium',
58+
value: i18n.baseText('workflowSettings.executionPriority.options.medium'),
59+
},
60+
{
61+
key: 'high',
62+
value: i18n.baseText('workflowSettings.executionPriority.options.high'),
63+
},
64+
]);
5165
const timezones = ref<Array<{ key: string; value: string }>>([]);
5266
const workflowSettings = ref<IWorkflowSettings>({} as IWorkflowSettings);
5367
const workflows = ref<IWorkflowShortResponse[]>([]);
@@ -464,6 +478,9 @@ onMounted(async () => {
464478
if (workflowSettingsData.executionOrder === undefined) {
465479
workflowSettingsData.executionOrder = 'v0';
466480
}
481+
if (workflowSettingsData.queuePriority === undefined) {
482+
workflowSettingsData.queuePriority = 'medium';
483+
}
467484
468485
workflowSettings.value = workflowSettingsData;
469486
timeoutHMS.value = convertToHMS(workflowSettingsData.executionTimeout);
@@ -522,6 +539,31 @@ onBeforeUnmount(() => {
522539
</el-col>
523540
</el-row>
524541

542+
<el-row>
543+
<el-col :span="10" class="setting-name">
544+
{{ i18n.baseText('workflowSettings.executionPriority') }}
545+
</el-col>
546+
<el-col :span="14" class="ignore-key-press-canvas">
547+
<N8nSelect
548+
v-model="workflowSettings.queuePriority"
549+
:placeholder="i18n.baseText('workflowSettings.executionPriorityPlaceholder')"
550+
size="medium"
551+
filterable
552+
:disabled="readOnlyEnv || !workflowPermissions.update"
553+
:limit-popper-width="true"
554+
data-test-id="workflow-settings-execution-priority"
555+
>
556+
<N8nOption
557+
v-for="option in executionPriorityOptions"
558+
:key="option.key"
559+
:label="option.value"
560+
:value="option.key"
561+
>
562+
</N8nOption>
563+
</N8nSelect>
564+
</el-col>
565+
</el-row>
566+
525567
<el-row data-test-id="error-workflow">
526568
<el-col :span="10" class="setting-name">
527569
{{ i18n.baseText('workflowSettings.errorWorkflow') }}

packages/workflow/src/interfaces.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2579,6 +2579,11 @@ export interface IWorkflowSettings {
25792579
executionTimeout?: number;
25802580
executionOrder?: 'v0' | 'v1';
25812581
timeSavedPerExecution?: number;
2582+
/**
2583+
* Optional queue execution priority. Controls the relative priority of queued executions.
2584+
* Defaults to 'medium' when unspecified.
2585+
*/
2586+
queuePriority?: 'low' | 'medium' | 'high';
25822587
}
25832588

25842589
export interface WorkflowFEMeta {

0 commit comments

Comments
 (0)