目录

kubernetes源码-kube-scheduler 原理和源码分析(一)

kube-scheduler 原理和源码分析,源码为 kubernetes 的 release-1.22 分支 .

写在前面

kube-scheduler 是 kubernetes 的核心组件之一,看名字就知道,它是专门负责 pods 调度的。看起来简单,实际很复杂,它会根据一些列调度算法和策略,将 pods 调度到最优工作节点上。

在开始之前,我们先简单了解一下 kubernetes 调度器。

调度步骤

kube-scheduler 给一个 Pod 做调度选择时包含两个步骤:

  • 过滤

  • 打分

(也有叫预选和优选的。)

过滤阶段会将所有满足 Pod 调度需求的节点选出来。 例如,PodFitsResources 过滤函数会检查候选节点的可用资源能否满足 Pod 的资源请求。 在过滤之后,得出一个节点列表,里面包含了所有可调度节点;通常情况下, 这个节点列表包含不止一个节点。如果这个列表是空的,代表这个 Pod 不可调度。

在打分阶段,调度器会为 Pod 从所有可调度节点中选取一个最合适的节点。 根据当前启用的打分规则,调度器会给每一个可调度节点进行打分。

最后,kube-scheduler 会将 Pod 调度到得分最高的节点上。 如果存在多个得分最高的节点,kube-scheduler 会从中随机选取一个。

支持以下两种方式配置调度器的过滤和打分行为:

  • 调度策略 允许你配置过滤所用的 断言(Predicates) 和打分所用的 优先级(Priorities)。

  • 调度配置 允许你配置实现不同调度阶段的插件, 包括:QueueSort、Filter、Score、Bind、Reserve、Permit 等等。 你也可以配置 kube-scheduler 运行不同的配置文件。

调度器性能

在大规模集群中,你可以调节调度器的表现来平衡调度的延迟(新 Pod 快速就位) 和精度(调度器很少做出糟糕的放置决策)。

节点打分阈值

要提升调度性能,kube-scheduler 可以在找到足够的可调度节点之后停止查找。 在大规模集群中,比起考虑每个节点的简单方法相比可以节省时间。

你可以使用整个集群节点总数的百分比作为阈值来指定需要多少节点就足够。 kube-scheduler 会将它转换为节点数的整数值。在调度期间,如果 kube-scheduler 已确认的可调度节点数足以超过了配置的百分比数量, kube-scheduler 将停止继续查找可调度节点并继续进行 打分阶段。

默认阈值

如果你不指定阈值,Kubernetes 使用线性公式计算出一个比例,在大于 100 节点集群取 50%,在 5000 节点的集群取 10% ,随节点数增加,这个数组不停在减少。这个自动设置的参数的最低值是 5%。

这意味着,调度器至少会对集群中 5% 的节点进行打分,除非用户将该参数设置的低于 5。

如果你想让调度器对集群内所有节点进行打分,则将 percentageOfNodesToScore 设置为 100。

入口函数

cmd/kube-scheduler/scheduler.go 是总入口,调用 command.Execute() 启用服务。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
func main() {
  rand.Seed(time.Now().UnixNano())

  pflag.CommandLine.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)

  command := app.NewSchedulerCommand()

  logs.InitLogs()
  defer logs.FlushLogs()

  if err := command.Execute(); err != nil {
    os.Exit(1)
  }
}

cmd/kube-scheduler/app/server.go 是调度器的启动逻辑所在,调用 runCommand() 函数运行调度器。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// NewSchedulerCommand creates a *cobra.Command object with default parameters and registryOptions
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
  opts := options.NewOptions()

  cmd := &cobra.Command{
    Use: "kube-scheduler",
    Long: `The Kubernetes scheduler is a control plane process which assigns
Pods to Nodes. The scheduler determines which Nodes are valid placements for
each Pod in the scheduling queue according to constraints and available
resources. The scheduler then ranks each valid Node and binds the Pod to a
suitable Node. Multiple different schedulers may be used within a cluster;
kube-scheduler is the reference implementation.
See [scheduling](https://kubernetes.io/docs/concepts/scheduling-eviction/)
for more information about scheduling and the kube-scheduler component.`,
    Run: func(cmd *cobra.Command, args []string) {
      if err := runCommand(cmd, opts, registryOptions...); err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
      }
    },
    Args: func(cmd *cobra.Command, args []string) error {
      for _, arg := range args {
        if len(arg) > 0 {
          return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
        }
      }
      return nil
    },
  }

  nfs := opts.Flags
  verflag.AddFlags(nfs.FlagSet("global"))
  globalflag.AddGlobalFlags(nfs.FlagSet("global"), cmd.Name())
  fs := cmd.Flags()
  for _, f := range nfs.FlagSets {
    fs.AddFlagSet(f)
  }

  cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
  cliflag.SetUsageAndHelpFunc(cmd, *nfs, cols)

  cmd.MarkFlagFilename("config", "yaml", "yml", "json")

  return cmd
}

runCommand

Setup() 函数构建调度器, Run() 函数启动调度器。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
// runCommand runs the scheduler.
func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option) error {
  verflag.PrintAndExitIfRequested()
  cliflag.PrintFlags(cmd.Flags())

  ctx, cancel := context.WithCancel(context.Background())
  defer cancel()
  go func() {
    stopCh := server.SetupSignalHandler()
    <-stopCh
    cancel()
  }()

  cc, sched, err := Setup(ctx, opts, registryOptions...)
  if err != nil {
    return err
  }

  return Run(ctx, cc, sched)
}

构造 scheduler

我们先简单看看 Setup() 函数。

  1. 生成默认配置。

  2. 校验参数是否有误。

  3. 构建调度器。

  4. 最后返回生成好的配置,和调度器对象。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// Setup creates a completed config and a scheduler based on the command args and options
func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {
  if cfg, err := latest.Default(); err != nil {
    return nil, nil, err
  } else {
    opts.ComponentConfig = cfg
  }

  if errs := opts.Validate(); len(errs) > 0 {
    return nil, nil, utilerrors.NewAggregate(errs)
  }

  c, err := opts.Config()
  if err != nil {
    return nil, nil, err
  }

  // Get the completed config
  cc := c.Complete()

  outOfTreeRegistry := make(runtime.Registry)
  for _, option := range outOfTreeRegistryOptions {
    if err := option(outOfTreeRegistry); err != nil {
      return nil, nil, err
    }
  }

  recorderFactory := getRecorderFactory(&cc)
  completedProfiles := make([]kubeschedulerconfig.KubeSchedulerProfile, 0)
  // Create the scheduler.
  sched, err := scheduler.New(cc.Client,
    cc.InformerFactory,
    recorderFactory,
    ctx.Done(),
    scheduler.WithComponentConfigVersion(cc.ComponentConfig.TypeMeta.APIVersion),
    scheduler.WithKubeConfig(cc.KubeConfig),
    scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
    scheduler.WithLegacyPolicySource(cc.LegacyPolicySource),
    scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
    scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
    scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
    scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
    scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
    scheduler.WithParallelism(cc.ComponentConfig.Parallelism),
    scheduler.WithBuildFrameworkCapturer(func(profile kubeschedulerconfig.KubeSchedulerProfile) {
      // Profiles are processed during Framework instantiation to set default plugins and configurations. Capturing them for logging
      completedProfiles = append(completedProfiles, profile)
    }),
  )
  if err != nil {
    return nil, nil, err
  }
  if err := options.LogOrWriteConfig(opts.WriteConfigTo, &cc.ComponentConfig, completedProfiles); err != nil {
    return nil, nil, err
  }

  return &cc, sched, nil
}

sched.Run

启动调度过程。

  1. 配置对象注册。

  2. 准备事件记录器。

  3. 设置健康检查,并启动健康检查服务。

  4. 启动所有 informers ,并等待缓存同步完毕。

  5. leader 选举相关机制逻辑,并等到选举完成。

  6. 调用 sched.Run() 启动调度器,这里是调度器的核心逻辑入口。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
// Run executes the scheduler based on the given configuration. It only returns on error or when context is done.
func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
  // To help debugging, immediately log version
  klog.V(1).InfoS("Starting Kubernetes Scheduler version", "version", version.Get())

  // Configz registration.
  if cz, err := configz.New("componentconfig"); err == nil {
    cz.Set(cc.ComponentConfig)
  } else {
    return fmt.Errorf("unable to register configz: %s", err)
  }

  // Prepare the event broadcaster.
  cc.EventBroadcaster.StartRecordingToSink(ctx.Done())

  // Setup healthz checks.
  var checks []healthz.HealthChecker
  if cc.ComponentConfig.LeaderElection.LeaderElect {
    checks = append(checks, cc.LeaderElection.WatchDog)
  }

  waitingForLeader := make(chan struct{})
  isLeader := func() bool {
    select {
    case _, ok := <-waitingForLeader:
      // if channel is closed, we are leading
      return !ok
    default:
      // channel is open, we are waiting for a leader
      return false
    }
  }

  // Start up the healthz server.
  if cc.InsecureServing != nil {
    separateMetrics := cc.InsecureMetricsServing != nil
    handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader, separateMetrics, checks...), nil, nil)
    if err := cc.InsecureServing.Serve(handler, 0, ctx.Done()); err != nil {
      return fmt.Errorf("failed to start healthz server: %v", err)
    }
  }
  if cc.InsecureMetricsServing != nil {
    handler := buildHandlerChain(newMetricsHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader), nil, nil)
    if err := cc.InsecureMetricsServing.Serve(handler, 0, ctx.Done()); err != nil {
      return fmt.Errorf("failed to start metrics server: %v", err)
    }
  }
  if cc.SecureServing != nil {
    handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader, false, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
    // TODO: handle stoppedCh returned by c.SecureServing.Serve
    if _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil {
      // fail early for secure handlers, removing the old error loop from above
      return fmt.Errorf("failed to start secure server: %v", err)
    }
  }

  // Start all informers.
  cc.InformerFactory.Start(ctx.Done())

  // Wait for all caches to sync before scheduling.
  cc.InformerFactory.WaitForCacheSync(ctx.Done())

  // If leader election is enabled, runCommand via LeaderElector until done and exit.
  if cc.LeaderElection != nil {
    cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
      OnStartedLeading: func(ctx context.Context) {
        close(waitingForLeader)
        sched.Run(ctx)
      },
      OnStoppedLeading: func() {
        select {
        case <-ctx.Done():
          // We were asked to terminate. Exit 0.
          klog.Info("Requested to terminate. Exiting.")
          os.Exit(0)
        default:
          // We lost the lock.
          klog.Exitf("leaderelection lost")
        }
      },
    }
    leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
    if err != nil {
      return fmt.Errorf("couldn't create leader elector: %v", err)
    }

    leaderElector.Run(ctx)

    return fmt.Errorf("lost lease")
  }

  // Leader election is disabled, so runCommand inline until done.
  close(waitingForLeader)
  sched.Run(ctx)
  return fmt.Errorf("finished without leader elect")
}

总结

1.调度器在调度 Pod 的时候,大致分两个阶段,预选和优选。预选既是先过滤出一批合适的节点,优选则是从这一批合适的节点挑选出最合适调度的节点。

2.简单了解了一下调度器预选的一些阈值。

3.调度器允许在 QueueSort、Filter、Score、Bind、Reserve、Permit 等阶段通过使用插件来实现自己的调度算法。我们之前的自定义调度器框架就有详细说过。

由于篇幅原因,我们先简单看一下前面入口逻辑部分,后面我们再详细看看 sched.Run() 函数,具体的调度流程和算法全都在这里面实现,不得不分开来阅读,不然这一篇的内容会很长。