
Kubernetes source code: kube-proxy principles and source code analysis (Part 1)

An analysis of how kube-proxy works and of its source code, based on the release-1.22 branch of kubernetes.

Preface

kube-proxy is one of the core Kubernetes components. We mentioned it back in the endpoints controller article: it is the component that actually implements Services.

The entry point

Located at: cmd/kube-proxy/proxy.go

func main() {
  rand.Seed(time.Now().UnixNano())

  command := app.NewProxyCommand()

  // TODO: once we switch everything over to Cobra commands, we can go back to calling
  // utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
  // normalize func and add the go flag set by hand.
  pflag.CommandLine.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
  pflag.CommandLine.AddGoFlagSet(goflag.CommandLine)
  // utilflag.InitFlags()
  logs.InitLogs()
  defer logs.FlushLogs()
  // execute the command; this eventually runs opts.Run()
  if err := command.Execute(); err != nil {
    os.Exit(1)
  }
}

Following app.NewProxyCommand(), we can see that command.Execute() ultimately ends up calling opts.Run().

func NewProxyCommand() *cobra.Command {
  opts := NewOptions()

  cmd := &cobra.Command{
    Use: "kube-proxy",
    Long: `The Kubernetes network proxy runs on each node. This
reflects services as defined in the Kubernetes API on each node and can do simple
TCP, UDP, and SCTP stream forwarding or round robin TCP, UDP, and SCTP forwarding across a set of backends.
Service cluster IPs and ports are currently found through Docker-links-compatible
environment variables specifying ports opened by the service proxy. There is an optional
addon that provides cluster DNS for these cluster IPs. The user must create a service
with the apiserver API to configure the proxy.`,
    Run: func(cmd *cobra.Command, args []string) {
      verflag.PrintAndExitIfRequested()
      cliflag.PrintFlags(cmd.Flags())

      if err := initForOS(opts.WindowsService); err != nil {
        klog.Fatalf("failed OS init: %v", err)
      }

      if err := opts.Complete(); err != nil {
        klog.Fatalf("failed complete: %v", err)
      }

      if err := opts.Validate(); err != nil {
        klog.Fatalf("failed validate: %v", err)
      }
      // this is where kube-proxy actually starts
      if err := opts.Run(); err != nil {
        klog.Exit(err)
      }
    },
    Args: func(cmd *cobra.Command, args []string) error {
      for _, arg := range args {
        if len(arg) > 0 {
          return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
        }
      }
      return nil
    },
  }

  var err error
  opts.config, err = opts.ApplyDefaults(opts.config)
  if err != nil {
    klog.Fatalf("unable to create flag defaults: %v", err)
  }

  opts.AddFlags(cmd.Flags())

  // TODO handle error
  cmd.MarkFlagFilename("config", "yaml", "yml", "json")

  return cmd
}

opts.Run() is the entry point for starting kube-proxy. Following the NewOptions() constructor, we can see that it returns an Options struct.

Options provides the Run() method.

func (o *Options) Run() error {
  defer close(o.errCh)
  if len(o.WriteConfigTo) > 0 {
    return o.writeConfigFile()
  }

  proxyServer, err := NewProxyServer(o)
  if err != nil {
    return err
  }

  if o.CleanupAndExit {
    return proxyServer.CleanupAndExit()
  }

  o.proxyServer = proxyServer
  // ultimately calls o.runLoop()
  return o.runLoop()
}

Run() ultimately calls o.runLoop(); let's follow it.

func (o *Options) runLoop() error {
  if o.watcher != nil {
    o.watcher.Run()
  }

  // run the proxy in goroutine
  go func() {
    err := o.proxyServer.Run()
    o.errCh <- err
  }()

  for {
    err := <-o.errCh
    if err != nil {
      return err
    }
  }
}

We can see that it in turn calls o.proxyServer.Run().

NewProxyServer()

Here kube-proxy decides whether to run in ipvs, iptables or userspace mode, constructs the corresponding proxier, and finally returns a *ProxyServer object, whose Run() method is then invoked.

proxyServer is created by the NewProxyServer() constructor.

Located at: cmd/kube-proxy/app/server_others.go.

  1. Check whether all of the kernel modules required by ipvs can be loaded. If they can, an ipvsInterface is created for calling the various ipvs APIs.

  2. Determine which proxy mode to use from the configuration and return proxyMode (the fallback order is sketched right after this list). Also determine detect-local-mode, i.e. how kube-proxy recognizes traffic that originates from pods on this node; the resulting detectLocalMode is later used to build the LocalTrafficDetector passed to the proxier.

  3. Pick the IP protocol family, IPv4 by default (checking whether IPv4/IPv6 dual-stack is enabled), and initialize iptInterface, which is used to execute iptables commands.

  4. Based on the proxyMode determined above, create a proxier object of type proxy.Provider.

  5. Return the *ProxyServer object. As we saw above, its key method is Run(); next, let's look at how Run() is implemented.
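
Before reading the full function, here is a minimal sketch of the mode-selection fallback from step 2. The real logic lives in getProxyMode() and its tryIPVSProxy()/tryIPTablesProxy() helpers in cmd/kube-proxy/app/server_others.go; the function below only illustrates the order of preference, and the canUseIPTables parameter is an assumption standing in for the kernel compatibility check that the real code performs.

// chooseProxyMode is a simplified sketch of getProxyMode(): honour the mode
// requested in the config, fall back to iptables when ipvs is unavailable,
// and fall back to userspace only when iptables cannot be used either.
// canUseIPTables stands in for the real kernel compatibility test.
func chooseProxyMode(configured string, canUseIPVS, canUseIPTables bool) string {
  switch configured {
  case "ipvs":
    if canUseIPVS {
      return "ipvs"
    }
    // ipvs was requested but the required kernel modules are missing
    fallthrough
  case "iptables", "":
    if canUseIPTables {
      return "iptables"
    }
    return "userspace"
  case "userspace":
    return "userspace"
  default:
    // unknown value: behave like the default and try iptables first
    if canUseIPTables {
      return "iptables"
    }
    return "userspace"
  }
}

The full newProxyServer() follows: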

// NewProxyServer() delegates construction to newProxyServer().
func NewProxyServer(o *Options) (*ProxyServer, error) {
  return newProxyServer(o.config, o.CleanupAndExit, o.master)
}

func newProxyServer(
  config *proxyconfigapi.KubeProxyConfiguration,
  cleanupAndExit bool,
  master string) (*ProxyServer, error) {

  if config == nil {
    return nil, errors.New("config is required")
  }
  // register the configuration with configz
  if c, err := configz.New(proxyconfigapi.GroupName); err == nil {
    c.Set(config)
  } else {
    return nil, fmt.Errorf("unable to register configz: %s", err)
  }

  var iptInterface utiliptables.Interface
  var ipvsInterface utilipvs.Interface
  var kernelHandler ipvs.KernelHandler
  var ipsetInterface utilipset.Interface

  // Create a iptables utils.
  execer := exec.New()

  kernelHandler = ipvs.NewLinuxKernelHandler()
  ipsetInterface = utilipset.New(execer)
  canUseIPVS, err := ipvs.CanUseIPVSProxier(kernelHandler, ipsetInterface, config.IPVS.Scheduler)
  if string(config.Mode) == proxyModeIPVS && err != nil {
    klog.Errorf("Can't use the IPVS proxier: %v", err)
  }

  if canUseIPVS {
    ipvsInterface = utilipvs.New(execer)
  }

  // We omit creation of pretty much everything if we run in cleanup mode
  if cleanupAndExit {
    return &ProxyServer{
      execer:         execer,
      IpvsInterface:  ipvsInterface,
      IpsetInterface: ipsetInterface,
    }, nil
  }

  if len(config.ShowHiddenMetricsForVersion) > 0 {
    metrics.SetShowHidden()
  }

  hostname, err := utilnode.GetHostname(config.HostnameOverride)
  if err != nil {
    return nil, err
  }

  client, eventClient, err := createClients(config.ClientConnection, master)
  if err != nil {
    return nil, err
  }

  nodeIP := detectNodeIP(client, hostname, config.BindAddress)
  klog.Infof("Detected node IP %s", nodeIP.String())

  // Create event recorder
  eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
  recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "kube-proxy")

  nodeRef := &v1.ObjectReference{
    Kind:      "Node",
    Name:      hostname,
    UID:       types.UID(hostname),
    Namespace: "",
  }

  var healthzServer healthcheck.ProxierHealthUpdater
  if len(config.HealthzBindAddress) > 0 {
    healthzServer = healthcheck.NewProxierHealthServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, recorder, nodeRef)
  }

  var proxier proxy.Provider
  var detectLocalMode proxyconfigapi.LocalMode

  // determine the proxy mode
  proxyMode := getProxyMode(string(config.Mode), canUseIPVS, iptables.LinuxKernelCompatTester{})
  detectLocalMode, err = getDetectLocalMode(config)
  if err != nil {
    return nil, fmt.Errorf("cannot determine detect-local-mode: %v", err)
  }

  var nodeInfo *v1.Node
  if detectLocalMode == proxyconfigapi.LocalModeNodeCIDR {
    klog.Infof("Watching for node %s, awaiting podCIDR allocation", hostname)
    nodeInfo, err = waitForPodCIDR(client, hostname)
    if err != nil {
      return nil, err
    }
    klog.Infof("NodeInfo PodCIDR: %v, PodCIDRs: %v", nodeInfo.Spec.PodCIDR, nodeInfo.Spec.PodCIDRs)
  }

  klog.V(2).Info("DetectLocalMode: '", string(detectLocalMode), "'")

  primaryProtocol := utiliptables.ProtocolIPv4
  if utilsnet.IsIPv6(nodeIP) {
    primaryProtocol = utiliptables.ProtocolIPv6
  }
  iptInterface = utiliptables.New(execer, primaryProtocol)

  var ipt [2]utiliptables.Interface
  dualStack := utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) && proxyMode != proxyModeUserspace
  if dualStack {
    // Create iptables handlers for both families, one is already created
    // Always ordered as IPv4, IPv6
    if primaryProtocol == utiliptables.ProtocolIPv4 {
      ipt[0] = iptInterface
      ipt[1] = utiliptables.New(execer, utiliptables.ProtocolIPv6)

      // Just because the feature gate is enabled doesn't mean the node
      // actually supports dual-stack
      if _, err := ipt[1].ChainExists(utiliptables.TableNAT, utiliptables.ChainPostrouting); err != nil {
        klog.Warningf("No iptables support for IPv6: %v", err)
        dualStack = false
      }
    } else {
      ipt[0] = utiliptables.New(execer, utiliptables.ProtocolIPv4)
      ipt[1] = iptInterface
    }
  }
  if dualStack {
    klog.V(0).Infof("kube-proxy running in dual-stack mode, %s-primary", iptInterface.Protocol())
  } else {
    klog.V(0).Infof("kube-proxy running in single-stack %s mode", iptInterface.Protocol())
  }

  if proxyMode == proxyModeIPTables {
    klog.V(0).Info("Using iptables Proxier.")
    if config.IPTables.MasqueradeBit == nil {
      // MasqueradeBit must be specified or defaulted.
      return nil, fmt.Errorf("unable to read IPTables MasqueradeBit from config")
    }

    if dualStack {
      klog.V(0).Info("creating dualStackProxier for iptables.")

      // Always ordered to match []ipt
      var localDetectors [2]proxyutiliptables.LocalTrafficDetector
      localDetectors, err = getDualStackLocalDetectorTuple(detectLocalMode, config, ipt, nodeInfo)
      if err != nil {
        return nil, fmt.Errorf("unable to create proxier: %v", err)
      }

      // TODO this has side effects that should only happen when Run() is invoked.
      proxier, err = iptables.NewDualStackProxier(
        ipt,
        utilsysctl.New(),
        execer,
        config.IPTables.SyncPeriod.Duration,
        config.IPTables.MinSyncPeriod.Duration,
        config.IPTables.MasqueradeAll,
        int(*config.IPTables.MasqueradeBit),
        localDetectors,
        hostname,
        nodeIPTuple(config.BindAddress),
        recorder,
        healthzServer,
        config.NodePortAddresses,
      )
    } else { // Create a single-stack proxier.
      var localDetector proxyutiliptables.LocalTrafficDetector
      localDetector, err = getLocalDetector(detectLocalMode, config, iptInterface, nodeInfo)
      if err != nil {
        return nil, fmt.Errorf("unable to create proxier: %v", err)
      }

      // TODO this has side effects that should only happen when Run() is invoked.
      proxier, err = iptables.NewProxier(
        iptInterface,
        utilsysctl.New(),
        execer,
        config.IPTables.SyncPeriod.Duration,
        config.IPTables.MinSyncPeriod.Duration,
        config.IPTables.MasqueradeAll,
        int(*config.IPTables.MasqueradeBit),
        localDetector,
        hostname,
        nodeIP,
        recorder,
        healthzServer,
        config.NodePortAddresses,
      )
    }

    if err != nil {
      return nil, fmt.Errorf("unable to create proxier: %v", err)
    }
    proxymetrics.RegisterMetrics()
  } else if proxyMode == proxyModeIPVS {
    klog.V(0).Info("Using ipvs Proxier.")
    if dualStack {
      klog.V(0).Info("creating dualStackProxier for ipvs.")

      nodeIPs := nodeIPTuple(config.BindAddress)

      // Always ordered to match []ipt
      var localDetectors [2]proxyutiliptables.LocalTrafficDetector
      localDetectors, err = getDualStackLocalDetectorTuple(detectLocalMode, config, ipt, nodeInfo)
      if err != nil {
        return nil, fmt.Errorf("unable to create proxier: %v", err)
      }

      proxier, err = ipvs.NewDualStackProxier(
        ipt,
        ipvsInterface,
        ipsetInterface,
        utilsysctl.New(),
        execer,
        config.IPVS.SyncPeriod.Duration,
        config.IPVS.MinSyncPeriod.Duration,
        config.IPVS.ExcludeCIDRs,
        config.IPVS.StrictARP,
        config.IPVS.TCPTimeout.Duration,
        config.IPVS.TCPFinTimeout.Duration,
        config.IPVS.UDPTimeout.Duration,
        config.IPTables.MasqueradeAll,
        int(*config.IPTables.MasqueradeBit),
        localDetectors,
        hostname,
        nodeIPs,
        recorder,
        healthzServer,
        config.IPVS.Scheduler,
        config.NodePortAddresses,
        kernelHandler,
      )
    } else {
      var localDetector proxyutiliptables.LocalTrafficDetector
      localDetector, err = getLocalDetector(detectLocalMode, config, iptInterface, nodeInfo)
      if err != nil {
        return nil, fmt.Errorf("unable to create proxier: %v", err)
      }

      proxier, err = ipvs.NewProxier(
        iptInterface,
        ipvsInterface,
        ipsetInterface,
        utilsysctl.New(),
        execer,
        config.IPVS.SyncPeriod.Duration,
        config.IPVS.MinSyncPeriod.Duration,
        config.IPVS.ExcludeCIDRs,
        config.IPVS.StrictARP,
        config.IPVS.TCPTimeout.Duration,
        config.IPVS.TCPFinTimeout.Duration,
        config.IPVS.UDPTimeout.Duration,
        config.IPTables.MasqueradeAll,
        int(*config.IPTables.MasqueradeBit),
        localDetector,
        hostname,
        nodeIP,
        recorder,
        healthzServer,
        config.IPVS.Scheduler,
        config.NodePortAddresses,
        kernelHandler,
      )
    }
    if err != nil {
      return nil, fmt.Errorf("unable to create proxier: %v", err)
    }
    proxymetrics.RegisterMetrics()
  } else {
    klog.V(0).Info("Using userspace Proxier.")

    // TODO this has side effects that should only happen when Run() is invoked.
    proxier, err = userspace.NewProxier(
      userspace.NewLoadBalancerRR(),
      net.ParseIP(config.BindAddress),
      iptInterface,
      execer,
      *utilnet.ParsePortRangeOrDie(config.PortRange),
      config.IPTables.SyncPeriod.Duration,
      config.IPTables.MinSyncPeriod.Duration,
      config.UDPIdleTimeout.Duration,
      config.NodePortAddresses,
    )
    if err != nil {
      return nil, fmt.Errorf("unable to create proxier: %v", err)
    }
  }

  useEndpointSlices := true
  if proxyMode == proxyModeUserspace {
    // userspace mode doesn't support endpointslice.
    useEndpointSlices = false
  }

  return &ProxyServer{
    Client:                 client,
    EventClient:            eventClient,
    IptInterface:           iptInterface,
    IpvsInterface:          ipvsInterface,
    IpsetInterface:         ipsetInterface,
    execer:                 execer,
    Proxier:                proxier,
    Broadcaster:            eventBroadcaster,
    Recorder:               recorder,
    ConntrackConfiguration: config.Conntrack,
    Conntracker:            &realConntracker{},
    ProxyMode:              proxyMode,
    NodeRef:                nodeRef,
    MetricsBindAddress:     config.MetricsBindAddress,
    BindAddressHardFail:    config.BindAddressHardFail,
    EnableProfiling:        config.EnableProfiling,
    OOMScoreAdj:            config.OOMScoreAdj,
    ConfigSyncPeriod:       config.ConfigSyncPeriod.Duration,
    HealthzServer:          healthzServer,
    UseEndpointSlices:      useEndpointSlices,
  }, nil
}

The core logic: Run()

  1. Start the health check (healthz) server.

  2. Serve metrics.

  3. Set the nf_conntrack_max, nf_conntrack_tcp_timeout_established and nf_conntrack_tcp_timeout_close_wait kernel parameters (how the conntrack ceiling is derived is sketched right after this list).

  4. Fetch the services and endpoints/endpointSlice configuration from the apiserver via client-go.

  5. Create the corresponding informers and register the event handlers.

  6. Enter the sync loop.
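
A quick note on step 3 before the code: the conntrack ceiling is not taken verbatim from the config; it is scaled by the number of CPUs and floored by a configured minimum. Below is a minimal sketch of that computation, modelled on getConntrackMax(); the parameter names are mine and roughly correspond to the conntrack max-per-core and min settings in KubeProxyConfiguration.

import "runtime"

// conntrackMaxSketch mirrors the idea behind getConntrackMax(): scale the
// per-core value by the CPU count, but never go below the configured minimum.
// A result of 0 means "leave the kernel's nf_conntrack_max untouched".
func conntrackMaxSketch(maxPerCore, min int32) int {
  if maxPerCore > 0 {
    scaled := int(maxPerCore) * runtime.NumCPU()
    if scaled > int(min) {
      return scaled
    }
    return int(min)
  }
  return 0
}

With that in mind, here is the full Run():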

func (s *ProxyServer) Run() error {
  // To help debugging, immediately log version
  klog.Infof("Version: %+v", version.Get())

  // adjust the OOM score so that kube-proxy is not killed first when the node is under memory pressure
  var oomAdjuster *oom.OOMAdjuster
  if s.OOMScoreAdj != nil {
    oomAdjuster = oom.NewOOMAdjuster()
    if err := oomAdjuster.ApplyOOMScoreAdj(0, int(*s.OOMScoreAdj)); err != nil {
      klog.V(2).Info(err)
    }
  }

  if s.Broadcaster != nil && s.EventClient != nil {
    stopCh := make(chan struct{})
    s.Broadcaster.StartRecordingToSink(stopCh)
  }

  // TODO(thockin): make it possible for healthz and metrics to be on the same port.

  var errCh chan error
  if s.BindAddressHardFail {
    errCh = make(chan error)
  }

  // health check server
  serveHealthz(s.HealthzServer, errCh)

  // metrics server
  serveMetrics(s.MetricsBindAddress, s.ProxyMode, s.EnableProfiling, errCh)

  // conntrack kernel parameter tuning
  if s.Conntracker != nil {
    max, err := getConntrackMax(s.ConntrackConfiguration)
    if err != nil {
      return err
    }
    if max > 0 {
      err := s.Conntracker.SetMax(max)
      if err != nil {
        if err != errReadOnlySysFS {
          return err
        }
        // errReadOnlySysFS is caused by a known docker issue (https://github.com/docker/docker/issues/24000),
        // the only remediation we know is to restart the docker daemon.
        // Here we'll send an node event with specific reason and message, the
        // administrator should decide whether and how to handle this issue,
        // whether to drain the node and restart docker.  Occurs in other container runtimes
        // as well.
        // TODO(random-liu): Remove this when the docker bug is fixed.
        const message = "CRI error: /sys is read-only: " +
          "cannot modify conntrack limits, problems may arise later (If running Docker, see docker issue #24000)"
        s.Recorder.Eventf(s.NodeRef, nil, api.EventTypeWarning, err.Error(), "StartKubeProxy", message)
      }
    }

    if s.ConntrackConfiguration.TCPEstablishedTimeout != nil && s.ConntrackConfiguration.TCPEstablishedTimeout.Duration > 0 {
      timeout := int(s.ConntrackConfiguration.TCPEstablishedTimeout.Duration / time.Second)
      if err := s.Conntracker.SetTCPEstablishedTimeout(timeout); err != nil {
        return err
      }
    }

    if s.ConntrackConfiguration.TCPCloseWaitTimeout != nil && s.ConntrackConfiguration.TCPCloseWaitTimeout.Duration > 0 {
      timeout := int(s.ConntrackConfiguration.TCPCloseWaitTimeout.Duration / time.Second)
      if err := s.Conntracker.SetTCPCloseWaitTimeout(timeout); err != nil {
        return err
      }
    }
  }

  // NewRequirement builds label requirements used to skip objects that carry the service-proxy-name label and headless-service endpoints.
  noProxyName, err := labels.NewRequirement(apis.LabelServiceProxyName, selection.DoesNotExist, nil)
  if err != nil {
    return err
  }

  noHeadlessEndpoints, err := labels.NewRequirement(v1.IsHeadlessService, selection.DoesNotExist, nil)
  if err != nil {
    return err
  }

  labelSelector := labels.NewSelector()
  labelSelector = labelSelector.Add(*noProxyName, *noHeadlessEndpoints)

  // create the shared informer factory; the informers built from it watch services, endpointslices, etc.
  informerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod,
    informers.WithTweakListOptions(func(options *metav1.ListOptions) {
      options.LabelSelector = labelSelector.String()
    }))

  // Create configs (i.e. Watches for Services and Endpoints or EndpointSlices)
  // Note: RegisterHandler() calls need to happen before creation of Sources because sources
  // only notify on changes, and the initial update (on process start) may be lost if no handlers
  // are registered yet.
  // create the informers and register the event handlers:
  // first the service informer, then the endpointslice (or endpoints) informer
  serviceConfig := config.NewServiceConfig(informerFactory.Core().V1().Services(), s.ConfigSyncPeriod)
  serviceConfig.RegisterEventHandler(s.Proxier)
  go serviceConfig.Run(wait.NeverStop)

  if s.UseEndpointSlices {
    endpointSliceConfig := config.NewEndpointSliceConfig(informerFactory.Discovery().V1().EndpointSlices(), s.ConfigSyncPeriod)
    endpointSliceConfig.RegisterEventHandler(s.Proxier)
    go endpointSliceConfig.Run(wait.NeverStop)
  } else {
    endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), s.ConfigSyncPeriod)
    endpointsConfig.RegisterEventHandler(s.Proxier)
    go endpointsConfig.Run(wait.NeverStop)
  }

  // start all informers
  informerFactory.Start(wait.NeverStop)

  // if the TopologyAwareHints feature gate is enabled, also create a node informer
  if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) {
    // Make an informer that selects for our nodename.
    currentNodeInformerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod,
      informers.WithTweakListOptions(func(options *metav1.ListOptions) {
        options.FieldSelector = fields.OneTermEqualSelector("metadata.name", s.NodeRef.Name).String()
      }))
    nodeConfig := config.NewNodeConfig(currentNodeInformerFactory.Core().V1().Nodes(), s.ConfigSyncPeriod)
    nodeConfig.RegisterEventHandler(s.Proxier)
    go nodeConfig.Run(wait.NeverStop)

    // This has to start after the calls to NewNodeConfig because that must
    // configure the shared informer event handler first.
    currentNodeInformerFactory.Start(wait.NeverStop)
  }

  // record an event marking that kube-proxy has started
  s.birthCry()

  go s.Proxier.SyncLoop()
  // exit if any error occurs
  return <-errCh
}

Registering the event handlers

There is this block of code in the Run() function:

  serviceConfig := config.NewServiceConfig(informerFactory.Core().V1().Services(), s.ConfigSyncPeriod)
  serviceConfig.RegisterEventHandler(s.Proxier)
  go serviceConfig.Run(wait.NeverStop)

  if s.UseEndpointSlices {
    endpointSliceConfig := config.NewEndpointSliceConfig(informerFactory.Discovery().V1().EndpointSlices(), s.ConfigSyncPeriod)
    endpointSliceConfig.RegisterEventHandler(s.Proxier)
    go endpointSliceConfig.Run(wait.NeverStop)
  } else {
    endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), s.ConfigSyncPeriod)
    endpointsConfig.RegisterEventHandler(s.Proxier)
    go endpointsConfig.Run(wait.NeverStop)
  }
  1. Create the xxxConfig objects.

  2. Register the event handlers via xxxConfig.RegisterEventHandler().

    The watch handlers basically consist of handleAddxxx, handleUpdatexxx and handleDeletexxx, which call the Proxier's OnxxxAdd, OnxxxUpdate and OnxxxDelete methods respectively.

    Let's take serviceConfig as an example:

// ServiceConfig tracks a set of service configurations.
type ServiceConfig struct {
  listerSynced  cache.InformerSynced
  eventHandlers []ServiceHandler
}

// NewServiceConfig creates a new ServiceConfig.
func NewServiceConfig(serviceInformer coreinformers.ServiceInformer, resyncPeriod time.Duration) *ServiceConfig {
  result := &ServiceConfig{
    listerSynced: serviceInformer.Informer().HasSynced,
  }
  
  serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
    cache.ResourceEventHandlerFuncs{
      AddFunc:    result.handleAddService,
      UpdateFunc: result.handleUpdateService,
      DeleteFunc: result.handleDeleteService,
    },
    resyncPeriod,
  )

  return result
}

// RegisterEventHandler registers a handler which is called on every service change.
func (c *ServiceConfig) RegisterEventHandler(handler ServiceHandler) {
  c.eventHandlers = append(c.eventHandlers, handler)
}

// Run waits for cache synced and invokes handlers after syncing.
func (c *ServiceConfig) Run(stopCh <-chan struct{}) {
  klog.Info("Starting service config controller")
  
  // wait for the informer cache to sync
  if !cache.WaitForNamedCacheSync("service config", stopCh, c.listerSynced) {
    return
  }

  for i := range c.eventHandlers {
    klog.V(3).Info("Calling handler.OnServiceSynced()")
    c.eventHandlers[i].OnServiceSynced()
  }
}

func (c *ServiceConfig) handleAddService(obj interface{}) {
  service, ok := obj.(*v1.Service)
  if !ok {
    utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
    return
  }
  for i := range c.eventHandlers {
    klog.V(4).Info("Calling handler.OnServiceAdd")
    c.eventHandlers[i].OnServiceAdd(service)
  }
}

func (c *ServiceConfig) handleUpdateService(oldObj, newObj interface{}) {
  oldService, ok := oldObj.(*v1.Service)
  if !ok {
    utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", oldObj))
    return
  }
  service, ok := newObj.(*v1.Service)
  if !ok {
    utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
    return
  }
  for i := range c.eventHandlers {
    klog.V(4).Info("Calling handler.OnServiceUpdate")
    c.eventHandlers[i].OnServiceUpdate(oldService, service)
  }
}

func (c *ServiceConfig) handleDeleteService(obj interface{}) {
  service, ok := obj.(*v1.Service)
  if !ok {
    tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
    if !ok {
      utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
      return
    }
    if service, ok = tombstone.Obj.(*v1.Service); !ok {
      utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
      return
    }
  }
  for i := range c.eventHandlers {
    klog.V(4).Info("Calling handler.OnServiceDelete")
    c.eventHandlers[i].OnServiceDelete(service)
  }
}
  3. Call xxxConfig.Run(wait.NeverStop) to start it.

  4. Run() in step 3 ultimately calls c.eventHandlers[i].OnServiceSynced(), which is simply the OnServiceSynced() method of the Proxier we registered (the handler interface it satisfies is sketched after this list).

  5. We will look at the implementation of OnServiceSynced() in detail when we get to building the Proxier.
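
For reference, the handler interface registered above looks roughly like this (ServiceHandler in pkg/proxy/config/config.go, reproduced here as a sketch; v1 is k8s.io/api/core/v1, and EndpointSlices have an analogous EndpointSliceHandler):

// ServiceHandler is the abstract interface for objects that want to be
// notified about Service changes. Every proxier implements it, which is why
// the Proxier can be passed to RegisterEventHandler() above.
type ServiceHandler interface {
  // OnServiceAdd is called whenever a new service object is observed.
  OnServiceAdd(service *v1.Service)
  // OnServiceUpdate is called whenever an existing service object is modified.
  OnServiceUpdate(oldService, service *v1.Service)
  // OnServiceDelete is called whenever an existing service object is deleted.
  OnServiceDelete(service *v1.Service)
  // OnServiceSynced is called once all the initial event handlers have been
  // called and the state is fully propagated to the local cache.
  OnServiceSynced()
}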

Entering the sync loop

We will cover the implementation of s.Proxier.SyncLoop() in detail when we get to building the Proxier.

Building the Proxier

There are three proxy modes. Their internal logic differs, but they all implement the same interface (proxy.Provider, sketched below).
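
That shared interface is proxy.Provider (pkg/proxy/types.go). Trimmed to its essentials it looks roughly like this; config refers to pkg/proxy/config, the package we just looked at:

// Provider is the interface every proxier (userspace, iptables, ipvs) satisfies.
// It bundles the event-handler interfaces used above with the sync methods
// driven from ProxyServer.Run().
type Provider interface {
  config.EndpointsHandler
  config.EndpointSliceHandler
  config.ServiceHandler
  config.NodeHandler

  // Sync immediately synchronizes the Provider's current state to proxy rules.
  Sync()
  // SyncLoop runs periodic work. It is expected to run as a goroutine or as
  // the main loop of the app, and it does not return.
  SyncLoop()
}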

userspace

In userspace mode, kube-proxy itself forwards packets to the backend Pods: it installs the rules, forwards the traffic and does the load balancing. Because kube-proxy runs in user space, every packet has to cross between user space and kernel space, which hurts performance badly; that is why the default was later changed to iptables. We will therefore focus on the iptables and ipvs parts.

Figure: userspace mode architecture (/kubernetes%E6%BA%90%E7%A0%81-kube-proxy-%E5%8E%9F%E7%90%86%E5%92%8C%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90%E4%B8%80/userspace.png)

iptables

iptables mode is built on netfilter. All forwarding happens in kernel space, which is a big performance improvement over having kube-proxy forward and load-balance the traffic itself; here kube-proxy's only job is to program the iptables rules.

Figure: iptables mode architecture (/kubernetes%E6%BA%90%E7%A0%81-kube-proxy-%E5%8E%9F%E7%90%86%E5%92%8C%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90%E4%B8%80/iptables.png)

ipvs

The other mode is IPVS. IPVS goes a step further than iptables in performance: like iptables it is built on netfilter, but IPVS stores its forwarding rules in hash tables, so lookups are much faster than iptables' linear O(n) rule matching.

Figure: ipvs mode architecture (/kubernetes%E6%BA%90%E7%A0%81-kube-proxy-%E5%8E%9F%E7%90%86%E5%92%8C%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90%E4%B8%80/ipvs.png)

To be continued

To keep the length manageable we will split this up rather than cover everything in one post; in the next part we will look at how the IPv4-based ipvs proxier is implemented.