Skip to content

Commit 3954cdd

Browse files
committed
Use PatchStatus in updateCheckState().
1 parent 7506423 commit 3954cdd

File tree

1 file changed

+86
-84
lines changed

1 file changed

+86
-84
lines changed

pkg/controller/admissionchecks/provisioning/controller.go

Lines changed: 86 additions & 84 deletions
Original file line number | Diff line number | Diff line change
@@ -490,119 +490,121 @@ func (c *Controller) syncCheckStates(
490490
log := ctrl.LoggerFrom(ctx)
491491
wlInfo.update(wl, c.clock)
492492
checksMap := slices.ToRefMap(wl.Status.AdmissionChecks, func(c *kueue.AdmissionCheckState) kueue.AdmissionCheckReference { return c.Name })
493-
wlPatch := workload.BaseSSAWorkload(wl, true)
494493
recorderMessages := make([]string, 0, len(checkConfig))
495494
updated := false
496-
for check, prc := range checkConfig {
497-
checkState := *checksMap[check]
498-
//nolint:gocritic // ignore ifElseChain
499-
if prc == nil {
500-
// the check is not active
501-
updated = updateCheckState(&checkState, kueue.CheckStatePending) || updated
502-
updated = updateCheckMessage(&checkState, CheckInactiveMessage) || updated
503-
} else if !c.reqIsNeeded(wl, prc) {
504-
if updateCheckState(&checkState, kueue.CheckStateReady) {
505-
updated = true
506-
checkState.Message = NoRequestNeeded
507-
checkState.PodSetUpdates = nil
508-
}
509-
} else {
510-
pr := activeOrLastPRForChecks[check]
511-
if pr == nil {
512-
return nil
513-
}
514-
log.V(3).Info("Synchronizing admission check state based on provisioning request", "wl", klog.KObj(wl),
515-
"check", check,
516-
"prName", pr.Name,
517-
"failed", isFailed(pr),
518-
"provisioned", isProvisioned(pr),
519-
"accepted", isAccepted(pr),
520-
"bookingExpired", isBookingExpired(pr),
521-
"capacityRevoked", isCapacityRevoked(pr))
522-
backoffBaseSeconds := *prc.Spec.RetryStrategy.BackoffBaseSeconds
523-
backoffMaxSeconds := *prc.Spec.RetryStrategy.BackoffMaxSeconds
524-
backoffLimitCount := *prc.Spec.RetryStrategy.BackoffLimitCount
525-
switch {
526-
case isFailed(pr):
527-
if attempt := getAttempt(log, pr, wl.Name, check); attempt <= backoffLimitCount {
528-
// it is going to be retried
529-
message := fmt.Sprintf("Retrying after failure: %s", apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.Failed).Message)
530-
updated = updateCheckMessage(&checkState, message) || updated
531-
if getAttempt(log, pr, wl.Name, check) > ptr.Deref(checkState.RetryCount, 0) {
532-
// We don't want to Retry on old ProvisioningRequests
533-
updated = true
534-
updateCheckState(&checkState, kueue.CheckStateRetry)
535-
workload.UpdateAdmissionCheckRequeueState(&checkState, backoffBaseSeconds, backoffMaxSeconds, c.clock)
536-
}
537-
} else {
495+
err := workload.PatchStatus(ctx, c.client, wl, kueue.ProvisioningRequestControllerName, func(wlPatch *kueue.Workload) (bool, error) {
496+
for check, prc := range checkConfig {
497+
checkState := *checksMap[check]
498+
//nolint:gocritic // ignore ifElseChain
499+
if prc == nil {
500+
// the check is not active
501+
updated = updateCheckState(&checkState, kueue.CheckStatePending) || updated
502+
updated = updateCheckMessage(&checkState, CheckInactiveMessage) || updated
503+
} else if !c.reqIsNeeded(wl, prc) {
504+
if updateCheckState(&checkState, kueue.CheckStateReady) {
538505
updated = true
539-
checkState.State = kueue.CheckStateRejected
540-
checkState.Message = apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.Failed).Message
506+
checkState.Message = NoRequestNeeded
507+
checkState.PodSetUpdates = nil
541508
}
542-
case isCapacityRevoked(pr):
543-
if workload.IsActive(wl) && !workload.IsFinished(wl) {
544-
// We mark the admission check as rejected to trigger workload deactivation.
545-
// This is needed to prevent replacement pods being stuck in the pending phase indefinitely
546-
// as the nodes are already deleted by Cluster Autoscaler.
547-
updated = updateCheckState(&checkState, kueue.CheckStateRejected) || updated
548-
updated = updateCheckMessage(&checkState, apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.CapacityRevoked).Message) || updated
509+
} else {
510+
pr := activeOrLastPRForChecks[check]
511+
if pr == nil {
512+
return false, nil
549513
}
550-
case isBookingExpired(pr):
551-
if !workload.IsAdmitted(wl) {
514+
log.V(3).Info("Synchronizing admission check state based on provisioning request", "wl", klog.KObj(wl),
515+
"check", check,
516+
"prName", pr.Name,
517+
"failed", isFailed(pr),
518+
"provisioned", isProvisioned(pr),
519+
"accepted", isAccepted(pr),
520+
"bookingExpired", isBookingExpired(pr),
521+
"capacityRevoked", isCapacityRevoked(pr))
522+
backoffBaseSeconds := *prc.Spec.RetryStrategy.BackoffBaseSeconds
523+
backoffMaxSeconds := *prc.Spec.RetryStrategy.BackoffMaxSeconds
524+
backoffLimitCount := *prc.Spec.RetryStrategy.BackoffLimitCount
525+
switch {
526+
case isFailed(pr):
552527
if attempt := getAttempt(log, pr, wl.Name, check); attempt <= backoffLimitCount {
553528
// it is going to be retried
554-
message := fmt.Sprintf("Retrying after booking expired: %s", apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.BookingExpired).Message)
529+
message := fmt.Sprintf("Retrying after failure: %s", apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.Failed).Message)
555530
updated = updateCheckMessage(&checkState, message) || updated
556531
if getAttempt(log, pr, wl.Name, check) > ptr.Deref(checkState.RetryCount, 0) {
532+
// We don't want to Retry on old ProvisioningRequests
557533
updated = true
558534
updateCheckState(&checkState, kueue.CheckStateRetry)
559535
workload.UpdateAdmissionCheckRequeueState(&checkState, backoffBaseSeconds, backoffMaxSeconds, c.clock)
560536
}
561537
} else {
562538
updated = true
563539
checkState.State = kueue.CheckStateRejected
564-
checkState.Message = apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.BookingExpired).Message
540+
checkState.Message = apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.Failed).Message
565541
}
566-
}
567-
case isProvisioned(pr):
568-
if updateCheckState(&checkState, kueue.CheckStateReady) {
569-
updated = true
570-
// add the pod podSetUpdates
571-
checkState.PodSetUpdates = podSetUpdates(log, wl, pr, prc)
572-
// propagate the message from the provisioning request status into the workload
573-
// to change to the "successfully provisioned" message after provisioning
574-
updateCheckMessage(&checkState, apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.Provisioned).Message)
575-
}
576-
case isAccepted(pr):
577-
if provisionedCond := apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.Provisioned); provisionedCond != nil {
578-
// propagate the ETA update from the provisioning request into the workload
579-
updated = updateCheckMessage(&checkState, provisionedCond.Message) || updated
542+
case isCapacityRevoked(pr):
543+
if workload.IsActive(wl) && !workload.IsFinished(wl) {
544+
// We mark the admission check as rejected to trigger workload deactivation.
545+
// This is needed to prevent replacement pods being stuck in the pending phase indefinitely
546+
// as the nodes are already deleted by Cluster Autoscaler.
547+
updated = updateCheckState(&checkState, kueue.CheckStateRejected) || updated
548+
updated = updateCheckMessage(&checkState, apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.CapacityRevoked).Message) || updated
549+
}
550+
case isBookingExpired(pr):
551+
if !workload.IsAdmitted(wl) {
552+
if attempt := getAttempt(log, pr, wl.Name, check); attempt <= backoffLimitCount {
553+
// it is going to be retried
554+
message := fmt.Sprintf("Retrying after booking expired: %s", apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.BookingExpired).Message)
555+
updated = updateCheckMessage(&checkState, message) || updated
556+
if getAttempt(log, pr, wl.Name, check) > ptr.Deref(checkState.RetryCount, 0) {
557+
updated = true
558+
updateCheckState(&checkState, kueue.CheckStateRetry)
559+
workload.UpdateAdmissionCheckRequeueState(&checkState, backoffBaseSeconds, backoffMaxSeconds, c.clock)
560+
}
561+
} else {
562+
updated = true
563+
checkState.State = kueue.CheckStateRejected
564+
checkState.Message = apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.BookingExpired).Message
565+
}
566+
}
567+
case isProvisioned(pr):
568+
if updateCheckState(&checkState, kueue.CheckStateReady) {
569+
updated = true
570+
// add the pod podSetUpdates
571+
checkState.PodSetUpdates = podSetUpdates(log, wl, pr, prc)
572+
// propagate the message from the provisioning request status into the workload
573+
// to change to the "successfully provisioned" message after provisioning
574+
updateCheckMessage(&checkState, apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.Provisioned).Message)
575+
}
576+
case isAccepted(pr):
577+
if provisionedCond := apimeta.FindStatusCondition(pr.Status.Conditions, autoscaling.Provisioned); provisionedCond != nil {
578+
// propagate the ETA update from the provisioning request into the workload
579+
updated = updateCheckMessage(&checkState, provisionedCond.Message) || updated
580+
updated = updateCheckState(&checkState, kueue.CheckStatePending) || updated
581+
}
582+
default:
580583
updated = updateCheckState(&checkState, kueue.CheckStatePending) || updated
581584
}
582-
default:
583-
updated = updateCheckState(&checkState, kueue.CheckStatePending) || updated
584585
}
585-
}
586586

587-
existingCondition := admissioncheck.FindAdmissionCheck(wlPatch.Status.AdmissionChecks, checkState.Name)
588-
if existingCondition != nil && existingCondition.State != checkState.State {
589-
message := fmt.Sprintf("Admission check %s updated state from %s to %s", checkState.Name, existingCondition.State, checkState.State)
590-
if checkState.Message != "" {
591-
message += fmt.Sprintf(" with message %s", checkState.Message)
587+
existingCondition := admissioncheck.FindAdmissionCheck(wlPatch.Status.AdmissionChecks, checkState.Name)
588+
if existingCondition != nil && existingCondition.State != checkState.State {
589+
message := fmt.Sprintf("Admission check %s updated state from %s to %s", checkState.Name, existingCondition.State, checkState.State)
590+
if checkState.Message != "" {
591+
message += fmt.Sprintf(" with message %s", checkState.Message)
592+
}
593+
recorderMessages = append(recorderMessages, message)
592594
}
593-
recorderMessages = append(recorderMessages, message)
595+
workload.SetAdmissionCheckState(&wlPatch.Status.AdmissionChecks, checkState, c.clock)
594596
}
595-
workload.SetAdmissionCheckState(&wlPatch.Status.AdmissionChecks, checkState, c.clock)
597+
return updated, nil
598+
})
599+
if err != nil {
600+
return err
596601
}
597602
if updated {
598-
if err := c.client.Status().Patch(ctx, wlPatch, client.Apply, client.FieldOwner(kueue.ProvisioningRequestControllerName), client.ForceOwnership); err != nil {
599-
return err
600-
}
601603
for i := range recorderMessages {
602604
c.record.Event(wl, corev1.EventTypeNormal, "AdmissionCheckUpdated", api.TruncateEventMessage(recorderMessages[i]))
603605
}
604606
}
605-
wlInfo.update(wlPatch, c.clock)
607+
wlInfo.update(wl, c.clock)
606608
return nil
607609
}
608610

0 commit comments

Comments
 (0)