Skip to content
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
* [FEATURE] Add query_end_cutoff setting to clamp near-“now” queries (default 30s) for consistent Tempo results [#5682](https://github.com/grafana/tempo/pull/5682) (@javiermolinar)
* [FEATURE] New block encoding vParquet5-preview4 with array support for dedicated columns. This is a preview, breaking changes are expected. [#5760](https://github.com/grafana/tempo/pull/5760) (@stoewer)
* [FEATURE] Add SSE-C encryption support to S3 backend [#5789](https://github.com/grafana/tempo/pull/5789) (@steffsas)
* [FEATURE] Add support for `Accept: application/vnd.grafana.llm` to Tempo endpoints. Currently supported directly by trace by id and tag values [#5961](https://github.com/grafana/tempo/pull/5961) (@joe-elliott)
This response is subject to change and should not be relied on. It is intended for LLM consumption only. Even a fundamental change to its representation (yaml? markdown?) would not be considered breaking.
* [ENHANCEMENT] docs: Add explicit notes about authentication [#5735](https://github.com/grafana/tempo/pull/5735) (@electron0zero)
* [ENHANCEMENT] On startup, first record for live store to consume is not older than two complete block timeouts [#5693](https://github.com/grafana/tempo/pull/5693) (@ruslan-mikhailov)
* [ENHANCEMENT] Add secure connection support to tempo-cli [#5692](https://github.com/grafana/tempo/pull/5692) (@TheoBrigitte)
Expand Down
18 changes: 7 additions & 11 deletions integration/e2e/api/mcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,10 @@ import (
"testing"
"time"

"github.com/gogo/protobuf/jsonpb"
"github.com/grafana/e2e"
e2edb "github.com/grafana/e2e/db"
"github.com/grafana/tempo/integration/util"
"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/tempopb"
tempoUtil "github.com/grafana/tempo/pkg/util"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -43,7 +41,7 @@ func TestMCP(t *testing.T) {
info := tempoUtil.NewTraceInfo(time.Now(), "")
require.NoError(t, info.EmitAllBatches(jaegerClient))

expected, err := info.ConstructTraceFromEpoch()
_, err = info.ConstructTraceFromEpoch()
require.NoError(t, err)

// now query it back with mcp
Expand Down Expand Up @@ -84,8 +82,7 @@ func TestMCP(t *testing.T) {
sort.Strings(expectedTools)
require.Equal(t, expectedTools, actualTools)

trace := traceOverMCP(t, mcpClient, info.HexID())
util.AssertEqualTrace(t, expected, trace)
assertTraceOverMCP(t, mcpClient, info.HexID())
}

func createMCPClient(t *testing.T, tempo *e2e.HTTPService) mcpclient.MCPClient {
Expand Down Expand Up @@ -119,7 +116,7 @@ func listTools(t *testing.T, mcpClient mcpclient.MCPClient) []mcp.Tool {
return toolsResponse.Tools
}

func traceOverMCP(t *testing.T, mcpClient mcpclient.MCPClient, traceID string) *tempopb.Trace {
func assertTraceOverMCP(t *testing.T, mcpClient mcpclient.MCPClient, traceID string) {
resp, err := mcpClient.CallTool(context.Background(), mcp.CallToolRequest{
Params: mcp.CallToolParams{
Name: "get-trace",
Expand All @@ -130,11 +127,10 @@ func traceOverMCP(t *testing.T, mcpClient mcpclient.MCPClient, traceID string) *

str := toolResult(t, resp)

trace := &tempopb.TraceByIDResponse{}
err = jsonpb.UnmarshalString(str, trace)
require.NoError(t, err)

return trace.Trace
// the trace format is subject is to change so let's keep it simple.
if !strings.Contains(str, traceID) {
t.Fatalf("expected trace ID %s not found in response", traceID)
}
}

func toolResult(t *testing.T, resp *mcp.CallToolResult) string {
Expand Down
63 changes: 47 additions & 16 deletions modules/frontend/combiner/common.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package combiner

import (
"bytes"
"errors"
"fmt"
"io"
"net/http"
"strings"
"sync"
"unsafe"

tempo_io "github.com/grafana/tempo/pkg/io"
"github.com/grafana/tempo/pkg/tempopb"
Expand Down Expand Up @@ -38,7 +41,7 @@ type genericCombiner[T TResponse] struct {
combine func(partial T, final T, resp PipelineResponse) error
metadata func(resp PipelineResponse, final T) error
finalize func(T) (T, error)
diff func(T) (T, error) // currently only implemented by the search combiner. required for streaming
diff func(T) (T, error)
quit func(T) bool

// Used to determine the response code and when to stop
Expand Down Expand Up @@ -163,27 +166,18 @@ func (c *genericCombiner[T]) HTTPFinal() (*http.Response, error) {
return nil, err
}

var bodyString string
if c.httpMarshalingFormat == api.MarshallingFormatProtobuf {
buff, err := proto.Marshal(final)
if err != nil {
return nil, fmt.Errorf("error marshalling response body: %w", err)
}
bodyString = string(buff)
} else {
bodyString, err = new(jsonpb.Marshaler).MarshalToString(final)
if err != nil {
return nil, fmt.Errorf("error marshalling response body: %w", err)
}
bodyBytes, contentType, err := c.internalMarshalAs(final)
if err != nil {
return nil, fmt.Errorf("error marshalling response body as %s: %w", c.httpMarshalingFormat, err)
}

return &http.Response{
StatusCode: 200,
Header: http.Header{
api.HeaderContentType: {string(c.httpMarshalingFormat)},
api.HeaderContentType: {contentType},
},
Body: io.NopCloser(strings.NewReader(bodyString)),
ContentLength: int64(len([]byte(bodyString))),
Body: io.NopCloser(bytes.NewReader(bodyBytes)),
ContentLength: int64(len(bodyBytes)),
}, nil
}

Expand Down Expand Up @@ -291,6 +285,43 @@ func (c *genericCombiner[R]) shouldQuit() bool {
return false
}

func (c *genericCombiner[T]) internalMarshalAs(final T) ([]byte, string, error) {
var bodyBytes []byte
var contentType string
var err error

switch c.httpMarshalingFormat {
case api.MarshallingFormatProtobuf:
bodyBytes, err = proto.Marshal(final)
contentType = string(api.MarshallingFormatProtobuf)
case api.MarshallingFormatLLM:
var bodyString string
bodyString, err = new(llmMarshaler).marshalToString(final)
contentType = string(api.MarshallingFormatLLM) + "+json" // postfix the content subtype to indicate its parseable as json
// if its unsupported, just fallthrough to marshal as normal json
if errors.Is(err, util.ErrUnsupported) {
bodyString, err = new(jsonpb.Marshaler).MarshalToString(final)
contentType = string(api.MarshallingFormatJSON)
}
bodyBytes = unsafeStringToBytes(bodyString)
case api.MarshallingFormatJSON:
fallthrough
default:
var bodyString string
bodyString, err = new(jsonpb.Marshaler).MarshalToString(final)
contentType = string(api.MarshallingFormatJSON)
bodyBytes = unsafeStringToBytes(bodyString)
}

return bodyBytes, contentType, err
}

type TraceRedactor interface {
RedactTraceAttributes(t *tempopb.Trace)
}

// unsafeStringToBytes converts a string to []byte without allocation.
// The returned byte slice must not be modified.
func unsafeStringToBytes(s string) []byte {
return unsafe.Slice(unsafe.StringData(s), len(s))
}
Loading
Loading