@@ -7,152 +7,114 @@ import { MACHINE_METADATA } from "./constants.js";
7
7
import { EventCache } from "./eventCache.js" ;
8
8
import nodeMachineId from "node-machine-id" ;
9
9
import { getDeviceId } from "@mongodb-js/device-id" ;
10
- import fs from "fs/promises" ;
11
-
12
- async function fileExists ( filePath : string ) : Promise < boolean > {
13
- try {
14
- await fs . access ( filePath , fs . constants . F_OK ) ;
15
- return true ; // File exists
16
- } catch ( e : unknown ) {
17
- if (
18
- e instanceof Error &&
19
- (
20
- e as Error & {
21
- code : string ;
22
- }
23
- ) . code === "ENOENT"
24
- ) {
25
- return false ; // File does not exist
26
- }
27
- throw e ; // Re-throw unexpected errors
28
- }
29
- }
30
10
31
- async function isContainerized ( ) : Promise < boolean > {
32
- if ( process . env . container ) {
33
- return true ;
34
- }
35
-
36
- const exists = await Promise . all ( [ "/.dockerenv" , "/run/.containerenv" , "/var/run/.containerenv" ] . map ( fileExists ) ) ;
11
+ type EventResult = {
12
+ success : boolean ;
13
+ error ?: Error ;
14
+ } ;
37
15
38
- return exists . includes ( true ) ;
39
- }
16
+ export const DEVICE_ID_TIMEOUT = 3000 ;
40
17
41
18
export class Telemetry {
19
+ private isBufferingEvents : boolean = true ;
20
+ /** Resolves when the device ID is retrieved or timeout occurs */
21
+ public deviceIdPromise : Promise < string > | undefined ;
42
22
private deviceIdAbortController = new AbortController ( ) ;
43
23
private eventCache : EventCache ;
44
24
private getRawMachineId : ( ) => Promise < string > ;
45
- private getContainerEnv : ( ) => Promise < boolean > ;
46
- private cachedCommonProperties ?: CommonProperties ;
47
- private flushing : boolean = false ;
48
25
49
26
private constructor (
50
27
private readonly session : Session ,
51
28
private readonly userConfig : UserConfig ,
52
- {
53
- eventCache,
54
- getRawMachineId,
55
- getContainerEnv,
56
- } : {
57
- eventCache : EventCache ;
58
- getRawMachineId : ( ) => Promise < string > ;
59
- getContainerEnv : ( ) => Promise < boolean > ;
60
- }
29
+ private readonly commonProperties : CommonProperties ,
30
+ { eventCache, getRawMachineId } : { eventCache : EventCache ; getRawMachineId : ( ) => Promise < string > }
61
31
) {
62
32
this . eventCache = eventCache ;
63
33
this . getRawMachineId = getRawMachineId ;
64
- this . getContainerEnv = getContainerEnv ;
65
34
}
66
35
67
36
static create (
68
37
session : Session ,
69
38
userConfig : UserConfig ,
70
39
{
40
+ commonProperties = { ...MACHINE_METADATA } ,
71
41
eventCache = EventCache . getInstance ( ) ,
72
42
getRawMachineId = ( ) => nodeMachineId . machineId ( true ) ,
73
- getContainerEnv = isContainerized ,
74
43
} : {
75
44
eventCache ?: EventCache ;
76
45
getRawMachineId ?: ( ) => Promise < string > ;
77
- getContainerEnv ?: ( ) => Promise < boolean > ;
46
+ commonProperties ?: CommonProperties ;
78
47
} = { }
79
48
) : Telemetry {
80
- const instance = new Telemetry ( session , userConfig , {
81
- eventCache,
82
- getRawMachineId,
83
- getContainerEnv,
84
- } ) ;
49
+ const instance = new Telemetry ( session , userConfig , commonProperties , { eventCache, getRawMachineId } ) ;
85
50
51
+ void instance . start ( ) ;
86
52
return instance ;
87
53
}
88
54
55
+ private async start ( ) : Promise < void > {
56
+ if ( ! this . isTelemetryEnabled ( ) ) {
57
+ return ;
58
+ }
59
+ this . deviceIdPromise = getDeviceId ( {
60
+ getMachineId : ( ) => this . getRawMachineId ( ) ,
61
+ onError : ( reason , error ) => {
62
+ switch ( reason ) {
63
+ case "resolutionError" :
64
+ logger . debug ( LogId . telemetryDeviceIdFailure , "telemetry" , String ( error ) ) ;
65
+ break ;
66
+ case "timeout" :
67
+ logger . debug ( LogId . telemetryDeviceIdTimeout , "telemetry" , "Device ID retrieval timed out" ) ;
68
+ break ;
69
+ case "abort" :
70
+ // No need to log in the case of aborts
71
+ break ;
72
+ }
73
+ } ,
74
+ abortSignal : this . deviceIdAbortController . signal ,
75
+ } ) ;
76
+
77
+ this . commonProperties . device_id = await this . deviceIdPromise ;
78
+
79
+ this . isBufferingEvents = false ;
80
+ }
81
+
89
82
public async close ( ) : Promise < void > {
90
83
this . deviceIdAbortController . abort ( ) ;
91
- await this . flush ( ) ;
84
+ this . isBufferingEvents = false ;
85
+ await this . emitEvents ( this . eventCache . getEvents ( ) ) ;
92
86
}
93
87
94
88
/**
95
89
* Emits events through the telemetry pipeline
96
90
* @param events - The events to emit
97
91
*/
98
- public emitEvents ( events : BaseEvent [ ] ) : void {
99
- void this . flush ( events ) ;
92
+ public async emitEvents ( events : BaseEvent [ ] ) : Promise < void > {
93
+ try {
94
+ if ( ! this . isTelemetryEnabled ( ) ) {
95
+ logger . info ( LogId . telemetryEmitFailure , "telemetry" , `Telemetry is disabled.` ) ;
96
+ return ;
97
+ }
98
+
99
+ await this . emit ( events ) ;
100
+ } catch {
101
+ logger . debug ( LogId . telemetryEmitFailure , "telemetry" , `Error emitting telemetry events.` ) ;
102
+ }
100
103
}
101
104
102
105
/**
103
106
* Gets the common properties for events
104
107
* @returns Object containing common properties for all events
105
108
*/
106
- private async getCommonProperties ( ) : Promise < CommonProperties > {
107
- if ( ! this . cachedCommonProperties ) {
108
- let deviceId : string | undefined ;
109
- let containerEnv : boolean | undefined ;
110
- try {
111
- await Promise . all ( [
112
- getDeviceId ( {
113
- getMachineId : ( ) => this . getRawMachineId ( ) ,
114
- onError : ( reason , error ) => {
115
- switch ( reason ) {
116
- case "resolutionError" :
117
- logger . debug ( LogId . telemetryDeviceIdFailure , "telemetry" , String ( error ) ) ;
118
- break ;
119
- case "timeout" :
120
- logger . debug (
121
- LogId . telemetryDeviceIdTimeout ,
122
- "telemetry" ,
123
- "Device ID retrieval timed out"
124
- ) ;
125
- break ;
126
- case "abort" :
127
- // No need to log in the case of aborts
128
- break ;
129
- }
130
- } ,
131
- abortSignal : this . deviceIdAbortController . signal ,
132
- } ) . then ( ( id ) => {
133
- deviceId = id ;
134
- } ) ,
135
- this . getContainerEnv ( ) . then ( ( env ) => {
136
- containerEnv = env ;
137
- } ) ,
138
- ] ) ;
139
- } catch ( error : unknown ) {
140
- const err = error instanceof Error ? error : new Error ( String ( error ) ) ;
141
- logger . debug ( LogId . telemetryDeviceIdFailure , "telemetry" , err . message ) ;
142
- }
143
- this . cachedCommonProperties = {
144
- ...MACHINE_METADATA ,
145
- mcp_client_version : this . session . agentRunner ?. version ,
146
- mcp_client_name : this . session . agentRunner ?. name ,
147
- session_id : this . session . sessionId ,
148
- config_atlas_auth : this . session . apiClient . hasCredentials ( ) ? "true" : "false" ,
149
- config_connection_string : this . userConfig . connectionString ? "true" : "false" ,
150
- is_container_env : containerEnv ? "true" : "false" ,
151
- device_id : deviceId ,
152
- } ;
153
- }
154
-
155
- return this . cachedCommonProperties ;
109
+ public getCommonProperties ( ) : CommonProperties {
110
+ return {
111
+ ...this . commonProperties ,
112
+ mcp_client_version : this . session . agentRunner ?. version ,
113
+ mcp_client_name : this . session . agentRunner ?. name ,
114
+ session_id : this . session . sessionId ,
115
+ config_atlas_auth : this . session . apiClient . hasCredentials ( ) ? "true" : "false" ,
116
+ config_connection_string : this . userConfig . connectionString ? "true" : "false" ,
117
+ } ;
156
118
}
157
119
158
120
/**
@@ -173,74 +135,60 @@ export class Telemetry {
173
135
}
174
136
175
137
/**
176
- * Attempts to flush events through authenticated and unauthenticated clients
138
+ * Attempts to emit events through authenticated and unauthenticated clients
177
139
* Falls back to caching if both attempts fail
178
140
*/
179
- public async flush ( events ?: BaseEvent [ ] ) : Promise < void > {
180
- if ( ! this . isTelemetryEnabled ( ) ) {
181
- logger . info ( LogId . telemetryEmitFailure , "telemetry" , `Telemetry is disabled.` ) ;
182
- return ;
183
- }
184
-
185
- if ( this . flushing ) {
186
- this . eventCache . appendEvents ( events ?? [ ] ) ;
187
- process . nextTick ( async ( ) => {
188
- // try again if in the middle of a flush
189
- await this . flush ( ) ;
190
- } ) ;
141
+ private async emit ( events : BaseEvent [ ] ) : Promise < void > {
142
+ if ( this . isBufferingEvents ) {
143
+ this . eventCache . appendEvents ( events ) ;
191
144
return ;
192
145
}
193
146
194
- this . flushing = true ;
147
+ const cachedEvents = this . eventCache . getEvents ( ) ;
148
+ const allEvents = [ ...cachedEvents , ...events ] ;
195
149
196
- try {
197
- const cachedEvents = this . eventCache . getEvents ( ) ;
198
- const allEvents = [ ...cachedEvents , ...( events ?? [ ] ) ] ;
199
- if ( allEvents . length <= 0 ) {
200
- this . flushing = false ;
201
- return ;
202
- }
203
-
204
- logger . debug (
205
- LogId . telemetryEmitStart ,
206
- "telemetry" ,
207
- `Attempting to send ${ allEvents . length } events (${ cachedEvents . length } cached)`
208
- ) ;
150
+ logger . debug (
151
+ LogId . telemetryEmitStart ,
152
+ "telemetry" ,
153
+ `Attempting to send ${ allEvents . length } events (${ cachedEvents . length } cached)`
154
+ ) ;
209
155
210
- await this . sendEvents ( this . session . apiClient , allEvents ) ;
156
+ const result = await this . sendEvents ( this . session . apiClient , allEvents ) ;
157
+ if ( result . success ) {
211
158
this . eventCache . clearEvents ( ) ;
212
159
logger . debug (
213
160
LogId . telemetryEmitSuccess ,
214
161
"telemetry" ,
215
162
`Sent ${ allEvents . length } events successfully: ${ JSON . stringify ( allEvents , null , 2 ) } `
216
163
) ;
217
- } catch ( error : unknown ) {
218
- logger . debug (
219
- LogId . telemetryEmitFailure ,
220
- "telemetry" ,
221
- `Error sending event to client: ${ error instanceof Error ? error . message : String ( error ) } `
222
- ) ;
223
- this . eventCache . appendEvents ( events ?? [ ] ) ;
224
- process . nextTick ( async ( ) => {
225
- // try again
226
- await this . flush ( ) ;
227
- } ) ;
164
+ return ;
228
165
}
229
166
230
- this . flushing = false ;
167
+ logger . debug (
168
+ LogId . telemetryEmitFailure ,
169
+ "telemetry" ,
170
+ `Error sending event to client: ${ result . error instanceof Error ? result . error . message : String ( result . error ) } `
171
+ ) ;
172
+ this . eventCache . appendEvents ( events ) ;
231
173
}
232
174
233
175
/**
234
176
* Attempts to send events through the provided API client
235
177
*/
236
- private async sendEvents ( client : ApiClient , events : BaseEvent [ ] ) : Promise < void > {
237
- const commonProperties = await this . getCommonProperties ( ) ;
238
-
239
- await client . sendEvents (
240
- events . map ( ( event ) => ( {
241
- ...event ,
242
- properties : { ...commonProperties , ...event . properties } ,
243
- } ) )
244
- ) ;
178
+ private async sendEvents ( client : ApiClient , events : BaseEvent [ ] ) : Promise < EventResult > {
179
+ try {
180
+ await client . sendEvents (
181
+ events . map ( ( event ) => ( {
182
+ ...event ,
183
+ properties : { ...this . getCommonProperties ( ) , ...event . properties } ,
184
+ } ) )
185
+ ) ;
186
+ return { success : true } ;
187
+ } catch ( error ) {
188
+ return {
189
+ success : false ,
190
+ error : error instanceof Error ? error : new Error ( String ( error ) ) ,
191
+ } ;
192
+ }
245
193
}
246
194
}
0 commit comments