目录

kubernetes组件开发-自定义调度器(四)

时隔一年,再次使用 kubernetes 的调度器框架.

动机

之前用自定义调度框架写过一个 demo ,是基于节点 pod 数量的打分调度插件,然后最近想根据节点负载来写一个调度插件,仔细看了一下调度框架,他本身就集成了节点资源的插件,所以我们可以出一期来记录一下。

不过他的打分我看了一下,感觉实际没什么太大用处,他是基于节点的可分配资源多少来打分的,而不是基于预留出去的资源来动态感知的(即 requests 部分占用的资源),不过最近新公司岗位变动我也懒得去写了,看了下源码,自己实现也是挺简单的,我们就将就这来吧(我知道市面有诸如 descheduler 和 koordinator 这类产品,但是实测 koordinator 对 api-server 不是很友好,我们这边垃圾机械磁盘服务器,每次调度的时候,api-server 对 etcd 的读写延迟就要几十秒甚至超时,条件好的朋友可以试试,不用重复造轮子。

scheduler-plugins/pkg/noderesources/resource_allocation.go

虽然他有分别计算节点总的可分配资源和已预留 + 当前 pod 的 requests 。但是再往下看 score := r.scorer(requested, allocatable)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
...
func (r *resourceAllocationScorer) score(
  logger klog.Logger,
  pod *v1.Pod,
  nodeInfo *framework.NodeInfo) (int64, *framework.Status) {
  node := nodeInfo.Node()
  if node == nil {
    return 0, framework.NewStatus(framework.Error, "node not found")
  }
  if r.resourceToWeightMap == nil {
    return 0, framework.NewStatus(framework.Error, "resources not found")
  }
  requested := make(resourceToValueMap, len(r.resourceToWeightMap))
  allocatable := make(resourceToValueMap, len(r.resourceToWeightMap))
  for resource := range r.resourceToWeightMap {
    allocatable[resource], requested[resource] = calculateResourceAllocatableRequest(logger, nodeInfo, pod, resource)
  }

  score := r.scorer(requested, allocatable)

  if logger.V(10).Enabled() {
    logger.Info("Resources and score",
      "podName", pod.Name, "nodeName", node.Name, "scorer", r.Name,
      "allocatableResources", allocatable, "requestedResources", requested,
      "score", score)
  }

  return score, nil
}
...

scheduler-plugins/pkg/noderesources/allocatable.go

这里是 scorer 的构造函数。

1
2
3
4
5
6
7
8
  return &Allocatable{
    handle: h,
    resourceAllocationScorer: resourceAllocationScorer{
      Name:                AllocatableName,
      scorer:              resourceScorer(logger, resToWeightMap, mode),
      resourceToWeightMap: resToWeightMap,
    },
  }, nil

这里就很神奇了, func(requested, allocable resourceToValueMap) 入参虽然有 requested ,但是函数里面并没有用到,而是拿的总的可分配资源去做的打分。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
...
func resourceScorer(logger klog.Logger, resToWeightMap resourceToWeightMap, mode config.ModeType) func(resourceToValueMap, resourceToValueMap) int64 {
  return func(requested, allocable resourceToValueMap) int64 {
    // TODO: consider volumes in scoring.
    var nodeScore, weightSum int64
    for resource, weight := range resToWeightMap {
      resourceScore := score(logger, allocable[resource], mode)
      nodeScore += resourceScore * weight
      weightSum += weight
    }
    return nodeScore / weightSum
  }
}
...

安装

在 v0.19 版本以上开始,调度器框架需要控制器,说是为了优化设计和实现。

github 原文: ❗IMPORTANT❗ Starting with release v0.19, several plugins (e.g., coscheduling) introduced CRD to optimize their design and implementation.

安装我们这里按官网的方法,部署了控制器和调度器,有自己实现其他插件的,需要自己编译打包成镜像后再实施,我这边是按默认调度器的方式安装的,安装过程略

我看到有 2 个比较有意思的内置插件,简单介绍一下这 2 个插件,有兴趣的话,可以自己去官方 github 查看。

  • Coscheduling

    pod 组,官方的 demo 是: deploymentA 最小设置 6 个副本,pod 组设置最小需要启动的副本数是 3 ,那么启动这个 deploymentA 的时候,集群资源刚好够 3 个副本的时候,服务会启动 3 个,其他几个 pending,如果 pod 组设置的 4,如果最小副本不够 4,则服务启动不起来,全部 pending 。

  • ElasticQuota

    自动扩缩容,看官方的 demo:创建一个 cpu 资源在 4-6 核的 deploymentA ,创建一个 cpu 资源在 4-6 核的 deploymentB , deploymentA 3 个副本,每个副本 2 cpu 的预留,可以启动到 3 个,这个时候如果剩下的资源只够 deploymentB 启动 1 个 pod ,部署 2 个副本的 deploymentB ,那么他会自动缩减 deploymentA 一个副本,使 deploymentB 达到最小的 4 cpu 预留的副本。

配置

Node Resources demo 配置

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
apiVersion: kubescheduler.config.k8s.io/v1
kind: KubeSchedulerConfiguration
leaderElection:
  leaderElect: false
clientConnection:
  kubeconfig: "/etc/kubernetes/scheduler.conf"
profiles:
- schedulerName: default-scheduler
  plugins:
    score:
      enabled:
      - name: NodeResourcesAllocatable
  pluginConfig:
  - name: NodeResourcesAllocatable
    args:
      # 资源最少使用优先,还有 Most,对应的是资源最多使用的优先。
      mode: Least
      resources:
      - name: cpu
        weight: 1000000
      - name: memory
        weight: 1

配置参数

写过插件的都知道,如果要给插件设置参数,那我们不可能单独在调度器框架里面让框架去单独从某个地方读取配置文件获取参数,这种方式不是不行,就是得改太多地方,如果非要,最方便的方法就是独立写一个程序来完成,不过其实调度框架已经帮我们考虑到这种情况了,比如上面 KubeSchedulerConfiguration 这种配置,可以通过在里面 pluginConfig 字段来实现参数设置。

我也是出于好奇,看了一下源码,了解一下他是怎么读取的配置参数。

scheduler-plugins/cmd/scheduler/main.go

k8s 调度器框架的启动及注册。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func main() {
    // Register custom plugins to the scheduler framework.
    // Later they can consist of scheduler profile(s) and hence
    // used by various kinds of workloads.
    command := app.NewSchedulerCommand(
        app.WithPlugin(capacityscheduling.Name, capacityscheduling.New),
        app.WithPlugin(coscheduling.Name, coscheduling.New),
        app.WithPlugin(loadvariationriskbalancing.Name, loadvariationriskbalancing.New),
        app.WithPlugin(networkoverhead.Name, networkoverhead.New),
        app.WithPlugin(topologicalsort.Name, topologicalsort.New),
        app.WithPlugin(noderesources.AllocatableName, noderesources.NewAllocatable),
        app.WithPlugin(noderesourcetopology.Name, noderesourcetopology.New),
        app.WithPlugin(preemptiontoleration.Name, preemptiontoleration.New),
        app.WithPlugin(targetloadpacking.Name, targetloadpacking.New),
        app.WithPlugin(lowriskovercommitment.Name, lowriskovercommitment.New),
        app.WithPlugin(sysched.Name, sysched.New),
        // Sample plugins below.
        // app.WithPlugin(crossnodepreemption.Name, crossnodepreemption.New),
        app.WithPlugin(podstate.Name, podstate.New),
        app.WithPlugin(qos.Name, qos.New),
    )

    code := cli.Run(command)
    os.Exit(code)
}

kubernetes/pkg/scheduler/framework/runtime/framework.go

这里是 k8s 调度器获取插件,及插件对应的参数配置的源码,这里可以看到,他是根据插件名去获取 pluginConfig 下对应插件的参数配置的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
    ...
    // get needed plugins from config
    pg := f.pluginsNeeded(profile.Plugins)

    pluginConfig := make(map[string]runtime.Object, len(profile.PluginConfig))
    for i := range profile.PluginConfig {
        name := profile.PluginConfig[i].Name
        if _, ok := pluginConfig[name]; ok {
            return nil, fmt.Errorf("repeated config for plugin %s", name)
        }
        pluginConfig[name] = profile.PluginConfig[i].Args
    }
    ...

最后

最近也没什么东西更新的,水一期,最近行情特别差,到了新公司以后,还没半年公司直接没了,估计后面又要很久没更新了。