1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
|
// syncJob reconciles the Job identified by key (a "namespace/name" cache key).
//
// It looks the Job up in the lister, works on a deep copy, counts the Job's
// active/succeeded/failed pods, decides whether the Job has failed (backoff
// limit or active deadline) or completed, lets manageJob adjust the pods when
// appropriate, and finally persists the resulting status.
//
// Returns:
//   - forget: set to true when the Job is gone or already finished, when the
//     succeeded count grew, or when the status was persisted without a new
//     failure pending. Presumably the caller uses it to reset the work-queue
//     backoff for this key — verify against the caller.
//   - rErr: any error hit during the sync. It is a named result so the deferred
//     metrics closure can label the sync as "success" or "error".
func (jm *Controller) syncJob(key string) (forget bool, rErr error) {
startTime := time.Now()
// Log how long the whole sync took, regardless of which return path is taken.
defer func() {
klog.V(4).Infof("Finished syncing job %q (%v)", key, time.Since(startTime))
}()
// Split the key into the Job's namespace and name.
ns, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return false, err
}
if len(ns) == 0 || len(name) == 0 {
return false, fmt.Errorf("invalid job key %q: either namespace or name is missing", key)
}
// Fetch the Job object from the lister by namespace and name.
sharedJob, err := jm.jobLister.Jobs(ns).Get(name)
if err != nil {
if apierrors.IsNotFound(err) {
// The Job no longer exists: drop its expectations and forget the key.
klog.V(4).Infof("Job has been deleted: %v", key)
jm.expectations.DeleteExpectations(key)
return true, nil
}
return false, err
}
// Work on a deep copy so the object held by the lister's cache is never mutated.
job := *sharedJob.DeepCopy()
// Nothing to do for a Job that has already finished.
if IsJobFinished(&job) {
return true, nil
}
// Skip Indexed Jobs while the IndexedJob feature gate is disabled.
if !feature.DefaultFeatureGate.Enabled(features.IndexedJob) && isIndexedJob(&job) {
jm.recorder.Event(&job, v1.EventTypeWarning, "IndexedJobDisabled", "Skipped Indexed Job sync because feature is disabled.")
return false, nil
}
// Skip Jobs whose CompletionMode is set to a value other than NonIndexed or Indexed.
if job.Spec.CompletionMode != nil && *job.Spec.CompletionMode != batch.NonIndexedCompletion && *job.Spec.CompletionMode != batch.IndexedCompletion {
jm.recorder.Event(&job, v1.EventTypeWarning, "UnknownCompletionMode", "Skipped Job sync because completion mode is unknown")
return false, nil
}
// Metric label for the completion mode; defaults to NonIndexed.
completionMode := string(batch.NonIndexedCompletion)
if isIndexedJob(&job) {
completionMode = string(batch.IndexedCompletion)
}
// Metric label for the action; manageJob may overwrite it further below.
action := metrics.JobSyncActionReconciling
// Record sync duration and count. The closure reads completionMode, action and
// rErr when it runs, so it observes their final values at return time.
defer func() {
result := "success"
if rErr != nil {
result = "error"
}
metrics.JobSyncDurationSeconds.WithLabelValues(completionMode, result, action).Observe(time.Since(startTime).Seconds())
metrics.JobSyncNum.WithLabelValues(completionMode, result, action).Inc()
}()
// Pod-finalizer based tracking (noted in the original comment as alpha in 1.22).
// UncountedTerminatedPods holds the UIDs of pods that have terminated but have
// not yet been accounted for in the Job's status counters.
// Pods created by the Job controller carry a finalizer. When a pod terminates
// (succeeded or failed) the controller accounts for it in three steps:
// (1) add the pod UID to the lists in this field, (2) remove the finalizer
// from the pod, (3) remove the UID from the lists while incrementing the
// corresponding counter.
var uncounted *uncountedTerminatedPods
if trackingUncountedPods(&job) {
klog.V(4).InfoS("Tracking uncounted Pods with pod finalizers", "job", klog.KObj(&job))
if job.Status.UncountedTerminatedPods == nil {
job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{}
}
uncounted = newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods)
// Not tracking with finalizers: if the Job still carries the tracking annotation
// (per the original note, "batch.kubernetes.io/job-tracking"), patch it away.
} else if patch := removeTrackingAnnotationPatch(&job); patch != nil {
if err := jm.patchJobHandler(&job, patch); err != nil {
return false, fmt.Errorf("removing tracking finalizer from job %s: %w", key, err)
}
}
// Check whether expectations are satisfied BEFORE counting pods. If the pods
// were counted first, a pod-add event could arrive in between: the expectation
// would then appear satisfied while that pod is missing from the active count,
// making the controller believe there are too few active pods.
jobNeedsSync := jm.expectations.SatisfiedExpectations(key)
// The second argument tells getPodsForJob whether finalizer tracking is in use.
pods, err := jm.getPodsForJob(&job, uncounted != nil)
if err != nil {
return false, err
}
activePods := controller.FilterActivePods(pods)
active := int32(len(activePods))
succeeded, failed := getStatus(&job, pods, uncounted)
// First start of a Job that is not suspended: record StartTime.
if job.Status.StartTime == nil && !jobSuspended(&job) {
now := metav1.Now()
job.Status.StartTime = &now
}
// manageJobErr is carried to the final return so the status update still happens.
var manageJobErr error
// finishedCondition stays nil unless the Job is found to have failed or completed.
var finishedCondition *batch.JobCondition
// A "new" failure: the failed count computed now exceeds the count recorded
// in the Job's status.
jobHasNewFailure := failed > job.Status.Failed
// The backoff limit only counts as exceeded when there is a new failure, the
// active count differs from the requested parallelism, and the failed count is
// above BackoffLimit. Parallelism and BackoffLimit are dereferenced without a
// nil check — presumably API defaulting guarantees they are set; TODO confirm.
exceedsBackoffLimit := jobHasNewFailure && (active != *job.Spec.Parallelism) &&
(failed > *job.Spec.BackoffLimit)
if exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods) {
// Retries exceeded the configured limit: the Job has failed.
finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "BackoffLimitExceeded", "Job has reached the specified backoff limit")
} else if pastActiveDeadline(&job) {
// The Job ran longer than its active deadline: the Job has failed.
finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "DeadlineExceeded", "Job was active longer than specified deadline")
// ActiveDeadlineSeconds is configured and the Job is not suspended: re-enqueue
// the key so it is synced again once the remaining deadline time has elapsed.
} else if job.Spec.ActiveDeadlineSeconds != nil && !jobSuspended(&job) {
syncDuration := time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second - time.Since(job.Status.StartTime.Time)
klog.V(2).InfoS("Job has activeDeadlineSeconds configuration. Will sync this job again", "job", key, "nextSyncIn", syncDuration)
jm.queue.AddAfter(key, syncDuration)
}
// For Indexed Jobs the succeeded count is the number of succeeded indexes
// rather than the raw count returned by getStatus.
var prevSucceededIndexes, succeededIndexes orderedIntervals
if isIndexedJob(&job) {
prevSucceededIndexes, succeededIndexes = calculateSucceededIndexes(&job, pods)
succeeded = int32(succeededIndexes.total())
}
suspendCondChanged := false
// The Job has failed: remove all of its active pods.
if finishedCondition != nil {
deleted, err := jm.deleteActivePods(&job, activePods)
if uncounted == nil {
// Legacy behavior: assume every active pod was deleted.
deleted = active
} else if deleted != active {
// Not every active pod was deleted; do not mark the Job finished yet,
// since some pods may still carry finalizers.
finishedCondition = nil
}
// Deleted pods move from the active count to the failed count.
active -= deleted
failed += deleted
manageJobErr = err
} else {
// The Job has not failed. manageJobCalled records whether pods were
// reconciled during this sync.
manageJobCalled := false
// Let manageJob adjust the number of pods, but only when expectations are
// satisfied and the Job is not being deleted.
if jobNeedsSync && job.DeletionTimestamp == nil {
active, action, manageJobErr = jm.manageJob(&job, activePods, succeeded, succeededIndexes)
manageJobCalled = true
}
complete := false
if job.Spec.Completions == nil {
// No completion count requested: the Job is complete once at least one
// pod has succeeded and no pods are active.
complete = succeeded > 0 && active == 0
} else {
// The Job is complete once the succeeded count reaches Completions and
// no pods are active.
complete = succeeded >= *job.Spec.Completions && active == 0
}
if complete {
// Mark the Job as complete.
finishedCondition = newCondition(batch.JobComplete, v1.ConditionTrue, "", "")
// Suspend handling (noted in the original comment as new in 1.21): while
// spec.suspend is true the Job controller does not create pods until the Job
// is ready to start. Only evaluated when manageJob ran during this sync.
} else if feature.DefaultFeatureGate.Enabled(features.SuspendJob) && manageJobCalled {
// Suspend is set and true.
if job.Spec.Suspend != nil && *job.Spec.Suspend {
// The Job was suspended; make sure the JobSuspended condition is True and
// emit an event only when the condition actually changed.
var isUpdated bool
job.Status.Conditions, isUpdated = ensureJobConditionStatus(job.Status.Conditions, batch.JobSuspended, v1.ConditionTrue, "JobSuspended", "Job suspended")
if isUpdated {
suspendCondChanged = true
jm.recorder.Event(&job, v1.EventTypeNormal, "Suspended", "Job suspended")
}
} else {
// Job not suspended.
var isUpdated bool
job.Status.Conditions, isUpdated = ensureJobConditionStatus(job.Status.Conditions, batch.JobSuspended, v1.ConditionFalse, "JobResumed", "Job resumed")
if isUpdated {
suspendCondChanged = true
jm.recorder.Event(&job, v1.EventTypeNormal, "Resumed", "Job resumed")
// The Job was resumed: reset StartTime to now.
now := metav1.Now()
job.Status.StartTime = &now
}
}
}
}
forget = false
// If more pods have succeeded than the status previously recorded, forget the
// key (presumably this clears its retry backoff — verify against the caller).
if job.Status.Succeeded < succeeded {
forget = true
}
// Finalizer-tracking path: status counters are updated by
// trackJobStatusAndRemoveFinalizers rather than set directly here.
if uncounted != nil {
needsStatusUpdate := suspendCondChanged || active != job.Status.Active
job.Status.Active = active
err = jm.trackJobStatusAndRemoveFinalizers(&job, pods, prevSucceededIndexes, *uncounted, finishedCondition, needsStatusUpdate)
if err != nil {
return false, err
}
jobFinished := IsJobFinished(&job)
if jobHasNewFailure && !jobFinished {
// returning an error will re-enqueue Job after the backoff period
return forget, fmt.Errorf("failed pod(s) detected for job key %q", key)
}
forget = true
return forget, manageJobErr
}
// Legacy path: tracking without finalizers.
// Ensure that there are no leftover tracking finalizers.
if err := jm.removeTrackingFinalizersFromAllPods(pods); err != nil {
return false, fmt.Errorf("removing disabled finalizers from job pods %s: %w", key, err)
}
// Only write the status when something changed: a counter differs, the suspend
// condition changed, or the Job reached a finished condition.
if job.Status.Active != active || job.Status.Succeeded != succeeded || job.Status.Failed != failed || suspendCondChanged || finishedCondition != nil {
job.Status.Active = active
job.Status.Succeeded = succeeded
job.Status.Failed = failed
if isIndexedJob(&job) {
job.Status.CompletedIndexes = succeededIndexes.String()
}
// The legacy path does not use uncounted-pod tracking, so clear the field.
job.Status.UncountedTerminatedPods = nil
jm.enactJobFinished(&job, finishedCondition)
if err := jm.updateStatusHandler(&job); err != nil {
return forget, err
}
if jobHasNewFailure && !IsJobFinished(&job) {
// returning an error will re-enqueue Job after the backoff period
return forget, fmt.Errorf("failed pod(s) detected for job key %q", key)
}
forget = true
}
return forget, manageJobErr
}
|