Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/tempo/website/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ querier:
frontend_address: query-frontend-discovery.default.svc.cluster.local:9095 # the address of the query frontend to connect to, and process queries
```

The Querier also queries compacted blocks that fall within (2 * BlocklistPoll) where the value of Blocklist poll duration
It also queries compacted blocks that fall within the (2 * BlocklistPoll) range where the value of Blocklist poll duration
is defined in the storage section below.

## Compactor
Expand Down
22 changes: 18 additions & 4 deletions tempodb/tempodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,13 @@ func (rw *readerWriter) Find(ctx context.Context, tenantID string, id common.ID,
copiedBlocklist = append(copiedBlocklist, b)
}
}

compactedBlocklist := rw.compactedBlockLists[tenantID]
for _, c := range compactedBlocklist {
if includeCompactedBlock(c, id, blockStartBytes, blockEndBytes, rw.cfg.BlocklistPoll) {
copiedBlocklist = append(copiedBlocklist, &c.BlockMeta)
}
}
rw.blockListsMtx.Unlock()

// deliberately placed outside the blocklist mtx unlock
Expand Down Expand Up @@ -460,23 +467,21 @@ func (rw *readerWriter) cleanMissingTenants(tenants []string) {
tenantSet[tenantID] = struct{}{}
}

rw.blockListsMtx.Lock()
Comment thread
annanay25 marked this conversation as resolved.
for tenantID := range rw.blockLists {
if _, present := tenantSet[tenantID]; !present {
rw.blockListsMtx.Lock()
delete(rw.blockLists, tenantID)
rw.blockListsMtx.Unlock()
level.Info(rw.logger).Log("msg", "deleted in-memory blocklists", "tenantID", tenantID)
}
}

for tenantID := range rw.compactedBlockLists {
if _, present := tenantSet[tenantID]; !present {
rw.blockListsMtx.Lock()
delete(rw.compactedBlockLists, tenantID)
rw.blockListsMtx.Unlock()
level.Info(rw.logger).Log("msg", "deleted in-memory compacted blocklists", "tenantID", tenantID)
}
}
rw.blockListsMtx.Unlock()
}

// updateBlocklist Add and remove regular or compacted blocks from the in-memory blocklist.
Expand Down Expand Up @@ -529,3 +534,12 @@ func includeBlock(b *backend.BlockMeta, id common.ID, blockStart []byte, blockEn

return true
}

// if block is compacted within lookback period, and is within shard ranges, include it in search
func includeCompactedBlock(c *backend.CompactedBlockMeta, id common.ID, blockStart []byte, blockEnd []byte, poll time.Duration) bool {
lookback := time.Now().Add(-(2 * poll))
if c.CompactedTime.After(lookback) {
Comment thread
annanay25 marked this conversation as resolved.
Outdated
return includeBlock(&c.BlockMeta, id, blockStart, blockEnd)
}
return false
}
179 changes: 179 additions & 0 deletions tempodb/tempodb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -796,3 +796,182 @@ func TestIncludeBlock(t *testing.T) {
})
}
}

func TestIncludeCompactedBlock(t *testing.T) {
blocklistPoll := 5 * time.Minute
tests := []struct {
name string
searchID common.ID
blockStart uuid.UUID
blockEnd uuid.UUID
meta *backend.CompactedBlockMeta
expected bool
}{
{
name: "include recent",
searchID: []byte{0x05},
blockStart: uuid.MustParse(BlockIDMin),
blockEnd: uuid.MustParse(BlockIDMax),
meta: &backend.CompactedBlockMeta{
BlockMeta: backend.BlockMeta{
BlockID: uuid.MustParse("50000000-0000-0000-0000-000000000000"),
MinID: []byte{0x00},
MaxID: []byte{0x10},
},
CompactedTime: time.Now().Add(-(1 * blocklistPoll)),
},
expected: true,
},
{
name: "skip old",
searchID: []byte{0x05},
blockStart: uuid.MustParse(BlockIDMin),
blockEnd: uuid.MustParse(BlockIDMax),
meta: &backend.CompactedBlockMeta{
BlockMeta: backend.BlockMeta{
BlockID: uuid.MustParse("50000000-0000-0000-0000-000000000000"),
MinID: []byte{0x00},
MaxID: []byte{0x10},
},
CompactedTime: time.Now().Add(-(3 * blocklistPoll)),
},
expected: false,
},
{
name: "skip recent but out of range",
searchID: []byte{0x05},
blockStart: uuid.MustParse("40000000-0000-0000-0000-000000000000"),
blockEnd: uuid.MustParse("50000000-0000-0000-0000-000000000000"),
meta: &backend.CompactedBlockMeta{
BlockMeta: backend.BlockMeta{
BlockID: uuid.MustParse("51000000-0000-0000-0000-000000000000"),
MinID: []byte{0x00},
MaxID: []byte{0x10},
},
CompactedTime: time.Now().Add(-(1 * blocklistPoll)),
},
expected: false,
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
s, err := tc.blockStart.MarshalBinary()
require.NoError(t, err)
e, err := tc.blockEnd.MarshalBinary()
require.NoError(t, err)

assert.Equal(t, tc.expected, includeCompactedBlock(tc.meta, tc.searchID, s, e, blocklistPoll))
})
}

}

func TestSearchCompactedBlocks(t *testing.T) {
tempDir, err := ioutil.TempDir("/tmp", "")
defer os.RemoveAll(tempDir)
assert.NoError(t, err, "unexpected error creating temp dir")

r, w, c, err := New(&Config{
Backend: "local",
Local: &local.Config{
Path: path.Join(tempDir, "traces"),
},
Block: &encoding.BlockConfig{
IndexDownsampleBytes: 17,
BloomFP: .01,
Encoding: backend.EncLZ4_256k,
IndexPageSizeBytes: 1000,
},
WAL: &wal.Config{
Filepath: path.Join(tempDir, "wal"),
},
BlocklistPoll: time.Minute,
}, log.NewNopLogger())
assert.NoError(t, err)

c.EnableCompaction(&CompactorConfig{
ChunkSizeBytes: 10,
MaxCompactionRange: time.Hour,
BlockRetention: 0,
CompactedBlockRetention: 0,
}, &mockSharder{}, &mockOverrides{})

wal := w.WAL()
assert.NoError(t, err)

head, err := wal.NewBlock(uuid.New(), testTenantID)
assert.NoError(t, err)

// write
numMsgs := 10
reqs := make([]*tempopb.PushRequest, 0, numMsgs)
ids := make([][]byte, 0, numMsgs)
for i := 0; i < numMsgs; i++ {
id := make([]byte, 16)
rand.Read(id)
req := test.MakeRequest(rand.Int()%1000, id)
reqs = append(reqs, req)
ids = append(ids, id)

bReq, err := proto.Marshal(req)
assert.NoError(t, err)
err = head.Write(id, bReq)
assert.NoError(t, err, "unexpected error writing req")
}

complete, err := w.CompleteBlock(head, &mockSharder{})
assert.NoError(t, err)

blockID := complete.BlockMeta().BlockID.String()

err = w.WriteBlock(context.Background(), complete)
assert.NoError(t, err)

rw := r.(*readerWriter)

// poll
rw.pollBlocklist()

// read
for i, id := range ids {
bFound, err := r.Find(context.Background(), testTenantID, id, blockID, blockID)
assert.NoError(t, err)

out := &tempopb.PushRequest{}
err = proto.Unmarshal(bFound[0], out)
assert.NoError(t, err)

assert.True(t, proto.Equal(out, reqs[i]))
}

// compact
var blockMetas []*backend.BlockMeta
blockMetas = append(blockMetas, complete.BlockMeta())
assert.NoError(t, rw.compact(blockMetas, testTenantID))

// poll
rw.pollBlocklist()

// make sure the block is compacted
compactedBlocks, ok := rw.compactedBlockLists[testTenantID]
require.True(t, ok)
require.Len(t, compactedBlocks, 1)
assert.Equal(t, compactedBlocks[0].BlockID.String(), blockID)
blocks, ok := rw.blockLists[testTenantID]
require.True(t, ok)
require.Len(t, blocks, 1)
assert.NotEqual(t, blocks[0].BlockID.String(), blockID)

// find should succeed with old block range
for i, id := range ids {
bFound, err := r.Find(context.Background(), testTenantID, id, blockID, blockID)
assert.NoError(t, err)

out := &tempopb.PushRequest{}
err = proto.Unmarshal(bFound[0], out)
assert.NoError(t, err)

assert.True(t, proto.Equal(out, reqs[i]))
}
}