Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions pkg/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,32 @@ limitations under the License.
package index

import (
"context"
"fmt"
"maps"
"net/url"
"strings"
"sync"
"time"

"k8s.io/apimachinery/pkg/api/equality"

"github.com/kcp-dev/logicalcluster/v3"
corev1alpha1 "github.com/kcp-dev/sdk/apis/core/v1alpha1"
tenancyv1alpha1 "github.com/kcp-dev/sdk/apis/tenancy/v1alpha1"

"github.com/kcp-dev/kcp/pkg/contextmanager"
)

// defaultMigrationGracePeriod is how long a logical cluster is considered "recently migrated"
// after it changed shards. During that window, watches carrying a non-zero resourceVersion
// (i.e. clients trying to re-establish a watch) are answered with a 410 so that the client is
// forced to do a full relist.
//
// It is set to 90s because client-go reflectors, the most likely clients, may already have started
// backing off while trying to reconnect during the migration. The backoff ceiling is 60s, so 90s
// should be a large enough window for all clients to receive a 410 and start a new watch.
const defaultMigrationGracePeriod = 90 * time.Second

// Index implements a mapping from logical cluster to (shard) URL.
type Index interface {
Lookup(path logicalcluster.Path) (Result, bool)
Expand Down Expand Up @@ -73,6 +87,18 @@ type State struct {
shardClusterWorkspaceMount map[string]map[logicalcluster.Name]map[string]tenancyv1alpha1.WorkspaceSpec // (shard name, logical cluster, workspace name) -> WorkspaceSpec

shardClusterWorkspaceNameErrorCode map[string]map[logicalcluster.Name]map[string]int // (shard name, logical cluster, workspace name) -> error code

// now is overridable for testing.
now func() time.Time

// migratedAt maps a logicalcluster.Name to the time at which the cluster was last migrated.
// An entry is pruned lazily: the first RecentlyMigrated call after the grace period deletes it.
migratedAt sync.Map
// migrationGracePeriod is the grace period for migratedAt.
migrationGracePeriod time.Duration

// clusterContexts allows deriving contexts from both a connection and a shared per-cluster context.
clusterContexts *contextmanager.Manager[logicalcluster.Name]
}

func New(rewriters []PathRewriter) *State {
Expand All @@ -93,6 +119,10 @@ func New(rewriters []PathRewriter) *State {
// shardClusterWorkspaceNameErrorCode is a map of (shard, logical cluster, workspace) to the error code
// to return when we want to respond with an error code instead of a URL.
shardClusterWorkspaceNameErrorCode: map[string]map[logicalcluster.Name]map[string]int{},

migrationGracePeriod: defaultMigrationGracePeriod,
now: time.Now,
clusterContexts: contextmanager.New[logicalcluster.Name](context.Background()),
}
}

Expand Down Expand Up @@ -227,6 +257,16 @@ func (c *State) UpsertLogicalCluster(shard string, logicalCluster *corev1alpha1.
c.lock.Lock()
defer c.lock.Unlock()

// If got is not empty then the logical cluster was migrated from shard `got` to shard `shard`.
// Record the timestamp and delete the context from the manager.
// The timestamp is recorded so that clients re-establishing a watch receive a 410, which triggers a full relist.
// The context is cancelled to force-close in-flight watches; when those clients reconnect they get the aforementioned 410 and relist.
// The relist is important because the RV on the destination shard will be different, leading to erroneous watch results if no relist is done.
if got != "" {
c.migratedAt.Store(clusterName, c.now())
c.clusterContexts.Delete(clusterName, fmt.Errorf("logical cluster %s migrated from shard %s to shard %s", clusterName, got, shard))
}

c.clusterShards[clusterName] = shard

// LogicalClusters are annotated with "path:name" of their workspace's type.
Expand Down Expand Up @@ -254,6 +294,7 @@ func (c *State) DeleteLogicalCluster(shard string, logicalCluster *corev1alpha1.
defer c.lock.Unlock()
if got := c.clusterShards[clusterName]; got == shard {
delete(c.clusterShards, clusterName)
c.clusterContexts.Delete(clusterName, fmt.Errorf("logical cluster %s deleted from shard %s", clusterName, shard))
}

// This LC keyed as the cluster being addressed.
Expand Down Expand Up @@ -457,3 +498,26 @@ func (c *State) LookupURL(path logicalcluster.Path) (Result, bool) {
URL: strings.TrimSuffix(baseURL, "/") + result.Cluster.Path().RequestPath(),
}, true
}

// RecentlyMigrated reports whether the given cluster changed shards less than
// migrationGracePeriod ago.
//
// It returns false when no migration has been recorded for the cluster or
// when the recorded migration is older than the grace period. Expired entries
// are pruned lazily by this call.
func (c *State) RecentlyMigrated(cluster logicalcluster.Name) bool {
	loaded, stored := c.migratedAt.Load(cluster)
	if !stored {
		return false
	}

	// Only time.Time values are ever stored in migratedAt.
	migrated := loaded.(time.Time)
	if c.now().Sub(migrated) < c.migrationGracePeriod {
		return true
	}

	// Grace period is over, drop the entry, but only if it still holds the
	// timestamp inspected above. This method does not take c.lock, so a new
	// migration may have stored a fresh timestamp since the Load; a plain
	// Delete would discard it and stale watches would miss their 410.
	c.migratedAt.CompareAndDelete(cluster, loaded)
	return false
}

// ClusterContext returns a context derived from the parent that will be
// cancelled when the cluster is deleted or migrated.
//
// It delegates to the per-cluster context manager; UpsertLogicalCluster and
// DeleteLogicalCluster call clusterContexts.Delete for the cluster on a
// migration or deletion, which is what ends the contexts handed out here.
func (c *State) ClusterContext(parent context.Context, cluster logicalcluster.Name) (context.Context, context.CancelFunc) {
return c.clusterContexts.Context(parent, cluster)
}
115 changes: 115 additions & 0 deletions pkg/index/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ limitations under the License.
package index

import (
"context"
"testing"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/kcp-dev/logicalcluster/v3"
corev1alpha1 "github.com/kcp-dev/sdk/apis/core/v1alpha1"
Expand Down Expand Up @@ -696,3 +699,115 @@ func newLogicalCluster(cluster string) *corev1alpha1.LogicalCluster {
ObjectMeta: metav1.ObjectMeta{Name: "cluster", Annotations: map[string]string{"kcp.io/cluster": cluster}},
}
}

// TestMigrationDetection walks a single logical cluster through a sequence of
// upserts and checks that RecentlyMigrated flips to true only once the cluster
// lands on a different shard, never on the initial insert or on a repeated
// upsert to the shard it already lives on.
func TestMigrationDetection(t *testing.T) {
	t.Parallel()

	const clusterID = "34"
	state := New(nil)
	name := logicalcluster.Name(clusterID)

	// Each step upserts the cluster onto a shard and states whether that
	// upsert should count as a migration.
	steps := []struct {
		shard        string
		wantMigrated bool
		failure      string
	}{
		{shard: "root", wantMigrated: false, failure: "first upsert must not be considered a migration"},
		{shard: "root", wantMigrated: false, failure: "re-upsert to the same shard must not be considered a migration"},
		{shard: "amber", wantMigrated: true, failure: "changing shards must be considered a migration"},
	}
	for _, step := range steps {
		state.UpsertLogicalCluster(step.shard, newLogicalCluster(clusterID))
		if state.RecentlyMigrated(name) != step.wantMigrated {
			t.Fatal(step.failure)
		}
	}

	// A cluster that was never upserted, let alone migrated, stays unaffected.
	if state.RecentlyMigrated(logicalcluster.Name("99")) {
		t.Fatal("unrelated cluster must not be considered migrated")
	}
}

// TestMigrationGraceExpiry drives the index with a fake clock and checks that
// a migrated cluster stops being reported as recently migrated once the grace
// period has passed, and that its migratedAt entry is removed by that lookup.
func TestMigrationGraceExpiry(t *testing.T) {
	t.Parallel()

	// fakeNow is the controllable clock the index reads through state.now.
	fakeNow := time.Now()

	state := New(nil)
	state.migrationGracePeriod = 30 * time.Second
	state.now = func() time.Time { return fakeNow }

	name := logicalcluster.Name("34")
	// First upsert registers the cluster, the second one moves it to another shard.
	for _, shard := range []string{"root", "amber"} {
		state.UpsertLogicalCluster(shard, newLogicalCluster("34"))
	}

	if !state.RecentlyMigrated(name) {
		t.Fatal("expected cluster to be recently migrated within the grace period")
	}

	// Move the clock beyond the grace period.
	fakeNow = fakeNow.Add(31 * time.Second)
	if state.RecentlyMigrated(name) {
		t.Fatal("expected cluster to no longer be recently migrated after grace period")
	}

	// The expired lookup above must have removed the entry.
	if _, found := state.migratedAt.Load(name); found {
		t.Fatal("expected migratedAt entry to be pruned after grace period")
	}
}

// TestMigrationForceClosesInflightWatches verifies that an in-flight watch
// context is cancelled when its logical cluster changes shards, and that a
// watch for a different cluster is left untouched.
func TestMigrationForceClosesInflightWatches(t *testing.T) {
t.Parallel()
target := New(nil)

cluster := logicalcluster.Name("34")
target.UpsertLogicalCluster("root", newLogicalCluster("34"))

// Open an in-flight watch context and make sure it is cancelled on migration.
ctx, cancel := target.ClusterContext(context.Background(), cluster)
defer cancel()

target.UpsertLogicalCluster("amber", newLogicalCluster("34"))

select {
case <-ctx.Done():
case <-time.After(wait.ForeverTestTimeout):
t.Fatal("expected in-flight watch context to be cancelled on shard change")
}

// A watch opened *after* the migration must be live. On a shard change the index
// removes the cluster's entry from the context manager (Delete), which cancels the
// contexts handed out so far; it must not leave the cluster in a permanently
// cancelled state, or every future watch for the cluster would be killed immediately.
Comment on lines +789 to +791

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Huh? Sounds like the agent had a bit more context than what's in the comment here :P

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:D Sortof yes, the comments in the original code (from where I copied the context handling) talked about the sticky cancellation and thats what it ran with when building the tests :D

reconnectCtx, reconnectCancel := target.ClusterContext(context.Background(), cluster)
defer reconnectCancel()
select {
case <-reconnectCtx.Done():
t.Fatal("expected post-migration watch context to stay open (sticky cancellation regression)")
case <-time.After(100 * time.Millisecond):
}

// A watch context for a different logical cluster must not be cancelled when
// an unrelated cluster migrates.
other := logicalcluster.Name("99")
target.UpsertLogicalCluster("root", newLogicalCluster("99"))
otherCtx, otherCancel := target.ClusterContext(context.Background(), other)
defer otherCancel()

target.UpsertLogicalCluster("amber", newLogicalCluster("34")) // re-migrate 34, not 99
select {
case <-otherCtx.Done():
t.Fatal("expected watch context for unrelated cluster to stay open")
case <-time.After(100 * time.Millisecond):
}
}
13 changes: 13 additions & 0 deletions pkg/proxy/index/index_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ const (

// Index is the lookup surface consumed by the proxy handlers (for example
// newClusterResolveHandler in pkg/proxy/lookup).
type Index interface {
// LookupURL resolves the given logical cluster path to an index.Result.
// The boolean is true on a successful lookup.
LookupURL(path logicalcluster.Path) (index.Result, bool)
// RecentlyMigrated returns true if the given cluster was recently migrated.
RecentlyMigrated(cluster logicalcluster.Name) bool
// ClusterContext returns a context derived from the parent that will be
// cancelled when the cluster is deleted or migrated.
ClusterContext(parent context.Context, cluster logicalcluster.Name) (context.Context, context.CancelFunc)
}

type ClusterClientGetter func(shard *corev1alpha1.Shard) (kcpclientset.ClusterInterface, error)
Expand Down Expand Up @@ -294,3 +299,11 @@ func (c *Controller) stopShard(shardName string) {
// LookupURL resolves the given path by delegating to the controller's index state.
func (c *Controller) LookupURL(path logicalcluster.Path) (index.Result, bool) {
return c.state.LookupURL(path)
}

// RecentlyMigrated returns true if the given cluster was recently migrated.
// It delegates to the controller's index state.
func (c *Controller) RecentlyMigrated(cluster logicalcluster.Name) bool {
return c.state.RecentlyMigrated(cluster)
}

// ClusterContext returns a context derived from the parent that will be
// cancelled when the cluster is deleted or migrated. It delegates to the
// controller's index state.
func (c *Controller) ClusterContext(parent context.Context, cluster logicalcluster.Name) (context.Context, context.CancelFunc) {
return c.state.ClusterContext(parent, cluster)
}
44 changes: 44 additions & 0 deletions pkg/proxy/lookup/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@ package lookup

import (
"context"
"fmt"
"net/http"
"net/url"
"strings"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/filters"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/apiserver/pkg/endpoints/request"
kubernetesscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/klog/v2"

Expand Down Expand Up @@ -87,6 +91,18 @@ func newClusterResolveHandler(delegate http.Handler, index proxyindex.Index) htt
return
}

// If the cluster was recently migrated and the request is not a fresh list or watch
// return a 410 to force the client to do a relist.
//
// This is only required for watches.
// Lists with a lower RV will get appropriate objects from the shard.
// Lists with a higher RV will get a 504 from the shard.
if index.RecentlyMigrated(result.Cluster) && isInProgressWatch(req) {

@gman0 gman0 Jun 24, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, what about clients that HAVE done a list before, already know a good RV on the new shard, and want to watch from there on; all while this 90s period is ticking? Maybe the error message could be a bit more helpful, like: ...; watch with resourceVersion temporarily unavailable, or something along those lines.

This could be called a pretty hard nit-picking, but a random user, looking at logs of their client might not know what is going on.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lists shouldn't be affected, they are just passed through to the shard and as mentioned in the comment they'll either have a lower RV than the new shard, in which case they'll get a diff to the current RV of the new shard.
If they have a higher RV than the new shard the shard will respond with a 504, causing the client to do a relist.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original implementation also rejected lists but the more I read the code for reflectors the less it made sense to handle lists instead of letting them through^^

err := apierrors.NewResourceExpired(fmt.Sprintf("logical cluster %q migrated to another shard; relist required", result.Cluster))
responsewriters.ErrorNegotiated(err, kubernetesscheme.Codecs, schema.GroupVersion{}, w, req)
return
}

shardURL, err := url.Parse(result.URL)
if err != nil {
responsewriters.InternalError(w, req, err)
Expand All @@ -105,12 +121,40 @@ func newClusterResolveHandler(delegate http.Handler, index proxyindex.Index) htt
}

ctx = WithShardURL(ctx, shardURL)

// Bound watch requests by the per-cluster context to cancel them when the cluster is migrated or deleted.
if isWatchRequest(req) {
var cancel context.CancelFunc
ctx, cancel = index.ClusterContext(ctx, result.Cluster)
defer cancel()
}

req = req.WithContext(ctx)

delegate.ServeHTTP(w, req)
}
}

// isWatchRequest reports whether req is a watch.
//
// When the request context carries a RequestInfo for a resource request, its
// verb is authoritative. Otherwise the decision falls back to the raw "watch"
// query parameter, which counts as set only for the values "true" and "1".
func isWatchRequest(req *http.Request) bool {
	info, found := request.RequestInfoFrom(req.Context())
	if found && info.IsResourceRequest {
		return info.Verb == "watch"
	}

	watchParam := req.URL.Query().Get("watch")
	return watchParam == "true" || watchParam == "1"
}

// isInProgressWatch reports whether req is a watch that resumes from a
// specific resourceVersion. A missing resourceVersion or "0" is never treated
// as an in-progress watch.
func isInProgressWatch(req *http.Request) bool {
	switch req.URL.Query().Get("resourceVersion") {
	case "", "0":
		return false
	default:
		return isWatchRequest(req)
	}
}

func newMappingHandler(delegate http.Handler, index proxyindex.Index) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
// Not every virtual workspace and/or every mapping has a {cluster} in its URL;
Expand Down
Loading