Skip to content

Commit ad6f12f

Browse files
committed
xinfo-groups: support nil lag in XINFO GROUPS (redis#3369)
* xinfo-groups: support nil lag in XINFO GROUPS * Add test * docs: clarify XInfoGroup.Lag field behavior with Nil values * docs: clarify XInfoGroup.Lag field behavior
1 parent 03a95d1 commit ad6f12f

File tree

2 files changed

+36
-1
lines changed

2 files changed

+36
-1
lines changed

command.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2559,7 +2559,9 @@ type XInfoGroup struct {
25592559
Pending int64
25602560
LastDeliveredID string
25612561
EntriesRead int64
2562-
Lag int64
2562+
// Lag represents the number of pending messages in the stream not yet
2563+
// delivered to this consumer group. Returns -1 when the lag cannot be determined.
2564+
Lag int64
25632565
}
25642566

25652567
var _ Cmder = (*XInfoGroupsCmd)(nil)
@@ -2643,8 +2645,11 @@ func (cmd *XInfoGroupsCmd) readReply(rd *proto.Reader) error {
26432645

26442646
// lag: the number of entries in the stream that are still waiting to be delivered
26452647
// to the group's consumers, or a NULL(Nil) when that number can't be determined.
2648+
// In that case, we return -1.
26462649
if err != nil && err != Nil {
26472650
return err
2651+
} else if err == Nil {
2652+
group.Lag = -1
26482653
}
26492654
default:
26502655
return fmt.Errorf("redis: unexpected key %q in XINFO GROUPS reply", key)

commands_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6789,6 +6789,36 @@ var _ = Describe("Commands", func() {
67896789
}))
67906790
})
67916791

6792+
It("should return -1 for nil lag in XINFO GROUPS", func() {
6793+
_, err := client.XAdd(ctx, &redis.XAddArgs{Stream: "s", ID: "0-1", Values: []string{"foo", "1"}}).Result()
6794+
Expect(err).NotTo(HaveOccurred())
6795+
6796+
client.XAdd(ctx, &redis.XAddArgs{Stream: "s", ID: "0-2", Values: []string{"foo", "2"}})
6797+
Expect(err).NotTo(HaveOccurred())
6798+
client.XAdd(ctx, &redis.XAddArgs{Stream: "s", ID: "0-3", Values: []string{"foo", "3"}})
6799+
Expect(err).NotTo(HaveOccurred())
6800+
6801+
err = client.XGroupCreate(ctx, "s", "g", "0").Err()
6802+
Expect(err).NotTo(HaveOccurred())
6803+
err = client.XReadGroup(ctx, &redis.XReadGroupArgs{Group: "g", Consumer: "c", Streams: []string{"s", ">"}, Count: 1, Block: -1, NoAck: false}).Err()
6804+
Expect(err).NotTo(HaveOccurred())
6805+
6806+
client.XDel(ctx, "s", "0-2")
6807+
6808+
res, err := client.XInfoGroups(ctx, "s").Result()
6809+
Expect(err).NotTo(HaveOccurred())
6810+
Expect(res).To(Equal([]redis.XInfoGroup{
6811+
{
6812+
Name: "g",
6813+
Consumers: 1,
6814+
Pending: 1,
6815+
LastDeliveredID: "0-1",
6816+
EntriesRead: 1,
6817+
Lag: -1, // nil lag from Redis is reported as -1
6818+
},
6819+
}))
6820+
})
6821+
67926822
It("should XINFO CONSUMERS", func() {
67936823
res, err := client.XInfoConsumers(ctx, "stream", "group1").Result()
67946824
Expect(err).NotTo(HaveOccurred())

0 commit comments

Comments
 (0)