diff --git a/pkg/reconciler/internal/updater/updater.go b/pkg/reconciler/internal/updater/updater.go index 4c4bd31..0908748 100644 --- a/pkg/reconciler/internal/updater/updater.go +++ b/pkg/reconciler/internal/updater/updater.go @@ -18,7 +18,10 @@ package updater import ( "context" + "fmt" + "reflect" + "github.com/go-logr/logr" "helm.sh/helm/v3/pkg/release" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -33,17 +36,26 @@ import ( "github.com/operator-framework/helm-operator-plugins/pkg/internal/status" ) -func New(client client.Client) Updater { +func New(client client.Client, logger logr.Logger) Updater { + logger = logger.WithName("updater") return Updater{ client: client, + logger: logger, } } type Updater struct { isCanceled bool client client.Client + logger logr.Logger updateFuncs []UpdateFunc updateStatusFuncs []UpdateStatusFunc + + enableAggressiveConflictResolution bool +} + +func (u *Updater) EnableAggressiveConflictResolution() { + u.enableAggressiveConflictResolution = true } type UpdateFunc func(*unstructured.Unstructured) bool @@ -83,54 +95,143 @@ func isRetryableUpdateError(err error) bool { // retryOnRetryableUpdateError retries the given function until it succeeds, // until the given backoff is exhausted, or until the error is not retryable. // -// In case of a Conflict error, the update cannot be retried because the underlying -// resource has been modified in the meantime, and the reconciliation loop needs -// to be restarted anew. +// In case of a Conflict error, the update is not retried by default because the +// underlying resource has been modified in the meantime, and the reconciliation loop +// needs to be restarted anew. However, when aggressive conflict resolution is enabled, +// the updater attempts to refresh the object from the cluster and retry if it's safe +// to do so (e.g., when only the status has changed). 
// // A NotFound error means that the object has been deleted, and the reconciliation loop // needs to be restarted anew as well. -func retryOnRetryableUpdateError(backoff wait.Backoff, f func() error) error { - return retry.OnError(backoff, isRetryableUpdateError, f) +func retryOnRetryableUpdateError(backoff wait.Backoff, f func(attemptNum uint) error) error { + var attemptNum uint = 1 + countingF := func() error { + err := f(attemptNum) + attemptNum++ + return err + } + return retry.OnError(backoff, isRetryableUpdateError, countingF) } -func (u *Updater) Apply(ctx context.Context, obj *unstructured.Unstructured) error { +func (u *Updater) Apply(ctx context.Context, baseObj *unstructured.Unstructured) error { if u.isCanceled { return nil } backoff := retry.DefaultRetry - st := statusFor(obj) - needsStatusUpdate := false - for _, f := range u.updateStatusFuncs { - needsStatusUpdate = f(st) || needsStatusUpdate - } - // Always update the status first. During uninstall, if // we remove the finalizer, updating the status will fail // because the object and its status will be garbage-collected. - if needsStatusUpdate { + err := retryOnRetryableUpdateError(backoff, func(attemptNumber uint) error { + // Note that st will also include all status conditions, also those not managed by helm-operator. 
+ obj := baseObj.DeepCopy() + st := statusFor(obj) + needsStatusUpdate := false + for _, f := range u.updateStatusFuncs { + needsStatusUpdate = f(st) || needsStatusUpdate + } + + if !needsStatusUpdate { + return nil + } st.updateStatusObject() obj.Object["status"] = st.StatusObject - if err := retryOnRetryableUpdateError(backoff, func() error { - return u.client.Status().Update(ctx, obj) - }); err != nil { - return err + + if attemptNumber > 1 { + u.logger.V(1).Info("Retrying status update", "attempt", attemptNumber) + } + updateErr := u.client.Status().Update(ctx, obj) + if errors.IsConflict(updateErr) && u.enableAggressiveConflictResolution { + u.logger.V(1).Info("Status update conflict detected") + resolved, resolveErr := u.tryRefresh(ctx, baseObj, isSafeForStatusUpdate) + if resolveErr != nil { + return resolveErr + } + if !resolved { + return updateErr + } + return fmt.Errorf("status update conflict") // retriable error. + } else if updateErr != nil { + return updateErr } + baseObj.Object = obj.Object + return nil + }) + if err != nil { + return err } - needsUpdate := false - for _, f := range u.updateFuncs { - needsUpdate = f(obj) || needsUpdate - } - if needsUpdate { - if err := retryOnRetryableUpdateError(backoff, func() error { - return u.client.Update(ctx, obj) - }); err != nil { - return err + err = retryOnRetryableUpdateError(backoff, func(attemptNumber uint) error { + obj := baseObj.DeepCopy() + needsUpdate := false + for _, f := range u.updateFuncs { + needsUpdate = f(obj) || needsUpdate + } + if !needsUpdate { + return nil } + if attemptNumber > 1 { + u.logger.V(1).Info("Retrying update", "attempt", attemptNumber) + } + updateErr := u.client.Update(ctx, obj) + if errors.IsConflict(updateErr) && u.enableAggressiveConflictResolution { + u.logger.V(1).Info("Update conflict detected") + resolved, resolveErr := u.tryRefresh(ctx, baseObj, isSafeForUpdate) + if resolveErr != nil { + return resolveErr + } + if !resolved { + return updateErr + } + return 
fmt.Errorf("update conflict due to externally-managed status conditions") // retriable error. + } else if updateErr != nil { + return updateErr + } + baseObj.Object = obj.Object + return nil + }) + + return err +} + +func isSafeForStatusUpdate(_ logr.Logger, _ *unstructured.Unstructured, _ *unstructured.Unstructured) bool { + return true +} + +func isSafeForUpdate(logger logr.Logger, inMemory *unstructured.Unstructured, onCluster *unstructured.Unstructured) bool { + if !reflect.DeepEqual(inMemory.Object["spec"], onCluster.Object["spec"]) { + // Diff in object spec. Nothing we can do about it -> Fail. + logger.V(1).Info("Not refreshing object due to spec mismatch", + "namespace", inMemory.GetNamespace(), + "name", inMemory.GetName(), + "gvk", inMemory.GroupVersionKind(), + ) + return false + } + return true +} + +func (u *Updater) tryRefresh(ctx context.Context, obj *unstructured.Unstructured, isSafe func(logger logr.Logger, inMemory *unstructured.Unstructured, onCluster *unstructured.Unstructured) bool) (bool, error) { + // Re-fetch object with client. 
+ current := &unstructured.Unstructured{} + current.SetGroupVersionKind(obj.GroupVersionKind()) + objectKey := client.ObjectKeyFromObject(obj) + if err := u.client.Get(ctx, objectKey, current); err != nil { + err = fmt.Errorf("refreshing object %s/%s: %w", objectKey.Namespace, objectKey.Name, err) + return false, err } - return nil + + if !isSafe(u.logger, obj, current) { + return false, nil + } + + obj.Object = current.Object + u.logger.V(1).Info("Refreshed object", + "namespace", objectKey.Namespace, + "name", objectKey.Name, + "gvk", obj.GroupVersionKind()) + return true, nil } func RemoveFinalizer(finalizer string) UpdateFunc { diff --git a/pkg/reconciler/internal/updater/updater_test.go b/pkg/reconciler/internal/updater/updater_test.go index a4f8c3a..6fdfdc8 100644 --- a/pkg/reconciler/internal/updater/updater_test.go +++ b/pkg/reconciler/internal/updater/updater_test.go @@ -20,6 +20,7 @@ import ( "context" "errors" + "github.com/go-logr/logr" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -27,11 +28,14 @@ import ( corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/client/interceptor" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + pkgStatus "github.com/operator-framework/helm-operator-plugins/pkg/internal/status" "github.com/operator-framework/helm-operator-plugins/pkg/reconciler/internal/conditions" ) @@ -41,6 +45,10 @@ const ( replicasStatus = int64(5) ) +var ( + errTransient = errors.New("transient error") +) + var _ = Describe("Updater", func() { var ( cl client.Client @@ -51,7 +59,7 @@ var _ = Describe("Updater", func() { JustBeforeEach(func() { cl = fake.NewClientBuilder().WithInterceptorFuncs(interceptorFuncs).Build() - u = New(cl) + u = New(cl, 
logr.Discard()) obj = &unstructured.Unstructured{Object: map[string]interface{}{ "apiVersion": "apps/v1", "kind": "Deployment", @@ -85,7 +93,7 @@ var _ = Describe("Updater", func() { interceptorFuncs.SubResourceUpdate = func(ctx context.Context, interceptorClient client.Client, subResourceName string, obj client.Object, opts ...client.SubResourceUpdateOption) error { updateCallCount++ if updateCallCount == 1 { - return errors.New("transient error") + return errTransient } return interceptorClient.SubResource(subResourceName).Update(ctx, obj, opts...) } @@ -177,9 +185,106 @@ var _ = Describe("Updater", func() { Expect(found).To(BeTrue()) Expect(err).To(Succeed()) }) + + It("should add a finalizer", func() { + u.Update(func(u *unstructured.Unstructured) bool { + return controllerutil.AddFinalizer(u, testFinalizer) + }) + Expect(u.Apply(context.TODO(), obj)).To(Succeed()) + Expect(cl.Get(context.TODO(), types.NamespacedName{Namespace: "testNamespace", Name: "testDeployment"}, obj)).To(Succeed()) + Expect(obj.GetFinalizers()).To(ContainElement(testFinalizer)) + }) + + It("should remove a finalizer", func() { + obj.SetFinalizers([]string{testFinalizer}) + Expect(cl.Update(context.TODO(), obj)).To(Succeed()) + + u.Update(func(u *unstructured.Unstructured) bool { + return controllerutil.RemoveFinalizer(u, testFinalizer) + }) + Expect(u.Apply(context.TODO(), obj)).To(Succeed()) + Expect(cl.Get(context.TODO(), types.NamespacedName{Namespace: "testNamespace", Name: "testDeployment"}, obj)).To(Succeed()) + Expect(obj.GetFinalizers()).ToNot(ContainElement(testFinalizer)) + }) + It("should preserve a finalizer when removing a different one", func() { + otherFinalizer := "otherFinalizer" + obj.SetFinalizers([]string{testFinalizer, otherFinalizer}) + Expect(cl.Update(context.TODO(), obj)).To(Succeed()) + + u.Update(func(u *unstructured.Unstructured) bool { + return controllerutil.RemoveFinalizer(u, testFinalizer) + }) + Expect(u.Apply(context.TODO(), obj)).To(Succeed()) + 
Expect(cl.Get(context.TODO(), types.NamespacedName{Namespace: "testNamespace", Name: "testDeployment"}, obj)).To(Succeed()) + Expect(obj.GetFinalizers()).ToNot(ContainElement(testFinalizer)) + Expect(obj.GetFinalizers()).To(ContainElement(otherFinalizer)) + }) + Context("with aggressive conflict resolution enabled", func() { + It("should preserve a finalizer when removing a different one", func() { + otherFinalizer := "otherFinalizer" + obj.SetFinalizers([]string{testFinalizer, otherFinalizer}) + Expect(cl.Update(context.TODO(), obj)).To(Succeed()) + u.EnableAggressiveConflictResolution() + + u.Update(func(u *unstructured.Unstructured) bool { + return controllerutil.RemoveFinalizer(u, testFinalizer) + }) + Expect(u.Apply(context.TODO(), obj)).To(Succeed()) + Expect(cl.Get(context.TODO(), types.NamespacedName{Namespace: "testNamespace", Name: "testDeployment"}, obj)).To(Succeed()) + Expect(obj.GetFinalizers()).ToNot(ContainElement(testFinalizer)) + Expect(obj.GetFinalizers()).To(ContainElement(otherFinalizer)) + }) + }) + Context("when in-cluster object has been updated", func() { + JustBeforeEach(func() { + // Add external status condition on cluster. + clusterObj := obj.DeepCopy() + unknownCondition := map[string]interface{}{ + "type": "UnknownCondition", + "status": string(corev1.ConditionTrue), + "reason": "ExternallyManaged", + } + Expect(unstructured.SetNestedSlice(clusterObj.Object, []interface{}{unknownCondition}, "status", "conditions")).To(Succeed()) + err := retryOnceOnTransientError(func() error { + return cl.Status().Update(context.TODO(), clusterObj) + }) + Expect(err).ToNot(HaveOccurred()) + }) + + It("should preserve unknown status conditions", func() { + // Add status condition using updater. + u.UpdateStatus(EnsureCondition(conditions.Deployed(corev1.ConditionTrue, "", ""))) + u.EnableAggressiveConflictResolution() + Expect(u.Apply(context.TODO(), obj)).To(Succeed()) + // Retrieve object from cluster and extract status conditions. 
+ Expect(cl.Get(context.TODO(), types.NamespacedName{Namespace: "testNamespace", Name: "testDeployment"}, obj)).To(Succeed()) + objConditionsSlice, _, err := unstructured.NestedSlice(obj.Object, "status", "conditions") + Expect(err).ToNot(HaveOccurred()) + objConditions := conditionsFromUnstructured(objConditionsSlice) + // Verify both status conditions are present. + Expect(objConditions.IsTrueFor(pkgStatus.ConditionType("UnknownCondition"))).To(BeTrue()) + Expect(objConditions.IsTrueFor(pkgStatus.ConditionType("Deployed"))).To(BeTrue()) + }) + + It("should fail on conflict without aggressive resolution", func() { + // Add status condition using updater. + u.UpdateStatus(EnsureCondition(conditions.Deployed(corev1.ConditionTrue, "", ""))) + err := u.Apply(context.TODO(), obj) + // Verify conflict error is returned. + Expect(apierrors.IsConflict(err)).To(BeTrue()) + }) + }) }) }) +func retryOnceOnTransientError(f func() error) error { + err := f() + if errors.Is(err, errTransient) { + err = f() + } + return err +} + var _ = Describe("RemoveFinalizer", func() { var obj *unstructured.Unstructured @@ -325,4 +430,32 @@ var _ = Describe("statusFor", func() { obj.Object["status"] = "hello" Expect(statusFor(obj)).To(Equal(&helmAppStatus{})) }) + + It("should handle unknown status conditions", func() { + uSt := map[string]interface{}{ + "conditions": []interface{}{ + map[string]interface{}{ + "type": "UnknownCondition", + "status": string(corev1.ConditionTrue), + }, + }, + } + obj.Object["status"] = uSt + status := statusFor(obj) + Expect(status).ToNot(BeNil()) + Expect(status.Conditions.IsTrueFor(pkgStatus.ConditionType("UnknownCondition"))).To(BeTrue()) + }) }) + +func conditionsFromUnstructured(conditionsSlice []interface{}) pkgStatus.Conditions { + conditions := make(pkgStatus.Conditions, 0, len(conditionsSlice)) + for _, c := range conditionsSlice { + condMap, ok := c.(map[string]interface{}) + Expect(ok).To(BeTrue(), "condition is not a map[string]interface{}") + cond 
:= pkgStatus.Condition{} + err := runtime.DefaultUnstructuredConverter.FromUnstructured(condMap, &cond) + Expect(err).ToNot(HaveOccurred(), "failed to convert status condition from unstructured") + conditions = append(conditions, cond) + } + return conditions +} diff --git a/pkg/reconciler/reconciler.go b/pkg/reconciler/reconciler.go index db92150..8d7597c 100644 --- a/pkg/reconciler/reconciler.go +++ b/pkg/reconciler/reconciler.go @@ -100,6 +100,8 @@ type Reconciler struct { upgradeAnnotations map[string]annotation.Upgrade uninstallAnnotations map[string]annotation.Uninstall pauseReconcileAnnotation string + + enableAggressiveConflictResolution bool } // New creates a new Reconciler that reconciles custom resources that define a @@ -617,6 +619,13 @@ func WithControllerSetupFunc(f ControllerSetupFunc) Option { } } +func WithAggressiveConflictResolution(enabled bool) Option { + return func(r *Reconciler) error { + r.enableAggressiveConflictResolution = enabled + return nil + } +} + // ControllerSetup allows restricted access to the Controller using the WithControllerSetupFunc option. // Currently, the only supposed configuration is adding additional watchers do the controller. 
type ControllerSetup interface { @@ -688,7 +697,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Re } } - u := updater.New(r.client) + u := r.newUpdater(log) defer func() { applyErr := u.Apply(ctx, obj) if err == nil && !apierrors.IsNotFound(applyErr) { @@ -841,6 +850,14 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Re return ctrl.Result{RequeueAfter: r.reconcilePeriod}, nil } +func (r *Reconciler) newUpdater(log logr.Logger) updater.Updater { + u := updater.New(r.client, log) + if r.enableAggressiveConflictResolution { + u.EnableAggressiveConflictResolution() + } + return u +} + func (r *Reconciler) getValues(ctx context.Context, obj *unstructured.Unstructured) (chartutil.Values, error) { if err := internalvalues.ApplyOverrides(r.overrideValues, obj); err != nil { return chartutil.Values{}, err @@ -876,7 +893,7 @@ func (r *Reconciler) handleDeletion(ctx context.Context, actionClient helmclient // and we need to be able to update the conditions on the CR to // indicate that the uninstall failed. 
if err := func() (err error) { - uninstallUpdater := updater.New(r.client) + uninstallUpdater := r.newUpdater(log) defer func() { applyErr := uninstallUpdater.Apply(ctx, obj) if err == nil { diff --git a/pkg/reconciler/reconciler_test.go b/pkg/reconciler/reconciler_test.go index 7c5af66..699f266 100644 --- a/pkg/reconciler/reconciler_test.go +++ b/pkg/reconciler/reconciler_test.go @@ -480,6 +480,16 @@ var _ = Describe("Reconciler", func() { Expect(r.selectorPredicate.Generic(event.GenericEvent{Object: objUnlabeled})).To(BeFalse()) }) }) + _ = Describe("WithAggressiveConflictResolution", func() { + It("should set aggressive conflict resolution to true", func() { + Expect(WithAggressiveConflictResolution(true)(r)).To(Succeed()) + Expect(r.enableAggressiveConflictResolution).To(BeTrue()) + }) + It("should set aggressive conflict resolution to false", func() { + Expect(WithAggressiveConflictResolution(false)(r)).To(Succeed()) + Expect(r.enableAggressiveConflictResolution).To(BeFalse()) + }) + }) }) _ = Describe("Reconcile", func() {