Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
* [ENHANCEMENT] Jsonnet: add `$._config.memcached.memory_limit_mb` [#987](https://github.com/grafana/tempo/pull/987) (@kvrhdn)
* [ENHANCEMENT] Upgrade jsonnet-libs to 1.19 and update tk examples [#1001](https://github.com/grafana/tempo/pull/1001) (@mapno)
* [ENHANCEMENT] Shard tenant index creation by tenant and add functionality to handle stale indexes. [#1005](https://github.com/grafana/tempo/pull/1005) (@joe-elliott)
* [ENHANCEMENT] Support partial results from failed block queries [#1007](https://github.com/grafana/tempo/pull/1007) (@mapno)
* [BUGFIX] Update port spec for GCS docker-compose example [#869](https://github.com/grafana/tempo/pull/869) (@zalegrala)
* [BUGFIX] Fix "magic number" errors and other block mishandling when an ingester forcefully shuts down [#937](https://github.com/grafana/tempo/issues/937) (@mdisibio)
* [BUGFIX] Fix compactor memory leak [#806](https://github.com/grafana/tempo/pull/806) (@mdisibio)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ require (
go.opentelemetry.io/otel/trace v1.0.0-RC2
go.uber.org/atomic v1.9.0
go.uber.org/goleak v1.1.10
go.uber.org/multierr v1.6.0
go.uber.org/zap v1.17.0
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect
golang.org/x/time v0.0.0-20210611083556-38a9dc6acbc6
Expand Down Expand Up @@ -239,7 +240,6 @@ require (
go.mongodb.org/mongo-driver v1.5.1 // indirect
go.opentelemetry.io/otel/metric v0.21.0 // indirect
go.opentelemetry.io/otel/sdk/export/metric v0.21.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e // indirect
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
golang.org/x/mod v0.4.2 // indirect
Expand Down
69 changes: 48 additions & 21 deletions modules/frontend/querysharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"encoding/binary"
"encoding/hex"
"fmt"
"io"
"net/http"
"strings"
Expand All @@ -13,12 +14,11 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/log/level"
"github.com/golang/protobuf/proto"
"github.com/opentracing/opentracing-go"
"github.com/weaveworks/common/user"

"github.com/grafana/tempo/modules/querier"
"github.com/grafana/tempo/pkg/model"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/opentracing/opentracing-go"
"github.com/weaveworks/common/user"
)

const (
Expand All @@ -27,6 +27,8 @@ const (

querierPrefix = "/querier"
queryDelimiter = "?"

defaulMaxFailedBlocks = 5
Comment thread
mapno marked this conversation as resolved.
Outdated
)

func ShardingWare(queryShards int, logger log.Logger) Middleware {
Expand All @@ -36,6 +38,7 @@ func ShardingWare(queryShards int, logger log.Logger) Middleware {
queryShards: queryShards,
logger: logger,
blockBoundaries: createBlockBoundaries(queryShards - 1), // one shard will be used to query ingesters
maxFailedBlocks: defaulMaxFailedBlocks,
}
})
}
Expand All @@ -45,6 +48,7 @@ type shardQuery struct {
queryShards int
logger log.Logger
blockBoundaries [][]byte
maxFailedBlocks uint32
}

// RoundTrip implements http.RoundTripper
Expand All @@ -66,6 +70,7 @@ func (s shardQuery) RoundTrip(r *http.Request) (*http.Response, error) {

var overallTrace *tempopb.Trace
var overallError error
var totalFailedBlocks uint32
statusCode := http.StatusNotFound
statusMsg := "trace not found"

Expand All @@ -82,7 +87,7 @@ func (s shardQuery) RoundTrip(r *http.Request) (*http.Response, error) {
overallError = err
}

if shouldQuit(r.Context(), statusCode, overallError) {
if s.shouldQuit(r.Context(), statusCode, totalFailedBlocks, overallError) {
return
}

Expand Down Expand Up @@ -121,13 +126,20 @@ func (s shardQuery) RoundTrip(r *http.Request) (*http.Response, error) {
// marshal into a trace to combine.
// todo: better define responsibilities between middleware. the parent middleware in frontend.go actually sets the header
// which forces the body here to be a proto encoded tempopb.Trace{}
trace := &tempopb.Trace{}
err = proto.Unmarshal(buff, trace)
traceResp := &tempopb.TraceByIDResponse{}
err = proto.Unmarshal(buff, traceResp)
if err != nil {
_ = level.Error(s.logger).Log("msg", "error unmarshalling response", "url", innerR.RequestURI, "err", err, "body", string(buff))
overallError = err
return
}
if traceResp.Metrics != nil {
totalFailedBlocks += traceResp.Metrics.FailedBlocks
}
trace := &tempopb.Trace{}
if traceResp.Trace != nil {
Comment thread
mapno marked this conversation as resolved.
Outdated
trace = traceResp.Trace
}

// happy path
statusCode = http.StatusOK
Expand Down Expand Up @@ -155,14 +167,23 @@ func (s shardQuery) RoundTrip(r *http.Request) (*http.Response, error) {
}, nil
}

buff, err := proto.Marshal(overallTrace)
buff, err := proto.Marshal(&tempopb.TraceByIDResponse{
Trace: overallTrace,
Metrics: &tempopb.TraceByIDMetrics{
FailedBlocks: totalFailedBlocks,
},
})
if err != nil {
_ = level.Error(s.logger).Log("msg", "error marshalling response to proto", "err", err)
return nil, err
}

statusCode = http.StatusOK
if totalFailedBlocks > 0 {
statusCode = http.StatusPartialContent
}
return &http.Response{
StatusCode: http.StatusOK,
StatusCode: statusCode,
Header: http.Header{},
Body: io.NopCloser(bytes.NewReader(buff)),
ContentLength: int64(len(buff)),
Expand Down Expand Up @@ -205,6 +226,25 @@ func (s *shardQuery) buildShardedRequests(parent *http.Request) ([]*http.Request
return reqs, nil
}

// shouldQuit reports whether the sharded query loop should stop handing out
// work: any hard error, a canceled/expired request context, a 5xx response,
// or more failed blocks than the configured maximum all abort the query.
func (s *shardQuery) shouldQuit(ctx context.Context, statusCode int, totalFailedBlocks uint32, err error) bool {
	if err != nil {
		return true
	}
	if ctx.Err() != nil {
		return true
	}
	if statusCode/100 == 5 { // bail on any 5xx's
		return true
	}

	// too many blocks failed: give up rather than return an overly partial trace
	if totalFailedBlocks > s.maxFailedBlocks {
		return true
	}

	return false
}

// createBlockBoundaries splits the range of blockIDs into queryShards parts
func createBlockBoundaries(queryShards int) [][]byte {
if queryShards == 0 {
Expand All @@ -227,16 +267,3 @@ func createBlockBoundaries(queryShards int) [][]byte {

return blockBoundaries
}

func shouldQuit(ctx context.Context, statusCode int, err error) bool {
if err != nil {
return true
}
if ctx.Err() != nil {
return true
}
if statusCode/100 == 5 { // bail on any 5xx's
return true
}
return false
}
19 changes: 11 additions & 8 deletions modules/frontend/querysharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"bytes"
"context"
"errors"
"fmt"
"io"
"io/ioutil"
"math/rand"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -234,14 +234,16 @@ func TestShardingWareDoRequest(t *testing.T) {
return nil, err
}

var traceBytes []byte
var resBytes []byte
if trace != nil {
traceBytes, err = proto.Marshal(trace)
resBytes, err = proto.Marshal(&tempopb.TraceByIDResponse{
Trace: trace,
})
require.NoError(t, err)
}

return &http.Response{
Body: ioutil.NopCloser(bytes.NewReader(traceBytes)),
Body: io.NopCloser(bytes.NewReader(resBytes)),
StatusCode: statusCode,
}, nil
})
Expand All @@ -261,15 +263,16 @@ func TestShardingWareDoRequest(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, tc.expectedStatus, resp.StatusCode)
if tc.expectedTrace != nil {
actualTrace := &tempopb.Trace{}
actualResp := &tempopb.TraceByIDResponse{}
bytesTrace, err := io.ReadAll(resp.Body)
require.NoError(t, err)
err = proto.Unmarshal(bytesTrace, actualTrace)
err = proto.Unmarshal(bytesTrace, actualResp)
require.NoError(t, err)

model.SortTrace(tc.expectedTrace)
model.SortTrace(actualTrace)
assert.True(t, proto.Equal(tc.expectedTrace, actualTrace))
model.SortTrace(actualResp.Trace)
fmt.Println(tc.expectedTrace)
assert.True(t, proto.Equal(tc.expectedTrace, actualResp.Trace))
}
})
}
Expand Down
7 changes: 5 additions & 2 deletions modules/querier/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,14 @@ func (q *Querier) TraceByIDHandler(w http.ResponseWriter, r *http.Request) {

if r.Header.Get(util.AcceptHeaderKey) == util.ProtobufTypeHeaderValue {
span.SetTag("contentType", util.ProtobufTypeHeaderValue)
b, err := proto.Marshal(resp.Trace)
b, err := proto.Marshal(resp)
Comment thread
mapno marked this conversation as resolved.
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if resp.Metrics.FailedBlocks > 0 {
w.WriteHeader(http.StatusPartialContent)
}
_, err = w.Write(b)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand All @@ -94,7 +97,7 @@ func (q *Querier) TraceByIDHandler(w http.ResponseWriter, r *http.Request) {

span.SetTag("contentType", util.JSONTypeHeaderValue)
marshaller := &jsonpb.Marshaler{}
err = marshaller.Marshal(w, resp.Trace)
err = marshaller.Marshal(w, resp)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
Expand Down
7 changes: 6 additions & 1 deletion modules/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,15 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque
ot_log.Int("combinedTraces", traceCountTotal))
}

var failedBlocks uint32
if req.QueryMode == QueryModeBlocks || req.QueryMode == QueryModeAll {
span.LogFields(ot_log.String("msg", "searching store"))
partialTraces, dataEncodings, err := q.store.Find(opentracing.ContextWithSpan(ctx, span), userID, req.TraceID, req.BlockStart, req.BlockEnd)
partialTraces, dataEncodings, fb, err := q.store.Find(opentracing.ContextWithSpan(ctx, span), userID, req.TraceID, req.BlockStart, req.BlockEnd)
if err != nil {
return nil, errors.Wrap(err, "error querying store in Querier.FindTraceByID")
}

failedBlocks = uint32(fb)
span.LogFields(ot_log.String("msg", "done searching store"))

if len(partialTraces) != 0 {
Expand Down Expand Up @@ -227,6 +229,9 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque

return &tempopb.TraceByIDResponse{
Trace: completeTrace,
Metrics: &tempopb.TraceByIDMetrics{
FailedBlocks: failedBlocks,
},
}, nil
}

Expand Down
5 changes: 3 additions & 2 deletions modules/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
// mockSharder is a trivial sharder for tests: it claims ownership of every
// hash it is asked about, so all traces are treated as locally owned.
type mockSharder struct{}

// Owns always reports true; the hash argument is intentionally ignored.
func (m *mockSharder) Owns(string) bool {
	return true
}

Expand Down Expand Up @@ -105,8 +105,9 @@ func TestReturnAllHits(t *testing.T) {
time.Sleep(200 * time.Millisecond)

// find should return both now
foundBytes, _, err := r.Find(context.Background(), util.FakeTenantID, testTraceID, tempodb.BlockIDMin, tempodb.BlockIDMax)
foundBytes, _, failedBLocks, err := r.Find(context.Background(), util.FakeTenantID, testTraceID, tempodb.BlockIDMin, tempodb.BlockIDMax)
assert.NoError(t, err)
assert.Equal(t, 0, failedBLocks)
require.Len(t, foundBytes, 2)

// expected trace
Expand Down
Loading