Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions sentry_sdk/integrations/litellm.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@
) -> None:
"""Handle successful completion."""

span = _get_metadata_dict(kwargs).get("_sentry_span")
metadata = _get_metadata_dict(kwargs)
span = metadata.get("_sentry_span")
if span is None:
return

Expand Down Expand Up @@ -230,8 +231,13 @@
)

finally:
# Always finish the span and clean up
span.__exit__(None, None, None)
is_streaming = kwargs.get("stream")
# Callback is fired multiple times when streaming a response.
# Streaming flag checked at https://github.com/BerriAI/litellm/blob/33c3f13443eaf990ac8c6e3da78bddbc2b7d0e7a/litellm/litellm_core_utils/litellm_logging.py#L1603
if is_streaming is not True or "complete_streaming_response" in kwargs:

Check warning on line 237 in sentry_sdk/integrations/litellm.py

View check run for this annotation

@sentry/warden / warden: code-review

Failure callback lacks streaming-aware span handling

The `_success_callback` now correctly pops the span from metadata to prevent double exits during streaming, but `_failure_callback` still uses `.get()` instead of `.pop()` and doesn't check for streaming status. If `failure_callback` is also invoked multiple times during streaming (similar to success callbacks), or if both callbacks fire for the same request, this could still cause double span exits or resource leaks.
span = metadata.pop("_sentry_span", None)
if span is not None:
span.__exit__(None, None, None)


def _failure_callback(
Expand Down
114 changes: 114 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1063,6 +1063,120 @@ def inner(response_content, serialize_pydantic=False, request_headers=None):
return inner


@pytest.fixture
def streaming_chat_completions_model_response():
    """Ordered streamed chat-completion chunks for mocking an OpenAI response.

    Returns the same sequence as before: one role-only chunk, five content
    chunks spelling out "Test response", and a final empty-delta chunk with
    finish_reason="stop" carrying the usage totals. Shared metadata
    (id/object/created/model) is identical across all chunks.
    """

    def _chunk(delta, finish_reason=None, **extra):
        # Build one chunk with a single choice; `extra` lets only the final
        # chunk pass `usage=` so the other chunks leave that field unset,
        # exactly as the original literal list did.
        return openai.types.chat.ChatCompletionChunk(
            id="chatcmpl-test",
            object="chat.completion.chunk",
            created=10000000,
            model="gpt-3.5-turbo",
            choices=[
                openai.types.chat.chat_completion_chunk.Choice(
                    index=0,
                    delta=delta,
                    finish_reason=finish_reason,
                ),
            ],
            **extra,
        )

    delta_cls = openai.types.chat.chat_completion_chunk.ChoiceDelta

    chunks = [_chunk(delta_cls(role="assistant"))]
    # Content is intentionally split mid-word to exercise chunk reassembly.
    chunks.extend(
        _chunk(delta_cls(content=piece))
        for piece in ("Tes", "t r", "esp", "ons", "e")
    )
    chunks.append(
        _chunk(
            delta_cls(),
            finish_reason="stop",
            usage=openai.types.CompletionUsage(
                prompt_tokens=10,
                completion_tokens=20,
                total_tokens=30,
            ),
        )
    )
    return chunks


@pytest.fixture
def nonstreaming_responses_model_response():
return openai.types.responses.Response(
Expand Down
75 changes: 58 additions & 17 deletions tests/integrations/litellm/test_litellm.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,28 @@ async def __call__(self, *args, **kwargs):
)
from sentry_sdk.utils import package_version

from openai import OpenAI

from concurrent.futures import ThreadPoolExecutor

import litellm.utils as litellm_utils
from litellm.litellm_core_utils import streaming_handler
from litellm.litellm_core_utils import thread_pool_executor
from litellm.litellm_core_utils import litellm_logging


LITELLM_VERSION = package_version("litellm")


@pytest.fixture()
def reset_litellm_executor():
    """Restore litellm's shared thread pool after the test runs.

    Tests may shut the executor down (e.g. to flush streaming callbacks),
    so on teardown a fresh pool is created and re-installed in every
    litellm module that holds its own alias to it.
    """
    yield
    fresh_pool = ThreadPoolExecutor(max_workers=100)
    thread_pool_executor.executor = fresh_pool
    # These modules each cached a reference to the old executor; rebind all.
    for aliasing_module in (litellm_utils, streaming_handler, litellm_logging):
        aliasing_module.executor = fresh_pool


@pytest.fixture
def clear_litellm_cache():
"""
Expand Down Expand Up @@ -212,7 +230,14 @@ def test_nonstreaming_chat_completion(
],
)
def test_streaming_chat_completion(
sentry_init, capture_events, send_default_pii, include_prompts
reset_litellm_executor,
sentry_init,
capture_events,
send_default_pii,
include_prompts,
get_model_response,
server_side_event_chunks,
streaming_chat_completions_model_response,
):
sentry_init(
integrations=[LiteLLMIntegration(include_prompts=include_prompts)],
Expand All @@ -222,29 +247,45 @@ def test_streaming_chat_completion(
events = capture_events()

messages = [{"role": "user", "content": "Hello!"}]
mock_response = MockCompletionResponse()

with start_transaction(name="litellm test"):
kwargs = {
"model": "gpt-3.5-turbo",
"messages": messages,
"stream": True,
}
client = OpenAI(api_key="z")

_input_callback(kwargs)
_success_callback(
kwargs,
mock_response,
datetime.now(),
datetime.now(),
)
model_response = get_model_response(
server_side_event_chunks(
streaming_chat_completions_model_response,
include_event_type=False,
),
request_headers={"X-Stainless-Raw-Response": "True"},
)

with mock.patch.object(
client.completions._client._client,
"send",
return_value=model_response,
):
with start_transaction(name="litellm test"):
response = litellm.completion(
model="gpt-3.5-turbo",
messages=messages,
client=client,
stream=True,
)
for _ in response:
pass

streaming_handler.executor.shutdown(wait=True)

assert len(events) == 1
(event,) = events

assert event["type"] == "transaction"
assert len(event["spans"]) == 1
(span,) = event["spans"]
chat_spans = list(
x
for x in event["spans"]
if x["op"] == OP.GEN_AI_CHAT and x["origin"] == "auto.ai.litellm"
)
assert len(chat_spans) == 1
span = chat_spans[0]

assert span["op"] == OP.GEN_AI_CHAT
assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True
Expand Down
Loading