Skip to content

Commit c3e6e14

Browse files
black-adderyurishkuro
authored andcommitted
Add cassandra tag filter (#442)
* Add cassandra tag filter Signed-off-by: Won Jun Jang <wjang@uber.com>
1 parent 0549efd commit c3e6e14

File tree

9 files changed

+239
-9
lines changed

9 files changed

+239
-9
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
// Copyright (c) 2017 Uber Technologies, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package dbmodel
16+
17+
import "github.com/uber/jaeger/model"
18+
19+
// LogFieldsFilter filters all span.Logs.Fields.
20+
type LogFieldsFilter struct {
21+
tagFilterImpl
22+
}
23+
24+
// NewLogFieldsFilter return a filter that filters all span.Logs.Fields.
25+
func NewLogFieldsFilter() *LogFieldsFilter {
26+
return &LogFieldsFilter{}
27+
}
28+
29+
// FilterLogFields implements TagFilter#FilterLogFields
30+
func (f *LogFieldsFilter) FilterLogFields(logFields model.KeyValues) model.KeyValues {
31+
return model.KeyValues{}
32+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// Copyright (c) 2017 Uber Technologies, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package dbmodel
16+
17+
import (
18+
"testing"
19+
20+
"github.com/kr/pretty"
21+
"github.com/stretchr/testify/assert"
22+
)
23+
24+
var filter TagFilter = &LogFieldsFilter{} // Check API compliance
25+
26+
func TestFilterLogTags(t *testing.T) {
27+
span := getTestJaegerSpan()
28+
filter := NewLogFieldsFilter()
29+
expectedTags := append(someTags, someTags...)
30+
filteredTags := filter.FilterProcessTags(span.Process.Tags)
31+
filteredTags = append(filteredTags, filter.FilterTags(span.Tags)...)
32+
for _, log := range span.Logs {
33+
filteredTags = append(filteredTags, filter.FilterLogFields(log.Fields)...)
34+
}
35+
if !assert.EqualValues(t, expectedTags, filteredTags) {
36+
for _, diff := range pretty.Diff(expectedTags, filteredTags) {
37+
t.Log(diff)
38+
}
39+
}
40+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// Copyright (c) 2017 Uber Technologies, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package dbmodel
16+
17+
import (
18+
"github.com/uber/jaeger/model"
19+
)
20+
21+
// TODO (black-adder) add a chain filter
22+
23+
// TagFilter filters out any tags that should not be indexed.
24+
type TagFilter interface {
25+
FilterProcessTags(processTags model.KeyValues) model.KeyValues
26+
FilterTags(tags model.KeyValues) model.KeyValues
27+
FilterLogFields(logFields model.KeyValues) model.KeyValues
28+
}
29+
30+
// DefaultTagFilter returns a filter that retrieves all tags from span.Tags, span.Logs, and span.Process.
31+
var DefaultTagFilter = tagFilterImpl{}
32+
33+
type tagFilterImpl struct{}
34+
35+
func (f tagFilterImpl) FilterProcessTags(processTags model.KeyValues) model.KeyValues {
36+
return processTags
37+
}
38+
39+
func (f tagFilterImpl) FilterTags(tags model.KeyValues) model.KeyValues {
40+
return tags
41+
}
42+
43+
func (f tagFilterImpl) FilterLogFields(logFields model.KeyValues) model.KeyValues {
44+
return logFields
45+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// Copyright (c) 2017 Uber Technologies, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package dbmodel
16+
17+
import (
18+
"testing"
19+
20+
"github.com/kr/pretty"
21+
"github.com/stretchr/testify/assert"
22+
)
23+
24+
func TestDefaultTagFilter(t *testing.T) {
25+
span := getTestJaegerSpan()
26+
expectedTags := append(append(someTags, someTags...), someTags...)
27+
filteredTags := DefaultTagFilter.FilterProcessTags(span.Process.Tags)
28+
filteredTags = append(filteredTags, DefaultTagFilter.FilterTags(span.Tags)...)
29+
for _, log := range span.Logs {
30+
filteredTags = append(filteredTags, DefaultTagFilter.FilterLogFields(log.Fields)...)
31+
}
32+
if !assert.EqualValues(t, expectedTags, filteredTags) {
33+
for _, diff := range pretty.Diff(expectedTags, filteredTags) {
34+
t.Log(diff)
35+
}
36+
}
37+
}

plugin/storage/cassandra/spanstore/dbmodel/unique_tags.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,12 @@ package dbmodel
1616

1717
import "github.com/uber/jaeger/model"
1818

19-
// GetAllUniqueTags creates a list of all unique tags found in a span and process
20-
func GetAllUniqueTags(span *model.Span) []TagInsertion {
21-
process := span.Process
22-
allTags := span.Tags
23-
allTags = append(allTags, process.Tags...)
19+
// GetAllUniqueTags creates a list of all unique tags from a set of filtered tags.
20+
func GetAllUniqueTags(span *model.Span, tagFilter TagFilter) []TagInsertion {
21+
allTags := append(model.KeyValues{}, tagFilter.FilterProcessTags(span.Process.Tags)...)
22+
allTags = append(allTags, tagFilter.FilterTags(span.Tags)...)
2423
for _, log := range span.Logs {
25-
allTags = append(allTags, log.Fields...)
24+
allTags = append(allTags, tagFilter.FilterLogFields(log.Fields)...)
2625
}
2726
allTags.Sort()
2827
uniqueTags := make([]TagInsertion, 0, len(allTags))
@@ -34,7 +33,7 @@ func GetAllUniqueTags(span *model.Span) []TagInsertion {
3433
continue // skip identical tags
3534
}
3635
uniqueTags = append(uniqueTags, TagInsertion{
37-
ServiceName: process.ServiceName,
36+
ServiceName: span.Process.ServiceName,
3837
TagKey: allTags[i].Key,
3938
TagValue: allTags[i].AsString(),
4039
})

plugin/storage/cassandra/spanstore/dbmodel/unique_tags_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import (
2323

2424
func TestGetUniqueTags(t *testing.T) {
2525
expectedTags := getTestUniqueTags()
26-
uniqueTags := GetAllUniqueTags(getTestJaegerSpan())
26+
uniqueTags := GetAllUniqueTags(getTestJaegerSpan(), DefaultTagFilter)
2727
if !assert.EqualValues(t, expectedTags, uniqueTags) {
2828
for _, diff := range pretty.Diff(expectedTags, uniqueTags) {
2929
t.Log(diff)

plugin/storage/cassandra/spanstore/writer.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ type SpanWriter struct {
8686
logger *zap.Logger
8787
tagIndexSkipped metrics.Counter
8888
bucketCounter uint32
89+
tagFilter dbmodel.TagFilter
8990
}
9091

9192
// NewSpanWriter returns a SpanWriter
@@ -94,10 +95,12 @@ func NewSpanWriter(
9495
writeCacheTTL time.Duration,
9596
metricsFactory metrics.Factory,
9697
logger *zap.Logger,
98+
options ...Option,
9799
) *SpanWriter {
98100
serviceNamesStorage := NewServiceNamesStorage(session, writeCacheTTL, metricsFactory, logger)
99101
operationNamesStorage := NewOperationNamesStorage(session, writeCacheTTL, metricsFactory, logger)
100102
tagIndexSkipped := metricsFactory.Counter("tagIndexSkipped", nil)
103+
opts := applyOptions(options...)
101104
return &SpanWriter{
102105
session: session,
103106
serviceNamesWriter: serviceNamesStorage.Write,
@@ -111,6 +114,7 @@ func NewSpanWriter(
111114
},
112115
logger: logger,
113116
tagIndexSkipped: tagIndexSkipped,
117+
tagFilter: opts.tagFilter,
114118
}
115119
}
116120

@@ -160,7 +164,7 @@ func (s *SpanWriter) WriteSpan(span *model.Span) error {
160164
}
161165

162166
func (s *SpanWriter) indexByTags(span *model.Span, ds *dbmodel.Span) error {
163-
for _, v := range dbmodel.GetAllUniqueTags(span) {
167+
for _, v := range dbmodel.GetAllUniqueTags(span, s.tagFilter) {
164168
// we should introduce retries or just ignore failures imo, retrying each individual tag insertion might be better
165169
// we should consider bucketing.
166170
if s.shouldIndexTag(v) {
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// Copyright (c) 2017 Uber Technologies, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package spanstore
16+
17+
import (
18+
"github.com/uber/jaeger/plugin/storage/cassandra/spanstore/dbmodel"
19+
)
20+
21+
// Option is a function that sets some option on the writer.
22+
type Option func(c *Options)
23+
24+
// Options control behavior of the writer.
25+
type Options struct {
26+
tagFilter dbmodel.TagFilter
27+
}
28+
29+
// TagFilter can be provided to filter any tags that should not be indexed.
30+
func TagFilter(tagFilter dbmodel.TagFilter) Option {
31+
return func(o *Options) {
32+
o.tagFilter = tagFilter
33+
}
34+
}
35+
36+
func applyOptions(opts ...Option) Options {
37+
o := Options{}
38+
for _, opt := range opts {
39+
opt(&o)
40+
}
41+
if o.tagFilter == nil {
42+
o.tagFilter = dbmodel.DefaultTagFilter
43+
}
44+
return o
45+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
// Copyright (c) 2017 Uber Technologies, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package spanstore
16+
17+
import (
18+
"testing"
19+
20+
"github.com/stretchr/testify/assert"
21+
22+
"github.com/uber/jaeger/plugin/storage/cassandra/spanstore/dbmodel"
23+
)
24+
25+
func TestWriterOpetions(t *testing.T) {
26+
opts := applyOptions(TagFilter(dbmodel.DefaultTagFilter))
27+
assert.Equal(t, dbmodel.DefaultTagFilter, opts.tagFilter)
28+
}

0 commit comments

Comments
 (0)