
Kubernetes Source Code: Deployment Controller Principles and Source Analysis

An analysis of how the Deployment controller works, based on the release-1.22 branch of the kubernetes source.

Preface

Starting with the Deployment controller, the amount of code jumps up considerably, but the overall flow is the same as in the other controllers.

Entry Function

The entry function lives in kubernetes/cmd/kube-controller-manager/app/apps.go.

func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
  dc, err := deployment.NewDeploymentController(
    ctx.InformerFactory.Apps().V1().Deployments(),
    ctx.InformerFactory.Apps().V1().ReplicaSets(),
    ctx.InformerFactory.Core().V1().Pods(),
    ctx.ClientBuilder.ClientOrDie("deployment-controller"),
  )
  if err != nil {
    return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
  }
  go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)
  return nil, true, nil
}

NewDeploymentController

Compared with the ReplicaSet controller, the Deployment controller's constructor does quite a bit more. One detail stands out: for pods, only the delete event is watched. The reason is explained later.

func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
  eventBroadcaster := record.NewBroadcaster()
  eventBroadcaster.StartStructuredLogging(0)
  eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})

  if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
    if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("deployment_controller", client.CoreV1().RESTClient().GetRateLimiter()); err != nil {
      return nil, err
    }
  }
  dc := &DeploymentController{
    client:        client,
    eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
    queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
  }
  dc.rsControl = controller.RealRSControl{
    KubeClient: client,
    Recorder:   dc.eventRecorder,
  }

  dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc:    dc.addDeployment,
    UpdateFunc: dc.updateDeployment,
    // This will enter the sync loop and no-op, because the deployment has been deleted from the store.
    DeleteFunc: dc.deleteDeployment,
  })
  // Watch ReplicaSet events
  rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc:    dc.addReplicaSet,
    UpdateFunc: dc.updateReplicaSet,
    DeleteFunc: dc.deleteReplicaSet,
  })
  // Watch only pod delete events
  podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    DeleteFunc: dc.deletePod,
  })

  dc.syncHandler = dc.syncDeployment
  dc.enqueueDeployment = dc.enqueue

  dc.dLister = dInformer.Lister()
  dc.rsLister = rsInformer.Lister()
  dc.podLister = podInformer.Lister()
  dc.dListerSynced = dInformer.Informer().HasSynced
  dc.rsListerSynced = rsInformer.Informer().HasSynced
  dc.podListerSynced = podInformer.Informer().HasSynced
  return dc, nil
}
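For comparison, a minimal, hypothetical sketch of wiring this constructor up by hand outside kube-controller-manager, using a shared informer factory (the kubeconfig path and the worker count of 5 are assumptions, not values taken from this source; importing the kubernetes repo as a library is also not officially supported):

package main

import (
  "time"

  "k8s.io/client-go/informers"
  "k8s.io/client-go/kubernetes"
  "k8s.io/client-go/tools/clientcmd"
  "k8s.io/kubernetes/pkg/controller/deployment"
)

func main() {
  // Hypothetical kubeconfig location; error handling kept minimal for brevity.
  cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
  if err != nil {
    panic(err)
  }
  client := kubernetes.NewForConfigOrDie(cfg)

  factory := informers.NewSharedInformerFactory(client, 30*time.Second)
  dc, err := deployment.NewDeploymentController(
    factory.Apps().V1().Deployments(),
    factory.Apps().V1().ReplicaSets(),
    factory.Core().V1().Pods(),
    client,
  )
  if err != nil {
    panic(err)
  }

  stopCh := make(chan struct{})
  factory.Start(stopCh) // start informers only after the handlers are registered
  go dc.Run(5, stopCh)  // 5 mirrors the default ConcurrentDeploymentSyncs
  <-stopCh
}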

Run

Like every controller, the startup flow is: Run -> worker -> processNextWorkItem -> syncHandler.

func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
  defer utilruntime.HandleCrash()
  defer dc.queue.ShutDown()

  klog.InfoS("Starting controller", "controller", "deployment")
  defer klog.InfoS("Shutting down controller", "controller", "deployment")

  if !cache.WaitForNamedCacheSync("deployment", stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
    return
  }
  // 5 workers by default
  for i := 0; i < workers; i++ {
    go wait.Until(dc.worker, time.Second, stopCh)
  }

  <-stopCh
}

addDeployment

Enqueues the deployment directly.

func (dc *DeploymentController) addDeployment(obj interface{}) {
  d := obj.(*apps.Deployment)
  klog.V(4).InfoS("Adding deployment", "deployment", klog.KObj(d))
  dc.enqueueDeployment(d)
}

updateDeployment

Enqueues the deployment directly.

func (dc *DeploymentController) updateDeployment(old, cur interface{}) {
  oldD := old.(*apps.Deployment)
  curD := cur.(*apps.Deployment)
  klog.V(4).InfoS("Updating deployment", "deployment", klog.KObj(oldD))
  dc.enqueueDeployment(curD)
}

deleteDeployment

Enqueues the deployment directly.

func (dc *DeploymentController) deleteDeployment(obj interface{}) {
  d, ok := obj.(*apps.Deployment)
  if !ok {
    tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
    if !ok {
      utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
      return
    }
    d, ok = tombstone.Obj.(*apps.Deployment)
    if !ok {
      utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Deployment %#v", obj))
      return
    }
  }
  klog.V(4).InfoS("Deleting deployment", "deployment", klog.KObj(d))
  dc.enqueueDeployment(d)
}

addReplicaSet

The handling here is the same as in the ReplicaSet controller, except that what gets enqueued is the owning Deployment object.

func (dc *DeploymentController) addReplicaSet(obj interface{}) {
  rs := obj.(*apps.ReplicaSet)

  if rs.DeletionTimestamp != nil {
    // On a restart of the controller manager, it's possible for an object to
    // show up in a state that is already pending deletion.
    dc.deleteReplicaSet(rs)
    return
  }

  // If it has a ControllerRef, that's all that matters.
  if controllerRef := metav1.GetControllerOf(rs); controllerRef != nil {
    d := dc.resolveControllerRef(rs.Namespace, controllerRef)
    if d == nil {
      return
    }
    klog.V(4).InfoS("ReplicaSet added", "replicaSet", klog.KObj(rs))
    dc.enqueueDeployment(d)
    return
  }

  // Otherwise this ReplicaSet is an orphan: find the Deployments that could adopt it, then enqueue them
  ds := dc.getDeploymentsForReplicaSet(rs)
  if len(ds) == 0 {
    return
  }
  klog.V(4).InfoS("Orphan ReplicaSet added", "replicaSet", klog.KObj(rs))
  for _, d := range ds {
    dc.enqueueDeployment(d)
  }
}

func (dc *DeploymentController) getDeploymentsForReplicaSet(rs *apps.ReplicaSet) []*apps.Deployment {
  // Find Deployments whose selector matches the ReplicaSet's labels
  deployments, err := util.GetDeploymentsForReplicaSet(dc.dLister, rs)
  if err != nil || len(deployments) == 0 {
    return nil
  }
  // Normally a ReplicaSet belongs to exactly one Deployment. If the match above returns more than one,
  // the labels are misconfigured and need manual intervention; the log only prints the first of them.
  if len(deployments) > 1 {
    // ControllerRef will ensure we don't do anything crazy, but more than one
    // item in this list nevertheless constitutes user error.
    klog.V(4).InfoS("user error! more than one deployment is selecting replica set",
      "replicaSet", klog.KObj(rs), "labels", rs.Labels, "deployment", klog.KObj(deployments[0]))
  }
  return deployments
}
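resolveControllerRef shows up in every one of these handlers, so it is worth a look. Paraphrased from the same source file, it resolves a ControllerRef to a Deployment only when the kind, name, and UID all line up:

func (dc *DeploymentController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *apps.Deployment {
  // Look up by name, then verify the UID; a lister cannot look up by UID.
  if controllerRef.Kind != controllerKind.Kind {
    return nil
  }
  d, err := dc.dLister.Deployments(namespace).Get(controllerRef.Name)
  if err != nil {
    return nil
  }
  if d.UID != controllerRef.UID {
    // A Deployment with this name exists, but it is not the one the
    // ControllerRef points to (e.g. it was deleted and recreated).
    return nil
  }
  return d
}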

updateReplicaSet

The handling here is the same as in the ReplicaSet controller.

func (dc *DeploymentController) updateReplicaSet(old, cur interface{}) {
  curRS := cur.(*apps.ReplicaSet)
  oldRS := old.(*apps.ReplicaSet)
  if curRS.ResourceVersion == oldRS.ResourceVersion {
    // Periodic resync will send update events for all known replica sets.
    // Two different versions of the same replica set will always have different RVs.
    return
  }

  curControllerRef := metav1.GetControllerOf(curRS)
  oldControllerRef := metav1.GetControllerOf(oldRS)
  controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
  if controllerRefChanged && oldControllerRef != nil {
    // The ControllerRef was changed. Sync the old controller, if any.
    if d := dc.resolveControllerRef(oldRS.Namespace, oldControllerRef); d != nil {
      dc.enqueueDeployment(d)
    }
  }

  // If it has a ControllerRef, that's all that matters.
  if curControllerRef != nil {
    d := dc.resolveControllerRef(curRS.Namespace, curControllerRef)
    if d == nil {
      return
    }
    klog.V(4).InfoS("ReplicaSet updated", "replicaSet", klog.KObj(curRS))
    dc.enqueueDeployment(d)
    return
  }

  // Otherwise, it's an orphan. If anything changed, sync matching controllers
  // to see if anyone wants to adopt it now.
  labelChanged := !reflect.DeepEqual(curRS.Labels, oldRS.Labels)
  if labelChanged || controllerRefChanged {
    ds := dc.getDeploymentsForReplicaSet(curRS)
    if len(ds) == 0 {
      return
    }
    klog.V(4).InfoS("Orphan ReplicaSet updated", "replicaSet", klog.KObj(curRS))
    for _, d := range ds {
      dc.enqueueDeployment(d)
    }
  }
}

deleteReplicaSet

The handling here is the same as in the ReplicaSet controller.

func (dc *DeploymentController) deleteReplicaSet(obj interface{}) {
  rs, ok := obj.(*apps.ReplicaSet)

  // When a delete is dropped, the relist will notice a pod in the store not
  // in the list, leading to the insertion of a tombstone object which contains
  // the deleted key/value. Note that this value might be stale. If the ReplicaSet
  // changed labels the new deployment will not be woken up till the periodic resync.
  if !ok {
    tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
    if !ok {
      utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
      return
    }
    rs, ok = tombstone.Obj.(*apps.ReplicaSet)
    if !ok {
      utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a ReplicaSet %#v", obj))
      return
    }
  }

  controllerRef := metav1.GetControllerOf(rs)
  if controllerRef == nil {
    // No controller should care about orphans being deleted.
    return
  }
  d := dc.resolveControllerRef(rs.Namespace, controllerRef)
  if d == nil {
    return
  }
  klog.V(4).InfoS("ReplicaSet deleted", "replicaSet", klog.KObj(rs))
  dc.enqueueDeployment(d)
}

deletePod

The handler for pod delete events. It exists for the Recreate update strategy: the deployment is only re-enqueued once every one of its pods is gone.

func (dc *DeploymentController) deletePod(obj interface{}) {
  pod, ok := obj.(*v1.Pod)

  if !ok {
    tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
    if !ok {
      utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
      return
    }
    pod, ok = tombstone.Obj.(*v1.Pod)
    if !ok {
      utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj))
      return
    }
  }
  klog.V(4).InfoS("Pod deleted", "pod", klog.KObj(pod))
  if d := dc.getDeploymentForPod(pod); d != nil && d.Spec.Strategy.Type == apps.RecreateDeploymentStrategyType {
    // Only re-sync once all pods have been deleted.
    rsList, err := util.ListReplicaSets(d, util.RsListFromClient(dc.client.AppsV1()))
    if err != nil {
      return
    }
    podMap, err := dc.getPodMapForDeployment(d, rsList)
    if err != nil {
      return
    }
    numPods := 0
    for _, podList := range podMap {
      numPods += len(podList)
    }
    if numPods == 0 {
      dc.enqueueDeployment(d)
    }
  }
}

// Resolve the pod's owning ReplicaSet, then that ReplicaSet's owning Deployment
func (dc *DeploymentController) getDeploymentForPod(pod *v1.Pod) *apps.Deployment {
  // Find the owning replica set
  var rs *apps.ReplicaSet
  var err error
  controllerRef := metav1.GetControllerOf(pod)
  if controllerRef == nil {
    // No controller owns this Pod.
    return nil
  }
  if controllerRef.Kind != apps.SchemeGroupVersion.WithKind("ReplicaSet").Kind {
    // Not a pod owned by a replica set.
    return nil
  }
  rs, err = dc.rsLister.ReplicaSets(pod.Namespace).Get(controllerRef.Name)
  if err != nil || rs.UID != controllerRef.UID {
    klog.V(4).InfoS("Cannot get replicaset for pod", "ownerReference", controllerRef.Name, "pod", klog.KObj(pod), "err", err)
    return nil
  }

  // Now find the Deployment that owns that ReplicaSet.
  controllerRef = metav1.GetControllerOf(rs)
  if controllerRef == nil {
    return nil
  }
  return dc.resolveControllerRef(rs.Namespace, controllerRef)
}

enqueue

There are three enqueue variants.

func (dc *DeploymentController) enqueue(deployment *apps.Deployment) {
  key, err := controller.KeyFunc(deployment)
  if err != nil {
    utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", deployment, err))
    return
  }

  dc.queue.Add(key)
}

func (dc *DeploymentController) enqueueRateLimited(deployment *apps.Deployment) {
  key, err := controller.KeyFunc(deployment)
  if err != nil {
    utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", deployment, err))
    return
  }

  dc.queue.AddRateLimited(key)
}

// enqueueAfter will enqueue a deployment after the provided amount of time.
func (dc *DeploymentController) enqueueAfter(deployment *apps.Deployment, after time.Duration) {
  key, err := controller.KeyFunc(deployment)
  if err != nil {
    utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", deployment, err))
    return
  }

  dc.queue.AddAfter(key, after)
}
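A quick illustration of how the three variants differ, exercising the same client-go workqueue APIs directly (the key and the 30-second delay are made-up values):

package main

import (
  "fmt"
  "time"

  "k8s.io/client-go/util/workqueue"
)

func main() {
  q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment")

  q.Add("default/web")                      // enqueue: handled as soon as a worker is free
  q.AddRateLimited("default/web")           // enqueueRateLimited: delayed; backoff grows with NumRequeues
  q.AddAfter("default/web", 30*time.Second) // enqueueAfter: fixed delay (used, e.g., to re-check a deployment after progressDeadlineSeconds may have expired)

  key, _ := q.Get()
  fmt.Println("got", key)
  q.Done(key)
}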

worker

Here too, multiple goroutines run worker, each fetching Deployment keys from the queue and processing them.

func (dc *DeploymentController) worker() {
  for dc.processNextWorkItem() {
  }
}

func (dc *DeploymentController) processNextWorkItem() bool {
  key, quit := dc.queue.Get()
  if quit {
    return false
  }
  defer dc.queue.Done(key)

  err := dc.syncHandler(key.(string))
  dc.handleErr(err, key)

  return true
}

func (dc *DeploymentController) handleErr(err error, key interface{}) {
  // No error, or the namespace is being terminated: stop retrying this key
  if err == nil || errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
    dc.queue.Forget(key)
    return
  }

  ns, name, keyErr := cache.SplitMetaNamespaceKey(key.(string))
  // Failed to split the unique key
  if keyErr != nil {
    klog.ErrorS(err, "Failed to split meta namespace cache key", "cacheKey", key)
  }

  // Until maxRetries is reached, put each retry back on the rate-limited queue
  if dc.queue.NumRequeues(key) < maxRetries {
    klog.V(2).InfoS("Error syncing deployment", "deployment", klog.KRef(ns, name), "err", err)
    dc.queue.AddRateLimited(key)
    return
  }
  // Retries exhausted: drop the key from the queue
  utilruntime.HandleError(err)
  klog.V(2).InfoS("Dropping deployment out of the queue", "deployment", klog.KRef(ns, name), "err", err)
  dc.queue.Forget(key)
}
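The backoff applied by AddRateLimited comes from the limiter chosen in NewDeploymentController, workqueue.DefaultControllerRateLimiter(). In client-go that is the max of a per-item exponential backoff and a queue-wide token bucket; a sketch of an equivalent construction:

package main

import (
  "time"

  "golang.org/x/time/rate"
  "k8s.io/client-go/util/workqueue"
)

func main() {
  // Per item: 5ms base delay doubling up to 1000s. Queue-wide: 10 qps, burst 100.
  // These are the client-go defaults behind DefaultControllerRateLimiter().
  limiter := workqueue.NewMaxOfRateLimiter(
    workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
    &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
  )
  q := workqueue.NewNamedRateLimitingQueue(limiter, "deployment")
  q.AddRateLimited("default/web") // each retry of the same key waits longer
}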

syncDeployment

This is the heart of the controller: rollback, rolling updates, and pod scaling all live here, although the source is split across several files. Because of the length, only selected methods are pulled out and analyzed.

func (dc *DeploymentController) syncDeployment(key string) error {
  namespace, name, err := cache.SplitMetaNamespaceKey(key)
  if err != nil {
    klog.ErrorS(err, "Failed to split meta namespace cache key", "cacheKey", key)
    return err
  }

  startTime := time.Now()
  klog.V(4).InfoS("Started syncing deployment", "deployment", klog.KRef(namespace, name), "startTime", startTime)
  defer func() {
    klog.V(4).InfoS("Finished syncing deployment", "deployment", klog.KRef(namespace, name), "duration", time.Since(startTime))
  }()

  deployment, err := dc.dLister.Deployments(namespace).Get(name)
  // Not found: it has probably been deleted
  if errors.IsNotFound(err) {
    klog.V(2).InfoS("Deployment has been deleted", "deployment", klog.KRef(namespace, name))
    return nil
  }
  if err != nil {
    return err
  }

  // Deep-copy otherwise we are mutating our cache.
  // TODO: Deep-copy only when needed.
  d := deployment.DeepCopy()

  everything := metav1.LabelSelector{}
  // If the deployment selects all pods (empty selector), record a warning event telling the user to
  // set a non-empty selector, then update the status if ObservedGeneration is behind Generation.
  if reflect.DeepEqual(d.Spec.Selector, &everything) {
    dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
    if d.Status.ObservedGeneration < d.Generation {
      d.Status.ObservedGeneration = d.Generation
      dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
    }
    return nil
  }

  // List all ReplicaSets owned by this Deployment
  rsList, err := dc.getReplicaSetsForDeployment(d)
  if err != nil {
    return err
  }
  // Build a map[types.UID][]*v1.Pod keyed by ReplicaSet UID, recording which pods belong to each ReplicaSet
  podMap, err := dc.getPodMapForDeployment(d, rsList)
  if err != nil {
    return err
  }

  // If the Deployment is being deleted, nothing special to do; just update its status
  if d.DeletionTimestamp != nil {
    return dc.syncStatusOnly(d, rsList)
  }

  // Check the deployment's paused conditions; this also covers the progressDeadlineSeconds rollout deadline.
  // progressDeadlineSeconds defaults to 600s. If the rollout has not finished within the deadline, it is
  // considered stuck and an event is recorded. A paused deployment gets no further action; otherwise the
  // controller keeps waiting for the rollout to complete.
  if err = dc.checkPausedConditions(d); err != nil {
    return err
  }
  // If Paused is set, do a plain sync:
  // 1. Find all old ReplicaSets (sorted by creation time).
  // 2. Compute and sync the Revision and condition status of the ReplicaSets and the Deployment
  //    (sync never creates a new ReplicaSet).
  if d.Spec.Paused {
    return dc.sync(d, rsList)
  }

  // Check whether the "deprecated.deployment.rollback.to" annotation is present
  if getRollbackTo(d) != nil {
    // Roll back to the specified revision
    return dc.rollback(d, rsList)
  }

  // If any ReplicaSet's desired-replicas annotation != *(d.Spec.Replicas), a scaling event is in progress
  scalingEvent, err := dc.isScalingEvent(d, rsList)
  if err != nil {
    return err
  }
  // Sync the deployment's scale and status
  if scalingEvent {
    return dc.sync(d, rsList)
  }

  // Dispatch on the rollout strategy
  switch d.Spec.Strategy.Type {
  case apps.RecreateDeploymentStrategyType:
    // Scale all old ReplicaSets down to 0 first, update the DeploymentStatus, then create the new
    // ReplicaSet and scale it up (updating ReplicaSet replica counts via the clientset)
    return dc.rolloutRecreate(d, rsList, podMap)
  case apps.RollingUpdateDeploymentStrategyType:
    // Create the new ReplicaSet, compute how many pods to add to it and how many to remove from the
    // old ones, then update the ReplicaSet replica counts via the clientset
    return dc.rolloutRolling(d, rsList)
  }
  return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}
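getPodMapForDeployment, called above, roughly looks like this (paraphrased from sync.go in the same package). Note that inactive pods are deliberately kept, because Recreate must verify that no pods from older revisions are still around:

func (dc *DeploymentController) getPodMapForDeployment(d *apps.Deployment, rsList []*apps.ReplicaSet) (map[types.UID][]*v1.Pod, error) {
  // List every pod the deployment's selector matches.
  selector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector)
  if err != nil {
    return nil, err
  }
  pods, err := dc.podLister.Pods(d.Namespace).List(selector)
  if err != nil {
    return nil, err
  }
  // Group the pods by the UID of their owning ReplicaSet.
  podMap := make(map[types.UID][]*v1.Pod, len(rsList))
  for _, rs := range rsList {
    podMap[rs.UID] = []*v1.Pod{}
  }
  for _, pod := range pods {
    controllerRef := metav1.GetControllerOf(pod)
    if controllerRef == nil {
      continue
    }
    // Only record pods owned by one of this deployment's ReplicaSets.
    if _, ok := podMap[controllerRef.UID]; ok {
      podMap[controllerRef.UID] = append(podMap[controllerRef.UID], pod)
    }
  }
  return podMap, nil
}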

sync

The scaling logic is defined here.

// kubernetes/pkg/controller/deployment/sync.go
func (dc *DeploymentController) sync(d *apps.Deployment, rsList []*apps.ReplicaSet) error {
  newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false)
  if err != nil {
    return err
  }
  if err := dc.scale(d, newRS, oldRSs); err != nil {
    // If we get an error while trying to scale, the deployment will be requeued
    // so we can abort this resync
    return err
  }

  // Clean up the deployment when it's paused and no rollback is in flight.
  if d.Spec.Paused && getRollbackTo(d) == nil {
    // Finally, clean up old revisions
    if err := dc.cleanupDeployment(oldRSs, d); err != nil {
      return err
    }
  }

  allRSs := append(oldRSs, newRS)
  return dc.syncDeploymentStatus(allRSs, newRS, d)
}
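The interesting case inside dc.scale is proportional scaling: when the replica count changes in the middle of a rollout (both new and old ReplicaSets still hold pods), the extra replicas are spread across the active ReplicaSets in proportion to their current sizes, so the rollout's relative progress is preserved. As a rough worked example: with replicas=10 and maxSurge=3, suppose the old ReplicaSet holds 8 pods and the new one 5 (13 total, the allowed maximum). Scaling the deployment to 15 raises the allowed total to 18, and the 5 new seats are split roughly 8/13 vs 5/13, i.e. about 3 more for the old ReplicaSet and 2 for the new one.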

rollback

The rollback-related methods. Only part of the code is shown here; for brevity, some utility helpers are omitted.

// kubernetes/pkg/controller/deployment/rollback.go
func (dc *DeploymentController) rollback(d *apps.Deployment, rsList []*apps.ReplicaSet) error {
  newRS, allOldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, true)
  if err != nil {
    return err
  }

  allRSs := append(allOldRSs, newRS)
  rollbackTo := getRollbackTo(d)
  // Revision 0 means: roll back to the latest revision
  if rollbackTo.Revision == 0 {
    if rollbackTo.Revision = deploymentutil.LastRevision(allRSs); rollbackTo.Revision == 0 {
      // No last revision found: emit a warning event
      dc.emitRollbackWarningEvent(d, deploymentutil.RollbackRevisionNotFound, "Unable to find last revision.")
      // Give up the rollback (remove the deprecated.deployment.rollback.to annotation, then update the deployment)
      return dc.updateDeploymentAndClearRollbackTo(d)
    }
  }
  for _, rs := range allRSs {
    v, err := deploymentutil.Revision(rs)
    if err != nil {
      klog.V(4).Infof("Unable to extract revision from deployment's replica set %q: %v", rs.Name, err)
      continue
    }
    if v == rollbackTo.Revision {
      klog.V(4).Infof("Found replica set %q with desired revision %d", rs.Name, v)
      // Perform the rollback (copy the matched ReplicaSet's pod template onto the Deployment)
      performedRollback, err := dc.rollbackToTemplate(d, rs)
      if performedRollback && err == nil {
        dc.emitRollbackNormalEvent(d, fmt.Sprintf("Rolled back deployment %q to revision %d", d.Name, rollbackTo.Revision))
      }
      return err
    }
  }
  dc.emitRollbackWarningEvent(d, deploymentutil.RollbackRevisionNotFound, "Unable to find the revision to rollback to.")
  // Gives up rollback
  return dc.updateDeploymentAndClearRollbackTo(d)
}

func getRollbackTo(d *apps.Deployment) *extensions.RollbackConfig {
  // Read the "deprecated.deployment.rollback.to" annotation
  revision := d.Annotations[apps.DeprecatedRollbackTo]
  if revision == "" {
    return nil
  }
  revision64, err := strconv.ParseInt(revision, 10, 64)
  if err != nil {
    // If it's invalid, ignore it.
    return nil
  }
  return &extensions.RollbackConfig{
    Revision: revision64,
  }
}
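Because getRollbackTo is driven entirely by that annotation, a rollback can be requested simply by writing the annotation onto the Deployment (which is what the long-deprecated rollback subresource used to do server-side). A hypothetical sketch, with made-up namespace/name and error handling omitted:

package main

import (
  "context"

  metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  "k8s.io/client-go/kubernetes"
  "k8s.io/client-go/tools/clientcmd"
)

func main() {
  cfg, _ := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
  client := kubernetes.NewForConfigOrDie(cfg)

  d, _ := client.AppsV1().Deployments("default").Get(context.TODO(), "web", metav1.GetOptions{})
  if d.Annotations == nil {
    d.Annotations = map[string]string{}
  }
  // apps.DeprecatedRollbackTo == "deprecated.deployment.rollback.to".
  // "0" would mean "roll back to the latest revision"; here we pick revision 2.
  d.Annotations["deprecated.deployment.rollback.to"] = "2"
  client.AppsV1().Deployments("default").Update(context.TODO(), d, metav1.UpdateOptions{})
}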

rolloutRecreate

Recreate rollout. For brevity, some utility helpers are omitted.

func (dc *DeploymentController) rolloutRecreate(d *apps.Deployment, rsList []*apps.ReplicaSet, podMap map[types.UID][]*v1.Pod) error {
  // Don't create the new ReplicaSet yet
  newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false)
  if err != nil {
    return err
  }
  allRSs := append(oldRSs, newRS)
  activeOldRSs := controller.FilterActiveReplicaSets(oldRSs)

  // Scale all old ReplicaSets down to 0 (once the pod count reaches 0, deletePod re-enqueues the deployment)
  scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(activeOldRSs, d)
  if err != nil {
    return err
  }
  if scaledDown {
    // Update DeploymentStatus.
    return dc.syncRolloutStatus(allRSs, newRS, d)
  }

  // Check whether any old pods are still running
  if oldPodsRunning(newRS, oldRSs, podMap) {
    return dc.syncRolloutStatus(allRSs, newRS, d)
  }

  // Now it is safe to create the new ReplicaSet
  if newRS == nil {
    newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(d, rsList, true)
    if err != nil {
      return err
    }
    allRSs = append(oldRSs, newRS)
  }

  // scale up new replica set.
  if _, err := dc.scaleUpNewReplicaSetForRecreate(newRS, d); err != nil {
    return err
  }

  // Clean up old revisions
  if util.DeploymentComplete(d, &d.Status) {
    if err := dc.cleanupDeployment(oldRSs, d); err != nil {
      return err
    }
  }

  // Sync deployment status.
  return dc.syncRolloutStatus(allRSs, newRS, d)
}
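oldPodsRunning, which gates the creation of the new ReplicaSet above, is roughly the following (paraphrased): any non-terminal pod outside the new ReplicaSet counts as still running, and PodUnknown counts too, because the controller cannot prove the pod has stopped:

func oldPodsRunning(newRS *apps.ReplicaSet, oldRSs []*apps.ReplicaSet, podMap map[types.UID][]*v1.Pod) bool {
  if oldPods := util.GetActualReplicaCountForReplicaSets(oldRSs); oldPods > 0 {
    return true
  }
  for rsUID, podList := range podMap {
    // Pods that belong to the new ReplicaSet are not "old" pods.
    if newRS != nil && newRS.UID == rsUID {
      continue
    }
    for _, pod := range podList {
      switch pod.Status.Phase {
      case v1.PodFailed, v1.PodSucceeded:
        // Terminal pods don't count.
        continue
      case v1.PodUnknown:
        // The node may be unreachable; we can't prove the pod stopped.
        return true
      default:
        // Pending/Running: still something to wait for.
        return true
      }
    }
  }
  return false
}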

rolloutRolling

Rolling update. For brevity, some utility helpers are omitted.

func (dc *DeploymentController) rolloutRolling(d *apps.Deployment, rsList []*apps.ReplicaSet) error {
  newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, true)
  if err != nil {
    return err
  }
  allRSs := append(oldRSs, newRS)

  // Scale up the new ReplicaSet (creating it if needed)
  scaledUp, err := dc.reconcileNewReplicaSet(allRSs, newRS, d)
  if err != nil {
    return err
  }
  if scaledUp {
    // Update DeploymentStatus
    return dc.syncRolloutStatus(allRSs, newRS, d)
  }

  // Scale down the old ReplicaSets
  scaledDown, err := dc.reconcileOldReplicaSets(allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d)
  if err != nil {
    return err
  }
  if scaledDown {
    // Update DeploymentStatus
    return dc.syncRolloutStatus(allRSs, newRS, d)
  }

  if deploymentutil.DeploymentComplete(d, &d.Status) {
    // Clean up old revisions
    if err := dc.cleanupDeployment(oldRSs, d); err != nil {
      return err
    }
  }

  // Sync deployment status
  return dc.syncRolloutStatus(allRSs, newRS, d)
}
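The surge arithmetic inside reconcileNewReplicaSet ultimately comes from deploymentutil.NewRSNewReplicas. A self-contained, paraphrased sketch of its RollingUpdate branch (function and parameter names here are mine, not the source's):

package main

import "fmt"

// newRSReplicas paraphrases the RollingUpdate branch of
// deploymentutil.NewRSNewReplicas: the new ReplicaSet may grow until the
// total pod count reaches spec.replicas + maxSurge, and never beyond
// spec.replicas itself.
func newRSReplicas(specReplicas, maxSurge, newRSNow, totalNow int32) int32 {
  maxTotal := specReplicas + maxSurge
  if totalNow >= maxTotal {
    return newRSNow // surge budget spent; wait for old pods to terminate
  }
  scaleUp := maxTotal - totalNow
  if room := specReplicas - newRSNow; scaleUp > room {
    scaleUp = room
  }
  return newRSNow + scaleUp
}

func main() {
  // 10 desired replicas, maxSurge=3, old RS still at 10, new RS at 0:
  // total 10 < 13, so the new RS is first scaled up to 3.
  fmt.Println(newRSReplicas(10, 3, 0, 10)) // 3
}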

Summary

  • The Deployment controller watches only the delete event for pods; this is how, under the Recreate strategy, it can tell whether all old pods are completely gone before syncing again.

  • A Deployment only deals with ReplicaSets (updating them through the clientset); the ReplicaSets control the pod counts, and the Deployment keeps its own status up to date by watching ReplicaSet events.

  • Most of the Deployment controller's logic concerns rollouts themselves: rollback, recreate, and rolling updates account for the bulk of the code.