
Kubernetes Source Code: kubelet Principles and Source Code Analysis (Part 3)

How kubelet creates the containers of a pod. The source code referenced is the release-1.22 branch of Kubernetes.

Preface

In the previous article on the pod sync flow, we only briefly analyzed the pod sync function syncLoopIteration(), which covers syncing at every stage of a pod's lifecycle. Due to space constraints, we did not go into much detail there.

In this article we look at how a pod's containers are created.

Call Flow Overview

With the groundwork laid in the previous articles, the container creation process is a little easier to follow. Before getting into it, let's expand a bit more on the pod sync flow covered earlier, because the rest of this article touches on those steps.

  1. After kubelet receives a pod-creation message from the configCh channel, it creates the pod and its containers through the UpdatePod() interface. At the end of this step, the pod UID is added to the work queue for later syncing via syncCh.

  2. After the containers are created, their status is reported to the apiserver through the statusManager.

  3. Meanwhile, PLEG checks the state of all containers of all pods on the kubelet once per second (nominally once per second; in real scenarios the interval can be much longer). If any state has changed, it calls the UpdatePod() interface to sync the pod again.

  4. Every second, syncCh fetches a batch of pods that need to be synced from the work queue and calls the UpdatePod() interface on each of them. One thing to note: in step 1 the pod is enqueued with a 10-second delay; when syncCh fetches entries, it checks whether a pod's delay has already expired and only then takes it out for syncing. (A minimal sketch of this delayed work queue follows the diagram below.)

(Figure: kubelet-pod-workflow.png, the call flow overview)
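To make the delayed work queue in item 4 concrete, here is a minimal, self-contained sketch in the spirit of kubelet's internal queue (pkg/kubelet/util/queue). The type and function names below are made up for illustration; the real implementation differs in its details.

package main

import (
    "fmt"
    "sync"
    "time"
)

// delayQueue is an illustrative analogue of kubelet's work queue: UpdatePod
// enqueues the pod UID with a delay, and the syncCh tick only hands back the
// UIDs whose delay has already expired.
type delayQueue struct {
    mu    sync.Mutex
    queue map[string]time.Time // pod UID -> earliest time it may be synced
}

func newDelayQueue() *delayQueue {
    return &delayQueue{queue: map[string]time.Time{}}
}

// Enqueue records the pod UID together with a "not before" timestamp.
func (q *delayQueue) Enqueue(uid string, delay time.Duration) {
    q.mu.Lock()
    defer q.mu.Unlock()
    q.queue[uid] = time.Now().Add(delay)
}

// GetWork returns only the UIDs whose delay has expired and removes them from
// the queue, mirroring how the syncCh path picks pods to sync.
func (q *delayQueue) GetWork() []string {
    q.mu.Lock()
    defer q.mu.Unlock()
    var ready []string
    now := time.Now()
    for uid, notBefore := range q.queue {
        if !now.Before(notBefore) {
            ready = append(ready, uid)
            delete(q.queue, uid)
        }
    }
    return ready
}

func main() {
    q := newDelayQueue()
    q.Enqueue("pod-a", 0)              // due immediately
    q.Enqueue("pod-b", 10*time.Second) // the 10-second delay mentioned above

    fmt.Println(q.GetWork()) // [pod-a]; pod-b stays queued until its delay expires
}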

Container Creation Steps

Back to the main topic. From the earlier reading we already know that the code that actually creates containers lives in syncPod(), which calls the container runtime's SyncPod() interface to carry out container creation. The relevant call looks like this:

...
// Call the container runtime's SyncPod callback
    result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
...

Jumping to pkg/kubelet/kuberuntime/kuberuntime_manager.go, this is where the container runtime's SyncPod() interface is implemented. It is quite long, so let's first summarize its steps.

  1. Compute whether the state of the sandbox container and the regular containers has changed.

  2. If necessary, kill the sandbox container so it can be recreated.

  3. Kill any running containers in the pod that should not be kept.

  4. Create a sandbox container for the pod if needed.

  5. Create the ephemeral containers.

  6. Create the init containers (initContainers).

  7. Create the regular containers.

Next, we will go through each of the steps above and look at what it does in detail.

Note

So what exactly is the sandbox container? When kubelet creates a pod, if you look with docker ps you will find that a companion container for the same pod, built from the pause image, has been created alongside it.

57f7fc7cf97b        605c77e624dd                                               "/docker-entrypoint.…"   14 hours ago        Up 14 hours                                                       k8s_nginx_octopus-deployment-549545bf46-x9cqm_default_c799bb7f-d5d9-41bd-ba60-9a968f0fac54_0
4e0c7fb21e49        registry.aliyuncs.com/google_containers/pause:3.2          "/pause"                 14 hours ago        Up 14 hours                                                       k8s_POD_octopus-deployment-549545bf46-x9cqm_default_c799bb7f-d5d9-41bd-ba60-9a968f0fac54_0

It is there to create the namespaces and resource isolation shared by the pod's group of containers, so that those containers can share resources inside that isolated environment. Unlike KVM, a container does not need a whole operating system instance to isolate resources, so the host's resources can be used much more efficiently; this is one of the key ideas behind Kubernetes.
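As a rough illustration only: the following is a hypothetical Go analogue of what the pause process does, namely nothing except block until it is told to exit, so that the namespaces it was started in stay alive for the other containers to join. The real pause image is a tiny C program that also reaps zombie processes; this sketch is not its actual source.

package main

import (
    "os"
    "os/signal"
    "syscall"
)

// The sandbox process does no real work; it only exists so that the pod-level
// namespaces created around it stay alive for the app containers to share.
func main() {
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
    <-sigCh // block forever, until the kubelet tears the sandbox down
}

With that context in place, here is the full SyncPod() implementation: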

func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
    // Step 1: Compute sandbox and container changes.
    podContainerChanges := m.computePodActions(pod, podStatus)
    klog.V(3).InfoS("computePodActions got for pod", "podActions", podContainerChanges, "pod", klog.KObj(pod))
    if podContainerChanges.CreateSandbox {
        ref, err := ref.GetReference(legacyscheme.Scheme, pod)
        if err != nil {
            klog.ErrorS(err, "Couldn't make a ref to pod", "pod", klog.KObj(pod))
        }
        if podContainerChanges.SandboxID != "" {
            m.recorder.Eventf(ref, v1.EventTypeNormal, events.SandboxChanged, "Pod sandbox changed, it will be killed and re-created.")
        } else {
            klog.V(4).InfoS("SyncPod received new pod, will create a sandbox for it", "pod", klog.KObj(pod))
        }
    }

    // Step 2: Kill the pod if the sandbox has changed.
    if podContainerChanges.KillPod {
        if podContainerChanges.CreateSandbox {
            klog.V(4).InfoS("Stopping PodSandbox for pod, will start new one", "pod", klog.KObj(pod))
        } else {
            klog.V(4).InfoS("Stopping PodSandbox for pod, because all other containers are dead", "pod", klog.KObj(pod))
        }

        killResult := m.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)
        result.AddPodSyncResult(killResult)
        if killResult.Error() != nil {
            klog.ErrorS(killResult.Error(), "killPodWithSyncResult failed")
            return
        }

        if podContainerChanges.CreateSandbox {
            m.purgeInitContainers(pod, podStatus)
        }
    } else {
        // Step 3: kill any running containers in this pod which are not to keep.
        for containerID, containerInfo := range podContainerChanges.ContainersToKill {
            klog.V(3).InfoS("Killing unwanted container for pod", "containerName", containerInfo.name, "containerID", containerID, "pod", klog.KObj(pod))
            killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name)
            result.AddSyncResult(killContainerResult)
            if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, containerInfo.reason, nil); err != nil {
                killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
                klog.ErrorS(err, "killContainer for pod failed", "containerName", containerInfo.name, "containerID", containerID, "pod", klog.KObj(pod))
                return
            }
        }
    }

    // Keep terminated init containers fairly aggressively controlled
    // This is an optimization because container removals are typically handled
    // by container garbage collector.
    m.pruneInitContainersBeforeStart(pod, podStatus)

    // We pass the value of the PRIMARY podIP and list of podIPs down to
    // generatePodSandboxConfig and generateContainerConfig, which in turn
    // passes it to various other functions, in order to facilitate functionality
    // that requires this value (hosts file and downward API) and avoid races determining
    // the pod IP in cases where a container requires restart but the
    // podIP isn't in the status manager yet. The list of podIPs is used to
    // generate the hosts file.
    //
    // We default to the IPs in the passed-in pod status, and overwrite them if the
    // sandbox needs to be (re)started.
    var podIPs []string
    if podStatus != nil {
        podIPs = podStatus.IPs
    }

    // Step 4: Create a sandbox for the pod if necessary.
    podSandboxID := podContainerChanges.SandboxID
    if podContainerChanges.CreateSandbox {
        var msg string
        var err error

        klog.V(4).InfoS("Creating PodSandbox for pod", "pod", klog.KObj(pod))
        metrics.StartedPodsTotal.Inc()
        createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))
        result.AddSyncResult(createSandboxResult)
        podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
        if err != nil {
            // createPodSandbox can return an error from CNI, CSI,
            // or CRI if the Pod has been deleted while the POD is
            // being created. If the pod has been deleted then it's
            // not a real error.
            //
            // SyncPod can still be running when we get here, which
            // means the PodWorker has not acked the deletion.
            if m.podStateProvider.IsPodTerminationRequested(pod.UID) {
                klog.V(4).InfoS("Pod was deleted and sandbox failed to be created", "pod", klog.KObj(pod), "podUID", pod.UID)
                return
            }
            metrics.StartedPodsErrorsTotal.Inc()
            createSandboxResult.Fail(kubecontainer.ErrCreatePodSandbox, msg)
            klog.ErrorS(err, "CreatePodSandbox for pod failed", "pod", klog.KObj(pod))
            ref, referr := ref.GetReference(legacyscheme.Scheme, pod)
            if referr != nil {
                klog.ErrorS(referr, "Couldn't make a ref to pod", "pod", klog.KObj(pod))
            }
            m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedCreatePodSandBox, "Failed to create pod sandbox: %v", err)
            return
        }
        klog.V(4).InfoS("Created PodSandbox for pod", "podSandboxID", podSandboxID, "pod", klog.KObj(pod))

        podSandboxStatus, err := m.runtimeService.PodSandboxStatus(podSandboxID)
        if err != nil {
            ref, referr := ref.GetReference(legacyscheme.Scheme, pod)
            if referr != nil {
                klog.ErrorS(referr, "Couldn't make a ref to pod", "pod", klog.KObj(pod))
            }
            m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedStatusPodSandBox, "Unable to get pod sandbox status: %v", err)
            klog.ErrorS(err, "Failed to get pod sandbox status; Skipping pod", "pod", klog.KObj(pod))
            result.Fail(err)
            return
        }

        // If we ever allow updating a pod from non-host-network to
        // host-network, we may use a stale IP.
        if !kubecontainer.IsHostNetworkPod(pod) {
            // Overwrite the podIPs passed in the pod status, since we just started the pod sandbox.
            podIPs = m.determinePodSandboxIPs(pod.Namespace, pod.Name, podSandboxStatus)
            klog.V(4).InfoS("Determined the ip for pod after sandbox changed", "IPs", podIPs, "pod", klog.KObj(pod))
        }
    }

    // the start containers routines depend on pod ip(as in primary pod ip)
    // instead of trying to figure out if we have 0 < len(podIPs)
    // everytime, we short circuit it here
    podIP := ""
    if len(podIPs) != 0 {
        podIP = podIPs[0]
    }

    // Get podSandboxConfig for containers to start.
    configPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.ConfigPodSandbox, podSandboxID)
    result.AddSyncResult(configPodSandboxResult)
    podSandboxConfig, err := m.generatePodSandboxConfig(pod, podContainerChanges.Attempt)
    if err != nil {
        message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err)
        klog.ErrorS(err, "GeneratePodSandboxConfig for pod failed", "pod", klog.KObj(pod))
        configPodSandboxResult.Fail(kubecontainer.ErrConfigPodSandbox, message)
        return
    }

    // Helper containing boilerplate common to starting all types of containers.
    // typeName is a description used to describe this type of container in log messages,
    // currently: "container", "init container" or "ephemeral container"
    // metricLabel is the label used to describe this type of container in monitoring metrics.
    // currently: "container", "init_container" or "ephemeral_container"
    start := func(typeName, metricLabel string, spec *startSpec) error {
        startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, spec.container.Name)
        result.AddSyncResult(startContainerResult)

        isInBackOff, msg, err := m.doBackOff(pod, spec.container, podStatus, backOff)
        if isInBackOff {
            startContainerResult.Fail(err, msg)
            klog.V(4).InfoS("Backing Off restarting container in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod))
            return err
        }

        metrics.StartedContainersTotal.WithLabelValues(metricLabel).Inc()
        klog.V(4).InfoS("Creating container in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod))
        // NOTE (aramase) podIPs are populated for single stack and dual stack clusters. Send only podIPs.
        if msg, err := m.startContainer(podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs); err != nil {
            // startContainer() returns well-defined error codes that have reasonable cardinality for metrics and are
            // useful to cluster administrators to distinguish "server errors" from "user errors".
            metrics.StartedContainersErrorsTotal.WithLabelValues(metricLabel, err.Error()).Inc()
            startContainerResult.Fail(err, msg)
            // known errors that are logged in other places are logged at higher levels here to avoid
            // repetitive log spam
            switch {
            case err == images.ErrImagePullBackOff:
                klog.V(3).InfoS("Container start failed in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod), "containerMessage", msg, "err", err)
            default:
                utilruntime.HandleError(fmt.Errorf("%v %+v start failed in pod %v: %v: %s", typeName, spec.container, format.Pod(pod), err, msg))
            }
            return err
        }

        return nil
    }

    // Step 5: start ephemeral containers
    // These are started "prior" to init containers to allow running ephemeral containers even when there
    // are errors starting an init container. In practice init containers will start first since ephemeral
    // containers cannot be specified on pod creation.
    if utilfeature.DefaultFeatureGate.Enabled(features.EphemeralContainers) {
        for _, idx := range podContainerChanges.EphemeralContainersToStart {
            start("ephemeral container", metrics.EphemeralContainer, ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))
        }
    }

    // Step 6: start the init container.
    if container := podContainerChanges.NextInitContainerToStart; container != nil {
        // Start the next init container.
        if err := start("init container", metrics.InitContainer, containerStartSpec(container)); err != nil {
            return
        }

        // Successfully started the container; clear the entry in the failure
        klog.V(4).InfoS("Completed init container for pod", "containerName", container.Name, "pod", klog.KObj(pod))
    }

    // Step 7: start containers in podContainerChanges.ContainersToStart.
    for _, idx := range podContainerChanges.ContainersToStart {
        start("container", metrics.Container, containerStartSpec(&pod.Spec.Containers[idx]))
    }

    return
}

Computing Container State Changes

The computePodActions() interface computes whether the sandbox container and the regular containers have changed; based on the result, the corresponding events are recorded and, later in SyncPod(), the related metrics are updated.

    // Step 1: Compute sandbox and container changes.
    podContainerChanges := m.computePodActions(pod, podStatus)
    klog.V(3).InfoS("computePodActions got for pod", "podActions", podContainerChanges, "pod", klog.KObj(pod))
    if podContainerChanges.CreateSandbox {
        ref, err := ref.GetReference(legacyscheme.Scheme, pod)
        if err != nil {
            klog.ErrorS(err, "Couldn't make a ref to pod", "pod", klog.KObj(pod))
        }
        if podContainerChanges.SandboxID != "" {
            m.recorder.Eventf(ref, v1.EventTypeNormal, events.SandboxChanged, "Pod sandbox changed, it will be killed and re-created.")
        } else {
            klog.V(4).InfoS("SyncPod received new pod, will create a sandbox for it", "pod", klog.KObj(pod))
        }
    }

Let's focus on the logic inside the computePodActions() interface.

  1. Check whether the sandbox container has changed. In any of the following cases the sandbox needs to be recreated:

    • There is more than one ready sandbox, or the most recent sandbox is not in a ready state.

    • The network namespace has changed.

    • The sandbox has no IP.

  2. If the pod's restart policy is Never, or the pod has no containers that need to be started, there is nothing to do and the function returns directly.

  3. If the sandbox needs to be recreated and the number of init containers is not zero, the function returns the first init container for the container runtime to start; if there are no init containers, it returns the regular containers in ContainersToStart so that they get started.

  4. If the sandbox already exists and does not need to be recreated, first collect the ephemeral containers that need to be started, then check whether the init containers have all finished; if there are multiple init containers, return the next one that needs to be started.

  5. Inspect all containers of the pod, handle each of the following cases differently, and return the result for the container runtime to act on:

    • If a container is not running, run the internal post-stop lifecycle hook so that the CPUs allocated to it are released immediately.

    • If a container does not exist or is not running, check whether it needs to be restarted; if so, add it to the ContainersToStart slice for a later restart. If the container's state is unknown, also add it to ContainersToKill so the runtime kills it before recreating it, to avoid two instances of the same container.

    • If the container is running, first check its hash; if the hash differs, the container spec has changed and it must be restarted. Containers that fail the liveness or startup probe also need to be restarted. (A minimal sketch of this hash comparison follows the list.)

    • If the length of ContainersToStart is 0 and keepCount is also 0, there is no container left to keep or to start, so the whole pod, including the sandbox, should be killed: KillPod is set to true.
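As a side note on the hash comparison in the third bullet of step 5, the sketch below shows the general idea under simplified assumptions: serialize a (made-up, trimmed-down) container spec, hash it with FNV, and compare the result with the hash recorded when the container was created. The real code hashes the full v1.Container via kubecontainer.HashContainer; this is only an analogue.

package main

import (
    "encoding/json"
    "fmt"
    "hash/fnv"
)

// containerSpec stands in for the relevant parts of a v1.Container; it is a
// hypothetical, trimmed-down struct used only for this sketch.
type containerSpec struct {
    Name    string   `json:"name"`
    Image   string   `json:"image"`
    Command []string `json:"command,omitempty"`
}

// hashContainer serializes the spec and runs it through a 32-bit FNV hash,
// which is the same basic idea kubelet uses to detect container spec changes.
func hashContainer(c containerSpec) uint64 {
    data, _ := json.Marshal(c)
    h := fnv.New32a()
    h.Write(data)
    return uint64(h.Sum32())
}

func main() {
    running := containerSpec{Name: "nginx", Image: "nginx:1.20"}
    desired := containerSpec{Name: "nginx", Image: "nginx:1.21"} // image was edited

    if hashContainer(running) != hashContainer(desired) {
        // Mirrors the "Container %s definition changed" branch: the container
        // is killed and restarted regardless of the restart policy.
        fmt.Println("spec changed, container needs a restart")
    }
}

The full computePodActions() implementation follows: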

func (m *kubeGenericRuntimeManager) computePodActions(pod *v1.Pod, podStatus *kubecontainer.PodStatus) podActions {
    klog.V(5).InfoS("Syncing Pod", "pod", klog.KObj(pod))

    createPodSandbox, attempt, sandboxID := m.podSandboxChanged(pod, podStatus)
    changes := podActions{
        KillPod:           createPodSandbox,
        CreateSandbox:     createPodSandbox,
        SandboxID:         sandboxID,
        Attempt:           attempt,
        ContainersToStart: []int{},
        ContainersToKill:  make(map[kubecontainer.ContainerID]containerToKillInfo),
    }

    // If we need to (re-)create the pod sandbox, everything will need to be
    // killed and recreated, and init containers should be purged.
    if createPodSandbox {
        if !shouldRestartOnFailure(pod) && attempt != 0 && len(podStatus.ContainerStatuses) != 0 {
            // Should not restart the pod, just return.
            // we should not create a sandbox for a pod if it is already done.
            // if all containers are done and should not be started, there is no need to create a new sandbox.
            // this stops confusing logs on pods whose containers all have exit codes, but we recreate a sandbox before terminating it.
            //
            // If ContainerStatuses is empty, we assume that we've never
            // successfully created any containers. In this case, we should
            // retry creating the sandbox.
            changes.CreateSandbox = false
            return changes
        }

        // Get the containers to start, excluding the ones that succeeded if RestartPolicy is OnFailure.
        var containersToStart []int
        for idx, c := range pod.Spec.Containers {
            if pod.Spec.RestartPolicy == v1.RestartPolicyOnFailure && containerSucceeded(&c, podStatus) {
                continue
            }
            containersToStart = append(containersToStart, idx)
        }
        // We should not create a sandbox for a Pod if initialization is done and there is no container to start.
        if len(containersToStart) == 0 {
            _, _, done := findNextInitContainerToRun(pod, podStatus)
            if done {
                changes.CreateSandbox = false
                return changes
            }
        }

        if len(pod.Spec.InitContainers) != 0 {
            // Pod has init containers, return the first one.
            changes.NextInitContainerToStart = &pod.Spec.InitContainers[0]
            return changes
        }
        changes.ContainersToStart = containersToStart
        return changes
    }

    // Ephemeral containers may be started even if initialization is not yet complete.
    if utilfeature.DefaultFeatureGate.Enabled(features.EphemeralContainers) {
        for i := range pod.Spec.EphemeralContainers {
            c := (*v1.Container)(&pod.Spec.EphemeralContainers[i].EphemeralContainerCommon)

            // Ephemeral Containers are never restarted
            if podStatus.FindContainerStatusByName(c.Name) == nil {
                changes.EphemeralContainersToStart = append(changes.EphemeralContainersToStart, i)
            }
        }
    }

    // Check initialization progress.
    initLastStatus, next, done := findNextInitContainerToRun(pod, podStatus)
    if !done {
        if next != nil {
            initFailed := initLastStatus != nil && isInitContainerFailed(initLastStatus)
            if initFailed && !shouldRestartOnFailure(pod) {
                changes.KillPod = true
            } else {
                // Always try to stop containers in unknown state first.
                if initLastStatus != nil && initLastStatus.State == kubecontainer.ContainerStateUnknown {
                    changes.ContainersToKill[initLastStatus.ID] = containerToKillInfo{
                        name:      next.Name,
                        container: next,
                        message: fmt.Sprintf("Init container is in %q state, try killing it before restart",
                            initLastStatus.State),
                        reason: reasonUnknown,
                    }
                }
                changes.NextInitContainerToStart = next
            }
        }
        // Initialization failed or still in progress. Skip inspecting non-init
        // containers.
        return changes
    }

    // Number of running containers to keep.
    keepCount := 0
    // check the status of containers.
    for idx, container := range pod.Spec.Containers {
        containerStatus := podStatus.FindContainerStatusByName(container.Name)

        // Call internal container post-stop lifecycle hook for any non-running container so that any
        // allocated cpus are released immediately. If the container is restarted, cpus will be re-allocated
        // to it.
        if containerStatus != nil && containerStatus.State != kubecontainer.ContainerStateRunning {
            if err := m.internalLifecycle.PostStopContainer(containerStatus.ID.ID); err != nil {
                klog.ErrorS(err, "Internal container post-stop lifecycle hook failed for container in pod with error",
                    "containerName", container.Name, "pod", klog.KObj(pod))
            }
        }

        // If container does not exist, or is not running, check whether we
        // need to restart it.
        if containerStatus == nil || containerStatus.State != kubecontainer.ContainerStateRunning {
            if kubecontainer.ShouldContainerBeRestarted(&container, pod, podStatus) {
                klog.V(3).InfoS("Container of pod is not in the desired state and shall be started", "containerName", container.Name, "pod", klog.KObj(pod))
                changes.ContainersToStart = append(changes.ContainersToStart, idx)
                if containerStatus != nil && containerStatus.State == kubecontainer.ContainerStateUnknown {
                    // If container is in unknown state, we don't know whether it
                    // is actually running or not, always try killing it before
                    // restart to avoid having 2 running instances of the same container.
                    changes.ContainersToKill[containerStatus.ID] = containerToKillInfo{
                        name:      containerStatus.Name,
                        container: &pod.Spec.Containers[idx],
                        message: fmt.Sprintf("Container is in %q state, try killing it before restart",
                            containerStatus.State),
                        reason: reasonUnknown,
                    }
                }
            }
            continue
        }
        // The container is running, but kill the container if any of the following condition is met.
        var message string
        var reason containerKillReason
        restart := shouldRestartOnFailure(pod)
        if _, _, changed := containerChanged(&container, containerStatus); changed {
            message = fmt.Sprintf("Container %s definition changed", container.Name)
            // Restart regardless of the restart policy because the container
            // spec changed.
            restart = true
        } else if liveness, found := m.livenessManager.Get(containerStatus.ID); found && liveness == proberesults.Failure {
            // If the container failed the liveness probe, we should kill it.
            message = fmt.Sprintf("Container %s failed liveness probe", container.Name)
            reason = reasonLivenessProbe
        } else if startup, found := m.startupManager.Get(containerStatus.ID); found && startup == proberesults.Failure {
            // If the container failed the startup probe, we should kill it.
            message = fmt.Sprintf("Container %s failed startup probe", container.Name)
            reason = reasonStartupProbe
        } else {
            // Keep the container.
            keepCount++
            continue
        }

        // We need to kill the container, but if we also want to restart the
        // container afterwards, make the intent clear in the message. Also do
        // not kill the entire pod since we expect container to be running eventually.
        if restart {
            message = fmt.Sprintf("%s, will be restarted", message)
            changes.ContainersToStart = append(changes.ContainersToStart, idx)
        }

        changes.ContainersToKill[containerStatus.ID] = containerToKillInfo{
            name:      containerStatus.Name,
            container: &pod.Spec.Containers[idx],
            message:   message,
            reason:    reason,
        }
        klog.V(2).InfoS("Message for Container of pod", "containerName", container.Name, "containerStatusID", containerStatus.ID, "pod", klog.KObj(pod), "containerMessage", message)
    }

    if keepCount == 0 && len(changes.ContainersToStart) == 0 {
        changes.KillPod = true
    }

    return changes
}

Cleanup Before Creation

  1. From step 5 of computePodActions() above: if KillPod = true, all of the pod's running containers are killed, and if the sandbox also has to be recreated, all init containers are purged so that a fresh sandbox container can be created later.

  2. Otherwise, the entries in ContainersToKill are walked and every container listed there is killed. After that, the init containers are pruned once more (duplicate init containers with the same name are cleaned up so that only one is kept).

    // Step 2: Kill the pod if the sandbox has changed.
    if podContainerChanges.KillPod {
        if podContainerChanges.CreateSandbox {
            klog.V(4).InfoS("Stopping PodSandbox for pod, will start new one", "pod", klog.KObj(pod))
        } else {
            klog.V(4).InfoS("Stopping PodSandbox for pod, because all other containers are dead", "pod", klog.KObj(pod))
        }

        killResult := m.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)
        result.AddPodSyncResult(killResult)
        if killResult.Error() != nil {
            klog.ErrorS(killResult.Error(), "killPodWithSyncResult failed")
            return
        }

        if podContainerChanges.CreateSandbox {
            m.purgeInitContainers(pod, podStatus)
        }
    } else {
        // Step 3: kill any running containers in this pod which are not to keep.
        for containerID, containerInfo := range podContainerChanges.ContainersToKill {
            klog.V(3).InfoS("Killing unwanted container for pod", "containerName", containerInfo.name, "containerID", containerID, "pod", klog.KObj(pod))
            killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name)
            result.AddSyncResult(killContainerResult)
            if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, containerInfo.reason, nil); err != nil {
                killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
                klog.ErrorS(err, "killContainer for pod failed", "containerName", containerInfo.name, "containerID", containerID, "pod", klog.KObj(pod))
                return
            }
        }
    }

Creating the Sandbox Container

With the cleanup done, the next step is to decide whether the sandbox container needs to be (re)created.

  1. If the sandbox container needs to be (re)created:

    • Call the createPodSandbox() interface. While this call is in flight, errors may come back from CNI, CSI, or CRI; if they happen because the pod was deleted right after being created, they are expected and not treated as real failures.

    • Then check the pod's network mode; unless the pod uses the host network, determine the pod IPs from the freshly created sandbox's status.

  2. Generate the sandbox configuration, which is needed later when calling the interfaces that start containers.

  3. The start anonymous function is used to start every type of container. Its core is the startContainer() interface, which is worth analyzing on its own.

    // Step 4: Create a sandbox for the pod if necessary.
    podSandboxID := podContainerChanges.SandboxID
    if podContainerChanges.CreateSandbox {
        var msg string
        var err error

        klog.V(4).InfoS("Creating PodSandbox for pod", "pod", klog.KObj(pod))
        metrics.StartedPodsTotal.Inc()
        createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))
        result.AddSyncResult(createSandboxResult)
        podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
        if err != nil {
            // createPodSandbox can return an error from CNI, CSI,
            // or CRI if the Pod has been deleted while the POD is
            // being created. If the pod has been deleted then it's
            // not a real error.
            //
            // SyncPod can still be running when we get here, which
            // means the PodWorker has not acked the deletion.
            if m.podStateProvider.IsPodTerminationRequested(pod.UID) {
                klog.V(4).InfoS("Pod was deleted and sandbox failed to be created", "pod", klog.KObj(pod), "podUID", pod.UID)
                return
            }
            metrics.StartedPodsErrorsTotal.Inc()
            createSandboxResult.Fail(kubecontainer.ErrCreatePodSandbox, msg)
            klog.ErrorS(err, "CreatePodSandbox for pod failed", "pod", klog.KObj(pod))
            ref, referr := ref.GetReference(legacyscheme.Scheme, pod)
            if referr != nil {
                klog.ErrorS(referr, "Couldn't make a ref to pod", "pod", klog.KObj(pod))
            }
            m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedCreatePodSandBox, "Failed to create pod sandbox: %v", err)
            return
        }
        klog.V(4).InfoS("Created PodSandbox for pod", "podSandboxID", podSandboxID, "pod", klog.KObj(pod))

        podSandboxStatus, err := m.runtimeService.PodSandboxStatus(podSandboxID)
        if err != nil {
            ref, referr := ref.GetReference(legacyscheme.Scheme, pod)
            if referr != nil {
                klog.ErrorS(referr, "Couldn't make a ref to pod", "pod", klog.KObj(pod))
            }
            m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedStatusPodSandBox, "Unable to get pod sandbox status: %v", err)
            klog.ErrorS(err, "Failed to get pod sandbox status; Skipping pod", "pod", klog.KObj(pod))
            result.Fail(err)
            return
        }

        // If we ever allow updating a pod from non-host-network to
        // host-network, we may use a stale IP.
        if !kubecontainer.IsHostNetworkPod(pod) {
            // Overwrite the podIPs passed in the pod status, since we just started the pod sandbox.
            podIPs = m.determinePodSandboxIPs(pod.Namespace, pod.Name, podSandboxStatus)
            klog.V(4).InfoS("Determined the ip for pod after sandbox changed", "IPs", podIPs, "pod", klog.KObj(pod))
        }
    }

    // the start containers routines depend on pod ip(as in primary pod ip)
    // instead of trying to figure out if we have 0 < len(podIPs)
    // everytime, we short circuit it here
    podIP := ""
    if len(podIPs) != 0 {
        podIP = podIPs[0]
    }

    // Get podSandboxConfig for containers to start.
    configPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.ConfigPodSandbox, podSandboxID)
    result.AddSyncResult(configPodSandboxResult)
    podSandboxConfig, err := m.generatePodSandboxConfig(pod, podContainerChanges.Attempt)
    if err != nil {
        message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err)
        klog.ErrorS(err, "GeneratePodSandboxConfig for pod failed", "pod", klog.KObj(pod))
        configPodSandboxResult.Fail(kubecontainer.ErrConfigPodSandbox, message)
        return
    }

    // Helper containing boilerplate common to starting all types of containers.
    // typeName is a description used to describe this type of container in log messages,
    // currently: "container", "init container" or "ephemeral container"
    // metricLabel is the label used to describe this type of container in monitoring metrics.
    // currently: "container", "init_container" or "ephemeral_container"
    start := func(typeName, metricLabel string, spec *startSpec) error {
        startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, spec.container.Name)
        result.AddSyncResult(startContainerResult)

        isInBackOff, msg, err := m.doBackOff(pod, spec.container, podStatus, backOff)
        if isInBackOff {
            startContainerResult.Fail(err, msg)
            klog.V(4).InfoS("Backing Off restarting container in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod))
            return err
        }

        metrics.StartedContainersTotal.WithLabelValues(metricLabel).Inc()
        klog.V(4).InfoS("Creating container in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod))
        // NOTE (aramase) podIPs are populated for single stack and dual stack clusters. Send only podIPs.
        if msg, err := m.startContainer(podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs); err != nil {
            // startContainer() returns well-defined error codes that have reasonable cardinality for metrics and are
            // useful to cluster administrators to distinguish "server errors" from "user errors".
            metrics.StartedContainersErrorsTotal.WithLabelValues(metricLabel, err.Error()).Inc()
            startContainerResult.Fail(err, msg)
            // known errors that are logged in other places are logged at higher levels here to avoid
            // repetitive log spam
            switch {
            case err == images.ErrImagePullBackOff:
                klog.V(3).InfoS("Container start failed in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod), "containerMessage", msg, "err", err)
            default:
                utilruntime.HandleError(fmt.Errorf("%v %+v start failed in pod %v: %v: %s", typeName, spec.container, format.Pod(pod), err, msg))
            }
            return err
        }

        return nil
    }

The startContainer Logic

This is the heart of this article: creating and starting a container.

  1. Pull the container image.

  2. Create the container. The process is as follows:

    • Create the log directory (and derive the restart count from it).

    • Generate the container configuration.

    • Run the internal pre-create step for the container.

    • Create the container and obtain its container ID.

    • Execute the PreStartContainer hook, used for resource allocation.

    • Start the container.

    • Create a symlink to the legacy log location for the container's log.

    • Execute the postStart hook.

func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, spec *startSpec, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, podIPs []string) (string, error) {
    container := spec.container

    // Step 1: pull the image.
    imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets, podSandboxConfig)
    if err != nil {
        s, _ := grpcstatus.FromError(err)
        m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())
        return msg, err
    }

    // Step 2: create the container.
    // For a new container, the RestartCount should be 0
    restartCount := 0
    containerStatus := podStatus.FindContainerStatusByName(container.Name)
    if containerStatus != nil {
        restartCount = containerStatus.RestartCount + 1
    } else {
        // The container runtime keeps state on container statuses and
        // what the container restart count is. When nodes are rebooted
        // some container runtimes clear their state which causes the
        // restartCount to be reset to 0. This causes the logfile to
        // start at 0.log, which either overwrites or appends to the
        // already existing log.
        //
        // We are checking to see if the log directory exists, and find
        // the latest restartCount by checking the log name -
        // {restartCount}.log - and adding 1 to it.
        logDir := BuildContainerLogsDirectory(pod.Namespace, pod.Name, pod.UID, container.Name)
        restartCount, err = calcRestartCountByLogDir(logDir)
        if err != nil {
            klog.InfoS("Log directory exists but could not calculate restartCount", "logDir", logDir, "err", err)
        }
    }

    target, err := spec.getTargetID(podStatus)
    if err != nil {
        s, _ := grpcstatus.FromError(err)
        m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())
        return s.Message(), ErrCreateContainerConfig
    }

    containerConfig, cleanupAction, err := m.generateContainerConfig(container, pod, restartCount, podIP, imageRef, podIPs, target)
    if cleanupAction != nil {
        defer cleanupAction()
    }
    if err != nil {
        s, _ := grpcstatus.FromError(err)
        m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())
        return s.Message(), ErrCreateContainerConfig
    }

    err = m.internalLifecycle.PreCreateContainer(pod, container, containerConfig)
    if err != nil {
        s, _ := grpcstatus.FromError(err)
        m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Internal PreCreateContainer hook failed: %v", s.Message())
        return s.Message(), ErrPreCreateHook
    }

    containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
    if err != nil {
        s, _ := grpcstatus.FromError(err)
        m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())
        return s.Message(), ErrCreateContainer
    }
    err = m.internalLifecycle.PreStartContainer(pod, container, containerID)
    if err != nil {
        s, _ := grpcstatus.FromError(err)
        m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Internal PreStartContainer hook failed: %v", s.Message())
        return s.Message(), ErrPreStartHook
    }
    m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.CreatedContainer, fmt.Sprintf("Created container %s", container.Name))

    // Step 3: start the container.
    err = m.runtimeService.StartContainer(containerID)
    if err != nil {
        s, _ := grpcstatus.FromError(err)
        m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Error: %v", s.Message())
        return s.Message(), kubecontainer.ErrRunContainer
    }
    m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.StartedContainer, fmt.Sprintf("Started container %s", container.Name))

    // Symlink container logs to the legacy container log location for cluster logging
    // support.
    // TODO(random-liu): Remove this after cluster logging supports CRI container log path.
    containerMeta := containerConfig.GetMetadata()
    sandboxMeta := podSandboxConfig.GetMetadata()
    legacySymlink := legacyLogSymlink(containerID, containerMeta.Name, sandboxMeta.Name,
        sandboxMeta.Namespace)
    containerLog := filepath.Join(podSandboxConfig.LogDirectory, containerConfig.LogPath)
    // only create legacy symlink if containerLog path exists (or the error is not IsNotExist).
    // Because if containerLog path does not exist, only dangling legacySymlink is created.
    // This dangling legacySymlink is later removed by container gc, so it does not make sense
    // to create it in the first place. it happens when journald logging driver is used with docker.
    if _, err := m.osInterface.Stat(containerLog); !os.IsNotExist(err) {
        if err := m.osInterface.Symlink(containerLog, legacySymlink); err != nil {
            klog.ErrorS(err, "Failed to create legacy symbolic link", "path", legacySymlink,
                "containerID", containerID, "containerLogPath", containerLog)
        }
    }

    // Step 4: execute the post start hook.
    if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
        kubeContainerID := kubecontainer.ContainerID{
            Type: m.runtimeName,
            ID:   containerID,
        }
        msg, handlerErr := m.runner.Run(kubeContainerID, pod, container, container.Lifecycle.PostStart)
        if handlerErr != nil {
            klog.ErrorS(handlerErr, "Failed to execute PostStartHook", "pod", klog.KObj(pod),
                "podUID", pod.UID, "containerName", container.Name, "containerID", kubeContainerID.String())
            m.recordContainerEvent(pod, container, kubeContainerID.ID, v1.EventTypeWarning, events.FailedPostStartHook, msg)
            if err := m.killContainer(pod, kubeContainerID, container.Name, "FailedPostStartHook", reasonFailedPostStartHook, nil); err != nil {
                klog.ErrorS(err, "Failed to kill container", "pod", klog.KObj(pod),
                    "podUID", pod.UID, "containerName", container.Name, "containerID", kubeContainerID.String())
            }
            return msg, ErrPostStartHook
        }
    }

    return "", nil
}

Creating and Starting the Containers

The start anonymous function defined above is then called to create and start the containers.

Creation Order

  1. First check whether the ephemeral-containers feature gate is enabled; if so, create and start the ephemeral containers first.

  2. Create and start the init containers.

  3. Create and start the regular containers.

  4. Once the pod is up, its status is reported to the apiserver.

Info

We saw that init containers are started one at a time, which raises a question: we learned earlier that syncPod() puts the pod back into the work queue with a 10-second delay at the end of the flow, so when the first init container has just been created and started, the next scheduled sync is still inside that 10-second window. How does the next init container get started right away? Init containers are not handled like regular containers, which are recorded in a slice and then created and started asynchronously in a for loop.

The key is PLEG: it runs once per second, checks the state of every container of the pod, and whenever a container's state differs from the previous check it triggers UpdatePod() to sync the pod again.
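Below is a minimal sketch of that relist-and-diff idea, with hypothetical types and function fields standing in for the container runtime query and for the event that ends up triggering another sync; the real PLEG in pkg/kubelet/pleg is considerably more involved.

package main

import (
    "fmt"
)

type containerState string

// relister keeps the last observed state per container and, on every tick,
// diffs a fresh snapshot from the runtime against it.
type relister struct {
    previous   map[string]containerState
    listStates func() map[string]containerState         // stands in for querying the runtime
    onChange   func(id string, from, to containerState) // stands in for emitting a PLEG event
}

func (r *relister) relist() {
    current := r.listStates()
    for id, state := range current {
        if old, ok := r.previous[id]; !ok || old != state {
            r.onChange(id, old, state) // e.g. an init container went from running to exited
        }
    }
    r.previous = current
}

func main() {
    step := 0
    r := &relister{
        previous: map[string]containerState{},
        listStates: func() map[string]containerState {
            step++
            if step == 1 {
                return map[string]containerState{"init-0": "running"}
            }
            return map[string]containerState{"init-0": "exited"}
        },
        onChange: func(id string, from, to containerState) {
            // In kubelet this would surface as a PLEG event that makes
            // syncLoopIteration() call UpdatePod() for the owning pod.
            fmt.Printf("container %s: %q -> %q, resync pod\n", id, from, to)
        },
    }

    // kubelet runs relist on a one-second ticker; two iterations suffice here.
    r.relist()
    r.relist()
}

Back in SyncPod(), the code for steps 5 through 7 looks like this: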

    // Step 5: start ephemeral containers
    // These are started "prior" to init containers to allow running ephemeral containers even when there
    // are errors starting an init container. In practice init containers will start first since ephemeral
    // containers cannot be specified on pod creation.
    if utilfeature.DefaultFeatureGate.Enabled(features.EphemeralContainers) {
        for _, idx := range podContainerChanges.EphemeralContainersToStart {
            start("ephemeral container", metrics.EphemeralContainer, ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))
        }
    }

    // Step 6: start the init container.
    if container := podContainerChanges.NextInitContainerToStart; container != nil {
        // Start the next init container.
        if err := start("init container", metrics.InitContainer, containerStartSpec(container)); err != nil {
            return
        }

        // Successfully started the container; clear the entry in the failure
        klog.V(4).InfoS("Completed init container for pod", "containerName", container.Name, "pod", klog.KObj(pod))
    }

    // Step 7: start containers in podContainerChanges.ContainersToStart.
    for _, idx := range podContainerChanges.ContainersToStart {
        start("container", metrics.Container, containerStartSpec(&pod.Spec.Containers[idx]))
    }

Summary

  1. The sandbox container is one of the key ideas of Kubernetes; it provides the shared namespaces and resource isolation for the pod's containers.

  2. When init containers exist, they are started first, one at a time; only after an init container exits successfully will the next init container, or the regular containers, be started.

  3. PLEG checks the state of all containers and triggers another pod sync when something changes.

  4. syncCh also syncs pods periodically.

  5. After the pod is created, its status is reported to the apiserver.

There is still a lot we have not looked at, such as PLEG, the statusManager, and the various probes. We can analyze and document those in later chapters; for reasons of length we will stop here.