
Kubernetes source code: EndpointSlice Controller principles and source code analysis

An analysis of how the EndpointSlice Controller works, based on the Kubernetes release-1.22 branch.

Preface

We already ran into EndpointSlice in the earlier kube-proxy article. So what exactly is an EndpointSlice, and what is it for?

  1. Starting with Kubernetes v1.19, this feature is enabled by default.

  2. EndpointSlice is a new API that provides a scalable and extensible alternative to the Endpoints API. An EndpointSlice tracks the IP addresses, ports, readiness, and topology information of the Pods backing a Service (a minimal example object is sketched after this list).

  3. The EndpointSlice API greatly improves network scalability, because adding or removing a Pod now only requires updating one small EndpointSlice. The difference becomes especially noticeable when a single Service is backed by hundreds or thousands of Pods.

  4. With the Endpoints API, a Service has exactly one Endpoints resource. That resource has to store the IP address and port (network endpoint) of every Pod behind the Service, which makes for very large API objects. In addition, kube-proxy runs on every node and watches for any update to Endpoints resources, so when even a single network endpoint in an Endpoints resource changes, the entire object is sent to every kube-proxy instance.

  5. Another limitation of the Endpoints API is the cap it puts on the number of network endpoints that can be tracked for a Service. Objects stored in etcd are limited to 1.5MB by default, which in some cases caps an Endpoints resource at about 5000 Pod IPs. For most users this does not matter, but it becomes a serious problem for Services approaching that size.

  6. To illustrate how severe this can get, consider a simple example. A Service with 5000 Pods can have an Endpoints resource of about 1.5MB. When a single network endpoint in that list changes, the full Endpoints resource has to be distributed to every node in the cluster. In a large cluster with 3000 nodes, that is a real problem: each update sends 4.5GB of data across the cluster (1.5MB * 3000, i.e. Endpoints size * number of nodes), and every endpoint update sends that much again. Imagine a rolling update in which all 5000 Pods are replaced: the data transferred would exceed 22TB. (This passage is quoted from an external article.)
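To make the data model concrete, here is a minimal sketch of an EndpointSlice object built with the k8s.io/api/discovery/v1 types. It is not taken from the controller source, and all field values are made up for illustration.

package main

import (
  "fmt"

  v1 "k8s.io/api/core/v1"
  discovery "k8s.io/api/discovery/v1"
  metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
  ready := true
  portName := "http"
  port := int32(8080)
  protocol := v1.ProtocolTCP
  zone := "zone-a"

  // One slice tracks a group of endpoints (addresses, readiness, topology)
  // plus the ports they serve, for the Service named in the label below.
  slice := discovery.EndpointSlice{
    ObjectMeta: metav1.ObjectMeta{
      Name:   "example-abc12",
      Labels: map[string]string{discovery.LabelServiceName: "example"},
    },
    AddressType: discovery.AddressTypeIPv4,
    Endpoints: []discovery.Endpoint{{
      Addresses:  []string{"10.0.1.23"},
      Conditions: discovery.EndpointConditions{Ready: &ready},
      Zone:       &zone,
    }},
    Ports: []discovery.EndpointPort{{Name: &portName, Port: &port, Protocol: &protocol}},
  }
  fmt.Printf("%+v\n", slice)
}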

Entry point

The entry point is located at cmd/kube-controller-manager/app/discovery.go

One small oddity: why is the file named discovery? Purely a personal question.

func startEndpointSliceController(ctx ControllerContext) (http.Handler, bool, error) {
  go endpointslicecontroller.NewController(
    ctx.InformerFactory.Core().V1().Pods(),
    ctx.InformerFactory.Core().V1().Services(),
    ctx.InformerFactory.Core().V1().Nodes(),
    ctx.InformerFactory.Discovery().V1().EndpointSlices(),
    ctx.ComponentConfig.EndpointSliceController.MaxEndpointsPerSlice,
    ctx.ClientBuilder.ClientOrDie("endpointslice-controller"),
    ctx.ComponentConfig.EndpointSliceController.EndpointUpdatesBatchPeriod.Duration,
  ).Run(int(ctx.ComponentConfig.EndpointSliceController.ConcurrentServiceEndpointSyncs), ctx.Stop)
  return nil, true, nil
}

Constructor

A few variables and functions deserve particular attention.

Variables:

  • c.maxEndpointsPerSlice

    The maximum number of endpoints per slice.

  • features.TopologyAwareHints

    Whether the topology aware hints feature is enabled, i.e. routing traffic to nearby endpoints. For example, if nodes A and B belong to one zone and nodes C and D to another, and the Service has one Pod on each of A, B, C, and D, then the ipvs rules on A and B will list only the Pod IPs on A and B as backends for that Service, and likewise for C and D. The article "Kubernetes Service 开启拓扑感知(就近访问)能力" explains this very plainly. A sketch of enabling hints on a Service follows below.
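As a quick illustration (not taken from the controller source), topology aware hints are opted into per Service through an annotation. The sketch below assumes the v1.22-era annotation key service.kubernetes.io/topology-aware-hints; the Service name and labels are made up.

package main

import (
  "fmt"

  v1 "k8s.io/api/core/v1"
  metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
  // Assumed annotation key/value for requesting topology aware hints on a
  // Service around v1.22; later releases moved to a different mechanism.
  svc := &v1.Service{
    ObjectMeta: metav1.ObjectMeta{
      Namespace: "default",
      Name:      "web",
      Annotations: map[string]string{
        "service.kubernetes.io/topology-aware-hints": "auto",
      },
    },
    Spec: v1.ServiceSpec{Selector: map[string]string{"app": "web"}},
  }

  if val := svc.Annotations["service.kubernetes.io/topology-aware-hints"]; val == "auto" || val == "Auto" {
    fmt.Println("topology aware hints requested for", svc.Name)
  }
}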

Functions:

  • c.triggerTimeTracker

    Computes the last update time of the service and its pods, stores it in a cache, and returns the later of the two timestamps.

  • c.reconciler

    This is where the controller's core logic lives.

// NewController creates and initializes a new Controller
func NewController(podInformer coreinformers.PodInformer,
  serviceInformer coreinformers.ServiceInformer,
  nodeInformer coreinformers.NodeInformer,
  endpointSliceInformer discoveryinformers.EndpointSliceInformer,
  maxEndpointsPerSlice int32,
  client clientset.Interface,
  endpointUpdatesBatchPeriod time.Duration,
) *Controller {
  broadcaster := record.NewBroadcaster()
  broadcaster.StartStructuredLogging(0)
  broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
  recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoint-slice-controller"})

  if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
    ratelimiter.RegisterMetricAndTrackRateLimiterUsage("endpoint_slice_controller", client.DiscoveryV1().RESTClient().GetRateLimiter())
  }

  endpointslicemetrics.RegisterMetrics()

  c := &Controller{
    client: client,
    // This is similar to the DefaultControllerRateLimiter, just with a
    // significantly higher default backoff (1s vs 5ms). This controller
    // processes events that can require significant EndpointSlice changes,
    // such as an update to a Service or Deployment. A more significant
    // rate limit back off here helps ensure that the Controller does not
    // overwhelm the API Server.
    queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewMaxOfRateLimiter(
      workqueue.NewItemExponentialFailureRateLimiter(defaultSyncBackOff, maxSyncBackOff),
      // 10 qps, 100 bucket size. This is only for retry speed and its
      // only the overall factor (not per item).
      &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
    ), "endpoint_slice"),
    workerLoopPeriod: time.Second,
  }

  serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc: c.onServiceUpdate,
    UpdateFunc: func(old, cur interface{}) {
      c.onServiceUpdate(cur)
    },
    DeleteFunc: c.onServiceDelete,
  })
  c.serviceLister = serviceInformer.Lister()
  c.servicesSynced = serviceInformer.Informer().HasSynced

  podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc:    c.addPod,
    UpdateFunc: c.updatePod,
    DeleteFunc: c.deletePod,
  })
  c.podLister = podInformer.Lister()
  c.podsSynced = podInformer.Informer().HasSynced

  c.nodeLister = nodeInformer.Lister()
  c.nodesSynced = nodeInformer.Informer().HasSynced

  endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc:    c.onEndpointSliceAdd,
    UpdateFunc: c.onEndpointSliceUpdate,
    DeleteFunc: c.onEndpointSliceDelete,
  })

  c.endpointSliceLister = endpointSliceInformer.Lister()
  c.endpointSlicesSynced = endpointSliceInformer.Informer().HasSynced
  c.endpointSliceTracker = endpointsliceutil.NewEndpointSliceTracker()

  c.maxEndpointsPerSlice = maxEndpointsPerSlice

  c.triggerTimeTracker = endpointutil.NewTriggerTimeTracker()

  c.eventBroadcaster = broadcaster
  c.eventRecorder = recorder

  c.endpointUpdatesBatchPeriod = endpointUpdatesBatchPeriod
  c.serviceSelectorCache = endpointutil.NewServiceSelectorCache()

  if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) {
    nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
      AddFunc:    c.addNode,
      UpdateFunc: c.updateNode,
      DeleteFunc: c.deleteNode,
    })

    c.topologyCache = topologycache.NewTopologyCache()
  }

  c.reconciler = &reconciler{
    client:               c.client,
    nodeLister:           c.nodeLister,
    maxEndpointsPerSlice: c.maxEndpointsPerSlice,
    endpointSliceTracker: c.endpointSliceTracker,
    metricsCache:         endpointslicemetrics.NewCache(maxEndpointsPerSlice),
    topologyCache:        c.topologyCache,
  }

  return c
}
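One detail worth dwelling on is the queue configuration above: NewMaxOfRateLimiter takes the maximum delay from a per-item exponential backoff and an overall token bucket, so retries of the same key back off aggressively while total retry throughput stays capped. Below is a small standalone sketch of that behaviour; the 1s/100s values are assumed stand-ins for defaultSyncBackOff and maxSyncBackOff.

package main

import (
  "fmt"
  "time"

  "golang.org/x/time/rate"
  "k8s.io/client-go/util/workqueue"
)

func main() {
  limiter := workqueue.NewMaxOfRateLimiter(
    // Per-item exponential backoff: assumed 1s base, 100s cap.
    workqueue.NewItemExponentialFailureRateLimiter(time.Second, 100*time.Second),
    // Overall retry budget: 10 qps with a burst of 100.
    &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
  )

  key := "default/my-service"
  // Each consecutive failure of the same key roughly doubles its delay.
  for i := 0; i < 4; i++ {
    fmt.Printf("retry %d -> wait %v\n", i+1, limiter.When(key))
  }
  limiter.Forget(key) // a successful sync resets the per-item backoff
}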

Event handlers

The controller watches service, pod, node, and endpointSlice objects.

service object

  • AddFunc

    onServiceUpdate caches the service's Selector and adds the service key to the rate-limited (token bucket) queue (see the sketch below).

  • UpdateFunc

    onServiceUpdate, same as above: refresh the cached Selector and enqueue the service key.

  • DeleteFunc

    onServiceDelete removes the cached service Selector and enqueues the service key.
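A minimal, self-contained sketch of the idea behind these handlers (not the verbatim source): derive the "namespace/name" key and enqueue it. The real onServiceUpdate additionally refreshes c.serviceSelectorCache.

package main

import (
  "fmt"

  v1 "k8s.io/api/core/v1"
  metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  "k8s.io/client-go/tools/cache"
  "k8s.io/client-go/util/workqueue"
)

type miniController struct {
  queue workqueue.RateLimitingInterface
}

func (c *miniController) onServiceUpdate(obj interface{}) {
  // Same key format the real controller uses: "namespace/name".
  key, err := cache.MetaNamespaceKeyFunc(obj)
  if err != nil {
    return
  }
  c.queue.Add(key)
}

func main() {
  c := &miniController{
    queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "demo"),
  }
  c.onServiceUpdate(&v1.Service{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "example"}})

  item, _ := c.queue.Get()
  fmt.Println("enqueued:", item) // enqueued: default/example
  c.queue.Done(item)
}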

pod object

  • AddFunc

    addPod

    Looks up the services whose selectors match the pod and adds each of them to the delaying queue (see the sketch after this list).

  • UpdateFunc

    updatePod, same as above.

  • DeleteFunc

    deletePod

    If the pod object recovered from the delete event is not nil, it is handed to the addPod handler.
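A self-contained sketch of what the pod handlers boil down to: match the pod's labels against each service's selector and enqueue the services that match. The real code resolves the services through c.serviceSelectorCache.GetPodServiceMemberships and enqueues with queue.AddAfter using endpointUpdatesBatchPeriod.

package main

import (
  "fmt"

  v1 "k8s.io/api/core/v1"
  metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  "k8s.io/apimachinery/pkg/labels"
)

func main() {
  pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{
    Namespace: "default",
    Name:      "web-1",
    Labels:    map[string]string{"app": "web"},
  }}

  services := []*v1.Service{
    {ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "web"},
      Spec: v1.ServiceSpec{Selector: map[string]string{"app": "web"}}},
    {ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "api"},
      Spec: v1.ServiceSpec{Selector: map[string]string{"app": "api"}}},
  }

  for _, svc := range services {
    selector := labels.Set(svc.Spec.Selector).AsSelectorPreValidated()
    if selector.Matches(labels.Set(pod.Labels)) {
      // The real controller does c.queue.AddAfter(key, c.endpointUpdatesBatchPeriod).
      fmt.Printf("enqueue %s/%s\n", svc.Namespace, svc.Name)
    }
  }
  // Output: enqueue default/web
}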

node object

These handlers are only registered when the TopologyAwareHints feature is enabled.

  • addNode

    Calls c.checkNodeTopologyDistribution() to check how endpoints are distributed across the node topology.

  • updateNode

    Checks the node's status for changes, then calls c.checkNodeTopologyDistribution() to check the topology distribution.

  • deleteNode

    Calls c.checkNodeTopologyDistribution() to check the topology distribution.

endpointSlice object

  • AddFunc

    onEndpointSliceAdd

    Calls c.queueServiceForEndpointSlice(), which derives the unique service key, computes an update delay, and adds the key to the delaying queue with that delay (see the sketch after this list).

  • UpdateFunc

    onEndpointSliceUpdate

    Ultimately also calls c.queueServiceForEndpointSlice(): derive the service key, compute the delay, and enqueue the key with that delay.

  • DeleteFunc

    onEndpointSliceDelete

    Checks whether the deletion was expected; if the EndpointSlice was not supposed to be deleted, it calls c.queueServiceForEndpointSlice() as above to re-enqueue the service.
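A sketch of what c.queueServiceForEndpointSlice() has to do (not the verbatim source): recover the owning service's key from the kubernetes.io/service-name label and pick an enqueue delay. The 1s minimum used here is an assumed stand-in for the controller's minimum sync delay constant.

package main

import (
  "fmt"
  "time"

  discovery "k8s.io/api/discovery/v1"
  metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
  slice := &discovery.EndpointSlice{ObjectMeta: metav1.ObjectMeta{
    Namespace: "default",
    Name:      "web-abc12",
    Labels:    map[string]string{discovery.LabelServiceName: "web"},
  }}

  // The service key is "namespace/<value of kubernetes.io/service-name>".
  key := fmt.Sprintf("%s/%s", slice.Namespace, slice.Labels[discovery.LabelServiceName])

  batchPeriod := 0 * time.Second // endpointUpdatesBatchPeriod from the controller config
  delay := 1 * time.Second       // assumed stand-in for the minimum sync delay
  if batchPeriod > delay {
    delay = batchPeriod
  }
  fmt.Printf("queue.AddAfter(%q, %v)\n", key, delay)
}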

Run

Same pattern as the other controllers; it needs little explanation.

// Run will not return until stopCh is closed.
func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
  defer utilruntime.HandleCrash()
  defer c.queue.ShutDown()

  klog.Infof("Starting endpoint slice controller")
  defer klog.Infof("Shutting down endpoint slice controller")

  if !cache.WaitForNamedCacheSync("endpoint_slice", stopCh, c.podsSynced, c.servicesSynced, c.endpointSlicesSynced, c.nodesSynced) {
    return
  }

  for i := 0; i < workers; i++ {
    go wait.Until(c.worker, c.workerLoopPeriod, stopCh)
  }

  go func() {
    defer utilruntime.HandleCrash()
  }()

  <-stopCh
}

// worker runs a worker thread that just dequeues items, processes them, and
// marks them done. You may run as many of these in parallel as you wish; the
// workqueue guarantees that they will not end up processing the same service
// at the same time
func (c *Controller) worker() {
  for c.processNextWorkItem() {
  }
}

func (c *Controller) processNextWorkItem() bool {
  cKey, quit := c.queue.Get()
  if quit {
    return false
  }
  defer c.queue.Done(cKey)

  err := c.syncService(cKey.(string))
  c.handleErr(err, cKey)

  return true
}

func (c *Controller) handleErr(err error, key interface{}) {
  trackSync(err)

  if err == nil {
    c.queue.Forget(key)
    return
  }

  if c.queue.NumRequeues(key) < maxRetries {
    klog.Warningf("Error syncing endpoint slices for service %q, retrying. Error: %v", key, err)
    c.queue.AddRateLimited(key)
    return
  }

  klog.Warningf("Retry budget exceeded, dropping service %q out of the queue: %v", key, err)
  c.queue.Forget(key)
  utilruntime.HandleError(err)
}

Core logic

The core logic starts at syncService; the actual work ultimately ends up in the r.finalize() function.

syncService

  1. Get the service object.

  2. List the pods using the service's label selector (these pods are what slicesToCreate is ultimately based on).

  3. List all EndpointSlices already in the apiserver that are associated with the service, by namespace and labels (see the selector sketch after this list).

  4. Filter out EndpointSlices that are marked for deletion.

  5. Finally, call c.reconciler.reconcile().
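For step 3, the association between a Service and its EndpointSlices is expressed through labels. Below is a small sketch of the kind of selector involved, assuming the controller labels its slices with kubernetes.io/service-name and endpointslice.kubernetes.io/managed-by as upstream does.

package main

import (
  "fmt"

  discovery "k8s.io/api/discovery/v1"
  "k8s.io/apimachinery/pkg/labels"
)

func main() {
  serviceName := "web"

  // Select only slices that belong to this Service and are managed by the
  // endpointslice controller (as opposed to mirrored or custom slices).
  selector := labels.Set(map[string]string{
    discovery.LabelServiceName: serviceName,
    discovery.LabelManagedBy:   "endpointslice-controller.k8s.io",
  }).AsSelectorPreValidated()

  // In the controller this selector is handed to the lister, roughly:
  // c.endpointSliceLister.EndpointSlices(service.Namespace).List(selector)
  fmt.Println(selector.String())
}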

reconcile

This is the core logic function; essentially all of the core logic is implemented here.

c.reconciler.reconcile()

Variables that hold slices: the slicesToDelete array and the slicesByAddressType map.

  1. Check each EndpointSlice's AddressType. Slices whose address type the service no longer supports are appended to the slicesToDelete array, awaiting deletion; slices with a matching address type are added to the slicesByAddressType map.

  2. For every supported address type, r.reconcileByAddressType() is called to reconcile the slices of that type, with the address type passed in as a parameter.

func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, existingSlices []*discovery.EndpointSlice, triggerTime time.Time) error {
  slicesToDelete := []*discovery.EndpointSlice{}                                    // slices that are no longer  matching any address the service has
  errs := []error{}                                                                 // all errors generated in the process of reconciling
  slicesByAddressType := make(map[discovery.AddressType][]*discovery.EndpointSlice) // slices by address type

  // addresses that this service supports [o(1) find]
  serviceSupportedAddressesTypes := getAddressTypesForService(service)

  // loop through slices identifying their address type.
  // slices that no longer match address type supported by services
  // go to delete, other slices goes to the reconciler machinery
  // for further adjustment
  for _, existingSlice := range existingSlices {
    // service no longer supports that address type, add it to deleted slices
    if _, ok := serviceSupportedAddressesTypes[existingSlice.AddressType]; !ok {
      if r.topologyCache != nil {
        svcKey, err := serviceControllerKey(existingSlice)
        if err != nil {
          klog.Warningf("Couldn't get key to remove EndpointSlice from topology cache %+v: %v", existingSlice, err)
        } else {
          r.topologyCache.RemoveHints(svcKey, existingSlice.AddressType)
        }
      }

      slicesToDelete = append(slicesToDelete, existingSlice)
      continue
    }

    // add list if it is not on our map
    if _, ok := slicesByAddressType[existingSlice.AddressType]; !ok {
      slicesByAddressType[existingSlice.AddressType] = make([]*discovery.EndpointSlice, 0, 1)
    }

    slicesByAddressType[existingSlice.AddressType] = append(slicesByAddressType[existingSlice.AddressType], existingSlice)
  }

  // reconcile for existing.
  for addressType := range serviceSupportedAddressesTypes {
    existingSlices := slicesByAddressType[addressType]
    err := r.reconcileByAddressType(service, pods, existingSlices, triggerTime, addressType)
    if err != nil {
      errs = append(errs, err)
    }
  }

  // delete those which are of addressType that is no longer supported
  // by the service
  for _, sliceToDelete := range slicesToDelete {
    err := r.client.DiscoveryV1().EndpointSlices(service.Namespace).Delete(context.TODO(), sliceToDelete.Name, metav1.DeleteOptions{})
    if err != nil {
      errs = append(errs, fmt.Errorf("error deleting %s EndpointSlice for Service %s/%s: %w", sliceToDelete.Name, service.Namespace, service.Name, err))
    } else {
      r.endpointSliceTracker.ExpectDeletion(sliceToDelete)
      metrics.EndpointSliceChanges.WithLabelValues("delete").Inc()
    }
  }

  return utilerrors.NewAggregate(errs)
}

r.reconcileByAddressType()

  1. Three working arrays: slicesToCreate, slicesToUpdate, and slicesToDelete.

  2. Build existingSlicesByPortMap, a structure describing the existing state of the EndpointSlices, keyed by port map.

  3. Build desiredEndpointsByPortMap, a structure describing the desired endpoints, keyed by port map.

  4. Determine which slices in each port-map group need to change: r.reconcileByPortMapping() computes slicesToCreate, slicesToUpdate, slicesToDelete, numAdded, and numRemoved, which are then handed to r.finalize(). During this computation every slice is filled up to the configured endpoint limit (100 by default), and any remaining endpoints go into one final, partially filled slice, as illustrated below.
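A simplified, self-contained illustration of the packing behaviour described in step 4. The real reconcileByPortMapping also reuses and tops up existing slices rather than always creating new ones.

package main

import "fmt"

func main() {
  const maxEndpointsPerSlice = 100 // the default limit
  desiredEndpoints := 250          // e.g. 250 ready pods behind one port set

  fullSlices := desiredEndpoints / maxEndpointsPerSlice
  remainder := desiredEndpoints % maxEndpointsPerSlice

  fmt.Printf("%d full slices of %d endpoints\n", fullSlices, maxEndpointsPerSlice)
  if remainder > 0 {
    fmt.Printf("1 slice with %d endpoints\n", remainder)
  }
  // Output:
  // 2 full slices of 100 endpoints
  // 1 slice with 50 endpoints
}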

func (r *reconciler) reconcileByAddressType(service *corev1.Service, pods []*corev1.Pod, existingSlices []*discovery.EndpointSlice, triggerTime time.Time, addressType discovery.AddressType) error {

  slicesToCreate := []*discovery.EndpointSlice{}
  slicesToUpdate := []*discovery.EndpointSlice{}
  slicesToDelete := []*discovery.EndpointSlice{}

  // Build data structures for existing state.
  existingSlicesByPortMap := map[endpointutil.PortMapKey][]*discovery.EndpointSlice{}
  numExistingEndpoints := 0
  for _, existingSlice := range existingSlices {
    if ownedBy(existingSlice, service) {
      epHash := endpointutil.NewPortMapKey(existingSlice.Ports)
      existingSlicesByPortMap[epHash] = append(existingSlicesByPortMap[epHash], existingSlice)
      numExistingEndpoints += len(existingSlice.Endpoints)
    } else {
      slicesToDelete = append(slicesToDelete, existingSlice)
    }
  }

  // Build data structures for desired state.
  desiredMetaByPortMap := map[endpointutil.PortMapKey]*endpointMeta{}
  desiredEndpointsByPortMap := map[endpointutil.PortMapKey]endpointsliceutil.EndpointSet{}
  numDesiredEndpoints := 0

  for _, pod := range pods {
    includeTerminating := service.Spec.PublishNotReadyAddresses || utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceTerminatingCondition)
    if !endpointutil.ShouldPodBeInEndpoints(pod, includeTerminating) {
      continue
    }

    endpointPorts := getEndpointPorts(service, pod)
    epHash := endpointutil.NewPortMapKey(endpointPorts)
    if _, ok := desiredEndpointsByPortMap[epHash]; !ok {
      desiredEndpointsByPortMap[epHash] = endpointsliceutil.EndpointSet{}
    }

    if _, ok := desiredMetaByPortMap[epHash]; !ok {
      desiredMetaByPortMap[epHash] = &endpointMeta{
        AddressType: addressType,
        Ports:       endpointPorts,
      }
    }

    node, err := r.nodeLister.Get(pod.Spec.NodeName)
    if err != nil {
      return err
    }
    endpoint := podToEndpoint(pod, node, service, addressType)
    if len(endpoint.Addresses) > 0 {
      desiredEndpointsByPortMap[epHash].Insert(&endpoint)
      numDesiredEndpoints++
    }
  }

  spMetrics := metrics.NewServicePortCache()
  totalAdded := 0
  totalRemoved := 0

  // Determine changes necessary for each group of slices by port map.
  for portMap, desiredEndpoints := range desiredEndpointsByPortMap {
    numEndpoints := len(desiredEndpoints)
    pmSlicesToCreate, pmSlicesToUpdate, pmSlicesToDelete, added, removed := r.reconcileByPortMapping(
      service, existingSlicesByPortMap[portMap], desiredEndpoints, desiredMetaByPortMap[portMap])

    totalAdded += added
    totalRemoved += removed

    spMetrics.Set(portMap, metrics.EfficiencyInfo{
      Endpoints: numEndpoints,
      Slices:    len(existingSlicesByPortMap[portMap]) + len(pmSlicesToCreate) - len(pmSlicesToDelete),
    })

    slicesToCreate = append(slicesToCreate, pmSlicesToCreate...)
    slicesToUpdate = append(slicesToUpdate, pmSlicesToUpdate...)
    slicesToDelete = append(slicesToDelete, pmSlicesToDelete...)
  }

  // If there are unique sets of ports that are no longer desired, mark
  // the corresponding endpoint slices for deletion.
  for portMap, existingSlices := range existingSlicesByPortMap {
    if _, ok := desiredEndpointsByPortMap[portMap]; !ok {
      for _, existingSlice := range existingSlices {
        slicesToDelete = append(slicesToDelete, existingSlice)
      }
    }
  }

  // When no endpoint slices would usually exist, we need to add a placeholder.
  if len(existingSlices) == len(slicesToDelete) && len(slicesToCreate) < 1 {
    placeholderSlice := newEndpointSlice(service, &endpointMeta{Ports: []discovery.EndpointPort{}, AddressType: addressType})
    slicesToCreate = append(slicesToCreate, placeholderSlice)
    spMetrics.Set(endpointutil.NewPortMapKey(placeholderSlice.Ports), metrics.EfficiencyInfo{
      Endpoints: 0,
      Slices:    1,
    })
  }

  metrics.EndpointsAddedPerSync.WithLabelValues().Observe(float64(totalAdded))
  metrics.EndpointsRemovedPerSync.WithLabelValues().Observe(float64(totalRemoved))

  serviceNN := types.NamespacedName{Name: service.Name, Namespace: service.Namespace}
  r.metricsCache.UpdateServicePortCache(serviceNN, spMetrics)

  // Topology hints are assigned per address type. This means it is
  // theoretically possible for endpoints of one address type to be assigned
  // hints while another endpoints of another address type are not.
  si := &topologycache.SliceInfo{
    ServiceKey: fmt.Sprintf("%s/%s", service.Namespace, service.Name),
    ToCreate:   slicesToCreate,
    ToUpdate:   slicesToUpdate,
    Unchanged:  unchangedSlices(existingSlices, slicesToUpdate, slicesToDelete),
  }

  if r.topologyCache != nil && hintsEnabled(service.Annotations) {
    slicesToCreate, slicesToUpdate = r.topologyCache.AddHints(si)
  } else {
    if r.topologyCache != nil {
      r.topologyCache.RemoveHints(si.ServiceKey, addressType)
    }
    slicesToCreate, slicesToUpdate = topologycache.RemoveHintsFromSlices(si)
  }

  return r.finalize(service, slicesToCreate, slicesToUpdate, slicesToDelete, triggerTime)
}

It then calls r.finalize() to create, update, or delete the specified endpointSlice objects.

r.finalize()

  1. When there are slices to delete and slices to create at the same time, the controller first reuses the names of the to-be-deleted slices for the to-be-created ones and turns those creates into updates (presumably to reduce overhead). For example, with A, B, and C to create and D and E to delete, it walks the slices to create, renames A to D and B to E, and issues updates for them instead.

  2. After that, it performs the creates, updates, and deletes in that order.

func (r *reconciler) finalize(
  service *corev1.Service,
  slicesToCreate,
  slicesToUpdate,
  slicesToDelete []*discovery.EndpointSlice,
  triggerTime time.Time,
) error {
  // If there are slices to create and delete, change the creates to updates
  // of the slices that would otherwise be deleted.
  for i := 0; i < len(slicesToDelete); {
    if len(slicesToCreate) == 0 {
      break
    }
    sliceToDelete := slicesToDelete[i]
    slice := slicesToCreate[len(slicesToCreate)-1]
    // Only update EndpointSlices that are owned by this Service and have
    // the same AddressType. We need to avoid updating EndpointSlices that
    // are being garbage collected for an old Service with the same name.
    // The AddressType field is immutable. Since Services also consider
    // IPFamily immutable, the only case where this should matter will be
    // the migration from IP to IPv4 and IPv6 AddressTypes, where there's a
    // chance EndpointSlices with an IP AddressType would otherwise be
    // updated to IPv4 or IPv6 without this check.
    if sliceToDelete.AddressType == slice.AddressType && ownedBy(sliceToDelete, service) {
      slice.Name = sliceToDelete.Name
      slicesToCreate = slicesToCreate[:len(slicesToCreate)-1]
      slicesToUpdate = append(slicesToUpdate, slice)
      slicesToDelete = append(slicesToDelete[:i], slicesToDelete[i+1:]...)
    } else {
      i++
    }
  }

  // Don't create new EndpointSlices if the Service is pending deletion. This
  // is to avoid a potential race condition with the garbage collector where
  // it tries to delete EndpointSlices as this controller replaces them.
  if service.DeletionTimestamp == nil {
    for _, endpointSlice := range slicesToCreate {
      addTriggerTimeAnnotation(endpointSlice, triggerTime)
      createdSlice, err := r.client.DiscoveryV1().EndpointSlices(service.Namespace).Create(context.TODO(), endpointSlice, metav1.CreateOptions{})
      if err != nil {
        // If the namespace is terminating, creates will continue to fail. Simply drop the item.
        if errors.HasStatusCause(err, corev1.NamespaceTerminatingCause) {
          return nil
        }
        return fmt.Errorf("failed to create EndpointSlice for Service %s/%s: %v", service.Namespace, service.Name, err)
      }
      r.endpointSliceTracker.Update(createdSlice)
      metrics.EndpointSliceChanges.WithLabelValues("create").Inc()
    }
  }

  for _, endpointSlice := range slicesToUpdate {
    addTriggerTimeAnnotation(endpointSlice, triggerTime)
    updatedSlice, err := r.client.DiscoveryV1().EndpointSlices(service.Namespace).Update(context.TODO(), endpointSlice, metav1.UpdateOptions{})
    if err != nil {
      return fmt.Errorf("failed to update %s EndpointSlice for Service %s/%s: %v", endpointSlice.Name, service.Namespace, service.Name, err)
    }
    r.endpointSliceTracker.Update(updatedSlice)
    metrics.EndpointSliceChanges.WithLabelValues("update").Inc()
  }

  for _, endpointSlice := range slicesToDelete {
    err := r.client.DiscoveryV1().EndpointSlices(service.Namespace).Delete(context.TODO(), endpointSlice.Name, metav1.DeleteOptions{})
    if err != nil {
      return fmt.Errorf("failed to delete %s EndpointSlice for Service %s/%s: %v", endpointSlice.Name, service.Namespace, service.Name, err)
    }
    r.endpointSliceTracker.ExpectDeletion(endpointSlice)
    metrics.EndpointSliceChanges.WithLabelValues("delete").Inc()
  }

  topologyLabel := "Disabled"
  if r.topologyCache != nil && hintsEnabled(service.Annotations) {
    topologyLabel = "Auto"
  }

  numSlicesChanged := len(slicesToCreate) + len(slicesToUpdate) + len(slicesToDelete)
  metrics.EndpointSlicesChangedPerSync.WithLabelValues(topologyLabel).Observe(float64(numSlicesChanged))

  return nil
}

Summary

  1. Overall, the logic is much like the other controllers: watch the relevant resources, then reconcile.

  2. As the code above shows, a distinguishing feature of endpointslice is that, by default, a new slice is created once a slice reaches 100 entries, keeping every slice at no more than 100 endpoints.

  3. Having gone through endpointslice, we have seen that this controller can create, update, and delete slices; the source tree also contains an endpointslicemirroring controller.

  4. endpointslicemirroring: in some cases, applications create custom Endpoints resources. So that these applications do not have to update Endpoints and EndpointSlice resources concurrently, the cluster's control plane mirrors most Endpoints resources to corresponding EndpointSlices.

    The control plane does not mirror an Endpoints resource when:

    • the Endpoints resource has the label endpointslice.kubernetes.io/skip-mirror set to true;

    • the Endpoints resource carries the control-plane.alpha.kubernetes.io/leader annotation;

    • the corresponding Service resource does not exist;

    • the corresponding Service has a non-empty selector.

  5. We will look at the endpointslicemirroring controller when time permits; for now, let's move on to other components.