Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions src/langbot/pkg/api/http/controller/groups/plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,8 @@ async def _() -> str:
return self.http_status(400, -1, 'Missing asset_url parameter')

ctx = taskmgr.TaskContext.new()
ctx.metadata['plugin_name'] = f'{owner}/{repo}'
ctx.metadata['install_source'] = 'github'
install_info = {
'asset_url': asset_url,
'owner': owner,
Expand Down Expand Up @@ -295,12 +297,17 @@ async def _() -> str:

data = await quart.request.json

plugin_author = data.get('plugin_author', '')
plugin_name = data.get('plugin_name', '')

ctx = taskmgr.TaskContext.new()
ctx.metadata['plugin_name'] = f'{plugin_author}/{plugin_name}'
ctx.metadata['install_source'] = 'marketplace'
wrapper = self.ap.task_mgr.create_user_task(
self.ap.plugin_connector.install_plugin(PluginInstallSource.MARKETPLACE, data, task_context=ctx),
kind='plugin-operation',
name='plugin-install-marketplace',
label=f'Installing plugin from marketplace ...{data}',
label=f'Installing plugin from marketplace {plugin_author}/{plugin_name}',
context=ctx,
)

Expand All @@ -323,11 +330,13 @@ async def _() -> str:
}

ctx = taskmgr.TaskContext.new()
ctx.metadata['plugin_name'] = file.filename or 'local plugin'
ctx.metadata['install_source'] = 'local'
wrapper = self.ap.task_mgr.create_user_task(
self.ap.plugin_connector.install_plugin(PluginInstallSource.LOCAL, data, task_context=ctx),
kind='plugin-operation',
name='plugin-install-local',
label=f'Installing plugin from local ...{file.filename}',
label=f'Installing plugin from local {file.filename}',
context=ctx,
)

Expand Down
5 changes: 4 additions & 1 deletion src/langbot/pkg/api/http/controller/groups/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,14 @@ async def _() -> str:
@self.route('/tasks', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
async def _() -> str:
task_type = quart.request.args.get('type')
task_kind = quart.request.args.get('kind')

if task_type == '':
task_type = None
if task_kind == '':
task_kind = None

return self.success(data=self.ap.task_mgr.get_tasks_dict(task_type))
return self.success(data=self.ap.task_mgr.get_tasks_dict(task_type, task_kind))

@self.route('/tasks/<task_id>', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
async def _(task_id: str) -> str:
Expand Down
13 changes: 11 additions & 2 deletions src/langbot/pkg/core/taskmgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@ class TaskContext:
log: str
"""Log"""

metadata: dict
"""Structured metadata for progress reporting"""

def __init__(self):
self.current_action = 'default'
self.log = ''
self.metadata = {}

def _log(self, msg: str):
self.log += msg + '\n'
Expand All @@ -38,7 +42,7 @@ def trace(
self._log(f'{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")} | {self.current_action} | {msg}')

def to_dict(self) -> dict:
return {'current_action': self.current_action, 'log': self.log}
return {'current_action': self.current_action, 'log': self.log, 'metadata': self.metadata}

@staticmethod
def new() -> TaskContext:
Expand Down Expand Up @@ -211,9 +215,14 @@ def get_all_tasks(self) -> list[TaskWrapper]:
def get_tasks_dict(
self,
type: str = None,
kind: str = None,
) -> dict:
return {
'tasks': [t.to_dict() for t in self.tasks if type is None or t.task_type == type],
'tasks': [
t.to_dict()
for t in self.tasks
if (type is None or t.task_type == type) and (kind is None or t.kind == kind)
],
'id_index': TaskWrapper._id_index,
}

Expand Down
67 changes: 60 additions & 7 deletions src/langbot/pkg/plugin/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
from __future__ import annotations

import asyncio
import io
import time
import zipfile
from typing import Any
import typing
import os
Expand Down Expand Up @@ -192,6 +195,30 @@ async def ping_plugin_runtime(self):

return await self.handler.ping()

def _extract_deps_metadata(
self,
file_bytes: bytes,
task_context: taskmgr.TaskContext | None,
):
"""Extract dependency count from requirements.txt inside plugin zip."""
if task_context is None:
return
try:
with zipfile.ZipFile(io.BytesIO(file_bytes)) as zf:
for name in zf.namelist():
if name.endswith('requirements.txt'):
content = zf.read(name).decode('utf-8', errors='ignore')
deps = [
line.strip()
for line in content.splitlines()
if line.strip() and not line.strip().startswith('#')
]
task_context.metadata['deps_total'] = len(deps)
task_context.metadata['deps_list'] = deps
break
except Exception:
pass

async def install_plugin(
self,
install_source: PluginInstallSource,
Expand All @@ -201,23 +228,44 @@ async def install_plugin(
if install_source == PluginInstallSource.LOCAL:
# transfer file before install
file_bytes = install_info['plugin_file']
self._extract_deps_metadata(file_bytes, task_context)
file_key = await self.handler.send_file(file_bytes, 'lbpkg')
install_info['plugin_file_key'] = file_key
del install_info['plugin_file']
self.ap.logger.info(f'Transfered file {file_key} to plugin runtime')
elif install_source == PluginInstallSource.GITHUB:
# download and transfer file
# download and transfer file with streaming progress
try:
async with httpx.AsyncClient(
trust_env=True,
follow_redirects=True,
timeout=20,
timeout=60,
) as client:
response = await client.get(
install_info['asset_url'],
)
response.raise_for_status()
file_bytes = response.content
async with client.stream('GET', install_info['asset_url']) as response:
response.raise_for_status()
total = int(response.headers.get('content-length', 0))
downloaded = 0
chunks: list[bytes] = []
start_time = time.time()

if task_context is not None:
task_context.set_current_action('downloading plugin package')
task_context.metadata['download_total'] = total
task_context.metadata['download_current'] = 0
task_context.metadata['download_speed'] = 0

async for chunk in response.aiter_bytes(chunk_size=8192):
chunks.append(chunk)
downloaded += len(chunk)

if task_context is not None:
elapsed = time.time() - start_time
task_context.metadata['download_current'] = downloaded
task_context.metadata['download_total'] = total
task_context.metadata['download_speed'] = downloaded / elapsed if elapsed > 0 else 0

file_bytes = b''.join(chunks)
self._extract_deps_metadata(file_bytes, task_context)
file_key = await self.handler.send_file(file_bytes, 'lbpkg')
install_info['plugin_file_key'] = file_key
self.ap.logger.info(f'Transfered file {file_key} to plugin runtime')
Expand All @@ -236,6 +284,11 @@ async def install_plugin(
if task_context is not None:
task_context.trace(trace)

# Forward structured metadata from runtime
metadata = ret.get('metadata', None)
if metadata is not None and task_context is not None:
task_context.metadata.update(metadata)

async def upgrade_plugin(
self,
plugin_author: str,
Expand Down
2 changes: 1 addition & 1 deletion web/.env.example
Original file line number Diff line number Diff line change
@@ -1 +1 @@
NEXT_PUBLIC_API_BASE_URL=http://localhost:5300
NEXT_PUBLIC_API_BASE_URL=http://192.168.1.97:5300
1 change: 1 addition & 0 deletions web/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
"@radix-ui/react-hover-card": "^1.1.13",
"@radix-ui/react-label": "^2.1.6",
"@radix-ui/react-popover": "^1.1.14",
"@radix-ui/react-progress": "^1.1.8",
"@radix-ui/react-scroll-area": "^1.2.9",
"@radix-ui/react-select": "^2.2.4",
"@radix-ui/react-separator": "^1.1.8",
Expand Down
Loading
Loading