
Kubernetes Component Development: Custom Scheduler (Part 2)

Building on the scheduling framework's extension points, let's develop a scheduler plugin of our own.

Source Code

To implement our own plugin, we must register it with the scheduling framework and configure it, and we must also implement the relevant extension point interfaces.

The extension point interfaces can be found in the source tree at kubernetes/pkg/scheduler/framework/interface.go.

The Plugin interface is the parent of all extension point interfaces:

// Plugin is the parent type for all the scheduling framework plugins.
type Plugin interface {
	Name() string
}

The interfaces for the other extension points:

// PreEnqueuePlugin is an interface that must be implemented by "PreEnqueue" plugins.
// These plugins are called prior to adding Pods to activeQ.
// Note: an preEnqueue plugin is expected to be lightweight and efficient, so it's not expected to
// involve expensive calls like accessing external endpoints; otherwise it'd block other
// Pods' enqueuing in event handlers.
type PreEnqueuePlugin interface {
	Plugin
	// PreEnqueue is called prior to adding Pods to activeQ.
	PreEnqueue(ctx context.Context, p *v1.Pod) *Status
}

// LessFunc is the function to sort pod info
type LessFunc func(podInfo1, podInfo2 *QueuedPodInfo) bool

// QueueSortPlugin is an interface that must be implemented by "QueueSort" plugins.
// These plugins are used to sort pods in the scheduling queue. Only one queue sort
// plugin may be enabled at a time.
type QueueSortPlugin interface {
	Plugin
	// Less are used to sort pods in the scheduling queue.
	Less(*QueuedPodInfo, *QueuedPodInfo) bool
}

// EnqueueExtensions is an optional interface that plugins can implement to efficiently
// move unschedulable Pods in internal scheduling queues. Plugins
// that fail pod scheduling (e.g., Filter plugins) are expected to implement this interface.
type EnqueueExtensions interface {
	// EventsToRegister returns a series of possible events that may cause a Pod
	// failed by this plugin schedulable.
	// The events will be registered when instantiating the internal scheduling queue,
	// and leveraged to build event handlers dynamically.
	// Note: the returned list needs to be static (not depend on configuration parameters);
	// otherwise it would lead to undefined behavior.
	EventsToRegister() []ClusterEvent
}

// PreFilterExtensions is an interface that is included in plugins that allow specifying
// callbacks to make incremental updates to its supposedly pre-calculated
// state.
type PreFilterExtensions interface {
	// AddPod is called by the framework while trying to evaluate the impact
	// of adding podToAdd to the node while scheduling podToSchedule.
	AddPod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podInfoToAdd *PodInfo, nodeInfo *NodeInfo) *Status
	// RemovePod is called by the framework while trying to evaluate the impact
	// of removing podToRemove from the node while scheduling podToSchedule.
	RemovePod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podInfoToRemove *PodInfo, nodeInfo *NodeInfo) *Status
}

// PreFilterPlugin is an interface that must be implemented by "PreFilter" plugins.
// These plugins are called at the beginning of the scheduling cycle.
type PreFilterPlugin interface {
	Plugin
	// PreFilter is called at the beginning of the scheduling cycle. All PreFilter
	// plugins must return success or the pod will be rejected. PreFilter could optionally
	// return a PreFilterResult to influence which nodes to evaluate downstream. This is useful
	// for cases where it is possible to determine the subset of nodes to process in O(1) time.
	PreFilter(ctx context.Context, state *CycleState, p *v1.Pod) (*PreFilterResult, *Status)
	// PreFilterExtensions returns a PreFilterExtensions interface if the plugin implements one,
	// or nil if it does not. A Pre-filter plugin can provide extensions to incrementally
	// modify its pre-processed info. The framework guarantees that the extensions
	// AddPod/RemovePod will only be called after PreFilter, possibly on a cloned
	// CycleState, and may call those functions more than once before calling
	// Filter again on a specific node.
	PreFilterExtensions() PreFilterExtensions
}

// FilterPlugin is an interface for Filter plugins. These plugins are called at the
// filter extension point for filtering out hosts that cannot run a pod.
// This concept used to be called 'predicate' in the original scheduler.
// These plugins should return "Success", "Unschedulable" or "Error" in Status.code.
// However, the scheduler accepts other valid codes as well.
// Anything other than "Success" will lead to exclusion of the given host from
// running the pod.
type FilterPlugin interface {
	Plugin
	// Filter is called by the scheduling framework.
	// All FilterPlugins should return "Success" to declare that
	// the given node fits the pod. If Filter doesn't return "Success",
	// it will return "Unschedulable", "UnschedulableAndUnresolvable" or "Error".
	// For the node being evaluated, Filter plugins should look at the passed
	// nodeInfo reference for this particular node's information (e.g., pods
	// considered to be running on the node) instead of looking it up in the
	// NodeInfoSnapshot because we don't guarantee that they will be the same.
	// For example, during preemption, we may pass a copy of the original
	// nodeInfo object that has some pods removed from it to evaluate the
	// possibility of preempting them to schedule the target pod.
	Filter(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *NodeInfo) *Status
}

// PostFilterPlugin is an interface for "PostFilter" plugins. These plugins are called
// after a pod cannot be scheduled.
type PostFilterPlugin interface {
	Plugin
	// PostFilter is called by the scheduling framework.
	// A PostFilter plugin should return one of the following statuses:
	// - Unschedulable: the plugin gets executed successfully but the pod cannot be made schedulable.
	// - Success: the plugin gets executed successfully and the pod can be made schedulable.
	// - Error: the plugin aborts due to some internal error.
	//
	// Informational plugins should be configured ahead of other ones, and always return Unschedulable status.
	// Optionally, a non-nil PostFilterResult may be returned along with a Success status. For example,
	// a preemption plugin may choose to return nominatedNodeName, so that framework can reuse that to update the
	// preemptor pod's .spec.status.nominatedNodeName field.
	PostFilter(ctx context.Context, state *CycleState, pod *v1.Pod, filteredNodeStatusMap NodeToStatusMap) (*PostFilterResult, *Status)
}

// PreScorePlugin is an interface for "PreScore" plugin. PreScore is an
// informational extension point. Plugins will be called with a list of nodes
// that passed the filtering phase. A plugin may use this data to update internal
// state or to generate logs/metrics.
type PreScorePlugin interface {
	Plugin
	// PreScore is called by the scheduling framework after a list of nodes
	// passed the filtering phase. All prescore plugins must return success or
	// the pod will be rejected
	PreScore(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*v1.Node) *Status
}

// ScoreExtensions is an interface for Score extended functionality.
type ScoreExtensions interface {
	// NormalizeScore is called for all node scores produced by the same plugin's "Score"
	// method. A successful run of NormalizeScore will update the scores list and return
	// a success status.
	NormalizeScore(ctx context.Context, state *CycleState, p *v1.Pod, scores NodeScoreList) *Status
}

// ScorePlugin is an interface that must be implemented by "Score" plugins to rank
// nodes that passed the filtering phase.
type ScorePlugin interface {
	Plugin
	// Score is called on each filtered node. It must return success and an integer
	// indicating the rank of the node. All scoring plugins must return success or
	// the pod will be rejected.
	Score(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) (int64, *Status)

	// ScoreExtensions returns a ScoreExtensions interface if it implements one, or nil if does not.
	ScoreExtensions() ScoreExtensions
}

// ReservePlugin is an interface for plugins with Reserve and Unreserve
// methods. These are meant to update the state of the plugin. This concept
// used to be called 'assume' in the original scheduler. These plugins should
// return only Success or Error in Status.code. However, the scheduler accepts
// other valid codes as well. Anything other than Success will lead to
// rejection of the pod.
type ReservePlugin interface {
	Plugin
	// Reserve is called by the scheduling framework when the scheduler cache is
	// updated. If this method returns a failed Status, the scheduler will call
	// the Unreserve method for all enabled ReservePlugins.
	Reserve(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) *Status
	// Unreserve is called by the scheduling framework when a reserved pod was
	// rejected, an error occurred during reservation of subsequent plugins, or
	// in a later phase. The Unreserve method implementation must be idempotent
	// and may be called by the scheduler even if the corresponding Reserve
	// method for the same plugin was not called.
	Unreserve(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string)
}

// PreBindPlugin is an interface that must be implemented by "PreBind" plugins.
// These plugins are called before a pod being scheduled.
type PreBindPlugin interface {
	Plugin
	// PreBind is called before binding a pod. All prebind plugins must return
	// success or the pod will be rejected and won't be sent for binding.
	PreBind(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) *Status
}

// PostBindPlugin is an interface that must be implemented by "PostBind" plugins.
// These plugins are called after a pod is successfully bound to a node.
type PostBindPlugin interface {
	Plugin
	// PostBind is called after a pod is successfully bound. These plugins are
	// informational. A common application of this extension point is for cleaning
	// up. If a plugin needs to clean-up its state after a pod is scheduled and
	// bound, PostBind is the extension point that it should register.
	PostBind(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string)
}

// PermitPlugin is an interface that must be implemented by "Permit" plugins.
// These plugins are called before a pod is bound to a node.
type PermitPlugin interface {
	Plugin
	// Permit is called before binding a pod (and before prebind plugins). Permit
	// plugins are used to prevent or delay the binding of a Pod. A permit plugin
	// must return success or wait with timeout duration, or the pod will be rejected.
	// The pod will also be rejected if the wait timeout or the pod is rejected while
	// waiting. Note that if the plugin returns "wait", the framework will wait only
	// after running the remaining plugins given that no other plugin rejects the pod.
	Permit(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) (*Status, time.Duration)
}

// BindPlugin is an interface that must be implemented by "Bind" plugins. Bind
// plugins are used to bind a pod to a Node.
type BindPlugin interface {
	Plugin
	// Bind plugins will not be called until all pre-bind plugins have completed. Each
	// bind plugin is called in the configured order. A bind plugin may choose whether
	// or not to handle the given Pod. If a bind plugin chooses to handle a Pod, the
	// remaining bind plugins are skipped. When a bind plugin does not handle a pod,
	// it must return Skip in its Status code. If a bind plugin returns an Error, the
	// pod is rejected and will not be bound.
	Bind(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) *Status
}
Note

Extensions are called in the following order:

  • If no extension is configured for an extension point, the scheduling framework uses the extension from the default plugins.

  • If an extension is configured and enabled for an extension point, the scheduling framework first calls the default plugins' extension and then the configured one.

  • The default plugins' extensions are always called first; after that, the extension point's extensions are called one by one in the order they are enabled in the KubeSchedulerConfiguration.

  • You can also disable a default plugin's extension and then re-enable it at a specific position in the enabled list; this changes the point at which the default plugin's extension is called (see the configuration sketch right after this list).
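
To make the last point concrete, here is a hedged sketch in Go of such a configuration (it mirrors the profile built in the integration test later in this post; the exact field shapes of the config API differ between scheduler versions): disabling the default Score plugins with "*" and listing only our plugin under Enabled determines both which Score extensions run and in what order.

package config

import (
	v1 "k8s.io/api/core/v1"
	schedapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
)

// buildProfile disables every default Score plugin ("*") and enables only
// PodCapacityOfNode, so it becomes the sole Score extension that is called.
func buildProfile() schedapi.KubeSchedulerProfile {
	return schedapi.KubeSchedulerProfile{
		SchedulerName: v1.DefaultSchedulerName,
		Plugins: &schedapi.Plugins{
			Score: &schedapi.PluginSet{
				Enabled:  []schedapi.Plugin{{Name: "PodCapacityOfNode"}},
				Disabled: []schedapi.Plugin{{Name: "*"}},
			},
		},
	}
}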

Implementation

The functionality we want is best implemented at the Score extension point. By scoring each node, the scheduler picks the node that fits the Pod best; scores are normally in the range [0, 100]. If a plugin's raw scores fall outside this range, NormalizeScore can map them back into [0, 100]. (Otherwise, if scoring plugin A produces scores from 0 to 100 while plugin B produces scores from 100 to 1000, even A's highest score would barely influence the result.) After the NormalizeScore phase, the scheduler combines each Score extension's score for a node with that extension's weight to produce the node's final score.
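
To make the weighting step concrete, here is a minimal, self-contained sketch (the struct and the numbers are made up purely for illustration; this is not framework code): each Score plugin's normalized score for a node is multiplied by that plugin's configured weight, and the products are summed into the node's final score.

package main

import "fmt"

// pluginScore is a hypothetical holder used only for this illustration; in the
// real scheduler the weights come from the KubeSchedulerConfiguration profile.
type pluginScore struct {
	name            string
	weight          int64 // plugin weight configured in the profile
	normalizedScore int64 // score after NormalizeScore, within [0, 100]
}

func main() {
	// Assume two Score plugins have rated the same node.
	scores := []pluginScore{
		{name: "PodCapacityOfNode", weight: 2, normalizedScore: 80},
		{name: "NodeResourcesFit", weight: 1, normalizedScore: 40},
	}
	var final int64
	for _, s := range scores {
		final += s.weight * s.normalizedScore // weighted sum: 2*80 + 1*40 = 200
	}
	fmt.Println("final node score:", final)
}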

Judging from the extension point interfaces above, to hook into the scoring extension point our plugin must implement the ScorePlugin interface:

type PodCapacityOfNode struct {
	handle framework.Handle
}

var _ = framework.ScorePlugin(&PodCapacityOfNode{})

// Name is the name of the plugin used in the Registry and configurations.
const Name = "PodCapacityOfNode"

func (ps *PodCapacityOfNode) Name() string {
	return Name
}

Let's take another look at the source of the ScorePlugin interface:

// ScorePlugin is an interface that must be implemented by "Score" plugins to rank
// nodes that passed the filtering phase.
type ScorePlugin interface {
	Plugin
	// Score is called on each filtered node. It must return success and an integer
	// indicating the rank of the node. All scoring plugins must return success or
	// the pod will be rejected.
	Score(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) (int64, *Status)

	// ScoreExtensions returns a ScoreExtensions interface if it implements one, or nil if does not.
	ScoreExtensions() ScoreExtensions
}

Our PodCapacityOfNode plugin only needs to implement the Score method (plus ScoreExtensions, which exposes NormalizeScore) to be able to score nodes.

Coding

Now that we know how this works, let's look at the concrete implementation.

This time we build on top of the Kubernetes community's scheduler-plugins project, which lets us use not only our own plugin but also the plugins that already exist in that repository. It is worth a look if you are interested.

package podcapacityofnode

import (
	"context"
	"fmt"
	"math"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/klog"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

type PodCapacityOfNode struct {
	handle framework.Handle
}

var _ = framework.ScorePlugin(&PodCapacityOfNode{})

// Name is the name of the plugin used in the Registry and configurations.
const Name = "PodCapacityOfNode"

func (ps *PodCapacityOfNode) Name() string {
	return Name
}

// Score invoked at the score extension point.
func (ps *PodCapacityOfNode) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
	nodeInfo, err := ps.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
	if err != nil {
		return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
	}

	// ps.score rates the node by the share of pod capacity still free,
	// counting both the pods already on the node and pods nominated to it.
	return ps.score(nodeInfo)
}

// ScoreExtensions of the Score plugin.
func (ps *PodCapacityOfNode) ScoreExtensions() framework.ScoreExtensions {
	return ps
}

func (ps *PodCapacityOfNode) score(nodeInfo *framework.NodeInfo) (int64, *framework.Status) {
	var (
		runningPodNum, nominatedPodNum, scoreOfCapacity, allPodNum int64
	)
	nodeName := nodeInfo.Node().Name
	// pods nominated to this node (e.g. by preemption) but not yet bound
	nominatedPodNum = int64(len(ps.handle.PreemptHandle().NominatedPodsForNode(nodeName)))
	// pods currently assigned to this node
	runningPodNum = int64(len(nodeInfo.Pods))
	// the node's total pod capacity (status.capacity.pods)
	allPodNum = nodeInfo.Node().Status.Capacity.Pods().Value()
	// score = percentage of pod slots still free on the node
	scoreOfCapacity = (allPodNum - (runningPodNum + nominatedPodNum)) * 100 / allPodNum

	klog.Infof("node %s score is %v", nodeName, scoreOfCapacity)

	return scoreOfCapacity, nil
}

func (ps *PodCapacityOfNode) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
	// Find highest and lowest scores.
	var highest int64 = -math.MaxInt64
	var lowest int64 = math.MaxInt64
	for _, nodeScore := range scores {
		if nodeScore.Score > highest {
			highest = nodeScore.Score
		}
		if nodeScore.Score < lowest {
			lowest = nodeScore.Score
		}
	}

	// Transform the highest to lowest score range to fit the framework's min to max node score range.
	oldRange := highest - lowest
	newRange := framework.MaxNodeScore - framework.MinNodeScore
	for i, nodeScore := range scores {
		if oldRange == 0 {
			scores[i].Score = framework.MinNodeScore
		} else {
			scores[i].Score = ((nodeScore.Score - lowest) * newRange / oldRange) + framework.MinNodeScore
		}
	}

	return nil
}

// New initializes a new plugin and returns it.
func New(_ runtime.Object, h framework.Handle) (framework.Plugin, error) {
	return &PodCapacityOfNode{handle: h}, nil
}
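
As a quick sanity check of the formula in ps.score (illustration only, not part of the plugin): a node with a pod capacity of 110 that already hosts 10 pods and has no nominated pods keeps roughly 90% of its pod slots free and therefore gets a raw score of 90.

package main

import "fmt"

func main() {
	// same integer arithmetic as ps.score above
	var runningPodNum, nominatedPodNum, podCapacity int64 = 10, 0, 110
	score := (podCapacity - (runningPodNum + nominatedPodNum)) * 100 / podCapacity
	fmt.Println("raw score:", score) // prints 90
}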

With the plugin written, we still need to register it with the scheduler:

package main

import (
	"math/rand"
	"os"
	"time"

	"k8s.io/component-base/logs"
	"k8s.io/kubernetes/cmd/kube-scheduler/app"

	// the plugin package path below assumes the scheduler-plugins layout used
	// in the tests later in this post; adjust it to wherever your plugin lives
	"sigs.k8s.io/scheduler-plugins/pkg/podcapacityofnode"
)

func main() {
	rand.Seed(time.Now().UnixNano())

	// Register custom plugins to the scheduler framework.
	// Later they can consist of scheduler profile(s) and hence
	// used by various kinds of workloads.
	command := app.NewSchedulerCommand(
		......
		app.WithPlugin(podcapacityofnode.Name, podcapacityofnode.New),
	)

	// TODO: once we switch everything over to Cobra commands, we can go back to calling
	// utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
	// normalize func and add the go flag set by hand.
	// utilflag.InitFlags()
	logs.InitLogs()
	defer logs.FlushLogs()

	if err := command.Execute(); err != nil {
		os.Exit(1)
	}
}

Testing

Let's write a unit test to check that our functions behave as expected.

package podcapacityofnode

import (
	"context"
	"fmt"
	"testing"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/informers"
	clientsetfake "k8s.io/client-go/kubernetes/fake"
	"k8s.io/kubernetes/pkg/scheduler/framework"
	fakeframework "k8s.io/kubernetes/pkg/scheduler/framework/fake"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
	st "k8s.io/kubernetes/pkg/scheduler/testing"

	testutil "sigs.k8s.io/scheduler-plugins/test/util"
)

func TestPodCapacityOfNode(t *testing.T) {
	tests := []struct {
		nodeInfos    []*framework.NodeInfo
		wantErr      string
		expectedList framework.NodeScoreList
		name         string
	}{
		{
			nodeInfos:    []*framework.NodeInfo{makeNodeInfo("node1", 0, 0, 110, 110), makeNodeInfo("node2", 3, 0, 10, 110), makeNodeInfo("node3", 0, 0, 0, 110)},
			expectedList: []framework.NodeScore{{Name: "node1", Score: framework.MinNodeScore}, {Name: "node2", Score: 88}, {Name: "node3", Score: framework.MaxNodeScore}},
			name:         "pod capacity of node score.",
		},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			cs := clientsetfake.NewSimpleClientset()
			informerFactory := informers.NewSharedInformerFactory(cs, 0)
			registeredPlugins := []st.RegisterPluginFunc{
				st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
				st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
				st.RegisterPluginAsExtensions(Name, New, "Score"),
			}
			fakeSharedLister := &fakeSharedLister{nodes: test.nodeInfos}

			fh, err := st.NewFramework(
				registeredPlugins,
				frameworkruntime.WithClientSet(cs),
				frameworkruntime.WithInformerFactory(informerFactory),
				frameworkruntime.WithSnapshotSharedLister(fakeSharedLister),
				frameworkruntime.WithPodNominator(testutil.NewPodNominator()),
			)
			if err != nil {
				t.Fatalf("fail to create framework: %s", err)
			}
			// initialize nominated pod by adding nominated pods into nominatedPodMap
			for _, n := range test.nodeInfos {
				for _, pi := range n.Pods {
					if pi.Pod.Status.NominatedNodeName != "" {
						addNominatedPod(pi.Pod, n.Node().Name, fh.PreemptHandle())
					}
				}
			}
			pe, _ := New(nil, fh)
			var gotList framework.NodeScoreList
			plugin := pe.(framework.ScorePlugin)
			for i, n := range test.nodeInfos {
				score, err := plugin.Score(context.Background(), nil, nil, n.Node().Name)
				if err != nil {
					t.Errorf("unexpected error: %v", err)
				}
				gotList = append(gotList, framework.NodeScore{Name: test.nodeInfos[i].Node().Name, Score: score})
			}

			status := plugin.ScoreExtensions().NormalizeScore(context.Background(), nil, nil, gotList)
			if !status.IsSuccess() {
				t.Errorf("unexpected error: %v", status)
			}

			for i := range gotList {
				if test.expectedList[i].Score != gotList[i].Score {
					t.Errorf("expected %#v, got %#v", test.expectedList[i].Score, gotList[i].Score)
				}
			}
		})
	}
}

func makeNodeInfo(node string, terminatingPodNumber, nominatedPodNumber, regularPodNumber, pods int) *framework.NodeInfo {
	ni := framework.NewNodeInfo()
	for i := 0; i < terminatingPodNumber; i++ {
		podInfo := &framework.PodInfo{
			Pod: makeTerminatingPod(fmt.Sprintf("tpod_%s_%v", node, i+1)),
		}
		ni.Pods = append(ni.Pods, podInfo)
	}
	for i := 0; i < nominatedPodNumber; i++ {
		podInfo := &framework.PodInfo{
			Pod: makeNominatedPod(fmt.Sprintf("npod_%s_%v", node, i+1), node),
		}
		ni.Pods = append(ni.Pods, podInfo)
	}
	for i := 0; i < regularPodNumber; i++ {
		podInfo := &framework.PodInfo{
			Pod: makeRegularPod(fmt.Sprintf("rpod_%s_%v", node, i+1)),
		}
		ni.Pods = append(ni.Pods, podInfo)
	}
	ni.SetNode(&v1.Node{
		ObjectMeta: metav1.ObjectMeta{Name: node},
		Status: v1.NodeStatus{
			Capacity: v1.ResourceList{
				v1.ResourcePods: *resource.NewQuantity(int64(pods), resource.DecimalExponent),
			},
		},
	})
	return ni
}

func makeTerminatingPod(name string) *v1.Pod {
	deletionTimestamp := metav1.Time{Time: time.Now()}
	return &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:              name,
			DeletionTimestamp: &deletionTimestamp,
		},
	}
}

func makeNominatedPod(podName string, nodeName string) *v1.Pod {
	return &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name: podName,
			UID:  types.UID(podName),
		},
		Status: v1.PodStatus{
			NominatedNodeName: nodeName,
		},
	}
}

func makeRegularPod(name string) *v1.Pod {
	return &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name: name,
		},
	}
}

func addNominatedPod(pod *v1.Pod, nodeName string, ph framework.PreemptHandle) *v1.Pod {
	ph.AddNominatedPod(pod, nodeName)
	return pod
}

var _ framework.SharedLister = &fakeSharedLister{}

type fakeSharedLister struct {
	nodes []*framework.NodeInfo
}

func (f *fakeSharedLister) NodeInfos() framework.NodeInfoLister {
	return fakeframework.NodeInfoLister(f.nodes)
}
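
Why those expected scores: node1 is full (110 of 110 pod slots used), node2 holds 13 pods (3 terminating + 10 regular) out of 110, and node3 is empty, so the raw scores are 0, 88 and 100. Because they already span the framework's [MinNodeScore, MaxNodeScore] range of [0, 100], NormalizeScore leaves them unchanged. A tiny stand-alone check of the arithmetic (illustration only):

package main

import "fmt"

func main() {
	capacity := int64(110)
	podsOnNode := []int64{110, 13, 0} // node1, node2, node3 from the test table
	for i, n := range podsOnNode {
		// same formula as ps.score; prints 0, 88, 100
		fmt.Printf("node%d raw score: %d\n", i+1, (capacity-n)*100/capacity)
	}
}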

Run: go test -run TestPodCapacityOfNode

Check whether the output matches the scheduling result we expect.

Next, an integration test of the plugin as a whole; an abridged log from a passing run follows the code.

package integration

import (
	"context"
	"testing"
	"time"

	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/kubernetes/pkg/scheduler"
	schedapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
	fwkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
	st "k8s.io/kubernetes/pkg/scheduler/testing"
	testutils "k8s.io/kubernetes/test/integration/util"
	imageutils "k8s.io/kubernetes/test/utils/image"
	"sigs.k8s.io/scheduler-plugins/pkg/podcapacityofnode"
	"sigs.k8s.io/scheduler-plugins/test/util"
)

func TestPodCapacityOfNodePlugin(t *testing.T) {
	pause := imageutils.GetPauseImageName()
	nodeNominatedSelector := map[string]string{"nominated": "true"}
	tests := []struct {
		name         string
		pod          *v1.Pod
		pods         []*v1.Pod
		nodes        []*v1.Node
		expectedNode string
	}{
		{
			name: "pod will be scheduled to nodes has less running pods",
			pod:  st.MakePod().Name("p1").UID("p1").Container(pause).Obj(),
			pods: []*v1.Pod{
				st.MakePod().Name("pod-1").UID("pod-1").Node("node-a").Terminating().Container(pause).Obj(),
				st.MakePod().Name("pod-2").UID("pod-2").Node("node-a").Terminating().Container(pause).Obj(),
				st.MakePod().Name("pod-3").UID("pod-3").Node("node-b").Container(pause).Obj(),
				st.MakePod().Name("pod-4").UID("pod-4").Node("node-b").Container(pause).Obj(),
				st.MakePod().Name("pod-5").UID("pod-5").NodeSelector(nodeNominatedSelector).Node("node-c").Container(pause).Obj(),
				st.MakePod().Name("pod-6").UID("pod-6").NodeSelector(nodeNominatedSelector).Priority(highPriority).Container(pause).Obj(),
			},
			nodes: []*v1.Node{
				st.MakeNode().Name("node-a").Label("node", "node-a").Obj(),
				st.MakeNode().Name("node-b").Label("node", "node-b").Obj(),
				st.MakeNode().Name("node-c").Label("node", "node-c").Label("nominated", "true").Obj(),
			},
			expectedNode: "node-c",
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			registry := fwkruntime.Registry{podcapacityofnode.Name: podcapacityofnode.New}
			profile := schedapi.KubeSchedulerProfile{
				SchedulerName: v1.DefaultSchedulerName,
				Plugins: &schedapi.Plugins{
					Score: &schedapi.PluginSet{
						Enabled: []schedapi.Plugin{
							{Name: podcapacityofnode.Name},
						},
						Disabled: []schedapi.Plugin{
							{Name: "*"},
						},
					},
				},
			}
			testCtx := util.InitTestSchedulerWithOptions(
				t,
				testutils.InitTestMaster(t, "sched-podcapacityofnode", nil),
				true,
				scheduler.WithProfiles(profile),
				scheduler.WithFrameworkOutOfTreeRegistry(registry),
			)
			defer testutils.CleanupTest(t, testCtx)

			cs, ns := testCtx.ClientSet, testCtx.NS.Name

			// Create nodes and pods.
			for _, node := range tt.nodes {
				if _, err := cs.CoreV1().Nodes().Create(testCtx.Ctx, node, metav1.CreateOptions{}); err != nil {
					t.Fatalf("failed to create node: %v", err)
				}
			}

			// Create existing Pods on node.
			for _, pod := range tt.pods {

				// Create Nominated Pods by setting two pods exposing same host port in one node.
				if _, ok := pod.Spec.NodeSelector["nominated"]; ok {
					pod.Spec.Containers[0].Ports = []v1.ContainerPort{{HostPort: 8080, ContainerPort: 8080}}
				}
				if _, err := cs.CoreV1().Pods(ns).Create(testCtx.Ctx, pod, metav1.CreateOptions{}); err != nil {
					t.Fatalf("failed to create existing Pod %q: %v", pod.Name, err)
				}

				// Ensure the existing Pods are scheduled successfully except for the nominated pods.
				if err := wait.Poll(1*time.Second, 20*time.Second, func() (bool, error) {
					return podScheduled(cs, ns, pod.Name), nil
				}); err != nil {
					t.Logf("pod %q failed to be scheduled", pod.Name)
				}

				// Create Terminating Pods by deleting pods from cluster.
				if pod.DeletionTimestamp != nil {
					if err := cs.CoreV1().Pods(ns).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{}); err != nil {
						t.Fatalf("failed to delete existing Pod %q: %v", pod.Name, err)
					}
				}
			}

			// Create Pod to be scheduled.
			if _, err := cs.CoreV1().Pods(ns).Create(testCtx.Ctx, tt.pod, metav1.CreateOptions{}); err != nil {
				t.Fatalf("failed to create Pod %q: %v", tt.pod.Name, err)
			}

			// Ensure the Pod is scheduled successfully.
			if err := wait.Poll(1*time.Second, 60*time.Second, func() (bool, error) {
				return podScheduled(cs, ns, tt.pod.Name), nil
			}); err != nil {
				t.Errorf("pod %q failed to be scheduled: %v", tt.pod.Name, err)
			}

			// Lastly, verify pod gets scheduled to the expected node.
			pod, err := cs.CoreV1().Pods(ns).Get(context.TODO(), tt.pod.Name, metav1.GetOptions{})
			if err != nil {
				t.Fatalf("failed to get Pod %q: %v", tt.pod.Name, err)
			}
			if pod.Spec.NodeName != tt.expectedNode {
				t.Errorf("Pod %q is expected on node %q, but found on node %q",
					pod.Name, tt.expectedNode, pod.Spec.NodeName)
			}
		})
	}
}
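
The test above relies on two helpers that are not shown here because they live elsewhere in the scheduler-plugins integration test package; the sketch below is only what this snippet assumes, not the exact upstream code.

package integration

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
)

// highPriority only needs to be greater than the default pod priority of 0.
var highPriority = int32(1000)

// podScheduled reports whether the named pod has been bound to a node.
func podScheduled(cs clientset.Interface, ns, name string) bool {
	pod, err := cs.CoreV1().Pods(ns).Get(context.TODO(), name, metav1.GetOptions{})
	if err != nil {
		return false
	}
	return pod.Spec.NodeName != ""
}

When the integration test passes, the run ends with output like this (abridged):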
......
I0121 23:25:52.053570   16566 reflector.go:225] Stopping reflector *v1beta1.PriorityLevelConfiguration (0s) from k8s.io/client-go/informers/factory.go:134
I0121 23:25:52.074830   16566 httplog.go:89] "HTTP" verb="GET" URI="/api/v1/namespaces/kube-system/configmaps?allowWatchBookmarks=true&resourceVersion=1&timeout=5m6s&timeoutSeconds=306&watch=true" latency="30.85559685s" userAgent="integration.test/v0.0.0 (linux/amd64) kubernetes/$Format" srcIP="127.0.0.1:49842" resp=0
I0121 23:25:52.074934   16566 reflector.go:225] Stopping reflector *v1.ConfigMap (12h0m0s) from k8s.io/kubernetes/pkg/controlplane/controller/clusterauthenticationtrust/cluster_authentication_trust_controller.go:444
I0121 23:25:52.075059   16566 httplog.go:89] "HTTP" verb="GET" URI="/apis/flowcontrol.apiserver.k8s.io/v1beta1/prioritylevelconfigurations?allowWatchBookmarks=true&resourceVersion=1&timeout=8m17s&timeoutSeconds=497&watch=true" latency="30.876832353s" userAgent="integration.test/v0.0.0 (linux/amd64) kubernetes/$Format" srcIP="127.0.0.1:49838" resp=0
I0121 23:25:52.075131   16566 httplog.go:89] "HTTP" verb="GET" URI="/apis/flowcontrol.apiserver.k8s.io/v1beta1/flowschemas?allowWatchBookmarks=true&resourceVersion=1&timeout=7m31s&timeoutSeconds=451&watch=true" latency="30.860498397s" userAgent="integration.test/v0.0.0 (linux/amd64) kubernetes/$Format" srcIP="127.0.0.1:49834" resp=0
PASS
I0121 23:25:52.090120   16566 etcd.go:140] etcd exit status: signal: killed
ok      sigs.k8s.io/scheduler-plugins/test/integration  34.814s
Note

Running the scoring plugin's unit test may fail with a build error:

# k8s.io/kubernetes/cmd/kube-apiserver/app
../../vendor/k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:477:87: undefined: openapi.GetOpenAPIDefinitions
FAIL    sigs.k8s.io/scheduler-plugins/pkg/podcapacityofnode [build failed]

This happens when you switch from the master branch to an older release branch (e.g., before v1.18); you need to regenerate the Kubernetes OpenAPI definitions to fix the error. From the project root, run:

make autogen

For the plugin-wide integration test, install etcd first:

hack/install-etcd.sh

Wrapping Up

In this part we walked through the extension point interfaces and the implementation of our plugin; in the next part we will look at how to deploy the scheduler plugin to a cluster.