
XREAD accepts multiple streams but only one ID, resulting in multiple issues #3694

@jtorkkel

Expected Behavior

XREAD should support multiple IDs, one per stream.

The Redis XREAD syntax is:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]

Redis XREAD supports reading from multiple streams, and redis-cli requires each stream to have its own ID. That is logical because each stream can be trimmed very differently. A separate ID per stream is needed when a stream holds more entries than COUNT, because the ID is what lets the reader continue from the right position.

127.0.0.1:6379> XREAD COUNT 100 STREAMS stream-1 stream-2 1769414803181-0 1769417819561-0

Other clients, for example Java Lettuce, also support a separate ID for each stream. The redis-cli MONITOR output below shows the XREAD command Lettuce issues when reading three separate streams:

1769435290.365489 [0 172.18.0.1:63222] "XREAD" "COUNT" "1000" "STREAMS" "stream-1" "stream-2" "stream-3" "1769414825724-0" "1769418971541-0" "0"
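To make the expected wire format concrete, here is a minimal sketch of how a client could assemble the XREAD arguments with one ID per stream. `buildXReadArgs` is a hypothetical helper written for illustration only; it is not part of go-redis or Lettuce.

```go
package main

import (
	"fmt"
	"strconv"
)

// buildXReadArgs is a hypothetical helper (not a go-redis function) that
// assembles the XREAD argument list with exactly one ID per stream.
func buildXReadArgs(count int64, streams, ids []string) ([]string, error) {
	if len(streams) != len(ids) {
		return nil, fmt.Errorf("xread: %d streams but %d ids", len(streams), len(ids))
	}
	args := []string{"XREAD"}
	if count > 0 {
		args = append(args, "COUNT", strconv.FormatInt(count, 10))
	}
	args = append(args, "STREAMS")
	args = append(args, streams...) // all keys first
	args = append(args, ids...)     // then the IDs, in the same order as the keys
	return args, nil
}

func main() {
	args, err := buildXReadArgs(100,
		[]string{"stream-1", "stream-2"},
		[]string{"1769414803181-0", "1769417819561-0"})
	if err != nil {
		panic(err)
	}
	fmt.Println(args)
}
```

Checking the lengths on the client side is a design choice in this sketch; the client could equally pass a mismatch through and let Redis reject it.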

Current Behavior

In go-redis, when XREAD is used to read multiple streams ([]string), only one ID (string) can be set.

As a result, the same ID is used for reading all streams. The client has to pick one of the returned IDs, which leads to:

  • lost data if the highest ID is used, because COUNT might have limited the number of messages returned for the other streams
  • the same data being read multiple times if the smallest ID is used.
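Both failure modes can be reproduced without Redis. The following is a simplified in-memory simulation, not go-redis code: `readAfter` is a hypothetical stand-in for XREAD on a single stream, and IDs are plain integers instead of the real `<ms>-<seq>` form.

```go
package main

import "fmt"

// readAfter mimics XREAD on one stream: it returns up to count entry IDs
// strictly greater than after. The stream is assumed to be sorted ascending.
func readAfter(stream []int, after, count int) []int {
	var out []int
	for _, id := range stream {
		if id > after && len(out) < count {
			out = append(out, id)
		}
	}
	return out
}

func main() {
	a := []int{1, 2, 3, 4, 5} // long stream, truncated by COUNT
	b := []int{10, 11}        // short stream with higher IDs
	const count = 2

	// The first read from ID 0 returns a: 1, 2 and b: 10, 11.
	lastA, lastB := 2, 11

	// Highest ID for every stream: entries 3, 4, 5 of a are never returned.
	fmt.Println("highest ID, stream a:", readAfter(a, lastB, count))
	// Smallest ID for every stream: entries 10, 11 of b are returned again.
	fmt.Println("smallest ID, stream b:", readAfter(b, lastA, count))
	// One ID per stream: a continues at 3, b has nothing new.
	fmt.Println("per-stream IDs:", readAfter(a, lastA, count), readAfter(b, lastB, count))
}
```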

MONITOR output when go-redis is used. This is the second XREAD, issued after one of the returned IDs was passed to the next request:

1769439443.897530 [0 192.168.65.1:23864] "xread" "count" "1000" "streams" "uid-stream" "session-id-stream" "jti-stream" "1769438569498-0" "1769438569498-0" "1769438569498-0"

Possible Solution

The problem originates in XReadArgs. ID is defined as a string, so only one value can be passed.

type XReadArgs struct {
    Streams []string  // slice allows multiple streams
    Count int64
    Block time.Duration
    ID string   // ID incorrectly string, only one allowed
}

Options

  1. Change ID to []string and pass the IDs to Redis as expected. This is potentially a breaking change.
  2. Add a new field, IDs, which is used when set, and fall back to the single ID for compatibility.
  3. Change the ID type to interface{}. When building the XREAD arguments, check the type: if it is a string, keep the current behavior; if it is a []string, pass one ID per stream. If len(Streams) != len(ID), just pass the values to Redis, which will return an error.
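Option 2 could look roughly like the sketch below. The `xReadArgs` type and its `IDs` field are assumptions made for illustration; go-redis does not define them, and the real struct has more fields.

```go
package main

import "fmt"

// xReadArgs is a hypothetical, trimmed-down variant of XReadArgs.
type xReadArgs struct {
	Streams []string
	ID      string   // existing field: one ID applied to every stream
	IDs     []string // assumed new field: one ID per stream
}

// streamIDs returns the IDs to append after the stream names. IDs takes
// precedence when set; otherwise the single ID is repeated for each stream.
func (a *xReadArgs) streamIDs() []string {
	if len(a.IDs) > 0 {
		return a.IDs
	}
	if a.ID == "" {
		return nil
	}
	ids := make([]string, len(a.Streams))
	for i := range ids {
		ids[i] = a.ID
	}
	return ids
}

func main() {
	legacy := &xReadArgs{Streams: []string{"stream-1", "stream-2"}, ID: "0"}
	perStream := &xReadArgs{
		Streams: []string{"stream-1", "stream-2"},
		IDs:     []string{"1769414803181-0", "1769417819561-0"},
	}
	fmt.Println(legacy.streamIDs())
	fmt.Println(perStream.streamIDs())
}
```

Existing callers that only set ID keep their current behavior, which is the main argument for this option over changing the field type.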

Workaround:
When using XREAD, pass only one stream to the API. When multiple streams need to be read, multiple calls are required.
Using the smallest ID might also work, but it results in reading the same data multiple times.

Multiple streams are needed to simulate a different TTL for each stream, and a single XREAD is the preferable way to read multiple streams.
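If the smallest-ID workaround is used, the duplicates have to be filtered on the client. A minimal sketch of such a filter follows; it assumes IDs in the `<ms>-<seq>` form, and `parseID`, `newerThan`, and `dropSeen` are hypothetical helpers, not go-redis functions.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseID splits a stream ID "<ms>-<seq>" into its numeric parts.
// A bare "<ms>" (such as "0") is treated as sequence 0.
func parseID(id string) (ms, seq uint64, err error) {
	msPart, seqPart, found := strings.Cut(id, "-")
	if ms, err = strconv.ParseUint(msPart, 10, 64); err != nil {
		return 0, 0, err
	}
	if !found {
		return ms, 0, nil
	}
	if seq, err = strconv.ParseUint(seqPart, 10, 64); err != nil {
		return 0, 0, err
	}
	return ms, seq, nil
}

// newerThan reports whether id is strictly greater than last.
func newerThan(id, last string) (bool, error) {
	ms, seq, err := parseID(id)
	if err != nil {
		return false, err
	}
	lastMs, lastSeq, err := parseID(last)
	if err != nil {
		return false, err
	}
	return ms > lastMs || (ms == lastMs && seq > lastSeq), nil
}

// dropSeen keeps only the IDs newer than the last ID already processed
// for that stream.
func dropSeen(ids []string, last string) ([]string, error) {
	var fresh []string
	for _, id := range ids {
		ok, err := newerThan(id, last)
		if err != nil {
			return nil, err
		}
		if ok {
			fresh = append(fresh, id)
		}
	}
	return fresh, nil
}

func main() {
	returned := []string{"1769438569400-0", "1769438569498-0", "1769438569498-1"}
	fresh, err := dropSeen(returned, "1769438569498-0")
	if err != nil {
		panic(err)
	}
	fmt.Println(fresh)
}
```

This only hides the duplicates; the same entries are still transferred from Redis on every call.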

Steps to Reproduce

  1. Add data to stream-1: XADD stream-1 cmd ADD key XXX value expired ttl 200
  2. Add data to stream-2: XADD stream-2 cmd ADD key YYY value expired ttl 200
  3. Turn on MONITOR in redis-cli.
  4. Read both streams in a single XREAD, starting from the beginning: XREAD COUNT 100 STREAMS stream-1 stream-2 "0" "0". Note that in go-redis you can pass only one ID, while the reply contains IDs for both streams. In the first call the ID is normally "0".
  5. Add more data to both streams. You can reuse the same XADD data or add new entries with different values.
  6. Now you should use a per-stream ID to read the data: XREAD COUNT 100 STREAMS stream-1 stream-2 "last-id-1" "last-id2". In go-redis, however, you can pass only one ID.

Context (Environment)

Redis 8.4 in Docker
Windows 11
go-redis 9.17.2
Go 1.24.2

Detailed Description

I think the above description is detailed enough

Possible Implementation

This is a solution for option 3 (change the ID type to an interface):

type XReadArgs struct {
    Streams []string  // slice allows multiple streams
    Count int64
    Block time.Duration
    ID interface{}   // interface for backward compatibility
}

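The issue is in the code that appends the IDs: the same ID is set for every stream. The switch below replaces the current code, which is kept in the comment that follows it.

	switch v := a.ID.(type) {
	case string:
		// Single ID: repeat it for every stream (current behavior).
		if v != "" {
			for range a.Streams {
				args = append(args, v)
			}
		}
	case []string:
		// One ID per stream; a length mismatch is left for Redis to reject.
		for _, id := range v {
			args = append(args, id)
		}
	default:
		// handle unexpected type
	}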

	/*
		if a.ID != "" {
			for range a.Streams {
				args = append(args, a.ID)
			}
		}
	*/
