
Kubernetes Source Code: Endpoints Controller, Principles and Source Analysis

An analysis of how the Endpoints Controller works and its source code, based on the Kubernetes release-1.22 branch.

Preface

The controllers we have looked at so far are the common ones: they only manage and maintain groups of pods (what we usually call workloads). At that point, the only way to reach the services inside those pods is to access them pod by pod (and that assumes the client already has a route to the pods, otherwise they are unreachable). That is neither practical nor load-balanced.

  1. This is where Service comes in. A Service defines a logical set of Pods and a policy for accessing them. The set of Pods a Service targets is usually determined by a Label Selector.

  2. If we set a Label Selector when creating a Service so that it is associated with a set of Pods, the Endpoints Controller automatically creates an Endpoints object with the same name as the Service. It is stored in etcd and records the addresses and ports of all Pods backing that Service (see the illustrative sketch after this list).

  3. Note that Endpoints only records the IPs and ports of the pods behind a Service; the real implementation of a Service is kube-proxy. In other words, a Service is just an abstract definition, and kube-proxy is what actually does the work: it runs on every node and handles that node's network proxying. That is a topic for later, when we analyze kube-proxy in detail.

  4. In this section we look at how the Endpoints Controller associates Pods with a Service through the Label Selector.
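To make the relationship concrete, here is a minimal sketch using the client-go API types. The names, labels, and IP address ("web", app=web, 10.244.1.5) are made up for illustration; the snippet only shows the shape of a Service with a selector and the Endpoints object the controller maintains for it.

package main

import (
  "fmt"

  v1 "k8s.io/api/core/v1"
  metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  "k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
  // A Service that selects pods labelled app=web (hypothetical example).
  svc := v1.Service{
    ObjectMeta: metav1.ObjectMeta{Name: "web", Namespace: "default"},
    Spec: v1.ServiceSpec{
      Selector: map[string]string{"app": "web"},
      Ports:    []v1.ServicePort{{Name: "http", Port: 80, TargetPort: intstr.FromInt(8080)}},
    },
  }

  // The Endpoints object the controller maintains for it: same name as the
  // Service, one address per matching ready pod, one port per service port.
  eps := v1.Endpoints{
    ObjectMeta: metav1.ObjectMeta{Name: svc.Name, Namespace: svc.Namespace},
    Subsets: []v1.EndpointSubset{{
      Addresses: []v1.EndpointAddress{{IP: "10.244.1.5"}}, // the pod's IP (made up)
      Ports:     []v1.EndpointPort{{Name: "http", Port: 8080}},
    }},
  }

  fmt.Printf("service %s -> endpoints %s: %s:%d\n",
    svc.Name, eps.Name, eps.Subsets[0].Addresses[0].IP, eps.Subsets[0].Ports[0].Port)
}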

startEndpointController

Source entry point: cmd/kube-controller-manager/app/core.go

func startEndpointController(ctx ControllerContext) (http.Handler, bool, error) {
  go endpointcontroller.NewEndpointController(
    ctx.InformerFactory.Core().V1().Pods(),
    ctx.InformerFactory.Core().V1().Services(),
    ctx.InformerFactory.Core().V1().Endpoints(),
    ctx.ClientBuilder.ClientOrDie("endpoint-controller"),
    ctx.ComponentConfig.EndpointController.EndpointUpdatesBatchPeriod.Duration,
  ).Run(int(ctx.ComponentConfig.EndpointController.ConcurrentEndpointSyncs), ctx.Stop)
  return nil, true, nil
}

NewEndpointController

The EndpointController constructor.

func NewEndpointController(podInformer coreinformers.PodInformer, serviceInformer coreinformers.ServiceInformer,
  endpointsInformer coreinformers.EndpointsInformer, client clientset.Interface, endpointUpdatesBatchPeriod time.Duration) *Controller {
  broadcaster := record.NewBroadcaster()
  broadcaster.StartStructuredLogging(0)
  broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
  recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoint-controller"})

  if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
    ratelimiter.RegisterMetricAndTrackRateLimiterUsage("endpoint_controller", client.CoreV1().RESTClient().GetRateLimiter())
  }
  e := &Controller{
    client:           client,
    queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint"),
    workerLoopPeriod: time.Second,
  }
  // watch Service objects
  serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc: e.onServiceUpdate,
    UpdateFunc: func(old, cur interface{}) {
      e.onServiceUpdate(cur)
    },
    DeleteFunc: e.onServiceDelete,
  })
  e.serviceLister = serviceInformer.Lister()
  e.servicesSynced = serviceInformer.Informer().HasSynced
  // watch Pod objects
  podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc:    e.addPod,
    UpdateFunc: e.updatePod,
    DeleteFunc: e.deletePod,
  })
  e.podLister = podInformer.Lister()
  e.podsSynced = podInformer.Informer().HasSynced
  // watch Endpoints objects
  endpointsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    DeleteFunc: e.onEndpointsDelete,
  })
  e.endpointsLister = endpointsInformer.Lister()
  e.endpointsSynced = endpointsInformer.Informer().HasSynced

  e.triggerTimeTracker = endpointutil.NewTriggerTimeTracker()
  e.eventBroadcaster = broadcaster
  e.eventRecorder = recorder

  e.endpointUpdatesBatchPeriod = endpointUpdatesBatchPeriod

  e.serviceSelectorCache = endpointutil.NewServiceSelectorCache()

  return e
}

Event handlers

Like the other controllers, it registers a number of event handlers.

Service objects

Both AddFunc() and UpdateFunc() are handled by onServiceUpdate(). Note that, unlike in other controllers, UpdateFunc() only passes the cur object along and simply discards the old object.

  • onServiceUpdate

    1. Compute the service object's unique key and call e.serviceSelectorCache.Update() to store its selector in a cache, avoiding the unnecessary CPU cost of re-parsing the selector on every event (a sketch of this cache follows the code below).

    2. Add the service key to the sync queue.

func (e *Controller) onServiceUpdate(obj interface{}) {
  key, err := controller.KeyFunc(obj)
  if err != nil {
    utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
    return
  }

  _ = e.serviceSelectorCache.Update(key, obj.(*v1.Service).Spec.Selector)
  e.queue.Add(key)
}
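The ServiceSelectorCache itself (it lives in pkg/controller/util/endpoint) is not quoted in this article. Conceptually it is roughly the following sketch, assuming a map of parsed selectors guarded by a lock; the real release-1.22 implementation may differ in detail.

package endpoint

import (
  "sync"

  "k8s.io/apimachinery/pkg/labels"
)

// ServiceSelectorCache caches the parsed labels.Selector for each service key,
// so that pod events do not have to re-parse the raw selector map every time.
type ServiceSelectorCache struct {
  lock  sync.RWMutex
  cache map[string]labels.Selector
}

// Update parses the raw selector once and stores it under the service key.
func (sc *ServiceSelectorCache) Update(key string, rawSelector map[string]string) labels.Selector {
  selector := labels.Set(rawSelector).AsSelectorPreValidated()
  sc.lock.Lock()
  defer sc.lock.Unlock()
  sc.cache[key] = selector
  return selector
}

// Get returns the cached selector for a service key, if present.
func (sc *ServiceSelectorCache) Get(key string) (labels.Selector, bool) {
  sc.lock.RLock()
  defer sc.lock.RUnlock()
  selector, ok := sc.cache[key]
  return selector, ok
}

// Delete removes the entry when the service is deleted.
func (sc *ServiceSelectorCache) Delete(key string) {
  sc.lock.Lock()
  defer sc.lock.Unlock()
  delete(sc.cache, key)
}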
  • onServiceDelete

    1. Remove the service's selector from the cache, then add the key to the queue.
func (e *Controller) onServiceDelete(obj interface{}) {
  key, err := controller.KeyFunc(obj)
  if err != nil {
    utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
    return
  }

  e.serviceSelectorCache.Delete(key)
  e.queue.Add(key)
}

Pod objects

Pod event handling differs a little from other controllers: the handler first figures out which services the pod belongs to, and then processes those services.

  • addPod

    1. List all Service objects in the pod's namespace.

    2. Iterate over those services and skip any whose service.Spec.Selector is nil. For the others, fetch the parsed selector from the cache (updating the cache if it is missing), and check whether the selector matches pod.Labels. For every match, add the service's unique key to a set, and return the set of matching service keys; each key is then enqueued with a delay of endpointUpdatesBatchPeriod (see the matching example after the code below).

func (e *Controller) addPod(obj interface{}) {
  pod := obj.(*v1.Pod)
  services, err := e.serviceSelectorCache.GetPodServiceMemberships(e.serviceLister, pod)
  if err != nil {
    utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", pod.Namespace, pod.Name, err))
    return
  }
  for key := range services {
    e.queue.AddAfter(key, e.endpointUpdatesBatchPeriod)
  }
}

func (sc *ServiceSelectorCache) GetPodServiceMemberships(serviceLister v1listers.ServiceLister, pod *v1.Pod) (sets.String, error) {
  set := sets.String{}
  services, err := serviceLister.Services(pod.Namespace).List(labels.Everything())
  if err != nil {
    return set, err
  }

  var selector labels.Selector
  for _, service := range services {
    if service.Spec.Selector == nil {
      // if the service has a nil selector this means selectors match nothing, not everything.
      continue
    }
    key, err := controller.KeyFunc(service)
    if err != nil {
      return nil, err
    }
    if v, ok := sc.Get(key); ok {
      selector = v
    } else {
      selector = sc.Update(key, service.Spec.Selector)
    }

    if selector.Matches(labels.Set(pod.Labels)) {
      set.Insert(key)
    }
  }
  return set, nil
}
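One detail worth calling out is the direction of the match: a selector matches a pod as long as every key/value pair in the selector is present in the pod's labels; extra pod labels are ignored. A small self-contained example (label values are made up):

package main

import (
  "fmt"

  "k8s.io/apimachinery/pkg/labels"
)

func main() {
  // Parse a service selector the same way the controller does.
  selector := labels.Set(map[string]string{"app": "web"}).AsSelectorPreValidated()

  // A pod with extra labels still matches: the selector only requires that
  // its own key/value pairs are present on the pod.
  fmt.Println(selector.Matches(labels.Set(map[string]string{"app": "web", "tier": "frontend"}))) // true

  // A pod missing the selector's label (or with a different value) does not match.
  fmt.Println(selector.Matches(labels.Set(map[string]string{"app": "db"}))) // false
}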
  • updatePod

    1. Compare the old and new pods' ResourceVersion; if it is unchanged, do nothing.

    2. Otherwise call podEndpointsChanged(), which returns podChanged and labelsChanged: the former is true if the pod itself changed in a way that affects endpoints, the latter is true if the pod's labels or hostname changed, which may affect which services it matches. If both are false, return an empty service set.

    3. If the labels changed, compute the service memberships of both the old and the new pod and merge the two sets of service keys (see the determineNeededServiceUpdates sketch after the code below).

    4. Iterate over the returned set and add each service key to the queue with a delay.

func (e *Controller) updatePod(old, cur interface{}) {
  services := endpointutil.GetServicesToUpdateOnPodChange(e.serviceLister, e.serviceSelectorCache, old, cur)
  for key := range services {
    e.queue.AddAfter(key, e.endpointUpdatesBatchPeriod)
  }
}

func GetServicesToUpdateOnPodChange(serviceLister v1listers.ServiceLister, selectorCache *ServiceSelectorCache, old, cur interface{}) sets.String {
  newPod := cur.(*v1.Pod)
  oldPod := old.(*v1.Pod)
  if newPod.ResourceVersion == oldPod.ResourceVersion {
    // Periodic resync will send update events for all known pods.
    // Two different versions of the same pod will always have different RVs
    return sets.String{}
  }

  podChanged, labelsChanged := podEndpointsChanged(oldPod, newPod)

  // If both the pod and labels are unchanged, no update is needed
  if !podChanged && !labelsChanged {
    return sets.String{}
  }

  services, err := selectorCache.GetPodServiceMemberships(serviceLister, newPod)
  if err != nil {
    utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", newPod.Namespace, newPod.Name, err))
    return sets.String{}
  }

  if labelsChanged {
    oldServices, err := selectorCache.GetPodServiceMemberships(serviceLister, oldPod)
    if err != nil {
      utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", newPod.Namespace, newPod.Name, err))
    }
    services = determineNeededServiceUpdates(oldServices, services, podChanged)
  }

  return services
}
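determineNeededServiceUpdates() is not quoted above. Its logic is roughly the following sketch (the real helper lives in pkg/controller/util/endpoint and may differ slightly): if the pod itself changed, every service matching either the old or the new labels must be resynced; if only the labels changed, services matching both label sets see no difference, so only the symmetric difference needs a resync.

package endpoint

import "k8s.io/apimachinery/pkg/util/sets"

// A sketch of determineNeededServiceUpdates.
func determineNeededServiceUpdates(oldServices, services sets.String, podChanged bool) sets.String {
  if podChanged {
    // The pod changed, so all services matching either the old or the new
    // labels need to be resynced: take the union of the two sets.
    return services.Union(oldServices)
  }
  // Only the labels changed: services in both sets still match and need no
  // update, so return the symmetric difference of the two sets.
  return services.Difference(oldServices).Union(oldServices.Difference(services))
}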
  • deletePod

    1. Hand the pod over to addPod() for processing (recovering the pod from the tombstone first if its final state was not recorded).
func (e *Controller) deletePod(obj interface{}) {
  pod := endpointutil.GetPodFromDeleteAction(obj)
  if pod != nil {
    e.addPod(pod)
  }
}

func GetPodFromDeleteAction(obj interface{}) *v1.Pod {
  if pod, ok := obj.(*v1.Pod); ok {
    // Enqueue all the services that the pod used to be a member of.
    // This is the same thing we do when we add a pod.
    return pod
  }
  // If we reached here it means the pod was deleted but its final state is unrecorded.
  tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
  if !ok {
    utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
    return nil
  }
  pod, ok := tombstone.Obj.(*v1.Pod)
  if !ok {
    utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a Pod: %#v", obj))
    return nil
  }
  return pod
}

Endpoints objects

For Endpoints objects, only the delete event is watched.

  • onEndpointsDelete

    1. The Endpoints object has the same name as its Service, so this is effectively adding the service's key to the queue.
func (e *Controller) onEndpointsDelete(obj interface{}) {
  key, err := controller.KeyFunc(obj)
  if err != nil {
    utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
    return
  }
  e.queue.Add(key)
}

enqueue

The Endpoints controller does not wrap the queue in an enqueue helper of its own; the event handlers simply add keys to the controller's rate-limiting workqueue directly.

Run

It also defaults to 5 worker threads. After the informer caches have synced, Run() starts the worker loops and additionally launches checkLeftoverEndpoints() once in its own goroutine (a sketch of that function follows the code below).

func (e *Controller) Run(workers int, stopCh <-chan struct{}) {
  defer utilruntime.HandleCrash()
  defer e.queue.ShutDown()

  klog.Infof("Starting endpoint controller")
  defer klog.Infof("Shutting down endpoint controller")

  if !cache.WaitForNamedCacheSync("endpoint", stopCh, e.podsSynced, e.servicesSynced, e.endpointsSynced) {
    return
  }

  for i := 0; i < workers; i++ {
    go wait.Until(e.worker, e.workerLoopPeriod, stopCh)
  }

  go func() {
    defer utilruntime.HandleCrash()
    e.checkLeftoverEndpoints()
  }()

  <-stopCh
}

func (e *Controller) worker() {
  for e.processNextWorkItem() {
  }
}

func (e *Controller) processNextWorkItem() bool {
  eKey, quit := e.queue.Get()
  if quit {
    return false
  }
  defer e.queue.Done(eKey)

  err := e.syncService(eKey.(string))
  e.handleErr(err, eKey)

  return true
}

func (e *Controller) handleErr(err error, key interface{}) {
  if err == nil {
    e.queue.Forget(key)
    return
  }

  ns, name, keyErr := cache.SplitMetaNamespaceKey(key.(string))
  if keyErr != nil {
    klog.ErrorS(err, "Failed to split meta namespace cache key", "key", key)
  }

  if e.queue.NumRequeues(key) < maxRetries {
    klog.V(2).InfoS("Error syncing endpoints, retrying", "service", klog.KRef(ns, name), "err", err)
    e.queue.AddRateLimited(key)
    return
  }

  klog.Warningf("Dropping service %q out of the queue: %v", key, err)
  e.queue.Forget(key)
  utilruntime.HandleError(err)
}
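checkLeftoverEndpoints(), which Run() launches once after the caches sync, is not quoted here. Its purpose is to enqueue every existing Endpoints object at startup, so that Endpoints whose Service was deleted while the controller was down still get cleaned up by syncService(). Roughly, and assuming the lister and queue fields set up in the constructor, it looks like this sketch (the real function also skips the leader-election Endpoints and may differ in detail):

// Sketch of checkLeftoverEndpoints: list every Endpoints object and enqueue its
// key, so syncService can delete any Endpoints whose Service no longer exists.
func (e *Controller) checkLeftoverEndpoints() {
  list, err := e.endpointsLister.List(labels.Everything())
  if err != nil {
    utilruntime.HandleError(fmt.Errorf("unable to list endpoints: %v", err))
    return
  }
  for _, ep := range list {
    key, err := controller.KeyFunc(ep)
    if err != nil {
      utilruntime.HandleError(fmt.Errorf("unable to get key for endpoint %#v", ep))
      continue
    }
    e.queue.Add(key)
  }
}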

syncService

The core logic of the controller is implemented here. It is quite long, and the helper functions take some effort to understand.

  1. Look up the Service object by its unique key; if it no longer exists, delete the Endpoints object of the same name.

  2. If service.Spec.Selector == nil, return immediately without doing anything.

  3. List all pods in the service's namespace that match service.Spec.Selector.

  4. Check whether the service's annotations set TolerateUnreadyEndpoints (or spec.publishNotReadyAddresses is set), which means unready pods should also be included in the Endpoints.

  5. triggerTimeTracker keeps a map of the last update times of each service and its pods; e.triggerTimeTracker.ComputeEndpointLastChangeTriggerTime() computes the service's last change trigger time (if a pod's last update time is later than the service's, the pod's time is used; if nothing usable is found, the service's creation time is returned).

  6. Build the list of EndpointSubsets, using totalReadyEps and totalNotReadyEps to count the ready and not-ready endpoints.

  7. Call endpointutil.ShouldPodBeInEndpoints() to check whether each pod should be included in the Endpoints, then call podToEndpointAddressForService() to convert the service and pod into an EndpointAddress.

  8. If this is a headless service without ports, add the address to the subsets and compute totalReadyEps and totalNotReadyEps directly.

  9. Otherwise, iterate over the service's port list, convert each port into an EndpointPort, and finally call endpoints.RepackSubsets() to repack the EndpointSubset objects (grouping Addresses, NotReadyAddresses and Ports together, recording which pod addresses under each set of ports are ready and which are not, then sorting the result); the per-pod subsets are built by addEndpointSubset(), sketched after the code below.

  10. Look up the current Endpoints object for the service; if it does not exist, a new one is created.

  11. Compare the subsets and the labels (after stripping the "service.kubernetes.io/headless" label), and check whether the number of endpoints exceeds the maximum capacity, truncating the excess if so; if the labels and subsets are unchanged, skip the update.

  12. Call the clientset to create or update the Endpoints object.

func (e *Controller) syncService(key string) error {
  startTime := time.Now()
  defer func() {
    klog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Since(startTime))
  }()

  namespace, name, err := cache.SplitMetaNamespaceKey(key)
  if err != nil {
    return err
  }
  service, err := e.serviceLister.Services(namespace).Get(name)
  if err != nil {
    if !errors.IsNotFound(err) {
      return err
    }

    // Delete the corresponding endpoint, as the service has been deleted.
    // TODO: Please note that this will delete an endpoint when a
    // service is deleted. However, if we're down at the time when
    // the service is deleted, we will miss that deletion, so this
    // doesn't completely solve the problem. See #6877.
    err = e.client.CoreV1().Endpoints(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{})
    if err != nil && !errors.IsNotFound(err) {
      return err
    }
    e.triggerTimeTracker.DeleteService(namespace, name)
    return nil
  }

  if service.Spec.Selector == nil {
    // services without a selector receive no endpoints from this controller;
    // these services will receive the endpoints that are created out-of-band via the REST API.
    return nil
  }

  klog.V(5).Infof("About to update endpoints for service %q", key)
  pods, err := e.podLister.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelectorPreValidated())
  if err != nil {
    // Since we're getting stuff from a local cache, it is
    // basically impossible to get this error.
    return err
  }

  // If the user specified the older (deprecated) annotation, we have to respect it.
  tolerateUnreadyEndpoints := service.Spec.PublishNotReadyAddresses
  if v, ok := service.Annotations[TolerateUnreadyEndpointsAnnotation]; ok {
    b, err := strconv.ParseBool(v)
    if err == nil {
      tolerateUnreadyEndpoints = b
    } else {
      utilruntime.HandleError(fmt.Errorf("Failed to parse annotation %v: %v", TolerateUnreadyEndpointsAnnotation, err))
    }
  }

  // We call ComputeEndpointLastChangeTriggerTime here to make sure that the
  // state of the trigger time tracker gets updated even if the sync turns out
  // to be no-op and we don't update the endpoints object.
  endpointsLastChangeTriggerTime := e.triggerTimeTracker.
    ComputeEndpointLastChangeTriggerTime(namespace, service, pods)

  subsets := []v1.EndpointSubset{}
  var totalReadyEps int
  var totalNotReadyEps int

  for _, pod := range pods {
    if !endpointutil.ShouldPodBeInEndpoints(pod, tolerateUnreadyEndpoints) {
      klog.V(5).Infof("Pod %s/%s is not included on endpoints for Service %s/%s", pod.Namespace, pod.Name, service.Namespace, service.Name)
      continue
    }

    ep, err := podToEndpointAddressForService(service, pod)
    if err != nil {
      // this will happen, if the cluster runs with some nodes configured as dual stack and some as not
      // such as the case of an upgrade..
      klog.V(2).Infof("failed to find endpoint for service:%v with ClusterIP:%v on pod:%v with error:%v", service.Name, service.Spec.ClusterIP, pod.Name, err)
      continue
    }

    epa := *ep
    if endpointutil.ShouldSetHostname(pod, service) {
      epa.Hostname = pod.Spec.Hostname
    }

    // Allow headless service not to have ports.
    if len(service.Spec.Ports) == 0 {
      if service.Spec.ClusterIP == api.ClusterIPNone {
        subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(subsets, pod, epa, nil, tolerateUnreadyEndpoints)
        // No need to repack subsets for headless service without ports.
      }
    } else {
      for i := range service.Spec.Ports {
        servicePort := &service.Spec.Ports[i]
        portNum, err := podutil.FindPort(pod, servicePort)
        if err != nil {
          klog.V(4).Infof("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err)
          continue
        }
        epp := endpointPortFromServicePort(servicePort, portNum)

        var readyEps, notReadyEps int
        subsets, readyEps, notReadyEps = addEndpointSubset(subsets, pod, epa, epp, tolerateUnreadyEndpoints)
        totalReadyEps = totalReadyEps + readyEps
        totalNotReadyEps = totalNotReadyEps + notReadyEps
      }
    }
  }
  subsets = endpoints.RepackSubsets(subsets)

  // See if there's actually an update here.
  currentEndpoints, err := e.endpointsLister.Endpoints(service.Namespace).Get(service.Name)
  if err != nil {
    if errors.IsNotFound(err) {
      currentEndpoints = &v1.Endpoints{
        ObjectMeta: metav1.ObjectMeta{
          Name:   service.Name,
          Labels: service.Labels,
        },
      }
    } else {
      return err
    }
  }

  createEndpoints := len(currentEndpoints.ResourceVersion) == 0

  // Compare the sorted subsets and labels
  // Remove the HeadlessService label from the endpoints if it exists,
  // as this won't be set on the service itself
  // and will cause a false negative in this diff check.
  // But first check if it has that label to avoid expensive copies.
  compareLabels := currentEndpoints.Labels
  if _, ok := currentEndpoints.Labels[v1.IsHeadlessService]; ok {
    compareLabels = utillabels.CloneAndRemoveLabel(currentEndpoints.Labels, v1.IsHeadlessService)
  }
  // When comparing the subsets, we ignore the difference in ResourceVersion of Pod to avoid unnecessary Endpoints
  // updates caused by Pod updates that we don't care, e.g. annotation update.
  if !createEndpoints &&
    endpointutil.EndpointSubsetsEqualIgnoreResourceVersion(currentEndpoints.Subsets, subsets) &&
    apiequality.Semantic.DeepEqual(compareLabels, service.Labels) &&
    capacityAnnotationSetCorrectly(currentEndpoints.Annotations, currentEndpoints.Subsets) {
    klog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
    return nil
  }
  newEndpoints := currentEndpoints.DeepCopy()
  newEndpoints.Subsets = subsets
  newEndpoints.Labels = service.Labels
  if newEndpoints.Annotations == nil {
    newEndpoints.Annotations = make(map[string]string)
  }

  if !endpointsLastChangeTriggerTime.IsZero() {
    newEndpoints.Annotations[v1.EndpointsLastChangeTriggerTime] =
      endpointsLastChangeTriggerTime.UTC().Format(time.RFC3339Nano)
  } else { // No new trigger time, clear the annotation.
    delete(newEndpoints.Annotations, v1.EndpointsLastChangeTriggerTime)
  }

  if truncateEndpoints(newEndpoints) {
    newEndpoints.Annotations[v1.EndpointsOverCapacity] = truncated
  } else {
    delete(newEndpoints.Annotations, v1.EndpointsOverCapacity)
  }

  if newEndpoints.Labels == nil {
    newEndpoints.Labels = make(map[string]string)
  }

  if !helper.IsServiceIPSet(service) {
    newEndpoints.Labels = utillabels.CloneAndAddLabel(newEndpoints.Labels, v1.IsHeadlessService, "")
  } else {
    newEndpoints.Labels = utillabels.CloneAndRemoveLabel(newEndpoints.Labels, v1.IsHeadlessService)
  }

  klog.V(4).Infof("Update endpoints for %v/%v, ready: %d not ready: %d", service.Namespace, service.Name, totalReadyEps, totalNotReadyEps)
  if createEndpoints {
    // No previous endpoints, create them
    _, err = e.client.CoreV1().Endpoints(service.Namespace).Create(context.TODO(), newEndpoints, metav1.CreateOptions{})
  } else {
    // Pre-existing
    _, err = e.client.CoreV1().Endpoints(service.Namespace).Update(context.TODO(), newEndpoints, metav1.UpdateOptions{})
  }
  if err != nil {
    if createEndpoints && errors.IsForbidden(err) {
      // A request is forbidden primarily for two reasons:
      // 1. namespace is terminating, endpoint creation is not allowed by default.
      // 2. policy is misconfigured, in which case no service would function anywhere.
      // Given the frequency of 1, we log at a lower level.
      klog.V(5).Infof("Forbidden from creating endpoints: %v", err)

      // If the namespace is terminating, creates will continue to fail. Simply drop the item.
      if errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
        return nil
      }
    }

    if createEndpoints {
      e.eventRecorder.Eventf(newEndpoints, v1.EventTypeWarning, "FailedToCreateEndpoint", "Failed to create endpoint for service %v/%v: %v", service.Namespace, service.Name, err)
    } else {
      e.eventRecorder.Eventf(newEndpoints, v1.EventTypeWarning, "FailedToUpdateEndpoint", "Failed to update endpoint %v/%v: %v", service.Namespace, service.Name, err)
    }

    return err
  }
  return nil
}
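addEndpointSubset(), used in the loop above, is what decides whether a pod lands in Addresses or NotReadyAddresses. Roughly, each (pod, port) pair becomes a one-entry EndpointSubset, which RepackSubsets() later merges by port; a sketch of that helper (the real one lives in the same package and may differ in detail):

// Sketch of addEndpointSubset: put the pod's address under Addresses if the
// pod is ready (or unreadiness is tolerated), otherwise under NotReadyAddresses,
// and report how many ready / not-ready endpoints were added.
func addEndpointSubset(subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.EndpointAddress,
  epp *v1.EndpointPort, tolerateUnreadyEndpoints bool) ([]v1.EndpointSubset, int, int) {
  var readyEps, notReadyEps int
  ports := []v1.EndpointPort{}
  if epp != nil {
    ports = append(ports, *epp)
  }
  if tolerateUnreadyEndpoints || podutil.IsPodReady(pod) {
    subsets = append(subsets, v1.EndpointSubset{
      Addresses: []v1.EndpointAddress{epa},
      Ports:     ports,
    })
    readyEps++
  } else {
    subsets = append(subsets, v1.EndpointSubset{
      NotReadyAddresses: []v1.EndpointAddress{epa},
      Ports:             ports,
    })
    notReadyEps++
  }
  return subsets, readyEps, notReadyEps
}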

Summary

  1. Computing the Endpoints subsets is the most involved part. Roughly: the pods are first fetched via the service's selector, an EndpointAddress is extracted from each pod, and addEndpointSubset() assembles them into a []v1.EndpointSubset slice, subsets.

  2. At that point the ready and not-ready entries (readyEps, notReadyEps) inside subsets are still kept separately per pod and port; endpoints.RepackSubsets() regroups them together with their ports.

  3. The controller then checks whether the Endpoints object already exists: it creates it if not, and otherwise updates it.

  4. The whole flow involves a fair amount of code, but it reads clearly enough; it just takes time.

  5. syncService() is long, so no comments were added inside the code block.