[Feat] Performance - Don't create 1 task for every hanging request alert #11385


Merged
merged 11 commits on Jun 4, 2025
8 changes: 8 additions & 0 deletions litellm/caching/in_memory_cache.py
@@ -234,3 +234,11 @@ async def async_get_ttl(self, key: str) -> Optional[int]:
Get the remaining TTL of a key in in-memory cache
"""
return self.ttl_dict.get(key, None)

async def async_get_oldest_n_keys(self, n: int) -> List[str]:
"""
Get the oldest n keys in the cache
"""
# sort the TTL dict by expiry timestamp (earliest expiry first = oldest entry)
sorted_ttl_dict = sorted(self.ttl_dict.items(), key=lambda x: x[1])
return [key for key, _ in sorted_ttl_dict[:n]]
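
Reviewer note: a minimal usage sketch of the new helper (assuming the uniform-TTL setup this PR uses, where every hanging-request entry gets the same TTL, so the earliest expiry in ttl_dict is also the earliest insert):

import asyncio

from litellm.caching.in_memory_cache import InMemoryCache


async def main():
    cache = InMemoryCache(default_ttl=120)
    # Uniform TTLs: the earliest-expiring entry is also the earliest-inserted.
    await cache.async_set_cache(key="req-1", value="a", ttl=120)
    await cache.async_set_cache(key="req-2", value="b", ttl=120)
    await cache.async_set_cache(key="req-3", value="c", ttl=120)
    # Ties in expiry keep insertion order: sorted() is stable and dicts
    # iterate in insertion order.
    print(await cache.async_get_oldest_n_keys(n=2))  # ['req-1', 'req-2']


asyncio.run(main())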
175 changes: 175 additions & 0 deletions litellm/integrations/SlackAlerting/hanging_request_check.py
@@ -0,0 +1,175 @@
"""
Class to check for LLM API hanging requests


Notes:
- Do not create tasks that sleep, as they can saturate the event loop
- Do not store large objects (e.g. messages) in memory, as they can increase RAM usage
"""

import asyncio
from typing import TYPE_CHECKING, Any, Optional

import litellm
from litellm._logging import verbose_proxy_logger
from litellm.caching.in_memory_cache import InMemoryCache
from litellm.litellm_core_utils.core_helpers import get_litellm_metadata_from_kwargs
from litellm.types.integrations.slack_alerting import (
HANGING_ALERT_BUFFER_TIME_SECONDS,
MAX_OLDEST_HANGING_REQUESTS_TO_CHECK,
HangingRequestData,
)

if TYPE_CHECKING:
from litellm.integrations.SlackAlerting.slack_alerting import SlackAlerting
else:
SlackAlerting = Any


class AlertingHangingRequestCheck:
"""
Class to safely check for and alert on hanging requests
"""

def __init__(
self,
slack_alerting_object: SlackAlerting,
):
self.slack_alerting_object = slack_alerting_object
self.hanging_request_cache = InMemoryCache(
default_ttl=int(
self.slack_alerting_object.alerting_threshold
+ HANGING_ALERT_BUFFER_TIME_SECONDS
),
)

async def add_request_to_hanging_request_check(
self,
request_data: Optional[dict] = None,
):
"""
Add a request to the hanging request cache. This is the list of request_ids that gets periodically checked for hanging requests
"""
if request_data is None:
return

request_metadata = get_litellm_metadata_from_kwargs(kwargs=request_data)
model = request_data.get("model", "")
api_base: Optional[str] = None

if request_data.get("deployment", None) is not None and isinstance(
request_data["deployment"], dict
):
api_base = litellm.get_api_base(
model=model,
optional_params=request_data["deployment"].get("litellm_params", {}),
)

hanging_request_data = HangingRequestData(
request_id=request_data.get("litellm_call_id", ""),
model=model,
api_base=api_base,
key_alias=request_metadata.get("user_api_key_alias", ""),
team_alias=request_metadata.get("user_api_key_team_alias", ""),
)

await self.hanging_request_cache.async_set_cache(
key=hanging_request_data.request_id,
value=hanging_request_data,
ttl=int(
self.slack_alerting_object.alerting_threshold
+ HANGING_ALERT_BUFFER_TIME_SECONDS
),
)
return

async def send_alerts_for_hanging_requests(self):
"""
Send alerts for hanging requests
"""
from litellm.proxy.proxy_server import proxy_logging_obj

#########################################################
# Find all requests that have been hanging for more than the alerting threshold
# Get the oldest items (up to MAX_OLDEST_HANGING_REQUESTS_TO_CHECK) in the cache and check if they have completed
#########################################################
# check if request_id is in internal usage cache
if proxy_logging_obj.internal_usage_cache is None:
return

hanging_requests = await self.hanging_request_cache.async_get_oldest_n_keys(
n=MAX_OLDEST_HANGING_REQUESTS_TO_CHECK,
)

for request_id in hanging_requests:
hanging_request_data: Optional[HangingRequestData] = (
await self.hanging_request_cache.async_get_cache(
key=request_id,
)
)

if hanging_request_data is None:
continue

request_status = (
await proxy_logging_obj.internal_usage_cache.async_get_cache(
key="request_status:{}".format(hanging_request_data.request_id),
litellm_parent_otel_span=None,
local_only=True,
)
)
# this means the request status was either success or fail
# and is not hanging
if request_status is not None:
# clear this request from the hanging request cache since it either succeeded or failed
self.hanging_request_cache._remove_key(
key=request_id,
)
continue

################
# Send the Alert on Slack
################
await self.send_hanging_request_alert(
hanging_request_data=hanging_request_data
)

return

async def check_for_hanging_requests(
self,
):
"""
Background task that checks whether the request ids in self.hanging_request_cache have completed

Runs every alerting_threshold/2 seconds to check for hanging requests
"""
while True:
verbose_proxy_logger.debug("Checking for hanging requests....")
await self.send_alerts_for_hanging_requests()
await asyncio.sleep(self.slack_alerting_object.alerting_threshold / 2)

async def send_hanging_request_alert(
self,
hanging_request_data: HangingRequestData,
):
"""
Send a hanging request alert
"""
from litellm.integrations.SlackAlerting.slack_alerting import AlertType

################
# Send the Alert on Slack
################
request_info = f"""Request Model: `{hanging_request_data.model}`
API Base: `{hanging_request_data.api_base}`
Key Alias: `{hanging_request_data.key_alias}`
Team Alias: `{hanging_request_data.team_alias}`"""

alerting_message = f"`Requests are hanging - {self.slack_alerting_object.alerting_threshold}s+ request time`"
await self.slack_alerting_object.send_alert(
message=alerting_message + "\n" + request_info,
level="Medium",
alert_type=AlertType.llm_requests_hanging,
alerting_metadata=hanging_request_data.alerting_metadata or {},
)
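
Reviewer note: a rough wiring sketch of the new class in isolation, not a tested end-to-end example. _StubSlackAlerting is a hypothetical stand-in exposing only the alerting_threshold attribute the checker reads; in the proxy, the real SlackAlerting instance passes itself, and ProxyLogging.startup_event starts the loop (see litellm/proxy/utils.py below).

import asyncio

from litellm.integrations.SlackAlerting.hanging_request_check import (
    AlertingHangingRequestCheck,
)


class _StubSlackAlerting:
    # Hypothetical stand-in for SlackAlerting in this sketch.
    alerting_threshold = 300  # seconds before a request counts as hanging


async def main():
    checker = AlertingHangingRequestCheck(slack_alerting_object=_StubSlackAlerting())

    # O(1) bookkeeping per request: one cache entry, no per-request sleeping task.
    await checker.add_request_to_hanging_request_check(
        request_data={"litellm_call_id": "req-abc", "model": "gpt-4o", "metadata": {}}
    )

    # One shared scanner for all in-flight requests, started once at proxy startup.
    asyncio.create_task(checker.check_for_hanging_requests())
    await asyncio.sleep(0)  # let the scanner start; a real proxy keeps running


asyncio.run(main())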
111 changes: 14 additions & 97 deletions litellm/integrations/SlackAlerting/slack_alerting.py
@@ -19,6 +19,9 @@
from litellm.constants import HOURS_IN_A_DAY
from litellm.integrations.custom_batch_logger import CustomBatchLogger
from litellm.integrations.SlackAlerting.budget_alert_types import get_budget_alert_type
from litellm.integrations.SlackAlerting.hanging_request_check import (
AlertingHangingRequestCheck,
)
from litellm.litellm_core_utils.duration_parser import duration_in_seconds
from litellm.litellm_core_utils.exception_mapping_utils import (
_add_key_name_and_team_to_alert,
@@ -38,7 +41,7 @@

from ..email_templates.templates import *
from .batching_handler import send_to_webhook, squash_payloads
from .utils import _add_langfuse_trace_id_to_alert, process_slack_alerting_variables
from .utils import process_slack_alerting_variables

if TYPE_CHECKING:
from litellm.router import Router as _Router
@@ -86,6 +89,9 @@ def __init__(
self.default_webhook_url = default_webhook_url
self.flush_lock = asyncio.Lock()
self.periodic_started = False
self.hanging_request_check = AlertingHangingRequestCheck(
slack_alerting_object=self,
)
super().__init__(**kwargs, flush_lock=self.flush_lock)

def update_values(
@@ -107,10 +113,10 @@ def update_values
self.alert_types = alert_types
if alerting_args is not None:
self.alerting_args = SlackAlertingArgs(**alerting_args)
if not self.periodic_started:
if not self.periodic_started:
asyncio.create_task(self.periodic_flush())
self.periodic_started = True

if alert_to_webhook_url is not None:
# update the dict
if self.alert_to_webhook_url is None:
@@ -451,106 +457,17 @@ async def send_daily_reports(self, router) -> bool: # noqa: PLR0915

async def response_taking_too_long(
self,
start_time: Optional[datetime.datetime] = None,
end_time: Optional[datetime.datetime] = None,
type: Literal["hanging_request", "slow_response"] = "hanging_request",
request_data: Optional[dict] = None,
):
if self.alerting is None or self.alert_types is None:
return
model: str = ""
if request_data is not None:
model = request_data.get("model", "")
messages = request_data.get("messages", None)
if messages is None:
# if messages does not exist fallback to "input"
messages = request_data.get("input", None)

# try casting messages to str and get the first 100 characters, else mark as None
try:
messages = str(messages)
messages = messages[:100]
except Exception:
messages = ""

if (
litellm.turn_off_message_logging
or litellm.redact_messages_in_exceptions
):
messages = (
"Message not logged. litellm.redact_messages_in_exceptions=True"
)
request_info = f"\nRequest Model: `{model}`\nMessages: `{messages}`"
else:
request_info = ""

if type == "hanging_request":
await asyncio.sleep(
self.alerting_threshold
) # Set it to 5 minutes - i'd imagine this might be different for streaming, non-streaming, non-completion (embedding + img) requests
alerting_metadata: dict = {}
if await self._request_is_completed(request_data=request_data) is True:
return

if request_data is not None:
if request_data.get("deployment", None) is not None and isinstance(
request_data["deployment"], dict
):
_api_base = litellm.get_api_base(
model=model,
optional_params=request_data["deployment"].get(
"litellm_params", {}
),
)

if _api_base is None:
_api_base = ""

request_info += f"\nAPI Base: {_api_base}"
elif request_data.get("metadata", None) is not None and isinstance(
request_data["metadata"], dict
):
# In hanging requests sometime it has not made it to the point where the deployment is passed to the `request_data``
# in that case we fallback to the api base set in the request metadata
_metadata: dict = request_data["metadata"]
_api_base = _metadata.get("api_base", "")

request_info = _add_key_name_and_team_to_alert(
request_info=request_info, metadata=_metadata
)

if _api_base is None:
_api_base = ""

if "alerting_metadata" in _metadata:
alerting_metadata = _metadata["alerting_metadata"]
request_info += f"\nAPI Base: `{_api_base}`"
# only alert hanging responses if they have not been marked as success
alerting_message = (
f"`Requests are hanging - {self.alerting_threshold}s+ request time`"
)

if "langfuse" in litellm.success_callback:
langfuse_url = await _add_langfuse_trace_id_to_alert(
request_data=request_data,
)

if langfuse_url is not None:
request_info += "\n🪢 Langfuse Trace: {}".format(langfuse_url)

# add deployment latencies to alert
_deployment_latency_map = self._get_deployment_latencies_to_alert(
metadata=request_data.get("metadata", {})
)
if _deployment_latency_map is not None:
request_info += f"\nDeployment Latencies\n{_deployment_latency_map}"
if AlertType.llm_requests_hanging not in self.alert_types:
return

await self.send_alert(
message=alerting_message + request_info,
level="Medium",
alert_type=AlertType.llm_requests_hanging,
alerting_metadata=alerting_metadata,
)
await self.hanging_request_check.add_request_to_hanging_request_check(
request_data=request_data
)

async def failed_tracking_alert(self, error_message: str, failing_model: str):
"""
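
The net effect of the response_taking_too_long rewrite, distilled into a before/after sketch (simplified method bodies, not the literal diff):

import asyncio

from litellm.integrations.SlackAlerting.slack_alerting import AlertType


# Before (removed): one sleeping task per request, so N in-flight requests
# meant N tasks parked on the event loop for the whole alerting window.
async def response_taking_too_long_before(self, request_data=None):
    await asyncio.sleep(self.alerting_threshold)
    if await self._request_is_completed(request_data=request_data) is True:
        return  # completed in time, nothing to alert on
    await self.send_alert(
        message="requests are hanging...",  # built per request from request_data
        level="Medium",
        alert_type=AlertType.llm_requests_hanging,
        alerting_metadata={},
    )


# After (added): constant-time registration; the single background loop in
# check_for_hanging_requests() scans the cache for every in-flight request.
async def response_taking_too_long_after(self, request_data=None):
    if self.alerting is None or self.alert_types is None:
        return
    if AlertType.llm_requests_hanging not in self.alert_types:
        return
    await self.hanging_request_check.add_request_to_hanging_request_check(
        request_data=request_data
    )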
17 changes: 17 additions & 0 deletions litellm/proxy/utils.py
@@ -280,6 +280,10 @@ def __init__(
self.db_spend_update_writer = DBSpendUpdateWriter()
self.proxy_hook_mapping: Dict[str, CustomLogger] = {}

# Guard flags to prevent duplicate background tasks
self.daily_report_started: bool = False
self.hanging_requests_check_started: bool = False

def startup_event(
self,
llm_router: Optional[Router],
@@ -301,12 +305,25 @@
if (
self.slack_alerting_instance is not None
and "daily_reports" in self.slack_alerting_instance.alert_types
and not self.daily_report_started
):
asyncio.create_task(
self.slack_alerting_instance._run_scheduled_daily_report(
llm_router=llm_router
)
) # RUN DAILY REPORT (if scheduled)
self.daily_report_started = True

if (
self.slack_alerting_instance is not None
and AlertType.llm_requests_hanging
in self.slack_alerting_instance.alert_types
and not self.hanging_requests_check_started
):
asyncio.create_task(
self.slack_alerting_instance.hanging_request_check.check_for_hanging_requests()
) # RUN HANGING REQUEST CHECK (if user wants to alert on hanging requests)
self.hanging_requests_check_started = True

def update_values(
self,
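
Why the new guard flags matter, as a small self-contained sketch (ProxyLoggingSketch is hypothetical; the real logic lives in ProxyLogging.startup_event above). startup_event can run more than once, e.g. on config reload, and without the flag each call would spawn another identical scanner task:

class ProxyLoggingSketch:
    """Hypothetical, trimmed-down version of the guard-flag pattern above."""

    def __init__(self):
        self.hanging_requests_check_started = False
        self.tasks_spawned = 0

    def startup_event(self):
        if not self.hanging_requests_check_started:
            # Real class: asyncio.create_task(...check_for_hanging_requests())
            self.tasks_spawned += 1
            self.hanging_requests_check_started = True


proxy_logging = ProxyLoggingSketch()
proxy_logging.startup_event()
proxy_logging.startup_event()  # no-op: the scanner is already running
assert proxy_logging.tasks_spawned == 1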