目录

kubernetes源码-kube-scheduler 原理和源码分析(三)

kube-scheduler 原理和源码分析,源码为 kubernetes 的 release-1.22 分支。

写在前面

前面 2 篇我们看完它是如何过滤和给节点打分的,这一篇我们看看它是怎么给 pod 预留资源并分配到节点上面的。

assumedPod

给节点打完分并选出目标节点后,调度器先把 pod 的副本(assumedPod)的 NodeName 设置为 scheduleResult.SuggestedHost ,然后把该 pod 信息写入调度器缓存(内存),给后面的阶段使用。这样不必等待绑定真正完成,就可以继续调度后面的 pod。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
  // Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
  // This allows us to keep scheduling without waiting on binding to occur.
  assumedPodInfo := podInfo.DeepCopy()
  assumedPod := assumedPodInfo.Pod
  // assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
  err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
  if err != nil {
    metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
    // This is most probably result of a BUG in retrying logic.
    // We report an error here so that pod scheduling can be retried.
    // This relies on the fact that Error will check if the pod has been bound
    // to a node and if so will not add it back to the unscheduled pods queue
    // (otherwise this would cause an infinite loop).
    sched.recordSchedulingFailure(fwk, assumedPodInfo, err, SchedulerError, "")
    return
  }

Reserve

我们看看 Reserve 阶段,具体做了哪些操作。从目前版本的代码来看,这里只针对 pod、pv、pvc、node 这些资源对象做了处理。如果节点不匹配 PodVolumes 相关的信息,则返回错误;此时调度器会先执行 Unreserve 清理已预留的状态,再调用 ForgetPod 把该 pod 从调度器缓存中移除,并记录这次调度失败。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
  // Run the Reserve method of reserve plugins.
  if sts := fwk.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
    metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
    // trigger un-reserve to clean up state associated with the reserved Pod
    fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
      klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
    }
    sched.recordSchedulingFailure(fwk, assumedPodInfo, sts.AsError(), SchedulerError, "")
    return
  }

permit

permit 这里作为 bind 阶段之前的准入关卡:插件可以批准(approve)、拒绝(deny)或者让 pod 等待(wait),用来阻止或延迟 pod 绑定到候选节点,它并不是用来抢占资源的。目前看起来默认是没有设置 permit 插件的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
  // Run "permit" plugins.
  runPermitStatus := fwk.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
  if runPermitStatus.Code() != framework.Wait && !runPermitStatus.IsSuccess() {
    var reason string
    if runPermitStatus.IsUnschedulable() {
      metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
      reason = v1.PodReasonUnschedulable
    } else {
      metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
      reason = SchedulerError
    }
    // One of the plugins returned status different than success or wait.
    fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
      klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
    }
    sched.recordSchedulingFailure(fwk, assumedPodInfo, runPermitStatus.AsError(), reason, "")
    return
  }

  // At the end of a successful scheduling cycle, pop and move up Pods if needed.
  if len(podsToActivate.Map) != 0 {
    sched.SchedulingQueue.Activate(podsToActivate.Map)
    // Clear the entries after activation.
    podsToActivate.Map = make(map[string]*v1.Pod)
  }

bind

我们可以看到 bind 阶段,这里是一个 goroutine ,意味着它处理起来可能比较耗时,索性就单独起一个 goroutine(协程)让它自己异步去处理,调度主流程则直接进入下一个调度循环。

它在这个阶段会先阻塞在 WaitOnPermit 上,等待前面 permit 阶段中被要求等待的 pod 得到放行或拒绝,然后进入 pre-bind 处理 volume 相关的资源,最后 bind 是通过 clientset 去执行 pod 绑定的:err := b.handle.ClientSet().CoreV1().Pods(binding.Namespace).Bind(ctx, binding, metav1.CreateOptions{})

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
  // bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
  go func() {
    bindingCycleCtx, cancel := context.WithCancel(ctx)
    defer cancel()
    metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Inc()
    defer metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Dec()

    waitOnPermitStatus := fwk.WaitOnPermit(bindingCycleCtx, assumedPod)
    if !waitOnPermitStatus.IsSuccess() {
      var reason string
      if waitOnPermitStatus.IsUnschedulable() {
        metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
        reason = v1.PodReasonUnschedulable
      } else {
        metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
        reason = SchedulerError
      }
      // trigger un-reserve plugins to clean up state associated with the reserved Pod
      fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
      if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
        klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
      }
      sched.recordSchedulingFailure(fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, "")
      return
    }

    // Run "prebind" plugins.
    preBindStatus := fwk.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    if !preBindStatus.IsSuccess() {
      metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
      // trigger un-reserve plugins to clean up state associated with the reserved Pod
      fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
      if forgetErr := sched.SchedulerCache.ForgetPod(assumedPod); forgetErr != nil {
        klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed")
      }
      sched.recordSchedulingFailure(fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, "")
      return
    }

    err := sched.bind(bindingCycleCtx, fwk, assumedPod, scheduleResult.SuggestedHost, state)
    if err != nil {
      metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
      // trigger un-reserve plugins to clean up state associated with the reserved Pod
      fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
      if err := sched.SchedulerCache.ForgetPod(assumedPod); err != nil {
        klog.ErrorS(err, "scheduler cache ForgetPod failed")
      }
      sched.recordSchedulingFailure(fwk, assumedPodInfo, fmt.Errorf("binding rejected: %w", err), SchedulerError, "")
    } else {
      // Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
      if klog.V(2).Enabled() {
        klog.InfoS("Successfully bound pod to node", "pod", klog.KObj(pod), "node", scheduleResult.SuggestedHost, "evaluatedNodes", scheduleResult.EvaluatedNodes, "feasibleNodes", scheduleResult.FeasibleNodes)
      }
      metrics.PodScheduled(fwk.ProfileName(), metrics.SinceInSeconds(start))
      metrics.PodSchedulingAttempts.Observe(float64(podInfo.Attempts))
      metrics.PodSchedulingDuration.WithLabelValues(getAttemptsLabel(podInfo)).Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp))

      // Run "postbind" plugins.
      fwk.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)

      // At the end of a successful binding cycle, move up Pods if needed.
      if len(podsToActivate.Map) != 0 {
        sched.SchedulingQueue.Activate(podsToActivate.Map)
        // Unlike the logic in scheduling cycle, we don't bother deleting the entries
        // as `podsToActivate.Map` is no longer consumed.
      }
    }
  }()

总结

1.kube-scheduler 相对来说比较难的部分是在前面 4 个阶段,过滤和打分阶段我并没有把每个插件都拎出来单独解释,一是篇幅原因,二是精力有限,我们知道它是抽象成一个个插件来实现的即可。

2.扩展器部分我也没看,我们之前有看过自定义调度框架,跟这一部分是有重复的。

3.由于调度的代码穿插比较厉害,阅读过程中经常不停地从这个接口跳到另一个接口,这一部分内容不能很好记录下来,毕竟理科生,功力有限,无所谓了,反正也是留着自己将就着看。

4.调度器的最后,是使用 clientset 来实现 pod 和 node 的绑定的。