diff --git a/charts/fleet-crd/templates/crds.yaml b/charts/fleet-crd/templates/crds.yaml index 88f7c7b3d9..5e9d7aab43 100644 --- a/charts/fleet-crd/templates/crds.yaml +++ b/charts/fleet-crd/templates/crds.yaml @@ -6388,6 +6388,14 @@ spec: sha256sum: description: SHA256Sum of the Content field type: string + status: + description: ContentStatus defines the observed state of Content + properties: + referenceCount: + description: ReferenceCount is the number of BundleDeployments that + currently reference this Content resource. + type: integer + type: object type: object served: true storage: true diff --git a/charts/fleet/ci/debug-values.yaml b/charts/fleet/ci/debug-values.yaml index d7d1d78e8c..69040c6963 100644 --- a/charts/fleet/ci/debug-values.yaml +++ b/charts/fleet/ci/debug-values.yaml @@ -52,6 +52,7 @@ controller: bundle: "1" bundledeployment: "1" schedule: "1" + content: "1" shards: - id: shard0 diff --git a/charts/fleet/ci/nobootstrap-values.yaml b/charts/fleet/ci/nobootstrap-values.yaml index 2e50af55eb..4d6565f8c6 100644 --- a/charts/fleet/ci/nobootstrap-values.yaml +++ b/charts/fleet/ci/nobootstrap-values.yaml @@ -51,6 +51,7 @@ controller: bundle: "1" bundledeployment: "1" schedule: "1" + content: "1" shards: - id: shard0 diff --git a/charts/fleet/ci/nodebug-values.yaml b/charts/fleet/ci/nodebug-values.yaml index 77a22a4aac..0f2b3c49da 100644 --- a/charts/fleet/ci/nodebug-values.yaml +++ b/charts/fleet/ci/nodebug-values.yaml @@ -51,6 +51,7 @@ controller: bundle: "1" bundledeployment: "1" schedule: "1" + content: "1" shards: - id: shard0 diff --git a/charts/fleet/ci/nogitops-values.yaml b/charts/fleet/ci/nogitops-values.yaml index 6fe3b3c044..5238095fdf 100644 --- a/charts/fleet/ci/nogitops-values.yaml +++ b/charts/fleet/ci/nogitops-values.yaml @@ -51,6 +51,7 @@ controller: bundle: "1" bundledeployment: "1" schedule: "1" + content: "1" shards: - id: shard0 diff --git a/charts/fleet/ci/nohelmops-values.yaml b/charts/fleet/ci/nohelmops-values.yaml index 
6d4182918d..b29719210c 100644 --- a/charts/fleet/ci/nohelmops-values.yaml +++ b/charts/fleet/ci/nohelmops-values.yaml @@ -51,6 +51,7 @@ controller: bundle: "1" bundledeployment: "1" schedule: "1" + content: "1" shards: - id: shard0 diff --git a/charts/fleet/templates/deployment.yaml b/charts/fleet/templates/deployment.yaml index c9d9959aa2..651ff6c273 100644 --- a/charts/fleet/templates/deployment.yaml +++ b/charts/fleet/templates/deployment.yaml @@ -93,6 +93,10 @@ spec: - name: SCHEDULE_RECONCILER_WORKERS value: {{ quote $.Values.controller.reconciler.workers.schedule }} {{- end }} + {{- if $.Values.controller.reconciler.workers.content }} + - name: CONTENT_RECONCILER_WORKERS + value: {{ quote $.Values.controller.reconciler.workers.content }} + {{- end }} {{- if $.Values.extraEnv }} {{ toYaml $.Values.extraEnv | indent 8}} {{- end }} diff --git a/charts/fleet/values.yaml b/charts/fleet/values.yaml index e4bbe21c3d..1a537cd084 100644 --- a/charts/fleet/values.yaml +++ b/charts/fleet/values.yaml @@ -129,6 +129,7 @@ controller: clustergroup: "50" imagescan: "50" schedule: "50" + content: "50" gitjob: replicas: 1 diff --git a/integrationtests/gitjob/controller/suite_test.go b/integrationtests/gitjob/controller/suite_test.go index 84e89f67bf..c4d54a2e6b 100644 --- a/integrationtests/gitjob/controller/suite_test.go +++ b/integrationtests/gitjob/controller/suite_test.go @@ -17,6 +17,7 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + "github.com/rancher/fleet/internal/cmd/controller/gitops" "github.com/rancher/fleet/internal/cmd/controller/gitops/reconciler" ctrlreconciler "github.com/rancher/fleet/internal/cmd/controller/reconciler" "github.com/rancher/fleet/internal/cmd/controller/target" @@ -89,6 +90,9 @@ var _ = BeforeSuite(func() { }) Expect(err).ToNot(HaveOccurred()) + Expect(gitops.AddRepoNameLabelIndexer(ctx, mgr)).ToNot(HaveOccurred()) + Expect(gitops.AddImageScanGitRepoIndexer(ctx, 
mgr)).ToNot(HaveOccurred()) + ctlr := gomock.NewController(GinkgoT()) // redirect logs to a buffer that we can read in the tests diff --git a/internal/cmd/controller/finalize/finalize.go b/internal/cmd/controller/finalize/finalize.go index 0094be2170..d94bb9caee 100644 --- a/internal/cmd/controller/finalize/finalize.go +++ b/internal/cmd/controller/finalize/finalize.go @@ -6,15 +6,11 @@ import ( "strings" "github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1" - "github.com/rancher/wrangler/v3/pkg/kv" - corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/util/retry" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" - "sigs.k8s.io/controller-runtime/pkg/log" ) const ( @@ -66,64 +62,6 @@ func PurgeBundles(ctx context.Context, c client.Client, gitrepo types.Namespaced return nil } -// PurgeContent tries to delete the content resource related with the given bundle deployment. 
-func PurgeContent(ctx context.Context, c client.Client, name, deplID string) error { - contentID, _ := kv.Split(deplID, ":") - content := &v1alpha1.Content{} - if err := c.Get(ctx, types.NamespacedName{Name: contentID}, content); err != nil { - return client.IgnoreNotFound(err) - } - - logger := log.FromContext(ctx).WithName("purge-content").WithValues("contentID", contentID, "finalizerName", name) - - nn := types.NamespacedName{Name: content.Name} - if controllerutil.ContainsFinalizer(content, name) { - err := retry.RetryOnConflict(retry.DefaultRetry, func() error { - if err := c.Get(ctx, nn, content); err != nil { - return client.IgnoreNotFound(err) - } - - controllerutil.RemoveFinalizer(content, name) - - return c.Update(ctx, content) - }) - if err != nil { - return err - } - - logger.V(1).Info("Removed finalizer from content resource") - } - - if len(content.Finalizers) == 0 { - if err := c.Delete(ctx, content); err != nil { - return err - } - logger.V(1).Info("Deleted content resource") - } - - return nil -} - -// PurgeImageScans deletes all ImageScan resources related with the given GitRepo namespaces name. -func PurgeImageScans(ctx context.Context, c client.Client, gitrepo types.NamespacedName) error { - images := &v1alpha1.ImageScanList{} - err := c.List(ctx, images, client.InNamespace(gitrepo.Namespace)) - if err != nil { - return err - } - - for _, image := range images.Items { - if image.Spec.GitRepoName == gitrepo.Name { - err := c.Delete(ctx, &image) - if err != nil { - return err - } - } - - } - return nil -} - // PurgeNamespace deletes the given namespace if deleteNamespace is set to true. 
// It ignores the following namespaces, that are considered as default by fleet or kubernetes: // fleet-local, cattle-fleet-system, fleet-default, cattle-fleet-clusters-system, default @@ -168,3 +106,14 @@ func EnsureFinalizer(ctx context.Context, c client.Client, obj client.Object, fi controllerutil.AddFinalizer(obj, finalizer) return c.Update(ctx, obj) } + +func PurgeTargetNamespaceIfNeeded(ctx context.Context, c client.Client, gitrepo *v1alpha1.GitRepo) error { + deleteNamespace := gitrepo.Spec.DeleteNamespace + namespace := gitrepo.Spec.TargetNamespace + + if gitrepo.Spec.KeepResources { + deleteNamespace = false + } + + return PurgeNamespace(ctx, c, deleteNamespace, namespace) +} diff --git a/internal/cmd/controller/gitops/operator.go b/internal/cmd/controller/gitops/operator.go index 5e211c9853..e0507b8006 100644 --- a/internal/cmd/controller/gitops/operator.go +++ b/internal/cmd/controller/gitops/operator.go @@ -20,11 +20,13 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" clog "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" + "sigs.k8s.io/controller-runtime/pkg/manager" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" command "github.com/rancher/fleet/internal/cmd" "github.com/rancher/fleet/internal/cmd/controller/gitops/reconciler" fcreconciler "github.com/rancher/fleet/internal/cmd/controller/reconciler" + "github.com/rancher/fleet/internal/config" "github.com/rancher/fleet/internal/metrics" "github.com/rancher/fleet/internal/ssh" fleet "github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1" @@ -140,6 +142,18 @@ func (g *GitOperator) Run(cmd *cobra.Command, args []string) error { kh := ssh.KnownHosts{EnforceHostKeyChecks: !g.SkipHostKeyChecks} + // Add an indexer for the Gitrepo name label as that will make accesses in the cache + // faster + if err := AddRepoNameLabelIndexer(ctx, mgr); 
err != nil { + return err + } + + // Add an indexer for the GitRepo name field in ImageScans as that will make accesses in the cache + // faster + if err := AddImageScanGitRepoIndexer(ctx, mgr); err != nil { + return err + } + gitJobReconciler := &reconciler.GitJobReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), @@ -245,3 +259,40 @@ func startWebhook(ctx context.Context, namespace string, addr string, client cli return nil } + +func AddRepoNameLabelIndexer(ctx context.Context, mgr manager.Manager) error { + return mgr.GetFieldIndexer().IndexField( + ctx, + &fleet.Bundle{}, + config.RepoNameIndex, + func(obj client.Object) []string { + content, ok := obj.(*fleet.Bundle) + if !ok { + return nil + } + if name, exists := content.Labels[fleet.RepoLabel]; exists { + return []string{name} + } + + return nil + }, + ) +} + +func AddImageScanGitRepoIndexer(ctx context.Context, mgr manager.Manager) error { + return mgr.GetFieldIndexer().IndexField( + ctx, + &fleet.ImageScan{}, + config.ImageScanGitRepoIndex, + func(obj client.Object) []string { + content, ok := obj.(*fleet.ImageScan) + if !ok { + return nil + } + if content.Spec.GitRepoName == "" { + return nil + } + return []string{content.Spec.GitRepoName} + }, + ) +} diff --git a/internal/cmd/controller/gitops/reconciler/gitjob_controller.go b/internal/cmd/controller/gitops/reconciler/gitjob_controller.go index a55ff6efbb..4e5260faf2 100644 --- a/internal/cmd/controller/gitops/reconciler/gitjob_controller.go +++ b/internal/cmd/controller/gitops/reconciler/gitjob_controller.go @@ -16,6 +16,7 @@ import ( "github.com/rancher/fleet/internal/cmd/controller/imagescan" ctrlquartz "github.com/rancher/fleet/internal/cmd/controller/quartz" "github.com/rancher/fleet/internal/cmd/controller/reconciler" + "github.com/rancher/fleet/internal/config" "github.com/rancher/fleet/internal/metrics" v1alpha1 
"github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1" "github.com/rancher/fleet/pkg/durations" @@ -29,6 +30,7 @@ import ( batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" @@ -56,6 +58,10 @@ const ( // make sure Prometheus scrapes them. ShortLivedMetricsTTL = 120 * time.Second gitJobPollingJitterPercent = 10 + + // period after which the GitRepo reconciler is re-scheduled, + // in order to wait for the dependent resources cleanup to finish + requeueAfterResourceCleanup = 2 * time.Second ) var ( @@ -179,9 +185,7 @@ func (r *GitJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr if !gitrepo.DeletionTimestamp.IsZero() { if controllerutil.ContainsFinalizer(gitrepo, finalize.GitRepoFinalizer) { - if err := r.cleanupGitRepo(ctx, logger, gitrepo); err != nil { - return ctrl.Result{}, err - } + return r.handleDelete(ctx, logger, gitrepo) } return ctrl.Result{}, nil @@ -357,25 +361,50 @@ func (r *GitJobReconciler) deletePreviousJob(ctx context.Context, logger logr.Lo return r.Delete(ctx, &job) } -func (r *GitJobReconciler) cleanupGitRepo(ctx context.Context, logger logr.Logger, gitrepo *v1alpha1.GitRepo) error { +func (r *GitJobReconciler) handleDelete(ctx context.Context, logger logr.Logger, gitrepo *v1alpha1.GitRepo) (ctrl.Result, error) { logger.Info("Gitrepo deleted, deleting bundle, image scans") - metrics.GitRepoCollector.Delete(gitrepo.Name, gitrepo.Namespace) _ = r.deletePollingJob(*gitrepo) - nsName := types.NamespacedName{Name: gitrepo.Name, Namespace: gitrepo.Namespace} - if err := finalize.PurgeBundles(ctx, r.Client, nsName, v1alpha1.RepoLabel); err != nil { - return err + if !controllerutil.ContainsFinalizer(gitrepo, finalize.GitRepoFinalizer) { + return ctrl.Result{}, nil + } + + bundles, 
err := r.listBundlesForGitrepo(ctx, gitrepo) + if err != nil { + return ctrl.Result{}, err + } + + // Bundle deletion happens asynchronously: mark them for deletion and requeue + // This ensures the Gitrepo is kept around until all its Bundles are completely deleted. + if len(bundles.Items) > 0 { + logger.V(1).Info("GitRepo deleted, purging bundles") + return ctrl.Result{RequeueAfter: requeueAfterResourceCleanup}, batchDeleteDependentResources(ctx, r.Client, bundles) } // remove the job scheduled by imagescan, if any _ = r.Scheduler.DeleteJob(imagescan.GitCommitKey(gitrepo.Namespace, gitrepo.Name)) - if err := finalize.PurgeImageScans(ctx, r.Client, nsName); err != nil { - return err + images, err := r.listImageScansForGitrepo(ctx, gitrepo) + if err != nil { + return ctrl.Result{}, err + } + + if len(images.Items) > 0 { + logger.V(1).Info("GitRepo deleted, purging imagescans") + return ctrl.Result{RequeueAfter: requeueAfterResourceCleanup}, batchDeleteDependentResources(ctx, r.Client, images) } - err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + // Delete the target namespace if DeleteNamespace is true + if err := finalize.PurgeTargetNamespaceIfNeeded(ctx, r.Client, gitrepo); err != nil { + return ctrl.Result{}, err + } + + metrics.GitRepoCollector.Delete(gitrepo.Name, gitrepo.Namespace) + + // we don't have pending Bundles nor ImageScans, we can remove the finalizer + nsName := types.NamespacedName{Name: gitrepo.Name, Namespace: gitrepo.Namespace} + err = retry.RetryOnConflict(retry.DefaultRetry, func() error { if err := r.Get(ctx, nsName, gitrepo); err != nil { return err } @@ -386,10 +415,10 @@ func (r *GitJobReconciler) cleanupGitRepo(ctx context.Context, logger logr.Logge }) if client.IgnoreNotFound(err) != nil { - return err + return ctrl.Result{}, err } - return nil + return ctrl.Result{}, nil } // shouldCreateJob checks if the conditions to create a new job are met. 
@@ -600,6 +629,29 @@ func (r *GitJobReconciler) managePollingJob(logger logr.Logger, gitrepo v1alpha1 return jobUpdatedOrCreated, nil } +func (r *GitJobReconciler) listBundlesForGitrepo(ctx context.Context, gitrepo *v1alpha1.GitRepo) (*v1alpha1.BundleList, error) { + list := &v1alpha1.BundleList{} + err := r.List(ctx, list, client.MatchingLabels{v1alpha1.RepoLabel: gitrepo.Name}, client.InNamespace(gitrepo.Namespace)) + if err != nil { + return nil, err + } + return list, nil +} + +func (r *GitJobReconciler) listImageScansForGitrepo(ctx context.Context, gitrepo *v1alpha1.GitRepo) (*v1alpha1.ImageScanList, error) { + list := &v1alpha1.ImageScanList{} + + if err := r.List(ctx, list, + client.InNamespace(gitrepo.Namespace), + client.MatchingFields{ + config.ImageScanGitRepoIndex: gitrepo.Name, + }, + ); err != nil { + return nil, err + } + return list, nil +} + func generationChanged(r *v1alpha1.GitRepo) bool { // checks if generation changed. // it ignores the case when Status.ObservedGeneration=0 because that's @@ -837,3 +889,27 @@ func getNextCommit(status v1alpha1.GitRepoStatus) string { return commit } + +func batchDeleteDependentResources(ctx context.Context, c client.Client, list client.ObjectList) error { + var errs []error + + _ = meta.EachListItem(list, func(obj runtime.Object) error { + o, ok := obj.(client.Object) + if !ok { + errs = append(errs, fmt.Errorf("item does not implement client.Object: %T", obj)) + return nil // continue iterating + } + if o.GetDeletionTimestamp() != nil { + // already being deleted + return nil + } + + if err := c.Delete(ctx, o); err != nil { + errs = append(errs, err) + } + + return nil // continue iterating no matter what + }) + + return errors.Join(errs...) 
+} diff --git a/internal/cmd/controller/operator.go b/internal/cmd/controller/operator.go index bd9e7368bb..730cc57973 100644 --- a/internal/cmd/controller/operator.go +++ b/internal/cmd/controller/operator.go @@ -9,17 +9,20 @@ import ( "github.com/rancher/fleet/internal/cmd" "github.com/rancher/fleet/internal/cmd/controller/reconciler" "github.com/rancher/fleet/internal/cmd/controller/target" + "github.com/rancher/fleet/internal/config" "github.com/rancher/fleet/internal/experimental" "github.com/rancher/fleet/internal/manifest" "github.com/rancher/fleet/internal/metrics" - "github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1" + fleet "github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/healthz" + "sigs.k8s.io/controller-runtime/pkg/manager" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" ) @@ -29,7 +32,7 @@ var ( func init() { utilruntime.Must(clientgoscheme.AddToScheme(scheme)) - utilruntime.Must(v1alpha1.AddToScheme(scheme)) + utilruntime.Must(fleet.AddToScheme(scheme)) //+kubebuilder:scaffold:scheme } @@ -184,6 +187,22 @@ func start( } } + // Add an indexer for the ContentName label as that will make accesses in the cache + // faster + if err := AddContentNameLabelIndexer(ctx, mgr); err != nil { + return err + } + + if err = (&reconciler.ContentReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + ShardID: shardID, + Workers: workersOpts.Content, + }).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "Content") + return err + } + 
//+kubebuilder:scaffold:builder if err := reconciler.Load(ctx, mgr.GetAPIReader(), systemNamespace); err != nil { @@ -216,3 +235,21 @@ func start( return nil } + +func AddContentNameLabelIndexer(ctx context.Context, mgr manager.Manager) error { + return mgr.GetFieldIndexer().IndexField( + ctx, + &fleet.BundleDeployment{}, + config.ContentNameIndex, + func(obj client.Object) []string { + content, ok := obj.(*fleet.BundleDeployment) + if !ok { + return nil + } + if val, exists := content.Labels[fleet.ContentNameLabel]; exists { + return []string{val} + } + return nil + }, + ) +} diff --git a/internal/cmd/controller/reconciler/bundle_controller.go b/internal/cmd/controller/reconciler/bundle_controller.go index 514af3e32b..9c9fd78c23 100644 --- a/internal/cmd/controller/reconciler/bundle_controller.go +++ b/internal/cmd/controller/reconciler/bundle_controller.go @@ -335,6 +335,15 @@ func (r *BundleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr // Changes in the values hash trigger a bundle deployment reconcile. bd.Spec.ValuesHash = valuesHash + // When content resources are stored in etcd, we need to keep track of the content resource so it + // is properly garbage-collected by the content controller. 
+ if !contentsInOCI && !contentsInHelmChart { + if bd.Labels == nil { + bd.Labels = make(map[string]string) + } + bd.Labels[fleet.ContentNameLabel] = manifestID + } + helmvalues.ClearOptions(bd) // If there's already a bundledeployment for this target, track its UID @@ -348,10 +357,7 @@ func (r *BundleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr op, bd, err := r.createBundleDeployment( ctx, logger, - bd, - contentsInOCI, - bundle.Spec.HelmOpOptions != nil, - manifestID) + bd) if err != nil { // We could end up here, because we cannot add a // finalizer to a content resource, which has a @@ -457,45 +463,11 @@ func (r *BundleReconciler) createBundleDeployment( ctx context.Context, l logr.Logger, bd *fleet.BundleDeployment, - contentsInOCI bool, - contentsInHelmChart bool, - manifestID string, ) (controllerutil.OperationResult, *fleet.BundleDeployment, error) { logger := l.WithValues("deploymentID", bd.Spec.DeploymentID) - // When content resources are stored in etcd, we need to add finalizers. - if !contentsInOCI && !contentsInHelmChart { - content := &fleet.Content{} - if err := r.Get(ctx, types.NamespacedName{Name: manifestID}, content); err != nil { - return controllerutil.OperationResultNone, nil, fmt.Errorf("failed to get content resource: %w", err) - } - - if added := controllerutil.AddFinalizer(content, bd.Name); added { - if err := r.Update(ctx, content); err != nil { - return controllerutil.OperationResultNone, nil, fmt.Errorf( - "could not add finalizer to content resource, thus cannot create/update bundledeployment: %w", - err, - ) - } - } - } - updated := bd.DeepCopy() op, err := controllerutil.CreateOrUpdate(ctx, r.Client, bd, func() error { - // When this mutation function is called by CreateOrUpdate, bd contains the - // _old_ bundle deployment, if any. - // The corresponding Content resource must only be deleted if it is no longer in use, ie if the - // latest version of the bundle points to a different deployment ID. 
- // An empty value for bd.Spec.DeploymentID means that we are deploying the first version of this - // bundle, hence there are no Contents left over to purge. - if (!bd.Spec.OCIContents || !contentsInHelmChart) && - bd.Spec.DeploymentID != "" && - bd.Spec.DeploymentID != updated.Spec.DeploymentID { - if err := finalize.PurgeContent(ctx, r.Client, bd.Name, bd.Spec.DeploymentID); err != nil { - logger.Error(err, "Reconcile failed to purge old content resource") - } - } - // check if there's any OCI secret that can be purged if err := maybePurgeOCIReferenceSecret(ctx, r.Client, bd, updated); err != nil { logger.Error(err, "Reconcile failed to purge old OCI reference secret") diff --git a/internal/cmd/controller/reconciler/bundle_controller_test.go b/internal/cmd/controller/reconciler/bundle_controller_test.go index f5b438a412..dae903371d 100644 --- a/internal/cmd/controller/reconciler/bundle_controller_test.go +++ b/internal/cmd/controller/reconciler/bundle_controller_test.go @@ -449,8 +449,6 @@ func TestReconcile_OptionsSecretCreateUpdateError(t *testing.T) { mockClient := mocks.NewMockK8sClient(mockCtrl) expectGetWithFinalizer(mockClient, bundle) - // No content creation/update expected, as options secret management happens before creating the BD - c.secretCalls(mockClient) // No expected status update (retryable error) @@ -532,8 +530,6 @@ func TestReconcile_OptionsSecretDeletionError(t *testing.T) { mockClient := mocks.NewMockK8sClient(mockCtrl) expectGetWithFinalizer(mockClient, bundle) - // No content creation/update expected, as options secret management happens before creating the BD - mockClient.EXPECT().Delete(gomock.Any(), gomock.AssignableToTypeOf(&corev1.Secret{}), gomock.Any()). 
Return(errors.New("something went wrong")) @@ -794,8 +790,6 @@ func TestReconcile_DownstreamObjectsHandlingError(t *testing.T) { mockClient := mocks.NewMockK8sClient(mockCtrl) expectGetWithFinalizer(mockClient, bundle) - expectContentCreationAndUpdate(mockClient) - // Options secret: deletion attempt in case it exists, as the bundle deployment's values hash is empty mockClient.EXPECT().Delete(gomock.Any(), gomock.AssignableToTypeOf(&corev1.Secret{}), gomock.Any()). Return(nil) @@ -1114,11 +1108,3 @@ func expectGetWithFinalizer(mockCli *mocks.MockK8sClient, bundle fleetv1.Bundle) }, ) } - -func expectContentCreationAndUpdate(mockCli *mocks.MockK8sClient) { - // Get content and update it, adding a finalizer, from createBundleDeployment - mockCli.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.AssignableToTypeOf(&fleetv1.Content{}), gomock.Any()). - Return(nil) - - mockCli.EXPECT().Update(gomock.Any(), gomock.AssignableToTypeOf(&fleetv1.Content{}), gomock.Any()).Return(nil) -} diff --git a/internal/cmd/controller/reconciler/bundledeployment_controller.go b/internal/cmd/controller/reconciler/bundledeployment_controller.go index 95134320b0..62c62e9c49 100644 --- a/internal/cmd/controller/reconciler/bundledeployment_controller.go +++ b/internal/cmd/controller/reconciler/bundledeployment_controller.go @@ -73,9 +73,6 @@ func (r *BundleDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req // The bundle reconciler takes care of adding the finalizer when creating a bundle deployment if !bd.DeletionTimestamp.IsZero() { if controllerutil.ContainsFinalizer(bd, finalize.BundleDeploymentFinalizer) { - if err := finalize.PurgeContent(ctx, r.Client, bd.Name, bd.Spec.DeploymentID); err != nil { - return ctrl.Result{}, err - } err = retry.RetryOnConflict(retry.DefaultRetry, func() error { err := r.Get(ctx, req.NamespacedName, bd) if err != nil { diff --git a/internal/cmd/controller/reconciler/content_controller.go b/internal/cmd/controller/reconciler/content_controller.go 
new file mode 100644 index 0000000000..b4421ceb1b --- /dev/null +++ b/internal/cmd/controller/reconciler/content_controller.go @@ -0,0 +1,136 @@ +package reconciler + +import ( + "context" + + "github.com/rancher/fleet/internal/config" + fleet "github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1" + "github.com/rancher/fleet/pkg/sharding" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" +) + +// ContentReconciler reconciles a Content object +type ContentReconciler struct { + client.Client + Scheme *runtime.Scheme + ShardID string + + Workers int +} + +//+kubebuilder:rbac:groups=fleet.cattle.io,resources=contents,verbs=get;list;watch;update;patch;delete +//+kubebuilder:rbac:groups=fleet.cattle.io,resources=contents/status,verbs=get;update;patch +//+kubebuilder:rbac:groups=fleet.cattle.io,resources=bundledeployments,verbs=get;list;watch + +// SetupWithManager sets up the controller with the Manager. +func (r *ContentReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + Named("content"). 
// naming because this controller does not use For() + Watches( + &fleet.BundleDeployment{}, + handler.EnqueueRequestsFromMapFunc(r.mapBundleDeploymentToContent), + builder.WithPredicates( + // Only trigger for BundleDeployment changes that affect Content references + predicate.Funcs{ + CreateFunc: func(e event.CreateEvent) bool { + return true + }, + UpdateFunc: func(e event.UpdateEvent) bool { + newBD := e.ObjectNew.(*fleet.BundleDeployment) + oldBD := e.ObjectOld.(*fleet.BundleDeployment) + + // Reconcile if ContentNameLabel changes + contentNameChanged := (newBD.Labels != nil && newBD.Labels[fleet.ContentNameLabel] != "") && + (oldBD.Labels == nil || newBD.Labels[fleet.ContentNameLabel] != oldBD.Labels[fleet.ContentNameLabel]) + + return contentNameChanged + }, + DeleteFunc: func(e event.DeleteEvent) bool { + return true + }, + GenericFunc: func(e event.GenericEvent) bool { return false }, + }, + ), + ). + WithEventFilter(sharding.FilterByShardID(r.ShardID)). + WithOptions(controller.Options{MaxConcurrentReconciles: r.Workers}). + Complete(r) +} + +// Reconcile is part of the main kubernetes reconciliation loop which aims to +// move the current state of the cluster closer to the desired state. 
+func (r *ContentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + logger := log.FromContext(ctx).WithName("content") + ctx = log.IntoContext(ctx, logger) + + content := &fleet.Content{} + if err := r.Get(ctx, req.NamespacedName, content); err != nil { + return ctrl.Result{}, client.IgnoreNotFound(err) + } + + // List all BundleDeployments that reference this Content resource + bdList := &fleet.BundleDeploymentList{} + err := r.List(ctx, bdList, client.MatchingFields{config.ContentNameIndex: content.Name}) + if err != nil { + logger.Error(err, "Failed to list BundleDeployments for Content resource") + return ctrl.Result{}, err + } + + newReferenceCount := 0 + for _, bd := range bdList.Items { + // Only count non-deleted BundleDeployments + if bd.DeletionTimestamp.IsZero() { + newReferenceCount++ + } + } + + // If the Content resource has no more references... delete it + if newReferenceCount == 0 && content.Status.ReferenceCount > 0 { + logger.V(1).Info("Content resource has no more references, deleting it") + return ctrl.Result{}, r.Delete(ctx, content) + } + + if content.Status.ReferenceCount != newReferenceCount { + logger.V(1).Info("Updating Content reference count", "oldCount", content.Status.ReferenceCount, "newCount", newReferenceCount) + orig := content.DeepCopy() + content.Status.ReferenceCount = newReferenceCount + if err := r.Status().Patch(ctx, content, client.MergeFrom(orig)); err != nil { + logger.Error(err, "Failed to update Content reference count status") + return ctrl.Result{}, err + } + } + + return ctrl.Result{}, nil +} + +// mapBundleDeploymentToContent maps a BundleDeployment to its associated Content resource. 
+func (r *ContentReconciler) mapBundleDeploymentToContent(ctx context.Context, obj client.Object) []ctrl.Request { + bd, ok := obj.(*fleet.BundleDeployment) + if !ok { + return nil + } + + contentName := bd.Labels[fleet.ContentNameLabel] + if contentName == "" { + return nil + } + + return []ctrl.Request{ + { + NamespacedName: types.NamespacedName{ + Name: contentName, + // Content resources are cluster-scoped, so namespace is empty + }, + }, + } +} diff --git a/internal/cmd/controller/reconciler/content_controller_test.go b/internal/cmd/controller/reconciler/content_controller_test.go new file mode 100644 index 0000000000..e7bdaaf7e5 --- /dev/null +++ b/internal/cmd/controller/reconciler/content_controller_test.go @@ -0,0 +1,217 @@ +package reconciler + +import ( + "context" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/rancher/fleet/internal/config" + fleet "github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +var _ = Describe("ContentReconciler", func() { + var ( + ctx context.Context + sch *runtime.Scheme + cl client.Client + r *ContentReconciler + content *fleet.Content + ) + + BeforeEach(func() { + ctx = context.Background() + sch = runtime.NewScheme() + Expect(clientgoscheme.AddToScheme(sch)).To(Succeed()) + Expect(fleet.AddToScheme(sch)).To(Succeed()) + }) + + Describe("mapBundleDeploymentToContent", func() { + It("returns nil for non-BundleDeployment objects", func() { + reconciler := &ContentReconciler{} + Expect(reconciler.mapBundleDeploymentToContent(ctx, &fleet.Content{})).To(BeNil()) + }) + + It("returns nil for BundleDeployment without content label", func() { + 
reconciler := &ContentReconciler{} + bd := &fleet.BundleDeployment{} + Expect(reconciler.mapBundleDeploymentToContent(ctx, bd)).To(BeNil()) + }) + + It("returns nil for BundleDeployment with a label using an empty name", func() { + reconciler := &ContentReconciler{} + bd := &fleet.BundleDeployment{} + bd.Labels = map[string]string{fleet.ContentNameLabel: ""} + Expect(reconciler.mapBundleDeploymentToContent(ctx, bd)).To(BeNil()) + }) + + It("maps BundleDeployment with label to a single content request", func() { + reconciler := &ContentReconciler{} + bd := &fleet.BundleDeployment{} + name := "my-content" + bd.Labels = map[string]string{fleet.ContentNameLabel: name} + res := reconciler.mapBundleDeploymentToContent(ctx, bd) + Expect(res).To(HaveLen(1)) + Expect(res[0].NamespacedName.Name).To(Equal(name)) + Expect(res[0].NamespacedName.Namespace).To(Equal("")) + }) + }) + + Describe("Reconcile", func() { + JustBeforeEach(func() { + // default fake client built by each test case as needed + r = &ContentReconciler{Client: cl, Scheme: sch} + }) + + Context("when Content is not present", func() { + BeforeEach(func() { + cl = fake.NewClientBuilder().WithScheme(sch).Build() + }) + + It("should not return an error for a non-existent Content", func() { + _, err := r.Reconcile(ctx, ctrl.Request{NamespacedName: client.ObjectKey{Name: "does-not-exist"}}) + Expect(err).ToNot(HaveOccurred()) + }) + }) + + Context("when Content has no referencing BundleDeployments", func() { + BeforeEach(func() { + content = &fleet.Content{ + ObjectMeta: metav1.ObjectMeta{Name: "content-to-delete"}, + Status: fleet.ContentStatus{ReferenceCount: 1}, + } + cl = fake.NewClientBuilder().WithScheme(sch). + WithIndex(&fleet.BundleDeployment{}, config.ContentNameIndex, func(obj client.Object) []string { + bd, ok := obj.(*fleet.BundleDeployment) + if !ok { + return nil + } + if val, exists := bd.Labels[fleet.ContentNameLabel]; exists { + return []string{val} + } + return nil + }). + WithObjects(content). 
+ Build() + }) + + It("deletes the content when there are no non-deleted BundleDeployments", func() { + // ensure content exists before reconcile + pre := &fleet.Content{} + Expect(cl.Get(ctx, client.ObjectKey{Name: content.Name}, pre)).To(Succeed()) + + _, err := r.Reconcile(ctx, ctrl.Request{NamespacedName: client.ObjectKey{Name: content.Name}}) + Expect(err).ToNot(HaveOccurred()) + + // content should be gone + got := &fleet.ContentList{} + Expect(cl.List(ctx, got)).To(Succeed()) + Expect(got.Items).To(BeEmpty()) + }) + }) + + Context("when there are referencing BundleDeployments", func() { + BeforeEach(func() { + content = &fleet.Content{ + ObjectMeta: metav1.ObjectMeta{Name: "content-1"}, + Status: fleet.ContentStatus{ReferenceCount: 0}, + } + + bd := &fleet.BundleDeployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "bd-1", + Namespace: "default", + Labels: map[string]string{fleet.ContentNameLabel: content.Name}, + }, + } + + cl = fake.NewClientBuilder().WithScheme(sch). + WithIndex(&fleet.BundleDeployment{}, config.ContentNameIndex, func(obj client.Object) []string { + bd, ok := obj.(*fleet.BundleDeployment) + if !ok { + return nil + } + if val, exists := bd.Labels[fleet.ContentNameLabel]; exists { + return []string{val} + } + return nil + }). + WithObjects(content, bd). + WithStatusSubresource(&fleet.Content{}). 
+ Build() + }) + + It("updates the Content reference count to match non-deleted BundleDeployments", func() { + _, err := r.Reconcile(ctx, ctrl.Request{NamespacedName: client.ObjectKey{Name: content.Name}}) + Expect(err).ToNot(HaveOccurred()) + + gotList := &fleet.ContentList{} + Expect(cl.List(ctx, gotList)).To(Succeed()) + Expect(gotList.Items).To(HaveLen(1)) + Expect(gotList.Items[0].Status.ReferenceCount).To(Equal(1)) + + // re-run reconcile and ensure the count remains the same + _, err = r.Reconcile(ctx, ctrl.Request{NamespacedName: client.ObjectKey{Name: content.Name}}) + Expect(err).ToNot(HaveOccurred()) + + gotList2 := &fleet.ContentList{} + Expect(cl.List(ctx, gotList2)).To(Succeed()) + Expect(gotList2.Items).To(HaveLen(1)) + Expect(gotList2.Items[0].Status.ReferenceCount).To(Equal(1)) + }) + }) + + Context("when BundleDeployments referencing the content are marked for deletion", func() { + BeforeEach(func() { + content = &fleet.Content{ + ObjectMeta: metav1.ObjectMeta{Name: "content-2"}, + Status: fleet.ContentStatus{ReferenceCount: 0}, + } + + deletionTime := metav1.NewTime(time.Now()) + bd := &fleet.BundleDeployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "bd-deleted", + Namespace: "default", + Labels: map[string]string{fleet.ContentNameLabel: content.Name}, + DeletionTimestamp: &deletionTime, + Finalizers: []string{"test.finalizer"}, + }, + } + + cl = fake.NewClientBuilder().WithScheme(sch). + WithIndex(&fleet.BundleDeployment{}, config.ContentNameIndex, func(obj client.Object) []string { + bd, ok := obj.(*fleet.BundleDeployment) + if !ok { + return nil + } + if val, exists := bd.Labels[fleet.ContentNameLabel]; exists { + return []string{val} + } + return nil + }). + WithObjects(content, bd). + WithStatusSubresource(&fleet.Content{}). 
+ Build() + }) + + It("ignores deleted BundleDeployments when counting references", func() { + _, err := r.Reconcile(ctx, ctrl.Request{NamespacedName: client.ObjectKey{Name: content.Name}}) + Expect(err).ToNot(HaveOccurred()) + + gotList := &fleet.ContentList{} + Expect(cl.List(ctx, gotList)).To(Succeed()) + Expect(gotList.Items).To(HaveLen(1)) + Expect(gotList.Items[0].Status.ReferenceCount).To(Equal(0)) + }) + }) + }) +}) diff --git a/internal/cmd/controller/root.go b/internal/cmd/controller/root.go index 22615c46e3..9d08d670cc 100644 --- a/internal/cmd/controller/root.go +++ b/internal/cmd/controller/root.go @@ -40,6 +40,7 @@ type ControllerReconcilerWorkers struct { ClusterGroup int ImageScan int Schedule int + Content int } type BindAddresses struct { @@ -134,6 +135,14 @@ func (f *FleetController) Run(cmd *cobra.Command, args []string) error { workersOpts.Schedule = w } + if d := os.Getenv("CONTENT_RECONCILER_WORKERS"); d != "" { + w, err := strconv.Atoi(d) + if err != nil { + setupLog.Error(err, "failed to parse CONTENT_RECONCILER_WORKERS", "value", d) + } + workersOpts.Content = w + } + go func() { log.Println(http.ListenAndServe("localhost:6060", nil)) //nolint:gosec // Debugging only }() diff --git a/internal/config/config.go b/internal/config/config.go index d694b13b17..9f944802e9 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -7,6 +7,7 @@ import ( "sync" "time" + fleet "github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1" corev1 "github.com/rancher/wrangler/v3/pkg/generated/controllers/core/v1" v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -48,6 +49,15 @@ const ( // Default secret name for oci storage, // used as a fallback if no secret is specified by the user in the GitRepo. DefaultOCIStorageSecretName = "ocistorage" + + // ContentNameIndex is the name of the index for the content name label in bundle deployments + ContentNameIndex = "metadata.labels." 
+ fleet.ContentNameLabel + + // RepoNameIndex is the name of the index for the gitrepo name in bundles + RepoNameIndex = "metadata.labels." + fleet.RepoLabel + + // ImageScanGitRepoIndex is the name of the index for the gitrepo name in imagescans + ImageScanGitRepoIndex = "spec.gitrepoName" ) var ( diff --git a/pkg/apis/fleet.cattle.io/v1alpha1/bundledeployment_types.go b/pkg/apis/fleet.cattle.io/v1alpha1/bundledeployment_types.go index 174a26e09f..729d64b4c4 100644 --- a/pkg/apis/fleet.cattle.io/v1alpha1/bundledeployment_types.go +++ b/pkg/apis/fleet.cattle.io/v1alpha1/bundledeployment_types.go @@ -27,6 +27,7 @@ const ( SecretTypeBundleDeploymentOptions = "fleet.cattle.io/bundle-deployment/v1alpha1" BundleDeploymentOwnershipLabel = "fleet.cattle.io/bundledeployment" + ContentNameLabel = "fleet.cattle.io/content-name" ) const IgnoreOp = "ignore" diff --git a/pkg/apis/fleet.cattle.io/v1alpha1/content_types.go b/pkg/apis/fleet.cattle.io/v1alpha1/content_types.go index 416dff76d5..2271952a02 100644 --- a/pkg/apis/fleet.cattle.io/v1alpha1/content_types.go +++ b/pkg/apis/fleet.cattle.io/v1alpha1/content_types.go @@ -30,7 +30,15 @@ type Content struct { Content []byte `json:"content,omitempty"` // SHA256Sum of the Content field - SHA256Sum string `json:"sha256sum,omitempty"` + SHA256Sum string `json:"sha256sum,omitempty"` // SHA256Sum of the Content field + Status ContentStatus `json:"status,omitempty"` // +optional +} + +// ContentStatus defines the observed state of Content +type ContentStatus struct { + // ReferenceCount is the number of BundleDeployments that currently reference this Content resource. 
+ // +optional + ReferenceCount int `json:"referenceCount,omitempty"` } // +kubebuilder:object:root=true diff --git a/pkg/apis/fleet.cattle.io/v1alpha1/gitrepo_types.go b/pkg/apis/fleet.cattle.io/v1alpha1/gitrepo_types.go index e292c6759e..7cb8d8cfab 100644 --- a/pkg/apis/fleet.cattle.io/v1alpha1/gitrepo_types.go +++ b/pkg/apis/fleet.cattle.io/v1alpha1/gitrepo_types.go @@ -8,15 +8,13 @@ func init() { InternalSchemeBuilder.Register(&GitRepo{}, &GitRepoList{}) } -var ( +const ( CommitLabel = "fleet.cattle.io/commit" RepoLabel = "fleet.cattle.io/repo-name" BundleLabel = "fleet.cattle.io/bundle-name" BundleNamespaceLabel = "fleet.cattle.io/bundle-namespace" CreatedByUserIDLabel = "fleet.cattle.io/created-by-user-id" -) -const ( GitRepoAcceptedCondition = "Accepted" ) diff --git a/pkg/apis/fleet.cattle.io/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/fleet.cattle.io/v1alpha1/zz_generated.deepcopy.go index f7c0074ec6..b12fd8b0fa 100644 --- a/pkg/apis/fleet.cattle.io/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/fleet.cattle.io/v1alpha1/zz_generated.deepcopy.go @@ -1236,6 +1236,7 @@ func (in *Content) DeepCopyInto(out *Content) { *out = make([]byte, len(*in)) copy(*out, *in) } + out.Status = in.Status } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Content. @@ -1288,6 +1289,21 @@ func (in *ContentList) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ContentStatus) DeepCopyInto(out *ContentStatus) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ContentStatus. +func (in *ContentStatus) DeepCopy() *ContentStatus { + if in == nil { + return nil + } + out := new(ContentStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *CorrectDrift) DeepCopyInto(out *CorrectDrift) { *out = *in diff --git a/pkg/generated/controllers/fleet.cattle.io/v1alpha1/content.go b/pkg/generated/controllers/fleet.cattle.io/v1alpha1/content.go index 84647fa647..0a0ab9728a 100644 --- a/pkg/generated/controllers/fleet.cattle.io/v1alpha1/content.go +++ b/pkg/generated/controllers/fleet.cattle.io/v1alpha1/content.go @@ -19,8 +19,19 @@ limitations under the License. package v1alpha1 import ( + "context" + "sync" + "time" + v1alpha1 "github.com/rancher/fleet/pkg/apis/fleet.cattle.io/v1alpha1" + "github.com/rancher/wrangler/v3/pkg/apply" + "github.com/rancher/wrangler/v3/pkg/condition" "github.com/rancher/wrangler/v3/pkg/generic" + "github.com/rancher/wrangler/v3/pkg/kv" + "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" ) // ContentController interface for managing Content resources. @@ -37,3 +48,161 @@ type ContentClient interface { type ContentCache interface { generic.NonNamespacedCacheInterface[*v1alpha1.Content] } + +// ContentStatusHandler is executed for every added or modified Content. Should return the new status to be updated +type ContentStatusHandler func(obj *v1alpha1.Content, status v1alpha1.ContentStatus) (v1alpha1.ContentStatus, error) + +// ContentGeneratingHandler is the top-level handler that is executed for every Content event. It extends ContentStatusHandler by a returning a slice of child objects to be passed to apply.Apply +type ContentGeneratingHandler func(obj *v1alpha1.Content, status v1alpha1.ContentStatus) ([]runtime.Object, v1alpha1.ContentStatus, error) + +// RegisterContentStatusHandler configures a ContentController to execute a ContentStatusHandler for every events observed. 
+// If a non-empty condition is provided, it will be updated in the status conditions for every handler execution +func RegisterContentStatusHandler(ctx context.Context, controller ContentController, condition condition.Cond, name string, handler ContentStatusHandler) { + statusHandler := &contentStatusHandler{ + client: controller, + condition: condition, + handler: handler, + } + controller.AddGenericHandler(ctx, name, generic.FromObjectHandlerToHandler(statusHandler.sync)) +} + +// RegisterContentGeneratingHandler configures a ContentController to execute a ContentGeneratingHandler for every events observed, passing the returned objects to the provided apply.Apply. +// If a non-empty condition is provided, it will be updated in the status conditions for every handler execution +func RegisterContentGeneratingHandler(ctx context.Context, controller ContentController, apply apply.Apply, + condition condition.Cond, name string, handler ContentGeneratingHandler, opts *generic.GeneratingHandlerOptions) { + statusHandler := &contentGeneratingHandler{ + ContentGeneratingHandler: handler, + apply: apply, + name: name, + gvk: controller.GroupVersionKind(), + } + if opts != nil { + statusHandler.opts = *opts + } + controller.OnChange(ctx, name, statusHandler.Remove) + RegisterContentStatusHandler(ctx, controller, condition, name, statusHandler.Handle) +} + +type contentStatusHandler struct { + client ContentClient + condition condition.Cond + handler ContentStatusHandler +} + +// sync is executed on every resource addition or modification. 
Executes the configured handlers and sends the updated status to the Kubernetes API +func (a *contentStatusHandler) sync(key string, obj *v1alpha1.Content) (*v1alpha1.Content, error) { + if obj == nil { + return obj, nil + } + + origStatus := obj.Status.DeepCopy() + obj = obj.DeepCopy() + newStatus, err := a.handler(obj, obj.Status) + if err != nil { + // Revert to old status on error + newStatus = *origStatus.DeepCopy() + } + + if a.condition != "" { + if errors.IsConflict(err) { + a.condition.SetError(&newStatus, "", nil) + } else { + a.condition.SetError(&newStatus, "", err) + } + } + if !equality.Semantic.DeepEqual(origStatus, &newStatus) { + if a.condition != "" { + // Since status has changed, update the lastUpdatedTime + a.condition.LastUpdated(&newStatus, time.Now().UTC().Format(time.RFC3339)) + } + + var newErr error + obj.Status = newStatus + newObj, newErr := a.client.UpdateStatus(obj) + if err == nil { + err = newErr + } + if newErr == nil { + obj = newObj + } + } + return obj, err +} + +type contentGeneratingHandler struct { + ContentGeneratingHandler + apply apply.Apply + opts generic.GeneratingHandlerOptions + gvk schema.GroupVersionKind + name string + seen sync.Map +} + +// Remove handles the observed deletion of a resource, cascade deleting every associated resource previously applied +func (a *contentGeneratingHandler) Remove(key string, obj *v1alpha1.Content) (*v1alpha1.Content, error) { + if obj != nil { + return obj, nil + } + + obj = &v1alpha1.Content{} + obj.Namespace, obj.Name = kv.RSplit(key, "/") + obj.SetGroupVersionKind(a.gvk) + + if a.opts.UniqueApplyForResourceVersion { + a.seen.Delete(key) + } + + return nil, generic.ConfigureApplyForObject(a.apply, obj, &a.opts). + WithOwner(obj). + WithSetID(a.name). 
+ ApplyObjects() +} + +// Handle executes the configured ContentGeneratingHandler and pass the resulting objects to apply.Apply, finally returning the new status of the resource +func (a *contentGeneratingHandler) Handle(obj *v1alpha1.Content, status v1alpha1.ContentStatus) (v1alpha1.ContentStatus, error) { + if !obj.DeletionTimestamp.IsZero() { + return status, nil + } + + objs, newStatus, err := a.ContentGeneratingHandler(obj, status) + if err != nil { + return newStatus, err + } + if !a.isNewResourceVersion(obj) { + return newStatus, nil + } + + err = generic.ConfigureApplyForObject(a.apply, obj, &a.opts). + WithOwner(obj). + WithSetID(a.name). + ApplyObjects(objs...) + if err != nil { + return newStatus, err + } + a.storeResourceVersion(obj) + return newStatus, nil +} + +// isNewResourceVersion detects if a specific resource version was already successfully processed. +// Only used if UniqueApplyForResourceVersion is set in generic.GeneratingHandlerOptions +func (a *contentGeneratingHandler) isNewResourceVersion(obj *v1alpha1.Content) bool { + if !a.opts.UniqueApplyForResourceVersion { + return true + } + + // Apply once per resource version + key := obj.Namespace + "/" + obj.Name + previous, ok := a.seen.Load(key) + return !ok || previous != obj.ResourceVersion +} + +// storeResourceVersion keeps track of the latest resource version of an object for which Apply was executed +// Only used if UniqueApplyForResourceVersion is set in generic.GeneratingHandlerOptions +func (a *contentGeneratingHandler) storeResourceVersion(obj *v1alpha1.Content) { + if !a.opts.UniqueApplyForResourceVersion { + return + } + + key := obj.Namespace + "/" + obj.Name + a.seen.Store(key, obj.ResourceVersion) +}