
Kubernetes Source Code: ReplicaSet Controller Principles and Source Code Analysis

An analysis of the ReplicaSet controller's principles and source code, based on the release-1.22 branch of kubernetes.

Preface

A while ago I read the Job controller's source code on a whim and found it not too hard, so I plan to work from shallow to deep: start with the ReplicaSet controller, then go through the other controllers one by one.

Entry Function

The entry function lives in kubernetes/cmd/kube-controller-manager/app/apps.go; as you can see, the ReplicaSet controller is also one of the controllers run by kube-controller-manager.

func startReplicaSetController(ctx ControllerContext) (http.Handler, bool, error) {
  go replicaset.NewReplicaSetController(
    ctx.InformerFactory.Apps().V1().ReplicaSets(),
    ctx.InformerFactory.Core().V1().Pods(),
    ctx.ClientBuilder.ClientOrDie("replicaset-controller"),
    replicaset.BurstReplicas,
  ).Run(int(ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Stop)
  return nil, true, nil
}
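
For context, ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs is the number of worker goroutines and replicaset.BurstReplicas caps how many pods are created or deleted per sync. Below is a hedged sketch (not from the kubernetes source) of wiring the same constructor up by hand with a shared informer factory; it assumes the k8s.io/kubernetes module is importable (in practice this needs replace directives for the staging repos), and the kubeconfig path and worker count are placeholder values.

// A hedged sketch of constructing and running the controller by hand.
// Assumptions: the k8s.io/kubernetes module is importable and the kubeconfig
// path below is a placeholder.
package main

import (
  "time"

  "k8s.io/client-go/informers"
  "k8s.io/client-go/kubernetes"
  "k8s.io/client-go/tools/clientcmd"
  "k8s.io/kubernetes/pkg/controller/replicaset"
)

func main() {
  cfg, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig") // placeholder path
  if err != nil {
    panic(err)
  }
  client := kubernetes.NewForConfigOrDie(cfg)

  factory := informers.NewSharedInformerFactory(client, 30*time.Second)
  stopCh := make(chan struct{})

  rsc := replicaset.NewReplicaSetController(
    factory.Apps().V1().ReplicaSets(),
    factory.Core().V1().Pods(),
    client,
    replicaset.BurstReplicas,
  )

  factory.Start(stopCh) // start the informers that feed the listers
  rsc.Run(5, stopCh)    // 5 workers, the analogue of ConcurrentRSSyncs
}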

NewReplicaSetController

As you can see, every controller's constructor looks pretty much the same; they all follow a unified pattern.

func NewReplicaSetController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicaSetController {
  eventBroadcaster := record.NewBroadcaster()
  eventBroadcaster.StartStructuredLogging(0)
  eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
  if err := metrics.Register(legacyregistry.Register); err != nil {
    klog.ErrorS(err, "unable to register metrics")
  }
  return NewBaseController(rsInformer, podInformer, kubeClient, burstReplicas,
    apps.SchemeGroupVersion.WithKind("ReplicaSet"),
    "replicaset_controller",
    "replicaset",
    controller.RealPodControl{
      KubeClient: kubeClient,
      Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replicaset-controller"}),
    },
  )
}

NewReplicaSetController actually delegates the construction to NewBaseController. Same old pattern, nothing new: it looks just like the Job controller, except the Job informer is swapped for a ReplicaSet informer.

func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
  gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController {
  if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
    ratelimiter.RegisterMetricAndTrackRateLimiterUsage(metricOwnerName, kubeClient.CoreV1().RESTClient().GetRateLimiter())
  }

  rsc := &ReplicaSetController{
    GroupVersionKind: gvk,
    kubeClient:       kubeClient,
    podControl:       podControl,
    burstReplicas:    burstReplicas,
    expectations:     controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
    queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName),
  }

  rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc:    rsc.addRS,
    UpdateFunc: rsc.updateRS,
    DeleteFunc: rsc.deleteRS,
  })
  rsc.rsLister = rsInformer.Lister()
  rsc.rsListerSynced = rsInformer.Informer().HasSynced

  podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc: rsc.addPod,
    UpdateFunc: rsc.updatePod,
    DeleteFunc: rsc.deletePod,
  })
  rsc.podLister = podInformer.Lister()
  rsc.podListerSynced = podInformer.Informer().HasSynced

  rsc.syncHandler = rsc.syncReplicaSet

  return rsc
}
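
One detail worth calling out is the rate-limited work queue built here: DefaultControllerRateLimiter combines per-item exponential backoff with an overall token bucket. A small standalone illustration of the queue lifecycle the controller relies on (the key string is an arbitrary example):

// A small illustration of the rate-limited workqueue created in
// NewBaseController; the key is an arbitrary example.
package main

import (
  "fmt"

  "k8s.io/client-go/util/workqueue"
)

func main() {
  q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "replicaset")
  q.Add("default/nginx-rs")

  key, shutdown := q.Get()
  if shutdown {
    return
  }
  fmt.Println(key) // default/nginx-rs

  // On a sync error the controller calls AddRateLimited so the key is retried
  // with backoff; on success Forget resets the per-item backoff counter.
  q.AddRateLimited(key)
  q.Forget(key)
  q.Done(key)
}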

Run

Like all controllers, the startup flow is: Run -> worker -> processNextWorkItem -> syncHandler.

func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
  defer utilruntime.HandleCrash()
  defer rsc.queue.ShutDown()

  controllerName := strings.ToLower(rsc.Kind)
  klog.Infof("Starting %v controller", controllerName)
  defer klog.Infof("Shutting down %v controller", controllerName)

  if !cache.WaitForNamedCacheSync(rsc.Kind, stopCh, rsc.podListerSynced, rsc.rsListerSynced) {
    return
  }

  for i := 0; i < workers; i++ {
    go wait.Until(rsc.worker, time.Second, stopCh)
  }

  <-stopCh
}

addRS/updateRS/deleteRS

Before going further, let's look at the handlers for adding, updating, and deleting ReplicaSets. The comments already explain them in detail.

// addRS handles watch events for newly added ReplicaSets.
func (rsc *ReplicaSetController) addRS(obj interface{}) {
  rs := obj.(*apps.ReplicaSet)
  klog.V(4).Infof("Adding %s %s/%s", rsc.Kind, rs.Namespace, rs.Name)
  // On an add event, enqueue the ReplicaSet right away.
  rsc.enqueueRS(rs)
}

// callback when RS is updated
func (rsc *ReplicaSetController) updateRS(old, cur interface{}) {
  oldRS := old.(*apps.ReplicaSet)
  curRS := cur.(*apps.ReplicaSet)

  // Check whether the old and new ReplicaSets share the same UID.
  if curRS.UID != oldRS.UID {
    key, err := controller.KeyFunc(oldRS)
    if err != nil {
      utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", oldRS, err))
      return
    }
    // If the UIDs differ, the old ReplicaSet was deleted and recreated, so process a delete for the old one.
    rsc.deleteRS(cache.DeletedFinalStateUnknown{
      Key: key,
      Obj: oldRS,
    })
  }

  // Log when the desired replica count changes.
  if *(oldRS.Spec.Replicas) != *(curRS.Spec.Replicas) {
    klog.V(4).Infof("%v %v updated. Desired pod count change: %d->%d", rsc.Kind, curRS.Name, *(oldRS.Spec.Replicas), *(curRS.Spec.Replicas))
  }
  // Re-enqueue the current ReplicaSet.
  rsc.enqueueRS(curRS)
}

func (rsc *ReplicaSetController) deleteRS(obj interface{}) {
  rs, ok := obj.(*apps.ReplicaSet)
  if !ok {
    // If an object was deleted but this controller missed the deletion event, the next relist no longer contains it; since no deletion event was received, the object still sits in the local cache, so it is wrapped in a DeletedFinalStateUnknown tombstone.
    tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
    if !ok {
      utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
      return
    }
    // Convert the tombstone's payload back into a ReplicaSet.
    rs, ok = tombstone.Obj.(*apps.ReplicaSet)
    if !ok {
      utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a ReplicaSet %#v", obj))
      return
    }
  }
  // Get the unique key (namespace/name) for the ReplicaSet.
  key, err := controller.KeyFunc(rs)
  if err != nil {
    utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))
    return
  }

  klog.V(4).Infof("Deleting %s %q", rsc.Kind, key)

  // When the ReplicaSet is deleted, drop its expectation record.
  rsc.expectations.DeleteExpectations(key)
  // Enqueue the key.
  rsc.queue.Add(key)
}

enqueueRS

Both the add and update handlers enqueue via enqueueRS.

// Enqueue immediately; straightforward.
func (rsc *ReplicaSetController) enqueueRS(rs *apps.ReplicaSet) {
  key, err := controller.KeyFunc(rs)
  if err != nil {
    utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))
    return
  }

  rsc.queue.Add(key)
}

// Enqueue after a delay; straightforward.
func (rsc *ReplicaSetController) enqueueRSAfter(rs *apps.ReplicaSet, duration time.Duration) {
  key, err := controller.KeyFunc(rs)
  if err != nil {
    utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))
    return
  }

  rsc.queue.AddAfter(key, duration)
}
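
As far as I can tell, controller.KeyFunc is client-go's DeletionHandlingMetaNamespaceKeyFunc, so what actually lands in the queue is a plain "namespace/name" string. A minimal sketch, with a hand-built ReplicaSet object as a stand-in:

// Minimal sketch of the "namespace/name" key format used by the queue;
// the ReplicaSet here is a hand-built stand-in.
package main

import (
  "fmt"

  appsv1 "k8s.io/api/apps/v1"
  metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  "k8s.io/client-go/tools/cache"
)

func main() {
  rs := &appsv1.ReplicaSet{
    ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "nginx-rs"},
  }
  key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(rs)
  fmt.Println(key, err) // default/nginx-rs <nil>
}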

addPod

Next, the pod-related handlers. This is the logic that runs on a pod add event.

func (rsc *ReplicaSetController) addPod(obj interface{}) {
  pod := obj.(*v1.Pod)

  if pod.DeletionTimestamp != nil {
    // On a controller restart the informer may deliver an add event for a pod that already has a deletion timestamp; treating it as an add would record an extra creation observation, so hand it to deletePod instead.
    rsc.deletePod(pod)
    return
  }

  // Resolve the pod's owning ReplicaSet from its controller reference.
  if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
    rs := rsc.resolveControllerRef(pod.Namespace, controllerRef)
    if rs == nil {
      return
    }
    // Get the ReplicaSet's unique key.
    rsKey, err := controller.KeyFunc(rs)
    if err != nil {
      return
    }
    klog.V(4).Infof("Pod %s created: %#v.", pod.Name, pod)
    // One expected creation has been observed, so decrement the creation expectation by one.
    rsc.expectations.CreationObserved(rsKey)
    // Enqueue the ReplicaSet.
    rsc.queue.Add(rsKey)
    return
  }

  // No controller owner found: this is an orphan pod. Use its labels to see whether any ReplicaSet could adopt it.
  rss := rsc.getPodReplicaSets(pod)
  if len(rss) == 0 {
    return
  }
  klog.V(4).Infof("Orphan Pod %s created: %#v.", pod.Name, pod)
  for _, rs := range rss {
    rsc.enqueueRS(rs)
  }
}

// getPodReplicaSets calls client-go's GetPodReplicaSets lister method, which finds matching ReplicaSets by label.
func (rsc *ReplicaSetController) getPodReplicaSets(pod *v1.Pod) []*apps.ReplicaSet {
  rss, err := rsc.rsLister.GetPodReplicaSets(pod)
  if err != nil {
    return nil
  }
  if len(rss) > 1 {
    // ControllerRef will ensure we don't do anything crazy, but more than one
    // item in this list nevertheless constitutes user error.
    utilruntime.HandleError(fmt.Errorf("user error! more than one %v is selecting pods with labels: %+v", rsc.Kind, pod.Labels))
  }
  return rss
}
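
Adoption candidacy ultimately comes down to label selector matching: an orphan pod only matters to a ReplicaSet whose selector matches the pod's labels. A toy example (the selector and labels are invented, and this is not the controller's own code):

// Toy illustration of the label matching behind GetPodReplicaSets;
// the selector and labels are invented for the example.
package main

import (
  "fmt"

  metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  "k8s.io/apimachinery/pkg/labels"
)

func main() {
  selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
    MatchLabels: map[string]string{"app": "nginx"},
  })
  if err != nil {
    panic(err)
  }
  podLabels := labels.Set{"app": "nginx", "pod-template-hash": "abc123"}
  fmt.Println(selector.Matches(podLabels)) // true -> this ReplicaSet could adopt the pod
}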

updatePod

The logic that runs on a pod update event.

// When a pod is updated, wake up its ReplicaSet so the status gets refreshed; if the pod's labels changed, both the old and the new ReplicaSets need to be woken up.
func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
  curPod := cur.(*v1.Pod)
  oldPod := old.(*v1.Pod)
  if curPod.ResourceVersion == oldPod.ResourceVersion {
    // Same ResourceVersion means nothing changed; ignore.
    return
  }

  labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
  if curPod.DeletionTimestamp != nil {
    // The pod is being deleted.
    rsc.deletePod(curPod)
    if labelChanged {
      // we don't need to check the oldPod.DeletionTimestamp because DeletionTimestamp cannot be unset.
      rsc.deletePod(oldPod)
    }
    return
  }

  curControllerRef := metav1.GetControllerOf(curPod)
  oldControllerRef := metav1.GetControllerOf(oldPod)
  controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
  if controllerRefChanged && oldControllerRef != nil {
    // The ControllerRef was changed. Sync the old controller, if any.
    if rs := rsc.resolveControllerRef(oldPod.Namespace, oldControllerRef); rs != nil {
      rsc.enqueueRS(rs)
    }
  }

  // Look up the pod's current controller owner.
  if curControllerRef != nil {
    rs := rsc.resolveControllerRef(curPod.Namespace, curControllerRef)
    if rs == nil {
      return
    }
    klog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
    rsc.enqueueRS(rs)
    // spec.minReadySeconds defines how long a newly created pod must stay ready before it is considered available; during that window the pod does not count as available even if its readiness probe has already passed.
    if !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod) && rs.Spec.MinReadySeconds > 0 {
      klog.V(2).Infof("%v %q will be enqueued after %ds for availability check", rsc.Kind, rs.Name, rs.Spec.MinReadySeconds)
      // Add a second to avoid milliseconds skew in AddAfter.
      // See https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info.
      rsc.enqueueRSAfter(rs, (time.Duration(rs.Spec.MinReadySeconds)*time.Second)+time.Second)
    }
    return
  }

  // If the labels or the controller ref changed, the pod may now match other ReplicaSets.
  if labelChanged || controllerRefChanged {
    rss := rsc.getPodReplicaSets(curPod)
    // Zero matches means it really is an orphan; nothing to do.
    if len(rss) == 0 {
      return
    }
    klog.V(4).Infof("Orphan Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
    // Otherwise some ReplicaSet may want to take it over; enqueue them for a resync.
    for _, rs := range rss {
      rsc.enqueueRS(rs)
    }
  }
}
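
The MinReadySeconds branch above hinges on the pod readiness check; as far as I can tell, podutil.IsPodReady simply inspects the PodReady condition. A simplified stand-in:

// Simplified stand-in for the readiness check: a pod counts as ready when its
// PodReady condition is True.
package main

import (
  "fmt"

  v1 "k8s.io/api/core/v1"
)

func isPodReady(pod *v1.Pod) bool {
  for _, c := range pod.Status.Conditions {
    if c.Type == v1.PodReady {
      return c.Status == v1.ConditionTrue
    }
  }
  return false
}

func main() {
  pod := &v1.Pod{Status: v1.PodStatus{Conditions: []v1.PodCondition{
    {Type: v1.PodReady, Status: v1.ConditionTrue},
  }}}
  fmt.Println(isPodReady(pod)) // true
}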

deletePod

The logic that runs on a pod delete event.

func (rsc *ReplicaSetController) deletePod(obj interface{}) {
  pod, ok := obj.(*v1.Pod)

  if !ok {
    tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
    if !ok {
      utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj))
      return
    }
    pod, ok = tombstone.Obj.(*v1.Pod)
    if !ok {
      utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj))
      return
    }
  }

  controllerRef := metav1.GetControllerOf(pod)
  if controllerRef == nil {
    // No controller should care about orphans being deleted.
    return
  }
  rs := rsc.resolveControllerRef(pod.Namespace, controllerRef)
  if rs == nil {
    return
  }
  // Get the ReplicaSet's unique key.
  rsKey, err := controller.KeyFunc(rs)
  if err != nil {
    utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))
    return
  }
  klog.V(4).Infof("Pod %s/%s deleted through %v, timestamp %+v: %#v.", pod.Namespace, pod.Name, utilruntime.GetCaller(), pod.DeletionTimestamp, pod)
  // One expected pod deletion has been observed, so decrement the deletion expectation (and drop the pod's key from the tracked set).
  rsc.expectations.DeletionObserved(rsKey, controller.PodKey(pod))
  // Re-enqueue the ReplicaSet.
  rsc.queue.Add(rsKey)
}

NewUIDTrackingControllerExpectations

We already saw ControllerExpectations in the Job controller's source code. Does this look familiar?

// The struct definition.
type UIDTrackingControllerExpectations struct {
  ControllerExpectationsInterface
  // TODO: There is a much nicer way to do this that involves a single store,
  // a lock per entry, and a ControlleeExpectationsInterface type.
  uidStoreLock sync.Mutex
  // Store used for the UIDs associated with any expectation tracked via the
  // ControllerExpectationsInterface.
  uidStore cache.Store
}

func (u *UIDTrackingControllerExpectations) GetUIDs(controllerKey string) sets.String {
  if uid, exists, err := u.uidStore.GetByKey(controllerKey); err == nil && exists {
    return uid.(*UIDSet).String
  }
  return nil
}

// Overrides ControllerExpectationsInterface's ExpectDeletions, additionally recording the set of UIDs (pod keys) expected to be deleted.
func (u *UIDTrackingControllerExpectations) ExpectDeletions(rcKey string, deletedKeys []string) error {
  expectedUIDs := sets.NewString()
  for _, k := range deletedKeys {
    expectedUIDs.Insert(k)
  }
  klog.V(4).Infof("Controller %v waiting on deletions for: %+v", rcKey, deletedKeys)
  u.uidStoreLock.Lock()
  defer u.uidStoreLock.Unlock()

  // Normally nothing should be found here, because the UID set is removed once the expectations are fulfilled; any leftover set is logged and then clobbered below.
  if existing := u.GetUIDs(rcKey); existing != nil && existing.Len() != 0 {
    klog.Errorf("Clobbering existing delete keys: %+v", existing)
  }
  if err := u.uidStore.Add(&UIDSet{expectedUIDs, rcKey}); err != nil {
    return err
  }
  return u.ControllerExpectationsInterface.ExpectDeletions(rcKey, expectedUIDs.Len())
}

// Overrides ControllerExpectationsInterface's DeletionObserved: besides decrementing the deletion expectation, it also removes the pod's UID from the ReplicaSet's tracked set.
func (u *UIDTrackingControllerExpectations) DeletionObserved(rcKey, deleteKey string) {
  u.uidStoreLock.Lock()
  defer u.uidStoreLock.Unlock()

  uids := u.GetUIDs(rcKey)
  if uids != nil && uids.Has(deleteKey) {
    klog.V(4).Infof("Controller %v received delete for pod %v", rcKey, deleteKey)
    u.ControllerExpectationsInterface.DeletionObserved(rcKey)
    uids.Delete(deleteKey)
  }
}

// Overrides ControllerExpectationsInterface's DeleteExpectations, with the extra step of deleting the stored UID set.
func (u *UIDTrackingControllerExpectations) DeleteExpectations(rcKey string) {
  u.uidStoreLock.Lock()
  defer u.uidStoreLock.Unlock()

  u.ControllerExpectationsInterface.DeleteExpectations(rcKey)
  if uidExp, exists, err := u.uidStore.GetByKey(rcKey); err == nil && exists {
    if err := u.uidStore.Delete(uidExp); err != nil {
      klog.V(2).Infof("Error deleting uid expectations for controller %v: %v", rcKey, err)
    }
  }
}

// NewUIDTrackingControllerExpectations returns a wrapper around
// ControllerExpectations that is aware of deleteKeys.
func NewUIDTrackingControllerExpectations(ce ControllerExpectationsInterface) *UIDTrackingControllerExpectations {
  return &UIDTrackingControllerExpectations{ControllerExpectationsInterface: ce, uidStore: cache.NewStore(UIDSetKeyFunc)}
}

The "uid" tracked here is actually the pod's [namespace/pod-name] key.

On top of ControllerExpectations, it adds a map of sets:

key         values
ns/rsname   ["ns1/pod1", "ns1/pod2", …]

This lets the controller record exactly which pods a given ReplicaSet is deleting; note that this extra tracking is only needed when deleting pods.
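
To make the bookkeeping concrete, here is a deliberately simplified model of the idea (not the real type, which embeds ControllerExpectationsInterface and keeps UIDSet objects in a cache.Store): per ReplicaSet key, it tracks the set of pod keys whose deletion is still expected.

// A simplified, hypothetical model of the deletion bookkeeping done by
// UIDTrackingControllerExpectations: per ReplicaSet key, the set of pod keys
// whose deletion is still expected.
package main

import (
  "fmt"
  "sync"
)

type deleteExpectations struct {
  mu      sync.Mutex
  pending map[string]map[string]struct{} // rsKey -> set of "namespace/name" pod keys
}

func newDeleteExpectations() *deleteExpectations {
  return &deleteExpectations{pending: map[string]map[string]struct{}{}}
}

// ExpectDeletions records which pods we asked to delete for a ReplicaSet.
func (e *deleteExpectations) ExpectDeletions(rsKey string, podKeys []string) {
  e.mu.Lock()
  defer e.mu.Unlock()
  set := map[string]struct{}{}
  for _, k := range podKeys {
    set[k] = struct{}{}
  }
  e.pending[rsKey] = set
}

// DeletionObserved removes one pod key once its delete event has been seen.
func (e *deleteExpectations) DeletionObserved(rsKey, podKey string) {
  e.mu.Lock()
  defer e.mu.Unlock()
  delete(e.pending[rsKey], podKey)
}

// Satisfied reports whether every expected deletion for rsKey was observed.
func (e *deleteExpectations) Satisfied(rsKey string) bool {
  e.mu.Lock()
  defer e.mu.Unlock()
  return len(e.pending[rsKey]) == 0
}

func main() {
  exp := newDeleteExpectations()
  exp.ExpectDeletions("ns1/rs1", []string{"ns1/pod1", "ns1/pod2"})
  exp.DeletionObserved("ns1/rs1", "ns1/pod1")
  fmt.Println(exp.Satisfied("ns1/rs1")) // false: ns1/pod2 is still pending
  exp.DeletionObserved("ns1/rs1", "ns1/pod2")
  fmt.Println(exp.Satisfied("ns1/rs1")) // true
}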

worker

worker again runs as multiple goroutines that pull ReplicaSet keys off the queue and process them.

func (rsc *ReplicaSetController) worker() {
  for rsc.processNextWorkItem() {
  }
}

func (rsc *ReplicaSetController) processNextWorkItem() bool {
  key, quit := rsc.queue.Get()
  if quit {
    return false
  }
  defer rsc.queue.Done(key)

  err := rsc.syncHandler(key.(string))
  if err == nil {
    rsc.queue.Forget(key)
    return true
  }

  utilruntime.HandleError(fmt.Errorf("sync %q failed with %v", key, err))
  rsc.queue.AddRateLimited(key)

  return true
}

syncReplicaSet

The ReplicaSet is synced here; the actual replica management only runs once the previously recorded expectations have been met.

func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
  startTime := time.Now()
  defer func() {
    klog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime))
  }()

  namespace, name, err := cache.SplitMetaNamespaceKey(key)
  if err != nil {
    return err
  }
  // Get the ReplicaSet object from the lister.
  rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
  // Not found: it may already have been deleted, so clean up its expectations.
  if apierrors.IsNotFound(err) {
    klog.V(4).Infof("%v %v has been deleted", rsc.Kind, key)
    rsc.expectations.DeleteExpectations(key)
    return nil
  }
  if err != nil {
    return err
  }

  // Check whether the previously recorded expectations have been satisfied.
  rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
  // Convert the ReplicaSet's label selector; it determines which pods belong to it.
  selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
  if err != nil {
    utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector: %v", err))
    return nil
  }

  // List all pods in the ReplicaSet's namespace.
  allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
  if err != nil {
    return err
  }
  // Filter out pods in a terminated state.
  filteredPods := controller.FilterActivePods(allPods)

  // Walk every pod: if it already has a controller ref, use the UID to decide whether that controller is this ReplicaSet.
  // Pods whose ReplicaSet has been deleted, or that have been released (or both), are filtered out.
  // If a pod's labels match this ReplicaSet, try to adopt it by setting the OwnerReference; if the pod already has a different controller OwnerReference, adoption fails and the pod is filtered out.
  filteredPods, err = rsc.claimPods(rs, selector, filteredPods)
  if err != nil {
    return err
  }

  var manageReplicasErr error
  // Expectations satisfied and the ReplicaSet is not being deleted.
  if rsNeedsSync && rs.DeletionTimestamp == nil {
    // Reconcile the replica count.
    manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
  }
  // Update the ReplicaSet's status field.
  rs = rs.DeepCopy()
  // kubernetes/pkg/controller/replicaset/replica_set_utils.go
  // Compute the new status: if manageReplicasErr is non-nil and the number of filtered pods is below spec.Replicas, a failed-create condition is set; if it is above, a failed-delete condition is set. With no error, the failure condition is cleared and Replicas, FullyLabeledReplicas, ReadyReplicas and AvailableReplicas are recalculated.
  newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)

  // Push the new status through the clientset and get the updated ReplicaSet back.
  updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
  if err != nil {
    // Multiple things could lead to this update failing. Requeuing the replica set ensures
    // Returning an error causes a requeue without forcing a hotloop
    return err
  }
  // Resync the ReplicaSet after MinReadySeconds as a last line of defense to guard against clock-skew.
  if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
    updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
    updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
    rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
  }
  return manageReplicasErr
}

func (rsc *ReplicaSetController) claimPods(rs *apps.ReplicaSet, selector labels.Selector, filteredPods []*v1.Pod) ([]*v1.Pod, error) {
  // Adoption guard: re-fetch the ReplicaSet from the API server and confirm it is still the same object (same UID) before adopting pods.
  canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
    fresh, err := rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace).Get(context.TODO(), rs.Name, metav1.GetOptions{})
    if err != nil {
      return nil, err
    }
    if fresh.UID != rs.UID {
      return nil, fmt.Errorf("original %v %v/%v is gone: got uid %v, wanted %v", rsc.Kind, rs.Namespace, rs.Name, fresh.UID, rs.UID)
    }
    return fresh, nil
  })
  cm := controller.NewPodControllerRefManager(rsc.podControl, rs, selector, rsc.GroupVersionKind, canAdoptFunc)
  // kubernetes/pkg/controller/controller_ref_manager.go
  return cm.ClaimPods(filteredPods)
}
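
controller.FilterActivePods, used above, keeps only pods that can still count toward the replica total. A simplified restatement of that filter (my own helper, not the controller's exact code): pods that are neither Succeeded nor Failed and are not already being deleted.

// A simplified restatement of the "active pods" filter; not the controller's
// exact helper.
package main

import (
  "fmt"

  v1 "k8s.io/api/core/v1"
  metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func filterActivePods(pods []*v1.Pod) []*v1.Pod {
  var active []*v1.Pod
  for _, p := range pods {
    if p.Status.Phase != v1.PodSucceeded &&
      p.Status.Phase != v1.PodFailed &&
      p.DeletionTimestamp == nil {
      active = append(active, p)
    }
  }
  return active
}

func main() {
  now := metav1.Now()
  pods := []*v1.Pod{
    {ObjectMeta: metav1.ObjectMeta{Name: "running"}, Status: v1.PodStatus{Phase: v1.PodRunning}},
    {ObjectMeta: metav1.ObjectMeta{Name: "failed"}, Status: v1.PodStatus{Phase: v1.PodFailed}},
    {ObjectMeta: metav1.ObjectMeta{Name: "terminating", DeletionTimestamp: &now}, Status: v1.PodStatus{Phase: v1.PodRunning}},
  }
  for _, p := range filterActivePods(pods) {
    fmt.Println(p.Name) // only "running"
  }
}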

manageReplicas

This is where the number of pods owned by a ReplicaSet is managed.

func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
  // Difference between the number of pods we own and the desired replica count.
  diff := len(filteredPods) - int(*(rs.Spec.Replicas))
  rsKey, err := controller.KeyFunc(rs)
  if err != nil {
    utilruntime.HandleError(fmt.Errorf("couldn't get key for %v %#v: %v", rsc.Kind, rs, err))
    return nil
  }
  // Too few: create more.
  if diff < 0 {
    diff *= -1
    if diff > rsc.burstReplicas {
      diff = rsc.burstReplicas
    }
    // Expect diff pod creations.
    rsc.expectations.ExpectCreations(rsKey, diff)
    klog.V(2).InfoS("Too few replicas", "replicaSet", klog.KObj(rs), "need", *(rs.Spec.Replicas), "creating", diff)
    // Create pods in slow-start batches of 1, 2, 4, 8, and so on.
    successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
      err := rsc.podControl.CreatePods(rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
      if err != nil {
        if apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
          // if the namespace is being terminated, we don't have to do
          // anything because any creation will fail
          return nil
        }
      }
      return err
    })

    // Count the pods that were skipped (never attempted) after a slow-start failure.
    if skippedPods := diff - successfulCreations; skippedPods > 0 {
      klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for %v %v/%v", skippedPods, rsc.Kind, rs.Namespace, rs.Name)
      for i := 0; i < skippedPods; i++ {
        // Decrement the creation expectation for each skipped pod.
        rsc.expectations.CreationObserved(rsKey)
      }
    }
    return err
    // Too many: delete the surplus.
  } else if diff > 0 {
    if diff > rsc.burstReplicas {
      diff = rsc.burstReplicas
    }
    klog.V(2).InfoS("Too many replicas", "replicaSet", klog.KObj(rs), "need", *(rs.Spec.Replicas), "deleting", diff)

    // Fetch pods indirectly related to this ReplicaSet (owned by other ReplicaSets sharing the same owner); they are used to rank deletion candidates.
    relatedPods, err := rsc.getIndirectlyRelatedPods(rs)
    utilruntime.HandleError(err)

    // If diff equals the number of filtered pods, all of them are deleted; otherwise rank the pods and return the diff chosen for deletion.
    podsToDelete := getPodsToDelete(filteredPods, relatedPods, diff)

    // Record the deletion expectation along with the set of pod keys to be deleted.
    rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete))

    errCh := make(chan error, diff)
    var wg sync.WaitGroup
    wg.Add(diff)
    for _, pod := range podsToDelete {
      go func(targetPod *v1.Pod) {
        defer wg.Done()
        if err := rsc.podControl.DeletePod(rs.Namespace, targetPod.Name, rs); err != nil {
          // Decrement the expected number of deletes because the informer won't observe this deletion
          podKey := controller.PodKey(targetPod)
          rsc.expectations.DeletionObserved(rsKey, podKey)
          if !apierrors.IsNotFound(err) {
            klog.V(2).Infof("Failed to delete %v, decremented expectations for %v %s/%s", podKey, rsc.Kind, rs.Namespace, rs.Name)
            errCh <- err
          }
        }
      }(pod)
    }
    wg.Wait()

    select {
    case err := <-errCh:
      // If any of the parallel deletes failed, return the first error; the rest would almost certainly fail the same way.
      if err != nil {
        return err
      }
    default:
    }
  }

  return nil
}

func getPodsToDelete(filteredPods, relatedPods []*v1.Pod, diff int) []*v1.Pod {
  // No need to sort pods if we are about to delete all of them.
  // diff will always be <= len(filteredPods), so not need to handle > case.
  if diff < len(filteredPods) {
    podsWithRanks := getPodsRankedByRelatedPodsOnSameNode(filteredPods, relatedPods)
    sort.Sort(podsWithRanks)
    reportSortingDeletionAgeRatioMetric(filteredPods, diff)
  }
  return filteredPods[:diff]
}
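
The batch sizes 1, 2, 4, 8, … come from slowStartBatch. Below is a sketch modeled on that helper (simplified, not copied verbatim from the source): the batch size doubles while calls keep succeeding and the loop stops at the first failing batch, so a systemic error such as a broken pod template does not trigger hundreds of doomed API calls.

// A sketch modeled on slowStartBatch: run fn in batches of 1, 2, 4, 8, ...,
// stopping at the first batch that reports an error.
package main

import (
  "fmt"
  "sync"
  "sync/atomic"
)

func minInt(a, b int) int {
  if a < b {
    return a
  }
  return b
}

func slowStartBatch(count, initialBatchSize int, fn func() error) (int, error) {
  remaining := count
  successes := 0
  for batchSize := minInt(remaining, initialBatchSize); batchSize > 0; batchSize = minInt(2*batchSize, remaining) {
    errCh := make(chan error, batchSize)
    var wg sync.WaitGroup
    wg.Add(batchSize)
    for i := 0; i < batchSize; i++ {
      go func() {
        defer wg.Done()
        if err := fn(); err != nil {
          errCh <- err
        }
      }()
    }
    wg.Wait()
    successes += batchSize - len(errCh)
    if len(errCh) > 0 {
      return successes, <-errCh // give up after the first failing batch
    }
    remaining -= batchSize
  }
  return successes, nil
}

func main() {
  var created int64
  n, err := slowStartBatch(10, 1, func() error {
    atomic.AddInt64(&created, 1) // stand-in for a pod create call
    return nil
  })
  fmt.Println(n, err, atomic.LoadInt64(&created)) // 10 <nil> 10
}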

Summary

  • Overall, the ReplicaSet controller's logic is not hard and is easy to follow, much like the Job controller.

  • Once started, the ReplicaSet controller watches ReplicaSet add/update/delete events. Like the other controllers, it does not act on an event directly; it pushes a key onto a work queue, and the queue is what enables multi-worker processing.

  • When enqueuing, the update handler checks whether the ReplicaSet's replica count and UID changed, and the delete handler removes the ReplicaSet's expectation record.

  • The ReplicaSet controller also uses a pod control helper for creating/deleting/updating pods; this pod control lives in controller_utils and wraps the Clientset's Pod interface.

  • Like the other controllers, when a pod event arrives it first checks whether the pod is being deleted, then whether the pod belongs to a ReplicaSet; if so, the pod's owner is enqueued so that the ReplicaSet's status can be updated.

  • Note NewUIDTrackingControllerExpectations: it is very similar to the ControllerExpectations we saw in the Job controller, except it adds a per-ReplicaSet set that records the pods being deleted; the key is the ReplicaSet's key, and the pod's namespace/name is used as the "uid".

  • syncReplicaSet only proceeds to the next step once the expectations are satisfied.

  • manageReplicas is where the pod count is adjusted: extra pods are deleted, missing pods are created.