Skip to content

Commit 2530903

Browse files
authored
Refactor tenancy checking from gRPC to gRPC batch consumer (#3718)
* Refactor tenancy checking from gRPC to gRPC batch consumer Signed-off-by: Ed Snible <snible@us.ibm.com> * Pseudo-constructor to initialize batchConsumer Signed-off-by: Ed Snible <snible@us.ibm.com>
1 parent 3499c88 commit 2530903

File tree

3 files changed

+42
-34
lines changed

3 files changed

+42
-34
lines changed

cmd/collector/app/handler/grpc_handler.go

Lines changed: 34 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -33,51 +33,62 @@ import (
3333
type GRPCHandler struct {
3434
logger *zap.Logger
3535
batchConsumer batchConsumer
36-
tenancyConfig *tenancy.TenancyConfig
3736
}
3837

3938
// NewGRPCHandler registers routes for this handler on the given router.
4039
func NewGRPCHandler(logger *zap.Logger, spanProcessor processor.SpanProcessor, tenancyConfig *tenancy.TenancyConfig) *GRPCHandler {
4140
return &GRPCHandler{
4241
logger: logger,
43-
batchConsumer: batchConsumer{
44-
logger: logger,
45-
spanProcessor: spanProcessor,
46-
spanOptions: processor.SpansOptions{
47-
InboundTransport: processor.GRPCTransport,
48-
SpanFormat: processor.ProtoSpanFormat,
49-
},
50-
},
51-
tenancyConfig: tenancyConfig,
42+
batchConsumer: newBatchConsumer(logger,
43+
spanProcessor,
44+
processor.GRPCTransport,
45+
processor.ProtoSpanFormat,
46+
tenancyConfig),
5247
}
5348
}
5449

5550
// PostSpans implements gRPC CollectorService.
5651
func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) (*api_v2.PostSpansResponse, error) {
57-
tenant, err := g.validateTenant(ctx)
58-
if err != nil {
59-
g.logger.Error("rejecting spans (tenancy)", zap.Error(err))
60-
return nil, err
61-
}
62-
6352
batch := &r.Batch
64-
err = g.batchConsumer.consume(batch, tenant)
53+
err := g.batchConsumer.consume(ctx, batch)
6554
return &api_v2.PostSpansResponse{}, err
6655
}
6756

6857
type batchConsumer struct {
6958
logger *zap.Logger
7059
spanProcessor processor.SpanProcessor
7160
spanOptions processor.SpansOptions
61+
tenancyConfig tenancy.TenancyConfig
7262
}
7363

74-
func (c *batchConsumer) consume(batch *model.Batch, tenant string) error {
64+
func newBatchConsumer(logger *zap.Logger, spanProcessor processor.SpanProcessor, transport processor.InboundTransport, spanFormat processor.SpanFormat, tenancyConfig *tenancy.TenancyConfig) batchConsumer {
65+
if tenancyConfig == nil {
66+
tenancyConfig = &tenancy.TenancyConfig{}
67+
}
68+
return batchConsumer{
69+
logger: logger,
70+
spanProcessor: spanProcessor,
71+
spanOptions: processor.SpansOptions{
72+
InboundTransport: transport,
73+
SpanFormat: spanFormat,
74+
},
75+
tenancyConfig: *tenancyConfig,
76+
}
77+
}
78+
79+
func (c *batchConsumer) consume(ctx context.Context, batch *model.Batch) error {
80+
tenant, err := c.validateTenant(ctx)
81+
if err != nil {
82+
c.logger.Error("rejecting spans (tenancy)", zap.Error(err))
83+
return err
84+
}
85+
7586
for _, span := range batch.Spans {
7687
if span.GetProcess() == nil {
7788
span.Process = batch.Process
7889
}
7990
}
80-
_, err := c.spanProcessor.ProcessSpans(batch.Spans, processor.SpansOptions{
91+
_, err = c.spanProcessor.ProcessSpans(batch.Spans, processor.SpansOptions{
8192
InboundTransport: processor.GRPCTransport,
8293
SpanFormat: processor.ProtoSpanFormat,
8394
Tenant: tenant,
@@ -92,8 +103,8 @@ func (c *batchConsumer) consume(batch *model.Batch, tenant string) error {
92103
return nil
93104
}
94105

95-
func (g *GRPCHandler) validateTenant(ctx context.Context) (string, error) {
96-
if !g.tenancyConfig.Enabled {
106+
func (c *batchConsumer) validateTenant(ctx context.Context) (string, error) {
107+
if !c.tenancyConfig.Enabled {
97108
return "", nil
98109
}
99110

@@ -102,14 +113,14 @@ func (g *GRPCHandler) validateTenant(ctx context.Context) (string, error) {
102113
return "", status.Errorf(codes.PermissionDenied, "missing tenant header")
103114
}
104115

105-
tenants := md[g.tenancyConfig.Header]
116+
tenants := md[c.tenancyConfig.Header]
106117
if len(tenants) < 1 {
107118
return "", status.Errorf(codes.PermissionDenied, "missing tenant header")
108119
} else if len(tenants) > 1 {
109120
return "", status.Errorf(codes.PermissionDenied, "extra tenant header")
110121
}
111122

112-
if !g.tenancyConfig.Valid(tenants[0]) {
123+
if !c.tenancyConfig.Valid(tenants[0]) {
113124
return "", status.Errorf(codes.PermissionDenied, "unknown tenant")
114125
}
115126

cmd/collector/app/handler/grpc_handler_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,7 @@ func TestGetTenant(t *testing.T) {
350350
}))
351351
for _, test := range tests {
352352
t.Run(test.name, func(t *testing.T) {
353-
tenant, err := handler.validateTenant(test.ctx)
353+
tenant, err := handler.batchConsumer.validateTenant(test.ctx)
354354
if test.mustFail {
355355
require.Error(t, err)
356356
} else {

cmd/collector/app/handler/otlp_receiver.go

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -139,14 +139,11 @@ func applyTLSSettings(opts *tlscfg.Options) *configtls.TLSServerSetting {
139139

140140
func newConsumerDelegate(logger *zap.Logger, spanProcessor processor.SpanProcessor) *consumerDelegate {
141141
return &consumerDelegate{
142-
batchConsumer: batchConsumer{
143-
logger: logger,
144-
spanProcessor: spanProcessor,
145-
spanOptions: processor.SpansOptions{
146-
SpanFormat: processor.OTLPSpanFormat,
147-
InboundTransport: processor.UnknownTransport, // could be gRPC or HTTP
148-
},
149-
},
142+
batchConsumer: newBatchConsumer(logger,
143+
spanProcessor,
144+
processor.UnknownTransport, // could be gRPC or HTTP
145+
processor.OTLPSpanFormat,
146+
nil),
150147
protoFromTraces: otlp2jaeger.ProtoFromTraces,
151148
}
152149
}
@@ -156,13 +153,13 @@ type consumerDelegate struct {
156153
protoFromTraces func(td ptrace.Traces) ([]*model.Batch, error)
157154
}
158155

159-
func (c *consumerDelegate) consume(_ context.Context, td ptrace.Traces) error {
156+
func (c *consumerDelegate) consume(ctx context.Context, td ptrace.Traces) error {
160157
batches, err := c.protoFromTraces(td)
161158
if err != nil {
162159
return err
163160
}
164161
for _, batch := range batches {
165-
err := c.batchConsumer.consume(batch, "")
162+
err := c.batchConsumer.consume(ctx, batch)
166163
if err != nil {
167164
return err
168165
}

0 commit comments

Comments
 (0)