Skip to content

Commit 5ccdecc

Browse files
authored
Merge pull request #438 from ther3zz/patch-3
langfuse pipeline: chatID + clearer observation names + usage + trace tagging
2 parents fb20b75 + c0a60f1 commit 5ccdecc

File tree

1 file changed

+149
-98
lines changed

1 file changed

+149
-98
lines changed
Lines changed: 149 additions & 98 deletions
Original file line number | Diff line number | Diff line change
@@ -1,8 +1,8 @@
11
"""
22
title: Langfuse Filter Pipeline
33
author: open-webui
4-
date: 2025-02-20
5-
version: 1.5
4+
date: 2025-03-28
5+
version: 1.7
66
license: MIT
77
description: A filter pipeline that uses Langfuse.
88
requirements: langfuse
@@ -20,6 +20,7 @@
2020

2121

2222
def get_last_assistant_message_obj(messages: List[dict]) -> dict:
23+
"""Retrieve the last assistant message from the message list."""
2324
for message in reversed(messages):
2425
if message["role"] == "assistant":
2526
return message
@@ -33,6 +34,10 @@ class Valves(BaseModel):
3334
secret_key: str
3435
public_key: str
3536
host: str
37+
# New valve that controls whether task names are added as tags:
38+
insert_tags: bool = True
39+
# New valve that controls whether to use model name instead of model ID for generation
40+
use_model_name_instead_of_id_for_generation: bool = False
3641
debug: bool = False
3742

3843
def __init__(self):
@@ -45,18 +50,21 @@ def __init__(self):
4550
"secret_key": os.getenv("LANGFUSE_SECRET_KEY", "your-secret-key-here"),
4651
"public_key": os.getenv("LANGFUSE_PUBLIC_KEY", "your-public-key-here"),
4752
"host": os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com"),
53+
"use_model_name_instead_of_id_for_generation": os.getenv("USE_MODEL_NAME", "false").lower() == "true",
4854
"debug": os.getenv("DEBUG_MODE", "false").lower() == "true",
4955
}
5056
)
5157

5258
self.langfuse = None
53-
# Keep track of the trace and the last-created generation for each chat_id
5459
self.chat_traces = {}
55-
self.chat_generations = {}
5660
self.suppressed_logs = set()
61+
# Dictionary to store model names for each chat
62+
self.model_names = {}
63+
64+
# Only these tasks will be treated as LLM "generations":
65+
self.GENERATION_TASKS = {"llm_response"}
5766

5867
def log(self, message: str, suppress_repeats: bool = False):
59-
"""Logs messages to the terminal if debugging is enabled."""
6068
if self.valves.debug:
6169
if suppress_repeats:
6270
if message in self.suppressed_logs:
@@ -96,47 +104,44 @@ def set_langfuse(self):
96104
f"Langfuse error: {e} Please re-enter your Langfuse credentials in the pipeline settings."
97105
)
98106

99-
async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
107+
def _build_tags(self, task_name: str) -> list:
100108
"""
101-
Inlet handles the incoming request (usually a user message).
102-
- If no trace exists yet for this chat_id, we create a new trace.
103-
- If a trace does exist, we simply create a new generation for the new user message.
109+
Builds a list of tags based on valve settings, ensuring we always add
110+
'open-webui' and skip user_response / llm_response from becoming tags themselves.
104111
"""
112+
tags_list = []
113+
if self.valves.insert_tags:
114+
# Always add 'open-webui'
115+
tags_list.append("open-webui")
116+
# Add the task_name if it's not one of the excluded defaults
117+
if task_name not in ["user_response", "llm_response"]:
118+
tags_list.append(task_name)
119+
return tags_list
120+
121+
async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
105122
if self.valves.debug:
106123
print(f"[DEBUG] Received request: {json.dumps(body, indent=2)}")
107124

108125
self.log(f"Inlet function called with body: {body} and user: {user}")
109126

110127
metadata = body.get("metadata", {})
128+
chat_id = metadata.get("chat_id", str(uuid.uuid4()))
129+
metadata["chat_id"] = chat_id
130+
body["metadata"] = metadata
111131

112-
# ---------------------------------------------------------
113-
# Prepend the system prompt from metadata to the system message:
132+
# Extract and store both model name and ID if available
114133
model_info = metadata.get("model", {})
115-
params_info = model_info.get("params", {})
116-
system_prompt = params_info.get("system", "")
117-
118-
if system_prompt:
119-
for msg in body["messages"]:
120-
if msg.get("role") == "system":
121-
# Only prepend if it hasn't already been prepended:
122-
if not msg["content"].startswith("System Prompt:"):
123-
msg["content"] = f"System Prompt:\n{system_prompt}\n\n{msg['content']}"
124-
break
125-
# ---------------------------------------------------------
126-
127-
# Fix SYSTEM MESSAGE prefix issue: Only apply for "task_generation"
128-
if "chat_id" not in metadata:
129-
if "task_generation" in metadata.get("type", "").lower():
130-
chat_id = f"SYSTEM MESSAGE {uuid.uuid4()}"
131-
self.log(f"Task Generation detected, assigned SYSTEM MESSAGE ID: {chat_id}")
132-
else:
133-
chat_id = str(uuid.uuid4()) # Regular chat messages
134-
self.log(f"Assigned normal chat_id: {chat_id}")
135-
136-
metadata["chat_id"] = chat_id
137-
body["metadata"] = metadata
134+
model_id = body.get("model")
135+
136+
# Store model information for this chat
137+
if chat_id not in self.model_names:
138+
self.model_names[chat_id] = {"id": model_id}
138139
else:
139-
chat_id = metadata["chat_id"]
140+
self.model_names[chat_id]["id"] = model_id
141+
142+
if isinstance(model_info, dict) and "name" in model_info:
143+
self.model_names[chat_id]["name"] = model_info["name"]
144+
self.log(f"Stored model info - name: '{model_info['name']}', id: '{model_id}' for chat_id: {chat_id}")
140145

141146
required_keys = ["model", "messages"]
142147
missing_keys = [key for key in required_keys if key not in body]
@@ -146,100 +151,108 @@ async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
146151
raise ValueError(error_message)
147152

148153
user_email = user.get("email") if user else None
154+
# Defaulting to 'user_response' if no task is provided
155+
task_name = metadata.get("task", "user_response")
156+
157+
# Build tags
158+
tags_list = self._build_tags(task_name)
149159

150-
# Check if we already have a trace for this chat
151160
if chat_id not in self.chat_traces:
152-
# Create a new trace and generation
153-
self.log(f"Creating new chat trace for chat_id: {chat_id}")
161+
self.log(f"Creating new trace for chat_id: {chat_id}")
154162

155163
trace_payload = {
156-
"name": f"filter:{__name__}",
164+
"name": f"chat:{chat_id}",
157165
"input": body,
158166
"user_id": user_email,
159-
"metadata": {"chat_id": chat_id},
167+
"metadata": metadata,
160168
"session_id": chat_id,
161169
}
162170

171+
if tags_list:
172+
trace_payload["tags"] = tags_list
173+
163174
if self.valves.debug:
164175
print(f"[DEBUG] Langfuse trace request: {json.dumps(trace_payload, indent=2)}")
165176

166177
trace = self.langfuse.trace(**trace_payload)
167-
178+
self.chat_traces[chat_id] = trace
179+
else:
180+
trace = self.chat_traces[chat_id]
181+
self.log(f"Reusing existing trace for chat_id: {chat_id}")
182+
if tags_list:
183+
trace.update(tags=tags_list)
184+
185+
# Update metadata with type
186+
metadata["type"] = task_name
187+
metadata["interface"] = "open-webui"
188+
189+
# If it's a task that is considered an LLM generation
190+
if task_name in self.GENERATION_TASKS:
191+
# Determine which model value to use based on the use_model_name valve
192+
model_id = self.model_names.get(chat_id, {}).get("id", body["model"])
193+
model_name = self.model_names.get(chat_id, {}).get("name", "unknown")
194+
195+
# Pick primary model identifier based on valve setting
196+
model_value = model_name if self.valves.use_model_name_instead_of_id_for_generation else model_id
197+
198+
# Add both values to metadata regardless of valve setting
199+
metadata["model_id"] = model_id
200+
metadata["model_name"] = model_name
201+
168202
generation_payload = {
169-
"name": chat_id,
170-
"model": body["model"],
203+
"name": f"{task_name}:{str(uuid.uuid4())}",
204+
"model": model_value,
171205
"input": body["messages"],
172-
"metadata": {"interface": "open-webui"},
206+
"metadata": metadata,
173207
}
208+
if tags_list:
209+
generation_payload["tags"] = tags_list
174210

175211
if self.valves.debug:
176212
print(f"[DEBUG] Langfuse generation request: {json.dumps(generation_payload, indent=2)}")
177213

178-
generation = trace.generation(**generation_payload)
179-
180-
self.chat_traces[chat_id] = trace
181-
self.chat_generations[chat_id] = generation
182-
self.log(f"Trace and generation objects successfully created for chat_id: {chat_id}")
183-
214+
trace.generation(**generation_payload)
184215
else:
185-
# Re-use existing trace but create a new generation for each new message
186-
self.log(f"Re-using existing chat trace for chat_id: {chat_id}")
187-
trace = self.chat_traces[chat_id]
188-
189-
new_generation_payload = {
190-
"name": f"{chat_id}:{str(uuid.uuid4())}",
191-
"model": body["model"],
216+
# Otherwise, log it as an event
217+
event_payload = {
218+
"name": f"{task_name}:{str(uuid.uuid4())}",
219+
"metadata": metadata,
192220
"input": body["messages"],
193-
"metadata": {"interface": "open-webui"},
194221
}
222+
if tags_list:
223+
event_payload["tags"] = tags_list
224+
195225
if self.valves.debug:
196-
print(f"[DEBUG] Langfuse new_generation request: {json.dumps(new_generation_payload, indent=2)}")
226+
print(f"[DEBUG] Langfuse event request: {json.dumps(event_payload, indent=2)}")
197227

198-
new_generation = trace.generation(**new_generation_payload)
199-
self.chat_generations[chat_id] = new_generation
228+
trace.event(**event_payload)
200229

201230
return body
202231

203232
async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
204-
"""
205-
Outlet handles the response body (usually the assistant message).
206-
It will finalize/end the generation created for the user request.
207-
"""
208233
self.log(f"Outlet function called with body: {body}")
209234

210235
chat_id = body.get("chat_id")
236+
metadata = body.get("metadata", {})
237+
# Defaulting to 'llm_response' if no task is provided
238+
task_name = metadata.get("task", "llm_response")
211239

212-
# If no trace or generation exist, attempt to register again
213-
if chat_id not in self.chat_traces or chat_id not in self.chat_generations:
214-
self.log(f"[WARNING] No matching chat trace found for chat_id: {chat_id}, attempting to re-register.")
240+
# Build tags
241+
tags_list = self._build_tags(task_name)
242+
243+
if chat_id not in self.chat_traces:
244+
self.log(f"[WARNING] No matching trace found for chat_id: {chat_id}, attempting to re-register.")
245+
# Re-run inlet to register if somehow missing
215246
return await self.inlet(body, user)
216247

217248
trace = self.chat_traces[chat_id]
218-
generation = self.chat_generations[chat_id]
219249

220-
# Get the last assistant message from the conversation
221250
assistant_message = get_last_assistant_message(body["messages"])
222251
assistant_message_obj = get_last_assistant_message_obj(body["messages"])
223252

224-
# ---------------------------------------------------------
225-
# If the outlet contains a sources array, append it after the "System Prompt:"
226-
# section in the system message:
227-
if assistant_message_obj and "sources" in assistant_message_obj and assistant_message_obj["sources"]:
228-
for msg in body["messages"]:
229-
if msg.get("role") == "system":
230-
if msg["content"].startswith("System Prompt:"):
231-
# Format the sources nicely
232-
sources_str = "\n\n".join(
233-
json.dumps(src, indent=2) for src in assistant_message_obj["sources"]
234-
)
235-
msg["content"] += f"\n\nSources:\n{sources_str}"
236-
break
237-
# ---------------------------------------------------------
238-
239-
# Extract usage if available
240253
usage = None
241254
if assistant_message_obj:
242-
info = assistant_message_obj.get("info", {})
255+
info = assistant_message_obj.get("usage", {})
243256
if isinstance(info, dict):
244257
input_tokens = info.get("prompt_eval_count") or info.get("prompt_tokens")
245258
output_tokens = info.get("eval_count") or info.get("completion_tokens")
@@ -251,20 +264,58 @@ async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
251264
}
252265
self.log(f"Usage data extracted: {usage}")
253266

254-
# Optionally update the trace with the final assistant output
267+
# Update the trace output with the last assistant message
255268
trace.update(output=assistant_message)
256269

257-
# End the generation with the final assistant message and updated conversation
258-
generation_payload = {
259-
"input": body["messages"], # include the entire conversation
260-
"metadata": {"interface": "open-webui"},
261-
"usage": usage,
262-
}
270+
metadata["type"] = task_name
271+
metadata["interface"] = "open-webui"
272+
273+
if task_name in self.GENERATION_TASKS:
274+
# Determine which model value to use based on the use_model_name valve
275+
model_id = self.model_names.get(chat_id, {}).get("id", body.get("model"))
276+
model_name = self.model_names.get(chat_id, {}).get("name", "unknown")
277+
278+
# Pick primary model identifier based on valve setting
279+
model_value = model_name if self.valves.use_model_name_instead_of_id_for_generation else model_id
280+
281+
# Add both values to metadata regardless of valve setting
282+
metadata["model_id"] = model_id
283+
metadata["model_name"] = model_name
284+
285+
# If it's an LLM generation
286+
generation_payload = {
287+
"name": f"{task_name}:{str(uuid.uuid4())}",
288+
"model": model_value, # <-- Use model name or ID based on valve setting
289+
"input": body["messages"],
290+
"metadata": metadata,
291+
"usage": usage,
292+
}
293+
if tags_list:
294+
generation_payload["tags"] = tags_list
263295

264-
if self.valves.debug:
265-
print(f"[DEBUG] Langfuse generation end request: {json.dumps(generation_payload, indent=2)}")
296+
if self.valves.debug:
297+
print(f"[DEBUG] Langfuse generation end request: {json.dumps(generation_payload, indent=2)}")
298+
299+
trace.generation().end(**generation_payload)
300+
self.log(f"Generation ended for chat_id: {chat_id}")
301+
else:
302+
# Otherwise log as an event
303+
event_payload = {
304+
"name": f"{task_name}:{str(uuid.uuid4())}",
305+
"metadata": metadata,
306+
"input": body["messages"],
307+
}
308+
if usage:
309+
# If you want usage on event as well
310+
event_payload["metadata"]["usage"] = usage
311+
312+
if tags_list:
313+
event_payload["tags"] = tags_list
314+
315+
if self.valves.debug:
316+
print(f"[DEBUG] Langfuse event end request: {json.dumps(event_payload, indent=2)}")
266317

267-
generation.end(**generation_payload)
268-
self.log(f"Generation ended for chat_id: {chat_id}")
318+
trace.event(**event_payload)
319+
self.log(f"Event logged for chat_id: {chat_id}")
269320

270321
return body

0 commit comments

Comments
 (0)