
Kubernetes Source Code: kube-scheduler Principles and Source Code Analysis (Part 2)

An analysis of kube-scheduler principles and source code, based on the release-1.22 branch of Kubernetes.

Preface

kube-scheduler's core algorithms and logic will probably take several posts to cover. There is a lot of ground, and I am not great at writing, so the summaries may be rough.

Controller Entry Point

Following on from the previous post, we reached the sched.Run() entry point, which actually calls the Run() method in pkg/scheduler/scheduler.go.

  1. Start the Scheduler's queues. There are three: activeQ, backoffQ, and unschedulableQ.

  2. When the Scheduler starts, every pod waiting to be scheduled goes into activeQ, which is kept sorted by pod priority. The Scheduler pops a pod from activeQ and runs the scheduling flow for it. If scheduling fails, the pod moves to either unschedulableQ or backoffQ depending on the situation: if the Scheduler cache (node cache, pod cache, and so on) changed while this pod was being scheduled, it goes to backoffQ; otherwise it goes to unschedulableQ (see the sketch after this list).

  3. Use wait.UntilWithContext() to run sched.scheduleOne in a loop. sched.scheduleOne calls sched.NextPod() (which ultimately calls queue.Pop()) to pop a pod to schedule; when there is no pod left in the queue, it blocks until one arrives.

  4. When the scheduler exits, close the Scheduler's queues.
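The routing decision in step 2 is easy to miss inside the real PriorityQueue code, so here is a minimal, self-contained sketch of just that decision, under my reading of it. Everything here (fakeQueue, moveRequestCycle, schedulingCycle) is an illustrative stand-in, not the actual implementation: a cache or cluster change during (or after the start of) the pod's scheduling attempt sends the failed pod to backoffQ, otherwise it parks in unschedulableQ until a relevant event moves it back.

package main

import "fmt"

type queuedPod struct {
	name            string
	schedulingCycle int64 // the scheduling cycle in which this pod was popped
}

type fakeQueue struct {
	moveRequestCycle int64 // last cycle in which a cache/cluster change requested a move
	backoffQ         []queuedPod
	unschedulableQ   []queuedPod
}

// addUnschedulableIfNotPresent mirrors the routing rule from step 2.
func (q *fakeQueue) addUnschedulableIfNotPresent(p queuedPod) {
	if q.moveRequestCycle >= p.schedulingCycle {
		// Something relevant changed since this pod's attempt started:
		// retry after a backoff period instead of parking indefinitely.
		q.backoffQ = append(q.backoffQ, p)
		return
	}
	// Nothing changed; keep the pod parked until a cluster event wakes it up.
	q.unschedulableQ = append(q.unschedulableQ, p)
}

func main() {
	q := &fakeQueue{moveRequestCycle: 5}
	q.addUnschedulableIfNotPresent(queuedPod{name: "p1", schedulingCycle: 3}) // -> backoffQ
	q.addUnschedulableIfNotPresent(queuedPod{name: "p2", schedulingCycle: 7}) // -> unschedulableQ
	fmt.Printf("backoffQ=%d unschedulableQ=%d\n", len(q.backoffQ), len(q.unschedulableQ))
}

With that in mind, the Run() method itself is short: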

// Run begins watching and scheduling. It starts scheduling and blocked until the context is done.
func (sched *Scheduler) Run(ctx context.Context) {
  // start the scheduling queues
  sched.SchedulingQueue.Run()
  // the real work: run scheduleOne in a loop until the context is done
  wait.UntilWithContext(ctx, sched.scheduleOne, 0)
  // close the queue when the scheduler exits
  sched.SchedulingQueue.Close()
}

scheduleOne()

As the name suggests, it takes one pod at a time and schedules it; this is the core logic.

When we previously implemented a custom scheduler with the scheduling framework, we learned about the scheduler's extension points. The same idea applies here: the Scheduler implements its logic by working through these extension points one by one.

The overall flow is: PreFilter -> Filter -> PostFilter -> PreScore -> Score -> NormalizeScore -> Reserve -> Permit -> PreBind -> Bind -> PostBind.
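As a quick recap of what one of these extension points looks like from a plugin author's point of view, here is a hedged sketch of a hypothetical Filter plugin written against the release-1.22 framework interfaces. NodeLabelFilter and the example.com/ready label are made up for illustration; they are not part of the upstream scheduler.

package example

import (
	"context"

	v1 "k8s.io/api/core/v1"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// NodeLabelFilter is a hypothetical plugin used only to illustrate the Filter
// extension point; it is not part of the upstream scheduler.
type NodeLabelFilter struct{}

var _ framework.FilterPlugin = &NodeLabelFilter{}

func (pl *NodeLabelFilter) Name() string { return "NodeLabelFilter" }

// Filter marks a node unschedulable if it lacks a (made-up) readiness label.
func (pl *NodeLabelFilter) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
	if _, ok := nodeInfo.Node().Labels["example.com/ready"]; !ok {
		return framework.NewStatus(framework.Unschedulable, "node missing example.com/ready label")
	}
	return nil
}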

  1. Enter the scheduleOne() function.

  2. Check the pod's scheduler name field, pod.Spec.SchedulerName; it must be set and match one of the configured profiles.

  3. Check whether the pod should skip scheduling: a pod that is being deleted skips this cycle, and a pod that is already in the assumed-pod cache also skips scheduling (an assumed pod is one whose scheduling has completed and which has been added to that set). Both checks are sketched after this list.

  4. Initialize an empty podsToActivate struct, used to record pods to be activated for scheduling; plugins may fill it in or leave it empty.

  5. Call sched.Algorithm.Schedule() to start computing the pod's placement.
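The skip conditions in step 3 are small enough to sketch on their own. This is a simplified, hypothetical version: assumedPods is a plain map standing in for the scheduler cache's assumed-pod set, and the real skipPodSchedule is a bit more nuanced about updates to an already-assumed pod.

package example

import (
	v1 "k8s.io/api/core/v1"
)

// assumedPods stands in for the scheduler cache's set of assumed pods (pods
// whose scheduling decision has been recorded but not yet bound/confirmed).
// It is an illustrative placeholder, not the real cache API.
var assumedPods = map[string]bool{}

// skipPodSchedule sketches the checks from step 3: skip a pod that is being
// deleted, and skip a pod that has already been assumed in an earlier cycle.
func skipPodSchedule(pod *v1.Pod) bool {
	// Case 1: the pod is being deleted.
	if pod.DeletionTimestamp != nil {
		return true
	}
	// Case 2: the pod has already been assumed to a node.
	return assumedPods[string(pod.UID)]
}

Back to the real code; the beginning of scheduleOne() looks like this: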

// scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne(ctx context.Context) {
  podInfo := sched.NextPod()
  // pod could be nil when schedulerQueue is closed
  if podInfo == nil || podInfo.Pod == nil {
    return
  }
  pod := podInfo.Pod
  fwk, err := sched.frameworkForPod(pod)
  if err != nil {
    // This shouldn't happen, because we only accept for scheduling the pods
    // which specify a scheduler name that matches one of the profiles.
    klog.ErrorS(err, "Error occurred")
    return
  }
  if sched.skipPodSchedule(fwk, pod) {
    return
  }

  klog.V(3).InfoS("Attempting to schedule pod", "pod", klog.KObj(pod))

  // Synchronously attempt to find a fit for the pod.
  start := time.Now()
  state := framework.NewCycleState()
  state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)
  // Initialize an empty podsToActivate struct, which will be filled up by plugins or stay empty.
  podsToActivate := framework.NewPodsToActivate()
  state.Write(framework.PodsToActivateKey, podsToActivate)
  ...
  schedulingCycleCtx, cancel := context.WithCancel(ctx)
  defer cancel()
  scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, sched.Extenders, fwk, state, pod)
  ...

sched.Algorithm.Schedule()

Let's first step into sched.Algorithm.Schedule(); it deserves a closer look.

  1. g.snapshot()

    Refreshes the Scheduler's cache snapshot: node information and which pods are on each node.

  2. If there are zero nodes, the pod cannot be scheduled.

  3. findNodesThatFitPod()

    Runs the registered PreFilter and Filter plugins; this is expanded on below.

  4. If feasibleNodes contains exactly one node, that is the only node that can host the pod and it is used directly; if it contains zero nodes, no node can schedule the pod.

  5. prioritizeNodes()

    Runs the registered scoring plugins to score the nodes; this is also expanded on later.

  6. g.selectHost() picks the node with the highest score; if several nodes tie for the highest score, one of them is chosen at random, as sketched below.
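The "highest score, random tie-break" selection in step 6 can be done in a single pass. The sketch below mirrors that reservoir-style approach; NodeScore is a simplified stand-in for the framework's node score type, so treat it as an illustration rather than the upstream code.

package main

import (
	"fmt"
	"math/rand"
)

// NodeScore is a simplified stand-in for the framework's node-score type.
type NodeScore struct {
	Name  string
	Score int64
}

// selectHost picks the highest-scoring node; when several nodes tie for the
// top score, each of them is chosen with equal probability.
func selectHost(list []NodeScore) (string, error) {
	if len(list) == 0 {
		return "", fmt.Errorf("empty priority list")
	}
	maxScore := list[0].Score
	selected := list[0].Name
	cntOfMaxScore := 1
	for _, ns := range list[1:] {
		switch {
		case ns.Score > maxScore:
			maxScore = ns.Score
			selected = ns.Name
			cntOfMaxScore = 1
		case ns.Score == maxScore:
			cntOfMaxScore++
			if rand.Intn(cntOfMaxScore) == 0 {
				// Keep the new node with probability 1/cntOfMaxScore, so every
				// top-scoring node is equally likely to end up selected.
				selected = ns.Name
			}
		}
	}
	return selected, nil
}

func main() {
	host, _ := selectHost([]NodeScore{{"node-a", 90}, {"node-b", 95}, {"node-c", 95}})
	fmt.Println(host) // node-b or node-c, each with probability 0.5
}

Here is the actual Schedule() method: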

func (g *genericScheduler) Schedule(ctx context.Context, extenders []framework.Extender, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
  trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
  defer trace.LogIfLong(100 * time.Millisecond)

  if err := g.snapshot(); err != nil {
    return result, err
  }
  trace.Step("Snapshotting scheduler cache and node infos done")

  if g.nodeInfoSnapshot.NumNodes() == 0 {
    return result, ErrNoNodesAvailable
  }

  feasibleNodes, diagnosis, err := g.findNodesThatFitPod(ctx, extenders, fwk, state, pod)
  if err != nil {
    return result, err
  }
  trace.Step("Computing predicates done")

  if len(feasibleNodes) == 0 {
    return result, &framework.FitError{
      Pod:         pod,
      NumAllNodes: g.nodeInfoSnapshot.NumNodes(),
      Diagnosis:   diagnosis,
    }
  }

  // When only one node after predicate, just use it.
  if len(feasibleNodes) == 1 {
    return ScheduleResult{
      SuggestedHost:  feasibleNodes[0].Name,
      EvaluatedNodes: 1 + len(diagnosis.NodeToStatusMap),
      FeasibleNodes:  1,
    }, nil
  }

  priorityList, err := prioritizeNodes(ctx, extenders, fwk, state, pod, feasibleNodes)
  if err != nil {
    return result, err
  }

  host, err := g.selectHost(priorityList)
  trace.Step("Prioritizing done")

  return ScheduleResult{
    SuggestedHost:  host,
    EvaluatedNodes: len(feasibleNodes) + len(diagnosis.NodeToStatusMap),
    FeasibleNodes:  len(feasibleNodes),
  }, err
}

Next, let's take a closer look at g.findNodesThatFitPod().

  1. The PreFilter phase: run the prefilter plugins.

    • fwk.RunPreFilterPlugins()

      This preprocesses information related to the pod. It is the PreFilter-stage logic: mostly filtering-type work such as preprocessing pod affinity against the pods already running on each node.

    • g.evaluateNominatedNode()

      Since 1.22, the PreferNominatedNode feature gate is enabled by default: if a previous scheduling cycle set NominatedNodeName on the pod as a result of preemption, that nominated node is evaluated first. Preemption itself works like this: after a pod is created it enters the queue and waits to be scheduled; the scheduler picks a pod from the queue and tries to place it on a node. If no node satisfies all of the pod's requirements, preemption logic is triggered for the pending pod, call it P. Preemption tries to find a node where evicting one or more pods with lower priority than P would make room to schedule P. If such a node is found, those lower-priority pods are evicted, and once they are gone P can be scheduled onto that node.

  2. g.findNodesThatPassFilters() is the Filter phase; the filter plugins run here.

    • g.numFeasibleNodesToFind()

      If there are no more than 100 nodes, or percentageOfNodesToScore is 100%, every node takes part in scheduling.

      When percentageOfNodesToScore is left at its default, the adaptive percentage starts from a base of 50 and keeps shrinking as the cluster grows: with up to 100 nodes it is effectively 100%, at 5000 nodes it is about 10%, and it never drops below 5% (see the sketch after this list).

    • !fwk.HasFilterPlugins()

      If the framework has no Filter plugins, every node is feasible; but with more than 100 nodes, always returning the first 100 would be unfair, so g.nextStartNodeIndex records where the previous cycle stopped and each cycle starts taking nodes from that position.

    • fwk.Parallelizer().Until(ctx, len(nodes), checkNode)

      Starts multiple goroutines that run the filter plugins on nodes in parallel, collecting feasible nodes into feasibleNodes. Once the configured number of feasible nodes is found, the search stops and g.nextStartNodeIndex is advanced.
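The adaptive percentage described above boils down to a small formula. The sketch below reproduces it as I understand it (a base of 50 minus numAllNodes/125, floored at 5%, and never fewer than 100 nodes), so treat the constants as an approximation and consult numFeasibleNodesToFind in the source for the authoritative version.

package main

import "fmt"

const (
	minFeasibleNodesToFind           = 100 // never scan fewer than this many nodes
	minFeasibleNodesPercentageToFind = 5   // lower bound for the adaptive percentage
)

// numFeasibleNodesToFind approximates the scheduler's formula: with at most
// 100 nodes (or percentage >= 100) every node is considered; otherwise the
// percentage starts at 50 and shrinks by 1 for every 125 nodes, never below 5.
func numFeasibleNodesToFind(percentageOfNodesToScore, numAllNodes int32) int32 {
	if numAllNodes < minFeasibleNodesToFind || percentageOfNodesToScore >= 100 {
		return numAllNodes
	}
	adaptivePercentage := percentageOfNodesToScore
	if adaptivePercentage <= 0 {
		adaptivePercentage = 50 - numAllNodes/125
		if adaptivePercentage < minFeasibleNodesPercentageToFind {
			adaptivePercentage = minFeasibleNodesPercentageToFind
		}
	}
	numNodes := numAllNodes * adaptivePercentage / 100
	if numNodes < minFeasibleNodesToFind {
		return minFeasibleNodesToFind
	}
	return numNodes
}

func main() {
	fmt.Println(numFeasibleNodesToFind(0, 100))  // 100 -> all nodes
	fmt.Println(numFeasibleNodesToFind(0, 5000)) // 500 -> about 10%
	fmt.Println(numFeasibleNodesToFind(0, 6000)) // 300 -> floor of 5%
}

The actual findNodesThatFitPod() and findNodesThatPassFilters() follow: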

func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, extenders []framework.Extender, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) {
  diagnosis := framework.Diagnosis{
    NodeToStatusMap:      make(framework.NodeToStatusMap),
    UnschedulablePlugins: sets.NewString(),
  }

  // Run "prefilter" plugins.
  s := fwk.RunPreFilterPlugins(ctx, state, pod)
  allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
  if err != nil {
    return nil, diagnosis, err
  }
  ...
  // "NominatedNodeName" can potentially be set in a previous scheduling cycle as a result of preemption.
  // This node is likely the only candidate that will fit the pod, and hence we try it first before iterating over all nodes.
  if len(pod.Status.NominatedNodeName) > 0 && feature.DefaultFeatureGate.Enabled(features.PreferNominatedNode) {
    feasibleNodes, err := g.evaluateNominatedNode(ctx, extenders, pod, fwk, state, diagnosis)
    if err != nil {
      klog.ErrorS(err, "Evaluation failed on nominated node", "pod", klog.KObj(pod), "node", pod.Status.NominatedNodeName)
    }
    // Nominated node passes all the filters, scheduler is good to assign this node to the pod.
    if len(feasibleNodes) != 0 {
      return feasibleNodes, diagnosis, nil
    }
  }
  ...
  feasibleNodes, err := g.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, allNodes)
  if err != nil {
    return nil, diagnosis, err
  }

  feasibleNodes, err = findNodesThatPassExtenders(extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
  if err != nil {
    return nil, diagnosis, err
  }
  return feasibleNodes, diagnosis, nil
}

// findNodesThatPassFilters finds the nodes that fit the filter plugins.
func (g *genericScheduler) findNodesThatPassFilters(
  ctx context.Context,
  fwk framework.Framework,
  state *framework.CycleState,
  pod *v1.Pod,
  diagnosis framework.Diagnosis,
  nodes []*framework.NodeInfo) ([]*v1.Node, error) {
  numNodesToFind := g.numFeasibleNodesToFind(int32(len(nodes)))

  // Create feasible list with enough space to avoid growing it
  // and allow assigning.
  feasibleNodes := make([]*v1.Node, numNodesToFind)

  if !fwk.HasFilterPlugins() {
    length := len(nodes)
    for i := range feasibleNodes {
      feasibleNodes[i] = nodes[(g.nextStartNodeIndex+i)%length].Node()
    }
    g.nextStartNodeIndex = (g.nextStartNodeIndex + len(feasibleNodes)) % length
    return feasibleNodes, nil
  }

  errCh := parallelize.NewErrorChannel()
  var statusesLock sync.Mutex
  var feasibleNodesLen int32
  ctx, cancel := context.WithCancel(ctx)
  checkNode := func(i int) {
    // We check the nodes starting from where we left off in the previous scheduling cycle,
    // this is to make sure all nodes have the same chance of being examined across pods.
    nodeInfo := nodes[(g.nextStartNodeIndex+i)%len(nodes)]
    status := fwk.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo)
    if status.Code() == framework.Error {
      errCh.SendErrorWithCancel(status.AsError(), cancel)
      return
    }
    if status.IsSuccess() {
      length := atomic.AddInt32(&feasibleNodesLen, 1)
      if length > numNodesToFind {
        cancel()
        atomic.AddInt32(&feasibleNodesLen, -1)
      } else {
        feasibleNodes[length-1] = nodeInfo.Node()
      }
    } else {
      statusesLock.Lock()
      diagnosis.NodeToStatusMap[nodeInfo.Node().Name] = status
      diagnosis.UnschedulablePlugins.Insert(status.FailedPlugin())
      statusesLock.Unlock()
    }
  }

  beginCheckNode := time.Now()
  statusCode := framework.Success
  defer func() {
    // We record Filter extension point latency here instead of in framework.go because framework.RunFilterPlugins
    // function is called for each node, whereas we want to have an overall latency for all nodes per scheduling cycle.
    // Note that this latency also includes latency for `addNominatedPods`, which calls framework.RunPreFilterAddPod.
    metrics.FrameworkExtensionPointDuration.WithLabelValues(runtime.Filter, statusCode.String(), fwk.ProfileName()).Observe(metrics.SinceInSeconds(beginCheckNode))
  }()

  // Stops searching for more nodes once the configured number of feasible nodes
  // are found.
  fwk.Parallelizer().Until(ctx, len(nodes), checkNode)
  processedNodes := int(feasibleNodesLen) + len(diagnosis.NodeToStatusMap)
  g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % len(nodes)

  feasibleNodes = feasibleNodes[:feasibleNodesLen]
  if err := errCh.ReceiveError(); err != nil {
    statusCode = framework.Error
    return nil, err
  }
  return feasibleNodes, nil
}

Next, let's dig into prioritizeNodes(); this is the scoring phase we mentioned earlier.

  1. PreScore

    This stage mainly runs plugins for pod affinity, pod topology spread (spreading by labels/selectors), and taints/tolerations.

  2. Score

    This stage scores nodes using multiple goroutines in parallel. After scoring, NormalizeScore() is called to bring each plugin's scores into a common range, so that a plugin handing out disproportionately large scores cannot act as a de facto veto. Each plugin's scores are then multiplied by the plugin's weight, and a list of scored nodes is returned (a toy sketch of this normalize-and-weight step follows this list).

  3. Aggregate the scores

    The scores from every scoring plugin are summed per node, producing an aggregated score list that later logic, namely the g.selectHost() method we saw earlier, uses to pick the most suitable node.
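The normalize-then-weight step from item 2 happens inside fwk.RunScorePlugins() rather than in the prioritizeNodes() code quoted below, so here is a toy, self-contained sketch of the idea. The plugin names, raw scores, and weights are all made up, and real plugins implement their own NormalizeScore logic; this sketch just rescales each plugin's scores to a 0-100 range before weighting and summing them.

package main

import "fmt"

// maxNodeScore mirrors the framework's upper bound for a node score (100).
const maxNodeScore = 100

// normalize rescales one plugin's raw scores into [0, maxNodeScore], so a
// plugin that happens to emit huge raw numbers cannot single-handedly decide
// the result. (Real plugins implement their own NormalizeScore logic.)
func normalize(scores []int64) {
	var max int64
	for _, s := range scores {
		if s > max {
			max = s
		}
	}
	if max == 0 {
		return
	}
	for i := range scores {
		scores[i] = scores[i] * maxNodeScore / max
	}
}

func main() {
	nodes := []string{"node-a", "node-b", "node-c"}
	// Made-up raw scores, one slice per plugin, indexed like nodes.
	pluginScores := map[string][]int64{
		"imageLocality":  {3000, 500, 0}, // wide raw range before normalization
		"leastAllocated": {40, 80, 60},
	}
	weights := map[string]int64{"imageLocality": 1, "leastAllocated": 2}

	totals := make([]int64, len(nodes))
	for name, scores := range pluginScores {
		normalize(scores) // the NormalizeScore-like step
		for i, s := range scores {
			totals[i] += s * weights[name] // then apply the plugin's weight
		}
	}
	for i, n := range nodes {
		fmt.Printf("%s: %d\n", n, totals[i])
	}
}

The actual prioritizeNodes() follows: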

func prioritizeNodes(
  ctx context.Context,
  extenders []framework.Extender,
  fwk framework.Framework,
  state *framework.CycleState,
  pod *v1.Pod,
  nodes []*v1.Node,
) (framework.NodeScoreList, error) {
  // If no priority configs are provided, then all nodes will have a score of one.
  // This is required to generate the priority list in the required format
  if len(extenders) == 0 && !fwk.HasScorePlugins() {
    result := make(framework.NodeScoreList, 0, len(nodes))
    for i := range nodes {
      result = append(result, framework.NodeScore{
        Name:  nodes[i].Name,
        Score: 1,
      })
    }
    return result, nil
  }

  // Run PreScore plugins.
  preScoreStatus := fwk.RunPreScorePlugins(ctx, state, pod, nodes)
  if !preScoreStatus.IsSuccess() {
    return nil, preScoreStatus.AsError()
  }

  // Run the Score plugins.
  scoresMap, scoreStatus := fwk.RunScorePlugins(ctx, state, pod, nodes)
  if !scoreStatus.IsSuccess() {
    return nil, scoreStatus.AsError()
  }

  if klog.V(10).Enabled() {
    for plugin, nodeScoreList := range scoresMap {
      for _, nodeScore := range nodeScoreList {
        klog.InfoS("Plugin scored node for pod", "pod", klog.KObj(pod), "plugin", plugin, "node", nodeScore.Name, "score", nodeScore.Score)
      }
    }
  }

  // Summarize all scores.
  result := make(framework.NodeScoreList, 0, len(nodes))

  for i := range nodes {
    result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0})
    for j := range scoresMap {
      result[i].Score += scoresMap[j][i].Score
    }
  }

  if len(extenders) != 0 && nodes != nil {
    var mu sync.Mutex
    var wg sync.WaitGroup
    combinedScores := make(map[string]int64, len(nodes))
    for i := range extenders {
      if !extenders[i].IsInterested(pod) {
        continue
      }
      wg.Add(1)
      go func(extIndex int) {
        metrics.SchedulerGoroutines.WithLabelValues(metrics.PrioritizingExtender).Inc()
        defer func() {
          metrics.SchedulerGoroutines.WithLabelValues(metrics.PrioritizingExtender).Dec()
          wg.Done()
        }()
        prioritizedList, weight, err := extenders[extIndex].Prioritize(pod, nodes)
        if err != nil {
          // Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
          return
        }
        mu.Lock()
        for i := range *prioritizedList {
          host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
          if klog.V(10).Enabled() {
            klog.InfoS("Extender scored node for pod", "pod", klog.KObj(pod), "extender", extenders[extIndex].Name(), "node", host, "score", score)
          }
          combinedScores[host] += score * weight
        }
        mu.Unlock()
      }(i)
    }
    // wait for all go routines to finish
    wg.Wait()
    for i := range result {
      // MaxExtenderPriority may diverge from the max priority used in the scheduler and defined by MaxNodeScore,
      // therefore we need to scale the score returned by extenders to the score range used by the scheduler.
      result[i].Score += combinedScores[result[i].Name] * (framework.MaxNodeScore / extenderv1.MaxExtenderPriority)
    }
  }

  if klog.V(10).Enabled() {
    for i := range result {
      klog.InfoS("Calculated node's final score for pod", "pod", klog.KObj(pod), "node", result[i].Name, "score", result[i].Score)
    }
  }
  return result, nil
}

To be continued

Since I only read this code in my spare time, it took a few days on and off to work through the PreFilter, Filter, PreScore, and Score phases. I will stop here for now; there is a lot of code to get through, and I need some time to organize the remaining parts.