diff --git a/pkg/cluster/client.go b/pkg/cluster/client.go index cf7deba5..d3b1e8aa 100644 --- a/pkg/cluster/client.go +++ b/pkg/cluster/client.go @@ -6,12 +6,20 @@ SPDX-License-Identifier: Apache-2.0 package cluster import ( + "context" "net/http" + "sync" + "time" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + apitypes "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/discovery" "k8s.io/client-go/rest" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/sap/component-operator-runtime/pkg/types" ) func NewClient(clnt client.Client, discoveryClient discovery.DiscoveryInterface, eventRecorder record.EventRecorder, config *rest.Config, httpClient *http.Client) Client { @@ -21,15 +29,23 @@ func NewClient(clnt client.Client, discoveryClient discovery.DiscoveryInterface, eventRecorder: eventRecorder, config: config, httpClient: httpClient, + inflightRetries: make(map[apitypes.UID]time.Time), } } +const ( + retryAfter = time.Second + nextRetryNotBefore = time.Minute +) + type clientImpl struct { client.Client discoveryClient discovery.DiscoveryInterface eventRecorder record.EventRecorder config *rest.Config httpClient *http.Client + mu sync.Mutex + inflightRetries map[apitypes.UID]time.Time } func (c *clientImpl) DiscoveryClient() discovery.DiscoveryInterface { @@ -47,3 +63,70 @@ func (c *clientImpl) Config() *rest.Config { func (c *clientImpl) HttpClient() *http.Client { return c.httpClient } + +func (c *clientImpl) Apply(ctx context.Context, obj runtime.ApplyConfiguration, opts ...client.ApplyOption) error { + return c.Client.Apply(ctx, obj, opts...) +} + +func (c *clientImpl) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error { + return c.retryIfEligible(c.Client.Create(ctx, obj, opts...), obj.GetUID()) +} + +func (c *clientImpl) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error { + return c.retryIfEligible(c.Client.Delete(ctx, obj, opts...), obj.GetUID()) +} + +func (c *clientImpl) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error { + return c.retryIfEligible(c.Client.Update(ctx, obj, opts...), obj.GetUID()) +} + +func (c *clientImpl) Patch(ctx context.Context, obj client.Object, patch client.Patch, opts ...client.PatchOption) error { + return c.retryIfEligible(c.Client.Patch(ctx, obj, patch, opts...), obj.GetUID()) +} + +func (c *clientImpl) DeleteAllOf(ctx context.Context, obj client.Object, opts ...client.DeleteAllOfOption) error { + return c.Client.DeleteAllOf(ctx, obj, opts...) +} + +func (c *clientImpl) Status() client.SubResourceWriter { + return &subResourceClientImpl{SubResourceClient: c.Client.SubResource("status"), client: c} +} + +func (c *clientImpl) SubResource(subResource string) client.SubResourceClient { + return &subResourceClientImpl{SubResourceClient: c.Client.SubResource(subResource), client: c} +} + +func (c *clientImpl) retryIfEligible(err error, uid apitypes.UID) error { + if apierrors.IsConflict(err) && uid != "" { + c.mu.Lock() + defer c.mu.Unlock() + now := time.Now() + for uid, notBefore := range c.inflightRetries { + if notBefore.After(now) { + delete(c.inflightRetries, uid) + } + } + if _, ok := c.inflightRetries[uid]; !ok { + c.inflightRetries[uid] = now.Add(nextRetryNotBefore) + return types.NewRetriableError(err, ref(retryAfter)) + } + } + return err +} + +type subResourceClientImpl struct { + client.SubResourceClient + client *clientImpl +} + +func (s *subResourceClientImpl) Create(ctx context.Context, obj client.Object, subResource client.Object, opts ...client.SubResourceCreateOption) error { + return s.client.retryIfEligible(s.SubResourceClient.Create(ctx, obj, subResource, opts...), obj.GetUID()) +} + +func (s *subResourceClientImpl) Update(ctx context.Context, obj client.Object, opts ...client.SubResourceUpdateOption) error { + return s.client.retryIfEligible(s.SubResourceClient.Update(ctx, obj, opts...), obj.GetUID()) +} + +func (s *subResourceClientImpl) Patch(ctx context.Context, obj client.Object, patch client.Patch, opts ...client.SubResourcePatchOption) error { + return s.client.retryIfEligible(s.SubResourceClient.Patch(ctx, obj, patch, opts...), obj.GetUID()) +} diff --git a/pkg/cluster/util.go b/pkg/cluster/util.go new file mode 100644 index 00000000..3e9bc314 --- /dev/null +++ b/pkg/cluster/util.go @@ -0,0 +1,12 @@ +/* +SPDX-FileCopyrightText: 2024 SAP SE or an SAP affiliate company and component-operator-runtime contributors +SPDX-License-Identifier: Apache-2.0 +*/ + +package cluster + +// TODO: consolidate all the util files into an internal reuse package + +func ref[T any](x T) *T { + return &x +} diff --git a/pkg/component/reconciler.go b/pkg/component/reconciler.go index 66a681dc..eb2c81eb 100755 --- a/pkg/component/reconciler.go +++ b/pkg/component/reconciler.go @@ -319,8 +319,7 @@ func (r *Reconciler[T]) Reconcile(ctx context.Context, req ctrl.Request) (result if err != nil { // convert retriable errors into non-errors (Pending or DeletionPending state), and return specified or default backoff - retriableError := &types.RetriableError{} - if errors.As(err, retriableError) { + if retriableError := new(types.RetriableError); errors.As(err, retriableError) { retryAfter := retriableError.RetryAfter() if retryAfter == nil || *retryAfter == 0 { retryAfter = &retryInterval @@ -444,8 +443,19 @@ func (r *Reconciler[T]) Reconcile(ctx context.Context, req ctrl.Request) (result } } if updateErr := r.client.Status().Update(ctx, component, client.FieldOwner(*r.options.FieldOwner)); updateErr != nil { - err = errors.Join(err, updateErr) - result = ctrl.Result{} + if retriableError := new(types.RetriableError); errors.As(updateErr, retriableError) { + log.V(1).Info("error updating status, retrying", "requeue", result.Requeue || result.RequeueAfter > 0, "requeueAfter", result.RequeueAfter.String()) + retryAfter := retriableError.RetryAfter() + if retryAfter == nil || *retryAfter == 0 { + retryAfter = &retryInterval + } + result = ctrl.Result{RequeueAfter: *retryAfter} + err = nil + } else { + log.V(1).Info("error updating status, not retrying") + err = errors.Join(err, updateErr) + result = ctrl.Result{} + } } }() diff --git a/pkg/reconciler/reconciler.go b/pkg/reconciler/reconciler.go index be56d7f9..dcc3ef28 100644 --- a/pkg/reconciler/reconciler.go +++ b/pkg/reconciler/reconciler.go @@ -1299,6 +1299,7 @@ func (r *Reconciler) deleteObject(ctx context.Context, key types.ObjectKey, exis if ok := controllerutil.RemoveFinalizer(crd, r.finalizer); ok { // note: 409 error is very likely here (because of concurrent updates happening through the api server); this is why we retry once if err := r.client.Update(ctx, crd, client.FieldOwner(r.fieldOwner)); err != nil { + // TODO: this immediate retry is actually no longer necessary because of the retry logic in the client if i == 1 && apierrors.IsConflict(err) { log.V(1).Info("error while updating CustomResourcedefinition (409 conflict); doing one retry", "error", err.Error()) continue @@ -1324,6 +1325,7 @@ func (r *Reconciler) deleteObject(ctx context.Context, key types.ObjectKey, exis if ok := controllerutil.RemoveFinalizer(apiService, r.finalizer); ok { // note: 409 error is very likely here (because of concurrent updates happening through the api server); this is why we retry once if err := r.client.Update(ctx, apiService, client.FieldOwner(r.fieldOwner)); err != nil { + // TODO: this immediate retry is actually no longer necessary because of the retry logic in the client if i == 1 && apierrors.IsConflict(err) { log.V(1).Info("error while updating APIService (409 conflict); doing one retry", "error", err.Error()) continue