Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion cmd/tempo/app/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,11 @@ func (t *App) initQuerier() (services.Service, error) {
}

func (t *App) initQueryFrontend() (services.Service, error) {
var err error
if t.cfg.Frontend.QueryShards < frontend.MinQueryShards || t.cfg.Frontend.QueryShards > frontend.MaxQueryShards {
Comment thread
joe-elliott marked this conversation as resolved.
return nil, fmt.Errorf("frontend query shards should be between %d and %d (both inclusive)", frontend.MinQueryShards, frontend.MaxQueryShards)
}

var err error
cortexTripper, v1, _, err := cortex_frontend.InitFrontend(t.cfg.Frontend.Config, frontend.CortexNoQuerierLimits{}, 0, log.Logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
Expand All @@ -179,7 +182,9 @@ func (t *App) initQueryFrontend() (services.Service, error) {
t.httpAuthMiddleware,
).Wrap(cortexHandler)

// register grpc server for queriers to connect to
cortex_frontend_v1pb.RegisterFrontendServer(t.server.GRPC, t.frontend)
// http query endpoint
t.server.HTTP.Handle("/api/traces/{traceID}", tracesHandler)

return services.NewIdleService(nil, func(_ error) error {
Expand Down
2 changes: 1 addition & 1 deletion modules/frontend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
cfg.Config.DownstreamURL = ""
cfg.Config.Handler.LogQueriesLongerThan = 0
cfg.Config.FrontendV1.MaxOutstandingPerTenant = 100
cfg.QueryShards = 4
cfg.QueryShards = 2
}

type CortexNoQuerierLimits struct{}
Expand Down
15 changes: 9 additions & 6 deletions modules/frontend/querysharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import (
)

const (
MinQueryShards = 2
MaxQueryShards = 256

querierPrefix = "/querier"
queryDelimiter = "?"
)
Expand Down Expand Up @@ -53,7 +56,7 @@ func (s shardQuery) Do(r *http.Request) (*http.Response, error) {

// only need to initialise boundaries once
if len(s.blockBoundaries) == 0 {
s.blockBoundaries = createBlockBoundaries(s.queryShards)
s.blockBoundaries = createBlockBoundaries(s.queryShards - 1) // one shard will be used to query ingesters
Comment thread
annanay25 marked this conversation as resolved.
Outdated
}

// check marshalling format
Expand All @@ -63,15 +66,15 @@ func (s shardQuery) Do(r *http.Request) (*http.Response, error) {
}

reqs := make([]*http.Request, s.queryShards)
for i := 0; i < len(s.blockBoundaries)-1; i++ {
for i := 0; i < s.queryShards; i++ {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of creating block boundaries, it would be a lot cleaner to make a method that created a slice of structs that represented the query. like:

type queryParams struct {
  queryShards bool
  startBlock []byte
  endBlock []byte
}

that would simplify this logic quite a bit and would be much easier to test as well.

reqs[i] = r.Clone(r.Context())
q := reqs[i].URL.Query()
q.Add(querier.BlockStartKey, hex.EncodeToString(s.blockBoundaries[i]))
q.Add(querier.BlockEndKey, hex.EncodeToString(s.blockBoundaries[i+1]))

if i == 0 {
q := reqs[i].URL.Query()
if i == (s.queryShards - 1) { // one shard dedicated to querying ingesters
q.Add(querier.QueryIngestersKey, "true")
} else {
q.Add(querier.BlockStartKey, hex.EncodeToString(s.blockBoundaries[i]))
q.Add(querier.BlockEndKey, hex.EncodeToString(s.blockBoundaries[i+1]))
q.Add(querier.QueryIngestersKey, "false")
}

Expand Down
3 changes: 3 additions & 0 deletions modules/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/go-kit/kit/log/level"
"github.com/opentracing/opentracing-go"
ot_log "github.com/opentracing/opentracing-go/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/user"
Expand Down Expand Up @@ -209,6 +210,8 @@ func (i *Ingester) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDRequ
return nil, err
}

span.LogFields(ot_log.Bool("trace found", trace != nil))

return &tempopb.TraceByIDResponse{
Trace: trace,
}, nil
Expand Down
11 changes: 8 additions & 3 deletions modules/querier/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,7 @@ func (q *Querier) TraceByIDHandler(w http.ResponseWriter, r *http.Request) {

// return values are (valid, blockStart, blockEnd, queryIngesters)
func validateAndSanitizeRequest(r *http.Request) (string, string, bool, error) {
// get parameter values
q := r.URL.Query().Get(QueryIngestersKey)
start := r.URL.Query().Get(BlockStartKey)
end := r.URL.Query().Get(BlockEndKey)

// validate queryIngesters. it should either be empty or one of (true|false)
var queryIngesters bool
Expand All @@ -109,6 +106,14 @@ func validateAndSanitizeRequest(r *http.Request) (string, string, bool, error) {
return "", "", false, fmt.Errorf("invalid value for queryIngesters %s", q)
}

// no need to validate/sanitize other parameters if queryIngesters is true
if queryIngesters {
Comment thread
annanay25 marked this conversation as resolved.
Outdated
return "", "", true, nil
}

start := r.URL.Query().Get(BlockStartKey)
end := r.URL.Query().Get(BlockEndKey)

// validate start. it should either be empty or a valid uuid
if len(start) == 0 {
start = tempodb.BlockIDMin
Expand Down
51 changes: 24 additions & 27 deletions modules/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,7 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque
var completeTrace *tempopb.Trace
var spanCount, spanCountTotal int
if req.QueryIngesters {
key := tempo_util.TokenFor(userID, req.TraceID)

const maxExpectedReplicationSet = 3 // 3. b/c frigg it
var descs [maxExpectedReplicationSet]ring.InstanceDesc
replicationSet, err := q.ring.Get(key, ring.Read, descs[:0], nil, nil)
replicationSet, err := q.ring.GetAllHealthy(ring.Read)
if err != nil {
return nil, errors.Wrap(err, "error finding ingesters in Querier.FindTraceByID")
}
Expand All @@ -175,34 +171,35 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque
if spanCount > 0 {
spanCountTotal = spanCount
}
span.LogFields(ot_log.String("msg", "combined trace protos from ingesters"))
}
}

span.LogFields(ot_log.String("msg", "done searching ingesters"), ot_log.Bool("found", completeTrace != nil))
}

span.LogFields(ot_log.String("msg", "searching store"))
partialTraces, err := q.store.Find(opentracing.ContextWithSpan(ctx, span), userID, req.TraceID, req.BlockStart, req.BlockEnd)
if err != nil {
return nil, errors.Wrap(err, "error querying store in Querier.FindTraceByID")
}

span.LogFields(ot_log.String("msg", "done searching store"))
// combine partialTraces with completeTrace
for _, partialTrace := range partialTraces {
storeTrace := &tempopb.Trace{}
err = proto.Unmarshal(partialTrace, storeTrace)
span.LogFields(ot_log.String("msg", "done searching ingesters"),
Comment thread
annanay25 marked this conversation as resolved.
ot_log.Bool("found", completeTrace != nil),
ot_log.Int("spanCountTotal", spanCountTotal))
} else {
span.LogFields(ot_log.String("msg", "searching store"))
partialTraces, err := q.store.Find(opentracing.ContextWithSpan(ctx, span), userID, req.TraceID, req.BlockStart, req.BlockEnd)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "error querying store in Querier.FindTraceByID")
}
completeTrace, _, _, spanCount = tempo_util.CombineTraceProtos(completeTrace, storeTrace)
if spanCount > 0 {
spanCountTotal = spanCount

span.LogFields(ot_log.String("msg", "done searching store"))

// combine partialTraces
for _, partialTrace := range partialTraces {
storeTrace := &tempopb.Trace{}
err = proto.Unmarshal(partialTrace, storeTrace)
if err != nil {
return nil, err
}
completeTrace, _, _, spanCount = tempo_util.CombineTraceProtos(completeTrace, storeTrace)
if spanCount > 0 {
spanCountTotal = spanCount
}
}
span.LogFields(ot_log.String("msg", "combined trace protos from store"),
ot_log.Int("spanCountTotal", spanCountTotal))
}
span.LogFields(ot_log.String("msg", "combined trace protos from ingesters and store"),
ot_log.Int("spanCountTotal", spanCountTotal))

return &tempopb.TraceByIDResponse{
Trace: completeTrace,
Expand Down