Anthropic - Token tracking for Passthrough Batch API calls #11388

Merged · 4 commits · Jun 4, 2025

62 changes: 34 additions & 28 deletions litellm/cost_calculator.py
@@ -1209,28 +1209,7 @@ def batch_cost_calculator(
     return total_prompt_cost, total_completion_cost


-class RealtimeAPITokenUsageProcessor:
-    @staticmethod
-    def collect_usage_from_realtime_stream_results(
-        results: OpenAIRealtimeStreamList,
-    ) -> List[Usage]:
-        """
-        Collect usage from realtime stream results
-        """
-        response_done_events: List[OpenAIRealtimeStreamResponseBaseObject] = cast(
-            List[OpenAIRealtimeStreamResponseBaseObject],
-            [result for result in results if result["type"] == "response.done"],
-        )
-        usage_objects: List[Usage] = []
-        for result in response_done_events:
-            usage_object = (
-                ResponseAPILoggingUtils._transform_response_api_usage_to_chat_usage(
-                    result["response"].get("usage", {})
-                )
-            )
-            usage_objects.append(usage_object)
-        return usage_objects
-
+class BaseTokenUsageProcessor:
     @staticmethod
     def combine_usage_objects(usage_objects: List[Usage]) -> Usage:
         """
@@ -1266,13 +1245,17 @@ def combine_usage_objects(usage_objects: List[Usage]) -> Usage:
                 combined.prompt_tokens_details = PromptTokensDetailsWrapper()

             # Check what keys exist in the model's prompt_tokens_details
-            for attr in dir(usage.prompt_tokens_details):
-                if not attr.startswith("_") and not callable(
-                    getattr(usage.prompt_tokens_details, attr)
+            for attr in usage.prompt_tokens_details.model_fields:
+                if (
+                    hasattr(usage.prompt_tokens_details, attr)
+                    and not attr.startswith("_")
+                    and not callable(getattr(usage.prompt_tokens_details, attr))
                 ):
-                    current_val = getattr(combined.prompt_tokens_details, attr, 0)
-                    new_val = getattr(usage.prompt_tokens_details, attr, 0)
-                    if new_val is not None:
+                    current_val = (
+                        getattr(combined.prompt_tokens_details, attr, 0) or 0
+                    )
+                    new_val = getattr(usage.prompt_tokens_details, attr, 0) or 0
+                    if new_val is not None and isinstance(new_val, (int, float)):
                         setattr(
                             combined.prompt_tokens_details,
                             attr,
@@ -1308,6 +1291,29 @@ def combine_usage_objects(usage_objects: List[Usage]) -> Usage:

         return combined

+
+class RealtimeAPITokenUsageProcessor(BaseTokenUsageProcessor):
+    @staticmethod
+    def collect_usage_from_realtime_stream_results(
+        results: OpenAIRealtimeStreamList,
+    ) -> List[Usage]:
+        """
+        Collect usage from realtime stream results
+        """
+        response_done_events: List[OpenAIRealtimeStreamResponseBaseObject] = cast(
+            List[OpenAIRealtimeStreamResponseBaseObject],
+            [result for result in results if result["type"] == "response.done"],
+        )
+        usage_objects: List[Usage] = []
+        for result in response_done_events:
+            usage_object = (
+                ResponseAPILoggingUtils._transform_response_api_usage_to_chat_usage(
+                    result["response"].get("usage", {})
+                )
+            )
+            usage_objects.append(usage_object)
+        return usage_objects
+
     @staticmethod
     def collect_and_combine_usage_from_realtime_stream_results(
         results: OpenAIRealtimeStreamList,
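For a sense of the consumer-side contract, here is a minimal sketch of combining per-request usage with the new base class. It assumes `combine_usage_objects` sums the top-level token counts — the hunk above only shows the `prompt_tokens_details` merging, but that is how the batch transformer below relies on it:

```python
from litellm.cost_calculator import BaseTokenUsageProcessor
from litellm.types.utils import Usage

# Two per-request Usage objects, as the batch transformer collects them.
u1 = Usage(prompt_tokens=10, completion_tokens=20, total_tokens=30)
u2 = Usage(prompt_tokens=5, completion_tokens=7, total_tokens=12)

combined = BaseTokenUsageProcessor.combine_usage_objects([u1, u2])
print(combined.prompt_tokens, combined.completion_tokens, combined.total_tokens)
# Expected: 15 27 42
```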
15 changes: 15 additions & 0 deletions litellm/llms/anthropic/__init__.py
@@ -0,0 +1,15 @@
+from typing import Type, Union
+
+from .batches.transformation import AnthropicBatchesConfig
+from .chat.transformation import AnthropicConfig
+
+__all__ = ["AnthropicBatchesConfig", "AnthropicConfig"]
+
+
+def get_anthropic_config(
+    url_route: str,
+) -> Union[Type[AnthropicBatchesConfig], Type[AnthropicConfig]]:
+    if "messages/batches" in url_route and "results" in url_route:
+        return AnthropicBatchesConfig
+    else:
+        return AnthropicConfig
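A quick sanity check of the routing rule — batch *results* routes get the new config, everything else falls back to the chat config (the URL strings below are illustrative, including the hypothetical batch id):

```python
from litellm.llms.anthropic import get_anthropic_config

# Batch results route -> AnthropicBatchesConfig
config_cls = get_anthropic_config(
    "/v1/messages/batches/msgbatch_abc123/results"  # hypothetical batch id
)
print(config_cls.__name__)  # AnthropicBatchesConfig

# Any other route, e.g. plain chat -> AnthropicConfig
print(get_anthropic_config("/v1/messages").__name__)  # AnthropicConfig
```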
76 changes: 76 additions & 0 deletions litellm/llms/anthropic/batches/transformation.py
@@ -0,0 +1,76 @@
+import json
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, cast
+
+from httpx import Response
+
+from litellm.types.llms.openai import AllMessageValues
+from litellm.utils import ModelResponse
+
+if TYPE_CHECKING:
+    from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj
+
+    LoggingClass = LiteLLMLoggingObj
+else:
+    LoggingClass = Any
+
+
+class AnthropicBatchesConfig:
+    def __init__(self):
+        from ..chat.transformation import AnthropicConfig
+
+        self.anthropic_chat_config = AnthropicConfig()  # initialize once
+
+    def transform_response(
+        self,
+        model: str,
+        raw_response: Response,
+        model_response: ModelResponse,
+        logging_obj: LoggingClass,
+        request_data: Dict,
+        messages: List[AllMessageValues],
+        optional_params: Dict,
+        litellm_params: dict,
+        encoding: Any,
+        api_key: Optional[str] = None,
+        json_mode: Optional[bool] = None,
+    ) -> ModelResponse:
+        from litellm.cost_calculator import BaseTokenUsageProcessor
+        from litellm.types.utils import Usage
+
+        response_text = raw_response.text.strip()
+        all_usage: List[Usage] = []
+
+        try:
+            # Split by newlines and try to parse each line as JSON
+            lines = response_text.split("\n")
+            for line in lines:
+                line = line.strip()
+                if not line:
+                    continue
+                try:
+                    response_json = json.loads(line)
+                    # Update model_response with the parsed JSON
+                    completion_response = response_json["result"]["message"]
+                    transformed_response = (
+                        self.anthropic_chat_config.transform_parsed_response(
+                            completion_response=completion_response,
+                            raw_response=raw_response,
+                            model_response=model_response,
+                        )
+                    )
+
+                    transformed_response_usage = getattr(
+                        transformed_response, "usage", None
+                    )
+                    if transformed_response_usage:
+                        all_usage.append(cast(Usage, transformed_response_usage))
+                except json.JSONDecodeError:
+                    continue
+
+            ## SUM ALL USAGE
+            combined_usage = BaseTokenUsageProcessor.combine_usage_objects(all_usage)
+            setattr(model_response, "usage", combined_usage)
+
+            return model_response
+        except Exception as e:
+            raise e
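To make the control flow concrete, here is a small self-contained sketch of the same pattern — split a JSONL results payload, pull `result.message.usage` from each line, and sum the token counts. The payload literal is invented for illustration; the real transformer delegates per-line transformation to `AnthropicConfig.transform_parsed_response`:

```python
import json

# Hypothetical two-line batch results payload (JSONL), shaped like
# the output of Anthropic's /v1/messages/batches/{id}/results route.
payload = "\n".join([
    '{"custom_id": "req-1", "result": {"message": {"usage": {"input_tokens": 10, "output_tokens": 20}}}}',
    '{"custom_id": "req-2", "result": {"message": {"usage": {"input_tokens": 5, "output_tokens": 7}}}}',
])

totals = {"input_tokens": 0, "output_tokens": 0}
for line in payload.split("\n"):
    line = line.strip()
    if not line:
        continue
    try:
        usage = json.loads(line)["result"]["message"]["usage"]
    except (json.JSONDecodeError, KeyError):
        continue  # skip lines that don't parse or lack usage (the PR skips JSON errors)
    for key in totals:
        totals[key] += usage.get(key, 0)

print(totals)  # {'input_tokens': 15, 'output_tokens': 27}
```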
77 changes: 47 additions & 30 deletions litellm/llms/anthropic/chat/transformation.py
@@ -784,44 +784,17 @@ def calculate_usage(
         )
         return usage

-    def transform_response(
+    def transform_parsed_response(
         self,
-        model: str,
+        completion_response: dict,
         raw_response: httpx.Response,
         model_response: ModelResponse,
-        logging_obj: LoggingClass,
-        request_data: Dict,
-        messages: List[AllMessageValues],
-        optional_params: Dict,
-        litellm_params: dict,
-        encoding: Any,
-        api_key: Optional[str] = None,
         json_mode: Optional[bool] = None,
-    ) -> ModelResponse:
+    ):
         _hidden_params: Dict = {}
         _hidden_params["additional_headers"] = process_anthropic_headers(
             dict(raw_response.headers)
         )
-        ## LOGGING
-        logging_obj.post_call(
-            input=messages,
-            api_key=api_key,
-            original_response=raw_response.text,
-            additional_args={"complete_input_dict": request_data},
-        )
-
-        ## RESPONSE OBJECT
-        try:
-            completion_response = raw_response.json()
-        except Exception as e:
-            response_headers = getattr(raw_response, "headers", None)
-            raise AnthropicError(
-                message="Unable to get json response - {}, Original Response: {}".format(
-                    str(e), raw_response.text
-                ),
-                status_code=raw_response.status_code,
-                headers=response_headers,
-            )
         if "error" in completion_response:
             response_headers = getattr(raw_response, "headers", None)
             raise AnthropicError(
@@ -890,6 +863,50 @@ def transform_response(
             model_response.model = completion_response["model"]

         model_response._hidden_params = _hidden_params
+
+        return model_response
+
+    def transform_response(
+        self,
+        model: str,
+        raw_response: httpx.Response,
+        model_response: ModelResponse,
+        logging_obj: LoggingClass,
+        request_data: Dict,
+        messages: List[AllMessageValues],
+        optional_params: Dict,
+        litellm_params: dict,
+        encoding: Any,
+        api_key: Optional[str] = None,
+        json_mode: Optional[bool] = None,
+    ) -> ModelResponse:
+        ## LOGGING
+        logging_obj.post_call(
+            input=messages,
+            api_key=api_key,
+            original_response=raw_response.text,
+            additional_args={"complete_input_dict": request_data},
+        )
+
+        ## RESPONSE OBJECT
+        try:
+            completion_response = raw_response.json()
+        except Exception as e:
+            response_headers = getattr(raw_response, "headers", None)
+            raise AnthropicError(
+                message="Unable to get json response - {}, Original Response: {}".format(
+                    str(e), raw_response.text
+                ),
+                status_code=raw_response.status_code,
+                headers=response_headers,
+            )
+
+        model_response = self.transform_parsed_response(
+            completion_response=completion_response,
+            raw_response=raw_response,
+            model_response=model_response,
+            json_mode=json_mode,
+        )
+        return model_response

     @staticmethod
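The shape of this refactor is the usual "parse once, transform the parsed payload" split: `transform_response` keeps logging, `raw_response.json()`, and error wrapping, while `transform_parsed_response` works on an already-parsed message dict — which is exactly what lets `AnthropicBatchesConfig` above feed it one JSONL record at a time. A minimal generic sketch of the pattern (names invented for illustration, not litellm's API):

```python
import json
from typing import Any, Dict


class ResponseTransformer:
    def transform_parsed_response(self, completion_response: Dict[str, Any]) -> Dict[str, Any]:
        # Works on an already-parsed payload; callable once per JSONL line.
        return {
            "model": completion_response.get("model"),
            "usage": completion_response.get("usage", {}),
        }

    def transform_response(self, raw_text: str) -> Dict[str, Any]:
        # Thin wrapper: owns JSON decoding and error reporting, then delegates.
        try:
            parsed = json.loads(raw_text)
        except json.JSONDecodeError as e:
            raise ValueError(f"Unable to get json response - {e}: {raw_text}") from e
        return self.transform_parsed_response(parsed)
```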
8 changes: 6 additions & 2 deletions litellm/llms/vertex_ai/common_utils.py
@@ -223,16 +223,20 @@ def _filter_anyof_fields(schema_dict: Dict[str, Any]) -> Dict[str, Any]:
     E.g. {"anyOf": [{"type": "string"}, {"type": "null"}], "default": "test", "title": "test"} -> {"anyOf": [{"type": "string", "title": "test"}, {"type": "null", "title": "test"}]}
     """
     title = schema_dict.get("title", None)
+    description = schema_dict.get("description", None)

     if isinstance(schema_dict, dict) and schema_dict.get("anyOf"):
         any_of = schema_dict["anyOf"]
         if (
-            title
+            (title or description)
             and isinstance(any_of, list)
             and all(isinstance(item, dict) for item in any_of)
         ):
             for item in any_of:
-                item["title"] = title
+                if title:
+                    item["title"] = title
+                if description:
+                    item["description"] = description
             return {"anyOf": any_of}
         else:
             return schema_dict
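Before/after on the widened condition: a `description` now propagates into each `anyOf` branch even when no `title` is present. A small sketch against this internal helper (the schema literal is invented):

```python
from litellm.llms.vertex_ai.common_utils import _filter_anyof_fields

schema = {
    "anyOf": [{"type": "string"}, {"type": "null"}],
    "description": "optional user nickname",  # note: no "title" key
}
print(_filter_anyof_fields(schema))
# Expected with this change:
# {'anyOf': [{'type': 'string', 'description': 'optional user nickname'},
#            {'type': 'null', 'description': 'optional user nickname'}]}
```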
1 change: 0 additions & 1 deletion litellm/proxy/_experimental/out/onboarding.html

This file was deleted.

litellm/proxy/pass_through_endpoints/llm_provider_handlers/anthropic_passthrough_logging_handler.py
@@ -7,10 +7,10 @@
 import litellm
 from litellm._logging import verbose_proxy_logger
 from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj
+from litellm.llms.anthropic import get_anthropic_config
 from litellm.llms.anthropic.chat.handler import (
     ModelResponseIterator as AnthropicModelResponseIterator,
 )
-from litellm.llms.anthropic.chat.transformation import AnthropicConfig
 from litellm.proxy._types import PassThroughEndpointLoggingTypedDict
 from litellm.proxy.auth.auth_utils import get_end_user_id_from_request_body
 from litellm.types.passthrough_endpoints.pass_through_endpoints import (
@@ -43,7 +43,8 @@ def anthropic_passthrough_handler(
     Transforms Anthropic response to OpenAI response, generates a standard logging object so downstream logging can be handled
     """
     model = response_body.get("model", "")
-    litellm_model_response: ModelResponse = AnthropicConfig().transform_response(
+    anthropic_config = get_anthropic_config(url_route)
+    litellm_model_response: ModelResponse = anthropic_config().transform_response(
         raw_response=httpx_response,
         model_response=litellm.ModelResponse(),
         model=model,
@@ -124,9 +125,9 @@ def _create_anthropic_response_logging_payload(
         litellm_model_response.id = logging_obj.litellm_call_id
         litellm_model_response.model = model
         logging_obj.model_call_details["model"] = model
-        logging_obj.model_call_details["custom_llm_provider"] = (
-            litellm.LlmProviders.ANTHROPIC.value
-        )
+        logging_obj.model_call_details[
+            "custom_llm_provider"
+        ] = litellm.LlmProviders.ANTHROPIC.value
         return kwargs
     except Exception as e:
         verbose_proxy_logger.exception(
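Downstream payoff: once the combined `Usage` is attached to the `ModelResponse`, litellm's standard cost machinery can price the passthrough batch call. A hedged sketch using litellm's public `cost_per_token` helper (the model name and token counts are placeholders):

```python
import litellm

# Placeholder counts standing in for the combined usage across batch lines.
prompt_cost, completion_cost = litellm.cost_per_token(
    model="anthropic/claude-3-5-sonnet-20241022",
    prompt_tokens=15,
    completion_tokens=27,
)
print(f"estimated batch cost: ${prompt_cost + completion_cost:.6f}")
```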