diff --git a/internal/obsreportconfig/obsreportconfig.go b/internal/obsreportconfig/obsreportconfig.go index ba8f482130c..cb15ad8d1cb 100644 --- a/internal/obsreportconfig/obsreportconfig.go +++ b/internal/obsreportconfig/obsreportconfig.go @@ -71,12 +71,7 @@ func allViews() []*view.View { views = append(views, receiverViews()...) // Scraper views. - measures = []*stats.Int64Measure{ - obsmetrics.ScraperScrapedMetricPoints, - obsmetrics.ScraperErroredMetricPoints, - } - tagKeys = []tag.Key{obsmetrics.TagKeyReceiver, obsmetrics.TagKeyScraper} - views = append(views, genViews(measures, tagKeys, view.Sum())...) + views = append(views, scraperViews()...) // Exporter views. measures = []*stats.Int64Measure{ @@ -136,6 +131,20 @@ func receiverViews() []*view.View { return genViews(measures, tagKeys, view.Sum()) } +func scraperViews() []*view.View { + if featuregate.GetRegistry().IsEnabled(UseOtelForInternalMetricsfeatureGateID) { + return nil + } + + measures := []*stats.Int64Measure{ + obsmetrics.ScraperScrapedMetricPoints, + obsmetrics.ScraperErroredMetricPoints, + } + tagKeys := []tag.Key{obsmetrics.TagKeyReceiver, obsmetrics.TagKeyScraper} + + return genViews(measures, tagKeys, view.Sum()) +} + func genViews( measures []*stats.Int64Measure, tagKeys []tag.Key, diff --git a/obsreport/obsreport_scraper.go b/obsreport/obsreport_scraper.go index fe70977b6be..8f6ba118bd4 100644 --- a/obsreport/obsreport_scraper.go +++ b/obsreport/obsreport_scraper.go @@ -21,15 +21,26 @@ import ( "go.opencensus.io/stats" "go.opencensus.io/tag" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric/instrument" + "go.opentelemetry.io/otel/metric/instrument/syncint64" + "go.opentelemetry.io/otel/metric/unit" "go.opentelemetry.io/otel/trace" + "go.uber.org/zap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/featuregate" + "go.opentelemetry.io/collector/internal/obsreportconfig" "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" "go.opentelemetry.io/collector/receiver/scrapererror" ) +var ( + scraperName = "scraper" + scraperScope = scopeName + nameSep + scraperName +) + // Scraper is a helper to add observability to a component.Scraper. type Scraper struct { level configtelemetry.Level @@ -37,6 +48,13 @@ type Scraper struct { scraper config.ComponentID mutators []tag.Mutator tracer trace.Tracer + + logger *zap.Logger + + useOtelForMetrics bool + otelAttrs []attribute.KeyValue + scrapedMetricsPoints syncint64.Counter + erroredMetricsPoints syncint64.Counter } // ScraperSettings are settings for creating a Scraper. @@ -48,7 +66,11 @@ type ScraperSettings struct { // NewScraper creates a new Scraper. func NewScraper(cfg ScraperSettings) *Scraper { - return &Scraper{ + return newScraper(cfg, featuregate.GetRegistry()) +} + +func newScraper(cfg ScraperSettings, registry *featuregate.Registry) *Scraper { + scraper := &Scraper{ level: cfg.ReceiverCreateSettings.TelemetrySettings.MetricsLevel, receiverID: cfg.ReceiverID, scraper: cfg.Scraper, @@ -56,7 +78,45 @@ func NewScraper(cfg ScraperSettings) *Scraper { tag.Upsert(obsmetrics.TagKeyReceiver, cfg.ReceiverID.String(), tag.WithTTL(tag.TTLNoPropagation)), tag.Upsert(obsmetrics.TagKeyScraper, cfg.Scraper.String(), tag.WithTTL(tag.TTLNoPropagation))}, tracer: cfg.ReceiverCreateSettings.TracerProvider.Tracer(cfg.Scraper.String()), + + logger: cfg.ReceiverCreateSettings.Logger, + useOtelForMetrics: registry.IsEnabled(obsreportconfig.UseOtelForInternalMetricsfeatureGateID), + otelAttrs: []attribute.KeyValue{ + attribute.String(obsmetrics.ReceiverKey, cfg.ReceiverID.String()), + attribute.String(obsmetrics.ScraperKey, cfg.Scraper.String()), + }, + } + + scraper.createOtelMetrics(cfg) + return scraper +} + +func (s *Scraper) createOtelMetrics(cfg ScraperSettings) { + if !s.useOtelForMetrics { + return + } + meter := cfg.ReceiverCreateSettings.MeterProvider.Meter(scraperScope) + + var err error + handleError := func(metricName string, err error) { + if err != nil { + s.logger.Warn("failed to create otel instrument", zap.Error(err), zap.String("metric", metricName)) + } } + + s.scrapedMetricsPoints, err = meter.SyncInt64().Counter( + obsmetrics.ScraperPrefix+obsmetrics.ScrapedMetricPointsKey, + instrument.WithDescription("Number of metric points successfully scraped."), + instrument.WithUnit(unit.Dimensionless), + ) + handleError(obsmetrics.ScraperPrefix+obsmetrics.ScrapedMetricPointsKey, err) + + s.erroredMetricsPoints, err = meter.SyncInt64().Counter( + obsmetrics.ScraperPrefix+obsmetrics.ErroredMetricPointsKey, + instrument.WithDescription("Number of metric points that were unable to be scraped."), + instrument.WithUnit(unit.Dimensionless), + ) + handleError(obsmetrics.ScraperPrefix+obsmetrics.ErroredMetricPointsKey, err) } // StartMetricsOp is called when a scrape operation is started. The @@ -91,10 +151,7 @@ func (s *Scraper) EndMetricsOp( span := trace.SpanFromContext(scraperCtx) if s.level != configtelemetry.LevelNone { - stats.Record( - scraperCtx, - obsmetrics.ScraperScrapedMetricPoints.M(int64(numScrapedMetrics)), - obsmetrics.ScraperErroredMetricPoints.M(int64(numErroredMetrics))) + s.recordMetrics(scraperCtx, numScrapedMetrics, numErroredMetrics) } // end span according to errors @@ -109,3 +166,15 @@ func (s *Scraper) EndMetricsOp( span.End() } + +func (s *Scraper) recordMetrics(scraperCtx context.Context, numScrapedMetrics, numErroredMetrics int) { + if s.useOtelForMetrics { + s.scrapedMetricsPoints.Add(scraperCtx, int64(numScrapedMetrics), s.otelAttrs...) + s.erroredMetricsPoints.Add(scraperCtx, int64(numErroredMetrics), s.otelAttrs...) + } else { // OC for metrics + stats.Record( + scraperCtx, + obsmetrics.ScraperScrapedMetricPoints.M(int64(numScrapedMetrics)), + obsmetrics.ScraperErroredMetricPoints.M(int64(numErroredMetrics))) + } +} \ No newline at end of file diff --git a/obsreport/obsreport_test.go b/obsreport/obsreport_test.go index 69dbbc250b6..9b85f644ca6 100644 --- a/obsreport/obsreport_test.go +++ b/obsreport/obsreport_test.go @@ -214,61 +214,59 @@ func TestReceiveMetricsOp(t *testing.T) { } func TestScrapeMetricsDataOp(t *testing.T) { - tt, err := obsreporttest.SetupTelemetry() - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - - parentCtx, parentSpan := tt.TracerProvider.Tracer("test").Start(context.Background(), t.Name()) - defer parentSpan.End() + testTelemetry(t, func(tt obsreporttest.TestTelemetry, registry *featuregate.Registry) { + parentCtx, parentSpan := tt.TracerProvider.Tracer("test").Start(context.Background(), t.Name()) + defer parentSpan.End() - params := []testParams{ - {items: 23, err: partialErrFake}, - {items: 29, err: errFake}, - {items: 15, err: nil}, - } - for i := range params { - scrp := NewScraper(ScraperSettings{ - ReceiverID: receiver, - Scraper: scraper, - ReceiverCreateSettings: tt.ToReceiverCreateSettings(), - }) - ctx := scrp.StartMetricsOp(parentCtx) - assert.NotNil(t, ctx) - scrp.EndMetricsOp(ctx, params[i].items, params[i].err) - } + params := []testParams{ + {items: 23, err: partialErrFake}, + {items: 29, err: errFake}, + {items: 15, err: nil}, + } + for i := range params { + scrp := newScraper(ScraperSettings{ + ReceiverID: receiver, + Scraper: scraper, + ReceiverCreateSettings: tt.ToReceiverCreateSettings(), + }, registry) + ctx := scrp.StartMetricsOp(parentCtx) + assert.NotNil(t, ctx) + scrp.EndMetricsOp(ctx, params[i].items, params[i].err) + } - spans := tt.SpanRecorder.Ended() - require.Equal(t, len(params), len(spans)) + spans := tt.SpanRecorder.Ended() + require.Equal(t, len(params), len(spans)) - var scrapedMetricPoints, erroredMetricPoints int - for i, span := range spans { - assert.Equal(t, "scraper/"+receiver.String()+"/"+scraper.String()+"/MetricsScraped", span.Name()) - switch { - case params[i].err == nil: - scrapedMetricPoints += params[i].items - require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ScrapedMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))}) - require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ErroredMetricPointsKey, Value: attribute.Int64Value(0)}) - assert.Equal(t, codes.Unset, span.Status().Code) - case errors.Is(params[i].err, errFake): - erroredMetricPoints += params[i].items - require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ScrapedMetricPointsKey, Value: attribute.Int64Value(0)}) - require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ErroredMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))}) - assert.Equal(t, codes.Error, span.Status().Code) - assert.Equal(t, params[i].err.Error(), span.Status().Description) + var scrapedMetricPoints, erroredMetricPoints int + for i, span := range spans { + assert.Equal(t, "scraper/"+receiver.String()+"/"+scraper.String()+"/MetricsScraped", span.Name()) + switch { + case params[i].err == nil: + scrapedMetricPoints += params[i].items + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ScrapedMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))}) + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ErroredMetricPointsKey, Value: attribute.Int64Value(0)}) + assert.Equal(t, codes.Unset, span.Status().Code) + case errors.Is(params[i].err, errFake): + erroredMetricPoints += params[i].items + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ScrapedMetricPointsKey, Value: attribute.Int64Value(0)}) + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ErroredMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))}) + assert.Equal(t, codes.Error, span.Status().Code) + assert.Equal(t, params[i].err.Error(), span.Status().Description) - case errors.Is(params[i].err, partialErrFake): - scrapedMetricPoints += params[i].items - erroredMetricPoints++ - require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ScrapedMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))}) - require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ErroredMetricPointsKey, Value: attribute.Int64Value(1)}) - assert.Equal(t, codes.Error, span.Status().Code) - assert.Equal(t, params[i].err.Error(), span.Status().Description) - default: - t.Fatalf("unexpected err param: %v", params[i].err) + case errors.Is(params[i].err, partialErrFake): + scrapedMetricPoints += params[i].items + erroredMetricPoints++ + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ScrapedMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))}) + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ErroredMetricPointsKey, Value: attribute.Int64Value(1)}) + assert.Equal(t, codes.Error, span.Status().Code) + assert.Equal(t, params[i].err.Error(), span.Status().Description) + default: + t.Fatalf("unexpected err param: %v", params[i].err) + } } - } - require.NoError(t, obsreporttest.CheckScraperMetrics(tt, receiver, scraper, int64(scrapedMetricPoints), int64(erroredMetricPoints))) + require.NoError(t, obsreporttest.CheckScraperMetrics(tt, receiver, scraper, int64(scrapedMetricPoints), int64(erroredMetricPoints))) + }) } func TestExportTraceDataOp(t *testing.T) { diff --git a/obsreport/obsreporttest/obsreporttest.go b/obsreport/obsreporttest/obsreporttest.go index c506535ec18..3978868b752 100644 --- a/obsreport/obsreporttest/obsreporttest.go +++ b/obsreport/obsreporttest/obsreporttest.go @@ -224,11 +224,8 @@ func CheckReceiverMetrics(tts TestTelemetry, receiver config.ComponentID, protoc // CheckScraperMetrics checks that for the current exported values for metrics scraper metrics match given values. // When this function is called it is required to also call SetupTelemetry as first thing. -func CheckScraperMetrics(_ TestTelemetry, receiver config.ComponentID, scraper config.ComponentID, scrapedMetricPoints, erroredMetricPoints int64) error { - scraperTags := tagsForScraperView(receiver, scraper) - return multierr.Combine( - checkValueForView(scraperTags, scrapedMetricPoints, "scraper/scraped_metric_points"), - checkValueForView(scraperTags, erroredMetricPoints, "scraper/errored_metric_points")) +func CheckScraperMetrics(tts TestTelemetry, receiver config.ComponentID, scraper config.ComponentID, scrapedMetricPoints, erroredMetricPoints int64) error { + return tts.otelPrometheusChecker.checkScraperMetrics(receiver, scraper, scrapedMetricPoints, erroredMetricPoints) } // checkValueForView checks that for the current exported value in the view with the given name @@ -256,14 +253,6 @@ func checkValueForView(wantTags []tag.Tag, value int64, vName string) error { return fmt.Errorf("[%s]: could not find tags, wantTags: %s in rows %v", vName, wantTags, rows) } -// tagsForScraperView returns the tags that are needed for the scraper views. -func tagsForScraperView(receiver config.ComponentID, scraper config.ComponentID) []tag.Tag { - return []tag.Tag{ - {Key: receiverTag, Value: receiver.String()}, - {Key: scraperTag, Value: scraper.String()}, - } -} - // tagsForProcessorView returns the tags that are needed for the processor views. func tagsForProcessorView(processor config.ComponentID) []tag.Tag { return []tag.Tag{ diff --git a/obsreport/obsreporttest/obsreporttest_test.go b/obsreport/obsreporttest/obsreporttest_test.go index fe3f5108ec1..007fd232953 100644 --- a/obsreport/obsreporttest/obsreporttest_test.go +++ b/obsreport/obsreporttest/obsreporttest_test.go @@ -32,10 +32,31 @@ const ( ) var ( + scraper = config.NewComponentID("fakeScraper") receiver = config.NewComponentID("fakeReicever") exporter = config.NewComponentID("fakeExporter") ) +func TestCheckScraperMetricsViews(t *testing.T) { + tt, err := obsreporttest.SetupTelemetry() + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + s := obsreport.NewScraper(obsreport.ScraperSettings{ + ReceiverID: receiver, + Scraper: scraper, + ReceiverCreateSettings: tt.ToReceiverCreateSettings(), + }) + ctx := s.StartMetricsOp(context.Background()) + require.NotNil(t, ctx) + s.EndMetricsOp(ctx, 7, nil) + + assert.NoError(t, obsreporttest.CheckScraperMetrics(tt, receiver, scraper, 7, 0)) + assert.Error(t, obsreporttest.CheckScraperMetrics(tt, receiver, scraper, 7, 7)) + assert.Error(t, obsreporttest.CheckScraperMetrics(tt, receiver, scraper, 0, 0)) + assert.Error(t, obsreporttest.CheckScraperMetrics(tt, receiver, scraper, 0, 7)) +} + func TestCheckReceiverTracesViews(t *testing.T) { tt, err := obsreporttest.SetupTelemetry() require.NoError(t, err) diff --git a/obsreport/obsreporttest/otelprometheuschecker.go b/obsreport/obsreporttest/otelprometheuschecker.go index 9c6586de998..35946b38bdc 100644 --- a/obsreport/obsreporttest/otelprometheuschecker.go +++ b/obsreport/obsreporttest/otelprometheuschecker.go @@ -34,6 +34,13 @@ type prometheusChecker struct { promHandler http.Handler } +func (pc *prometheusChecker) checkScraperMetrics(receiver config.ComponentID, scraper config.ComponentID, scrapedMetricPoints, erroredMetricPoints int64) error{ + scraperAttrs := attributesForScraperMetrics(receiver, scraper) + return multierr.Combine( + pc.checkCounter("scraper_scraped_metric_points", scrapedMetricPoints, scraperAttrs), + pc.checkCounter("scraper_errored_metric_points", erroredMetricPoints, scraperAttrs)) +} + func (pc *prometheusChecker) checkReceiverTraces(receiver config.ComponentID, protocol string, acceptedSpans, droppedSpans int64) error { receiverAttrs := attributesForReceiverMetrics(receiver, protocol) return multierr.Combine( @@ -124,6 +131,12 @@ func fetchPrometheusMetrics(handler http.Handler) (map[string]*io_prometheus_cli return parser.TextToMetricFamilies(rr.Body) } +func attributesForScraperMetrics(receiver config.ComponentID, scraper config.ComponentID) []attribute.KeyValue { + return []attribute.KeyValue{ + attribute.String(receiverTag.Name(), receiver.String()), + attribute.String(scraperTag.Name(), scraper.String()), + } +} // attributesForReceiverMetrics returns the attributes that are needed for the receiver metrics. func attributesForReceiverMetrics(receiver config.ComponentID, transport string) []attribute.KeyValue { return []attribute.KeyValue{