Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,11 @@ src/langbot/web/
/dist
/build
*.egg-info

# Docker 部署产生的本地文件
docker/data/
docker/docker-compose.override.yaml

# 备份目录
LangBot_backup_*/
*.bak
8 changes: 6 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,18 @@ FROM python:3.12.7-slim

WORKDIR /app

# Use Chinese mirror for faster and more reliable package downloads
RUN sed -i 's|deb.debian.org|mirrors.aliyun.com|g' /etc/apt/sources.list.d/debian.sources 2>/dev/null || \
sed -i 's|deb.debian.org|mirrors.aliyun.com|g' /etc/apt/sources.list 2>/dev/null || true

COPY . .

COPY --from=node /app/web/out ./web/out

RUN apt update \
&& apt install gcc -y \
&& python -m pip install --no-cache-dir uv \
&& uv sync \
&& python -m pip install --no-cache-dir -i https://pypi.tuna.tsinghua.edu.cn/simple uv \
&& uv sync --index-url https://pypi.tuna.tsinghua.edu.cn/simple \
&& touch /.dockerenv

CMD [ "uv", "run", "--no-sync", "main.py" ]
2 changes: 1 addition & 1 deletion docker/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,4 @@ services:

networks:
langbot_network:
driver: bridge
driver: bridge
141 changes: 134 additions & 7 deletions src/langbot/libs/wecom_ai_bot_api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ class StreamSession:
# 缓存最近一次片段,处理重试或超时兜底
last_chunk: Optional[StreamChunk] = None

# 反馈 ID,用于接收用户点赞/点踩反馈
feedback_id: Optional[str] = None


class StreamSessionManager:
"""管理 stream 会话的生命周期,并负责队列的生产消费。"""
Expand All @@ -74,6 +77,7 @@ def __init__(self, logger: EventLogger, ttl: int = 60) -> None:
self.ttl = ttl # 超时时间(秒),超过该时间未被访问的会话会被清理由 cleanup
self._sessions: dict[str, StreamSession] = {} # stream_id -> StreamSession 映射
self._msg_index: dict[str, str] = {} # msgid -> stream_id 映射,便于流水线根据消息 ID 找到会话
self._feedback_index: dict[str, str] = {} # feedback_id -> stream_id 映射

def get_stream_id_by_msg(self, msg_id: str) -> Optional[str]:
if not msg_id:
Expand All @@ -83,6 +87,32 @@ def get_stream_id_by_msg(self, msg_id: str) -> Optional[str]:
def get_session(self, stream_id: str) -> Optional[StreamSession]:
return self._sessions.get(stream_id)

def get_session_by_feedback_id(self, feedback_id: str) -> Optional[StreamSession]:
"""根据 feedback_id 查找会话。

Args:
feedback_id: 企业微信反馈事件中的反馈 ID。

Returns:
Optional[StreamSession]: 找到的会话实例,未找到返回 None。
"""
if not feedback_id:
return None
stream_id = self._feedback_index.get(feedback_id)
if stream_id:
return self._sessions.get(stream_id)
return None

def register_feedback_id(self, stream_id: str, feedback_id: str) -> None:
"""注册 feedback_id 与 stream_id 的映射。

Args:
stream_id: 企业微信流式会话 ID。
feedback_id: 反馈 ID。
"""
if feedback_id and stream_id:
self._feedback_index[feedback_id] = stream_id

def create_or_get(self, msg_json: dict[str, Any]) -> tuple[StreamSession, bool]:
"""根据企业微信回调创建或获取会话。

Expand Down Expand Up @@ -597,28 +627,44 @@ def __init__(self, Token: str, EnCodingAESKey: str, Corpid: str, logger: EventLo
self.stream_sessions = StreamSessionManager(logger=logger)
self.stream_poll_timeout = 0.5

self._feedback_callback: Optional[Callable] = None

def set_feedback_callback(self, callback: Callable) -> None:
"""设置反馈回调函数。

Args:
callback: 反馈回调函数,签名: async def callback(feedback_id, feedback_type, feedback_content, inaccurate_reasons, session)
"""
self._feedback_callback = callback

@staticmethod
def _build_stream_payload(stream_id: str, content: str, finish: bool) -> dict[str, Any]:
def _build_stream_payload(
stream_id: str, content: str, finish: bool, feedback_id: Optional[str] = None
) -> dict[str, Any]:
"""按照企业微信协议拼装返回报文。

Args:
stream_id: 企业微信会话 ID。
content: 推送的文本内容。
finish: 是否为最终片段。
feedback_id: 反馈 ID,用于接收用户点赞/点踩反馈。

Returns:
dict[str, Any]: 可直接加密返回的 payload。

Example:
组装 `{'msgtype': 'stream', 'stream': {'id': 'sid', ...}}` 结构。
"""
stream_payload = {
'id': stream_id,
'finish': finish,
'content': content,
}
if feedback_id:
stream_payload['feedback'] = {'id': feedback_id}
return {
'msgtype': 'stream',
'stream': {
'id': stream_id,
'finish': finish,
'content': content,
},
'stream': stream_payload,
}

async def _encrypt_and_reply(self, payload: dict[str, Any], nonce: str) -> tuple[Response, int]:
Expand Down Expand Up @@ -674,9 +720,14 @@ async def _handle_post_initial_response(self, msg_json: dict[str, Any], nonce: s
"""
session, is_new = self.stream_sessions.create_or_get(msg_json)

feedback_id = str(uuid.uuid4())
session.feedback_id = feedback_id
self.stream_sessions.register_feedback_id(session.stream_id, feedback_id)

message_data = await self.get_message(msg_json)
if message_data:
message_data['stream_id'] = session.stream_id
message_data['feedback_id'] = feedback_id
try:
event = wecombotevent.WecomBotEvent(message_data)
except Exception:
Expand All @@ -685,7 +736,7 @@ async def _handle_post_initial_response(self, msg_json: dict[str, Any], nonce: s
if is_new:
asyncio.create_task(self._dispatch_event(event))

payload = self._build_stream_payload(session.stream_id, '', False)
payload = self._build_stream_payload(session.stream_id, '', False, feedback_id)
return await self._encrypt_and_reply(payload, nonce)

async def _handle_post_followup_response(self, msg_json: dict[str, Any], nonce: str) -> tuple[Response, int]:
Expand Down Expand Up @@ -810,11 +861,78 @@ async def _handle_post_callback(self, req) -> tuple[Response, int] | Response:

msg_json = json.loads(decrypted_xml)

event = msg_json.get('event', {})
event_type = event.get('eventtype', '')

if event_type == 'feedback_event':
return await self._handle_feedback_event(msg_json, nonce)

if msg_json.get('msgtype') == 'stream':
return await self._handle_post_followup_response(msg_json, nonce)

return await self._handle_post_initial_response(msg_json, nonce)

async def _handle_feedback_event(self, msg_json: dict[str, Any], nonce: str) -> tuple[Response, int]:
"""处理企业微信用户反馈事件(点赞/点踩)。

Args:
msg_json: 解密后的企业微信反馈事件 JSON。
nonce: 企业微信回调参数 nonce。

Returns:
Tuple[Response, int]: Quart Response 及状态码。

Note:
企业微信协议要求:反馈事件目前仅支持回复空包。
"""
try:
feedback_event = msg_json.get('event', {}).get('feedback_event', {})
feedback_id = feedback_event.get('id', '')
feedback_type = feedback_event.get('type', 0)
feedback_content = feedback_event.get('content', '')
inaccurate_reasons = feedback_event.get('inaccurate_reason_list', [])

await self.logger.info(
f'收到用户反馈事件: feedback_id={feedback_id}, type={feedback_type}, '
f'content={feedback_content}, reasons={inaccurate_reasons}'
)

session = self.stream_sessions.get_session_by_feedback_id(feedback_id)
if session:
await self.logger.info(
f'反馈关联到会话: stream_id={session.stream_id}, msg_id={session.msg_id}, user_id={session.user_id}'
)
for handler in self._message_handlers.get('feedback', []):
try:
await handler(
feedback_id=feedback_id,
feedback_type=feedback_type,
feedback_content=feedback_content,
inaccurate_reasons=inaccurate_reasons,
session=session,
)
except Exception:
await self.logger.error(traceback.format_exc())

if self._feedback_callback:
try:
await self._feedback_callback(
feedback_id=feedback_id,
feedback_type=feedback_type,
feedback_content=feedback_content,
inaccurate_reasons=inaccurate_reasons,
session=session,
)
except Exception:
await self.logger.error(traceback.format_exc())
else:
await self.logger.warning(f'未找到 feedback_id={feedback_id} 对应的会话')

except Exception:
await self.logger.error(traceback.format_exc())

return await self._encrypt_and_reply({}, nonce)

async def get_message(self, msg_json):
return await parse_wecom_bot_message(msg_json, self.EnCodingAESKey, self.logger)

Expand Down Expand Up @@ -883,6 +1001,15 @@ def decorator(func: Callable[[wecombotevent.WecomBotEvent], None]):

return decorator

def on_feedback(self):
def decorator(func: Callable):
if 'feedback' not in self._message_handlers:
self._message_handlers['feedback'] = []
self._message_handlers['feedback'].append(func)
return func

return decorator

async def download_url_to_base64(self, download_url, encoding_aes_key):
data, _filename = await download_encrypted_file(download_url, encoding_aes_key, self.logger)
if data:
Expand Down
14 changes: 14 additions & 0 deletions src/langbot/libs/wecom_ai_bot_api/wecombotevent.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,17 @@ def ai_bot_id(self) -> str:
AI Bot ID
"""
return self.get('aibotid', '')

@property
def feedback_id(self) -> str:
"""
反馈 ID,用于关联用户点赞/点踩反馈
"""
return self.get('feedback_id', '')

@property
def stream_id(self) -> str:
"""
流式消息 ID
"""
return self.get('stream_id', '')
85 changes: 85 additions & 0 deletions src/langbot/pkg/api/http/controller/groups/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,31 @@ async def export_data() -> tuple[str, int]:
'platform',
'user_id',
]
elif export_type == 'feedback':
data = await self.ap.monitoring_service.export_feedback(
bot_ids=bot_ids if bot_ids else None,
pipeline_ids=pipeline_ids if pipeline_ids else None,
start_time=start_time,
end_time=end_time,
limit=limit,
)
headers = [
'id',
'timestamp',
'feedback_id',
'feedback_type',
'feedback_content',
'inaccurate_reasons',
'bot_id',
'bot_name',
'pipeline_id',
'pipeline_name',
'session_id',
'message_id',
'stream_id',
'user_id',
'platform',
]
else:
return self.error(message=f'Invalid export type: {export_type}', code=400)

Expand Down Expand Up @@ -486,3 +511,63 @@ async def export_data() -> tuple[str, int]:
)

return response, 200

@self.route('/feedback/stats', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
async def get_feedback_stats() -> str:
"""Get feedback statistics"""
# Parse query parameters
bot_ids = quart.request.args.getlist('botId')
pipeline_ids = quart.request.args.getlist('pipelineId')
start_time_str = quart.request.args.get('startTime')
end_time_str = quart.request.args.get('endTime')

# Parse datetime
start_time = parse_iso_datetime(start_time_str)
end_time = parse_iso_datetime(end_time_str)

stats = await self.ap.monitoring_service.get_feedback_stats(
bot_ids=bot_ids if bot_ids else None,
pipeline_ids=pipeline_ids if pipeline_ids else None,
start_time=start_time,
end_time=end_time,
)

return self.success(data=stats)

@self.route('/feedback', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
async def get_feedback() -> str:
"""Get feedback list"""
# Parse query parameters
bot_ids = quart.request.args.getlist('botId')
pipeline_ids = quart.request.args.getlist('pipelineId')
feedback_type_str = quart.request.args.get('feedbackType')
start_time_str = quart.request.args.get('startTime')
end_time_str = quart.request.args.get('endTime')
limit = int(quart.request.args.get('limit', 100))
offset = int(quart.request.args.get('offset', 0))

# Parse datetime
start_time = parse_iso_datetime(start_time_str)
end_time = parse_iso_datetime(end_time_str)

# Parse feedback type
feedback_type = int(feedback_type_str) if feedback_type_str else None

feedback_list, total = await self.ap.monitoring_service.get_feedback_list(
bot_ids=bot_ids if bot_ids else None,
pipeline_ids=pipeline_ids if pipeline_ids else None,
feedback_type=feedback_type,
start_time=start_time,
end_time=end_time,
limit=limit,
offset=offset,
)

return self.success(
data={
'feedback': feedback_list,
'total': total,
'limit': limit,
'offset': offset,
}
)
Loading
Loading