Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions cmd/jaeger/internal/extension/jaegerquery/querysvc/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger-idl/model/v1"
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerquery/internal/adjuster"
Expand All @@ -33,6 +34,8 @@ type QueryServiceOptions struct {
// MaxTraceSize is the maximum number of spans allowed per trace. A value of 0 (default) means unlimited.
// If a trace has more spans than this limit, it will be truncated and a warning will be added.
MaxTraceSize int
// Logger is used for diagnostic logging. If nil, a no-op logger is used.
Logger *zap.Logger
}

// StorageCapabilities is a feature flag for query service
Expand All @@ -50,6 +53,7 @@ type QueryService struct {
dependencyReader depstore.Reader
adjuster adjuster.Adjuster
options QueryServiceOptions
logger *zap.Logger
}

// GetTraceParams defines the parameters for retrieving traces using the GetTraces function.
Expand All @@ -74,13 +78,18 @@ func NewQueryService(
dependencyReader depstore.Reader,
options QueryServiceOptions,
) *QueryService {
logger := options.Logger
if logger == nil {
logger = zap.NewNop()
}
qsvc := &QueryService{
traceReader: traceReader,
dependencyReader: dependencyReader,
adjuster: adjuster.Sequence(
adjuster.StandardAdjusters(options.MaxClockSkewAdjust)...,
),
options: options,
logger: logger,
}

return qsvc
Expand Down Expand Up @@ -165,6 +174,9 @@ func (qs QueryService) ArchiveTrace(ctx context.Context, query tracestore.GetTra
if qs.options.ArchiveTraceWriter == nil {
return errNoArchiveSpanStorage
}
qs.logger.Debug("archive trace request received",
zap.String("trace_id", query.TraceID.String()),
)
getTracesIter := qs.GetTraces(
ctx, GetTraceParams{TraceIDs: []tracestore.GetTraceParams{query}},
)
Expand All @@ -189,6 +201,16 @@ func (qs QueryService) ArchiveTrace(ctx context.Context, query tracestore.GetTra
if archiveErr == nil && !found {
return spanstore.ErrTraceNotFound
}
if archiveErr != nil {
qs.logger.Warn("archive trace write failed",
zap.String("trace_id", query.TraceID.String()),
zap.Error(archiveErr),
)
} else {
qs.logger.Debug("archive trace completed",
zap.String("trace_id", query.TraceID.String()),
)
}
return archiveErr
}

Expand Down
1 change: 1 addition & 0 deletions cmd/jaeger/internal/extension/jaegerquery/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func (s *server) Start(ctx context.Context, host component.Host) error {
opts := querysvc.QueryServiceOptions{
MaxClockSkewAdjust: s.config.MaxClockSkewAdjust,
MaxTraceSize: s.config.MaxTraceSize,
Logger: s.telset.Logger,
}
if err := s.addArchiveStorage(&opts, host); err != nil {
return err
Expand Down
31 changes: 31 additions & 0 deletions internal/storage/v2/elasticsearch/tracestore/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,60 @@ import (
"context"

"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger-idl/model/v1"
"github.com/jaegertracing/jaeger/internal/storage/v1/elasticsearch/spanstore"
)

type TraceWriter struct {
spanWriter spanstore.CoreSpanWriter
logger *zap.Logger
}

// NewTraceWriter returns the TraceWriter for use
func NewTraceWriter(p spanstore.SpanWriterParams) *TraceWriter {
logger := p.Logger
if logger == nil {
logger = zap.NewNop()
}
return &TraceWriter{
spanWriter: spanstore.NewSpanWriter(p),
logger: logger,
}
}

// WriteTraces convert the traces to ES Span model and write into the database
func (t *TraceWriter) WriteTraces(_ context.Context, td ptrace.Traces) error {
rs := td.ResourceSpans()
if rs.Len() == 0 {
t.logger.Debug("skipping write of empty trace data")
return nil
}

dbSpans := ToDBModel(td)
if len(dbSpans) == 0 {
scopeSpansCount := 0
spanCount := 0
for i := 0; i < rs.Len(); i++ {
scopeSpans := rs.At(i).ScopeSpans()
scopeSpansCount += scopeSpans.Len()
for j := 0; j < scopeSpans.Len(); j++ {
spanCount += scopeSpans.At(j).Spans().Len()
}
}
t.logger.Warn("no spans converted from trace data",
zap.Int("resource_spans", rs.Len()),
zap.Int("scope_spans", scopeSpansCount),
zap.Int("spans", spanCount),
)
return nil
}
for i := range dbSpans {
span := &dbSpans[i]
t.spanWriter.WriteSpan(model.EpochMicrosecondsAsTime(span.StartTime), span)
}
t.logger.Debug("wrote spans to ES", zap.Int("count", len(dbSpans)))
return nil
}

Expand Down
46 changes: 44 additions & 2 deletions internal/storage/v2/elasticsearch/tracestore/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"

"github.com/jaegertracing/jaeger-idl/model/v1"
"github.com/jaegertracing/jaeger/internal/metrics"
Expand All @@ -19,23 +21,63 @@ import (
)

func TestTraceWriter_WriteTraces(t *testing.T) {
core, logs := observer.New(zap.DebugLevel)
logger := zap.New(core)
coreWriter := &mocks.CoreSpanWriter{}
td := ptrace.NewTraces()
resourceSpans := td.ResourceSpans().AppendEmpty()
resourceSpans.Resource().Attributes().PutStr("service.name", "testing-service")
span := resourceSpans.ScopeSpans().AppendEmpty().Spans().AppendEmpty()
span.SetName("op-1")
dbSpan := ToDBModel(td)
writer := TraceWriter{spanWriter: coreWriter}
writer := TraceWriter{spanWriter: coreWriter, logger: logger}
coreWriter.On("WriteSpan", model.EpochMicrosecondsAsTime(dbSpan[0].StartTime), &dbSpan[0])
err := writer.WriteTraces(context.Background(), td)
require.NoError(t, err)
require.Equal(t, 1, logs.Len())
assert.Equal(t, "wrote spans to ES", logs.All()[0].Message)
assert.Equal(t, zapcore.DebugLevel, logs.All()[0].Level)
}

func TestTraceWriter_WriteTraces_EmptyTraces(t *testing.T) {
core, logs := observer.New(zap.DebugLevel)
logger := zap.New(core)
coreWriter := &mocks.CoreSpanWriter{}
writer := TraceWriter{spanWriter: coreWriter, logger: logger}
td := ptrace.NewTraces()
err := writer.WriteTraces(context.Background(), td)
require.NoError(t, err)
coreWriter.AssertNotCalled(t, "WriteSpan")
require.Equal(t, 1, logs.Len())
assert.Equal(t, "skipping write of empty trace data", logs.All()[0].Message)
assert.Equal(t, zapcore.DebugLevel, logs.All()[0].Level)
}

func TestTraceWriter_WriteTraces_NonEmptyResourceSpansZeroSpans(t *testing.T) {
core, logs := observer.New(zap.DebugLevel)
logger := zap.New(core)
coreWriter := &mocks.CoreSpanWriter{}
writer := TraceWriter{spanWriter: coreWriter, logger: logger}
td := ptrace.NewTraces()
rs := td.ResourceSpans().AppendEmpty()
rs.Resource().Attributes().PutStr("service.name", "testing-service")
rs.ScopeSpans().AppendEmpty() // ScopeSpans present but no actual Spans
err := writer.WriteTraces(context.Background(), td)
require.NoError(t, err)
coreWriter.AssertNotCalled(t, "WriteSpan")
require.Equal(t, 1, logs.Len())
logEntry := logs.All()[0]
assert.Equal(t, "no spans converted from trace data", logEntry.Message)
assert.Equal(t, zapcore.WarnLevel, logEntry.Level)
assert.Equal(t, int64(1), logEntry.ContextMap()["resource_spans"])
assert.Equal(t, int64(1), logEntry.ContextMap()["scope_spans"])
assert.Equal(t, int64(0), logEntry.ContextMap()["spans"])
}

func TestTraceWriter_Close(t *testing.T) {
coreWriter := &mocks.CoreSpanWriter{}
coreWriter.On("Close").Return(nil)
writer := TraceWriter{spanWriter: coreWriter}
writer := TraceWriter{spanWriter: coreWriter, logger: zap.NewNop()}
err := writer.Close()
require.NoError(t, err)
}
Expand Down
Loading