forked from grafana/tempo
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlocal_block.go
More file actions
90 lines (72 loc) · 2.54 KB
/
local_block.go
File metadata and controls
90 lines (72 loc) · 2.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package wal
import (
"context"
"time"
"go.uber.org/atomic"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/backend/local"
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/encoding/common"
"github.com/pkg/errors"
)
// nameFlushed is the object name, within a block, under which the flushed
// timestamp is read and written by LocalBlock.
const nameFlushed = "flushed"
// LocalBlock is a block stored in a local storage. It can be searched and flushed to a remote backend, and
// permanently tracks the flushed time with a special file in the block.
type LocalBlock struct {
// BackendBlock is the wrapped block; its methods are promoted onto LocalBlock.
common.BackendBlock
// reader reads from the local backend: it loads the flushed file and is the
// source when the block is copied to another backend.
reader backend.Reader
// writer writes to the local backend: it persists the flushed file.
writer backend.Writer
// flushedTime holds the flush time as unix seconds; 0 means "never flushed".
flushedTime atomic.Int64 // protecting flushedTime b/c it's accessed from the store on flush and from the ingester instance checking flush time
}
// Compile-time check that *LocalBlock implements common.Finder.
var _ common.Finder = (*LocalBlock)(nil)
// NewLocalBlock wraps existingBlock in a LocalBlock whose reader and writer are
// built on the local backend l. If a flushed file can be read for the block and
// its contents parse as a timestamp, the flushed time is restored from it;
// otherwise the block is left with a zero flushed time. The returned error is
// always nil.
func NewLocalBlock(ctx context.Context, existingBlock common.BackendBlock, l *local.Backend) (*LocalBlock, error) {
	block := &LocalBlock{
		BackendBlock: existingBlock,
		reader:       backend.NewReader(l),
		writer:       backend.NewWriter(l),
	}

	raw, err := block.reader.Read(ctx, nameFlushed, block.BlockMeta().BlockID, block.BlockMeta().TenantID, false)
	if err != nil {
		// Best effort: a read failure just means no flushed time is restored.
		return block, nil
	}

	var persisted time.Time
	if err := persisted.UnmarshalText(raw); err != nil {
		// Best effort: unparseable contents are ignored as well.
		return block, nil
	}

	block.flushedTime.Store(persisted.Unix())
	return block, nil
}
// FindTraceByID looks up the trace with the given id by delegating to the
// embedded BackendBlock, passing ctx and opts through unchanged.
func (c *LocalBlock) FindTraceByID(ctx context.Context, id common.ID, opts common.SearchOptions) (*tempopb.Trace, error) {
	trace, err := c.BackendBlock.FindTraceByID(ctx, id, opts)
	return trace, err
}
// FlushedTime returns the time the block was flushed, with second precision.
// It returns the zero time.Time if the block was never flushed.
func (c *LocalBlock) FlushedTime() time.Time {
	if secs := c.flushedTime.Load(); secs != 0 {
		return time.Unix(secs, 0)
	}

	// A stored 0 means "not flushed"; converting it with time.Unix would yield
	// jan 1, 1970 rather than the zero time.
	return time.Time{}
}
// SetFlushed records the current time as the block's flushed time. The
// timestamp is first written to the flushed file through the local writer, and
// only after that write succeeds is the in-memory value updated. It returns a
// wrapped error if marshalling the time or writing the file fails.
func (c *LocalBlock) SetFlushed(ctx context.Context) error {
	now := time.Now()

	text, err := now.MarshalText()
	if err != nil {
		return errors.Wrap(err, "error marshalling flush time to text")
	}

	if err := c.writer.Write(ctx, nameFlushed, c.BlockMeta().BlockID, c.BlockMeta().TenantID, text, false); err != nil {
		return errors.Wrap(err, "error writing ingester block flushed file")
	}

	c.flushedTime.Store(now.Unix())
	return nil
}
// Write copies the block from the local reader to the backend writer w and, if
// the copy succeeds, marks the block as flushed. It returns a wrapped error if
// the copy fails, or the error from SetFlushed otherwise.
func (c *LocalBlock) Write(ctx context.Context, w backend.Writer) error {
	if err := encoding.CopyBlock(ctx, c.BlockMeta(), c.reader, w); err != nil {
		return errors.Wrap(err, "error copying block from local to remote backend")
	}

	return c.SetFlushed(ctx)
}