Kubernetes Source Code: DaemonSet Controller Principles and Source Code Analysis

An analysis of how the DaemonSet controller works, based on the Kubernetes release-1.22 branch.

Preface

Personally, I find the DaemonSet controller a bit more convoluted than the Deployment controller: it doesn't use a ReplicaSet to control the number of pods; instead, the number of pods is driven by the number of nodes.

Like the Deployment controller, its entry function lives in kubernetes/cmd/kube-controller-manager/app/apps.go.
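
For reference, the startup wrapper in that file looks roughly like this (condensed; the backoff values and the worker count shown are the defaults wired in by kube-controller-manager):

func startDaemonSetController(ctx ControllerContext) (controller.Interface, bool, error) {
  dsc, err := daemon.NewDaemonSetsController(
    ctx.InformerFactory.Apps().V1().DaemonSets(),
    ctx.InformerFactory.Apps().V1().ControllerRevisions(),
    ctx.InformerFactory.Core().V1().Pods(),
    ctx.InformerFactory.Core().V1().Nodes(),
    ctx.ClientBuilder.ClientOrDie("daemon-set-controller"),
    flowcontrol.NewBackOff(1*time.Second, 15*time.Minute),
  )
  if err != nil {
    return nil, true, fmt.Errorf("error creating DaemonSets controller: %v", err)
  }
  // ConcurrentDaemonSetSyncs defaults to 2, which is where the "2 workers" in Run comes from.
  go dsc.Run(int(ctx.ComponentConfig.DaemonSetController.ConcurrentDaemonSetSyncs), ctx.Stop)
  return nil, true, nil
}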

NewDaemonSetsController

Without further ado, let's go straight to the NewDaemonSetsController constructor.

func NewDaemonSetsController(
  daemonSetInformer appsinformers.DaemonSetInformer,
  historyInformer appsinformers.ControllerRevisionInformer,
  podInformer coreinformers.PodInformer,
  nodeInformer coreinformers.NodeInformer,
  kubeClient clientset.Interface,
  failedPodsBackoff *flowcontrol.Backoff,
) (*DaemonSetsController, error) {
  eventBroadcaster := record.NewBroadcaster()
  eventBroadcaster.StartStructuredLogging(0)
  eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})

  if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
    if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("daemon_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()); err != nil {
      return nil, err
    }
  }
  dsc := &DaemonSetsController{
    kubeClient:    kubeClient,
    eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
    podControl: controller.RealPodControl{
      KubeClient: kubeClient,
      Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
    },
    crControl: controller.RealControllerRevisionControl{
      KubeClient: kubeClient,
    },
    burstReplicas: BurstReplicas,
    expectations:  controller.NewControllerExpectations(),
    queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "daemonset"),
  }

  daemonSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc:    dsc.addDaemonset,
    UpdateFunc: dsc.updateDaemonset,
    DeleteFunc: dsc.deleteDaemonset,
  })
  dsc.dsLister = daemonSetInformer.Lister()
  dsc.dsStoreSynced = daemonSetInformer.Informer().HasSynced

  historyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc:    dsc.addHistory,
    UpdateFunc: dsc.updateHistory,
    DeleteFunc: dsc.deleteHistory,
  })
  dsc.historyLister = historyInformer.Lister()
  dsc.historyStoreSynced = historyInformer.Informer().HasSynced

  // Watch for creation/deletion of pods. The reason we watch is that we don't want a daemon set to create/delete
  // more pods until all the effects (expectations) of a daemon set's create/delete have been observed.
  podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc:    dsc.addPod,
    UpdateFunc: dsc.updatePod,
    DeleteFunc: dsc.deletePod,
  })
  dsc.podLister = podInformer.Lister()

  // This custom indexer will index pods based on their NodeName which will decrease the amount of pods we need to get in simulate() call.
  podInformer.Informer().GetIndexer().AddIndexers(cache.Indexers{
    "nodeName": indexByPodNodeName,
  })
  dsc.podNodeIndex = podInformer.Informer().GetIndexer()
  dsc.podStoreSynced = podInformer.Informer().HasSynced

  nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc:    dsc.addNode,
    UpdateFunc: dsc.updateNode,
  },
  )
  dsc.nodeStoreSynced = nodeInformer.Informer().HasSynced
  dsc.nodeLister = nodeInformer.Lister()

  dsc.syncHandler = dsc.syncDaemonSet
  dsc.enqueueDaemonSet = dsc.enqueue

  dsc.failedPodsBackoff = failedPodsBackoff

  return dsc, nil
}

Notice that, besides watching DaemonSet objects themselves and Pods, it also watches Node objects and ControllerRevision (history) objects; rollback is implemented through these revision histories.

Event handlers

Like the other controllers, it registers quite a few event handlers. Let's look at the more interesting ones.

DaemonSet objects

These handlers run some routine checks and then enqueue the DaemonSet. Nothing special; it's enough to know these functions exist.

  • addDaemonset

  • updateDaemonset

  • deleteDaemonset

ControllerRevision (history) objects

The one worth paying attention to here is updateHistory: when a ControllerRevision changes, it resolves the DaemonSet that owns the revision and enqueues it, so the next sync can reconcile the DaemonSet against that revision (a condensed sketch follows the list below).

  • addHistory

  • updateHistory

  • deleteHistory
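
A condensed sketch of updateHistory (the orphan-adoption branch and the logging are trimmed); the key point is that it only resolves the owning DaemonSet and enqueues it:

func (dsc *DaemonSetsController) updateHistory(old, cur interface{}) {
  curHistory := cur.(*apps.ControllerRevision)
  oldHistory := old.(*apps.ControllerRevision)
  if curHistory.ResourceVersion == oldHistory.ResourceVersion {
    // Periodic resyncs send update events for all known ControllerRevisions; ignore them.
    return
  }

  curControllerRef := metav1.GetControllerOf(curHistory)
  oldControllerRef := metav1.GetControllerOf(oldHistory)
  if !reflect.DeepEqual(curControllerRef, oldControllerRef) && oldControllerRef != nil {
    // The ControllerRef changed: resync the previous owner, if any.
    if ds := dsc.resolveControllerRef(oldHistory.Namespace, oldControllerRef); ds != nil {
      dsc.enqueueDaemonSet(ds)
    }
  }

  if curControllerRef != nil {
    // Resync the DaemonSet that currently owns this revision.
    if ds := dsc.resolveControllerRef(curHistory.Namespace, curControllerRef); ds != nil {
      dsc.enqueueDaemonSet(ds)
    }
    return
  }
  // (orphan handling trimmed: an orphaned revision triggers a resync of every DaemonSet
  // whose selector matches it, so one of them can adopt it)
}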

Node objects

The Deployment controller doesn't watch node events at all, so this is something specific to the DaemonSet controller.

addNode

Handles node-add events.

  1. When a node is added, the handler calls nodeShouldRunDaemonPod() for every DaemonSet to decide whether that DaemonSet's pod should run on the new node. If it should, the DaemonSet is enqueued for further reconciliation.

  2. nodeShouldRunDaemonPod() works by building a pod object from the DaemonSet's template (NewPod, which also adds the standard daemon tolerations) and checking it against the node's name, node affinity, and taints. If an untolerated taint or any other predicate fails, the node is not considered a fit. A condensed sketch follows the addNode code below.

func (dsc *DaemonSetsController) addNode(obj interface{}) {
  // TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the added node (unless it has other work to do, too).
  dsList, err := dsc.dsLister.List(labels.Everything())
  if err != nil {
    klog.V(4).Infof("Error enqueueing daemon sets: %v", err)
    return
  }
  node := obj.(*v1.Node)
  for _, ds := range dsList {
    if shouldRun, _ := dsc.nodeShouldRunDaemonPod(node, ds); shouldRun {
      dsc.enqueueDaemonSet(ds)
    }
  }
}
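
For reference, nodeShouldRunDaemonPod roughly looks like this in release-1.22 (condensed; the two return values are shouldRun and shouldContinueRunning):

func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *apps.DaemonSet) (bool, bool) {
  // Build a pod from the DaemonSet template for this node; NewPod also adds the standard
  // daemon tolerations (not-ready, unreachable, disk/memory/pid pressure, unschedulable, ...).
  pod := NewPod(ds, node.Name)

  // If the template pins a node name, it must match this node.
  if !(ds.Spec.Template.Spec.NodeName == "" || ds.Spec.Template.Spec.NodeName == node.Name) {
    return false, false
  }

  taints := node.Spec.Taints
  fitsNodeName, fitsNodeAffinity, fitsTaints := Predicates(pod, node, taints)
  if !fitsNodeName || !fitsNodeAffinity {
    return false, false
  }

  if !fitsTaints {
    // An already-scheduled daemon pod may keep running as long as it tolerates
    // every NoExecute taint, even if it wouldn't be scheduled afresh.
    _, hasUntoleratedTaint := v1helper.FindMatchingUntoleratedTaint(taints, pod.Spec.Tolerations, func(t *v1.Taint) bool {
      return t.Effect == v1.TaintEffectNoExecute
    })
    return false, !hasUntoleratedTaint
  }

  return true, true
}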

updateNode

Handles node-update events.

  1. It first compares the two versions' NodeConditions; if they match, it checks whether everything else about the node is identical as well, and if so the update is ignored and no DaemonSet is resynced (shouldIgnoreNodeUpdate; see the sketch after the code below).

  2. Otherwise, for each DaemonSet it runs nodeShouldRunDaemonPod against both the old and the updated node; if the result changed (either shouldRun or shouldContinueRunning differs), the DaemonSet is enqueued for resync.

func (dsc *DaemonSetsController) updateNode(old, cur interface{}) {
  oldNode := old.(*v1.Node)
  curNode := cur.(*v1.Node)
  if shouldIgnoreNodeUpdate(*oldNode, *curNode) {
    return
  }

  dsList, err := dsc.dsLister.List(labels.Everything())
  if err != nil {
    klog.V(4).Infof("Error listing daemon sets: %v", err)
    return
  }
  // TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the added node (unless it has other work to do, too).
  for _, ds := range dsList {
    oldShouldRun, oldShouldContinueRunning := dsc.nodeShouldRunDaemonPod(oldNode, ds)
    currentShouldRun, currentShouldContinueRunning := dsc.nodeShouldRunDaemonPod(curNode, ds)
    if (oldShouldRun != currentShouldRun) || (oldShouldContinueRunning != currentShouldContinueRunning) {
      dsc.enqueueDaemonSet(ds)
    }
  }
}
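
shouldIgnoreNodeUpdate is the filter from step 1; roughly, it only swallows updates in which neither the conditions nor anything else (besides ResourceVersion) changed:

func shouldIgnoreNodeUpdate(oldNode, curNode v1.Node) bool {
  // A change in conditions always triggers a resync.
  if !nodeInSameCondition(oldNode.Status.Conditions, curNode.Status.Conditions) {
    return false
  }
  // Conditions are equivalent; ignore the update only if nothing else changed either.
  oldNode.ResourceVersion = curNode.ResourceVersion
  oldNode.Status.Conditions = curNode.Status.Conditions
  return apiequality.Semantic.DeepEqual(oldNode, curNode)
}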

Notice that there is no deleteNode handler. In my opinion, when a node is deleted its pods get cleaned up and rescheduling is re-evaluated anyway; the manage logic then decides whether a pod should be running on each node and deletes those that shouldn't, so no dedicated watch or handler is needed here.

Pod objects

Nothing special here; just know they exist.

  • addPod

  • updatePod

  • deletePod

enqueue

Same as in the other controllers: an immediate enqueue plus a delayed enqueue. Nothing special.

func (dsc *DaemonSetsController) enqueue(ds *apps.DaemonSet) {
  key, err := controller.KeyFunc(ds)
  if err != nil {
    utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", ds, err))
    return
  }

  // TODO: Handle overlapping controllers better. See comment in ReplicationManager.
  dsc.queue.Add(key)
}

func (dsc *DaemonSetsController) enqueueDaemonSetAfter(obj interface{}, after time.Duration) {
  key, err := controller.KeyFunc(obj)
  if err != nil {
    utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
    return
  }

  // TODO: Handle overlapping controllers better. See comment in ReplicationManager.
  dsc.queue.AddAfter(key, after)
}

Run

It runs 2 workers by default. The one extra piece compared to other controllers is the failedPodsBackoff.GC goroutine, which periodically purges stale backoff entries used when killing and recreating failed daemon pods (explained further under manage).

func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
  defer utilruntime.HandleCrash()
  defer dsc.queue.ShutDown()

  klog.Infof("Starting daemon sets controller")
  defer klog.Infof("Shutting down daemon sets controller")

  if !cache.WaitForNamedCacheSync("daemon sets", stopCh, dsc.podStoreSynced, dsc.nodeStoreSynced, dsc.historyStoreSynced, dsc.dsStoreSynced) {
    return
  }

  for i := 0; i < workers; i++ {
    go wait.Until(dsc.runWorker, time.Second, stopCh)
  }

  // Note the extra GC goroutine here; it's explained later, in the manage section
  go wait.Until(dsc.failedPodsBackoff.GC, BackoffGCInterval, stopCh)

  <-stopCh
}

func (dsc *DaemonSetsController) runWorker() {
  for dsc.processNextWorkItem() {
  }
}

// processNextWorkItem deals with one key off the queue.  It returns false when it's time to quit.
func (dsc *DaemonSetsController) processNextWorkItem() bool {
  dsKey, quit := dsc.queue.Get()
  if quit {
    return false
  }
  defer dsc.queue.Done(dsKey)

  err := dsc.syncHandler(dsKey.(string))
  if err == nil {
    dsc.queue.Forget(dsKey)
    return true
  }

  utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err))
  dsc.queue.AddRateLimited(dsKey)

  return true
}

syncDaemonSet

The controller's core logic is implemented here.

  1. Fetch the DaemonSet's current and old ControllerRevisions (constructHistory) and read cur.Labels[apps.DefaultDaemonSetUniqueLabelKey]; that hash is used throughout the rest of the sync.

  2. If expectations are not yet satisfied, keep waiting for them; in the meantime only the DaemonSet's status is updated.

  3. Once expectations are satisfied, run manage (which ends in syncNodes) to reconcile the pod count first.

  4. Then, depending on the update strategy type, run rollingUpdate to keep the DaemonSet's pods in line with the current revision. With OnDelete, new pods are only created after the old ones are deleted manually; with RollingUpdate, the behaviour follows maxSurge and maxUnavailable (with the defaults, maxUnavailable=1 and maxSurge=0, rollingUpdate deletes one pod at a time and only creates the replacement once the old pod is gone; see the sketch after this list).

  5. Clean up old revisions.

  6. Update the DaemonSet's Status fields.
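
To make step 4 concrete, this is roughly what the defaulted update strategy of a DaemonSet looks like when spec.updateStrategy is left empty (written with the public apps/v1 types, purely as an illustration of the maxUnavailable=1 / maxSurge=0 defaults):

import (
  appsv1 "k8s.io/api/apps/v1"
  "k8s.io/apimachinery/pkg/util/intstr"
)

func defaultedUpdateStrategy() appsv1.DaemonSetUpdateStrategy {
  maxUnavailable := intstr.FromInt(1) // at most one daemon pod may be unavailable at a time
  maxSurge := intstr.FromInt(0)       // never run the old and the new pod on a node simultaneously
  return appsv1.DaemonSetUpdateStrategy{
    Type: appsv1.RollingUpdateDaemonSetStrategyType,
    RollingUpdate: &appsv1.RollingUpdateDaemonSet{
      MaxUnavailable: &maxUnavailable,
      MaxSurge:       &maxSurge,
    },
  }
}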

func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
  startTime := dsc.failedPodsBackoff.Clock.Now()

  defer func() {
    klog.V(4).Infof("Finished syncing daemon set %q (%v)", key, dsc.failedPodsBackoff.Clock.Now().Sub(startTime))
  }()

  namespace, name, err := cache.SplitMetaNamespaceKey(key)
  if err != nil {
    return err
  }
  ds, err := dsc.dsLister.DaemonSets(namespace).Get(name)
  if apierrors.IsNotFound(err) {
    klog.V(3).Infof("daemon set has been deleted %v", key)
    dsc.expectations.DeleteExpectations(key)
    return nil
  }
  if err != nil {
    return fmt.Errorf("unable to retrieve ds %v from store: %v", key, err)
  }

  nodeList, err := dsc.nodeLister.List(labels.Everything())
  if err != nil {
    return fmt.Errorf("couldn't get list of nodes when syncing daemon set %#v: %v", ds, err)
  }

  everything := metav1.LabelSelector{}
  if reflect.DeepEqual(ds.Spec.Selector, &everything) {
    dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, SelectingAllReason, "This daemon set is selecting all pods. A non-empty selector is required.")
    return nil
  }

  // Get the DaemonSet's unique key
  dsKey, err := controller.KeyFunc(ds)
  if err != nil {
    return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
  }

  // The DaemonSet is being deleted
  if ds.DeletionTimestamp != nil {
    return nil
  }

  // Build the DaemonSet's revision history (current and old ControllerRevisions)
  cur, old, err := dsc.constructHistory(ds)
  if err != nil {
    return fmt.Errorf("failed to construct revisions of DaemonSet: %v", err)
  }
  hash := cur.Labels[apps.DefaultDaemonSetUniqueLabelKey]

  if !dsc.expectations.SatisfiedExpectations(dsKey) {
    // Only update the status
    return dsc.updateDaemonSetStatus(ds, nodeList, hash, false)
  }
  // Reconcile the number of daemon pods across nodes
  err = dsc.manage(ds, nodeList, hash)
  if err != nil {
    return err
  }

  // Run the update strategy (rolling update) once expectations are satisfied
  if dsc.expectations.SatisfiedExpectations(dsKey) {
    switch ds.Spec.UpdateStrategy.Type {
    case apps.OnDeleteDaemonSetStrategyType:
    case apps.RollingUpdateDaemonSetStrategyType:
      err = dsc.rollingUpdate(ds, nodeList, hash)
    }
    if err != nil {
      return err
    }
  }
  // Clean up old revisions
  err = dsc.cleanupHistory(ds, old)
  if err != nil {
    return fmt.Errorf("failed to clean up revisions of DaemonSet: %v", err)
  }
  // Update the status
  return dsc.updateDaemonSetStatus(ds, nodeList, hash, true)
}

manage

  1. Get all pods owned by the DaemonSet, grouped by node name.

  2. Go through every node: if the node should no longer run this DaemonSet's pod, delete it; if it should run one but doesn't, create it. Failed pods are killed and retried with an exponential backoff (starting at 1s and doubling up to 15m); dsc.failedPodsBackoff.GC runs once a minute and drops entries that haven't been updated for 30 minutes, which resets their delay back to 1s (see the backoff sketch after this list).

  3. Check the rolling update settings: when surge is not enabled (maxSurge = 0) and more than one daemon pod is running on a node, all but the oldest are deleted (without surge, only one daemon pod is supposed to run per node at any time).

  4. Pods assigned to nodes that no longer exist are deleted as well.

  5. The core of manage is to work out which nodes need a new pod and which pods must be deleted (per node this is podsShouldBeOnNode; a condensed sketch follows the code below), and then hand both lists to syncNodes to execute.
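
A minimal illustration of the client-go flowcontrol.Backoff API behind failedPodsBackoff, assuming the 1s/15m values wired in by kube-controller-manager (the key format here is made up for the example; the controller derives its own backoff key per DaemonSet and node):

import (
  "time"

  "k8s.io/client-go/util/flowcontrol"
)

func backoffIllustration() {
  backoff := flowcontrol.NewBackOff(1*time.Second, 15*time.Minute)

  key := "default/my-ds:node-1" // hypothetical backoff key for one failed daemon pod
  now := backoff.Clock.Now()

  if backoff.IsInBackOffSinceUpdate(key, now) {
    // Still backing off: don't kill the pod yet, re-enqueue the DaemonSet after the delay.
    _ = backoff.Get(key)
    return
  }
  // Not in backoff: record another failure (doubling the delay, capped at 15m)
  // and go ahead and kill the failed pod.
  backoff.Next(key, now)

  // Run() calls backoff.GC() every minute (BackoffGCInterval); GC drops entries that
  // haven't been updated for 2*maxDuration (30m), which resets their delay back to 1s.
  backoff.GC()
}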

func (dsc *DaemonSetsController) manage(ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
  // Find out the pods which are created for the nodes by DaemonSet.
  nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
  if err != nil {
    return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
  }

  // Per node, sort out which pods to create and which to delete
  var nodesNeedingDaemonPods, podsToDelete []string
  for _, node := range nodeList {
    nodesNeedingDaemonPodsOnNode, podsToDeleteOnNode := dsc.podsShouldBeOnNode(
      node, nodeToDaemonPods, ds, hash)

    nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, nodesNeedingDaemonPodsOnNode...)
    podsToDelete = append(podsToDelete, podsToDeleteOnNode...)
  }

  // Pods assigned to nodes that no longer exist also need to be deleted
  podsToDelete = append(podsToDelete, getUnscheduledPodsWithoutNode(nodeList, nodeToDaemonPods)...)

  // Hand the create/delete lists to syncNodes
  if err = dsc.syncNodes(ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil {
    return err
  }

  return nil
}
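
podsShouldBeOnNode, which manage calls per node, looks roughly like this (condensed: the surge-enabled branch and the failed-pod backoff bookkeeping are trimmed; see pkg/controller/daemon/daemon_controller.go for the full version):

func (dsc *DaemonSetsController) podsShouldBeOnNode(
  node *v1.Node,
  nodeToDaemonPods map[string][]*v1.Pod,
  ds *apps.DaemonSet,
  hash string,
) (nodesNeedingDaemonPods, podsToDelete []string) {
  shouldRun, shouldContinueRunning := dsc.nodeShouldRunDaemonPod(node, ds)
  daemonPods, exists := nodeToDaemonPods[node.Name]

  switch {
  case shouldRun && !exists:
    // The node should run a daemon pod but has none: create one.
    nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, node.Name)
  case shouldContinueRunning:
    // Kill failed daemon pods (subject to the backoff), keep the rest running.
    var daemonPodsRunning []*v1.Pod
    for _, pod := range daemonPods {
      if pod.DeletionTimestamp != nil {
        continue
      }
      if pod.Status.Phase == v1.PodFailed {
        // (backoff check trimmed: if the pod is still in backoff, the DaemonSet is
        // simply re-enqueued after the delay instead of killing the pod right away)
        podsToDelete = append(podsToDelete, pod.Name)
      } else {
        daemonPodsRunning = append(daemonPodsRunning, pod)
      }
    }
    // When surge is not allowed, keep only the oldest running daemon pod on the node.
    if !util.AllowsSurge(ds) {
      if len(daemonPodsRunning) <= 1 {
        break
      }
      sort.Sort(podByCreationTimestampAndPhase(daemonPodsRunning))
      for i := 1; i < len(daemonPodsRunning); i++ {
        podsToDelete = append(podsToDelete, daemonPodsRunning[i].Name)
      }
    }
    // (surge-enabled branch trimmed: the old pod is kept until the new one is ready)
  case !shouldContinueRunning && exists:
    // The node should no longer run this daemon pod: delete everything on it.
    for _, pod := range daemonPods {
      if pod.DeletionTimestamp != nil {
        continue
      }
      podsToDelete = append(podsToDelete, pod.Name)
    }
  }

  return nodesNeedingDaemonPods, podsToDelete
}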

syncNodes

  1. Compute how many pods to create and how many to delete (both capped at burstReplicas) and set the expectations accordingly.

  2. Before creating each pod, patch in a node affinity that pins it to its target node, then create the pods in slow-start batches (1, 2, 4, ...).

  3. Delete the pods marked for deletion.

func (dsc *DaemonSetsController) syncNodes(ds *apps.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string, hash string) error {
  // We need to set expectations before creating/deleting pods to avoid race conditions.
  dsKey, err := controller.KeyFunc(ds)
  if err != nil {
    return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
  }

  createDiff := len(nodesNeedingDaemonPods)
  deleteDiff := len(podsToDelete)

  if createDiff > dsc.burstReplicas {
    createDiff = dsc.burstReplicas
  }
  if deleteDiff > dsc.burstReplicas {
    deleteDiff = dsc.burstReplicas
  }

  dsc.expectations.SetExpectations(dsKey, createDiff, deleteDiff)

  // error channel
  errCh := make(chan error, createDiff+deleteDiff)

  klog.V(4).Infof("Nodes needing daemon pods for daemon set %s: %+v, creating %d", ds.Name, nodesNeedingDaemonPods, createDiff)
  createWait := sync.WaitGroup{}
  // If the returned error is not nil we have a parse error.
  // The controller handles this via the hash.
  generation, err := util.GetTemplateGeneration(ds)
  if err != nil {
    generation = nil
  }
  template := util.CreatePodTemplate(ds.Spec.Template, generation, hash)
  // Create pods in exponentially growing (slow-start) batches
  batchSize := integer.IntMin(createDiff, controller.SlowStartInitialBatchSize)
  for pos := 0; createDiff > pos; batchSize, pos = integer.IntMin(2*batchSize, createDiff-(pos+batchSize)), pos+batchSize {
    errorCount := len(errCh)
    createWait.Add(batchSize)
    for i := pos; i < pos+batchSize; i++ {
      go func(ix int) {
        defer createWait.Done()

        podTemplate := template.DeepCopy()
        // Pin the pod to its target node via node affinity (see the sketch below)
        podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodNodeNameNodeAffinity(
          podTemplate.Spec.Affinity, nodesNeedingDaemonPods[ix])

        err := dsc.podControl.CreatePods(ds.Namespace, podTemplate,
          ds, metav1.NewControllerRef(ds, controllerKind))

        if err != nil {
          if apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
            // The namespace is being torn down; ignore the error, subsequent creations would fail too
            return
          }
        }
        if err != nil {
          klog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
          dsc.expectations.CreationObserved(dsKey)
          errCh <- err
          utilruntime.HandleError(err)
        }
      }(i)
    }
    createWait.Wait()
    // This batch has finished; if any creation failed, skip the remaining pods and let the next resync retry
    skippedPods := createDiff - (batchSize + pos)
    if errorCount < len(errCh) && skippedPods > 0 {
      klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for set %q/%q", skippedPods, ds.Namespace, ds.Name)
      dsc.expectations.LowerExpectations(dsKey, skippedPods, 0)
      // The skipped pods will be retried later. The next controller resync will
      // retry the slow start process.
      break
    }
  }

  klog.V(4).Infof("Pods to delete for daemon set %s: %+v, deleting %d", ds.Name, podsToDelete, deleteDiff)
  deleteWait := sync.WaitGroup{}
  // Delete pods
  deleteWait.Add(deleteDiff)
  for i := 0; i < deleteDiff; i++ {
    go func(ix int) {
      defer deleteWait.Done()
      if err := dsc.podControl.DeletePod(ds.Namespace, podsToDelete[ix], ds); err != nil {
        dsc.expectations.DeletionObserved(dsKey)
        if !apierrors.IsNotFound(err) {
          klog.V(2).Infof("Failed deletion, decremented expectations for set %q/%q", ds.Namespace, ds.Name)
          errCh <- err
          utilruntime.HandleError(err)
        }
      }
    }(i)
  }
  deleteWait.Wait()

  // collect errors if any for proper reporting/retry logic in the controller
  errors := []error{}
  close(errCh)
  for err := range errCh {
    errors = append(errors, err)
  }
  return utilerrors.NewAggregate(errors)
}
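
The node-affinity replacement used above pins each new pod to exactly one node by matching the metadata.name field, so the scheduler can only ever bind it there. Condensed, it looks roughly like this (the branch that rewrites an already-populated node affinity is simplified):

// From pkg/controller/daemon/util (condensed).
func ReplaceDaemonSetPodNodeNameNodeAffinity(affinity *v1.Affinity, nodename string) *v1.Affinity {
  nodeSelReq := v1.NodeSelectorRequirement{
    Key:      metav1.ObjectNameField, // "metadata.name"
    Operator: v1.NodeSelectorOpIn,
    Values:   []string{nodename},
  }

  nodeSelector := &v1.NodeSelector{
    NodeSelectorTerms: []v1.NodeSelectorTerm{
      {MatchFields: []v1.NodeSelectorRequirement{nodeSelReq}},
    },
  }

  if affinity == nil {
    return &v1.Affinity{
      NodeAffinity: &v1.NodeAffinity{
        RequiredDuringSchedulingIgnoredDuringExecution: nodeSelector,
      },
    }
  }

  if affinity.NodeAffinity == nil {
    affinity.NodeAffinity = &v1.NodeAffinity{}
  }
  // Any required node selector from the template is replaced outright: the daemon pod
  // must land on exactly this node, no matter what the template asked for.
  affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution = nodeSelector
  return affinity
}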

Summary

  • A DaemonSet does not use a ReplicaSet to manage its pods; the pod count is driven by the number of eligible nodes.

  • Version management is done through ControllerRevisions and the revision hash label.

  • Failed daemon pods are killed and recreated periodically, with an exponential backoff.