Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ Internal types are updated to use `scope` instead of `instrumentation_library`.
* [ENHANCEMENT] New tenant dashboard [#1901](https://github.com/grafana/tempo/pull/1901) (@mapno)
* [BUGFIX] Fix docker-compose examples not running on Apple M1 hardware [#1920](https://github.com/grafana/tempo/pull/1920) (@stoewer)
* [BUGFIX] Fix traceql parsing of most binary operations to not require spacing [#1939](https://github.com/grafana/tempo/pull/1941) (@mdisibio)
* [BUGFIX] Don't persist tenants without blocks in the ingester[#1947](https://github.com/grafana/tempo/pull/1947) (@joe-elliott)

## v1.5.0 / 2022-08-17

Expand Down
12 changes: 12 additions & 0 deletions modules/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ func (i *Ingester) stopping(_ error) error {
i.flushQueuesDone.Wait()
}

i.local.Shutdown()

return nil
}

Expand Down Expand Up @@ -411,6 +413,16 @@ func (i *Ingester) rediscoverLocalBlocks() error {
level.Info(log.Logger).Log("msg", "reloading local blocks", "tenants", len(tenants))

for _, t := range tenants {
// check if any local blocks exist for a tenant before creating the instance. this is to protect us from cases
// where left-over empty local tenant folders persist empty tenants
blocks, err := reader.Blocks(ctx, t)
if err != nil {
return err
}
if len(blocks) == 0 {
continue
}

inst, err := i.getOrCreateInstance(t)
if err != nil {
return err
Expand Down
27 changes: 27 additions & 0 deletions modules/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,33 @@ func TestWal(t *testing.T) {
}
}

func TestWalDropsZeroLength(t *testing.T) {
tmpDir := t.TempDir()
ingester, _, _ := defaultIngester(t, tmpDir)

// force cut all traces and wipe wal
for _, instance := range ingester.instances {
err := instance.CutCompleteTraces(0, true)
require.NoError(t, err, "unexpected error cutting traces")

blockID, err := instance.CutBlockIfReady(0, 0, true)
require.NoError(t, err)

err = instance.CompleteBlock(blockID)
require.NoError(t, err)

err = instance.ClearCompletingBlock(blockID)
require.NoError(t, err)

err = ingester.local.ClearBlock(blockID, instance.instanceID)
require.NoError(t, err)
}

// create new ingester. we should have no tenants b/c we all our wals should have been 0 length
ingester, _, _ = defaultIngesterWithPush(t, tmpDir, func(t testing.TB, i *Ingester, rs *v1.ResourceSpans, b []byte) {})
require.Equal(t, 0, len(ingester.instances))
}

func TestSearchWAL(t *testing.T) {
tmpDir, err := os.MkdirTemp("/tmp", "")
require.NoError(t, err, "unexpected error getting tempdir")
Expand Down
2 changes: 1 addition & 1 deletion modules/ingester/instance_search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestInstanceSearch(t *testing.T) {
ingester, _, _ = defaultIngester(t, tempDir)

i, ok := ingester.getInstanceByID("fake")
assert.True(t, ok)
require.True(t, ok)

sr, err = i.Search(context.Background(), req)
assert.NoError(t, err)
Expand Down
13 changes: 10 additions & 3 deletions tempodb/backend/local/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package local

import (
"encoding/json"
"errors"
"fmt"
"os"
"path"
Expand All @@ -20,14 +21,20 @@ func (rw *Backend) MarkBlockCompacted(blockID uuid.UUID, tenantID string) error

func (rw *Backend) ClearBlock(blockID uuid.UUID, tenantID string) error {
if len(tenantID) == 0 {
return fmt.Errorf("empty tenant id")
return errors.New("empty tenant id")
}

if blockID == uuid.Nil {
return fmt.Errorf("empty block id")
return errors.New("empty block id")
}

return os.RemoveAll(rw.rootPath(backend.KeyPathForBlock(blockID, tenantID)))
path := rw.rootPath(backend.KeyPathForBlock(blockID, tenantID))
err := os.RemoveAll(path)
if err != nil {
return fmt.Errorf("failed to remove keypath for block %s: %w", path, err)
}

return nil
}

func (rw *Backend) CompactedBlockMeta(blockID uuid.UUID, tenantID string) (*backend.CompactedBlockMeta, error) {
Expand Down
20 changes: 19 additions & 1 deletion tempodb/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,27 @@ func (rw *Backend) ReadRange(ctx context.Context, name string, keypath backend.K
return nil
}

// Shutdown implements backend.Reader
// Shutdown implements backend.Reader. It attempts to clear all tenants
// that do not have blocks.
func (rw *Backend) Shutdown() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forgot this existed. Excellent.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. It's weird that Shutdown() on the Reader interface does the job of cleaning up, but meh. We should combine these interfaces anyway. I don't think much is gained from the split up.

(or just add Shutdown to the writer?)

ctx := context.Background()

// Shutdown() doesn't return error so this is best effort
tenants, err := rw.List(ctx, backend.KeyPath{})
if err != nil {
return
}

for _, tenant := range tenants {
blocks, err := rw.List(ctx, backend.KeyPath{tenant})
if err != nil {
continue
}

if len(blocks) == 0 {
_ = os.RemoveAll(rw.rootPath(backend.KeyPath{tenant}))
}
}
}

func (rw *Backend) objectFileName(keypath backend.KeyPath, name string) string {
Expand Down
78 changes: 78 additions & 0 deletions tempodb/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/tempo/tempodb/backend"
)
Expand Down Expand Up @@ -67,3 +68,80 @@ func TestReadWrite(t *testing.T) {
assert.Len(t, list, 1)
assert.Equal(t, blockID.String(), list[0])
}

func TestShutdownLeavesTenantsWithBlocks(t *testing.T) {
r, w, _, err := New(&Config{
Path: t.TempDir(),
})
require.NoError(t, err)

ctx := context.Background()
blockID := uuid.New()
contents := bytes.NewReader([]byte("test"))
tenant := "fake"

// write a "block"
err = w.Write(ctx, "test", backend.KeyPathForBlock(blockID, tenant), contents, contents.Size(), false)
require.NoError(t, err)

tenantExists(t, tenant, r)
blockExists(t, blockID, tenant, r)

// shutdown the backend
r.Shutdown()

tenantExists(t, tenant, r)
blockExists(t, blockID, tenant, r)
}

func TestShutdownRemovesTenantsWithoutBlocks(t *testing.T) {
r, w, c, err := New(&Config{
Path: t.TempDir(),
})
require.NoError(t, err)

ctx := context.Background()
blockID := uuid.New()
contents := bytes.NewReader([]byte("test"))
tenant := "tenant"

// write a "block"
err = w.Write(ctx, "test", backend.KeyPathForBlock(blockID, tenant), contents, contents.Size(), false)
require.NoError(t, err)

tenantExists(t, tenant, r)
blockExists(t, blockID, tenant, r)

// clear the block
err = c.ClearBlock(blockID, tenant)
require.NoError(t, err)

tenantExists(t, tenant, r)

// block should not exist
blocks, err := r.List(ctx, backend.KeyPath{tenant})
require.NoError(t, err)
require.Len(t, blocks, 0)

// shutdown the backend
r.Shutdown()

// tenant should not exist
tenants, err := r.List(ctx, backend.KeyPath{})
require.NoError(t, err)
require.Len(t, tenants, 0)
}

func tenantExists(t *testing.T, tenant string, r backend.RawReader) {
tenants, err := r.List(context.Background(), backend.KeyPath{})
require.NoError(t, err)
require.Len(t, tenants, 1)
require.Equal(t, tenant, tenants[0])
}

func blockExists(t *testing.T, blockID uuid.UUID, tenant string, r backend.RawReader) {
blocks, err := r.List(context.Background(), backend.KeyPath{tenant})
require.NoError(t, err)
require.Len(t, blocks, 1)
require.Equal(t, blockID.String(), blocks[0])
}