Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions controllers/redis_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,11 @@ func (r *RedisReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
return ctrl.Result{}, err
}

err = k8sutils.CreateStandaloneRedis(instance)
err = k8sutils.CreateStandaloneRedis(instance, r.K8sClient)
if err != nil {
return ctrl.Result{}, err
}
err = k8sutils.CreateStandaloneService(instance)
err = k8sutils.CreateStandaloneService(instance, r.K8sClient)
if err != nil {
return ctrl.Result{}, err
}
Expand Down
16 changes: 8 additions & 8 deletions controllers/rediscluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,22 +117,22 @@ func (r *RedisClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request
}

if leaderReplicas != 0 {
err = k8sutils.CreateRedisLeaderService(instance)
err = k8sutils.CreateRedisLeaderService(instance, r.K8sClient)
if err != nil {
return ctrl.Result{}, err
}
}
err = k8sutils.CreateRedisLeader(instance)
err = k8sutils.CreateRedisLeader(instance, r.K8sClient)
if err != nil {
return ctrl.Result{}, err
}

err = k8sutils.ReconcileRedisPodDisruptionBudget(instance, "leader", instance.Spec.RedisLeader.PodDisruptionBudget)
err = k8sutils.ReconcileRedisPodDisruptionBudget(instance, "leader", instance.Spec.RedisLeader.PodDisruptionBudget, r.K8sClient)
if err != nil {
return ctrl.Result{}, err
}

redisLeaderInfo, err := k8sutils.GetStatefulSet(instance.Namespace, instance.ObjectMeta.Name+"-leader")
redisLeaderInfo, err := k8sutils.GetStatefulSet(instance.Namespace, instance.ObjectMeta.Name+"-leader", r.K8sClient)
if err != nil {
if errors.IsNotFound(err) {
return ctrl.Result{RequeueAfter: time.Second * 60}, nil
Expand All @@ -151,21 +151,21 @@ func (r *RedisClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request
}
// if we have followers create their service.
if followerReplicas != 0 {
err = k8sutils.CreateRedisFollowerService(instance)
err = k8sutils.CreateRedisFollowerService(instance, r.K8sClient)
if err != nil {
return ctrl.Result{}, err
}
}
err = k8sutils.CreateRedisFollower(instance)
err = k8sutils.CreateRedisFollower(instance, r.K8sClient)
if err != nil {
return ctrl.Result{}, err
}
err = k8sutils.ReconcileRedisPodDisruptionBudget(instance, "follower", instance.Spec.RedisFollower.PodDisruptionBudget)
err = k8sutils.ReconcileRedisPodDisruptionBudget(instance, "follower", instance.Spec.RedisFollower.PodDisruptionBudget, r.K8sClient)
if err != nil {
return ctrl.Result{}, err
}
}
redisFollowerInfo, err := k8sutils.GetStatefulSet(instance.Namespace, instance.ObjectMeta.Name+"-follower")
redisFollowerInfo, err := k8sutils.GetStatefulSet(instance.Namespace, instance.ObjectMeta.Name+"-follower", r.K8sClient)
if err != nil {
if errors.IsNotFound(err) {
return ctrl.Result{RequeueAfter: time.Second * 60}, nil
Expand Down
6 changes: 3 additions & 3 deletions controllers/redisreplication_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,18 @@ func (r *RedisReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Req
return ctrl.Result{}, err
}

err = k8sutils.CreateReplicationRedis(instance)
err = k8sutils.CreateReplicationRedis(instance, r.K8sClient)
if err != nil {
return ctrl.Result{}, err
}
err = k8sutils.CreateReplicationService(instance)
err = k8sutils.CreateReplicationService(instance, r.K8sClient)
if err != nil {
return ctrl.Result{}, err
}

// Set Pod distruptiuon Budget Later

redisReplicationInfo, err := k8sutils.GetStatefulSet(instance.Namespace, instance.ObjectMeta.Name)
redisReplicationInfo, err := k8sutils.GetStatefulSet(instance.Namespace, instance.ObjectMeta.Name, r.K8sClient)
if err != nil {
return ctrl.Result{RequeueAfter: time.Second * 60}, err
}
Expand Down
6 changes: 3 additions & 3 deletions controllers/redissentinel_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,18 @@ func (r *RedisSentinelReconciler) Reconcile(ctx context.Context, req ctrl.Reques
}

// Create Redis Sentinel
err = k8sutils.CreateRedisSentinel(ctx, r.K8sClient, r.Log, instance)
err = k8sutils.CreateRedisSentinel(ctx, r.K8sClient, r.Log, instance, r.K8sClient)
if err != nil {
return ctrl.Result{}, err
}

err = k8sutils.ReconcileSentinelPodDisruptionBudget(instance, instance.Spec.PodDisruptionBudget)
err = k8sutils.ReconcileSentinelPodDisruptionBudget(instance, instance.Spec.PodDisruptionBudget, r.K8sClient)
if err != nil {
return ctrl.Result{}, err
}

// Create the Service for Redis Sentinel
err = k8sutils.CreateRedisSentinelService(instance)
err = k8sutils.CreateRedisSentinelService(instance, r.K8sClient)
if err != nil {
return ctrl.Result{}, err
}
Expand Down
65 changes: 23 additions & 42 deletions k8sutils/poddisruption.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,24 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes"
)

// CreateRedisLeaderPodDisruptionBudget check and create a PodDisruptionBudget for Leaders
func ReconcileRedisPodDisruptionBudget(cr *redisv1beta2.RedisCluster, role string, pdbParams *commonapi.RedisPodDisruptionBudget) error {
func ReconcileRedisPodDisruptionBudget(cr *redisv1beta2.RedisCluster, role string, pdbParams *commonapi.RedisPodDisruptionBudget, cl kubernetes.Interface) error {
pdbName := cr.ObjectMeta.Name + "-" + role
logger := pdbLogger(cr.Namespace, pdbName)
if pdbParams != nil && pdbParams.Enabled {
labels := getRedisLabels(cr.ObjectMeta.Name, cluster, role, cr.ObjectMeta.GetLabels())
annotations := generateStatefulSetsAnots(cr.ObjectMeta, cr.Spec.KubernetesConfig.IgnoreAnnotations)
pdbMeta := generateObjectMetaInformation(pdbName, cr.Namespace, labels, annotations)
pdbDef := generatePodDisruptionBudgetDef(cr, role, pdbMeta, cr.Spec.RedisLeader.PodDisruptionBudget)
return CreateOrUpdatePodDisruptionBudget(pdbDef)
return CreateOrUpdatePodDisruptionBudget(pdbDef, cl)
} else {
// Check if one exists, and delete it.
_, err := GetPodDisruptionBudget(cr.Namespace, pdbName)
_, err := GetPodDisruptionBudget(cr.Namespace, pdbName, cl)
if err == nil {
return deletePodDisruptionBudget(cr.Namespace, pdbName)
return deletePodDisruptionBudget(cr.Namespace, pdbName, cl)
} else if err != nil && errors.IsNotFound(err) {
logger.V(1).Info("Reconciliation Successful, no PodDisruptionBudget Found.")
// Its ok if its not found, as we're deleting anyway
Expand All @@ -38,20 +39,20 @@ func ReconcileRedisPodDisruptionBudget(cr *redisv1beta2.RedisCluster, role strin
}
}

func ReconcileSentinelPodDisruptionBudget(cr *redisv1beta2.RedisSentinel, pdbParams *commonapi.RedisPodDisruptionBudget) error {
func ReconcileSentinelPodDisruptionBudget(cr *redisv1beta2.RedisSentinel, pdbParams *commonapi.RedisPodDisruptionBudget, cl kubernetes.Interface) error {
pdbName := cr.ObjectMeta.Name + "-sentinel"
logger := pdbLogger(cr.Namespace, pdbName)
if pdbParams != nil && pdbParams.Enabled {
labels := getRedisLabels(cr.ObjectMeta.Name, sentinel, "sentinel", cr.ObjectMeta.GetLabels())
annotations := generateStatefulSetsAnots(cr.ObjectMeta, cr.Spec.KubernetesConfig.IgnoreAnnotations)
pdbMeta := generateObjectMetaInformation(pdbName, cr.Namespace, labels, annotations)
pdbDef := generateSentinelPodDisruptionBudgetDef(cr, "sentinel", pdbMeta, pdbParams)
return CreateOrUpdatePodDisruptionBudget(pdbDef)
return CreateOrUpdatePodDisruptionBudget(pdbDef, cl)
} else {
// Check if one exists, and delete it.
_, err := GetPodDisruptionBudget(cr.Namespace, pdbName)
_, err := GetPodDisruptionBudget(cr.Namespace, pdbName, cl)
if err == nil {
return deletePodDisruptionBudget(cr.Namespace, pdbName)
return deletePodDisruptionBudget(cr.Namespace, pdbName, cl)
} else if err != nil && errors.IsNotFound(err) {
logger.V(1).Info("Reconciliation Successful, no PodDisruptionBudget Found.")
// Its ok if its not found, as we're deleting anyway
Expand Down Expand Up @@ -116,24 +117,24 @@ func generateSentinelPodDisruptionBudgetDef(cr *redisv1beta2.RedisSentinel, role
}

// CreateOrUpdateService method will create or update Redis service
func CreateOrUpdatePodDisruptionBudget(pdbDef *policyv1.PodDisruptionBudget) error {
func CreateOrUpdatePodDisruptionBudget(pdbDef *policyv1.PodDisruptionBudget, cl kubernetes.Interface) error {
logger := pdbLogger(pdbDef.Namespace, pdbDef.Name)
storedPDB, err := GetPodDisruptionBudget(pdbDef.Namespace, pdbDef.Name)
storedPDB, err := GetPodDisruptionBudget(pdbDef.Namespace, pdbDef.Name, cl)
if err != nil {
if err := patch.DefaultAnnotator.SetLastAppliedAnnotation(pdbDef); err != nil { //nolint
logger.Error(err, "Unable to patch redis PodDisruptionBudget with comparison object")
return err
}
if errors.IsNotFound(err) {
return createPodDisruptionBudget(pdbDef.Namespace, pdbDef)
return createPodDisruptionBudget(pdbDef.Namespace, pdbDef, cl)
}
return err
}
return patchPodDisruptionBudget(storedPDB, pdbDef, pdbDef.Namespace)
return patchPodDisruptionBudget(storedPDB, pdbDef, pdbDef.Namespace, cl)
}

// patchPodDisruptionBudget will patch Redis Kubernetes PodDisruptionBudgets
func patchPodDisruptionBudget(storedPdb *policyv1.PodDisruptionBudget, newPdb *policyv1.PodDisruptionBudget, namespace string) error {
func patchPodDisruptionBudget(storedPdb *policyv1.PodDisruptionBudget, newPdb *policyv1.PodDisruptionBudget, namespace string, cl kubernetes.Interface) error {
logger := pdbLogger(namespace, storedPdb.Name)
// We want to try and keep this atomic as possible.
newPdb.ResourceVersion = storedPdb.ResourceVersion
Expand Down Expand Up @@ -169,20 +170,15 @@ func patchPodDisruptionBudget(storedPdb *policyv1.PodDisruptionBudget, newPdb *p
logger.Error(err, "Unable to patch redis PodDisruptionBudget with comparison object")
return err
}
return updatePodDisruptionBudget(namespace, newPdb)
return updatePodDisruptionBudget(namespace, newPdb, cl)
}
return nil
}

// createPodDisruptionBudget is a method to create PodDisruptionBudgets in Kubernetes
func createPodDisruptionBudget(namespace string, pdb *policyv1.PodDisruptionBudget) error {
func createPodDisruptionBudget(namespace string, pdb *policyv1.PodDisruptionBudget, cl kubernetes.Interface) error {
logger := pdbLogger(namespace, pdb.Name)
client, err := GenerateK8sClient(GenerateK8sConfig)
if err != nil {
logger.Error(err, "Could not generate kubernetes client")
return err
}
_, err = client.PolicyV1().PodDisruptionBudgets(namespace).Create(context.TODO(), pdb, metav1.CreateOptions{})
_, err := cl.PolicyV1().PodDisruptionBudgets(namespace).Create(context.TODO(), pdb, metav1.CreateOptions{})
if err != nil {
logger.Error(err, "Redis PodDisruptionBudget creation failed")
return err
Expand All @@ -192,14 +188,9 @@ func createPodDisruptionBudget(namespace string, pdb *policyv1.PodDisruptionBudg
}

// updatePodDisruptionBudget is a method to update PodDisruptionBudgets in Kubernetes
func updatePodDisruptionBudget(namespace string, pdb *policyv1.PodDisruptionBudget) error {
func updatePodDisruptionBudget(namespace string, pdb *policyv1.PodDisruptionBudget, cl kubernetes.Interface) error {
logger := pdbLogger(namespace, pdb.Name)
client, err := GenerateK8sClient(GenerateK8sConfig)
if err != nil {
logger.Error(err, "Could not generate kubernetes client")
return err
}
_, err = client.PolicyV1().PodDisruptionBudgets(namespace).Update(context.TODO(), pdb, metav1.UpdateOptions{})
_, err := cl.PolicyV1().PodDisruptionBudgets(namespace).Update(context.TODO(), pdb, metav1.UpdateOptions{})
if err != nil {
logger.Error(err, "Redis PodDisruptionBudget update failed")
return err
Expand All @@ -209,14 +200,9 @@ func updatePodDisruptionBudget(namespace string, pdb *policyv1.PodDisruptionBudg
}

// deletePodDisruptionBudget is a method to delete PodDisruptionBudgets in Kubernetes
func deletePodDisruptionBudget(namespace string, pdbName string) error {
func deletePodDisruptionBudget(namespace string, pdbName string, cl kubernetes.Interface) error {
logger := pdbLogger(namespace, pdbName)
client, err := GenerateK8sClient(GenerateK8sConfig)
if err != nil {
logger.Error(err, "Could not generate kubernetes client")
return err
}
err = client.PolicyV1().PodDisruptionBudgets(namespace).Delete(context.TODO(), pdbName, metav1.DeleteOptions{})
err := cl.PolicyV1().PodDisruptionBudgets(namespace).Delete(context.TODO(), pdbName, metav1.DeleteOptions{})
if err != nil {
logger.Error(err, "Redis PodDisruption deletion failed")
return err
Expand All @@ -226,17 +212,12 @@ func deletePodDisruptionBudget(namespace string, pdbName string) error {
}

// GetPodDisruptionBudget is a method to get PodDisruptionBudgets in Kubernetes
func GetPodDisruptionBudget(namespace string, pdb string) (*policyv1.PodDisruptionBudget, error) {
func GetPodDisruptionBudget(namespace string, pdb string, cl kubernetes.Interface) (*policyv1.PodDisruptionBudget, error) {
logger := pdbLogger(namespace, pdb)
client, err := GenerateK8sClient(GenerateK8sConfig)
if err != nil {
logger.Error(err, "Could not generate kubernetes client")
return nil, err
}
getOpts := metav1.GetOptions{
TypeMeta: generateMetaInformation("PodDisruptionBudget", "policy/v1"),
}
pdbInfo, err := client.PolicyV1().PodDisruptionBudgets(namespace).Get(context.TODO(), pdb, getOpts)
pdbInfo, err := cl.PolicyV1().PodDisruptionBudgets(namespace).Get(context.TODO(), pdb, getOpts)
if err != nil {
logger.V(1).Info("Redis PodDisruptionBudget get action failed")
return nil, err
Expand Down
Loading