Skip to content

Commit 357c9c7

Browse files
guillaumejacquartTianYi0217
authored andcommitted
fix(core): Start insights collection timer for webhook instances (n8n-io#15964)
1 parent db40698 commit 357c9c7

File tree

5 files changed

+106
-134
lines changed

5 files changed

+106
-134
lines changed

packages/cli/src/modules/insights/__tests__/insights.module.test.ts

Lines changed: 0 additions & 34 deletions
This file was deleted.

packages/cli/src/modules/insights/__tests__/insights.service.test.ts

Lines changed: 83 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@ import type { WorkflowEntity } from '@n8n/db';
55
import type { IWorkflowDb } from '@n8n/db';
66
import type { WorkflowExecuteAfterContext } from '@n8n/decorators';
77
import { Container } from '@n8n/di';
8+
import type { MockProxy } from 'jest-mock-extended';
89
import { mock } from 'jest-mock-extended';
910
import { DateTime } from 'luxon';
11+
import type { InstanceSettings } from 'n8n-core';
1012
import type { IRun } from 'n8n-workflow';
1113

1214
import { mockLogger } from '@test/mocking';
@@ -47,6 +49,84 @@ afterAll(async () => {
4749
await testDb.terminate();
4850
});
4951

52+
describe('startTimers', () => {
53+
let insightsService: InsightsService;
54+
let compactionService: InsightsCompactionService;
55+
let collectionService: InsightsCollectionService;
56+
let pruningService: InsightsPruningService;
57+
let instanceSettings: MockProxy<InstanceSettings>;
58+
59+
beforeEach(() => {
60+
compactionService = mock<InsightsCompactionService>();
61+
collectionService = mock<InsightsCollectionService>();
62+
pruningService = mock<InsightsPruningService>();
63+
instanceSettings = mock<InstanceSettings>({
64+
instanceType: 'main',
65+
});
66+
insightsService = new InsightsService(
67+
mock<InsightsByPeriodRepository>(),
68+
compactionService,
69+
collectionService,
70+
pruningService,
71+
mock<LicenseState>(),
72+
instanceSettings,
73+
mockLogger(),
74+
);
75+
76+
jest.clearAllMocks();
77+
});
78+
79+
const setupMocks = (
80+
instanceType: string,
81+
isLeader: boolean = false,
82+
isPruningEnabled: boolean = false,
83+
) => {
84+
(instanceSettings as any).instanceType = instanceType;
85+
Object.defineProperty(instanceSettings, 'isLeader', {
86+
get: jest.fn(() => isLeader),
87+
});
88+
Object.defineProperty(pruningService, 'isPruningEnabled', {
89+
get: jest.fn(() => isPruningEnabled),
90+
});
91+
};
92+
93+
test('starts flushing timer for main instance', () => {
94+
setupMocks('main', false, false);
95+
insightsService.startTimers();
96+
97+
expect(collectionService.startFlushingTimer).toHaveBeenCalled();
98+
expect(compactionService.startCompactionTimer).not.toHaveBeenCalled();
99+
expect(pruningService.startPruningTimer).not.toHaveBeenCalled();
100+
});
101+
102+
test('starts compaction and flushing timers for main leader instances', () => {
103+
setupMocks('main', true, false);
104+
insightsService.startTimers();
105+
106+
expect(collectionService.startFlushingTimer).toHaveBeenCalled();
107+
expect(compactionService.startCompactionTimer).toHaveBeenCalled();
108+
expect(pruningService.startPruningTimer).not.toHaveBeenCalled();
109+
});
110+
111+
test('starts compaction, flushing and pruning timers for main leader instance with pruning enabled', () => {
112+
setupMocks('main', true, true);
113+
insightsService.startTimers();
114+
115+
expect(collectionService.startFlushingTimer).toHaveBeenCalled();
116+
expect(compactionService.startCompactionTimer).toHaveBeenCalled();
117+
expect(pruningService.startPruningTimer).toHaveBeenCalled();
118+
});
119+
120+
test('starts only collection flushing timer for webhook instance', () => {
121+
setupMocks('webhook', false, false);
122+
insightsService.startTimers();
123+
124+
expect(collectionService.startFlushingTimer).toHaveBeenCalled();
125+
expect(compactionService.startCompactionTimer).not.toHaveBeenCalled();
126+
expect(pruningService.startPruningTimer).not.toHaveBeenCalled();
127+
});
128+
});
129+
50130
describe('getInsightsSummary', () => {
51131
let insightsService: InsightsService;
52132
beforeAll(async () => {
@@ -512,6 +592,7 @@ describe('getAvailableDateRanges', () => {
512592
mock<InsightsCollectionService>(),
513593
mock<InsightsPruningService>(),
514594
licenseMock,
595+
mock<InstanceSettings>(),
515596
mockLogger(),
516597
);
517598
});
@@ -614,6 +695,7 @@ describe('getMaxAgeInDaysAndGranularity', () => {
614695
mock<InsightsCollectionService>(),
615696
mock<InsightsPruningService>(),
616697
licenseMock,
698+
mock<InstanceSettings>(),
617699
mockLogger(),
618700
);
619701
});
@@ -702,6 +784,7 @@ describe('shutdown', () => {
702784
mockCollectionService,
703785
mockPruningService,
704786
mock<LicenseState>(),
787+
mock<InstanceSettings>(),
705788
mockLogger(),
706789
);
707790
});
@@ -717,74 +800,6 @@ describe('shutdown', () => {
717800
});
718801
});
719802

720-
describe('timers', () => {
721-
let insightsService: InsightsService;
722-
723-
const mockCollectionService = mock<InsightsCollectionService>({
724-
startFlushingTimer: jest.fn(),
725-
stopFlushingTimer: jest.fn(),
726-
});
727-
728-
const mockCompactionService = mock<InsightsCompactionService>({
729-
startCompactionTimer: jest.fn(),
730-
stopCompactionTimer: jest.fn(),
731-
});
732-
733-
const mockPruningService = mock<InsightsPruningService>({
734-
startPruningTimer: jest.fn(),
735-
stopPruningTimer: jest.fn(),
736-
isPruningEnabled: false,
737-
});
738-
739-
const mockedLogger = mockLogger();
740-
const mockedConfig = mock<InsightsConfig>({
741-
maxAgeDays: -1,
742-
});
743-
744-
beforeAll(() => {
745-
insightsService = new InsightsService(
746-
mock<InsightsByPeriodRepository>(),
747-
mockCompactionService,
748-
mockCollectionService,
749-
mockPruningService,
750-
mock<LicenseState>(),
751-
mockedLogger,
752-
);
753-
});
754-
755-
test('startTimers starts timers except pruning', () => {
756-
// ACT
757-
insightsService.startTimers();
758-
759-
// ASSERT
760-
expect(mockCompactionService.startCompactionTimer).toHaveBeenCalled();
761-
expect(mockCollectionService.startFlushingTimer).toHaveBeenCalled();
762-
expect(mockPruningService.startPruningTimer).not.toHaveBeenCalled();
763-
});
764-
765-
test('startTimers starts pruning timer', () => {
766-
// ARRANGE
767-
mockedConfig.maxAgeDays = 30;
768-
Object.defineProperty(mockPruningService, 'isPruningEnabled', { value: true });
769-
770-
// ACT
771-
insightsService.startTimers();
772-
773-
// ASSERT
774-
expect(mockPruningService.startPruningTimer).toHaveBeenCalled();
775-
});
776-
777-
test('stopTimers stops timers', () => {
778-
// ACT
779-
insightsService.stopTimers();
780-
781-
// ASSERT
782-
expect(mockCompactionService.stopCompactionTimer).toHaveBeenCalled();
783-
expect(mockCollectionService.stopFlushingTimer).toHaveBeenCalled();
784-
expect(mockPruningService.stopPruningTimer).toHaveBeenCalled();
785-
});
786-
});
787-
788803
describe('legacy sqlite (without pooling) handles concurrent insights db process without throwing', () => {
789804
let initialFlushBatchSize: number;
790805
let insightsConfig: InsightsConfig;

packages/cli/src/modules/insights/insights-collection.service.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@ export class InsightsCollectionService {
7676
startFlushingTimer() {
7777
this.isAsynchronouslySavingInsights = true;
7878
this.scheduleFlushing();
79-
this.logger.debug('Started flushing timer');
8079
}
8180

8281
scheduleFlushing() {
Lines changed: 2 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,15 @@
1-
import { Logger } from '@n8n/backend-common';
21
import type { BaseN8nModule } from '@n8n/decorators';
3-
import { N8nModule, OnLeaderStepdown, OnLeaderTakeover } from '@n8n/decorators';
4-
import { InstanceSettings } from 'n8n-core';
2+
import { N8nModule } from '@n8n/decorators';
53

64
import { InsightsService } from './insights.service';
75

86
import './insights.controller';
97

108
@N8nModule()
119
export class InsightsModule implements BaseN8nModule {
12-
constructor(
13-
private readonly logger: Logger,
14-
private readonly insightsService: InsightsService,
15-
private readonly instanceSettings: InstanceSettings,
16-
) {
17-
this.logger = this.logger.scoped('insights');
18-
}
10+
constructor(private readonly insightsService: InsightsService) {}
1911

2012
initialize() {
21-
// We want to initialize the insights background process (schedulers) for the main leader instance
22-
// to have only one main instance saving the insights data
23-
if (this.instanceSettings.isLeader) {
24-
this.insightsService.startTimers();
25-
}
26-
}
27-
28-
@OnLeaderTakeover()
29-
startBackgroundProcess() {
3013
this.insightsService.startTimers();
3114
}
32-
33-
@OnLeaderStepdown()
34-
stopBackgroundProcess() {
35-
this.insightsService.stopTimers();
36-
}
3715
}

packages/cli/src/modules/insights/insights.service.ts

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@ import {
44
INSIGHTS_DATE_RANGE_KEYS,
55
} from '@n8n/api-types';
66
import { LicenseState, Logger } from '@n8n/backend-common';
7-
import { OnShutdown } from '@n8n/decorators';
7+
import { OnLeaderStepdown, OnLeaderTakeover, OnShutdown } from '@n8n/decorators';
88
import { Service } from '@n8n/di';
9+
import { InstanceSettings } from 'n8n-core';
910
import { UserError } from 'n8n-workflow';
1011

1112
import type { PeriodUnit, TypeUnit } from './database/entities/insights-shared';
@@ -33,31 +34,44 @@ export class InsightsService {
3334
private readonly collectionService: InsightsCollectionService,
3435
private readonly pruningService: InsightsPruningService,
3536
private readonly licenseState: LicenseState,
37+
private readonly instanceSettings: InstanceSettings,
3638
private readonly logger: Logger,
3739
) {
3840
this.logger = this.logger.scoped('insights');
3941
}
4042

4143
startTimers() {
42-
this.compactionService.startCompactionTimer();
4344
this.collectionService.startFlushingTimer();
45+
this.logger.debug('Started flushing timer');
46+
47+
// Start compaction and pruning timers for main leader instance only
48+
if (this.instanceSettings.isLeader) {
49+
this.startCompactionAndPruningTimers();
50+
}
51+
}
52+
53+
@OnLeaderTakeover()
54+
startCompactionAndPruningTimers() {
55+
this.compactionService.startCompactionTimer();
56+
this.logger.debug('Started compaction timer');
4457
if (this.pruningService.isPruningEnabled) {
4558
this.pruningService.startPruningTimer();
59+
this.logger.debug('Started pruning timer');
4660
}
47-
this.logger.debug('Started compaction, flushing and pruning schedulers');
4861
}
4962

50-
stopTimers() {
63+
@OnLeaderStepdown()
64+
stopCompactionAndPruningTimers() {
5165
this.compactionService.stopCompactionTimer();
52-
this.collectionService.stopFlushingTimer();
66+
this.logger.debug('Stopped compaction timer');
5367
this.pruningService.stopPruningTimer();
54-
this.logger.debug('Stopped compaction, flushing and pruning schedulers');
68+
this.logger.debug('Stopped pruning timer');
5569
}
5670

5771
@OnShutdown()
5872
async shutdown() {
5973
await this.collectionService.shutdown();
60-
this.stopTimers();
74+
this.stopCompactionAndPruningTimers();
6175
}
6276

6377
async getInsightsSummary({

0 commit comments

Comments
 (0)