Skip to content

Commit c0b1db9

Browse files
authored
[clickhouse] Implement FindTraceIDs for ClickHouse Storage For Primitive Parameters (#7648)
<!-- !! Please DELETE this comment before posting. We appreciate your contribution to the Jaeger project! 👋🎉 --> ## Which problem is this PR solving? - Towards #7134 ## Description of the changes - This PR implements the `FindTraceIDs` function for ClickHouse Storage. It only adds the primitive query parameters (service name and operation name). The other query parameters will be added in follow-up PRs. ## How was this change tested? - Unit tests ## Checklist - [x] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md - [x] I have signed all commits - [x] I have added unit tests for the new functionality - [x] I have run lint and test steps successfully - for `jaeger`: `make lint test` - for `jaeger-ui`: `npm run lint` and `npm run test` --------- Signed-off-by: Mahad Zaryab <mahadzaryab1@gmail.com> Signed-off-by: Mahad Zaryab <43658574+mahadzaryab1@users.noreply.github.com>
1 parent 3501efe commit c0b1db9

File tree

6 files changed

+371
-18
lines changed

6 files changed

+371
-18
lines changed

internal/storage/v2/clickhouse/config.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,13 @@ import (
1111
"go.opentelemetry.io/collector/config/configoptional"
1212
)
1313

14+
const (
15+
defaultProtocol = "native"
16+
defaultDatabase = "jaeger"
17+
defaultSearchDepth = 1000
18+
defaultMaxSearchDepth = 10000
19+
)
20+
1421
type Configuration struct {
1522
// Protocol is the protocol to use to connect to ClickHouse.
1623
// Supported values are "native" and "http". Default is "native".
@@ -25,6 +32,13 @@ type Configuration struct {
2532
DialTimeout time.Duration `mapstructure:"dial_timeout"`
2633
// CreateSchema, if set to true, will create the ClickHouse schema if it does not exist.
2734
CreateSchema bool `mapstructure:"create_schema"`
35+
// DefaultSearchDepth is the default search depth for queries.
36+
// This is the maximum number of trace IDs that will be returned when searching for traces
37+
// if a limit is not specified in the query.
38+
DefaultSearchDepth int `mapstructure:"default_search_depth"`
39+
// MaxSearchDepth is the maximum allowed search depth for queries.
40+
// This limits the number of trace IDs that can be returned when searching for traces.
41+
MaxSearchDepth int `mapstructure:"max_search_depth"`
2842
// TODO: add more settings
2943
}
3044

@@ -37,3 +51,18 @@ func (cfg *Configuration) Validate() error {
3751
_, err := govalidator.ValidateStruct(cfg)
3852
return err
3953
}
54+
55+
func (cfg *Configuration) applyDefaults() {
56+
if cfg.Protocol == "" {
57+
cfg.Protocol = "native"
58+
}
59+
if cfg.Database == "" {
60+
cfg.Database = defaultDatabase
61+
}
62+
if cfg.DefaultSearchDepth == 0 {
63+
cfg.DefaultSearchDepth = defaultSearchDepth
64+
}
65+
if cfg.MaxSearchDepth == 0 {
66+
cfg.MaxSearchDepth = defaultMaxSearchDepth
67+
}
68+
}

internal/storage/v2/clickhouse/config_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,3 +82,13 @@ func TestValidate(t *testing.T) {
8282
})
8383
}
8484
}
85+
86+
func TestConfigurationApplyDefaults(t *testing.T) {
87+
config := &Configuration{}
88+
config.applyDefaults()
89+
90+
require.Equal(t, defaultProtocol, config.Protocol)
91+
require.Equal(t, defaultDatabase, config.Database)
92+
require.Equal(t, defaultSearchDepth, config.DefaultSearchDepth)
93+
require.Equal(t, defaultMaxSearchDepth, config.MaxSearchDepth)
94+
}

internal/storage/v2/clickhouse/factory.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ type Factory struct {
3535
}
3636

3737
func NewFactory(ctx context.Context, cfg Configuration, telset telemetry.Settings) (*Factory, error) {
38+
cfg.applyDefaults()
3839
f := &Factory{
3940
config: cfg,
4041
telset: telset,
@@ -88,7 +89,10 @@ func NewFactory(ctx context.Context, cfg Configuration, telset telemetry.Setting
8889
}
8990

9091
func (f *Factory) CreateTraceReader() (tracestore.Reader, error) {
91-
return chtracestore.NewReader(f.conn), nil
92+
return chtracestore.NewReader(f.conn, chtracestore.ReaderConfig{
93+
DefaultSearchDepth: f.config.DefaultSearchDepth,
94+
MaxSearchDepth: f.config.MaxSearchDepth,
95+
}), nil
9296
}
9397

9498
func (f *Factory) CreateTraceWriter() (tracestore.Writer, error) {

internal/storage/v2/clickhouse/sql/queries.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,13 @@ WHERE
206206
trace_id = ?
207207
`
208208

209+
// SearchTraceIDs is the base SQL fragment used by FindTraceIDs.
210+
//
211+
// The query begins with a no-op predicate (`WHERE 1=1`) so that additional
212+
// filters can be appended unconditionally using `AND` without needing to check
213+
// whether this is the first WHERE clause.
214+
const SearchTraceIDs = `SELECT DISTINCT trace_id FROM spans WHERE 1=1`
215+
209216
const SelectServices = `
210217
SELECT DISTINCT
211218
name

internal/storage/v2/clickhouse/tracestore/reader.go

Lines changed: 72 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@ package tracestore
55

66
import (
77
"context"
8+
"encoding/hex"
89
"fmt"
910
"iter"
1011

1112
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
13+
"go.opentelemetry.io/collector/pdata/pcommon"
1214
"go.opentelemetry.io/collector/pdata/ptrace"
1315

1416
"github.com/jaegertracing/jaeger/internal/storage/v2/api/tracestore"
@@ -18,17 +20,27 @@ import (
1820

1921
var _ tracestore.Reader = (*Reader)(nil)
2022

23+
type ReaderConfig struct {
24+
// DefaultSearchDepth is the default number of trace IDs to return when searching for traces.
25+
// This value is used when the SearchDepth field in TraceQueryParams is not set.
26+
DefaultSearchDepth int
27+
// MaxSearchDepth is the maximum number of trace IDs that can be returned when searching for traces.
28+
// This value is used to limit the SearchDepth field in TraceQueryParams.
29+
MaxSearchDepth int
30+
}
31+
2132
type Reader struct {
22-
conn driver.Conn
33+
conn driver.Conn
34+
config ReaderConfig
2335
}
2436

2537
// NewReader returns a new Reader instance that uses the given ClickHouse connection
2638
// to read trace data.
2739
//
2840
// The provided connection is used exclusively for reading traces, meaning it is safe
2941
// to enable instrumentation on the connection without risk of recursively generating traces.
30-
func NewReader(conn driver.Conn) *Reader {
31-
return &Reader{conn: conn}
42+
func NewReader(conn driver.Conn, cfg ReaderConfig) *Reader {
43+
return &Reader{conn: conn, config: cfg}
3244
}
3345

3446
func (r *Reader) GetTraces(
@@ -129,9 +141,62 @@ func (*Reader) FindTraces(
129141
panic("not implemented")
130142
}
131143

132-
func (*Reader) FindTraceIDs(
133-
context.Context,
134-
tracestore.TraceQueryParams,
144+
func readRowIntoTraceID(rows driver.Rows) ([]tracestore.FoundTraceID, error) {
145+
var str string
146+
147+
if err := rows.Scan(&str); err != nil {
148+
return nil, fmt.Errorf("failed to scan row: %w", err)
149+
}
150+
151+
b, err := hex.DecodeString(str)
152+
if err != nil {
153+
return nil, fmt.Errorf("failed to decode trace ID: %w", err)
154+
}
155+
156+
return []tracestore.FoundTraceID{
157+
{TraceID: pcommon.TraceID(b)},
158+
}, nil
159+
}
160+
161+
func (r *Reader) FindTraceIDs(
162+
ctx context.Context,
163+
query tracestore.TraceQueryParams,
135164
) iter.Seq2[[]tracestore.FoundTraceID, error] {
136-
panic("not implemented")
165+
return func(yield func([]tracestore.FoundTraceID, error) bool) {
166+
q := sql.SearchTraceIDs
167+
args := []any{}
168+
169+
if query.ServiceName != "" {
170+
q += " AND service_name = ?"
171+
args = append(args, query.ServiceName)
172+
}
173+
if query.OperationName != "" {
174+
q += " AND name = ?"
175+
args = append(args, query.OperationName)
176+
}
177+
q += " LIMIT ?"
178+
if query.SearchDepth > 0 {
179+
if query.SearchDepth > r.config.MaxSearchDepth {
180+
yield(nil, fmt.Errorf("search depth %d exceeds maximum allowed %d", query.SearchDepth, r.config.MaxSearchDepth))
181+
return
182+
}
183+
args = append(args, query.SearchDepth)
184+
} else {
185+
args = append(args, r.config.DefaultSearchDepth)
186+
}
187+
188+
rows, err := r.conn.Query(ctx, q, args...)
189+
if err != nil {
190+
yield(nil, fmt.Errorf("failed to query trace IDs: %w", err))
191+
return
192+
}
193+
defer rows.Close()
194+
195+
for rows.Next() {
196+
traceID, err := readRowIntoTraceID(rows)
197+
if !yield(traceID, err) {
198+
return
199+
}
200+
}
201+
}
137202
}

0 commit comments

Comments
 (0)