Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ require (
go.opentelemetry.io/otel/trace v1.0.0-RC2
go.uber.org/atomic v1.9.0
go.uber.org/goleak v1.1.10
go.uber.org/multierr v1.6.0
go.uber.org/zap v1.17.0
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect
golang.org/x/time v0.0.0-20210611083556-38a9dc6acbc6
Expand Down Expand Up @@ -239,7 +240,6 @@ require (
go.mongodb.org/mongo-driver v1.5.1 // indirect
go.opentelemetry.io/otel/metric v0.21.0 // indirect
go.opentelemetry.io/otel/sdk/export/metric v0.21.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e // indirect
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
golang.org/x/mod v0.4.2 // indirect
Expand Down
57 changes: 44 additions & 13 deletions modules/frontend/querysharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@ import (
"context"
"encoding/binary"
"encoding/hex"
"fmt"
"io"
"io/ioutil"
"net/http"
"strings"

"github.com/go-kit/kit/log"
"github.com/golang/protobuf/proto"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/weaveworks/common/user"
Expand All @@ -25,6 +28,9 @@ const (

querierPrefix = "/querier"
queryDelimiter = "?"

// todo: make configurable
maxBlockErrCount = 5
)

func ShardingWare(queryShards int, logger log.Logger) Middleware {
Expand Down Expand Up @@ -154,25 +160,36 @@ func mergeResponses(ctx context.Context, rrs []RequestResponse) (*http.Response,

var errCode = http.StatusOK
var errBody io.ReadCloser
var combinedTrace []byte
var shardMissCount = 0
var combinedTrace *tempopb.Trace
var combinedTraceBytes []byte
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: we don't use combinedTraceBytes in this for-loop yet, so we can move the declaration further down in this function. This makes the code a bit easier to read, as we don't have to worry about this variable yet.

var shardMissCount, totalBlockErrCount int
for _, rr := range rrs {
if rr.Response.StatusCode == http.StatusOK {
partialContent := rr.Response.StatusCode == http.StatusPartialContent
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: renaming this to isPartialContent or gotPartialContent might make it more obvious this is a boolean. For a while I thought this was a variable holding a part of the content.

if rr.Response.StatusCode == http.StatusOK || partialContent {
body, err := io.ReadAll(rr.Response.Body)
rr.Response.Body.Close()
if err != nil {
return nil, errors.Wrap(err, "error reading response body at query frontend")
}
Comment on lines 169 to 173
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if we should fail the entire request here. If we can't read the body from one of the requests, isn't this also a partial result?
Same for unmarshaling the body.


if len(combinedTrace) == 0 {
combinedTrace = body
} else {
combinedTrace, _, err = model.CombineTraceBytes(combinedTrace, body, model.TracePBEncoding, model.TracePBEncoding)
if err != nil {
// will result in a 500 internal server error
return nil, errors.Wrap(err, "error combining traces at query frontend")
var resp tempopb.TraceByIDResponse
err = proto.Unmarshal(body, &resp)
Comment on lines +175 to +176
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The querier only seems to marshal the trace part of TraceByIDResponse (modules/querier/http.go:82):

b, err := proto.Marshal(resp.Trace)

How can we unmarshal a full TraceByIDResponse here?

if err != nil {
return nil, errors.Wrap(err, "error reading response body at query frontend")
}

if partialContent {
totalBlockErrCount += int(resp.BlockErrCount)
if totalBlockErrCount > maxBlockErrCount {
return nil, fmt.Errorf("too many block queries failed (max %d)", maxBlockErrCount)
}
Comment on lines +183 to 185
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if it's useful to fail on maxBlockErrCount. We already did the work: all the queriers returned something. Instead of throwing away the results we can still return whatever we got.

}

if combinedTrace == nil {
combinedTrace = resp.Trace
} else {
combinedTrace, _, _, _ = model.CombineTraceProtos(combinedTrace, resp.Trace)
}
} else if rr.Response.StatusCode != http.StatusNotFound {
errCode = rr.Response.StatusCode
errBody = rr.Response.Body
Expand All @@ -181,6 +198,14 @@ func mergeResponses(ctx context.Context, rrs []RequestResponse) (*http.Response,
}
}

if combinedTrace != nil {
var err error
combinedTraceBytes, err = combinedTrace.Marshal()
if err != nil {
return nil, errors.Wrap(err, "error marshaling combined trace at query frontend")
}
}

if shardMissCount == len(rrs) {
Copy link
Copy Markdown
Contributor

@yvrhdn yvrhdn Sep 30, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can move this block above `if combinedTrace != nil`: when we exit the loop and no querier returned anything, just exit immediately.

return &http.Response{
StatusCode: http.StatusNotFound,
Expand All @@ -190,12 +215,18 @@ func mergeResponses(ctx context.Context, rrs []RequestResponse) (*http.Response,
}

if errCode == http.StatusOK {
statusCode := http.StatusOK
if totalBlockErrCount > 0 {
// If there are failed blocks, and we haven't returned with an error,
// signal the upstream that the result contains partial results.
statusCode = http.StatusPartialContent
}
return &http.Response{
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(bytes.NewReader(combinedTrace)),
StatusCode: statusCode,
Body: ioutil.NopCloser(bytes.NewReader(combinedTraceBytes)),
Copy link
Copy Markdown
Contributor

@yvrhdn yvrhdn Sep 30, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Body: ioutil.NopCloser(bytes.NewReader(combinedTraceBytes)),
Body: io.NopCloser(bytes.NewReader(combinedTraceBytes)),

ioutil.NopCloser has been moved to io.NopCloser, see https://golang.org/doc/go1.16#ioutil
We just removed all occurrences of ioutil here #998 🙂

// ContentLength header is added to log the size of response in the Tripperware in frontend.go
// This could be overwritten if the query client and Tempo negotiate compression
ContentLength: int64(len(combinedTrace)),
ContentLength: int64(len(combinedTraceBytes)),
Header: http.Header{},
}, nil
}
Expand Down
23 changes: 16 additions & 7 deletions modules/frontend/querysharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"

"github.com/golang/protobuf/proto"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/stretchr/testify/assert"

"github.com/grafana/tempo/pkg/model"
Expand Down Expand Up @@ -57,12 +58,20 @@ func TestMergeResponses(t *testing.T) {
t1 := test.MakeTrace(10, []byte{0x01, 0x02})
t2 := test.MakeTrace(10, []byte{0x01, 0x03})

b1, err := proto.Marshal(t1)
bt1, err := proto.Marshal(t1)
assert.NoError(t, err)
b2, err := proto.Marshal(t2)
bt2, err := proto.Marshal(t2)
assert.NoError(t, err)

combinedTrace, _, err := model.CombineTraceBytes(b1, b2, model.TracePBEncoding, model.TracePBEncoding)
r1 := &tempopb.TraceByIDResponse{Trace: t1, BlockErrCount: 1}
r2 := &tempopb.TraceByIDResponse{Trace: t2, BlockErrCount: 2}

br1, err := proto.Marshal(r1)
assert.NoError(t, err)
br2, err := proto.Marshal(r2)
assert.NoError(t, err)

combinedTrace, _, err := model.CombineTraceBytes(bt1, bt2, model.TracePBEncoding, model.TracePBEncoding)
assert.NoError(t, err)

tests := []struct {
Expand All @@ -76,13 +85,13 @@ func TestMergeResponses(t *testing.T) {
{
Response: &http.Response{
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(bytes.NewReader(b1)),
Body: ioutil.NopCloser(bytes.NewReader(br1)),
},
},
{
Response: &http.Response{
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(bytes.NewReader(b2)),
Body: ioutil.NopCloser(bytes.NewReader(br2)),
},
},
{
Expand All @@ -104,7 +113,7 @@ func TestMergeResponses(t *testing.T) {
{
Response: &http.Response{
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(bytes.NewReader(b1)),
Body: ioutil.NopCloser(bytes.NewReader(br1)),
},
},
{
Expand Down Expand Up @@ -146,7 +155,7 @@ func TestMergeResponses(t *testing.T) {
{
Response: &http.Response{
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(bytes.NewReader(b1)),
Body: ioutil.NopCloser(bytes.NewReader(br1)),
},
},
{
Expand Down
5 changes: 4 additions & 1 deletion modules/querier/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ func (q *Querier) TraceByIDHandler(w http.ResponseWriter, r *http.Request) {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if resp.BlockErrCount > 0 { // If some blocks failed, return 206
w.WriteHeader(http.StatusPartialContent)
}
_, err = w.Write(b)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand All @@ -94,7 +97,7 @@ func (q *Querier) TraceByIDHandler(w http.ResponseWriter, r *http.Request) {

span.SetTag("response marshalling format", util.JSONTypeHeaderValue)
marshaller := &jsonpb.Marshaler{}
err = marshaller.Marshal(w, resp.Trace)
err = marshaller.Marshal(w, resp)
Copy link
Copy Markdown
Contributor

@yvrhdn yvrhdn Sep 30, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a difference between our responses depending on whether the caller accepts protobuf or not: if the caller accepts protobuf we return HTTP 206 and resp.Trace (line 82).

But if the caller requests something else (i.e. JSON), we marshal the entire TraceByIDResponse, which would be something like:

{
  "trace": ...,
  "blockErrCount": ...
}

Why not also return HTTP 206? This change breaks our API I think.

if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
Expand Down
11 changes: 9 additions & 2 deletions modules/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,19 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque
ot_log.Int("combinedTraces", traceCountTotal))
}

var blockErrsCount uint32
if req.QueryMode == QueryModeBlocks || req.QueryMode == QueryModeAll {
span.LogFields(ot_log.String("msg", "searching store"))
partialTraces, dataEncodings, err := q.store.Find(opentracing.ContextWithSpan(ctx, span), userID, req.TraceID, req.BlockStart, req.BlockEnd)
partialTraces, dataEncodings, blockErrs, err := q.store.Find(opentracing.ContextWithSpan(ctx, span), userID, req.TraceID, req.BlockStart, req.BlockEnd)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't use blockErrs in a meaningful way (we only use the count, not the errors themselves). Maybe we should only return an int instead of []error?

// err contains unrecoverable errors
// errs querying blocks are contained in blockErrs
Comment on lines +192 to +193
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add these comments to Reader.Find itself?

if err != nil {
// todo: change err log to specify what failed
return nil, errors.Wrap(err, "error querying store in Querier.FindTraceByID")
}

blockErrsCount = uint32(len(blockErrs))

span.LogFields(ot_log.String("msg", "done searching store"))

if len(partialTraces) != 0 {
Expand Down Expand Up @@ -226,7 +232,8 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque
}

return &tempopb.TraceByIDResponse{
Trace: completeTrace,
Trace: completeTrace,
BlockErrCount: blockErrsCount,
}, nil
}

Expand Down
16 changes: 15 additions & 1 deletion modules/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/multierr"

"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/util"
Expand Down Expand Up @@ -106,8 +107,9 @@ func TestReturnAllHits(t *testing.T) {
time.Sleep(200 * time.Millisecond)

// find should return both now
foundBytes, _, err := r.Find(context.Background(), util.FakeTenantID, testTraceID, tempodb.BlockIDMin, tempodb.BlockIDMax)
foundBytes, _, blockErrs, err := r.Find(context.Background(), util.FakeTenantID, testTraceID, tempodb.BlockIDMin, tempodb.BlockIDMax)
assert.NoError(t, err)
assert.NoError(t, multierr.Combine(blockErrs...))
require.Len(t, foundBytes, 2)

// expected trace
Expand All @@ -123,4 +125,16 @@ func TestReturnAllHits(t *testing.T) {

model.SortTrace(actualTrace)
assert.Equal(t, expectedTrace, actualTrace)

// store's directory
err = os.RemoveAll(tempDir)
assert.NoError(t, err)

// it should return no results and two errors from block queries
foundBytes, _, blockErrs, err = r.Find(context.Background(), util.FakeTenantID, testTraceID, tempodb.BlockIDMin, tempodb.BlockIDMax)
assert.Nil(t, foundBytes)
assert.NoError(t, err)
assert.Equal(t, 2, len(blockErrs))
assert.Error(t, multierr.Combine(blockErrs...))

}
Loading