Skip to content

Commit 9d6acf2

Browse files
committed
Add services test cases
Signed-off-by: Hiroshi Hatake <[email protected]>
1 parent 3814f70 commit 9d6acf2

File tree

3 files changed

+240
-13
lines changed

3 files changed

+240
-13
lines changed

custom_jaeger_remote.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ type jaegerRemotePlugin struct {
3434
config *Config
3535
server *serverComponent // Server-related state
3636
clientTracer *clientComponent // Client-related state
37+
38+
// newSamplerFn allows injecting a mock sampler factory for testing.
39+
newSamplerFn func(context.Context, *Config) (*remoteSampler, error)
3740
}
3841

3942
type serverComponent struct {
@@ -103,20 +106,21 @@ func (plug *jaegerRemotePlugin) Init(ctx context.Context, fbit *plugin.Fluentbit
103106
}
104107
plug.config = cfg
105108

106-
// Run client mode
109+
// Default to the real sampler factory if none is injected for tests.
110+
if plug.newSamplerFn == nil {
111+
plug.newSamplerFn = newRemoteSampler
112+
}
113+
107114
if cfg.Mode == "client" || cfg.Mode == "all" {
108115
if err := plug.initClient(ctx); err != nil {
109116
return fmt.Errorf("failed to initialize client mode: %w", err)
110117
}
111118
}
112-
113-
// Run server mode
114119
if cfg.Mode == "server" || cfg.Mode == "all" {
115120
if err := plug.initServer(ctx); err != nil {
116121
return fmt.Errorf("failed to initialize server mode: %w", err)
117122
}
118123
}
119-
120124
plug.log.Info("plugin initialized successfully in mode: '%s'", cfg.Mode)
121125
return nil
122126
}

jaeger_services.go

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -66,23 +66,17 @@ func (plug *jaegerRemotePlugin) initClient(ctx context.Context) error {
6666
return nil
6767
}
6868

69-
//
7069
// --- Server Mode Initialization ---
71-
//
72-
7370
func (plug *jaegerRemotePlugin) initServer(ctx context.Context) error {
7471
plug.log.Info("initializing server mode...")
75-
7672
plug.server = &serverComponent{}
77-
sampler, err := newRemoteSampler(ctx, plug.config)
73+
sampler, err := plug.newSamplerFn(ctx, plug.config)
7874
if err != nil {
7975
return fmt.Errorf("could not create remote sampler for server: %w", err)
8076
}
8177
plug.server.sampler = sampler
8278
plug.server.cache = &samplingStrategyCache{strategies: make(map[string]*api_v2.SamplingStrategyResponse)}
8379

84-
go plug.pollStrategiesWithRetry(ctx)
85-
8680
if plug.config.ServerHttpListenAddr != "" {
8781
plug.server.httpServer = plug.startHttpServer()
8882
}
@@ -93,7 +87,7 @@ func (plug *jaegerRemotePlugin) initServer(ctx context.Context) error {
9387
if plug.server.httpServer != nil {
9488
shutdownCtx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
9589
defer cancel()
96-
plug.server.httpServer.Shutdown(shutdownCtx)
90+
_ = plug.server.httpServer.Shutdown(shutdownCtx)
9791
}
9892
return err
9993
}
@@ -103,6 +97,8 @@ func (plug *jaegerRemotePlugin) initServer(ctx context.Context) error {
10397
return errors.New("server mode is enabled, but neither 'server.http.listen_addr' nor 'server.grpc.listen_addr' are configured")
10498
}
10599

100+
// All checks passed, now start background goroutines.
101+
go plug.pollStrategiesWithRetry(ctx)
106102
go func() {
107103
<-ctx.Done()
108104
plug.log.Info("shutting down server components...")
@@ -111,7 +107,7 @@ func (plug *jaegerRemotePlugin) initServer(ctx context.Context) error {
111107
plug.log.Info("gRPC server stopped.")
112108
}
113109
if sampler.conn != nil {
114-
sampler.conn.Close()
110+
_ = sampler.conn.Close()
115111
plug.log.Info("gRPC client connection to Jaeger Collector closed.")
116112
}
117113
if plug.server.httpServer != nil {

jaeger_services_test.go

Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"net"
8+
"net/http"
9+
"net/http/httptest"
10+
"strings"
11+
"sync"
12+
"testing"
13+
"time"
14+
15+
"github.com/alecthomas/assert/v2"
16+
"github.com/calyptia/plugin"
17+
api_v2 "github.com/jaegertracing/jaeger-idl/proto-gen/api_v2"
18+
"google.golang.org/grpc"
19+
"google.golang.org/grpc/credentials/insecure"
20+
"google.golang.org/grpc/metadata"
21+
"google.golang.org/grpc/test/bufconn"
22+
)
23+
24+
type observedCall struct {
25+
Context context.Context
26+
Params *api_v2.SamplingStrategyParameters
27+
}
28+
29+
type samplingServer struct {
30+
api_v2.UnimplementedSamplingManagerServer
31+
mu sync.Mutex
32+
observedCalls []observedCall
33+
strategy *api_v2.SamplingStrategyResponse
34+
}
35+
36+
func (s *samplingServer) GetSamplingStrategy(ctx context.Context, params *api_v2.SamplingStrategyParameters) (*api_v2.SamplingStrategyResponse, error) {
37+
s.mu.Lock()
38+
defer s.mu.Unlock()
39+
s.observedCalls = append(s.observedCalls, observedCall{Context: ctx, Params: params})
40+
return s.strategy, nil
41+
}
42+
43+
func (s *samplingServer) callCount() int {
44+
s.mu.Lock()
45+
defer s.mu.Unlock()
46+
return len(s.observedCalls)
47+
}
48+
func (s *samplingServer) lastCall() *observedCall {
49+
s.mu.Lock()
50+
defer s.mu.Unlock()
51+
if len(s.observedCalls) == 0 {
52+
return nil
53+
}
54+
return &s.observedCalls[len(s.observedCalls)-1]
55+
}
56+
57+
func startMockGrpcServer(t *testing.T, mock *samplingServer) (*grpc.Server, *bufconn.Listener) {
58+
t.Helper()
59+
lis := bufconn.Listen(1024 * 1024)
60+
s := grpc.NewServer()
61+
api_v2.RegisterSamplingManagerServer(s, mock)
62+
go func() {
63+
if err := s.Serve(lis); err != nil {
64+
t.Logf("Mock gRPC server stopped: %v", err)
65+
}
66+
}()
67+
return s, lis
68+
}
69+
70+
func startMockHTTPSamplingServer(t *testing.T, strategy *api_v2.SamplingStrategyResponse) *httptest.Server {
71+
t.Helper()
72+
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
73+
w.Header().Set("Content-Type", "application/json")
74+
err := json.NewEncoder(w).Encode(strategy)
75+
assert.NoError(t, err)
76+
}))
77+
}
78+
79+
func Test_InitClient(t *testing.T) {
80+
t.Run("successfully initializes in client mode", func(t *testing.T) {
81+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
82+
defer cancel()
83+
84+
mockStrategy := &api_v2.SamplingStrategyResponse{
85+
StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC,
86+
ProbabilisticSampling: &api_v2.ProbabilisticSamplingStrategy{SamplingRate: 0.1},
87+
}
88+
mockSamplingSrv := startMockHTTPSamplingServer(t, mockStrategy)
89+
defer mockSamplingSrv.Close()
90+
91+
mockOtlpSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
92+
w.WriteHeader(http.StatusOK)
93+
}))
94+
defer mockOtlpSrv.Close()
95+
96+
fbit := &plugin.Fluentbit{
97+
Logger: newTestLogger(t),
98+
Conf: mapConfigLoader{
99+
"mode": "client",
100+
"client.server_url": mockOtlpSrv.URL,
101+
"client.sampling_url": mockSamplingSrv.URL,
102+
},
103+
}
104+
plug := &jaegerRemotePlugin{}
105+
106+
err := plug.Init(ctx, fbit)
107+
108+
assert.NoError(t, err)
109+
assert.NotZero(t, plug.clientTracer, "clientTracer should be initialized")
110+
assert.NotZero(t, plug.clientTracer.tracerProvider, "tracerProvider should be initialized")
111+
})
112+
}
113+
114+
func Test_InitServer_EndToEnd(t *testing.T) {
115+
testCases := []struct {
116+
name string
117+
configHeaders string
118+
expectedHeaders map[string]string
119+
}{
120+
{"no headers", "", nil},
121+
{
122+
"with headers",
123+
"x-custom-header=fluent-bit,authorization=Bearer 12345",
124+
map[string]string{"x-custom-header": "fluent-bit", "authorization": "Bearer 12345"},
125+
},
126+
}
127+
128+
for _, tc := range testCases {
129+
t.Run(tc.name, func(t *testing.T) {
130+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
131+
defer cancel()
132+
133+
mockStrategy := &api_v2.SamplingStrategyResponse{StrategyType: api_v2.SamplingStrategyType_RATE_LIMITING, RateLimitingSampling: &api_v2.RateLimitingSamplingStrategy{MaxTracesPerSecond: 100}}
134+
mockJaeger := &samplingServer{strategy: mockStrategy}
135+
upstreamJaegerServer, lis := startMockGrpcServer(t, mockJaeger)
136+
defer upstreamJaegerServer.Stop()
137+
138+
fbit := &plugin.Fluentbit{
139+
Logger: newTestLogger(t),
140+
Conf: mapConfigLoader{
141+
"mode": "server",
142+
"server.endpoint": "bufnet",
143+
"server.http.listen_addr": getFreePort(t),
144+
"server.service_names": "test-service",
145+
"server.retry.initial_interval": "10ms",
146+
"server.headers": tc.configHeaders,
147+
},
148+
}
149+
plug := &jaegerRemotePlugin{}
150+
151+
plug.newSamplerFn = func(ctx context.Context, cfg *Config) (*remoteSampler, error) {
152+
dialOpts := []grpc.DialOption{
153+
grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { return lis.Dial() }),
154+
grpc.WithTransportCredentials(insecure.NewCredentials()),
155+
}
156+
if len(cfg.ServerHeaders) > 0 {
157+
dialOpts = append(dialOpts, grpc.WithUnaryInterceptor(func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
158+
return invoker(metadata.NewOutgoingContext(ctx, metadata.New(cfg.ServerHeaders)), method, req, reply, cc, opts...)
159+
}))
160+
}
161+
conn, err := grpc.DialContext(ctx, cfg.ServerEndpoint, dialOpts...)
162+
if err != nil {
163+
return nil, err
164+
}
165+
client := api_v2.NewSamplingManagerClient(conn)
166+
return &remoteSampler{conn: conn, client: client}, nil
167+
}
168+
169+
err := plug.Init(ctx, fbit)
170+
assert.NoError(t, err)
171+
assert.NotZero(t, plug.server)
172+
if plug.server.httpServer != nil {
173+
defer plug.server.httpServer.Close()
174+
}
175+
176+
lastCall := mockJaeger.lastCall()
177+
assert.NotZero(t, lastCall)
178+
assert.Equal(t, "test-service", lastCall.Params.ServiceName)
179+
180+
md, ok := metadata.FromIncomingContext(lastCall.Context)
181+
assert.True(t, ok)
182+
for k, v := range tc.expectedHeaders {
183+
assert.Equal(t, []string{v}, md.Get(k))
184+
}
185+
})
186+
}
187+
}
188+
189+
func Test_InitServer_Failure(t *testing.T) {
190+
t.Run("fails if both http and grpc listen addresses are missing", func(t *testing.T) {
191+
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
192+
defer cancel()
193+
194+
fbit := &plugin.Fluentbit{
195+
Logger: newTestLogger(t),
196+
Conf: mapConfigLoader{
197+
"mode": "server",
198+
"server.endpoint": "dummy:1234",
199+
"server.service_names": "test-service",
200+
},
201+
}
202+
plug := &jaegerRemotePlugin{}
203+
204+
err := plug.Init(ctx, fbit)
205+
206+
assert.Error(t, err)
207+
assert.Contains(t, err.Error(), "neither 'server.http.listen_addr' nor 'server.grpc.listen_addr' are configured")
208+
})
209+
}
210+
211+
func getFreePort(t *testing.T) string {
212+
t.Helper()
213+
addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
214+
assert.NoError(t, err, "failed to resolve free port")
215+
l, err := net.ListenTCP("tcp", addr)
216+
assert.NoError(t, err, "failed to listen on free port")
217+
defer l.Close()
218+
return l.Addr().String()
219+
}
220+
221+
func mapToString(m map[string]string) string {
222+
var parts []string
223+
for k, v := range m {
224+
parts = append(parts, fmt.Sprintf("%s=%s", k, v))
225+
}
226+
return strings.Join(parts, ",")
227+
}

0 commit comments

Comments
 (0)