Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions api/v1alpha1/temporalworkerownedresource_types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Unless explicitly stated otherwise all files in this repository are licensed under the MIT License.
//
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2024 Datadog, Inc.

package v1alpha1

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
)

// WorkerDeploymentReference references a TemporalWorkerDeployment in the same namespace.
type WorkerDeploymentReference struct {
// Name of the TemporalWorkerDeployment resource in the same namespace.
// +kubebuilder:validation:Required
// +kubebuilder:validation:Pattern=`^[a-z0-9]([-a-z0-9]*[a-z0-9])?$`
Name string `json:"name"`
}

// TemporalWorkerOwnedResourceSpec defines the desired state of TemporalWorkerOwnedResource.
type TemporalWorkerOwnedResourceSpec struct {
// WorkerRef references the TemporalWorkerDeployment to attach this resource to.
// +kubebuilder:validation:Required
WorkerRef WorkerDeploymentReference `json:"workerRef"`

// Object is the Kubernetes resource template to attach to each versioned Deployment.
// One copy of this resource is created per active Build ID, owned by the corresponding
// versioned Deployment (so it is garbage collected when the Deployment is deleted).
//
// The object must include apiVersion, kind, and spec. The metadata.name and
// metadata.namespace are generated by the controller and must not be set by the user.
//
// String values in the spec may contain Go template expressions:
// {{ .DeploymentName }} - the controller-generated versioned Deployment name
// {{ .Namespace }} - the namespace of the resource
// {{ .BuildID }} - the Build ID for this version
//
// The controller also auto-injects two well-known fields if they are absent:
// scaleTargetRef - set to point at the versioned Deployment (for HPA, KEDA, WPA, etc.)
// matchLabels - set to the versioned Deployment's selector labels (for PDB, WPA, etc.)
// +kubebuilder:validation:Required
// +kubebuilder:pruning:PreserveUnknownFields
Object runtime.RawExtension `json:"object"`
}

// OwnedResourceVersionStatus describes the status of an owned resource for a single Build ID.
type OwnedResourceVersionStatus struct {
// BuildID is the Build ID of the versioned Deployment this status entry refers to.
BuildID string `json:"buildID"`

// Applied is true if the resource was successfully applied for this Build ID.
Applied bool `json:"applied"`

// ResourceName is the name of the applied Kubernetes resource.
// +optional
ResourceName string `json:"resourceName,omitempty"`

// Message describes any error if Applied is false.
// +optional
Message string `json:"message,omitempty"`

// LastTransitionTime is the last time this status entry was updated.
LastTransitionTime metav1.Time `json:"lastTransitionTime"`
}

// TemporalWorkerOwnedResourceStatus defines the observed state of TemporalWorkerOwnedResource.
type TemporalWorkerOwnedResourceStatus struct {
// Versions describes the per-Build-ID status of owned resources.
// +optional
Versions []OwnedResourceVersionStatus `json:"versions,omitempty"`
}

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
//+kubebuilder:resource:shortName=twor
//+kubebuilder:printcolumn:name="Worker",type="string",JSONPath=".spec.workerRef.name",description="Referenced TemporalWorkerDeployment"
//+kubebuilder:printcolumn:name="Kind",type="string",JSONPath=".spec.object.kind",description="Kind of owned resource"
//+kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp",description="Age"

// TemporalWorkerOwnedResource attaches an arbitrary namespaced Kubernetes resource
// (HPA, PDB, WPA, custom CRDs, etc.) to each per-Build-ID versioned Deployment
// managed by a TemporalWorkerDeployment. One copy of the resource is created per
// active Build ID and is owned by the corresponding versioned Deployment.
type TemporalWorkerOwnedResource struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`

Spec TemporalWorkerOwnedResourceSpec `json:"spec,omitempty"`
Status TemporalWorkerOwnedResourceStatus `json:"status,omitempty"`
}

//+kubebuilder:object:root=true

// TemporalWorkerOwnedResourceList contains a list of TemporalWorkerOwnedResource.
type TemporalWorkerOwnedResourceList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []TemporalWorkerOwnedResource `json:"items"`
}

func init() {
SchemeBuilder.Register(&TemporalWorkerOwnedResource{}, &TemporalWorkerOwnedResourceList{})
}
129 changes: 129 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 34 additions & 0 deletions internal/controller/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,5 +190,39 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l
}
}

// Apply owned resources via Server-Side Apply.
// Partial failure isolation: all resources are attempted even if some fail;
// errors are collected and returned together.
var ownedResourceErrors []error
for _, apply := range p.ApplyOwnedResources {
l.Info("applying owned resource",
"name", apply.Resource.GetName(),
"kind", apply.Resource.GetKind(),
"fieldManager", apply.FieldManager,
)
// client.Apply uses Server-Side Apply, which is a create-or-update operation:
// if the resource does not yet exist the API server creates it; if it already
// exists the API server merges only the fields owned by this field manager,
// leaving fields owned by other managers (e.g. the HPA controller) untouched.
// client.ForceOwnership allows this field manager to claim any fields that were
// previously owned by a different manager (e.g. after a field manager rename).
if err := r.Client.Patch(
ctx,
apply.Resource,
client.Apply,
client.ForceOwnership,
client.FieldOwner(apply.FieldManager),
); err != nil {
l.Error(err, "unable to apply owned resource",
"name", apply.Resource.GetName(),
"kind", apply.Resource.GetKind(),
)
ownedResourceErrors = append(ownedResourceErrors, err)
}
}
if len(ownedResourceErrors) > 0 {
return fmt.Errorf("errors applying owned resources (%d failures): %v", len(ownedResourceErrors), ownedResourceErrors[0])
}

return nil
}
16 changes: 16 additions & 0 deletions internal/controller/genplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// plan holds the actions to execute during reconciliation
Expand All @@ -38,6 +39,9 @@ type plan struct {
// Build IDs of versions from which the controller should
// remove IgnoreLastModifierKey from the version metadata
RemoveIgnoreLastModifierBuilds []string

// OwnedResources to apply via Server-Side Apply, one per (TWOR × Build ID) pair.
ApplyOwnedResources []planner.OwnedResourceApply
}

// startWorkflowConfig defines a workflow to be started
Expand Down Expand Up @@ -128,6 +132,16 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan(
RolloutStrategy: rolloutStrategy,
}

// Fetch all TemporalWorkerOwnedResources that reference this TWD so that the planner
// can render one apply action per (TWOR × active Build ID) pair.
var tworList temporaliov1alpha1.TemporalWorkerOwnedResourceList
if err := r.List(ctx, &tworList,
client.InNamespace(w.Namespace),
client.MatchingFields{tworWorkerRefKey: w.Name},
); err != nil {
return nil, fmt.Errorf("unable to list TemporalWorkerOwnedResources: %w", err)
}

planResult, err := planner.GeneratePlan(
l,
k8sState,
Expand All @@ -140,6 +154,7 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan(
r.MaxDeploymentVersionsIneligibleForDeletion,
gateInput,
isGateInputSecret,
tworList.Items,
)
if err != nil {
return nil, fmt.Errorf("error generating plan: %w", err)
Expand All @@ -154,6 +169,7 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan(
plan.UpdateVersionConfig = planResult.VersionConfig

plan.RemoveIgnoreLastModifierBuilds = planResult.RemoveIgnoreLastModifierBuilds
plan.ApplyOwnedResources = planResult.ApplyOwnedResources

// Convert test workflows
for _, wf := range planResult.TestWorkflows {
Expand Down
32 changes: 32 additions & 0 deletions internal/controller/worker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
// TODO(jlegrone): add this everywhere
deployOwnerKey = ".metadata.controller"
buildIDLabel = "temporal.io/build-id"

// tworWorkerRefKey is the field index key for TemporalWorkerOwnedResource by workerRef.name.
tworWorkerRefKey = ".spec.workerRef.name"
)

// getAPIKeySecretName extracts the secret name from a SecretKeySelector
Expand Down Expand Up @@ -274,18 +277,47 @@
return err
}

// Index TemporalWorkerOwnedResource by spec.workerRef.name for efficient listing.
if err := mgr.GetFieldIndexer().IndexField(context.Background(), &temporaliov1alpha1.TemporalWorkerOwnedResource{}, tworWorkerRefKey, func(rawObj client.Object) []string {
twor, ok := rawObj.(*temporaliov1alpha1.TemporalWorkerOwnedResource)
if !ok {
mgr.GetLogger().Error(fmt.Errorf("error indexing TemporalWorkerOwnedResources"), "could not convert raw object", rawObj)

Check failure on line 284 in internal/controller/worker_controller.go

View workflow job for this annotation

GitHub Actions / golangci

use-errors-new: replace fmt.Errorf by errors.New (revive)
return nil
}
return []string{twor.Spec.WorkerRef.Name}
}); err != nil {
return err
}

recoverPanic := !r.DisableRecoverPanic
return ctrl.NewControllerManagedBy(mgr).
For(&temporaliov1alpha1.TemporalWorkerDeployment{}).
Owns(&appsv1.Deployment{}).
Watches(&temporaliov1alpha1.TemporalConnection{}, handler.EnqueueRequestsFromMapFunc(r.findTWDsUsingConnection)).
Watches(&temporaliov1alpha1.TemporalWorkerOwnedResource{}, handler.EnqueueRequestsFromMapFunc(r.findTWDsForOwnedResource)).
WithOptions(controller.Options{
MaxConcurrentReconciles: 100,
RecoverPanic: &recoverPanic,
}).
Complete(r)
}

// findTWDsForOwnedResource maps a TemporalWorkerOwnedResource to the TWD reconcile request.
func (r *TemporalWorkerDeploymentReconciler) findTWDsForOwnedResource(ctx context.Context, twor client.Object) []reconcile.Request {
tworObj, ok := twor.(*temporaliov1alpha1.TemporalWorkerOwnedResource)
if !ok {
return nil
}
return []reconcile.Request{
{
NamespacedName: types.NamespacedName{
Name: tworObj.Spec.WorkerRef.Name,
Namespace: twor.GetNamespace(),
},
},
}
}

func (r *TemporalWorkerDeploymentReconciler) findTWDsUsingConnection(ctx context.Context, tc client.Object) []reconcile.Request {
var requests []reconcile.Request

Expand Down
5 changes: 1 addition & 4 deletions internal/k8s/deployments.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,7 @@ func NewDeploymentWithOwnerRef(
buildID string,
connection temporaliov1alpha1.TemporalConnectionSpec,
) *appsv1.Deployment {
selectorLabels := map[string]string{
twdNameLabel: TruncateString(CleanStringForDNS(objectMeta.GetName()), 63),
BuildIDLabel: TruncateString(buildID, 63),
}
selectorLabels := ComputeSelectorLabels(objectMeta.GetName(), buildID)

// Set pod labels
podLabels := make(map[string]string)
Expand Down
Loading
Loading