-
Notifications
You must be signed in to change notification settings - Fork 67
Open
Labels
Description
Implement Advanced MCP Features
Feature Description
Implement optional MCP protocol features that enhance the user experience for long-running operations and large datasets.
Features to Implement
1. Progress Tracking
For operations that take significant time, provide real-time progress updates.
Implementation
# In tools.py or new progress.py module
class ProgressTracker:
    """Track and report progress for a single long-running operation.

    Each tracker is bound to one operation: it remembers the operation's
    identifier, the total amount of work, and how far the work has advanced.

    Args:
        operation_id: Identifier echoed back in every progress payload.
        total: Number of work units the operation consists of.
        app: Optional object exposing an awaitable ``send_progress(payload)``.
            When omitted, progress is still tracked and returned, but nothing
            is sent to a client.
    """

    def __init__(self, operation_id: str, total: int, app=None):
        self.operation_id = operation_id
        self.total = total
        self.current = 0
        self.start_time = time.time()
        # Elapsed time is measured on a monotonic clock so a wall-clock
        # adjustment cannot produce a negative or jumping duration.
        self._started = time.monotonic()
        # update() sends through ``self.app``; the attribute has to exist
        # on every instance, otherwise each update raises AttributeError.
        self.app = app

    async def update(self, current: int, message: str = None):
        """Record the new position and notify the client, if one is attached.

        Args:
            current: Number of work units finished so far.
            message: Optional human-readable status text.

        Returns:
            The progress payload that was built (and sent, when ``app`` is set).
        """
        self.current = current
        progress = {
            "operation_id": self.operation_id,
            "current": current,
            "total": self.total,
            # A total of zero would divide by zero; report 0 percent instead.
            "percentage": (current / self.total * 100) if self.total > 0 else 0,
            "elapsed": time.monotonic() - self._started,
            "message": message,
        }
        # Send progress update through MCP
        if self.app is not None:
            await self.app.send_progress(progress)
        return progress

    async def complete(self, message: str = "Operation completed"):
        """Mark the operation as complete by reporting ``total`` as the position.

        Args:
            message: Status text attached to the final payload.

        Returns:
            The final progress payload.
        """
        return await self.update(self.total, message)


# Use Cases
@mcp.tool()
async def bulk_update_records(
    model: str,
    record_ids: List[int],
    values: Dict[str, Any],
    report_progress: bool = True
) -> Dict[str, Any]:
    """Update multiple records one at a time, optionally reporting progress.

    Each record is written individually so that one failing record does not
    prevent the others from being updated; failures are collected and
    returned instead of raised.

    Args:
        model: Name of the model whose records are updated.
        record_ids: Ids of the records to update.
        values: Field values written to every record.
        report_progress: When true, a ProgressTracker is created and updated
            after every 10 processed records and once more on completion.

    Returns:
        A dict with ``updated`` (ids written successfully), ``errors``
        (``{"id", "error"}`` entries for failed records) and ``total``.
    """
    # NOTE(review): ``self`` is not a parameter of this function; presumably
    # the tool is defined inside a method so ``self`` is captured from the
    # enclosing scope - confirm before merging.
    total = len(record_ids)
    tracker = ProgressTracker(f"bulk_update_{model}", total) if report_progress else None
    updated = []
    errors = []
    for processed, record_id in enumerate(record_ids, start=1):
        # Only the write itself is guarded: a failure while sending a
        # progress notification must not be filed as a record update error.
        try:
            await self.connection.write(model, [record_id], values)
        except Exception as e:
            errors.append({"id": record_id, "error": str(e)})
        else:
            updated.append(record_id)
        # Report after every 10th processed record, whether or not that
        # particular record succeeded, so the cadence stays regular.
        if tracker and processed % 10 == 0:
            await tracker.update(
                processed,
                f"Processed {processed}/{total} records"
            )
    if tracker:
        await tracker.complete(f"Updated {len(updated)} records")
    return {
        "updated": updated,
        "errors": errors,
        "total": total
    }


# 2. Operation Cancellation
Allow users to cancel long-running operations gracefully.
Implementation
# In server.py or new cancellation.py module
class CancellationManager:
    """Registry of running operations and the cancellation requests against them.

    ``active_operations`` maps an operation id to a small status record
    (``start_time`` and ``status``); ``cancelled_operations`` holds the ids
    for which cancellation has been requested but not yet cleaned up.
    """

    def __init__(self):
        self.cancelled_operations = set()
        self.active_operations = {}

    def register_operation(self, operation_id: str):
        """Register an operation as cancellable, with status ``"running"``."""
        self.active_operations[operation_id] = {
            "start_time": time.time(),
            "status": "running",
        }

    def cancel_operation(self, operation_id: str) -> bool:
        """Mark a registered operation for cancellation.

        Args:
            operation_id: Id of the operation to cancel.

        Returns:
            True when the operation was registered and is now flagged as
            ``"cancelling"``; False when the id is unknown, in which case
            nothing is recorded.
        """
        if operation_id not in self.active_operations:
            return False
        self.cancelled_operations.add(operation_id)
        self.active_operations[operation_id]["status"] = "cancelling"
        return True

    def is_cancelled(self, operation_id: str) -> bool:
        """Return whether cancellation has been requested for the operation."""
        return operation_id in self.cancelled_operations

    def complete_operation(self, operation_id: str):
        """Forget the operation and any pending cancellation request for it.

        Safe to call for ids that are unknown or already completed.
        """
        self.cancelled_operations.discard(operation_id)
        self.active_operations.pop(operation_id, None)


# Usage Example
@mcp.tool()
async def import_large_dataset(
    model: str,
    data: List[Dict],
    operation_id: str = None
) -> Dict[str, Any]:
    """Import records one by one, stopping early if cancellation is requested.

    Args:
        model: Name of the model the records are created in.
        data: One dict of field values per record to create.
        operation_id: Id under which the import is registered with the
            cancellation manager. A unique id is generated when omitted.

    Returns:
        A dict with ``status`` (``"completed"`` or ``"cancelled"``),
        ``operation_id``, ``imported`` (ids of created records) and
        ``errors`` (``{"index", "error"}`` entries). A cancelled import also
        carries ``cancelled_at``, the index of the first record not processed.
    """
    # NOTE(review): ``self`` is not a parameter of this function; presumably
    # the tool is defined inside a method so ``self`` is captured from the
    # enclosing scope - confirm before merging.
    import uuid

    if not operation_id:
        # A timestamp alone has one-second resolution, so two imports of the
        # same model started together would share an id and cancel or clean
        # up each other; the random suffix keeps generated ids distinct.
        operation_id = f"import_{model}_{int(time.time())}_{uuid.uuid4().hex[:8]}"
    self.cancellation_manager.register_operation(operation_id)
    imported = []
    errors = []
    try:
        for i, record_data in enumerate(data):
            # Check for cancellation before starting each record so that
            # partial results are returned as soon as possible.
            if self.cancellation_manager.is_cancelled(operation_id):
                return {
                    "status": "cancelled",
                    "operation_id": operation_id,
                    "imported": imported,
                    "errors": errors,
                    "cancelled_at": i
                }
            # Import record; a failure is recorded and the import continues.
            try:
                record_id = await self.connection.create(model, record_data)
            except Exception as e:
                errors.append({"index": i, "error": str(e)})
            else:
                imported.append(record_id)
        return {
            "status": "completed",
            "operation_id": operation_id,
            "imported": imported,
            "errors": errors
        }
    finally:
        # Always unregister, whether the import finished, was cancelled,
        # or raised, so the manager does not accumulate stale entries.
        self.cancellation_manager.complete_operation(operation_id)


# 3. Sampling Support
Preview operations on a subset of data before executing on the full dataset.
Implementation
# In sampling.py or tools.py
class DataSampler:
    """Helpers for choosing a small sample out of a larger record set."""

    @staticmethod
    def get_sample_size(total: int, sample_percentage: float = 0.1) -> int:
        """Calculate how many records to sample out of ``total``.

        Args:
            total: Number of records available.
            sample_percentage: Fraction of ``total`` to sample (0.1 = 10%).

        Returns:
            0 when there is nothing to sample, otherwise the requested
            fraction of ``total`` clamped to the range 1..100.
        """
        if total <= 0:
            # An empty set cannot yield a sample of one record.
            return 0
        sample_size = int(total * sample_percentage)
        return max(1, min(sample_size, 100))  # Between 1 and 100

    @staticmethod
    def sample_indices(total: int, sample_size: int) -> List[int]:
        """Get evenly distributed sample indices in ``range(total)``.

        Args:
            total: Number of items to choose from.
            sample_size: Number of indices wanted.

        Returns:
            Every index when ``sample_size`` covers the whole range, an empty
            list when ``sample_size`` is not positive, otherwise
            ``sample_size`` ascending indices spaced ``total / sample_size``
            apart, starting at 0.
        """
        if sample_size >= total:
            return list(range(total))
        if sample_size <= 0:
            # Guards the division below; a zero-sized sample is simply empty.
            return []
        # Integer arithmetic gives the exact floor of i * total / sample_size,
        # avoiding float rounding that could shift an index down by one.
        return [i * total // sample_size for i in range(sample_size)]


# Usage Example
@mcp.tool()
async def preview_bulk_operation(
    model: str,
    operation: str,
    filters: List = None,
    values: Dict = None,
    sample_size: int = 10
) -> Dict[str, Any]:
    """Preview an operation on a small sample of the matching records.

    Only ``operation == "update"`` with non-empty ``values`` is simulated;
    any other combination yields an empty preview.

    Args:
        model: Name of the model to search.
        operation: Kind of operation to preview.
        filters: Search domain; an empty domain is used when omitted.
        values: Field values the update would write.
        sample_size: Upper bound on the number of records to sample. It is
            further capped by ``DataSampler.get_sample_size`` and by the
            number of matching records.

    Returns:
        A dict with ``sample_size`` (the capped size actually requested),
        ``total_records``, ``preview`` (``{"id", "changes"}`` per sampled
        record, where ``changes`` maps a field to its ``old``/``new`` values)
        and ``estimated_changes`` (sampled records with at least one change).
    """
    # NOTE(review): ``self`` is not a parameter of this function; presumably
    # the tool is defined inside a method so ``self`` is captured from the
    # enclosing scope - confirm before merging.
    # Get total count
    domain = filters or []
    total = await self.connection.search_count(model, domain)
    # Cap the sample by the sampler's limit and by what actually exists, and
    # never let a non-positive size reach the search call as its ``limit``.
    sample_size = max(0, min(sample_size, DataSampler.get_sample_size(total), total))
    # Fetch sample records
    # NOTE(review): ``limit`` takes the first matches, not an evenly spread
    # sample - confirm that is representative enough for a preview.
    record_ids = []
    if sample_size:
        record_ids = await self.connection.search(
            model, domain, limit=sample_size
        )
    # Preview operation
    preview_results = []
    if operation == "update" and values:
        for record_id in record_ids:
            # Await first, then index: subscription binds tighter than
            # ``await``, so ``await read(...)[0]`` would index the coroutine.
            rows = await self.connection.read(model, [record_id])
            if not rows:
                # Nothing came back for this id, so there is no state to diff.
                continue
            current = rows[0]
            changes = {
                k: {"old": current.get(k), "new": v}
                for k, v in values.items()
                if current.get(k) != v
            }
            preview_results.append({
                "id": record_id,
                "changes": changes
            })
    return {
        "sample_size": sample_size,
        "total_records": total,
        "preview": preview_results,
        "estimated_changes": sum(1 for r in preview_results if r.get("changes"))
    }


# Integration Points
1. MCP Protocol Support
# In server.py
class OdooMCPServer:
    """Server sketch showing where the advanced MCP features are wired in."""

    def __init__(self, config: OdooConfig):
        """Create the feature helpers and register the MCP handlers.

        Args:
            config: Server configuration, consumed by the existing
                initialisation that is elided below.
        """
        # ... existing init ...
        # ProgressTracker requires an operation id and a total, so it cannot
        # be constructed once per server; trackers are created per operation
        # and can be kept here keyed by operation id.
        self.progress_trackers = {}
        self.cancellation_manager = CancellationManager()
        self.data_sampler = DataSampler()
        # Register MCP handlers
        # NOTE(review): ``self.app`` is presumably set by the elided init, and
        # ``on_cancel`` / ``progress_handler`` are assumed to exist on it -
        # confirm against the MCP library in use.
        self.app.on_cancel(self._handle_cancellation)
        self.app.progress_handler(self._handle_progress)


# 2. Client Communication
# Progress updates
async def _handle_progress(self, progress: Dict):
    """Send a progress update to the client.

    Args:
        progress: Progress payload, forwarded unchanged under the ``data``
            key of a ``{"type": "progress"}`` notification.
    """
    await self.app.notify_progress({
        "type": "progress",
        "data": progress
    })
# Cancellation requests
async def _handle_cancellation(self, operation_id: str):
    """Handle a cancellation request from the client.

    Args:
        operation_id: Id of the operation the client wants to stop.

    Returns:
        ``{"cancelled": <bool>, "operation_id": <id>}`` where ``cancelled``
        is True only when the operation was registered as active at the time
        of the request.
    """
    manager = self.cancellation_manager
    # The manager ignores ids it does not know, so check membership first:
    # otherwise the client would be told a cancellation happened when no
    # such operation was running.
    known = operation_id in manager.active_operations
    manager.cancel_operation(operation_id)
    return {"cancelled": known, "operation_id": operation_id}


# Testing Requirements
Progress Tracking Tests
- Progress updates sent at correct intervals
- Percentage calculation accurate
- Time estimation works
- Multiple operations tracked simultaneously
Cancellation Tests
- Operations stop when cancelled
- Partial results returned correctly
- Cleanup happens properly
- Concurrent cancellations handled
Sampling Tests
- Sample size calculation correct
- Representative samples selected
- Preview accurate for operations
- Edge cases handled (empty sets, single records)
Success Criteria
- Progress tracking works for operations > 5 seconds
- Cancellation responds within 1 second
- Sampling provides accurate previews
- No memory leaks from tracking
- Client compatibility verified
- Performance overhead < 5%
Use Cases
- Data Import Wizard
  - Show progress during CSV import
  - Allow cancellation if errors are detected
  - Preview import with sample data
- Bulk Updates
  - Track progress of mass updates
  - Cancel if the wrong values were selected
  - Preview changes before applying
- Report Generation
  - Show progress for long reports
  - Cancel unnecessary reports
  - Sample data for report preview
Priority
Low - These are nice-to-have features that enhance UX but aren't critical for core functionality. Implement after write operations and other high-priority items.