/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scheduler

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"math"
	"math/rand"
	"regexp"
	goruntime "runtime"
	"sort"
	"strconv"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/google/go-cmp/cmp"

	v1 "k8s.io/api/core/v1"
	eventsv1 "k8s.io/api/events/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/version"
	"k8s.io/apimachinery/pkg/util/wait"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	"k8s.io/client-go/informers"
	clientset "k8s.io/client-go/kubernetes"
	clientsetfake "k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/kubernetes/scheme"
	clienttesting "k8s.io/client-go/testing"
	clientcache "k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/events"
	featuregatetesting "k8s.io/component-base/featuregate/testing"
	"k8s.io/component-helpers/storage/volume"
	"k8s.io/klog/v2"
	"k8s.io/klog/v2/ktesting"
	extenderv1 "k8s.io/kube-scheduler/extender/v1"
	fwk "k8s.io/kube-scheduler/framework"
	"k8s.io/kubernetes/pkg/features"
	schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
	apicache "k8s.io/kubernetes/pkg/scheduler/backend/api_cache"
	apidispatcher "k8s.io/kubernetes/pkg/scheduler/backend/api_dispatcher"
	internalcache "k8s.io/kubernetes/pkg/scheduler/backend/cache"
	fakecache "k8s.io/kubernetes/pkg/scheduler/backend/cache/fake"
	internalqueue "k8s.io/kubernetes/pkg/scheduler/backend/queue"
	"k8s.io/kubernetes/pkg/scheduler/framework"
	apicalls "k8s.io/kubernetes/pkg/scheduler/framework/api_calls"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/imagelocality"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
	"k8s.io/kubernetes/pkg/scheduler/metrics"
	"k8s.io/kubernetes/pkg/scheduler/profile"
	st "k8s.io/kubernetes/pkg/scheduler/testing"
	tf "k8s.io/kubernetes/pkg/scheduler/testing/framework"
	schedutil "k8s.io/kubernetes/pkg/scheduler/util"
	"k8s.io/utils/ptr"
)

const (
	testSchedulerName = "test-scheduler"
	mb int64 = 1024 * 1024
)

var (
	emptySnapshot = internalcache.NewEmptySnapshot()
	podTopologySpreadFunc = frameworkruntime.FactoryAdapter(feature.Features{}, podtopologyspread.New)
	errPrioritize = fmt.Errorf("priority map encounters an error")
	schedulerCmpOpts = []cmp.Option{
		cmp.AllowUnexported(framework.NodeToStatus{}),
	}
)
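
// fakeExtender is a no-op scheduler Extender used in tests. Its boolean fields
// control whether it reports itself as a binder, filter, or prioritizer, and
// gotBind/errBind record and inject the behavior of Bind.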
type fakeExtender struct {
	isBinder bool
	interestedPodName string
	ignorable bool
	gotBind bool
	errBind bool
	isPrioritizer bool
	isFilter bool
}

func (f *fakeExtender) Name() string {
	return "fakeExtender"
}

func (f *fakeExtender) IsIgnorable() bool {
	return f.ignorable
}

func (f *fakeExtender) ProcessPreemption(
	_ *v1.Pod,
	_ map[string]*extenderv1.Victims,
	_ fwk.NodeInfoLister,
) (map[string]*extenderv1.Victims, error) {
	return nil, nil
}

func (f *fakeExtender) SupportsPreemption() bool {
	return false
}

func (f *fakeExtender) Filter(pod *v1.Pod, nodes []fwk.NodeInfo) ([]fwk.NodeInfo, extenderv1.FailedNodesMap, extenderv1.FailedNodesMap, error) {
	return nil, nil, nil, nil
}

func (f *fakeExtender) Prioritize(
	_ *v1.Pod,
	_ []fwk.NodeInfo,
) (hostPriorities *extenderv1.HostPriorityList, weight int64, err error) {
	return nil, 0, nil
}

func (f *fakeExtender) Bind(binding *v1.Binding) error {
	if f.isBinder {
		if f.errBind {
			return errors.New("bind error")
		}
		f.gotBind = true
		return nil
	}
	return errors.New("not a binder")
}

func (f *fakeExtender) IsBinder() bool {
	return f.isBinder
}

func (f *fakeExtender) IsInterested(pod *v1.Pod) bool {
	return pod != nil && pod.Name == f.interestedPodName
}

func (f *fakeExtender) IsPrioritizer() bool {
	return f.isPrioritizer
}

func (f *fakeExtender) IsFilter() bool {
	return f.isFilter
}
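
// falseMapPlugin is a score plugin whose Score always fails with errPrioritize.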
type falseMapPlugin struct{}

func newFalseMapPlugin() frameworkruntime.PluginFactory {
	return func(_ context.Context, _ runtime.Object, _ fwk.Handle) (fwk.Plugin, error) {
		return &falseMapPlugin{}, nil
	}
}

func (pl *falseMapPlugin) Name() string {
	return "FalseMap"
}

func (pl *falseMapPlugin) Score(_ context.Context, _ fwk.CycleState, _ *v1.Pod, _ fwk.NodeInfo) (int64, *fwk.Status) {
	return 0, fwk.AsStatus(errPrioritize)
}

func (pl *falseMapPlugin) ScoreExtensions() fwk.ScoreExtensions {
	return nil
}
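
// numericMapPlugin scores a node by parsing its name as an integer.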
type numericMapPlugin struct{}

func newNumericMapPlugin() frameworkruntime.PluginFactory {
	return func(_ context.Context, _ runtime.Object, _ fwk.Handle) (fwk.Plugin, error) {
		return &numericMapPlugin{}, nil
	}
}

func (pl *numericMapPlugin) Name() string {
	return "NumericMap"
}

func (pl *numericMapPlugin) Score(_ context.Context, _ fwk.CycleState, _ *v1.Pod, nodeInfo fwk.NodeInfo) (int64, *fwk.Status) {
	nodeName := nodeInfo.Node().Name
	score, err := strconv.Atoi(nodeName)
	if err != nil {
		return 0, fwk.NewStatus(fwk.Error, fmt.Sprintf("Error converting nodename to int: %+v", nodeName))
	}
	return int64(score), nil
}

func (pl *numericMapPlugin) ScoreExtensions() fwk.ScoreExtensions {
	return nil
}

// NewNoPodsFilterPlugin initializes a noPodsFilterPlugin and returns it.
func NewNoPodsFilterPlugin(_ context.Context, _ runtime.Object, _ fwk.Handle) (fwk.Plugin, error) {
	return &noPodsFilterPlugin{}, nil
}
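
// reverseNumericMapPlugin scores a node by parsing its name as an integer and
// then reverses the ordering in NormalizeScore, mapping each score to
// maxScore+minScore-score.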
type reverseNumericMapPlugin struct{}

func (pl *reverseNumericMapPlugin) Name() string {
	return "ReverseNumericMap"
}

func (pl *reverseNumericMapPlugin) Score(_ context.Context, _ fwk.CycleState, _ *v1.Pod, nodeInfo fwk.NodeInfo) (int64, *fwk.Status) {
	nodeName := nodeInfo.Node().Name
	score, err := strconv.Atoi(nodeName)
	if err != nil {
		return 0, fwk.NewStatus(fwk.Error, fmt.Sprintf("Error converting nodename to int: %+v", nodeName))
	}
	return int64(score), nil
}

func (pl *reverseNumericMapPlugin) ScoreExtensions() fwk.ScoreExtensions {
	return pl
}

func (pl *reverseNumericMapPlugin) NormalizeScore(_ context.Context, _ fwk.CycleState, _ *v1.Pod, nodeScores fwk.NodeScoreList) *fwk.Status {
	var maxScore float64
	minScore := math.MaxFloat64

	for _, hostPriority := range nodeScores {
		maxScore = math.Max(maxScore, float64(hostPriority.Score))
		minScore = math.Min(minScore, float64(hostPriority.Score))
	}
	for i, hostPriority := range nodeScores {
		nodeScores[i] = fwk.NodeScore{
			Name: hostPriority.Name,
			Score: int64(maxScore + minScore - float64(hostPriority.Score)),
		}
	}
	return nil
}

func newReverseNumericMapPlugin() frameworkruntime.PluginFactory {
	return func(_ context.Context, _ runtime.Object, _ fwk.Handle) (fwk.Plugin, error) {
		return &reverseNumericMapPlugin{}, nil
	}
}
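
// trueMapPlugin gives every node a score of 1 and only checks for empty host
// names during score normalization.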
type trueMapPlugin struct{}

func (pl *trueMapPlugin) Name() string {
	return "TrueMap"
}

func (pl *trueMapPlugin) Score(_ context.Context, _ fwk.CycleState, _ *v1.Pod, _ fwk.NodeInfo) (int64, *fwk.Status) {
	return 1, nil
}

func (pl *trueMapPlugin) ScoreExtensions() fwk.ScoreExtensions {
	return pl
}

func (pl *trueMapPlugin) NormalizeScore(_ context.Context, _ fwk.CycleState, _ *v1.Pod, nodeScores fwk.NodeScoreList) *fwk.Status {
	for _, host := range nodeScores {
		if host.Name == "" {
			return fwk.NewStatus(fwk.Error, "unexpected empty host name")
		}
	}
	return nil
}

func newTrueMapPlugin() frameworkruntime.PluginFactory {
	return func(_ context.Context, _ runtime.Object, _ fwk.Handle) (fwk.Plugin, error) {
		return &trueMapPlugin{}, nil
	}
}

type noPodsFilterPlugin struct{}

// Name returns name of the plugin.
func (pl *noPodsFilterPlugin) Name() string {
	return "NoPodsFilter"
}

// Filter invoked at the filter extension point.
func (pl *noPodsFilterPlugin) Filter(_ context.Context, _ fwk.CycleState, pod *v1.Pod, nodeInfo fwk.NodeInfo) *fwk.Status {
	if len(nodeInfo.GetPods()) == 0 {
		return nil
	}
	return fwk.NewStatus(fwk.Unschedulable, tf.ErrReasonFake)
}
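
// fakeNodeSelector is a filter plugin that admits only the node whose name
// matches the NodeName configured through fakeNodeSelectorArgs.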
type fakeNodeSelectorArgs struct {
	NodeName string `json:"nodeName"`
}

type fakeNodeSelector struct {
	fakeNodeSelectorArgs
}

func (s *fakeNodeSelector) Name() string {
	return "FakeNodeSelector"
}

func (s *fakeNodeSelector) Filter(_ context.Context, _ fwk.CycleState, _ *v1.Pod, nodeInfo fwk.NodeInfo) *fwk.Status {
	if nodeInfo.Node().Name != s.NodeName {
		return fwk.NewStatus(fwk.UnschedulableAndUnresolvable)
	}
	return nil
}

func newFakeNodeSelector(_ context.Context, args runtime.Object, _ fwk.Handle) (fwk.Plugin, error) {
	pl := &fakeNodeSelector{}
	if err := frameworkruntime.DecodeInto(args, &pl.fakeNodeSelectorArgs); err != nil {
		return nil, err
	}
	return pl, nil
}

const (
	fakeSpecifiedNodeNameAnnotation = "fake-specified-node-name"
)

// fakeNodeSelectorDependOnPodAnnotation schedules a pod to the node specified in pod.Annotations[fakeSpecifiedNodeNameAnnotation].
type fakeNodeSelectorDependOnPodAnnotation struct{}

func (f *fakeNodeSelectorDependOnPodAnnotation) Name() string {
	return "FakeNodeSelectorDependOnPodAnnotation"
}

// Filter admits only the node specified in the pod annotation and rejects all other nodes.
func (f *fakeNodeSelectorDependOnPodAnnotation) Filter(_ context.Context, _ fwk.CycleState, pod *v1.Pod, nodeInfo fwk.NodeInfo) *fwk.Status {
	resolveNodeNameFromPodAnnotation := func(pod *v1.Pod) (string, error) {
		if pod == nil {
			return "", fmt.Errorf("empty pod")
		}
		nodeName, ok := pod.Annotations[fakeSpecifiedNodeNameAnnotation]
		if !ok {
			return "", fmt.Errorf("no specified node name on pod %s/%s annotation", pod.Namespace, pod.Name)
		}
		return nodeName, nil
	}

	nodeName, err := resolveNodeNameFromPodAnnotation(pod)
	if err != nil {
		return fwk.AsStatus(err)
	}
	if nodeInfo.Node().Name != nodeName {
		return fwk.NewStatus(fwk.UnschedulableAndUnresolvable)
	}
	return nil
}

func newFakeNodeSelectorDependOnPodAnnotation(_ context.Context, _ runtime.Object, _ fwk.Handle) (fwk.Plugin, error) {
	return &fakeNodeSelectorDependOnPodAnnotation{}, nil
}
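
// TestPlugin is a minimal plugin implementing both the score and filter
// extension points; it always succeeds.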
type TestPlugin struct {
	name string
}

var _ fwk.ScorePlugin = &TestPlugin{}
var _ fwk.FilterPlugin = &TestPlugin{}

func (t *TestPlugin) Name() string {
	return t.name
}

func (t *TestPlugin) Score(ctx context.Context, state fwk.CycleState, p *v1.Pod, nodeInfo fwk.NodeInfo) (int64, *fwk.Status) {
	return 1, nil
}

func (t *TestPlugin) ScoreExtensions() fwk.ScoreExtensions {
	return nil
}

func (t *TestPlugin) Filter(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeInfo fwk.NodeInfo) *fwk.Status {
	return nil
}
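
// nodeToStatusDiff returns a human-readable diff between two NodeToStatus
// maps, comparing unexported fields via schedulerCmpOpts.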
func nodeToStatusDiff(want, got *framework.NodeToStatus) string {
	if want == nil || got == nil {
		return cmp.Diff(want, got)
	}
	return cmp.Diff(*want, *got, schedulerCmpOpts...)
}
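
// TestSchedulerMultipleProfilesScheduling verifies that each pod is scheduled
// by the profile matching its schedulerName and that the Scheduled event is
// reported by that profile's controller.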
func TestSchedulerMultipleProfilesScheduling(t *testing.T) {
	nodes := []runtime.Object{
		st.MakeNode().Name("node1").UID("node1").Obj(),
		st.MakeNode().Name("node2").UID("node2").Obj(),
		st.MakeNode().Name("node3").UID("node3").Obj(),
	}
	pods := []*v1.Pod{
		st.MakePod().Name("pod1").UID("pod1").SchedulerName("match-node3").Obj(),
		st.MakePod().Name("pod2").UID("pod2").SchedulerName("match-node2").Obj(),
		st.MakePod().Name("pod3").UID("pod3").SchedulerName("match-node2").Obj(),
		st.MakePod().Name("pod4").UID("pod4").SchedulerName("match-node3").Obj(),
	}
	wantBindings := map[string]string{
		"pod1": "node3",
		"pod2": "node2",
		"pod3": "node2",
		"pod4": "node3",
	}
	wantControllers := map[string]string{
		"pod1": "match-node3",
		"pod2": "match-node2",
		"pod3": "match-node2",
		"pod4": "match-node3",
	}

	// Set up scheduler for the 3 nodes.
	// We use a fake filter that only allows one particular node. We create two
	// profiles, each with a different node in the filter configuration.
	objs := append([]runtime.Object{
		&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ""}}}, nodes...)
	client := clientsetfake.NewClientset(objs...)
	broadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
	_, ctx := ktesting.NewTestContext(t)
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	informerFactory := informers.NewSharedInformerFactory(client, 0)
	sched, err := New(
		ctx,
		client,
		informerFactory,
		nil,
		profile.NewRecorderFactory(broadcaster),
		WithProfiles(
			schedulerapi.KubeSchedulerProfile{SchedulerName: "match-node2",
				Plugins: &schedulerapi.Plugins{
					Filter: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "FakeNodeSelector"}}},
					QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
					Bind: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
				},
				PluginConfig: []schedulerapi.PluginConfig{
					{
						Name: "FakeNodeSelector",
						Args: &runtime.Unknown{Raw: []byte(`{"nodeName":"node2"}`)},
					},
				},
			},
			schedulerapi.KubeSchedulerProfile{
				SchedulerName: "match-node3",
				Plugins: &schedulerapi.Plugins{
					Filter: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "FakeNodeSelector"}}},
					QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
					Bind: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
				},
				PluginConfig: []schedulerapi.PluginConfig{
					{
						Name: "FakeNodeSelector",
						Args: &runtime.Unknown{Raw: []byte(`{"nodeName":"node3"}`)},
					},
				},
			},
		),
		WithFrameworkOutOfTreeRegistry(frameworkruntime.Registry{
			"FakeNodeSelector": newFakeNodeSelector,
		}),
	)
	if err != nil {
		t.Fatal(err)
	}

	// Capture the bindings and events' controllers.
	var wg sync.WaitGroup
	wg.Add(2 * len(pods))
	bindings := make(map[string]string)
	client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
		if action.GetSubresource() != "binding" {
			return false, nil, nil
		}
		binding := action.(clienttesting.CreateAction).GetObject().(*v1.Binding)
		bindings[binding.Name] = binding.Target.Name
		wg.Done()
		return true, binding, nil
	})
	controllers := make(map[string]string)
	stopFn, err := broadcaster.StartEventWatcher(func(obj runtime.Object) {
		e, ok := obj.(*eventsv1.Event)
		if !ok || e.Reason != "Scheduled" {
			return
		}
		controllers[e.Regarding.Name] = e.ReportingController
		wg.Done()
	})
	if err != nil {
		t.Fatal(err)
	}
	defer stopFn()

	// Run scheduler.
	informerFactory.Start(ctx.Done())
	informerFactory.WaitForCacheSync(ctx.Done())
	if err = sched.WaitForHandlersSync(ctx); err != nil {
		t.Fatalf("Handlers failed to sync: %v: ", err)
	}
	go sched.Run(ctx)

	// Send pods to be scheduled.
	for _, p := range pods {
		_, err := client.CoreV1().Pods("").Create(ctx, p, metav1.CreateOptions{})
		if err != nil {
			t.Fatal(err)
		}
	}
	wg.Wait()

	// Verify correct bindings and reporting controllers.
	if diff := cmp.Diff(wantBindings, bindings); diff != "" {
		t.Errorf("pods were scheduled incorrectly (-want, +got):\n%s", diff)
	}
	if diff := cmp.Diff(wantControllers, controllers); diff != "" {
		t.Errorf("events were reported with wrong controllers (-want, +got):\n%s", diff)
	}
}

// TestSchedulerGuaranteeNonNilNodeInSchedulingCycle is for detecting potential panic on nil Node when iterating Nodes.
func TestSchedulerGuaranteeNonNilNodeInSchedulingCycle(t *testing.T) {
	if goruntime.GOOS == "windows" {
		// TODO: remove skip once the failing test has been fixed.
		t.Skip("Skip failing test on Windows.")
	}
	random := rand.New(rand.NewSource(time.Now().UnixNano()))
	_, ctx := ktesting.NewTestContext(t)
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var (
		initialNodeNumber = 1000
		initialPodNumber = 500
		waitSchedulingPodNumber = 200
		deleteNodeNumberPerRound = 20
		createPodNumberPerRound = 50

		fakeSchedulerName = "fake-scheduler"
		fakeNamespace = "fake-namespace"

		initialNodes []runtime.Object
		initialPods []runtime.Object
	)

	for i := 0; i < initialNodeNumber; i++ {
		nodeName := fmt.Sprintf("node%d", i)
		initialNodes = append(initialNodes, st.MakeNode().Name(nodeName).UID(nodeName).Obj())
	}
	// Randomly scatter initial pods onto nodes.
	for i := 0; i < initialPodNumber; i++ {
		podName := fmt.Sprintf("scheduled-pod%d", i)
		assignedNodeName := fmt.Sprintf("node%d", random.Intn(initialNodeNumber))
		initialPods = append(initialPods, st.MakePod().Name(podName).UID(podName).Node(assignedNodeName).Obj())
	}

	objs := []runtime.Object{&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: fakeNamespace}}}
	objs = append(objs, initialNodes...)
	objs = append(objs, initialPods...)
	client := clientsetfake.NewClientset(objs...)
	broadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})

	informerFactory := informers.NewSharedInformerFactory(client, 0)
	sched, err := New(
		ctx,
		client,
		informerFactory,
		nil,
		profile.NewRecorderFactory(broadcaster),
		WithProfiles(
			schedulerapi.KubeSchedulerProfile{SchedulerName: fakeSchedulerName,
				Plugins: &schedulerapi.Plugins{
					Filter: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "FakeNodeSelectorDependOnPodAnnotation"}}},
					QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
					Bind: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
				},
			},
		),
		WithFrameworkOutOfTreeRegistry(frameworkruntime.Registry{
			"FakeNodeSelectorDependOnPodAnnotation": newFakeNodeSelectorDependOnPodAnnotation,
		}),
	)
	if err != nil {
		t.Fatal(err)
	}

	// Run scheduler.
	informerFactory.Start(ctx.Done())
	informerFactory.WaitForCacheSync(ctx.Done())
	go sched.Run(ctx)

	var deleteNodeIndex int
	deleteNodesOneRound := func() {
		for i := 0; i < deleteNodeNumberPerRound; i++ {
			if deleteNodeIndex >= initialNodeNumber {
				// all initial nodes are already deleted
				return
			}
			deleteNodeName := fmt.Sprintf("node%d", deleteNodeIndex)
			if err := client.CoreV1().Nodes().Delete(ctx, deleteNodeName, metav1.DeleteOptions{}); err != nil {
				t.Fatal(err)
			}
			deleteNodeIndex++
		}
	}
	var createPodIndex int
	createPodsOneRound := func() {
		if createPodIndex > waitSchedulingPodNumber {
			return
		}
		for i := 0; i < createPodNumberPerRound; i++ {
			podName := fmt.Sprintf("pod%d", createPodIndex)
			// Note: the node (specifiedNodeName) may already be deleted, which makes the pod fail to be scheduled.
			specifiedNodeName := fmt.Sprintf("node%d", random.Intn(initialNodeNumber))

			waitSchedulingPod := st.MakePod().Namespace(fakeNamespace).Name(podName).UID(podName).Annotation(fakeSpecifiedNodeNameAnnotation, specifiedNodeName).SchedulerName(fakeSchedulerName).Obj()
			if _, err := client.CoreV1().Pods(fakeNamespace).Create(ctx, waitSchedulingPod, metav1.CreateOptions{}); err != nil {
				t.Fatal(err)
			}
			createPodIndex++
		}
	}

	// Below we start 2 goroutines asynchronously to detect potential racing issues:
	// 1) One is responsible for deleting several nodes in each round;
	// 2) Another is creating several pods in each round to trigger scheduling;
	// Both goroutines keep running until ctx.Done() is closed, which happens once all waiting pods have been scheduled at least once.
	go wait.Until(deleteNodesOneRound, 10*time.Millisecond, ctx.Done())
	go wait.Until(createPodsOneRound, 9*time.Millisecond, ctx.Done())

	// Capture the events to wait for all pods to be scheduled at least once.
	allWaitSchedulingPods := sets.New[string]()
	for i := 0; i < waitSchedulingPodNumber; i++ {
		allWaitSchedulingPods.Insert(fmt.Sprintf("pod%d", i))
	}
	var (
		wg sync.WaitGroup
		mu sync.Mutex
	)
	wg.Add(waitSchedulingPodNumber)
	stopFn, err := broadcaster.StartEventWatcher(func(obj runtime.Object) {
		e, ok := obj.(*eventsv1.Event)
		if !ok || (e.Reason != "Scheduled" && e.Reason != "FailedScheduling") {
			return
		}
		mu.Lock()
		if allWaitSchedulingPods.Has(e.Regarding.Name) {
			wg.Done()
			allWaitSchedulingPods.Delete(e.Regarding.Name)
		}
		mu.Unlock()
	})
	if err != nil {
		t.Fatal(err)
	}
	defer stopFn()

	wg.Wait()
}
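
// TestSchedulerScheduleOne runs a single ScheduleOne pass against a mocked
// scheduling framework and verifies the resulting assume/forget/bind calls,
// error handling, queue placement, and nominated node name updates.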
func TestSchedulerScheduleOne(t *testing.T) {
	testNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1", UID: types.UID("node1")}}
	client := clientsetfake.NewClientset(&testNode)
	eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})

	scheduleResultOk := ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}
	emptyScheduleResult := ScheduleResult{}
	fakeBinding := &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: testNode.Name}}

	reserveErr := errors.New("reserve error")
	schedulingErr := errors.New("scheduler")
	permitErr := errors.New("permit error")
	preBindErr := errors.New("on PreBind")
	preBindPreFlightErr := errors.New("on PreBindPreFlight")
	bindingErr := errors.New("binder")

	testPod := podWithID("foo", "")
	assignedTestPod := podWithID("foo", testNode.Name)

	type podToAdmit struct {
		pluginName string
		pod types.UID
	}

	table := []struct {
		name string
		sendPod *v1.Pod
		registerPluginFuncs []tf.RegisterPluginFunc
		injectBindError error
		injectSchedulingError error
		podToAdmit *podToAdmit
		mockScheduleResult ScheduleResult
		expectErrorPod *v1.Pod
		expectForgetPod *v1.Pod
		expectAssumedPod *v1.Pod
		expectPodInBackoffQ *v1.Pod
		expectPodInUnschedulable *v1.Pod
		expectError error
		expectNominatedNodeName string
		expectBind *v1.Binding
		eventReason string
		// If nil, the test case is run with both enabled and disabled
		nominatedNodeNameForExpectationEnabled *bool
		// If nil, the test case is run with both enabled and disabled
		asyncAPICallsEnabled *bool
	}{
		{
			name: "schedule pod failed",
			sendPod: testPod,
			injectSchedulingError: schedulingErr,
			mockScheduleResult: scheduleResultOk,
			expectError: schedulingErr,
			expectErrorPod: testPod,
			expectPodInBackoffQ: testPod,
			eventReason: "FailedScheduling",
		},
		{
			name: "reserve failed with status code error",
			sendPod: testPod,
			registerPluginFuncs: []tf.RegisterPluginFunc{
				tf.RegisterReservePlugin("FakeReserve", tf.NewFakeReservePlugin(fwk.AsStatus(reserveErr))),
			},
			mockScheduleResult: scheduleResultOk,
			expectErrorPod: assignedTestPod,
			expectForgetPod: assignedTestPod,
			expectAssumedPod: assignedTestPod,
			expectPodInBackoffQ: testPod,
			expectError: fmt.Errorf(`running Reserve plugin "FakeReserve": %w`, reserveErr),
			eventReason: "FailedScheduling",
		},
		{
			name: "reserve failed with status code rejected",
			sendPod: testPod,
			registerPluginFuncs: []tf.RegisterPluginFunc{
				tf.RegisterReservePlugin("FakeReserve", tf.NewFakeReservePlugin(fwk.NewStatus(fwk.UnschedulableAndUnresolvable, "rejected on reserve"))),
			},
			mockScheduleResult: scheduleResultOk,
			expectErrorPod: assignedTestPod,
			expectForgetPod: assignedTestPod,
			expectAssumedPod: assignedTestPod,
			expectPodInUnschedulable: testPod,
			expectError: makePredicateError("1 rejected on reserve"),
			eventReason: "FailedScheduling",
		},
		{
			name: "permit failed with status code error",
			sendPod: testPod,
			registerPluginFuncs: []tf.RegisterPluginFunc{
				tf.RegisterPermitPlugin("FakePermit", tf.NewFakePermitPlugin(fwk.AsStatus(permitErr), time.Minute)),
			},
			mockScheduleResult: scheduleResultOk,
			expectErrorPod: assignedTestPod,
			expectForgetPod: assignedTestPod,
			expectAssumedPod: assignedTestPod,
			expectPodInBackoffQ: testPod,
			expectError: fmt.Errorf(`running Permit plugin "FakePermit": %w`, permitErr),
			eventReason: "FailedScheduling",
		},
		{
			name: "permit failed with status code rejected",
			sendPod: testPod,
			registerPluginFuncs: []tf.RegisterPluginFunc{
				tf.RegisterPermitPlugin("FakePermit", tf.NewFakePermitPlugin(fwk.NewStatus(fwk.Unschedulable, "rejected on permit"), time.Minute)),
			},
			mockScheduleResult: scheduleResultOk,
			expectErrorPod: assignedTestPod,
			expectForgetPod: assignedTestPod,
			expectAssumedPod: assignedTestPod,
			expectPodInUnschedulable: testPod,
			expectError: makePredicateError("1 rejected on permit"),
			eventReason: "FailedScheduling",
		},
		{
			name: "nominated node name is not set, permit plugin is working, but the feature gate NominatedNodeNameForExpectation is disabled",
			sendPod: testPod,
			registerPluginFuncs: []tf.RegisterPluginFunc{
				tf.RegisterPermitPlugin("FakePermit", tf.NewFakePermitPlugin(fwk.NewStatus(fwk.Wait, "rejected on permit"), time.Minute)),
			},
			podToAdmit: &podToAdmit{pluginName: "FakePermit", pod: testPod.UID},
			mockScheduleResult: scheduleResultOk,
			nominatedNodeNameForExpectationEnabled: ptr.To(false),
			expectAssumedPod: assignedTestPod,
			expectBind: fakeBinding,
			eventReason: "Scheduled",
		},
		{
			name: "nominated node name is set, permit plugin is working in wait on permit phase",
			sendPod: testPod,
			registerPluginFuncs: []tf.RegisterPluginFunc{
				tf.RegisterPermitPlugin("FakePermit", tf.NewFakePermitPlugin(fwk.NewStatus(fwk.Wait, "rejected on permit"), time.Minute)),
			},
			podToAdmit: &podToAdmit{pluginName: "FakePermit", pod: testPod.UID},
			mockScheduleResult: scheduleResultOk,
			expectNominatedNodeName: testNode.Name,
			// Depending on the timing, if asyncAPICallsEnabled, we might miss the NNN update because it is overwritten by the binding.
			// So, it's safe to run this test with asyncAPICallsEnabled = false only.
			asyncAPICallsEnabled: ptr.To(false),
			nominatedNodeNameForExpectationEnabled: ptr.To(true),
			expectAssumedPod: assignedTestPod,
			expectBind: fakeBinding,
			eventReason: "Scheduled",
		},
		{
			name: "prebindpreflight failed with status code error",
			sendPod: testPod,
			registerPluginFuncs: []tf.RegisterPluginFunc{
				tf.RegisterPreBindPlugin("FakePreBind", tf.NewFakePreBindPlugin(fwk.AsStatus(preBindPreFlightErr), nil)),
			},
			mockScheduleResult: scheduleResultOk,
			expectErrorPod: assignedTestPod,
			expectForgetPod: assignedTestPod,
			nominatedNodeNameForExpectationEnabled: ptr.To(true),
			expectAssumedPod: assignedTestPod,
			expectPodInBackoffQ: testPod,
			expectError: fmt.Errorf(`running PreBindPreFlight "FakePreBind": %w`, preBindPreFlightErr),
			eventReason: "FailedScheduling",
		},
		{
			name: "prebindpreflight failed with status code unschedulable",
			sendPod: testPod,
			registerPluginFuncs: []tf.RegisterPluginFunc{
				tf.RegisterPreBindPlugin("FakePreBind", tf.NewFakePreBindPlugin(fwk.NewStatus(fwk.Unschedulable, "rejected on prebindpreflight"), nil)),
			},
			mockScheduleResult: scheduleResultOk,
			expectErrorPod: assignedTestPod,
			expectForgetPod: assignedTestPod,
			nominatedNodeNameForExpectationEnabled: ptr.To(true),
			expectAssumedPod: assignedTestPod,
			expectPodInBackoffQ: testPod,
			expectError: fmt.Errorf("PreBindPreFlight FakePreBind returned \"Unschedulable\", which is unsupported. It is supposed to return Success, Skip, or Error status"),
			eventReason: "FailedScheduling",
		},
		{
			name: "prebind isn't called and nominated node name isn't set, if preflight returns skip",
			sendPod: testPod,
			registerPluginFuncs: []tf.RegisterPluginFunc{
				// Configure it to return error on prebind to make sure it's not called.
				tf.RegisterPreBindPlugin("FakePreBind", tf.NewFakePreBindPlugin(fwk.NewStatus(fwk.Skip), fwk.NewStatus(fwk.Error, "rejected on prebind"))),
			},
			mockScheduleResult: scheduleResultOk,
			expectAssumedPod: assignedTestPod,
			nominatedNodeNameForExpectationEnabled: ptr.To(true),
			expectBind: fakeBinding,
			eventReason: "Scheduled",
		},
		{
			name: "prebind is called and nominated node name is set, if preflight returns success",
			sendPod: testPod,
			registerPluginFuncs: []tf.RegisterPluginFunc{
				tf.RegisterPreBindPlugin("FakePreBind", tf.NewFakePreBindPlugin(nil, nil)),
			},
			mockScheduleResult: scheduleResultOk,
			nominatedNodeNameForExpectationEnabled: ptr.To(true),
			// Depending on the timing, if asyncAPICallsEnabled, we might miss the NNN update because it is overwritten by the binding.
			// So, it's safe to run this test with asyncAPICallsEnabled = false only.
			asyncAPICallsEnabled: ptr.To(false),
			expectNominatedNodeName: testNode.Name,
			expectAssumedPod: assignedTestPod,
			expectBind: fakeBinding,
			eventReason: "Scheduled",
		},
		{
			name: "prebind failed with status code error",
			sendPod: testPod,
			registerPluginFuncs: []tf.RegisterPluginFunc{
				tf.RegisterPreBindPlugin("FakePreBind", tf.NewFakePreBindPlugin(nil, fwk.AsStatus(preBindErr))),
			},
			mockScheduleResult: scheduleResultOk,
			expectErrorPod: assignedTestPod,
			expectForgetPod: assignedTestPod,
			nominatedNodeNameForExpectationEnabled: ptr.To(true),
			// Depending on the timing, if asyncAPICallsEnabled, we might miss the NNN update because it is overwritten by the binding.
			// So, it's safe to run this test with asyncAPICallsEnabled = false only.
			asyncAPICallsEnabled: ptr.To(false),
			expectNominatedNodeName: testNode.Name,
			expectAssumedPod: assignedTestPod,
			expectPodInBackoffQ: testPod,
			expectError: fmt.Errorf(`running PreBind plugin "FakePreBind": %w`, preBindErr),
			eventReason: "FailedScheduling",
		},
		{
			name: "binding failed",
			sendPod: testPod,
			injectBindError: bindingErr,
			mockScheduleResult: scheduleResultOk,
			expectBind: fakeBinding,
			expectAssumedPod: assignedTestPod,
			expectError: fmt.Errorf("running Bind plugin %q: %w", "DefaultBinder", bindingErr),
			expectErrorPod: assignedTestPod,
			expectForgetPod: assignedTestPod,
			expectPodInBackoffQ: testPod,
			eventReason: "FailedScheduling",
		},
		{
			name: "bind assumed pod scheduled",
			sendPod: testPod,
			mockScheduleResult: scheduleResultOk,
			expectBind: fakeBinding,
			expectAssumedPod: assignedTestPod,
			eventReason: "Scheduled",
		},
		{
			name: "deleting pod",
			sendPod: deletingPod("foo"),
			mockScheduleResult: emptyScheduleResult,
			eventReason: "FailedScheduling",
		},
		{
			name: "pod with existing nominated node name on scheduling error keeps nomination",
			sendPod: func() *v1.Pod {
				p := podWithID("foo", "")
				p.Status.NominatedNodeName = "existing-node"
				return p
			}(),
			injectSchedulingError: schedulingErr,
			mockScheduleResult: scheduleResultOk,
			expectError: schedulingErr,
			expectErrorPod: func() *v1.Pod {
				p := podWithID("foo", "")
				p.Status.NominatedNodeName = "existing-node"
				return p
			}(),
			expectPodInBackoffQ: func() *v1.Pod {
				p := podWithID("foo", "")
				p.Status.NominatedNodeName = "existing-node"
				return p
			}(),
			// Depending on the timing, if asyncAPICallsEnabled, the NNN update might not be sent yet while checking the expectNominatedNodeName.
			// So, asyncAPICallsEnabled is set to false.
			asyncAPICallsEnabled: ptr.To(false),
			nominatedNodeNameForExpectationEnabled: ptr.To(true),
			expectNominatedNodeName: "existing-node",
			eventReason: "FailedScheduling",
		},
		{
			name: "pod with existing nominated node name on scheduling error clears nomination",
			sendPod: func() *v1.Pod {
				p := podWithID("foo", "")
				p.Status.NominatedNodeName = "existing-node"
				return p
			}(),
			injectSchedulingError: schedulingErr,
			mockScheduleResult: scheduleResultOk,
			expectError: schedulingErr,
			expectErrorPod: func() *v1.Pod {
				p := podWithID("foo", "")
				p.Status.NominatedNodeName = "existing-node"
				return p
			}(),
			expectPodInBackoffQ: func() *v1.Pod {
				p := podWithID("foo", "")
				p.Status.NominatedNodeName = "existing-node"
				return p
			}(),
			// Depending on the timing, if asyncAPICallsEnabled, the NNN update might not be sent yet while checking the expectNominatedNodeName.
			// So, asyncAPICallsEnabled is set to false.
			asyncAPICallsEnabled: ptr.To(false),
			nominatedNodeNameForExpectationEnabled: ptr.To(false),
			eventReason: "FailedScheduling",
		},
	}

	// Test with QueueingHints and NominatedNodeNameForExpectation feature gates
	for _, qHintEnabled := range []bool{true, false} {
		for _, item := range table {
			asyncAPICallsEnabled := []bool{true, false}
			if item.asyncAPICallsEnabled != nil {
				asyncAPICallsEnabled = []bool{*item.asyncAPICallsEnabled}
			}
			for _, asyncAPICallsEnabled := range asyncAPICallsEnabled {
				nominatedNodeNameForExpectationEnabled := []bool{true, false}
				if item.nominatedNodeNameForExpectationEnabled != nil {
					nominatedNodeNameForExpectationEnabled = []bool{*item.nominatedNodeNameForExpectationEnabled}
				}
				for _, nominatedNodeNameForExpectationEnabled := range nominatedNodeNameForExpectationEnabled {
					if (asyncAPICallsEnabled || nominatedNodeNameForExpectationEnabled) && !qHintEnabled {
						// If the QHint feature gate is disabled, NominatedNodeNameForExpectation and SchedulerAsyncAPICalls cannot be enabled
						// because that means users set the emulation version to 1.33 or later.
						continue
					}
					t.Run(fmt.Sprintf("%s (Queueing hints enabled: %v, Async API calls enabled: %v, NominatedNodeNameForExpectation enabled: %v)", item.name, qHintEnabled, asyncAPICallsEnabled, nominatedNodeNameForExpectationEnabled), func(t *testing.T) {
						if !qHintEnabled {
							featuregatetesting.SetFeatureGateEmulationVersionDuringTest(t, utilfeature.DefaultFeatureGate, version.MustParse("1.33"))
							featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, false)
						}
						logger, ctx := ktesting.NewTestContext(t)
						var gotError error
						var gotPod *v1.Pod
						var gotForgetPod *v1.Pod
						var gotAssumedPod *v1.Pod
						var gotBinding *v1.Binding
						var gotNominatingInfo *fwk.NominatingInfo

						client := clientsetfake.NewClientset(item.sendPod)
						informerFactory := informers.NewSharedInformerFactory(client, 0)
						client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
							if action.GetSubresource() != "binding" {
								return false, nil, nil
							}
							gotBinding = action.(clienttesting.CreateAction).GetObject().(*v1.Binding)
							return true, gotBinding, item.injectBindError
						})

						var apiDispatcher *apidispatcher.APIDispatcher
						if asyncAPICallsEnabled {
							apiDispatcher = apidispatcher.New(client, 16, apicalls.Relevances)
							apiDispatcher.Run(logger)
							defer apiDispatcher.Close()
						}

						internalCache := internalcache.New(ctx, 30*time.Second, apiDispatcher)
						cache := &fakecache.Cache{
							Cache: internalCache,
							ForgetFunc: func(pod *v1.Pod) {
								gotForgetPod = pod
							},
							AssumeFunc: func(pod *v1.Pod) {
								gotAssumedPod = pod
							},
							IsAssumedPodFunc: func(pod *v1.Pod) bool {
								if pod == nil || gotAssumedPod == nil {
									return false
								}
								return pod.UID == gotAssumedPod.UID
							},
						}
						mu := &sync.Mutex{}
						updatedNominatedNodeName := item.sendPod.Status.NominatedNodeName
						client.PrependReactor("patch", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
							if action.GetSubresource() != "status" {
								return false, nil, nil
							}
							patchAction := action.(clienttesting.PatchAction)
							patch := patchAction.GetPatch()
							patchMap := map[string]map[string]json.RawMessage{}
							if err := json.Unmarshal(patch, &patchMap); err != nil {
								t.Fatalf("Failed to unmarshal patch %q: %v", patch, err)
							}
							statusMap, ok := patchMap["status"]
							if !ok {
								t.Fatalf("patch doesn't include status: %q", patch)
							}
							nnn, ok := statusMap["nominatedNodeName"]
							if !ok {
								return false, nil, nil
							}
							mu.Lock()
							updatedNominatedNodeName = strings.Trim(string(nnn), "\"")
							if updatedNominatedNodeName == "null" {
								// NNN has to be cleared with this patch.
								updatedNominatedNodeName = ""
							}
							mu.Unlock()
							return false, nil, nil
						})

						schedFramework, err := tf.NewFramework(ctx,
							append(item.registerPluginFuncs,
								tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
								tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
							),
							testSchedulerName,
							frameworkruntime.WithClientSet(client),
							frameworkruntime.WithAPIDispatcher(apiDispatcher),
							frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)),
							frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()),
							frameworkruntime.WithInformerFactory(informerFactory),
						)
						if err != nil {
							t.Fatal(err)
						}

						ar := metrics.NewMetricsAsyncRecorder(10, 1*time.Second, ctx.Done())
						queue := internalqueue.NewSchedulingQueue(nil, informerFactory, internalqueue.WithMetricsRecorder(ar), internalqueue.WithAPIDispatcher(apiDispatcher))
						if asyncAPICallsEnabled {
							schedFramework.SetAPICacher(apicache.New(queue, cache))
						}

						sched := &Scheduler{
							Cache: cache,
							client: client,
							NextPod: queue.Pop,
							SchedulingQueue: queue,
							Profiles: profile.Map{testSchedulerName: schedFramework},
							APIDispatcher: apiDispatcher,
							nominatedNodeNameForExpectationEnabled: nominatedNodeNameForExpectationEnabled,
						}
						queue.Add(logger, item.sendPod)

						sched.SchedulePod = func(ctx context.Context, fwk framework.Framework, state fwk.CycleState, pod *v1.Pod) (ScheduleResult, error) {
							return item.mockScheduleResult, item.injectSchedulingError
						}
						sched.FailureHandler = func(ctx context.Context, fwk framework.Framework, p *framework.QueuedPodInfo, status *fwk.Status, ni *fwk.NominatingInfo, start time.Time) {
							gotPod = p.Pod
							gotError = status.AsError()
							gotNominatingInfo = ni

							sched.handleSchedulingFailure(ctx, fwk, p, status, ni, start)
						}
						called := make(chan struct{})
						stopFunc, err := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) {
							e, _ := obj.(*eventsv1.Event)
							if e.Reason != item.eventReason {
								t.Errorf("got event %v, want %v", e.Reason, item.eventReason)
							}
							close(called)
						})
						if err != nil {
							t.Fatal(err)
						}
						informerFactory.Start(ctx.Done())
						informerFactory.WaitForCacheSync(ctx.Done())
						sched.ScheduleOne(ctx)

						if item.podToAdmit != nil {
							for {
								if waitingPod := sched.Profiles[testSchedulerName].GetWaitingPod(item.podToAdmit.pod); waitingPod != nil {
									waitingPod.Allow(item.podToAdmit.pluginName)
									break
								}
							}
						}

						<-called

						mu.Lock()
						if item.expectNominatedNodeName != updatedNominatedNodeName {
							t.Errorf("Expected nominated node name %q, got %q", item.expectNominatedNodeName, updatedNominatedNodeName)
						}
						mu.Unlock()

						if diff := cmp.Diff(item.expectAssumedPod, gotAssumedPod); diff != "" {
							t.Errorf("Unexpected assumed pod (-want,+got):\n%s", diff)
						}
						if diff := cmp.Diff(item.expectErrorPod, gotPod); diff != "" {
							t.Errorf("Unexpected error pod (-want,+got):\n%s", diff)
						}
						if diff := cmp.Diff(item.expectForgetPod, gotForgetPod); diff != "" {
							t.Errorf("Unexpected forget pod (-want,+got):\n%s", diff)
						}
						if item.expectError == nil || gotError == nil {
							if !errors.Is(gotError, item.expectError) {
								t.Errorf("Unexpected error. Wanted %v, got %v", item.expectError, gotError)
							}
						} else if item.expectError.Error() != gotError.Error() {
							t.Errorf("Unexpected error. Wanted %v, got %v", item.expectError.Error(), gotError.Error())
						}
						if item.expectError != nil {
							var expectedNominatingInfo *fwk.NominatingInfo
							// Check nominatingInfo expectation based on feature gate
							if !nominatedNodeNameForExpectationEnabled {
								expectedNominatingInfo = &fwk.NominatingInfo{NominatingMode: fwk.ModeOverride, NominatedNodeName: ""}
							}
							if diff := cmp.Diff(expectedNominatingInfo, gotNominatingInfo); diff != "" {
								t.Errorf("Unexpected nominatingInfo (-want,+got):\n%s", diff)
							}
						}
						if diff := cmp.Diff(item.expectBind, gotBinding); diff != "" {
							t.Errorf("Unexpected binding (-want,+got):\n%s", diff)
						}
						// We have to use wait here because the Pod goes to the binding cycle in some test cases
						// and the inflight pods might not be empty immediately at this point in such cases.
						if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
							return len(queue.InFlightPods()) == 0, nil
						}); err != nil {
							t.Errorf("in-flight pods should be always empty after SchedulingOne. It has %v Pods", len(queue.InFlightPods()))
						}
						podsInBackoffQ := queue.PodsInBackoffQ()
						if item.expectPodInBackoffQ != nil {
							if !podListContainsPod(podsInBackoffQ, item.expectPodInBackoffQ) {
								t.Errorf("Expected to find pod in backoffQ, but it's not there.\nWant: %v,\ngot: %v", item.expectPodInBackoffQ, podsInBackoffQ)
							}
						} else {
							if len(podsInBackoffQ) > 0 {
								t.Errorf("Expected backoffQ to be empty, but it's not.\nGot: %v", podsInBackoffQ)
							}
						}
						unschedulablePods := queue.UnschedulablePods()
						if item.expectPodInUnschedulable != nil {
							if !podListContainsPod(unschedulablePods, item.expectPodInUnschedulable) {
								t.Errorf("Expected to find pod in unschedulable, but it's not there.\nWant: %v,\ngot: %v", item.expectPodInUnschedulable, unschedulablePods)
							}
						} else {
							if len(unschedulablePods) > 0 {
								t.Errorf("Expected unschedulable pods to be empty, but it's not.\nGot: %v", unschedulablePods)
							}
						}
						stopFunc()
					})
				}
			}
		}
	}
}

// Tests the logic removing pods from inFlightPods after Permit (needed to fix issue https://github.com/kubernetes/kubernetes/issues/129967).
|
|
// This needs to be a separate test case, because it mocks the waitOnPermit and runPrebindPlugins functions.
|
|
func TestScheduleOneMarksPodAsProcessedBeforePreBind(t *testing.T) {
|
|
testNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1", UID: types.UID("node1")}}
|
|
client := clientsetfake.NewClientset(&testNode)
|
|
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
|
|
|
|
scheduleResultOk := ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}
|
|
emptyScheduleResult := ScheduleResult{}
|
|
bindingOk := &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: testNode.Name}}
|
|
schedulingErr := errors.New("scheduler")
|
|
bindingErr := errors.New("binder")
|
|
preBindErr := errors.New("on PreBind")
|
|
permitErr := errors.New("permit")
|
|
waitOnPermitErr := errors.New("wait on permit")
|
|
|
|
testPod := podWithID("foo", "")
|
|
assignedTestPod := podWithID("foo", testNode.Name)
|
|
|
|
table := []struct {
|
|
name string
|
|
sendPod *v1.Pod
|
|
registerPluginFuncs []tf.RegisterPluginFunc
|
|
injectSchedulingError error
|
|
injectBindError error
|
|
mockScheduleResult ScheduleResult
|
|
mockWaitOnPermitResult *fwk.Status
|
|
mockRunPreBindPluginsResult *fwk.Status
|
|
expectErrorPod *v1.Pod
|
|
expectAssumedPod *v1.Pod
|
|
expectError error
|
|
expectBind *v1.Binding
|
|
eventReason string
|
|
expectPodIsInFlightAtFailureHandler bool
|
|
expectPodIsInFlightAtWaitOnPermit bool
|
|
}{
|
|
{
|
|
name: "error on permit",
|
|
sendPod: testPod,
|
|
mockScheduleResult: scheduleResultOk,
|
|
registerPluginFuncs: []tf.RegisterPluginFunc{
|
|
tf.RegisterPermitPlugin("FakePermit", tf.NewFakePermitPlugin(fwk.AsStatus(permitErr), time.Minute)),
|
|
},
|
|
expectErrorPod: assignedTestPod,
|
|
expectAssumedPod: assignedTestPod,
|
|
expectError: fmt.Errorf(`running Permit plugin "FakePermit": %w`, permitErr),
|
|
eventReason: "FailedScheduling",
|
|
expectPodIsInFlightAtFailureHandler: true,
|
|
},
|
|
{
|
|
name: "pod rejected on permit",
|
|
sendPod: testPod,
|
|
mockScheduleResult: scheduleResultOk,
|
|
registerPluginFuncs: []tf.RegisterPluginFunc{
|
|
tf.RegisterPermitPlugin("FakePermit", tf.NewFakePermitPlugin(fwk.NewStatus(fwk.Unschedulable, "on permit"), time.Minute)),
|
|
},
|
|
expectErrorPod: assignedTestPod,
|
|
expectAssumedPod: assignedTestPod,
|
|
expectError: makePredicateError("1 on permit"),
|
|
eventReason: "FailedScheduling",
|
|
expectPodIsInFlightAtFailureHandler: true,
|
|
},
|
|
{
|
|
name: "error on wait on permit",
|
|
sendPod: testPod,
|
|
mockScheduleResult: scheduleResultOk,
|
|
registerPluginFuncs: []tf.RegisterPluginFunc{
|
|
tf.RegisterPermitPlugin("FakePermit", tf.NewFakePermitPlugin(fwk.NewStatus(fwk.Wait), time.Minute)),
|
|
},
|
|
mockWaitOnPermitResult: fwk.AsStatus(waitOnPermitErr),
|
|
expectErrorPod: assignedTestPod,
|
|
expectAssumedPod: assignedTestPod,
|
|
expectError: waitOnPermitErr,
|
|
eventReason: "FailedScheduling",
|
|
expectPodIsInFlightAtFailureHandler: true,
|
|
expectPodIsInFlightAtWaitOnPermit: true,
|
|
},
|
|
{
|
|
name: "pod rejected while wait on permit",
|
|
sendPod: testPod,
|
|
mockScheduleResult: scheduleResultOk,
|
|
registerPluginFuncs: []tf.RegisterPluginFunc{
|
|
tf.RegisterPermitPlugin("FakePermit", tf.NewFakePermitPlugin(fwk.NewStatus(fwk.Wait), time.Minute)),
|
|
},
|
|
mockWaitOnPermitResult: fwk.NewStatus(fwk.Unschedulable, "wait on permit"),
|
|
expectErrorPod: assignedTestPod,
|
|
expectAssumedPod: assignedTestPod,
|
|
expectError: makePredicateError("1 wait on permit"),
|
|
eventReason: "FailedScheduling",
|
|
expectPodIsInFlightAtFailureHandler: true,
|
|
expectPodIsInFlightAtWaitOnPermit: true,
|
|
},
|
|
{
|
|
name: "error prebind pod",
|
|
sendPod: testPod,
|
|
mockScheduleResult: scheduleResultOk,
|
|
registerPluginFuncs: []tf.RegisterPluginFunc{
|
|
tf.RegisterPreBindPlugin("FakePreBind", tf.NewFakePreBindPlugin(nil, fwk.NewStatus(fwk.Unschedulable))),
|
|
},
|
|
mockWaitOnPermitResult: fwk.NewStatus(fwk.Success),
|
|
mockRunPreBindPluginsResult: fwk.NewStatus(fwk.Unschedulable, preBindErr.Error()),
|
|
expectErrorPod: assignedTestPod,
|
|
expectAssumedPod: assignedTestPod,
|
|
expectError: preBindErr,
|
|
eventReason: "FailedScheduling",
|
|
expectPodIsInFlightAtFailureHandler: false,
|
|
expectPodIsInFlightAtWaitOnPermit: true,
|
|
},
|
|
{
|
|
name: "bind assumed pod scheduled",
|
|
sendPod: testPod,
|
|
mockScheduleResult: scheduleResultOk,
|
|
expectBind: bindingOk,
|
|
expectAssumedPod: assignedTestPod,
|
|
mockWaitOnPermitResult: fwk.NewStatus(fwk.Success),
|
|
mockRunPreBindPluginsResult: fwk.NewStatus(fwk.Success),
|
|
eventReason: "Scheduled",
|
|
expectPodIsInFlightAtFailureHandler: false,
|
|
expectPodIsInFlightAtWaitOnPermit: true,
|
|
},
|
|
{
|
|
name: "error pod failed scheduling",
|
|
sendPod: testPod,
|
|
mockScheduleResult: scheduleResultOk,
|
|
injectSchedulingError: schedulingErr,
|
|
expectError: schedulingErr,
|
|
expectErrorPod: testPod,
|
|
eventReason: "FailedScheduling",
|
|
expectPodIsInFlightAtFailureHandler: true,
|
|
},
|
|
{
|
|
name: "error bind forget pod failed scheduling",
|
|
sendPod: testPod,
|
|
mockScheduleResult: scheduleResultOk,
|
|
mockWaitOnPermitResult: fwk.NewStatus(fwk.Success),
|
|
mockRunPreBindPluginsResult: fwk.NewStatus(fwk.Success),
|
|
expectBind: bindingOk,
|
|
expectAssumedPod: assignedTestPod,
|
|
injectBindError: bindingErr,
|
|
expectError: fmt.Errorf("running Bind plugin %q: %w", "DefaultBinder", bindingErr),
|
|
expectErrorPod: assignedTestPod,
|
|
eventReason: "FailedScheduling",
|
|
expectPodIsInFlightAtFailureHandler: false,
|
|
expectPodIsInFlightAtWaitOnPermit: true,
|
|
},
|
|
{
|
|
name: "deleting pod",
|
|
sendPod: deletingPod("foo"),
|
|
mockScheduleResult: emptyScheduleResult,
|
|
eventReason: "FailedScheduling",
|
|
expectPodIsInFlightAtFailureHandler: false,
|
|
expectPodIsInFlightAtWaitOnPermit: false,
|
|
},
|
|
}
|
|
|
|
for _, qHintEnabled := range []bool{true, false} {
|
|
for _, asyncAPICallsEnabled := range []bool{true, false} {
|
|
for _, item := range table {
|
|
t.Run(fmt.Sprintf("%s (Queueing hints enabled: %v, Async API calls enabled: %v)", item.name, qHintEnabled, asyncAPICallsEnabled), func(t *testing.T) {
|
|
if !qHintEnabled {
|
|
featuregatetesting.SetFeatureGateEmulationVersionDuringTest(t, utilfeature.DefaultFeatureGate, version.MustParse("1.33"))
|
|
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, false)
|
|
}
|
|
logger, ctx := ktesting.NewTestContext(t)
|
|
var gotError error
|
|
var gotPod *v1.Pod
|
|
var gotAssumedPod *v1.Pod
|
|
var gotBinding *v1.Binding
|
|
var gotCallsToFailureHandler int
|
|
var gotPodIsInFlightAtFailureHandler bool
|
|
var gotPodIsInFlightAtWaitOnPermit bool
|
|
var gotPodIsInFlightAtRunPreBindPlugins bool
|
|
|
|
client := clientsetfake.NewClientset(item.sendPod)
|
|
client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
|
|
if action.GetSubresource() != "binding" {
|
|
return false, nil, nil
|
|
}
|
|
gotBinding = action.(clienttesting.CreateAction).GetObject().(*v1.Binding)
|
|
return true, gotBinding, item.injectBindError
|
|
})
|
|
|
|
var apiDispatcher *apidispatcher.APIDispatcher
|
|
if asyncAPICallsEnabled {
|
|
apiDispatcher = apidispatcher.New(client, 16, apicalls.Relevances)
|
|
apiDispatcher.Run(logger)
|
|
defer apiDispatcher.Close()
|
|
}
|
|
|
|
internalCache := internalcache.New(ctx, 30*time.Second, apiDispatcher)
|
|
cache := &fakecache.Cache{
|
|
Cache: internalCache,
|
|
ForgetFunc: func(pod *v1.Pod) {
|
|
},
|
|
AssumeFunc: func(pod *v1.Pod) {
|
|
gotAssumedPod = pod
|
|
},
|
|
IsAssumedPodFunc: func(pod *v1.Pod) bool {
|
|
if pod == nil || gotAssumedPod == nil {
|
|
return false
|
|
}
|
|
return pod.UID == gotAssumedPod.UID
|
|
},
|
|
}
|
|
|
|
informerFactory := informers.NewSharedInformerFactory(client, 0)
|
|
ar := metrics.NewMetricsAsyncRecorder(10, 1*time.Second, ctx.Done())
|
|
queue := internalqueue.NewSchedulingQueue(nil, informerFactory, internalqueue.WithMetricsRecorder(ar), internalqueue.WithAPIDispatcher(apiDispatcher))
|
|
|
|
schedFramework, err := NewFakeFramework(
|
|
ctx,
|
|
queue,
|
|
append(item.registerPluginFuncs,
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
),
|
|
testSchedulerName,
|
|
frameworkruntime.WithClientSet(client),
|
|
frameworkruntime.WithAPIDispatcher(apiDispatcher),
|
|
frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)),
|
|
frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()),
|
|
)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if asyncAPICallsEnabled {
|
|
schedFramework.SetAPICacher(apicache.New(queue, cache))
|
|
}
|
|
|
|
schedFramework.waitOnPermitFn = func(_ context.Context, pod *v1.Pod) *fwk.Status {
|
|
gotPodIsInFlightAtWaitOnPermit = podListContainsPod(schedFramework.queue.InFlightPods(), pod)
|
|
return item.mockWaitOnPermitResult
|
|
}
|
|
schedFramework.runPreBindPluginsFn = func(_ context.Context, _ fwk.CycleState, pod *v1.Pod, _ string) *fwk.Status {
|
|
gotPodIsInFlightAtRunPreBindPlugins = podListContainsPod(schedFramework.queue.InFlightPods(), pod)
|
|
return item.mockRunPreBindPluginsResult
|
|
}
|
|
|
|
sched := &Scheduler{
|
|
Cache: cache,
|
|
client: client,
|
|
NextPod: queue.Pop,
|
|
SchedulingQueue: queue,
|
|
Profiles: profile.Map{testSchedulerName: schedFramework},
|
|
APIDispatcher: apiDispatcher,
|
|
}
|
|
queue.Add(logger, item.sendPod)
|
|
|
|
sched.SchedulePod = func(ctx context.Context, fwk framework.Framework, state fwk.CycleState, pod *v1.Pod) (ScheduleResult, error) {
|
|
return item.mockScheduleResult, item.injectSchedulingError
|
|
}
|
|
sched.FailureHandler = func(_ context.Context, fwk framework.Framework, p *framework.QueuedPodInfo, status *fwk.Status, _ *fwk.NominatingInfo, _ time.Time) {
|
|
gotCallsToFailureHandler++
|
|
gotPodIsInFlightAtFailureHandler = podListContainsPod(queue.InFlightPods(), p.Pod)
|
|
|
|
gotPod = p.Pod
|
|
gotError = status.AsError()
|
|
|
|
msg := truncateMessage(gotError.Error())
|
|
fwk.EventRecorder().Eventf(p.Pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg)
|
|
|
|
queue.Done(p.Pod.UID)
|
|
}
|
|
called := make(chan struct{})
|
|
stopFunc, err := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) {
|
|
e, _ := obj.(*eventsv1.Event)
|
|
if e.Reason != item.eventReason {
|
|
t.Errorf("got event %v, want %v", e.Reason, item.eventReason)
|
|
}
|
|
close(called)
|
|
})
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
sched.ScheduleOne(ctx)
|
|
<-called
|
|
|
|
if diff := cmp.Diff(item.expectAssumedPod, gotAssumedPod); diff != "" {
|
|
t.Errorf("Unexpected assumed pod (-want,+got):\n%s", diff)
|
|
}
|
|
if diff := cmp.Diff(item.expectErrorPod, gotPod); diff != "" {
|
|
t.Errorf("Unexpected error pod (-want,+got):\n%s", diff)
|
|
}
|
|
if item.expectError == nil || gotError == nil {
|
|
if !errors.Is(gotError, item.expectError) {
|
|
t.Errorf("Unexpected error. Wanted %v, got %v", item.expectError, gotError)
|
|
}
|
|
} else if item.expectError.Error() != gotError.Error() {
|
|
t.Errorf("Unexpected error. Wanted %v, got %v", item.expectError.Error(), gotError.Error())
|
|
}
|
|
if diff := cmp.Diff(item.expectBind, gotBinding); diff != "" {
|
|
t.Errorf("Unexpected binding (-want,+got):\n%s", diff)
|
|
}
|
|
if item.expectError != nil && gotCallsToFailureHandler != 1 {
|
|
t.Errorf("expected 1 call to FailureHandlerFn, got %v", gotCallsToFailureHandler)
|
|
}
|
|
if item.expectError == nil && gotCallsToFailureHandler != 0 {
|
|
t.Errorf("expected 0 calls to FailureHandlerFn, got %v", gotCallsToFailureHandler)
|
|
}
|
|
if (item.expectPodIsInFlightAtFailureHandler && qHintEnabled) != gotPodIsInFlightAtFailureHandler {
|
|
t.Errorf("unexpected pod being in flight in FailureHandlerFn, expected %v but got %v.",
|
|
item.expectPodIsInFlightAtFailureHandler, gotPodIsInFlightAtFailureHandler)
|
|
}
|
|
if (item.expectPodIsInFlightAtWaitOnPermit && qHintEnabled) != gotPodIsInFlightAtWaitOnPermit {
|
|
t.Errorf("unexpected pod being in flight at start of WaitOnPermit, expected %v but got %v",
|
|
item.expectPodIsInFlightAtWaitOnPermit, gotPodIsInFlightAtWaitOnPermit)
|
|
}
|
|
if gotPodIsInFlightAtRunPreBindPlugins {
|
|
t.Errorf("unexpected pod being in flight at start of RunPreBindPlugins")
|
|
}
|
|
// We have to poll here because the Pod goes to the binding cycle in some test cases,
// and the in-flight pods might not be empty immediately at this point in such cases.
|
|
if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
|
|
return len(queue.InFlightPods()) == 0, nil
|
|
}); err != nil {
|
|
t.Errorf("in-flight pods should be always empty after SchedulingOne. It has %v Pods", len(queue.InFlightPods()))
|
|
}
|
|
stopFunc()
|
|
})
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// FakeFramework allows mocking calls to WaitOnPermit and RunPreBindPlugins, which allows for
// simpler and more efficient testing of the Scheduler's logic within the binding cycle.
type FakeFramework struct {
	framework.Framework
	queue               internalqueue.SchedulingQueue
	waitOnPermitFn      func(context.Context, *v1.Pod) *fwk.Status
	runPreBindPluginsFn func(context.Context, fwk.CycleState, *v1.Pod, string) *fwk.Status
}

func NewFakeFramework(ctx context.Context, schedQueue internalqueue.SchedulingQueue, fns []tf.RegisterPluginFunc,
	profileName string, opts ...frameworkruntime.Option) (*FakeFramework, error) {
	fwk, err := tf.NewFramework(ctx, fns, profileName, opts...)
	return &FakeFramework{
		Framework: fwk,
		queue:     schedQueue,
	}, err
}

func (ff *FakeFramework) WaitOnPermit(ctx context.Context, pod *v1.Pod) *fwk.Status {
	return ff.waitOnPermitFn(ctx, pod)
}

func (ff *FakeFramework) RunPreBindPlugins(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) *fwk.Status {
	return ff.runPreBindPluginsFn(ctx, state, pod, nodeName)
}

func podListContainsPod(list []*v1.Pod, pod *v1.Pod) bool {
	for _, p := range list {
		if p.UID == pod.UID {
			return true
		}
	}
	return false
}
|
|
|
|
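// TestSchedulerNoPhantomPodAfterExpire verifies that once the assumed pod has expired from the
// scheduler cache, a second pod requesting the same host port can be scheduled and bound to the node.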
func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) {
	for _, asyncAPICallsEnabled := range []bool{true, false} {
		t.Run(fmt.Sprintf("Async API calls enabled: %v", asyncAPICallsEnabled), func(t *testing.T) {
			logger, ctx := ktesting.NewTestContext(t)
			ctx, cancel := context.WithCancel(ctx)
			defer cancel()
			queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
			client := clientsetfake.NewClientset()
			bindingChan := interruptOnBind(client)

			var apiDispatcher *apidispatcher.APIDispatcher
			if asyncAPICallsEnabled {
				apiDispatcher = apidispatcher.New(client, 16, apicalls.Relevances)
				apiDispatcher.Run(logger)
				defer apiDispatcher.Close()
			}

			scache := internalcache.New(ctx, 100*time.Millisecond, apiDispatcher)
			pod := podWithPort("pod.Name", "", 8080)
			node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1", UID: types.UID("node1")}}
			scache.AddNode(logger, &node)

			fns := []tf.RegisterPluginFunc{
				tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
				tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
				tf.RegisterPluginAsExtensions(nodeports.Name, frameworkruntime.FactoryAdapter(feature.Features{}, nodeports.New), "Filter", "PreFilter"),
			}
			scheduler, errChan := setupTestSchedulerWithOnePodOnNode(ctx, t, client, queuedPodStore, scache, apiDispatcher, pod, &node, bindingChan, fns...)

			waitPodExpireChan := make(chan struct{})
			timeout := make(chan struct{})
			go func() {
				for {
					select {
					case <-timeout:
						return
					default:
					}
					pods, err := scache.PodCount()
					if err != nil {
						errChan <- fmt.Errorf("cache.List failed: %w", err)
						return
					}
					if pods == 0 {
						close(waitPodExpireChan)
						return
					}
					time.Sleep(100 * time.Millisecond)
				}
			}()
			// Wait for the assumed pod to expire.
			select {
			case err := <-errChan:
				t.Fatal(err)
			case <-waitPodExpireChan:
			case <-time.After(wait.ForeverTestTimeout):
				close(timeout)
				t.Fatalf("timeout while waiting for the pod to expire after %v", wait.ForeverTestTimeout)
			}

			// We use conflicting pod ports to trigger a fit predicate failure if the first pod was not removed.
			secondPod := podWithPort("bar", "", 8080)
			if err := queuedPodStore.Add(secondPod); err != nil {
				t.Fatal(err)
			}
			scheduler.ScheduleOne(ctx)
			select {
			case b := <-bindingChan:
				expectBinding := &v1.Binding{
					ObjectMeta: metav1.ObjectMeta{Name: "bar", UID: types.UID("bar")},
					Target:     v1.ObjectReference{Kind: "Node", Name: node.Name},
				}
				if diff := cmp.Diff(expectBinding, b); diff != "" {
					t.Errorf("Unexpected binding (-want,+got):\n%s", diff)
				}
			case <-time.After(wait.ForeverTestTimeout):
				t.Fatalf("timeout in binding after %v", wait.ForeverTestTimeout)
			}
		})
	}
}
|
|
|
|
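// TestSchedulerNoPhantomPodAfterDelete verifies that a pod conflicting with an assumed pod is first
// rejected with a FitError, and that it schedules successfully once the assumed pod has been added to
// and then removed from the cache.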
func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
|
|
for _, asyncAPICallsEnabled := range []bool{true, false} {
|
|
t.Run(fmt.Sprintf("Async API calls enabled: %v", asyncAPICallsEnabled), func(t *testing.T) {
|
|
logger, ctx := ktesting.NewTestContext(t)
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
|
|
client := clientsetfake.NewClientset()
|
|
bindingChan := interruptOnBind(client)
|
|
|
|
var apiDispatcher *apidispatcher.APIDispatcher
|
|
if asyncAPICallsEnabled {
|
|
apiDispatcher = apidispatcher.New(client, 16, apicalls.Relevances)
|
|
apiDispatcher.Run(logger)
|
|
defer apiDispatcher.Close()
|
|
}
|
|
|
|
scache := internalcache.New(ctx, 10*time.Minute, apiDispatcher)
|
|
firstPod := podWithPort("pod.Name", "", 8080)
|
|
node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1", UID: types.UID("node1")}}
|
|
scache.AddNode(logger, &node)
|
|
fns := []tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
tf.RegisterPluginAsExtensions(nodeports.Name, frameworkruntime.FactoryAdapter(feature.Features{}, nodeports.New), "Filter", "PreFilter"),
|
|
}
|
|
scheduler, errChan := setupTestSchedulerWithOnePodOnNode(ctx, t, client, queuedPodStore, scache, apiDispatcher, firstPod, &node, bindingChan, fns...)
|
|
|
|
// We use conflicting pod ports to trigger a fit predicate failure.
|
|
secondPod := podWithPort("bar", "", 8080)
|
|
if err := queuedPodStore.Add(secondPod); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
// queuedPodStore: [bar:8080]
// cache: [(assumed)pod.Name:8080]
|
|
|
|
scheduler.ScheduleOne(ctx)
|
|
select {
|
|
case err := <-errChan:
|
|
expectErr := &framework.FitError{
|
|
Pod: secondPod,
|
|
NumAllNodes: 1,
|
|
Diagnosis: framework.Diagnosis{
|
|
NodeToStatus: framework.NewNodeToStatus(map[string]*fwk.Status{
|
|
node.Name: fwk.NewStatus(fwk.Unschedulable, nodeports.ErrReason).WithPlugin(nodeports.Name),
|
|
}, fwk.NewStatus(fwk.UnschedulableAndUnresolvable)),
|
|
UnschedulablePlugins: sets.New(nodeports.Name),
|
|
},
|
|
}
|
|
if err == nil {
|
|
t.Errorf("expected error %v, got nil", expectErr)
|
|
} else if diff := cmp.Diff(expectErr, err, schedulerCmpOpts...); diff != "" {
|
|
t.Errorf("unexpected error (-want,+got):\n%s", diff)
|
|
}
|
|
case <-time.After(wait.ForeverTestTimeout):
|
|
t.Fatalf("timeout in fitting after %v", wait.ForeverTestTimeout)
|
|
}
|
|
|
|
// We mimic the cache's behavior when a pod is removed by the user.
// Note: if the cache TTL were very short, the first pod would expire and be removed
// on its own (without any explicit action on the cache). Even in that case,
// explicitly calling AddPod still corrects the behavior.
|
|
firstPod.Spec.NodeName = node.Name
|
|
if err := scache.AddPod(logger, firstPod); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if err := scache.RemovePod(logger, firstPod); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
if err := queuedPodStore.Add(secondPod); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
scheduler.ScheduleOne(ctx)
|
|
select {
|
|
case b := <-bindingChan:
|
|
expectBinding := &v1.Binding{
|
|
ObjectMeta: metav1.ObjectMeta{Name: "bar", UID: types.UID("bar")},
|
|
Target: v1.ObjectReference{Kind: "Node", Name: node.Name},
|
|
}
|
|
if diff := cmp.Diff(expectBinding, b); diff != "" {
|
|
t.Errorf("unexpected binding (-want,+got):\n%s", diff)
|
|
}
|
|
case <-time.After(wait.ForeverTestTimeout):
|
|
t.Fatalf("timeout in binding after %v", wait.ForeverTestTimeout)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
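// TestSchedulerFailedSchedulingReasons verifies that when no node fits the pod, the resulting FitError
// carries a per-node failure status for every node while keeping the aggregated error message short.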
func TestSchedulerFailedSchedulingReasons(t *testing.T) {
|
|
for _, asyncAPICallsEnabled := range []bool{true, false} {
|
|
t.Run(fmt.Sprintf("Async API calls enabled: %v", asyncAPICallsEnabled), func(t *testing.T) {
|
|
logger, ctx := ktesting.NewTestContext(t)
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
|
|
client := clientsetfake.NewClientset()
|
|
|
|
var apiDispatcher *apidispatcher.APIDispatcher
|
|
if asyncAPICallsEnabled {
|
|
apiDispatcher = apidispatcher.New(client, 16, apicalls.Relevances)
|
|
apiDispatcher.Run(logger)
|
|
defer apiDispatcher.Close()
|
|
}
|
|
|
|
scache := internalcache.New(ctx, 10*time.Minute, apiDispatcher)
|
|
|
|
// Design the baseline pod resource requests; we will create nodes that don't fit them later.
|
|
var cpu = int64(4)
|
|
var mem = int64(500)
|
|
podWithTooBigResourceRequests := podWithResources("bar", "", v1.ResourceList{
|
|
v1.ResourceCPU: *(resource.NewQuantity(cpu, resource.DecimalSI)),
|
|
v1.ResourceMemory: *(resource.NewQuantity(mem, resource.DecimalSI)),
|
|
}, v1.ResourceList{
|
|
v1.ResourceCPU: *(resource.NewQuantity(cpu, resource.DecimalSI)),
|
|
v1.ResourceMemory: *(resource.NewQuantity(mem, resource.DecimalSI)),
|
|
})
|
|
|
|
// create several nodes which cannot schedule the above pod
|
|
var nodes []*v1.Node
|
|
var objects []runtime.Object
|
|
for i := 0; i < 100; i++ {
|
|
uid := fmt.Sprintf("node%v", i)
|
|
node := v1.Node{
|
|
ObjectMeta: metav1.ObjectMeta{Name: uid, UID: types.UID(uid)},
|
|
Status: v1.NodeStatus{
|
|
Capacity: v1.ResourceList{
|
|
v1.ResourceCPU: *(resource.NewQuantity(cpu/2, resource.DecimalSI)),
|
|
v1.ResourceMemory: *(resource.NewQuantity(mem/5, resource.DecimalSI)),
|
|
v1.ResourcePods: *(resource.NewQuantity(10, resource.DecimalSI)),
|
|
},
|
|
Allocatable: v1.ResourceList{
|
|
v1.ResourceCPU: *(resource.NewQuantity(cpu/2, resource.DecimalSI)),
|
|
v1.ResourceMemory: *(resource.NewQuantity(mem/5, resource.DecimalSI)),
|
|
v1.ResourcePods: *(resource.NewQuantity(10, resource.DecimalSI)),
|
|
}},
|
|
}
|
|
scache.AddNode(logger, &node)
|
|
nodes = append(nodes, &node)
|
|
objects = append(objects, &node)
|
|
}
|
|
|
|
// Create expected failure reasons for all the nodes. Hopefully they will get rolled up into a non-spammy summary.
|
|
failedNodeStatuses := framework.NewDefaultNodeToStatus()
|
|
for _, node := range nodes {
|
|
failedNodeStatuses.Set(node.Name, fwk.NewStatus(
|
|
fwk.UnschedulableAndUnresolvable,
|
|
fmt.Sprintf("Insufficient %v", v1.ResourceCPU),
|
|
fmt.Sprintf("Insufficient %v", v1.ResourceMemory),
|
|
).WithPlugin(noderesources.Name))
|
|
}
|
|
fns := []tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
tf.RegisterPluginAsExtensions(noderesources.Name, frameworkruntime.FactoryAdapter(feature.Features{}, noderesources.NewFit), "Filter", "PreFilter"),
|
|
}
|
|
|
|
informerFactory := informers.NewSharedInformerFactory(clientsetfake.NewClientset(objects...), 0)
|
|
scheduler, errChan := setupTestScheduler(ctx, t, client, queuedPodStore, scache, apiDispatcher, informerFactory, nil, fns...)
|
|
|
|
if err := queuedPodStore.Add(podWithTooBigResourceRequests); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
scheduler.ScheduleOne(ctx)
|
|
select {
|
|
case err := <-errChan:
|
|
expectErr := &framework.FitError{
|
|
Pod: podWithTooBigResourceRequests,
|
|
NumAllNodes: len(nodes),
|
|
Diagnosis: framework.Diagnosis{
|
|
NodeToStatus: failedNodeStatuses,
|
|
UnschedulablePlugins: sets.New(noderesources.Name),
|
|
},
|
|
}
|
|
if len(fmt.Sprint(expectErr)) > 150 {
|
|
t.Errorf("message is too spammy ! %v ", len(fmt.Sprint(expectErr)))
|
|
}
|
|
if diff := cmp.Diff(expectErr, err, schedulerCmpOpts...); diff != "" {
|
|
t.Errorf("Unexpected error (-want,+got):\n%s", diff)
|
|
}
|
|
case <-time.After(wait.ForeverTestTimeout):
|
|
t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
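// TestSchedulerWithVolumeBinding runs ScheduleOne with a fake volume binder and checks, for each scenario,
// the resulting binding or error, the emitted event reason, and whether the binder's Assume and Bind were called.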
func TestSchedulerWithVolumeBinding(t *testing.T) {
|
|
findErr := fmt.Errorf("find err")
|
|
assumeErr := fmt.Errorf("assume err")
|
|
bindErr := fmt.Errorf("bind err")
|
|
|
|
// This can be small because we wait for the pod to finish scheduling first.
|
|
chanTimeout := 2 * time.Second
|
|
|
|
table := []struct {
|
|
name string
|
|
expectError error
|
|
expectPodBind *v1.Binding
|
|
expectAssumeCalled bool
|
|
expectBindCalled bool
|
|
eventReason string
|
|
volumeBinderConfig *volumebinding.FakeVolumeBinderConfig
|
|
}{
|
|
{
|
|
name: "all bound",
|
|
volumeBinderConfig: &volumebinding.FakeVolumeBinderConfig{
|
|
AllBound: true,
|
|
},
|
|
expectAssumeCalled: true,
|
|
expectPodBind: &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "foo-ns", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: "node1"}},
|
|
eventReason: "Scheduled",
|
|
},
|
|
{
|
|
name: "bound/invalid pv affinity",
|
|
volumeBinderConfig: &volumebinding.FakeVolumeBinderConfig{
|
|
AllBound: true,
|
|
FindReasons: volumebinding.ConflictReasons{volumebinding.ErrReasonNodeConflict},
|
|
},
|
|
eventReason: "FailedScheduling",
|
|
expectError: makePredicateError("1 node(s) didn't match PersistentVolume's node affinity"),
|
|
},
|
|
{
|
|
name: "unbound/no matches",
|
|
volumeBinderConfig: &volumebinding.FakeVolumeBinderConfig{
|
|
FindReasons: volumebinding.ConflictReasons{volumebinding.ErrReasonBindConflict},
|
|
},
|
|
eventReason: "FailedScheduling",
|
|
expectError: makePredicateError("1 node(s) didn't find available persistent volumes to bind"),
|
|
},
|
|
{
|
|
name: "bound and unbound unsatisfied",
|
|
volumeBinderConfig: &volumebinding.FakeVolumeBinderConfig{
|
|
FindReasons: volumebinding.ConflictReasons{volumebinding.ErrReasonBindConflict, volumebinding.ErrReasonNodeConflict},
|
|
},
|
|
eventReason: "FailedScheduling",
|
|
expectError: makePredicateError("1 node(s) didn't find available persistent volumes to bind, 1 node(s) didn't match PersistentVolume's node affinity"),
|
|
},
|
|
{
|
|
name: "unbound/found matches/bind succeeds",
|
|
volumeBinderConfig: &volumebinding.FakeVolumeBinderConfig{},
|
|
expectAssumeCalled: true,
|
|
expectBindCalled: true,
|
|
expectPodBind: &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "foo-ns", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: "node1"}},
|
|
eventReason: "Scheduled",
|
|
},
|
|
{
|
|
name: "predicate error",
|
|
volumeBinderConfig: &volumebinding.FakeVolumeBinderConfig{
|
|
FindErr: findErr,
|
|
},
|
|
eventReason: "FailedScheduling",
|
|
expectError: fmt.Errorf("running %q filter plugin: %v", volumebinding.Name, findErr),
|
|
},
|
|
{
|
|
name: "assume error",
|
|
volumeBinderConfig: &volumebinding.FakeVolumeBinderConfig{
|
|
AssumeErr: assumeErr,
|
|
},
|
|
expectAssumeCalled: true,
|
|
eventReason: "FailedScheduling",
|
|
expectError: fmt.Errorf("running Reserve plugin %q: %w", volumebinding.Name, assumeErr),
|
|
},
|
|
{
|
|
name: "bind error",
|
|
volumeBinderConfig: &volumebinding.FakeVolumeBinderConfig{
|
|
BindErr: bindErr,
|
|
},
|
|
expectAssumeCalled: true,
|
|
expectBindCalled: true,
|
|
eventReason: "FailedScheduling",
|
|
expectError: fmt.Errorf("running PreBind plugin %q: %w", volumebinding.Name, bindErr),
|
|
},
|
|
}
|
|
|
|
for _, asyncAPICallsEnabled := range []bool{true, false} {
|
|
for _, item := range table {
|
|
t.Run(fmt.Sprintf("%s (Async API calls enabled: %v)", item.name, asyncAPICallsEnabled), func(t *testing.T) {
|
|
_, ctx := ktesting.NewTestContext(t)
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
fakeVolumeBinder := volumebinding.NewFakeVolumeBinder(item.volumeBinderConfig)
|
|
client := clientsetfake.NewClientset()
|
|
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
|
|
s, bindingChan, errChan := setupTestSchedulerWithVolumeBinding(ctx, t, client, fakeVolumeBinder, eventBroadcaster, asyncAPICallsEnabled)
|
|
eventChan := make(chan struct{})
|
|
stopFunc, err := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) {
|
|
e, _ := obj.(*eventsv1.Event)
|
|
if want, got := item.eventReason, e.Reason; want != got {
|
|
t.Errorf("expected %v, got %v", e, a)
|
|
}
|
|
close(eventChan)
|
|
})
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
s.ScheduleOne(ctx)
|
|
// Wait for pod to succeed or fail scheduling
|
|
select {
|
|
case <-eventChan:
|
|
case <-time.After(wait.ForeverTestTimeout):
|
|
t.Fatalf("scheduling timeout after %v", wait.ForeverTestTimeout)
|
|
}
|
|
stopFunc()
|
|
// Wait for scheduling to return an error or succeed binding.
|
|
var (
|
|
gotErr error
|
|
gotBind *v1.Binding
|
|
)
|
|
select {
|
|
case gotErr = <-errChan:
|
|
case gotBind = <-bindingChan:
|
|
case <-time.After(chanTimeout):
|
|
t.Fatalf("did not receive pod binding or error after %v", chanTimeout)
|
|
}
|
|
if item.expectError != nil {
|
|
if gotErr == nil || item.expectError.Error() != gotErr.Error() {
|
|
t.Errorf("err \nWANT=%+v,\nGOT=%+v", item.expectError, gotErr)
|
|
}
|
|
} else if gotErr != nil {
|
|
t.Errorf("err \nWANT=%+v,\nGOT=%+v", item.expectError, gotErr)
|
|
}
|
|
if !cmp.Equal(item.expectPodBind, gotBind) {
|
|
t.Errorf("err \nWANT=%+v,\nGOT=%+v", item.expectPodBind, gotBind)
|
|
}
|
|
|
|
if item.expectAssumeCalled != fakeVolumeBinder.AssumeCalled {
|
|
t.Errorf("expectedAssumeCall %v", item.expectAssumeCalled)
|
|
}
|
|
|
|
if item.expectBindCalled != fakeVolumeBinder.BindCalled {
|
|
t.Errorf("expectedBindCall %v", item.expectBindCalled)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
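// TestSchedulerBinding verifies that bind delegates to the first extender that is a binder and is interested
// in the pod, and otherwise falls back to the default binding, including when an ignorable extender fails to bind.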
func TestSchedulerBinding(t *testing.T) {
|
|
table := []struct {
|
|
podName string
|
|
extenders []fwk.Extender
|
|
wantBinderID int
|
|
name string
|
|
}{
|
|
{
|
|
name: "the extender is not a binder",
|
|
podName: "pod0",
|
|
extenders: []fwk.Extender{
|
|
&fakeExtender{isBinder: false, interestedPodName: "pod0"},
|
|
},
|
|
wantBinderID: -1, // default binding.
|
|
},
|
|
{
|
|
name: "one of the extenders is a binder and interested in pod",
|
|
podName: "pod0",
|
|
extenders: []fwk.Extender{
|
|
&fakeExtender{isBinder: false, interestedPodName: "pod0"},
|
|
&fakeExtender{isBinder: true, interestedPodName: "pod0"},
|
|
},
|
|
wantBinderID: 1,
|
|
},
|
|
{
|
|
name: "one of the extenders is a binder, but not interested in pod",
|
|
podName: "pod1",
|
|
extenders: []fwk.Extender{
|
|
&fakeExtender{isBinder: false, interestedPodName: "pod1"},
|
|
&fakeExtender{isBinder: true, interestedPodName: "pod0"},
|
|
},
|
|
wantBinderID: -1, // default binding.
|
|
},
|
|
{
|
|
name: "ignore when extender bind failed",
|
|
podName: "pod1",
|
|
extenders: []fwk.Extender{
|
|
&fakeExtender{isBinder: true, errBind: true, interestedPodName: "pod1", ignorable: true},
|
|
},
|
|
wantBinderID: -1, // default binding.
|
|
},
|
|
}
|
|
|
|
for _, asyncAPICallsEnabled := range []bool{true, false} {
|
|
for _, test := range table {
|
|
t.Run(fmt.Sprintf("%s (Async API calls enabled: %v)", test.name, asyncAPICallsEnabled), func(t *testing.T) {
|
|
pod := st.MakePod().Name(test.podName).Obj()
|
|
defaultBound := false
|
|
state := framework.NewCycleState()
|
|
client := clientsetfake.NewClientset(pod)
|
|
client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
|
|
if action.GetSubresource() == "binding" {
|
|
defaultBound = true
|
|
}
|
|
return false, nil, nil
|
|
})
|
|
logger, ctx := ktesting.NewTestContext(t)
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
var apiDispatcher *apidispatcher.APIDispatcher
|
|
if asyncAPICallsEnabled {
|
|
apiDispatcher = apidispatcher.New(client, 16, apicalls.Relevances)
|
|
apiDispatcher.Run(logger)
|
|
defer apiDispatcher.Close()
|
|
}
|
|
|
|
fwk, err := tf.NewFramework(ctx,
|
|
[]tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
}, "", frameworkruntime.WithClientSet(client), frameworkruntime.WithAPIDispatcher(apiDispatcher), frameworkruntime.WithEventRecorder(&events.FakeRecorder{}))
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
cache := internalcache.New(ctx, 100*time.Millisecond, apiDispatcher)
|
|
if asyncAPICallsEnabled {
|
|
informerFactory := informers.NewSharedInformerFactory(client, 0)
|
|
ar := metrics.NewMetricsAsyncRecorder(10, 1*time.Second, ctx.Done())
|
|
queue := internalqueue.NewSchedulingQueue(nil, informerFactory, internalqueue.WithMetricsRecorder(ar), internalqueue.WithAPIDispatcher(apiDispatcher))
|
|
fwk.SetAPICacher(apicache.New(queue, cache))
|
|
}
|
|
|
|
sched := &Scheduler{
|
|
Extenders: test.extenders,
|
|
Cache: cache,
|
|
nodeInfoSnapshot: nil,
|
|
percentageOfNodesToScore: 0,
|
|
APIDispatcher: apiDispatcher,
|
|
}
|
|
status := sched.bind(ctx, fwk, pod, "node", state)
|
|
if !status.IsSuccess() {
|
|
t.Error(status.AsError())
|
|
}
|
|
|
|
// Checking default binding.
|
|
if wantBound := test.wantBinderID == -1; defaultBound != wantBound {
|
|
t.Errorf("got bound with default binding: %v, want %v", defaultBound, wantBound)
|
|
}
|
|
|
|
// Checking extenders binding.
|
|
for i, ext := range test.extenders {
|
|
wantBound := i == test.wantBinderID
|
|
if gotBound := ext.(*fakeExtender).gotBind; gotBound != wantBound {
|
|
t.Errorf("got bound with extender #%d: %v, want %v", i, gotBound, wantBound)
|
|
}
|
|
}
|
|
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
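// TestUpdatePod verifies that updatePod issues a patch request only when the pod condition or the nominated
// node name actually changes, and that the patch payload matches the expected pattern.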
func TestUpdatePod(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
currentPodConditions []v1.PodCondition
|
|
newPodCondition *v1.PodCondition
|
|
currentNominatedNodeName string
|
|
newNominatingInfo *fwk.NominatingInfo
|
|
expectPatchRequest bool
|
|
expectedPatchDataPattern string
|
|
}{
|
|
{
|
|
name: "Should make patch request to add pod condition when there are none currently",
|
|
currentPodConditions: []v1.PodCondition{},
|
|
newPodCondition: &v1.PodCondition{
|
|
Type: "newType",
|
|
Status: "newStatus",
|
|
LastProbeTime: metav1.NewTime(time.Date(2020, 5, 13, 1, 1, 1, 1, time.UTC)),
|
|
LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 1, 1, 1, 1, time.UTC)),
|
|
Reason: "newReason",
|
|
Message: "newMessage",
|
|
},
|
|
expectPatchRequest: true,
|
|
expectedPatchDataPattern: `{"status":{"conditions":\[{"lastProbeTime":"2020-05-13T01:01:01Z","lastTransitionTime":".*","message":"newMessage","reason":"newReason","status":"newStatus","type":"newType"}]}}`,
|
|
},
|
|
{
|
|
name: "Should make patch request to add a new pod condition when there is already one with another type",
|
|
currentPodConditions: []v1.PodCondition{
|
|
{
|
|
Type: "someOtherType",
|
|
Status: "someOtherTypeStatus",
|
|
LastProbeTime: metav1.NewTime(time.Date(2020, 5, 11, 0, 0, 0, 0, time.UTC)),
|
|
LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 10, 0, 0, 0, 0, time.UTC)),
|
|
Reason: "someOtherTypeReason",
|
|
Message: "someOtherTypeMessage",
|
|
},
|
|
},
|
|
newPodCondition: &v1.PodCondition{
|
|
Type: "newType",
|
|
Status: "newStatus",
|
|
LastProbeTime: metav1.NewTime(time.Date(2020, 5, 13, 1, 1, 1, 1, time.UTC)),
|
|
LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 1, 1, 1, 1, time.UTC)),
|
|
Reason: "newReason",
|
|
Message: "newMessage",
|
|
},
|
|
expectPatchRequest: true,
|
|
expectedPatchDataPattern: `{"status":{"\$setElementOrder/conditions":\[{"type":"someOtherType"},{"type":"newType"}],"conditions":\[{"lastProbeTime":"2020-05-13T01:01:01Z","lastTransitionTime":".*","message":"newMessage","reason":"newReason","status":"newStatus","type":"newType"}]}}`,
|
|
},
|
|
{
|
|
name: "Should make patch request to update an existing pod condition",
|
|
currentPodConditions: []v1.PodCondition{
|
|
{
|
|
Type: "currentType",
|
|
Status: "currentStatus",
|
|
LastProbeTime: metav1.NewTime(time.Date(2020, 5, 13, 0, 0, 0, 0, time.UTC)),
|
|
LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 0, 0, 0, 0, time.UTC)),
|
|
Reason: "currentReason",
|
|
Message: "currentMessage",
|
|
},
|
|
},
|
|
newPodCondition: &v1.PodCondition{
|
|
Type: "currentType",
|
|
Status: "newStatus",
|
|
LastProbeTime: metav1.NewTime(time.Date(2020, 5, 13, 1, 1, 1, 1, time.UTC)),
|
|
LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 1, 1, 1, 1, time.UTC)),
|
|
Reason: "newReason",
|
|
Message: "newMessage",
|
|
},
|
|
expectPatchRequest: true,
|
|
expectedPatchDataPattern: `{"status":{"\$setElementOrder/conditions":\[{"type":"currentType"}],"conditions":\[{"lastProbeTime":"2020-05-13T01:01:01Z","lastTransitionTime":".*","message":"newMessage","reason":"newReason","status":"newStatus","type":"currentType"}]}}`,
|
|
},
|
|
{
|
|
name: "Should make patch request to update an existing pod condition, but the transition time should remain unchanged because the status is the same",
|
|
currentPodConditions: []v1.PodCondition{
|
|
{
|
|
Type: "currentType",
|
|
Status: "currentStatus",
|
|
LastProbeTime: metav1.NewTime(time.Date(2020, 5, 13, 0, 0, 0, 0, time.UTC)),
|
|
LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 0, 0, 0, 0, time.UTC)),
|
|
Reason: "currentReason",
|
|
Message: "currentMessage",
|
|
},
|
|
},
|
|
newPodCondition: &v1.PodCondition{
|
|
Type: "currentType",
|
|
Status: "currentStatus",
|
|
LastProbeTime: metav1.NewTime(time.Date(2020, 5, 13, 1, 1, 1, 1, time.UTC)),
|
|
LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 0, 0, 0, 0, time.UTC)),
|
|
Reason: "newReason",
|
|
Message: "newMessage",
|
|
},
|
|
expectPatchRequest: true,
|
|
expectedPatchDataPattern: `{"status":{"\$setElementOrder/conditions":\[{"type":"currentType"}],"conditions":\[{"lastProbeTime":"2020-05-13T01:01:01Z","message":"newMessage","reason":"newReason","type":"currentType"}]}}`,
|
|
},
|
|
{
|
|
name: "Should not make patch request if pod condition already exists and is identical and nominated node name is not set",
|
|
currentPodConditions: []v1.PodCondition{
|
|
{
|
|
Type: "currentType",
|
|
Status: "currentStatus",
|
|
LastProbeTime: metav1.NewTime(time.Date(2020, 5, 13, 0, 0, 0, 0, time.UTC)),
|
|
LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 0, 0, 0, 0, time.UTC)),
|
|
Reason: "currentReason",
|
|
Message: "currentMessage",
|
|
},
|
|
},
|
|
newPodCondition: &v1.PodCondition{
|
|
Type: "currentType",
|
|
Status: "currentStatus",
|
|
LastProbeTime: metav1.NewTime(time.Date(2020, 5, 13, 0, 0, 0, 0, time.UTC)),
|
|
LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 0, 0, 0, 0, time.UTC)),
|
|
Reason: "currentReason",
|
|
Message: "currentMessage",
|
|
},
|
|
currentNominatedNodeName: "node1",
|
|
expectPatchRequest: false,
|
|
},
|
|
{
|
|
name: "Should make patch request if pod condition already exists and is identical but nominated node name is set and different",
|
|
currentPodConditions: []v1.PodCondition{
|
|
{
|
|
Type: "currentType",
|
|
Status: "currentStatus",
|
|
LastProbeTime: metav1.NewTime(time.Date(2020, 5, 13, 0, 0, 0, 0, time.UTC)),
|
|
LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 0, 0, 0, 0, time.UTC)),
|
|
Reason: "currentReason",
|
|
Message: "currentMessage",
|
|
},
|
|
},
|
|
newPodCondition: &v1.PodCondition{
|
|
Type: "currentType",
|
|
Status: "currentStatus",
|
|
LastProbeTime: metav1.NewTime(time.Date(2020, 5, 13, 0, 0, 0, 0, time.UTC)),
|
|
LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 0, 0, 0, 0, time.UTC)),
|
|
Reason: "currentReason",
|
|
Message: "currentMessage",
|
|
},
|
|
newNominatingInfo: &fwk.NominatingInfo{NominatingMode: fwk.ModeOverride, NominatedNodeName: "node1"},
|
|
expectPatchRequest: true,
|
|
expectedPatchDataPattern: `{"status":{"nominatedNodeName":"node1"}}`,
|
|
},
|
|
{
|
|
name: "Should not update nominated node name when nominatingInfo is nil",
|
|
currentPodConditions: []v1.PodCondition{
|
|
{
|
|
Type: "currentType",
|
|
Status: "currentStatus",
|
|
LastProbeTime: metav1.NewTime(time.Date(2020, 5, 13, 0, 0, 0, 0, time.UTC)),
|
|
LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 0, 0, 0, 0, time.UTC)),
|
|
Reason: "currentReason",
|
|
Message: "currentMessage",
|
|
},
|
|
},
|
|
newPodCondition: &v1.PodCondition{
|
|
Type: "currentType",
|
|
Status: "newStatus",
|
|
LastProbeTime: metav1.NewTime(time.Date(2020, 5, 13, 1, 1, 1, 1, time.UTC)),
|
|
LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 1, 1, 1, 1, time.UTC)),
|
|
Reason: "newReason",
|
|
Message: "newMessage",
|
|
},
|
|
currentNominatedNodeName: "existing-node",
|
|
newNominatingInfo: nil,
|
|
expectPatchRequest: true,
|
|
expectedPatchDataPattern: `{"status":{"\$setElementOrder/conditions":\[{"type":"currentType"}],"conditions":\[{"lastProbeTime":"2020-05-13T01:01:01Z","lastTransitionTime":".*","message":"newMessage","reason":"newReason","status":"newStatus","type":"currentType"}]}}`,
|
|
},
|
|
{
|
|
name: "Should not make patch request when nominatingInfo is nil and pod condition is unchanged",
|
|
currentPodConditions: []v1.PodCondition{
|
|
{
|
|
Type: "currentType",
|
|
Status: "currentStatus",
|
|
LastProbeTime: metav1.NewTime(time.Date(2020, 5, 13, 0, 0, 0, 0, time.UTC)),
|
|
LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 0, 0, 0, 0, time.UTC)),
|
|
Reason: "currentReason",
|
|
Message: "currentMessage",
|
|
},
|
|
},
|
|
newPodCondition: &v1.PodCondition{
|
|
Type: "currentType",
|
|
Status: "currentStatus",
|
|
LastProbeTime: metav1.NewTime(time.Date(2020, 5, 13, 0, 0, 0, 0, time.UTC)),
|
|
LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 0, 0, 0, 0, time.UTC)),
|
|
Reason: "currentReason",
|
|
Message: "currentMessage",
|
|
},
|
|
currentNominatedNodeName: "existing-node",
|
|
newNominatingInfo: nil,
|
|
expectPatchRequest: false,
|
|
},
|
|
}
|
|
for _, asyncAPICallsEnabled := range []bool{true, false} {
|
|
for _, test := range tests {
|
|
t.Run(fmt.Sprintf("%s (Async API calls enabled: %v)", test.name, asyncAPICallsEnabled), func(t *testing.T) {
|
|
actualPatchRequests := 0
|
|
var actualPatchData string
|
|
cs := &clientsetfake.Clientset{}
|
|
patchCalled := make(chan struct{}, 1)
|
|
cs.AddReactor("patch", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
|
|
actualPatchRequests++
|
|
patch := action.(clienttesting.PatchAction)
|
|
actualPatchData = string(patch.GetPatch())
|
|
patchCalled <- struct{}{}
|
|
// For this test, we don't care about the result of the patched pod, just that we got the expected
|
|
// patch request, so just returning &v1.Pod{} here is OK because scheduler doesn't use the response.
|
|
return true, &v1.Pod{}, nil
|
|
})
|
|
|
|
pod := st.MakePod().Name("foo").NominatedNodeName(test.currentNominatedNodeName).Conditions(test.currentPodConditions).Obj()
|
|
|
|
logger, ctx := ktesting.NewTestContext(t)
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
var apiCacher fwk.APICacher
|
|
if asyncAPICallsEnabled {
|
|
apiDispatcher := apidispatcher.New(cs, 16, apicalls.Relevances)
|
|
apiDispatcher.Run(logger)
|
|
defer apiDispatcher.Close()
|
|
|
|
informerFactory := informers.NewSharedInformerFactory(cs, 0)
|
|
ar := metrics.NewMetricsAsyncRecorder(10, 1*time.Second, ctx.Done())
|
|
queue := internalqueue.NewSchedulingQueue(nil, informerFactory, internalqueue.WithMetricsRecorder(ar), internalqueue.WithAPIDispatcher(apiDispatcher))
|
|
apiCacher = apicache.New(queue, nil)
|
|
}
|
|
|
|
if err := updatePod(ctx, cs, apiCacher, pod, test.newPodCondition, test.newNominatingInfo); err != nil {
|
|
t.Fatalf("Error calling update: %v", err)
|
|
}
|
|
|
|
if test.expectPatchRequest {
|
|
select {
|
|
case <-patchCalled:
|
|
case <-time.After(time.Second):
|
|
t.Fatalf("Timed out while waiting for patch to be called")
|
|
}
|
|
regex, err := regexp.Compile(test.expectedPatchDataPattern)
|
|
if err != nil {
|
|
t.Fatalf("Error compiling regexp for %v: %v", test.expectedPatchDataPattern, err)
|
|
}
|
|
if !regex.MatchString(actualPatchData) {
|
|
t.Fatalf("Patch data mismatch: Actual was %v, but expected to match regexp %v", actualPatchData, test.expectedPatchDataPattern)
|
|
}
|
|
} else {
|
|
select {
|
|
case <-patchCalled:
|
|
t.Fatalf("Expected patch not to be called, actual patch data: %v", actualPatchData)
|
|
case <-time.After(time.Second):
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
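// Test_SelectHost verifies that selectHost picks one of the top-scoring nodes (breaking ties randomly) and
// returns a score list headed by the selected node; each case is run repeatedly to exercise the randomness.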
func Test_SelectHost(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
list []fwk.NodePluginScores
|
|
topNodesCnt int
|
|
possibleNodes sets.Set[string]
|
|
possibleNodeLists [][]fwk.NodePluginScores
|
|
wantError error
|
|
}{
|
|
{
|
|
name: "unique properly ordered scores",
|
|
list: []fwk.NodePluginScores{
|
|
{Name: "node1", TotalScore: 1},
|
|
{Name: "node2", TotalScore: 2},
|
|
},
|
|
topNodesCnt: 2,
|
|
possibleNodes: sets.New("node2"),
|
|
possibleNodeLists: [][]fwk.NodePluginScores{
|
|
{
|
|
{Name: "node2", TotalScore: 2},
|
|
{Name: "node1", TotalScore: 1},
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "numberOfNodeScoresToReturn > len(list)",
|
|
list: []fwk.NodePluginScores{
|
|
{Name: "node1", TotalScore: 1},
|
|
{Name: "node2", TotalScore: 2},
|
|
},
|
|
topNodesCnt: 100,
|
|
possibleNodes: sets.New("node2"),
|
|
possibleNodeLists: [][]fwk.NodePluginScores{
|
|
{
|
|
{Name: "node2", TotalScore: 2},
|
|
{Name: "node1", TotalScore: 1},
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "equal scores",
|
|
list: []fwk.NodePluginScores{
|
|
{Name: "node2.1", TotalScore: 2},
|
|
{Name: "node2.2", TotalScore: 2},
|
|
{Name: "node2.3", TotalScore: 2},
|
|
},
|
|
topNodesCnt: 2,
|
|
possibleNodes: sets.New("node2.1", "node2.2", "node2.3"),
|
|
possibleNodeLists: [][]fwk.NodePluginScores{
|
|
{
|
|
{Name: "node2.1", TotalScore: 2},
|
|
{Name: "node2.2", TotalScore: 2},
|
|
},
|
|
{
|
|
{Name: "node2.1", TotalScore: 2},
|
|
{Name: "node2.3", TotalScore: 2},
|
|
},
|
|
{
|
|
{Name: "node2.2", TotalScore: 2},
|
|
{Name: "node2.1", TotalScore: 2},
|
|
},
|
|
{
|
|
{Name: "node2.2", TotalScore: 2},
|
|
{Name: "node2.3", TotalScore: 2},
|
|
},
|
|
{
|
|
{Name: "node2.3", TotalScore: 2},
|
|
{Name: "node2.1", TotalScore: 2},
|
|
},
|
|
{
|
|
{Name: "node2.3", TotalScore: 2},
|
|
{Name: "node2.2", TotalScore: 2},
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "out of order scores",
|
|
list: []fwk.NodePluginScores{
|
|
{Name: "node3.1", TotalScore: 3},
|
|
{Name: "node2.1", TotalScore: 2},
|
|
{Name: "node1.1", TotalScore: 1},
|
|
{Name: "node3.2", TotalScore: 3},
|
|
},
|
|
topNodesCnt: 3,
|
|
possibleNodes: sets.New("node3.1", "node3.2"),
|
|
possibleNodeLists: [][]fwk.NodePluginScores{
|
|
{
|
|
{Name: "node3.1", TotalScore: 3},
|
|
{Name: "node3.2", TotalScore: 3},
|
|
{Name: "node2.1", TotalScore: 2},
|
|
},
|
|
{
|
|
{Name: "node3.2", TotalScore: 3},
|
|
{Name: "node3.1", TotalScore: 3},
|
|
{Name: "node2.1", TotalScore: 2},
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "empty priority list",
|
|
list: []fwk.NodePluginScores{},
|
|
possibleNodes: sets.Set[string]{},
|
|
wantError: errEmptyPriorityList,
|
|
},
|
|
}
|
|
|
|
for _, test := range tests {
|
|
t.Run(test.name, func(t *testing.T) {
|
|
// Run selectHost multiple times to exercise its random tie-breaking.
|
|
for i := 0; i < 10; i++ {
|
|
got, scoreList, err := selectHost(test.list, test.topNodesCnt)
|
|
if err != test.wantError {
|
|
t.Fatalf("unexpected error is returned from selectHost: got: %v want: %v", err, test.wantError)
|
|
}
|
|
if test.possibleNodes.Len() == 0 {
|
|
if got != "" {
|
|
t.Fatalf("expected nothing returned as selected Node, but actually %s is returned from selectHost", got)
|
|
}
|
|
return
|
|
}
|
|
if !test.possibleNodes.Has(got) {
|
|
t.Errorf("got %s is not in the possible map %v", got, test.possibleNodes)
|
|
}
|
|
if got != scoreList[0].Name {
|
|
t.Errorf("The head of list should be the selected Node's score: got: %v, expected: %v", scoreList[0], got)
|
|
}
|
|
for _, list := range test.possibleNodeLists {
|
|
if cmp.Equal(list, scoreList) {
|
|
return
|
|
}
|
|
}
|
|
t.Errorf("Unexpected scoreList: %v", scoreList)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
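// TestFindNodesThatPassExtenders verifies that findNodesThatPassExtenders filters nodes through all extenders
// and records per-node statuses without overwriting statuses recorded by earlier filters.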
func TestFindNodesThatPassExtenders(t *testing.T) {
|
|
absentStatus := fwk.NewStatus(fwk.UnschedulableAndUnresolvable, "node(s) didn't satisfy plugin(s) [PreFilter]")
|
|
|
|
tests := []struct {
|
|
name string
|
|
extenders []tf.FakeExtender
|
|
nodes []*v1.Node
|
|
filteredNodesStatuses *framework.NodeToStatus
|
|
expectsErr bool
|
|
expectedNodes []*v1.Node
|
|
expectedStatuses *framework.NodeToStatus
|
|
}{
|
|
{
|
|
name: "error",
|
|
extenders: []tf.FakeExtender{
|
|
{
|
|
ExtenderName: "FakeExtender1",
|
|
Predicates: []tf.FitPredicate{tf.ErrorPredicateExtender},
|
|
},
|
|
},
|
|
nodes: makeNodeList([]string{"a"}),
|
|
filteredNodesStatuses: framework.NewNodeToStatus(make(map[string]*fwk.Status), absentStatus),
|
|
expectsErr: true,
|
|
},
|
|
{
|
|
name: "success",
|
|
extenders: []tf.FakeExtender{
|
|
{
|
|
ExtenderName: "FakeExtender1",
|
|
Predicates: []tf.FitPredicate{tf.TruePredicateExtender},
|
|
},
|
|
},
|
|
nodes: makeNodeList([]string{"a"}),
|
|
filteredNodesStatuses: framework.NewNodeToStatus(make(map[string]*fwk.Status), absentStatus),
|
|
expectsErr: false,
|
|
expectedNodes: makeNodeList([]string{"a"}),
|
|
expectedStatuses: framework.NewNodeToStatus(make(map[string]*fwk.Status), absentStatus),
|
|
},
|
|
{
|
|
name: "unschedulable",
|
|
extenders: []tf.FakeExtender{
|
|
{
|
|
ExtenderName: "FakeExtender1",
|
|
Predicates: []tf.FitPredicate{func(pod *v1.Pod, node fwk.NodeInfo) *fwk.Status {
|
|
if node.Node().Name == "a" {
|
|
return fwk.NewStatus(fwk.Success)
|
|
}
|
|
return fwk.NewStatus(fwk.Unschedulable, fmt.Sprintf("node %q is not allowed", node.Node().Name))
|
|
}},
|
|
},
|
|
},
|
|
nodes: makeNodeList([]string{"a", "b"}),
|
|
filteredNodesStatuses: framework.NewNodeToStatus(make(map[string]*fwk.Status), absentStatus),
|
|
expectsErr: false,
|
|
expectedNodes: makeNodeList([]string{"a"}),
|
|
expectedStatuses: framework.NewNodeToStatus(map[string]*fwk.Status{
|
|
"b": fwk.NewStatus(fwk.Unschedulable, fmt.Sprintf("FakeExtender: node %q failed", "b")),
|
|
}, absentStatus),
|
|
},
|
|
{
|
|
name: "unschedulable and unresolvable",
|
|
extenders: []tf.FakeExtender{
|
|
{
|
|
ExtenderName: "FakeExtender1",
|
|
Predicates: []tf.FitPredicate{func(pod *v1.Pod, node fwk.NodeInfo) *fwk.Status {
|
|
if node.Node().Name == "a" {
|
|
return fwk.NewStatus(fwk.Success)
|
|
}
|
|
if node.Node().Name == "b" {
|
|
return fwk.NewStatus(fwk.Unschedulable, fmt.Sprintf("node %q is not allowed", node.Node().Name))
|
|
}
|
|
return fwk.NewStatus(fwk.UnschedulableAndUnresolvable, fmt.Sprintf("node %q is not allowed", node.Node().Name))
|
|
}},
|
|
},
|
|
},
|
|
nodes: makeNodeList([]string{"a", "b", "c"}),
|
|
filteredNodesStatuses: framework.NewNodeToStatus(make(map[string]*fwk.Status), absentStatus),
|
|
expectsErr: false,
|
|
expectedNodes: makeNodeList([]string{"a"}),
|
|
expectedStatuses: framework.NewNodeToStatus(map[string]*fwk.Status{
|
|
"b": fwk.NewStatus(fwk.Unschedulable, fmt.Sprintf("FakeExtender: node %q failed", "b")),
|
|
"c": fwk.NewStatus(fwk.UnschedulableAndUnresolvable, fmt.Sprintf("FakeExtender: node %q failed and unresolvable", "c")),
|
|
}, absentStatus),
|
|
},
|
|
{
|
|
name: "extender does not overwrite the previous statuses",
|
|
extenders: []tf.FakeExtender{
|
|
{
|
|
ExtenderName: "FakeExtender1",
|
|
Predicates: []tf.FitPredicate{func(pod *v1.Pod, node fwk.NodeInfo) *fwk.Status {
|
|
if node.Node().Name == "a" {
|
|
return fwk.NewStatus(fwk.Success)
|
|
}
|
|
if node.Node().Name == "b" {
|
|
return fwk.NewStatus(fwk.Unschedulable, fmt.Sprintf("node %q is not allowed", node.Node().Name))
|
|
}
|
|
return fwk.NewStatus(fwk.UnschedulableAndUnresolvable, fmt.Sprintf("node %q is not allowed", node.Node().Name))
|
|
}},
|
|
},
|
|
},
|
|
nodes: makeNodeList([]string{"a", "b"}),
|
|
filteredNodesStatuses: framework.NewNodeToStatus(map[string]*fwk.Status{
|
|
"c": fwk.NewStatus(fwk.Unschedulable, fmt.Sprintf("FakeFilterPlugin: node %q failed", "c")),
|
|
}, absentStatus),
|
|
expectsErr: false,
|
|
expectedNodes: makeNodeList([]string{"a"}),
|
|
expectedStatuses: framework.NewNodeToStatus(map[string]*fwk.Status{
|
|
"b": fwk.NewStatus(fwk.Unschedulable, fmt.Sprintf("FakeExtender: node %q failed", "b")),
|
|
"c": fwk.NewStatus(fwk.Unschedulable, fmt.Sprintf("FakeFilterPlugin: node %q failed", "c")),
|
|
}, absentStatus),
|
|
},
|
|
{
|
|
name: "multiple extenders",
|
|
extenders: []tf.FakeExtender{
|
|
{
|
|
ExtenderName: "FakeExtender1",
|
|
Predicates: []tf.FitPredicate{func(pod *v1.Pod, node fwk.NodeInfo) *fwk.Status {
|
|
if node.Node().Name == "a" {
|
|
return fwk.NewStatus(fwk.Success)
|
|
}
|
|
if node.Node().Name == "b" {
|
|
return fwk.NewStatus(fwk.Unschedulable, fmt.Sprintf("node %q is not allowed", node.Node().Name))
|
|
}
|
|
return fwk.NewStatus(fwk.UnschedulableAndUnresolvable, fmt.Sprintf("node %q is not allowed", node.Node().Name))
|
|
}},
|
|
},
|
|
{
|
|
ExtenderName: "FakeExtender1",
|
|
Predicates: []tf.FitPredicate{func(pod *v1.Pod, node fwk.NodeInfo) *fwk.Status {
|
|
if node.Node().Name == "a" {
|
|
return fwk.NewStatus(fwk.Success)
|
|
}
|
|
return fwk.NewStatus(fwk.Unschedulable, fmt.Sprintf("node %q is not allowed", node.Node().Name))
|
|
}},
|
|
},
|
|
},
|
|
nodes: makeNodeList([]string{"a", "b", "c"}),
|
|
filteredNodesStatuses: framework.NewNodeToStatus(make(map[string]*fwk.Status), absentStatus),
|
|
expectsErr: false,
|
|
expectedNodes: makeNodeList([]string{"a"}),
|
|
expectedStatuses: framework.NewNodeToStatus(map[string]*fwk.Status{
|
|
"b": fwk.NewStatus(fwk.Unschedulable, fmt.Sprintf("FakeExtender: node %q failed", "b")),
|
|
"c": fwk.NewStatus(fwk.UnschedulableAndUnresolvable, fmt.Sprintf("FakeExtender: node %q failed and unresolvable", "c")),
|
|
}, absentStatus),
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
_, ctx := ktesting.NewTestContext(t)
|
|
var extenders []fwk.Extender
|
|
for ii := range tt.extenders {
|
|
extenders = append(extenders, &tt.extenders[ii])
|
|
}
|
|
|
|
pod := st.MakePod().Name("1").UID("1").Obj()
|
|
got, err := findNodesThatPassExtenders(ctx, extenders, pod, tf.BuildNodeInfos(tt.nodes), tt.filteredNodesStatuses)
|
|
nodes := make([]*v1.Node, len(got))
|
|
for i := 0; i < len(got); i++ {
|
|
nodes[i] = got[i].Node()
|
|
}
|
|
if tt.expectsErr {
|
|
if err == nil {
|
|
t.Error("Unexpected non-error")
|
|
}
|
|
} else {
|
|
if err != nil {
|
|
t.Errorf("Unexpected error: %v", err)
|
|
}
|
|
if diff := cmp.Diff(tt.expectedNodes, nodes); diff != "" {
|
|
t.Errorf("filtered nodes (-want,+got):\n%s", diff)
|
|
}
|
|
if diff := nodeToStatusDiff(tt.expectedStatuses, tt.filteredNodesStatuses); diff != "" {
|
|
t.Errorf("filtered statuses (-want,+got):\n%s", diff)
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
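// TestSchedulerSchedulePod exercises the scheduling cycle against various plugin and extender configurations
// and checks the selected node set or the expected FitError for each case.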
func TestSchedulerSchedulePod(t *testing.T) {
|
|
fts := feature.Features{}
|
|
tests := []struct {
|
|
name string
|
|
registerPlugins []tf.RegisterPluginFunc
|
|
extenders []tf.FakeExtender
|
|
nodes []*v1.Node
|
|
pvcs []v1.PersistentVolumeClaim
|
|
pvs []v1.PersistentVolume
|
|
pod *v1.Pod
|
|
pods []*v1.Pod
|
|
wantNodes sets.Set[string]
|
|
wantEvaluatedNodes *int32
|
|
wErr error
|
|
}{
|
|
{
|
|
registerPlugins: []tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterFilterPlugin("FalseFilter", tf.NewFalseFilterPlugin),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
},
|
|
nodes: []*v1.Node{
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "node1", Labels: map[string]string{"kubernetes.io/hostname": "node1"}}},
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "node2", Labels: map[string]string{"kubernetes.io/hostname": "node2"}}},
|
|
},
|
|
pod: st.MakePod().Name("2").UID("2").Obj(),
|
|
name: "test 1",
|
|
wErr: &framework.FitError{
|
|
Pod: st.MakePod().Name("2").UID("2").Obj(),
|
|
NumAllNodes: 2,
|
|
Diagnosis: framework.Diagnosis{
|
|
NodeToStatus: framework.NewNodeToStatus(map[string]*fwk.Status{
|
|
"node1": fwk.NewStatus(fwk.Unschedulable, tf.ErrReasonFake).WithPlugin("FalseFilter"),
|
|
"node2": fwk.NewStatus(fwk.Unschedulable, tf.ErrReasonFake).WithPlugin("FalseFilter"),
|
|
}, fwk.NewStatus(fwk.UnschedulableAndUnresolvable)),
|
|
UnschedulablePlugins: sets.New("FalseFilter"),
|
|
},
|
|
},
|
|
},
|
|
{
|
|
registerPlugins: []tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterFilterPlugin("TrueFilter", tf.NewTrueFilterPlugin),
|
|
tf.RegisterScorePlugin("EqualPrioritizerPlugin", tf.NewEqualPrioritizerPlugin(), 1),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
},
|
|
nodes: []*v1.Node{
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "node1", Labels: map[string]string{"kubernetes.io/hostname": "node1"}}},
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "node2", Labels: map[string]string{"kubernetes.io/hostname": "node2"}}},
|
|
},
|
|
pod: st.MakePod().Name("ignore").UID("ignore").Obj(),
|
|
wantNodes: sets.New("node1", "node2"),
|
|
name: "test 2",
|
|
wErr: nil,
|
|
},
|
|
{
|
|
// Fits on a node whose name matches the pod name.
|
|
registerPlugins: []tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterFilterPlugin("MatchFilter", tf.NewMatchFilterPlugin),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
},
|
|
nodes: []*v1.Node{
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "node1", Labels: map[string]string{"kubernetes.io/hostname": "node1"}}},
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "node2", Labels: map[string]string{"kubernetes.io/hostname": "node2"}}},
|
|
},
|
|
pod: st.MakePod().Name("node2").UID("node2").Obj(),
|
|
wantNodes: sets.New("node2"),
|
|
name: "test 3",
|
|
wErr: nil,
|
|
},
|
|
{
|
|
registerPlugins: []tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterFilterPlugin("TrueFilter", tf.NewTrueFilterPlugin),
|
|
tf.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
},
|
|
nodes: []*v1.Node{
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "3", Labels: map[string]string{"kubernetes.io/hostname": "3"}}},
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "2", Labels: map[string]string{"kubernetes.io/hostname": "2"}}},
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "1", Labels: map[string]string{"kubernetes.io/hostname": "1"}}},
|
|
},
|
|
pod: st.MakePod().Name("ignore").UID("ignore").Obj(),
|
|
wantNodes: sets.New("3"),
|
|
name: "test 4",
|
|
wErr: nil,
|
|
},
|
|
{
|
|
registerPlugins: []tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterFilterPlugin("MatchFilter", tf.NewMatchFilterPlugin),
|
|
tf.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
},
|
|
nodes: []*v1.Node{
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "3", Labels: map[string]string{"kubernetes.io/hostname": "3"}}},
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "2", Labels: map[string]string{"kubernetes.io/hostname": "2"}}},
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "1", Labels: map[string]string{"kubernetes.io/hostname": "1"}}},
|
|
},
|
|
pod: st.MakePod().Name("2").UID("2").Obj(),
|
|
wantNodes: sets.New("2"),
|
|
name: "test 5",
|
|
wErr: nil,
|
|
},
|
|
{
|
|
registerPlugins: []tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterFilterPlugin("TrueFilter", tf.NewTrueFilterPlugin),
|
|
tf.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
|
|
tf.RegisterScorePlugin("ReverseNumericMap", newReverseNumericMapPlugin(), 2),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
},
|
|
nodes: []*v1.Node{
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "3", Labels: map[string]string{"kubernetes.io/hostname": "3"}}},
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "2", Labels: map[string]string{"kubernetes.io/hostname": "2"}}},
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "1", Labels: map[string]string{"kubernetes.io/hostname": "1"}}},
|
|
},
|
|
pod: st.MakePod().Name("2").UID("2").Obj(),
|
|
wantNodes: sets.New("1"),
|
|
name: "test 6",
|
|
wErr: nil,
|
|
},
|
|
{
|
|
registerPlugins: []tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterFilterPlugin("TrueFilter", tf.NewTrueFilterPlugin),
|
|
tf.RegisterFilterPlugin("FalseFilter", tf.NewFalseFilterPlugin),
|
|
tf.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
},
|
|
nodes: []*v1.Node{
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "3", Labels: map[string]string{"kubernetes.io/hostname": "3"}}},
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "2", Labels: map[string]string{"kubernetes.io/hostname": "2"}}},
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "1", Labels: map[string]string{"kubernetes.io/hostname": "1"}}},
|
|
},
|
|
pod: st.MakePod().Name("2").UID("2").Obj(),
|
|
name: "test 7",
|
|
wErr: &framework.FitError{
|
|
Pod: st.MakePod().Name("2").UID("2").Obj(),
|
|
NumAllNodes: 3,
|
|
Diagnosis: framework.Diagnosis{
|
|
NodeToStatus: framework.NewNodeToStatus(map[string]*fwk.Status{
|
|
"3": fwk.NewStatus(fwk.Unschedulable, tf.ErrReasonFake).WithPlugin("FalseFilter"),
|
|
"2": fwk.NewStatus(fwk.Unschedulable, tf.ErrReasonFake).WithPlugin("FalseFilter"),
|
|
"1": fwk.NewStatus(fwk.Unschedulable, tf.ErrReasonFake).WithPlugin("FalseFilter"),
|
|
}, fwk.NewStatus(fwk.UnschedulableAndUnresolvable)),
|
|
UnschedulablePlugins: sets.New("FalseFilter"),
|
|
},
|
|
},
|
|
},
|
|
{
|
|
registerPlugins: []tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterFilterPlugin("NoPodsFilter", NewNoPodsFilterPlugin),
|
|
tf.RegisterFilterPlugin("MatchFilter", tf.NewMatchFilterPlugin),
|
|
tf.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
},
|
|
pods: []*v1.Pod{
|
|
st.MakePod().Name("2").UID("2").Node("2").Phase(v1.PodRunning).Obj(),
|
|
},
|
|
pod: st.MakePod().Name("2").UID("2").Obj(),
|
|
nodes: []*v1.Node{
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "1", Labels: map[string]string{"kubernetes.io/hostname": "1"}}},
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "2", Labels: map[string]string{"kubernetes.io/hostname": "2"}}},
|
|
},
|
|
name: "test 8",
|
|
wErr: &framework.FitError{
|
|
Pod: st.MakePod().Name("2").UID("2").Obj(),
|
|
NumAllNodes: 2,
|
|
Diagnosis: framework.Diagnosis{
|
|
NodeToStatus: framework.NewNodeToStatus(map[string]*fwk.Status{
|
|
"1": fwk.NewStatus(fwk.Unschedulable, tf.ErrReasonFake).WithPlugin("MatchFilter"),
|
|
"2": fwk.NewStatus(fwk.Unschedulable, tf.ErrReasonFake).WithPlugin("NoPodsFilter"),
|
|
}, fwk.NewStatus(fwk.UnschedulableAndUnresolvable)),
|
|
UnschedulablePlugins: sets.New("MatchFilter", "NoPodsFilter"),
|
|
},
|
|
},
|
|
},
|
|
{
|
|
// Pod with existing PVC
|
|
registerPlugins: []tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterPreFilterPlugin(volumebinding.Name, frameworkruntime.FactoryAdapter(fts, volumebinding.New)),
|
|
tf.RegisterFilterPlugin("TrueFilter", tf.NewTrueFilterPlugin),
|
|
tf.RegisterScorePlugin("EqualPrioritizerPlugin", tf.NewEqualPrioritizerPlugin(), 1),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
},
|
|
nodes: []*v1.Node{
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "node1", Labels: map[string]string{"kubernetes.io/hostname": "node1"}}},
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "node2", Labels: map[string]string{"kubernetes.io/hostname": "node2"}}},
|
|
},
|
|
pvcs: []v1.PersistentVolumeClaim{
|
|
{
|
|
ObjectMeta: metav1.ObjectMeta{Name: "existingPVC", UID: types.UID("existingPVC"), Namespace: v1.NamespaceDefault},
|
|
Spec: v1.PersistentVolumeClaimSpec{VolumeName: "existingPV"},
|
|
},
|
|
},
|
|
pvs: []v1.PersistentVolume{
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "existingPV"}},
|
|
},
|
|
pod: st.MakePod().Name("ignore").UID("ignore").Namespace(v1.NamespaceDefault).PVC("existingPVC").Obj(),
|
|
wantNodes: sets.New("node1", "node2"),
|
|
name: "existing PVC",
|
|
wErr: nil,
|
|
},
|
|
{
|
|
// Pod with non existing PVC
|
|
registerPlugins: []tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterPreFilterPlugin(volumebinding.Name, frameworkruntime.FactoryAdapter(fts, volumebinding.New)),
|
|
tf.RegisterFilterPlugin("TrueFilter", tf.NewTrueFilterPlugin),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
},
|
|
nodes: []*v1.Node{
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "node1", Labels: map[string]string{"kubernetes.io/hostname": "node1"}}},
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "node2", Labels: map[string]string{"kubernetes.io/hostname": "node2"}}},
|
|
},
|
|
pod: st.MakePod().Name("ignore").UID("ignore").PVC("unknownPVC").Obj(),
|
|
name: "unknown PVC",
|
|
wErr: &framework.FitError{
|
|
Pod: st.MakePod().Name("ignore").UID("ignore").PVC("unknownPVC").Obj(),
|
|
NumAllNodes: 2,
|
|
Diagnosis: framework.Diagnosis{
|
|
NodeToStatus: framework.NewNodeToStatus(make(map[string]*fwk.Status), fwk.NewStatus(fwk.UnschedulableAndUnresolvable, `persistentvolumeclaim "unknownPVC" not found`).WithPlugin("VolumeBinding")),
|
|
PreFilterMsg: `persistentvolumeclaim "unknownPVC" not found`,
|
|
UnschedulablePlugins: sets.New(volumebinding.Name),
|
|
},
|
|
},
|
|
},
|
|
{
|
|
// Pod with deleting PVC
|
|
registerPlugins: []tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterPreFilterPlugin(volumebinding.Name, frameworkruntime.FactoryAdapter(fts, volumebinding.New)),
|
|
tf.RegisterFilterPlugin("TrueFilter", tf.NewTrueFilterPlugin),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
},
|
|
nodes: []*v1.Node{
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "node1", Labels: map[string]string{"kubernetes.io/hostname": "node1"}}},
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "node2", Labels: map[string]string{"kubernetes.io/hostname": "node2"}}},
|
|
},
|
|
pvcs: []v1.PersistentVolumeClaim{{ObjectMeta: metav1.ObjectMeta{Name: "existingPVC", UID: types.UID("existingPVC"), Namespace: v1.NamespaceDefault, DeletionTimestamp: &metav1.Time{}}}},
|
|
pod: st.MakePod().Name("ignore").UID("ignore").Namespace(v1.NamespaceDefault).PVC("existingPVC").Obj(),
|
|
name: "deleted PVC",
|
|
wErr: &framework.FitError{
|
|
Pod: st.MakePod().Name("ignore").UID("ignore").Namespace(v1.NamespaceDefault).PVC("existingPVC").Obj(),
|
|
NumAllNodes: 2,
|
|
Diagnosis: framework.Diagnosis{
|
|
NodeToStatus: framework.NewNodeToStatus(make(map[string]*fwk.Status), fwk.NewStatus(fwk.UnschedulableAndUnresolvable, `persistentvolumeclaim "existingPVC" is being deleted`).WithPlugin("VolumeBinding")),
|
|
PreFilterMsg: `persistentvolumeclaim "existingPVC" is being deleted`,
|
|
UnschedulablePlugins: sets.New(volumebinding.Name),
|
|
},
|
|
},
|
|
},
|
|
{
|
|
registerPlugins: []tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterFilterPlugin("TrueFilter", tf.NewTrueFilterPlugin),
|
|
tf.RegisterScorePlugin("FalseMap", newFalseMapPlugin(), 1),
|
|
tf.RegisterScorePlugin("TrueMap", newTrueMapPlugin(), 2),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
},
|
|
nodes: []*v1.Node{
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "2", Labels: map[string]string{"kubernetes.io/hostname": "2"}}},
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "1", Labels: map[string]string{"kubernetes.io/hostname": "1"}}},
|
|
},
|
|
pod: st.MakePod().Name("2").Obj(),
|
|
name: "test error with priority map",
|
|
wErr: fmt.Errorf("running Score plugins: %w", fmt.Errorf(`plugin "FalseMap" failed with: %w`, errPrioritize)),
|
|
},
|
|
{
|
|
name: "test podtopologyspread plugin - 2 nodes with maxskew=1",
|
|
registerPlugins: []tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterPluginAsExtensions(
|
|
podtopologyspread.Name,
|
|
podTopologySpreadFunc,
|
|
"PreFilter",
|
|
"Filter",
|
|
),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
},
|
|
nodes: []*v1.Node{
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "node1", Labels: map[string]string{"kubernetes.io/hostname": "node1"}}},
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "node2", Labels: map[string]string{"kubernetes.io/hostname": "node2"}}},
|
|
},
|
|
pod: st.MakePod().Name("p").UID("p").Label("foo", "").SpreadConstraint(1, "kubernetes.io/hostname", v1.DoNotSchedule, &metav1.LabelSelector{
|
|
MatchExpressions: []metav1.LabelSelectorRequirement{
|
|
{
|
|
Key: "foo",
|
|
Operator: metav1.LabelSelectorOpExists,
|
|
},
|
|
},
|
|
}, nil, nil, nil, nil).Obj(),
|
|
pods: []*v1.Pod{
|
|
st.MakePod().Name("pod1").UID("pod1").Label("foo", "").Node("node1").Phase(v1.PodRunning).Obj(),
|
|
},
|
|
wantNodes: sets.New("node2"),
|
|
wErr: nil,
|
|
},
|
|
{
|
|
name: "test podtopologyspread plugin - 3 nodes with maxskew=2",
|
|
registerPlugins: []tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterPluginAsExtensions(
|
|
podtopologyspread.Name,
|
|
podTopologySpreadFunc,
|
|
"PreFilter",
|
|
"Filter",
|
|
),
|
|
tf.RegisterScorePlugin("EqualPrioritizerPlugin", tf.NewEqualPrioritizerPlugin(), 1),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
},
|
|
nodes: []*v1.Node{
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "node1", Labels: map[string]string{"kubernetes.io/hostname": "node1"}}},
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "node2", Labels: map[string]string{"kubernetes.io/hostname": "node2"}}},
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "node3", Labels: map[string]string{"kubernetes.io/hostname": "node3"}}},
|
|
},
|
|
pod: st.MakePod().Name("p").UID("p").Label("foo", "").SpreadConstraint(2, "kubernetes.io/hostname", v1.DoNotSchedule, &metav1.LabelSelector{
|
|
MatchExpressions: []metav1.LabelSelectorRequirement{
|
|
{
|
|
Key: "foo",
|
|
Operator: metav1.LabelSelectorOpExists,
|
|
},
|
|
},
|
|
}, nil, nil, nil, nil).Obj(),
|
|
pods: []*v1.Pod{
|
|
st.MakePod().Name("pod1a").UID("pod1a").Label("foo", "").Node("node1").Phase(v1.PodRunning).Obj(),
|
|
st.MakePod().Name("pod1b").UID("pod1b").Label("foo", "").Node("node1").Phase(v1.PodRunning).Obj(),
|
|
st.MakePod().Name("pod2").UID("pod2").Label("foo", "").Node("node2").Phase(v1.PodRunning).Obj(),
|
|
},
|
|
wantNodes: sets.New("node2", "node3"),
|
|
wErr: nil,
|
|
},
|
|
{
|
|
name: "test with filter plugin returning Unschedulable status",
|
|
registerPlugins: []tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterFilterPlugin(
|
|
"FakeFilter",
|
|
tf.NewFakeFilterPlugin(map[string]fwk.Code{"3": fwk.Unschedulable}),
|
|
),
|
|
tf.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
},
|
|
nodes: []*v1.Node{
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "3", Labels: map[string]string{"kubernetes.io/hostname": "3"}}},
|
|
},
|
|
pod: st.MakePod().Name("test-filter").UID("test-filter").Obj(),
|
|
wantNodes: nil,
|
|
wErr: &framework.FitError{
|
|
Pod: st.MakePod().Name("test-filter").UID("test-filter").Obj(),
|
|
NumAllNodes: 1,
|
|
Diagnosis: framework.Diagnosis{
|
|
NodeToStatus: framework.NewNodeToStatus(map[string]*fwk.Status{
|
|
"3": fwk.NewStatus(fwk.Unschedulable, "injecting failure for pod test-filter").WithPlugin("FakeFilter"),
|
|
}, fwk.NewStatus(fwk.UnschedulableAndUnresolvable)),
|
|
UnschedulablePlugins: sets.New("FakeFilter"),
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "test with extender which filters out some Nodes",
|
|
registerPlugins: []tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterFilterPlugin(
|
|
"FakeFilter",
|
|
tf.NewFakeFilterPlugin(map[string]fwk.Code{"3": fwk.Unschedulable}),
|
|
),
|
|
tf.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
},
|
|
extenders: []tf.FakeExtender{
|
|
{
|
|
ExtenderName: "FakeExtender1",
|
|
Predicates: []tf.FitPredicate{tf.FalsePredicateExtender},
|
|
},
|
|
},
|
|
nodes: []*v1.Node{
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "1", Labels: map[string]string{"kubernetes.io/hostname": "1"}}},
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "2", Labels: map[string]string{"kubernetes.io/hostname": "2"}}},
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "3", Labels: map[string]string{"kubernetes.io/hostname": "3"}}},
|
|
},
|
|
pod: st.MakePod().Name("test-filter").UID("test-filter").Obj(),
|
|
wantNodes: nil,
|
|
wErr: &framework.FitError{
|
|
Pod: st.MakePod().Name("test-filter").UID("test-filter").Obj(),
|
|
NumAllNodes: 3,
|
|
Diagnosis: framework.Diagnosis{
|
|
NodeToStatus: framework.NewNodeToStatus(map[string]*fwk.Status{
|
|
"1": fwk.NewStatus(fwk.Unschedulable, `FakeExtender: node "1" failed`),
|
|
"2": fwk.NewStatus(fwk.Unschedulable, `FakeExtender: node "2" failed`),
|
|
"3": fwk.NewStatus(fwk.Unschedulable, "injecting failure for pod test-filter").WithPlugin("FakeFilter"),
|
|
}, fwk.NewStatus(fwk.UnschedulableAndUnresolvable)),
|
|
UnschedulablePlugins: sets.New("FakeFilter", framework.ExtenderName),
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "test with filter plugin returning UnschedulableAndUnresolvable status",
|
|
registerPlugins: []tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterFilterPlugin(
|
|
"FakeFilter",
|
|
tf.NewFakeFilterPlugin(map[string]fwk.Code{"3": fwk.UnschedulableAndUnresolvable}),
|
|
),
|
|
tf.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
},
|
|
nodes: []*v1.Node{
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "3", Labels: map[string]string{"kubernetes.io/hostname": "3"}}},
|
|
},
|
|
pod: st.MakePod().Name("test-filter").UID("test-filter").Obj(),
|
|
wantNodes: nil,
|
|
wErr: &framework.FitError{
|
|
Pod: st.MakePod().Name("test-filter").UID("test-filter").Obj(),
|
|
NumAllNodes: 1,
|
|
Diagnosis: framework.Diagnosis{
|
|
NodeToStatus: framework.NewNodeToStatus(map[string]*fwk.Status{
|
|
"3": fwk.NewStatus(fwk.UnschedulableAndUnresolvable, "injecting failure for pod test-filter").WithPlugin("FakeFilter"),
|
|
}, fwk.NewStatus(fwk.UnschedulableAndUnresolvable)),
|
|
UnschedulablePlugins: sets.New("FakeFilter"),
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "test with partial failed filter plugin",
|
|
registerPlugins: []tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterFilterPlugin(
|
|
"FakeFilter",
|
|
tf.NewFakeFilterPlugin(map[string]fwk.Code{"1": fwk.Unschedulable}),
|
|
),
|
|
tf.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
},
|
|
nodes: []*v1.Node{
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "1", Labels: map[string]string{"kubernetes.io/hostname": "1"}}},
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "2", Labels: map[string]string{"kubernetes.io/hostname": "2"}}},
|
|
},
|
|
pod: st.MakePod().Name("test-filter").UID("test-filter").Obj(),
|
|
wantNodes: nil,
|
|
wErr: nil,
|
|
},
|
|
{
|
|
name: "test prefilter plugin returning Unschedulable status",
|
|
registerPlugins: []tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterPreFilterPlugin(
|
|
"FakePreFilter",
|
|
tf.NewFakePreFilterPlugin("FakePreFilter", nil, fwk.NewStatus(fwk.UnschedulableAndUnresolvable, "injected unschedulable status")),
|
|
),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
},
|
|
nodes: []*v1.Node{
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "1", Labels: map[string]string{"kubernetes.io/hostname": "1"}}},
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "2", Labels: map[string]string{"kubernetes.io/hostname": "2"}}},
|
|
},
|
|
pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
|
|
wantNodes: nil,
|
|
wErr: &framework.FitError{
|
|
Pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
|
|
NumAllNodes: 2,
|
|
Diagnosis: framework.Diagnosis{
|
|
NodeToStatus: framework.NewNodeToStatus(make(map[string]*fwk.Status), fwk.NewStatus(fwk.UnschedulableAndUnresolvable, "injected unschedulable status").WithPlugin("FakePreFilter")),
|
|
PreFilterMsg: "injected unschedulable status",
|
|
UnschedulablePlugins: sets.New("FakePreFilter"),
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "test prefilter plugin returning error status",
|
|
registerPlugins: []tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterPreFilterPlugin(
|
|
"FakePreFilter",
|
|
tf.NewFakePreFilterPlugin("FakePreFilter", nil, fwk.NewStatus(fwk.Error, "injected error status")),
|
|
),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
},
|
|
nodes: []*v1.Node{
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "1", Labels: map[string]string{"kubernetes.io/hostname": "1"}}},
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "2", Labels: map[string]string{"kubernetes.io/hostname": "2"}}},
|
|
},
|
|
pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
|
|
wantNodes: nil,
|
|
wErr: fmt.Errorf(`running PreFilter plugin "FakePreFilter": %w`, errors.New("injected error status")),
|
|
},
|
|
{
|
|
name: "test prefilter plugin returning node",
|
|
registerPlugins: []tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterPreFilterPlugin(
|
|
"FakePreFilter1",
|
|
tf.NewFakePreFilterPlugin("FakePreFilter1", nil, nil),
|
|
),
|
|
tf.RegisterPreFilterPlugin(
|
|
"FakePreFilter2",
|
|
tf.NewFakePreFilterPlugin("FakePreFilter2", &fwk.PreFilterResult{NodeNames: sets.New("node2")}, nil),
|
|
),
|
|
tf.RegisterPreFilterPlugin(
|
|
"FakePreFilter3",
|
|
tf.NewFakePreFilterPlugin("FakePreFilter3", &fwk.PreFilterResult{NodeNames: sets.New("node1", "node2")}, nil),
|
|
),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
},
|
|
nodes: []*v1.Node{
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "node1", Labels: map[string]string{"kubernetes.io/hostname": "node1"}}},
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "node2", Labels: map[string]string{"kubernetes.io/hostname": "node2"}}},
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "node3", Labels: map[string]string{"kubernetes.io/hostname": "node3"}}},
|
|
},
|
|
pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
|
|
wantNodes: sets.New("node2"),
|
|
// Since this case has no score plugin, only one feasible node needs to be found in the Filter stage.
|
|
wantEvaluatedNodes: ptr.To[int32](1),
|
|
},
|
|
{
|
|
name: "test prefilter plugin returning non-intersecting nodes",
|
|
registerPlugins: []tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterPreFilterPlugin(
|
|
"FakePreFilter1",
|
|
tf.NewFakePreFilterPlugin("FakePreFilter1", nil, nil),
|
|
),
|
|
tf.RegisterPreFilterPlugin(
|
|
"FakePreFilter2",
|
|
tf.NewFakePreFilterPlugin("FakePreFilter2", &fwk.PreFilterResult{NodeNames: sets.New("node2")}, nil),
|
|
),
|
|
tf.RegisterPreFilterPlugin(
|
|
"FakePreFilter3",
|
|
tf.NewFakePreFilterPlugin("FakePreFilter3", &fwk.PreFilterResult{NodeNames: sets.New("node1")}, nil),
|
|
),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
},
|
|
nodes: []*v1.Node{
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "node1", Labels: map[string]string{"kubernetes.io/hostname": "node1"}}},
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "node2", Labels: map[string]string{"kubernetes.io/hostname": "node2"}}},
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "node3", Labels: map[string]string{"kubernetes.io/hostname": "node3"}}},
|
|
},
|
|
pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
|
|
wErr: &framework.FitError{
|
|
Pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
|
|
NumAllNodes: 3,
|
|
Diagnosis: framework.Diagnosis{
|
|
NodeToStatus: framework.NewNodeToStatus(make(map[string]*fwk.Status), fwk.NewStatus(fwk.UnschedulableAndUnresolvable, "node(s) didn't satisfy plugin(s) [FakePreFilter2 FakePreFilter3] simultaneously")),
|
|
UnschedulablePlugins: sets.New("FakePreFilter2", "FakePreFilter3"),
|
|
PreFilterMsg: "node(s) didn't satisfy plugin(s) [FakePreFilter2 FakePreFilter3] simultaneously",
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "test prefilter plugin returning empty node set",
|
|
registerPlugins: []tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterPreFilterPlugin(
|
|
"FakePreFilter1",
|
|
tf.NewFakePreFilterPlugin("FakePreFilter1", nil, nil),
|
|
),
|
|
tf.RegisterPreFilterPlugin(
|
|
"FakePreFilter2",
|
|
tf.NewFakePreFilterPlugin("FakePreFilter2", &fwk.PreFilterResult{NodeNames: sets.New[string]()}, nil),
|
|
),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
},
|
|
nodes: []*v1.Node{
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "node1", Labels: map[string]string{"kubernetes.io/hostname": "node1"}}},
|
|
},
|
|
pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
|
|
wErr: &framework.FitError{
|
|
Pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
|
|
NumAllNodes: 1,
|
|
Diagnosis: framework.Diagnosis{
|
|
NodeToStatus: framework.NewNodeToStatus(make(map[string]*fwk.Status), fwk.NewStatus(fwk.UnschedulableAndUnresolvable, "node(s) didn't satisfy plugin FakePreFilter2")),
|
|
UnschedulablePlugins: sets.New("FakePreFilter2"),
|
|
PreFilterMsg: "node(s) didn't satisfy plugin FakePreFilter2",
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "test some nodes are filtered out by prefilter plugin and other are filtered out by filter plugin",
|
|
registerPlugins: []tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterPreFilterPlugin(
|
|
"FakePreFilter",
|
|
tf.NewFakePreFilterPlugin("FakePreFilter", &fwk.PreFilterResult{NodeNames: sets.New[string]("node2")}, nil),
|
|
),
|
|
tf.RegisterFilterPlugin(
|
|
"FakeFilter",
|
|
tf.NewFakeFilterPlugin(map[string]fwk.Code{"node2": fwk.Unschedulable}),
|
|
),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
},
|
|
nodes: []*v1.Node{
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "node1", Labels: map[string]string{"kubernetes.io/hostname": "node1"}}},
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "node2", Labels: map[string]string{"kubernetes.io/hostname": "node2"}}},
|
|
},
|
|
pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
|
|
wErr: &framework.FitError{
|
|
Pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
|
|
NumAllNodes: 2,
|
|
Diagnosis: framework.Diagnosis{
|
|
NodeToStatus: framework.NewNodeToStatus(map[string]*fwk.Status{
|
|
"node2": fwk.NewStatus(fwk.Unschedulable, "injecting failure for pod test-prefilter").WithPlugin("FakeFilter"),
|
|
}, fwk.NewStatus(fwk.UnschedulableAndUnresolvable, "node(s) didn't satisfy plugin(s) [FakePreFilter]")),
|
|
UnschedulablePlugins: sets.New("FakePreFilter", "FakeFilter"),
|
|
PreFilterMsg: "",
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "test prefilter plugin returning skip",
|
|
registerPlugins: []tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterPreFilterPlugin(
|
|
"FakePreFilter1",
|
|
tf.NewFakePreFilterPlugin("FakeFilter1", nil, nil),
|
|
),
|
|
tf.RegisterFilterPlugin(
|
|
"FakeFilter1",
|
|
tf.NewFakeFilterPlugin(map[string]fwk.Code{
|
|
"node1": fwk.Unschedulable,
|
|
}),
|
|
),
|
|
tf.RegisterPluginAsExtensions("FakeFilter2", func(_ context.Context, configuration runtime.Object, f fwk.Handle) (fwk.Plugin, error) {
|
|
return tf.FakePreFilterAndFilterPlugin{
|
|
FakePreFilterPlugin: &tf.FakePreFilterPlugin{
|
|
Result: nil,
|
|
Status: fwk.NewStatus(fwk.Skip),
|
|
},
|
|
FakeFilterPlugin: &tf.FakeFilterPlugin{
|
|
// This Filter plugin shouldn't be executed at the Filter extension point because its PreFilter returned Skip.
// To confirm that, return the status code Error for all Nodes.
|
|
FailedNodeReturnCodeMap: map[string]fwk.Code{
|
|
"node1": fwk.Error, "node2": fwk.Error, "node3": fwk.Error,
|
|
},
|
|
},
|
|
}, nil
|
|
}, "PreFilter", "Filter"),
|
|
tf.RegisterScorePlugin("EqualPrioritizerPlugin", tf.NewEqualPrioritizerPlugin(), 1),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
},
|
|
nodes: []*v1.Node{
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "node1", Labels: map[string]string{"kubernetes.io/hostname": "node1"}}},
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "node2", Labels: map[string]string{"kubernetes.io/hostname": "node2"}}},
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "node3", Labels: map[string]string{"kubernetes.io/hostname": "node3"}}},
|
|
},
|
|
pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
|
|
wantNodes: sets.New("node2", "node3"),
|
|
wantEvaluatedNodes: ptr.To[int32](3),
|
|
},
|
|
{
|
|
name: "test all prescore plugins return skip",
|
|
registerPlugins: []tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterFilterPlugin("TrueFilter", tf.NewTrueFilterPlugin),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
tf.RegisterPluginAsExtensions("FakePreScoreAndScorePlugin", tf.NewFakePreScoreAndScorePlugin("FakePreScoreAndScorePlugin", 0,
|
|
fwk.NewStatus(fwk.Skip, "fake skip"),
|
|
fwk.NewStatus(fwk.Error, "this score function shouldn't be executed because this plugin returned Skip in the PreScore"),
|
|
), "PreScore", "Score"),
|
|
},
|
|
nodes: []*v1.Node{
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "node1", Labels: map[string]string{"kubernetes.io/hostname": "node1"}}},
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "node2", Labels: map[string]string{"kubernetes.io/hostname": "node2"}}},
|
|
},
|
|
pod: st.MakePod().Name("ignore").UID("ignore").Obj(),
|
|
wantNodes: sets.New("node1", "node2"),
|
|
},
|
|
{
|
|
name: "test without score plugin no extra nodes are evaluated",
|
|
registerPlugins: []tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterFilterPlugin("TrueFilter", tf.NewTrueFilterPlugin),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
},
|
|
nodes: []*v1.Node{
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "node1", Labels: map[string]string{"kubernetes.io/hostname": "node1"}}},
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "node2", Labels: map[string]string{"kubernetes.io/hostname": "node2"}}},
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "node3", Labels: map[string]string{"kubernetes.io/hostname": "node3"}}},
|
|
},
|
|
pod: st.MakePod().Name("pod1").UID("pod1").Obj(),
|
|
wantNodes: sets.New("node1", "node2", "node3"),
|
|
wantEvaluatedNodes: ptr.To[int32](1),
|
|
},
|
|
{
|
|
name: "test no score plugin, prefilter plugin returning 2 nodes",
|
|
registerPlugins: []tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterPreFilterPlugin(
|
|
"FakePreFilter",
|
|
tf.NewFakePreFilterPlugin("FakePreFilter", &fwk.PreFilterResult{NodeNames: sets.New("node1", "node2")}, nil),
|
|
),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
},
|
|
nodes: []*v1.Node{
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "node1", Labels: map[string]string{"kubernetes.io/hostname": "node1"}}},
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "node2", Labels: map[string]string{"kubernetes.io/hostname": "node2"}}},
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "node3", Labels: map[string]string{"kubernetes.io/hostname": "node3"}}},
|
|
},
|
|
pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
|
|
wantNodes: sets.New("node1", "node2"),
|
|
// Since this case has no score plugin, only one feasible node needs to be found in the Filter stage.
|
|
wantEvaluatedNodes: ptr.To[int32](1),
|
|
},
|
|
{
|
|
name: "test prefilter plugin returned an invalid node",
|
|
registerPlugins: []tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterPreFilterPlugin(
|
|
"FakePreFilter",
|
|
tf.NewFakePreFilterPlugin("FakePreFilter", &fwk.PreFilterResult{
|
|
NodeNames: sets.New("invalid-node"),
|
|
}, nil),
|
|
),
|
|
tf.RegisterFilterPlugin("TrueFilter", tf.NewTrueFilterPlugin),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
},
|
|
nodes: []*v1.Node{
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "1", Labels: map[string]string{"kubernetes.io/hostname": "1"}}}, {ObjectMeta: metav1.ObjectMeta{Name: "2", Labels: map[string]string{"kubernetes.io/hostname": "2"}}},
|
|
},
|
|
pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
|
|
wantNodes: nil,
|
|
wErr: &framework.FitError{
|
|
Pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
|
|
NumAllNodes: 2,
|
|
Diagnosis: framework.Diagnosis{
|
|
NodeToStatus: framework.NewNodeToStatus(make(map[string]*fwk.Status), fwk.NewStatus(fwk.UnschedulableAndUnresolvable, "node(s) didn't satisfy plugin(s) [FakePreFilter]")),
|
|
UnschedulablePlugins: sets.New("FakePreFilter"),
|
|
},
|
|
},
|
|
},
|
|
{
|
|
registerPlugins: []tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterPreFilterPlugin(volumebinding.Name, frameworkruntime.FactoryAdapter(fts, volumebinding.New)),
|
|
tf.RegisterFilterPlugin("TrueFilter", tf.NewTrueFilterPlugin),
|
|
tf.RegisterScorePlugin("EqualPrioritizerPlugin", tf.NewEqualPrioritizerPlugin(), 1),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
},
|
|
nodes: []*v1.Node{
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "node1", Labels: map[string]string{"kubernetes.io/hostname": "host1"}}},
|
|
},
|
|
pvcs: []v1.PersistentVolumeClaim{
|
|
{
|
|
ObjectMeta: metav1.ObjectMeta{Name: "PVC1", UID: types.UID("PVC1"), Namespace: v1.NamespaceDefault},
|
|
Spec: v1.PersistentVolumeClaimSpec{VolumeName: "PV1"},
|
|
},
|
|
},
|
|
pvs: []v1.PersistentVolume{
|
|
{
|
|
ObjectMeta: metav1.ObjectMeta{Name: "PV1", UID: types.UID("PV1")},
|
|
Spec: v1.PersistentVolumeSpec{
|
|
NodeAffinity: &v1.VolumeNodeAffinity{
|
|
Required: &v1.NodeSelector{
|
|
NodeSelectorTerms: []v1.NodeSelectorTerm{
|
|
{
|
|
MatchExpressions: []v1.NodeSelectorRequirement{
|
|
{
|
|
Key: "kubernetes.io/hostname",
|
|
Operator: v1.NodeSelectorOpIn,
|
|
Values: []string{"host1"},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
pod: st.MakePod().Name("pod1").UID("pod1").Namespace(v1.NamespaceDefault).PVC("PVC1").Obj(),
|
|
wantNodes: sets.New("node1"),
|
|
name: "hostname and nodename of the node do not match",
|
|
wErr: nil,
|
|
},
|
|
}
|
|
for _, test := range tests {
|
|
t.Run(test.name, func(t *testing.T) {
|
|
logger, ctx := ktesting.NewTestContext(t)
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
cache := internalcache.New(ctx, time.Duration(0), nil)
|
|
for _, pod := range test.pods {
|
|
cache.AddPod(logger, pod)
|
|
}
|
|
var nodes []*v1.Node
|
|
for _, node := range test.nodes {
|
|
nodes = append(nodes, node)
|
|
cache.AddNode(logger, node)
|
|
}
|
|
|
|
cs := clientsetfake.NewClientset()
|
|
informerFactory := informers.NewSharedInformerFactory(cs, 0)
|
|
for _, pvc := range test.pvcs {
|
|
metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, volume.AnnBindCompleted, "true")
|
|
cs.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(ctx, &pvc, metav1.CreateOptions{})
|
|
}
|
|
for _, pv := range test.pvs {
|
|
_, _ = cs.CoreV1().PersistentVolumes().Create(ctx, &pv, metav1.CreateOptions{})
|
|
}
|
|
snapshot := internalcache.NewSnapshot(test.pods, nodes)
|
|
schedFramework, err := tf.NewFramework(
|
|
ctx,
|
|
test.registerPlugins, "",
|
|
frameworkruntime.WithSnapshotSharedLister(snapshot),
|
|
frameworkruntime.WithInformerFactory(informerFactory),
|
|
frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
|
|
)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
var extenders []fwk.Extender
|
|
for ii := range test.extenders {
|
|
extenders = append(extenders, &test.extenders[ii])
|
|
}
|
|
sched := &Scheduler{
|
|
Cache: cache,
|
|
nodeInfoSnapshot: snapshot,
|
|
percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
|
|
Extenders: extenders,
|
|
}
|
|
sched.applyDefaultHandlers()
|
|
|
|
informerFactory.Start(ctx.Done())
|
|
informerFactory.WaitForCacheSync(ctx.Done())
|
|
|
|
result, err := sched.SchedulePod(ctx, schedFramework, framework.NewCycleState(), test.pod)
|
|
if err != test.wErr {
|
|
gotFitErr, gotOK := err.(*framework.FitError)
|
|
wantFitErr, wantOK := test.wErr.(*framework.FitError)
|
|
if gotOK != wantOK {
|
|
t.Errorf("Expected err to be FitError: %v, but got %v (error: %v)", wantOK, gotOK, err)
|
|
} else if gotOK {
|
|
if diff := cmp.Diff(wantFitErr, gotFitErr, schedulerCmpOpts...); diff != "" {
|
|
t.Errorf("Unexpected fitErr for map (-want, +got):\n%s", diff)
|
|
}
|
|
if diff := nodeToStatusDiff(wantFitErr.Diagnosis.NodeToStatus, gotFitErr.Diagnosis.NodeToStatus); diff != "" {
|
|
t.Errorf("Unexpected nodeToStatus within fitErr for map: (-want, +got):\n%s", diff)
|
|
}
|
|
}
|
|
}
|
|
if test.wantNodes != nil && !test.wantNodes.Has(result.SuggestedHost) {
|
|
t.Errorf("Expected: %s, got: %s", test.wantNodes, result.SuggestedHost)
|
|
}
|
|
wantEvaluatedNodes := len(test.nodes)
|
|
if test.wantEvaluatedNodes != nil {
|
|
wantEvaluatedNodes = int(*test.wantEvaluatedNodes)
|
|
}
|
|
if test.wErr == nil && wantEvaluatedNodes != result.EvaluatedNodes {
|
|
t.Errorf("Expected EvaluatedNodes: %d, got: %d", wantEvaluatedNodes, result.EvaluatedNodes)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
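// TestFindFitAllError verifies that when every node is rejected by a Filter plugin
// (MatchFilter here, which no node satisfies for an empty pod), findNodesThatFitPod
// records an Unschedulable status for every node and reports MatchFilter as the
// unschedulable plugin in the diagnosis.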
func TestFindFitAllError(t *testing.T) {
|
|
_, ctx := ktesting.NewTestContext(t)
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
nodes := makeNodeList([]string{"3", "2", "1"})
|
|
scheduler := makeScheduler(ctx, nodes)
|
|
|
|
schedFramework, err := tf.NewFramework(
|
|
ctx,
|
|
[]tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterFilterPlugin("TrueFilter", tf.NewTrueFilterPlugin),
|
|
tf.RegisterFilterPlugin("MatchFilter", tf.NewMatchFilterPlugin),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
},
|
|
"",
|
|
frameworkruntime.WithPodNominator(internalqueue.NewTestQueue(ctx, nil)),
|
|
frameworkruntime.WithSnapshotSharedLister(scheduler.nodeInfoSnapshot),
|
|
)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
_, diagnosis, err := scheduler.findNodesThatFitPod(ctx, schedFramework, framework.NewCycleState(), &v1.Pod{})
|
|
if err != nil {
|
|
t.Errorf("unexpected error: %v", err)
|
|
}
|
|
|
|
expected := framework.Diagnosis{
|
|
NodeToStatus: framework.NewNodeToStatus(map[string]*fwk.Status{
|
|
"1": fwk.NewStatus(fwk.Unschedulable, tf.ErrReasonFake).WithPlugin("MatchFilter"),
|
|
"2": fwk.NewStatus(fwk.Unschedulable, tf.ErrReasonFake).WithPlugin("MatchFilter"),
|
|
"3": fwk.NewStatus(fwk.Unschedulable, tf.ErrReasonFake).WithPlugin("MatchFilter"),
|
|
}, fwk.NewStatus(fwk.UnschedulableAndUnresolvable)),
|
|
UnschedulablePlugins: sets.New("MatchFilter"),
|
|
}
|
|
if diff := cmp.Diff(expected, diagnosis, schedulerCmpOpts...); diff != "" {
|
|
t.Errorf("Unexpected diagnosis (-want, +got):\n%s", diff)
|
|
}
|
|
if diff := nodeToStatusDiff(expected.NodeToStatus, diagnosis.NodeToStatus); diff != "" {
|
|
t.Errorf("Unexpected nodeToStatus within diagnosis: (-want, +got):\n%s", diff)
|
|
}
|
|
}
|
|
|
|
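// TestFindFitSomeError schedules a pod whose name matches exactly one node, so MatchFilter
// rejects all the other nodes; the diagnosis is expected to contain one failure entry per
// non-matching node, each carrying the fake filter's reason.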
func TestFindFitSomeError(t *testing.T) {
|
|
_, ctx := ktesting.NewTestContext(t)
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
nodes := makeNodeList([]string{"3", "2", "1"})
|
|
scheduler := makeScheduler(ctx, nodes)
|
|
|
|
fwk, err := tf.NewFramework(
|
|
ctx,
|
|
[]tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterFilterPlugin("TrueFilter", tf.NewTrueFilterPlugin),
|
|
tf.RegisterFilterPlugin("MatchFilter", tf.NewMatchFilterPlugin),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
},
|
|
"",
|
|
frameworkruntime.WithPodNominator(internalqueue.NewTestQueue(ctx, nil)),
|
|
frameworkruntime.WithSnapshotSharedLister(scheduler.nodeInfoSnapshot),
|
|
)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
pod := st.MakePod().Name("1").UID("1").Obj()
|
|
_, diagnosis, err := scheduler.findNodesThatFitPod(ctx, fwk, framework.NewCycleState(), pod)
|
|
if err != nil {
|
|
t.Errorf("unexpected error: %v", err)
|
|
}
|
|
|
|
if diagnosis.NodeToStatus.Len() != len(nodes)-1 {
|
|
t.Errorf("unexpected failed status map: %v", diagnosis.NodeToStatus)
|
|
}
|
|
|
|
if diff := cmp.Diff(sets.New("MatchFilter"), diagnosis.UnschedulablePlugins); diff != "" {
|
|
t.Errorf("Unexpected unschedulablePlugins: (-want, +got):\n%s", diff)
|
|
}
|
|
|
|
for _, node := range nodes {
|
|
if node.Name == pod.Name {
|
|
continue
|
|
}
|
|
t.Run(node.Name, func(t *testing.T) {
|
|
status := diagnosis.NodeToStatus.Get(node.Name)
|
|
reasons := status.Reasons()
|
|
if len(reasons) != 1 || reasons[0] != tf.ErrReasonFake {
|
|
t.Errorf("unexpected failures: %v", reasons)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
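// TestFindFitPredicateCallCounts verifies how many times the Filter plugin runs when a
// nominated pod is present on the node: once when the nominated pod has lower priority
// than the pod being scheduled, and twice when it has higher priority.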
func TestFindFitPredicateCallCounts(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
pod *v1.Pod
|
|
expectedCount int32
|
|
}{
|
|
{
|
|
name: "nominated pods have lower priority, predicate is called once",
|
|
pod: st.MakePod().Name("1").UID("1").Priority(highPriority).Obj(),
|
|
expectedCount: 1,
|
|
},
|
|
{
|
|
name: "nominated pods have higher priority, predicate is called twice",
|
|
pod: st.MakePod().Name("1").UID("1").Priority(lowPriority).Obj(),
|
|
expectedCount: 2,
|
|
},
|
|
}
|
|
|
|
for _, test := range tests {
|
|
t.Run(test.name, func(t *testing.T) {
|
|
nodes := makeNodeList([]string{"1"})
|
|
|
|
plugin := tf.FakeFilterPlugin{}
|
|
registerFakeFilterFunc := tf.RegisterFilterPlugin(
|
|
"FakeFilter",
|
|
func(_ context.Context, _ runtime.Object, fh fwk.Handle) (fwk.Plugin, error) {
|
|
return &plugin, nil
|
|
},
|
|
)
|
|
registerPlugins := []tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
registerFakeFilterFunc,
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
}
|
|
logger, ctx := ktesting.NewTestContext(t)
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
informerFactory := informers.NewSharedInformerFactory(clientsetfake.NewClientset(), 0)
|
|
podInformer := informerFactory.Core().V1().Pods().Informer()
|
|
err := podInformer.GetStore().Add(test.pod)
|
|
if err != nil {
|
|
t.Fatalf("Error adding pod to podInformer: %s", err)
|
|
}
|
|
scheduler := makeScheduler(ctx, nodes)
|
|
if err := scheduler.Cache.UpdateSnapshot(logger, scheduler.nodeInfoSnapshot); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
schedFramework, err := tf.NewFramework(
|
|
ctx,
|
|
registerPlugins, "",
|
|
frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
|
|
frameworkruntime.WithSnapshotSharedLister(scheduler.nodeInfoSnapshot),
|
|
)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
podinfo, err := framework.NewPodInfo(st.MakePod().UID("nominated").Priority(midPriority).Obj())
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
err = podInformer.GetStore().Add(podinfo.Pod)
|
|
if err != nil {
|
|
t.Fatalf("Error adding nominated pod to podInformer: %s", err)
|
|
}
|
|
schedFramework.AddNominatedPod(logger, podinfo, &fwk.NominatingInfo{NominatingMode: fwk.ModeOverride, NominatedNodeName: "1"})
|
|
|
|
_, _, err = scheduler.findNodesThatFitPod(ctx, schedFramework, framework.NewCycleState(), test.pod)
|
|
if err != nil {
|
|
t.Errorf("unexpected error: %v", err)
|
|
}
|
|
if test.expectedCount != plugin.NumFilterCalled {
|
|
t.Errorf("predicate was called %d times, expected is %d", plugin.NumFilterCalled, test.expectedCount)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
// The point of this test is to show that you:
// - get the same priority for a zero-request pod as for a pod with the default requests,
// both when the zero-request pod is already on the node and when the zero-request pod
// is the one being scheduled.
// - don't always get the same score regardless of what is being scheduled.
|
|
func TestZeroRequest(t *testing.T) {
|
|
// A pod with no resources. We expect spreading to count it as having the default resources.
|
|
noResources := v1.PodSpec{
|
|
Containers: []v1.Container{
|
|
{},
|
|
},
|
|
}
|
|
noResources1 := noResources
|
|
noResources1.NodeName = "node1"
|
|
// A pod that explicitly requests the same amount of resources that a zero-request pod is assumed to have by default (for spreading).
|
|
small := v1.PodSpec{
|
|
Containers: []v1.Container{
|
|
{
|
|
Resources: v1.ResourceRequirements{
|
|
Requests: v1.ResourceList{
|
|
v1.ResourceCPU: resource.MustParse(
|
|
strconv.FormatInt(schedutil.DefaultMilliCPURequest, 10) + "m"),
|
|
v1.ResourceMemory: resource.MustParse(
|
|
strconv.FormatInt(schedutil.DefaultMemoryRequest, 10)),
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
small2 := small
|
|
small2.NodeName = "node2"
|
|
// A larger pod.
|
|
large := v1.PodSpec{
|
|
Containers: []v1.Container{
|
|
{
|
|
Resources: v1.ResourceRequirements{
|
|
Requests: v1.ResourceList{
|
|
v1.ResourceCPU: resource.MustParse(
|
|
strconv.FormatInt(schedutil.DefaultMilliCPURequest*3, 10) + "m"),
|
|
v1.ResourceMemory: resource.MustParse(
|
|
strconv.FormatInt(schedutil.DefaultMemoryRequest*3, 10)),
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
large1 := large
|
|
large1.NodeName = "node1"
|
|
large2 := large
|
|
large2.NodeName = "node2"
|
|
tests := []struct {
|
|
pod *v1.Pod
|
|
pods []*v1.Pod
|
|
nodes []*v1.Node
|
|
name string
|
|
expectedScore int64
|
|
}{
|
|
// The point of these next two tests is to show you get the same priority for a zero-request pod
// as for a pod with the default requests, both when the zero-request pod is already on the node
// and when the zero-request pod is the one being scheduled.
|
|
{
|
|
pod: &v1.Pod{Spec: noResources},
|
|
nodes: []*v1.Node{makeNode("node1", 1000, schedutil.DefaultMemoryRequest*10), makeNode("node2", 1000, schedutil.DefaultMemoryRequest*10)},
|
|
name: "test priority of zero-request pod with node with zero-request pod",
|
|
pods: []*v1.Pod{
|
|
{Spec: large1}, {Spec: noResources1},
|
|
{Spec: large2}, {Spec: small2},
|
|
},
|
|
expectedScore: 50,
|
|
},
|
|
{
|
|
pod: &v1.Pod{Spec: small},
|
|
nodes: []*v1.Node{makeNode("node1", 1000, schedutil.DefaultMemoryRequest*10), makeNode("node2", 1000, schedutil.DefaultMemoryRequest*10)},
|
|
name: "test priority of nonzero-request pod with node with zero-request pod",
|
|
pods: []*v1.Pod{
|
|
{Spec: large1}, {Spec: noResources1},
|
|
{Spec: large2}, {Spec: small2},
|
|
},
|
|
expectedScore: 150,
|
|
},
|
|
// The point of this test is to verify that we're not just getting the same score no matter what we schedule.
|
|
{
|
|
pod: &v1.Pod{Spec: large},
|
|
nodes: []*v1.Node{makeNode("node1", 1000, schedutil.DefaultMemoryRequest*10), makeNode("node2", 1000, schedutil.DefaultMemoryRequest*10)},
|
|
name: "test priority of larger pod with node with zero-request pod",
|
|
pods: []*v1.Pod{
|
|
{Spec: large1}, {Spec: noResources1},
|
|
{Spec: large2}, {Spec: small2},
|
|
},
|
|
expectedScore: 130,
|
|
},
|
|
}
|
|
|
|
for _, test := range tests {
|
|
t.Run(test.name, func(t *testing.T) {
|
|
client := clientsetfake.NewClientset()
|
|
informerFactory := informers.NewSharedInformerFactory(client, 0)
|
|
|
|
snapshot := internalcache.NewSnapshot(test.pods, test.nodes)
|
|
fts := feature.Features{}
|
|
pluginRegistrations := []tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterScorePlugin(noderesources.Name, frameworkruntime.FactoryAdapter(fts, noderesources.NewFit), 1),
|
|
tf.RegisterScorePlugin(noderesources.BalancedAllocationName, frameworkruntime.FactoryAdapter(fts, noderesources.NewBalancedAllocation), 1),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
}
|
|
_, ctx := ktesting.NewTestContext(t)
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
fwk, err := tf.NewFramework(
|
|
ctx,
|
|
pluginRegistrations, "",
|
|
frameworkruntime.WithInformerFactory(informerFactory),
|
|
frameworkruntime.WithSnapshotSharedLister(snapshot),
|
|
frameworkruntime.WithClientSet(client),
|
|
frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
|
|
)
|
|
if err != nil {
|
|
t.Fatalf("error creating framework: %+v", err)
|
|
}
|
|
|
|
sched := &Scheduler{
|
|
nodeInfoSnapshot: snapshot,
|
|
percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
|
|
}
|
|
sched.applyDefaultHandlers()
|
|
|
|
state := framework.NewCycleState()
|
|
_, _, err = sched.findNodesThatFitPod(ctx, fwk, state, test.pod)
|
|
if err != nil {
|
|
t.Fatalf("error filtering nodes: %+v", err)
|
|
}
|
|
nodeInfos, err := snapshot.NodeInfos().List()
|
|
if err != nil {
|
|
t.Fatalf("failed to list node from snapshot: %v", err)
|
|
}
|
|
fwk.RunPreScorePlugins(ctx, state, test.pod, nodeInfos)
|
|
list, err := prioritizeNodes(ctx, nil, fwk, state, test.pod, nodeInfos)
|
|
if err != nil {
|
|
t.Errorf("unexpected error: %v", err)
|
|
}
|
|
for _, hp := range list {
|
|
if hp.TotalScore != test.expectedScore {
|
|
t.Errorf("expected %d for all priorities, got list %#v", test.expectedScore, list)
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
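// Test_prioritizeNodes exercises prioritizeNodes with different combinations of score
// plugins and extenders, checking the per-plugin scores and total score recorded for each
// node, including cases where PreScore returns Skip and where ImageLocality scores depend
// on the images already present on each node.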
func Test_prioritizeNodes(t *testing.T) {
|
|
imageStatus1 := []v1.ContainerImage{
|
|
{
|
|
Names: []string{
|
|
"gcr.io/40:latest",
|
|
"gcr.io/40:v1",
|
|
},
|
|
SizeBytes: int64(80 * mb),
|
|
},
|
|
{
|
|
Names: []string{
|
|
"gcr.io/300:latest",
|
|
"gcr.io/300:v1",
|
|
},
|
|
SizeBytes: int64(300 * mb),
|
|
},
|
|
}
|
|
|
|
imageStatus2 := []v1.ContainerImage{
|
|
{
|
|
Names: []string{
|
|
"gcr.io/300:latest",
|
|
},
|
|
SizeBytes: int64(300 * mb),
|
|
},
|
|
{
|
|
Names: []string{
|
|
"gcr.io/40:latest",
|
|
"gcr.io/40:v1",
|
|
},
|
|
SizeBytes: int64(80 * mb),
|
|
},
|
|
}
|
|
|
|
imageStatus3 := []v1.ContainerImage{
|
|
{
|
|
Names: []string{
|
|
"gcr.io/600:latest",
|
|
},
|
|
SizeBytes: int64(600 * mb),
|
|
},
|
|
{
|
|
Names: []string{
|
|
"gcr.io/40:latest",
|
|
},
|
|
SizeBytes: int64(80 * mb),
|
|
},
|
|
{
|
|
Names: []string{
|
|
"gcr.io/900:latest",
|
|
},
|
|
SizeBytes: int64(900 * mb),
|
|
},
|
|
}
|
|
tests := []struct {
|
|
name string
|
|
pod *v1.Pod
|
|
pods []*v1.Pod
|
|
nodes []*v1.Node
|
|
pluginRegistrations []tf.RegisterPluginFunc
|
|
extenders []tf.FakeExtender
|
|
want []fwk.NodePluginScores
|
|
}{
|
|
{
|
|
name: "the score from all plugins should be recorded in PluginToNodeScores",
|
|
pod: &v1.Pod{},
|
|
nodes: []*v1.Node{makeNode("node1", 1000, schedutil.DefaultMemoryRequest*10), makeNode("node2", 1000, schedutil.DefaultMemoryRequest*10)},
|
|
pluginRegistrations: []tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterScorePlugin(noderesources.BalancedAllocationName, frameworkruntime.FactoryAdapter(feature.Features{}, noderesources.NewBalancedAllocation), 1),
|
|
tf.RegisterScorePlugin("Node2Prioritizer", tf.NewNode2PrioritizerPlugin(), 1),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
},
|
|
extenders: nil,
|
|
want: []fwk.NodePluginScores{
|
|
{
|
|
Name: "node1",
|
|
Scores: []fwk.PluginScore{
|
|
{
|
|
Name: "Node2Prioritizer",
|
|
Score: 10,
|
|
},
|
|
{
|
|
Name: "NodeResourcesBalancedAllocation",
|
|
Score: 0,
|
|
},
|
|
},
|
|
TotalScore: 10,
|
|
},
|
|
{
|
|
Name: "node2",
|
|
Scores: []fwk.PluginScore{
|
|
{
|
|
Name: "Node2Prioritizer",
|
|
Score: 100,
|
|
},
|
|
{
|
|
Name: "NodeResourcesBalancedAllocation",
|
|
Score: 0,
|
|
},
|
|
},
|
|
TotalScore: 100,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "the score from extender should also be recorded in PluginToNodeScores with plugin scores",
|
|
pod: &v1.Pod{},
|
|
nodes: []*v1.Node{makeNode("node1", 1000, schedutil.DefaultMemoryRequest*10), makeNode("node2", 1000, schedutil.DefaultMemoryRequest*10)},
|
|
pluginRegistrations: []tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterScorePlugin(noderesources.BalancedAllocationName, frameworkruntime.FactoryAdapter(feature.Features{}, noderesources.NewBalancedAllocation), 1),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
},
|
|
extenders: []tf.FakeExtender{
|
|
{
|
|
ExtenderName: "FakeExtender1",
|
|
Weight: 1,
|
|
Prioritizers: []tf.PriorityConfig{
|
|
{
|
|
Weight: 3,
|
|
Function: tf.Node1PrioritizerExtender,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
ExtenderName: "FakeExtender2",
|
|
Weight: 1,
|
|
Prioritizers: []tf.PriorityConfig{
|
|
{
|
|
Weight: 2,
|
|
Function: tf.Node2PrioritizerExtender,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
want: []fwk.NodePluginScores{
|
|
{
|
|
Name: "node1",
|
|
Scores: []fwk.PluginScore{
|
|
|
|
{
|
|
Name: "FakeExtender1",
|
|
Score: 300,
|
|
},
|
|
{
|
|
Name: "FakeExtender2",
|
|
Score: 20,
|
|
},
|
|
{
|
|
Name: "NodeResourcesBalancedAllocation",
|
|
Score: 0,
|
|
},
|
|
},
|
|
TotalScore: 320,
|
|
},
|
|
{
|
|
Name: "node2",
|
|
Scores: []fwk.PluginScore{
|
|
{
|
|
Name: "FakeExtender1",
|
|
Score: 30,
|
|
},
|
|
{
|
|
Name: "FakeExtender2",
|
|
Score: 200,
|
|
},
|
|
{
|
|
Name: "NodeResourcesBalancedAllocation",
|
|
Score: 0,
|
|
},
|
|
},
|
|
TotalScore: 230,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "plugin which returned skip in preScore shouldn't be executed in the score phase",
|
|
pod: &v1.Pod{},
|
|
nodes: []*v1.Node{makeNode("node1", 1000, schedutil.DefaultMemoryRequest*10), makeNode("node2", 1000, schedutil.DefaultMemoryRequest*10)},
|
|
pluginRegistrations: []tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterScorePlugin(noderesources.BalancedAllocationName, frameworkruntime.FactoryAdapter(feature.Features{}, noderesources.NewBalancedAllocation), 1),
|
|
tf.RegisterScorePlugin("Node2Prioritizer", tf.NewNode2PrioritizerPlugin(), 1),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
tf.RegisterPluginAsExtensions("FakePreScoreAndScorePlugin", tf.NewFakePreScoreAndScorePlugin("FakePreScoreAndScorePlugin", 0,
|
|
fwk.NewStatus(fwk.Skip, "fake skip"),
|
|
fwk.NewStatus(fwk.Error, "this score function shouldn't be executed because this plugin returned Skip in the PreScore"),
|
|
), "PreScore", "Score"),
|
|
},
|
|
extenders: nil,
|
|
want: []fwk.NodePluginScores{
|
|
{
|
|
Name: "node1",
|
|
Scores: []fwk.PluginScore{
|
|
{
|
|
Name: "Node2Prioritizer",
|
|
Score: 10,
|
|
},
|
|
{
|
|
Name: "NodeResourcesBalancedAllocation",
|
|
Score: 0,
|
|
},
|
|
},
|
|
TotalScore: 10,
|
|
},
|
|
{
|
|
Name: "node2",
|
|
Scores: []fwk.PluginScore{
|
|
{
|
|
Name: "Node2Prioritizer",
|
|
Score: 100,
|
|
},
|
|
{
|
|
Name: "NodeResourcesBalancedAllocation",
|
|
Score: 0,
|
|
},
|
|
},
|
|
TotalScore: 100,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "all score plugins are skipped",
|
|
pod: &v1.Pod{},
|
|
nodes: []*v1.Node{makeNode("node1", 1000, schedutil.DefaultMemoryRequest*10), makeNode("node2", 1000, schedutil.DefaultMemoryRequest*10)},
|
|
pluginRegistrations: []tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
tf.RegisterPluginAsExtensions("FakePreScoreAndScorePlugin", tf.NewFakePreScoreAndScorePlugin("FakePreScoreAndScorePlugin", 0,
|
|
fwk.NewStatus(fwk.Skip, "fake skip"),
|
|
fwk.NewStatus(fwk.Error, "this score function shouldn't be executed because this plugin returned Skip in the PreScore"),
|
|
), "PreScore", "Score"),
|
|
},
|
|
extenders: nil,
|
|
want: []fwk.NodePluginScores{
|
|
{Name: "node1", Scores: []fwk.PluginScore{}},
|
|
{Name: "node2", Scores: []fwk.PluginScore{}},
|
|
},
|
|
},
|
|
{
|
|
name: "the score from Image Locality plugin with image in all nodes",
|
|
pod: &v1.Pod{
|
|
Spec: v1.PodSpec{
|
|
Containers: []v1.Container{
|
|
{
|
|
Image: "gcr.io/40",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
nodes: []*v1.Node{
|
|
makeNode("node1", 1000, schedutil.DefaultMemoryRequest*10, imageStatus1...),
|
|
makeNode("node2", 1000, schedutil.DefaultMemoryRequest*10, imageStatus2...),
|
|
makeNode("node3", 1000, schedutil.DefaultMemoryRequest*10, imageStatus3...),
|
|
},
|
|
pluginRegistrations: []tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterScorePlugin(imagelocality.Name, imagelocality.New, 1),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
},
|
|
extenders: nil,
|
|
want: []fwk.NodePluginScores{
|
|
{
|
|
Name: "node1",
|
|
Scores: []fwk.PluginScore{
|
|
{
|
|
Name: "ImageLocality",
|
|
Score: 5,
|
|
},
|
|
},
|
|
TotalScore: 5,
|
|
},
|
|
{
|
|
Name: "node2",
|
|
Scores: []fwk.PluginScore{
|
|
{
|
|
Name: "ImageLocality",
|
|
Score: 5,
|
|
},
|
|
},
|
|
TotalScore: 5,
|
|
},
|
|
{
|
|
Name: "node3",
|
|
Scores: []fwk.PluginScore{
|
|
{
|
|
Name: "ImageLocality",
|
|
Score: 5,
|
|
},
|
|
},
|
|
TotalScore: 5,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
name: "the score from Image Locality plugin with image in partial nodes",
|
|
pod: &v1.Pod{
|
|
Spec: v1.PodSpec{
|
|
Containers: []v1.Container{
|
|
{
|
|
Image: "gcr.io/300",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
nodes: []*v1.Node{makeNode("node1", 1000, schedutil.DefaultMemoryRequest*10, imageStatus1...),
|
|
makeNode("node2", 1000, schedutil.DefaultMemoryRequest*10, imageStatus2...),
|
|
makeNode("node3", 1000, schedutil.DefaultMemoryRequest*10, imageStatus3...),
|
|
},
|
|
pluginRegistrations: []tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterScorePlugin(imagelocality.Name, imagelocality.New, 1),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
},
|
|
extenders: nil,
|
|
want: []fwk.NodePluginScores{
|
|
{
|
|
Name: "node1",
|
|
Scores: []fwk.PluginScore{
|
|
{
|
|
Name: "ImageLocality",
|
|
Score: 18,
|
|
},
|
|
},
|
|
TotalScore: 18,
|
|
},
|
|
{
|
|
Name: "node2",
|
|
Scores: []fwk.PluginScore{
|
|
{
|
|
Name: "ImageLocality",
|
|
Score: 18,
|
|
},
|
|
},
|
|
TotalScore: 18,
|
|
},
|
|
{
|
|
Name: "node3",
|
|
Scores: []fwk.PluginScore{
|
|
{
|
|
Name: "ImageLocality",
|
|
Score: 0,
|
|
},
|
|
},
|
|
TotalScore: 0,
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
for _, test := range tests {
|
|
t.Run(test.name, func(t *testing.T) {
|
|
client := clientsetfake.NewClientset()
|
|
informerFactory := informers.NewSharedInformerFactory(client, 0)
|
|
|
|
_, ctx := ktesting.NewTestContext(t)
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
cache := internalcache.New(ctx, time.Duration(0), nil)
|
|
for _, node := range test.nodes {
|
|
cache.AddNode(klog.FromContext(ctx), node)
|
|
}
|
|
snapshot := internalcache.NewEmptySnapshot()
|
|
if err := cache.UpdateSnapshot(klog.FromContext(ctx), snapshot); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
schedFramework, err := tf.NewFramework(
|
|
ctx,
|
|
test.pluginRegistrations, "",
|
|
frameworkruntime.WithInformerFactory(informerFactory),
|
|
frameworkruntime.WithSnapshotSharedLister(snapshot),
|
|
frameworkruntime.WithClientSet(client),
|
|
frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
|
|
)
|
|
if err != nil {
|
|
t.Fatalf("error creating framework: %+v", err)
|
|
}
|
|
|
|
state := framework.NewCycleState()
|
|
var extenders []fwk.Extender
|
|
for ii := range test.extenders {
|
|
extenders = append(extenders, &test.extenders[ii])
|
|
}
|
|
nodeInfos, err := snapshot.NodeInfos().List()
|
|
if err != nil {
|
|
t.Fatalf("failed to list node from snapshot: %v", err)
|
|
}
|
|
nodesscores, err := prioritizeNodes(ctx, extenders, schedFramework, state, test.pod, nodeInfos)
|
|
if err != nil {
|
|
t.Errorf("unexpected error: %v", err)
|
|
}
|
|
for i := range nodesscores {
|
|
sort.Slice(nodesscores[i].Scores, func(j, k int) bool {
|
|
return nodesscores[i].Scores[j].Name < nodesscores[i].Scores[k].Name
|
|
})
|
|
}
|
|
|
|
if diff := cmp.Diff(test.want, nodesscores); diff != "" {
|
|
t.Errorf("returned nodes scores (-want,+got):\n%s", diff)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
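// Pod priority values used by the nominated-pod test cases in this file.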
var lowPriority, midPriority, highPriority = int32(0), int32(100), int32(1000)
|
|
|
|
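// TestNumFeasibleNodesToFind verifies how the global and per-profile
// percentageOfNodesToScore settings determine the number of nodes the scheduler evaluates,
// including the adaptive default that is applied when no percentage is configured.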
func TestNumFeasibleNodesToFind(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
globalPercentage int32
|
|
profilePercentage *int32
|
|
numAllNodes int32
|
|
wantNumNodes int32
|
|
}{
|
|
{
|
|
name: "not set percentageOfNodesToScore and nodes number not more than 50",
|
|
numAllNodes: 10,
|
|
wantNumNodes: 10,
|
|
},
|
|
{
|
|
name: "set profile percentageOfNodesToScore and nodes number not more than 50",
|
|
profilePercentage: ptr.To[int32](40),
|
|
numAllNodes: 10,
|
|
wantNumNodes: 10,
|
|
},
|
|
{
|
|
name: "not set percentageOfNodesToScore and nodes number more than 50",
|
|
numAllNodes: 1000,
|
|
wantNumNodes: 420,
|
|
},
|
|
{
|
|
name: "set profile percentageOfNodesToScore and nodes number more than 50",
|
|
profilePercentage: ptr.To[int32](40),
|
|
numAllNodes: 1000,
|
|
wantNumNodes: 400,
|
|
},
|
|
{
|
|
name: "set global and profile percentageOfNodesToScore and nodes number more than 50",
|
|
globalPercentage: 100,
|
|
profilePercentage: ptr.To[int32](40),
|
|
numAllNodes: 1000,
|
|
wantNumNodes: 400,
|
|
},
|
|
{
|
|
name: "set global percentageOfNodesToScore and nodes number more than 50",
|
|
globalPercentage: 40,
|
|
numAllNodes: 1000,
|
|
wantNumNodes: 400,
|
|
},
|
|
{
|
|
name: "not set profile percentageOfNodesToScore and nodes number more than 50*125",
|
|
numAllNodes: 6000,
|
|
wantNumNodes: 300,
|
|
},
|
|
{
|
|
name: "set profile percentageOfNodesToScore and nodes number more than 50*125",
|
|
profilePercentage: ptr.To[int32](40),
|
|
numAllNodes: 6000,
|
|
wantNumNodes: 2400,
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
sched := &Scheduler{
|
|
percentageOfNodesToScore: tt.globalPercentage,
|
|
}
|
|
if gotNumNodes := sched.numFeasibleNodesToFind(tt.profilePercentage, tt.numAllNodes); gotNumNodes != tt.wantNumNodes {
|
|
t.Errorf("Scheduler.numFeasibleNodesToFind() = %v, want %v", gotNumNodes, tt.wantNumNodes)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
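// TestFairEvaluationForNodes verifies that consecutive scheduling attempts resume node
// filtering from where the previous attempt stopped (tracked by nextStartNodeIndex), so
// that all nodes get evaluated fairly over time.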
func TestFairEvaluationForNodes(t *testing.T) {
|
|
numAllNodes := 500
|
|
nodeNames := make([]string, 0, numAllNodes)
|
|
for i := 0; i < numAllNodes; i++ {
|
|
nodeNames = append(nodeNames, strconv.Itoa(i))
|
|
}
|
|
nodes := makeNodeList(nodeNames)
|
|
_, ctx := ktesting.NewTestContext(t)
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
sched := makeScheduler(ctx, nodes)
|
|
|
|
fwk, err := tf.NewFramework(
|
|
ctx,
|
|
[]tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
tf.RegisterFilterPlugin("TrueFilter", tf.NewTrueFilterPlugin),
|
|
tf.RegisterScorePlugin("EqualPrioritizerPlugin", tf.NewEqualPrioritizerPlugin(), 1),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
},
|
|
"",
|
|
frameworkruntime.WithPodNominator(internalqueue.NewTestQueue(ctx, nil)),
|
|
frameworkruntime.WithSnapshotSharedLister(sched.nodeInfoSnapshot),
|
|
)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
// Set the percentage so that numAllNodes % nodesToFind != 0.
|
|
sched.percentageOfNodesToScore = 30
|
|
nodesToFind := int(sched.numFeasibleNodesToFind(fwk.PercentageOfNodesToScore(), int32(numAllNodes)))
|
|
|
|
// Iterate over all nodes more than twice so that the round-robin start index wraps around.
|
|
for i := 0; i < 2*(numAllNodes/nodesToFind+1); i++ {
|
|
nodesThatFit, _, err := sched.findNodesThatFitPod(ctx, fwk, framework.NewCycleState(), &v1.Pod{})
|
|
if err != nil {
|
|
t.Errorf("unexpected error: %v", err)
|
|
}
|
|
if len(nodesThatFit) != nodesToFind {
|
|
t.Errorf("got %d nodes filtered, want %d", len(nodesThatFit), nodesToFind)
|
|
}
|
|
if sched.nextStartNodeIndex != (i+1)*nodesToFind%numAllNodes {
|
|
t.Errorf("got %d lastProcessedNodeIndex, want %d", sched.nextStartNodeIndex, (i+1)*nodesToFind%numAllNodes)
|
|
}
|
|
}
|
|
}
|
|
|
|
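// TestPreferNominatedNodeFilterCallCounts checks how many times the Filter plugin runs for
// a pod carrying a nominated node name: only the nominated node is filtered when it fits
// (one call), every node is filtered when no node is nominated (three calls), and a failed
// nominated attempt adds one extra call on top of filtering all nodes (four calls).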
func TestPreferNominatedNodeFilterCallCounts(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
pod *v1.Pod
|
|
nodeReturnCodeMap map[string]fwk.Code
|
|
expectedCount int32
|
|
}{
|
|
{
|
|
name: "pod has the nominated node set, filter is called only once",
|
|
pod: st.MakePod().Name("p_with_nominated_node").UID("p").Priority(highPriority).NominatedNodeName("node1").Obj(),
|
|
expectedCount: 1,
|
|
},
|
|
{
|
|
name: "pod without the nominated pod, filter is called for each node",
|
|
pod: st.MakePod().Name("p_without_nominated_node").UID("p").Priority(highPriority).Obj(),
|
|
expectedCount: 3,
|
|
},
|
|
{
|
|
name: "nominated pod cannot pass the filter, filter is called for each node",
|
|
pod: st.MakePod().Name("p_with_nominated_node").UID("p").Priority(highPriority).NominatedNodeName("node1").Obj(),
|
|
nodeReturnCodeMap: map[string]fwk.Code{"node1": fwk.Unschedulable},
|
|
expectedCount: 4,
|
|
},
|
|
}
|
|
|
|
for _, test := range tests {
|
|
t.Run(test.name, func(t *testing.T) {
|
|
logger, ctx := ktesting.NewTestContext(t)
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
// create three nodes in the cluster.
|
|
nodes := makeNodeList([]string{"node1", "node2", "node3"})
|
|
client := clientsetfake.NewClientset(test.pod)
|
|
informerFactory := informers.NewSharedInformerFactory(client, 0)
|
|
cache := internalcache.New(ctx, time.Duration(0), nil)
|
|
for _, n := range nodes {
|
|
cache.AddNode(logger, n)
|
|
}
|
|
plugin := tf.FakeFilterPlugin{FailedNodeReturnCodeMap: test.nodeReturnCodeMap}
|
|
registerFakeFilterFunc := tf.RegisterFilterPlugin(
|
|
"FakeFilter",
|
|
func(_ context.Context, _ runtime.Object, fh fwk.Handle) (fwk.Plugin, error) {
|
|
return &plugin, nil
|
|
},
|
|
)
|
|
registerPlugins := []tf.RegisterPluginFunc{
|
|
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
|
|
registerFakeFilterFunc,
|
|
tf.RegisterScorePlugin("EqualPrioritizerPlugin", tf.NewEqualPrioritizerPlugin(), 1),
|
|
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
|
|
}
|
|
snapshot := internalcache.NewSnapshot(nil, nodes)
|
|
fwk, err := tf.NewFramework(
|
|
ctx,
|
|
registerPlugins, "",
|
|
frameworkruntime.WithClientSet(client),
|
|
frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
|
|
frameworkruntime.WithSnapshotSharedLister(snapshot),
|
|
)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
sched := &Scheduler{
|
|
Cache: cache,
|
|
nodeInfoSnapshot: snapshot,
|
|
percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
|
|
}
|
|
sched.applyDefaultHandlers()
|
|
|
|
_, _, err = sched.findNodesThatFitPod(ctx, fwk, framework.NewCycleState(), test.pod)
|
|
if err != nil {
|
|
t.Errorf("unexpected error: %v", err)
|
|
}
|
|
if test.expectedCount != plugin.NumFilterCalled {
|
|
t.Errorf("predicate was called %d times, expected is %d", plugin.NumFilterCalled, test.expectedCount)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
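// podWithID returns a pod with the given name and UID, assigned to desiredHost and using
// the test scheduler name.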
func podWithID(id, desiredHost string) *v1.Pod {
|
|
return st.MakePod().Name(id).UID(id).Node(desiredHost).SchedulerName(testSchedulerName).Obj()
|
|
}
|
|
|
|
func deletingPod(id string) *v1.Pod {
|
|
return st.MakePod().Name(id).UID(id).Terminating().Node("").SchedulerName(testSchedulerName).Obj()
|
|
}
|
|
|
|
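// podWithPort returns a pod like podWithID, with a single container exposing the given host port.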
func podWithPort(id, desiredHost string, port int) *v1.Pod {
|
|
pod := podWithID(id, desiredHost)
|
|
pod.Spec.Containers = []v1.Container{
|
|
{Name: "ctr", Ports: []v1.ContainerPort{{HostPort: int32(port)}}},
|
|
}
|
|
return pod
|
|
}
|
|
|
|
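// podWithResources returns a pod like podWithID, with a single container carrying the given
// resource limits and requests.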
func podWithResources(id, desiredHost string, limits v1.ResourceList, requests v1.ResourceList) *v1.Pod {
|
|
pod := podWithID(id, desiredHost)
|
|
pod.Spec.Containers = []v1.Container{
|
|
{Name: "ctr", Resources: v1.ResourceRequirements{Limits: limits, Requests: requests}},
|
|
}
|
|
return pod
|
|
}
|
|
|
|
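// makeNodeList returns bare Node objects, one per provided name.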
func makeNodeList(nodeNames []string) []*v1.Node {
|
|
result := make([]*v1.Node, 0, len(nodeNames))
|
|
for _, nodeName := range nodeNames {
|
|
result = append(result, &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}})
|
|
}
|
|
return result
|
|
}
|
|
|
|
// makeScheduler makes a simple Scheduler for testing.
|
|
func makeScheduler(ctx context.Context, nodes []*v1.Node) *Scheduler {
|
|
logger := klog.FromContext(ctx)
|
|
cache := internalcache.New(ctx, time.Duration(0), nil)
|
|
for _, n := range nodes {
|
|
cache.AddNode(logger, n)
|
|
}
|
|
|
|
sched := &Scheduler{
|
|
Cache: cache,
|
|
nodeInfoSnapshot: emptySnapshot,
|
|
percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
|
|
}
|
|
sched.applyDefaultHandlers()
|
|
cache.UpdateSnapshot(logger, sched.nodeInfoSnapshot)
|
|
return sched
|
|
}
|
|
|
|
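// makeNode returns a Node with the given milliCPU and memory as both capacity and
// allocatable, a fixed pod capacity of 100, and an optional list of container images
// (used by the image-locality scoring tests).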
func makeNode(node string, milliCPU, memory int64, images ...v1.ContainerImage) *v1.Node {
	return &v1.Node{
		ObjectMeta: metav1.ObjectMeta{Name: node},
		Status: v1.NodeStatus{
			Capacity: v1.ResourceList{
				v1.ResourceCPU:    *resource.NewMilliQuantity(milliCPU, resource.DecimalSI),
				v1.ResourceMemory: *resource.NewQuantity(memory, resource.BinarySI),
				"pods":            *resource.NewQuantity(100, resource.DecimalSI),
			},
			Allocatable: v1.ResourceList{
				v1.ResourceCPU:    *resource.NewMilliQuantity(milliCPU, resource.DecimalSI),
				v1.ResourceMemory: *resource.NewQuantity(memory, resource.BinarySI),
				"pods":            *resource.NewQuantity(100, resource.DecimalSI),
			},
			Images: images,
		},
	}
}

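// interruptOnBind installs a reactor on the fake client that intercepts creation of the
// pods/binding subresource and sends each intercepted Binding to the returned channel.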
func interruptOnBind(client *clientsetfake.Clientset) chan *v1.Binding {
	bindingChan := make(chan *v1.Binding, 1)
	client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
		var b *v1.Binding
		if action.GetSubresource() == "binding" {
			// Assign to the outer b so the reactor also returns the intercepted Binding object.
			b = action.(clienttesting.CreateAction).GetObject().(*v1.Binding)
			bindingChan <- b
		}
		return true, b, nil
	})
	return bindingChan
}

// queuedPodStore: pods queued before processing.
// scache: scheduler cache that might contain assumed pods.
func setupTestSchedulerWithOnePodOnNode(ctx context.Context, t *testing.T, client clientset.Interface, queuedPodStore *clientcache.FIFO, scache internalcache.Cache, apiDispatcher *apidispatcher.APIDispatcher,
	pod *v1.Pod, node *v1.Node, bindingChan chan *v1.Binding, fns ...tf.RegisterPluginFunc) (*Scheduler, chan error) {
	scheduler, errChan := setupTestScheduler(ctx, t, client, queuedPodStore, scache, apiDispatcher, nil, nil, fns...)

	if err := queuedPodStore.Add(pod); err != nil {
		t.Fatal(err)
	}
	// queuedPodStore: [foo:8080]
	// cache: []

	scheduler.ScheduleOne(ctx)
	// queuedPodStore: []
	// cache: [(assumed)foo:8080]

	select {
	case b := <-bindingChan:
		expectBinding := &v1.Binding{
			ObjectMeta: metav1.ObjectMeta{Name: pod.Name, UID: types.UID(pod.Name)},
			Target:     v1.ObjectReference{Kind: "Node", Name: node.Name},
		}
		if diff := cmp.Diff(expectBinding, b); diff != "" {
			t.Errorf("Unexpected binding (-want,+got):\n%s", diff)
		}
	case <-time.After(wait.ForeverTestTimeout):
		t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
	}
	return scheduler, errChan
}

// queuedPodStore: pods queued before processing.
// cache: scheduler cache that might contain assumed pods.
func setupTestScheduler(ctx context.Context, t *testing.T, client clientset.Interface, queuedPodStore *clientcache.FIFO, cache internalcache.Cache, apiDispatcher *apidispatcher.APIDispatcher,
	informerFactory informers.SharedInformerFactory, broadcaster events.EventBroadcaster, fns ...tf.RegisterPluginFunc) (*Scheduler, chan error) {
	var recorder events.EventRecorder
	if broadcaster != nil {
		recorder = broadcaster.NewRecorder(scheme.Scheme, testSchedulerName)
	} else {
		recorder = &events.FakeRecorder{}
	}

	if informerFactory == nil {
		informerFactory = informers.NewSharedInformerFactory(clientsetfake.NewClientset(), 0)
	}
	schedulingQueue := internalqueue.NewTestQueueWithInformerFactory(ctx, nil, informerFactory, internalqueue.WithAPIDispatcher(apiDispatcher))
	waitingPods := frameworkruntime.NewWaitingPodsMap()
	snapshot := internalcache.NewEmptySnapshot()

	schedFramework, _ := tf.NewFramework(
		ctx,
		fns,
		testSchedulerName,
		frameworkruntime.WithClientSet(client),
		frameworkruntime.WithAPIDispatcher(apiDispatcher),
		frameworkruntime.WithEventRecorder(recorder),
		frameworkruntime.WithInformerFactory(informerFactory),
		frameworkruntime.WithPodNominator(schedulingQueue),
		frameworkruntime.WithWaitingPods(waitingPods),
		frameworkruntime.WithSnapshotSharedLister(snapshot),
	)
	if apiDispatcher != nil {
		schedFramework.SetAPICacher(apicache.New(schedulingQueue, cache))
	}

	errChan := make(chan error, 1)
	sched := &Scheduler{
		Cache:                    cache,
		client:                   client,
		nodeInfoSnapshot:         snapshot,
		percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
		NextPod: func(logger klog.Logger) (*framework.QueuedPodInfo, error) {
			return &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(t, clientcache.Pop(queuedPodStore).(*v1.Pod))}, nil
		},
		SchedulingQueue: schedulingQueue,
		APIDispatcher:   apiDispatcher,
		Profiles:        profile.Map{testSchedulerName: schedFramework},
	}

	sched.SchedulePod = sched.schedulePod
	sched.FailureHandler = func(_ context.Context, _ framework.Framework, p *framework.QueuedPodInfo, status *fwk.Status, _ *fwk.NominatingInfo, _ time.Time) {
		err := status.AsError()
		errChan <- err

		msg := truncateMessage(err.Error())
		schedFramework.EventRecorder().Eventf(p.Pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg)
	}
	return sched, errChan
}

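// setupTestSchedulerWithVolumeBinding sets up a scheduler whose volume binding plugin is backed by
// the given SchedulerVolumeBinder, with a single node ("node1") and one queued pod ("foo") that
// references a PVC ("testPVC"). It returns the scheduler together with channels that surface the
// resulting binding and any scheduling error.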
func setupTestSchedulerWithVolumeBinding(ctx context.Context, t *testing.T, client *clientsetfake.Clientset, volumeBinder volumebinding.SchedulerVolumeBinder, broadcaster events.EventBroadcaster, asyncAPICallsEnabled bool) (*Scheduler, chan *v1.Binding, chan error) {
	if client == nil {
		client = clientsetfake.NewClientset()
	}
	logger := klog.FromContext(ctx)
	testNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1", UID: types.UID("node1")}}
	queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
	pod := podWithID("foo", "")
	pod.Namespace = "foo-ns"
	pod.Spec.Volumes = append(pod.Spec.Volumes, v1.Volume{Name: "testVol",
		VolumeSource: v1.VolumeSource{PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ClaimName: "testPVC"}}})
	if err := queuedPodStore.Add(pod); err != nil {
		t.Fatal(err)
	}

	testPVC := v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{Name: "testPVC", Namespace: pod.Namespace, UID: types.UID("testPVC")}}

	var apiDispatcher *apidispatcher.APIDispatcher
	if asyncAPICallsEnabled {
		apiDispatcher = apidispatcher.New(client, 16, apicalls.Relevances)
		apiDispatcher.Run(logger)
		t.Cleanup(apiDispatcher.Close)
	}

	scache := internalcache.New(ctx, 10*time.Minute, apiDispatcher)
	scache.AddNode(logger, &testNode)
	informerFactory := informers.NewSharedInformerFactory(client, 0)
	pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
	pvcInformer.Informer().GetStore().Add(&testPVC)
	if _, err := client.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
		t.Fatalf("failed to create pod: %v", err)
	}
	if _, err := client.CoreV1().PersistentVolumeClaims(pod.Namespace).Create(ctx, &testPVC, metav1.CreateOptions{}); err != nil {
		t.Fatalf("failed to create PVC: %v", err)
	}
	if _, err := client.CoreV1().Nodes().Create(ctx, &testNode, metav1.CreateOptions{}); err != nil {
		t.Fatalf("failed to create node: %v", err)
	}
	bindingChan := interruptOnBind(client)
	fns := []tf.RegisterPluginFunc{
		tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
		tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
		tf.RegisterPluginAsExtensions(volumebinding.Name, func(ctx context.Context, plArgs runtime.Object, handle fwk.Handle) (fwk.Plugin, error) {
			return &volumebinding.VolumeBinding{Binder: volumeBinder, PVCLister: pvcInformer.Lister()}, nil
		}, "PreFilter", "Filter", "Reserve", "PreBind"),
	}
	s, errChan := setupTestScheduler(ctx, t, client, queuedPodStore, scache, apiDispatcher, informerFactory, broadcaster, fns...)
	return s, bindingChan, errChan
}

// This is a workaround because golint complains that errors cannot
// end with punctuation. However, the real predicate error message does
// end with a period.
func makePredicateError(failReason string) error {
	s := fmt.Sprintf("0/1 nodes are available: %v.", failReason)
	return errors.New(s)
}

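// mustNewPodInfo wraps framework.NewPodInfo and fails the test immediately if construction fails.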
func mustNewPodInfo(t *testing.T, pod *v1.Pod) *framework.PodInfo {
	podInfo, err := framework.NewPodInfo(pod)
	if err != nil {
		t.Fatal(err)
	}
	return podInfo
}