Commit 6075dff

Merge pull request #5 from fluent/add-proxiable-implemantations
Add proxiable implemantations
2 parents 79be4c3 + dcdfcc4 commit 6075dff

9 files changed: +1808 -246 lines changed

README.md

Lines changed: 113 additions & 9 deletions
@@ -1,6 +1,6 @@
11
# Custom Go Plugin for Jaeger
22

3-
**Note:** This plugin is under heavily development and needed to use with this Fluent Bit's PR: https://github.com/fluent/fluent-bit/pull/10299
3+
**Note:** This plugin is under heavy development.
44

55
This plugin implements the Jaeger Remote Sampling protocol using the Golang custom plugin mechanism.
66

@@ -26,21 +26,125 @@ custom_jeager_remote.so
2626

2727
For the custom Jaeger plugin, add a `[CUSTOM]` section to the Fluent Bit configuration:
2828

29+
### Basic structure
30+
31+
```ini
32+
[CUSTOM]
33+
# The name registered in the Go code ("jaeger_remote")
34+
Name jaeger_remote
35+
36+
# --- Mode Selection ---
37+
# Can be: "client", "server", or "all" (default)
38+
Mode all
39+
40+
# --- Add parameters for the selected mode below ---
41+
```
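
The mode selection above can be sketched in Go as follows. This is an illustration, not the plugin's code: the helper name `enabledRoles` is hypothetical, and both the case-insensitive matching and the rejection of unknown values are assumptions of this sketch. Only the fallback from an empty value to `all` comes from the documented default.

```go
package main

import (
	"fmt"
	"strings"
)

// enabledRoles reports which halves of the plugin a Mode value turns on.
// An empty value falls back to "all", matching the documented default.
// Ignoring case and rejecting unknown values are assumptions of this sketch.
func enabledRoles(mode string) (client, server bool, err error) {
	switch strings.ToLower(strings.TrimSpace(mode)) {
	case "", "all":
		return true, true, nil
	case "client":
		return true, false, nil
	case "server":
		return false, true, nil
	default:
		return false, false, fmt.Errorf("unknown mode %q: want client, server, or all", mode)
	}
}

func main() {
	for _, m := range []string{"", "client", "server", "all"} {
		c, s, _ := enabledRoles(m)
		fmt.Printf("mode=%q client=%t server=%t\n", m, c, s)
	}
}
```

Returning an error for an unrecognized value makes a typo such as `Mode sever` fail at startup instead of silently enabling nothing.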
42+
43+
### Mode: client
44+
45+
In this mode, the plugin acts as an OpenTelemetry client, fetching sampling strategies from an external server.
46+
47+
```ini
48+
[CUSTOM]
49+
Name jaeger_remote
50+
Mode client
51+
52+
# URL of the OTLP-compatible collector to send traces to.
53+
client.server_url http://localhost:4318
54+
55+
# URL of the Jaeger-compatible sampling server.
56+
client.sampling_url http://localhost:5778/sampling
57+
```
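
Jaeger's HTTP sampling endpoint identifies the caller through a `service` query parameter. The sketch below shows how a request URL could be derived from `client.sampling_url`. The helper name `samplingRequestURL` is hypothetical, and whether the plugin's client builds its requests this way is an assumption.

```go
package main

import (
	"fmt"
	"net/url"
)

// samplingRequestURL adds the service name as a "service" query parameter
// to the configured sampling URL, keeping any query values already present.
func samplingRequestURL(samplingURL, service string) (string, error) {
	u, err := url.Parse(samplingURL)
	if err != nil {
		return "", fmt.Errorf("invalid sampling URL: %w", err)
	}
	q := u.Query()
	q.Set("service", service)
	u.RawQuery = q.Encode()
	return u.String(), nil
}

func main() {
	u, err := samplingRequestURL("http://localhost:5778/sampling", "frontend")
	if err != nil {
		panic(err)
	}
	fmt.Println(u)
}
```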
58+
59+
### Mode: server
60+
61+
In this mode, the plugin polls a Jaeger Collector for strategies and serves them via its own HTTP and/or gRPC endpoints. You can enable one or both by providing their listen addresses.
62+
63+
```ini
64+
[CUSTOM]
65+
Name jaeger_remote
66+
Mode server
67+
68+
# --- Connection to Jaeger Collector ---
69+
server.endpoint jaeger-collector:14250
70+
server.service_names frontend,backend,database
71+
72+
# --- Exposed Endpoints (enable one or both) ---
73+
server.http.listen_addr 0.0.0.0:8899
74+
server.grpc.listen_addr 0.0.0.0:9099
75+
76+
# --- Advanced Connection Settings (Optional) ---
77+
server.retry.initial_interval 10s
78+
server.keepalive.time 30s
79+
```
80+
81+
### Mode: server (Local File)
82+
83+
This mode loads a static strategy file and serves it via HTTP and/or gRPC. It does not connect to a remote Jaeger Collector.
84+
2985
```
3086
[CUSTOM]
31-
Name jaeger_remote
32-
server_url "http://localhost:14268"
33-
sampling_url "http://localhost:5778/sampling"
34-
rate 5s
87+
Name jaeger_remote
88+
Mode server
89+
90+
# --- Local Strategy File ---
91+
server.strategy_file /path/to/my_strategies.json
92+
93+
# --- Exposed Endpoints ---
94+
server.http.listen_addr 0.0.0.0:8899
3595
```
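
To illustrate what serving a static strategy file involves, the sketch below parses a small subset of Jaeger's strategies JSON format (`service_strategies` and `default_strategy`) and looks up the strategy for a service. The type and function names are hypothetical, operation-level strategies are omitted, and the plugin's own loader may be structured differently.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// strategy and strategyFile cover only a subset of Jaeger's strategies file
// format; operation-level strategies are left out of this sketch.
type strategy struct {
	Service string  `json:"service,omitempty"`
	Type    string  `json:"type"`
	Param   float64 `json:"param"`
}

type strategyFile struct {
	ServiceStrategies []strategy `json:"service_strategies"`
	DefaultStrategy   *strategy  `json:"default_strategy"`
}

// parseStrategies decodes the JSON content of a strategy file.
func parseStrategies(data []byte) (*strategyFile, error) {
	var f strategyFile
	if err := json.Unmarshal(data, &f); err != nil {
		return nil, fmt.Errorf("parse strategy file: %w", err)
	}
	return &f, nil
}

// lookup returns the strategy for service, falling back to the default one.
func (f *strategyFile) lookup(service string) (strategy, bool) {
	for _, s := range f.ServiceStrategies {
		if s.Service == service {
			return s, true
		}
	}
	if f.DefaultStrategy != nil {
		return *f.DefaultStrategy, true
	}
	return strategy{}, false
}

func main() {
	data := []byte(`{
  "service_strategies": [
    {"service": "frontend", "type": "probabilistic", "param": 0.8}
  ],
  "default_strategy": {"type": "probabilistic", "param": 0.5}
}`)
	f, err := parseStrategies(data)
	if err != nil {
		panic(err)
	}
	for _, svc := range []string{"frontend", "database"} {
		s, _ := f.lookup(svc)
		fmt.Printf("%s: %s %.2f\n", svc, s.Type, s.Param)
	}
}
```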
3696

37-
For plugins.yaml,
97+
### Mode: all
3898

39-
```yaml
40-
plugins:
41-
- /path/to/custom_jeager_remote.so
99+
This mode enables both client and server functionalities simultaneously.
100+
101+
```ini
102+
[CUSTOM]
103+
Name jaeger_remote
104+
Mode all
105+
106+
# --- Client Parameters ---
107+
client.server_url http://localhost:4318
108+
client.sampling_url http://localhost:5778/sampling
109+
110+
# --- Server Parameters ---
111+
server.endpoint jaeger-collector:14250
112+
server.service_names frontend,backend,database
113+
server.http.listen_addr 0.0.0.0:8899
114+
server.grpc.listen_addr 0.0.0.0:9099
42115
```
43116

117+
## Parameter Reference
118+
119+
| Key | Mode | Description | Default |
120+
| ---------------------------------------- | ------------- | ------------------------------------------------------------------------------------------------------- | ------------------------ |
121+
| `mode` | **Global** | Sets the operating mode. Can be `client`, `server`, or `all`. | `all` |
122+
| **Client Settings** | | | |
123+
| `client.server_url` | `client`/`all` | The endpoint URL of the OTLP collector to which traces will be sent. | **Required** |
124+
| `client.sampling_url` | `client`/`all` | The URL of the Jaeger-compatible sampling server to poll for strategies. | **Required** |
125+
| **Server Settings** | | | |
126+
| `server.endpoint` | `server`/`all` | The gRPC endpoint of the Jaeger Collector to poll for sampling strategies. **Mutually exclusive** with `server.strategy_file`. | **Required** unless `server.strategy_file` is set |
127+
| `server.strategy_file` | `server`/`all` | Path to a local JSON file containing sampling strategies. **Mutually exclusive** with `server.endpoint`. | ` ` (Disabled) |
128+
| `server.service_names` | `server`/`all` | A comma-separated list of service names to fetch strategies for. | **Required** with `server.endpoint` |
129+
| `server.http.listen_addr` | `server`/`all` | The address and port for the internal HTTP server to listen on. If empty, the HTTP server is disabled. | ` ` (Disabled) |
130+
| `server.grpc.listen_addr` | `server`/`all` | The address and port for the internal gRPC server to listen on. If empty, the gRPC server is disabled. | ` ` (Disabled) |
131+
| `server.headers` | `server`/`all` | Comma-separated key=value pairs to add as gRPC metadata to requests to the Jaeger Collector. | ` ` |
| `server.reload_interval` | `server`/`all` | How often sampling strategies are reloaded from the Jaeger Collector. Only read when `server.endpoint` is set. | `5m` |
| `server.cors.allowed_origins` | `server`/`all` | Comma-separated list of allowed CORS origins. | ` ` |
132+
| **Server TLS Settings** | | | |
133+
| `server.tls.insecure` | `server`/`all` | If `true`, TLS certificate verification is skipped when connecting to the Jaeger Collector. | `false` |
134+
| `server.tls.server_name_override` | `server`/`all` | Overrides the server name used for TLS validation. | ` ` |
135+
| `server.tls.ca_file` | `server`/`all` | Path to the CA certificate file for verifying the Jaeger Collector's certificate. | ` ` |
136+
| `server.tls.cert_file` | `server`/`all` | Path to the client's TLS certificate file. | ` ` |
137+
| `server.tls.key_file` | `server`/`all` | Path to the client's TLS private key file. | ` ` |
138+
| **Server Keepalive Settings** | | | |
139+
| `server.keepalive.time` | `server`/`all` | Interval to send keepalive pings to the Jaeger Collector. If not set, keepalive is disabled. | ` ` (Disabled) |
140+
| `server.keepalive.timeout` | `server`/`all` | Time to wait for a keepalive ack before considering the connection dead. | `20s` |
141+
| `server.keepalive.permit_without_stream` | `server`/`all` | If `true`, allows pings to be sent even if there are no active streams. | `true` |
142+
| **Server Retry Settings** | | | |
143+
| `server.retry.initial_interval` | `server`/`all` | The initial time to wait before retrying a failed connection to the Jaeger Collector. | `5s` |
144+
| `server.retry.max_interval` | `server`/`all` | The maximum time to wait between retries. | `5m` |
145+
| `server.retry.multiplier` | `server`/`all` | The factor by which the retry interval is multiplied after each failed attempt. Must be > 1.0. | `1.5` |
146+
| `server.retry.max_retry` | `server`/`all` | The maximum number of retry attempts. Must be > 0. | `10` |
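
As an illustration of the "comma-separated key=value pairs" format used by `server.headers`, here is a minimal parsing sketch. `parseHeaderPairs` is a hypothetical stand-in: the plugin's own `parseHeaders` helper is not part of this diff and may treat malformed entries differently.

```go
package main

import (
	"fmt"
	"strings"
)

// parseHeaderPairs splits "k1=v1,k2=v2" into a map. Entries without "=" or
// with an empty key are skipped, which is an assumption of this sketch.
func parseHeaderPairs(raw string) map[string]string {
	headers := make(map[string]string)
	for _, pair := range strings.Split(raw, ",") {
		key, value, ok := strings.Cut(pair, "=")
		key = strings.TrimSpace(key)
		if !ok || key == "" {
			continue // Skip malformed entries.
		}
		headers[key] = strings.TrimSpace(value)
	}
	return headers
}

func main() {
	fmt.Println(parseHeaderPairs("x-tenant=acme, authorization=Bearer abc"))
}
```

Because the value is split only at the first `=`, values that themselves contain `=` are preserved. Values containing commas cannot be expressed in this format.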
147+
44148
## Run
45149

46150
Put the C shared object in the specified path and run fluent-bit:

config.go

Lines changed: 215 additions & 0 deletions
@@ -0,0 +1,215 @@
1+
package main
2+
3+
import (
4+
"crypto/tls"
5+
"crypto/x509"
6+
"errors"
7+
"io/ioutil"
8+
"strconv"
9+
"strings"
10+
"time"
11+
12+
"github.com/calyptia/plugin"
13+
)
14+
15+
func loadTLSConfig(cfg TLSSettings) (*tls.Config, error) {
16+
tlsConfig := &tls.Config{InsecureSkipVerify: cfg.Insecure, ServerName: cfg.ServerName}
17+
if cfg.CAFile != "" {
18+
caBytes, err := ioutil.ReadFile(cfg.CAFile)
19+
if err != nil {
20+
return nil, err
21+
}
22+
caPool := x509.NewCertPool()
23+
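// AppendCertsFromPEM reports false when no certificate could be parsed.
if !caPool.AppendCertsFromPEM(caBytes) {
return nil, errors.New("no valid CA certificates found in " + cfg.CAFile)
}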
24+
tlsConfig.RootCAs = caPool
25+
}
26+
if cfg.CertFile != "" && cfg.KeyFile != "" {
27+
cert, err := tls.LoadX509KeyPair(cfg.CertFile, cfg.KeyFile)
28+
if err != nil {
29+
return nil, err
30+
}
31+
tlsConfig.Certificates = []tls.Certificate{cert}
32+
}
33+
return tlsConfig, nil
34+
}
35+
36+
func parseCorsConfig(cfg *Config, conf plugin.ConfigLoader) (*Config, error) {
37+
if cfg == nil {
38+
return nil, errors.New("cfg must not be nil")
39+
}
40+
if corsStr := conf.String("server.cors.allowed_origins"); corsStr != "" {
41+
for _, origin := range strings.Split(corsStr, ",") {
42+
trimmedOrigin := strings.TrimSpace(origin)
43+
if trimmedOrigin != "" { // Only append non-empty origins
44+
cfg.ServerCors.AllowedOrigins = append(cfg.ServerCors.AllowedOrigins, trimmedOrigin)
45+
}
46+
}
47+
}
48+
49+
return cfg, nil
50+
}
51+
52+
func loadConfig(fbit *plugin.Fluentbit) (*Config, error) {
53+
log := fbit.Logger
54+
cfg := &Config{
55+
Mode: fbit.Conf.String("mode"),
56+
ClientServerURL: fbit.Conf.String("client.server_url"),
57+
ClientSamplingURL: fbit.Conf.String("client.sampling_url"),
58+
ServerEndpoint: fbit.Conf.String("server.endpoint"),
59+
ServerStrategyFile: fbit.Conf.String("server.strategy_file"),
60+
ServerHttpListenAddr: fbit.Conf.String("server.http.listen_addr"),
61+
ServerGrpcListenAddr: fbit.Conf.String("server.grpc.listen_addr"),
62+
ServerHeaders: parseHeaders(fbit.Conf.String("server.headers")),
63+
}
64+
65+
cfg, err := parseCorsConfig(cfg, fbit.Conf)
66+
if err != nil {
67+
return nil, err
68+
}
69+
70+
if cfg.Mode == "" {
71+
cfg.Mode = "all"
72+
}
73+
74+
if cfg.Mode == "client" || cfg.Mode == "all" {
75+
if cfg.ClientServerURL == "" {
76+
return nil, errors.New("'client.server_url' is required for client/all mode")
77+
}
78+
if cfg.ClientSamplingURL == "" {
79+
return nil, errors.New("'client.sampling_url' is required for client/all mode")
80+
}
81+
}
82+
83+
if cfg.Mode == "server" || cfg.Mode == "all" {
84+
if cfg.ServerEndpoint == "" && cfg.ServerStrategyFile == "" {
85+
return nil, errors.New("for server mode, either 'server.endpoint' or 'server.strategy_file' must be configured")
86+
}
87+
if cfg.ServerEndpoint != "" && cfg.ServerStrategyFile != "" {
88+
return nil, errors.New("'server.endpoint' and 'server.strategy_file' are mutually exclusive")
89+
}
90+
91+
// Remote-only server settings
92+
if cfg.ServerEndpoint != "" {
93+
if valStr := fbit.Conf.String("server.reload_interval"); valStr != "" {
94+
val, err := time.ParseDuration(valStr)
95+
if err != nil {
96+
log.Warn("could not parse 'server.reload_interval', using default of 5m. error: %v", err)
97+
cfg.ServerReloadInterval = 5 * time.Minute
98+
} else {
99+
cfg.ServerReloadInterval = val
100+
}
101+
} else {
102+
cfg.ServerReloadInterval = 5 * time.Minute // Default TTL
103+
}
104+
105+
cfg.ServerTLS = TLSSettings{
106+
ServerName: fbit.Conf.String("server.tls.server_name_override"),
107+
CAFile: fbit.Conf.String("server.tls.ca_file"),
108+
CertFile: fbit.Conf.String("server.tls.cert_file"),
109+
KeyFile: fbit.Conf.String("server.tls.key_file"),
110+
}
111+
if insecureStr := fbit.Conf.String("server.tls.insecure"); insecureStr != "" {
112+
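insecure, err := strconv.ParseBool(insecureStr)
if err != nil {
log.Warn("could not parse 'server.tls.insecure', using default of false. error: %v", err)
} else {
cfg.ServerTLS.Insecure = insecure
}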
113+
}
114+
115+
if kaTimeStr := fbit.Conf.String("server.keepalive.time"); kaTimeStr != "" {
116+
cfg.ServerKeepalive = &KeepaliveConfig{}
117+
kaTime, err := time.ParseDuration(kaTimeStr)
118+
if err != nil {
119+
log.Warn("could not parse 'server.keepalive.time', skipping keepalive config. error: %v", err)
120+
cfg.ServerKeepalive = nil
121+
} else {
122+
cfg.ServerKeepalive.Time = kaTime
123+
kaTimeoutStr := fbit.Conf.String("server.keepalive.timeout")
124+
if kaTimeoutStr == "" {
125+
cfg.ServerKeepalive.Timeout = 20 * time.Second
126+
} else {
127+
kaTimeout, err := time.ParseDuration(kaTimeoutStr)
128+
if err != nil {
129+
log.Warn("could not parse 'server.keepalive.timeout', using default of 20s. error: %v", err)
130+
cfg.ServerKeepalive.Timeout = 20 * time.Second
131+
} else {
132+
cfg.ServerKeepalive.Timeout = kaTimeout
133+
}
134+
}
135+
kaPermitStr := fbit.Conf.String("server.keepalive.permit_without_stream")
136+
if kaPermitStr == "" {
137+
cfg.ServerKeepalive.PermitWithoutStream = true
138+
} else {
139+
kaPermit, err := strconv.ParseBool(kaPermitStr)
140+
if err != nil {
141+
log.Warn("could not parse 'server.keepalive.permit_without_stream', using default of true. error: %v", err)
142+
cfg.ServerKeepalive.PermitWithoutStream = true
143+
} else {
144+
cfg.ServerKeepalive.PermitWithoutStream = kaPermit
145+
}
146+
}
147+
}
148+
}
149+
150+
cfg.ServerRetry = &RetryConfig{}
151+
if valStr := fbit.Conf.String("server.retry.initial_interval"); valStr != "" {
152+
val, err := time.ParseDuration(valStr)
153+
if err != nil {
154+
log.Warn("could not parse 'server.retry.initial_interval', using default of 5s. error: %v", err)
155+
cfg.ServerRetry.InitialInterval = 5 * time.Second
156+
} else {
157+
cfg.ServerRetry.InitialInterval = val
158+
}
159+
} else {
160+
cfg.ServerRetry.InitialInterval = 5 * time.Second
161+
}
162+
if valStr := fbit.Conf.String("server.retry.max_interval"); valStr != "" {
163+
val, err := time.ParseDuration(valStr)
164+
if err != nil {
165+
log.Warn("could not parse 'server.retry.max_interval', using default of 5m. error: %v", err)
166+
cfg.ServerRetry.MaxInterval = 5 * time.Minute
167+
} else {
168+
cfg.ServerRetry.MaxInterval = val
169+
}
170+
} else {
171+
cfg.ServerRetry.MaxInterval = 5 * time.Minute
172+
}
173+
174+
if valStr := fbit.Conf.String("server.retry.max_retry"); valStr != "" {
175+
val, err := strconv.ParseInt(valStr, 10, 0)
176+
if err != nil {
177+
log.Warn("could not parse 'server.retry.max_retry', using default of 10. error: %v", err)
178+
cfg.ServerRetry.MaxRetry = 10
179+
} else {
180+
cfg.ServerRetry.MaxRetry = val
181+
}
182+
} else {
183+
cfg.ServerRetry.MaxRetry = 10
184+
}
185+
if cfg.ServerRetry.MaxRetry <= 0 {
186+
log.Warn("'server.retry.max_retry' must be > 0, using default of 10")
187+
cfg.ServerRetry.MaxRetry = 10
188+
}
189+
if valStr := fbit.Conf.String("server.retry.multiplier"); valStr != "" {
190+
val, err := strconv.ParseFloat(valStr, 64)
191+
if err != nil {
192+
log.Warn("could not parse 'server.retry.multiplier', using default of 1.5. error: %v", err)
193+
cfg.ServerRetry.Multiplier = 1.5
194+
} else {
195+
cfg.ServerRetry.Multiplier = val
196+
}
197+
} else {
198+
cfg.ServerRetry.Multiplier = 1.5
199+
}
200+
if cfg.ServerRetry.Multiplier <= 1.0 {
201+
log.Warn("'server.retry.multiplier' must be > 1.0, using default of 1.5")
202+
cfg.ServerRetry.Multiplier = 1.5
203+
}
204+
205+
serviceNamesStr := fbit.Conf.String("server.service_names")
206+
if serviceNamesStr == "" {
207+
return nil, errors.New("'server.service_names' (comma-separated) is required for server/all mode with a remote endpoint")
208+
}
209+
for _, s := range strings.Split(serviceNamesStr, ",") {
210+
if name := strings.TrimSpace(s); name != "" { // Skip empty entries, e.g. from a trailing comma
cfg.ServerServiceNames = append(cfg.ServerServiceNames, name)
}
211+
}
212+
}
213+
}
214+
return cfg, nil
215+
}
