Kubernetes Source Code: Job Controller Principles and Source Code Analysis

An analysis of the Job Controller's principles and source code, based on the release-1.22 branch of Kubernetes.

Preface

Recently, in my spare time, I became interested in the Kubernetes source code and wanted to understand how each of its controllers works. After going through the relevant material and source code, I realized there were far too many details to keep in my head, so I am jotting them down here for future reference.

Like all controllers, the startup flow is: Run -> worker -> processNextWorkItem -> syncHandler.

Entry Function

The entry function startJobController starts the Job controller.

Source path: kubernetes\cmd\kube-controller-manager\app\batch.go

func startJobController(ctx ControllerContext) (http.Handler, bool, error) {
  go job.NewController(
    ctx.InformerFactory.Core().V1().Pods(),
    ctx.InformerFactory.Batch().V1().Jobs(),
    ctx.ClientBuilder.ClientOrDie("job-controller"),
  ).Run(int(ctx.ComponentConfig.JobController.ConcurrentJobSyncs), ctx.Stop)
  return nil, true, nil
}

NewController

startJobController simply calls the job.NewController constructor to initialize the controller. Let's look at the NewController code.

Source path: kubernetes\pkg\controller\job\job_controller.go

func NewController(podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *Controller {
  // Event broadcaster, used to record events
  eventBroadcaster := record.NewBroadcaster()
  eventBroadcaster.StartStructuredLogging(0)
  eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})

  // Metrics-related setup
  if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
    ratelimiter.RegisterMetricAndTrackRateLimiterUsage("job_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
  }

  // Constructing the controller itself is straightforward
  jm := &Controller{
    kubeClient: kubeClient,
    podControl: controller.RealPodControl{
      KubeClient: kubeClient,
      Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
    },
    expectations: controller.NewControllerExpectations(),
    queue:        workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job"),
    orphanQueue:  workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job_orphan_pod"),
    recorder:     eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
  }

  // Watch Job object events
  jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc: func(obj interface{}) {
      jm.enqueueController(obj, true)
    },
    UpdateFunc: jm.updateJob,
    DeleteFunc: func(obj interface{}) {
      jm.enqueueController(obj, true)
    },
  })
  jm.jobLister = jobInformer.Lister()
  jm.jobStoreSynced = jobInformer.Informer().HasSynced

  // Watch Pod object events
  podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc:    jm.addPod,
    UpdateFunc: jm.updatePod,
    DeleteFunc: jm.deletePod,
  })
  jm.podStore = podInformer.Lister()
  jm.podStoreSynced = podInformer.Informer().HasSynced

  // Handlers that patch/update the Job object's status
  jm.updateStatusHandler = jm.updateJobStatus
  jm.patchJobHandler = jm.patchJob
  // Handler that syncs Job objects
  jm.syncHandler = jm.syncJob

  metrics.Register()

  return jm
}

Run

After construction, the Run function is called to start the controller.

func (jm *Controller) Run(workers int, stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    defer jm.queue.ShutDown()

    klog.Infof("Starting job controller")
    defer klog.Infof("Shutting down job controller")
    // Wait for the caches to sync from the apiserver
    if !cache.WaitForNamedCacheSync("job", stopCh, jm.podStoreSynced, jm.jobStoreSynced) {
        return
    }
    // Start multiple worker goroutines to process Job objects
    for i := 0; i < workers; i++ {
        go wait.Until(jm.worker, time.Second, stopCh)
    }

    <-stopCh
}

Before diving into what the worker function looks like, let's first go back and look at some of the functions used while constructing the controller.

enqueueController

When the Controller is constructed, the Add/Delete event handlers for Jobs call the enqueueController() function directly.

This function takes the Job object and adds its key to the delaying queue. When the controller has just started, the events it receives are all Add events, and as we can see they are enqueued immediately (immediate = true).

func (jm *Controller) enqueueController(obj interface{}, immediate bool) {
    // Get the Job's unique key
    key, err := controller.KeyFunc(obj)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
        return
    }
    // Start with a backoff of zero
    backoff := time.Duration(0)
    // If immediate is not true, recompute the backoff
    if !immediate {
        backoff = getBackoff(jm.queue, key)
    }
    // Delaying queue; a backoff of 0 enqueues the key immediately
    jm.queue.AddAfter(key, backoff)
}
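For reference, the getBackoff helper in the same file derives the delay from how many times the key has already been requeued: DefaultJobBackOff * 2^(requeues-1), capped at MaxJobBackOff. Below is a simplified sketch of that logic (the name getBackoffSketch and the reduced overflow check are mine; see the real getBackoff for the exact code):

// Simplified sketch of getBackoff: exponential backoff based on the number of
// requeues recorded by the rate limiter, capped at MaxJobBackOff.
func getBackoffSketch(queue workqueue.RateLimitingInterface, key interface{}) time.Duration {
    exp := queue.NumRequeues(key)
    if exp <= 0 {
        return time.Duration(0)
    }
    // DefaultJobBackOff * 2^(requeues-1)
    backoff := float64(DefaultJobBackOff.Nanoseconds()) * math.Pow(2, float64(exp-1))
    if backoff > math.MaxInt64 || time.Duration(backoff) > MaxJobBackOff {
        return MaxJobBackOff
    }
    return time.Duration(backoff)
}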

updateJob

updateJob, registered on the jobInformer, is the Update event handler for Jobs.

func (jm *Controller) updateJob(old, cur interface{}) {
    oldJob := old.(*batch.Job)
    curJob := cur.(*batch.Job)

    key, err := controller.KeyFunc(curJob)
    // Why does this just return on error, without handling or reporting it?
    if err != nil {
        return
    }
    jm.enqueueController(curJob, true)
    // Check whether ActiveDeadlineSeconds is set; if it changed, compute how long to wait before re-enqueueing the Job for processing
    if curJob.Status.StartTime != nil {
        curADS := curJob.Spec.ActiveDeadlineSeconds
        if curADS == nil {
            return
        }
        oldADS := oldJob.Spec.ActiveDeadlineSeconds
        if oldADS == nil || *oldADS != *curADS {
            now := metav1.Now()
            start := curJob.Status.StartTime.Time
            passed := now.Time.Sub(start)
            total := time.Duration(*curADS) * time.Second
            // In other words, when curADS != nil the Job is re-enqueued after total-passed, to be handled by a worker again
            jm.queue.AddAfter(key, total-passed)
            klog.V(4).Infof("job %q ActiveDeadlineSeconds updated, will rsync after %d seconds", key, total-passed)
        }
    }
}
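A quick, hypothetical illustration of the delay computed above (the numbers are invented): with ActiveDeadlineSeconds = 300 and a Job whose StartTime was 100 seconds ago, total-passed is about 200 seconds, so the key is re-enqueued to be synced right when the deadline expires.

// Hypothetical illustration of the re-enqueue delay computed in updateJob.
start := time.Now().Add(-100 * time.Second) // Job started 100s ago
total := time.Duration(300) * time.Second   // ActiveDeadlineSeconds = 300
passed := time.Since(start)
fmt.Println(total - passed)                 // ≈ 3m20s until the deadline is reached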

add/update/deletePod

The addPod, updatePod, and deletePod handlers registered on the podInformer.

// When a Pod is created, addPod is triggered: it finds the Pod's Job and enqueues it so the Job's expectations are re-synced.
func (jm *Controller) addPod(obj interface{}) {
    pod := obj.(*v1.Pod)
    if pod.DeletionTimestamp != nil {
        // The controller may miss Pod delete events while restarting, so an already-deleted Pod can arrive as an Add event. If the Pod is marked for deletion, call deletePod instead to avoid an extra CreationObserved.
        jm.deletePod(pod)
        return
    }

    // Look up the owning Job object.
    if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
        job := jm.resolveControllerRef(pod.Namespace, controllerRef)
        if job == nil {
            return
        }
        jobKey, err := controller.KeyFunc(job)
        if err != nil {
            return
        }
        // An Add event was observed, so decrement the number of Pods the controller expects to create, equivalent to sync.WaitGroup.Done()
        jm.expectations.CreationObserved(jobKey)
        // A Pod belonging to this Job was created successfully, so JobStatus.Active has changed and a worker goroutine should be woken up to process the Job
        jm.enqueueController(job, true)
        return
    }

    // For orphan Pods, find the Jobs that could adopt them and enqueue those Jobs for the workers to handle
    for _, job := range jm.getPodJobs(pod) {
        jm.enqueueController(job, true)
    }
}

// When a Pod update event occurs, check whether its labels changed; if they did, the old owner has to be handled along with the new one
func (jm *Controller) updatePod(old, cur interface{}) {
    curPod := cur.(*v1.Pod)
    oldPod := old.(*v1.Pod)
    if curPod.ResourceVersion == oldPod.ResourceVersion {
        // If the ResourceVersion hasn't changed, do nothing
        return
    }
    if curPod.DeletionTimestamp != nil {
        // If the Pod is being deleted, treat it as a delete
        jm.deletePod(curPod)
        return
    }

    // If the Pod failed, enqueue with a backoff computed from the number of failures rather than immediately
    immediate := curPod.Status.Phase != v1.PodFailed

    curControllerRef := metav1.GetControllerOf(curPod)
    oldControllerRef := metav1.GetControllerOf(oldPod)
    controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
    if controllerRefChanged && oldControllerRef != nil {
        // If the controllerRef changed, the old Job changed too and needs to be enqueued for re-sync
        if job := jm.resolveControllerRef(oldPod.Namespace, oldControllerRef); job != nil {
            jm.enqueueController(job, immediate)
        }
    }

    // The Pod was updated, so its owning Job needs a corresponding sync
    if curControllerRef != nil {
        job := jm.resolveControllerRef(curPod.Namespace, curControllerRef)
        if job == nil {
            return
        }
        jm.enqueueController(job, immediate)
        return
    }

    // If this is an orphan Pod and its labels changed, look for Jobs that could adopt it
    labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
    if labelChanged || controllerRefChanged {
        for _, job := range jm.getPodJobs(curPod) {
            jm.enqueueController(job, immediate)
        }
    }
}

// When a Pod delete event occurs, the controller's Pod expectations need to be updated accordingly
func (jm *Controller) deletePod(obj interface{}) {
    pod, ok := obj.(*v1.Pod)

    // Check whether obj is actually a Pod object
    if !ok {
        tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
        if !ok {
            utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj))
            return
        }
        pod, ok = tombstone.Obj.(*v1.Pod)
        if !ok {
            utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %+v", obj))
            return
        }
    }

    controllerRef := metav1.GetControllerOf(pod)
    if controllerRef == nil {
        // An orphan Pod: no controller cares that it was deleted
        return
    }
    job := jm.resolveControllerRef(pod.Namespace, controllerRef)
    if job == nil {
        return
    }
    jobKey, err := controller.KeyFunc(job)
    if err != nil {
        return
    }
    // Decrement the number of Pods expected to be deleted
    jm.expectations.DeletionObserved(jobKey)
    // The Pod was deleted, so JobStatus.Active may have changed
    jm.enqueueController(job, true)
}
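All three handlers above use resolveControllerRef to map a Pod's ownerReference back to a Job. Roughly, it resolves the referenced Job from the lister and double-checks the UID, so that a recreated Job with the same name is not mistaken for the original owner. A simplified sketch (the upstream helper may differ in detail):

// Simplified sketch of resolveControllerRef: return the Job referenced by
// controllerRef, or nil if the kind, name, or UID does not match.
func (jm *Controller) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *batch.Job {
    // Only a Job can own the Pods this controller cares about.
    if controllerRef.Kind != controllerKind.Kind {
        return nil
    }
    job, err := jm.jobLister.Jobs(namespace).Get(controllerRef.Name)
    if err != nil {
        return nil
    }
    // A Job with the same name but a different UID is a different object.
    if job.UID != controllerRef.UID {
        return nil
    }
    return job
}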

What are expectations?

expectations

We saw expectations.CreationObserved and expectations.DeletionObserved earlier. What exactly are they?

Let's look at some of the related methods.

type ControllerExpectationsInterface interface {
    // Get the controller's expectations. ControlleeExpectations is described below;
    // controllerKey here is the controller object's unique key, i.e. NS/Name
    GetExpectations(controllerKey string) (*ControlleeExpectations, bool, error)
    // Check whether the controller's expectations have been fulfilled
    SatisfiedExpectations(controllerKey string) bool
    // Delete the controller's expectations
    DeleteExpectations(controllerKey string)
    // Set the controller's expectations; add is the number of child objects to create, del the number to delete
    SetExpectations(controllerKey string, add, del int) error
    // The controller expects to create adds child objects
    ExpectCreations(controllerKey string, adds int) error
    // The controller expects to delete dels child objects
    ExpectDeletions(controllerKey string, dels int) error
    // A child object creation was observed, so the number of creations the controller expects drops by one
    CreationObserved(controllerKey string)
    // A child object deletion was observed, so the number of deletions the controller expects drops by one
    DeletionObserved(controllerKey string)
    // Raise the controller's expectations; add and del are the increments for creations and deletions
    RaiseExpectations(controllerKey string, add, del int)
    // Lower the controller's expectations; add and del are the decrements for creations and deletions
    LowerExpectations(controllerKey string, add, del int)
}

Source path: kubernetes\pkg\controller\controller_utils.go

ControllerExpectations stores ControlleeExpectations objects; each ControlleeExpectations object corresponds to one entry in the map.

// This struct implements the ControllerExpectationsInterface interface
type ControllerExpectations struct {
    cache.Store
}
// Check whether the expectations have been fulfilled
func (r *ControllerExpectations) SatisfiedExpectations(controllerKey string) bool {
    if exp, exists, err := r.GetExpectations(controllerKey); exists {
        // Expectations already fulfilled
        if exp.Fulfilled() {
            klog.V(4).Infof("Controller expectations fulfilled %#v", exp)
            return true
        // Expectations expired
        } else if exp.isExpired() {
            klog.V(4).Infof("Controller expectations expired %#v", exp)
            return true
        // Expectations not yet fulfilled; keep waiting
        } else {
            klog.V(4).Infof("Controller still waiting on expectations %#v", exp)
            return false
        }
        // Error while fetching the expectations
    } else if err != nil {
        klog.V(2).Infof("Error encountered while checking expectations %#v, forcing sync", err)
        // No expectations recorded
    } else {
        klog.V(4).Infof("Controller %v either never recorded expectations, or the ttl expired.", controllerKey)
    }
    return true
}

type ControlleeExpectations struct {
    add       int64
    del       int64
    key       string
    timestamp time.Time
}

// Increment the add/del expectations.
func (e *ControlleeExpectations) Add(add, del int64) {
    atomic.AddInt64(&e.add, add)
    atomic.AddInt64(&e.del, del)
}

// Fulfilled returns true when the expectations have been met.
func (e *ControlleeExpectations) Fulfilled() bool {
    return atomic.LoadInt64(&e.add) <= 0 && atomic.LoadInt64(&e.del) <= 0
}

// Get the add/del expectation values
func (e *ControlleeExpectations) GetExpectations() (int64, int64) {
    return atomic.LoadInt64(&e.add), atomic.LoadInt64(&e.del)
}

// Whether the expectations have expired
func (exp *ControlleeExpectations) isExpired() bool {
    return clock.RealClock{}.Since(exp.timestamp) > ExpectationsTimeout
}

// Register new expectations for a controller
func (r *ControllerExpectations) SetExpectations(controllerKey string, add, del int) error {
    exp := &ControlleeExpectations{add: int64(add), del: int64(del), key: controllerKey, timestamp: clock.RealClock{}.Now()}
    klog.V(4).Infof("Setting expectations %#v", exp)
    return r.Add(exp)
}

ControllerExpectations manages the expectations of all Xxx objects (e.g. ReplicaSets), while a ControlleeExpectations holds the expectations of one particular Xxx object.

ControllerExpectations can be thought of as a map. Each entry of this map (corresponding to a ControlleeExpectations) can be seen as having four key fields:

key: made up of the object's namespace and name (here, the rs's ns/name)

Add: how many more Pods this rs still needs to create

del: how many more Pods this rs still needs to delete

Time: the time the expectations record was created

Key          Add   Del   Time
Default/zx1  0     0     2021.07.04 16:00:00
zx/zx1       1     0     2021.07.04 16:00:00
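To make the sync.WaitGroup analogy concrete, here is a hypothetical usage sketch of ControllerExpectations outside the controller (the key and counts are invented):

// Hypothetical usage of ControllerExpectations, mirroring how the Job controller uses it.
exp := controller.NewControllerExpectations()
jobKey := "default/zx1" // namespace/name of the Job

// About to create 3 Pods: record the expectation, like sync.WaitGroup.Add(3).
exp.ExpectCreations(jobKey, 3)

// Each observed Pod Add event decrements the expectation, like sync.WaitGroup.Done().
exp.CreationObserved(jobKey)
exp.CreationObserved(jobKey)

// One creation is still outstanding, so syncJob would skip manageJob for this key.
fmt.Println(exp.SatisfiedExpectations(jobKey)) // false

exp.CreationObserved(jobKey)
fmt.Println(exp.SatisfiedExpectations(jobKey)) // true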

worker

Now let's see how the worker goroutines process Job objects.

// A worker goroutine pulls Job keys off the queue, processes them, and marks them accordingly when done.
func (jm *Controller) worker() {
  for jm.processNextWorkItem() {
  }
}

func (jm *Controller) processNextWorkItem() bool {
  key, quit := jm.queue.Get()
  if quit {
    return false
  }
  // Mark the item as Done once processing finishes
  defer jm.queue.Done(key)

  forget, err := jm.syncHandler(key.(string))
  if err == nil {
    if forget {
      // Forget the item, marking it as handled. Note: this removes it from the rate limiter, not from the queue
      jm.queue.Forget(key)
    }
    return true
  }

  // Re-enqueue with rate limiting; the rate limiter tracks how many times the item has been requeued (i.e. retried)
  utilruntime.HandleError(fmt.Errorf("Error syncing job: %v", err))
  jm.queue.AddRateLimited(key)

  return true
}

syncJob

As we can see, the core processing logic lives in jm.syncHandler(). Let's look at what it does; this method is fairly long.

func (jm *Controller) syncJob(key string) (forget bool, rErr error) {
    startTime := time.Now()
    defer func() {
        klog.V(4).Infof("Finished syncing job %q (%v)", key, time.Since(startTime))
    }()
    // Split the key into the Job's namespace and name
    ns, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return false, err
    }
    if len(ns) == 0 || len(name) == 0 {
        return false, fmt.Errorf("invalid job key %q: either namespace or name is missing", key)
    }
    // Fetch the Job object by namespace and name
    sharedJob, err := jm.jobLister.Jobs(ns).Get(name)
    if err != nil {
        if apierrors.IsNotFound(err) {
            klog.V(4).Infof("Job has been deleted: %v", key)
            jm.expectations.DeleteExpectations(key)
            return true, nil
        }
        return false, err
    }
    // Deep-copy the object so the cached copy is never modified
    job := *sharedJob.DeepCopy()

    // Check whether the Job has already finished
    if IsJobFinished(&job) {
        return true, nil
    }

    // Check whether the IndexedJob feature gate is enabled.
    if !feature.DefaultFeatureGate.Enabled(features.IndexedJob) && isIndexedJob(&job) {
        jm.recorder.Event(&job, v1.EventTypeWarning, "IndexedJobDisabled", "Skipped Indexed Job sync because feature is disabled.")
        return false, nil
    }
    // Handle an unknown CompletionMode
    if job.Spec.CompletionMode != nil && *job.Spec.CompletionMode != batch.NonIndexedCompletion && *job.Spec.CompletionMode != batch.IndexedCompletion {
        jm.recorder.Event(&job, v1.EventTypeWarning, "UnknownCompletionMode", "Skipped Job sync because completion mode is unknown")
        return false, nil
    }

    // Default completionMode to NonIndexed
    completionMode := string(batch.NonIndexedCompletion)
    if isIndexedJob(&job) {
        completionMode = string(batch.IndexedCompletion)
    }
    action := metrics.JobSyncActionReconciling

    // Metrics-related bookkeeping
    defer func() {
        result := "success"
        if rErr != nil {
            result = "error"
        }

        metrics.JobSyncDurationSeconds.WithLabelValues(completionMode, result, action).Observe(time.Since(startTime).Seconds())
        metrics.JobSyncNum.WithLabelValues(completionMode, result, action).Inc()
    }()

    // Handle Pod finalizers, an alpha feature in 1.22.
    // UncountedTerminatedPods holds the UIDs of Pods that have terminated but have not yet been accounted for in the Job's status counters.
    // Pods created by the Job controller carry a finalizer. When a Pod terminates (successfully or not), the controller accounts for it in the Job status in three steps: (1) add the Pod UID to this list, (2) remove the finalizer from the Pod, (3) remove the Pod UID from the list while incrementing the corresponding counter.
    var uncounted *uncountedTerminatedPods
    if trackingUncountedPods(&job) {
        klog.V(4).InfoS("Tracking uncounted Pods with pod finalizers", "job", klog.KObj(&job))
        if job.Status.UncountedTerminatedPods == nil {
            job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{}
        }
        uncounted = newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods)
    // Remove the Job's "batch.kubernetes.io/job-tracking" annotation
    } else if patch := removeTrackingAnnotationPatch(&job); patch != nil {
        if err := jm.patchJobHandler(&job, patch); err != nil {
            return false, fmt.Errorf("removing tracking finalizer from job %s: %w", key, err)
        }
    }

    // Check whether the expectations are satisfied before counting active Pods. If we counted the Pods' active/succeeded/failed states first and only then checked the expectations, a Pod Add event arriving during the count could be missed from active, leaving expectations that look satisfied but an active count that is too low.
    jobNeedsSync := jm.expectations.SatisfiedExpectations(key)

    pods, err := jm.getPodsForJob(&job, uncounted != nil)
    if err != nil {
        return false, err
    }

    activePods := controller.FilterActivePods(pods)
    active := int32(len(activePods))

    succeeded, failed := getStatus(&job, pods, uncounted)
    // First sync and not suspended: set StartTime
    if job.Status.StartTime == nil && !jobSuspended(&job) {
        now := metav1.Now()
        job.Status.StartTime = &now
    }

    var manageJobErr error
    var finishedCondition *batch.JobCondition

    // This seems to mean: new failures occurred after the Job was fetched and were synced into the cache?
    jobHasNewFailure := failed > job.Status.Failed
    exceedsBackoffLimit := jobHasNewFailure && (active != *job.Spec.Parallelism) &&
        (failed > *job.Spec.BackoffLimit)

    if exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods) {
        // The number of failed retries exceeded the backoff limit; the Job has failed
        finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "BackoffLimitExceeded", "Job has reached the specified backoff limit")
    } else if pastActiveDeadline(&job) {
        // The Job ran past its active deadline
        finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "DeadlineExceeded", "Job was active longer than specified deadline")
        // The Job has ActiveDeadlineSeconds configured; re-enqueue it to be synced when the deadline expires
    } else if job.Spec.ActiveDeadlineSeconds != nil && !jobSuspended(&job) {
        syncDuration := time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second - time.Since(job.Status.StartTime.Time)
        klog.V(2).InfoS("Job has activeDeadlineSeconds configuration. Will sync this job again", "job", key, "nextSyncIn", syncDuration)
        jm.queue.AddAfter(key, syncDuration)
    }

    var prevSucceededIndexes, succeededIndexes orderedIntervals
    if isIndexedJob(&job) {
        prevSucceededIndexes, succeededIndexes = calculateSucceededIndexes(&job, pods)
        succeeded = int32(succeededIndexes.total())
    }
    suspendCondChanged := false
    // If the Job has failed, remove all active Pods
    if finishedCondition != nil {
        deleted, err := jm.deleteActivePods(&job, activePods)
        if uncounted == nil {
            // Legacy path: assume all active Pods were deleted
            deleted = active
        } else if deleted != active {
            // Pod finalizers may still be present.
            finishedCondition = nil
        }
        active -= deleted
        failed += deleted
        manageJobErr = err
    } else {
        // The Job has not failed (no finished condition yet); keep managing it
        manageJobCalled := false
        // Call manageJob to reconcile the number of Pods
        if jobNeedsSync && job.DeletionTimestamp == nil {
            active, action, manageJobErr = jm.manageJob(&job, activePods, succeeded, succeededIndexes)
            manageJobCalled = true
        }
        complete := false
        if job.Spec.Completions == nil {
            // No active Pods and at least one success: the Job is considered complete
            complete = succeeded > 0 && active == 0
        } else {
            // The Job is also complete when the number of succeeded Pods reaches Completions (and none are active)
            complete = succeeded >= *job.Spec.Completions && active == 0
        }
        if complete {
            // Update the status
            finishedCondition = newCondition(batch.JobComplete, v1.ConditionTrue, "", "")
            // New in 1.21: with the suspend field set to true, the Job controller will not create Pods until the Job is ready to start
        } else if feature.DefaultFeatureGate.Enabled(features.SuspendJob) && manageJobCalled {
            // If the Suspend field is set
            if job.Spec.Suspend != nil && *job.Spec.Suspend {
                // Suspend was changed while the Job was running
                var isUpdated bool
                job.Status.Conditions, isUpdated = ensureJobConditionStatus(job.Status.Conditions, batch.JobSuspended, v1.ConditionTrue, "JobSuspended", "Job suspended")
                if isUpdated {
                    suspendCondChanged = true
                    jm.recorder.Event(&job, v1.EventTypeNormal, "Suspended", "Job suspended")
                }
            } else {
                // Job not suspended.
                var isUpdated bool
                job.Status.Conditions, isUpdated = ensureJobConditionStatus(job.Status.Conditions, batch.JobSuspended, v1.ConditionFalse, "JobResumed", "Job resumed")
                if isUpdated {
                    suspendCondChanged = true
                    jm.recorder.Event(&job, v1.EventTypeNormal, "Resumed", "Job resumed")
                    // Execution resumes and StartTime is reset
                    now := metav1.Now()
                    job.Status.StartTime = &now
                }
            }
        }
    }

    forget = false
    // If more Pods have succeeded than before, reset the backoff (forget the key)
    if job.Status.Succeeded < succeeded {
        forget = true
    }

    if uncounted != nil {
        needsStatusUpdate := suspendCondChanged || active != job.Status.Active
        job.Status.Active = active
        err = jm.trackJobStatusAndRemoveFinalizers(&job, pods, prevSucceededIndexes, *uncounted, finishedCondition, needsStatusUpdate)
        if err != nil {
            return false, err
        }
        jobFinished := IsJobFinished(&job)
        if jobHasNewFailure && !jobFinished {
            // returning an error will re-enqueue Job after the backoff period
            return forget, fmt.Errorf("failed pod(s) detected for job key %q", key)
        }
        forget = true
        return forget, manageJobErr
    }
    // Legacy path: tracking without finalizers.

    // Ensure that there are no leftover tracking finalizers.
    if err := jm.removeTrackingFinalizersFromAllPods(pods); err != nil {
        return false, fmt.Errorf("removing disabled finalizers from job pods %s: %w", key, err)
    }

    // If the Job status hasn't changed at all, make no update
    if job.Status.Active != active || job.Status.Succeeded != succeeded || job.Status.Failed != failed || suspendCondChanged || finishedCondition != nil {
        job.Status.Active = active
        job.Status.Succeeded = succeeded
        job.Status.Failed = failed
        if isIndexedJob(&job) {
            job.Status.CompletedIndexes = succeededIndexes.String()
        }
        job.Status.UncountedTerminatedPods = nil
        jm.enactJobFinished(&job, finishedCondition)

        if err := jm.updateStatusHandler(&job); err != nil {
            return forget, err
        }

        if jobHasNewFailure && !IsJobFinished(&job) {
            // returning an error will re-enqueue Job after the backoff period
            return forget, fmt.Errorf("failed pod(s) detected for job key %q", key)
        }

        forget = true
    }

    return forget, manageJobErr
}
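One of the helpers syncJob leans on is pastActiveDeadline, which simply compares the time elapsed since StartTime against ActiveDeadlineSeconds. A simplified sketch (the upstream helper may differ slightly):

// Simplified sketch of pastActiveDeadline: true once the Job has been running
// for at least ActiveDeadlineSeconds.
func pastActiveDeadline(job *batch.Job) bool {
    if job.Spec.ActiveDeadlineSeconds == nil || job.Status.StartTime == nil {
        return false
    }
    duration := metav1.Now().Time.Sub(job.Status.StartTime.Time)
    allowed := time.Duration(*job.Spec.ActiveDeadlineSeconds) * time.Second
    return duration >= allowed
}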

manageJob

manageJob reconciles the number of Pods: it deletes the excess and creates the shortfall.

func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded int32, succeededIndexes []interval) (int32, string, error) {
    active := int32(len(activePods))
    parallelism := *job.Spec.Parallelism
    jobKey, err := controller.KeyFunc(job)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err))
        return 0, metrics.JobSyncActionTracking, nil
    }

    // If the Job was suspended mid-run, delete all active Pods; the Job controller will not count them as failed
    if jobSuspended(job) {
        klog.V(4).InfoS("Deleting all active pods in suspended job", "job", klog.KObj(job), "active", active)
        podsToDelete := activePodsForRemoval(job, activePods, int(active))
        jm.expectations.ExpectDeletions(jobKey, len(podsToDelete))
        removed, err := jm.deleteJobPods(job, jobKey, podsToDelete)
        active -= removed
        return active, metrics.JobSyncActionPodsDeleted, err
    }
    // The desired number of active Pods
    wantActive := int32(0)
    if job.Spec.Completions == nil {
        // With Completions == nil, a single success means the Job is done
        if succeeded > 0 {
            wantActive = active
        } else {
            // Otherwise it equals the maximum parallelism
            wantActive = parallelism
        }
    } else {
        // With Completions set, another Completions - succeeded Pods still need to be created
        wantActive = *job.Spec.Completions - succeeded
        if wantActive > parallelism {
            // If wantActive > parallelism, only parallelism Pods can run at once, so cap it
            wantActive = parallelism
        }
        if wantActive < 0 {
            // Less than zero means nothing needs to be created
            wantActive = 0
        }
    }

    rmAtLeast := active - wantActive
    if rmAtLeast < 0 {
        rmAtLeast = 0
    }
    // Pick the Pods that are cheapest to delete
    podsToDelete := activePodsForRemoval(job, activePods, int(rmAtLeast))
    if len(podsToDelete) > maxPodCreateDeletePerSync {
        // Delete at most maxPodCreateDeletePerSync Pods per sync, 500 by default
        podsToDelete = podsToDelete[:maxPodCreateDeletePerSync]
    }
    // If podsToDelete > 0, the surplus Pods need to be deleted
    if len(podsToDelete) > 0 {
        // Raise the deletion expectations by len(podsToDelete)
        jm.expectations.ExpectDeletions(jobKey, len(podsToDelete))
        klog.V(4).InfoS("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", parallelism)
        // deleteJobPods decrements the deletion expectations accordingly
        removed, err := jm.deleteJobPods(job, jobKey, podsToDelete)
        active -= removed
        // While it is possible for a Job to require both pod creations and
        // deletions at the same time (e.g. indexed Jobs with repeated indexes), we
        // restrict ourselves to either just pod deletion or pod creation in any
        // given sync cycle. Of these two, pod deletion takes precedence.
        return active, metrics.JobSyncActionPodsDeleted, err
    }

    // Fewer active Pods than wanted; create the missing ones
    if active < wantActive {
        diff := wantActive - active
        if diff > int32(maxPodCreateDeletePerSync) {
            diff = int32(maxPodCreateDeletePerSync)
        }

        // Raise the creation expectations by diff, equivalent to sync.WaitGroup.Add(N)
        jm.expectations.ExpectCreations(jobKey, int(diff))
        errCh := make(chan error, diff)
        klog.V(4).Infof("Too few pods running job %q, need %d, creating %d", jobKey, wantActive, diff)

        wait := sync.WaitGroup{}

        var indexesToAdd []int
        if isIndexedJob(job) {
            indexesToAdd = firstPendingIndexes(activePods, succeededIndexes, int(diff), int(*job.Spec.Completions))
            diff = int32(len(indexesToAdd))
        }
        active += diff

        podTemplate := job.Spec.Template.DeepCopy()
        if isIndexedJob(job) {
            addCompletionIndexEnvVariables(podTemplate)
        }
        if trackingUncountedPods(job) {
            podTemplate.Finalizers = appendJobCompletionFinalizerIfNotFound(podTemplate.Finalizers)
        }

        // Create Pods in batches: 1 in the first batch, 2 in the second, 4 in the third, and so on
        for batchSize := int32(integer.IntMin(int(diff), controller.SlowStartInitialBatchSize)); diff > 0; batchSize = integer.Int32Min(2*batchSize, diff) {
            errorCount := len(errCh)
            wait.Add(int(batchSize))
            for i := int32(0); i < batchSize; i++ {
                completionIndex := unknownCompletionIndex
                if len(indexesToAdd) > 0 {
                    completionIndex = indexesToAdd[0]
                    indexesToAdd = indexesToAdd[1:]
                }
                go func() {
                    template := podTemplate
                    generateName := ""
                    if completionIndex != unknownCompletionIndex {
                        template = podTemplate.DeepCopy()
                        addCompletionIndexAnnotation(template, completionIndex)
                        template.Spec.Hostname = fmt.Sprintf("%s-%d", job.Name, completionIndex)
                        generateName = podGenerateNameWithIndex(job.Name, completionIndex)
                    }
                    defer wait.Done()
                    err := jm.podControl.CreatePodsWithGenerateName(job.Namespace, template, job, metav1.NewControllerRef(job, controllerKind), generateName)
                    if err != nil {
                        if apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
                            // If the namespace is being torn down, we can safely ignore
                            // this error since all subsequent creations will fail.
                            return
                        }
                    }
                    if err != nil {
                        defer utilruntime.HandleError(err)
                        klog.V(2).Infof("Failed creation, decrementing expectations for job %q/%q", job.Namespace, job.Name)
                        // Equivalent to sync.WaitGroup.Done(): the creation failed, so the surplus expectation has to be removed
                        jm.expectations.CreationObserved(jobKey)
                        atomic.AddInt32(&active, -1)
                        errCh <- err
                    }
                }()
            }
            wait.Wait()
            // If a creation in this batch failed, the Pods after the failure are skipped; skippedPods is the number of Pods that will not be created
            skippedPods := diff - batchSize
            if errorCount < len(errCh) && skippedPods > 0 {
                klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for job %q/%q", skippedPods, job.Namespace, job.Name)
                active -= skippedPods
                for i := int32(0); i < skippedPods; i++ {
                    // Decrement the expectations for the skipped Pods
                    jm.expectations.CreationObserved(jobKey)
                }
                // The skipped pods will be retried later. The next controller resync will
                // retry the slow start process.
                break
            }
            diff -= batchSize
        }
        return active, metrics.JobSyncActionPodsCreated, errorFromChannel(errCh)
    }

    return active, metrics.JobSyncActionTracking, nil
}
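The slow-start loop at the end is easy to get lost in, so here is a small, hypothetical helper that only reproduces the batch-size progression (assuming controller.SlowStartInitialBatchSize is 1, as in the upstream tree; slowStartBatches and min32 are names of my own):

// slowStartBatches is a hypothetical helper reproducing the batch sizes of the
// slow-start loop above: the batch doubles each round and never exceeds what is left.
func slowStartBatches(diff, initialBatchSize int32) []int32 {
    var batches []int32
    for batchSize := min32(diff, initialBatchSize); diff > 0; batchSize = min32(2*batchSize, diff) {
        batches = append(batches, batchSize)
        diff -= batchSize
    }
    return batches
}

func min32(a, b int32) int32 {
    if a < b {
        return a
    }
    return b
}

// slowStartBatches(10, 1) returns [1 2 4 3]: ten Pods are created over four rounds,
// and an error in any round causes the remaining Pods to be skipped and retried later.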

Summary

  • After startup, the Job controller watches Job add/update/delete events. Like other controllers, it does not act on events directly; instead it puts them on a queue, and the queue is what enables concurrent processing by multiple workers.

  • When enqueueing, the key is added immediately if the immediate parameter is true; otherwise a backoff is computed from the number of failures and the key is added to the delaying queue.

  • When a Job object is updated, ActiveDeadlineSeconds is taken into account: if the field was added or changed compared to the previous version, the remaining delay is computed and the key is enqueued with that delay.

  • The Job controller also uses a pod control helper to create/delete/update Pods; it is a wrapper in controller_utils around the Clientset's Pod operations.

  • When a Pod event arrives, the controller first checks whether the Pod is being deleted, then whether it belongs to a Job; if so, the Pod's parent Job is enqueued so its status can be brought up to date.

  • ControllerExpectations keeps coming up. As I currently understand it, it behaves like a sync.WaitGroup: expecting to create N Pods is equivalent to sync.WaitGroup.Add(N), and each Pod creation observed (via the SharedIndexInformer's Pod events) tells ControllerExpectations to decrement the expected creation count, equivalent to sync.WaitGroup.Done().

  • It is precisely because of ControllerExpectations that syncJob can process a Job as if the operations were synchronous, moving on to the next step only once the expectations are fulfilled.

  • manageJob is where the Pod count is reconciled: delete when there are too many, create when there are too few.