Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions plugin/storage/cassandra/spanstore/dbmodel/index_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dbmodel

const (
// DurationIndex represents the flag for indexing by duration.
DurationIndex = iota

// ServiceIndex represents the flag for indexing by service.
ServiceIndex

// OperationIndex represents the flag for indexing by service-operation.
OperationIndex
)

// IndexFilter filters out any spans that should not be indexed depending on the index specified.
type IndexFilter func(span *Span, index int) bool

// DefaultIndexFilter is a filter that indexes everything.
var DefaultIndexFilter = func(span *Span, index int) bool {
return true
}
29 changes: 29 additions & 0 deletions plugin/storage/cassandra/spanstore/dbmodel/index_filter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dbmodel

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestDefaultIndexFilter(t *testing.T) {
span := &Span{}
filter := DefaultIndexFilter
assert.True(t, filter(span, DurationIndex))
assert.True(t, filter(span, ServiceIndex))
assert.True(t, filter(span, OperationIndex))
}
24 changes: 16 additions & 8 deletions plugin/storage/cassandra/spanstore/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type SpanWriter struct {
tagIndexSkipped metrics.Counter
tagFilter dbmodel.TagFilter
storageMode storageMode
indexFilter dbmodel.IndexFilter
}

// NewSpanWriter returns a SpanWriter
Expand Down Expand Up @@ -121,6 +122,7 @@ func NewSpanWriter(
tagIndexSkipped: tagIndexSkipped,
tagFilter: opts.tagFilter,
storageMode: opts.storageMode,
indexFilter: opts.indexFilter,
}
}

Expand Down Expand Up @@ -178,16 +180,22 @@ func (s *SpanWriter) writeIndexes(span *model.Span, ds *dbmodel.Span) error {
return s.logError(ds, err, "Failed to index tags", s.logger)
}

if err := s.indexByService(span.TraceID, ds); err != nil {
return s.logError(ds, err, "Failed to index service name", s.logger)
if s.indexFilter(ds, dbmodel.ServiceIndex) {
if err := s.indexByService(ds); err != nil {
return s.logError(ds, err, "Failed to index service name", s.logger)
}
}

if err := s.indexByOperation(span.TraceID, ds); err != nil {
return s.logError(ds, err, "Failed to index operation name", s.logger)
if s.indexFilter(ds, dbmodel.OperationIndex) {
if err := s.indexByOperation(ds); err != nil {
return s.logError(ds, err, "Failed to index operation name", s.logger)
}
}

if err := s.indexByDuration(ds, span.StartTime); err != nil {
return s.logError(ds, err, "Failed to index duration", s.logger)
if s.indexFilter(ds, dbmodel.DurationIndex) {
if err := s.indexByDuration(ds, span.StartTime); err != nil {
return s.logError(ds, err, "Failed to index duration", s.logger)
}
}
return nil
}
Expand Down Expand Up @@ -228,14 +236,14 @@ func (s *SpanWriter) indexByDuration(span *dbmodel.Span, startTime time.Time) er
return err
}

func (s *SpanWriter) indexByService(traceID model.TraceID, span *dbmodel.Span) error {
func (s *SpanWriter) indexByService(span *dbmodel.Span) error {
bucketNo := uint64(span.SpanHash) % defaultNumBuckets
query := s.session.Query(serviceNameIndex)
q := query.Bind(span.Process.ServiceName, bucketNo, span.StartTime, span.TraceID)
return s.writerMetrics.serviceNameIndex.Exec(q, s.logger)
}

func (s *SpanWriter) indexByOperation(traceID model.TraceID, span *dbmodel.Span) error {
func (s *SpanWriter) indexByOperation(span *dbmodel.Span) error {
query := s.session.Query(serviceOperationIndex)
q := query.Bind(span.Process.ServiceName, span.OperationName, span.StartTime, span.TraceID)
return s.writerMetrics.serviceOperationIndex.Exec(q, s.logger)
Expand Down
11 changes: 11 additions & 0 deletions plugin/storage/cassandra/spanstore/writer_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Option func(c *Options)
type Options struct {
tagFilter dbmodel.TagFilter
storageMode storageMode
indexFilter dbmodel.IndexFilter
}

// TagFilter can be provided to filter any tags that should not be indexed.
Expand All @@ -48,6 +49,13 @@ func StoreWithoutIndexing() Option {
}
}

// IndexFilter can be provided to filter certain spans that should not be indexed.
func IndexFilter(indexFilter dbmodel.IndexFilter) Option {
return func(o *Options) {
o.indexFilter = indexFilter
}
}

func applyOptions(opts ...Option) Options {
o := Options{}
for _, opt := range opts {
Expand All @@ -59,5 +67,8 @@ func applyOptions(opts ...Option) Options {
if o.storageMode == 0 {
o.storageMode = storeFlag | indexFlag
}
if o.indexFilter == nil {
o.indexFilter = dbmodel.DefaultIndexFilter
}
return o
}
3 changes: 2 additions & 1 deletion plugin/storage/cassandra/spanstore/writer_options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ import (
)

func TestWriterOptions(t *testing.T) {
opts := applyOptions(TagFilter(dbmodel.DefaultTagFilter))
opts := applyOptions(TagFilter(dbmodel.DefaultTagFilter), IndexFilter(dbmodel.DefaultIndexFilter))
assert.Equal(t, dbmodel.DefaultTagFilter, opts.tagFilter)
assert.ObjectsAreEqual(dbmodel.DefaultIndexFilter, opts.indexFilter)
}

func TestWriterOptions_StorageMode(t *testing.T) {
Expand Down
24 changes: 24 additions & 0 deletions plugin/storage/cassandra/spanstore/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,30 @@ func TestStorageMode_IndexOnly(t *testing.T) {
}, StoreIndexesOnly())
}

var filterEverything = func(*dbmodel.Span, int) bool {
return false
}

func TestStorageMode_IndexOnly_WithFilter(t *testing.T) {
withSpanWriter(0, func(w *spanWriterTest) {
w.writer.indexFilter = filterEverything
w.writer.serviceNamesWriter = func(serviceName string) error { return nil }
w.writer.operationNamesWriter = func(serviceName, operationName string) error { return nil }
span := &model.Span{
TraceID: model.NewTraceID(0, 1),
Process: &model.Process{
ServiceName: "service-a",
},
}
err := w.writer.WriteSpan(span)
assert.NoError(t, err)
w.session.AssertExpectations(t)
w.session.AssertNotCalled(t, "Query", stringMatcher(serviceOperationIndex))
w.session.AssertNotCalled(t, "Query", stringMatcher(serviceNameIndex))
w.session.AssertNotCalled(t, "Query", stringMatcher(durationIndex))
}, StoreIndexesOnly())
}

func TestStorageMode_StoreWithoutIndexing(t *testing.T) {
withSpanWriter(0, func(w *spanWriterTest) {

Expand Down