2 changes: 1 addition & 1 deletion Dockerfile
@@ -1,5 +1,5 @@
# Build the manager binary
FROM golang:1.21 as builder
FROM golang:1.21 AS builder
ARG BUILDOS
ARG BUILDPLATFORM
ARG BUILDARCH
3 changes: 3 additions & 0 deletions Makefile
@@ -66,6 +66,9 @@ manifests: controller-gen
fmt:
go fmt ./...

lint: golangci-lint
$(GOLANGCI_LINT) run

# Run go vet against code
vet:
go vet ./...
4 changes: 2 additions & 2 deletions api/status/redis-cluster_status.go
@@ -14,6 +14,6 @@ const (
RedisClusterInitializing RedisClusterState = "Initializing"
RedisClusterBootstrap RedisClusterState = "Bootstrap"
// RedisClusterReady means the RedisCluster is ready for use, we use redis-cli --cluster check 127.0.0.1:6379 to check the cluster status
RedisClusterReady RedisClusterState = "Ready"
// RedisClusterFailed RedisClusterState = "Failed"
RedisClusterReady RedisClusterState = "Ready"
RedisClusterFailed RedisClusterState = "Failed"
)
1 change: 1 addition & 0 deletions go.mod
@@ -20,6 +20,7 @@ require (

require (
emperror.dev/errors v0.8.0 // indirect
github.com/avast/retry-go v3.0.0+incompatible // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
2 changes: 2 additions & 0 deletions go.sum
@@ -7,6 +7,8 @@ github.com/PuerkitoBio/purell v1.0.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbt
github.com/PuerkitoBio/urlesc v0.0.0-20160726150825-5bd2802263f2/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/avast/retry-go v3.0.0+incompatible h1:4SOWQ7Qs+oroOTQOYnAHqelpCO0biHSxpiH9JdtuBj0=
github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY=
github.com/banzaicloud/k8s-objectmatcher v1.8.0 h1:Nugn25elKtPMTA2br+JgHNeSQ04sc05MDPmpJnd1N2A=
github.com/banzaicloud/k8s-objectmatcher v1.8.0/go.mod h1:p2LSNAjlECf07fbhDyebTkPUIYnU05G+WfGgkTmgeMg=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
40 changes: 35 additions & 5 deletions pkg/controllers/rediscluster/rediscluster_controller.go
@@ -18,12 +18,14 @@ package rediscluster

import (
"context"
"fmt"
"time"

"github.com/OT-CONTAINER-KIT/redis-operator/api/status"
redisv1beta2 "github.com/OT-CONTAINER-KIT/redis-operator/api/v1beta2"
intctrlutil "github.com/OT-CONTAINER-KIT/redis-operator/pkg/controllerutil"
"github.com/OT-CONTAINER-KIT/redis-operator/pkg/k8sutils"
retry "github.com/avast/retry-go"
"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/runtime"
@@ -45,7 +47,7 @@ type RedisClusterReconciler struct {

func (r *RedisClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
reqLogger := r.Log.WithValues("Request.Namespace", req.Namespace, "Request.Name", req.Name)
reqLogger.Info("Reconciling opstree redis Cluster controller")
reqLogger.V(1).Info("Reconciling opstree redis Cluster controller")
instance := &redisv1beta2.RedisCluster{}

err := r.Client.Get(context.TODO(), req.NamespacedName, instance)
@@ -186,13 +188,41 @@ func (r *RedisClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request
return intctrlutil.RequeueAfter(reqLogger, time.Second*60, "Redis cluster count is not desired", "Current.Count", nc, "Desired.Count", totalReplicas)
}

reqLogger.Info("Redis cluster count is desired")
if int(totalReplicas) > 1 && k8sutils.CheckRedisClusterState(ctx, r.K8sClient, r.Log, instance) >= int(totalReplicas)-1 {
reqLogger.Info("Redis leader is not desired, executing failover operation")
err = k8sutils.ExecuteFailoverOperation(ctx, r.K8sClient, r.Log, instance)
reqLogger.V(1).Info("Number of Redis nodes match desired")
unhealthyNodeCount, err := k8sutils.UnhealthyNodesInCluster(ctx, r.K8sClient, r.Log, instance)
if err != nil {
reqLogger.Error(err, "failed to determine unhealthy node count in cluster")
}
if int(totalReplicas) > 1 && unhealthyNodeCount >= int(totalReplicas)-1 {
err = k8sutils.UpdateRedisClusterStatus(instance, status.RedisClusterFailed, "RedisCluster has too many unhealthy nodes", leaderReplicas, followerReplicas, r.Dk8sClient)
if err != nil {
return intctrlutil.RequeueWithError(err, reqLogger, "")
}

reqLogger.Info("healthy leader count does not match desired; attempting to repair disconnected masters")
if err = k8sutils.RepairDisconnectedMasters(ctx, r.K8sClient, r.Log, instance); err != nil {
reqLogger.Error(err, "failed to repair disconnected masters")
}

err = retry.Do(func() error {
nc, nErr := k8sutils.UnhealthyNodesInCluster(ctx, r.K8sClient, r.Log, instance)
if nErr != nil {
return nErr
}
if nc == 0 {
return nil
}
return fmt.Errorf("%d unhealthy nodes", nc)
}, retry.Attempts(3), retry.Delay(time.Second*5))

if err == nil {
reqLogger.Info("repairing unhealthy masters successful, no unhealthy masters left")
return intctrlutil.RequeueAfter(reqLogger, time.Second*30, "no unhealthy nodes found after repairing disconnected masters")
}
reqLogger.Info("unhealthy nodes exist after attempting to repair disconnected masters; starting failover")
if err = k8sutils.ExecuteFailoverOperation(ctx, r.K8sClient, r.Log, instance); err != nil {
return intctrlutil.RequeueWithError(err, reqLogger, "")
}
}

// Check If there is No Empty Master Node
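The reconcile flow above first marks the cluster Failed, then tries to repair disconnected masters, and only escalates to a failover if unhealthy nodes remain after polling with avast/retry-go. As a minimal standalone sketch of that retry pattern (countUnhealthy is a hypothetical stand-in for k8sutils.UnhealthyNodesInCluster):

package main

import (
	"fmt"
	"time"

	retry "github.com/avast/retry-go"
)

// countUnhealthy is a hypothetical stand-in for k8sutils.UnhealthyNodesInCluster.
func countUnhealthy() (int, error) {
	return 0, nil
}

func main() {
	// Retry up to 3 times, treating a non-zero unhealthy count as an error.
	// retry-go's default delay type applies backoff with jitter, so Delay sets
	// a base delay; retry.DelayType(retry.FixedDelay) would enforce a strict
	// 5s interval instead.
	err := retry.Do(func() error {
		nc, err := countUnhealthy()
		if err != nil {
			return err
		}
		if nc == 0 {
			return nil
		}
		return fmt.Errorf("%d unhealthy nodes", nc)
	}, retry.Attempts(3), retry.Delay(5*time.Second))
	if err != nil {
		fmt.Println("cluster still unhealthy after retries:", err)
	}
}
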
99 changes: 84 additions & 15 deletions pkg/k8sutils/redis.go
@@ -85,6 +85,53 @@ func CreateSingleLeaderRedisCommand(logger logr.Logger, cr *redisv1beta2.RedisCl
return cmd
}

// RepairDisconnectedMasters attempts to repair disconnected/failed masters by issuing
// a CLUSTER MEET with the updated address of the host
func RepairDisconnectedMasters(ctx context.Context, client kubernetes.Interface, logger logr.Logger, cr *redisv1beta2.RedisCluster) error {
redisClient := configureRedisClient(client, logger, cr, cr.ObjectMeta.Name+"-leader-0")
defer redisClient.Close()
return repairDisconnectedMasters(ctx, client, logger, cr, redisClient)
}

func repairDisconnectedMasters(ctx context.Context, client kubernetes.Interface, logger logr.Logger, cr *redisv1beta2.RedisCluster, redisClient *redis.Client) error {
nodes, err := clusterNodes(ctx, redisClient, logger)
if err != nil {
return err
}
masterNodeType := "master"
for _, node := range nodes {
if !nodeIsOfType(node, masterNodeType) {
continue
}
if !nodeFailedOrDisconnected(node) {
continue
}
logger.V(1).Info("found disconnected master node", "node", node)
podName, err := getMasterHostFromClusterNode(node)
if err != nil {
return err
}
ip := getRedisServerIP(client, logger, RedisDetails{
PodName: podName,
Namespace: cr.Namespace,
})
err = redisClient.ClusterMeet(ctx, ip, strconv.Itoa(*cr.Spec.Port)).Err()
if err != nil {
return fmt.Errorf("failed to issue cluster meet: %w", err)
}
}
return nil
}

func getMasterHostFromClusterNode(node clusterNodesResponse) (string, error) {
addressAndHost := node[1]
s := strings.Split(addressAndHost, ",")
if len(s) != 2 {
return "", fmt.Errorf("failed to extract host from host and address string, unexpected number of elements: %d", len(s))
}
return s[1], nil
}

// CreateMultipleLeaderRedisCommand will create command for multiple leader cluster creation
func CreateMultipleLeaderRedisCommand(client kubernetes.Interface, logger logr.Logger, cr *redisv1beta2.RedisCluster) []string {
cmd := []string{"redis-cli", "--cluster", "create"}
@@ -189,7 +236,10 @@ func ExecuteRedisReplicationCommand(ctx context.Context, client kubernetes.Inter
redisClient := configureRedisClient(client, logger, cr, cr.ObjectMeta.Name+"-leader-0")
defer redisClient.Close()

nodes := checkRedisCluster(ctx, redisClient, logger)
nodes, err := clusterNodes(ctx, redisClient, logger)
if err != nil {
logger.Error(err, "failed to get cluster nodes")
}
for followerIdx := 0; followerIdx <= int(followerCounts)-1; {
for i := 0; i < int(followerPerLeader) && followerIdx <= int(followerCounts)-1; i++ {
followerPod := RedisDetails{
@@ -225,22 +275,27 @@ }
}
}

// checkRedisCluster will check the redis cluster have sufficient nodes or not
func checkRedisCluster(ctx context.Context, redisClient *redis.Client, logger logr.Logger) [][]string {
type clusterNodesResponse []string

// clusterNodes returns the parsed response of CLUSTER NODES
func clusterNodes(ctx context.Context, redisClient *redis.Client, logger logr.Logger) ([]clusterNodesResponse, error) {
output, err := redisClient.ClusterNodes(ctx).Result()
if err != nil {
logger.Error(err, "Error in getting Redis cluster nodes")
return nil, err
}
logger.V(1).Info("Redis cluster nodes are listed", "Output", output)

csvOutput := csv.NewReader(strings.NewReader(output))
csvOutput.Comma = ' '
csvOutput.FieldsPerRecord = -1
csvOutputRecords, err := csvOutput.ReadAll()
if err != nil {
logger.Error(err, "Error parsing Node Counts", "output", output)
return nil, err
}
response := make([]clusterNodesResponse, 0, len(csvOutputRecords))
for _, record := range csvOutputRecords {
response = append(response, record)
}
return csvOutputRecords
return response, nil
}

// ExecuteFailoverOperation will execute redis failover operations
@@ -297,7 +352,10 @@ func CheckRedisNodeCount(ctx context.Context, client kubernetes.Interface, logge
redisClient := configureRedisClient(client, logger, cr, cr.ObjectMeta.Name+"-leader-0")
defer redisClient.Close()
var redisNodeType string
clusterNodes := checkRedisCluster(ctx, redisClient, logger)
clusterNodes, err := clusterNodes(ctx, redisClient, logger)
if err != nil {
logger.Error(err, "failed to get cluster nodes")
}
count := len(clusterNodes)

switch nodeType {
Expand All @@ -311,7 +369,7 @@ func CheckRedisNodeCount(ctx context.Context, client kubernetes.Interface, logge
if nodeType != "" {
count = 0
for _, node := range clusterNodes {
if strings.Contains(node[2], redisNodeType) {
if nodeIsOfType(node, redisNodeType) {
count++
}
}
@@ -350,19 +408,30 @@ func RedisClusterStatusHealth(ctx context.Context, client kubernetes.Interface,
return true
}

// CheckRedisClusterState will check the redis cluster state
func CheckRedisClusterState(ctx context.Context, client kubernetes.Interface, logger logr.Logger, cr *redisv1beta2.RedisCluster) int {
// UnhealthyNodesInCluster returns the number of unhealthy nodes in the cluster described by cr
func UnhealthyNodesInCluster(ctx context.Context, client kubernetes.Interface, logger logr.Logger, cr *redisv1beta2.RedisCluster) (int, error) {
redisClient := configureRedisClient(client, logger, cr, cr.ObjectMeta.Name+"-leader-0")
defer redisClient.Close()
clusterNodes := checkRedisCluster(ctx, redisClient, logger)
clusterNodes, err := clusterNodes(ctx, redisClient, logger)
if err != nil {
return 0, err
}
count := 0
for _, node := range clusterNodes {
if strings.Contains(node[2], "fail") || strings.Contains(node[7], "disconnected") {
if nodeFailedOrDisconnected(node) {
count++
}
}
logger.V(1).Info("Number of failed nodes in cluster", "Failed Node Count", count)
return count
return count, nil
}

func nodeIsOfType(node clusterNodesResponse, nodeType string) bool {
return strings.Contains(node[2], nodeType)
}

func nodeFailedOrDisconnected(node clusterNodesResponse) bool {
return strings.Contains(node[2], "fail") || strings.Contains(node[7], "disconnected")
}

// configureRedisClient will configure the Redis Client
@@ -469,7 +538,7 @@ func getContainerID(client kubernetes.Interface, logger logr.Logger, cr *redisv1
}

// checkRedisNodePresence will check if the redis node exists in the cluster or not
func checkRedisNodePresence(cr *redisv1beta2.RedisCluster, nodeList [][]string, nodeName string) bool {
func checkRedisNodePresence(cr *redisv1beta2.RedisCluster, nodeList []clusterNodesResponse, nodeName string) bool {
logger := generateRedisManagerLogger(cr.Namespace, cr.ObjectMeta.Name)
logger.V(1).Info("Checking if Node is in cluster", "Node", nodeName)
for _, node := range nodeList {
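The helpers above index into a parsed CLUSTER NODES record, whose space-separated layout (per the Redis documentation) is: node id, address@busport,hostname, flags, master id, ping-sent, pong-recv, config-epoch, link-state, then slot ranges. A small sketch of how those indices are read, using a sample record like the ones in the tests below:

package main

import (
	"fmt"
	"strings"
)

func main() {
	line := "67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1 127.0.0.1:30002@31002,redis-cluster-leader-0 master - 0 1426238316232 2 disconnected 5461-10922"
	node := strings.Split(line, " ")

	// node[2] holds the flags ("master", "slave", "myself,master", "fail", ...).
	isMaster := strings.Contains(node[2], "master")
	// node[7] is the cluster-bus link state: "connected" or "disconnected".
	unhealthy := strings.Contains(node[2], "fail") || strings.Contains(node[7], "disconnected")
	// node[1] is "ip:port@busport,hostname"; the hostname part is the pod name.
	host := strings.Split(node[1], ",")[1]

	fmt.Println(isMaster, unhealthy, host) // true true redis-cluster-leader-0
}
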
53 changes: 47 additions & 6 deletions pkg/k8sutils/redis_test.go
@@ -29,10 +29,15 @@ func TestCheckRedisNodePresence(t *testing.T) {
csvOutput := csv.NewReader(strings.NewReader(output))
csvOutput.Comma = ' '
csvOutput.FieldsPerRecord = -1
nodes, _ := csvOutput.ReadAll()
rawNodes, _ := csvOutput.ReadAll()

nodes := make([]clusterNodesResponse, 0, len(rawNodes))
for _, node := range rawNodes {
nodes = append(nodes, node)
}

tests := []struct {
nodes [][]string
nodes []clusterNodesResponse
ip string
want bool
}{
@@ -52,6 +57,40 @@ }
}
}

func TestRepairDisconnectedMasters(t *testing.T) {
ctx := context.Background()
redisClient, mock := redismock.NewClientMock()
mock.ExpectClusterNodes().SetVal(`
07c37dfeb235213a872192d90877d0cd55635b91 127.0.0.1:30004@31004,redis-cluster-follower-0 slave e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 0 1426238317239 4 connected
67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1 127.0.0.1:30002@31002,redis-cluster-leader-0 master - 0 1426238316232 2 disconnected 5461-10922
824fe116063bc5fcf9f4ffd895bc17aee7731ac3 127.0.0.1:30006@31006,redis-cluster-follower-1 slave 292f8b365bb7edb5e285caf0b7e6ddc7265d2f4f 0 1426238317741 6 disconnected
e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 127.0.0.1:30001@31001,redis-cluster-leader-1 myself,master - 0 0 1 connected 0-5460
`)

namespace := "default"
newPodIP := "0.0.0.0"
k8sClient := k8sClientFake.NewSimpleClientset(&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "redis-cluster-leader-0",
Namespace: namespace,
},
Status: corev1.PodStatus{
PodIP: newPodIP,
},
})
mock.ExpectClusterMeet(newPodIP, "6379").SetVal("OK")
port := 6379
err := repairDisconnectedMasters(ctx, k8sClient, logr.Discard(), &redisv1beta2.RedisCluster{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
},
Spec: redisv1beta2.RedisClusterSpec{
Port: &port,
},
}, redisClient)
assert.NoError(t, err)
}

func TestGetRedisServerIP(t *testing.T) {
tests := []struct {
name string
@@ -772,14 +811,14 @@ }
}
}

func TestCheckRedisCluster(t *testing.T) {
func TestClusterNodes(t *testing.T) {
logger := logr.Discard() // Discard logs

tests := []struct {
name string
expectError error
clusterNodesOutput string
expectedResult [][]string
expectedResult []clusterNodesResponse
}{
{
name: "Detailed cluster nodes output",
@@ -789,7 +828,7 @@
6ec23923021cf3ffec47632106199cb7f496ce01 127.0.0.1:30005@31005,hostname5 slave 67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1 0 1426238316232 5 connected
824fe116063bc5fcf9f4ffd895bc17aee7731ac3 127.0.0.1:30006@31006,hostname6 slave 292f8b365bb7edb5e285caf0b7e6ddc7265d2f4f 0 1426238317741 6 connected
e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 127.0.0.1:30001@31001,hostname1 myself,master - 0 0 1 connected 0-5460`,
expectedResult: [][]string{
expectedResult: []clusterNodesResponse{
{"07c37dfeb235213a872192d90877d0cd55635b91", "127.0.0.1:30004@31004,hostname4", "slave", "e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca", "0", "1426238317239", "4", "connected"},
{"67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1", "127.0.0.1:30002@31002,hostname2", "master", "-", "0", "1426238316232", "2", "connected", "5461-10922"},
{"292f8b365bb7edb5e285caf0b7e6ddc7265d2f4f", "127.0.0.1:30003@31003,hostname3", "master", "-", "0", "1426238318243", "3", "connected", "10923-16383"},
Expand All @@ -814,11 +853,13 @@ e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 127.0.0.1:30001@31001,hostname1 myself,
} else {
mock.ExpectClusterNodes().SetVal(tc.clusterNodesOutput)
}
result := checkRedisCluster(context.TODO(), db, logger)
result, err := clusterNodes(context.TODO(), db, logger)

if tc.expectError != nil {
assert.Nil(t, result)
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.ElementsMatch(t, tc.expectedResult, result)
}

7 changes: 5 additions & 2 deletions tests/_config/chainsaw-configuration.yaml
@@ -1,11 +1,14 @@
---
# yaml-language-server: $schema=https://raw.githubusercontent.com/kyverno/chainsaw/main/.schemas/json/configuration-chainsaw-v1alpha1.json
apiVersion: chainsaw.kyverno.io/v1alpha1
apiVersion: chainsaw.kyverno.io/v1alpha2
kind: Configuration
metadata:
name: chainsaw-configuration
spec:
delayBeforeCleanup: 10s
execution:
failFast: true
cleanup:
delayBeforeCleanup: 10s
timeouts:
apply: 5m
delete: 5m