feat: implement acp handler and communication layer with the sidecar #8294
Nabil-Salah wants to merge 11 commits into jaegertracing:main
…agent Signed-off-by: Nabil-Salah <nabil.salah203@gmail.com>
Pull request overview
This PR adds an AI gateway capability to the Jaeger Query service by introducing an /api/ai/chat endpoint that proxies chat prompts to an external ACP-capable sidecar over WebSockets, streaming the response back to the HTTP client.
Changes:
- Added extensions.jaeger_query.ai.agent_url configuration and wired it into Jaeger Query options and routing.
- Implemented a jaegerai communication layer (WebSocket adapter, ACP client connection, streaming response handling).
- Added unit tests covering config defaults, route registration, ACP handshake/prompt flows, and error/streaming behaviors.
Reviewed changes
Copilot reviewed 13 out of 14 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| go.mod | Adds ACP SDK + gorilla/websocket dependencies needed for sidecar communication. |
| go.sum | Adds checksums for new dependencies. |
| cmd/jaeger/internal/extension/jaegerquery/internal/server.go | Registers /api/ai/chat route when AI config is enabled. |
| cmd/jaeger/internal/extension/jaegerquery/internal/server_test.go | Tests route registration behavior with/without base path and enabled/disabled config. |
| cmd/jaeger/internal/extension/jaegerquery/internal/flags.go | Introduces AI config types and defaults in Query options. |
| cmd/jaeger/internal/extension/jaegerquery/internal/flags_test.go | Tests default AI config values. |
| cmd/jaeger/internal/extension/jaegerquery/internal/jaegerai/handler.go | Implements the HTTP chat handler and ACP handshake/session/prompt flow over WebSocket. |
| cmd/jaeger/internal/extension/jaegerquery/internal/jaegerai/handler_test.go | Tests handler success path and error paths (dial/init/session/prompt failures). |
| cmd/jaeger/internal/extension/jaegerquery/internal/jaegerai/streaming_client.go | Implements ACP client callbacks and streaming text writes to HTTP response. |
| cmd/jaeger/internal/extension/jaegerquery/internal/jaegerai/streaming_client_test.go | Tests streaming client write/flush behavior and ACP callback handling. |
| cmd/jaeger/internal/extension/jaegerquery/internal/jaegerai/ws_adapter.go | Adds io.ReadWriteCloser adapter around gorilla/websocket for ACP SDK. |
| cmd/jaeger/internal/extension/jaegerquery/internal/jaegerai/ws_adapter_test.go | Tests adapter behavior for round-trip, EOF handling, and close/write errors. |
| cmd/jaeger/internal/extension/jaegerquery/internal/jaegerai/package_test.go | Adds leak-checking TestMain for the new package. |
| cmd/jaeger/config.yaml | Adds example AI configuration stanza under extensions.jaeger_query. |
cmd/jaeger/internal/extension/jaegerquery/internal/jaegerai/ws_adapter.go
| func (*streamingClient) RequestPermission(_ context.Context, p acp.RequestPermissionRequest) (acp.RequestPermissionResponse, error) { | ||
| if len(p.Options) == 0 { | ||
| return acp.RequestPermissionResponse{ | ||
| Outcome: acp.RequestPermissionOutcome{ | ||
| Cancelled: &acp.RequestPermissionOutcomeCancelled{}, | ||
| }, | ||
| }, nil | ||
| } | ||
| return acp.RequestPermissionResponse{ | ||
| Outcome: acp.RequestPermissionOutcome{ | ||
| Selected: &acp.RequestPermissionOutcomeSelected{OptionId: p.Options[0].OptionId}, |
RequestPermission automatically selects the first option whenever options are provided. This effectively grants permissions without any user/admin consent and could let a sidecar perform actions it should not be allowed to. Safer default is to always return a cancelled/denied outcome (or gate auto-approval behind an explicit config option).
| func (*streamingClient) RequestPermission(_ context.Context, p acp.RequestPermissionRequest) (acp.RequestPermissionResponse, error) { | |
| if len(p.Options) == 0 { | |
| return acp.RequestPermissionResponse{ | |
| Outcome: acp.RequestPermissionOutcome{ | |
| Cancelled: &acp.RequestPermissionOutcomeCancelled{}, | |
| }, | |
| }, nil | |
| } | |
| return acp.RequestPermissionResponse{ | |
| Outcome: acp.RequestPermissionOutcome{ | |
| Selected: &acp.RequestPermissionOutcomeSelected{OptionId: p.Options[0].OptionId}, | |
| func (*streamingClient) RequestPermission(_ context.Context, _ acp.RequestPermissionRequest) (acp.RequestPermissionResponse, error) { | |
| return acp.RequestPermissionResponse{ | |
| Outcome: acp.RequestPermissionOutcome{ | |
| Cancelled: &acp.RequestPermissionOutcomeCancelled{}, |
cmd/jaeger/internal/extension/jaegerquery/internal/jaegerai/streaming_client.go
| // This blocks until the sidecar completes the ACP prompt turn. | ||
| _, err = acpConn.Prompt(acpCtx, acp.PromptRequest{ | ||
| SessionId: sess.SessionId, | ||
| Prompt: []acp.ContentBlock{acp.TextBlock(req.Prompt)}, | ||
| }) | ||
| if err != nil { | ||
| w.WriteHeader(http.StatusBadGateway) | ||
| if _, writeErr := fmt.Fprintf(w, "Error starting prompt: %v\n", err); writeErr != nil { | ||
| h.Logger.Warn("Failed to write prompt error response", zap.Error(writeErr)) | ||
| } | ||
| return | ||
| } | ||
|
|
||
| // Wait for explicit end-of-turn marker from the sidecar, with a 3 mins timeout fallback. | ||
| clientImpl.waitForTurnCompletion(acpCtx, 180*time.Second) | ||
| } |
After Prompt returns successfully, the handler waits for an out-of-band __END_OF_TURN__ marker to close doneCh. This marker is not part of ACP itself and requires sidecars to implement a Jaeger-specific convention; if they don’t, requests will hang until the 180s timeout. Consider treating a successful Prompt response (e.g., StopReasonEndTurn) as completion and signaling doneCh immediately, keeping the timeout only as a safeguard for the RPC itself.
good catch - why do we need an explicit __END_OF_TURN__ signal instead of relying on the ACP protocol semantics?
Codecov Report
✅ All modified and coverable lines are covered by tests.
@@ Coverage Diff @@
## main #8294 +/- ##
==========================================
+ Coverage 95.61% 95.67% +0.05%
==========================================
Files 314 317 +3
Lines 16505 16705 +200
==========================================
+ Hits 15782 15982 +200
Misses 570 570
Partials 153 153
|
| } | ||
|
|
||
| // Wait for explicit end-of-turn marker from the sidecar, with a 3 mins timeout fallback. | ||
| clientImpl.waitForTurnCompletion(acpCtx, 180*time.Second) |
180*time.Second - move to configurable param wait_for_turn_timeout
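One possible shape for this, as a rough sketch only: the struct below is a hypothetical stand-in for the PR's AIConfig, and the WaitForTurnTimeout field, its wait_for_turn_timeout key, and the helper name are assumptions taken from the comment above rather than from the actual code.

```go
package main

import "time"

// defaultWaitForTurnTimeout mirrors the 180-second value hard-coded in the handler.
const defaultWaitForTurnTimeout = 180 * time.Second

// aiConfigSketch is a hypothetical stand-in for the PR's AIConfig type.
type aiConfigSketch struct {
	// AgentURL is the WebSocket endpoint of the ACP-capable agent.
	AgentURL string `mapstructure:"agent_url"`
	// WaitForTurnTimeout bounds how long the handler waits for a turn to finish.
	// The field name and config key are assumptions based on the review comment.
	WaitForTurnTimeout time.Duration `mapstructure:"wait_for_turn_timeout"`
}

// effectiveWaitForTurnTimeout returns the configured timeout, falling back to
// the default when the value is unset or non-positive.
func effectiveWaitForTurnTimeout(cfg aiConfigSketch) time.Duration {
	if cfg.WaitForTurnTimeout <= 0 {
		return defaultWaitForTurnTimeout
	}
	return cfg.WaitForTurnTimeout
}
```

The handler call would then read something like clientImpl.waitForTurnCompletion(acpCtx, effectiveWaitForTurnTimeout(cfg)), assuming the config is reachable from the handler.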
| } | ||
| } | ||
| if u.ToolCall != nil { | ||
| c.writeAndFlush(fmt.Sprintf("\n[tool_call] %s\n", u.ToolCall.Title)) |
what is the significance of the string formatting [tool_call] ...? Is something meant to be parsing it? Is it part of some protocol? Please document with comments
| return string(*v) | ||
| } | ||
|
|
||
| func (*streamingClient) WriteTextFile(_ context.Context, _ acp.WriteTextFileRequest) (acp.WriteTextFileResponse, error) { |
all of these functions are rather odd - who is expected to call them? Shouldn't they all be returning "not implemented" error if we don't expect anyone to call them?
cmd/jaeger/internal/extension/jaegerquery/internal/jaegerai/streaming_client.go
| c.flusher.Flush() | ||
| } | ||
|
|
||
| func (c *streamingClient) waitForTurnCompletion(ctx context.Context, maxWait time.Duration) { |
nothing seems to be calling this function
| func DefaultQueryOptions() QueryOptions { | ||
| return QueryOptions{ | ||
| MaxClockSkewAdjust: 0, // disabled by default | ||
| AI: configoptional.Some(AIConfig{AgentURL: "ws://localhost:9000"}), |
this should be configoptional.Default(...)
Signed-off-by: Nabil-Salah <nabil.salah203@gmail.com>
Pull request overview
Copilot reviewed 13 out of 14 changed files in this pull request and generated 5 comments.
| func (*streamingClient) RequestPermission(_ context.Context, p acp.RequestPermissionRequest) (acp.RequestPermissionResponse, error) { | ||
| if len(p.Options) == 0 { | ||
| return acp.RequestPermissionResponse{ | ||
| Outcome: acp.RequestPermissionOutcome{ | ||
| Cancelled: &acp.RequestPermissionOutcomeCancelled{}, | ||
| }, | ||
| }, nil | ||
| } | ||
| return acp.RequestPermissionResponse{ | ||
| Outcome: acp.RequestPermissionOutcome{ | ||
| Selected: &acp.RequestPermissionOutcomeSelected{OptionId: p.Options[0].OptionId}, |
RequestPermission auto-selects the first permission option when any are provided. This effectively grants whatever the agent asks for without user consent/policy, which is risky for an AI gateway. Safer default is to always cancel/deny, or gate approvals behind explicit config/policy and restrict to a known-safe subset of permission kinds.
| func (*streamingClient) RequestPermission(_ context.Context, p acp.RequestPermissionRequest) (acp.RequestPermissionResponse, error) { | |
| if len(p.Options) == 0 { | |
| return acp.RequestPermissionResponse{ | |
| Outcome: acp.RequestPermissionOutcome{ | |
| Cancelled: &acp.RequestPermissionOutcomeCancelled{}, | |
| }, | |
| }, nil | |
| } | |
| return acp.RequestPermissionResponse{ | |
| Outcome: acp.RequestPermissionOutcome{ | |
| Selected: &acp.RequestPermissionOutcomeSelected{OptionId: p.Options[0].OptionId}, | |
| func (*streamingClient) RequestPermission(_ context.Context, _ acp.RequestPermissionRequest) (acp.RequestPermissionResponse, error) { | |
| return acp.RequestPermissionResponse{ | |
| Outcome: acp.RequestPermissionOutcome{ | |
| Cancelled: &acp.RequestPermissionOutcomeCancelled{}, |
cmd/jaeger/internal/extension/jaegerquery/internal/jaegerai/streaming_client.go
|
|
||
| type AIConfig struct { | ||
| // AgentURL is the WebSocket endpoint of an agent that supports ACP. | ||
| // See https://agentclientprotocol.com/ |
| // See https://agentclientprotocol.com/ | |
| // For example, ws://localhost:9000 | |
| // See https://agentclientprotocol.com/ |
| defer cancel() | ||
|
|
||
| clientVersion := version.Get().GitVersion | ||
| if clientVersion == "" { |
| // Build an ACP client-side connection over the websocket adapter. | ||
| acpConn := acp.NewClientSideConnection(clientImpl, adapter, adapter) | ||
|
|
||
| acpCtx, cancel := context.WithCancel(ctx) |
| w.Header().Set("Connection", "keep-alive") | ||
|
|
||
| ctx := r.Context() | ||
| dialer := websocket.Dialer{HandshakeTimeout: 5 * time.Second} |
I think this initialization should be moved into NewWsAdapter(), since it's part of websocket handling and the conn variable created in this scope is never used again. Instead you would call defer adapter.Close()
|
|
||
| func (h *ChatHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { | ||
| if r.Method != http.MethodPost { | ||
| http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) |
| http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) | |
| http.Error(w, "Only POST method is supported", http.StatusMethodNotAllowed) |
| ) | ||
|
|
||
| const ( | ||
| endOfTurnMarker = "__END_OF_TURN__" |
what's your take on this observation?
The acp.Client interface has two types of methods:
Request/Response methods (agent calls client, blocks waiting for response):
- ReadTextFile, WriteTextFile - file operations
- RequestPermission - permission dialogs
- CreateTerminal, TerminalOutput, etc. - terminal operations
Notification method (one-way, fire-and-forget):
- SessionUpdate - streams real-time progress and results during prompt processing
SessionUpdate carries only informational content for UI streaming:
- AgentMessageChunk - streamed text from the agent
- AgentThoughtChunk - agent's internal reasoning
- ToolCall / ToolCallUpdate - notifications that the agent has initiated/completed a tool call (for UI progress display, not execution requests)
- Plan - execution plan for complex tasks
In Jaeger's architecture, the sidecar executes MCP tools by calling Jaeger's MCP
server directly over HTTP. The SessionUpdate(ToolCall) notifications merely
inform the UI that a tool is running - they do not ask the client to execute
anything.
End-of-Turn Handling
The agent will not return PromptResponse until all actual work (including tool
execution via MCP) is complete. By the time Prompt() returns:
- All tool calls have finished (via MCP)
- All SessionUpdate notifications have been sent
The current implementation uses an __END_OF_TURN__ marker as a SessionUpdate
to signal completion. This works around a minor race condition in acp-go-sdk
where notifications are processed in goroutines (go c.handleInbound(&msg) in
connection.go:97), meaning SessionUpdate callbacks might still be running
when PromptResponse arrives and Prompt() returns.
Since SessionUpdate handlers just write to the HTTP response (fast operation),
this race is minimal. The marker may be over-engineering - a potential
simplification would be to remove the marker and return immediately after
Prompt() completes.
but the marker costs almost nothing, so why not keep it?
it costs the complexity of implementation and it's not part of the protocol we're trying to implement.
okay will update my code
…nd Return errNotSupported Signed-off-by: Nabil-Salah <nabil.salah203@gmail.com>
Signed-off-by: Nabil-Salah <nabil.salah203@gmail.com>
Pull request overview
Copilot reviewed 13 out of 14 changed files in this pull request and generated 4 comments.
| w.Header().Set("Content-Type", "text/plain; charset=utf-8") | ||
| w.Header().Set("Cache-Control", "no-cache") | ||
| w.Header().Set("Connection", "keep-alive") |
The handler sets a Connection: keep-alive response header. This is a hop-by-hop header that net/http manages automatically and can be problematic with proxies and is not applicable to HTTP/2. Consider removing this header and relying on the server’s connection handling (keeping Cache-Control: no-cache etc. for streaming).
| func (c *streamingClient) waitForTurnCompletion(ctx context.Context, maxWait time.Duration) { | ||
| if maxWait <= 0 { | ||
| return | ||
| } | ||
|
|
||
| maxTimer := time.NewTimer(maxWait) | ||
| defer maxTimer.Stop() | ||
|
|
||
| select { | ||
| case <-ctx.Done(): | ||
| return | ||
| case <-maxTimer.C: | ||
| return | ||
| case <-c.doneCh: | ||
| return | ||
| } | ||
| } |
waitForTurnCompletion can only return early via c.doneCh, but doneCh is never signaled on the normal success path (it’s only closed on request cancel, write error, or panic). As a result, successful requests will always wait the full maxWait duration, adding fixed latency without actually confirming that all SessionUpdate goroutines have drained. Consider either (a) simplifying this to a plain sleep with a clarifying name/comment, or (b) implementing an actual “idle/drained” signal (e.g., track last-write time and wait until no writes occur for N ms, or close doneCh via an explicit end-of-turn notification if ACP provides one).
Signed-off-by: Nabil-Salah <nabil.salah203@gmail.com>
…jaeger into develop_acp_handler
Pull request overview
Copilot reviewed 13 out of 14 changed files in this pull request and generated 1 comment.
| // Limit the size of the request body to prevent memory/CPU abuse. | ||
| r.Body = http.MaxBytesReader(w, r.Body, h.maxRequestBodySize) | ||
| defer r.Body.Close() | ||
|
|
||
| var req ChatRequest | ||
| if err := json.NewDecoder(r.Body).Decode(&req); err != nil { | ||
| http.Error(w, "Bad request", http.StatusBadRequest) | ||
| return | ||
| } |
Request body is wrapped with http.MaxBytesReader, but JSON decode errors are always returned as 400. When the body exceeds maxRequestBodySize, Decode will fail with *http.MaxBytesError and should return 413 (Request Entity Too Large) so clients can distinguish oversized payloads from malformed JSON.
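A minimal sketch of that distinction, assuming Go 1.19 or newer (where http.MaxBytesError exists). The chatRequestSketch type and both helper names are hypothetical; decodeStatus only simulates the decode step of the handler so the mapping can be exercised in isolation.

```go
package main

import (
	"encoding/json"
	"errors"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// chatRequestSketch is a stand-in for the PR's ChatRequest type.
type chatRequestSketch struct {
	Prompt string `json:"prompt"`
}

// statusForDecodeError maps a JSON decode error to an HTTP status code:
// 413 when the body exceeded the MaxBytesReader limit, 400 otherwise.
func statusForDecodeError(err error) int {
	var tooLarge *http.MaxBytesError
	if errors.As(err, &tooLarge) {
		return http.StatusRequestEntityTooLarge
	}
	return http.StatusBadRequest
}

// decodeStatus decodes body under the given size limit and reports the status
// a handler using statusForDecodeError would return.
func decodeStatus(body string, limit int64) int {
	w := httptest.NewRecorder()
	rc := http.MaxBytesReader(w, io.NopCloser(strings.NewReader(body)), limit)
	defer rc.Close()

	var req chatRequestSketch
	if err := json.NewDecoder(rc).Decode(&req); err != nil {
		return statusForDecodeError(err)
	}
	return http.StatusOK
}
```

In the handler itself, the existing http.Error(w, "Bad request", http.StatusBadRequest) call could take its status from statusForDecodeError(err) instead of the fixed 400.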
Signed-off-by: Nabil-Salah <nabil.salah203@gmail.com>
Pull request overview
Copilot reviewed 13 out of 14 changed files in this pull request and generated 2 comments.
| var req ChatRequest | ||
| if err := json.NewDecoder(r.Body).Decode(&req); err != nil { | ||
| http.Error(w, "Bad request", http.StatusBadRequest) | ||
| return | ||
| } | ||
|
|
The handler accepts an empty/missing "prompt" (it will forward an empty string to the sidecar). If "prompt" is required for this API, add validation after decoding (e.g., strings.TrimSpace(req.Prompt) != "") and return 400 to fail fast and avoid creating ACP sessions for invalid requests.
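The check could be as small as the sketch below. The function and error names are illustrative only and do not come from the PR.

```go
package main

import (
	"errors"
	"strings"
)

// errEmptyPrompt is a hypothetical sentinel error for a missing or blank prompt.
var errEmptyPrompt = errors.New("prompt must not be empty")

// validatePrompt rejects prompts that are empty or contain only whitespace,
// so that no ACP session is created for a request that cannot succeed.
func validatePrompt(prompt string) error {
	if strings.TrimSpace(prompt) == "" {
		return errEmptyPrompt
	}
	return nil
}
```

The handler would call this right after decoding and respond with 400 when it returns an error, before dialing the sidecar.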
| func (*streamingClient) RequestPermission(_ context.Context, p acp.RequestPermissionRequest) (acp.RequestPermissionResponse, error) { | ||
| if len(p.Options) == 0 { | ||
| return acp.RequestPermissionResponse{ | ||
| Outcome: acp.RequestPermissionOutcome{ | ||
| Cancelled: &acp.RequestPermissionOutcomeCancelled{}, | ||
| }, | ||
| }, nil | ||
| } | ||
| return acp.RequestPermissionResponse{ | ||
| Outcome: acp.RequestPermissionOutcome{ | ||
| Selected: &acp.RequestPermissionOutcomeSelected{OptionId: p.Options[0].OptionId}, | ||
| }, | ||
| }, nil | ||
| } |
RequestPermission currently auto-selects the first option whenever any options are provided. That can unintentionally approve sensitive actions if an agent presents an "allow" option first. Consider defaulting to a safe outcome (Cancelled/Denied) unless the gateway has explicit user consent/config for auto-approval, or select only an explicitly safe option kind.
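To illustrate the "safe by default" policy, here is a sketch that uses local stand-in types instead of the acp-go-sdk ones, so it stays self-contained. The kind strings follow the ACP permission option kinds as I understand them, and the autoApprove flag is a hypothetical config switch; both should be checked against the SDK before use.

```go
package main

// permissionKind and permissionOption are local stand-ins for the SDK types;
// they are not the acp-go-sdk definitions.
type permissionKind string

const (
	kindAllowOnce   permissionKind = "allow_once"
	kindAllowAlways permissionKind = "allow_always"
	kindRejectOnce  permissionKind = "reject_once"
)

type permissionOption struct {
	ID   string
	Kind permissionKind
}

// choosePermission returns the option ID to select and true, or "" and false
// when the request should be answered with a cancelled outcome.
// Nothing is selected unless autoApprove is explicitly enabled, and even then
// only a one-time allow option is eligible.
func choosePermission(opts []permissionOption, autoApprove bool) (string, bool) {
	if !autoApprove {
		return "", false
	}
	for _, o := range opts {
		if o.Kind == kindAllowOnce {
			return o.ID, true
		}
	}
	return "", false
}
```

With this shape, the position of an option in the list no longer matters: an "allow always" entry listed first is never picked, and the default configuration always cancels.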
Which problem is this PR solving?
Description of the changes
Added AI proxy configuration for Jaeger Query via extensions.jaeger_query.ai.agent_url (default ws://localhost:9000) and wired it into query options.
Registered a new chat endpoint POST /api/ai/chat, enabled only when ai.agent_url is configured and non-empty.
Introduced a new jaegerai communication layer that:
Added required dependencies for ACP/WebSocket support (github.com/coder/acp-go-sdk, github.com/gorilla/websocket).
Continuation of the work in #8276 (feat: Add sidecar example for jaeger agent).
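The conditional route registration described above can be pictured roughly as follows. This is a sketch on a plain net/http ServeMux, which may differ from the router Jaeger Query actually uses, and registerChatRoute and chatRouteStatus are hypothetical names.

```go
package main

import (
	"net/http"
	"net/http/httptest"
	"strings"
)

// registerChatRoute mounts the chat handler under basePath only when an agent
// URL is configured. It reports whether the route was registered.
func registerChatRoute(mux *http.ServeMux, basePath, agentURL string, h http.Handler) bool {
	if strings.TrimSpace(agentURL) == "" {
		return false
	}
	mux.Handle(strings.TrimSuffix(basePath, "/")+"/api/ai/chat", h)
	return true
}

// chatRouteStatus sends a POST to the chat path and returns the status code,
// using a trivial handler that answers 204 in place of the real ChatHandler.
func chatRouteStatus(basePath, agentURL string) int {
	mux := http.NewServeMux()
	registerChatRoute(mux, basePath, agentURL, http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusNoContent)
	}))
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodPost, strings.TrimSuffix(basePath, "/")+"/api/ai/chat", nil)
	mux.ServeHTTP(rec, req)
	return rec.Code
}
```

With an empty agent URL the mux has no matching route, so the request falls through to the default 404 response.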
How was this change tested?
Checklist
make lint test
AI Usage in this PR (choose one)
See AI Usage Policy.