Skip to content

Commit 4a4f01c

Browse files
author
Yuri Shkuro
committed
Support archive storage in the query-service
Signed-off-by: Yuri Shkuro <ys@uber.com>
1 parent 8db7a11 commit 4a4f01c

File tree

9 files changed

+271
-14
lines changed

9 files changed

+271
-14
lines changed

cmd/query/main.go

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
"github.com/jaegertracing/jaeger/pkg/recoveryhandler"
4040
"github.com/jaegertracing/jaeger/pkg/version"
4141
"github.com/jaegertracing/jaeger/plugin/storage"
42+
istorage "github.com/jaegertracing/jaeger/storage"
4243
)
4344

4445
func main() {
@@ -107,12 +108,16 @@ func main() {
107108
logger.Fatal("Failed to create dependency reader", zap.Error(err))
108109
}
109110

111+
apiHandlerOptions := []app.HandlerOption{
112+
app.HandlerOptions.Prefix(queryOpts.Prefix),
113+
app.HandlerOptions.Logger(logger),
114+
app.HandlerOptions.Tracer(tracer),
115+
}
116+
apiHandlerOptions = append(apiHandlerOptions, archiveOptions(storageFactory, logger)...)
110117
apiHandler := app.NewAPIHandler(
111118
spanReader,
112119
dependencyReader,
113-
app.HandlerOptions.Prefix(queryOpts.Prefix),
114-
app.HandlerOptions.Logger(logger),
115-
app.HandlerOptions.Tracer(tracer))
120+
apiHandlerOptions...)
116121
r := mux.NewRouter()
117122
apiHandler.RegisterRoutes(r)
118123
registerStaticHandler(r, logger, queryOpts)
@@ -174,3 +179,26 @@ func registerStaticHandler(r *mux.Router, logger *zap.Logger, qOpts *app.QueryOp
174179
logger.Info("Static handler is not registered")
175180
}
176181
}
182+
183+
func archiveOptions(storageFactory istorage.Factory, logger *zap.Logger) []app.HandlerOption {
184+
reader, err := storageFactory.CreateSpanReader()
185+
if err == istorage.ErrArchiveStorageNotConfigured || err == istorage.ErrArchiveStorageNotSupported {
186+
return nil
187+
}
188+
if err != nil {
189+
logger.Error("Cannot init archive storage reader", zap.Error(err))
190+
return nil
191+
}
192+
writer, err := storageFactory.CreateSpanWriter()
193+
if err == istorage.ErrArchiveStorageNotConfigured || err == istorage.ErrArchiveStorageNotSupported {
194+
return nil
195+
}
196+
if err != nil {
197+
logger.Error("Cannot init archive storage writer", zap.Error(err))
198+
return nil
199+
}
200+
return []app.HandlerOption{
201+
app.HandlerOptions.ArchiveSpanReader(reader),
202+
app.HandlerOptions.ArchiveSpanWriter(writer),
203+
}
204+
}

plugin/storage/cassandra/factory.go

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,16 @@ import (
2525
"github.com/jaegertracing/jaeger/pkg/cassandra/config"
2626
cDepStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/dependencystore"
2727
cSpanStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore"
28+
"github.com/jaegertracing/jaeger/storage"
2829
"github.com/jaegertracing/jaeger/storage/dependencystore"
2930
"github.com/jaegertracing/jaeger/storage/spanstore"
3031
)
3132

33+
const (
34+
primaryStorageConfig = "cassandra"
35+
archiveStorageConfig = "cassandra-archive"
36+
)
37+
3238
// Factory implements storage.Factory for Cassandra backend.
3339
type Factory struct {
3440
Options *Options
@@ -38,13 +44,14 @@ type Factory struct {
3844

3945
primaryConfig config.SessionBuilder
4046
primarySession cassandra.Session
41-
// archiveSession cassandra.Session TODO
47+
archiveConfig config.SessionBuilder
48+
archiveSession cassandra.Session
4249
}
4350

4451
// NewFactory creates a new Factory.
4552
func NewFactory() *Factory {
4653
return &Factory{
47-
Options: NewOptions("cassandra"), // TODO add "cassandra-archive" once supported
54+
Options: NewOptions(primaryStorageConfig, archiveStorageConfig),
4855
}
4956
}
5057

@@ -57,6 +64,9 @@ func (f *Factory) AddFlags(flagSet *flag.FlagSet) {
5764
func (f *Factory) InitFromViper(v *viper.Viper) {
5865
f.Options.InitFromViper(v)
5966
f.primaryConfig = f.Options.GetPrimary()
67+
if cfg := f.Options.Get(archiveStorageConfig); cfg != nil {
68+
f.archiveConfig = cfg // this is so stupid - see https://golang.org/doc/faq#nil_error
69+
}
6070
}
6171

6272
// Initialize implements storage.Factory
@@ -68,7 +78,14 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
6878
return err
6979
}
7080
f.primarySession = primarySession
71-
// TODO init archive (cf. https://github.com/jaegertracing/jaeger/pull/604)
81+
82+
if f.archiveConfig != nil {
83+
if archiveSession, err := f.archiveConfig.NewSession(); err == nil {
84+
f.archiveSession = archiveSession
85+
} else {
86+
return err
87+
}
88+
}
7289
return nil
7390
}
7491

@@ -86,3 +103,19 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
86103
func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
87104
return cDepStore.NewDependencyStore(f.primarySession, f.Options.DepStoreDataFrequency, f.metricsFactory, f.logger), nil
88105
}
106+
107+
// CreateArchiveSpanReader implements storage.ArchiveFactory
108+
func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) {
109+
if f.archiveSession == nil {
110+
return nil, storage.ErrArchiveStorageNotConfigured
111+
}
112+
return cSpanStore.NewSpanReader(f.archiveSession, f.metricsFactory, f.logger), nil
113+
}
114+
115+
// CreateArchiveSpanWriter implements storage.ArchiveFactory
116+
func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
117+
if f.archiveSession == nil {
118+
return nil, storage.ErrArchiveStorageNotConfigured
119+
}
120+
return cSpanStore.NewSpanWriter(f.archiveSession, f.Options.SpanStoreWriteCacheTTL, f.metricsFactory, f.logger), nil
121+
}

plugin/storage/cassandra/factory_test.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,12 @@ import (
2525
"github.com/jaegertracing/jaeger/pkg/cassandra"
2626
"github.com/jaegertracing/jaeger/pkg/cassandra/mocks"
2727
"github.com/jaegertracing/jaeger/pkg/config"
28+
2829
"github.com/jaegertracing/jaeger/storage"
2930
)
3031

3132
var _ storage.Factory = new(Factory)
33+
var _ storage.ArchiveFactory = new(Factory)
3234

3335
type mockSessionBuilder struct {
3436
err error
@@ -44,7 +46,7 @@ func (m *mockSessionBuilder) NewSession() (cassandra.Session, error) {
4446
func TestCassandraFactory(t *testing.T) {
4547
f := NewFactory()
4648
v, command := config.Viperize(f.AddFlags)
47-
command.ParseFlags([]string{})
49+
command.ParseFlags([]string{"--cassandra-archive.enabled=true"})
4850
f.InitFromViper(v)
4951

5052
// after InitFromViper, f.primaryConfig points to a real session builder that will fail in unit tests,
@@ -53,6 +55,10 @@ func TestCassandraFactory(t *testing.T) {
5355
assert.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error")
5456

5557
f.primaryConfig = &mockSessionBuilder{}
58+
f.archiveConfig = &mockSessionBuilder{err: errors.New("made-up error")}
59+
assert.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error")
60+
61+
f.archiveConfig = nil
5662
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))
5763

5864
_, err := f.CreateSpanReader()
@@ -63,4 +69,19 @@ func TestCassandraFactory(t *testing.T) {
6369

6470
_, err = f.CreateDependencyReader()
6571
assert.NoError(t, err)
72+
73+
_, err = f.CreateArchiveSpanReader()
74+
assert.EqualError(t, err, "Archive storage not configured")
75+
76+
_, err = f.CreateArchiveSpanWriter()
77+
assert.EqualError(t, err, "Archive storage not configured")
78+
79+
f.archiveConfig = &mockSessionBuilder{}
80+
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))
81+
82+
_, err = f.CreateArchiveSpanReader()
83+
assert.NoError(t, err)
84+
85+
_, err = f.CreateArchiveSpanWriter()
86+
assert.NoError(t, err)
6687
}

plugin/storage/cassandra/options.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626

2727
const (
2828
// session settings
29+
suffixEnabled = ".enabled"
2930
suffixConnPerHost = ".connections-per-host"
3031
suffixMaxRetryAttempts = ".max-retry-attempts"
3132
suffixTimeout = ".timeout"
@@ -65,6 +66,8 @@ type namespaceConfig struct {
6566
config.Configuration
6667
servers string
6768
namespace string
69+
primary bool
70+
Enabled bool
6871
}
6972

7073
// NewOptions creates a new Options struct.
@@ -78,12 +81,14 @@ func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options {
7881
EnableHostVerification: true,
7982
},
8083
MaxRetryAttempts: 3,
81-
Keyspace: "jaeger_v1_local",
84+
Keyspace: "jaeger_v1_test",
8285
ProtoVersion: 4,
8386
ConnectionsPerHost: 2,
8487
},
8588
servers: "127.0.0.1",
8689
namespace: primaryNamespace,
90+
primary: true,
91+
Enabled: true,
8792
},
8893
others: make(map[string]*namespaceConfig, len(otherNamespaces)),
8994
SpanStoreWriteCacheTTL: time.Hour * 12,
@@ -112,6 +117,12 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
112117
}
113118

114119
func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
120+
if !nsConfig.primary {
121+
flagSet.Bool(
122+
nsConfig.namespace+suffixEnabled,
123+
false,
124+
"Enable extra storage")
125+
}
115126
flagSet.Int(
116127
nsConfig.namespace+suffixConnPerHost,
117128
nsConfig.ConnectionsPerHost,
@@ -189,6 +200,9 @@ func (opt *Options) InitFromViper(v *viper.Viper) {
189200
}
190201

191202
func (cfg *namespaceConfig) initFromViper(v *viper.Viper) {
203+
if !cfg.primary {
204+
cfg.Enabled = v.GetBool(cfg.namespace + suffixEnabled)
205+
}
192206
cfg.ConnectionsPerHost = v.GetInt(cfg.namespace + suffixConnPerHost)
193207
cfg.MaxRetryAttempts = v.GetInt(cfg.namespace + suffixMaxRetryAttempts)
194208
cfg.Timeout = v.GetDuration(cfg.namespace + suffixTimeout)
@@ -220,6 +234,9 @@ func (opt *Options) Get(namespace string) *config.Configuration {
220234
nsCfg = &namespaceConfig{}
221235
opt.others[namespace] = nsCfg
222236
}
237+
if !nsCfg.Enabled {
238+
return nil
239+
}
223240
nsCfg.Configuration.ApplyDefaults(&opt.primary.Configuration)
224241
if nsCfg.servers == "" {
225242
nsCfg.servers = opt.primary.servers

plugin/storage/cassandra/options_test.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"time"
2020

2121
"github.com/stretchr/testify/assert"
22+
"github.com/stretchr/testify/require"
2223

2324
"github.com/jaegertracing/jaeger/pkg/config"
2425
)
@@ -31,13 +32,19 @@ func TestOptions(t *testing.T) {
3132
assert.Equal(t, 2, primary.ConnectionsPerHost)
3233

3334
aux := opts.Get("archive")
35+
assert.Nil(t, aux)
36+
37+
assert.NotNil(t, opts.others["archive"])
38+
opts.others["archive"].Enabled = true
39+
aux = opts.Get("archive")
40+
require.NotNil(t, aux)
3441
assert.Equal(t, primary.Keyspace, aux.Keyspace)
3542
assert.Equal(t, primary.Servers, aux.Servers)
3643
assert.Equal(t, primary.ConnectionsPerHost, aux.ConnectionsPerHost)
3744
}
3845

3946
func TestOptionsWithFlags(t *testing.T) {
40-
opts := NewOptions("cas", "cas.aux")
47+
opts := NewOptions("cas", "cas-aux")
4148
v, command := config.Viperize(opts.AddFlags)
4249
command.ParseFlags([]string{
4350
"--cas.keyspace=jaeger",
@@ -48,17 +55,19 @@ func TestOptionsWithFlags(t *testing.T) {
4855
"--cas.port=4242",
4956
"--cas.proto-version=3",
5057
"--cas.socket-keep-alive=42s",
51-
// a couple overrides
52-
"--cas.aux.keyspace=jaeger-archive",
53-
"--cas.aux.servers=3.3.3.3,4.4.4.4",
58+
// enable aux with a couple overrides
59+
"--cas-aux.enabled=true",
60+
"--cas-aux.keyspace=jaeger-archive",
61+
"--cas-aux.servers=3.3.3.3,4.4.4.4",
5462
})
5563
opts.InitFromViper(v)
5664

5765
primary := opts.GetPrimary()
5866
assert.Equal(t, "jaeger", primary.Keyspace)
5967
assert.Equal(t, []string{"1.1.1.1", "2.2.2.2"}, primary.Servers)
6068

61-
aux := opts.Get("cas.aux")
69+
aux := opts.Get("cas-aux")
70+
require.NotNil(t, aux)
6271
assert.Equal(t, "jaeger-archive", aux.Keyspace)
6372
assert.Equal(t, []string{"3.3.3.3", "4.4.4.4"}, aux.Servers)
6473
assert.Equal(t, 42, aux.ConnectionsPerHost)

plugin/storage/factory.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,3 +131,29 @@ func (f *Factory) InitFromViper(v *viper.Viper) {
131131
}
132132
}
133133
}
134+
135+
// CreateArchiveSpanReader implements storage.ArchiveFactory
136+
func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) {
137+
factory, ok := f.factories[f.SpanStorageType]
138+
if !ok {
139+
return nil, fmt.Errorf("No %s backend registered for span store", f.SpanStorageType)
140+
}
141+
archive, ok := factory.(storage.ArchiveFactory)
142+
if !ok {
143+
return nil, storage.ErrArchiveStorageNotSupported
144+
}
145+
return archive.CreateArchiveSpanReader()
146+
}
147+
148+
// CreateArchiveSpanWriter implements storage.ArchiveFactory
149+
func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
150+
factory, ok := f.factories[f.SpanStorageType]
151+
if !ok {
152+
return nil, fmt.Errorf("No %s backend registered for span store", f.SpanStorageType)
153+
}
154+
archive, ok := factory.(storage.ArchiveFactory)
155+
if !ok {
156+
return nil, storage.ErrArchiveStorageNotSupported
157+
}
158+
return archive.CreateArchiveSpanWriter()
159+
}

0 commit comments

Comments
 (0)