Skip to content

Commit 3ccc1ea

Browse files
AnmolxSinghyurishkuro
authored andcommitted
[Fix] Restore ES metrics (jaegertracing#7006)
## Which problem is this PR solving? - Resolves jaegertracing#6891 ## Description of the changes - Reintroduced metrics which were lost during jaegertracing#6883 ## How was this change tested? - Ran writer tests ## Checklist - [x] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md - [x] I have signed all commits - [x] I have added unit tests for the new functionality - [x] I have run lint and test steps successfully - for `jaeger`: `make lint test` - for `jaeger-ui`: `npm run lint` and `npm run test` --------- Signed-off-by: anmol7344 <anmol7344@gmail.com> Signed-off-by: Anmol <166167480+AnmolxSingh@users.noreply.github.com> Signed-off-by: Yuri Shkuro <github@ysh.us> Signed-off-by: Yuri Shkuro <yurishkuro@users.noreply.github.com> Co-authored-by: Yuri Shkuro <yurishkuro@users.noreply.github.com> Co-authored-by: Yuri Shkuro <github@ysh.us>
1 parent 144d270 commit 3ccc1ea

File tree

5 files changed

+176
-40
lines changed

5 files changed

+176
-40
lines changed

internal/storage/elasticsearch/config/config.go

Lines changed: 58 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,12 @@ type Indices struct {
6969
Sampling IndexOptions `mapstructure:"sampling"`
7070
}
7171

72+
type bulkCallback struct {
73+
startTimes sync.Map
74+
sm *spanstoremetrics.WriteMetrics
75+
logger *zap.Logger
76+
}
77+
7278
type IndexPrefix string
7379

7480
func (p IndexPrefix) Apply(indexName string) string {
@@ -227,48 +233,16 @@ func NewClient(c *Configuration, logger *zap.Logger, metricsFactory metrics.Fact
227233
return nil, err
228234
}
229235

230-
sm := spanstoremetrics.NewWriter(metricsFactory, "bulk_index")
231-
m := sync.Map{}
236+
bcb := bulkCallback{
237+
sm: spanstoremetrics.NewWriter(metricsFactory, "bulk_index"),
238+
logger: logger,
239+
}
232240

233241
bulkProc, err := rawClient.BulkProcessor().
234242
Before(func(id int64, _ /* requests */ []elastic.BulkableRequest) {
235-
m.Store(id, time.Now())
236-
}).
237-
After(func(id int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
238-
start, ok := m.Load(id)
239-
if !ok {
240-
return
241-
}
242-
m.Delete(id)
243-
244-
// log individual errors, note that err might be false and these errors still present
245-
if response != nil && response.Errors {
246-
for _, it := range response.Items {
247-
for key, val := range it {
248-
if val.Error != nil {
249-
logger.Error("Elasticsearch part of bulk request failed", zap.String("map-key", key),
250-
zap.Reflect("response", val))
251-
}
252-
}
253-
}
254-
}
255-
256-
sm.Emit(err, time.Since(start.(time.Time)))
257-
if err != nil {
258-
var failed int
259-
if response == nil {
260-
failed = 0
261-
} else {
262-
failed = len(response.Failed())
263-
}
264-
total := len(requests)
265-
logger.Error("Elasticsearch could not process bulk request",
266-
zap.Int("request_count", total),
267-
zap.Int("failed_count", failed),
268-
zap.Error(err),
269-
zap.Any("response", response))
270-
}
243+
bcb.startTimes.Store(id, time.Now())
271244
}).
245+
After(bcb.invoke).
272246
BulkSize(c.BulkProcessing.MaxBytes).
273247
Workers(c.BulkProcessing.Workers).
274248
BulkActions(c.BulkProcessing.MaxActions).
@@ -315,6 +289,52 @@ func NewClient(c *Configuration, logger *zap.Logger, metricsFactory metrics.Fact
315289
return eswrapper.WrapESClient(rawClient, bulkProc, c.Version, rawClientV8), nil
316290
}
317291

292+
func (bcb *bulkCallback) invoke(id int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
293+
start, ok := bcb.startTimes.Load(id)
294+
if ok {
295+
bcb.startTimes.Delete(id)
296+
} else {
297+
start = time.Now()
298+
}
299+
300+
// Log individual errors
301+
if response != nil && response.Errors {
302+
for _, it := range response.Items {
303+
for key, val := range it {
304+
if val.Error != nil {
305+
bcb.logger.Error("Elasticsearch part of bulk request failed",
306+
zap.String("map-key", key), zap.Reflect("response", val))
307+
}
308+
}
309+
}
310+
}
311+
312+
latency := time.Since(start.(time.Time))
313+
if err != nil {
314+
bcb.sm.LatencyErr.Record(latency)
315+
} else {
316+
bcb.sm.LatencyOk.Record(latency)
317+
}
318+
319+
var failed int
320+
if response != nil {
321+
failed = len(response.Failed())
322+
}
323+
324+
total := len(requests)
325+
bcb.sm.Attempts.Inc(int64(total))
326+
bcb.sm.Inserts.Inc(int64(total - failed))
327+
bcb.sm.Errors.Inc(int64(failed))
328+
329+
if err != nil {
330+
bcb.logger.Error("Elasticsearch could not process bulk request",
331+
zap.Int("request_count", total),
332+
zap.Int("failed_count", failed),
333+
zap.Error(err),
334+
zap.Any("response", response))
335+
}
336+
}
337+
318338
func newElasticsearchV8(c *Configuration, logger *zap.Logger) (*esV8.Client, error) {
319339
var options esV8.Config
320340
options.Addresses = c.Servers

internal/storage/elasticsearch/config/config_test.go

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,19 @@ import (
88
"net/http/httptest"
99
"os"
1010
"path/filepath"
11+
"sync"
1112
"testing"
1213
"time"
1314

15+
"github.com/olivere/elastic"
1416
"github.com/stretchr/testify/assert"
1517
"github.com/stretchr/testify/require"
1618
"go.opentelemetry.io/collector/config/configtls"
1719
"go.uber.org/zap"
1820

1921
"github.com/jaegertracing/jaeger/internal/metrics"
22+
"github.com/jaegertracing/jaeger/internal/metricstest"
23+
"github.com/jaegertracing/jaeger/internal/storage/v1/api/spanstore/spanstoremetrics"
2024
"github.com/jaegertracing/jaeger/internal/testutils"
2125
)
2226

@@ -796,6 +800,103 @@ func TestApplyForIndexPrefix(t *testing.T) {
796800
}
797801
}
798802

803+
func TestHandleBulkAfterCallback_ErrorMetricsEmitted(t *testing.T) {
804+
mf := metricstest.NewFactory(time.Minute)
805+
sm := spanstoremetrics.NewWriter(mf, "bulk_index")
806+
logger := zap.NewNop()
807+
defer mf.Stop()
808+
809+
var m sync.Map
810+
batchID := int64(1)
811+
start := time.Now().Add(-100 * time.Millisecond)
812+
m.Store(batchID, start)
813+
814+
fakeRequests := []elastic.BulkableRequest{nil, nil}
815+
response := &elastic.BulkResponse{
816+
Errors: true,
817+
Items: []map[string]*elastic.BulkResponseItem{
818+
{
819+
"index": {
820+
Status: 500,
821+
Error: &elastic.ErrorDetails{Type: "server_error"},
822+
},
823+
},
824+
{
825+
"index": {
826+
Status: 200,
827+
Error: nil,
828+
},
829+
},
830+
},
831+
}
832+
833+
bcb := bulkCallback{
834+
sm: sm,
835+
logger: logger,
836+
}
837+
838+
bcb.invoke(batchID, fakeRequests, response, assert.AnError)
839+
840+
mf.AssertCounterMetrics(t,
841+
metricstest.ExpectedMetric{
842+
Name: "bulk_index.errors",
843+
Value: 1,
844+
},
845+
metricstest.ExpectedMetric{
846+
Name: "bulk_index.inserts",
847+
Value: 1,
848+
},
849+
metricstest.ExpectedMetric{
850+
Name: "bulk_index.attempts",
851+
Value: 2,
852+
},
853+
)
854+
}
855+
856+
func TestHandleBulkAfterCallback_MissingStartTime(t *testing.T) {
857+
mf := metricstest.NewFactory(time.Minute)
858+
sm := spanstoremetrics.NewWriter(mf, "bulk_index")
859+
logger := zap.NewNop()
860+
defer mf.Stop()
861+
862+
batchID := int64(42) // assign any value which is not stored in the map
863+
864+
fakeRequests := []elastic.BulkableRequest{nil}
865+
response := &elastic.BulkResponse{
866+
Errors: true,
867+
Items: []map[string]*elastic.BulkResponseItem{
868+
{
869+
"index": {
870+
Status: 500,
871+
Error: &elastic.ErrorDetails{Type: "mock_error"},
872+
},
873+
},
874+
},
875+
}
876+
877+
bcb := bulkCallback{
878+
sm: sm,
879+
logger: logger,
880+
}
881+
882+
bcb.invoke(batchID, fakeRequests, response, assert.AnError)
883+
884+
mf.AssertCounterMetrics(t,
885+
metricstest.ExpectedMetric{
886+
Name: "bulk_index.errors",
887+
Value: 1,
888+
},
889+
metricstest.ExpectedMetric{
890+
Name: "bulk_index.inserts",
891+
Value: 0,
892+
},
893+
metricstest.ExpectedMetric{
894+
Name: "bulk_index.attempts",
895+
Value: 1,
896+
},
897+
)
898+
}
899+
799900
func TestMain(m *testing.M) {
800901
testutils.VerifyGoLeaks(m)
801902
}

internal/storage/v1/elasticsearch/spanstore/writer.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
es "github.com/jaegertracing/jaeger/internal/storage/elasticsearch"
1818
cfg "github.com/jaegertracing/jaeger/internal/storage/elasticsearch/config"
1919
"github.com/jaegertracing/jaeger/internal/storage/elasticsearch/dbmodel"
20+
"github.com/jaegertracing/jaeger/internal/storage/v1/api/spanstore/spanstoremetrics"
2021
)
2122

2223
const (
@@ -33,6 +34,7 @@ type SpanWriter struct {
3334
client func() es.Client
3435
logger *zap.Logger
3536
// indexCache cache.Cache
37+
writerMetrics *spanstoremetrics.WriteMetrics
3638
serviceWriter serviceWriter
3739
spanServiceIndex spanAndServiceIndexFn
3840
allTagsAsFields bool
@@ -91,6 +93,7 @@ func NewSpanWriter(p SpanWriterParams) *SpanWriter {
9193
return &SpanWriter{
9294
client: p.Client,
9395
logger: p.Logger,
96+
writerMetrics: spanstoremetrics.NewWriter(p.MetricsFactory, "spans"),
9497
serviceWriter: serviceOperationStorage.Write,
9598
spanServiceIndex: getSpanAndServiceIndexFn(p, writeAliasSuffix),
9699
tagKeysAsFields: tags,
@@ -132,6 +135,7 @@ func getSpanAndServiceIndexFn(p SpanWriterParams, writeAlias string) spanAndServ
132135

133136
// WriteSpan writes a span and its corresponding service:operation in ElasticSearch
134137
func (s *SpanWriter) WriteSpan(spanStartTime time.Time, span *dbmodel.Span) {
138+
s.writerMetrics.Attempts.Inc(1)
135139
s.convertNestedTagsToFieldTags(span)
136140
spanIndexName, serviceIndexName := s.spanServiceIndex(spanStartTime)
137141
if serviceIndexName != "" {

internal/storage/v1/elasticsearch/spanstore/writer_test.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"go.uber.org/zap"
1818

1919
"github.com/jaegertracing/jaeger-idl/model/v1"
20+
"github.com/jaegertracing/jaeger/internal/metrics"
2021
"github.com/jaegertracing/jaeger/internal/metricstest"
2122
es "github.com/jaegertracing/jaeger/internal/storage/elasticsearch"
2223
"github.com/jaegertracing/jaeger/internal/storage/elasticsearch/config"
@@ -434,6 +435,8 @@ func TestTagMap(t *testing.T) {
434435
}
435436
dbSpan := dbmodel.Span{Tags: tags, Process: dbmodel.Process{Tags: tags}}
436437
converter := NewSpanWriter(SpanWriterParams{
438+
Logger: zap.NewNop(),
439+
MetricsFactory: metrics.NullFactory,
437440
AllTagsAsFields: false,
438441
TagKeysAsFields: []string{"a", "b.b", "b*"},
439442
TagDotReplacement: ":",
@@ -509,7 +512,10 @@ func TestNewSpanTags(t *testing.T) {
509512
Tags: []dbmodel.KeyValue{{Key: "foo", Value: "bar", Type: dbmodel.StringType}},
510513
Process: dbmodel.Process{Tags: []dbmodel.KeyValue{{Key: "bar", Value: "baz", Type: dbmodel.StringType}}},
511514
}
512-
writer := NewSpanWriter(test.params)
515+
params := test.params
516+
params.Logger = zap.NewNop()
517+
params.MetricsFactory = metrics.NullFactory
518+
writer := NewSpanWriter(params)
513519
writer.convertNestedTagsToFieldTags(mSpan)
514520
assert.Equal(t, test.expected.Tag, mSpan.Tag)
515521
assert.Equal(t, test.expected.Tags, mSpan.Tags)

internal/storage/v2/elasticsearch/tracestore/writer_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,10 @@ import (
1010
"github.com/stretchr/testify/assert"
1111
"github.com/stretchr/testify/require"
1212
"go.opentelemetry.io/collector/pdata/ptrace"
13+
"go.uber.org/zap"
1314

1415
"github.com/jaegertracing/jaeger-idl/model/v1"
16+
"github.com/jaegertracing/jaeger/internal/metrics"
1517
cfg "github.com/jaegertracing/jaeger/internal/storage/elasticsearch/config"
1618
"github.com/jaegertracing/jaeger/internal/storage/v1/elasticsearch/spanstore"
1719
"github.com/jaegertracing/jaeger/internal/storage/v1/elasticsearch/spanstore/mocks"
@@ -48,7 +50,10 @@ func TestTraceWriter_CreateTemplates(t *testing.T) {
4850
}
4951

5052
func Test_NewTraceWriter(t *testing.T) {
51-
params := spanstore.SpanWriterParams{}
53+
params := spanstore.SpanWriterParams{
54+
Logger: zap.NewNop(),
55+
MetricsFactory: metrics.NullFactory,
56+
}
5257
writer := NewTraceWriter(params)
5358
assert.NotNil(t, writer)
5459
}

0 commit comments

Comments
 (0)