
Kubernetes Source Code: kube-proxy Principles and Source Code Analysis (Part 3)

An analysis of kube-proxy's principles and source code, based on the release-1.22 branch of Kubernetes.

Preface

At long last, here it is. This function is particularly long, so let's walk through its implementation logic step by step.

Because the native IPVS mode in the Linux kernel only supports DNAT and not SNAT, IPVS still has to rely on iptables rules in the following scenarios:

  1. kube-proxy is started with the --masquerade-all=true flag, in which case every packet in the cluster that passes through kube-proxy is SNATed once.

  2. kube-proxy is started with the --cluster-cidr=<cidr> flag.

  3. For Services of type LoadBalancer, iptables is used to configure the source-IP whitelist.

  4. For Services of type NodePort, iptables is used to configure MASQUERADE.

  5. For Services with externalIPs. Note, however, that with kube-proxy in IPVS mode the number of iptables rules stays fixed no matter how many Pods/Services there are.

The s.Proxier.SyncLoop() function

There is a lot of code here, so we will go through it in several steps.

Preliminary steps

  1. Check whether the Proxier has finished initializing; if not, return from the function immediately.

  2. Record the sync duration.

  3. Merge updates into the proxier.serviceMap map so that its contents always stay current; entries are stored in the form {"ns/svc name", "port name", "TCP"}: {"172.16.55.10", 1234, "TCP"} (see the sketch after this list). At the same time, stale port entries are cleaned out of the map, and the IPs of stale UDP ports are collected for later use.

  4. Compute the stale rules, which will be used for the cleanup steps later on.

  5. Perform the iptables preliminaries and make sure the required base chains exist.

  6. Make sure the dummy interface (kube-ipvs0 by default) and the default ipset lists have been created on the node. Why create a dummy interface? Because the IPVS netfilter DNAT hook is attached to the INPUT chain: binding the ClusterIP to the dummy interface lets the kernel recognize that IP as a local address when the ClusterIP is accessed, so the packet enters the INPUT chain and is then forwarded to POSTROUTING by the ip_vs_in hook function. The ClusterIP is bound to the dummy interface, and for each ClusterIP an IPVS virtual server and its real servers are created, corresponding to the Service and its Endpoints respectively.
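
To make the key/value layout of proxier.serviceMap described in step 3 a little more concrete, here is a minimal, self-contained Go sketch. The struct names and fields below are simplified stand-ins for illustration only, not the real types from k8s.io/kubernetes/pkg/proxy.

package main

import "fmt"

// servicePortName is a simplified stand-in for the map key:
// the Service's namespace/name, the port name, and the protocol.
type servicePortName struct {
  NamespacedName string // e.g. "default/my-svc"
  Port           string // e.g. "http"
  Protocol       string // e.g. "TCP"
}

// servicePortInfo is a simplified stand-in for the map value:
// the ClusterIP, port number, and protocol of that service port.
type servicePortInfo struct {
  ClusterIP string
  Port      int
  Protocol  string
}

func main() {
  // serviceMap mirrors the shape of proxier.serviceMap:
  // one entry per (service, port name, protocol) triple.
  serviceMap := map[servicePortName]servicePortInfo{
    {NamespacedName: "default/my-svc", Port: "http", Protocol: "TCP"}: {
      ClusterIP: "172.16.55.10", Port: 1234, Protocol: "TCP",
    },
  }
  for name, info := range serviceMap {
    fmt.Printf("%s:%s/%s -> %s:%d/%s\n",
      name.NamespacedName, name.Port, name.Protocol,
      info.ClusterIP, info.Port, info.Protocol)
  }
}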

func (proxier *Proxier) syncProxyRules() {
  proxier.mu.Lock()
  defer proxier.mu.Unlock()

  // Check whether the Proxier has finished initializing; if not, return immediately.
  if !proxier.isInitialized() {
    klog.V(2).InfoS("Not syncing ipvs rules until Services and Endpoints have been received from master")
    return
  }

  // Record the sync duration.
  start := time.Now()
  defer func() {
    metrics.SyncProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
    klog.V(4).InfoS("syncProxyRules complete", "elapsed", time.Since(start))
  }()

  // We assume that if this was called, we really want to sync them,
  // even if nothing changed in the meantime. In other words, callers are
  // responsible for detecting no-op changes and not calling this function.
  // Merge updates into proxier.serviceMap so its contents always stay current; entries look like {"ns/svc name", "port name", "TCP"}: {"172.16.55.10", 1234, "TCP"}. Stale port entries are cleaned out of the map, the IPs of stale UDP ports are collected for later use, and the pending changes are then cleared.
  serviceUpdateResult := proxier.serviceMap.Update(proxier.serviceChanges)
  // Same as above, for the endpoints map.
  endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)

  staleServices := serviceUpdateResult.UDPStaleClusterIP
  // merge stale services gathered from updateEndpointsMap
  // Compute the stale rules; they will be used for the cleanup steps later.
  for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
    if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) {
      klog.V(2).InfoS("Stale service", "protocol", strings.ToLower(string(svcInfo.Protocol())), "svcPortName", svcPortName.String(), "clusterIP", svcInfo.ClusterIP().String())
      staleServices.Insert(svcInfo.ClusterIP().String())
      for _, extIP := range svcInfo.ExternalIPStrings() {
        staleServices.Insert(extIP)
      }
      for _, extIP := range svcInfo.LoadBalancerIPStrings() {
        staleServices.Insert(extIP)
      }
    }
  }

  klog.V(3).InfoS("Syncing ipvs Proxier rules")

  // Begin install iptables
  // iptables preliminaries

  // Reset all buffers used later.
  // This is to avoid memory reallocations and thus improve performance.
  proxier.natChains.Reset()
  proxier.natRules.Reset()
  proxier.filterChains.Reset()
  proxier.filterRules.Reset()

  // Write table headers.
  utilproxy.WriteLine(proxier.filterChains, "*filter")
  utilproxy.WriteLine(proxier.natChains, "*nat")

  // Make sure the base chains exist
  // Check that the KUBE-MARK-DROP chain exists in the nat table, without modifying any rules
  // Check that the nat-table chains KUBE-SERVICES KUBE-POSTROUTING KUBE-FIREWALL KUBE-NODE-PORT KUBE-LOAD-BALANCER KUBE-MARK-MASQ exist and load them into the buffer
  // Check that the filter-table chains KUBE-FORWARD KUBE-NODE-PORT exist and load them into the buffer
  // Check the following rules and create them if they do not exist
  // -I nat OUTPUT -m comment --comment "kubernetes service portals" -j KUBE-SERVICES
  // -I nat PREROUTING -m comment --comment "kubernetes service portals" -j KUBE-SERVICES
  // -I nat POSTROUTING -m comment --comment "kubernetes postrouting rules" -j KUBE-POSTROUTING
  // -I filter FORWARD -m comment --comment "kubernetes forwarding rules" -j KUBE-FORWARD
  // -I filter INPUT -m comment --comment "kubernetes health check rules" -j KUBE-NODE-PORT
  proxier.createAndLinkKubeChain()

  // make sure dummy interface exists in the system where ipvs Proxier will bind service address on it
  // Make sure the dummy interface (kube-ipvs0 by default) exists on the node. Why a dummy interface? The IPVS netfilter DNAT hook is attached to the INPUT chain: binding the ClusterIP to the dummy interface lets the kernel treat that IP as local when the ClusterIP is accessed, so the packet enters the INPUT chain and is then forwarded to POSTROUTING by the ip_vs_in hook function. The ClusterIP is bound to the dummy interface, and for each ClusterIP an IPVS virtual server and its real servers are created, corresponding to the Service and its Endpoints.
  _, err := proxier.netlinkHandle.EnsureDummyDevice(DefaultDummyDevice)
  if err != nil {
    klog.ErrorS(err, "Failed to create dummy interface", "interface", DefaultDummyDevice)
    return
  }

  // make sure ip sets exists in the system.
  // Check that the ipset sets exist
  // KUBE-LOOP-BACK hash:ip,port,ip "Kubernetes endpoints dst ip:port, source ip for solving hairpin purpose"
  // KUBE-CLUSTER-IP hash:ip,port "Kubernetes service cluster ip + port for masquerade purpose"
  // KUBE-EXTERNAL-IP hash:ip,port "Kubernetes service external ip + port for masquerade and filter purpose"
  // KUBE-EXTERNAL-IP-LOCAL hash:ip,port "Kubernetes service external ip + port with externalTrafficPolicy=local"
  // KUBE-LOAD-BALANCER hash:ip,port "Kubernetes service lb portal"
  // KUBE-LOAD-BALANCER-FW hash:ip,port "Kubernetes service load balancer ip + port for load balancer with sourceRange"
  // KUBE-LOAD-BALANCER-LOCAL hash:ip,port "Kubernetes service load balancer ip + port with externalTrafficPolicy=local"
  // KUBE-LOAD-BALANCER-SOURCE-IP hash:ip,port,ip "Kubernetes service load balancer ip + port + source IP for packet filter purpose"
  // KUBE-LOAD-BALANCER-SOURCE-CIDR hash:ip,port,net "Kubernetes service load balancer ip + port + source cidr for packet filter purpose"
  // KUBE-NODE-PORT-TCP bitmap:port "Kubernetes nodeport TCP port for masquerade purpose"
  // KUBE-NODE-PORT-LOCAL-TCP bitmap:port "Kubernetes nodeport TCP port with externalTrafficPolicy=local"
  // KUBE-NODE-PORT-UDP bitmap:port "Kubernetes nodeport UDP port for masquerade purpose"
  // KUBE-NODE-PORT-LOCAL-UDP bitmap:port "Kubernetes nodeport UDP port with externalTrafficPolicy=local"
  // KUBE-NODE-PORT-SCTP-HASH hash:ip,port "Kubernetes nodeport SCTP port for masquerade purpose with type 'hash ip:port'"
  // KUBE-NODE-PORT-LOCAL-SCTP-HASH hash:ip,port "Kubernetes nodeport SCTP port with externalTrafficPolicy=local with type 'hash ip:port'"
  // KUBE-HEALTH-CHECK-NODE-PORT bitmap:port "Kubernetes health check node port"
  for _, set := range proxier.ipsetList {
    if err := ensureIPSet(set); err != nil {
      return
    }
    // Reset set.activeEntries
    set.resetEntries()
  }

  1. Check the IPVS rules currently present on the node.

  2. Check the IPs that have been successfully bound to the kube-ipvs0 interface.

  3. Check the node IPs, which are used later for listening on ports for NodePort-type Services.

  // activeIPVSServices represents IPVS service successfully created in this round of sync
  activeIPVSServices := map[string]bool{}
  // currentIPVSServices represent IPVS services listed from the system
  currentIPVSServices := make(map[string]*utilipvs.VirtualServer)
  // activeBindAddrs represents ip address successfully bind to DefaultDummyDevice in this round of sync
  activeBindAddrs := map[string]bool{}

  // Get the IPs bound on the machine, equivalent to running: $ ip route show table local type local proto kernel dev kube-ipvs0
  // 10.0.0.1  scope host  src 10.0.0.1
  // 10.0.0.10  scope host  src 10.0.0.10
  // Extract the unique source-IP field,
  // --> result set: [10.0.0.1, 10.0.0.10]
  bindedAddresses, err := proxier.ipGetter.BindedIPs()
  if err != nil {
    klog.ErrorS(err, "error listing addresses binded to dummy interface")
  }

  // Check whether any service uses a NodePort
  hasNodePort := false
  for _, svc := range proxier.serviceMap {
    svcInfo, ok := svc.(*serviceInfo)
    if ok && svcInfo.NodePort() != 0 {
      hasNodePort = true
      break
    }
  }

  // Both nodeAddresses and nodeIPs can be reused for all nodePort services
  // and only need to be computed if we have at least one nodePort service.
  var (
    // List of node addresses to listen on if a nodePort is set.
    nodeAddresses []string
    // List of node IP addresses to be used as IPVS services if nodePort is set.
    nodeIPs []net.IP
  )

  // Get the host node IPs
  if hasNodePort {
    nodeAddrSet, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer)
    if err != nil {
      klog.ErrorS(err, "Failed to get node ip address matching nodeport cidr")
    } else {
      nodeAddresses = nodeAddrSet.List()
      for _, address := range nodeAddresses {
        a := net.ParseIP(address)
        if a.IsLoopback() {
          continue
        }
        if utilproxy.IsZeroCIDR(address) {
          nodeIPs, err = proxier.ipGetter.NodeIPs()
          if err != nil {
            klog.ErrorS(err, "Failed to list all node IPs from host")
          }
          break
        }
        nodeIPs = append(nodeIPs, a)
      }
    }
  }

  // Keep only the IPs matching the proxier's IP family
  idx := 0
  for _, nodeIP := range nodeIPs {
    if (proxier.ipFamily == v1.IPv6Protocol) == utilnet.IsIPv6(nodeIP) {
      nodeIPs[idx] = nodeIP
      idx++
    }
  }
  // reset slice to filtered entries
  nodeIPs = nodeIPs[:idx]

Configuring ipset/IPVS rules

  1. Bind IPVS rules for each Service.

  2. First configure the ipset sets.

    2.1 Configure the loopback and ClusterIP sets: KUBE-LOOP-BACK and KUBE-CLUSTER-IP (see the example after this list).

    2.2 Configure the external-IP sets: KUBE-EXTERNAL-IP-LOCAL and KUBE-EXTERNAL-IP.

    2.3 Configure the load-balancer sets: KUBE-LOAD-BALANCER, KUBE-LOAD-BALANCER-LOCAL, KUBE-LOAD-BALANCER-FW, KUBE-LOAD-BALANCER-SOURCE-CIDR and KUBE-LOAD-BALANCER-SOURCE-IP.

    2.4 Configure the NodePort sets: KUBE-NODE-PORT-TCP, KUBE-NODE-PORT-UDP, KUBE-NODE-PORT-SCTP-HASH, KUBE-NODE-PORT-LOCAL-TCP, KUBE-NODE-PORT-LOCAL-UDP and KUBE-NODE-PORT-LOCAL-SCTP-HASH.
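
As a rough illustration of what ends up in these sets, the sketch below builds an entry for a ClusterIP service and renders it in the member format that a hash:ip,port ipset expects (roughly "IP,protocol:port"). The entry type here is a simplified stand-in, not the real pkg/util/ipset implementation.

package main

import (
  "fmt"
  "strings"
)

// clusterIPEntry is a simplified stand-in for an entry of a
// hash:ip,port set such as KUBE-CLUSTER-IP.
type clusterIPEntry struct {
  IP       string
  Port     int
  Protocol string
}

// String renders the entry in the member format used by hash:ip,port
// ipsets, e.g. "10.96.0.10,tcp:53".
func (e clusterIPEntry) String() string {
  return fmt.Sprintf("%s,%s:%d", e.IP, strings.ToLower(e.Protocol), e.Port)
}

func main() {
  // activeEntries collects the members that this sync round considers
  // active; kube-proxy later writes them to the kernel in one pass.
  activeEntries := map[string]bool{}

  entry := clusterIPEntry{IP: "10.96.0.10", Port: 53, Protocol: "TCP"}
  activeEntries[entry.String()] = true

  for member := range activeEntries {
    fmt.Println("KUBE-CLUSTER-IP member:", member)
  }
}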

  // Bind IPVS rules for each service
  for svcName, svc := range proxier.serviceMap {
    svcInfo, ok := svc.(*serviceInfo)
    if !ok {
      klog.ErrorS(nil, "Failed to cast serviceInfo", "svcName", svcName.String())
      continue
    }
    isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP())
    localPortIPFamily := utilnet.IPv4
    if isIPv6 {
      localPortIPFamily = utilnet.IPv6
    }
    protocol := strings.ToLower(string(svcInfo.Protocol()))
    // Precompute svcNameString; with many services the many calls
    // to ServicePortName.String() show up in CPU profiles.
    svcNameString := svcName.String()

    // Handle traffic that loops back to the originator with SNAT.
    // Populate the KUBE-LOOP-BACK ipset
    for _, e := range proxier.endpointsMap[svcName] {
      ep, ok := e.(*proxy.BaseEndpointInfo)
      if !ok {
        klog.ErrorS(nil, "Failed to cast BaseEndpointInfo", "endpoint", e.String())
        continue
      }
      if !ep.IsLocal {
        continue
      }
      epIP := ep.IP()
      epPort, err := ep.Port()
      // Error parsing this endpoint has been logged. Skip to next endpoint.
      if epIP == "" || err != nil {
        continue
      }
      // Assemble the ipset entry
      entry := &utilipset.Entry{
        IP:       epIP,
        Port:     epPort,
        Protocol: protocol,
        IP2:      epIP,
        SetType:  utilipset.HashIPPortIP,
      }
      if valid := proxier.ipsetList[kubeLoopBackIPSet].validateEntry(entry); !valid {
        klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoopBackIPSet].Name)
        continue
      }
      // Insert the ipset entry into the KUBE-LOOP-BACK set
      proxier.ipsetList[kubeLoopBackIPSet].activeEntries.Insert(entry.String())
    }

    // Capture the clusterIP.
    // Use the clusterIP to populate the ipset set
    // ipset call
    entry := &utilipset.Entry{
      IP:       svcInfo.ClusterIP().String(),
      Port:     svcInfo.Port(),
      Protocol: protocol,
      SetType:  utilipset.HashIPPort,
    }
    // add service Cluster IP:Port to kubeServiceAccess ip set for the purpose of solving hairpin.
    // The service IP is added to the kubeServiceAccess ip set to solve the hairpin case?
    // proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String()) 
    if valid := proxier.ipsetList[kubeClusterIPSet].validateEntry(entry); !valid {
      klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeClusterIPSet].Name)
      continue
    }
    // Insert the ipset entry into the KUBE-CLUSTER-IP set
    proxier.ipsetList[kubeClusterIPSet].activeEntries.Insert(entry.String())
    // ipvs call: define the IPVS virtual server
    serv := &utilipvs.VirtualServer{
      Address:   svcInfo.ClusterIP(),
      Port:      uint16(svcInfo.Port()),
      Protocol:  string(svcInfo.Protocol()),
      Scheduler: proxier.ipvsScheduler,
    }
    // Set session affinity flag and timeout for IPVS service
    if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
      serv.Flags |= utilipvs.FlagPersistent
      serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
    }
    // We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService()
    // Sync the IPVS rule: create it if it does not exist, otherwise check for changes and update it; the VIP does not go down during the update
    if err := proxier.syncService(svcNameString, serv, true, bindedAddresses); err == nil {
      activeIPVSServices[serv.String()] = true
      activeBindAddrs[serv.Address.String()] = true
      // ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP
      // So we still need clusterIP rules in onlyNodeLocalEndpoints mode.
      // Check whether the ServiceInternalTrafficPolicy feature is enabled, which routes internal traffic only to endpoints on the same node as the client
      internalNodeLocal := false
      if utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) && svcInfo.NodeLocalInternal() {
        internalNodeLocal = true
      }
      if err := proxier.syncEndpoint(svcName, internalNodeLocal, serv); err != nil {
        klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv.String())
      }
    } else {
      klog.ErrorS(err, "Failed to sync service", "service", serv.String())
    }

    // Capture externalIPs.
    // Use the externalIPs to populate the ipset sets
    for _, externalIP := range svcInfo.ExternalIPStrings() {
      // ipset call
      entry := &utilipset.Entry{
        IP:       externalIP,
        Port:     svcInfo.Port(),
        Protocol: protocol,
        SetType:  utilipset.HashIPPort,
      }

      // Check whether the external traffic policy is Local
      if svcInfo.NodeLocalExternal() {
        if valid := proxier.ipsetList[kubeExternalIPLocalSet].validateEntry(entry); !valid {
          klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeExternalIPLocalSet].Name)
          continue
        }
        // Add to the KUBE-EXTERNAL-IP-LOCAL ip set
        proxier.ipsetList[kubeExternalIPLocalSet].activeEntries.Insert(entry.String())
      } else {
        // We have to SNAT packets to external IPs.
        if valid := proxier.ipsetList[kubeExternalIPSet].validateEntry(entry); !valid {
          klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeExternalIPSet].Name)
          continue
        }
        // Add to the KUBE-EXTERNAL-IP ip set
        proxier.ipsetList[kubeExternalIPSet].activeEntries.Insert(entry.String())
      }

      // ipvs call
      // Create the IPVS rule
      serv := &utilipvs.VirtualServer{
        Address:   net.ParseIP(externalIP),
        Port:      uint16(svcInfo.Port()),
        Protocol:  string(svcInfo.Protocol()),
        Scheduler: proxier.ipvsScheduler,
      }
      if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
        serv.Flags |= utilipvs.FlagPersistent
        serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
      }
      if err := proxier.syncService(svcNameString, serv, true, bindedAddresses); err == nil {
        activeIPVSServices[serv.String()] = true
        activeBindAddrs[serv.Address.String()] = true

        if err := proxier.syncEndpoint(svcName, svcInfo.NodeLocalExternal(), serv); err != nil {
          klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv.String())
        }
      } else {
        klog.ErrorS(err, "Failed to sync service", "service", serv.String())
      }
    }

    // Capture load-balancer ingress.
    // Use the load-balancer ingress IPs to populate the ipset sets
    for _, ingress := range svcInfo.LoadBalancerIPStrings() {
      if ingress != "" {
        // ipset call
        entry = &utilipset.Entry{
          IP:       ingress,
          Port:     svcInfo.Port(),
          Protocol: protocol,
          SetType:  utilipset.HashIPPort,
        }
        // add service load balancer ingressIP:Port to kubeServiceAccess ip set for the purpose of solving hairpin.
        // proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String())
        // If we are proxying globally, we need to masquerade in case we cross nodes.
        // If we are proxying only locally, we can retain the source IP.
        if valid := proxier.ipsetList[kubeLoadBalancerSet].validateEntry(entry); !valid {
          klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoadBalancerSet].Name)
          continue
        }
        // Add to the KUBE-LOAD-BALANCER ip set
        proxier.ipsetList[kubeLoadBalancerSet].activeEntries.Insert(entry.String())
        // insert loadbalancer entry to lbIngressLocalSet if service externaltrafficpolicy=local
        // externalTrafficPolicy=Local
        if svcInfo.NodeLocalExternal() {
          if valid := proxier.ipsetList[kubeLoadBalancerLocalSet].validateEntry(entry); !valid {
            klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoadBalancerLocalSet].Name)
            continue
          }
          proxier.ipsetList[kubeLoadBalancerLocalSet].activeEntries.Insert(entry.String())
        }
        if len(svcInfo.LoadBalancerSourceRanges()) != 0 {
          // The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field.
          // This currently works for loadbalancers that preserves source ips.
          // For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply.
          // KUBE-LOAD-BALANCER-FW ip set
          if valid := proxier.ipsetList[kubeLoadbalancerFWSet].validateEntry(entry); !valid {
            klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoadbalancerFWSet].Name)
            continue
          }
          proxier.ipsetList[kubeLoadbalancerFWSet].activeEntries.Insert(entry.String())
          allowFromNode := false
          for _, src := range svcInfo.LoadBalancerSourceRanges() {
            // ipset call
            entry = &utilipset.Entry{
              IP:       ingress,
              Port:     svcInfo.Port(),
              Protocol: protocol,
              Net:      src,
              SetType:  utilipset.HashIPPortNet,
            }
            // enumerate all white list source cidr KUBE-LOAD-BALANCER-SOURCE-CIDR
            if valid := proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].validateEntry(entry); !valid {
              klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].Name)
              continue
            }
            proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].activeEntries.Insert(entry.String())

            // ignore error because it has been validated
            _, cidr, _ := net.ParseCIDR(src)
            if cidr.Contains(proxier.nodeIP) {
              allowFromNode = true
            }
          }
          // generally, ip route rule was added to intercept request to loadbalancer vip from the
          // loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly.
          // Need to add the following rule to allow request on host.
          // Allow requests to the load balancer VIP coming from the node itself
          if allowFromNode {
            entry = &utilipset.Entry{
              IP:       ingress,
              Port:     svcInfo.Port(),
              Protocol: protocol,
              IP2:      ingress,
              SetType:  utilipset.HashIPPortIP,
            }
            // enumerate all white list source ip KUBE-LOAD-BALANCER-SOURCE-IP
            if valid := proxier.ipsetList[kubeLoadBalancerSourceIPSet].validateEntry(entry); !valid {
              klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoadBalancerSourceIPSet].Name)
              continue
            }
            proxier.ipsetList[kubeLoadBalancerSourceIPSet].activeEntries.Insert(entry.String())
          }
        }

        // ipvs call
        serv := &utilipvs.VirtualServer{
          Address:   net.ParseIP(ingress),
          Port:      uint16(svcInfo.Port()),
          Protocol:  string(svcInfo.Protocol()),
          Scheduler: proxier.ipvsScheduler,
        }
        if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
          serv.Flags |= utilipvs.FlagPersistent
          serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
        }
        if err := proxier.syncService(svcNameString, serv, true, bindedAddresses); err == nil {
          activeIPVSServices[serv.String()] = true
          activeBindAddrs[serv.Address.String()] = true
          if err := proxier.syncEndpoint(svcName, svcInfo.NodeLocalExternal(), serv); err != nil {
            klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv)
          }
        } else {
          klog.ErrorS(err, "Failed to sync service", "service", serv)
        }
      }
    }

    // Populate the ipset sets for the NodePort case
    if svcInfo.NodePort() != 0 {
      if len(nodeAddresses) == 0 || len(nodeIPs) == 0 {
        // Skip nodePort configuration since an error occurred when
        // computing nodeAddresses or nodeIPs.
        continue
      }

      var lps []utilnet.LocalPort
      for _, address := range nodeAddresses {
        lp := utilnet.LocalPort{
          Description: "nodePort for " + svcNameString,
          IP:          address,
          IPFamily:    localPortIPFamily,
          Port:        svcInfo.NodePort(),
          Protocol:    utilnet.Protocol(svcInfo.Protocol()),
        }
        if utilproxy.IsZeroCIDR(address) {
          // Empty IP address means all
          lp.IP = ""
          lps = append(lps, lp)
          // If we encounter a zero CIDR, then there is no point in processing the rest of the addresses.
          break
        }
        lps = append(lps, lp)
      }

      // For ports on node IPs, open the actual port and hold it.
      for _, lp := range lps {
        if svcInfo.Protocol() != v1.ProtocolSCTP && lp.Protocol == utilnet.UDP {
          conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP)
        }
      }

      // Nodeports need SNAT, unless they're local.
      // The return path needs SNAT handling
      // ipset call

      var (
        nodePortSet *IPSet
        entries     []*utilipset.Entry
      )

      switch protocol {
      case utilipset.ProtocolTCP:
        nodePortSet = proxier.ipsetList[kubeNodePortSetTCP]
        entries = []*utilipset.Entry{{
          // No need to provide ip info
          Port:     svcInfo.NodePort(),
          Protocol: protocol,
          SetType:  utilipset.BitmapPort,
        }}
      case utilipset.ProtocolUDP:
        nodePortSet = proxier.ipsetList[kubeNodePortSetUDP]
        entries = []*utilipset.Entry{{
          // No need to provide ip info
          Port:     svcInfo.NodePort(),
          Protocol: protocol,
          SetType:  utilipset.BitmapPort,
        }}
      case utilipset.ProtocolSCTP:
        nodePortSet = proxier.ipsetList[kubeNodePortSetSCTP]
        // Since hash ip:port is used for SCTP, all the nodeIPs to be used in the SCTP ipset entries.
        entries = []*utilipset.Entry{}
        for _, nodeIP := range nodeIPs {
          entries = append(entries, &utilipset.Entry{
            IP:       nodeIP.String(),
            Port:     svcInfo.NodePort(),
            Protocol: protocol,
            SetType:  utilipset.HashIPPort,
          })
        }
      default:
        // It should never hit
        klog.ErrorS(nil, "Unsupported protocol type", "protocol", protocol)
      }
      if nodePortSet != nil {
        entryInvalidErr := false
        for _, entry := range entries {
          if valid := nodePortSet.validateEntry(entry); !valid {
            klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", nodePortSet.Name)
            entryInvalidErr = true
            break
          }
          nodePortSet.activeEntries.Insert(entry.String())
        }
        if entryInvalidErr {
          continue
        }
      }

      // Add externaltrafficpolicy=local type nodeport entry
      if svcInfo.NodeLocalExternal() {
        var nodePortLocalSet *IPSet
        switch protocol {
        case utilipset.ProtocolTCP:
          nodePortLocalSet = proxier.ipsetList[kubeNodePortLocalSetTCP]
        case utilipset.ProtocolUDP:
          nodePortLocalSet = proxier.ipsetList[kubeNodePortLocalSetUDP]
        case utilipset.ProtocolSCTP:
          nodePortLocalSet = proxier.ipsetList[kubeNodePortLocalSetSCTP]
        default:
          // It should never hit
          klog.ErrorS(nil, "Unsupported protocol type", "protocol", protocol)
        }
        if nodePortLocalSet != nil {
          entryInvalidErr := false
          for _, entry := range entries {
            if valid := nodePortLocalSet.validateEntry(entry); !valid {
              klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", nodePortLocalSet.Name)
              entryInvalidErr = true
              break
            }
            nodePortLocalSet.activeEntries.Insert(entry.String())
          }
          if entryInvalidErr {
            continue
          }
        }
      }

      // Build ipvs kernel routes for each node ip address
      // Configure IPVS rules for each node IP
      for _, nodeIP := range nodeIPs {
        // ipvs call
        // Create the IPVS rule
        serv := &utilipvs.VirtualServer{
          Address:   nodeIP,
          Port:      uint16(svcInfo.NodePort()),
          Protocol:  string(svcInfo.Protocol()),
          Scheduler: proxier.ipvsScheduler,
        }
        if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
          serv.Flags |= utilipvs.FlagPersistent
          serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
        }
        // There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`.
        if err := proxier.syncService(svcNameString, serv, false, bindedAddresses); err == nil {
          activeIPVSServices[serv.String()] = true
          if err := proxier.syncEndpoint(svcName, svcInfo.NodeLocalExternal(), serv); err != nil {
            klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv)
          }
        } else {
          klog.ErrorS(err, "Failed to sync service", "service", serv)
        }
      }
    }

    if svcInfo.HealthCheckNodePort() != 0 {
      nodePortSet := proxier.ipsetList[kubeHealthCheckNodePortSet]
      entry := &utilipset.Entry{
        // No need to provide ip info
        Port:     svcInfo.HealthCheckNodePort(),
        Protocol: "tcp",
        SetType:  utilipset.BitmapPort,
      }

      if valid := nodePortSet.validateEntry(entry); !valid {
        klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", nodePortSet.Name)
        continue
      }
      nodePortSet.activeEntries.Insert(entry.String())
    }
  }

Syncing the rules

writeIptablesRules() decides, case by case, whether to create the iptables rules that correspond to each ipset. Working that out by hand was too painful; while researching I found that someone had already organized it nicely, so I simply borrowed their write-up, which is quoted in sections 2.1-2.5 below.

  1. Sync the ipset entries and write them to the node.

  2. Configure the iptables rules based on the ipsets set up earlier (a simplified sketch of this pattern follows the code below).

  // sync ipset entries
  // Sync the ipset entries and write them to the node.
  for _, set := range proxier.ipsetList {
    set.syncIPSetEntries()
  }

  // Tail call iptables rules for ipset, make sure only call iptables once
  // in a single loop per ip set.
  // iptables is called only once per ip set in a single sync loop to install the matching rules; see https://www.qikqiak.com/post/how-to-use-ipvs-in-kubernetes/
  proxier.writeIptablesRules()

  // Sync iptables rules.
  // NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table.
  proxier.iptablesData.Reset()
  proxier.iptablesData.Write(proxier.natChains.Bytes())
  proxier.iptablesData.Write(proxier.natRules.Bytes())
  proxier.iptablesData.Write(proxier.filterChains.Bytes())
  proxier.iptablesData.Write(proxier.filterRules.Bytes())

  klog.V(5).InfoS("Restoring iptables", "rules", string(proxier.iptablesData.Bytes()))
  err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
  if err != nil {
    klog.ErrorS(err, "Failed to execute iptables-restore", "rules", string(proxier.iptablesData.Bytes()))
    metrics.IptablesRestoreFailuresTotal.Inc()
    return
  }
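
The body of writeIptablesRules() is not reproduced here, so as a rough illustration of the pattern it follows, the sketch below appends a couple of ipset-match rules to an in-memory buffer in iptables-restore format. The rule text mirrors the nat-table dump in 2.1 below; the buffer-building code is a simplified stand-in, not the actual kube-proxy implementation.

package main

import (
  "bytes"
  "fmt"
)

// writeLine appends one iptables-restore line to the buffer, mimicking
// how kube-proxy accumulates rules before handing the whole payload to
// iptables-restore in a single call.
func writeLine(buf *bytes.Buffer, words ...string) {
  for i, w := range words {
    if i > 0 {
      buf.WriteByte(' ')
    }
    buf.WriteString(w)
  }
  buf.WriteByte('\n')
}

func main() {
  natRules := &bytes.Buffer{}
  writeLine(natRules, "*nat")
  // Packets whose destination matches the KUBE-CLUSTER-IP set are
  // marked for masquerade and then accepted (compare the dump in 2.1).
  writeLine(natRules, "-A", "KUBE-SERVICES",
    "-m", "set", "--match-set", "KUBE-CLUSTER-IP", "dst,dst",
    "-j", "KUBE-MARK-MASQ")
  writeLine(natRules, "-A", "KUBE-SERVICES",
    "-m", "set", "--match-set", "KUBE-CLUSTER-IP", "dst,dst",
    "-j", "ACCEPT")
  writeLine(natRules, "COMMIT")

  // In kube-proxy this buffer would be passed to iptables-restore;
  // here we just print it.
  fmt.Print(natRules.String())
}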

2.1 If kube-proxy is configured with the --masquerade-all=true flag, the IPVS proxier masquerades all traffic that accesses a Service's ClusterIP, which is the same behavior as iptables mode. The iptables rules added for IPVS mode look like this:

# iptables -t nat -nL

Chain PREROUTING (policy ACCEPT)
target     prot opt source               destination
KUBE-SERVICES  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes service portals */

Chain OUTPUT (policy ACCEPT)
target     prot opt source               destination
KUBE-SERVICES  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes service portals */

Chain POSTROUTING (policy ACCEPT)
target     prot opt source               destination
KUBE-POSTROUTING  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes postrouting rules */

Chain KUBE-MARK-MASQ (2 references)
target     prot opt source               destination
MARK       all  --  0.0.0.0/0            0.0.0.0/0            MARK or 0x4000

Chain KUBE-POSTROUTING (1 references)
target     prot opt source               destination
MASQUERADE  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes service traffic requiring SNAT */ mark match 0x4000/0x4000
MASQUERADE  all  --  0.0.0.0/0            0.0.0.0/0            match-set KUBE-LOOP-BACK dst,dst,src

Chain KUBE-SERVICES (2 references)
target     prot opt source               destination
KUBE-MARK-MASQ  all  --  0.0.0.0/0            0.0.0.0/0            match-set KUBE-CLUSTER-IP dst,dst
ACCEPT     all  --  0.0.0.0/0            0.0.0.0/0            match-set KUBE-CLUSTER-IP dst,dst

2.2 If kube-proxy is configured with the --cluster-cidr=<cidr> flag, the IPVS proxier masquerades all external traffic that accesses a Service ClusterIP, again the same behavior as iptables mode. Assuming the cluster CIDR given to kube-proxy is 10.244.16.0/24, the iptables rules added for IPVS mode should look like this:

# iptables -t nat -nL

Chain PREROUTING (policy ACCEPT)
target     prot opt source               destination
KUBE-SERVICES  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes service portals */

Chain OUTPUT (policy ACCEPT)
target     prot opt source               destination
KUBE-SERVICES  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes service portals */

Chain POSTROUTING (policy ACCEPT)
target     prot opt source               destination
KUBE-POSTROUTING  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes postrouting rules */

Chain KUBE-MARK-MASQ (3 references)
target     prot opt source               destination
MARK       all  --  0.0.0.0/0            0.0.0.0/0            MARK or 0x4000

Chain KUBE-POSTROUTING (1 references)
target     prot opt source               destination
MASQUERADE  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes service traffic requiring SNAT */ mark match 0x4000/0x4000
MASQUERADE  all  --  0.0.0.0/0            0.0.0.0/0            match-set KUBE-LOOP-BACK dst,dst,src

Chain KUBE-SERVICES (2 references)
target     prot opt source               destination
KUBE-MARK-MASQ  all  -- !10.244.16.0/24       0.0.0.0/0            match-set KUBE-CLUSTER-IP dst,dst
ACCEPT     all  --  0.0.0.0/0            0.0.0.0/0            match-set KUBE-CLUSTER-IP dst,dst

2.3 For Services of type LoadBalancer, the IPVS proxier installs iptables rules that match the KUBE-LOAD-BALANCER ipset. In particular, when a Service's LoadBalancerSourceRanges is specified or externalTrafficPolicy=Local is set, the IPVS proxier creates the ipsets KUBE-LOAD-BALANCER-LOCAL/KUBE-LOAD-BALANCER-FW/KUBE-LOAD-BALANCER-SOURCE-CIDR and adds the corresponding iptables rules, as shown below:

# iptables -t nat -nL

Chain PREROUTING (policy ACCEPT)
target     prot opt source               destination
KUBE-SERVICES  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes service portals */

Chain OUTPUT (policy ACCEPT)
target     prot opt source               destination
KUBE-SERVICES  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes service portals */

Chain POSTROUTING (policy ACCEPT)
target     prot opt source               destination
KUBE-POSTROUTING  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes postrouting rules */

Chain KUBE-FIREWALL (1 references)
target     prot opt source               destination
RETURN     all  --  0.0.0.0/0            0.0.0.0/0            match-set KUBE-LOAD-BALANCER-SOURCE-CIDR dst,dst,src
KUBE-MARK-DROP  all  --  0.0.0.0/0            0.0.0.0/0

Chain KUBE-LOAD-BALANCER (1 references)
target     prot opt source               destination
KUBE-FIREWALL  all  --  0.0.0.0/0            0.0.0.0/0            match-set KUBE-LOAD-BALANCER-FW dst,dst
RETURN     all  --  0.0.0.0/0            0.0.0.0/0            match-set KUBE-LOAD-BALANCER-LOCAL dst,dst
KUBE-MARK-MASQ  all  --  0.0.0.0/0            0.0.0.0/0

Chain KUBE-MARK-DROP (1 references)
target     prot opt source               destination
MARK       all  --  0.0.0.0/0            0.0.0.0/0            MARK or 0x8000

Chain KUBE-MARK-MASQ (2 references)
target     prot opt source               destination
MARK       all  --  0.0.0.0/0            0.0.0.0/0            MARK or 0x4000

Chain KUBE-POSTROUTING (1 references)
target     prot opt source               destination
MASQUERADE  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes service traffic requiring SNAT */ mark match 0x4000/0x4000
MASQUERADE  all  --  0.0.0.0/0            0.0.0.0/0            match-set KUBE-LOOP-BACK dst,dst,src

Chain KUBE-SERVICES (2 references)
target     prot opt source               destination
KUBE-LOAD-BALANCER  all  --  0.0.0.0/0            0.0.0.0/0            match-set KUBE-LOAD-BALANCER dst,dst
ACCEPT     all  --  0.0.0.0/0            0.0.0.0/0            match-set KUBE-LOAD-BALANCER dst,dst

2.4 For Services of type NodePort, the IPVS proxier adds iptables rules that match the KUBE-NODE-PORT-TCP/KUBE-NODE-PORT-UDP ipsets. When externalTrafficPolicy=Local is specified, it creates the ipsets KUBE-NODE-PORT-LOCAL-TCP/KUBE-NODE-PORT-LOCAL-UDP and installs the corresponding iptables rules, as shown below (assuming the Service uses a TCP nodePort):

Chain PREROUTING (policy ACCEPT)
target     prot opt source               destination
KUBE-SERVICES  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes service portals */

Chain OUTPUT (policy ACCEPT)
target     prot opt source               destination
KUBE-SERVICES  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes service portals */

Chain POSTROUTING (policy ACCEPT)
target     prot opt source               destination
KUBE-POSTROUTING  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes postrouting rules */

Chain KUBE-MARK-MASQ (2 references)
target     prot opt source               destination
MARK       all  --  0.0.0.0/0            0.0.0.0/0            MARK or 0x4000

Chain KUBE-NODE-PORT (1 references)
target     prot opt source               destination
RETURN     all  --  0.0.0.0/0            0.0.0.0/0            match-set KUBE-NODE-PORT-LOCAL-TCP dst
KUBE-MARK-MASQ  all  --  0.0.0.0/0            0.0.0.0/0

Chain KUBE-POSTROUTING (1 references)
target     prot opt source               destination
MASQUERADE  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes service traffic requiring SNAT */ mark match 0x4000/0x4000
MASQUERADE  all  --  0.0.0.0/0            0.0.0.0/0            match-set KUBE-LOOP-BACK dst,dst,src

Chain KUBE-SERVICES (2 references)
target     prot opt source               destination
KUBE-NODE-PORT  all  --  0.0.0.0/0            0.0.0.0/0            match-set KUBE-NODE-PORT-TCP dst

2.5 For Services that specify externalIPs, the IPVS proxier installs iptables rules that match the KUBE-EXTERNAL-IP ipset. Assuming we have a Service with externalIPs specified, the iptables rules should look like this:

Chain PREROUTING (policy ACCEPT)
target     prot opt source               destination
KUBE-SERVICES  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes service portals */

Chain OUTPUT (policy ACCEPT)
target     prot opt source               destination
KUBE-SERVICES  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes service portals */

Chain POSTROUTING (policy ACCEPT)
target     prot opt source               destination
KUBE-POSTROUTING  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes postrouting rules */

Chain KUBE-MARK-MASQ (2 references)
target     prot opt source               destination
MARK       all  --  0.0.0.0/0            0.0.0.0/0            MARK or 0x4000

Chain KUBE-POSTROUTING (1 references)
target     prot opt source               destination
MASQUERADE  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes service traffic requiring SNAT */ mark match 0x4000/0x4000
MASQUERADE  all  --  0.0.0.0/0            0.0.0.0/0            match-set KUBE-LOOP-BACK dst,dst,src

Chain KUBE-SERVICES (2 references)
target     prot opt source               destination
KUBE-MARK-MASQ  all  --  0.0.0.0/0            0.0.0.0/0            match-set KUBE-EXTERNAL-IP dst,dst
ACCEPT     all  --  0.0.0.0/0            0.0.0.0/0            match-set KUBE-EXTERNAL-IP dst,dst PHYSDEV match ! --physdev-is-in ADDRTYPE match src-type !LOCAL
ACCEPT     all  --  0.0.0.0/0            0.0.0.0/0            match-set KUBE-EXTERNAL-IP dst,dst ADDRTYPE match dst-type LOCAL

Wrapping up

  1. Clean up leftover IPVS rules and stale conntrack entries.
  for name, lastChangeTriggerTimes := range endpointUpdateResult.LastChangeTriggerTimes {
    for _, lastChangeTriggerTime := range lastChangeTriggerTimes {
      latency := metrics.SinceInSeconds(lastChangeTriggerTime)
      metrics.NetworkProgrammingLatency.Observe(latency)
      klog.V(4).InfoS("Network programming", "endpoint", klog.KRef(name.Namespace, name.Name), "elapsed", latency)
    }
  }

  // Get legacy bind address
  // currentBindAddrs represents ip addresses bind to DefaultDummyDevice from the system
  currentBindAddrs, err := proxier.netlinkHandle.ListBindAddress(DefaultDummyDevice)
  if err != nil {
    klog.ErrorS(err, "Failed to get bind address")
  }
  legacyBindAddrs := proxier.getLegacyBindAddr(activeBindAddrs, currentBindAddrs)

  // Clean up legacy IPVS services and unbind addresses
  appliedSvcs, err := proxier.ipvs.GetVirtualServers()
  if err == nil {
    for _, appliedSvc := range appliedSvcs {
      currentIPVSServices[appliedSvc.String()] = appliedSvc
    }
  } else {
    klog.ErrorS(err, "Failed to get ipvs service")
  }
  proxier.cleanLegacyService(activeIPVSServices, currentIPVSServices, legacyBindAddrs)

  if proxier.healthzServer != nil {
    proxier.healthzServer.Updated()
  }
  metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime()

  // Update service healthchecks.  The endpoints list might include services that are
  // not "OnlyLocal", but the services list will not, and the serviceHealthServer
  // will just drop those endpoints.
  if err := proxier.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil {
    klog.ErrorS(err, "Error syncing healthcheck services")
  }
  if err := proxier.serviceHealthServer.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil {
    klog.ErrorS(err, "Error syncing healthcheck endpoints")
  }

  // Finish housekeeping.
  // TODO: these could be made more consistent.
  for _, svcIP := range staleServices.UnsortedList() {
    if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, v1.ProtocolUDP); err != nil {
      klog.ErrorS(err, "Failed to delete stale service IP connections", "ip", svcIP)
    }
  }
  proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints)
}

Summary

The code implementing the proxier's core logic is quite long, and reading it smoothly requires some background knowledge of iptables and IPVS. Parts like ipset I had to learn on the fly while reading, so my understanding there is not that thorough. As for the rule-syncing part, I did read through all of it, but found it hard to put into my own words, so the quoted material comes from someone else's write-up.