目录

kubernetes源码-Cronjob Controller 原理和源码分析

Cronjob Controller 原理和源码分析,源码为 kubernetes 的 release-1.22 分支。

写在前面

Cronjob 控制器是 Job 控制器的升级版,了解 kubernetes 基础的都知道,它是具有定时属性的 Job ,能周期性执行 Job ,类似我们 linux 系统里面的 crontab 。

跟 Job 控制器一样,它的入口函数位于 kubernetes/cmd/kube-controller-manager/app/batch.go 里面。

startCronJobController

我们看到在启动函数里面有去判断 CronJobControllerV2 特性是否开启的逻辑,在我们当前 1.22 的版本里面,默认是 enable 的。所以我们直接看 CronJobControllerV2 里面的代码,不过它里面也会有引用到 CronJobController 的一些接口和其他的一些函数,这个有兴趣的可以自行阅读。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
// startCronJobController wires up and launches the CronJob controller.
// When the CronJobControllerV2 feature gate is enabled (the default in
// 1.22) the informer-driven V2 controller is started; otherwise the
// legacy controller is used. The returned http.Handler is always nil.
func startCronJobController(ctx ControllerContext) (http.Handler, bool, error) {
  client := ctx.ClientBuilder.ClientOrDie("cronjob-controller")

  if !utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CronJobControllerV2) {
    // Legacy (V1) controller path.
    legacy, err := cronjob.NewController(client)
    if err != nil {
      return nil, true, fmt.Errorf("error creating CronJob controller: %v", err)
    }
    go legacy.Run(ctx.Stop)
    return nil, true, nil
  }

  // V2 controller: driven by the shared Job and CronJob informers.
  v2, err := cronjob.NewControllerV2(ctx.InformerFactory.Batch().V1().Jobs(),
    ctx.InformerFactory.Batch().V1().CronJobs(),
    client,
  )
  if err != nil {
    return nil, true, fmt.Errorf("error creating CronJob controller V2: %v", err)
  }
  go v2.Run(int(ctx.ComponentConfig.CronJobController.ConcurrentCronJobSyncs), ctx.Stop)
  return nil, true, nil
}

NewControllerV2

构造函数,跟其他控制器差不多,监听 cronJob 和 job 事件。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// NewControllerV2 creates and initializes a ControllerV2: it sets up the
// event broadcaster/recorder, registers a client-side rate-limiter metric,
// builds the work queue and listers, and attaches event handlers to the
// Job and CronJob informers so changes enqueue the owning cronjob key.
func NewControllerV2(jobInformer batchv1informers.JobInformer, cronJobsInformer batchv1informers.CronJobInformer, kubeClient clientset.Interface) (*ControllerV2, error) {
  eventBroadcaster := record.NewBroadcaster()
  eventBroadcaster.StartStructuredLogging(0)
  // NOTE(review): "covev1client" appears to be a typo'd import alias for the
  // core/v1 events client; it matches the upstream file's import block — confirm.
  eventBroadcaster.StartRecordingToSink(&covev1client.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})

  // Expose the REST client's rate limiter as a Prometheus metric, if present.
  if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
    if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("cronjob_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()); err != nil {
      return nil, err
    }
  }

  jm := &ControllerV2{
    // Rate-limited queue of namespace/name cronjob keys awaiting sync.
    queue:    workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cronjob"),
    recorder: eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "cronjob-controller"}),

    // Abstractions over Job/CronJob API calls (swappable in tests).
    jobControl:     realJobControl{KubeClient: kubeClient},
    cronJobControl: &realCJControl{KubeClient: kubeClient},

    jobLister:     jobInformer.Lister(),
    cronJobLister: cronJobsInformer.Lister(),

    jobListerSynced:     jobInformer.Informer().HasSynced,
    cronJobListerSynced: cronJobsInformer.Informer().HasSynced,
    // Injected clock function; tests can override it.
    now:                 time.Now,
  }

  // Job events map back to the owning cronjob and enqueue it.
  jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc:    jm.addJob,
    UpdateFunc: jm.updateJob,
    DeleteFunc: jm.deleteJob,
  })

  // CronJob add/delete enqueue directly; update may enqueue with a delay
  // when the schedule changed (handled inside updateCronJob).
  cronJobsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc: func(obj interface{}) {
      jm.enqueueController(obj)
    },
    UpdateFunc: jm.updateCronJob,
    DeleteFunc: func(obj interface{}) {
      jm.enqueueController(obj)
    },
  })

  metrics.Register()

  return jm, nil
}

监听函数

跟其他控制器一样,它也设置了很多的事件监听,我们看看其中一些比较有用的。

cronJob 对象

AddFunc ,DeleteFunc 都是直接加入队列,updateCronJob 简单对比一下新旧 cronJob 的 Schedule 时间是否一致,如果不一致,会对新的 Schedule 时间做解析,然后判断一下距离下一次执行 job 还需要多久,再加入延迟队列,在对应的时间后进行同步。

  • AddFunc

  • updateCronJob

  • DeleteFunc

job 对象

这里没有什么特殊的,都是简单判断一下,然后入列,简单看看就可以了。

  • addJob

  • updateJob

  • deleteJob

enqueue

跟其他控制器一样,也是延迟入列和立即入列,没什么特殊的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// enqueueController adds the object's namespace/name key to the work
// queue for immediate processing.
func (jm *ControllerV2) enqueueController(obj interface{}) {
  if key, err := controller.KeyFunc(obj); err != nil {
    utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
  } else {
    jm.queue.Add(key)
  }
}

// enqueueControllerAfter adds the object's namespace/name key to the
// work queue after waiting for the given duration.
func (jm *ControllerV2) enqueueControllerAfter(obj interface{}, t time.Duration) {
  if key, err := controller.KeyFunc(obj); err != nil {
    utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
  } else {
    jm.queue.AddAfter(key, t)
  }
}

Run

默认5个线程。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// Run waits for the informer caches to sync, starts the given number of
// worker goroutines, and blocks until stopCh is closed. On return the
// work queue is shut down, which terminates the workers.
func (jm *ControllerV2) Run(workers int, stopCh <-chan struct{}) {
  defer utilruntime.HandleCrash()
  defer jm.queue.ShutDown()

  klog.InfoS("Starting cronjob controller v2")
  defer klog.InfoS("Shutting down cronjob controller v2")

  // Process nothing until both the Job and CronJob caches are warm.
  if !cache.WaitForNamedCacheSync("cronjob", stopCh, jm.jobListerSynced, jm.cronJobListerSynced) {
    return
  }

  for n := workers; n > 0; n-- {
    go wait.Until(jm.worker, time.Second, stopCh)
  }

  <-stopCh
}

// worker drains the work queue until it is shut down.
func (jm *ControllerV2) worker() {
  for {
    if !jm.processNextWorkItem() {
      return
    }
  }
}

// processNextWorkItem pops one cronjob key off the queue and syncs it.
// It returns false only when the queue has been shut down. Sync errors
// requeue the key with rate-limited backoff; a successful sync that
// reports a delay requeues the key for the next scheduled time.
func (jm *ControllerV2) processNextWorkItem() bool {
  item, shutdown := jm.queue.Get()
  if shutdown {
    return false
  }
  defer jm.queue.Done(item)

  requeueAfter, err := jm.sync(item.(string))
  if err != nil {
    utilruntime.HandleError(fmt.Errorf("error syncing CronJobController %v, requeuing: %v", item.(string), err))
    jm.queue.AddRateLimited(item)
    return true
  }
  // A non-nil delay means this cronjob must be revisited at its next
  // schedule time; clear the backoff and requeue with that delay.
  if requeueAfter != nil {
    jm.queue.Forget(item)
    jm.queue.AddAfter(item, *requeueAfter)
  }
  return true
}

sync

控制器的核心逻辑在这里实现。

  1. 先根据唯一 key 从缓存中获取 cronJob 对象。

  2. 拿到 cronJob 对象,再根据 cronJob 对象调用 jm.getJobsToBeReconciled() 获取它的所有 job 对象。

  3. 调用 jm.syncCronJob() 返回 cronJobCopy, requeueAfter 参数,根据 cronJobCopy 对象来清理已完成的 job ,requeueAfter 用于重新将 cronjob 对象加入下一次同步的延迟队列。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// sync reconciles a single cronjob identified by its namespace/name key:
// it loads the cronjob from the lister cache, gathers its child Jobs,
// runs syncCronJob, cleans up finished Jobs, and returns the delay until
// the next sync (nil means no requeue is needed).
func (jm *ControllerV2) sync(cronJobKey string) (*time.Duration, error) {
  ns, name, err := cache.SplitMetaNamespaceKey(cronJobKey)
  if err != nil {
    return nil, err
  }

  cronJob, err := jm.cronJobLister.CronJobs(ns).Get(name)
  switch {
  case errors.IsNotFound(err):
    // The cronjob was probably deleted; nothing to do and no requeue.
    klog.V(4).InfoS("CronJob not found, may be it is deleted", "cronjob", klog.KRef(ns, name), "err", err)
    return nil, nil
  case err != nil:
    // Other transient apiserver errors requeue with exponential backoff.
    return nil, err
  }
  // Collect all Jobs owned by this cronjob.
  jobsToBeReconciled, err := jm.getJobsToBeReconciled(cronJob)
  if err != nil {
    return nil, err
  }
  // Reconcile: returns the updated cronjob copy plus the delay until the
  // next sync.
  cronJobCopy, requeueAfter, err := jm.syncCronJob(cronJob, jobsToBeReconciled)
  if err != nil {
    klog.V(2).InfoS("Error reconciling cronjob", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "err", err)
    return nil, err
  }
  // Remove finished Jobs that exceed the configured history limits.
  err = jm.cleanupFinishedJobs(cronJobCopy, jobsToBeReconciled)
  if err != nil {
    klog.V(2).InfoS("Error cleaning up jobs", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "resourceVersion", cronJob.GetResourceVersion(), "err", err)
    return nil, err
  }
  // Requeue after the returned delay so the next schedule fires on time.
  if requeueAfter != nil {
    klog.V(4).InfoS("Re-queuing cronjob", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "requeueAfter", requeueAfter)
    return requeueAfter, nil
  }
  // this marks the key done, currently only happens when the cronjob is suspended or spec has invalid schedule format
  return nil, nil
}

// getJobsToBeReconciled returns the Jobs in cronJob's namespace whose
// controller owner reference points at this cronjob.
func (jm *ControllerV2) getJobsToBeReconciled(cronJob *batchv1.CronJob) ([]*batchv1.Job, error) {
  // List every Job in the namespace; ownership is filtered below.
  jobList, err := jm.jobLister.Jobs(cronJob.Namespace).List(labels.Everything())
  if err != nil {
    return nil, err
  }

  owned := []*batchv1.Job{}
  for _, job := range jobList {
    // Only the controller owner reference matters.
    controllerRef := metav1.GetControllerOf(job)
    if controllerRef == nil || controllerRef.Name != cronJob.Name {
      continue
    }
    owned = append(owned, job)
  }
  return owned, nil
}

syncCronJob

这个函数是 cronJob 的核心逻辑所在。

  1. 过滤正在运行的 job ,检查 job 是否已完成。

  2. 处于暂停状态的 cronjob 不做处理。

  3. 解析调度时间,解析时区,获取下一次启动 job 的时间。

  4. 检查是否太晚执行,是的话, +100ms 用于调整下次入列的时间差。

  5. 检查并发策略根据 Replace、 Forbid 做不同操作。

  6. 获取 job Template,用于创建 job 。

  7. 根据创建 job 返回的实例对象更新 cronjob status 。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
// syncCronJob is the core reconciliation for one cronjob. It reconciles
// cj.Status.Active against the child Jobs in js, persists the status,
// then — unless the cronjob is being deleted, suspended, or has an
// unparseable schedule — enforces the concurrency policy and creates a
// new Job when a schedule time is due. It returns the (possibly updated)
// cronjob and the delay until the next sync (nil means do not requeue).
func (jm *ControllerV2) syncCronJob(
  cj *batchv1.CronJob,
  js []*batchv1.Job) (*batchv1.CronJob, *time.Duration, error) {

  cj = cj.DeepCopy()
  now := jm.now()

  childrenJobs := make(map[types.UID]bool)
  for _, j := range js {
    childrenJobs[j.ObjectMeta.UID] = true
    // Is this job recorded as running in .status.active?
    found := inActiveList(*cj, j.ObjectMeta.UID)
    // Not in the active list and not finished either.
    if !found && !IsJobFinished(j) {
      // Re-fetch the cronjob to get the freshest status.
      cjCopy, err := jm.cronJobControl.GetCronJob(cj.Namespace, cj.Name)
      if err != nil {
        return nil, nil, err
      }
      // Check the active list again: the controller may have crashed after
      // creating the job but before recording it in status, or the cached
      // status may simply be stale; this second look avoids a false alarm.
      if inActiveList(*cjCopy, j.ObjectMeta.UID) {
        cj = cjCopy
        continue
      }
      jm.recorder.Eventf(cj, corev1.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %s", j.Name)
      // Conversely: listed as active but already finished — the status
      // update lagged behind, so drop it from the active list now.
    } else if found && IsJobFinished(j) {
      _, status := getFinishedStatus(j)
      // Remove it from the active list.
      deleteFromActiveList(cj, j.ObjectMeta.UID)
      jm.recorder.Eventf(cj, corev1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %s, status: %v", j.Name, status)
    } else if IsJobFinished(j) {
      // Finished job: track the most recent completion time.
      if cj.Status.LastSuccessfulTime == nil {
        cj.Status.LastSuccessfulTime = j.Status.CompletionTime
      }
      if j.Status.CompletionTime != nil && j.Status.CompletionTime.After(cj.Status.LastSuccessfulTime.Time) {
        cj.Status.LastSuccessfulTime = j.Status.CompletionTime
      }
    }
  }

  // Prune active-list entries whose job no longer exists anywhere.
  for _, j := range cj.Status.Active {
    _, found := childrenJobs[j.UID]
    if found {
      continue
    }
    // Ask the api-server directly in case the local cache is stale.
    _, err := jm.jobControl.GetJob(j.Namespace, j.Name)
    switch {
    case errors.IsNotFound(err):
      // The job is really gone.
      jm.recorder.Eventf(cj, corev1.EventTypeNormal, "MissingJob", "Active job went missing: %v", j.Name)
      deleteFromActiveList(cj, j.UID)
    case err != nil:
      return cj, nil, err
    }
    // the job is missing in the lister but found in api-server
  }
  // Persist the reconciled status.
  updatedCJ, err := jm.cronJobControl.UpdateStatus(cj)
  if err != nil {
    klog.V(2).InfoS("Unable to update status for cronjob", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "resourceVersion", cj.ResourceVersion, "err", err)
    return cj, nil, err
  }
  *cj = *updatedCJ

  if cj.DeletionTimestamp != nil {
    // The CronJob is being deleted.
    // Don't do anything other than updating status.
    return cj, nil, nil
  }
  // A suspended cronjob gets no further handling; return without requeue.
  if cj.Spec.Suspend != nil && *cj.Spec.Suspend {
    klog.V(4).InfoS("Not starting job because the cron is suspended", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()))
    return cj, nil, nil
  }
  // Parse the cron schedule expression.
  sched, err := cron.ParseStandard(cj.Spec.Schedule)
  if err != nil {
    // this is likely a user error in defining the spec value
    // we should log the error and not reconcile this cronjob until an update to spec
    klog.V(2).InfoS("Unparseable schedule", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "schedule", cj.Spec.Schedule, "err", err)
    jm.recorder.Eventf(cj, corev1.EventTypeWarning, "UnparseableSchedule", "unparseable schedule: %q : %s", cj.Spec.Schedule, err)
    return cj, nil, nil
  }
  // Warn when the schedule embeds a timezone, which is unsupported.
  if strings.Contains(cj.Spec.Schedule, "TZ") {
    jm.recorder.Eventf(cj, corev1.EventTypeWarning, "UnsupportedSchedule", "CRON_TZ or TZ used in schedule %q is not officially supported, see https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/ for more details", cj.Spec.Schedule)
  }
  // Compute the most recent unmet schedule time, if any.
  scheduledTime, err := getNextScheduleTime(*cj, now, sched, jm.recorder)
  if err != nil {
    // this is likely a user error in defining the spec value
    // we should log the error and not reconcile this cronjob until an update to spec
    klog.V(2).InfoS("invalid schedule", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "schedule", cj.Spec.Schedule, "err", err)
    jm.recorder.Eventf(cj, corev1.EventTypeWarning, "InvalidSchedule", "invalid schedule: %s : %s", cj.Spec.Schedule, err)
    return cj, nil, nil
  }
  // nil means there is no unmet schedule time yet.
  if scheduledTime == nil {
    // Nothing is due; log and requeue for the next schedule time.
    // the scheduled time, that will give atleast 1 unmet time schedule
    klog.V(4).InfoS("No unmet start times", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()))
    // nextScheduledTimeDuration pads the delay (+100ms) to absorb clock skew
    // on the next dequeue.
    t := nextScheduledTimeDuration(sched, now)
    return cj, t, nil
  }

  tooLate := false
  if cj.Spec.StartingDeadlineSeconds != nil {
    tooLate = scheduledTime.Add(time.Second * time.Duration(*cj.Spec.StartingDeadlineSeconds)).Before(now)
  }
  // scheduledTime + StartingDeadlineSeconds is already in the past: the
  // start window was missed, so skip this run entirely.
  if tooLate {
    klog.V(4).InfoS("Missed starting window", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()))
    jm.recorder.Eventf(cj, corev1.EventTypeWarning, "MissSchedule", "Missed scheduled time to start a job: %s", scheduledTime.UTC().Format(time.RFC1123Z))

    // Requeue for the next schedule time (padded +100ms).
    t := nextScheduledTimeDuration(sched, now)
    return cj, t, nil
  }
  // Skip if a job for this exact schedule time already exists or was
  // already recorded as the last schedule — this run was handled before.
  if isJobInActiveList(&batchv1.Job{
    ObjectMeta: metav1.ObjectMeta{
      Name:      getJobName(cj, *scheduledTime),
      Namespace: cj.Namespace,
    }}, cj.Status.Active) || cj.Status.LastScheduleTime.Equal(&metav1.Time{Time: *scheduledTime}) {
    klog.V(4).InfoS("Not starting job because the scheduled time is already processed", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "schedule", scheduledTime)
    t := nextScheduledTimeDuration(sched, now)
    return cj, t, nil
  }
  // Forbid: do not start while a previous execution is still active.
  if cj.Spec.ConcurrencyPolicy == batchv1.ForbidConcurrent && len(cj.Status.Active) > 0 {
    // Regardless which source of information we use for the set of active jobs,
    // there is some risk that we won't see an active job when there is one.
    // (because we haven't seen the status update to the SJ or the created pod).
    // So it is theoretically possible to have concurrency with Forbid.
    // As long the as the invocations are "far enough apart in time", this usually won't happen.
    //
    // TODO: for Forbid, we could use the same name for every execution, as a lock.
    // With replace, we could use a name that is deterministic per execution time.
    // But that would mean that you could not inspect prior successes or failures of Forbid jobs.
    klog.V(4).InfoS("Not starting job because prior execution is still running and concurrency policy is Forbid", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()))
    jm.recorder.Eventf(cj, corev1.EventTypeNormal, "JobAlreadyActive", "Not starting job because prior execution is running and concurrency policy is Forbid")
    t := nextScheduledTimeDuration(sched, now)
    return cj, t, nil
  }
  // Replace: delete every still-active job before creating the new one.
  if cj.Spec.ConcurrencyPolicy == batchv1.ReplaceConcurrent {
    for _, j := range cj.Status.Active {
      klog.V(4).InfoS("Deleting job that was still running at next scheduled start time", "job", klog.KRef(j.Namespace, j.Name))

      job, err := jm.jobControl.GetJob(j.Namespace, j.Name)
      if err != nil {
        jm.recorder.Eventf(cj, corev1.EventTypeWarning, "FailedGet", "Get job: %v", err)
        return cj, nil, err
      }
      if !deleteJob(cj, job, jm.jobControl, jm.recorder) {
        return cj, nil, fmt.Errorf("could not replace job %s/%s", job.Namespace, job.Name)
      }
    }
  }

  // Build the Job object from the cronjob's job template.
  jobReq, err := getJobFromTemplate2(cj, *scheduledTime)
  if err != nil {
    klog.ErrorS(err, "Unable to make Job from template", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()))
    return cj, nil, err
  }
  jobResp, err := jm.jobControl.CreateJob(cj.Namespace, jobReq)
  switch {
  case errors.HasStatusCause(err, corev1.NamespaceTerminatingCause):
  case errors.IsAlreadyExists(err):
    // If the job is created by other actor, assume  it has updated the cronjob status accordingly
    klog.InfoS("Job already exists", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "job", klog.KRef(jobReq.GetNamespace(), jobReq.GetName()))
    return cj, nil, err
  case err != nil:
    // default error handling
    jm.recorder.Eventf(cj, corev1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
    return cj, nil, err
  }

  metrics.CronJobCreationSkew.Observe(jobResp.ObjectMeta.GetCreationTimestamp().Sub(*scheduledTime).Seconds())
  klog.V(4).InfoS("Created Job", "job", klog.KRef(jobResp.GetNamespace(), jobResp.GetName()), "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()))
  jm.recorder.Eventf(cj, corev1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)

  // ------------------------------------------------------------------ //

  // Record the new job in the cronjob status and persist it.
  jobRef, err := getRef(jobResp)
  if err != nil {
    klog.V(2).InfoS("Unable to make object reference", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "err", err)
    return cj, nil, fmt.Errorf("unable to make object reference for job for %s", klog.KRef(cj.GetNamespace(), cj.GetName()))
  }
  cj.Status.Active = append(cj.Status.Active, *jobRef)
  cj.Status.LastScheduleTime = &metav1.Time{Time: *scheduledTime}
  updatedCJ, err = jm.cronJobControl.UpdateStatus(cj)
  if err != nil {
    klog.InfoS("Unable to update status", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "resourceVersion", cj.ResourceVersion, "err", err)
    return cj, nil, fmt.Errorf("unable to update status for %s (rv = %s): %v", klog.KRef(cj.GetNamespace(), cj.GetName()), cj.ResourceVersion, err)
  }

  t := nextScheduledTimeDuration(sched, now)
  return updatedCJ, t, nil
}

// getNextScheduleTime returns the most recent unmet schedule time for cj,
// or nil when nothing is due yet. When a very large number of schedule
// times have been missed it emits a warning event instead of allowing a
// catch-up storm of jobs.
func getNextScheduleTime(cj batchv1.CronJob, now time.Time, schedule cron.Schedule, recorder record.EventRecorder) (*time.Time, error) {
  // The search window starts at the last schedule time, falling back to
  // the cronjob's creation time when it has never been scheduled.
  earliestTime := cj.ObjectMeta.CreationTimestamp.Time
  if cj.Status.LastScheduleTime != nil {
    earliestTime = cj.Status.LastScheduleTime.Time
  }
  if cj.Spec.StartingDeadlineSeconds != nil {
    // Runs that already missed their starting deadline are no longer
    // eligible, so advance the window to now-deadline when that is later.
    schedulingDeadline := now.Add(-time.Second * time.Duration(*cj.Spec.StartingDeadlineSeconds))
    if schedulingDeadline.After(earliestTime) {
      earliestTime = schedulingDeadline
    }
  }
  if earliestTime.After(now) {
    return nil, nil
  }

  // Count how many schedule times were missed inside the window.
  t, numberOfMissedSchedules, err := getMostRecentScheduleTime(earliestTime, now, schedule)
  if numberOfMissedSchedules > 100 {
    // Too many missed runs would exhaust resources if all were started;
    // warn the user to adjust the deadline or check for clock skew.
    recorder.Eventf(&cj, corev1.EventTypeWarning, "TooManyMissedTimes", "too many missed start times: %d. Set or decrease .spec.startingDeadlineSeconds or check clock skew", numberOfMissedSchedules)
    klog.InfoS("too many missed times", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "missed times", numberOfMissedSchedules)
  }
  return t, err
}

pkg/controller/cronjob/utils.go

1
2
3
4
5
6
7
8
// inActiveList reports whether a job with the given UID appears in the
// cronjob's .status.active list.
func inActiveList(cj batchv1.CronJob, uid types.UID) bool {
  for i := range cj.Status.Active {
    if cj.Status.Active[i].UID == uid {
      return true
    }
  }
  return false
}

总结

  1. cronjob 实际也没那么复杂,更多的逻辑是在处理时间问题上。

  2. 它直接对接 job 对象,由 job 控制器来控制 pod 的生命周期,它只负责维护和清理 job 即可。

  3. CronJobControllerV2 默认是开启状态。

  4. 由于篇幅关系,部分函数没有列出,感兴趣的朋友可以自行阅读。