
Kubernetes Source Code: kubelet Principles and Source Code Analysis (Part 4)

How kubelet creates and runs health checks. The source code referenced is the release-1.26 branch of Kubernetes.

Preface

In the previous posts we went through how kubelet syncs a pod, but there are still many features we have not covered yet, such as health checks, PLEG, volume mounting, and eviction management. In this post, let's look at how health checking is done.

Let's start with the flow diagram to get a clearer picture.

The overall flow is roughly as follows (a simplified code sketch follows the diagram):

  1. When podWorker syncs a pod, it calls the AddPod() method to check whether the pod has any probes configured.

  2. For each configured probe type, a probe worker is constructed and its goroutine is started.

  3. In run(), the worker calls the prober's probe() method, which checks the probe type and uses the matching prober to perform the check.

  4. The result is returned.

  5. kubelet is notified to update the pod's container status.

(Figure: kubelet-prober.png, the prober workflow)
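Before diving into the source, here is a heavily simplified, self-contained sketch of the same pipeline: a manager starts one goroutine per probe, each goroutine probes on a timer, and every result flows back through a buffered channel. Everything in it (fakeProbe, worker, the channel size) is made up for illustration; the real types appear in the rest of this post.

package main

import (
  "fmt"
  "time"
)

type result string

// fakeProbe stands in for prober.probe(); purely illustrative.
func fakeProbe(container string) result { return "Success" }

// worker mimics the shape of prober/worker.run(): probe, then wait for the
// next tick or a stop signal, and report every result on a channel.
func worker(container string, period time.Duration, results chan<- result, stop <-chan struct{}) {
  ticker := time.NewTicker(period)
  defer ticker.Stop()
  for {
    results <- fakeProbe(container)
    select {
    case <-stop:
      return
    case <-ticker.C:
    }
  }
}

func main() {
  results := make(chan result, 20) // stand-in for the results manager's updates channel
  stop := make(chan struct{})

  // AddPod() would start one such goroutine per (container, probe type).
  go worker("app", time.Second, results, stop)

  fmt.Println(<-results) // kubelet's sync loop is the consumer of these updates
  close(stop)
}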
The prober manager

probeManager

Before looking at the kl.probeManager.AddPod() method, let's look at how probeManager is constructed, since AddPod is built on top of this manager.

  1. kubelet initializes the probe manager in its constructor, as the snippet below shows.

  2. newProber() builds the four probers (exec, HTTP, TCP, gRPC); the runner is used to execute exec commands inside containers.

  3. Empty results managers are created for livenessManager, readinessManager, and startupManager.

  4. A workers map is created to cache the probe workers.

  ...
  klet.livenessManager = proberesults.NewManager()
  klet.readinessManager = proberesults.NewManager()
  klet.startupManager = proberesults.NewManager()
  ...
  // kubelet initializes the probe manager in its constructor.
  if kubeDeps.ProbeManager != nil {
    klet.probeManager = kubeDeps.ProbeManager
  } else {
    klet.probeManager = prober.NewManager(
      klet.statusManager,
      klet.livenessManager,
      klet.readinessManager,
      klet.startupManager,
      klet.runner,
      kubeDeps.Recorder)
  }
  ...

func NewManager(
  statusManager status.Manager,
  livenessManager results.Manager,
  readinessManager results.Manager,
  startupManager results.Manager,
  runner kubecontainer.CommandRunner,
  recorder record.EventRecorder) Manager {

  prober := newProber(runner, recorder)
  return &manager{
    statusManager:    statusManager,
    prober:           prober,
    readinessManager: readinessManager,
    livenessManager:  livenessManager,
    startupManager:   startupManager,
    workers:          make(map[probeKey]*worker),
    start:            clock.RealClock{}.Now(),
  }
}

func newProber(
  runner kubecontainer.CommandRunner,
  recorder record.EventRecorder) *prober {

  const followNonLocalRedirects = false
  return &prober{
    exec:     execprobe.New(),
    http:     httpprobe.New(followNonLocalRedirects),
    tcp:      tcpprobe.New(),
    grpc:     grpcprobe.New(),
    runner:   runner,
    recorder: recorder,
  }
}

// NewManager creates and returns an empty results manager.
func NewManager() Manager {
  return &manager{
    cache:   make(map[kubecontainer.ContainerID]Result),
    updates: make(chan Update, 20),
  }
}
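The NewManager() at the bottom is the results manager used for the liveness, readiness, and startup results: just a map from container ID to the latest result plus a buffered updates channel. Ignoring locking, the idea can be sketched like this; this is an illustration of the pattern, not the code in pkg/kubelet/prober/results (in the real manager an update is likewise only published when the cached value actually changes).

package resultsketch

// Result mirrors the idea of a probe result cached per container.
type Result int

const (
  Unknown Result = iota
  Success
  Failure
)

// Update is what gets pushed to the consumer when a cached result changes.
type Update struct {
  Key    string
  Result Result
}

// resultsStore is the cache-plus-channel shape created by NewManager() above.
type resultsStore struct {
  cache   map[string]Result
  updates chan Update
}

func newResultsStore() *resultsStore {
  return &resultsStore{
    cache:   make(map[string]Result),
    updates: make(chan Update, 20),
  }
}

// Set stores the latest result and, only when it changed, emits an update
// for the consumer (in kubelet, syncLoopIteration) to pick up.
func (s *resultsStore) Set(key string, r Result) {
  if old, ok := s.cache[key]; ok && old == r {
    return
  }
  s.cache[key] = r
  s.updates <- Update{Key: key, Result: r}
}

// Updates exposes the channel the consumer selects on.
func (s *resultsStore) Updates() <-chan Update { return s.updates }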

AddPod()

  1. Iterate over every container in the pod and check whether it has probes configured.

  2. Check whether the worker already exists in the cache; if not, add it to the prober manager's cache with m.workers[key] = w.

  3. For each probe type present, construct the corresponding probe worker and start it in its own goroutine (an example container spec follows the code below).

type probeKey struct {
    podUID        types.UID
    containerName string
    probeType     probeType
}

func (m *manager) AddPod(pod *v1.Pod) {
  m.workerLock.Lock()
  defer m.workerLock.Unlock()

  key := probeKey{podUID: pod.UID}
  for _, c := range pod.Spec.Containers {
    key.containerName = c.Name

    if c.StartupProbe != nil {
      key.probeType = startup
      if _, ok := m.workers[key]; ok {
        klog.V(8).ErrorS(nil, "Startup probe already exists for container",
          "pod", klog.KObj(pod), "containerName", c.Name)
        return
      }
      w := newWorker(m, startup, pod, c)
      m.workers[key] = w
      go w.run()
    }

    if c.ReadinessProbe != nil {
      key.probeType = readiness
      if _, ok := m.workers[key]; ok {
        klog.V(8).ErrorS(nil, "Readiness probe already exists for container",
          "pod", klog.KObj(pod), "containerName", c.Name)
        return
      }
      w := newWorker(m, readiness, pod, c)
      m.workers[key] = w
      go w.run()
    }

    if c.LivenessProbe != nil {
      key.probeType = liveness
      if _, ok := m.workers[key]; ok {
        klog.V(8).ErrorS(nil, "Liveness probe already exists for container",
          "pod", klog.KObj(pod), "containerName", c.Name)
        return
      }
      w := newWorker(m, liveness, pod, c)
      m.workers[key] = w
      go w.run()
    }
  }
}
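AddPod() is driven entirely by which probe fields are set on the container spec. For reference, a container that would get all three workers (startup, readiness, liveness) looks roughly like this when built with the k8s.io/api/core/v1 Go types from the same release; the concrete values here are arbitrary examples.

package main

import (
  v1 "k8s.io/api/core/v1"
  "k8s.io/apimachinery/pkg/util/intstr"
)

// buildContainer returns a container with all three probe kinds set, so
// AddPod() would create three workers for it: startup, readiness, liveness.
func buildContainer() v1.Container {
  return v1.Container{
    Name:  "app",
    Image: "nginx:1.25",
    StartupProbe: &v1.Probe{
      ProbeHandler:     v1.ProbeHandler{TCPSocket: &v1.TCPSocketAction{Port: intstr.FromInt(80)}},
      FailureThreshold: 30,
      PeriodSeconds:    2,
    },
    ReadinessProbe: &v1.Probe{
      ProbeHandler:  v1.ProbeHandler{HTTPGet: &v1.HTTPGetAction{Path: "/healthz", Port: intstr.FromInt(80)}},
      PeriodSeconds: 5,
    },
    LivenessProbe: &v1.Probe{
      ProbeHandler:        v1.ProbeHandler{Exec: &v1.ExecAction{Command: []string{"cat", "/tmp/healthy"}}},
      InitialDelaySeconds: 10,
      PeriodSeconds:       10,
    },
  }
}

func main() { _ = buildContainer() }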

newWorker()

Let's see what the constructor does.

It does two things:

  1. Initialize the worker according to the probe type: pick the matching probe spec and results manager, and set the initial result value (Failure for readiness, Success for liveness, Unknown for startup), so a container is not reported Ready before its first successful readiness probe and is not killed before its first liveness probe.

  2. Set up the Prometheus metric labels.

func newWorker(
  m *manager,
  probeType probeType,
  pod *v1.Pod,
  container v1.Container) *worker {

  w := &worker{
    stopCh:          make(chan struct{}, 1), // Buffer so stop() can be non-blocking.
    manualTriggerCh: make(chan struct{}, 1), // Buffer so prober_manager can do non-blocking calls to doProbe.
    pod:             pod,
    container:       container,
    probeType:       probeType,
    probeManager:    m,
  }

  switch probeType {
  case readiness:
    w.spec = container.ReadinessProbe
    w.resultsManager = m.readinessManager
    w.initialValue = results.Failure
  case liveness:
    w.spec = container.LivenessProbe
    w.resultsManager = m.livenessManager
    w.initialValue = results.Success
  case startup:
    w.spec = container.StartupProbe
    w.resultsManager = m.startupManager
    w.initialValue = results.Unknown
  }

  podName := getPodLabelName(w.pod)

  basicMetricLabels := metrics.Labels{
    "probe_type": w.probeType.String(),
    "container":  w.container.Name,
    "pod":        podName,
    "namespace":  w.pod.Namespace,
    "pod_uid":    string(w.pod.UID),
  }

  proberDurationLabels := metrics.Labels{
    "probe_type": w.probeType.String(),
    "container":  w.container.Name,
    "pod":        podName,
    "namespace":  w.pod.Namespace,
  }

  w.proberResultsSuccessfulMetricLabels = deepCopyPrometheusLabels(basicMetricLabels)
  w.proberResultsSuccessfulMetricLabels["result"] = probeResultSuccessful

  w.proberResultsFailedMetricLabels = deepCopyPrometheusLabels(basicMetricLabels)
  w.proberResultsFailedMetricLabels["result"] = probeResultFailed

  w.proberResultsUnknownMetricLabels = deepCopyPrometheusLabels(basicMetricLabels)
  w.proberResultsUnknownMetricLabels["result"] = probeResultUnknown

  w.proberDurationSuccessfulMetricLabels = deepCopyPrometheusLabels(proberDurationLabels)
  w.proberDurationUnknownMetricLabels = deepCopyPrometheusLabels(proberDurationLabels)

  return w
}

run()

Next is the worker's run loop.

  1. Set up the probe ticker, with an initial random delay if kubelet has only just started.

  2. Call doProbe() on each iteration; its return value decides whether the periodic probe loop keeps going.

  3. Clean up via defer when the loop exits: stop the ticker, remove the cached result, deregister the worker, and delete its metrics.

func (w *worker) run() {
  ctx := context.Background()
  probeTickerPeriod := time.Duration(w.spec.PeriodSeconds) * time.Second

  // If kubelet restarted the probes could be started in rapid succession.
  // Let the worker wait for a random portion of tickerPeriod before probing.
  // Do it only if the kubelet has started recently.
  if probeTickerPeriod > time.Since(w.probeManager.start) {
    time.Sleep(time.Duration(rand.Float64() * float64(probeTickerPeriod)))
  }

  probeTicker := time.NewTicker(probeTickerPeriod)

  defer func() {
    // Clean up.
    probeTicker.Stop()
    if !w.containerID.IsEmpty() {
      w.resultsManager.Remove(w.containerID)
    }

    w.probeManager.removeWorker(w.pod.UID, w.container.Name, w.probeType)
    ProberResults.Delete(w.proberResultsSuccessfulMetricLabels)
    ProberResults.Delete(w.proberResultsFailedMetricLabels)
    ProberResults.Delete(w.proberResultsUnknownMetricLabels)
    ProberDuration.Delete(w.proberDurationSuccessfulMetricLabels)
    ProberDuration.Delete(w.proberDurationUnknownMetricLabels)
  }()

probeLoop:
  for w.doProbe(ctx) {
    // Wait for next probe tick.
    select {
    case <-w.stopCh:
      break probeLoop
    case <-probeTicker.C:
    case <-w.manualTriggerCh:
      // continue
    }
  }
}
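Two details of run() are worth calling out: the initial random sleep spreads probes out after a kubelet restart so all workers do not fire at once, and manualTriggerCh is buffered with capacity 1 so a probe can be requested without blocking the caller. A standalone toy version of that jitter and trigger pattern (names and timings are made up for the sketch):

package main

import (
  "fmt"
  "math/rand"
  "time"
)

func main() {
  period := 1 * time.Second

  // Jitter: wait a random fraction of the period before the first probe,
  // which is what run() does right after a kubelet restart.
  time.Sleep(time.Duration(rand.Float64() * float64(period)))

  ticker := time.NewTicker(period)
  defer ticker.Stop()
  stop := make(chan struct{})
  manualTrigger := make(chan struct{}, 1) // capacity 1: triggering never blocks

  // Caller side: request an immediate probe. If a trigger is already
  // pending, the default branch simply drops the extra request.
  select {
  case manualTrigger <- struct{}{}:
  default:
  }

  // Stop the loop after a few seconds so the example terminates.
  go func() { time.Sleep(3 * time.Second); close(stop) }()

  for i := 0; ; i++ {
    fmt.Println("probe", i) // doProbe() would run here
    select {
    case <-stop:
      return
    case <-ticker.C:
    case <-manualTrigger:
    }
  }
}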

doProbe()

  1. Get the pod status and skip probing in several cases: the pod is terminated, the container does not exist yet, the pod is being gracefully deleted, or the container is still within InitialDelaySeconds.

  2. The startup probe stops probing once the container has started; the worker just keeps returning true so it stays alive and can probe again after a container restart. The other probe types are disabled until the container has started.

  3. Run the actual probe via w.probeManager.prober.probe().

  4. Record the result in the metrics.

  5. With the container ID, the result, and the pod, call w.resultsManager.Set() to notify kubelet to update the pod's container status and handle probe sync.

  6. If a liveness or startup probe fails, the container will need to be restarted; the worker is put on hold and waits for a new container ID before probing again, to avoid running docker exec against a container that is being stopped and corrupting its state. resultRun is reset to 0.

  7. If the pod is terminated, the probe worker exits.

// doProbe probes the container once and records the result.
// Returns whether the worker should continue.
func (w *worker) doProbe(ctx context.Context) (keepGoing bool) {
  defer func() { recover() }() // Actually eat panics (HandleCrash takes care of logging)
  defer runtime.HandleCrash(func(_ interface{}) { keepGoing = true })

  startTime := time.Now()
  status, ok := w.probeManager.statusManager.GetPodStatus(w.pod.UID)
  if !ok {
    // Either the pod has not been created yet, or it was already deleted.
    klog.V(3).InfoS("No status for pod", "pod", klog.KObj(w.pod))
    return true
  }

  // Worker should terminate if pod is terminated.
  if status.Phase == v1.PodFailed || status.Phase == v1.PodSucceeded {
    klog.V(3).InfoS("Pod is terminated, exiting probe worker",
      "pod", klog.KObj(w.pod), "phase", status.Phase)
    return false
  }

  c, ok := podutil.GetContainerStatus(status.ContainerStatuses, w.container.Name)
  if !ok || len(c.ContainerID) == 0 {
    // Either the container has not been created yet, or it was deleted.
    klog.V(3).InfoS("Probe target container not found",
      "pod", klog.KObj(w.pod), "containerName", w.container.Name)
    return true // Wait for more information.
  }

  if w.containerID.String() != c.ContainerID {
    if !w.containerID.IsEmpty() {
      w.resultsManager.Remove(w.containerID)
    }
    w.containerID = kubecontainer.ParseContainerID(c.ContainerID)
    w.resultsManager.Set(w.containerID, w.initialValue, w.pod)
    // We've got a new container; resume probing.
    w.onHold = false
  }

  if w.onHold {
    // Worker is on hold until there is a new container.
    return true
  }

  if c.State.Running == nil {
    klog.V(3).InfoS("Non-running container probed",
      "pod", klog.KObj(w.pod), "containerName", w.container.Name)
    if !w.containerID.IsEmpty() {
      w.resultsManager.Set(w.containerID, results.Failure, w.pod)
    }
    // Abort if the container will not be restarted.
    return c.State.Terminated == nil ||
      w.pod.Spec.RestartPolicy != v1.RestartPolicyNever
  }

  // Graceful shutdown of the pod.
  if w.pod.ObjectMeta.DeletionTimestamp != nil && (w.probeType == liveness || w.probeType == startup) {
    klog.V(3).InfoS("Pod deletion requested, setting probe result to success",
      "probeType", w.probeType, "pod", klog.KObj(w.pod), "containerName", w.container.Name)
    if w.probeType == startup {
      klog.InfoS("Pod deletion requested before container has fully started",
        "pod", klog.KObj(w.pod), "containerName", w.container.Name)
    }
    // Set a last result to ensure quiet shutdown.
    w.resultsManager.Set(w.containerID, results.Success, w.pod)
    // Stop probing at this point.
    return false
  }

  // Probe disabled for InitialDelaySeconds.
  if int32(time.Since(c.State.Running.StartedAt.Time).Seconds()) < w.spec.InitialDelaySeconds {
    return true
  }

  if c.Started != nil && *c.Started {
    // Stop probing for startup once container has started.
    // we keep it running to make sure it will work for restarted container.
    // Once the container has started, just return true without probing; the
    // worker stays alive so it can resume probing if the container restarts.
    if w.probeType == startup {
      return true
    }
  } else {
    // Disable other probes until container has started.
    // Other probe types are disabled until the container has started.
    if w.probeType != startup {
      return true
    }
  }

  // Note, exec probe does NOT have access to pod environment variables or downward API
  result, err := w.probeManager.prober.probe(ctx, w.probeType, w.pod, status, w.container, w.containerID)
  if err != nil {
    // Prober error, throw away the result.
    return true
  }

  switch result {
  case results.Success:
    ProberResults.With(w.proberResultsSuccessfulMetricLabels).Inc()
    ProberDuration.With(w.proberDurationSuccessfulMetricLabels).Observe(time.Since(startTime).Seconds())
  case results.Failure:
    ProberResults.With(w.proberResultsFailedMetricLabels).Inc()
  default:
    ProberResults.With(w.proberResultsUnknownMetricLabels).Inc()
    ProberDuration.With(w.proberDurationUnknownMetricLabels).Observe(time.Since(startTime).Seconds())
  }

  if w.lastResult == result {
    w.resultRun++
  } else {
    w.lastResult = result
    w.resultRun = 1
  }

  if (result == results.Failure && w.resultRun < int(w.spec.FailureThreshold)) ||
    (result == results.Success && w.resultRun < int(w.spec.SuccessThreshold)) {
    // Success or failure is below threshold - leave the probe state unchanged.
    return true
  }

  w.resultsManager.Set(w.containerID, result, w.pod)

  if (w.probeType == liveness || w.probeType == startup) && result == results.Failure {
    // The container fails a liveness/startup check, it will need to be restarted.
    // Stop probing until we see a new container ID. This is to reduce the
    // chance of hitting #21751, where running `docker exec` when a
    // container is being stopped may lead to corrupted container state.
    w.onHold = true
    w.resultRun = 0
  }

  return true
}
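The tail of doProbe() implements the consecutive-result counting: a new result is only reported once it has been observed FailureThreshold (or SuccessThreshold) times in a row. Here is that gating logic in isolation, as a small runnable model (an illustration, not kubelet code):

package main

import "fmt"

type thresholdTracker struct {
  lastResult       string
  resultRun        int
  successThreshold int
  failureThreshold int
  reported         string
}

// observe feeds one raw probe result in and returns the reported state,
// which only changes after enough consecutive identical results.
func (t *thresholdTracker) observe(result string) string {
  if t.lastResult == result {
    t.resultRun++
  } else {
    t.lastResult = result
    t.resultRun = 1
  }
  if (result == "Failure" && t.resultRun < t.failureThreshold) ||
    (result == "Success" && t.resultRun < t.successThreshold) {
    return t.reported // below threshold: keep the previous state
  }
  t.reported = result
  return t.reported
}

func main() {
  t := &thresholdTracker{successThreshold: 1, failureThreshold: 3, reported: "Success"}
  for _, r := range []string{"Failure", "Failure", "Failure", "Success"} {
    // The reported state only flips to Failure on the third consecutive
    // failure, then back to Success immediately because that threshold is 1.
    fmt.Println(r, "->", t.observe(r))
  }
}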

probe()

  1. Assign probeSpec according to the probe type; the probe runs against this spec. If it is nil, the container has no such probe and Success is returned directly.

  2. Run pb.runProbeWithRetries(), which retries on error up to maxProbeRetries times (3 by default).

  3. pb.runProbeWithRetries() calls pb.runProbe(), which dispatches to the matching prober (exec, HTTP, TCP, or gRPC) and returns its result.

// probe probes the container.
func (pb *prober) probe(ctx context.Context, probeType probeType, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID) (results.Result, error) {
  var probeSpec *v1.Probe
  switch probeType {
  case readiness:
    probeSpec = container.ReadinessProbe
  case liveness:
    probeSpec = container.LivenessProbe
  case startup:
    probeSpec = container.StartupProbe
  default:
    return results.Failure, fmt.Errorf("unknown probe type: %q", probeType)
  }

  if probeSpec == nil {
    klog.InfoS("Probe is nil", "probeType", probeType, "pod", klog.KObj(pod), "podUID", pod.UID, "containerName", container.Name)
    return results.Success, nil
  }

  result, output, err := pb.runProbeWithRetries(ctx, probeType, probeSpec, pod, status, container, containerID, maxProbeRetries)
  if err != nil || (result != probe.Success && result != probe.Warning) {
    // Probe failed in one way or another.
    if err != nil {
      klog.V(1).ErrorS(err, "Probe errored", "probeType", probeType, "pod", klog.KObj(pod), "podUID", pod.UID, "containerName", container.Name)
      pb.recordContainerEvent(pod, &container, v1.EventTypeWarning, events.ContainerUnhealthy, "%s probe errored: %v", probeType, err)
    } else { // result != probe.Success
      klog.V(1).InfoS("Probe failed", "probeType", probeType, "pod", klog.KObj(pod), "podUID", pod.UID, "containerName", container.Name, "probeResult", result, "output", output)
      pb.recordContainerEvent(pod, &container, v1.EventTypeWarning, events.ContainerUnhealthy, "%s probe failed: %s", probeType, output)
    }
    return results.Failure, err
  }
  if result == probe.Warning {
    pb.recordContainerEvent(pod, &container, v1.EventTypeWarning, events.ContainerProbeWarning, "%s probe warning: %s", probeType, output)
    klog.V(3).InfoS("Probe succeeded with a warning", "probeType", probeType, "pod", klog.KObj(pod), "podUID", pod.UID, "containerName", container.Name, "output", output)
  } else {
    klog.V(3).InfoS("Probe succeeded", "probeType", probeType, "pod", klog.KObj(pod), "podUID", pod.UID, "containerName", container.Name)
  }
  return results.Success, nil
}

// runProbeWithRetries tries to probe the container in a finite loop, it returns the last result
// if it never succeeds.
func (pb *prober) runProbeWithRetries(ctx context.Context, probeType probeType, p *v1.Probe, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID, retries int) (probe.Result, string, error) {
  var err error
  var result probe.Result
  var output string
  for i := 0; i < retries; i++ {
    result, output, err = pb.runProbe(ctx, probeType, p, pod, status, container, containerID)
    if err == nil {
      return result, output, nil
    }
  }
  return result, output, err
}

func (pb *prober) runProbe(ctx context.Context, probeType probeType, p *v1.Probe, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID) (probe.Result, string, error) {
  timeout := time.Duration(p.TimeoutSeconds) * time.Second
  if p.Exec != nil {
    klog.V(4).InfoS("Exec-Probe runProbe", "pod", klog.KObj(pod), "containerName", container.Name, "execCommand", p.Exec.Command)
    command := kubecontainer.ExpandContainerCommandOnlyStatic(p.Exec.Command, container.Env)
    return pb.exec.Probe(pb.newExecInContainer(ctx, container, containerID, command, timeout))
  }
  if p.HTTPGet != nil {
    req, err := httpprobe.NewRequestForHTTPGetAction(p.HTTPGet, &container, status.PodIP, "probe")
    if err != nil {
      return probe.Unknown, "", err
    }
    if klogV4 := klog.V(4); klogV4.Enabled() {
      port := req.URL.Port()
      host := req.URL.Hostname()
      path := req.URL.Path
      scheme := req.URL.Scheme
      headers := p.HTTPGet.HTTPHeaders
      klogV4.InfoS("HTTP-Probe", "scheme", scheme, "host", host, "port", port, "path", path, "timeout", timeout, "headers", headers)
    }
    return pb.http.Probe(req, timeout)
  }
  if p.TCPSocket != nil {
    port, err := probe.ResolveContainerPort(p.TCPSocket.Port, &container)
    if err != nil {
      return probe.Unknown, "", err
    }
    host := p.TCPSocket.Host
    if host == "" {
      host = status.PodIP
    }
    klog.V(4).InfoS("TCP-Probe", "host", host, "port", port, "timeout", timeout)
    return pb.tcp.Probe(host, port, timeout)
  }

  if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.GRPCContainerProbe) && p.GRPC != nil {
    host := status.PodIP
    service := ""
    if p.GRPC.Service != nil {
      service = *p.GRPC.Service
    }
    klog.V(4).InfoS("GRPC-Probe", "host", host, "service", service, "port", p.GRPC.Port, "timeout", timeout)
    return pb.grpc.Probe(host, service, int(p.GRPC.Port), timeout)
  }

  klog.InfoS("Failed to find probe builder for container", "containerName", container.Name)
  return probe.Unknown, "", fmt.Errorf("missing probe handler for %s:%s", format.Pod(pod), container.Name)
}
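Each of the concrete probers behind runProbe() is fairly small. As an illustration of what the TCP case boils down to, here is a simplified standalone version that dials host:port within the timeout and treats a successful connection as healthy; it is a sketch of the idea, not the implementation in pkg/probe/tcp.

package main

import (
  "fmt"
  "net"
  "strconv"
  "time"
)

// tcpProbe dials host:port and reports success if the connection is
// established within the timeout, roughly what the TCP prober does.
func tcpProbe(host string, port int, timeout time.Duration) (string, error) {
  addr := net.JoinHostPort(host, strconv.Itoa(port))
  conn, err := net.DialTimeout("tcp", addr, timeout)
  if err != nil {
    return "Failure", nil // an unreachable port is a probe failure, not an error
  }
  conn.Close()
  return "Success", nil
}

func main() {
  result, _ := tcpProbe("127.0.0.1", 80, time.Second)
  fmt.Println(result)
}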

resultsManager.Set()

Probe results are received and processed inside kubelet's syncLoopIteration() method.

For readiness and startup probes, kl.statusManager.SetContainerReadiness() and kl.statusManager.SetContainerStartup() are called respectively to set the pod's container status.

  1. Find the container.

  2. Update the container's status value.

  3. Through statusManager, update the container and pod status in the local cache and on the API server, and have podWorker sync the pod's state on the node.

  4. resultsManager.Set() mainly stores the probe result in the results cache; when kubelet syncs the pod again through syncPodFn, it reads the liveness and startup results to decide whether the container needs to be killed and recreated. The concrete code is in computePodActions() in pkg/kubelet/kuberuntime/kuberuntime_manager.go.

func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool) {
  m.podStatusesLock.Lock()
  defer m.podStatusesLock.Unlock()

  pod, ok := m.podManager.GetPodByUID(podUID)
  if !ok {
    klog.V(4).InfoS("Pod has been deleted, no need to update readiness", "podUID", string(podUID))
    return
  }

  oldStatus, found := m.podStatuses[pod.UID]
  if !found {
    klog.InfoS("Container readiness changed before pod has synced",
      "pod", klog.KObj(pod),
      "containerID", containerID.String())
    return
  }

  // Find the container to update.
  containerStatus, _, ok := findContainerStatus(&oldStatus.status, containerID.String())
  if !ok {
    klog.InfoS("Container readiness changed for unknown container",
      "pod", klog.KObj(pod),
      "containerID", containerID.String())
    return
  }

  if containerStatus.Ready == ready {
    klog.V(4).InfoS("Container readiness unchanged",
      "ready", ready,
      "pod", klog.KObj(pod),
      "containerID", containerID.String())
    return
  }

  // Make sure we're not updating the cached version.
  status := *oldStatus.status.DeepCopy()
  containerStatus, _, _ = findContainerStatus(&status, containerID.String())
  containerStatus.Ready = ready

  // updateConditionFunc updates the corresponding type of condition
  updateConditionFunc := func(conditionType v1.PodConditionType, condition v1.PodCondition) {
    conditionIndex := -1
    for i, condition := range status.Conditions {
      if condition.Type == conditionType {
        conditionIndex = i
        break
      }
    }
    if conditionIndex != -1 {
      status.Conditions[conditionIndex] = condition
    } else {
      klog.InfoS("PodStatus missing condition type", "conditionType", conditionType, "status", status)
      status.Conditions = append(status.Conditions, condition)
    }
  }
  updateConditionFunc(v1.PodReady, GeneratePodReadyCondition(&pod.Spec, status.Conditions, status.ContainerStatuses, status.Phase))
  updateConditionFunc(v1.ContainersReady, GenerateContainersReadyCondition(&pod.Spec, status.ContainerStatuses, status.Phase))
  m.updateStatusInternal(pod, status, false)
}

Then handleProbeSync() is called again to sync the pod's state.
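To close the loop, here is a runnable toy consumer in the spirit of what is described above for syncLoopIteration(): it selects on per-probe-type update channels, flips readiness through a status setter, and hands liveness failures to a sync handler. All names in it are invented for the sketch; the real dispatch lives in pkg/kubelet/kubelet.go.

package main

import "fmt"

// Result and Update are toy stand-ins for the proberesults types.
type Result int

const (
  Success Result = iota
  Failure
)

type Update struct {
  ContainerID string
  Result      Result
}

func main() {
  livenessUpdates := make(chan Update, 20)
  readinessUpdates := make(chan Update, 20)

  // Pretend two probe workers just reported results via resultsManager.Set().
  livenessUpdates <- Update{ContainerID: "c1", Result: Failure}
  readinessUpdates <- Update{ContainerID: "c2", Result: Success}

  setContainerReadiness := func(id string, ready bool) { fmt.Println("readiness of", id, "=", ready) }
  handleProbeSync := func(kind string, u Update) { fmt.Println(kind, "sync for", u.ContainerID) }

  // Dispatch loop in the spirit of kubelet's syncLoopIteration().
  for i := 0; i < 2; i++ {
    select {
    case u := <-livenessUpdates:
      if u.Result == Failure {
        handleProbeSync("liveness", u) // the pod sync will restart the container
      }
    case u := <-readinessUpdates:
      setContainerReadiness(u.ContainerID, u.Result == Success)
      handleProbeSync("readiness", u)
    }
  }
}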

Summary

  1. kubelet checks every container in a pod for configured probes; each configured probe type gets its own goroutine, which runs until the pod is destroyed.

  2. Each probe then dispatches on the handler type used to check the container; there are currently four (exec, HTTP, TCP, gRPC), and gRPC requires the corresponding feature gate to be enabled.

  3. Liveness and startup probe results are stored in the kubelet node's local cache; when kubelet syncs the container, it reads the results, and on failure the container is killed and recreated.