Skip to content

Commit 4470e6e

Browse files
mahadzaryab1amol-verma-allen
authored andcommitted
[grpc][v2] Implement OTLP exporter API in gRPC v2 handler (jaegertracing#7012)
## Which problem is this PR solving? - Towards jaegertracing#6979 ## Description of the changes - Implement the `Export` (from OTEL's Exporter API) call in the gRPC v2 handler ## How was this change tested? - Added unit tests ## Checklist - [x] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md - [x] I have signed all commits - [x] I have added unit tests for the new functionality - [x] I have run lint and test steps successfully - for `jaeger`: `make lint test` - for `jaeger-ui`: `npm run lint` and `npm run test` --------- Signed-off-by: Mahad Zaryab <mahadzaryab1@gmail.com>
1 parent bdf3615 commit 4470e6e

File tree

2 files changed

+61
-9
lines changed

2 files changed

+61
-9
lines changed

internal/storage/v2/grpc/handler.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,13 @@ type Handler struct {
2626
ptraceotlp.UnimplementedGRPCServer
2727

2828
traceReader tracestore.Reader
29+
traceWriter tracestore.Writer
2930
}
3031

31-
func NewHandler(traceReader tracestore.Reader) *Handler {
32+
func NewHandler(traceReader tracestore.Reader, traceWriter tracestore.Writer) *Handler {
3233
return &Handler{
3334
traceReader: traceReader,
35+
traceWriter: traceWriter,
3436
}
3537
}
3638

@@ -139,6 +141,17 @@ func (h *Handler) FindTraceIDs(
139141
}, nil
140142
}
141143

144+
func (h *Handler) Export(ctx context.Context, request ptraceotlp.ExportRequest) (
145+
ptraceotlp.ExportResponse,
146+
error,
147+
) {
148+
err := h.traceWriter.WriteTraces(ctx, request.Traces())
149+
if err != nil {
150+
return ptraceotlp.NewExportResponse(), err
151+
}
152+
return ptraceotlp.NewExportResponse(), nil
153+
}
154+
142155
func toTraceQueryParams(t *storage.TraceQueryParameters) tracestore.TraceQueryParams {
143156
return tracestore.TraceQueryParams{
144157
ServiceName: t.ServiceName,

internal/storage/v2/grpc/handler_test.go

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/stretchr/testify/require"
1515
"go.opentelemetry.io/collector/pdata/pcommon"
1616
"go.opentelemetry.io/collector/pdata/ptrace"
17+
"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
1718
"google.golang.org/grpc"
1819

1920
"github.com/jaegertracing/jaeger/internal/jptrace"
@@ -41,7 +42,6 @@ func (f *testStream) Send(td *jptrace.TracesData) error {
4142
}
4243

4344
func TestHandler_GetTraces(t *testing.T) {
44-
reader := new(tracestoremocks.Reader)
4545
start := time.Now()
4646
end := start.Add(time.Minute)
4747
query := tracestore.GetTraceParams{
@@ -92,6 +92,8 @@ func TestHandler_GetTraces(t *testing.T) {
9292

9393
for _, test := range tests {
9494
t.Run(test.name, func(t *testing.T) {
95+
reader := new(tracestoremocks.Reader)
96+
writer := new(tracestoremocks.Writer)
9597
reader.On("GetTraces", mock.Anything, query).
9698
Return(iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) {
9799
if test.getTraceErr != nil {
@@ -105,7 +107,7 @@ func TestHandler_GetTraces(t *testing.T) {
105107
}
106108
})).Once()
107109

108-
server := NewHandler(reader)
110+
server := NewHandler(reader, writer)
109111
stream := &testStream{
110112
sendErr: test.sendErr,
111113
}
@@ -156,10 +158,11 @@ func TestHandler_GetServices(t *testing.T) {
156158
for _, test := range tests {
157159
t.Run(test.name, func(t *testing.T) {
158160
reader := new(tracestoremocks.Reader)
161+
writer := new(tracestoremocks.Writer)
159162
reader.On("GetServices", mock.Anything).
160163
Return(test.services, test.err).Once()
161164

162-
server := NewHandler(reader)
165+
server := NewHandler(reader, writer)
163166
resp, err := server.GetServices(context.Background(), &storage.GetServicesRequest{})
164167
if test.expectedErr == nil {
165168
require.NoError(t, err)
@@ -213,10 +216,11 @@ func TestHandler_GetOperations(t *testing.T) {
213216
for _, test := range tests {
214217
t.Run(test.name, func(t *testing.T) {
215218
reader := new(tracestoremocks.Reader)
219+
writer := new(tracestoremocks.Writer)
216220
reader.On("GetOperations", mock.Anything, params).
217221
Return(test.operations, test.err).Once()
218222

219-
server := NewHandler(reader)
223+
server := NewHandler(reader, writer)
220224
resp, err := server.GetOperations(context.Background(), req)
221225
if test.expectedErr == nil {
222226
require.NoError(t, err)
@@ -229,7 +233,6 @@ func TestHandler_GetOperations(t *testing.T) {
229233
}
230234

231235
func TestHandler_FindTraces(t *testing.T) {
232-
reader := new(tracestoremocks.Reader)
233236
query := tracestore.TraceQueryParams{
234237
ServiceName: "service",
235238
OperationName: "operation",
@@ -278,6 +281,8 @@ func TestHandler_FindTraces(t *testing.T) {
278281

279282
for _, test := range tests {
280283
t.Run(test.name, func(t *testing.T) {
284+
reader := new(tracestoremocks.Reader)
285+
writer := new(tracestoremocks.Writer)
281286
reader.On("FindTraces", mock.Anything, query).
282287
Return(iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) {
283288
if test.getTraceErr != nil {
@@ -291,7 +296,7 @@ func TestHandler_FindTraces(t *testing.T) {
291296
}
292297
})).Once()
293298

294-
server := NewHandler(reader)
299+
server := NewHandler(reader, writer)
295300
stream := &testStream{
296301
sendErr: test.sendErr,
297302
}
@@ -312,7 +317,6 @@ func TestHandler_FindTraces(t *testing.T) {
312317
}
313318

314319
func TestHandler_FindTraceIDs(t *testing.T) {
315-
reader := new(tracestoremocks.Reader)
316320
query := tracestore.TraceQueryParams{
317321
ServiceName: "service",
318322
OperationName: "operation",
@@ -368,11 +372,13 @@ func TestHandler_FindTraceIDs(t *testing.T) {
368372
}
369373

370374
for _, test := range tests {
375+
reader := new(tracestoremocks.Reader)
376+
writer := new(tracestoremocks.Writer)
371377
reader.On("FindTraceIDs", mock.Anything, query).
372378
Return(iter.Seq2[[]tracestore.FoundTraceID, error](func(yield func([]tracestore.FoundTraceID, error) bool) {
373379
yield(test.traceIDs, test.findTraceIDsErr)
374380
})).Once()
375-
server := NewHandler(reader)
381+
server := NewHandler(reader, writer)
376382

377383
response, err := server.FindTraceIDs(context.Background(), &storage.FindTracesRequest{
378384
Query: &storage.TraceQueryParameters{
@@ -389,6 +395,39 @@ func TestHandler_FindTraceIDs(t *testing.T) {
389395
}
390396
}
391397

398+
func TestHandler_Export(t *testing.T) {
399+
tests := []struct {
400+
name string
401+
writeTracesErr error
402+
expectedErr error
403+
}{
404+
{
405+
name: "success",
406+
},
407+
{
408+
name: "write error",
409+
expectedErr: assert.AnError,
410+
writeTracesErr: assert.AnError,
411+
},
412+
}
413+
for _, test := range tests {
414+
t.Run(test.name, func(t *testing.T) {
415+
reader := new(tracestoremocks.Reader)
416+
writer := new(tracestoremocks.Writer)
417+
writer.On("WriteTraces", mock.Anything, makeTestTrace()).Return(test.writeTracesErr).Once()
418+
server := NewHandler(reader, writer)
419+
420+
response, err := server.Export(context.Background(), ptraceotlp.NewExportRequestFromTraces(makeTestTrace()))
421+
if test.expectedErr != nil {
422+
require.ErrorIs(t, err, test.expectedErr)
423+
} else {
424+
require.NoError(t, err)
425+
}
426+
require.Equal(t, ptraceotlp.NewExportResponse(), response)
427+
})
428+
}
429+
}
430+
392431
func TestConvertKeyValueListToMap(t *testing.T) {
393432
tests := []struct {
394433
name string

0 commit comments

Comments
 (0)