Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 24 additions & 7 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,10 +274,10 @@ func writeCmd(wr *proto.Writer, cmd Cmder) error {
return wr.WriteArgs(cmd.Args())
}

// cmdFirstKeyPos returns the position of the first key in the command's arguments.
// If the command does not have a key, it returns 0.
// TODO: Use the data in CommandInfo to determine the first key position.
func cmdFirstKeyPos(cmd Cmder) int {
// cmdFirstKeyPosWithInfo returns the first key position in a command's args (0 if none).
// Uses CommandInfo.FirstKeyPos when available (via cache peek, no network call), falling
// back to a hardcoded table. eval/evalsha variants are resolved from the runtime numkeys arg.
func cmdFirstKeyPosWithInfo(cmd Cmder, info *CommandInfo) int {
if pos := cmd.firstKeyPos(); pos != 0 {
return int(pos)
}
Expand All @@ -296,14 +296,20 @@ func cmdFirstKeyPos(cmd Cmder) int {
}

return 0
case "publish":
return 1
case "memory":
// https://github.com/redis/redis/issues/7493
if cmd.stringArg(1) == "usage" {
return 2
}
// CommandInfo (if available) gives the correct answer
// otherwise the hardcoded fallback applies.
}

// Use CommandInfo cache when warm (in-memory only, no extra round-trips).
if info != nil {
return int(info.FirstKeyPos)
}

return 1
}

Expand Down Expand Up @@ -4896,7 +4902,7 @@ type cmdsInfoCache struct {
fn func(ctx context.Context) (map[string]*CommandInfo, error)

once internal.Once
refreshLock sync.Mutex
refreshLock sync.RWMutex
cmds map[string]*CommandInfo
}

Expand Down Expand Up @@ -4936,6 +4942,17 @@ func (c *cmdsInfoCache) Refresh() {
c.once = internal.Once{}
}

// Peek returns the cached CommandInfo map without triggering a Redis round-trip.
// Returns nil when the cache is cold; callers should fall back to other heuristics.
func (c *cmdsInfoCache) Peek() map[string]*CommandInfo {
if c == nil {
return nil
}
c.refreshLock.RLock()
defer c.refreshLock.RUnlock()
return c.cmds
}
Comment thread
cursor[bot] marked this conversation as resolved.

// ------------------------------------------------------------------------------
const requestPolicy = "request_policy"
const responsePolicy = "response_policy"
Expand Down
13 changes: 11 additions & 2 deletions osscluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1915,7 +1915,7 @@ func (c *ClusterClient) slottedKeyedCommands(ctx context.Context, cmds []Cmder)

prefferedRandomSlot := -1
for _, cmd := range cmds {
if cmdFirstKeyPos(cmd) == 0 {
if cmdFirstKeyPosWithInfo(cmd, c.cmdInfoPeek(cmd.Name())) == 0 {
Comment thread
cursor[bot] marked this conversation as resolved.
continue
}

Expand Down Expand Up @@ -2305,13 +2305,22 @@ func (c *ClusterClient) cmdInfo(ctx context.Context, name string) *CommandInfo {
return info
}

// cmdInfoPeek returns the cached CommandInfo for the named command without
// triggering a round-trip to Redis. It returns nil when the cache is cold.
func (c *ClusterClient) cmdInfoPeek(name string) *CommandInfo {
if cmds := c.cmdsInfoCache.Peek(); cmds != nil {
return cmds[name]
}
return nil
}

func (c *ClusterClient) cmdSlot(cmd Cmder, prefferedSlot int) int {
args := cmd.Args()
if args[0] == "cluster" && (args[1] == "getkeysinslot" || args[1] == "countkeysinslot") {
return args[2].(int)
}

return cmdSlot(cmd, cmdFirstKeyPos(cmd), prefferedSlot)
return cmdSlot(cmd, cmdFirstKeyPosWithInfo(cmd, c.cmdInfoPeek(cmd.Name())), prefferedSlot)
}

func cmdSlot(cmd Cmder, pos int, prefferedRandomSlot int) int {
Expand Down
8 changes: 4 additions & 4 deletions osscluster_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (c *ClusterClient) executeOnAllShards(ctx context.Context, cmd Cmder, polic
// executeMultiShard handles commands that operate on multiple keys across shards
func (c *ClusterClient) executeMultiShard(ctx context.Context, cmd Cmder, policy *routing.CommandPolicy) error {
args := cmd.Args()
firstKeyPos := int(cmdFirstKeyPos(cmd))
firstKeyPos := cmdFirstKeyPosWithInfo(cmd, c.cmdInfoPeek(cmd.Name()))
stepCount := int(cmd.stepCount())
if stepCount == 0 {
stepCount = 1 // Default to 1 if not set
Expand Down Expand Up @@ -179,7 +179,7 @@ func (c *ClusterClient) executeMultiSlot(ctx context.Context, cmd Cmder, slotMap
// createSlotSpecificCommand creates a new command for a specific slot's keys
func (c *ClusterClient) createSlotSpecificCommand(ctx context.Context, originalCmd Cmder, keys []string) Cmder {
originalArgs := originalCmd.Args()
firstKeyPos := int(cmdFirstKeyPos(originalCmd))
firstKeyPos := cmdFirstKeyPosWithInfo(originalCmd, c.cmdInfoPeek(originalCmd.Name()))
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistent firstKeyPos from independent cache-dependent computations

Low Severity

createSlotSpecificCommand independently recomputes firstKeyPos via cmdFirstKeyPosWithInfo(originalCmd, c.cmdInfoPeek(...)), which can yield a different value than what executeMultiShard used to extract keys—if the cache transitions from cold to warm between calls. The old cmdFirstKeyPos was a pure function with no external state dependency, so this inconsistency was impossible before. The same pattern exists in slottedKeyedCommands where cmdFirstKeyPosWithInfo is called once to filter keyless commands and then again inside cmdSlot. If the cache warms between these calls, a command might pass the "has keys" check but then get routed as keyless.

Additional Locations (2)
Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit c956039. Configure here.


// Build new args with only the specified keys
newArgs := make([]interface{}, 0, firstKeyPos+len(keys))
Expand Down Expand Up @@ -467,7 +467,7 @@ func (c *ClusterClient) createAggregator(policy *routing.CommandPolicy, cmd Cmde
}

if !isKeyed {
firstKeyPos := cmdFirstKeyPos(cmd)
firstKeyPos := cmdFirstKeyPosWithInfo(cmd, c.cmdInfoPeek(cmd.Name()))
isKeyed = firstKeyPos > 0
}

Expand Down Expand Up @@ -500,7 +500,7 @@ func (c *ClusterClient) pickArbitraryNode(ctx context.Context) *clusterNode {

// hasKeys checks if a command operates on keys
func (c *ClusterClient) hasKeys(cmd Cmder) bool {
firstKeyPos := cmdFirstKeyPos(cmd)
firstKeyPos := cmdFirstKeyPosWithInfo(cmd, c.cmdInfoPeek(cmd.Name()))
return firstKeyPos > 0
}

Expand Down
18 changes: 16 additions & 2 deletions ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -771,8 +771,17 @@ func (c *Ring) cmdsInfo(ctx context.Context) (map[string]*CommandInfo, error) {
return nil, firstErr
}

// cmdInfoPeek returns the cached CommandInfo for the named command without
// triggering a round-trip to Redis. It returns nil when the cache is cold.
func (c *Ring) cmdInfoPeek(name string) *CommandInfo {
if cmds := c.cmdsInfoCache.Peek(); cmds != nil {
return cmds[name]
}
return nil
}

func (c *Ring) cmdShard(cmd Cmder) (*ringShard, error) {
pos := cmdFirstKeyPos(cmd)
pos := cmdFirstKeyPosWithInfo(cmd, c.cmdInfoPeek(cmd.Name()))
if pos == 0 {
return c.sharding.Random()
}
Expand Down Expand Up @@ -838,9 +847,14 @@ func (c *Ring) generalProcessPipeline(
}

cmdsMap := make(map[string][]Cmder)
cachedInfo := c.cmdsInfoCache.Peek()

for _, cmd := range cmds {
hash := cmd.stringArg(cmdFirstKeyPos(cmd))
var info *CommandInfo
if cachedInfo != nil {
info = cachedInfo[cmd.Name()]
}
hash := cmd.stringArg(cmdFirstKeyPosWithInfo(cmd, info))
if hash != "" {
hash = c.sharding.Hash(hash)
}
Expand Down
Loading