forked from grafana/tempo
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpartition_offset_client.go
More file actions
119 lines (99 loc) · 4.35 KB
/
partition_offset_client.go
File metadata and controls
119 lines (99 loc) · 4.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
// SPDX-License-Identifier: AGPL-3.0-only
package ingest
import (
"context"
"fmt"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"
)
const (
	// kafkaOffsetStart is a special offset value that means the beginning of the partition.
	// It matches Kafka's ListOffsets sentinel timestamp for "earliest" (-2).
	kafkaOffsetStart = int64(-2)
	// kafkaOffsetEnd is a special offset value that means the end of the partition.
	// It matches Kafka's ListOffsets sentinel timestamp for "latest" (-1).
	kafkaOffsetEnd = int64(-1)
)
// PartitionOffsetClient is a client used to read partition offsets.
type PartitionOffsetClient struct {
	client *kgo.Client // Kafka client used to issue ListOffsets requests.
	topic  string      // topic whose partition offsets are read.
}
// NewPartitionOffsetClient returns a PartitionOffsetClient that reads offsets
// of the given topic through the provided Kafka client.
func NewPartitionOffsetClient(client *kgo.Client, topic string) *PartitionOffsetClient {
	c := &PartitionOffsetClient{}
	c.client = client
	c.topic = topic
	return c
}
// FetchPartitionsLastProducedOffsets fetches and returns the last produced offsets for all input partitions. The returned offsets for each partition
// are guaranteed to be always updated (no stale or cached offsets returned).
// The Kafka client used under the hood may retry a failed request until the retry timeout is hit.
func (p *PartitionOffsetClient) FetchPartitionsLastProducedOffsets(ctx context.Context, partitionIDs []int32) (kadm.ListedOffsets, error) {
	// Skip the lookup entirely if no partition was requested.
	if len(partitionIDs) == 0 {
		return nil, nil
	}
	// kafkaOffsetEnd asks for the next offset to be produced (high watermark).
	return p.fetchPartitionsOffsets(ctx, kafkaOffsetEnd, partitionIDs)
}
// FetchPartitionsStartProducedOffsets fetches and returns the earliest available offsets for all input partitions. The returned offsets for each partition
// are guaranteed to be always updated (no stale or cached offsets returned).
// The Kafka client used under the hood may retry a failed request until the retry timeout is hit.
func (p *PartitionOffsetClient) FetchPartitionsStartProducedOffsets(ctx context.Context, partitionIDs []int32) (kadm.ListedOffsets, error) {
	// Skip the lookup entirely if no partition was requested.
	if len(partitionIDs) == 0 {
		return nil, nil
	}
	// kafkaOffsetStart asks for the earliest available offset in each partition.
	return p.fetchPartitionsOffsets(ctx, kafkaOffsetStart, partitionIDs)
}
// fetchPartitionsOffsets fetches and returns offsets for the specified partitions using Kafka's ListOffsets API.
// The fetchOffset parameter determines which offsets to retrieve:
//   - kafkaOffsetStart (-2): earliest available offset in each partition
//   - kafkaOffsetEnd (-1): next offset to be produced (high watermark) in each partition
//   - specific timestamp: offset of the first message at or after the given timestamp
//
// This function returns an error if it fails to get the offset of any partition (no partial results are returned).
// The Kafka ListOffsets API is documented here: https://github.com/twmb/franz-go/blob/master/pkg/kmsg/generated.go#L5781-L5808
func (p *PartitionOffsetClient) fetchPartitionsOffsets(ctx context.Context, fetchOffset int64, partitionIDs []int32) (kadm.ListedOffsets, error) {
	// Build the ListOffsets request covering every requested partition of the topic.
	reqTopic := kmsg.NewListOffsetsRequestTopic()
	reqTopic.Topic = p.topic
	for _, id := range partitionIDs {
		reqPartition := kmsg.NewListOffsetsRequestTopicPartition()
		reqPartition.Partition = id
		reqPartition.Timestamp = fetchOffset
		reqTopic.Partitions = append(reqTopic.Partitions, reqPartition)
	}
	req := kmsg.NewPtrListOffsetsRequest()
	req.IsolationLevel = 0 // 0 means READ_UNCOMMITTED.
	req.Topics = []kmsg.ListOffsetsRequestTopic{reqTopic}

	offsets := kadm.ListedOffsets{
		p.topic: make(map[int32]kadm.ListedOffset, len(partitionIDs)),
	}

	// The request may be fanned out to multiple brokers (one shard each);
	// merge the per-shard responses, failing on the first error seen.
	for _, shard := range p.client.RequestSharded(ctx, req) {
		if shard.Err != nil {
			return nil, shard.Err
		}
		res := shard.Resp.(*kmsg.ListOffsetsResponse)

		// Sanity-check the response shape: exactly one topic, and it must be ours.
		switch {
		case len(res.Topics) != 1:
			return nil, fmt.Errorf("unexpected number of topics in the response (expected: %d, got: %d)", 1, len(res.Topics))
		case res.Topics[0].Topic != p.topic:
			return nil, fmt.Errorf("unexpected topic in the response (expected: %s, got: %s)", p.topic, res.Topics[0].Topic)
		}

		for _, partition := range res.Topics[0].Partitions {
			if err := kerr.ErrorForCode(partition.ErrorCode); err != nil {
				return nil, err
			}
			offsets[p.topic][partition.Partition] = kadm.ListedOffset{
				Topic:       p.topic,
				Partition:   partition.Partition,
				Timestamp:   partition.Timestamp,
				Offset:      partition.Offset,
				LeaderEpoch: partition.LeaderEpoch,
			}
		}
	}
	return offsets, nil
}