目录

kubernetes源码-StatefulSet Controller 原理和源码分析

StatefulSet Controller 原理和源码分析,源码为 kubernetes 的 release-1.22 分支 .

写在前面

前面我们看了那几个控制器,都是无状态的,而 StatefulSet 控制器主打的就是有状态的,所以他也包含了管理存储卷相关的一些逻辑。

我个人觉得 StatefulSet 控制器的代码逻辑是比较乱的,在阅读过程中 StatefulSetController 和 defaultStatefulSetControl 相关部分的代码,接口命名差不多,很容易造成视觉干扰。

跟 Deployment 控制器一样,它的入口函数位于 kubernetes/cmd/kube-controller-manager/app/apps.go 里面。

NewStatefulSetController

我们废话不多说,直接看它的 NewStatefulSetController 结构。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
func NewStatefulSetController(
  podInformer coreinformers.PodInformer,
  setInformer appsinformers.StatefulSetInformer,
  pvcInformer coreinformers.PersistentVolumeClaimInformer,
  revInformer appsinformers.ControllerRevisionInformer,
  kubeClient clientset.Interface,
) *StatefulSetController {
  eventBroadcaster := record.NewBroadcaster()
  eventBroadcaster.StartStructuredLogging(0)
  eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
  recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "statefulset-controller"})
  ssc := &StatefulSetController{
    kubeClient: kubeClient,
    control: NewDefaultStatefulSetControl(
      NewRealStatefulPodControl(
        kubeClient,
        setInformer.Lister(),
        podInformer.Lister(),
        pvcInformer.Lister(),
        recorder),
      NewRealStatefulSetStatusUpdater(kubeClient, setInformer.Lister()),
      history.NewHistory(kubeClient, revInformer.Lister()),
      recorder,
    ),
    pvcListerSynced: pvcInformer.Informer().HasSynced,
    queue:           workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "statefulset"),
    podControl:      controller.RealPodControl{KubeClient: kubeClient, Recorder: recorder},

    revListerSynced: revInformer.Informer().HasSynced,
  }

  podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    // lookup the statefulset and enqueue
    AddFunc: ssc.addPod,
    // lookup current and old statefulset if labels changed
    UpdateFunc: ssc.updatePod,
    // lookup statefulset accounting for deletion tombstones
    DeleteFunc: ssc.deletePod,
  })
  ssc.podLister = podInformer.Lister()
  ssc.podListerSynced = podInformer.Informer().HasSynced

  setInformer.Informer().AddEventHandler(
    cache.ResourceEventHandlerFuncs{
      AddFunc: ssc.enqueueStatefulSet,
      UpdateFunc: func(old, cur interface{}) {
        oldPS := old.(*apps.StatefulSet)
        curPS := cur.(*apps.StatefulSet)
        if oldPS.Status.Replicas != curPS.Status.Replicas {
          klog.V(4).Infof("Observed updated replica count for StatefulSet: %v, %d->%d", curPS.Name, oldPS.Status.Replicas, curPS.Status.Replicas)
        }
        ssc.enqueueStatefulSet(cur)
      },
      DeleteFunc: ssc.enqueueStatefulSet,
    },
  )
  ssc.setLister = setInformer.Lister()
  ssc.setListerSynced = setInformer.Informer().HasSynced

  // TODO: Watch volumes
  return ssc
}

可以看到,监听了自身对象的事件和 pod 对象的事件,除此以外,我们还看到有 pvcInformer 的监听,不过并未设置监听事件的相关函数,我们继续往后看,看看它是怎么应用的。

监听函数

跟其他控制器一样,它也设置了很多的事件监听,我们看看其中一些比较有用的。

StatefulSet 对象

监听后做一些常规判断后,加入队列,updateDaemonset 还会跟上一次同步简单对比一下 pod 数量,如果数量不一样会打印日志,我们了解一下就可以。

  • addDaemonset

  • updateDaemonset

  • deleteDaemonset

Pod 对象

这里没有什么特殊的,都是简单判断一下,然后入列,简单看看就可以了。

  • addPod

  • updatePod

  • deletePod

enqueue

跟其他控制器一样,也是延迟入列和立即入列,没什么特殊的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// enqueueStatefulSet enqueues the given statefulset in the work queue.
func (ssc *StatefulSetController) enqueueStatefulSet(obj interface{}) {
  key, err := controller.KeyFunc(obj)
  if err != nil {
    utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
    return
  }
  ssc.queue.Add(key)
}

// enqueueStatefulSet enqueues the given statefulset in the work queue after given time
func (ssc *StatefulSetController) enqueueSSAfter(ss *apps.StatefulSet, duration time.Duration) {
  key, err := controller.KeyFunc(ss)
  if err != nil {
    utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", ss, err))
    return
  }
  ssc.queue.AddAfter(key, duration)
}

Run

默认5个线程。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// Run runs the statefulset controller.
func (ssc *StatefulSetController) Run(workers int, stopCh <-chan struct{}) {
  defer utilruntime.HandleCrash()
  defer ssc.queue.ShutDown()

  klog.Infof("Starting stateful set controller")
  defer klog.Infof("Shutting down statefulset controller")

  if !cache.WaitForNamedCacheSync("stateful set", stopCh, ssc.podListerSynced, ssc.setListerSynced, ssc.pvcListerSynced, ssc.revListerSynced) {
    return
  }

  for i := 0; i < workers; i++ {
    go wait.Until(ssc.worker, time.Second, stopCh)
  }

  <-stopCh
}

func (ssc *StatefulSetController) processNextWorkItem() bool {
  key, quit := ssc.queue.Get()
  if quit {
    return false
  }
  defer ssc.queue.Done(key)
  if err := ssc.sync(key.(string)); err != nil {
    utilruntime.HandleError(fmt.Errorf("error syncing StatefulSet %v, requeuing: %v", key.(string), err))
    ssc.queue.AddRateLimited(key)
  } else {
    ssc.queue.Forget(key)
  }
  return true
}

// worker runs a worker goroutine that invokes processNextWorkItem until the controller's queue is closed
func (ssc *StatefulSetController) worker() {
  for ssc.processNextWorkItem() {
  }
}

sync

控制器的核心逻辑在这里实现。

  1. 先根据唯一 key 从缓存中获取 StatefulSet 对象。

  2. 拿到 StatefulSet 对象的所有 pod 对象,然后将所有 pod 交给 syncStatefulSet() f方法去做同步处理。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func (ssc *StatefulSetController) sync(key string) error {
  startTime := time.Now()
  defer func() {
    klog.V(4).Infof("Finished syncing statefulset %q (%v)", key, time.Since(startTime))
  }()

  namespace, name, err := cache.SplitMetaNamespaceKey(key)
  if err != nil {
    return err
  }
  set, err := ssc.setLister.StatefulSets(namespace).Get(name)
  if errors.IsNotFound(err) {
    klog.Infof("StatefulSet has been deleted %v", key)
    return nil
  }
  if err != nil {
    utilruntime.HandleError(fmt.Errorf("unable to retrieve StatefulSet %v from store: %v", key, err))
    return err
  }

  selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  if err != nil {
    utilruntime.HandleError(fmt.Errorf("error converting StatefulSet %v selector: %v", key, err))
    // This is a non-transient error, so don't retry.
    return nil
  }

  if err := ssc.adoptOrphanRevisions(set); err != nil {
    return err
  }

  pods, err := ssc.getPodsForStatefulSet(set, selector)
  if err != nil {
    return err
  }

  return ssc.syncStatefulSet(set, pods)
}

syncStatefulSet

  1. 它转手就把 StatefulSet 对象和它的所有 pod 交给 ssc.control.UpdateStatefulSet() 方法去处理。

  2. ssc.control.UpdateStatefulSet() 没有报错的话,检测 StatefulSetMinReadySeconds 特性是否开启,是的话,计算延迟时间,再将 StatefulSet 重新加入到延迟队列等待同步。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
func (ssc *StatefulSetController) syncStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) error {
  klog.V(4).Infof("Syncing StatefulSet %v/%v with %d pods", set.Namespace, set.Name, len(pods))
  var status *apps.StatefulSetStatus
  var err error
  // TODO: investigate where we mutate the set during the update as it is not obvious.
  status, err = ssc.control.UpdateStatefulSet(set.DeepCopy(), pods)
  if err != nil {
    return err
  }
  klog.V(4).Infof("Successfully synced StatefulSet %s/%s successful", set.Namespace, set.Name)
  // One more sync to handle the clock skew. This is also helping in requeuing right after status update
  if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetMinReadySeconds) && set.Spec.MinReadySeconds > 0 && status != nil && status.AvailableReplicas != *set.Spec.Replicas {
    ssc.enqueueSSAfter(set, time.Duration(set.Spec.MinReadySeconds)*time.Second)
  }

  return nil
}

ssc.control.UpdateStatefulSet()

  1. 核心逻辑的入口。

  2. 获取 StatefulSet 对象的所有 Revisions 实例,再对这些实例进行排序。

  3. 使用 ssc.performUpdate() 方法返回 currentRevision, updateRevision, status 者三个参数。

  4. 最后再使用 ssc.truncateHistory() 方法对 Revisions 的历史版本个数进行维护。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) (*apps.StatefulSetStatus, error) {
  // 获取所有 revisions 版本实例
  revisions, err := ssc.ListRevisions(set)
  if err != nil {
    return nil, err
  }
  // 排序
  history.SortControllerRevisions(revisions)
  // 获取 currentRevision, updateRevision ,用于清理 revision 个数。
  currentRevision, updateRevision, status, err := ssc.performUpdate(set, pods, revisions)
  if err != nil {
    return nil, utilerrors.NewAggregate([]error{err, ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)})
  }

  // 清理历史版本个数
  return status, ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)
}

ssc.performUpdate

defaultStatefulSetControl,源码文件 pkg/controller/statefulset/stateful_set_control.go

调用 ss.updateStatefulSet() 和 ssc.updateStatefulSetStatus() 进行 StatefulSet 对象同步。

  1. 使用 ssc.getStatefulSetRevisions() 方法获取 currentRevision, updateRevision, collisionCount 参数。(ssc.getStatefulSetRevisions 通过 newRevision 函数生成 updateRevision,会把当前版本 Revision 存储到 updateRevision 的 data 字段上)

  2. 调用 ssc.updateStatefulSet() 同步 StatefulSet 对象。

  3. 调用 ssc.updateStatefulSetStatus() 同步 StatefulSet 对象的 status 状态。

  4. 返回 currentRevision, updateRevision, currentStatus 给 ssc.control.UpdateStatefulSet() 方法。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func (ssc *defaultStatefulSetControl) performUpdate(
  set *apps.StatefulSet, pods []*v1.Pod, revisions []*apps.ControllerRevision) (*apps.ControllerRevision, *apps.ControllerRevision, *apps.StatefulSetStatus, error) {
  var currentStatus *apps.StatefulSetStatus
  // 根据 hash 获取 currentRevision, updateRevision ,用于判断是否更新 sts 实例对象。
  currentRevision, updateRevision, collisionCount, err := ssc.getStatefulSetRevisions(set, revisions)
  if err != nil {
    return currentRevision, updateRevision, currentStatus, err
  }

  // 执行 sts 实例扩缩容,滚动更新
  currentStatus, err = ssc.updateStatefulSet(set, currentRevision, updateRevision, collisionCount, pods)
  if err != nil {
    return currentRevision, updateRevision, currentStatus, err
  }
  // 设置 sts 实例对象 status 数值
  err = ssc.updateStatefulSetStatus(set, currentStatus)
  if err != nil {
    return currentRevision, updateRevision, currentStatus, err
  }
  klog.V(4).Infof("StatefulSet %s/%s pod status replicas=%d ready=%d current=%d updated=%d",
    set.Namespace,
    set.Name,
    currentStatus.Replicas,
    currentStatus.ReadyReplicas,
    currentStatus.CurrentReplicas,
    currentStatus.UpdatedReplicas)

  klog.V(4).Infof("StatefulSet %s/%s revisions current=%s update=%s",
    set.Namespace,
    set.Name,
    currentStatus.CurrentRevision,
    currentStatus.UpdateRevision)

  return currentRevision, updateRevision, currentStatus, nil
}

ssc.getStatefulSetRevisions

ssc.getStatefulSetRevisions(),源码文件 pkg/controller/statefulset/stateful_set_control.go

  1. 先新创建一个 Revision ,并判断新的 Revision 的 hash(不是拿 Revision.revision 去判断的,而是 hash) 在旧的 Revision 实例中存不存在一样的,如果存在,且都是位于排序(升序)的最后一个,则认为不需要更新 Revision 。如果存在,但不是旧 Revision 中的最后一个,则认为 Revision 版本发生了变更,需要更新。最后,如果不存在这个 Revision 则新建。

  2. 返回 currentRevision, updateRevision, collisionCount 供给 ssc.updateStatefulSet() 方法调用。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
func (ssc *defaultStatefulSetControl) getStatefulSetRevisions(
  set *apps.StatefulSet,
  revisions []*apps.ControllerRevision) (*apps.ControllerRevision, *apps.ControllerRevision, int32, error) {
  var currentRevision, updateRevision *apps.ControllerRevision

  revisionCount := len(revisions)
  history.SortControllerRevisions(revisions)

  // Use a local copy of set.Status.CollisionCount to avoid modifying set.Status directly.
  // This copy is returned so the value gets carried over to set.Status in updateStatefulSet.
  var collisionCount int32
  if set.Status.CollisionCount != nil {
    collisionCount = *set.Status.CollisionCount
  }

  // create a new revision from the current set
  updateRevision, err := newRevision(set, nextRevision(revisions), &collisionCount)
  if err != nil {
    return nil, nil, collisionCount, err
  }

  // find any equivalent revisions
  equalRevisions := history.FindEqualRevisions(revisions, updateRevision)
  equalCount := len(equalRevisions)

  if equalCount > 0 && history.EqualRevision(revisions[revisionCount-1], equalRevisions[equalCount-1]) {
    // 如果 revisions 的最新版本 跟 equalRevisions 里面的最新版本 hash 一致 则无需更新和同步
    updateRevision = revisions[revisionCount-1]
  } else if equalCount > 0 {
    // 如果 hash 有一致的,但不是最新的版本,则认为需要更新 revision 为当前的 equalRevisions[equalCount-1]
    updateRevision, err = ssc.controllerHistory.UpdateControllerRevision(
      equalRevisions[equalCount-1],
      updateRevision.Revision)
    if err != nil {
      return nil, nil, collisionCount, err
    }
  } else {
    // 如果不存在则创建
    updateRevision, err = ssc.controllerHistory.CreateControllerRevision(set, updateRevision, &collisionCount)
    if err != nil {
      return nil, nil, collisionCount, err
    }
  }

  // attempt to find the revision that corresponds to the current revision
  for i := range revisions {
    if revisions[i].Name == set.Status.CurrentRevision {
      currentRevision = revisions[i]
      break
    }
  }

  // if the current revision is nil we initialize the history by setting it to the update revision
  if currentRevision == nil {
    currentRevision = updateRevision
  }

  return currentRevision, updateRevision, collisionCount, nil
}

ssc.updateStatefulSet

ssc.updateStatefulSet(),源码文件 pkg/controller/statefulset/stateful_set_control.go

  1. StatefulSet 对象同步核心逻辑,代码极长。

  2. 先统计 replicaCount ,看看 StatefulSet 需要多少个 pod 。

  3. replicas 切片存放 replicaCount 数量的 pod,condemned 存放超过 replicaCount 数量的 pod,unhealthy,firstUnhealthyPod 存放不健康的 pod 。

  4. 先检查 pod 是否就绪,然后判断 pod 创建了没有,并更新 status.CurrentReplicas ,status.UpdatedReplicas 的数值。

  5. 再检查 pod 的数量,多出的则加入到 condemned 切片,少了则加入到 replicas 切片。

  6. 检测 pod 健康,不健康则删除重建。

  7. 检测 replicas 切片内容,把不健康的 pod 删除重建,少了的 pod 新建补全。

  8. 检测 condemned 切片内容,把多出来的 pod 拿出来删了,ds 的 PodManagementPolicyType 默认是不等于 “Parallel” 的,也就是只能一次处理一个 pod 。如果是 “Parallel” ,则并行处理。

  9. 如果要滚动更新,则从排序最后一个的一个 pod 开始。而且更新没有并发模式的。

  10. 返回 status 给 ssc.updateStatefulSetStatus() 方法使用。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
func (ssc *defaultStatefulSetControl) updateStatefulSet(
  set *apps.StatefulSet,
  currentRevision *apps.ControllerRevision,
  updateRevision *apps.ControllerRevision,
  collisionCount int32,
  pods []*v1.Pod) (*apps.StatefulSetStatus, error) {
  // get the current and update revisions of the set.
  currentSet, err := ApplyRevision(set, currentRevision)
  if err != nil {
    return nil, err
  }
  updateSet, err := ApplyRevision(set, updateRevision)
  if err != nil {
    return nil, err
  }

  // set the generation, and revisions in the returned status
  status := apps.StatefulSetStatus{}
  status.ObservedGeneration = set.Generation
  status.CurrentRevision = currentRevision.Name
  status.UpdateRevision = updateRevision.Name
  status.CollisionCount = new(int32)
  *status.CollisionCount = collisionCount

  replicaCount := int(*set.Spec.Replicas)
  // slice that will contain all Pods such that 0 <= getOrdinal(pod) < set.Spec.Replicas
  replicas := make([]*v1.Pod, replicaCount)
  // slice that will contain all Pods such that set.Spec.Replicas <= getOrdinal(pod)
  condemned := make([]*v1.Pod, 0, len(pods))
  unhealthy := 0
  var firstUnhealthyPod *v1.Pod

  // 过滤 pod 到2个切片
  for i := range pods {
    status.Replicas++

    // count the number of running and ready replicas
    if isRunningAndReady(pods[i]) {
      status.ReadyReplicas++
      // count the number of running and available replicas
      if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetMinReadySeconds) {
        if isRunningAndAvailable(pods[i], set.Spec.MinReadySeconds) {
          status.AvailableReplicas++
        }
      } else {
        // If the featuregate is not enabled, all the ready replicas should be considered as available replicas
        status.AvailableReplicas = status.ReadyReplicas
      }
    }

    // pod 存不存在
    if isCreated(pods[i]) && !isTerminating(pods[i]) {
      if getPodRevision(pods[i]) == currentRevision.Name {
        status.CurrentReplicas++
      }
      if getPodRevision(pods[i]) == updateRevision.Name {
        status.UpdatedReplicas++
      }
    }

    if ord := getOrdinal(pods[i]); 0 <= ord && ord < replicaCount {
      // 少了的 pod 加入待创建的切片
      replicas[ord] = pods[i]

    } else if ord >= replicaCount {
      // 超过数量的 pod 加入待删除切片
      condemned = append(condemned, pods[i])
    }
    // If the ordinal could not be parsed (ord < 0), ignore the Pod.
  }

  // 判断是否需要创建 pod
  for ord := 0; ord < replicaCount; ord++ {
    if replicas[ord] == nil {
      replicas[ord] = newVersionedStatefulSetPod(
        currentSet,
        updateSet,
        currentRevision.Name,
        updateRevision.Name, ord)
    }
  }

  // sort the condemned Pods by their ordinals
  sort.Sort(ascendingOrdinal(condemned))

  // find the first unhealthy Pod
  for i := range replicas {
    if !isHealthy(replicas[i]) {
      unhealthy++
      if firstUnhealthyPod == nil {
        firstUnhealthyPod = replicas[i]
      }
    }
  }

  for i := range condemned {
    if !isHealthy(condemned[i]) {
      unhealthy++
      if firstUnhealthyPod == nil {
        firstUnhealthyPod = condemned[i]
      }
    }
  }

  if unhealthy > 0 {
    klog.V(4).Infof("StatefulSet %s/%s has %d unhealthy Pods starting with %s",
      set.Namespace,
      set.Name,
      unhealthy,
      firstUnhealthyPod.Name)
  }

  // If the StatefulSet is being deleted, don't do anything other than updating
  // status.
  if set.DeletionTimestamp != nil {
    return &status, nil
  }

  monotonic := !allowsBurst(set)

  // Examine each replica with respect to its ordinal
  for i := range replicas {
    // 重建不健康的 pod
    if isFailed(replicas[i]) {
      ssc.recorder.Eventf(set, v1.EventTypeWarning, "RecreatingFailedPod",
        "StatefulSet %s/%s is recreating failed Pod %s",
        set.Namespace,
        set.Name,
        replicas[i].Name)
      if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil {
        return &status, err
      }
      if getPodRevision(replicas[i]) == currentRevision.Name {
        status.CurrentReplicas--
      }
      if getPodRevision(replicas[i]) == updateRevision.Name {
        status.UpdatedReplicas--
      }
      status.Replicas--
      replicas[i] = newVersionedStatefulSetPod(
        currentSet,
        updateSet,
        currentRevision.Name,
        updateRevision.Name,
        i)
    }
    // 扩容
    if !isCreated(replicas[i]) {
      if err := ssc.podControl.CreateStatefulPod(set, replicas[i]); err != nil {
        return &status, err
      }
      status.Replicas++
      if getPodRevision(replicas[i]) == currentRevision.Name {
        status.CurrentReplicas++
      }
      if getPodRevision(replicas[i]) == updateRevision.Name {
        status.UpdatedReplicas++
      }

      // if the set does not allow bursting, return immediately
      if monotonic {
        return &status, nil
      }
      // pod created, no more work possible for this round
      continue
    }
    // If we find a Pod that is currently terminating, we must wait until graceful deletion
    // completes before we continue to make progress.
    if isTerminating(replicas[i]) && monotonic {
      klog.V(4).Infof(
        "StatefulSet %s/%s is waiting for Pod %s to Terminate",
        set.Namespace,
        set.Name,
        replicas[i].Name)
      return &status, nil
    }
    // If we have a Pod that has been created but is not running and ready we can not make progress.
    // We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its
    // ordinal, are Running and Ready.
    if !isRunningAndReady(replicas[i]) && monotonic {
      klog.V(4).Infof(
        "StatefulSet %s/%s is waiting for Pod %s to be Running and Ready",
        set.Namespace,
        set.Name,
        replicas[i].Name)
      return &status, nil
    }
    // If we have a Pod that has been created but is not available we can not make progress.
    // We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its
    // ordinal, are Available.
    // TODO: Since available is superset of Ready, once we have this featuregate enabled by default, we can remove the
    // isRunningAndReady block as only Available pods should be brought down.
    if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetMinReadySeconds) && !isRunningAndAvailable(replicas[i], set.Spec.MinReadySeconds) && monotonic {
      klog.V(4).Infof(
        "StatefulSet %s/%s is waiting for Pod %s to be Available",
        set.Namespace,
        set.Name,
        replicas[i].Name)
      return &status, nil
    }
    // Enforce the StatefulSet invariants
    if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) {
      continue
    }
    // Make a deep copy so we don't mutate the shared cache
    replica := replicas[i].DeepCopy()
    if err := ssc.podControl.UpdateStatefulPod(updateSet, replica); err != nil {
      return &status, err
    }
  }

  // 缩容
  for target := len(condemned) - 1; target >= 0; target-- {
    // wait for terminating pods to expire
    if isTerminating(condemned[target]) {
      klog.V(4).Infof(
        "StatefulSet %s/%s is waiting for Pod %s to Terminate prior to scale down",
        set.Namespace,
        set.Name,
        condemned[target].Name)
      // block if we are in monotonic mode
      if monotonic {
        return &status, nil
      }
      continue
    }
    // if we are in monotonic mode and the condemned target is not the first unhealthy Pod block
    if !isRunningAndReady(condemned[target]) && monotonic && condemned[target] != firstUnhealthyPod {
      klog.V(4).Infof(
        "StatefulSet %s/%s is waiting for Pod %s to be Running and Ready prior to scale down",
        set.Namespace,
        set.Name,
        firstUnhealthyPod.Name)
      return &status, nil
    }
    // if we are in monotonic mode and the condemned target is not the first unhealthy Pod, block.
    // TODO: Since available is superset of Ready, once we have this featuregate enabled by default, we can remove the
    // isRunningAndReady block as only Available pods should be brought down.
    if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetMinReadySeconds) && !isRunningAndAvailable(condemned[target], set.Spec.MinReadySeconds) && monotonic && condemned[target] != firstUnhealthyPod {
      klog.V(4).Infof(
        "StatefulSet %s/%s is waiting for Pod %s to be Available prior to scale down",
        set.Namespace,
        set.Name,
        firstUnhealthyPod.Name)
      return &status, nil
    }
    klog.V(2).Infof("StatefulSet %s/%s terminating Pod %s for scale down",
      set.Namespace,
      set.Name,
      condemned[target].Name)

    if err := ssc.podControl.DeleteStatefulPod(set, condemned[target]); err != nil {
      return &status, err
    }
    if getPodRevision(condemned[target]) == currentRevision.Name {
      status.CurrentReplicas--
    }
    if getPodRevision(condemned[target]) == updateRevision.Name {
      status.UpdatedReplicas--
    }
    if monotonic {
      return &status, nil
    }
  }

  // for the OnDelete strategy we short circuit. Pods will be updated when they are manually deleted.
  if set.Spec.UpdateStrategy.Type == apps.OnDeleteStatefulSetStrategyType {
    return &status, nil
  }

  // 滚动更新
  updateMin := 0
  if set.Spec.UpdateStrategy.RollingUpdate != nil {
    updateMin = int(*set.Spec.UpdateStrategy.RollingUpdate.Partition)
  }
  // we terminate the Pod with the largest ordinal that does not match the update revision.
  for target := len(replicas) - 1; target >= updateMin; target-- {

    // delete the Pod if it is not already terminating and does not match the update revision.
    if getPodRevision(replicas[target]) != updateRevision.Name && !isTerminating(replicas[target]) {
      klog.V(2).Infof("StatefulSet %s/%s terminating Pod %s for update",
        set.Namespace,
        set.Name,
        replicas[target].Name)
      err := ssc.podControl.DeleteStatefulPod(set, replicas[target])
      status.CurrentReplicas--
      return &status, err
    }

    // wait for unhealthy Pods on update
    if !isHealthy(replicas[target]) {
      klog.V(4).Infof(
        "StatefulSet %s/%s is waiting for Pod %s to update",
        set.Namespace,
        set.Name,
        replicas[target].Name)
      return &status, nil
    }

  }
  return &status, nil
}

ssc.updateStatefulSetStatus

源码文件 pkg/controller/statefulset/stateful_set_control.go

  1. StatefulSet status 同步核心逻辑之一。

  2. 设置 status 属性及数值。

  3. 如果设置完以后 status 的数值不一致时,则放弃更新,等一致时再执行更新。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
func (ssc *defaultStatefulSetControl) updateStatefulSetStatus(
  set *apps.StatefulSet,
  status *apps.StatefulSetStatus) error {
  // complete any in progress rolling update if necessary
  completeRollingUpdate(set, status)

  // if the status is not inconsistent do not perform an update
  if !inconsistentStatus(set, status) {
    return nil
  }

  // copy set and update its status
  set = set.DeepCopy()
  if err := ssc.statusUpdater.UpdateStatefulSetStatus(set, status); err != nil {
    return err
  }

  return nil
}

总结

  1. StatefulSet 的代码比较绕,它基本把所有的功能都写在 ssc.updateStatefulSet() 方法里面,要花很长时间去理解。

  2. 存储卷这一块目前没有体现在 StatefulSet 控制器上,后面我们看 volume 控制器应该才能看到 pvc 的一些相关操作。

  3. 它也没有 rs 控制器去控制 pod 数量,也是用 controllerRevision 来控制和管理版本的。