Skip to content
This repository was archived by the owner on Dec 9, 2024. It is now read-only.

Commit 7d34ccd

Browse files
authored
refactor prometheus metrics (#347)
1 parent 0adf040 commit 7d34ccd

File tree

10 files changed

+119
-74
lines changed

10 files changed

+119
-74
lines changed

api/config.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,11 @@ import (
66
"time"
77

88
"github.com/julienschmidt/httprouter"
9-
"github.com/prometheus/client_golang/prometheus"
9+
"github.com/prometheus/client_golang/prometheus/promhttp"
1010

1111
"github.com/serverless/event-gateway/functions"
1212
"github.com/serverless/event-gateway/internal/httpapi"
1313
"github.com/serverless/event-gateway/internal/kv"
14-
"github.com/serverless/event-gateway/internal/metrics"
1514
"github.com/serverless/event-gateway/subscriptions"
1615
)
1716

@@ -37,11 +36,11 @@ func StartConfigAPI(config httpapi.Config) {
3736
subscriptionsAPI.RegisterRoutes(router)
3837

3938
router.GET("/v1/status", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {})
40-
router.Handler("GET", "/metrics", prometheus.Handler())
39+
router.Handler("GET", "/metrics", promhttp.Handler())
4140

4241
handler := &http.Server{
4342
Addr: ":" + strconv.Itoa(int(config.Port)),
44-
Handler: metrics.HTTPLogger{Handler: router, RequestDuration: metrics.RequestDuration},
43+
Handler: metricsReporter{router},
4544
ReadTimeout: 3 * time.Second,
4645
WriteTimeout: 3 * time.Second,
4746
}

api/metrics.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package api
2+
3+
import (
4+
"net/http"
5+
"time"
6+
7+
"github.com/prometheus/client_golang/prometheus"
8+
)
9+
10+
func init() {
11+
prometheus.MustRegister(requestDuration)
12+
}
13+
14+
var requestDuration = prometheus.NewHistogram(
15+
prometheus.HistogramOpts{
16+
Namespace: "gateway",
17+
Subsystem: "config",
18+
Name: "request_duration_seconds",
19+
Help: "Bucketed histogram of request duration of config API requests",
20+
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 16),
21+
})
22+
23+
type metricsReporter struct {
24+
Handler http.Handler
25+
}
26+
27+
func (m metricsReporter) ServeHTTP(w http.ResponseWriter, r *http.Request) {
28+
start := time.Now()
29+
m.Handler.ServeHTTP(w, r)
30+
requestDuration.Observe(time.Since(start).Seconds())
31+
}

cmd/event-gateway/main.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"strings"
88
"time"
99

10-
"github.com/prometheus/client_golang/prometheus"
1110
"github.com/serverless/libkv"
1211
"github.com/serverless/libkv/store"
1312
etcd "github.com/serverless/libkv/store/etcd/v3"
@@ -18,7 +17,6 @@ import (
1817
"github.com/serverless/event-gateway/internal/cache"
1918
"github.com/serverless/event-gateway/internal/embedded"
2019
"github.com/serverless/event-gateway/internal/httpapi"
21-
"github.com/serverless/event-gateway/internal/metrics"
2220
"github.com/serverless/event-gateway/internal/sync"
2321
"github.com/serverless/event-gateway/plugin"
2422
"github.com/serverless/event-gateway/router"
@@ -28,9 +26,6 @@ var version = "dev"
2826

2927
func init() {
3028
etcd.Register()
31-
32-
prometheus.MustRegister(metrics.RequestDuration)
33-
prometheus.MustRegister(metrics.DroppedPubSubEvents)
3429
}
3530

3631
// nolint: gocyclo
@@ -88,7 +83,7 @@ func main() {
8883
}
8984

9085
targetCache := cache.NewTarget("/serverless-event-gateway", kv, log)
91-
router := router.New(targetCache, pluginManager, metrics.DroppedPubSubEvents, log)
86+
router := router.New(targetCache, pluginManager, log)
9287
router.StartWorkers()
9388

9489
api.StartEventsAPI(httpapi.Config{

internal/metrics/http_logger.go

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1 @@
11
package metrics
2-
3-
import (
4-
"net/http"
5-
"time"
6-
7-
"github.com/prometheus/client_golang/prometheus"
8-
)
9-
10-
// HTTPLogger logs HTTP requests and collects request related metrics
11-
type HTTPLogger struct {
12-
Handler http.Handler
13-
RequestDuration prometheus.Histogram
14-
}
15-
16-
func (l HTTPLogger) ServeHTTP(w http.ResponseWriter, r *http.Request) {
17-
start := time.Now()
18-
19-
l.Handler.ServeHTTP(w, r)
20-
21-
duration := time.Since(start)
22-
l.RequestDuration.Observe(float64(duration) / float64(time.Millisecond))
23-
}

internal/metrics/metrics.go

Lines changed: 0 additions & 23 deletions
This file was deleted.

router/event.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func isHTTPEvent(r *http.Request) bool {
3939
return true
4040
}
4141

42-
type workEvent struct {
42+
type backlogEvent struct {
4343
path string
4444
event eventpkg.Event
4545
}

router/integration_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"github.com/serverless/event-gateway/internal/cache"
2121
"github.com/serverless/event-gateway/internal/embedded"
2222
"github.com/serverless/event-gateway/internal/kv"
23-
"github.com/serverless/event-gateway/internal/metrics"
2423
"github.com/serverless/event-gateway/internal/sync"
2524
"github.com/serverless/event-gateway/plugin"
2625
"github.com/serverless/event-gateway/subscriptions"
@@ -196,7 +195,7 @@ func get(url string) (int, http.Header, string) {
196195
func newTestRouterServer(kvstore store.Store, log *zap.Logger) (*Router, *httptest.Server) {
197196
targetCache := cache.NewTarget("/serverless-event-gateway", kvstore, log)
198197

199-
router := New(targetCache, plugin.NewManager([]string{}, log), metrics.DroppedPubSubEvents, log)
198+
router := New(targetCache, plugin.NewManager([]string{}, log), log)
200199
return router, httptest.NewServer(router)
201200
}
202201

router/metrics.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package router
2+
3+
import (
4+
"sync"
5+
"time"
6+
7+
"github.com/prometheus/client_golang/prometheus"
8+
)
9+
10+
func init() {
11+
prometheus.MustRegister(routerBacklog, routerDroppedEvents, routerProcessingDuration)
12+
}
13+
14+
// RouterDroppedEvents counts the number of times we need
15+
// to drop events instead of forwarding them in the router.
16+
// This should be alerted on in a monitoring system, and
17+
// trigger adding more capacity.
18+
var routerDroppedEvents = prometheus.NewCounter(
19+
prometheus.CounterOpts{
20+
Namespace: "gateway",
21+
Subsystem: "router",
22+
Name: "dropped_events_total",
23+
Help: "Dropped events due to insufficient processing power.",
24+
})
25+
26+
// RouterBacklog is a gauge of events count waiting to be
27+
// processed by the router.
28+
var routerBacklog = prometheus.NewGauge(
29+
prometheus.GaugeOpts{
30+
Namespace: "gateway",
31+
Subsystem: "router",
32+
Name: "backlog_events",
33+
Help: "Gauge of events count waiting to be processed by the router.",
34+
})
35+
36+
// ProcessingDuration is a bucketed histogram of processing
37+
// duration of an event in the router.
38+
var routerProcessingDuration = prometheus.NewHistogram(
39+
prometheus.HistogramOpts{
40+
Namespace: "gateway",
41+
Subsystem: "router",
42+
Name: "event_processing_seconds",
43+
Help: "Bucketed histogram of processing duration of an event in the router. " +
44+
"From receiving the event to calling a function.",
45+
Buckets: prometheus.ExponentialBuckets(0.00001, 2, 20),
46+
})
47+
48+
var receivedEventsMutex = sync.Mutex{}
49+
var receivedEvents = map[string]time.Time{}
50+
51+
func reportReceivedEvent(id string) {
52+
receivedEventsMutex.Lock()
53+
defer receivedEventsMutex.Unlock()
54+
receivedEvents[id] = time.Now()
55+
}
56+
57+
func reportProceededEvent(id string) {
58+
receivedEventsMutex.Lock()
59+
defer receivedEventsMutex.Unlock()
60+
if startTime, ok := receivedEvents[id]; ok {
61+
routerProcessingDuration.Observe(time.Since(startTime).Seconds())
62+
delete(receivedEvents, id)
63+
}
64+
}

router/router.go

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"sync"
1212
"time"
1313

14-
"github.com/prometheus/client_golang/prometheus"
1514
"github.com/rs/cors"
1615
"go.uber.org/zap"
1716

@@ -25,25 +24,23 @@ type Router struct {
2524
sync.Mutex
2625
targetCache Targeter
2726
plugins *plugin.Manager
28-
dropMetric prometheus.Counter
2927
log *zap.Logger
3028
workerNumber uint
3129
drain chan struct{}
3230
drainWaitGroup sync.WaitGroup
3331
active bool
34-
work chan workEvent
32+
backlog chan backlogEvent
3533
}
3634

3735
// New instantiates a new Router
38-
func New(targetCache Targeter, plugins *plugin.Manager, dropMetric prometheus.Counter, log *zap.Logger) *Router {
36+
func New(targetCache Targeter, plugins *plugin.Manager, log *zap.Logger) *Router {
3937
return &Router{
4038
targetCache: targetCache,
4139
plugins: plugins,
42-
dropMetric: dropMetric,
4340
log: log,
4441
workerNumber: 20,
4542
drain: make(chan struct{}),
46-
work: nil,
43+
backlog: nil,
4744
}
4845
}
4946

@@ -81,6 +78,7 @@ func (router *Router) ServeHTTP(w http.ResponseWriter, r *http.Request) {
8178
if event.Type == eventpkg.TypeInvoke {
8279
router.handleInvokeEvent(event, w, r)
8380
} else if !event.IsSystem() {
81+
reportReceivedEvent(event.ID)
8482
router.enqueueWork(path, event)
8583
w.WriteHeader(http.StatusAccepted)
8684
}
@@ -100,8 +98,8 @@ func (router *Router) StartWorkers() {
10098
}
10199
router.active = true
102100

103-
if router.work == nil {
104-
router.work = make(chan workEvent, router.workerNumber*2)
101+
if router.backlog == nil {
102+
router.backlog = make(chan backlogEvent, router.workerNumber*2)
105103
}
106104

107105
for i := 0; i < int(router.workerNumber); i++ {
@@ -326,13 +324,14 @@ func (router *Router) enqueueWork(path string, event *eventpkg.Event) {
326324
}
327325

328326
select {
329-
case router.work <- workEvent{
327+
case router.backlog <- backlogEvent{
330328
path: path,
331329
event: *event,
332330
}:
331+
routerBacklog.Inc()
333332
default:
334333
// We could not submit any work. This is NOT good, but we will sacrifice consistency for availability for now.
335-
router.dropMetric.Inc()
334+
routerDroppedEvents.Inc()
336335
}
337336
}
338337

@@ -397,7 +396,8 @@ func (router *Router) loop() {
397396

398397
// 1. see if there's work in a non-blocking way
399398
select {
400-
case e := <-router.work:
399+
case e := <-router.backlog:
400+
routerBacklog.Dec()
401401
router.processEvent(e)
402402
continue
403403
default:
@@ -411,7 +411,8 @@ func (router *Router) loop() {
411411
// without this, there is a race condition
412412
// where we exit before work is processed.
413413
select {
414-
case e := <-router.work:
414+
case e := <-router.backlog:
415+
routerBacklog.Dec()
415416
router.processEvent(e)
416417
continue
417418
default:
@@ -420,15 +421,17 @@ func (router *Router) loop() {
420421
// no more work to do, decrement WaitGroup and return
421422
router.drainWaitGroup.Done()
422423
return
423-
case e := <-router.work:
424+
case e := <-router.backlog:
425+
routerBacklog.Dec()
424426
router.processEvent(e)
425427
}
426428
}
427429
}
428430

429431
// processEvent calls all functions subscribed to an event
430-
func (router *Router) processEvent(e workEvent) {
432+
func (router *Router) processEvent(e backlogEvent) {
431433
subscribers := router.targetCache.SubscribersOfEvent(e.path, e.event.Type)
434+
reportProceededEvent(e.event.ID)
432435
for _, subscriber := range subscribers {
433436
router.callFunction(subscriber, e.event)
434437
}

router/router_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
"github.com/serverless/event-gateway/event"
1313
"github.com/serverless/event-gateway/functions"
1414
"github.com/serverless/event-gateway/internal/cors"
15-
"github.com/serverless/event-gateway/internal/metrics"
1615
"github.com/serverless/event-gateway/internal/pathtree"
1716
"github.com/serverless/event-gateway/plugin"
1817
"github.com/serverless/event-gateway/router/mock"
@@ -145,7 +144,7 @@ func TestRouterServeHTTP_AllowCORSPreflightForCustomEvents(t *testing.T) {
145144
func testrouter(target Targeter) *Router {
146145
log := zap.NewNop()
147146
plugins := plugin.NewManager([]string{}, log)
148-
router := New(target, plugins, metrics.DroppedPubSubEvents, log)
147+
router := New(target, plugins, log)
149148
router.StartWorkers()
150149
return router
151150
}

0 commit comments

Comments
 (0)