Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions pkg/cluster/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,20 @@ SPDX-License-Identifier: Apache-2.0
package cluster

import (
"context"
"net/http"
"sync"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
apitypes "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/discovery"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/sap/component-operator-runtime/pkg/types"
)

func NewClient(clnt client.Client, discoveryClient discovery.DiscoveryInterface, eventRecorder record.EventRecorder, config *rest.Config, httpClient *http.Client) Client {
Expand All @@ -21,15 +29,23 @@ func NewClient(clnt client.Client, discoveryClient discovery.DiscoveryInterface,
eventRecorder: eventRecorder,
config: config,
httpClient: httpClient,
inflightRetries: make(map[apitypes.UID]time.Time),
}
}

const (
retryAfter = time.Second
nextRetryNotBefore = time.Minute
)

type clientImpl struct {
client.Client
discoveryClient discovery.DiscoveryInterface
eventRecorder record.EventRecorder
config *rest.Config
httpClient *http.Client
mu sync.Mutex
inflightRetries map[apitypes.UID]time.Time
}

func (c *clientImpl) DiscoveryClient() discovery.DiscoveryInterface {
Expand All @@ -47,3 +63,70 @@ func (c *clientImpl) Config() *rest.Config {
func (c *clientImpl) HttpClient() *http.Client {
return c.httpClient
}

func (c *clientImpl) Apply(ctx context.Context, obj runtime.ApplyConfiguration, opts ...client.ApplyOption) error {
return c.Client.Apply(ctx, obj, opts...)
}

func (c *clientImpl) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error {
return c.retryIfEligible(c.Client.Create(ctx, obj, opts...), obj.GetUID())
}

func (c *clientImpl) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error {
return c.retryIfEligible(c.Client.Delete(ctx, obj, opts...), obj.GetUID())
}

func (c *clientImpl) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error {
return c.retryIfEligible(c.Client.Update(ctx, obj, opts...), obj.GetUID())
}

func (c *clientImpl) Patch(ctx context.Context, obj client.Object, patch client.Patch, opts ...client.PatchOption) error {
return c.retryIfEligible(c.Client.Patch(ctx, obj, patch, opts...), obj.GetUID())
}

func (c *clientImpl) DeleteAllOf(ctx context.Context, obj client.Object, opts ...client.DeleteAllOfOption) error {
return c.Client.DeleteAllOf(ctx, obj, opts...)
}

func (c *clientImpl) Status() client.SubResourceWriter {
return &subResourceClientImpl{SubResourceClient: c.Client.SubResource("status"), client: c}
}

func (c *clientImpl) SubResource(subResource string) client.SubResourceClient {
return &subResourceClientImpl{SubResourceClient: c.Client.SubResource(subResource), client: c}
}

func (c *clientImpl) retryIfEligible(err error, uid apitypes.UID) error {
if apierrors.IsConflict(err) && uid != "" {
c.mu.Lock()
defer c.mu.Unlock()
now := time.Now()
for uid, notBefore := range c.inflightRetries {
if notBefore.After(now) {
delete(c.inflightRetries, uid)
}
}
if _, ok := c.inflightRetries[uid]; !ok {
c.inflightRetries[uid] = now.Add(nextRetryNotBefore)
return types.NewRetriableError(err, ref(retryAfter))
}
}
return err
}

type subResourceClientImpl struct {
client.SubResourceClient
client *clientImpl
}

func (s *subResourceClientImpl) Create(ctx context.Context, obj client.Object, subResource client.Object, opts ...client.SubResourceCreateOption) error {
return s.client.retryIfEligible(s.SubResourceClient.Create(ctx, obj, subResource, opts...), obj.GetUID())
}

func (s *subResourceClientImpl) Update(ctx context.Context, obj client.Object, opts ...client.SubResourceUpdateOption) error {
return s.client.retryIfEligible(s.SubResourceClient.Update(ctx, obj, opts...), obj.GetUID())
}

func (s *subResourceClientImpl) Patch(ctx context.Context, obj client.Object, patch client.Patch, opts ...client.SubResourcePatchOption) error {
return s.client.retryIfEligible(s.SubResourceClient.Patch(ctx, obj, patch, opts...), obj.GetUID())
}
12 changes: 12 additions & 0 deletions pkg/cluster/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
SPDX-FileCopyrightText: 2024 SAP SE or an SAP affiliate company and component-operator-runtime contributors
SPDX-License-Identifier: Apache-2.0
*/

package cluster

// TODO: consolidate all the util files into an internal reuse package

func ref[T any](x T) *T {
return &x
}
18 changes: 14 additions & 4 deletions pkg/component/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,7 @@ func (r *Reconciler[T]) Reconcile(ctx context.Context, req ctrl.Request) (result

if err != nil {
// convert retriable errors into non-errors (Pending or DeletionPending state), and return specified or default backoff
retriableError := &types.RetriableError{}
if errors.As(err, retriableError) {
if retriableError := new(types.RetriableError); errors.As(err, retriableError) {
retryAfter := retriableError.RetryAfter()
if retryAfter == nil || *retryAfter == 0 {
retryAfter = &retryInterval
Expand Down Expand Up @@ -444,8 +443,19 @@ func (r *Reconciler[T]) Reconcile(ctx context.Context, req ctrl.Request) (result
}
}
if updateErr := r.client.Status().Update(ctx, component, client.FieldOwner(*r.options.FieldOwner)); updateErr != nil {
err = errors.Join(err, updateErr)
result = ctrl.Result{}
if retriableError := new(types.RetriableError); errors.As(updateErr, retriableError) {
log.V(1).Info("error updating status, retrying", "requeue", result.Requeue || result.RequeueAfter > 0, "requeueAfter", result.RequeueAfter.String())
retryAfter := retriableError.RetryAfter()
if retryAfter == nil || *retryAfter == 0 {
retryAfter = &retryInterval
}
result = ctrl.Result{RequeueAfter: *retryAfter}
err = nil
} else {
log.V(1).Info("error updating status, not retrying")
err = errors.Join(err, updateErr)
result = ctrl.Result{}
}
}
}()

Expand Down
2 changes: 2 additions & 0 deletions pkg/reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1299,6 +1299,7 @@ func (r *Reconciler) deleteObject(ctx context.Context, key types.ObjectKey, exis
if ok := controllerutil.RemoveFinalizer(crd, r.finalizer); ok {
// note: 409 error is very likely here (because of concurrent updates happening through the api server); this is why we retry once
if err := r.client.Update(ctx, crd, client.FieldOwner(r.fieldOwner)); err != nil {
// TODO: this immediate retry is actually no longer necessary because of the retry logic in the client
if i == 1 && apierrors.IsConflict(err) {
log.V(1).Info("error while updating CustomResourcedefinition (409 conflict); doing one retry", "error", err.Error())
continue
Expand All @@ -1324,6 +1325,7 @@ func (r *Reconciler) deleteObject(ctx context.Context, key types.ObjectKey, exis
if ok := controllerutil.RemoveFinalizer(apiService, r.finalizer); ok {
// note: 409 error is very likely here (because of concurrent updates happening through the api server); this is why we retry once
if err := r.client.Update(ctx, apiService, client.FieldOwner(r.fieldOwner)); err != nil {
// TODO: this immediate retry is actually no longer necessary because of the retry logic in the client
if i == 1 && apierrors.IsConflict(err) {
log.V(1).Info("error while updating APIService (409 conflict); doing one retry", "error", err.Error())
continue
Expand Down