/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scheduler

import (
	"container/heap"
	"context"
	"errors"
	"fmt"
	"math/rand"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/sets"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/klog/v2"
	extenderv1 "k8s.io/kube-scheduler/extender/v1"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	"k8s.io/kubernetes/pkg/apis/core/validation"
	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
	internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
	"k8s.io/kubernetes/pkg/scheduler/metrics"
	"k8s.io/kubernetes/pkg/scheduler/util"
	utiltrace "k8s.io/utils/trace"
)

const (
	// Percentage of plugin metrics to be sampled.
	pluginMetricsSamplePercent = 10
	// minFeasibleNodesToFind is the minimum number of nodes that would be scored
	// in each scheduling cycle. This is a semi-arbitrary value to ensure that a
	// certain minimum of nodes are checked for feasibility. This in turn helps
	// ensure a minimum level of spreading.
	minFeasibleNodesToFind = 100
	// minFeasibleNodesPercentageToFind is the minimum percentage of nodes that
	// would be scored in each scheduling cycle. This is a semi-arbitrary value
	// to ensure that a certain minimum of nodes are checked for feasibility.
	// This in turn helps ensure a minimum level of spreading.
	minFeasibleNodesPercentageToFind = 5

	// numberOfHighestScoredNodesToReport is the number of node scores
	// to be included in ScheduleResult.
	numberOfHighestScoredNodesToReport = 3
)
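
// Note for illustration (derived from the formula in numFeasibleNodesToFind below,
// not a normative statement): when no explicit percentageOfNodesToScore is
// configured, a 5000-node cluster resolves to 50 - 5000/125 = 10%, so the filter
// phase stops once roughly 500 feasible nodes are found, while clusters with fewer
// than minFeasibleNodesToFind (100) nodes are always searched in full.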

// scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne(ctx context.Context) {
	logger := klog.FromContext(ctx)
	podInfo, err := sched.NextPod(logger)
	if err != nil {
		logger.Error(err, "Error while retrieving next pod from scheduling queue")
		return
	}
	// pod could be nil when schedulerQueue is closed
	if podInfo == nil || podInfo.Pod == nil {
		return
	}

	pod := podInfo.Pod
	// TODO(knelasevero): Remove duplicated keys from log entry calls
	// When contextualized logging hits GA
	// https://github.com/kubernetes/kubernetes/issues/111672
	logger = klog.LoggerWithValues(logger, "pod", klog.KObj(pod))
	ctx = klog.NewContext(ctx, logger)
	logger.V(4).Info("About to try and schedule pod", "pod", klog.KObj(pod))

	fwk, err := sched.frameworkForPod(pod)
	if err != nil {
		// This shouldn't happen, because we only accept for scheduling the pods
		// which specify a scheduler name that matches one of the profiles.
		logger.Error(err, "Error occurred")
		return
	}
	if sched.skipPodSchedule(ctx, fwk, pod) {
		return
	}

	logger.V(3).Info("Attempting to schedule pod", "pod", klog.KObj(pod))

	// Synchronously attempt to find a fit for the pod.
	start := time.Now()
	state := framework.NewCycleState()
	state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)

	// Initialize an empty podsToActivate struct, which will be filled up by plugins or stay empty.
	podsToActivate := framework.NewPodsToActivate()
	state.Write(framework.PodsToActivateKey, podsToActivate)

	schedulingCycleCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	scheduleResult, assumedPodInfo, status := sched.schedulingCycle(schedulingCycleCtx, state, fwk, podInfo, start, podsToActivate)
	if !status.IsSuccess() {
		sched.FailureHandler(schedulingCycleCtx, fwk, assumedPodInfo, status, scheduleResult.nominatingInfo, start)
		return
	}

	// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
	go func() {
		bindingCycleCtx, cancel := context.WithCancel(ctx)
		defer cancel()

		metrics.Goroutines.WithLabelValues(metrics.Binding).Inc()
		defer metrics.Goroutines.WithLabelValues(metrics.Binding).Dec()

		status := sched.bindingCycle(bindingCycleCtx, state, fwk, scheduleResult, assumedPodInfo, start, podsToActivate)
		if !status.IsSuccess() {
			sched.handleBindingCycleError(bindingCycleCtx, state, fwk, assumedPodInfo, start, scheduleResult, status)
			return
		}
		// Usually, DonePod is called inside the scheduling queue,
		// but in this case, we need to call it here because this Pod won't go back to the scheduling queue.
		sched.SchedulingQueue.Done(assumedPodInfo.Pod.UID)
	}()
}
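
// Design note (a summary of the flow above, not additional behavior): scheduleOne
// runs the scheduling cycle synchronously (filtering, scoring, assume, Reserve and
// Permit) and, only if that succeeds, hands the assumed Pod to the binding cycle in
// a separate goroutine (WaitOnPermit, PreBind, Bind, PostBind). Because the Pod is
// already assumed in the scheduler cache, the next scheduling cycle can start
// without waiting for the API server binding to complete.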

var clearNominatedNode = &framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: ""}

// schedulingCycle tries to schedule a single Pod.
func (sched *Scheduler) schedulingCycle(
	ctx context.Context,
	state *framework.CycleState,
	fwk framework.Framework,
	podInfo *framework.QueuedPodInfo,
	start time.Time,
	podsToActivate *framework.PodsToActivate,
) (ScheduleResult, *framework.QueuedPodInfo, *framework.Status) {
	logger := klog.FromContext(ctx)
	pod := podInfo.Pod
	scheduleResult, err := sched.SchedulePod(ctx, fwk, state, pod)
	if err != nil {
		defer func() {
			metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
		}()
		if err == ErrNoNodesAvailable {
			status := framework.NewStatus(framework.UnschedulableAndUnresolvable).WithError(err)
			return ScheduleResult{nominatingInfo: clearNominatedNode}, podInfo, status
		}

		fitError, ok := err.(*framework.FitError)
		if !ok {
			logger.Error(err, "Error selecting node for pod", "pod", klog.KObj(pod))
			return ScheduleResult{nominatingInfo: clearNominatedNode}, podInfo, framework.AsStatus(err)
		}

		// SchedulePod() may have failed because the pod would not fit on any host, so we try to
		// preempt, with the expectation that the next time the pod is tried for scheduling it
		// will fit due to the preemption. It is also possible that a different pod will schedule
		// into the resources that were preempted, but this is harmless.
		if !fwk.HasPostFilterPlugins() {
			logger.V(3).Info("No PostFilter plugins are registered, so no preemption will be performed")
			return ScheduleResult{}, podInfo, framework.NewStatus(framework.Unschedulable).WithError(err)
		}

		// Run PostFilter plugins to attempt to make the pod schedulable in a future scheduling cycle.
		result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.Diagnosis.NodeToStatusMap)
		msg := status.Message()
		fitError.Diagnosis.PostFilterMsg = msg
		if status.Code() == framework.Error {
			logger.Error(nil, "Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", msg)
		} else {
			logger.V(5).Info("Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", msg)
		}

		var nominatingInfo *framework.NominatingInfo
		if result != nil {
			nominatingInfo = result.NominatingInfo
		}
		return ScheduleResult{nominatingInfo: nominatingInfo}, podInfo, framework.NewStatus(framework.Unschedulable).WithError(err)
	}

	metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
	// Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
	// This allows us to keep scheduling without waiting on binding to occur.
	assumedPodInfo := podInfo.DeepCopy()
	assumedPod := assumedPodInfo.Pod
	// assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
	err = sched.assume(logger, assumedPod, scheduleResult.SuggestedHost)
	if err != nil {
		// This is most probably a result of a BUG in the retrying logic.
		// We report an error here so that pod scheduling can be retried.
		// This relies on the fact that Error will check if the pod has been bound
		// to a node and if so will not add it back to the unscheduled pods queue
		// (otherwise this would cause an infinite loop).
		return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, framework.AsStatus(err)
	}

	// Run the Reserve method of reserve plugins.
	if sts := fwk.RunReservePluginsReserve(ctx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
		// trigger un-reserve to clean up state associated with the reserved Pod
		fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
		if forgetErr := sched.Cache.ForgetPod(logger, assumedPod); forgetErr != nil {
			logger.Error(forgetErr, "Scheduler cache ForgetPod failed")
		}

		if sts.IsRejected() {
			fitErr := &framework.FitError{
				NumAllNodes: 1,
				Pod:         pod,
				Diagnosis: framework.Diagnosis{
					NodeToStatusMap: framework.NodeToStatusMap{scheduleResult.SuggestedHost: sts},
				},
			}
			fitErr.Diagnosis.AddPluginStatus(sts)
			return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, framework.NewStatus(sts.Code()).WithError(fitErr)
		}
		return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, sts
	}

	// Run "permit" plugins.
	runPermitStatus := fwk.RunPermitPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
	if !runPermitStatus.IsWait() && !runPermitStatus.IsSuccess() {
		// trigger un-reserve to clean up state associated with the reserved Pod
		fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
		if forgetErr := sched.Cache.ForgetPod(logger, assumedPod); forgetErr != nil {
			logger.Error(forgetErr, "Scheduler cache ForgetPod failed")
		}

		if runPermitStatus.IsRejected() {
			fitErr := &framework.FitError{
				NumAllNodes: 1,
				Pod:         pod,
				Diagnosis: framework.Diagnosis{
					NodeToStatusMap: framework.NodeToStatusMap{scheduleResult.SuggestedHost: runPermitStatus},
				},
			}
			fitErr.Diagnosis.AddPluginStatus(runPermitStatus)
			return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, framework.NewStatus(runPermitStatus.Code()).WithError(fitErr)
		}
		return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, runPermitStatus
	}

	// At the end of a successful scheduling cycle, pop and move up Pods if needed.
	if len(podsToActivate.Map) != 0 {
		sched.SchedulingQueue.Activate(logger, podsToActivate.Map)
		// Clear the entries after activation.
		podsToActivate.Map = make(map[string]*v1.Pod)
	}

	return scheduleResult, assumedPodInfo, nil
}

// bindingCycle tries to bind an assumed Pod.
func (sched *Scheduler) bindingCycle(
	ctx context.Context,
	state *framework.CycleState,
	fwk framework.Framework,
	scheduleResult ScheduleResult,
	assumedPodInfo *framework.QueuedPodInfo,
	start time.Time,
	podsToActivate *framework.PodsToActivate) *framework.Status {
	logger := klog.FromContext(ctx)

	assumedPod := assumedPodInfo.Pod

	// Run "permit" plugins.
	if status := fwk.WaitOnPermit(ctx, assumedPod); !status.IsSuccess() {
		if status.IsRejected() {
			fitErr := &framework.FitError{
				NumAllNodes: 1,
				Pod:         assumedPodInfo.Pod,
				Diagnosis: framework.Diagnosis{
					NodeToStatusMap:      framework.NodeToStatusMap{scheduleResult.SuggestedHost: status},
					UnschedulablePlugins: sets.New(status.Plugin()),
				},
			}
			return framework.NewStatus(status.Code()).WithError(fitErr)
		}
		return status
	}

	// Run "prebind" plugins.
	if status := fwk.RunPreBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost); !status.IsSuccess() {
		return status
	}

	// Run "bind" plugins.
	if status := sched.bind(ctx, fwk, assumedPod, scheduleResult.SuggestedHost, state); !status.IsSuccess() {
		return status
	}

	// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
	logger.V(2).Info("Successfully bound pod to node", "pod", klog.KObj(assumedPod), "node", scheduleResult.SuggestedHost, "evaluatedNodes", scheduleResult.EvaluatedNodes, "feasibleNodes", scheduleResult.FeasibleNodes)
	metrics.PodScheduled(fwk.ProfileName(), metrics.SinceInSeconds(start))
	metrics.PodSchedulingAttempts.Observe(float64(assumedPodInfo.Attempts))
	if assumedPodInfo.InitialAttemptTimestamp != nil {
		metrics.PodSchedulingDuration.WithLabelValues(getAttemptsLabel(assumedPodInfo)).Observe(metrics.SinceInSeconds(*assumedPodInfo.InitialAttemptTimestamp))
		metrics.PodSchedulingSLIDuration.WithLabelValues(getAttemptsLabel(assumedPodInfo)).Observe(metrics.SinceInSeconds(*assumedPodInfo.InitialAttemptTimestamp))
	}

	// Run "postbind" plugins.
	fwk.RunPostBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)

	// At the end of a successful binding cycle, move up Pods if needed.
	if len(podsToActivate.Map) != 0 {
		sched.SchedulingQueue.Activate(logger, podsToActivate.Map)
		// Unlike the logic in schedulingCycle(), we don't bother deleting the entries
		// as `podsToActivate.Map` is no longer consumed.
	}

	return nil
}

func (sched *Scheduler) handleBindingCycleError(
	ctx context.Context,
	state *framework.CycleState,
	fwk framework.Framework,
	podInfo *framework.QueuedPodInfo,
	start time.Time,
	scheduleResult ScheduleResult,
	status *framework.Status) {
	logger := klog.FromContext(ctx)

	assumedPod := podInfo.Pod
	// trigger un-reserve plugins to clean up state associated with the reserved Pod
	fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
	if forgetErr := sched.Cache.ForgetPod(logger, assumedPod); forgetErr != nil {
		logger.Error(forgetErr, "scheduler cache ForgetPod failed")
	} else {
		// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
		// as the assumed Pod had occupied a certain amount of resources in scheduler cache.
		//
		// Avoid moving the assumed Pod itself as it's always Unschedulable.
		// It's intentional to "defer" this operation; otherwise MoveAllToActiveOrBackoffQueue() would
		// update `q.moveRequest` and thus move the assumed pod to backoffQ anyways.
		if status.IsRejected() {
			defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, internalqueue.AssignedPodDelete, assumedPod, nil, func(pod *v1.Pod) bool {
				return assumedPod.UID != pod.UID
			})
		} else {
			sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, internalqueue.AssignedPodDelete, assumedPod, nil, nil)
		}
	}

	sched.FailureHandler(ctx, fwk, podInfo, status, clearNominatedNode, start)
}

func (sched *Scheduler) frameworkForPod(pod *v1.Pod) (framework.Framework, error) {
	fwk, ok := sched.Profiles[pod.Spec.SchedulerName]
	if !ok {
		return nil, fmt.Errorf("profile not found for scheduler name %q", pod.Spec.SchedulerName)
	}
	return fwk, nil
}

// skipPodSchedule returns true if we could skip scheduling the pod for specified cases.
func (sched *Scheduler) skipPodSchedule(ctx context.Context, fwk framework.Framework, pod *v1.Pod) bool {
	// Case 1: pod is being deleted.
	if pod.DeletionTimestamp != nil {
		fwk.EventRecorder().Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
		klog.FromContext(ctx).V(3).Info("Skip schedule deleting pod", "pod", klog.KObj(pod))
		return true
	}

	// Case 2: pod that has been assumed could be skipped.
	// An assumed pod can be added again to the scheduling queue if it got an update event
	// during its previous scheduling cycle but before getting assumed.
	isAssumed, err := sched.Cache.IsAssumedPod(pod)
	if err != nil {
		// TODO(91633): pass ctx into a revised HandleError
		utilruntime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", pod.Namespace, pod.Name, err))
		return false
	}
	return isAssumed
}

// schedulePod tries to schedule the given pod to one of the nodes in the node list.
// If it succeeds, it will return the name of the node.
// If it fails, it will return a FitError with reasons.
func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
	trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
	defer trace.LogIfLong(100 * time.Millisecond)

	if err := sched.Cache.UpdateSnapshot(klog.FromContext(ctx), sched.nodeInfoSnapshot); err != nil {
		return result, err
	}
	trace.Step("Snapshotting scheduler cache and node infos done")

	if sched.nodeInfoSnapshot.NumNodes() == 0 {
		return result, ErrNoNodesAvailable
	}

	feasibleNodes, diagnosis, err := sched.findNodesThatFitPod(ctx, fwk, state, pod)
	if err != nil {
		return result, err
	}
	trace.Step("Computing predicates done")

	if len(feasibleNodes) == 0 {
		return result, &framework.FitError{
			Pod:         pod,
			NumAllNodes: sched.nodeInfoSnapshot.NumNodes(),
			Diagnosis:   diagnosis,
		}
	}

	// When only one node after predicate, just use it.
	if len(feasibleNodes) == 1 {
		return ScheduleResult{
			SuggestedHost:  feasibleNodes[0].Name,
			EvaluatedNodes: 1 + len(diagnosis.NodeToStatusMap),
			FeasibleNodes:  1,
		}, nil
	}

	priorityList, err := prioritizeNodes(ctx, sched.Extenders, fwk, state, pod, feasibleNodes)
	if err != nil {
		return result, err
	}

	host, _, err := selectHost(priorityList, numberOfHighestScoredNodesToReport)
	trace.Step("Prioritizing done")

	return ScheduleResult{
		SuggestedHost:  host,
		EvaluatedNodes: len(feasibleNodes) + len(diagnosis.NodeToStatusMap),
		FeasibleNodes:  len(feasibleNodes),
	}, err
}

// Filters the nodes to find the ones that fit the pod based on the framework
// filter plugins and filter extenders.
func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) {
	logger := klog.FromContext(ctx)
	diagnosis := framework.Diagnosis{
		NodeToStatusMap: make(framework.NodeToStatusMap),
	}

	allNodes, err := sched.nodeInfoSnapshot.NodeInfos().List()
	if err != nil {
		return nil, diagnosis, err
	}
	// Run "prefilter" plugins.
	preRes, s := fwk.RunPreFilterPlugins(ctx, state, pod)
	if !s.IsSuccess() {
		if !s.IsRejected() {
			return nil, diagnosis, s.AsError()
		}

		// All nodes in NodeToStatusMap will have the same status so that they can be handled in the preemption.
		// Some non trivial refactoring is needed to avoid this copy.
		for _, n := range allNodes {
			diagnosis.NodeToStatusMap[n.Node().Name] = s
		}

		// Record the messages from PreFilter in Diagnosis.PreFilterMsg.
		msg := s.Message()
		diagnosis.PreFilterMsg = msg
		logger.V(5).Info("Status after running PreFilter plugins for pod", "pod", klog.KObj(pod), "status", msg)
		diagnosis.AddPluginStatus(s)
		return nil, diagnosis, nil
	}

	// "NominatedNodeName" can potentially be set in a previous scheduling cycle as a result of preemption.
	// This node is likely the only candidate that will fit the pod, and hence we try it first before iterating over all nodes.
	if len(pod.Status.NominatedNodeName) > 0 {
		feasibleNodes, err := sched.evaluateNominatedNode(ctx, pod, fwk, state, diagnosis)
		if err != nil {
			logger.Error(err, "Evaluation failed on nominated node", "pod", klog.KObj(pod), "node", pod.Status.NominatedNodeName)
		}
		// Nominated node passes all the filters, scheduler is good to assign this node to the pod.
		if len(feasibleNodes) != 0 {
			return feasibleNodes, diagnosis, nil
		}
	}

	nodes := allNodes
	if !preRes.AllNodes() {
		nodes = make([]*framework.NodeInfo, 0, len(preRes.NodeNames))
		for n := range preRes.NodeNames {
			nInfo, err := sched.nodeInfoSnapshot.NodeInfos().Get(n)
			if err != nil {
				return nil, diagnosis, err
			}
			nodes = append(nodes, nInfo)
		}
	}

	feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, &diagnosis, nodes)
	// always try to update the sched.nextStartNodeIndex regardless of whether an error has occurred
	// this is helpful to make sure that all the nodes have a chance to be searched
	processedNodes := len(feasibleNodes) + len(diagnosis.NodeToStatusMap)
	sched.nextStartNodeIndex = (sched.nextStartNodeIndex + processedNodes) % len(nodes)
	if err != nil {
		return nil, diagnosis, err
	}

	feasibleNodesAfterExtender, err := findNodesThatPassExtenders(ctx, sched.Extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
	if err != nil {
		return nil, diagnosis, err
	}
	if len(feasibleNodesAfterExtender) != len(feasibleNodes) {
		// Extenders filtered out some nodes.
		//
		// Extender doesn't support any kind of requeueing feature like EnqueueExtensions in the scheduling framework.
		// When Extenders reject some Nodes and the pod ends up being unschedulable,
		// we put framework.ExtenderName to pInfo.UnschedulablePlugins.
		// This Pod will be requeued from unschedulable pod pool to activeQ/backoffQ
		// by any kind of cluster events.
		// https://github.com/kubernetes/kubernetes/issues/122019
		if diagnosis.UnschedulablePlugins == nil {
			diagnosis.UnschedulablePlugins = sets.New[string]()
		}
		diagnosis.UnschedulablePlugins.Insert(framework.ExtenderName)
	}

	return feasibleNodesAfterExtender, diagnosis, nil
}

func (sched *Scheduler) evaluateNominatedNode(ctx context.Context, pod *v1.Pod, fwk framework.Framework, state *framework.CycleState, diagnosis framework.Diagnosis) ([]*v1.Node, error) {
	nnn := pod.Status.NominatedNodeName
	nodeInfo, err := sched.nodeInfoSnapshot.Get(nnn)
	if err != nil {
		return nil, err
	}
	node := []*framework.NodeInfo{nodeInfo}
	feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, &diagnosis, node)
	if err != nil {
		return nil, err
	}

	feasibleNodes, err = findNodesThatPassExtenders(ctx, sched.Extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
	if err != nil {
		return nil, err
	}

	return feasibleNodes, nil
}

// findNodesThatPassFilters finds the nodes that fit the filter plugins.
func (sched *Scheduler) findNodesThatPassFilters(
	ctx context.Context,
	fwk framework.Framework,
	state *framework.CycleState,
	pod *v1.Pod,
	diagnosis *framework.Diagnosis,
	nodes []*framework.NodeInfo) ([]*v1.Node, error) {
	numAllNodes := len(nodes)
	numNodesToFind := sched.numFeasibleNodesToFind(fwk.PercentageOfNodesToScore(), int32(numAllNodes))

	// Create feasible list with enough space to avoid growing it
	// and allow assigning.
	feasibleNodes := make([]*v1.Node, numNodesToFind)

	if !fwk.HasFilterPlugins() {
		for i := range feasibleNodes {
			feasibleNodes[i] = nodes[(sched.nextStartNodeIndex+i)%numAllNodes].Node()
		}
		return feasibleNodes, nil
	}

	errCh := parallelize.NewErrorChannel()
	var statusesLock sync.Mutex
	var feasibleNodesLen int32
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	checkNode := func(i int) {
		// We check the nodes starting from where we left off in the previous scheduling cycle,
		// this is to make sure all nodes have the same chance of being examined across pods.
		nodeInfo := nodes[(sched.nextStartNodeIndex+i)%numAllNodes]
		status := fwk.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo)
		if status.Code() == framework.Error {
			errCh.SendErrorWithCancel(status.AsError(), cancel)
			return
		}
		if status.IsSuccess() {
			length := atomic.AddInt32(&feasibleNodesLen, 1)
			if length > numNodesToFind {
				cancel()
				atomic.AddInt32(&feasibleNodesLen, -1)
			} else {
				feasibleNodes[length-1] = nodeInfo.Node()
			}
		} else {
			statusesLock.Lock()
			diagnosis.NodeToStatusMap[nodeInfo.Node().Name] = status
			diagnosis.AddPluginStatus(status)
			statusesLock.Unlock()
		}
	}

	beginCheckNode := time.Now()
	statusCode := framework.Success
	defer func() {
		// We record Filter extension point latency here instead of in framework.go because framework.RunFilterPlugins
		// function is called for each node, whereas we want to have an overall latency for all nodes per scheduling cycle.
		// Note that this latency also includes latency for `addNominatedPods`, which calls framework.RunPreFilterAddPod.
		metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.Filter, statusCode.String(), fwk.ProfileName()).Observe(metrics.SinceInSeconds(beginCheckNode))
	}()

	// Stops searching for more nodes once the configured number of feasible nodes
	// are found.
	fwk.Parallelizer().Until(ctx, numAllNodes, checkNode, metrics.Filter)
	feasibleNodes = feasibleNodes[:feasibleNodesLen]
	if err := errCh.ReceiveError(); err != nil {
		statusCode = framework.Error
		return feasibleNodes, err
	}
	return feasibleNodes, nil
}

// numFeasibleNodesToFind returns the number of feasible nodes that once found, the scheduler stops
// its search for more feasible nodes.
func (sched *Scheduler) numFeasibleNodesToFind(percentageOfNodesToScore *int32, numAllNodes int32) (numNodes int32) {
	if numAllNodes < minFeasibleNodesToFind {
		return numAllNodes
	}

	// Use profile percentageOfNodesToScore if it's set. Otherwise, use global percentageOfNodesToScore.
	var percentage int32
	if percentageOfNodesToScore != nil {
		percentage = *percentageOfNodesToScore
	} else {
		percentage = sched.percentageOfNodesToScore
	}

	if percentage == 0 {
		percentage = int32(50) - numAllNodes/125
		if percentage < minFeasibleNodesPercentageToFind {
			percentage = minFeasibleNodesPercentageToFind
		}
	}

	numNodes = numAllNodes * percentage / 100
	if numNodes < minFeasibleNodesToFind {
		return minFeasibleNodesToFind
	}

	return numNodes
}
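
// Worked example (illustrative only, applying the formula above): with a nil
// profile percentage and a global percentage of 0, a 1000-node cluster gives
// percentage = 50 - 1000/125 = 42 and numNodes = 1000*42/100 = 420. At 6000
// nodes the computed percentage (50 - 48 = 2) falls below
// minFeasibleNodesPercentageToFind and is clamped to 5, giving 300 nodes.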

func findNodesThatPassExtenders(ctx context.Context, extenders []framework.Extender, pod *v1.Pod, feasibleNodes []*v1.Node, statuses framework.NodeToStatusMap) ([]*v1.Node, error) {
	logger := klog.FromContext(ctx)
	// Extenders are called sequentially.
	// Nodes in original feasibleNodes can be excluded in one extender, and pass on to the next
	// extender in a decreasing manner.
	for _, extender := range extenders {
		if len(feasibleNodes) == 0 {
			break
		}
		if !extender.IsInterested(pod) {
			continue
		}

		// Status of failed nodes in failedAndUnresolvableMap will be added or overwritten in <statuses>,
		// so that the scheduler framework can respect the UnschedulableAndUnresolvable status for
		// particular nodes, and this may eventually improve preemption efficiency.
		// Note: users are recommended to configure the extenders that may return UnschedulableAndUnresolvable
		// status ahead of others.
		feasibleList, failedMap, failedAndUnresolvableMap, err := extender.Filter(pod, feasibleNodes)
		if err != nil {
			if extender.IsIgnorable() {
				logger.Info("Skipping extender as it returned error and has ignorable flag set", "extender", extender, "err", err)
				continue
			}
			return nil, err
		}

		for failedNodeName, failedMsg := range failedAndUnresolvableMap {
			var aggregatedReasons []string
			if _, found := statuses[failedNodeName]; found {
				aggregatedReasons = statuses[failedNodeName].Reasons()
			}
			aggregatedReasons = append(aggregatedReasons, failedMsg)
			statuses[failedNodeName] = framework.NewStatus(framework.UnschedulableAndUnresolvable, aggregatedReasons...)
		}

		for failedNodeName, failedMsg := range failedMap {
			if _, found := failedAndUnresolvableMap[failedNodeName]; found {
				// failedAndUnresolvableMap takes precedence over failedMap
				// note that this only happens if the extender returns the node in both maps
				continue
			}
			if _, found := statuses[failedNodeName]; !found {
				statuses[failedNodeName] = framework.NewStatus(framework.Unschedulable, failedMsg)
			} else {
				statuses[failedNodeName].AppendReason(failedMsg)
			}
		}

		feasibleNodes = feasibleList
	}
	return feasibleNodes, nil
}

// prioritizeNodes prioritizes the nodes by running the score plugins,
// which return a score for each node from the call to RunScorePlugins().
// The scores from each plugin are added together to make the score for that node, then
// any extenders are run as well.
// All scores are finally combined (added) to get the total weighted scores of all nodes
func prioritizeNodes(
	ctx context.Context,
	extenders []framework.Extender,
	fwk framework.Framework,
	state *framework.CycleState,
	pod *v1.Pod,
	nodes []*v1.Node,
) ([]framework.NodePluginScores, error) {
	logger := klog.FromContext(ctx)
	// If no priority configs are provided, then all nodes will have a score of one.
	// This is required to generate the priority list in the required format
	if len(extenders) == 0 && !fwk.HasScorePlugins() {
		result := make([]framework.NodePluginScores, 0, len(nodes))
		for i := range nodes {
			result = append(result, framework.NodePluginScores{
				Name:       nodes[i].Name,
				TotalScore: 1,
			})
		}
		return result, nil
	}

	// Run PreScore plugins.
	preScoreStatus := fwk.RunPreScorePlugins(ctx, state, pod, nodes)
	if !preScoreStatus.IsSuccess() {
		return nil, preScoreStatus.AsError()
	}

	// Run the Score plugins.
	nodesScores, scoreStatus := fwk.RunScorePlugins(ctx, state, pod, nodes)
	if !scoreStatus.IsSuccess() {
		return nil, scoreStatus.AsError()
	}

	// Additional details logged at level 10 if enabled.
	loggerVTen := logger.V(10)
	if loggerVTen.Enabled() {
		for _, nodeScore := range nodesScores {
			for _, pluginScore := range nodeScore.Scores {
				loggerVTen.Info("Plugin scored node for pod", "pod", klog.KObj(pod), "plugin", pluginScore.Name, "node", nodeScore.Name, "score", pluginScore.Score)
			}
		}
	}

	if len(extenders) != 0 && nodes != nil {
		// allNodeExtendersScores has all extenders scores for all nodes.
		// It is keyed with node name.
		allNodeExtendersScores := make(map[string]*framework.NodePluginScores, len(nodes))
		var mu sync.Mutex
		var wg sync.WaitGroup
		for i := range extenders {
			if !extenders[i].IsInterested(pod) {
				continue
			}
			wg.Add(1)
			go func(extIndex int) {
				metrics.Goroutines.WithLabelValues(metrics.PrioritizingExtender).Inc()
				defer func() {
					metrics.Goroutines.WithLabelValues(metrics.PrioritizingExtender).Dec()
					wg.Done()
				}()
				prioritizedList, weight, err := extenders[extIndex].Prioritize(pod, nodes)
				if err != nil {
					// Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
					logger.V(5).Info("Failed to run extender's priority function. No score given by this extender.", "error", err, "pod", klog.KObj(pod), "extender", extenders[extIndex].Name())
					return
				}
				mu.Lock()
				defer mu.Unlock()
				for i := range *prioritizedList {
					nodename := (*prioritizedList)[i].Host
					score := (*prioritizedList)[i].Score
					if loggerVTen.Enabled() {
						loggerVTen.Info("Extender scored node for pod", "pod", klog.KObj(pod), "extender", extenders[extIndex].Name(), "node", nodename, "score", score)
					}

					// MaxExtenderPriority may diverge from the max priority used in the scheduler and defined by MaxNodeScore,
					// therefore we need to scale the score returned by extenders to the score range used by the scheduler.
					finalscore := score * weight * (framework.MaxNodeScore / extenderv1.MaxExtenderPriority)

					if allNodeExtendersScores[nodename] == nil {
						allNodeExtendersScores[nodename] = &framework.NodePluginScores{
							Name:   nodename,
							Scores: make([]framework.PluginScore, 0, len(extenders)),
						}
					}
					allNodeExtendersScores[nodename].Scores = append(allNodeExtendersScores[nodename].Scores, framework.PluginScore{
						Name:  extenders[extIndex].Name(),
						Score: finalscore,
					})
					allNodeExtendersScores[nodename].TotalScore += finalscore
				}
			}(i)
		}
		// wait for all go routines to finish
		wg.Wait()
		for i := range nodesScores {
			if score, ok := allNodeExtendersScores[nodes[i].Name]; ok {
				nodesScores[i].Scores = append(nodesScores[i].Scores, score.Scores...)
				nodesScores[i].TotalScore += score.TotalScore
			}
		}
	}

	if loggerVTen.Enabled() {
		for i := range nodesScores {
			loggerVTen.Info("Calculated node's final score for pod", "pod", klog.KObj(pod), "node", nodesScores[i].Name, "score", nodesScores[i].TotalScore)
		}
	}
	return nodesScores, nil
}
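
// Scoring-scale note (illustrative, assuming the current constants
// extenderv1.MaxExtenderPriority = 10 and framework.MaxNodeScore = 100): an
// extender priority of 4 with weight 2 contributes 4 * 2 * (100/10) = 80 to that
// node's TotalScore; the (MaxNodeScore / MaxExtenderPriority) factor maps the
// extender's 0-10 priority range onto the 0-100 range used by in-tree score
// plugins before the per-node totals are summed.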

var errEmptyPriorityList = errors.New("empty priorityList")

// selectHost takes a prioritized list of nodes and then picks one
// in a reservoir sampling manner from the nodes that had the highest score.
// It also returns the top {count} Nodes,
// and the top of the list will always be the selected host.
func selectHost(nodeScoreList []framework.NodePluginScores, count int) (string, []framework.NodePluginScores, error) {
	if len(nodeScoreList) == 0 {
		return "", nil, errEmptyPriorityList
	}

	var h nodeScoreHeap = nodeScoreList
	heap.Init(&h)
	cntOfMaxScore := 1
	selectedIndex := 0
	// The top of the heap is the NodeScoreResult with the highest score.
	sortedNodeScoreList := make([]framework.NodePluginScores, 0, count)
	sortedNodeScoreList = append(sortedNodeScoreList, heap.Pop(&h).(framework.NodePluginScores))

	// This for-loop will continue until all Nodes with the highest scores get checked for a reservoir sampling,
	// and sortedNodeScoreList gets (count - 1) elements.
	for ns := heap.Pop(&h).(framework.NodePluginScores); ; ns = heap.Pop(&h).(framework.NodePluginScores) {
		if ns.TotalScore != sortedNodeScoreList[0].TotalScore && len(sortedNodeScoreList) == count {
			break
		}

		if ns.TotalScore == sortedNodeScoreList[0].TotalScore {
			cntOfMaxScore++
			if rand.Intn(cntOfMaxScore) == 0 {
				// Replace the candidate with probability of 1/cntOfMaxScore
				selectedIndex = cntOfMaxScore - 1
			}
		}

		sortedNodeScoreList = append(sortedNodeScoreList, ns)

		if h.Len() == 0 {
			break
		}
	}

	if selectedIndex != 0 {
		// replace the first one with selected one
		previous := sortedNodeScoreList[0]
		sortedNodeScoreList[0] = sortedNodeScoreList[selectedIndex]
		sortedNodeScoreList[selectedIndex] = previous
	}

	if len(sortedNodeScoreList) > count {
		sortedNodeScoreList = sortedNodeScoreList[:count]
	}

	return sortedNodeScoreList[0].Name, sortedNodeScoreList, nil
}
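
// Illustrative example (not part of the source): given totals {A: 90, B: 90, C: 80}
// and count = 3, the loop above keeps sampling while scores tie with the current
// best, so A and B each become the selected host with probability 1/2, and the
// returned slice is the winner followed by the remaining top-scored nodes,
// e.g. [B, A, C] when B wins.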

// nodeScoreHeap is a heap of framework.NodePluginScores.
type nodeScoreHeap []framework.NodePluginScores

// nodeScoreHeap implements heap.Interface.
var _ heap.Interface = &nodeScoreHeap{}

func (h nodeScoreHeap) Len() int           { return len(h) }
func (h nodeScoreHeap) Less(i, j int) bool { return h[i].TotalScore > h[j].TotalScore }
func (h nodeScoreHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }

func (h *nodeScoreHeap) Push(x interface{}) {
	*h = append(*h, x.(framework.NodePluginScores))
}

func (h *nodeScoreHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[0 : n-1]
	return x
}

// assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous.
// assume modifies `assumed`.
func (sched *Scheduler) assume(logger klog.Logger, assumed *v1.Pod, host string) error {
	// Optimistically assume that the binding will succeed and send it to apiserver
	// in the background.
	// If the binding fails, scheduler will release resources allocated to assumed pod
	// immediately.
	assumed.Spec.NodeName = host

	if err := sched.Cache.AssumePod(logger, assumed); err != nil {
		logger.Error(err, "Scheduler cache AssumePod failed")
		return err
	}
	// if "assumed" is a nominated pod, we should remove it from internal cache
	if sched.SchedulingQueue != nil {
		sched.SchedulingQueue.DeleteNominatedPodIfExists(assumed)
	}

	return nil
}

// bind binds a pod to a given node defined in a binding object.
// The precedence for binding is: (1) extenders and (2) framework plugins.
// We expect this to run asynchronously, so we handle binding metrics internally.
func (sched *Scheduler) bind(ctx context.Context, fwk framework.Framework, assumed *v1.Pod, targetNode string, state *framework.CycleState) (status *framework.Status) {
	logger := klog.FromContext(ctx)
	defer func() {
		sched.finishBinding(logger, fwk, assumed, targetNode, status)
	}()

	bound, err := sched.extendersBinding(assumed, targetNode)
	if bound {
		return framework.AsStatus(err)
	}
	return fwk.RunBindPlugins(ctx, state, assumed, targetNode)
}

// TODO(#87159): Move this to a Plugin.
func (sched *Scheduler) extendersBinding(pod *v1.Pod, node string) (bool, error) {
	for _, extender := range sched.Extenders {
		if !extender.IsBinder() || !extender.IsInterested(pod) {
			continue
		}
		return true, extender.Bind(&v1.Binding{
			ObjectMeta: metav1.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name, UID: pod.UID},
			Target:     v1.ObjectReference{Kind: "Node", Name: node},
		})
	}
	return false, nil
}

func (sched *Scheduler) finishBinding(logger klog.Logger, fwk framework.Framework, assumed *v1.Pod, targetNode string, status *framework.Status) {
	if finErr := sched.Cache.FinishBinding(logger, assumed); finErr != nil {
		logger.Error(finErr, "Scheduler cache FinishBinding failed")
	}
	if !status.IsSuccess() {
		logger.V(1).Info("Failed to bind pod", "pod", klog.KObj(assumed))
		return
	}

	fwk.EventRecorder().Eventf(assumed, nil, v1.EventTypeNormal, "Scheduled", "Binding", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, targetNode)
}

func getAttemptsLabel(p *framework.QueuedPodInfo) string {
	// We break down the pod scheduling duration by attempts capped to a limit
	// to avoid ending up with a high cardinality metric.
	if p.Attempts >= 15 {
		return "15+"
	}
	return strconv.Itoa(p.Attempts)
}

// handleSchedulingFailure records an event for the pod that indicates the
// pod has failed to schedule. Also, update the pod condition and nominated node name if set.
func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, status *framework.Status, nominatingInfo *framework.NominatingInfo, start time.Time) {
	calledDone := false
	defer func() {
		if !calledDone {
			// Basically, AddUnschedulableIfNotPresent calls DonePod internally.
			// But, AddUnschedulableIfNotPresent isn't called in some corner cases.
			// Here, we call DonePod explicitly to avoid leaking the pod.
			sched.SchedulingQueue.Done(podInfo.Pod.UID)
		}
	}()

	logger := klog.FromContext(ctx)

	reason := v1.PodReasonSchedulerError
	if status.IsRejected() {
		reason = v1.PodReasonUnschedulable
	}

	switch reason {
	case v1.PodReasonUnschedulable:
		metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
	case v1.PodReasonSchedulerError:
		metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
	}

	pod := podInfo.Pod
	err := status.AsError()
	errMsg := status.Message()

	if err == ErrNoNodesAvailable {
		logger.V(2).Info("Unable to schedule pod; no nodes are registered to the cluster; waiting", "pod", klog.KObj(pod))
	} else if fitError, ok := err.(*framework.FitError); ok {
		// Inject UnschedulablePlugins to PodInfo, which will be used later for moving Pods between queues efficiently.
		podInfo.UnschedulablePlugins = fitError.Diagnosis.UnschedulablePlugins
		podInfo.PendingPlugins = fitError.Diagnosis.PendingPlugins
		logger.V(2).Info("Unable to schedule pod; no fit; waiting", "pod", klog.KObj(pod), "err", errMsg)
	} else {
		logger.Error(err, "Error scheduling pod; retrying", "pod", klog.KObj(pod))
	}

	// Check if the Pod exists in informer cache.
	podLister := fwk.SharedInformerFactory().Core().V1().Pods().Lister()
	cachedPod, e := podLister.Pods(pod.Namespace).Get(pod.Name)
	if e != nil {
		logger.Info("Pod doesn't exist in informer cache", "pod", klog.KObj(pod), "err", e)
		// We need to call DonePod here because we don't call AddUnschedulableIfNotPresent in this case.
	} else {
		// In the case of extender, the pod may have been bound successfully, but timed out returning its response to the scheduler.
		// It could result in the live version to carry .spec.nodeName, and that's inconsistent with the internal-queued version.
		if len(cachedPod.Spec.NodeName) != 0 {
			logger.Info("Pod has been assigned to node. Abort adding it back to queue.", "pod", klog.KObj(pod), "node", cachedPod.Spec.NodeName)
			// We need to call DonePod here because we don't call AddUnschedulableIfNotPresent in this case.
		} else {
			// As <cachedPod> is from SharedInformer, we need to do a DeepCopy() here.
			// ignore this err since apiserver doesn't properly validate affinity terms
			// and we can't fix the validation for backwards compatibility.
			podInfo.PodInfo, _ = framework.NewPodInfo(cachedPod.DeepCopy())
			if err := sched.SchedulingQueue.AddUnschedulableIfNotPresent(logger, podInfo, sched.SchedulingQueue.SchedulingCycle()); err != nil {
				logger.Error(err, "Error occurred")
			}
			calledDone = true
		}
	}

	// Update the scheduling queue with the nominated pod information. Without
	// this, there would be a race condition between the next scheduling cycle
	// and the time the scheduler receives a Pod Update for the nominated pod.
	// Here we check for nil only for tests.
	if sched.SchedulingQueue != nil {
		logger := klog.FromContext(ctx)
		sched.SchedulingQueue.AddNominatedPod(logger, podInfo.PodInfo, nominatingInfo)
	}

	if err == nil {
		// Only tests can reach here.
		return
	}

	msg := truncateMessage(errMsg)
	fwk.EventRecorder().Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg)
	if err := updatePod(ctx, sched.client, pod, &v1.PodCondition{
		Type:    v1.PodScheduled,
		Status:  v1.ConditionFalse,
		Reason:  reason,
		Message: errMsg,
	}, nominatingInfo); err != nil {
		klog.FromContext(ctx).Error(err, "Error updating pod", "pod", klog.KObj(pod))
	}
}

// truncateMessage truncates a message if it hits the NoteLengthLimit.
func truncateMessage(message string) string {
	max := validation.NoteLengthLimit
	if len(message) <= max {
		return message
	}
	suffix := " ..."
	return message[:max-len(suffix)] + suffix
}

func updatePod(ctx context.Context, client clientset.Interface, pod *v1.Pod, condition *v1.PodCondition, nominatingInfo *framework.NominatingInfo) error {
	logger := klog.FromContext(ctx)
	logger.V(3).Info("Updating pod condition", "pod", klog.KObj(pod), "conditionType", condition.Type, "conditionStatus", condition.Status, "conditionReason", condition.Reason)
	podStatusCopy := pod.Status.DeepCopy()
	// NominatedNodeName is updated only if we are trying to set it, and the value is
	// different from the existing one.
	nnnNeedsUpdate := nominatingInfo.Mode() == framework.ModeOverride && pod.Status.NominatedNodeName != nominatingInfo.NominatedNodeName
	if !podutil.UpdatePodCondition(podStatusCopy, condition) && !nnnNeedsUpdate {
		return nil
	}
	if nnnNeedsUpdate {
		podStatusCopy.NominatedNodeName = nominatingInfo.NominatedNodeName
	}
	return util.PatchPodStatus(ctx, client, pod, podStatusCopy)
}