Skip to content

Commit 3bb8eb4

Browse files
committed
optimize to scheduler some workload with zero replicas
Signed-off-by: LivingCcj <livingccj@163.com>
1 parent 63ad83d commit 3bb8eb4

File tree

2 files changed

+94
-43
lines changed

2 files changed

+94
-43
lines changed

pkg/scheduler/event_handler.go

+8
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,10 @@ func (s *Scheduler) onResourceBindingUpdate(old, cur interface{}) {
202202

203203
func (s *Scheduler) onResourceBindingRequeue(binding *workv1alpha2.ResourceBinding, event string) {
204204
klog.Infof("Requeue ResourceBinding(%s/%s) due to event(%s).", binding.Namespace, binding.Name, event)
205+
if s.isWorkloadByRB(binding) && binding.Spec.Replicas == 0 {
206+
klog.Infof("Skip requeue ResourceBinding(%s) due to event(%s) as workload replicas is 0", binding.Name, event)
207+
return
208+
}
205209
if features.FeatureGate.Enabled(features.PriorityBasedScheduling) {
206210
s.priorityQueue.Push(&internalqueue.QueuedBindingInfo{
207211
NamespacedKey: cache.ObjectName{Namespace: binding.Namespace, Name: binding.Name}.String(),
@@ -221,6 +225,10 @@ func (s *Scheduler) onResourceBindingRequeue(binding *workv1alpha2.ResourceBindi
221225

222226
func (s *Scheduler) onClusterResourceBindingRequeue(clusterResourceBinding *workv1alpha2.ClusterResourceBinding, event string) {
223227
klog.Infof("Requeue ClusterResourceBinding(%s) due to event(%s).", clusterResourceBinding.Name, event)
228+
if s.isWorkloadByCRB(clusterResourceBinding) && clusterResourceBinding.Spec.Replicas == 0 {
229+
klog.Infof("Skip requeue ClusterResourceBinding(%s) due to event(%s) as workload replicas is 0", clusterResourceBinding.Name, event)
230+
return
231+
}
224232
if features.FeatureGate.Enabled(features.PriorityBasedScheduling) {
225233
s.priorityQueue.Push(&internalqueue.QueuedBindingInfo{
226234
NamespacedKey: clusterResourceBinding.Name,

pkg/scheduler/scheduler.go

+86-43
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,6 @@ func (s *Scheduler) scheduleNext() bool {
355355
return false
356356
}
357357
defer s.queue.Done(key)
358-
359358
err := s.doSchedule(key.(string))
360359
s.legacyHandleErr(err, key)
361360
}
@@ -420,7 +419,7 @@ func (s *Scheduler) doScheduleBinding(namespace, name string) (err error) {
420419
metrics.BindingSchedule(string(ReconcileSchedule), utilmetrics.DurationInSeconds(start), err)
421420
return err
422421
}
423-
if rb.Spec.Replicas == 0 ||
422+
if !s.isWorkloadByRB(rb) ||
424423
rb.Spec.Placement.ReplicaSchedulingType() == policyv1alpha1.ReplicaSchedulingTypeDuplicated {
425424
// Duplicated resources should always be scheduled. Note: non-workload is considered as duplicated
426425
// even if scheduling type is divided.
@@ -490,7 +489,7 @@ func (s *Scheduler) doScheduleClusterBinding(name string) (err error) {
490489
metrics.BindingSchedule(string(ReconcileSchedule), utilmetrics.DurationInSeconds(start), err)
491490
return err
492491
}
493-
if crb.Spec.Replicas == 0 ||
492+
if !s.isWorkloadByCRB(crb) ||
494493
crb.Spec.Placement.ReplicaSchedulingType() == policyv1alpha1.ReplicaSchedulingTypeDuplicated {
495494
// Duplicated resources should always be scheduled. Note: non-workload is considered as duplicated
496495
// even if scheduling type is divided.
@@ -541,14 +540,19 @@ func (s *Scheduler) scheduleResourceBindingWithClusterAffinity(rb *workv1alpha2.
541540
klog.V(4).ErrorS(err, "Failed to marshal binding placement", "resourceBinding", klog.KObj(rb))
542541
return err
543542
}
544-
545-
scheduleResult, err := s.Algorithm.Schedule(context.TODO(), &rb.Spec, &rb.Status, &core.ScheduleAlgorithmOption{EnableEmptyWorkloadPropagation: s.enableEmptyWorkloadPropagation})
546-
var fitErr *framework.FitError
547-
// in case of no cluster error, can not return but continue to patch(cleanup) the result.
548-
if err != nil && !errors.As(err, &fitErr) {
549-
s.recordScheduleResultEventForResourceBinding(rb, nil, err)
550-
klog.Errorf("Failed scheduling ResourceBinding(%s/%s): %v", rb.Namespace, rb.Name, err)
551-
return err
543+
var scheduleResult core.ScheduleResult
544+
// scale down to zero replicas
545+
if s.isWorkloadByRB(rb) && rb.Spec.Replicas == 0 {
546+
scheduleResult = s.scaleDownToZeroReplicasWithClusters(rb.Spec.Clusters)
547+
} else {
548+
scheduleResult, err = s.Algorithm.Schedule(context.TODO(), &rb.Spec, &rb.Status, &core.ScheduleAlgorithmOption{EnableEmptyWorkloadPropagation: s.enableEmptyWorkloadPropagation})
549+
var fitErr *framework.FitError
550+
// in case of no cluster error, can not return but continue to patch(cleanup) the result.
551+
if err != nil && !errors.As(err, &fitErr) {
552+
s.recordScheduleResultEventForResourceBinding(rb, nil, err)
553+
klog.Errorf("Failed scheduling ResourceBinding(%s/%s): %v", rb.Namespace, rb.Name, err)
554+
return err
555+
}
552556
}
553557

554558
klog.V(4).Infof("ResourceBinding(%s/%s) scheduled to clusters %v", rb.Namespace, rb.Name, scheduleResult.SuggestedClusters)
@@ -580,20 +584,24 @@ func (s *Scheduler) scheduleResourceBindingWithClusterAffinities(rb *workv1alpha
580584
for affinityIndex < len(rb.Spec.Placement.ClusterAffinities) {
581585
klog.V(4).Infof("Schedule ResourceBinding(%s/%s) with clusterAffiliates index(%d)", rb.Namespace, rb.Name, affinityIndex)
582586
updatedStatus.SchedulerObservedAffinityName = rb.Spec.Placement.ClusterAffinities[affinityIndex].AffinityName
583-
scheduleResult, err = s.Algorithm.Schedule(context.TODO(), &rb.Spec, updatedStatus, &core.ScheduleAlgorithmOption{EnableEmptyWorkloadPropagation: s.enableEmptyWorkloadPropagation})
584-
if err == nil {
585-
break
586-
}
587+
if s.isWorkloadByRB(rb) && rb.Spec.Replicas == 0 {
588+
scheduleResult = s.scaleDownToZeroReplicasWithClusters(rb.Spec.Clusters)
589+
} else {
590+
scheduleResult, err = s.Algorithm.Schedule(context.TODO(), &rb.Spec, updatedStatus, &core.ScheduleAlgorithmOption{EnableEmptyWorkloadPropagation: s.enableEmptyWorkloadPropagation})
591+
if err == nil {
592+
break
593+
}
587594

588-
// obtain to err of the first scheduling
589-
if firstErr == nil {
590-
firstErr = err
591-
}
595+
// obtain to err of the first scheduling
596+
if firstErr == nil {
597+
firstErr = err
598+
}
592599

593-
err = fmt.Errorf("failed to schedule ResourceBinding(%s/%s) with clusterAffiliates index(%d): %v", rb.Namespace, rb.Name, affinityIndex, err)
594-
klog.Error(err)
595-
s.recordScheduleResultEventForResourceBinding(rb, nil, err)
596-
affinityIndex++
600+
err = fmt.Errorf("failed to schedule ResourceBinding(%s/%s) with clusterAffiliates index(%d): %v", rb.Namespace, rb.Name, affinityIndex, err)
601+
klog.Error(err)
602+
s.recordScheduleResultEventForResourceBinding(rb, nil, err)
603+
affinityIndex++
604+
}
597605
}
598606

599607
if affinityIndex >= len(rb.Spec.Placement.ClusterAffinities) {
@@ -679,14 +687,18 @@ func (s *Scheduler) scheduleClusterResourceBindingWithClusterAffinity(crb *workv
679687
klog.V(4).ErrorS(err, "Failed to marshal binding placement", "clusterResourceBinding", klog.KObj(crb))
680688
return err
681689
}
682-
683-
scheduleResult, err := s.Algorithm.Schedule(context.TODO(), &crb.Spec, &crb.Status, &core.ScheduleAlgorithmOption{EnableEmptyWorkloadPropagation: s.enableEmptyWorkloadPropagation})
684-
var fitErr *framework.FitError
685-
// in case of no cluster error, can not return but continue to patch(cleanup) the result.
686-
if err != nil && !errors.As(err, &fitErr) {
687-
s.recordScheduleResultEventForClusterResourceBinding(crb, nil, err)
688-
klog.Errorf("Failed scheduling clusterResourceBinding(%s): %v", crb.Name, err)
689-
return err
690+
var scheduleResult core.ScheduleResult
691+
if s.isWorkloadByCRB(crb) && crb.Spec.Replicas == 0 {
692+
scheduleResult = s.scaleDownToZeroReplicasWithClusters(crb.Spec.Clusters)
693+
} else {
694+
scheduleResult, err = s.Algorithm.Schedule(context.TODO(), &crb.Spec, &crb.Status, &core.ScheduleAlgorithmOption{EnableEmptyWorkloadPropagation: s.enableEmptyWorkloadPropagation})
695+
var fitErr *framework.FitError
696+
// in case of no cluster error, can not return but continue to patch(cleanup) the result.
697+
if err != nil && !errors.As(err, &fitErr) {
698+
s.recordScheduleResultEventForClusterResourceBinding(crb, nil, err)
699+
klog.Errorf("Failed scheduling clusterResourceBinding(%s): %v", crb.Name, err)
700+
return err
701+
}
690702
}
691703

692704
klog.V(4).Infof("clusterResourceBinding(%s) scheduled to clusters %v", crb.Name, scheduleResult.SuggestedClusters)
@@ -718,20 +730,24 @@ func (s *Scheduler) scheduleClusterResourceBindingWithClusterAffinities(crb *wor
718730
for affinityIndex < len(crb.Spec.Placement.ClusterAffinities) {
719731
klog.V(4).Infof("Schedule ClusterResourceBinding(%s) with clusterAffiliates index(%d)", crb.Name, affinityIndex)
720732
updatedStatus.SchedulerObservedAffinityName = crb.Spec.Placement.ClusterAffinities[affinityIndex].AffinityName
721-
scheduleResult, err = s.Algorithm.Schedule(context.TODO(), &crb.Spec, updatedStatus, &core.ScheduleAlgorithmOption{EnableEmptyWorkloadPropagation: s.enableEmptyWorkloadPropagation})
722-
if err == nil {
723-
break
724-
}
733+
if s.isWorkloadByCRB(crb) && crb.Spec.Replicas == 0 {
734+
scheduleResult = s.scaleDownToZeroReplicasWithClusters(crb.Spec.Clusters)
735+
} else {
736+
scheduleResult, err = s.Algorithm.Schedule(context.TODO(), &crb.Spec, updatedStatus, &core.ScheduleAlgorithmOption{EnableEmptyWorkloadPropagation: s.enableEmptyWorkloadPropagation})
737+
if err == nil {
738+
break
739+
}
725740

726-
// obtain to err of the first scheduling
727-
if firstErr == nil {
728-
firstErr = err
729-
}
741+
// obtain to err of the first scheduling
742+
if firstErr == nil {
743+
firstErr = err
744+
}
730745

731-
err = fmt.Errorf("failed to schedule ClusterResourceBinding(%s) with clusterAffiliates index(%d): %v", crb.Name, affinityIndex, err)
732-
klog.Error(err)
733-
s.recordScheduleResultEventForClusterResourceBinding(crb, nil, err)
734-
affinityIndex++
746+
err = fmt.Errorf("failed to schedule ClusterResourceBinding(%s) with clusterAffiliates index(%d): %v", crb.Name, affinityIndex, err)
747+
klog.Error(err)
748+
s.recordScheduleResultEventForClusterResourceBinding(crb, nil, err)
749+
affinityIndex++
750+
}
735751
}
736752

737753
if affinityIndex >= len(crb.Spec.Placement.ClusterAffinities) {
@@ -860,6 +876,33 @@ func (s *Scheduler) establishEstimatorConnections() {
860876
}
861877
}
862878

879+
func (s *Scheduler) isWorkloadByRB(rb *workv1alpha2.ResourceBinding) bool {
880+
// workload is defined as the resource binding with non-nil resource request field
881+
if rb.Spec.ReplicaRequirements == nil || rb.Spec.ReplicaRequirements.ResourceRequest == nil {
882+
return false
883+
}
884+
return true
885+
}
886+
887+
func (s *Scheduler) isWorkloadByCRB(crb *workv1alpha2.ClusterResourceBinding) bool {
888+
// workload is defined as the cluster resource binding with non-nil resource request field
889+
if crb.Spec.ReplicaRequirements == nil || crb.Spec.ReplicaRequirements.ResourceRequest == nil {
890+
return false
891+
}
892+
return true
893+
}
894+
895+
func (s *Scheduler) scaleDownToZeroReplicasWithClusters(clusters []workv1alpha2.TargetCluster) core.ScheduleResult {
896+
var newCluster []workv1alpha2.TargetCluster
897+
for _, cluster := range clusters {
898+
cluster.Replicas = 0
899+
newCluster = append(newCluster, cluster)
900+
}
901+
return core.ScheduleResult{
902+
SuggestedClusters: newCluster,
903+
}
904+
}
905+
863906
// patchBindingStatusCondition patches schedule status condition of ResourceBinding when necessary.
864907
func patchBindingStatusCondition(karmadaClient karmadaclientset.Interface, rb *workv1alpha2.ResourceBinding, newScheduledCondition metav1.Condition) error {
865908
klog.V(4).Infof("Begin to patch status condition to ResourceBinding(%s/%s)", rb.Namespace, rb.Name)

0 commit comments

Comments
 (0)