forked from grafana/tempo
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconfig.go
More file actions
112 lines (87 loc) · 3.58 KB
/
config.go
File metadata and controls
112 lines (87 loc) · 3.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package blockbuilder
import (
"encoding/json"
"flag"
"fmt"
"os"
"time"
"github.com/go-kit/log/level"
"github.com/grafana/tempo/pkg/ingest"
"github.com/grafana/tempo/pkg/util/log"
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/encoding/common"
"github.com/grafana/tempo/tempodb/wal"
)
// BlockConfig holds the block-related settings of the block builder.
type BlockConfig struct {
// MaxBlockBytes is the maximum size of a block (registered as the
// "<prefix>.max-block-bytes" flag).
MaxBlockBytes uint64 `yaml:"max_block_bytes" doc:"Maximum size of a block."`
// BlockCfg is the block configuration type from tempodb/encoding/common.
BlockCfg common.BlockConfig `yaml:"-,inline"`
}
// RegisterFlags registers the block flags on f with an empty name prefix and
// applies their defaults.
func (c *BlockConfig) RegisterFlags(f *flag.FlagSet) {
	const noPrefix = ""
	c.RegisterFlagsAndApplyDefaults(noPrefix, f)
}
// RegisterFlagsAndApplyDefaults registers the block flags on f, with every
// flag name starting with prefix, and applies the default values.
func (c *BlockConfig) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) {
	// TODO - Review default
	const defaultMaxBlockBytes uint64 = 20 * 1024 * 1024

	// Default to the current default encoding before delegating, so the
	// nested config starts from that version.
	c.BlockCfg.Version = encoding.DefaultEncoding().Version()

	maxBytesFlag := prefix + ".max-block-bytes"
	f.Uint64Var(&c.MaxBlockBytes, maxBytesFlag, defaultMaxBlockBytes, "Maximum size of a block.")

	c.BlockCfg.RegisterFlagsAndApplyDefaults(prefix, f)
}
// Config is the configuration of the block builder.
type Config struct {
// InstanceID identifies this block builder instance.
InstanceID string `yaml:"instance_id" doc:"Instance id."`
// AssignedPartitions lists the partitions assigned to this block builder.
// NOTE(review): the map keys are presumably instance ids — confirm against the consumer.
AssignedPartitions map[string][]int32 `yaml:"assigned_partitions" doc:"List of partitions assigned to this block builder."`
// ConsumeCycleDuration is the interval between consumption cycles.
ConsumeCycleDuration time.Duration `yaml:"consume_cycle_duration" doc:"Interval between consumption cycles."`
// LookbackOnNoCommit is how much of the historical records to look back
// when there is no kafka commit for a partition.
LookbackOnNoCommit time.Duration `yaml:"lookback_on_no_commit" category:"advanced"`
// BlockConfig configures the blocks produced by the block builder.
BlockConfig BlockConfig `yaml:"block" doc:"Configuration for the block builder."`
// WAL configures the write ahead log.
WAL wal.Config `yaml:"wal" doc:"Configuration for the write ahead log."`
// This config is injected dynamically because it is defined outside the
// ingester config; it is never read from YAML (tag "-").
IngestStorageConfig ingest.Config `yaml:"-"`
}
// Validate checks the block and WAL configuration. Before validating, it
// forces the WAL version to match the block version. It returns the first
// validation error encountered, wrapped with the section it came from.
func (c *Config) Validate() error {
	blockCfg := &c.BlockConfig.BlockCfg

	// Keep the WAL on the same version as the blocks.
	if c.WAL.Version != blockCfg.Version {
		c.WAL.Version = blockCfg.Version
	}

	blockErr := common.ValidateConfig(blockCfg)
	if blockErr != nil {
		return fmt.Errorf("block config validation failed: %w", blockErr)
	}

	walErr := c.WAL.Validate()
	if walErr != nil {
		return fmt.Errorf("wal config validation failed: %w", walErr)
	}

	return nil
}
// RegisterFlags registers the block-builder flags on f with an empty name
// prefix and applies their defaults.
func (c *Config) RegisterFlags(f *flag.FlagSet) {
	const noPrefix = ""
	c.RegisterFlagsAndApplyDefaults(noPrefix, f)
}
// RegisterFlagsAndApplyDefaults registers the block-builder flags on f and
// applies the default values. Flag names start with prefix, except for the
// instance id flag, whose name is fixed.
//
// The host name is the default instance id; if it cannot be determined the
// error is logged and the process exits with status 1.
func (c *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) {
	const (
		defaultConsumeCycle = 5 * time.Minute
		// TODO - Review default
		defaultLookback = 12 * time.Hour
		defaultWALPath  = "/var/tempo/block-builder/traces"
	)

	host, hostErr := os.Hostname()
	if hostErr != nil {
		level.Error(log.Logger).Log("msg", "failed to get hostname", "err", hostErr)
		os.Exit(1)
	}

	// NOTE(review): this flag name ignores prefix, unlike the flags below —
	// confirm that is intended before changing it.
	f.StringVar(&c.InstanceID, "block-builder.instance-id", host, "Instance id.")

	partitions := newPartitionAssignmentVar(&c.AssignedPartitions)
	f.Var(partitions, prefix+".assigned-partitions", "List of partitions assigned to this block builder.")

	f.DurationVar(&c.ConsumeCycleDuration, prefix+".consume-cycle-duration", defaultConsumeCycle, "Interval between consumption cycles.")
	f.DurationVar(&c.LookbackOnNoCommit, prefix+".lookback-on-no-commit", defaultLookback, "How much of the historical records to look back when there is no kafka commit for a partition.")

	c.BlockConfig.RegisterFlagsAndApplyDefaults(prefix+".block", f)

	// Register the WAL flags first, then override the version and path.
	c.WAL.RegisterFlags(f)
	c.WAL.Version = c.BlockConfig.BlockCfg.Version
	f.StringVar(&c.WAL.Filepath, prefix+".wal.path", defaultWALPath, "Path at which store WAL blocks.")
}
// partitionAssignmentVar adapts a partition-assignment map to the flag.Value
// interface so it can be populated from a command-line flag holding a JSON
// object of the form {"key": [1, 2, ...]}.
type partitionAssignmentVar struct {
	// p points at the destination map; Set replaces the map it points to.
	p *map[string][]int32
}

// newPartitionAssignmentVar returns a flag value that stores parsed
// assignments into *p.
func newPartitionAssignmentVar(p *map[string][]int32) *partitionAssignmentVar {
	return &partitionAssignmentVar{p}
}

// Set parses s as a JSON object mapping strings to lists of int32 and stores
// the result in the destination map. An empty string is accepted and leaves
// the destination untouched. On a parse error the destination is not modified
// and the JSON error is returned.
func (p *partitionAssignmentVar) Set(s string) error {
	if s == "" {
		return nil
	}

	val := make(map[string][]int32)
	if err := json.Unmarshal([]byte(s), &val); err != nil {
		return err
	}
	*p.p = val
	return nil
}

// String returns the current assignments formatted with %v.
//
// The flag package may call String on a zero-valued receiver (for example
// when printing defaults), so a nil receiver or nil destination pointer is
// rendered like an empty map instead of being dereferenced.
func (p *partitionAssignmentVar) String() string {
	var assigned map[string][]int32
	if p != nil && p.p != nil {
		assigned = *p.p
	}
	return fmt.Sprintf("%v", assigned)
}