Skip to content

Commit d20cc05

Browse files
committed
Instrument obsreport.Scraper (#19)
* instrument obsreport.scraper metrics with otel go
1 parent abf4a50 commit d20cc05

File tree

6 files changed

+152
-57
lines changed

6 files changed

+152
-57
lines changed

internal/obsreportconfig/obsreportconfig.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,7 @@ func allViews() []*view.View {
7171
views = append(views, receiverViews()...)
7272

7373
// Scraper views.
74-
measures = []*stats.Int64Measure{
75-
obsmetrics.ScraperScrapedMetricPoints,
76-
obsmetrics.ScraperErroredMetricPoints,
77-
}
78-
tagKeys = []tag.Key{obsmetrics.TagKeyReceiver, obsmetrics.TagKeyScraper}
79-
views = append(views, genViews(measures, tagKeys, view.Sum())...)
74+
views = append(views, scraperViews()...)
8075

8176
// Exporter views.
8277
measures = []*stats.Int64Measure{
@@ -136,6 +131,20 @@ func receiverViews() []*view.View {
136131
return genViews(measures, tagKeys, view.Sum())
137132
}
138133

134+
func scraperViews() []*view.View {
135+
if featuregate.GetRegistry().IsEnabled(UseOtelForInternalMetricsfeatureGateID) {
136+
return nil
137+
}
138+
139+
measures := []*stats.Int64Measure{
140+
obsmetrics.ScraperScrapedMetricPoints,
141+
obsmetrics.ScraperErroredMetricPoints,
142+
}
143+
tagKeys := []tag.Key{obsmetrics.TagKeyReceiver, obsmetrics.TagKeyScraper}
144+
145+
return genViews(measures, tagKeys, view.Sum())
146+
}
147+
139148
func genViews(
140149
measures []*stats.Int64Measure,
141150
tagKeys []tag.Key,

obsreport/obsreport_scraper.go

Lines changed: 69 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,22 +21,40 @@ import (
2121
"go.opencensus.io/stats"
2222
"go.opencensus.io/tag"
2323
"go.opentelemetry.io/otel/attribute"
24+
"go.opentelemetry.io/otel/metric/instrument"
25+
"go.opentelemetry.io/otel/metric/instrument/syncint64"
26+
"go.opentelemetry.io/otel/metric/unit"
2427
"go.opentelemetry.io/otel/trace"
28+
"go.uber.org/zap"
2529

2630
"go.opentelemetry.io/collector/component"
2731
"go.opentelemetry.io/collector/config"
2832
"go.opentelemetry.io/collector/config/configtelemetry"
33+
"go.opentelemetry.io/collector/featuregate"
34+
"go.opentelemetry.io/collector/internal/obsreportconfig"
2935
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
3036
"go.opentelemetry.io/collector/receiver/scrapererror"
3137
)
3238

39+
var (
40+
scraperName = "scraper"
41+
scraperScope = scopeName + nameSep + scraperName
42+
)
43+
3344
// Scraper is a helper to add observability to a component.Scraper.
3445
type Scraper struct {
3546
level configtelemetry.Level
3647
receiverID config.ComponentID
3748
scraper config.ComponentID
3849
mutators []tag.Mutator
3950
tracer trace.Tracer
51+
52+
logger *zap.Logger
53+
54+
useOtelForMetrics bool
55+
otelAttrs []attribute.KeyValue
56+
scrapedMetricsPoints syncint64.Counter
57+
erroredMetricsPoints syncint64.Counter
4058
}
4159

4260
// ScraperSettings are settings for creating a Scraper.
@@ -59,7 +77,45 @@ func MustNewScraper(cfg ScraperSettings) *Scraper {
5977
tag.Upsert(obsmetrics.TagKeyReceiver, cfg.ReceiverID.String(), tag.WithTTL(tag.TTLNoPropagation)),
6078
tag.Upsert(obsmetrics.TagKeyScraper, cfg.Scraper.String(), tag.WithTTL(tag.TTLNoPropagation))},
6179
tracer: cfg.ReceiverCreateSettings.TracerProvider.Tracer(cfg.Scraper.String()),
80+
81+
logger: cfg.ReceiverCreateSettings.Logger,
82+
useOtelForMetrics: registry.IsEnabled(obsreportconfig.UseOtelForInternalMetricsfeatureGateID),
83+
otelAttrs: []attribute.KeyValue{
84+
attribute.String(obsmetrics.ReceiverKey, cfg.ReceiverID.String()),
85+
attribute.String(obsmetrics.ScraperKey, cfg.Scraper.String()),
86+
},
87+
}
88+
89+
scraper.createOtelMetrics(cfg)
90+
return scraper
91+
}
92+
93+
func (s *Scraper) createOtelMetrics(cfg ScraperSettings) {
94+
if !s.useOtelForMetrics {
95+
return
96+
}
97+
meter := cfg.ReceiverCreateSettings.MeterProvider.Meter(scraperScope)
98+
99+
var err error
100+
handleError := func(metricName string, err error) {
101+
if err != nil {
102+
s.logger.Warn("failed to create otel instrument", zap.Error(err), zap.String("metric", metricName))
103+
}
62104
}
105+
106+
s.scrapedMetricsPoints, err = meter.SyncInt64().Counter(
107+
obsmetrics.ScraperPrefix+obsmetrics.ScrapedMetricPointsKey,
108+
instrument.WithDescription("Number of metric points successfully scraped."),
109+
instrument.WithUnit(unit.Dimensionless),
110+
)
111+
handleError(obsmetrics.ScraperPrefix+obsmetrics.ScrapedMetricPointsKey, err)
112+
113+
s.erroredMetricsPoints, err = meter.SyncInt64().Counter(
114+
obsmetrics.ScraperPrefix+obsmetrics.ErroredMetricPointsKey,
115+
instrument.WithDescription("Number of metric points that were unable to be scraped."),
116+
instrument.WithUnit(unit.Dimensionless),
117+
)
118+
handleError(obsmetrics.ScraperPrefix+obsmetrics.ErroredMetricPointsKey, err)
63119
}
64120

65121
// StartMetricsOp is called when a scrape operation is started. The
@@ -94,10 +150,7 @@ func (s *Scraper) EndMetricsOp(
94150
span := trace.SpanFromContext(scraperCtx)
95151

96152
if s.level != configtelemetry.LevelNone {
97-
stats.Record(
98-
scraperCtx,
99-
obsmetrics.ScraperScrapedMetricPoints.M(int64(numScrapedMetrics)),
100-
obsmetrics.ScraperErroredMetricPoints.M(int64(numErroredMetrics)))
153+
s.recordMetrics(scraperCtx, numScrapedMetrics, numErroredMetrics)
101154
}
102155

103156
// end span according to errors
@@ -112,3 +165,15 @@ func (s *Scraper) EndMetricsOp(
112165

113166
span.End()
114167
}
168+
169+
func (s *Scraper) recordMetrics(scraperCtx context.Context, numScrapedMetrics, numErroredMetrics int) {
170+
if s.useOtelForMetrics {
171+
s.scrapedMetricsPoints.Add(scraperCtx, int64(numScrapedMetrics), s.otelAttrs...)
172+
s.erroredMetricsPoints.Add(scraperCtx, int64(numErroredMetrics), s.otelAttrs...)
173+
} else { // OC for metrics
174+
stats.Record(
175+
scraperCtx,
176+
obsmetrics.ScraperScrapedMetricPoints.M(int64(numScrapedMetrics)),
177+
obsmetrics.ScraperErroredMetricPoints.M(int64(numErroredMetrics)))
178+
}
179+
}

obsreport/obsreport_test.go

Lines changed: 32 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -219,12 +219,9 @@ func TestReceiveMetricsOp(t *testing.T) {
219219
}
220220

221221
func TestScrapeMetricsDataOp(t *testing.T) {
222-
tt, err := obsreporttest.SetupTelemetry()
223-
require.NoError(t, err)
224-
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
225-
226-
parentCtx, parentSpan := tt.TracerProvider.Tracer("test").Start(context.Background(), t.Name())
227-
defer parentSpan.End()
222+
testTelemetry(t, func(tt obsreporttest.TestTelemetry, registry *featuregate.Registry) {
223+
parentCtx, parentSpan := tt.TracerProvider.Tracer("test").Start(context.Background(), t.Name())
224+
defer parentSpan.End()
228225

229226
params := []testParams{
230227
{items: 23, err: partialErrFake},
@@ -242,38 +239,39 @@ func TestScrapeMetricsDataOp(t *testing.T) {
242239
scrp.EndMetricsOp(ctx, params[i].items, params[i].err)
243240
}
244241

245-
spans := tt.SpanRecorder.Ended()
246-
require.Equal(t, len(params), len(spans))
242+
spans := tt.SpanRecorder.Ended()
243+
require.Equal(t, len(params), len(spans))
247244

248-
var scrapedMetricPoints, erroredMetricPoints int
249-
for i, span := range spans {
250-
assert.Equal(t, "scraper/"+receiver.String()+"/"+scraper.String()+"/MetricsScraped", span.Name())
251-
switch {
252-
case params[i].err == nil:
253-
scrapedMetricPoints += params[i].items
254-
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ScrapedMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))})
255-
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ErroredMetricPointsKey, Value: attribute.Int64Value(0)})
256-
assert.Equal(t, codes.Unset, span.Status().Code)
257-
case errors.Is(params[i].err, errFake):
258-
erroredMetricPoints += params[i].items
259-
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ScrapedMetricPointsKey, Value: attribute.Int64Value(0)})
260-
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ErroredMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))})
261-
assert.Equal(t, codes.Error, span.Status().Code)
262-
assert.Equal(t, params[i].err.Error(), span.Status().Description)
245+
var scrapedMetricPoints, erroredMetricPoints int
246+
for i, span := range spans {
247+
assert.Equal(t, "scraper/"+receiver.String()+"/"+scraper.String()+"/MetricsScraped", span.Name())
248+
switch {
249+
case params[i].err == nil:
250+
scrapedMetricPoints += params[i].items
251+
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ScrapedMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))})
252+
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ErroredMetricPointsKey, Value: attribute.Int64Value(0)})
253+
assert.Equal(t, codes.Unset, span.Status().Code)
254+
case errors.Is(params[i].err, errFake):
255+
erroredMetricPoints += params[i].items
256+
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ScrapedMetricPointsKey, Value: attribute.Int64Value(0)})
257+
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ErroredMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))})
258+
assert.Equal(t, codes.Error, span.Status().Code)
259+
assert.Equal(t, params[i].err.Error(), span.Status().Description)
263260

264-
case errors.Is(params[i].err, partialErrFake):
265-
scrapedMetricPoints += params[i].items
266-
erroredMetricPoints++
267-
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ScrapedMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))})
268-
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ErroredMetricPointsKey, Value: attribute.Int64Value(1)})
269-
assert.Equal(t, codes.Error, span.Status().Code)
270-
assert.Equal(t, params[i].err.Error(), span.Status().Description)
271-
default:
272-
t.Fatalf("unexpected err param: %v", params[i].err)
261+
case errors.Is(params[i].err, partialErrFake):
262+
scrapedMetricPoints += params[i].items
263+
erroredMetricPoints++
264+
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ScrapedMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))})
265+
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ErroredMetricPointsKey, Value: attribute.Int64Value(1)})
266+
assert.Equal(t, codes.Error, span.Status().Code)
267+
assert.Equal(t, params[i].err.Error(), span.Status().Description)
268+
default:
269+
t.Fatalf("unexpected err param: %v", params[i].err)
270+
}
273271
}
274-
}
275272

276-
require.NoError(t, obsreporttest.CheckScraperMetrics(tt, receiver, scraper, int64(scrapedMetricPoints), int64(erroredMetricPoints)))
273+
require.NoError(t, obsreporttest.CheckScraperMetrics(tt, receiver, scraper, int64(scrapedMetricPoints), int64(erroredMetricPoints)))
274+
})
277275
}
278276

279277
func TestExportTraceDataOp(t *testing.T) {

obsreport/obsreporttest/obsreporttest.go

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -206,11 +206,8 @@ func CheckReceiverMetrics(tts TestTelemetry, receiver config.ComponentID, protoc
206206

207207
// CheckScraperMetrics checks that for the current exported values for metrics scraper metrics match given values.
208208
// When this function is called it is required to also call SetupTelemetry as first thing.
209-
func CheckScraperMetrics(_ TestTelemetry, receiver config.ComponentID, scraper config.ComponentID, scrapedMetricPoints, erroredMetricPoints int64) error {
210-
scraperTags := tagsForScraperView(receiver, scraper)
211-
return multierr.Combine(
212-
checkValueForView(scraperTags, scrapedMetricPoints, "scraper/scraped_metric_points"),
213-
checkValueForView(scraperTags, erroredMetricPoints, "scraper/errored_metric_points"))
209+
func CheckScraperMetrics(tts TestTelemetry, receiver config.ComponentID, scraper config.ComponentID, scrapedMetricPoints, erroredMetricPoints int64) error {
210+
return tts.otelPrometheusChecker.checkScraperMetrics(receiver, scraper, scrapedMetricPoints, erroredMetricPoints)
214211
}
215212

216213
// checkValueForView checks that for the current exported value in the view with the given name
@@ -238,14 +235,6 @@ func checkValueForView(wantTags []tag.Tag, value int64, vName string) error {
238235
return fmt.Errorf("[%s]: could not find tags, wantTags: %s in rows %v", vName, wantTags, rows)
239236
}
240237

241-
// tagsForScraperView returns the tags that are needed for the scraper views.
242-
func tagsForScraperView(receiver config.ComponentID, scraper config.ComponentID) []tag.Tag {
243-
return []tag.Tag{
244-
{Key: receiverTag, Value: receiver.String()},
245-
{Key: scraperTag, Value: scraper.String()},
246-
}
247-
}
248-
249238
// tagsForProcessorView returns the tags that are needed for the processor views.
250239
func tagsForProcessorView(processor config.ComponentID) []tag.Tag {
251240
return []tag.Tag{

obsreport/obsreporttest/obsreporttest_test.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,31 @@ const (
3232
)
3333

3434
var (
35+
scraper = config.NewComponentID("fakeScraper")
3536
receiver = config.NewComponentID("fakeReicever")
3637
exporter = config.NewComponentID("fakeExporter")
3738
)
3839

40+
func TestCheckScraperMetricsViews(t *testing.T) {
41+
tt, err := obsreporttest.SetupTelemetry()
42+
require.NoError(t, err)
43+
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
44+
45+
s := obsreport.NewScraper(obsreport.ScraperSettings{
46+
ReceiverID: receiver,
47+
Scraper: scraper,
48+
ReceiverCreateSettings: tt.ToReceiverCreateSettings(),
49+
})
50+
ctx := s.StartMetricsOp(context.Background())
51+
require.NotNil(t, ctx)
52+
s.EndMetricsOp(ctx, 7, nil)
53+
54+
assert.NoError(t, obsreporttest.CheckScraperMetrics(tt, receiver, scraper, 7, 0))
55+
assert.Error(t, obsreporttest.CheckScraperMetrics(tt, receiver, scraper, 7, 7))
56+
assert.Error(t, obsreporttest.CheckScraperMetrics(tt, receiver, scraper, 0, 0))
57+
assert.Error(t, obsreporttest.CheckScraperMetrics(tt, receiver, scraper, 0, 7))
58+
}
59+
3960
func TestCheckReceiverTracesViews(t *testing.T) {
4061
tt, err := obsreporttest.SetupTelemetry()
4162
require.NoError(t, err)

obsreport/obsreporttest/otelprometheuschecker.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,13 @@ type prometheusChecker struct {
3434
promHandler http.Handler
3535
}
3636

37+
func (pc *prometheusChecker) checkScraperMetrics(receiver config.ComponentID, scraper config.ComponentID, scrapedMetricPoints, erroredMetricPoints int64) error{
38+
scraperAttrs := attributesForScraperMetrics(receiver, scraper)
39+
return multierr.Combine(
40+
pc.checkCounter("scraper_scraped_metric_points", scrapedMetricPoints, scraperAttrs),
41+
pc.checkCounter("scraper_errored_metric_points", erroredMetricPoints, scraperAttrs))
42+
}
43+
3744
func (pc *prometheusChecker) checkReceiverTraces(receiver config.ComponentID, protocol string, acceptedSpans, droppedSpans int64) error {
3845
receiverAttrs := attributesForReceiverMetrics(receiver, protocol)
3946
return multierr.Combine(
@@ -145,6 +152,12 @@ func fetchPrometheusMetrics(handler http.Handler) (map[string]*io_prometheus_cli
145152
return parser.TextToMetricFamilies(rr.Body)
146153
}
147154

155+
func attributesForScraperMetrics(receiver config.ComponentID, scraper config.ComponentID) []attribute.KeyValue {
156+
return []attribute.KeyValue{
157+
attribute.String(receiverTag.Name(), receiver.String()),
158+
attribute.String(scraperTag.Name(), scraper.String()),
159+
}
160+
}
148161
// attributesForReceiverMetrics returns the attributes that are needed for the receiver metrics.
149162
func attributesForReceiverMetrics(receiver config.ComponentID, transport string) []attribute.KeyValue {
150163
return []attribute.KeyValue{

0 commit comments

Comments
 (0)