
Kubernetes Source Code: kube-proxy Principles and Source Code Analysis (Part 2)

kube-proxy principles and source code analysis, based on the release-1.22 branch of Kubernetes.

Preface

Due to limits of space and energy, we will only analyze the ipvs mode here; if there is time later, we may look at the code for the other modes.

ipvs mode

In the previous article we noted that userspace mode constantly switches between kernel space and user space, which costs a great deal of performance, so it is normally not used. Kubernetes defaults to iptables mode, but once the cluster grows large, synchronizing the iptables rules and traversing them to find a match also becomes expensive.

Let's break the work down into steps:

  1. First, look at the constructor used in ipvs mode.

  2. Then, since we saw earlier that the event handlers have different implementations for different modes, look at the listener functions.

  3. Finally, look at syncProxyRules, which synchronizes the proxy rules.

NewProxier

Since IPv4 is what we use most of the time, let's look at the constructor for the IPv4-only stack: proxier, err = ipvs.NewProxier().

  1. Check that the kernel parameter bridge-nf-call-iptables equals 1 (this mainly solves same-node Service communication), and enable conntrack so that connection-tracking information is kept.

  2. Parse kernelVersion and set net.ipv4.vs.conn_reuse_mode=0. (With net.ipv4.vs.conn_reuse_mode=0, IPVS does not re-balance new connections on reused source ports; it reuses the previous scheduling decision and forwards the new connection to the original rs. With net.ipv4.vs.conn_reuse_mode=1, IPVS reschedules new connections.)

  3. Set net.ipv4.vs.expire_nodest_conn=1, so that once an rs's weight is set to 0 its traffic is dropped immediately, making fast, graceful removal of an rs possible.

  4. Set net.ipv4.vs.expire_quiescent_template=1 (default is 0). When an rs's weight is 0 (for example, a failed health check makes the application set the rs weight to 0), new connections covered by session persistence would still be scheduled to that rs; with the value set to 1, the persistence template is invalidated immediately and a new rs is chosen. If you run session-persistent workloads, setting this to 1 is recommended.

  5. From the Kubernetes point of view, kube-proxy needs a way to let new connections be rescheduled without giving up performance. Based on the current kernel code, the parameters need to be set as follows:

    • net.ipv4.vs.conntrack=0

    • net.ipv4.vs.conn_reuse_mode=1

    • net.ipv4.vs.expire_nodest_conn=1

  6. Set net.ipv4.ip_forward=1 to enable IP forwarding.

  7. net.ipv4.conf.all.arp_ignore=1: only answer ARP queries whose target IP is a local address configured on the interface that received the request. For example, with eth0=192.168.0.1/24 and eth1=10.1.1.1/24, eth0 will answer a query for 192.168.0.1 coming from 10.1.1.2, but eth0 will not answer an ARP query for 10.1.1.1.

  8. net.ipv4.conf.all.arp_announce=2: always use the best local IP address for the target as the source IP of ARP requests. In this mode the source IP of the outgoing packet is ignored, and the kernel tries to pick a local address that can talk to the target: first an address whose subnet contains the target IP, and if there is none, an address on the current interface or any other interface that might receive the ARP reply, using that interface's IP as the source of the ARP request. (All of these sysctl writes go through the same read-compare-write helper; see the sketch after this list.)

  9. To keep the node from sending ARP replies for Service External IPs bound to local interfaces, it needs arp_ignore=1 and arp_announce=2, or kube-proxy's --ipvs-strict-arp flag set to true.

  10. If tcpTimeout > 0 || tcpFinTimeout > 0 || udpTimeout > 0, configure the IPVS timeouts with those values.

  11. Generate the masquerade mark used for the SNAT rules, determine the IP family, group node IPs by family, and check the IPVS scheduling policy.

  12. Start the periodic (once per minute) graceful rs cleanup task.
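
Steps 1 through 8 all write sysctls through utilproxy.EnsureSysctl, which the constructor below calls but whose body is not shown. The pattern is simply "read the current value, write only if it differs". Here is a minimal standalone sketch of that pattern; it reads /proc/sys directly instead of going through the real utilsysctl interface, and the ensureSysctl helper name is ours, for illustration only:

package main

import (
  "fmt"
  "os"
  "strconv"
  "strings"
)

// ensureSysctl is a simplified stand-in for utilproxy.EnsureSysctl:
// read the current value and only write when it differs from the target.
func ensureSysctl(name string, newVal int) error {
  path := "/proc/sys/" + strings.ReplaceAll(name, ".", "/")
  raw, err := os.ReadFile(path)
  if err != nil {
    return err
  }
  oldVal, err := strconv.Atoi(strings.TrimSpace(string(raw)))
  if err != nil {
    return err
  }
  if oldVal == newVal {
    return nil // already the desired value, nothing to do
  }
  return os.WriteFile(path, []byte(strconv.Itoa(newVal)), 0644)
}

func main() {
  // Mirrors step 3 above: net.ipv4.vs.expire_nodest_conn=1 (needs root and the ip_vs module).
  if err := ensureSysctl("net.ipv4.vs.expire_nodest_conn", 1); err != nil {
    fmt.Println("ensureSysctl:", err)
  }
}

With that helper pattern in mind, here is the full constructor: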

func NewProxier(ipt utiliptables.Interface,
  ipvs utilipvs.Interface,
  ipset utilipset.Interface,
  sysctl utilsysctl.Interface,
  exec utilexec.Interface,
  syncPeriod time.Duration,
  minSyncPeriod time.Duration,
  excludeCIDRs []string,
  strictARP bool,
  tcpTimeout time.Duration,
  tcpFinTimeout time.Duration,
  udpTimeout time.Duration,
  masqueradeAll bool,
  masqueradeBit int,
  localDetector proxyutiliptables.LocalTrafficDetector,
  hostname string,
  nodeIP net.IP,
  recorder events.EventRecorder,
  healthzServer healthcheck.ProxierHealthUpdater,
  scheduler string,
  nodePortAddresses []string,
  kernelHandler KernelHandler,
) (*Proxier, error) {
  // br_netfilter must be loaded and bridge-nf-call-iptables set to 1: this kernel parameter makes bridge devices also invoke the layer-3 iptables rules (including conntrack) when forwarding at layer 2, which solves the same-node Service communication problem mentioned above, and is why almost every Kubernetes environment requires bridge-nf-call-iptables to be enabled.
  if val, err := sysctl.GetSysctl(sysctlBridgeCallIPTables); err == nil && val != 1 {
    klog.InfoS("Missing br-netfilter module or unset sysctl br-nf-call-iptables; proxy may not work as intended")
  }

  // Enable conntrack so that connection-tracking information is kept
  if err := utilproxy.EnsureSysctl(sysctl, sysctlVSConnTrack, 1); err != nil {
    return nil, err
  }

  kernelVersionStr, err := kernelHandler.GetKernelVersion()
  if err != nil {
    return nil, fmt.Errorf("error determining kernel version to find required kernel modules for ipvs support: %v", err)
  }
  kernelVersion, err := version.ParseGeneric(kernelVersionStr)
  if err != nil {
    return nil, fmt.Errorf("error parsing kernel version %q: %v", kernelVersionStr, err)
  }
  if kernelVersion.LessThan(version.MustParseGeneric(connReuseMinSupportedKernelVersion)) {
    klog.ErrorS(nil, fmt.Sprintf("can't set sysctl %s, kernel version must be at least %s", sysctlConnReuse, connReuseMinSupportedKernelVersion))
  } else if kernelVersion.AtLeast(version.MustParseGeneric(connReuseFixedKernelVersion)) {
    // https://github.com/kubernetes/kubernetes/issues/93297
    klog.V(2).InfoS("Left as-is", "sysctl", sysctlConnReuse)
  } else {
    // With net.ipv4.vs.conn_reuse_mode=0, IPVS does not re-balance new connections on reused source ports; it reuses the previous scheduling decision and forwards the new connection to the original rs. With net.ipv4.vs.conn_reuse_mode=1, IPVS reschedules new connections.
    if err := utilproxy.EnsureSysctl(sysctl, sysctlConnReuse, 0); err != nil {
      return nil, err
    }
  }

  // With net.ipv4.vs.conn_reuse_mode=0, new connections are no longer forwarded to a removed rs, but previously established connections still receive traffic; kube-proxy only sets the rs weight to 0 instead of deleting it right away. So as long as port-reusing connection requests keep coming in, the rs will never be removed by kube-proxy and graceful deletion cannot complete. With net.ipv4.vs.expire_nodest_conn=1, traffic is dropped immediately once the rs weight is set to 0.
  if err := utilproxy.EnsureSysctl(sysctl, sysctlExpireNoDestConn, 1); err != nil {
    return nil, err
  }

  // Set the expire_quiescent_template sysctl we need for
  if err := utilproxy.EnsureSysctl(sysctl, sysctlExpireQuiescentTemplate, 1); err != nil {
    return nil, err
  }

  // Set the ip_forward sysctl we need for
  if err := utilproxy.EnsureSysctl(sysctl, sysctlForward, 1); err != nil {
    return nil, err
  }

  if strictARP {
    // Set the arp_ignore sysctl we need for
    if err := utilproxy.EnsureSysctl(sysctl, sysctlArpIgnore, 1); err != nil {
      return nil, err
    }

    // Set the arp_announce sysctl we need for
    if err := utilproxy.EnsureSysctl(sysctl, sysctlArpAnnounce, 2); err != nil {
      return nil, err
    }
  }

  // Configure IPVS timeouts if any one of the timeout parameters have been set.
  // This is the equivalent to running ipvsadm --set, a value of 0 indicates the
  // current system timeout should be preserved
  if tcpTimeout > 0 || tcpFinTimeout > 0 || udpTimeout > 0 {
    if err := ipvs.ConfigureTimeouts(tcpTimeout, tcpFinTimeout, udpTimeout); err != nil {
      klog.ErrorS(err, "failed to configure IPVS timeouts")
    }
  }

  // Generate the masquerade mark used by the SNAT rules
  masqueradeValue := 1 << uint(masqueradeBit)
  masqueradeMark := fmt.Sprintf("%#08x", masqueradeValue)

  ipFamily := v1.IPv4Protocol
  if ipt.IsIPv6() {
    ipFamily = v1.IPv6Protocol
  }

  klog.V(2).InfoS("record nodeIP and family", "nodeIP", nodeIP, "family", ipFamily)

  if len(scheduler) == 0 {
    klog.InfoS("IPVS scheduler not specified, use rr by default")
    scheduler = DefaultScheduler
  }

  serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder)
  
  // Group node IPs by IP family
  ipFamilyMap := utilproxy.MapCIDRsByIPFamily(nodePortAddresses)
  nodePortAddresses = ipFamilyMap[ipFamily]
  // Log the IPs not matching the ipFamily
  if ips, ok := ipFamilyMap[utilproxy.OtherIPFamily(ipFamily)]; ok && len(ips) > 0 {
    klog.InfoS("found node IPs of the wrong family", "ipFamily", ipFamily, "ips", strings.Join(ips, ","))
  }

  // excludeCIDRs has been validated before, here we just parse it to IPNet list
  parsedExcludeCIDRs, _ := utilnet.ParseCIDRs(excludeCIDRs)

  proxier := &Proxier{
    ipFamily:              ipFamily,
    serviceMap:            make(proxy.ServiceMap),
    serviceChanges:        proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
    endpointsMap:          make(proxy.EndpointsMap),
    endpointsChanges:      proxy.NewEndpointChangeTracker(hostname, nil, ipFamily, recorder, nil),
    syncPeriod:            syncPeriod,
    minSyncPeriod:         minSyncPeriod,
    excludeCIDRs:          parsedExcludeCIDRs,
    iptables:              ipt,
    masqueradeAll:         masqueradeAll,
    masqueradeMark:        masqueradeMark,
    exec:                  exec,
    localDetector:         localDetector,
    hostname:              hostname,
    nodeIP:                nodeIP,
    recorder:              recorder,
    serviceHealthServer:   serviceHealthServer,
    healthzServer:         healthzServer,
    ipvs:                  ipvs,
    ipvsScheduler:         scheduler,
    ipGetter:              &realIPGetter{nl: NewNetLinkHandle(ipFamily == v1.IPv6Protocol)},
    iptablesData:          bytes.NewBuffer(nil),
    filterChainsData:      bytes.NewBuffer(nil),
    natChains:             bytes.NewBuffer(nil),
    natRules:              bytes.NewBuffer(nil),
    filterChains:          bytes.NewBuffer(nil),
    filterRules:           bytes.NewBuffer(nil),
    netlinkHandle:         NewNetLinkHandle(ipFamily == v1.IPv6Protocol),
    ipset:                 ipset,
    nodePortAddresses:     nodePortAddresses,
    networkInterfacer:     utilproxy.RealNetwork{},
    gracefuldeleteManager: NewGracefulTerminationManager(ipvs),
  }
  // Initialize the default Kubernetes ipset sets
  proxier.ipsetList = make(map[string]*IPSet)
  for _, is := range ipsetInfo {
    proxier.ipsetList[is.name] = NewIPSet(ipset, is.name, is.setType, (ipFamily == v1.IPv6Protocol), is.comment)
  }
  burstSyncs := 2
  klog.V(2).InfoS("ipvs sync params", "ipFamily", ipt.Protocol(), "minSyncPeriod", minSyncPeriod, "syncPeriod", syncPeriod, "burstSyncs", burstSyncs)
  proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
  // Periodically (every minute) clean up rs (real server) entries gracefully
  proxier.gracefuldeleteManager.Run()
  return proxier, nil
}
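
The loop at the end of NewProxier builds proxier.ipsetList from ipsetInfo, a table defined elsewhere in pkg/proxy/ipvs that we do not show here. Below is a simplified, self-contained sketch of that name-to-set registration pattern; the toy IPSet struct and the two entries are illustrative only, not kube-proxy's actual type or full table:

package main

import "fmt"

// Toy stand-in for the utilipset-backed IPSet type, only to show the pattern.
type IPSet struct {
  Name, SetType, Comment string
  IsIPv6                 bool
}

// ipsetInfo in pkg/proxy/ipvs is a table of {name, setType, comment} entries;
// the two below are examples, not the complete list kube-proxy registers.
var ipsetInfo = []struct {
  name, setType, comment string
}{
  {"KUBE-CLUSTER-IP", "hash:ip,port", "Kubernetes service cluster ip + port for masquerade purpose"},
  {"KUBE-LOOP-BACK", "hash:ip,port,ip", "Kubernetes endpoints dst ip:port, source ip for solving hairpin purpose"},
}

func main() {
  isIPv6 := false
  ipsetList := make(map[string]*IPSet, len(ipsetInfo))
  for _, is := range ipsetInfo {
    ipsetList[is.name] = &IPSet{Name: is.name, SetType: is.setType, Comment: is.comment, IsIPv6: isIPv6}
  }
  for name, s := range ipsetList {
    fmt.Printf("%s -> type=%s comment=%q\n", name, s.SetType, s.Comment)
  }
}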

Event handler implementation

In the previous article we left this piece of the core Run() function, the event handlers, unexplored; let's look at it now.

  // Create the informers and register the event handlers:
  // the Service informer, then the EndpointSlice (or Endpoints) informer
  serviceConfig := config.NewServiceConfig(informerFactory.Core().V1().Services(), s.ConfigSyncPeriod)
  serviceConfig.RegisterEventHandler(s.Proxier)
  go serviceConfig.Run(wait.NeverStop)

  if s.UseEndpointSlices {
    endpointSliceConfig := config.NewEndpointSliceConfig(informerFactory.Discovery().V1().EndpointSlices(), s.ConfigSyncPeriod)
    endpointSliceConfig.RegisterEventHandler(s.Proxier)
    go endpointSliceConfig.Run(wait.NeverStop)
  } else {
    endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), s.ConfigSyncPeriod)
    endpointsConfig.RegisterEventHandler(s.Proxier)
    go endpointsConfig.Run(wait.NeverStop)
  }

Service event handlers

Let's follow the NewServiceConfig() constructor and see how it is implemented.

func NewServiceConfig(serviceInformer coreinformers.ServiceInformer, resyncPeriod time.Duration) *ServiceConfig {
  result := &ServiceConfig{
    listerSynced: serviceInformer.Informer().HasSynced,
  }

  serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
    cache.ResourceEventHandlerFuncs{
      // The event handlers are registered here
      AddFunc:    result.handleAddService,
      UpdateFunc: result.handleUpdateService,
      DeleteFunc: result.handleDeleteService,
    },
    resyncPeriod,
  )

  return result
}

What do handleAddService, handleUpdateService and handleDeleteService look like inside? It turns out they simply call into the Proxier's interface methods.

func (c *ServiceConfig) handleAddService(obj interface{}) {
  service, ok := obj.(*v1.Service)
  if !ok {
    utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
    return
  }
  for i := range c.eventHandlers {
    klog.V(4).Info("Calling handler.OnServiceAdd")
    c.eventHandlers[i].OnServiceAdd(service)
  }
}

func (c *ServiceConfig) handleUpdateService(oldObj, newObj interface{}) {
  oldService, ok := oldObj.(*v1.Service)
  if !ok {
    utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", oldObj))
    return
  }
  service, ok := newObj.(*v1.Service)
  if !ok {
    utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
    return
  }
  for i := range c.eventHandlers {
    klog.V(4).Info("Calling handler.OnServiceUpdate")
    c.eventHandlers[i].OnServiceUpdate(oldService, service)
  }
}

func (c *ServiceConfig) handleDeleteService(obj interface{}) {
  service, ok := obj.(*v1.Service)
  if !ok {
    tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
    if !ok {
      utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
      return
    }
    if service, ok = tombstone.Obj.(*v1.Service); !ok {
      utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
      return
    }
  }
  for i := range c.eventHandlers {
    klog.V(4).Info("Calling handler.OnServiceDelete")
    c.eventHandlers[i].OnServiceDelete(service)
  }
}
  1. Let's see what these Proxier interface methods actually do.

  2. OnServiceAdd() and OnServiceDelete() simply call OnServiceUpdate().

  3. OnServiceUpdate() calls proxier.serviceChanges.Update(oldService, service), which maintains a serviceMap (keyed by namespace, port name and port protocol, with a ServicePort (IP, port, protocol) interface value), checks whether the object actually changed and whether new proxy rules need to be generated, and finally calls proxier.Sync() to schedule the sync (the change-tracking pattern is sketched after this list).

  4. serviceMap e.g.: {{"ns", "cluster-ip", "TCP"}: {"172.16.55.10", 1234, "TCP"}}
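
serviceChanges.Update itself is not shown in this article. Conceptually it is a keyed diff: remember the state before the first pending change and the latest state, drop the entry once they match again, and report whether any changes are pending. The sketch below is a simplified, generic version of that pattern, not the real ServiceChangeTracker:

package main

import "fmt"

// change remembers the state before the first pending update and the latest state.
type change struct {
  previous, current string
}

// changeTracker is a simplified version of the pattern used by
// ServiceChangeTracker/EndpointChangeTracker: after each event it answers
// "is there anything left for syncProxyRules to do?".
type changeTracker struct {
  items map[string]*change
}

// Update mirrors serviceChanges.Update(old, new): an add passes old="",
// a delete passes new="". It returns true while changes are pending.
func (t *changeTracker) Update(key, oldState, newState string) bool {
  c, ok := t.items[key]
  if !ok {
    // First pending change for this key: capture the starting state.
    c = &change{previous: oldState}
    t.items[key] = c
  }
  c.current = newState
  // If the object ended up back where it started, there is nothing to sync.
  if c.previous == c.current {
    delete(t.items, key)
  }
  return len(t.items) > 0
}

func main() {
  t := &changeTracker{items: map[string]*change{}}
  fmt.Println(t.Update("ns/svc:http", "", "172.16.55.10:1234/TCP")) // add    -> true, a sync is needed
  fmt.Println(t.Update("ns/svc:http", "172.16.55.10:1234/TCP", "")) // delete -> false, back to the start
}

Back to the real handlers: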

// OnServiceAdd is called whenever creation of new service object is observed.
func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
  proxier.OnServiceUpdate(nil, service)
}

// OnServiceUpdate is called whenever modification of an existing service object is observed.
func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
  if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
    proxier.Sync()
  }
}

// OnServiceDelete is called whenever deletion of an existing service object is observed.
func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
  proxier.OnServiceUpdate(service, nil)
}

Endpoints event handlers

Similarly, the EndpointSlice handlers call proxier.Sync() to schedule a sync whenever a change is observed.

// OnEndpointSliceAdd is called whenever creation of a new endpoint slice object
// is observed.
func (proxier *Proxier) OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) {
  if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() {
    proxier.Sync()
  }
}

// OnEndpointSliceUpdate is called whenever modification of an existing endpoint
// slice object is observed.
func (proxier *Proxier) OnEndpointSliceUpdate(_, endpointSlice *discovery.EndpointSlice) {
  if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, false) && proxier.isInitialized() {
    proxier.Sync()
  }
}

// OnEndpointSliceDelete is called whenever deletion of an existing endpoint slice
// object is observed.
func (proxier *Proxier) OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice) {
  if proxier.endpointsChanges.EndpointSliceUpdate(endpointSlice, true) && proxier.isInitialized() {
    proxier.Sync()
  }
}

proxier.Sync()

func (proxier *Proxier) Sync() {
  if proxier.healthzServer != nil {
    proxier.healthzServer.QueuedUpdate()
  }
  metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
  proxier.syncRunner.Run()
}

proxier.syncRunner.Run()

proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)

  1. Send a message to the bfr.run channel, which triggers tryRun().

  2. Ultimately this still ends up invoking proxier.syncProxyRules().

func (bfr *BoundedFrequencyRunner) Run() {
  // If it takes a lot of time to run the underlying function, noone is really
  // processing elements from <run> channel. So to avoid blocking here on the
  // putting element to it, we simply skip it if there is already an element
  // in it.
  select {
  // send a message to the bfr.run channel
  case bfr.run <- struct{}{}:
  default:
  }
}

func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
  klog.V(3).Infof("%s Loop running", bfr.name)
  bfr.timer.Reset(bfr.maxInterval)
  for {
    select {
    case <-stop:
      bfr.stop()
      klog.V(3).Infof("%s Loop stopping", bfr.name)
      return
    case <-bfr.timer.C():
      bfr.tryRun()
    case <-bfr.run:
      bfr.tryRun()
    case <-bfr.retry:
      bfr.doRetry()
    }
  }
}

func (bfr *BoundedFrequencyRunner) tryRun() {
  bfr.mu.Lock()
  defer bfr.mu.Unlock()

  if bfr.limiter.TryAccept() {
    // We're allowed to run the function right now.
    bfr.fn()
    bfr.lastRun = bfr.timer.Now()
    bfr.timer.Stop()
    bfr.timer.Reset(bfr.maxInterval)
    klog.V(3).Infof("%s: ran, next possible in %v, periodic in %v", bfr.name, bfr.minInterval, bfr.maxInterval)
    return
  }

  // It can't run right now, figure out when it can run next.
  elapsed := bfr.timer.Since(bfr.lastRun)   // how long since last run
  nextPossible := bfr.minInterval - elapsed // time to next possible run
  nextScheduled := bfr.timer.Remaining()    // time to next scheduled run
  klog.V(4).Infof("%s: %v since last run, possible in %v, scheduled in %v", bfr.name, elapsed, nextPossible, nextScheduled)

  // It's hard to avoid race conditions in the unit tests unless we always reset
  // the timer here, even when it's unchanged
  if nextPossible < nextScheduled {
    nextScheduled = nextPossible
  }
  bfr.timer.Stop()
  bfr.timer.Reset(nextScheduled)
}

proxier.syncProxyRules()

proxier.Sync() ultimately calls proxier.syncRunner.Run() to synchronize the rules.

  1. serviceConfig.Run(wait.NeverStop) ends up calling the Proxier's OnServiceSynced() method.

  2. endpointSliceConfig.Run(wait.NeverStop) ends up calling the Proxier's OnEndpointSlicesSynced() method.

  3. Both of them ultimately call proxier.syncProxyRules(); note that this unconditional sync only runs once both caches have finished syncing (see the sketch after this list).
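
Both OnServiceSynced() and OnEndpointSlicesSynced() go through setInitialized()/isInitialized(), which the article does not show: the event handlers only start calling Sync() once both caches have synced at least once. A minimal sketch of that gate, assuming an atomic int32 flag like the real proxier uses:

package main

import (
  "fmt"
  "sync/atomic"
)

// miniProxier keeps only the initialization gate used by OnServiceSynced /
// OnEndpointSlicesSynced: event handlers check isInitialized() before Sync(),
// and the flag flips to true only after both caches have synced.
type miniProxier struct {
  initialized          int32
  servicesSynced       bool
  endpointSlicesSynced bool
}

func (p *miniProxier) setInitialized(value bool) {
  var v int32
  if value {
    v = 1
  }
  atomic.StoreInt32(&p.initialized, v)
}

func (p *miniProxier) isInitialized() bool {
  return atomic.LoadInt32(&p.initialized) > 0
}

func main() {
  p := &miniProxier{}

  // OnServiceSynced: services are in, endpoint slices are not yet.
  p.servicesSynced = true
  p.setInitialized(p.endpointSlicesSynced)
  fmt.Println(p.isInitialized()) // false -> event handlers still skip Sync()

  // OnEndpointSlicesSynced: now both are synced, events start triggering Sync().
  p.endpointSlicesSynced = true
  p.setInitialized(p.servicesSynced)
  fmt.Println(p.isInitialized()) // true
}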

serviceConfig.Run()

// Run waits for cache synced and invokes handlers after syncing.
func (c *ServiceConfig) Run(stopCh <-chan struct{}) {
  klog.Info("Starting service config controller")

  if !cache.WaitForNamedCacheSync("service config", stopCh, c.listerSynced) {
    return
  }

  for i := range c.eventHandlers {
    klog.V(3).Info("Calling handler.OnServiceSynced()")
    c.eventHandlers[i].OnServiceSynced()
  }
}

// OnServiceSynced is called once all the initial event handlers were called and the state is fully propagated to local cache.
func (proxier *Proxier) OnServiceSynced() {
  proxier.mu.Lock()
  proxier.servicesSynced = true
  proxier.setInitialized(proxier.endpointSlicesSynced)
  proxier.mu.Unlock()

  // Sync unconditionally - this is called once per lifetime.
  proxier.syncProxyRules()
}

endpointSliceConfig.Run()

// Run waits for cache synced and invokes handlers after syncing.
func (c *EndpointSliceConfig) Run(stopCh <-chan struct{}) {
  klog.Info("Starting endpoint slice config controller")

  if !cache.WaitForNamedCacheSync("endpoint slice config", stopCh, c.listerSynced) {
    return
  }

  for _, h := range c.eventHandlers {
    klog.V(3).Infof("Calling handler.OnEndpointSlicesSynced()")
    h.OnEndpointSlicesSynced()
  }
}

// OnEndpointSlicesSynced is called once all the initial event handlers were
// called and the state is fully propagated to local cache.
func (proxier *Proxier) OnEndpointSlicesSynced() {
  proxier.mu.Lock()
  proxier.endpointSlicesSynced = true
  proxier.setInitialized(proxier.servicesSynced)
  proxier.mu.Unlock()

  // Sync unconditionally - this is called once per lifetime.
  proxier.syncProxyRules()
}

The s.Proxier.SyncLoop() function

  1. From the Service and EndpointSlice event handlers above we can see that as soon as a service or endpointslice event is observed, a syncProxyRules sync is triggered. Remember the core Run() function from earlier? The s.Proxier.SyncLoop() it starts also ends up calling syncProxyRules to synchronize the proxy rules. Let's look at the logic of s.Proxier.SyncLoop() first, and then analyze proxier.syncProxyRules().
// SyncLoop runs periodic work.  This is expected to run as a goroutine or as the main loop of the app.  It does not return.
func (proxier *Proxier) SyncLoop() {
  // Update healthz timestamp at beginning in case Sync() never succeeds.
  if proxier.healthzServer != nil {
    proxier.healthzServer.Updated()
  }
  // synthesize "last change queued" time as the informers are syncing.
  metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
  proxier.syncRunner.Loop(wait.NeverStop)
}

proxier.syncRunner.Loop()

Unlike proxier.syncRunner.Run(), proxier.syncRunner.Loop() does exactly what its name says: it keeps running in a loop, with a default interval of 30s.

func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
  klog.V(3).Infof("%s Loop running", bfr.name)
  bfr.timer.Reset(bfr.maxInterval) // 30s by default
  for {
    select {
    case <-stop:
      bfr.stop()
      klog.V(3).Infof("%s Loop stopping", bfr.name)
      return
    case <-bfr.timer.C():
      bfr.tryRun()
    case <-bfr.run:
      bfr.tryRun()
    case <-bfr.retry:
      bfr.doRetry()
    }
  }
}

Depending on which channel fires, Loop() runs bfr.stop(), tryRun() or doRetry():

  1. On a <-stop signal, run bfr.stop() to stop syncing.

  2. On a <-bfr.timer.C() or <-bfr.run signal, run bfr.tryRun() to perform a sync.

  3. On a <-bfr.retry signal, run bfr.doRetry().

// assumes the lock is not held
// bfr.stop() stops the timer and the rate limiter.
func (bfr *BoundedFrequencyRunner) stop() {
  bfr.mu.Lock()
  defer bfr.mu.Unlock()
  bfr.limiter.Stop()
  bfr.timer.Stop()
}

// assumes the lock is not held
func (bfr *BoundedFrequencyRunner) doRetry() {
  bfr.mu.Lock()
  defer bfr.mu.Unlock()
  bfr.retryMu.Lock()
  defer bfr.retryMu.Unlock()

  if bfr.retryTime.IsZero() {
    return
  }

  // Timer wants an interval not an absolute time, so convert retryTime back now
  // Compute the retry interval; if it is shorter than the time until the next scheduled sync, stop the timer and reset it to the retry interval
  retryInterval := bfr.retryTime.Sub(bfr.timer.Now())
  bfr.retryTime = time.Time{}
  if retryInterval < bfr.timer.Remaining() {
    klog.V(3).Infof("%s: retrying in %v", bfr.name, retryInterval)
    bfr.timer.Stop()
    bfr.timer.Reset(retryInterval)
  }
}

// assumes the lock is not held
func (bfr *BoundedFrequencyRunner) tryRun() {
  bfr.mu.Lock()
  defer bfr.mu.Unlock()

  // Right after the runner is created the limiter accepts immediately, so this is true
  if bfr.limiter.TryAccept() {
    // We're allowed to run the function right now.
    // Run the sync function right away
    bfr.fn()
    bfr.lastRun = bfr.timer.Now()
    // Reset the timer
    bfr.timer.Stop()
    bfr.timer.Reset(bfr.maxInterval)
    klog.V(3).Infof("%s: ran, next possible in %v, periodic in %v", bfr.name, bfr.minInterval, bfr.maxInterval)
    return
  }

  // It can't run right now, figure out when it can run next.
  // Work out when it is next allowed to run
  elapsed := bfr.timer.Since(bfr.lastRun)   // how long since last run
  nextPossible := bfr.minInterval - elapsed // time to next possible run
  nextScheduled := bfr.timer.Remaining()    // time to next scheduled run
  klog.V(4).Infof("%s: %v since last run, possible in %v, scheduled in %v", bfr.name, elapsed, nextPossible, nextScheduled)

  // It's hard to avoid race conditions in the unit tests unless we always reset
  // the timer here, even when it's unchanged
  // Always reset the timer here to avoid race conditions in the unit tests
  if nextPossible < nextScheduled {
    nextScheduled = nextPossible
  }
  bfr.timer.Stop()
  bfr.timer.Reset(nextScheduled)
}
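
To tie the Loop()/tryRun() mechanics together, here is a minimal, self-contained usage sketch of async.BoundedFrequencyRunner. It assumes the k8s.io/kubernetes/pkg/util/async package is importable in your module; the sync function, intervals and burst value are illustrative, not kube-proxy's real configuration:

package main

import (
  "fmt"
  "time"

  "k8s.io/apimachinery/pkg/util/wait"
  "k8s.io/kubernetes/pkg/util/async"
)

func main() {
  // The function the runner invokes; kube-proxy passes proxier.syncProxyRules here.
  syncFn := func() {
    fmt.Println("sync at", time.Now().Format(time.RFC3339))
  }

  // name, fn, minInterval, maxInterval, burstRuns -- the same shape as
  // NewProxier building proxier.syncRunner with minSyncPeriod, syncPeriod, burstSyncs.
  runner := async.NewBoundedFrequencyRunner("demo-sync-runner", syncFn, time.Second, 30*time.Second, 2)

  // Loop blocks: it fires syncFn on the periodic timer (maxInterval)
  // or when Run() is called, subject to the rate limiter.
  go runner.Loop(wait.NeverStop)

  // Simulate event-driven triggers, the way OnServiceUpdate -> proxier.Sync()
  // calls proxier.syncRunner.Run(): only the first couple run immediately,
  // the rest are coalesced and deferred by the limiter.
  for i := 0; i < 5; i++ {
    runner.Run()
    time.Sleep(300 * time.Millisecond)
  }
  time.Sleep(2 * time.Second)
}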

To be continued

Having walked through s.Proxier.SyncLoop(), the next step is proxier.syncProxyRules(). We will pause here for now, because proxier.syncProxyRules() is so long that it has to be covered in a separate article.