Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/retry"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

Expand All @@ -38,9 +39,9 @@ import (
apisv1alpha2 "github.com/kcp-dev/sdk/apis/apis/v1alpha2"
corev1alpha1 "github.com/kcp-dev/sdk/apis/core/v1alpha1"
tenancyv1alpha1 "github.com/kcp-dev/sdk/apis/tenancy/v1alpha1"
"github.com/kcp-dev/sdk/apis/third_party/conditions/util/conditions"
kcpclientset "github.com/kcp-dev/sdk/client/clientset/versioned/cluster"
apisv1alpha2client "github.com/kcp-dev/sdk/client/clientset/versioned/typed/apis/v1alpha2"
corev1alpha1client "github.com/kcp-dev/sdk/client/clientset/versioned/typed/core/v1alpha1"
apisv1alpha2informers "github.com/kcp-dev/sdk/client/informers/externalversions/apis/v1alpha2"
corev1alpha1informers "github.com/kcp-dev/sdk/client/informers/externalversions/core/v1alpha1"
tenancyv1alpha1informers "github.com/kcp-dev/sdk/client/informers/externalversions/tenancy/v1alpha1"
Expand Down Expand Up @@ -115,8 +116,14 @@ func NewDefaultAPIBindingController(
return indexers.ByPathAndNameWithFallback[*apisv1alpha2.APIExport](apisv1alpha2.Resource("apiexports"), apiExportsInformer.Informer().GetIndexer(), globalAPIExportsInformer.Informer().GetIndexer(), path, name)
},

commitApiBinding: committer.NewCommitter[*apisv1alpha2.APIBinding, apisv1alpha2client.APIBindingInterface, *apisv1alpha2.APIBindingSpec, *apisv1alpha2.APIBindingStatus](kcpClusterClient.ApisV1alpha2().APIBindings()),
commitLogicalCluster: committer.NewCommitter[*corev1alpha1.LogicalCluster, corev1alpha1client.LogicalClusterInterface, *corev1alpha1.LogicalClusterSpec, *corev1alpha1.LogicalClusterStatus](kcpClusterClient.CoreV1alpha1().LogicalClusters()),
commitApiBinding: committer.NewCommitter[*apisv1alpha2.APIBinding, apisv1alpha2client.APIBindingInterface, *apisv1alpha2.APIBindingSpec, *apisv1alpha2.APIBindingStatus](kcpClusterClient.ApisV1alpha2().APIBindings()),

getLCDirect: func(ctx context.Context, clusterName logicalcluster.Name) (*corev1alpha1.LogicalCluster, error) {
return kcpClusterClient.Cluster(clusterName.Path()).CoreV1alpha1().LogicalClusters().Get(ctx, corev1alpha1.LogicalClusterName, metav1.GetOptions{})
},
updateLCStatus: func(ctx context.Context, clusterName logicalcluster.Name, lc *corev1alpha1.LogicalCluster) (*corev1alpha1.LogicalCluster, error) {
return kcpClusterClient.Cluster(clusterName.Path()).CoreV1alpha1().LogicalClusters().UpdateStatus(ctx, lc, metav1.UpdateOptions{})
},
}

c.transitiveTypeResolver = admission.NewTransitiveTypeResolver(c.getWorkspaceType)
Expand Down Expand Up @@ -144,7 +151,6 @@ func NewDefaultAPIBindingController(
}

type apiBindingResource = committer.Resource[*apisv1alpha2.APIBindingSpec, *apisv1alpha2.APIBindingStatus]
type logicalClusterResource = committer.Resource[*corev1alpha1.LogicalClusterSpec, *corev1alpha1.LogicalClusterStatus]

// DefaultAPIBindingController is a controller which instantiates APIBindings and waits for them to be fully bound
// in new Workspaces.
Expand All @@ -161,8 +167,14 @@ type DefaultAPIBindingController struct {
createAPIBinding func(ctx context.Context, clusterName logicalcluster.Path, binding *apisv1alpha2.APIBinding) (*apisv1alpha2.APIBinding, error)
getAPIExport func(clusterName logicalcluster.Path, name string) (*apisv1alpha2.APIExport, error)

commitApiBinding func(ctx context.Context, old, new *apiBindingResource) error
commitLogicalCluster func(ctx context.Context, old, new *logicalClusterResource) error
commitApiBinding func(ctx context.Context, old, new *apiBindingResource) error

// getLCDirect fetches the LogicalCluster directly from the API (bypassing the cache) so
// that RetryOnConflict loops always start from the freshest resourceVersion.
getLCDirect func(ctx context.Context, clusterName logicalcluster.Name) (*corev1alpha1.LogicalCluster, error)
// updateLCStatus submits a status update for the LogicalCluster; the server rejects it
// with a conflict error if the resourceVersion has changed since getLCDirect was called.
updateLCStatus func(ctx context.Context, clusterName logicalcluster.Name, lc *corev1alpha1.LogicalCluster) (*corev1alpha1.LogicalCluster, error)

transitiveTypeResolver transitiveTypeResolver
}
Expand Down Expand Up @@ -297,23 +309,35 @@ func (c *DefaultAPIBindingController) process(ctx context.Context, key string) e
return nil // nothing we can do here
}

old := logicalCluster
before := logicalCluster
logicalCluster = logicalCluster.DeepCopy()

logger = logging.WithObject(logger, logicalCluster)
ctx = klog.NewContext(ctx, logger)

var errs []error
err = c.reconcile(ctx, logicalCluster)
if err != nil {
if err := c.reconcile(ctx, logicalCluster); err != nil {
errs = append(errs, err)
}

// If the object being reconciled changed as a result, update it.
oldResource := &logicalClusterResource{ObjectMeta: old.ObjectMeta, Spec: &old.Spec, Status: &old.Status}
newResource := &logicalClusterResource{ObjectMeta: logicalCluster.ObjectMeta, Spec: &logicalCluster.Spec, Status: &logicalCluster.Status}
if err := c.commitLogicalCluster(ctx, oldResource, newResource); err != nil {
errs = append(errs, err)
// Compute the condition delta and apply it onto a freshly-fetched object inside a
// RetryOnConflict loop. This ensures concurrent writes from APIBinderInitializerController
// (which owns WorkspaceAPIBindingsInitialized) cannot overwrite our condition and vice versa.
condPatch := conditions.NewPatch(before, logicalCluster)
if !condPatch.IsZero() {
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
fresh, err := c.getLCDirect(ctx, clusterName)
if err != nil {
return err
}
if err := condPatch.Apply(fresh, conditions.WithOwnedConditions(tenancyv1alpha1.WorkspaceAPIBindingsReconciled)); err != nil {
return err
}
_, err = c.updateLCStatus(ctx, clusterName, fresh)
return err
}); err != nil {
errs = append(errs, err)
}
}

return utilerrors.NewAggregate(errs)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
Copyright 2026 The kcp Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package defaultapibindinglifecycle

import (
"context"
"testing"

"github.com/stretchr/testify/require"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"

"github.com/kcp-dev/logicalcluster/v3"
apisv1alpha2 "github.com/kcp-dev/sdk/apis/apis/v1alpha2"
corev1alpha1 "github.com/kcp-dev/sdk/apis/core/v1alpha1"
tenancyv1alpha1 "github.com/kcp-dev/sdk/apis/tenancy/v1alpha1"
conditionsv1alpha1 "github.com/kcp-dev/sdk/apis/third_party/conditions/apis/conditions/v1alpha1"
"github.com/kcp-dev/sdk/apis/third_party/conditions/util/conditions"
)

// TestProcessConditionIsolation verifies that process() only writes the condition this
// controller owns onto the live object, leaving conditions set by other controllers intact.
func TestProcessConditionIsolation(t *testing.T) {
t.Parallel()

// The "live" object already has a condition written by a different controller
// (WorkspaceAPIBindingsInitialized). After our controller runs, that foreign
// condition must still be present.
foreignCondition := conditionsv1alpha1.Condition{
Type: tenancyv1alpha1.WorkspaceAPIBindingsInitialized,
Status: corev1.ConditionTrue,
LastTransitionTime: metav1.Now(),
}

// Cached object seen before reconcile — no conditions yet. The workspace-type
// annotation is set so that reconcile resolves the type, finds no required bindings
// (getWorkspaceType returns an empty WorkspaceType), and marks WorkspaceAPIBindingsReconciled=True.
cached := &corev1alpha1.LogicalCluster{
ObjectMeta: metav1.ObjectMeta{
Name: corev1alpha1.LogicalClusterName,
ResourceVersion: "1",
Annotations: map[string]string{
tenancyv1alpha1.LogicalClusterTypeAnnotationKey: "root:universal",
},
},
}

// The live object returned by getLCDirect already has the foreign condition.
live := cached.DeepCopy()
live.ResourceVersion = "2"
conditions.Set(live, &foreignCondition)

var updateStatusCalled bool
var receivedLC *corev1alpha1.LogicalCluster

c := &DefaultAPIBindingController{
getLogicalCluster: func(logicalcluster.Name) (*corev1alpha1.LogicalCluster, error) {
return cached, nil
},
getLogicalClusterByPath: func(logicalcluster.Path) (*corev1alpha1.LogicalCluster, error) {
return nil, apierrors.NewNotFound(schema.GroupResource{}, "")
},
getWorkspaceType: func(logicalcluster.Path, string) (*tenancyv1alpha1.WorkspaceType, error) {
// Return an empty WorkspaceType so reconcile proceeds past the early-exit.
// transitiveTypeResolver returns an empty list, so no bindings are required
// and reconcile marks WorkspaceAPIBindingsReconciled=True.
return &tenancyv1alpha1.WorkspaceType{}, nil
},
listLogicalClusters: func() ([]*corev1alpha1.LogicalCluster, error) { return nil, nil },
listAPIBindings: func(logicalcluster.Name) ([]*apisv1alpha2.APIBinding, error) {
return nil, nil
},
getAPIBinding: func(logicalcluster.Name, string) (*apisv1alpha2.APIBinding, error) {
return nil, apierrors.NewNotFound(schema.GroupResource{}, "")
},
createAPIBinding: func(context.Context, logicalcluster.Path, *apisv1alpha2.APIBinding) (*apisv1alpha2.APIBinding, error) {
return nil, nil
},
getAPIExport: func(logicalcluster.Path, string) (*apisv1alpha2.APIExport, error) {
return nil, apierrors.NewNotFound(schema.GroupResource{}, "")
},
commitApiBinding: func(context.Context, *apiBindingResource, *apiBindingResource) error {
return nil
},
getLCDirect: func(_ context.Context, _ logicalcluster.Name) (*corev1alpha1.LogicalCluster, error) {
return live.DeepCopy(), nil
},
updateLCStatus: func(_ context.Context, _ logicalcluster.Name, lc *corev1alpha1.LogicalCluster) (*corev1alpha1.LogicalCluster, error) {
updateStatusCalled = true
receivedLC = lc.DeepCopy()
return lc, nil
},
transitiveTypeResolver: &noopResolver{},
}

err := c.process(context.Background(), "root:ws|cluster")
require.NoError(t, err)

require.True(t, updateStatusCalled, "updateLCStatus should have been called")

// The condition this controller owns must be present.
ownedCond := conditions.Get(receivedLC, tenancyv1alpha1.WorkspaceAPIBindingsReconciled)
require.NotNil(t, ownedCond, "owned condition WorkspaceAPIBindingsReconciled must be set")

// The foreign condition must not have been removed.
foreignCond := conditions.Get(receivedLC, tenancyv1alpha1.WorkspaceAPIBindingsInitialized)
require.NotNil(t, foreignCond, "foreign condition must be preserved on the live object")
require.Equal(t, corev1.ConditionTrue, foreignCond.Status)
}

type noopResolver struct{}

func (n *noopResolver) Resolve(*tenancyv1alpha1.WorkspaceType) ([]*tenancyv1alpha1.WorkspaceType, error) {
return nil, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package initialization
import (
"context"
"fmt"
"slices"
"time"

"github.com/go-logr/logr"
Expand All @@ -30,24 +31,25 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/retry"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache"
"github.com/kcp-dev/logicalcluster/v3"
apisv1alpha2 "github.com/kcp-dev/sdk/apis/apis/v1alpha2"
corev1alpha1 "github.com/kcp-dev/sdk/apis/core/v1alpha1"
sdkinitialization "github.com/kcp-dev/sdk/apis/tenancy/initialization"
tenancyv1alpha1 "github.com/kcp-dev/sdk/apis/tenancy/v1alpha1"
"github.com/kcp-dev/sdk/apis/third_party/conditions/util/conditions"
kcpclientset "github.com/kcp-dev/sdk/client/clientset/versioned/cluster"
corev1alpha1client "github.com/kcp-dev/sdk/client/clientset/versioned/typed/core/v1alpha1"
apisv1alpha2informers "github.com/kcp-dev/sdk/client/informers/externalversions/apis/v1alpha2"
corev1alpha1informers "github.com/kcp-dev/sdk/client/informers/externalversions/core/v1alpha1"
tenancyv1alpha1informers "github.com/kcp-dev/sdk/client/informers/externalversions/tenancy/v1alpha1"

admission "github.com/kcp-dev/kcp/pkg/admission/workspacetypeexists"
"github.com/kcp-dev/kcp/pkg/indexers"
"github.com/kcp-dev/kcp/pkg/logging"
"github.com/kcp-dev/kcp/pkg/reconciler/committer"
"github.com/kcp-dev/kcp/pkg/reconciler/events"
)

Expand Down Expand Up @@ -112,7 +114,12 @@ func NewAPIBinder(
return indexers.ByPathAndNameWithFallback[*apisv1alpha2.APIExport](apisv1alpha2.Resource("apiexports"), apiExportsInformer.Informer().GetIndexer(), globalAPIExportsInformer.Informer().GetIndexer(), path, name)
},

commit: committer.NewCommitter[*corev1alpha1.LogicalCluster, corev1alpha1client.LogicalClusterInterface, *corev1alpha1.LogicalClusterSpec, *corev1alpha1.LogicalClusterStatus](kcpClusterClient.CoreV1alpha1().LogicalClusters()),
getLCDirect: func(ctx context.Context, clusterName logicalcluster.Name) (*corev1alpha1.LogicalCluster, error) {
return kcpClusterClient.Cluster(clusterName.Path()).CoreV1alpha1().LogicalClusters().Get(ctx, corev1alpha1.LogicalClusterName, metav1.GetOptions{})
},
updateLCStatus: func(ctx context.Context, clusterName logicalcluster.Name, lc *corev1alpha1.LogicalCluster) (*corev1alpha1.LogicalCluster, error) {
return kcpClusterClient.Cluster(clusterName.Path()).CoreV1alpha1().LogicalClusters().UpdateStatus(ctx, lc, metav1.UpdateOptions{})
},
}

c.transitiveTypeResolver = admission.NewTransitiveTypeResolver(c.getWorkspaceType)
Expand Down Expand Up @@ -158,8 +165,6 @@ func NewAPIBinder(
return c, nil
}

type logicalClusterResource = committer.Resource[*corev1alpha1.LogicalClusterSpec, *corev1alpha1.LogicalClusterStatus]

// APIBinder is a controller which instantiates APIBindings and waits for them to be fully bound
// in new Workspaces.
type APIBinder struct {
Expand All @@ -178,8 +183,12 @@ type APIBinder struct {

transitiveTypeResolver transitiveTypeResolver

// commit creates a patch and submits it, if needed.
commit func(ctx context.Context, old, new *logicalClusterResource) error
// getLCDirect fetches the LogicalCluster directly from the API (bypassing the cache) so
// that RetryOnConflict loops always start from the freshest resourceVersion.
getLCDirect func(ctx context.Context, clusterName logicalcluster.Name) (*corev1alpha1.LogicalCluster, error)
// updateLCStatus submits a status update for the LogicalCluster; the server rejects it
// with a conflict error if the resourceVersion has changed since getLCDirect was called.
updateLCStatus func(ctx context.Context, clusterName logicalcluster.Name, lc *corev1alpha1.LogicalCluster) (*corev1alpha1.LogicalCluster, error)
}

type transitiveTypeResolver interface {
Expand Down Expand Up @@ -311,27 +320,45 @@ func (b *APIBinder) process(ctx context.Context, key string) error {
if !apierrors.IsNotFound(err) {
logger.Error(err, "failed to get LogicalCluster from lister", "cluster", clusterName)
}

return nil // nothing we can do here
return nil
}

old := logicalCluster
before := logicalCluster
logicalCluster = logicalCluster.DeepCopy()

logger = logging.WithObject(logger, logicalCluster)
ctx = klog.NewContext(ctx, logger)

var errs []error
err = b.reconcile(ctx, logicalCluster)
if err != nil {
if err := b.reconcile(ctx, logicalCluster); err != nil {
errs = append(errs, err)
}

// If the object being reconciled changed as a result, update it.
oldResource := &logicalClusterResource{ObjectMeta: old.ObjectMeta, Spec: &old.Spec, Status: &old.Status}
newResource := &logicalClusterResource{ObjectMeta: logicalCluster.ObjectMeta, Spec: &logicalCluster.Spec, Status: &logicalCluster.Status}
if err := b.commit(ctx, oldResource, newResource); err != nil {
errs = append(errs, err)
// Compute the condition delta and whether the initializer was removed during reconcile.
// Using conditions.NewPatch + RetryOnConflict ensures we only write the conditions this
// controller owns onto the freshest resourceVersion, so a concurrent write from
// DefaultAPIBindingLifecycleController cannot silently overwrite our changes (or vice versa).
condPatch := conditions.NewPatch(before, logicalCluster)
initializerRemoved := slices.Contains(before.Status.Initializers, tenancyv1alpha1.WorkspaceAPIBindingsInitializer) &&
!slices.Contains(logicalCluster.Status.Initializers, tenancyv1alpha1.WorkspaceAPIBindingsInitializer)

if !condPatch.IsZero() || initializerRemoved {
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
fresh, err := b.getLCDirect(ctx, clusterName)
if err != nil {
return err
}
if err := condPatch.Apply(fresh, conditions.WithOwnedConditions(tenancyv1alpha1.WorkspaceAPIBindingsInitialized)); err != nil {
return err
}
if initializerRemoved {
fresh.Status.Initializers = sdkinitialization.EnsureInitializerAbsent(tenancyv1alpha1.WorkspaceAPIBindingsInitializer, fresh.Status.Initializers)
}
_, err = b.updateLCStatus(ctx, clusterName, fresh)
return err
}); err != nil {
errs = append(errs, err)
}
}

return utilerrors.NewAggregate(errs)
Expand Down
Loading