diff --git a/src/agents/tracing/processors.py b/src/agents/tracing/processors.py index 2913b11a4..73f733125 100644 --- a/src/agents/tracing/processors.py +++ b/src/agents/tracing/processors.py @@ -188,10 +188,27 @@ def __init__( # Track when we next *must* perform a scheduled export self._next_export_time = time.time() + self._schedule_delay - self._worker_thread = threading.Thread(target=self._run, daemon=True) - self._worker_thread.start() + # We lazily start the background worker thread the first time a span/trace is queued. + self._worker_thread: threading.Thread | None = None + self._thread_start_lock = threading.Lock() + + def _ensure_thread_started(self) -> None: + # Fast path without holding the lock + if self._worker_thread and self._worker_thread.is_alive(): + return + + # Double-checked locking to avoid starting multiple threads + with self._thread_start_lock: + if self._worker_thread and self._worker_thread.is_alive(): + return + + self._worker_thread = threading.Thread(target=self._run, daemon=True) + self._worker_thread.start() def on_trace_start(self, trace: Trace) -> None: + # Ensure the background worker is running before we enqueue anything. + self._ensure_thread_started() + try: self._queue.put_nowait(trace) except queue.Full: @@ -206,6 +223,9 @@ def on_span_start(self, span: Span[Any]) -> None: pass def on_span_end(self, span: Span[Any]) -> None: + # Ensure the background worker is running before we enqueue anything. + self._ensure_thread_started() + try: self._queue.put_nowait(span) except queue.Full: @@ -216,7 +236,13 @@ def shutdown(self, timeout: float | None = None): Called when the application stops. We signal our thread to stop, then join it. """ self._shutdown_event.set() - self._worker_thread.join(timeout=timeout) + + # Only join if we ever started the background thread; otherwise flush synchronously. + if self._worker_thread and self._worker_thread.is_alive(): + self._worker_thread.join(timeout=timeout) + else: + # No background thread: process any remaining items synchronously. + self._export_batches(force=True) def force_flush(self): """