Skip to content
187 changes: 93 additions & 94 deletions modules/ingester/instance_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"
"sync"

"github.com/google/uuid"
ot_log "github.com/opentracing/opentracing-go/log"
"go.uber.org/atomic"

Expand Down Expand Up @@ -40,90 +41,16 @@ func (i *instance) Search(ctx context.Context, req *tempopb.SearchRequest) (*tem

span.LogFields(ot_log.String("SearchRequest", req.String()))

sr := search.NewResults(maxResults)
defer sr.Close() // signal all running workers to quit
var (
resultsMtx = sync.Mutex{}
combiner = traceql.NewMetadataCombiner()
metrics = &tempopb.SearchMetrics{}
opts = common.DefaultSearchOptions()
anyErr atomic.Error
)

// Lock headblock separately from other blocks and release it as soon as this
// subtask is finished.
// A warning about deadlocks!! This area does a hard-acquire of both mutexes.
// To avoid deadlocks this function and all others must acquire them in
// the ** same_order ** or else!!! i.e. another function can't acquire blocksMtx
// then headblockMtx. Even if the likelihood is low it is a statistical certainly
// that eventually a deadlock will occur.
i.headBlockMtx.RLock()
i.searchBlock(ctx, req, sr, i.headBlock.BlockMeta(), i.headBlock, i.headBlockMtx.RUnlock)

// Lock blocks mutex until all search tasks are finished and this function exists. This avoids
// deadlocking with other activity (ingest, flushing), caused by releasing
// and then attempting to retake the lock.
i.blocksMtx.RLock()
defer i.blocksMtx.RUnlock()

for _, b := range i.completingBlocks {
i.searchBlock(ctx, req, sr, b.BlockMeta(), b, nil)
}

for _, b := range i.completeBlocks {
i.searchBlock(ctx, req, sr, b.BlockMeta(), b, nil)
}

sr.AllWorkersStarted()

// read and combine search results
combiner := traceql.NewMetadataCombiner()

// collect results from all the goroutines via sr.Results channel.
// range loop will exit when sr.Results channel is closed.
for result := range sr.Results() {
if combiner.Count() >= maxResults {
sr.Close() // signal pending workers to exit
continue
}

combiner.AddMetadata(result)
}

if sr.Error() != nil {
return nil, sr.Error()
}

return &tempopb.SearchResponse{
Traces: combiner.Metadata(),
Metrics: &tempopb.SearchMetrics{
InspectedTraces: sr.TracesInspected(),
InspectedBytes: sr.BytesInspected(),
},
}, nil
}

// searchBlock starts a search task for the given block. The block must already be under lock,
// and this method calls cleanup to unlock the block when done.
func (i *instance) searchBlock(ctx context.Context, req *tempopb.SearchRequest, sr *search.Results, meta *backend.BlockMeta, block common.Searcher, cleanup func()) {
// confirm block should be included in search
if !includeBlock(meta, req) {
if cleanup != nil {
cleanup()
}
return
}

if sr.Quit() {
if cleanup != nil {
cleanup()
}
return
}

blockID := meta.BlockID

sr.StartWorker()
go func(e common.Searcher, cleanup func()) {
if cleanup != nil {
defer cleanup()
}
defer sr.FinishWorker()

span, ctx := opentracing.StartSpanFromContext(ctx, "instance.searchBlock")
search := func(blockID uuid.UUID, block common.Searcher, spanName string) {
span, ctx := opentracing.StartSpanFromContext(ctx, "instance.searchBlock."+spanName)
defer span.Finish()

span.LogFields(ot_log.Event("block entry mtx acquired"))
Expand All @@ -132,15 +59,14 @@ func (i *instance) searchBlock(ctx context.Context, req *tempopb.SearchRequest,
var resp *tempopb.SearchResponse
var err error

opts := common.DefaultSearchOptions()
if api.IsTraceQLQuery(req) {
// note: we are creating new engine for each wal block,
// and engine.ExecuteSearch is parsing the query for each block
resp, err = traceql.NewEngine().ExecuteSearch(ctx, req, traceql.NewSpansetFetcherWrapper(func(ctx context.Context, req traceql.FetchSpansRequest) (traceql.FetchSpansResponse, error) {
return e.Fetch(ctx, req, opts)
return block.Fetch(ctx, req, opts)
}))
} else {
resp, err = e.Search(ctx, req, opts)
resp, err = block.Search(ctx, req, opts)
}

if errors.Is(err, common.ErrUnsupported) {
Expand All @@ -149,20 +75,93 @@ func (i *instance) searchBlock(ctx context.Context, req *tempopb.SearchRequest,
}
if err != nil {
level.Error(log.Logger).Log("msg", "error searching block", "blockID", blockID, "err", err)
sr.SetError(err)
anyErr.Store(err)
return
}

if resp == nil {
return
}

for _, t := range resp.Traces {
if sr.AddResult(ctx, t) {
break
resultsMtx.Lock()
defer resultsMtx.Unlock()

if resp.Metrics != nil {
metrics.InspectedTraces += resp.Metrics.InspectedTraces
metrics.InspectedBytes += resp.Metrics.InspectedBytes
}

for _, tr := range resp.Traces {
combiner.AddMetadata(tr)
if combiner.Count() >= maxResults {
return
Comment thread
joe-elliott marked this conversation as resolved.
}
}
sr.AddBlockInspected()
}

sr.AddBytesInspected(resp.Metrics.InspectedBytes)
sr.AddTraceInspected(resp.Metrics.InspectedTraces)
}(block, cleanup)
// Search headblock (synchronously)
// Lock headblock separately from other blocks and release it as quickly as possible.
Comment thread
joe-elliott marked this conversation as resolved.
// A warning about deadlocks!! This area does a hard-acquire of both mutexes.
// To avoid deadlocks this function and all others must acquire them in
// the ** same_order ** or else!!! i.e. another function can't acquire blocksMtx
// then headblockMtx. Even if the likelihood is low it is a statistical certainly
// that eventually a deadlock will occur.
i.headBlockMtx.RLock()
if includeBlock(i.headBlock.BlockMeta(), req) {
search(i.headBlock.BlockMeta().BlockID, i.headBlock, "headBlock")
}
i.headBlockMtx.RUnlock()
if err := anyErr.Load(); err != nil {
return nil, err
}
if combiner.Count() >= maxResults {
return &tempopb.SearchResponse{
Traces: combiner.Metadata(),
Metrics: metrics,
}, nil
}

// Search all other blocks (concurrently)
// Lock blocks mutex until all search tasks are finished and this function exists. This avoids
Comment thread
joe-elliott marked this conversation as resolved.
Outdated
// deadlocking with other activity (ingest, flushing), caused by releasing
// and then attempting to retake the lock.
i.blocksMtx.RLock()
defer i.blocksMtx.RUnlock()

wg := sync.WaitGroup{}

for _, b := range i.completingBlocks {
if !includeBlock(b.BlockMeta(), req) {
continue
}

wg.Add(1)
go func(b common.WALBlock) {
defer wg.Done()
search(b.BlockMeta().BlockID, b, "completingBlock")
}(b)
}

for _, b := range i.completeBlocks {
if !includeBlock(b.BlockMeta(), req) {
continue
}
wg.Add(1)
go func(b *localBlock) {
defer wg.Done()
search(b.BlockMeta().BlockID, b, "completeBlock")
}(b)
}

wg.Wait()

if err := anyErr.Load(); err != nil {
return nil, err
}
return &tempopb.SearchResponse{
Traces: combiner.Metadata(),
Metrics: metrics,
}, nil
}

func (i *instance) SearchTags(ctx context.Context, scope string) (*tempopb.SearchTagsResponse, error) {
Expand Down
151 changes: 0 additions & 151 deletions pkg/search/results.go

This file was deleted.

Loading