-
Notifications
You must be signed in to change notification settings - Fork 67
revert: rollback "chore: add is_container_env to telemetry MCP-2 #330
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,152 +7,114 @@ import { MACHINE_METADATA } from "./constants.js"; | |
import { EventCache } from "./eventCache.js"; | ||
import nodeMachineId from "node-machine-id"; | ||
import { getDeviceId } from "@mongodb-js/device-id"; | ||
import fs from "fs/promises"; | ||
|
||
async function fileExists(filePath: string): Promise<boolean> { | ||
try { | ||
await fs.access(filePath, fs.constants.F_OK); | ||
return true; // File exists | ||
} catch (e: unknown) { | ||
if ( | ||
e instanceof Error && | ||
( | ||
e as Error & { | ||
code: string; | ||
} | ||
).code === "ENOENT" | ||
) { | ||
return false; // File does not exist | ||
} | ||
throw e; // Re-throw unexpected errors | ||
} | ||
} | ||
|
||
async function isContainerized(): Promise<boolean> { | ||
if (process.env.container) { | ||
return true; | ||
} | ||
|
||
const exists = await Promise.all(["/.dockerenv", "/run/.containerenv", "/var/run/.containerenv"].map(fileExists)); | ||
type EventResult = { | ||
success: boolean; | ||
error?: Error; | ||
}; | ||
|
||
return exists.includes(true); | ||
} | ||
export const DEVICE_ID_TIMEOUT = 3000; | ||
|
||
export class Telemetry { | ||
private isBufferingEvents: boolean = true; | ||
/** Resolves when the device ID is retrieved or timeout occurs */ | ||
public deviceIdPromise: Promise<string> | undefined; | ||
private deviceIdAbortController = new AbortController(); | ||
private eventCache: EventCache; | ||
private getRawMachineId: () => Promise<string>; | ||
private getContainerEnv: () => Promise<boolean>; | ||
private cachedCommonProperties?: CommonProperties; | ||
private flushing: boolean = false; | ||
|
||
private constructor( | ||
private readonly session: Session, | ||
private readonly userConfig: UserConfig, | ||
{ | ||
eventCache, | ||
getRawMachineId, | ||
getContainerEnv, | ||
}: { | ||
eventCache: EventCache; | ||
getRawMachineId: () => Promise<string>; | ||
getContainerEnv: () => Promise<boolean>; | ||
} | ||
private readonly commonProperties: CommonProperties, | ||
{ eventCache, getRawMachineId }: { eventCache: EventCache; getRawMachineId: () => Promise<string> } | ||
) { | ||
this.eventCache = eventCache; | ||
this.getRawMachineId = getRawMachineId; | ||
this.getContainerEnv = getContainerEnv; | ||
} | ||
|
||
static create( | ||
session: Session, | ||
userConfig: UserConfig, | ||
{ | ||
commonProperties = { ...MACHINE_METADATA }, | ||
eventCache = EventCache.getInstance(), | ||
getRawMachineId = () => nodeMachineId.machineId(true), | ||
getContainerEnv = isContainerized, | ||
}: { | ||
eventCache?: EventCache; | ||
getRawMachineId?: () => Promise<string>; | ||
getContainerEnv?: () => Promise<boolean>; | ||
commonProperties?: CommonProperties; | ||
} = {} | ||
): Telemetry { | ||
const instance = new Telemetry(session, userConfig, { | ||
eventCache, | ||
getRawMachineId, | ||
getContainerEnv, | ||
}); | ||
const instance = new Telemetry(session, userConfig, commonProperties, { eventCache, getRawMachineId }); | ||
|
||
void instance.start(); | ||
return instance; | ||
} | ||
|
||
private async start(): Promise<void> { | ||
if (!this.isTelemetryEnabled()) { | ||
return; | ||
} | ||
this.deviceIdPromise = getDeviceId({ | ||
getMachineId: () => this.getRawMachineId(), | ||
onError: (reason, error) => { | ||
switch (reason) { | ||
case "resolutionError": | ||
logger.debug(LogId.telemetryDeviceIdFailure, "telemetry", String(error)); | ||
break; | ||
case "timeout": | ||
logger.debug(LogId.telemetryDeviceIdTimeout, "telemetry", "Device ID retrieval timed out"); | ||
break; | ||
case "abort": | ||
// No need to log in the case of aborts | ||
break; | ||
} | ||
}, | ||
abortSignal: this.deviceIdAbortController.signal, | ||
}); | ||
|
||
this.commonProperties.device_id = await this.deviceIdPromise; | ||
|
||
this.isBufferingEvents = false; | ||
} | ||
|
||
public async close(): Promise<void> { | ||
this.deviceIdAbortController.abort(); | ||
await this.flush(); | ||
this.isBufferingEvents = false; | ||
await this.emitEvents(this.eventCache.getEvents()); | ||
} | ||
|
||
/** | ||
* Emits events through the telemetry pipeline | ||
* @param events - The events to emit | ||
*/ | ||
public emitEvents(events: BaseEvent[]): void { | ||
void this.flush(events); | ||
public async emitEvents(events: BaseEvent[]): Promise<void> { | ||
try { | ||
if (!this.isTelemetryEnabled()) { | ||
logger.info(LogId.telemetryEmitFailure, "telemetry", `Telemetry is disabled.`); | ||
return; | ||
} | ||
|
||
await this.emit(events); | ||
} catch { | ||
logger.debug(LogId.telemetryEmitFailure, "telemetry", `Error emitting telemetry events.`); | ||
} | ||
} | ||
|
||
/** | ||
* Gets the common properties for events | ||
* @returns Object containing common properties for all events | ||
*/ | ||
private async getCommonProperties(): Promise<CommonProperties> { | ||
if (!this.cachedCommonProperties) { | ||
let deviceId: string | undefined; | ||
let containerEnv: boolean | undefined; | ||
try { | ||
await Promise.all([ | ||
getDeviceId({ | ||
getMachineId: () => this.getRawMachineId(), | ||
onError: (reason, error) => { | ||
switch (reason) { | ||
case "resolutionError": | ||
logger.debug(LogId.telemetryDeviceIdFailure, "telemetry", String(error)); | ||
break; | ||
case "timeout": | ||
logger.debug( | ||
LogId.telemetryDeviceIdTimeout, | ||
"telemetry", | ||
"Device ID retrieval timed out" | ||
); | ||
break; | ||
case "abort": | ||
// No need to log in the case of aborts | ||
break; | ||
} | ||
}, | ||
abortSignal: this.deviceIdAbortController.signal, | ||
}).then((id) => { | ||
deviceId = id; | ||
}), | ||
this.getContainerEnv().then((env) => { | ||
containerEnv = env; | ||
}), | ||
]); | ||
} catch (error: unknown) { | ||
const err = error instanceof Error ? error : new Error(String(error)); | ||
logger.debug(LogId.telemetryDeviceIdFailure, "telemetry", err.message); | ||
} | ||
this.cachedCommonProperties = { | ||
...MACHINE_METADATA, | ||
mcp_client_version: this.session.agentRunner?.version, | ||
mcp_client_name: this.session.agentRunner?.name, | ||
session_id: this.session.sessionId, | ||
config_atlas_auth: this.session.apiClient.hasCredentials() ? "true" : "false", | ||
config_connection_string: this.userConfig.connectionString ? "true" : "false", | ||
is_container_env: containerEnv ? "true" : "false", | ||
device_id: deviceId, | ||
}; | ||
} | ||
|
||
return this.cachedCommonProperties; | ||
public getCommonProperties(): CommonProperties { | ||
return { | ||
...this.commonProperties, | ||
mcp_client_version: this.session.agentRunner?.version, | ||
mcp_client_name: this.session.agentRunner?.name, | ||
session_id: this.session.sessionId, | ||
config_atlas_auth: this.session.apiClient.hasCredentials() ? "true" : "false", | ||
config_connection_string: this.userConfig.connectionString ? "true" : "false", | ||
}; | ||
} | ||
|
||
/** | ||
|
@@ -173,74 +135,60 @@ export class Telemetry { | |
} | ||
|
||
/** | ||
* Attempts to flush events through authenticated and unauthenticated clients | ||
* Attempts to emit events through authenticated and unauthenticated clients | ||
* Falls back to caching if both attempts fail | ||
*/ | ||
public async flush(events?: BaseEvent[]): Promise<void> { | ||
if (!this.isTelemetryEnabled()) { | ||
logger.info(LogId.telemetryEmitFailure, "telemetry", `Telemetry is disabled.`); | ||
return; | ||
} | ||
|
||
if (this.flushing) { | ||
this.eventCache.appendEvents(events ?? []); | ||
process.nextTick(async () => { | ||
// try again if in the middle of a flush | ||
await this.flush(); | ||
}); | ||
private async emit(events: BaseEvent[]): Promise<void> { | ||
if (this.isBufferingEvents) { | ||
this.eventCache.appendEvents(events); | ||
return; | ||
} | ||
|
||
this.flushing = true; | ||
const cachedEvents = this.eventCache.getEvents(); | ||
const allEvents = [...cachedEvents, ...events]; | ||
|
||
try { | ||
const cachedEvents = this.eventCache.getEvents(); | ||
const allEvents = [...cachedEvents, ...(events ?? [])]; | ||
if (allEvents.length <= 0) { | ||
this.flushing = false; | ||
return; | ||
} | ||
|
||
logger.debug( | ||
LogId.telemetryEmitStart, | ||
"telemetry", | ||
`Attempting to send ${allEvents.length} events (${cachedEvents.length} cached)` | ||
); | ||
logger.debug( | ||
LogId.telemetryEmitStart, | ||
"telemetry", | ||
`Attempting to send ${allEvents.length} events (${cachedEvents.length} cached)` | ||
); | ||
|
||
await this.sendEvents(this.session.apiClient, allEvents); | ||
const result = await this.sendEvents(this.session.apiClient, allEvents); | ||
if (result.success) { | ||
this.eventCache.clearEvents(); | ||
logger.debug( | ||
LogId.telemetryEmitSuccess, | ||
"telemetry", | ||
`Sent ${allEvents.length} events successfully: ${JSON.stringify(allEvents, null, 2)}` | ||
); | ||
} catch (error: unknown) { | ||
logger.debug( | ||
LogId.telemetryEmitFailure, | ||
"telemetry", | ||
`Error sending event to client: ${error instanceof Error ? error.message : String(error)}` | ||
); | ||
this.eventCache.appendEvents(events ?? []); | ||
process.nextTick(async () => { | ||
// try again | ||
await this.flush(); | ||
}); | ||
return; | ||
} | ||
|
||
this.flushing = false; | ||
logger.debug( | ||
LogId.telemetryEmitFailure, | ||
"telemetry", | ||
`Error sending event to client: ${result.error instanceof Error ? result.error.message : String(result.error)}` | ||
); | ||
this.eventCache.appendEvents(events); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [nitpick] On send failure the code appends events back to the cache but does not schedule an immediate retry. Consider documenting that retry happens on the next emit or add an explicit retry trigger to avoid long delays in flushing backlog. Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback |
||
} | ||
|
||
/** | ||
* Attempts to send events through the provided API client | ||
*/ | ||
private async sendEvents(client: ApiClient, events: BaseEvent[]): Promise<void> { | ||
const commonProperties = await this.getCommonProperties(); | ||
|
||
await client.sendEvents( | ||
events.map((event) => ({ | ||
...event, | ||
properties: { ...commonProperties, ...event.properties }, | ||
})) | ||
); | ||
private async sendEvents(client: ApiClient, events: BaseEvent[]): Promise<EventResult> { | ||
try { | ||
await client.sendEvents( | ||
events.map((event) => ({ | ||
...event, | ||
properties: { ...this.getCommonProperties(), ...event.properties }, | ||
})) | ||
); | ||
return { success: true }; | ||
} catch (error) { | ||
return { | ||
success: false, | ||
error: error instanceof Error ? error : new Error(String(error)), | ||
}; | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -74,12 +74,12 @@ export abstract class ToolBase { | |
logger.debug(LogId.toolExecute, "tool", `Executing tool ${this.name}`); | ||
|
||
const result = await this.execute(...args); | ||
this.emitToolEvent(startTime, result, ...args); | ||
await this.emitToolEvent(startTime, result, ...args).catch(() => {}); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [nitpick] Silently swallowing errors may make debugging failures difficult. Consider logging the caught error or at least adding a comment explaining why it's safe to ignore. Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback |
||
return result; | ||
} catch (error: unknown) { | ||
logger.error(LogId.toolExecuteFailure, "tool", `Error executing ${this.name}: ${error as string}`); | ||
const toolResult = await this.handleError(error, args[0] as ToolArgs<typeof this.argsShape>); | ||
this.emitToolEvent(startTime, toolResult, ...args); | ||
await this.emitToolEvent(startTime, toolResult, ...args).catch(() => {}); | ||
return toolResult; | ||
} | ||
}; | ||
|
@@ -179,11 +179,11 @@ export abstract class ToolBase { | |
* @param result - Whether the command succeeded or failed | ||
* @param args - The arguments passed to the tool | ||
*/ | ||
private emitToolEvent( | ||
private async emitToolEvent( | ||
startTime: number, | ||
result: CallToolResult, | ||
...args: Parameters<ToolCallback<typeof this.argsShape>> | ||
): void { | ||
): Promise<void> { | ||
if (!this.telemetry.isTelemetryEnabled()) { | ||
return; | ||
} | ||
|
@@ -209,6 +209,6 @@ export abstract class ToolBase { | |
event.properties.project_id = metadata.projectId; | ||
} | ||
|
||
this.telemetry.emitEvents([event]); | ||
await this.telemetry.emitEvents([event]); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] This empty
.catch()
will swallow any errors fromemitEvents
. It may be worth logging an error or warning to help diagnose unexpected telemetry failures.Copilot uses AI. Check for mistakes.