Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 170 additions & 1 deletion sentry_sdk/integrations/langchain.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,173 @@
"top_p": SPANDATA.GEN_AI_REQUEST_TOP_P,
}

# Map LangChain content types to Sentry modalities
LANGCHAIN_TYPE_TO_MODALITY = {
"image": "image",
"image_url": "image",
"audio": "audio",
"video": "video",
"file": "document",
}


def _transform_langchain_content_block(
content_block: "Dict[str, Any]",
) -> "Dict[str, Any]":
"""
Transform a LangChain content block to Sentry-compatible format.

Handles multimodal content (images, audio, video, documents) by converting them
to the standardized format:
- base64 encoded data -> type: "blob"
- URL references -> type: "uri"
- file_id references -> type: "file"

Supports multiple content block formats:
- LangChain standard: type + base64/url/file_id fields
- OpenAI legacy: image_url with nested url field
- Anthropic: type + source dict with type/media_type/data or url
- Google: inline_data or file_data dicts
"""
if not isinstance(content_block, dict):
return content_block

block_type = content_block.get("type")

# Handle standard multimodal content types (image, audio, video, file)
if block_type in ("image", "audio", "video", "file"):
modality = LANGCHAIN_TYPE_TO_MODALITY.get(block_type, block_type)
mime_type = content_block.get("mime_type", "")

# Check for base64 encoded content
if "base64" in content_block:
return {
"type": "blob",
"modality": modality,
"mime_type": mime_type,
"content": content_block.get("base64", ""),
}
# Check for URL reference
elif "url" in content_block:
return {
"type": "uri",
"modality": modality,
"mime_type": mime_type,
"uri": content_block.get("url", ""),
}
# Check for file_id reference
elif "file_id" in content_block:
return {
"type": "file",
"modality": modality,
"mime_type": mime_type,
"file_id": content_block.get("file_id", ""),
}
# Handle Anthropic-style format with nested "source" dict
elif "source" in content_block:
source = content_block.get("source", {})
if isinstance(source, dict):
source_type = source.get("type")
media_type = source.get("media_type", "") or mime_type

if source_type == "base64":
return {
"type": "blob",
"modality": modality,
"mime_type": media_type,
"content": source.get("data", ""),
}
elif source_type == "url":
return {
"type": "uri",
"modality": modality,
"mime_type": media_type,
"uri": source.get("url", ""),
}

# Handle legacy image_url format (OpenAI style)
Comment thread
constantinius marked this conversation as resolved.
Outdated
elif block_type == "image_url":
image_url_data = content_block.get("image_url", {})
if isinstance(image_url_data, dict):
url = image_url_data.get("url", "")
else:
url = str(image_url_data)

# Check if it's a data URI (base64 encoded)
if url and url.startswith("data:"):
# Parse data URI: data:mime_type;base64,content
try:
# Format: data:image/jpeg;base64,/9j/4AAQ...
header, content = url.split(",", 1)
mime_type = header.split(":")[1].split(";")[0] if ":" in header else ""
Comment thread
constantinius marked this conversation as resolved.
Outdated
return {
"type": "blob",
"modality": "image",
"mime_type": mime_type,
"content": content,
}
except (ValueError, IndexError):
# If parsing fails, return as URI
return {
"type": "uri",
"modality": "image",
"mime_type": "",
"uri": url,
}
else:
# Regular URL
return {
"type": "uri",
"modality": "image",
"mime_type": "",
"uri": url,
}

# Handle Google-style inline_data format
if "inline_data" in content_block:
inline_data = content_block.get("inline_data", {})
if isinstance(inline_data, dict):
return {
"type": "blob",
Comment thread
constantinius marked this conversation as resolved.
Outdated
"modality": "image",
"mime_type": inline_data.get("mime_type", ""),
"content": inline_data.get("data", ""),
}

Comment thread
sentry[bot] marked this conversation as resolved.
Outdated
# Handle Google-style file_data format
if "file_data" in content_block:
file_data = content_block.get("file_data", {})
if isinstance(file_data, dict):
return {
"type": "uri",
"modality": "image",
"mime_type": file_data.get("mime_type", ""),
"uri": file_data.get("file_uri", ""),
}
Comment thread
constantinius marked this conversation as resolved.
Outdated

# For text blocks and other types, return as-is
return content_block


def _transform_langchain_message_content(content: "Any") -> "Any":
"""
Transform LangChain message content, handling both string content and
list of content blocks.
"""
if isinstance(content, str):
return content

if isinstance(content, (list, tuple)):
transformed = []
for block in content:
if isinstance(block, dict):
transformed.append(_transform_langchain_content_block(block))
else:
transformed.append(block)
return transformed

return content


# Contextvar to track agent names in a stack for re-entrant agent support
_agent_stack: "contextvars.ContextVar[Optional[List[Optional[str]]]]" = (
Expand Down Expand Up @@ -234,7 +401,9 @@ def _handle_error(self, run_id: "UUID", error: "Any") -> None:
del self.span_map[run_id]

def _normalize_langchain_message(self, message: "BaseMessage") -> "Any":
parsed = {"role": message.type, "content": message.content}
# Transform content to handle multimodal data (images, audio, video, files)
transformed_content = _transform_langchain_message_content(message.content)
parsed = {"role": message.type, "content": transformed_content}
parsed.update(message.additional_kwargs)
return parsed

Expand Down
Loading
Loading