Skip to content

Commit 28d421b

Browse files
committed
Instrument obsreport.Scraper (#19)
* instrument obsreport.scraper metrics with otel go
1 parent 2257952 commit 28d421b

File tree

6 files changed

+172
-73
lines changed

6 files changed

+172
-73
lines changed

internal/obsreportconfig/obsreportconfig.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,7 @@ func allViews() []*view.View {
7171
views = append(views, receiverViews()...)
7272

7373
// Scraper views.
74-
measures = []*stats.Int64Measure{
75-
obsmetrics.ScraperScrapedMetricPoints,
76-
obsmetrics.ScraperErroredMetricPoints,
77-
}
78-
tagKeys = []tag.Key{obsmetrics.TagKeyReceiver, obsmetrics.TagKeyScraper}
79-
views = append(views, genViews(measures, tagKeys, view.Sum())...)
74+
views = append(views, scraperViews()...)
8075

8176
// Exporter views.
8277
measures = []*stats.Int64Measure{
@@ -136,6 +131,20 @@ func receiverViews() []*view.View {
136131
return genViews(measures, tagKeys, view.Sum())
137132
}
138133

134+
func scraperViews() []*view.View {
135+
if featuregate.GetRegistry().IsEnabled(UseOtelForInternalMetricsfeatureGateID) {
136+
return nil
137+
}
138+
139+
measures := []*stats.Int64Measure{
140+
obsmetrics.ScraperScrapedMetricPoints,
141+
obsmetrics.ScraperErroredMetricPoints,
142+
}
143+
tagKeys := []tag.Key{obsmetrics.TagKeyReceiver, obsmetrics.TagKeyScraper}
144+
145+
return genViews(measures, tagKeys, view.Sum())
146+
}
147+
139148
func genViews(
140149
measures []*stats.Int64Measure,
141150
tagKeys []tag.Key,

obsreport/obsreport_scraper.go

Lines changed: 74 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,22 +21,40 @@ import (
2121
"go.opencensus.io/stats"
2222
"go.opencensus.io/tag"
2323
"go.opentelemetry.io/otel/attribute"
24+
"go.opentelemetry.io/otel/metric/instrument"
25+
"go.opentelemetry.io/otel/metric/instrument/syncint64"
26+
"go.opentelemetry.io/otel/metric/unit"
2427
"go.opentelemetry.io/otel/trace"
28+
"go.uber.org/zap"
2529

2630
"go.opentelemetry.io/collector/component"
2731
"go.opentelemetry.io/collector/config"
2832
"go.opentelemetry.io/collector/config/configtelemetry"
33+
"go.opentelemetry.io/collector/featuregate"
34+
"go.opentelemetry.io/collector/internal/obsreportconfig"
2935
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
3036
"go.opentelemetry.io/collector/receiver/scrapererror"
3137
)
3238

39+
var (
40+
scraperName = "scraper"
41+
scraperScope = scopeName + nameSep + scraperName
42+
)
43+
3344
// Scraper is a helper to add observability to a component.Scraper.
3445
type Scraper struct {
3546
level configtelemetry.Level
3647
receiverID config.ComponentID
3748
scraper config.ComponentID
3849
mutators []tag.Mutator
3950
tracer trace.Tracer
51+
52+
logger *zap.Logger
53+
54+
useOtelForMetrics bool
55+
otelAttrs []attribute.KeyValue
56+
scrapedMetricsPoints syncint64.Counter
57+
erroredMetricsPoints syncint64.Counter
4058
}
4159

4260
// ScraperSettings are settings for creating a Scraper.
@@ -48,15 +66,57 @@ type ScraperSettings struct {
4866

4967
// NewScraper creates a new Scraper.
5068
func NewScraper(cfg ScraperSettings) *Scraper {
51-
return &Scraper{
69+
return newScraper(cfg, featuregate.GetRegistry())
70+
}
71+
72+
func newScraper(cfg ScraperSettings, registry *featuregate.Registry) *Scraper {
73+
scraper := &Scraper{
5274
level: cfg.ReceiverCreateSettings.TelemetrySettings.MetricsLevel,
5375
receiverID: cfg.ReceiverID,
5476
scraper: cfg.Scraper,
5577
mutators: []tag.Mutator{
5678
tag.Upsert(obsmetrics.TagKeyReceiver, cfg.ReceiverID.String(), tag.WithTTL(tag.TTLNoPropagation)),
5779
tag.Upsert(obsmetrics.TagKeyScraper, cfg.Scraper.String(), tag.WithTTL(tag.TTLNoPropagation))},
5880
tracer: cfg.ReceiverCreateSettings.TracerProvider.Tracer(cfg.Scraper.String()),
81+
82+
logger: cfg.ReceiverCreateSettings.Logger,
83+
useOtelForMetrics: registry.IsEnabled(obsreportconfig.UseOtelForInternalMetricsfeatureGateID),
84+
otelAttrs: []attribute.KeyValue{
85+
attribute.String(obsmetrics.ReceiverKey, cfg.ReceiverID.String()),
86+
attribute.String(obsmetrics.ScraperKey, cfg.Scraper.String()),
87+
},
88+
}
89+
90+
scraper.createOtelMetrics(cfg)
91+
return scraper
92+
}
93+
94+
func (s *Scraper) createOtelMetrics(cfg ScraperSettings) {
95+
if !s.useOtelForMetrics {
96+
return
97+
}
98+
meter := cfg.ReceiverCreateSettings.MeterProvider.Meter(scraperScope)
99+
100+
var err error
101+
handleError := func(metricName string, err error) {
102+
if err != nil {
103+
s.logger.Warn("failed to create otel instrument", zap.Error(err), zap.String("metric", metricName))
104+
}
59105
}
106+
107+
s.scrapedMetricsPoints, err = meter.SyncInt64().Counter(
108+
obsmetrics.ScraperPrefix+obsmetrics.ScrapedMetricPointsKey,
109+
instrument.WithDescription("Number of metric points successfully scraped."),
110+
instrument.WithUnit(unit.Dimensionless),
111+
)
112+
handleError(obsmetrics.ScraperPrefix+obsmetrics.ScrapedMetricPointsKey, err)
113+
114+
s.erroredMetricsPoints, err = meter.SyncInt64().Counter(
115+
obsmetrics.ScraperPrefix+obsmetrics.ErroredMetricPointsKey,
116+
instrument.WithDescription("Number of metric points that were unable to be scraped."),
117+
instrument.WithUnit(unit.Dimensionless),
118+
)
119+
handleError(obsmetrics.ScraperPrefix+obsmetrics.ErroredMetricPointsKey, err)
60120
}
61121

62122
// StartMetricsOp is called when a scrape operation is started. The
@@ -91,10 +151,7 @@ func (s *Scraper) EndMetricsOp(
91151
span := trace.SpanFromContext(scraperCtx)
92152

93153
if s.level != configtelemetry.LevelNone {
94-
stats.Record(
95-
scraperCtx,
96-
obsmetrics.ScraperScrapedMetricPoints.M(int64(numScrapedMetrics)),
97-
obsmetrics.ScraperErroredMetricPoints.M(int64(numErroredMetrics)))
154+
s.recordMetrics(scraperCtx, numScrapedMetrics, numErroredMetrics)
98155
}
99156

100157
// end span according to errors
@@ -109,3 +166,15 @@ func (s *Scraper) EndMetricsOp(
109166

110167
span.End()
111168
}
169+
170+
func (s *Scraper) recordMetrics(scraperCtx context.Context, numScrapedMetrics, numErroredMetrics int) {
171+
if s.useOtelForMetrics {
172+
s.scrapedMetricsPoints.Add(scraperCtx, int64(numScrapedMetrics), s.otelAttrs...)
173+
s.erroredMetricsPoints.Add(scraperCtx, int64(numErroredMetrics), s.otelAttrs...)
174+
} else { // OC for metrics
175+
stats.Record(
176+
scraperCtx,
177+
obsmetrics.ScraperScrapedMetricPoints.M(int64(numScrapedMetrics)),
178+
obsmetrics.ScraperErroredMetricPoints.M(int64(numErroredMetrics)))
179+
}
180+
}

obsreport/obsreport_test.go

Lines changed: 47 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -214,61 +214,59 @@ func TestReceiveMetricsOp(t *testing.T) {
214214
}
215215

216216
func TestScrapeMetricsDataOp(t *testing.T) {
217-
tt, err := obsreporttest.SetupTelemetry()
218-
require.NoError(t, err)
219-
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
220-
221-
parentCtx, parentSpan := tt.TracerProvider.Tracer("test").Start(context.Background(), t.Name())
222-
defer parentSpan.End()
217+
testTelemetry(t, func(tt obsreporttest.TestTelemetry, registry *featuregate.Registry) {
218+
parentCtx, parentSpan := tt.TracerProvider.Tracer("test").Start(context.Background(), t.Name())
219+
defer parentSpan.End()
223220

224-
params := []testParams{
225-
{items: 23, err: partialErrFake},
226-
{items: 29, err: errFake},
227-
{items: 15, err: nil},
228-
}
229-
for i := range params {
230-
scrp := NewScraper(ScraperSettings{
231-
ReceiverID: receiver,
232-
Scraper: scraper,
233-
ReceiverCreateSettings: tt.ToReceiverCreateSettings(),
234-
})
235-
ctx := scrp.StartMetricsOp(parentCtx)
236-
assert.NotNil(t, ctx)
237-
scrp.EndMetricsOp(ctx, params[i].items, params[i].err)
238-
}
221+
params := []testParams{
222+
{items: 23, err: partialErrFake},
223+
{items: 29, err: errFake},
224+
{items: 15, err: nil},
225+
}
226+
for i := range params {
227+
scrp := newScraper(ScraperSettings{
228+
ReceiverID: receiver,
229+
Scraper: scraper,
230+
ReceiverCreateSettings: tt.ToReceiverCreateSettings(),
231+
}, registry)
232+
ctx := scrp.StartMetricsOp(parentCtx)
233+
assert.NotNil(t, ctx)
234+
scrp.EndMetricsOp(ctx, params[i].items, params[i].err)
235+
}
239236

240-
spans := tt.SpanRecorder.Ended()
241-
require.Equal(t, len(params), len(spans))
237+
spans := tt.SpanRecorder.Ended()
238+
require.Equal(t, len(params), len(spans))
242239

243-
var scrapedMetricPoints, erroredMetricPoints int
244-
for i, span := range spans {
245-
assert.Equal(t, "scraper/"+receiver.String()+"/"+scraper.String()+"/MetricsScraped", span.Name())
246-
switch {
247-
case params[i].err == nil:
248-
scrapedMetricPoints += params[i].items
249-
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ScrapedMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))})
250-
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ErroredMetricPointsKey, Value: attribute.Int64Value(0)})
251-
assert.Equal(t, codes.Unset, span.Status().Code)
252-
case errors.Is(params[i].err, errFake):
253-
erroredMetricPoints += params[i].items
254-
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ScrapedMetricPointsKey, Value: attribute.Int64Value(0)})
255-
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ErroredMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))})
256-
assert.Equal(t, codes.Error, span.Status().Code)
257-
assert.Equal(t, params[i].err.Error(), span.Status().Description)
240+
var scrapedMetricPoints, erroredMetricPoints int
241+
for i, span := range spans {
242+
assert.Equal(t, "scraper/"+receiver.String()+"/"+scraper.String()+"/MetricsScraped", span.Name())
243+
switch {
244+
case params[i].err == nil:
245+
scrapedMetricPoints += params[i].items
246+
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ScrapedMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))})
247+
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ErroredMetricPointsKey, Value: attribute.Int64Value(0)})
248+
assert.Equal(t, codes.Unset, span.Status().Code)
249+
case errors.Is(params[i].err, errFake):
250+
erroredMetricPoints += params[i].items
251+
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ScrapedMetricPointsKey, Value: attribute.Int64Value(0)})
252+
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ErroredMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))})
253+
assert.Equal(t, codes.Error, span.Status().Code)
254+
assert.Equal(t, params[i].err.Error(), span.Status().Description)
258255

259-
case errors.Is(params[i].err, partialErrFake):
260-
scrapedMetricPoints += params[i].items
261-
erroredMetricPoints++
262-
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ScrapedMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))})
263-
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ErroredMetricPointsKey, Value: attribute.Int64Value(1)})
264-
assert.Equal(t, codes.Error, span.Status().Code)
265-
assert.Equal(t, params[i].err.Error(), span.Status().Description)
266-
default:
267-
t.Fatalf("unexpected err param: %v", params[i].err)
256+
case errors.Is(params[i].err, partialErrFake):
257+
scrapedMetricPoints += params[i].items
258+
erroredMetricPoints++
259+
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ScrapedMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))})
260+
require.Contains(t, span.Attributes(), attribute.KeyValue{Key: obsmetrics.ErroredMetricPointsKey, Value: attribute.Int64Value(1)})
261+
assert.Equal(t, codes.Error, span.Status().Code)
262+
assert.Equal(t, params[i].err.Error(), span.Status().Description)
263+
default:
264+
t.Fatalf("unexpected err param: %v", params[i].err)
265+
}
268266
}
269-
}
270267

271-
require.NoError(t, obsreporttest.CheckScraperMetrics(tt, receiver, scraper, int64(scrapedMetricPoints), int64(erroredMetricPoints)))
268+
require.NoError(t, obsreporttest.CheckScraperMetrics(tt, receiver, scraper, int64(scrapedMetricPoints), int64(erroredMetricPoints)))
269+
})
272270
}
273271

274272
func TestExportTraceDataOp(t *testing.T) {

obsreport/obsreporttest/obsreporttest.go

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -206,11 +206,8 @@ func CheckReceiverMetrics(tts TestTelemetry, receiver config.ComponentID, protoc
206206

207207
// CheckScraperMetrics checks that for the current exported values for metrics scraper metrics match given values.
208208
// When this function is called it is required to also call SetupTelemetry as first thing.
209-
func CheckScraperMetrics(_ TestTelemetry, receiver config.ComponentID, scraper config.ComponentID, scrapedMetricPoints, erroredMetricPoints int64) error {
210-
scraperTags := tagsForScraperView(receiver, scraper)
211-
return multierr.Combine(
212-
checkValueForView(scraperTags, scrapedMetricPoints, "scraper/scraped_metric_points"),
213-
checkValueForView(scraperTags, erroredMetricPoints, "scraper/errored_metric_points"))
209+
func CheckScraperMetrics(tts TestTelemetry, receiver config.ComponentID, scraper config.ComponentID, scrapedMetricPoints, erroredMetricPoints int64) error {
210+
return tts.otelPrometheusChecker.checkScraperMetrics(receiver, scraper, scrapedMetricPoints, erroredMetricPoints)
214211
}
215212

216213
// checkValueForView checks that for the current exported value in the view with the given name
@@ -238,14 +235,6 @@ func checkValueForView(wantTags []tag.Tag, value int64, vName string) error {
238235
return fmt.Errorf("[%s]: could not find tags, wantTags: %s in rows %v", vName, wantTags, rows)
239236
}
240237

241-
// tagsForScraperView returns the tags that are needed for the scraper views.
242-
func tagsForScraperView(receiver config.ComponentID, scraper config.ComponentID) []tag.Tag {
243-
return []tag.Tag{
244-
{Key: receiverTag, Value: receiver.String()},
245-
{Key: scraperTag, Value: scraper.String()},
246-
}
247-
}
248-
249238
// tagsForProcessorView returns the tags that are needed for the processor views.
250239
func tagsForProcessorView(processor config.ComponentID) []tag.Tag {
251240
return []tag.Tag{

obsreport/obsreporttest/obsreporttest_test.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,31 @@ const (
3232
)
3333

3434
var (
35+
scraper = config.NewComponentID("fakeScraper")
3536
receiver = config.NewComponentID("fakeReicever")
3637
exporter = config.NewComponentID("fakeExporter")
3738
)
3839

40+
func TestCheckScraperMetricsViews(t *testing.T) {
41+
tt, err := obsreporttest.SetupTelemetry()
42+
require.NoError(t, err)
43+
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
44+
45+
s := obsreport.NewScraper(obsreport.ScraperSettings{
46+
ReceiverID: receiver,
47+
Scraper: scraper,
48+
ReceiverCreateSettings: tt.ToReceiverCreateSettings(),
49+
})
50+
ctx := s.StartMetricsOp(context.Background())
51+
require.NotNil(t, ctx)
52+
s.EndMetricsOp(ctx, 7, nil)
53+
54+
assert.NoError(t, obsreporttest.CheckScraperMetrics(tt, receiver, scraper, 7, 0))
55+
assert.Error(t, obsreporttest.CheckScraperMetrics(tt, receiver, scraper, 7, 7))
56+
assert.Error(t, obsreporttest.CheckScraperMetrics(tt, receiver, scraper, 0, 0))
57+
assert.Error(t, obsreporttest.CheckScraperMetrics(tt, receiver, scraper, 0, 7))
58+
}
59+
3960
func TestCheckReceiverTracesViews(t *testing.T) {
4061
tt, err := obsreporttest.SetupTelemetry()
4162
require.NoError(t, err)

obsreport/obsreporttest/otelprometheuschecker.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,13 @@ type prometheusChecker struct {
3434
promHandler http.Handler
3535
}
3636

37+
func (pc *prometheusChecker) checkScraperMetrics(receiver config.ComponentID, scraper config.ComponentID, scrapedMetricPoints, erroredMetricPoints int64) error{
38+
scraperAttrs := attributesForScraperMetrics(receiver, scraper)
39+
return multierr.Combine(
40+
pc.checkCounter("scraper_scraped_metric_points", scrapedMetricPoints, scraperAttrs),
41+
pc.checkCounter("scraper_errored_metric_points", erroredMetricPoints, scraperAttrs))
42+
}
43+
3744
func (pc *prometheusChecker) checkReceiverTraces(receiver config.ComponentID, protocol string, acceptedSpans, droppedSpans int64) error {
3845
receiverAttrs := attributesForReceiverMetrics(receiver, protocol)
3946
return multierr.Combine(
@@ -145,6 +152,12 @@ func fetchPrometheusMetrics(handler http.Handler) (map[string]*io_prometheus_cli
145152
return parser.TextToMetricFamilies(rr.Body)
146153
}
147154

155+
func attributesForScraperMetrics(receiver config.ComponentID, scraper config.ComponentID) []attribute.KeyValue {
156+
return []attribute.KeyValue{
157+
attribute.String(receiverTag.Name(), receiver.String()),
158+
attribute.String(scraperTag.Name(), scraper.String()),
159+
}
160+
}
148161
// attributesForReceiverMetrics returns the attributes that are needed for the receiver metrics.
149162
func attributesForReceiverMetrics(receiver config.ComponentID, transport string) []attribute.KeyValue {
150163
return []attribute.KeyValue{

0 commit comments

Comments
 (0)