Skip to content

Commit cff9957

Browse files
authored
Use patch status on provisioning controller. (#8128)
* Use PatchStatus in handleError(). * Use PatchStatus in updateCheckState().
1 parent 598f652 commit cff9957

File tree

1 file changed

+91
-95
lines changed

1 file changed

+91
-95
lines changed

pkg/controller/admissionchecks/provisioning/controller.go

Lines changed: 91 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -324,17 +324,11 @@ func (c *Controller) syncOwnedProvisionRequest(
324324

325325
func (c *Controller) handleError(ctx context.Context, wl *kueue.Workload, ac *kueue.AdmissionCheckState, msg string, err error) error {
326326
c.record.Eventf(wl, corev1.EventTypeWarning, "FailedCreate", api.TruncateEventMessage(msg))
327-
328-
ac.Message = api.TruncateConditionMessage(msg)
329-
wlPatch := workload.BaseSSAWorkload(wl, true)
330-
workload.SetAdmissionCheckState(&wlPatch.Status.AdmissionChecks, *ac, c.clock)
331-
332-
patchErr := c.client.Status().Patch(
333-
ctx, wlPatch, client.Apply,
334-
client.FieldOwner(kueue.ProvisioningRequestControllerName),
335-
client.ForceOwnership,
336-
)
337-
327+
patchErr := workload.PatchStatus(ctx, c.client, wl, kueue.ProvisioningRequestControllerName, func(wl *kueue.Workload) (bool, error) {
328+
ac.Message = api.TruncateConditionMessage(msg)
329+
ac.LastTransitionTime = metav1.NewTime(c.clock.Now())
330+
return workload.SetAdmissionCheckState(&wl.Status.AdmissionChecks, *ac, c.clock), nil
331+
})
338332
return errors.Join(err, patchErr)
339333
}
340334

@@ -496,119 +490,121 @@ func (c *Controller) syncCheckStates(
496490
log := ctrl.LoggerFrom(ctx)
497491
wlInfo.update(wl, c.clock)
498492
checksMap := slices.ToRefMap(wl.Status.AdmissionChecks, func(c *kueue.AdmissionCheckState) kueue.AdmissionCheckReference { return c.Name })
499-
wlPatch := workload.BaseSSAWorkload(wl, true)
500493
recorderMessages := make([]string, 0, len(checkConfig))
501494
updated := false
502-
for check, prc := range checkConfig {
503-
checkState := *checksMap[check]
504-
//nolint:gocritic // ignore ifElseChain
505-
if prc == nil {
506-
// the check is not active
507-
updated = updateCheckState(&checkState, kueue.CheckStatePending) || updated
508-
updated = updateCheckMessage(&checkState, CheckInactiveMessage) || updated
509-
} else if !c.reqIsNeeded(wl, prc) {
510-
if updateCheckState(&checkState, kueue.CheckStateReady) {
511-
updated = true
512-
checkState.Message = NoRequestNeeded
513-
checkState.PodSetUpdates = nil
514-
}
515-
} else {
516-
pr := activeOrLastPRForChecks[check]
517-
if pr == nil {
518-
return nil
519-
}
520-
log.V(3).Info("Synchronizing admission check state based on provisioning request", "wl", klog.KObj(wl),
521-
"check", check,
522-
"prName", pr.Name,
523-
"failed", isFailed(pr),
524-
"provisioned", isProvisioned(pr),
525-
"accepted", isAccepted(pr),
526-
"bookingExpired", isBookingExpired(pr),
527-
"capacityRevoked", isCapacityRevoked(pr))
528-
backoffBaseSeconds := *prc.Spec.RetryStrategy.BackoffBaseSeconds
529-
backoffMaxSeconds := *prc.Spec.RetryStrategy.BackoffMaxSeconds
530-
backoffLimitCount := *prc.Spec.RetryStrategy.BackoffLimitCount
531-
switch {
532-
case isFailed(pr):
533-
if attempt := getAttempt(log, pr, wl.Name, check); attempt <= backoffLimitCount {
534-
// it is going to be retried
535-
message := fmt.Sprintf("Retrying after failure: %s", apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.Failed).Message)
536-
updated = updateCheckMessage(&checkState, message) || updated
537-
if getAttempt(log, pr, wl.Name, check) > ptr.Deref(checkState.RetryCount, 0) {
538-
// We don't want to Retry on old ProvisioningRequests
539-
updated = true
540-
updateCheckState(&checkState, kueue.CheckStateRetry)
541-
workload.UpdateAdmissionCheckRequeueState(&checkState, backoffBaseSeconds, backoffMaxSeconds, c.clock)
542-
}
543-
} else {
495+
err := workload.PatchStatus(ctx, c.client, wl, kueue.ProvisioningRequestControllerName, func(wlPatch *kueue.Workload) (bool, error) {
496+
for check, prc := range checkConfig {
497+
checkState := *checksMap[check]
498+
//nolint:gocritic // ignore ifElseChain
499+
if prc == nil {
500+
// the check is not active
501+
updated = updateCheckState(&checkState, kueue.CheckStatePending) || updated
502+
updated = updateCheckMessage(&checkState, CheckInactiveMessage) || updated
503+
} else if !c.reqIsNeeded(wl, prc) {
504+
if updateCheckState(&checkState, kueue.CheckStateReady) {
544505
updated = true
545-
checkState.State = kueue.CheckStateRejected
546-
checkState.Message = apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.Failed).Message
506+
checkState.Message = NoRequestNeeded
507+
checkState.PodSetUpdates = nil
547508
}
548-
case isCapacityRevoked(pr):
549-
if workload.IsActive(wl) && !workload.IsFinished(wl) {
550-
// We mark the admission check as rejected to trigger workload deactivation.
551-
// This is needed to prevent replacement pods being stuck in the pending phase indefinitely
552-
// as the nodes are already deleted by Cluster Autoscaler.
553-
updated = updateCheckState(&checkState, kueue.CheckStateRejected) || updated
554-
updated = updateCheckMessage(&checkState, apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.CapacityRevoked).Message) || updated
509+
} else {
510+
pr := activeOrLastPRForChecks[check]
511+
if pr == nil {
512+
return false, nil
555513
}
556-
case isBookingExpired(pr):
557-
if !workload.IsAdmitted(wl) {
514+
log.V(3).Info("Synchronizing admission check state based on provisioning request", "wl", klog.KObj(wl),
515+
"check", check,
516+
"prName", pr.Name,
517+
"failed", isFailed(pr),
518+
"provisioned", isProvisioned(pr),
519+
"accepted", isAccepted(pr),
520+
"bookingExpired", isBookingExpired(pr),
521+
"capacityRevoked", isCapacityRevoked(pr))
522+
backoffBaseSeconds := *prc.Spec.RetryStrategy.BackoffBaseSeconds
523+
backoffMaxSeconds := *prc.Spec.RetryStrategy.BackoffMaxSeconds
524+
backoffLimitCount := *prc.Spec.RetryStrategy.BackoffLimitCount
525+
switch {
526+
case isFailed(pr):
558527
if attempt := getAttempt(log, pr, wl.Name, check); attempt <= backoffLimitCount {
559528
// it is going to be retried
560-
message := fmt.Sprintf("Retrying after booking expired: %s", apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.BookingExpired).Message)
529+
message := fmt.Sprintf("Retrying after failure: %s", apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.Failed).Message)
561530
updated = updateCheckMessage(&checkState, message) || updated
562531
if getAttempt(log, pr, wl.Name, check) > ptr.Deref(checkState.RetryCount, 0) {
532+
// We don't want to Retry on old ProvisioningRequests
563533
updated = true
564534
updateCheckState(&checkState, kueue.CheckStateRetry)
565535
workload.UpdateAdmissionCheckRequeueState(&checkState, backoffBaseSeconds, backoffMaxSeconds, c.clock)
566536
}
567537
} else {
568538
updated = true
569539
checkState.State = kueue.CheckStateRejected
570-
checkState.Message = apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.BookingExpired).Message
540+
checkState.Message = apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.Failed).Message
571541
}
572-
}
573-
case isProvisioned(pr):
574-
if updateCheckState(&checkState, kueue.CheckStateReady) {
575-
updated = true
576-
// add the pod podSetUpdates
577-
checkState.PodSetUpdates = podSetUpdates(log, wl, pr, prc)
578-
// propagate the message from the provisioning request status into the workload
579-
// to change to the "successfully provisioned" message after provisioning
580-
updateCheckMessage(&checkState, apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.Provisioned).Message)
581-
}
582-
case isAccepted(pr):
583-
if provisionedCond := apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.Provisioned); provisionedCond != nil {
584-
// propagate the ETA update from the provisioning request into the workload
585-
updated = updateCheckMessage(&checkState, provisionedCond.Message) || updated
542+
case isCapacityRevoked(pr):
543+
if workload.IsActive(wl) && !workload.IsFinished(wl) {
544+
// We mark the admission check as rejected to trigger workload deactivation.
545+
// This is needed to prevent replacement pods being stuck in the pending phase indefinitely
546+
// as the nodes are already deleted by Cluster Autoscaler.
547+
updated = updateCheckState(&checkState, kueue.CheckStateRejected) || updated
548+
updated = updateCheckMessage(&checkState, apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.CapacityRevoked).Message) || updated
549+
}
550+
case isBookingExpired(pr):
551+
if !workload.IsAdmitted(wl) {
552+
if attempt := getAttempt(log, pr, wl.Name, check); attempt <= backoffLimitCount {
553+
// it is going to be retried
554+
message := fmt.Sprintf("Retrying after booking expired: %s", apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.BookingExpired).Message)
555+
updated = updateCheckMessage(&checkState, message) || updated
556+
if getAttempt(log, pr, wl.Name, check) > ptr.Deref(checkState.RetryCount, 0) {
557+
updated = true
558+
updateCheckState(&checkState, kueue.CheckStateRetry)
559+
workload.UpdateAdmissionCheckRequeueState(&checkState, backoffBaseSeconds, backoffMaxSeconds, c.clock)
560+
}
561+
} else {
562+
updated = true
563+
checkState.State = kueue.CheckStateRejected
564+
checkState.Message = apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.BookingExpired).Message
565+
}
566+
}
567+
case isProvisioned(pr):
568+
if updateCheckState(&checkState, kueue.CheckStateReady) {
569+
updated = true
570+
// add the pod podSetUpdates
571+
checkState.PodSetUpdates = podSetUpdates(log, wl, pr, prc)
572+
// propagate the message from the provisioning request status into the workload
573+
// to change to the "successfully provisioned" message after provisioning
574+
updateCheckMessage(&checkState, apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.Provisioned).Message)
575+
}
576+
case isAccepted(pr):
577+
if provisionedCond := apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.Provisioned); provisionedCond != nil {
578+
// propagate the ETA update from the provisioning request into the workload
579+
updated = updateCheckMessage(&checkState, provisionedCond.Message) || updated
580+
updated = updateCheckState(&checkState, kueue.CheckStatePending) || updated
581+
}
582+
default:
586583
updated = updateCheckState(&checkState, kueue.CheckStatePending) || updated
587584
}
588-
default:
589-
updated = updateCheckState(&checkState, kueue.CheckStatePending) || updated
590585
}
591-
}
592586

593-
existingCondition := admissioncheck.FindAdmissionCheck(wlPatch.Status.AdmissionChecks, checkState.Name)
594-
if existingCondition != nil && existingCondition.State != checkState.State {
595-
message := fmt.Sprintf("Admission check %s updated state from %s to %s", checkState.Name, existingCondition.State, checkState.State)
596-
if checkState.Message != "" {
597-
message += fmt.Sprintf(" with message %s", checkState.Message)
587+
existingCondition := admissioncheck.FindAdmissionCheck(wlPatch.Status.AdmissionChecks, checkState.Name)
588+
if existingCondition != nil && existingCondition.State != checkState.State {
589+
message := fmt.Sprintf("Admission check %s updated state from %s to %s", checkState.Name, existingCondition.State, checkState.State)
590+
if checkState.Message != "" {
591+
message += fmt.Sprintf(" with message %s", checkState.Message)
592+
}
593+
recorderMessages = append(recorderMessages, message)
598594
}
599-
recorderMessages = append(recorderMessages, message)
595+
workload.SetAdmissionCheckState(&wlPatch.Status.AdmissionChecks, checkState, c.clock)
600596
}
601-
workload.SetAdmissionCheckState(&wlPatch.Status.AdmissionChecks, checkState, c.clock)
597+
return updated, nil
598+
})
599+
if err != nil {
600+
return err
602601
}
603602
if updated {
604-
if err := c.client.Status().Patch(ctx, wlPatch, client.Apply, client.FieldOwner(kueue.ProvisioningRequestControllerName), client.ForceOwnership); err != nil {
605-
return err
606-
}
607603
for i := range recorderMessages {
608604
c.record.Event(wl, corev1.EventTypeNormal, "AdmissionCheckUpdated", api.TruncateEventMessage(recorderMessages[i]))
609605
}
610606
}
611-
wlInfo.update(wlPatch, c.clock)
607+
wlInfo.update(wl, c.clock)
612608
return nil
613609
}
614610

0 commit comments

Comments
 (0)