Skip to content

Commit d1b4786

Browse files
move leader logic to insights service. refactor tests
1 parent 4e18bdb commit d1b4786

File tree

5 files changed

+67
-152
lines changed

5 files changed

+67
-152
lines changed

packages/cli/src/modules/insights/__tests__/insights.module.test.ts

Lines changed: 0 additions & 44 deletions
This file was deleted.

packages/cli/src/modules/insights/__tests__/insights.service.test.ts

Lines changed: 49 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@ import type { WorkflowEntity } from '@n8n/db';
55
import type { IWorkflowDb } from '@n8n/db';
66
import type { WorkflowExecuteAfterContext } from '@n8n/decorators';
77
import { Container } from '@n8n/di';
8+
import type { MockProxy } from 'jest-mock-extended';
89
import { mock } from 'jest-mock-extended';
910
import { DateTime } from 'luxon';
11+
import type { InstanceSettings, InstanceType } from 'n8n-core';
1012
import type { IRun } from 'n8n-workflow';
1113

1214
import { mockLogger } from '@test/mocking';
@@ -52,17 +54,30 @@ describe('startTimers', () => {
5254
let compactionService: InsightsCompactionService;
5355
let collectionService: InsightsCollectionService;
5456
let pruningService: InsightsPruningService;
57+
let instanceSettings: MockProxy<InstanceSettings>;
58+
let isLeader = jest.fn(() => true);
59+
let isPruningEnabled = jest.fn(() => false);
5560

5661
beforeAll(() => {
5762
compactionService = mock<InsightsCompactionService>();
5863
collectionService = mock<InsightsCollectionService>();
59-
pruningService = mock<InsightsPruningService>({ isPruningEnabled: true });
64+
pruningService = mock<InsightsPruningService>();
65+
Object.defineProperty(pruningService, 'isPruningEnabled', {
66+
get: isPruningEnabled,
67+
});
68+
instanceSettings = mock<InstanceSettings>({
69+
instanceType: 'main',
70+
});
71+
Object.defineProperty(instanceSettings, 'isLeader', {
72+
get: isLeader,
73+
});
6074
insightsService = new InsightsService(
6175
mock<InsightsByPeriodRepository>(),
6276
compactionService,
6377
collectionService,
6478
pruningService,
6579
mock<LicenseState>(),
80+
instanceSettings,
6681
mockLogger(),
6782
);
6883
});
@@ -71,21 +86,48 @@ describe('startTimers', () => {
7186
jest.clearAllMocks();
7287
});
7388

74-
test('starts compaction, flushing and pruning timers', () => {
89+
test('starts compaction, flushing timers for main leader instance', () => {
90+
isLeader.mockReturnValueOnce(true);
91+
isPruningEnabled.mockReturnValueOnce(false);
92+
insightsService.startTimers();
93+
94+
expect(collectionService.startFlushingTimer).toHaveBeenCalled();
95+
expect(compactionService.startCompactionTimer).toHaveBeenCalled();
96+
expect(pruningService.startPruningTimer).not.toHaveBeenCalled();
97+
});
98+
99+
test('starts compaction, flushing and pruning timers for main leader instance with pruning enabled', () => {
100+
isLeader.mockReturnValueOnce(true);
101+
isPruningEnabled.mockReturnValueOnce(true);
75102
insightsService.startTimers();
76103

77104
expect(collectionService.startFlushingTimer).toHaveBeenCalled();
78105
expect(compactionService.startCompactionTimer).toHaveBeenCalled();
79106
expect(pruningService.startPruningTimer).toHaveBeenCalled();
80107
});
81108

82-
test('starts only collection flushing timer when onlyCollection is true', () => {
83-
insightsService.startTimers({ onlyCollection: true });
109+
test('starts only collection flushing timer for webhook instance', () => {
110+
(instanceSettings as any).instanceType = 'webhook';
111+
insightsService.startTimers();
84112

85113
expect(collectionService.startFlushingTimer).toHaveBeenCalled();
86114
expect(compactionService.startCompactionTimer).not.toHaveBeenCalled();
87115
expect(pruningService.startPruningTimer).not.toHaveBeenCalled();
88116
});
117+
118+
test.each<InstanceType>(['worker', 'main'])(
119+
'do not start any timers for instance of type %s when not leader',
120+
(instanceType: InstanceType) => {
121+
(instanceSettings as any).instanceType = instanceType;
122+
isLeader.mockReturnValue(false);
123+
124+
insightsService.startTimers();
125+
126+
expect(collectionService.startFlushingTimer).not.toHaveBeenCalled();
127+
expect(compactionService.startCompactionTimer).not.toHaveBeenCalled();
128+
expect(pruningService.startPruningTimer).not.toHaveBeenCalled();
129+
},
130+
);
89131
});
90132

91133
describe('getInsightsSummary', () => {
@@ -553,6 +595,7 @@ describe('getAvailableDateRanges', () => {
553595
mock<InsightsCollectionService>(),
554596
mock<InsightsPruningService>(),
555597
licenseMock,
598+
mock<InstanceSettings>(),
556599
mockLogger(),
557600
);
558601
});
@@ -655,6 +698,7 @@ describe('getMaxAgeInDaysAndGranularity', () => {
655698
mock<InsightsCollectionService>(),
656699
mock<InsightsPruningService>(),
657700
licenseMock,
701+
mock<InstanceSettings>(),
658702
mockLogger(),
659703
);
660704
});
@@ -743,6 +787,7 @@ describe('shutdown', () => {
743787
mockCollectionService,
744788
mockPruningService,
745789
mock<LicenseState>(),
790+
mock<InstanceSettings>(),
746791
mockLogger(),
747792
);
748793
});
@@ -758,74 +803,6 @@ describe('shutdown', () => {
758803
});
759804
});
760805

761-
describe('timers', () => {
762-
let insightsService: InsightsService;
763-
764-
const mockCollectionService = mock<InsightsCollectionService>({
765-
startFlushingTimer: jest.fn(),
766-
stopFlushingTimer: jest.fn(),
767-
});
768-
769-
const mockCompactionService = mock<InsightsCompactionService>({
770-
startCompactionTimer: jest.fn(),
771-
stopCompactionTimer: jest.fn(),
772-
});
773-
774-
const mockPruningService = mock<InsightsPruningService>({
775-
startPruningTimer: jest.fn(),
776-
stopPruningTimer: jest.fn(),
777-
isPruningEnabled: false,
778-
});
779-
780-
const mockedLogger = mockLogger();
781-
const mockedConfig = mock<InsightsConfig>({
782-
maxAgeDays: -1,
783-
});
784-
785-
beforeAll(() => {
786-
insightsService = new InsightsService(
787-
mock<InsightsByPeriodRepository>(),
788-
mockCompactionService,
789-
mockCollectionService,
790-
mockPruningService,
791-
mock<LicenseState>(),
792-
mockedLogger,
793-
);
794-
});
795-
796-
test('startTimers starts timers except pruning', () => {
797-
// ACT
798-
insightsService.startTimers();
799-
800-
// ASSERT
801-
expect(mockCompactionService.startCompactionTimer).toHaveBeenCalled();
802-
expect(mockCollectionService.startFlushingTimer).toHaveBeenCalled();
803-
expect(mockPruningService.startPruningTimer).not.toHaveBeenCalled();
804-
});
805-
806-
test('startTimers starts pruning timer', () => {
807-
// ARRANGE
808-
mockedConfig.maxAgeDays = 30;
809-
Object.defineProperty(mockPruningService, 'isPruningEnabled', { value: true });
810-
811-
// ACT
812-
insightsService.startTimers();
813-
814-
// ASSERT
815-
expect(mockPruningService.startPruningTimer).toHaveBeenCalled();
816-
});
817-
818-
test('stopTimers stops timers', () => {
819-
// ACT
820-
insightsService.stopTimers();
821-
822-
// ASSERT
823-
expect(mockCompactionService.stopCompactionTimer).toHaveBeenCalled();
824-
expect(mockCollectionService.stopFlushingTimer).toHaveBeenCalled();
825-
expect(mockPruningService.stopPruningTimer).toHaveBeenCalled();
826-
});
827-
});
828-
829806
describe('legacy sqlite (without pooling) handles concurrent insights db process without throwing', () => {
830807
let initialFlushBatchSize: number;
831808
let insightsConfig: InsightsConfig;

packages/cli/src/modules/insights/insights-collection.service.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@ export class InsightsCollectionService {
7676
startFlushingTimer() {
7777
this.isAsynchronouslySavingInsights = true;
7878
this.scheduleFlushing();
79-
this.logger.debug('Started flushing timer');
8079
}
8180

8281
scheduleFlushing() {
Lines changed: 2 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,15 @@
1-
import { Logger } from '@n8n/backend-common';
21
import type { BaseN8nModule } from '@n8n/decorators';
3-
import { N8nModule, OnLeaderStepdown, OnLeaderTakeover } from '@n8n/decorators';
4-
import { InstanceSettings } from 'n8n-core';
2+
import { N8nModule } from '@n8n/decorators';
53

64
import { InsightsService } from './insights.service';
75

86
import './insights.controller';
97

108
@N8nModule()
119
export class InsightsModule implements BaseN8nModule {
12-
constructor(
13-
private readonly logger: Logger,
14-
private readonly insightsService: InsightsService,
15-
private readonly instanceSettings: InstanceSettings,
16-
) {
17-
this.logger = this.logger.scoped('insights');
18-
}
10+
constructor(private readonly insightsService: InsightsService) {}
1911

2012
initialize() {
21-
// We want to initialize the insights background process (schedulers) for the main leader instance
22-
if (this.instanceSettings.isLeader) {
23-
this.insightsService.startTimers();
24-
} else if (this.instanceSettings.instanceType === 'webhook') {
25-
// Webhook instances collect insights data independently
26-
// So we only start the collection timers, compaction and pruning are done by the main instance
27-
this.insightsService.startTimers({ onlyCollection: true });
28-
}
29-
}
30-
31-
@OnLeaderTakeover()
32-
startBackgroundProcess() {
3313
this.insightsService.startTimers();
3414
}
35-
36-
@OnLeaderStepdown()
37-
stopBackgroundProcess() {
38-
this.insightsService.stopTimers();
39-
}
4015
}

packages/cli/src/modules/insights/insights.service.ts

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@ import {
44
INSIGHTS_DATE_RANGE_KEYS,
55
} from '@n8n/api-types';
66
import { LicenseState, Logger } from '@n8n/backend-common';
7-
import { OnShutdown } from '@n8n/decorators';
7+
import { OnLeaderStepdown, OnLeaderTakeover, OnShutdown } from '@n8n/decorators';
88
import { Service } from '@n8n/di';
9+
import { InstanceSettings } from 'n8n-core';
910
import { UserError } from 'n8n-workflow';
1011

1112
import type { PeriodUnit, TypeUnit } from './database/entities/insights-shared';
@@ -33,26 +34,33 @@ export class InsightsService {
3334
private readonly collectionService: InsightsCollectionService,
3435
private readonly pruningService: InsightsPruningService,
3536
private readonly licenseState: LicenseState,
37+
private readonly instanceSettings: InstanceSettings,
3638
private readonly logger: Logger,
3739
) {
3840
this.logger = this.logger.scoped('insights');
3941
}
4042

41-
startTimers({ onlyCollection } = { onlyCollection: false }) {
43+
@OnLeaderTakeover()
44+
startTimers() {
45+
if (!this.instanceSettings.isLeader && this.instanceSettings.instanceType !== 'webhook') {
46+
return;
47+
}
48+
4249
this.collectionService.startFlushingTimer();
43-
if (!onlyCollection) {
50+
this.logger.debug('Started collection flushing scheduler');
51+
52+
// Start compaction and pruning timers for main leader instance only
53+
if (this.instanceSettings.instanceType !== 'webhook') {
4454
this.compactionService.startCompactionTimer();
55+
this.logger.debug('Started compaction scheduler');
4556
if (this.pruningService.isPruningEnabled) {
4657
this.pruningService.startPruningTimer();
58+
this.logger.debug('Started pruning scheduler');
4759
}
4860
}
49-
this.logger.debug(
50-
onlyCollection
51-
? 'Starting collection flushing schedulers'
52-
: 'Starting compaction, flushing and pruning schedulers',
53-
);
5461
}
5562

63+
@OnLeaderStepdown()
5664
stopTimers() {
5765
this.compactionService.stopCompactionTimer();
5866
this.collectionService.stopFlushingTimer();

0 commit comments

Comments
 (0)