Deployment Controller: principles and source-code analysis, based on the release-1.22 branch of kubernetes.
Preface
Starting with the Deployment controller, the amount of code ramps up considerably, but the overall flow is the same as in the other controllers.
Entry Function
The entry function lives in kubernetes/cmd/kube-controller-manager/app/apps.go.
func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
	dc, err := deployment.NewDeploymentController(
		ctx.InformerFactory.Apps().V1().Deployments(),
		ctx.InformerFactory.Apps().V1().ReplicaSets(),
		ctx.InformerFactory.Core().V1().Pods(),
		ctx.ClientBuilder.ClientOrDie("deployment-controller"),
	)
	if err != nil {
		return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
	}
	go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)
	return nil, true, nil
}
NewDeploymentController
Compared with the ReplicaSet controller, the constructor of the Deployment controller does a bit more. Note one detail: for pod events, only the delete handler is registered. The reason is explained later.
func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartStructuredLogging(0)
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})

	if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
		if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("deployment_controller", client.CoreV1().RESTClient().GetRateLimiter()); err != nil {
			return nil, err
		}
	}
	dc := &DeploymentController{
		client:        client,
		eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
		queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
	}
	dc.rsControl = controller.RealRSControl{
		KubeClient: client,
		Recorder:   dc.eventRecorder,
	}

	dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    dc.addDeployment,
		UpdateFunc: dc.updateDeployment,
		// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
		DeleteFunc: dc.deleteDeployment,
	})
	// Watch ReplicaSet events.
	rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    dc.addReplicaSet,
		UpdateFunc: dc.updateReplicaSet,
		DeleteFunc: dc.deleteReplicaSet,
	})
	// For pods, watch only delete events.
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		DeleteFunc: dc.deletePod,
	})

	dc.syncHandler = dc.syncDeployment
	dc.enqueueDeployment = dc.enqueue

	dc.dLister = dInformer.Lister()
	dc.rsLister = rsInformer.Lister()
	dc.podLister = podInformer.Lister()
	dc.dListerSynced = dInformer.Informer().HasSynced
	dc.rsListerSynced = rsInformer.Informer().HasSynced
	dc.podListerSynced = podInformer.Informer().HasSynced
	return dc, nil
}
Run
As in every controller, the startup flow is: Run -> worker -> processNextWorkItem -> syncHandler.
func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer dc.queue.ShutDown()

	klog.InfoS("Starting controller", "controller", "deployment")
	defer klog.InfoS("Shutting down controller", "controller", "deployment")

	if !cache.WaitForNamedCacheSync("deployment", stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
		return
	}

	// 5 workers by default (configurable via --concurrent-deployment-syncs).
	for i := 0; i < workers; i++ {
		go wait.Until(dc.worker, time.Second, stopCh)
	}

	<-stopCh
}
addDeployment
Enqueues directly.
func (dc *DeploymentController) addDeployment(obj interface{}) {
	d := obj.(*apps.Deployment)
	klog.V(4).InfoS("Adding deployment", "deployment", klog.KObj(d))
	dc.enqueueDeployment(d)
}
updateDeployment
Enqueues directly.
func (dc *DeploymentController) updateDeployment(old, cur interface{}) {
	oldD := old.(*apps.Deployment)
	curD := cur.(*apps.Deployment)
	klog.V(4).InfoS("Updating deployment", "deployment", klog.KObj(oldD))
	dc.enqueueDeployment(curD)
}
deleteDeployment
Enqueues directly, after unwrapping a possible DeletedFinalStateUnknown tombstone.
func (dc *DeploymentController) deleteDeployment(obj interface{}) {
	d, ok := obj.(*apps.Deployment)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
			return
		}
		d, ok = tombstone.Obj.(*apps.Deployment)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Deployment %#v", obj))
			return
		}
	}
	klog.V(4).InfoS("Deleting deployment", "deployment", klog.KObj(d))
	dc.enqueueDeployment(d)
}
addReplicaSet
The handling here mirrors the ReplicaSet controller; the difference is that what gets enqueued is the owning Deployment object.
func (dc *DeploymentController) addReplicaSet(obj interface{}) {
	rs := obj.(*apps.ReplicaSet)

	if rs.DeletionTimestamp != nil {
		// On a restart of the controller manager, it's possible for an object to
		// show up in a state that is already pending deletion.
		dc.deleteReplicaSet(rs)
		return
	}

	// If it has a ControllerRef, that's all that matters.
	if controllerRef := metav1.GetControllerOf(rs); controllerRef != nil {
		d := dc.resolveControllerRef(rs.Namespace, controllerRef)
		if d == nil {
			return
		}
		klog.V(4).InfoS("ReplicaSet added", "replicaSet", klog.KObj(rs))
		dc.enqueueDeployment(d)
		return
	}

	// If this rs is an orphan, first look up which Deployments could adopt it,
	// then enqueue those Deployments.
	ds := dc.getDeploymentsForReplicaSet(rs)
	if len(ds) == 0 {
		return
	}
	klog.V(4).InfoS("Orphan ReplicaSet added", "replicaSet", klog.KObj(rs))
	for _, d := range ds {
		dc.enqueueDeployment(d)
	}
}

func (dc *DeploymentController) getDeploymentsForReplicaSet(rs *apps.ReplicaSet) []*apps.Deployment {
	// Find Deployments whose selector matches the rs labels.
	deployments, err := util.GetDeploymentsForReplicaSet(dc.dLister, rs)
	if err != nil || len(deployments) == 0 {
		return nil
	}
	// Normally an rs belongs to exactly one Deployment. More than one match
	// means the labels are wrong and manual intervention is needed; only the
	// first of the matched Deployments is logged.
	if len(deployments) > 1 {
		// ControllerRef will ensure we don't do anything crazy, but more than one
		// item in this list nevertheless constitutes user error.
		klog.V(4).InfoS("user error! more than one deployment is selecting replica set",
			"replicaSet", klog.KObj(rs), "labels", rs.Labels, "deployment", klog.KObj(deployments[0]))
	}
	return deployments
}
updateReplicaSet
The handling here mirrors the ReplicaSet controller.
func (dc *DeploymentController) updateReplicaSet(old, cur interface{}) {
	curRS := cur.(*apps.ReplicaSet)
	oldRS := old.(*apps.ReplicaSet)
	if curRS.ResourceVersion == oldRS.ResourceVersion {
		// Periodic resync will send update events for all known replica sets.
		// Two different versions of the same replica set will always have different RVs.
		return
	}

	curControllerRef := metav1.GetControllerOf(curRS)
	oldControllerRef := metav1.GetControllerOf(oldRS)
	controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
	if controllerRefChanged && oldControllerRef != nil {
		// The ControllerRef was changed. Sync the old controller, if any.
		if d := dc.resolveControllerRef(oldRS.Namespace, oldControllerRef); d != nil {
			dc.enqueueDeployment(d)
		}
	}

	// If it has a ControllerRef, that's all that matters.
	if curControllerRef != nil {
		d := dc.resolveControllerRef(curRS.Namespace, curControllerRef)
		if d == nil {
			return
		}
		klog.V(4).InfoS("ReplicaSet updated", "replicaSet", klog.KObj(curRS))
		dc.enqueueDeployment(d)
		return
	}

	// Otherwise, it's an orphan. If anything changed, sync matching controllers
	// to see if anyone wants to adopt it now.
	labelChanged := !reflect.DeepEqual(curRS.Labels, oldRS.Labels)
	if labelChanged || controllerRefChanged {
		ds := dc.getDeploymentsForReplicaSet(curRS)
		if len(ds) == 0 {
			return
		}
		klog.V(4).InfoS("Orphan ReplicaSet updated", "replicaSet", klog.KObj(curRS))
		for _, d := range ds {
			dc.enqueueDeployment(d)
		}
	}
}
deleteReplicaSet
The handling here mirrors the ReplicaSet controller.
func (dc *DeploymentController) deleteReplicaSet(obj interface{}) {
	rs, ok := obj.(*apps.ReplicaSet)

	// When a delete is dropped, the relist will notice a pod in the store not
	// in the list, leading to the insertion of a tombstone object which contains
	// the deleted key/value. Note that this value might be stale. If the ReplicaSet
	// changed labels the new deployment will not be woken up till the periodic resync.
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
			return
		}
		rs, ok = tombstone.Obj.(*apps.ReplicaSet)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a ReplicaSet %#v", obj))
			return
		}
	}

	controllerRef := metav1.GetControllerOf(rs)
	if controllerRef == nil {
		// No controller should care about orphans being deleted.
		return
	}
	d := dc.resolveControllerRef(rs.Namespace, controllerRef)
	if d == nil {
		return
	}
	klog.V(4).InfoS("ReplicaSet deleted", "replicaSet", klog.KObj(rs))
	dc.enqueueDeployment(d)
}
deletePod
The handler for pod delete events. It only does real work when the update strategy is Recreate: the deployment is synced again once every pod is gone.
func (dc *DeploymentController) deletePod(obj interface{}) {
	pod, ok := obj.(*v1.Pod)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
			return
		}
		pod, ok = tombstone.Obj.(*v1.Pod)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj))
			return
		}
	}
	klog.V(4).InfoS("Pod deleted", "pod", klog.KObj(pod))
	if d := dc.getDeploymentForPod(pod); d != nil && d.Spec.Strategy.Type == apps.RecreateDeploymentStrategyType {
		// Sync only once all pods have been deleted.
		rsList, err := util.ListReplicaSets(d, util.RsListFromClient(dc.client.AppsV1()))
		if err != nil {
			return
		}
		podMap, err := dc.getPodMapForDeployment(d, rsList)
		if err != nil {
			return
		}
		numPods := 0
		for _, podList := range podMap {
			numPods += len(podList)
		}
		if numPods == 0 {
			dc.enqueueDeployment(d)
		}
	}
}

// Walk up from the pod to its rs, then from the rs to its Deployment.
func (dc *DeploymentController) getDeploymentForPod(pod *v1.Pod) *apps.Deployment {
	// Find the owning replica set
	var rs *apps.ReplicaSet
	var err error
	controllerRef := metav1.GetControllerOf(pod)
	if controllerRef == nil {
		// No controller owns this Pod.
		return nil
	}
	if controllerRef.Kind != apps.SchemeGroupVersion.WithKind("ReplicaSet").Kind {
		// Not a pod owned by a replica set.
		return nil
	}
	rs, err = dc.rsLister.ReplicaSets(pod.Namespace).Get(controllerRef.Name)
	if err != nil || rs.UID != controllerRef.UID {
		klog.V(4).InfoS("Cannot get replicaset for pod", "ownerReference", controllerRef.Name, "pod", klog.KObj(pod), "err", err)
		return nil
	}

	// Now find the Deployment that owns that ReplicaSet.
	controllerRef = metav1.GetControllerOf(rs)
	if controllerRef == nil {
		return nil
	}
	return dc.resolveControllerRef(rs.Namespace, controllerRef)
}
enqueue
There are three enqueue variants.
func (dc *DeploymentController) enqueue(deployment *apps.Deployment) {
	key, err := controller.KeyFunc(deployment)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", deployment, err))
		return
	}
	dc.queue.Add(key)
}

func (dc *DeploymentController) enqueueRateLimited(deployment *apps.Deployment) {
	key, err := controller.KeyFunc(deployment)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", deployment, err))
		return
	}
	dc.queue.AddRateLimited(key)
}

// enqueueAfter will enqueue a deployment after the provided amount of time.
func (dc *DeploymentController) enqueueAfter(deployment *apps.Deployment, after time.Duration) {
	key, err := controller.KeyFunc(deployment)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", deployment, err))
		return
	}
	dc.queue.AddAfter(key, after)
}
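The three variants differ only in how the key enters the queue: enqueue adds immediately, enqueueRateLimited applies per-item backoff (used by handleErr below for retries), and enqueueAfter schedules a delayed add. As a minimal, self-contained sketch of the backoff behaviour behind AddRateLimited — the key value is made up, everything else is the real client-go API:

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Same constructor the controller uses for its "deployment" queue:
	// per-item exponential backoff (5ms doubling, capped at 1000s) combined
	// with an overall token-bucket limiter (10 qps, burst 100).
	q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "demo")
	defer q.ShutDown()

	key := "default/nginx" // hypothetical namespace/name key

	// Each AddRateLimited counts as one more failure for this key, so the
	// delay before the key becomes visible again keeps growing.
	for i := 0; i < 5; i++ {
		q.AddRateLimited(key)
		fmt.Printf("failures for %s: %d\n", key, q.NumRequeues(key))
	}

	// Forget resets the per-item backoff; handleErr calls it after a
	// successful sync or when giving up after maxRetries.
	q.Forget(key)
	fmt.Printf("after Forget: %d\n", q.NumRequeues(key))
}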
worker
worker also runs as multiple goroutines, each pulling Deployment keys off the queue and processing them.
func (dc *DeploymentController) worker() {
	for dc.processNextWorkItem() {
	}
}

func (dc *DeploymentController) processNextWorkItem() bool {
	key, quit := dc.queue.Get()
	if quit {
		return false
	}
	defer dc.queue.Done(key)

	err := dc.syncHandler(key.(string))
	dc.handleErr(err, key)

	return true
}

func (dc *DeploymentController) handleErr(err error, key interface{}) {
	// No error, or the namespace is being terminated: nothing to retry.
	if err == nil || errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
		dc.queue.Forget(key)
		return
	}
	ns, name, keyErr := cache.SplitMetaNamespaceKey(key.(string))
	// Failed to split the unique key.
	if keyErr != nil {
		klog.ErrorS(err, "Failed to split meta namespace cache key", "cacheKey", key)
	}

	// Until maxRetries is reached, retries go back into the rate-limited queue.
	if dc.queue.NumRequeues(key) < maxRetries {
		klog.V(2).InfoS("Error syncing deployment", "deployment", klog.KRef(ns, name), "err", err)
		dc.queue.AddRateLimited(key)
		return
	}

	// Retries exhausted: drop the key from the queue.
	utilruntime.HandleError(err)
	klog.V(2).InfoS("Dropping deployment out of the queue", "deployment", klog.KRef(ns, name), "err", err)
	dc.queue.Forget(key)
}
syncDeployment
This is the heart of the controller: Deployment rollback, rolling update, and pod scaling all branch out from here. The source is spread over several files and is too long to reproduce in full, so only selected methods are pulled out for analysis below.
func (dc *DeploymentController) syncDeployment(key string) error {
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		klog.ErrorS(err, "Failed to split meta namespace cache key", "cacheKey", key)
		return err
	}

	startTime := time.Now()
	klog.V(4).InfoS("Started syncing deployment", "deployment", klog.KRef(namespace, name), "startTime", startTime)
	defer func() {
		klog.V(4).InfoS("Finished syncing deployment", "deployment", klog.KRef(namespace, name), "duration", time.Since(startTime))
	}()

	deployment, err := dc.dLister.Deployments(namespace).Get(name)
	// Not found: the deployment has been deleted.
	if errors.IsNotFound(err) {
		klog.V(2).InfoS("Deployment has been deleted", "deployment", klog.KRef(namespace, name))
		return nil
	}
	if err != nil {
		return err
	}

	// Deep-copy otherwise we are mutating our cache.
	// TODO: Deep-copy only when needed.
	d := deployment.DeepCopy()

	everything := metav1.LabelSelector{}
	// If the deployment selects ALL pods, record a warning event telling the user
	// to set a non-empty selector; then, if ObservedGeneration is behind, bump it
	// and update the status.
	if reflect.DeepEqual(d.Spec.Selector, &everything) {
		dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
		if d.Status.ObservedGeneration < d.Generation {
			d.Status.ObservedGeneration = d.Generation
			dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
		}
		return nil
	}

	// Get all ReplicaSets owned by this Deployment.
	rsList, err := dc.getReplicaSetsForDeployment(d)
	if err != nil {
		return err
	}

	// First fetch all rs for the deployment's pods, then build a
	// map[types.UID][]*v1.Pod keyed by rs UID recording which pods each rs owns.
	podMap, err := dc.getPodMapForDeployment(d, rsList)
	if err != nil {
		return err
	}

	// If the Deployment is being deleted, nothing special to do: just update status.
	if d.DeletionTimestamp != nil {
		return dc.syncStatusOnly(d, rsList)
	}

	// Check whether the deployment is paused, and whether progressDeadlineSeconds
	// has been exceeded. progressDeadlineSeconds defaults to 600s; if the rollout
	// does not finish within the deadline it is considered stuck and events are
	// recorded. A paused deployment gets no further action; otherwise the
	// controller keeps waiting for the rollout to complete.
	if err = dc.checkPausedConditions(d); err != nil {
		return err
	}

	// If Paused is set, run the sync path:
	// 1. find all old rs (sorted by creation time);
	// 2. compute and sync the Revision and condition state of the rs and the
	//    deployment (sync never creates a new rs).
	if d.Spec.Paused {
		return dc.sync(d, rsList)
	}

	// Check whether the "deprecated.deployment.rollback.to" annotation is set.
	if getRollbackTo(d) != nil {
		// Roll back to the given Revision.
		return dc.rollback(d, rsList)
	}

	// If some rs's desired count != *(d.Spec.Replicas), this is a pure pod
	// scaling event.
	scalingEvent, err := dc.isScalingEvent(d, rsList)
	if err != nil {
		return err
	}
	// Sync the deployment state.
	if scalingEvent {
		return dc.sync(d, rsList)
	}

	// Otherwise run the rollout matching the configured strategy.
	switch d.Spec.Strategy.Type {
	case apps.RecreateDeploymentStrategyType:
		// Scale every old rs down to 0 first, update the DeploymentStatus, then
		// create the new rs and scale it up (via clientset updates of rs replicas).
		return dc.rolloutRecreate(d, rsList, podMap)
	case apps.RollingUpdateDeploymentStrategyType:
		// Create the new rs, compute how many pods to scale up on the new rs and
		// down on the old ones, and apply the counts via clientset updates.
		return dc.rolloutRolling(d, rsList)
	}
	return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}
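The scaling-event check deserves a closer look. isScalingEvent (defined in sync.go) compares the desired replica count that the controller stamps onto each active ReplicaSet via the deployment.kubernetes.io/desired-replicas annotation against the current d.Spec.Replicas. A simplified sketch of the idea — the helper name isScalingOnly is mine, and the real method also syncs revisions first:

package main

import (
	"fmt"
	"strconv"

	apps "k8s.io/api/apps/v1"
)

// The annotation the controller writes onto every ReplicaSet it manages.
const desiredReplicasAnnotation = "deployment.kubernetes.io/desired-replicas"

// isScalingOnly sketches dc.isScalingEvent: if some active ReplicaSet
// remembers a different desired count than the Deployment's spec, nothing but
// the replica count changed, so a plain sync (no rollout) is enough.
func isScalingOnly(d *apps.Deployment, activeRSs []*apps.ReplicaSet) bool {
	for _, rs := range activeRSs {
		v, ok := rs.Annotations[desiredReplicasAnnotation]
		if !ok {
			continue
		}
		desired, err := strconv.ParseInt(v, 10, 32)
		if err != nil {
			continue
		}
		if int32(desired) != *(d.Spec.Replicas) {
			return true
		}
	}
	return false
}

func main() {
	replicas := int32(5)
	d := &apps.Deployment{}
	d.Spec.Replicas = &replicas
	rs := &apps.ReplicaSet{}
	rs.Annotations = map[string]string{desiredReplicasAnnotation: "3"}
	fmt.Println(isScalingOnly(d, []*apps.ReplicaSet{rs})) // true: 3 != 5
}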
sync
The scale-up/scale-down handling is defined here.
// kubernetes/pkg/controller/deployment/sync.go
func (dc *DeploymentController) sync(d *apps.Deployment, rsList []*apps.ReplicaSet) error {
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false)
	if err != nil {
		return err
	}
	if err := dc.scale(d, newRS, oldRSs); err != nil {
		// If we get an error while trying to scale, the deployment will be requeued
		// so we can abort this resync
		return err
	}

	// Clean up the deployment when it's paused and no rollback is in flight.
	if d.Spec.Paused && getRollbackTo(d) == nil {
		// Prune old revisions.
		if err := dc.cleanupDeployment(oldRSs, d); err != nil {
			return err
		}
	}

	allRSs := append(oldRSs, newRS)
	return dc.syncDeploymentStatus(allRSs, newRS, d)
}
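dc.scale (not shown) covers the interesting case of resizing a Deployment in the middle of a rollout: the new total is spread across all active ReplicaSets in proportion to their current sizes, so the rollout ratio is preserved. A toy illustration of just that proportional core — the real deploymentutil.GetProportion also factors in maxSurge and the max-replicas annotation, and distributes rounding leftovers differently:

package main

import "fmt"

// distributeProportionally spreads newTotal over the current ReplicaSet sizes
// in proportion to each one's share, handing the integer-rounding leftover to
// the first entry (assumed largest). A toy version of dc.scale's arithmetic.
func distributeProportionally(current []int32, newTotal int32) []int32 {
	var oldTotal int32
	for _, c := range current {
		oldTotal += c
	}
	out := make([]int32, len(current))
	if oldTotal == 0 {
		return out
	}
	var assigned int32
	for i, c := range current {
		out[i] = c * newTotal / oldTotal // integer division truncates
		assigned += out[i]
	}
	out[0] += newTotal - assigned // rounding leftover
	return out
}

func main() {
	// A deployment mid-rollout with RSs at 6 and 4 pods is scaled 10 -> 15:
	// both sides grow proportionally, keeping the 60/40 split.
	fmt.Println(distributeProportionally([]int32{6, 4}, 15)) // [9 6]
}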
rollback
The rollback-related methods. Only part of the code is shown; for space, some utility helpers are omitted.
// kubernetes/pkg/controller/deployment/rollback.go
func (dc *DeploymentController) rollback(d *apps.Deployment, rsList []*apps.ReplicaSet) error {
	newRS, allOldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, true)
	if err != nil {
		return err
	}

	allRSs := append(allOldRSs, newRS)
	rollbackTo := getRollbackTo(d)
	// Revision 0 means roll back to the latest revision.
	if rollbackTo.Revision == 0 {
		if rollbackTo.Revision = deploymentutil.LastRevision(allRSs); rollbackTo.Revision == 0 {
			// No latest revision found: emit a warning event.
			dc.emitRollbackWarningEvent(d, deploymentutil.RollbackRevisionNotFound, "Unable to find last revision.")
			// Give up the rollback (remove the deprecated.deployment.rollback.to
			// annotation, then update the deployment).
			return dc.updateDeploymentAndClearRollbackTo(d)
		}
	}
	for _, rs := range allRSs {
		v, err := deploymentutil.Revision(rs)
		if err != nil {
			klog.V(4).Infof("Unable to extract revision from deployment's replica set %q: %v", rs.Name, err)
			continue
		}
		if v == rollbackTo.Revision {
			klog.V(4).Infof("Found replica set %q with desired revision %d", rs.Name, v)
			// Perform the rollback (copy the matched rs's pod Template onto the Deployment).
			performedRollback, err := dc.rollbackToTemplate(d, rs)
			if performedRollback && err == nil {
				dc.emitRollbackNormalEvent(d, fmt.Sprintf("Rolled back deployment %q to revision %d", d.Name, rollbackTo.Revision))
			}
			return err
		}
	}
	dc.emitRollbackWarningEvent(d, deploymentutil.RollbackRevisionNotFound, "Unable to find the revision to rollback to.")
	// Gives up rollback
	return dc.updateDeploymentAndClearRollbackTo(d)
}

func getRollbackTo(d *apps.Deployment) *extensions.RollbackConfig {
	// Extract the "deprecated.deployment.rollback.to" annotation.
	revision := d.Annotations[apps.DeprecatedRollbackTo]
	if revision == "" {
		return nil
	}
	revision64, err := strconv.ParseInt(revision, 10, 64)
	if err != nil {
		// If it's invalid, ignore it.
		return nil
	}
	return &extensions.RollbackConfig{
		Revision: revision64,
	}
}
rolloutRecreate
The Recreate rollout. Some utility helpers are omitted for space.
func (dc *DeploymentController) rolloutRecreate(d *apps.Deployment, rsList []*apps.ReplicaSet, podMap map[types.UID][]*v1.Pod) error {
	// Don't create the new rs yet (third argument is false).
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false)
	if err != nil {
		return err
	}
	allRSs := append(oldRSs, newRS)
	activeOldRSs := controller.FilterActiveReplicaSets(oldRSs)

	// Scale all old rs down to 0 (once the pod count reaches 0, deletePod
	// triggers another sync of the deployment).
	scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(activeOldRSs, d)
	if err != nil {
		return err
	}
	if scaledDown {
		// Update DeploymentStatus.
		return dc.syncRolloutStatus(allRSs, newRS, d)
	}

	// Check whether any old pods are still running.
	if oldPodsRunning(newRS, oldRSs, podMap) {
		return dc.syncRolloutStatus(allRSs, newRS, d)
	}

	// Now it is safe to create the new rs.
	if newRS == nil {
		newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(d, rsList, true)
		if err != nil {
			return err
		}
		allRSs = append(oldRSs, newRS)
	}

	// scale up new replica set.
	if _, err := dc.scaleUpNewReplicaSetForRecreate(newRS, d); err != nil {
		return err
	}

	// Prune old revisions.
	if util.DeploymentComplete(d, &d.Status) {
		if err := dc.cleanupDeployment(oldRSs, d); err != nil {
			return err
		}
	}

	// Sync deployment status.
	return dc.syncRolloutStatus(allRSs, newRS, d)
}
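The oldPodsRunning check above is what makes Recreate safe: the new ReplicaSet is created only after no pod of the old ones remains in a non-terminal phase. A simplified sketch of that gate — the helper name allOldPodsGone is mine, and the real function additionally ignores pods that already belong to the new ReplicaSet:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
)

// allOldPodsGone sketches the inverse of oldPodsRunning: pods in a terminal
// phase (Succeeded/Failed) don't block the rollout; anything else does.
func allOldPodsGone(podMap map[types.UID][]*v1.Pod) bool {
	for _, pods := range podMap {
		for _, pod := range pods {
			switch pod.Status.Phase {
			case v1.PodSucceeded, v1.PodFailed:
				// Terminal pod: ignore it.
			default:
				// Running, Pending or Unknown: the old workload is still around.
				return false
			}
		}
	}
	return true
}

func main() {
	running := &v1.Pod{Status: v1.PodStatus{Phase: v1.PodRunning}}
	podMap := map[types.UID][]*v1.Pod{"rs-uid": {running}}
	fmt.Println(allOldPodsGone(podMap)) // false: a pod is still running
}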
rolloutRolling
The rolling update. Some utility helpers are omitted for space.
func (dc *DeploymentController) rolloutRolling(d *apps.Deployment, rsList []*apps.ReplicaSet) error {
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, true)
	if err != nil {
		return err
	}
	allRSs := append(oldRSs, newRS)

	// Scale up the new rs (which was just created if it didn't exist).
	scaledUp, err := dc.reconcileNewReplicaSet(allRSs, newRS, d)
	if err != nil {
		return err
	}
	if scaledUp {
		// Update DeploymentStatus
		return dc.syncRolloutStatus(allRSs, newRS, d)
	}

	// Scale down the pod count of the old rs.
	scaledDown, err := dc.reconcileOldReplicaSets(allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d)
	if err != nil {
		return err
	}
	if scaledDown {
		// Update DeploymentStatus
		return dc.syncRolloutStatus(allRSs, newRS, d)
	}

	if deploymentutil.DeploymentComplete(d, &d.Status) {
		// Prune old revisions.
		if err := dc.cleanupDeployment(oldRSs, d); err != nil {
			return err
		}
	}

	// Sync deployment status
	return dc.syncRolloutStatus(allRSs, newRS, d)
}
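How far reconcileNewReplicaSet may scale up and reconcileOldReplicaSets may scale down is bounded by maxSurge and maxUnavailable. The sketch below restates the resolution done by deploymentutil.ResolveFenceposts (the function here is my own simplified version): percentage values for maxSurge round up, for maxUnavailable round down, and if both come out as 0, maxUnavailable is forced to 1 so the rollout can still make progress.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/intstr"
)

// resolveFenceposts restates deploymentutil.ResolveFenceposts: surge rounds
// up, unavailable rounds down, and both may not be zero at the same time.
func resolveFenceposts(maxSurge, maxUnavailable *intstr.IntOrString, desired int32) (int32, int32, error) {
	surge, err := intstr.GetScaledValueFromIntOrPercent(maxSurge, int(desired), true)
	if err != nil {
		return 0, 0, err
	}
	unavailable, err := intstr.GetScaledValueFromIntOrPercent(maxUnavailable, int(desired), false)
	if err != nil {
		return 0, 0, err
	}
	if surge == 0 && unavailable == 0 {
		// Both zero would deadlock the rollout; tolerate one unavailable pod.
		unavailable = 1
	}
	return int32(surge), int32(unavailable), nil
}

func main() {
	surge := intstr.FromString("25%")
	unavailable := intstr.FromString("25%")
	// 10 desired replicas: surge = ceil(2.5) = 3, unavailable = floor(2.5) = 2.
	// The rollout may run up to 13 pods and must keep at least 8 available.
	s, u, _ := resolveFenceposts(&surge, &unavailable, 10)
	fmt.Println(s, u) // 3 2
}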
Summary
- The Deployment controller watches only pod delete events; they exist solely so that, under the Recreate strategy, the controller can tell whether every pod has been fully removed before it syncs again.
- A Deployment interacts only with rs objects (updating them through the clientset); the rs controls the pod count, and the Deployment keeps its own status up to date by watching rs events.
- Most of the Deployment controller's logic lies in its rollout behaviour: rollback, recreate update, and rolling update account for the bulk of the code.