Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions modules/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,16 @@ func (i *Ingester) rediscoverLocalBlocks() error {
level.Info(log.Logger).Log("msg", "reloading local blocks", "tenants", len(tenants))

for _, t := range tenants {
// check if any local blocks exist for a tenant before creating the instance. this is to protect us from cases
// where left-over empty local tenant folders persist empty tenants
blocks, err := reader.Blocks(ctx, t)
if err != nil {
return err
}
if len(blocks) == 0 {
continue
}

inst, err := i.getOrCreateInstance(t)
if err != nil {
return err
Expand Down
27 changes: 27 additions & 0 deletions modules/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,33 @@ func TestWal(t *testing.T) {
}
}

func TestWalDropsZeroLength(t *testing.T) {
tmpDir := t.TempDir()
ingester, _, _ := defaultIngester(t, tmpDir)

// force cut all traces and wipe wal
for _, instance := range ingester.instances {
err := instance.CutCompleteTraces(0, true)
require.NoError(t, err, "unexpected error cutting traces")

blockID, err := instance.CutBlockIfReady(0, 0, true)
require.NoError(t, err)

err = instance.CompleteBlock(blockID)
require.NoError(t, err)

err = instance.ClearCompletingBlock(blockID)
require.NoError(t, err)

err = ingester.local.ClearBlock(blockID, instance.instanceID)
require.NoError(t, err)
}

// create new ingester. we should have no tenants b/c we all our wals should have been 0 length
ingester, _, _ = defaultIngesterWithPush(t, tmpDir, func(t testing.TB, i *Ingester, rs *v1.ResourceSpans, b []byte) {})
require.Equal(t, 0, len(ingester.instances))
}

func TestSearchWAL(t *testing.T) {
tmpDir, err := os.MkdirTemp("/tmp", "")
require.NoError(t, err, "unexpected error getting tempdir")
Expand Down
2 changes: 1 addition & 1 deletion modules/ingester/instance_search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestInstanceSearch(t *testing.T) {
ingester, _, _ = defaultIngester(t, tempDir)

i, ok := ingester.getInstanceByID("fake")
assert.True(t, ok)
require.True(t, ok)

sr, err = i.Search(context.Background(), req)
assert.NoError(t, err)
Expand Down
30 changes: 27 additions & 3 deletions tempodb/backend/local/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package local

import (
"encoding/json"
"errors"
"fmt"
"os"
"path"
Expand All @@ -20,14 +21,37 @@ func (rw *Backend) MarkBlockCompacted(blockID uuid.UUID, tenantID string) error

func (rw *Backend) ClearBlock(blockID uuid.UUID, tenantID string) error {
if len(tenantID) == 0 {
return fmt.Errorf("empty tenant id")
return errors.New("empty tenant id")
}

if blockID == uuid.Nil {
return fmt.Errorf("empty block id")
return errors.New("empty block id")
}

return os.RemoveAll(rw.rootPath(backend.KeyPathForBlock(blockID, tenantID)))
path := rw.rootPath(backend.KeyPathForBlock(blockID, tenantID))
err := os.RemoveAll(path)
if err != nil {
return fmt.Errorf("failed to remove keypath for block %s: %w", path, err)
}

// if there are no more blocks in this tenant, clear the tenant
path = rw.rootPath(backend.KeyPath{tenantID})
folders, err := os.ReadDir(path)
if errors.Is(err, os.ErrNotExist) {
return nil
}
if err != nil {
return fmt.Errorf("failed to list keypath for tenant %s: %w", path, err)
}

if len(folders) == 0 {
err = os.RemoveAll(path)
Comment thread
joe-elliott marked this conversation as resolved.
Outdated
if err != nil {
return fmt.Errorf("failed to remove keypath for tenant %s: %w", path, err)
}
}

return nil
}

func (rw *Backend) CompactedBlockMeta(blockID uuid.UUID, tenantID string) (*backend.CompactedBlockMeta, error) {
Expand Down