Kubernetes Source Code: kubelet Principles and Source Code Analysis (Part 2)

This post walks through how kubelet syncs Pods; the source is the release-1.22 branch of Kubernetes.

Preface

In the previous post we roughly traced kubelet's startup flow and built a general picture of the component, which also tells us which parts of the code to read next. Following the startup order, let's first look at how kubelet syncs Pods.

Before starting, we need two concepts: the static Pod and the mirror Pod. Later you will repeatedly see kubelet checking whether a pod is a static Pod or a mirror Pod, and without these concepts the checks can be bewildering.

Note

Static Pods are not created or managed through the apiserver on the master node; they are managed directly by the kubelet process on a particular node.

They cannot be associated with the usual controllers such as Deployment or DaemonSet. The kubelet process itself watches them and restarts them when they crash, and the kubelet cannot perform health checks on them.

A static pod is always bound to a single kubelet and always runs on the same node.

For every static pod, kubelet automatically creates a mirror pod (mirrorPod) on the Kubernetes apiserver, so the pod can be queried through the apiserver, but it cannot be controlled there (for example, it cannot be deleted through the apiserver).

There are two ways to create a static pod: from a configuration file, or over HTTP.
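
To make the distinction concrete, below is a minimal, self-contained sketch (my own illustration, not kubelet source) of telling the two kinds apart by their config-source annotations; the annotation keys are the ones kubelet's kubetypes package uses.

package main

import "fmt"

// Annotation keys used by kubelet's kubetypes package.
const (
    configSourceAnnotation = "kubernetes.io/config.source" // "file", "http" or "api"
    configMirrorAnnotation = "kubernetes.io/config.mirror" // present only on mirror pods
)

// pod is a stand-in for v1.Pod, reduced to what this sketch needs.
type pod struct {
    Name        string
    Annotations map[string]string
}

// A mirror pod carries the mirror annotation stamped by kubelet.
func isMirrorPod(p *pod) bool {
    _, ok := p.Annotations[configMirrorAnnotation]
    return ok
}

// A static pod comes from a non-apiserver source (file or http).
func isStaticPod(p *pod) bool {
    src, ok := p.Annotations[configSourceAnnotation]
    return ok && src != "api"
}

func main() {
    p := &pod{
        Name:        "etcd-node1",
        Annotations: map[string]string{configSourceAnnotation: "file"},
    }
    fmt.Println(isStaticPod(p), isMirrorPod(p)) // true false
}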

syncLoopIteration

As we analyzed earlier, syncLoopIteration() is where kubelet's core logic lives, so this walkthrough of pod handling, as well as the later ones covering health probing, pleg, sync management and status management, all start from this function.

configCh

We saw earlier that configCh delivers configuration changes from the file/http/apiserver sources, and dispatches each change type to the corresponding handler callback for processing.

As shown below, this logic responds to the following pod operation types.

  • kubetypes.ADD

  • kubetypes.UPDATE

  • kubetypes.REMOVE

  • kubetypes.RECONCILE

  • kubetypes.DELETE

  • kubetypes.SET

func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    ...
    case u, open := <-configCh:
        // Update from a config source; dispatch it to the right handler
        // callback.
        if !open {
            klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
            return false
        }

        switch u.Op {
        case kubetypes.ADD:
            klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", format.Pods(u.Pods))
            // After restarting, kubelet will get all existing pods through
            // ADD as if they are new pods. These pods will then go through the
            // admission process and *may* be rejected. This can be resolved
            // once we have checkpointing.
            handler.HandlePodAdditions(u.Pods)
        case kubetypes.UPDATE:
            klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", format.Pods(u.Pods))
            handler.HandlePodUpdates(u.Pods)
        case kubetypes.REMOVE:
            klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", format.Pods(u.Pods))
            handler.HandlePodRemoves(u.Pods)
        case kubetypes.RECONCILE:
            klog.V(4).InfoS("SyncLoop RECONCILE", "source", u.Source, "pods", format.Pods(u.Pods))
            handler.HandlePodReconcile(u.Pods)
        case kubetypes.DELETE:
            klog.V(2).InfoS("SyncLoop DELETE", "source", u.Source, "pods", format.Pods(u.Pods))
            // DELETE is treated as a UPDATE because of graceful deletion.
            handler.HandlePodUpdates(u.Pods)
        case kubetypes.SET:
            // TODO: Do we want to support this?
            klog.ErrorS(nil, "Kubelet does not support snapshot update")
        default:
            klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op)
        }

        kl.sourcesReady.AddSource(u.Source)
    ...
}

Pod operation types

Except for kubetypes.REMOVE (which goes through kl.deletePod() instead) and the unsupported kubetypes.SET, the other operations all end up calling kl.dispatchWork() to process the pod.

kubetypes.ADD

kubetypes.ADD means pod creation. It calls handler.HandlePodAdditions(u.Pods) to respond to pod creation; let's see how a pod actually gets created.

  1. HandlePodAdditions() receives a slice of pods, meaning several pods may be created at once.

  2. The pods are sorted by creation time, and the sorted slice is iterated over.

  3. It fetches all pods currently present on the kubelet's node.

  4. kl.podManager.AddPod(pod) caches the current pod into the podManager's podByUID map.

  5. It checks whether the pod is a mirror pod; if it is, kl.handleMirrorPod(pod, start) handles it (a sketch of handleMirrorPod() follows the code below).

  6. Mirror pod or normal pod, both are ultimately handled by kl.dispatchWork(); they differ only in the arguments passed in.

  7. A mirror pod is just the apiserver-side reflection of a static pod; kubelet does not create real containers for it on the node. Its ADD/UPDATE/DELETE operations are all treated as UPDATE, refreshing its status on the apiserver.

  8. One puzzling detail: at the end, when creating a normal pod, why fetch the mirrorPod and pass it into kl.dispatchWork() even though it may well be nil? The answer turns up later in syncPod().

func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
    start := kl.clock.Now()
    sort.Sort(sliceutils.PodsByCreationTime(pods))
    for _, pod := range pods {
        existingPods := kl.podManager.GetPods()
        // Always add the pod to the pod manager. Kubelet relies on the pod
        // manager as the source of truth for the desired state. If a pod does
        // not exist in the pod manager, it means that it has been deleted in
        // the apiserver and no action (other than cleanup) is required.
        kl.podManager.AddPod(pod)

        if kubetypes.IsMirrorPod(pod) {
            kl.handleMirrorPod(pod, start)
            continue
        }

        if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
            activePods := kl.filterOutInactivePods(existingPods)

            // Check if we can admit the pod; if not, reject it.
            if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
                kl.rejectPod(pod, reason, message)
                continue
            }
        }
        mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
        kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
    }
}
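
For reference, handleMirrorPod() from step 5 is a thin wrapper: it resolves the static pod behind the mirror pod and dispatches an UPDATE for it. Sketched below from the release-1.22 source (condensed, quoted from memory):

func (kl *Kubelet) handleMirrorPod(mirrorPod *v1.Pod, start time.Time) {
    // Mirror pod ADD/UPDATE/DELETE operations are considered an UPDATE to the
    // corresponding static pod. Dispatch the update only if the static pod
    // actually exists on this node.
    if pod, ok := kl.podManager.GetPodByMirrorPod(mirrorPod); ok {
        kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start)
    }
}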

kubetypes.UPDATE

handler.HandlePodUpdates() first writes the updated pod into the cache and checks whether it is a mirror pod, then, just like kubetypes.ADD, calls kl.dispatchWork() to sync the pod. Nothing more to add here.

func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) {
    start := kl.clock.Now()
    for _, pod := range pods {
        kl.podManager.UpdatePod(pod)
        if kubetypes.IsMirrorPod(pod) {
            kl.handleMirrorPod(pod, start)
            continue
        }
        mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
        kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start)
    }
}

kubetypes.REMOVE

handler.HandlePodRemoves() deletes the pod from the cache and checks whether it is a mirror pod, then calls kl.deletePod() to delete it. kl.deletePod() actually performs the deletion through the podWorkers' kl.podWorkers.UpdatePod() interface, so it differs little from calling kl.dispatchWork().

  • kl.deletePod()

  • kl.podManager.DeletePod()

func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
    start := kl.clock.Now()
    for _, pod := range pods {
        kl.podManager.DeletePod(pod)
        if kubetypes.IsMirrorPod(pod) {
            kl.handleMirrorPod(pod, start)
            continue
        }
        // Deletion is allowed to fail because the periodic cleanup routine
        // will trigger deletion again.
        if err := kl.deletePod(pod); err != nil {
            klog.V(2).InfoS("Failed to delete pod", "pod", klog.KObj(pod), "err", err)
        }
    }
}

kubetypes.RECONCILE

handler.HandlePodReconcile() works in three steps: it updates the pod into the cache; if the pod's Ready condition needs to be reconciled (status.NeedToReconcilePodReadiness), it calls kl.dispatchWork() to sync it; and if the pod has been evicted, it deletes all of the pod's containers.

  • kl.podManager.UpdatePod()

  • kl.dispatchWork()

  • kl.containerDeletor.deleteContainersInPod()

func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) {
    start := kl.clock.Now()
    for _, pod := range pods {
        // Update the pod in pod manager, status manager will do periodically reconcile according
        // to the pod manager.
        kl.podManager.UpdatePod(pod)

        // Reconcile Pod "Ready" condition if necessary. Trigger sync pod for reconciliation.
        if status.NeedToReconcilePodReadiness(pod) {
            mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
            kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)
        }

        // After an evicted pod is synced, all dead containers in the pod can be removed.
        if eviction.PodIsEvicted(pod.Status) {
            if podStatus, err := kl.podCache.Get(pod.UID); err == nil {
                kl.containerDeletor.deleteContainersInPod("", podStatus, true)
            }
        }
    }
}

kubetypes.DELETE

This simply calls handler.HandlePodUpdates(); the flow is identical to kubetypes.UPDATE, so we won't repeat it. DELETE is treated as an update because deletion is graceful.
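
Why is DELETE an update rather than a removal? Because deletion is graceful: the apiserver does not drop the object immediately, it stamps deletion metadata onto the same pod and lets kubelet sync it to wind the containers down. A small illustration (my own sketch, using the upstream API types; the 30s fallback mirrors the API default for terminationGracePeriodSeconds):

package sketch

import v1 "k8s.io/api/core/v1"

// deletionRequested reports whether graceful deletion has begun for this pod:
// the apiserver sets DeletionTimestamp and DeletionGracePeriodSeconds, but the
// object stays visible until kubelet finishes terminating it.
func deletionRequested(pod *v1.Pod) (requested bool, graceSeconds int64) {
    if pod.DeletionTimestamp == nil {
        return false, 0
    }
    graceSeconds = 30 // assumed default when the field is unset
    if pod.DeletionGracePeriodSeconds != nil {
        graceSeconds = *pod.DeletionGracePeriodSeconds
    }
    return true, graceSeconds
}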

kl.dispatchWork()

  1. dispatchWork() hands the pod to kl.podWorkers.UpdatePod() for processing.

  2. For newly created pods, a metric records the number of containers in the pod.

func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
    // Run the sync in an async worker.
    kl.podWorkers.UpdatePod(UpdatePodOptions{
        Pod:        pod,
        MirrorPod:  mirrorPod,
        UpdateType: syncType,
        StartTime:  start,
    })
    // Note the number of containers for new pods.
    if syncType == kubetypes.SyncPodCreate {
        metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
    }
}

kl.podWorkers.UpdatePod()

  1. Pods that are marked as orphans but still running are flagged for termination, to be handled later.

  2. The pod's sync status is cached in the p.podSyncStatuses[uid] map.

  3. If the pod's status can be found in that cache and shows the pod as terminated while options.UpdateType == kubetypes.SyncPodCreate, a pod reusing the same UID needs to be started again; later code takes care of that.

  4. It checks whether the pod is transitioning to a terminating state, and assembles a podWork{} struct from the relevant state.

  5. p.managePodLoop() gives every pod its own goroutine, which listens for that pod's updates and syncs it accordingly (see the sketch after the code below).

  6. Each update is sent into the podUpdates channel as a podWork{} carrying the pod's information; the other end of the channel is p.managePodLoop().

  7. Next, let's look at the logic of p.managePodLoop().

func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
    // handle when the pod is an orphan (no config) and we only have runtime status by running only
    // the terminating part of the lifecycle
    pod := options.Pod
    var isRuntimePod bool
    if options.RunningPod != nil {
        ...
    }
    uid := pod.UID

    p.podLock.Lock()
    defer p.podLock.Unlock()

    // decide what to do with this pod - we are either setting it up, tearing it down, or ignoring it
    now := time.Now()
    status, ok := p.podSyncStatuses[uid]
    ...

    // once a pod is terminating, all updates are kills and the grace period can only decrease
    var workType PodWorkType
    var wasGracePeriodShortened bool
    switch {
    case status.IsTerminated():
        ...
    }

    // the desired work we want to be performing
    work := podWork{
        WorkType: workType,
        Options:  options,
    }

    // start the pod worker goroutine if it doesn't exist
    podUpdates, exists := p.podUpdates[uid]
    if !exists {
        podUpdates = make(chan podWork, 1)
        p.podUpdates[uid] = podUpdates

        // ensure that static pods start in the order they are received by UpdatePod
        if kubetypes.IsStaticPod(pod) {
            p.waitingToStartStaticPodsByFullname[status.fullname] =
                append(p.waitingToStartStaticPodsByFullname[status.fullname], uid)
        }

        // allow testing of delays in the pod update channel
        var outCh <-chan podWork
        if p.workerChannelFn != nil {
            outCh = p.workerChannelFn(uid, podUpdates)
        } else {
            outCh = podUpdates
        }

        go func() {
            defer runtime.HandleCrash()
            p.managePodLoop(outCh)
        }()
    }

    // dispatch a request to the pod worker if none are running
    if !status.IsWorking() {
        status.working = true
        podUpdates <- work
        return
    }
    ...
}
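
Steps 5 and 6 are the heart of podWorkers: one goroutine plus one buffered channel per pod UID. Stripped of all kubelet specifics, the pattern looks like the following self-contained sketch (names are hypothetical):

package main

import (
    "fmt"
    "sync"
    "time"
)

type work struct{ desc string }

type workers struct {
    mu      sync.Mutex
    updates map[string]chan work // one channel per pod UID
}

func newWorkers() *workers {
    return &workers{updates: map[string]chan work{}}
}

// UpdatePod lazily starts one worker goroutine per UID, then feeds it work.
func (w *workers) UpdatePod(uid string, item work) {
    w.mu.Lock()
    ch, ok := w.updates[uid]
    if !ok {
        ch = make(chan work, 1) // buffer of one, like podWorkers
        w.updates[uid] = ch
        go w.managePodLoop(uid, ch)
    }
    w.mu.Unlock()
    ch <- item
}

func (w *workers) managePodLoop(uid string, ch <-chan work) {
    for item := range ch {
        fmt.Printf("sync pod %s: %s\n", uid, item.desc)
    }
}

func main() {
    w := newWorkers()
    w.UpdatePod("uid-1", work{desc: "create"})
    w.UpdatePod("uid-1", work{desc: "update"})
    w.UpdatePod("uid-2", work{desc: "create"})
    time.Sleep(100 * time.Millisecond) // crude: let the demo goroutines drain
}

One difference from the real thing: this sketch blocks on a full channel, whereas podWorkers never does; when a worker is busy (the status.IsWorking() check above), the latest update is parked in lastUndeliveredWorkUpdate and delivered once the worker finishes.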

p.managePodLoop()

  1. Checks whether the pod is allowed to start, and marks it as started.

  2. Fetches the pod's status from the cache by UID, then syncs the pod through one of the following functions depending on update.WorkType.

    • p.syncPodFn() syncs a pod that should be running.

    • p.syncTerminatedPodFn() syncs a pod that has already terminated.

    • p.syncTerminatingPodFn() syncs a pod that is in the process of terminating.

  3. Depending on which sync finished and how, a matching completion callback is invoked.

    • p.completeTerminated() runs when a terminated pod finishes syncing.

    • p.completeTerminatingRuntimePod() runs when an orphaned, runtime-only terminating pod finishes syncing.

    • p.completeTerminating() runs when a terminating pod finishes syncing.

    • p.completeSync() runs when a running pod's sync ends with the pod in a terminal state.

    • p.completeWork() runs at the end of every sync to requeue work.

  4. Below we read through the sync functions first, and then the completion callbacks they trigger.

func (p *podWorkers) managePodLoop(podUpdates <-chan podWork) {
    var lastSyncTime time.Time
    var podStarted bool
    for update := range podUpdates {
        pod := update.Options.Pod
        ...
            podStarted = true
        }
        ...
        var isTerminal bool
        err := func() error {
            var status *kubecontainer.PodStatus
            var err error
            switch {
            case update.Options.RunningPod != nil:
                // when we receive a running pod, we don't need status at all
            default:
                status, err = p.podCache.GetNewerThan(pod.UID, lastSyncTime)
            }
            if err != nil {
                p.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedSync, "error determining status: %v", err)
                return err
            }

            ctx := p.contextForWorker(pod.UID)

            // Take the appropriate action (illegal phases are prevented by UpdatePod)
            switch {
            case update.WorkType == TerminatedPodWork:
                err = p.syncTerminatedPodFn(ctx, pod, status)

            case update.WorkType == TerminatingPodWork:
                var gracePeriod *int64
                if opt := update.Options.KillPodOptions; opt != nil {
                    gracePeriod = opt.PodTerminationGracePeriodSecondsOverride
                }
                podStatusFn := p.acknowledgeTerminating(pod)

                err = p.syncTerminatingPodFn(ctx, pod, status, update.Options.RunningPod, gracePeriod, podStatusFn)

            default:
                isTerminal, err = p.syncPodFn(ctx, update.Options.UpdateType, pod, update.Options.MirrorPod, status)
            }

            lastSyncTime = time.Now()
            return err
        }()

        var phaseTransition bool
        switch {
        case err == context.Canceled:
            // when the context is cancelled we expect an update to already be queued
            klog.V(2).InfoS("Sync exited with context cancellation error", "pod", klog.KObj(pod), "podUID", pod.UID, "updateType", update.WorkType)

        case err != nil:
            // we will queue a retry
            klog.ErrorS(err, "Error syncing pod, skipping", "pod", klog.KObj(pod), "podUID", pod.UID)

        case update.WorkType == TerminatedPodWork:
            // we can shut down the worker
            p.completeTerminated(pod)
            if start := update.Options.StartTime; !start.IsZero() {
                metrics.PodWorkerDuration.WithLabelValues("terminated").Observe(metrics.SinceInSeconds(start))
            }
            klog.V(4).InfoS("Processing pod event done", "pod", klog.KObj(pod), "podUID", pod.UID, "updateType", update.WorkType)
            return

        case update.WorkType == TerminatingPodWork:
            // pods that don't exist in config don't need to be terminated, garbage collection will cover them
            if update.Options.RunningPod != nil {
                p.completeTerminatingRuntimePod(pod)
                if start := update.Options.StartTime; !start.IsZero() {
                    metrics.PodWorkerDuration.WithLabelValues(update.Options.UpdateType.String()).Observe(metrics.SinceInSeconds(start))
                }
                klog.V(4).InfoS("Processing pod event done", "pod", klog.KObj(pod), "podUID", pod.UID, "updateType", update.WorkType)
                return
            }
            // otherwise we move to the terminating phase
            p.completeTerminating(pod)
            phaseTransition = true

        case isTerminal:
            // if syncPod indicated we are now terminal, set the appropriate pod status to move to terminating
            klog.V(4).InfoS("Pod is terminal", "pod", klog.KObj(pod), "podUID", pod.UID, "updateType", update.WorkType)
            p.completeSync(pod)
            phaseTransition = true
        }

        // queue a retry if necessary, then put the next event in the channel if any
        p.completeWork(pod, phaseTransition, err)
        if start := update.Options.StartTime; !start.IsZero() {
            metrics.PodWorkerDuration.WithLabelValues(update.Options.UpdateType.String()).Observe(metrics.SinceInSeconds(start))
        }
        klog.V(4).InfoS("Processing pod event done", "pod", klog.KObj(pod), "podUID", pod.UID, "updateType", update.WorkType)
    }
}
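
Note how lastSyncTime threads through the loop: p.podCache.GetNewerThan(pod.UID, lastSyncTime) blocks until the cache holds a status newer than the previous sync, so every iteration sees fresh runtime state instead of replaying a stale one. A condensed sketch of that "newer-than" cache idea (my own illustration, not the kubelet cache):

package main

import (
    "fmt"
    "sync"
    "time"
)

// newerThanCache blocks readers until the stored value is newer than a floor.
type newerThanCache struct {
    mu      sync.Mutex
    cond    *sync.Cond
    data    string
    modTime time.Time
}

func newCache() *newerThanCache {
    c := &newerThanCache{}
    c.cond = sync.NewCond(&c.mu)
    return c
}

// Set stores a value and wakes up any blocked readers.
func (c *newerThanCache) Set(data string) {
    c.mu.Lock()
    c.data, c.modTime = data, time.Now()
    c.mu.Unlock()
    c.cond.Broadcast()
}

// GetNewerThan returns the value once it is strictly newer than minTime.
func (c *newerThanCache) GetNewerThan(minTime time.Time) string {
    c.mu.Lock()
    defer c.mu.Unlock()
    for !c.modTime.After(minTime) {
        c.cond.Wait()
    }
    return c.data
}

func main() {
    c := newCache()
    go func() {
        time.Sleep(50 * time.Millisecond)
        c.Set("fresh pod status") // e.g. written after a PLEG event
    }()
    fmt.Println(c.GetNewerThan(time.Now())) // blocks until Set runs
}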

p.syncPodFn()

  1. Calls generateAPIPodStatus() to assemble the pod's status information and writes it into the cache.

  2. Calls kl.canRunPod() for soft admission checks on the pod: extended resources, whether the node is shutting down, the security context whitelist, eviction, topology, and so on. If the checks fail, the pod is stopped (a sketch of the admit-handler pattern follows the code below).

  3. Makes sure pods that are already running are tracked in the background.

  4. If the pod is a static pod and its mirror pod does not exist, creates the mirror pod.

  5. If the pod's data directories do not exist, creates them.

  6. Waits for volumes to attach/mount, pulls secrets or configmaps, and fetches the image pull secrets.

  7. Calls the container runtime's kl.containerRuntime.SyncPod() to create the containers.

  8. Updates the pod's ingress and egress traffic limits.

  9. If any step in this flow fails, an error is returned and the pod waits for the next sync.

func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
    ...
    // Generate final API pod status with pod and status manager status
    apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
    podStatus.IPs = make([]string, 0, len(apiPodStatus.PodIPs))
    for _, ipInfo := range apiPodStatus.PodIPs {
        podStatus.IPs = append(podStatus.IPs, ipInfo.IP)
    }
    if len(podStatus.IPs) == 0 && len(apiPodStatus.PodIP) > 0 {
        podStatus.IPs = []string{apiPodStatus.PodIP}
    }

    // If the pod is terminal, we don't need to continue to setup the pod
    if apiPodStatus.Phase == v1.PodSucceeded || apiPodStatus.Phase == v1.PodFailed {
        kl.statusManager.SetPodStatus(pod, apiPodStatus)
        isTerminal = true
        return isTerminal, nil
    }

    runnable := kl.canRunPod(pod)
    if !runnable.Admit {
        // Pod is not runnable; and update the Pod and Container statuses to why.
        if apiPodStatus.Phase != v1.PodFailed && apiPodStatus.Phase != v1.PodSucceeded {
            apiPodStatus.Phase = v1.PodPending
        }
        apiPodStatus.Reason = runnable.Reason
        apiPodStatus.Message = runnable.Message
        // Waiting containers are not creating.
        const waitingReason = "Blocked"
        for _, cs := range apiPodStatus.InitContainerStatuses {
            if cs.State.Waiting != nil {
                cs.State.Waiting.Reason = waitingReason
            }
        }
        for _, cs := range apiPodStatus.ContainerStatuses {
            if cs.State.Waiting != nil {
                cs.State.Waiting.Reason = waitingReason
            }
        }
    }

    // Record the time it takes for the pod to become running.
    existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
    if !ok || existingStatus.Phase == v1.PodPending && apiPodStatus.Phase == v1.PodRunning &&
        !firstSeenTime.IsZero() {
        metrics.PodStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
    }

    kl.statusManager.SetPodStatus(pod, apiPodStatus)

    // Pods that are not runnable must be stopped - return a typed error to the pod worker
    if !runnable.Admit {
        klog.V(2).InfoS("Pod is not runnable and must have running containers stopped", "pod", klog.KObj(pod), "podUID", pod.UID, "message", runnable.Message)
        var syncErr error
        p := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
        if err := kl.killPod(pod, p, nil); err != nil {
            kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
            syncErr = fmt.Errorf("error killing pod: %v", err)
            utilruntime.HandleError(syncErr)
        } else {
            // There was no error killing the pod, but the pod cannot be run.
            // Return an error to signal that the sync loop should back off.
            syncErr = fmt.Errorf("pod cannot be run: %s", runnable.Message)
        }
        return false, syncErr
    }

    // If the network plugin is not ready, only start the pod if it uses the host network
    if err := kl.runtimeState.networkErrors(); err != nil && !kubecontainer.IsHostNetworkPod(pod) {
        kl.recorder.Eventf(pod, v1.EventTypeWarning, events.NetworkNotReady, "%s: %v", NetworkNotReadyErrorMsg, err)
        return false, fmt.Errorf("%s: %v", NetworkNotReadyErrorMsg, err)
    }

    // ensure the kubelet knows about referenced secrets or configmaps used by the pod
    if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
        if kl.secretManager != nil {
            kl.secretManager.RegisterPod(pod)
        }
        if kl.configMapManager != nil {
            kl.configMapManager.RegisterPod(pod)
        }
    }

    pcm := kl.containerManager.NewPodContainerManager()
    if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
        firstSync := true
        for _, containerStatus := range apiPodStatus.ContainerStatuses {
            if containerStatus.State.Running != nil {
                firstSync = false
                break
            }
        }
        // Don't kill containers in pod if pod's cgroups already
        // exists or the pod is running for the first time
        podKilled := false
        if !pcm.Exists(pod) && !firstSync {
            p := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
            if err := kl.killPod(pod, p, nil); err == nil {
                podKilled = true
            } else {
                klog.ErrorS(err, "KillPod failed", "pod", klog.KObj(pod), "podStatus", podStatus)
            }
        }
        if !(podKilled && pod.Spec.RestartPolicy == v1.RestartPolicyNever) {
            if !pcm.Exists(pod) {
                if err := kl.containerManager.UpdateQOSCgroups(); err != nil {
                    klog.V(2).InfoS("Failed to update QoS cgroups while syncing pod", "pod", klog.KObj(pod), "err", err)
                }
                if err := pcm.EnsureExists(pod); err != nil {
                    kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToCreatePodContainer, "unable to ensure pod container exists: %v", err)
                    return false, fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err)
                }
            }
        }
    }

    // Create Mirror Pod for Static Pod if it doesn't already exist
    if kubetypes.IsStaticPod(pod) {
        deleted := false
        if mirrorPod != nil {
            if mirrorPod.DeletionTimestamp != nil || !kl.podManager.IsMirrorPodOf(mirrorPod, pod) {
                // The mirror pod is semantically different from the static pod. Remove
                // it. The mirror pod will get recreated later.
                klog.InfoS("Trying to delete pod", "pod", klog.KObj(pod), "podUID", mirrorPod.ObjectMeta.UID)
                podFullName := kubecontainer.GetPodFullName(pod)
                var err error
                deleted, err = kl.podManager.DeleteMirrorPod(podFullName, &mirrorPod.ObjectMeta.UID)
                if deleted {
                    klog.InfoS("Deleted mirror pod because it is outdated", "pod", klog.KObj(mirrorPod))
                } else if err != nil {
                    klog.ErrorS(err, "Failed deleting mirror pod", "pod", klog.KObj(mirrorPod))
                }
            }
        }
        if mirrorPod == nil || deleted {
            node, err := kl.GetNode()
            if err != nil || node.DeletionTimestamp != nil {
                klog.V(4).InfoS("No need to create a mirror pod, since node has been removed from the cluster", "node", klog.KRef("", string(kl.nodeName)))
            } else {
                klog.V(4).InfoS("Creating a mirror pod for static pod", "pod", klog.KObj(pod))
                if err := kl.podManager.CreateMirrorPod(pod); err != nil {
                    klog.ErrorS(err, "Failed creating a mirror pod for", "pod", klog.KObj(pod))
                }
            }
        }
    }

    // Make data directories for the pod
    if err := kl.makePodDataDirs(pod); err != nil {
        kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToMakePodDataDirectories, "error making pod data directories: %v", err)
        klog.ErrorS(err, "Unable to make pod data directories for pod", "pod", klog.KObj(pod))
        return false, err
    }

    // Volume manager will not mount volumes for terminating pods
    // TODO: once context cancellation is added this check can be removed
    if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
        // Wait for volumes to attach/mount
        if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
            kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err)
            klog.ErrorS(err, "Unable to attach or mount volumes for pod; skipping pod", "pod", klog.KObj(pod))
            return false, err
        }
    }

    // Fetch the pull secrets for the pod
    pullSecrets := kl.getPullSecretsForPod(pod)

    // Ensure the pod is being probed
    kl.probeManager.AddPod(pod)

    // Call the container runtime's SyncPod callback
    result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
    kl.reasonCache.Update(pod.UID, result)
    if err := result.Error(); err != nil {
        // Do not return error if the only failures were pods in backoff
        for _, r := range result.SyncResults {
            if r.Error != kubecontainer.ErrCrashLoopBackOff && r.Error != images.ErrImagePullBackOff {
                // Do not record an event here, as we keep all event logging for sync pod failures
                // local to container runtime, so we get better errors.
                return false, err
            }
        }

        return false, nil
    }

    return false, nil
}
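
Step 2's soft admission deserves a closer look: canRunPod() walks a chain of admit handlers and the first rejection wins. The shape of the pattern, boiled down (type names echo kubelet's lifecycle package, but the signature is simplified for illustration):

package main

import "fmt"

// PodAdmitResult mirrors the shape used by kubelet's lifecycle package.
type PodAdmitResult struct {
    Admit   bool
    Reason  string
    Message string
}

// PodAdmitHandler: the real interface takes *PodAdmitAttributes; a plain
// pod name keeps this sketch self-contained.
type PodAdmitHandler interface {
    Admit(podName string) PodAdmitResult
}

// One example handler; the real chain covers eviction, node shutdown,
// topology, sysctls, AppArmor, and more.
type nodeShutdownHandler struct{ shuttingDown bool }

func (h nodeShutdownHandler) Admit(podName string) PodAdmitResult {
    if h.shuttingDown {
        return PodAdmitResult{Admit: false, Reason: "NodeShutdown", Message: "node is shutting down"}
    }
    return PodAdmitResult{Admit: true}
}

// canRun aggregates the handlers the way canRunPod() does: first reject wins.
func canRun(podName string, handlers []PodAdmitHandler) PodAdmitResult {
    for _, h := range handlers {
        if r := h.Admit(podName); !r.Admit {
            return r
        }
    }
    return PodAdmitResult{Admit: true}
}

func main() {
    handlers := []PodAdmitHandler{nodeShutdownHandler{}}
    fmt.Printf("%+v\n", canRun("nginx", handlers))
}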

p.syncTerminatedPodFn()

This cleans up a pod that no longer has any running containers. The pod's containers are left to be reclaimed by dockershim in the background, and log retrieval keeps working throughout, until the pod physically disappears from the node.

  1. Unmounts the volumes.

  2. Unregisters the pod from the secret and configmap managers.

  3. Removes the cgroup limits and reclaims the resources.

  4. Marks the pod's final status.

func (kl *Kubelet) syncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
    klog.V(4).InfoS("syncTerminatedPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
    defer klog.V(4).InfoS("syncTerminatedPod exit", "pod", klog.KObj(pod), "podUID", pod.UID)

    // generate the final status of the pod
    // TODO: should we simply fold this into TerminatePod? that would give a single pod update
    apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
    kl.statusManager.SetPodStatus(pod, apiPodStatus)

    // volumes are unmounted after the pod worker reports ShouldPodRuntimeBeRemoved (which is satisfied
    // before syncTerminatedPod is invoked)
    if err := kl.volumeManager.WaitForUnmount(pod); err != nil {
        return err
    }
    klog.V(4).InfoS("Pod termination unmounted volumes", "pod", klog.KObj(pod), "podUID", pod.UID)

    // After volume unmount is complete, let the secret and configmap managers know we're done with this pod
    if kl.secretManager != nil {
        kl.secretManager.UnregisterPod(pod)
    }
    if kl.configMapManager != nil {
        kl.configMapManager.UnregisterPod(pod)
    }

    // Note: we leave pod containers to be reclaimed in the background since dockershim requires the
    // container for retrieving logs and we want to make sure logs are available until the pod is
    // physically deleted.

    // remove any cgroups in the hierarchy for pods that are no longer running.
    if kl.cgroupsPerQOS {
        pcm := kl.containerManager.NewPodContainerManager()
        name, _ := pcm.GetPodContainerName(pod)
        if err := pcm.Destroy(name); err != nil {
            return err
        }
        klog.V(4).InfoS("Pod termination removed cgroups", "pod", klog.KObj(pod), "podUID", pod.UID)
    }

    // mark the final pod status
    kl.statusManager.TerminatePod(pod)
    klog.V(4).InfoS("Pod is terminated and will need no more status updates", "pod", klog.KObj(pod), "podUID", pod.UID)

    return nil
}

p.syncTerminatingPodFn()

  1. Calls kl.killPod() so the container runtime stops all of the pod's containers; the call blocks until the containers have fully stopped.

  2. Stops the liveness and startup probes.

  3. Updates the pod's status in the cache and verifies that every container has terminated.

  4. If an error is returned, the work is retried on the next loop.

func (kl *Kubelet) syncTerminatingPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, runningPod *kubecontainer.Pod, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error {
    klog.V(4).InfoS("syncTerminatingPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
    defer klog.V(4).InfoS("syncTerminatingPod exit", "pod", klog.KObj(pod), "podUID", pod.UID)

    // when we receive a runtime only pod (runningPod != nil) we don't need to update the status
    // manager or refresh the status of the cache, because a successful killPod will ensure we do
    // not get invoked again
    if runningPod != nil {
        // we kill the pod with the specified grace period since this is a termination
        if gracePeriod != nil {
            klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", *gracePeriod)
        } else {
            klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", nil)
        }
        if err := kl.killPod(pod, *runningPod, gracePeriod); err != nil {
            kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
            // there was an error killing the pod, so we return that error directly
            utilruntime.HandleError(err)
            return err
        }
        klog.V(4).InfoS("Pod termination stopped all running orphan containers", "pod", klog.KObj(pod), "podUID", pod.UID)
        return nil
    }

    apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
    if podStatusFn != nil {
        podStatusFn(&apiPodStatus)
    }
    kl.statusManager.SetPodStatus(pod, apiPodStatus)

    if gracePeriod != nil {
        klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", *gracePeriod)
    } else {
        klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", nil)
    }

    kl.probeManager.StopLivenessAndStartup(pod)

    p := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
    if err := kl.killPod(pod, p, gracePeriod); err != nil {
        kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
        // there was an error killing the pod, so we return that error directly
        utilruntime.HandleError(err)
        return err
    }

    // Once the containers are stopped, we can stop probing for liveness and readiness.
    // TODO: once a pod is terminal, certain probes (liveness exec) could be stopped immediately after
    //   the detection of a container shutdown or (for readiness) after the first failure. Tracked as
    //   https://github.com/kubernetes/kubernetes/issues/107894 although may not be worth optimizing.
    kl.probeManager.RemovePod(pod)

    // Guard against consistency issues in KillPod implementations by checking that there are no
    // running containers. This method is invoked infrequently so this is effectively free and can
    // catch race conditions introduced by callers updating pod status out of order.
    // TODO: have KillPod return the terminal status of stopped containers and write that into the
    //  cache immediately
    podStatus, err := kl.containerRuntime.GetPodStatus(pod.UID, pod.Name, pod.Namespace)
    if err != nil {
        klog.ErrorS(err, "Unable to read pod status prior to final pod termination", "pod", klog.KObj(pod), "podUID", pod.UID)
        return err
    }
    var runningContainers []string
    var containers []string
    for _, s := range podStatus.ContainerStatuses {
        if s.State == kubecontainer.ContainerStateRunning {
            runningContainers = append(runningContainers, s.ID.String())
        }
        containers = append(containers, fmt.Sprintf("(%s state=%s exitCode=%d finishedAt=%s)", s.Name, s.State, s.ExitCode, s.FinishedAt.UTC().Format(time.RFC3339Nano)))
    }
    if klog.V(4).Enabled() {
        sort.Strings(containers)
        klog.InfoS("Post-termination container state", "pod", klog.KObj(pod), "podUID", pod.UID, "containers", strings.Join(containers, " "))
    }
    if len(runningContainers) > 0 {
        return fmt.Errorf("detected running containers after a successful KillPod, CRI violation: %v", runningContainers)
    }

    // we have successfully stopped all containers, the pod is terminating, our status is "done"
    klog.V(4).InfoS("Pod termination stopped all running containers", "pod", klog.KObj(pod), "podUID", pod.UID)

    return nil
}

p.completeTerminated()

This step shuts down the podWorker: it closes and removes the pod's podUpdates channel and marks the pod's sync status as finished.

func (p *podWorkers) completeTerminated(pod *v1.Pod) {
    p.podLock.Lock()
    defer p.podLock.Unlock()

    klog.V(4).InfoS("Pod is complete and the worker can now stop", "pod", klog.KObj(pod), "podUID", pod.UID)

    p.cleanupPodUpdates(pod.UID)

    if status, ok := p.podSyncStatuses[pod.UID]; ok {
        if status.terminatingAt.IsZero() {
            klog.V(4).InfoS("Pod worker is complete but did not have terminatingAt set, likely programmer error", "pod", klog.KObj(pod), "podUID", pod.UID)
        }
        if status.terminatedAt.IsZero() {
            klog.V(4).InfoS("Pod worker is complete but did not have terminatedAt set, likely programmer error", "pod", klog.KObj(pod), "podUID", pod.UID)
        }
        status.finished = true
        status.working = false

        if p.startedStaticPodsByFullname[status.fullname] == pod.UID {
            delete(p.startedStaticPodsByFullname, status.fullname)
        }
    }
}

p.completeTerminatingRuntimePod()

This step cleans up orphaned, runtime-only pods: it updates the pod's status, makes sure later sync logic will not sync the pod again, and ensures kubelet recognizes that the pod no longer has any running containers.

func (p *podWorkers) completeTerminatingRuntimePod(pod *v1.Pod) {
    p.podLock.Lock()
    defer p.podLock.Unlock()

    klog.V(4).InfoS("Pod terminated all orphaned containers successfully and worker can now stop", "pod", klog.KObj(pod), "podUID", pod.UID)

    if status, ok := p.podSyncStatuses[pod.UID]; ok {
        if status.terminatingAt.IsZero() {
            klog.V(4).InfoS("Pod worker was terminated but did not have terminatingAt set, likely programmer error", "pod", klog.KObj(pod), "podUID", pod.UID)
        }
        status.terminatedAt = time.Now()
        status.finished = true
        status.working = false

        if p.startedStaticPodsByFullname[status.fullname] == pod.UID {
            delete(p.startedStaticPodsByFullname, status.fullname)
        }
    }

    p.cleanupPodUpdates(pod.UID)
}

p.completeTerminating()

This step means all of the pod's containers have terminated and no container will be started by later syncs. It updates the pod's status so that the subsequent cleanup logic can take over, makes sure the pod is not synced again, and ensures kubelet recognizes that it no longer has any running containers.

func (p *podWorkers) completeTerminating(pod *v1.Pod) {
    p.podLock.Lock()
    defer p.podLock.Unlock()

    klog.V(4).InfoS("Pod terminated all containers successfully", "pod", klog.KObj(pod), "podUID", pod.UID)

    if status, ok := p.podSyncStatuses[pod.UID]; ok {
        if status.terminatingAt.IsZero() {
            klog.V(4).InfoS("Pod worker was terminated but did not have terminatingAt set, likely programmer error", "pod", klog.KObj(pod), "podUID", pod.UID)
        }
        status.terminatedAt = time.Now()
        for _, ch := range status.notifyPostTerminating {
            close(ch)
        }
        status.notifyPostTerminating = nil
        status.statusPostTerminating = nil
    }

    p.lastUndeliveredWorkUpdate[pod.UID] = podWork{
        WorkType: TerminatedPodWork,
        Options: UpdatePodOptions{
            Pod: pod,
        },
    }
}

p.completeSync()

This step handles pods whose termination happened naturally, at the end of their own lifecycle. Unnatural terminations, such as eviction or API-driven deletion, go through the UpdatePod() interface instead.

func (p *podWorkers) completeSync(pod *v1.Pod) {
    p.podLock.Lock()
    defer p.podLock.Unlock()

    klog.V(4).InfoS("Pod indicated lifecycle completed naturally and should now terminate", "pod", klog.KObj(pod), "podUID", pod.UID)

    if status, ok := p.podSyncStatuses[pod.UID]; ok {
        if status.terminatingAt.IsZero() {
            status.terminatingAt = time.Now()
        } else {
            klog.V(4).InfoS("Pod worker attempted to set terminatingAt twice, likely programmer error", "pod", klog.KObj(pod), "podUID", pod.UID)
        }
        status.startedTerminating = true
    }

    p.lastUndeliveredWorkUpdate[pod.UID] = podWork{
        WorkType: TerminatingPodWork,
        Options: UpdatePodOptions{
            Pod: pod,
        },
    }
}

p.completeWork()

This runs at the end of every sync. It requeues the pod into the sync queue: immediately after a phase transition, at the regular resync interval on success, or after a backoff whose length depends on the kind of error, until the pod completes its sync (the jittered delays are illustrated after the code).

func (p *podWorkers) completeWork(pod *v1.Pod, phaseTransition bool, syncErr error) {
    // Requeue the last update if the last sync returned error.
    switch {
    case phaseTransition:
        p.workQueue.Enqueue(pod.UID, 0)
    case syncErr == nil:
        // No error; requeue at the regular resync interval.
        p.workQueue.Enqueue(pod.UID, wait.Jitter(p.resyncInterval, workerResyncIntervalJitterFactor))
    case strings.Contains(syncErr.Error(), NetworkNotReadyErrorMsg):
        // Network is not ready; back off for short period of time and retry as network might be ready soon.
        p.workQueue.Enqueue(pod.UID, wait.Jitter(backOffOnTransientErrorPeriod, workerBackOffPeriodJitterFactor))
    default:
        // Error occurred during the sync; back off and then retry.
        p.workQueue.Enqueue(pod.UID, wait.Jitter(p.backOffPeriod, workerBackOffPeriodJitterFactor))
    }
    p.completeWorkQueueNext(pod.UID)
}
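
All of those requeue intervals pass through wait.Jitter, which spreads retries out so that many pods failing at the same moment do not resync in lockstep. Its behavior is simple enough to restate in a few lines (a self-contained equivalent, not the upstream code):

package main

import (
    "fmt"
    "math/rand"
    "time"
)

// jitter mirrors the behavior of wait.Jitter from k8s.io/apimachinery:
// it returns d plus a random extra of up to maxFactor*d.
func jitter(d time.Duration, maxFactor float64) time.Duration {
    if maxFactor <= 0.0 {
        maxFactor = 1.0
    }
    return d + time.Duration(rand.Float64()*maxFactor*float64(d))
}

func main() {
    resync := time.Minute
    for i := 0; i < 3; i++ {
        // each pod lands on a slightly different requeue delay
        fmt.Println(jitter(resync, 0.5))
    }
}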

Call graph

The functions weave in and out of one another, which makes them easy to mix up, so to reinforce the picture we have drawn up the call relationships between them.

(Figure: syncLoopIteration call graph)

Summary

  1. Depending on which sync signal it receives, kubelet responds to each operation type differently.

  2. The core is the podWorker: it starts one goroutine per pod, syncing many pods concurrently, each until the end of that pod's lifecycle.

  3. podStatus also plays an important role: kubelet responds differently depending on what state a pod is in.

  4. This time we only traced kubelet's pod sync flow and skimmed over how a pod's containers actually get created; next time we will focus on the container creation flow.