@@ -123,100 +123,103 @@ func TestBatchProcessorSpansDeliveredEnforceBatchSize(t *testing.T) {
123
123
}
124
124
125
125
func TestBatchProcessorSentBySize (t * testing.T ) {
126
+ telemetryTest (t , testBatchProcessorSentBySize )
127
+ }
128
+
129
+ func testBatchProcessorSentBySize (t * testing.T , tel testTelemetry , registry * featuregate.Registry ) {
126
130
sizer := & ptrace.ProtoMarshaler {}
127
- telemetryTest (t , func (t * testing.T , tel testTelemetry , registry * featuregate.Registry ) {
128
- sink := new (consumertest.TracesSink )
129
- cfg := createDefaultConfig ().(* Config )
130
- sendBatchSize := 20
131
- cfg .SendBatchSize = uint32 (sendBatchSize )
132
- cfg .Timeout = 500 * time .Millisecond
133
- creationSet := tel .NewProcessorCreateSettings ()
134
- batcher , err := newBatchTracesProcessor (creationSet , sink , cfg , configtelemetry .LevelDetailed , registry )
135
- require .NoError (t , err )
136
- require .NoError (t , batcher .Start (context .Background (), componenttest .NewNopHost ()))
137
-
138
- requestCount := 100
139
- spansPerRequest := 5
140
-
141
- start := time .Now ()
142
- sizeSum := 0
143
- for requestNum := 0 ; requestNum < requestCount ; requestNum ++ {
144
- td := testdata .GenerateTraces (spansPerRequest )
145
- sizeSum += sizer .TracesSize (td )
146
- assert .NoError (t , batcher .ConsumeTraces (context .Background (), td ))
147
- }
131
+ sink := new (consumertest.TracesSink )
132
+ cfg := createDefaultConfig ().(* Config )
133
+ sendBatchSize := 20
134
+ cfg .SendBatchSize = uint32 (sendBatchSize )
135
+ cfg .Timeout = 500 * time .Millisecond
136
+ creationSet := tel .NewProcessorCreateSettings ()
137
+ batcher , err := newBatchTracesProcessor (creationSet , sink , cfg , configtelemetry .LevelDetailed , registry )
138
+ require .NoError (t , err )
139
+ require .NoError (t , batcher .Start (context .Background (), componenttest .NewNopHost ()))
140
+
141
+ requestCount := 100
142
+ spansPerRequest := 5
143
+
144
+ start := time .Now ()
145
+ sizeSum := 0
146
+ for requestNum := 0 ; requestNum < requestCount ; requestNum ++ {
147
+ td := testdata .GenerateTraces (spansPerRequest )
148
+ sizeSum += sizer .TracesSize (td )
149
+ assert .NoError (t , batcher .ConsumeTraces (context .Background (), td ))
150
+ }
148
151
149
- require .NoError (t , batcher .Shutdown (context .Background ()))
152
+ require .NoError (t , batcher .Shutdown (context .Background ()))
150
153
151
- elapsed := time .Since (start )
152
- require .LessOrEqual (t , elapsed .Nanoseconds (), cfg .Timeout .Nanoseconds ())
154
+ elapsed := time .Since (start )
155
+ require .LessOrEqual (t , elapsed .Nanoseconds (), cfg .Timeout .Nanoseconds ())
153
156
154
- expectedBatchesNum := requestCount * spansPerRequest / sendBatchSize
155
- expectedBatchingFactor := sendBatchSize / spansPerRequest
157
+ expectedBatchesNum := requestCount * spansPerRequest / sendBatchSize
158
+ expectedBatchingFactor := sendBatchSize / spansPerRequest
156
159
157
- require .Equal (t , requestCount * spansPerRequest , sink .SpanCount ())
158
- receivedTraces := sink .AllTraces ()
159
- require .EqualValues (t , expectedBatchesNum , len (receivedTraces ))
160
- for _ , td := range receivedTraces {
161
- rss := td .ResourceSpans ()
162
- require .Equal (t , expectedBatchingFactor , rss .Len ())
163
- for i := 0 ; i < expectedBatchingFactor ; i ++ {
164
- require .Equal (t , spansPerRequest , rss .At (i ).ScopeSpans ().At (0 ).Spans ().Len ())
165
- }
160
+ require .Equal (t , requestCount * spansPerRequest , sink .SpanCount ())
161
+ receivedTraces := sink .AllTraces ()
162
+ require .EqualValues (t , expectedBatchesNum , len (receivedTraces ))
163
+ for _ , td := range receivedTraces {
164
+ rss := td .ResourceSpans ()
165
+ require .Equal (t , expectedBatchingFactor , rss .Len ())
166
+ for i := 0 ; i < expectedBatchingFactor ; i ++ {
167
+ require .Equal (t , spansPerRequest , rss .At (i ).ScopeSpans ().At (0 ).Spans ().Len ())
166
168
}
169
+ }
167
170
168
- tel .assertMetrics (t , expectedMetrics {
169
- sendCount : float64 (expectedBatchesNum ),
170
- sendSizeSum : float64 (sink .SpanCount ()),
171
- sendSizeBytesSum : float64 (sizeSum ),
172
- sizeTrigger : float64 (expectedBatchesNum ),
173
- })
171
+ tel .assertMetrics (t , expectedMetrics {
172
+ sendCount : float64 (expectedBatchesNum ),
173
+ sendSizeSum : float64 (sink .SpanCount ()),
174
+ sendSizeBytesSum : float64 (sizeSum ),
175
+ sizeTrigger : float64 (expectedBatchesNum ),
174
176
})
175
-
176
177
}
177
178
178
179
func TestBatchProcessorSentBySize_withMaxSize (t * testing.T ) {
179
- telemetryTest (t , func (t * testing.T , tel testTelemetry , registry * featuregate.Registry ) {
180
- sink := new (consumertest.TracesSink )
181
- cfg := createDefaultConfig ().(* Config )
182
- sendBatchSize := 20
183
- sendBatchMaxSize := 37
184
- cfg .SendBatchSize = uint32 (sendBatchSize )
185
- cfg .SendBatchMaxSize = uint32 (sendBatchMaxSize )
186
- cfg .Timeout = 500 * time .Millisecond
187
- creationSet := tel .NewProcessorCreateSettings ()
188
- batcher , err := newBatchTracesProcessor (creationSet , sink , cfg , configtelemetry .LevelDetailed , registry )
189
- require .NoError (t , err )
190
- require .NoError (t , batcher .Start (context .Background (), componenttest .NewNopHost ()))
191
-
192
- requestCount := 1
193
- spansPerRequest := 500
194
- totalSpans := requestCount * spansPerRequest
195
-
196
- start := time .Now ()
197
- for requestNum := 0 ; requestNum < requestCount ; requestNum ++ {
198
- td := testdata .GenerateTraces (spansPerRequest )
199
- assert .NoError (t , batcher .ConsumeTraces (context .Background (), td ))
200
- }
180
+ telemetryTest (t , testBatchProcessorSentBySize_withMaxSize )
181
+ }
201
182
202
- require .NoError (t , batcher .Shutdown (context .Background ()))
183
+ func testBatchProcessorSentBySize_withMaxSize (t * testing.T , tel testTelemetry , registry * featuregate.Registry ) {
184
+ sink := new (consumertest.TracesSink )
185
+ cfg := createDefaultConfig ().(* Config )
186
+ sendBatchSize := 20
187
+ sendBatchMaxSize := 37
188
+ cfg .SendBatchSize = uint32 (sendBatchSize )
189
+ cfg .SendBatchMaxSize = uint32 (sendBatchMaxSize )
190
+ cfg .Timeout = 500 * time .Millisecond
191
+ creationSet := tel .NewProcessorCreateSettings ()
192
+ batcher , err := newBatchTracesProcessor (creationSet , sink , cfg , configtelemetry .LevelDetailed , registry )
193
+ require .NoError (t , err )
194
+ require .NoError (t , batcher .Start (context .Background (), componenttest .NewNopHost ()))
203
195
204
- elapsed := time .Since (start )
205
- require .LessOrEqual (t , elapsed .Nanoseconds (), cfg .Timeout .Nanoseconds ())
196
+ requestCount := 1
197
+ spansPerRequest := 500
198
+ totalSpans := requestCount * spansPerRequest
206
199
207
- // The max batch size is not a divisor of the total number of spans
208
- expectedBatchesNum := int (math .Ceil (float64 (totalSpans ) / float64 (sendBatchMaxSize )))
200
+ start := time .Now ()
201
+ for requestNum := 0 ; requestNum < requestCount ; requestNum ++ {
202
+ td := testdata .GenerateTraces (spansPerRequest )
203
+ assert .NoError (t , batcher .ConsumeTraces (context .Background (), td ))
204
+ }
205
+
206
+ require .NoError (t , batcher .Shutdown (context .Background ()))
207
+
208
+ elapsed := time .Since (start )
209
+ require .LessOrEqual (t , elapsed .Nanoseconds (), cfg .Timeout .Nanoseconds ())
209
210
210
- require .Equal (t , totalSpans , sink .SpanCount ())
211
- receivedTraces := sink .AllTraces ()
212
- require .EqualValues (t , expectedBatchesNum , len (receivedTraces ))
211
+ // The max batch size is not a divisor of the total number of spans
212
+ expectedBatchesNum := int (math .Ceil (float64 (totalSpans ) / float64 (sendBatchMaxSize )))
213
213
214
- tel .assertMetrics (t , expectedMetrics {
215
- sendCount : float64 (expectedBatchesNum ),
216
- sendSizeSum : float64 (sink .SpanCount ()),
217
- sizeTrigger : math .Floor (float64 (totalSpans ) / float64 (sendBatchMaxSize )),
218
- timeoutTrigger : 1 ,
219
- })
214
+ require .Equal (t , totalSpans , sink .SpanCount ())
215
+ receivedTraces := sink .AllTraces ()
216
+ require .EqualValues (t , expectedBatchesNum , len (receivedTraces ))
217
+
218
+ tel .assertMetrics (t , expectedMetrics {
219
+ sendCount : float64 (expectedBatchesNum ),
220
+ sendSizeSum : float64 (sink .SpanCount ()),
221
+ sizeTrigger : math .Floor (float64 (totalSpans ) / float64 (sendBatchMaxSize )),
222
+ timeoutTrigger : 1 ,
220
223
})
221
224
}
222
225
@@ -346,59 +349,61 @@ func TestBatchMetricProcessor_ReceivingData(t *testing.T) {
346
349
}
347
350
348
351
func TestBatchMetricProcessor_BatchSize (t * testing.T ) {
352
+ telemetryTest (t , testBatchMetricProcessor_BatchSize )
353
+ }
354
+
355
+ func testBatchMetricProcessor_BatchSize (t * testing.T , tel testTelemetry , registry * featuregate.Registry ) {
349
356
sizer := & pmetric.ProtoMarshaler {}
350
- telemetryTest (t , func (t * testing.T , tel testTelemetry , registry * featuregate.Registry ) {
351
-
352
- // Instantiate the batch processor with low config values to test data
353
- // gets sent through the processor.
354
- cfg := Config {
355
- ProcessorSettings : config .NewProcessorSettings (component .NewID (typeStr )),
356
- Timeout : 100 * time .Millisecond ,
357
- SendBatchSize : 50 ,
358
- }
359
357
360
- requestCount := 100
361
- metricsPerRequest := 5
362
- dataPointsPerMetric := 2 // Since the int counter uses two datapoints.
363
- dataPointsPerRequest := metricsPerRequest * dataPointsPerMetric
364
- sink := new (consumertest.MetricsSink )
365
-
366
- creationSet := tel .NewProcessorCreateSettings ()
367
- batcher , err := newBatchMetricsProcessor (creationSet , sink , & cfg , configtelemetry .LevelDetailed , registry )
368
- require .NoError (t , err )
369
- require .NoError (t , batcher .Start (context .Background (), componenttest .NewNopHost ()))
370
-
371
- start := time .Now ()
372
- size := 0
373
- for requestNum := 0 ; requestNum < requestCount ; requestNum ++ {
374
- md := testdata .GenerateMetrics (metricsPerRequest )
375
- size += sizer .MetricsSize (md )
376
- assert .NoError (t , batcher .ConsumeMetrics (context .Background (), md ))
377
- }
378
- require .NoError (t , batcher .Shutdown (context .Background ()))
358
+ // Instantiate the batch processor with low config values to test data
359
+ // gets sent through the processor.
360
+ cfg := Config {
361
+ ProcessorSettings : config .NewProcessorSettings (component .NewID (typeStr )),
362
+ Timeout : 100 * time .Millisecond ,
363
+ SendBatchSize : 50 ,
364
+ }
379
365
380
- elapsed := time .Since (start )
381
- require .LessOrEqual (t , elapsed .Nanoseconds (), cfg .Timeout .Nanoseconds ())
366
+ requestCount := 100
367
+ metricsPerRequest := 5
368
+ dataPointsPerMetric := 2 // Since the int counter uses two datapoints.
369
+ dataPointsPerRequest := metricsPerRequest * dataPointsPerMetric
370
+ sink := new (consumertest.MetricsSink )
382
371
383
- expectedBatchesNum := requestCount * dataPointsPerRequest / int (cfg .SendBatchSize )
384
- expectedBatchingFactor := int (cfg .SendBatchSize ) / dataPointsPerRequest
372
+ creationSet := tel .NewProcessorCreateSettings ()
373
+ batcher , err := newBatchMetricsProcessor (creationSet , sink , & cfg , configtelemetry .LevelDetailed , registry )
374
+ require .NoError (t , err )
375
+ require .NoError (t , batcher .Start (context .Background (), componenttest .NewNopHost ()))
385
376
386
- require .Equal (t , requestCount * 2 * metricsPerRequest , sink .DataPointCount ())
387
- receivedMds := sink .AllMetrics ()
388
- require .Equal (t , expectedBatchesNum , len (receivedMds ))
389
- for _ , md := range receivedMds {
390
- require .Equal (t , expectedBatchingFactor , md .ResourceMetrics ().Len ())
391
- for i := 0 ; i < expectedBatchingFactor ; i ++ {
392
- require .Equal (t , metricsPerRequest , md .ResourceMetrics ().At (i ).ScopeMetrics ().At (0 ).Metrics ().Len ())
393
- }
377
+ start := time .Now ()
378
+ size := 0
379
+ for requestNum := 0 ; requestNum < requestCount ; requestNum ++ {
380
+ md := testdata .GenerateMetrics (metricsPerRequest )
381
+ size += sizer .MetricsSize (md )
382
+ assert .NoError (t , batcher .ConsumeMetrics (context .Background (), md ))
383
+ }
384
+ require .NoError (t , batcher .Shutdown (context .Background ()))
385
+
386
+ elapsed := time .Since (start )
387
+ require .LessOrEqual (t , elapsed .Nanoseconds (), cfg .Timeout .Nanoseconds ())
388
+
389
+ expectedBatchesNum := requestCount * dataPointsPerRequest / int (cfg .SendBatchSize )
390
+ expectedBatchingFactor := int (cfg .SendBatchSize ) / dataPointsPerRequest
391
+
392
+ require .Equal (t , requestCount * 2 * metricsPerRequest , sink .DataPointCount ())
393
+ receivedMds := sink .AllMetrics ()
394
+ require .Equal (t , expectedBatchesNum , len (receivedMds ))
395
+ for _ , md := range receivedMds {
396
+ require .Equal (t , expectedBatchingFactor , md .ResourceMetrics ().Len ())
397
+ for i := 0 ; i < expectedBatchingFactor ; i ++ {
398
+ require .Equal (t , metricsPerRequest , md .ResourceMetrics ().At (i ).ScopeMetrics ().At (0 ).Metrics ().Len ())
394
399
}
400
+ }
395
401
396
- tel .assertMetrics (t , expectedMetrics {
397
- sendCount : float64 (expectedBatchesNum ),
398
- sendSizeSum : float64 (sink .DataPointCount ()),
399
- sendSizeBytesSum : float64 (size ),
400
- sizeTrigger : 20 ,
401
- })
402
+ tel .assertMetrics (t , expectedMetrics {
403
+ sendCount : float64 (expectedBatchesNum ),
404
+ sendSizeSum : float64 (sink .DataPointCount ()),
405
+ sendSizeBytesSum : float64 (size ),
406
+ sizeTrigger : 20 ,
402
407
})
403
408
}
404
409
@@ -654,57 +659,59 @@ func TestBatchLogProcessor_ReceivingData(t *testing.T) {
654
659
}
655
660
656
661
func TestBatchLogProcessor_BatchSize (t * testing.T ) {
662
+ telemetryTest (t , testBatchLogProcessor_BatchSize )
663
+ }
664
+
665
+ func testBatchLogProcessor_BatchSize (t * testing.T , tel testTelemetry , registry * featuregate.Registry ) {
657
666
sizer := & plog.ProtoMarshaler {}
658
- telemetryTest (t , func (t * testing.T , tel testTelemetry , registry * featuregate.Registry ) {
659
-
660
- // Instantiate the batch processor with low config values to test data
661
- // gets sent through the processor.
662
- cfg := Config {
663
- ProcessorSettings : config .NewProcessorSettings (component .NewID (typeStr )),
664
- Timeout : 100 * time .Millisecond ,
665
- SendBatchSize : 50 ,
666
- }
667
667
668
- requestCount := 100
669
- logsPerRequest := 5
670
- sink := new (consumertest.LogsSink )
671
-
672
- creationSet := tel .NewProcessorCreateSettings ()
673
- batcher , err := newBatchLogsProcessor (creationSet , sink , & cfg , configtelemetry .LevelDetailed , registry )
674
- require .NoError (t , err )
675
- require .NoError (t , batcher .Start (context .Background (), componenttest .NewNopHost ()))
676
-
677
- start := time .Now ()
678
- size := 0
679
- for requestNum := 0 ; requestNum < requestCount ; requestNum ++ {
680
- ld := testdata .GenerateLogs (logsPerRequest )
681
- size += sizer .LogsSize (ld )
682
- assert .NoError (t , batcher .ConsumeLogs (context .Background (), ld ))
683
- }
684
- require .NoError (t , batcher .Shutdown (context .Background ()))
668
+ // Instantiate the batch processor with low config values to test data
669
+ // gets sent through the processor.
670
+ cfg := Config {
671
+ ProcessorSettings : config .NewProcessorSettings (component .NewID (typeStr )),
672
+ Timeout : 100 * time .Millisecond ,
673
+ SendBatchSize : 50 ,
674
+ }
685
675
686
- elapsed := time .Since (start )
687
- require .LessOrEqual (t , elapsed .Nanoseconds (), cfg .Timeout .Nanoseconds ())
676
+ requestCount := 100
677
+ logsPerRequest := 5
678
+ sink := new (consumertest.LogsSink )
688
679
689
- expectedBatchesNum := requestCount * logsPerRequest / int (cfg .SendBatchSize )
690
- expectedBatchingFactor := int (cfg .SendBatchSize ) / logsPerRequest
680
+ creationSet := tel .NewProcessorCreateSettings ()
681
+ batcher , err := newBatchLogsProcessor (creationSet , sink , & cfg , configtelemetry .LevelDetailed , registry )
682
+ require .NoError (t , err )
683
+ require .NoError (t , batcher .Start (context .Background (), componenttest .NewNopHost ()))
691
684
692
- require .Equal (t , requestCount * logsPerRequest , sink .LogRecordCount ())
693
- receivedMds := sink .AllLogs ()
694
- require .Equal (t , expectedBatchesNum , len (receivedMds ))
695
- for _ , ld := range receivedMds {
696
- require .Equal (t , expectedBatchingFactor , ld .ResourceLogs ().Len ())
697
- for i := 0 ; i < expectedBatchingFactor ; i ++ {
698
- require .Equal (t , logsPerRequest , ld .ResourceLogs ().At (i ).ScopeLogs ().At (0 ).LogRecords ().Len ())
699
- }
685
+ start := time .Now ()
686
+ size := 0
687
+ for requestNum := 0 ; requestNum < requestCount ; requestNum ++ {
688
+ ld := testdata .GenerateLogs (logsPerRequest )
689
+ size += sizer .LogsSize (ld )
690
+ assert .NoError (t , batcher .ConsumeLogs (context .Background (), ld ))
691
+ }
692
+ require .NoError (t , batcher .Shutdown (context .Background ()))
693
+
694
+ elapsed := time .Since (start )
695
+ require .LessOrEqual (t , elapsed .Nanoseconds (), cfg .Timeout .Nanoseconds ())
696
+
697
+ expectedBatchesNum := requestCount * logsPerRequest / int (cfg .SendBatchSize )
698
+ expectedBatchingFactor := int (cfg .SendBatchSize ) / logsPerRequest
699
+
700
+ require .Equal (t , requestCount * logsPerRequest , sink .LogRecordCount ())
701
+ receivedMds := sink .AllLogs ()
702
+ require .Equal (t , expectedBatchesNum , len (receivedMds ))
703
+ for _ , ld := range receivedMds {
704
+ require .Equal (t , expectedBatchingFactor , ld .ResourceLogs ().Len ())
705
+ for i := 0 ; i < expectedBatchingFactor ; i ++ {
706
+ require .Equal (t , logsPerRequest , ld .ResourceLogs ().At (i ).ScopeLogs ().At (0 ).LogRecords ().Len ())
700
707
}
708
+ }
701
709
702
- tel .assertMetrics (t , expectedMetrics {
703
- sendCount : float64 (expectedBatchesNum ),
704
- sendSizeSum : float64 (sink .LogRecordCount ()),
705
- sendSizeBytesSum : float64 (size ),
706
- sizeTrigger : float64 (expectedBatchesNum ),
707
- })
710
+ tel .assertMetrics (t , expectedMetrics {
711
+ sendCount : float64 (expectedBatchesNum ),
712
+ sendSizeSum : float64 (sink .LogRecordCount ()),
713
+ sendSizeBytesSum : float64 (size ),
714
+ sizeTrigger : float64 (expectedBatchesNum ),
708
715
})
709
716
}
710
717
0 commit comments