pkg/controllers/rediscluster/rediscluster_controller.go (10 changes: 9 additions & 1 deletion)

@@ -73,6 +73,10 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
 
 	// Check if the cluster is downscaled
 	if leaderCount := k8sutils.CheckRedisNodeCount(ctx, r.K8sClient, instance, "leader"); leaderReplicas < leaderCount {
+		if !(r.IsStatefulSetReady(ctx, instance.Namespace, instance.Name+"-leader") && r.IsStatefulSetReady(ctx, instance.Namespace, instance.Name+"-follower")) {
+			return intctrlutil.Reconciled()
+		}
+
 		logger.Info("Redis cluster is downscaling...", "Current.LeaderReplicas", leaderCount, "Desired.LeaderReplicas", leaderReplicas)
 		for shardIdx := leaderCount - 1; shardIdx >= leaderReplicas; shardIdx-- {
 			logger.Info("Remove the shard", "Shard.Index", shardIdx)
@@ -83,7 +87,11 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
 			// lastLeaderPod is slaving right now; make it the master pod.
 			// We have to perform a manual failover here to make it a leader pod.
 			// clusterFailover should also include clusterReplicate, since we have to map the followers to the new leader.
-			k8sutils.ClusterFailover(ctx, r.K8sClient, instance)
+			logger.Info("Cluster Failover is initiated", "Shard.Index", shardIdx)
+			if err = k8sutils.ClusterFailover(ctx, r.K8sClient, instance); err != nil {
+				logger.Error(err, "Failed to initiate cluster failover")
+				return intctrlutil.RequeueWithError(ctx, err, "")
+			}
 		}
 		// Step 1: Remove the follower node
 		k8sutils.RemoveRedisFollowerNodesFromCluster(ctx, r.K8sClient, instance)
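The controller change above does two things: it refuses to start a downscale until both the leader and follower StatefulSets report ready, and it propagates a failover failure as a requeue instead of silently continuing. Below is a minimal sketch of the kind of readiness check such a guard depends on; the operator's actual IsStatefulSetReady helper may inspect different fields, so treat this as an illustration rather than the real implementation.

    package readiness

    import (
    	appsv1 "k8s.io/api/apps/v1"
    )

    // statefulSetReady reports whether a StatefulSet has converged: the
    // controller has observed the latest spec, and every desired replica
    // is both updated and ready.
    func statefulSetReady(sts *appsv1.StatefulSet) bool {
    	if sts.Spec.Replicas == nil {
    		return false // defaulting not applied yet; treat as not ready
    	}
    	want := *sts.Spec.Replicas
    	return sts.Status.ObservedGeneration == sts.Generation &&
    		sts.Status.UpdatedReplicas == want &&
    		sts.Status.ReadyReplicas == want
    }

With a guard like this in place, a reconcile that arrives mid-rollout simply returns via intctrlutil.Reconciled() and runs again on the next watch event, rather than issuing a failover against pods that are still starting.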
pkg/k8sutils/cluster-scaling.go (18 changes: 13 additions & 5 deletions)

@@ -391,7 +391,7 @@ func verifyLeaderPodInfo(ctx context.Context, redisClient *redis.Client, podName
 	return false
 }
 
-func ClusterFailover(ctx context.Context, client kubernetes.Interface, cr *redisv1beta2.RedisCluster) {
+func ClusterFailover(ctx context.Context, client kubernetes.Interface, cr *redisv1beta2.RedisCluster) error {
 	slavePodName := cr.Name + "-leader-" + strconv.Itoa(int(CheckRedisNodeCount(ctx, client, cr, "leader"))-1)
 	// cmd = redis-cli cluster failover -a <pass>
 	var cmd []string
@@ -400,13 +400,15 @@ func ClusterFailover(ctx context.Context, client kubernetes.Interface, cr *redis
 		Namespace: cr.Namespace,
 	}
 
-	cmd = []string{"redis-cli", "cluster", "failover"}
+	cmd = []string{"redis-cli", "-h"}
 
 	if *cr.Spec.ClusterVersion == "v7" {
-		cmd = append(cmd, getRedisHostname(pod, cr, "leader")+fmt.Sprintf(":%d", *cr.Spec.Port))
+		cmd = append(cmd, getRedisHostname(pod, cr, "leader"))
 	} else {
-		cmd = append(cmd, getRedisServerAddress(ctx, client, pod, *cr.Spec.Port))
+		cmd = append(cmd, getRedisServerIP(ctx, client, pod))
 	}
+	cmd = append(cmd, "-p")
+	cmd = append(cmd, strconv.Itoa(*cr.Spec.Port))
 
 	if cr.Spec.KubernetesConfig.ExistingPasswordSecret != nil {
 		pass, err := getRedisPassword(ctx, client, cr.Namespace, *cr.Spec.KubernetesConfig.ExistingPasswordSecret.Name, *cr.Spec.KubernetesConfig.ExistingPasswordSecret.Key)
@@ -418,7 +420,13 @@ func ClusterFailover(ctx context.Context, client kubernetes.Interface, cr *redis
 	}
 
 	cmd = append(cmd, getRedisTLSArgs(cr.Spec.TLS, slavePodName)...)
+	cmd = append(cmd, "cluster", "failover")
 
 	log.FromContext(ctx).V(1).Info("Redis cluster failover command is", "Command", cmd)
-	executeCommand(ctx, client, cr, cmd, slavePodName)
+	execOut, err := executeCommand1(ctx, client, cr, cmd, slavePodName)
+	if err != nil {
+		log.FromContext(ctx).Error(err, "Could not execute command", "Command", cmd, "Output", execOut)
+		return err
+	}
+	return nil
 }
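After this change, ClusterFailover assembles the CLI invocation in pieces: the host first (-h, a hostname for v7 clusters, the pod IP otherwise), then the port (-p), then the auth and TLS arguments, and only then the cluster failover subcommand, which redis-cli expects after its connection flags. Here is a standalone sketch of that assembly order, with hypothetical host, port, and password values; the real code resolves all three from the RedisCluster CR and the target pod at runtime.

    package main

    import (
    	"fmt"
    	"strconv"
    )

    func main() {
    	// Hypothetical values; ClusterFailover derives these at runtime.
    	host := "mycluster-leader-2.mycluster-leader-headless.default.svc"
    	port := 6379
    	pass := "secret"

    	cmd := []string{"redis-cli", "-h", host, "-p", strconv.Itoa(port)}
    	if pass != "" {
    		cmd = append(cmd, "-a", pass)
    	}
    	// TLS flags would be appended here; the subcommand always goes last.
    	cmd = append(cmd, "cluster", "failover")

    	fmt.Println(cmd)
    	// [redis-cli -h mycluster-leader-2.mycluster-leader-headless.default.svc -p 6379 -a secret cluster failover]
    }

Because executeCommand1 returns the command output alongside the error, a failed CLUSTER FAILOVER now surfaces in the reconciler with the redis-cli output attached, and the controller requeues instead of proceeding to remove nodes from a cluster whose leadership never actually moved.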