kube-proxy internals and source code analysis, based on the kubernetes release-1.20 and release-1.22 branches.
Preface
As far as I remembered, a NodePort-type Service listens on a random port in the 30000-32767 range on the host. Recently our cluster was upgraded to 1.22, and I noticed that a Service had a NodePort assigned, yet no corresponding listener showed up on the host. After searching Baidu and Google for quite a while with no answer, I decided to read the source code myself and find out what was going on.
iptables mode
Picking up where the last post left off: after covering the ipvs mode we left a gap, since the iptables mode and the userspace mode were not covered. Let's first work out how traffic actually flows in iptables mode.
Note: to follow along with this part, the cluster has to be running kube-proxy in iptables mode (a quick way to confirm this is sketched just below).
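A quick way to confirm the mode is to ask kube-proxy itself: it reports its mode on the /proxyMode endpoint of its metrics port. Below is a minimal sketch of my own, assuming the default --metrics-bind-address of 127.0.0.1:10249; run it on a node.

// Quick check of the mode kube-proxy is actually running in, via its
// /proxyMode endpoint on the metrics port. The address is an assumption
// based on the default --metrics-bind-address.
package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	resp, err := http.Get("http://127.0.0.1:10249/proxyMode")
	if err != nil {
		fmt.Println("could not reach the kube-proxy metrics endpoint:", err)
		return
	}
	defer resp.Body.Close()
	mode, _ := io.ReadAll(resp.Body)
	fmt.Println("kube-proxy mode:", string(mode)) // expected here: iptables
}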
We will skip the reconciliation part of the source code for now; it is essentially the same idea as ipvs, generating iptables rules from the current state, and it is numbing to read, so let's go straight to the rules it generates.
This time we only cover how NodePort traffic is forwarded.
# as usual
# iptables -t nat -nL
Chain PREROUTING (policy ACCEPT)
target prot opt source destination
KUBE-SERVICES all -- 0.0.0.0/0 0.0.0.0/0 /* kubernetes service portals */
DOCKER_OUTPUT all -- 0.0.0.0/0 172.18.0.1
We can see that in the PREROUTING chain the traffic is handed off to the KUBE-SERVICES chain. Let's look at KUBE-SERVICES.
Chain KUBE-SERVICES (2 references)
target prot opt source destination
KUBE-MARK-MASQ tcp -- !10.244.0.0/16 10.96.0.1 /* default/kubernetes:https cluster IP */ tcp dpt:443
KUBE-SVC-NPX46M4PTMTKRN6Y tcp -- 0.0.0.0/0 10.96.0.1 /* default/kubernetes:https cluster IP */ tcp dpt:443
KUBE-MARK-MASQ udp -- !10.244.0.0/16 10.96.0.10 /* kube-system/kube-dns:dns cluster IP */ udp dpt:53
KUBE-SVC-TCOU7JCQXEZGVUNU udp -- 0.0.0.0/0 10.96.0.10 /* kube-system/kube-dns:dns cluster IP */ udp dpt:53
KUBE-MARK-MASQ tcp -- !10.244.0.0/16 10.96.0.10 /* kube-system/kube-dns:dns-tcp cluster IP */ tcp dpt:53
KUBE-SVC-ERIFXISQEP7F7OF4 tcp -- 0.0.0.0/0 10.96.0.10 /* kube-system/kube-dns:dns-tcp cluster IP */ tcp dpt:53
KUBE-MARK-MASQ tcp -- !10.244.0.0/16 10.96.0.10 /* kube-system/kube-dns:metrics cluster IP */ tcp dpt:9153
KUBE-SVC-JD5MR3NA4I4DYORP tcp -- 0.0.0.0/0 10.96.0.10 /* kube-system/kube-dns:metrics cluster IP */ tcp dpt:9153
KUBE-NODEPORTS all -- 0.0.0.0/0 0.0.0.0/0 /* kubernetes service nodeports; NOTE: this must be the last rule in this chain */ ADDRTYPE match dst-type LOCAL
Clearly, our NodePort traffic matches the KUBE-NODEPORTS chain, and the rule is easy to read: requests whose destination address belongs to a local interface (ADDRTYPE dst-type LOCAL) are sent to KUBE-NODEPORTS.
Chain KUBE-NODEPORTS (1 references)
target prot opt source destination
KUBE-MARK-MASQ tcp -- 0.0.0.0/0 0.0.0.0/0 /* default/kubernetes:https */ tcp dpt:30661
KUBE-SVC-NPX46M4PTMTKRN6Y tcp -- 0.0.0.0/0 0.0.0.0/0 /* default/kubernetes:https */ tcp dpt:30661
To keep things easy to observe, this is a fresh cluster with only one NodePort Service created, and its NodePort is 30661. We can see that requests to port 30661 on a local interface jump to the KUBE-SVC-NPX46M4PTMTKRN6Y chain.
Chain KUBE-SVC-NPX46M4PTMTKRN6Y (2 references)
target prot opt source destination
KUBE-SEP-V2PECCYPB6X2GSCW all -- 0.0.0.0/0 0.0.0.0/0 /* default/kubernetes:https */
The KUBE-SVC chain is where load balancing over the endpoints happens (with several endpoints each KUBE-SEP target would be picked with a statistic-mode probability); here there is a single endpoint, so the packet simply jumps to KUBE-SEP-V2PECCYPB6X2GSCW.
Chain KUBE-SEP-V2PECCYPB6X2GSCW (1 references)
target prot opt source destination
KUBE-MARK-MASQ all -- 172.18.0.2 0.0.0.0/0 /* default/kubernetes:https */
DNAT tcp -- 0.0.0.0/0 0.0.0.0/0 /* default/kubernetes:https */ tcp to:172.18.0.2:6443
The KUBE-SEP-V2PECCYPB6X2GSCW chain performs the DNAT. There is only one backend pod here, so requests to local port 30661 have their destination rewritten to 172.18.0.2:6443.
From there the packet is routed via the normal routing table and the request completes.
To summarize (a runnable sketch of how such a rule set is rendered and applied follows this list):
- KUBE-SERVICES dispatches packets destined for a ClusterIP:Port to the corresponding KUBE-SVC-xxx chain
- KUBE-SERVICES dispatches packets destined for a local address to the KUBE-NODEPORTS chain
- KUBE-NODEPORTS: matches the NodePort by destination port (dst-port)
- the packet is then dispatched to the corresponding KUBE-SVC-xxx chain (externalTrafficPolicy=Cluster) or to the corresponding KUBE-XLB-xxx chain (externalTrafficPolicy=Local)
- KUBE-SVC-xxx: corresponds to a Service with the Cluster policy; the packet randomly enters one of its KUBE-SEP-xxx chains. KUBE-XLB-xxx: corresponds to a Service with the Local policy; the packet either enters a KUBE-SEP-xxx chain or is dropped
- KUBE-SEP-xxx: corresponds to one IP address in the endpoints; the packet is DNATed to that Pod IP
- KUBE-MARK-MASQ: marks the packet with 0x4000 (needs SNAT)
- KUBE-MARK-DROP: marks the packet with 0x8000 (to be dropped)
- KUBE-POSTROUTING (nat POSTROUTING): MASQUERADEs packets marked 0x4000
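To make the rule-generation pattern concrete, here is a minimal, illustrative sketch of my own (not the kube-proxy code): it renders a stripped-down NODEPORTS -> SVC -> SEP chain set as text and applies it in a single call to iptables-restore, which is also how kube-proxy applies its rules. The DEMO-* chain names are made up; the port 30661 and the pod address 172.18.0.2:6443 are taken from the example above. The jump out of PREROUTING is deliberately omitted so the demo does not interfere with real traffic.

// demo_rules.go - illustrative sketch only, not the real kube-proxy code.
package main

import (
	"bytes"
	"fmt"
	"os/exec"
)

func main() {
	var buf bytes.Buffer
	// Declare the chains first, then the rules, mirroring the
	// KUBE-NODEPORTS -> KUBE-SVC-xxx -> KUBE-SEP-xxx traversal above.
	buf.WriteString("*nat\n")
	buf.WriteString(":DEMO-NODEPORTS - [0:0]\n")
	buf.WriteString(":DEMO-SVC-EXAMPLE - [0:0]\n")
	buf.WriteString(":DEMO-SEP-EXAMPLE - [0:0]\n")
	buf.WriteString("-A DEMO-NODEPORTS -p tcp -m tcp --dport 30661 -j DEMO-SVC-EXAMPLE\n")
	buf.WriteString("-A DEMO-SVC-EXAMPLE -j DEMO-SEP-EXAMPLE\n")
	buf.WriteString("-A DEMO-SEP-EXAMPLE -p tcp -m tcp -j DNAT --to-destination 172.18.0.2:6443\n")
	buf.WriteString("COMMIT\n")

	// --noflush keeps the existing rules in the nat table.
	cmd := exec.Command("iptables-restore", "--noflush")
	cmd.Stdin = &buf
	if out, err := cmd.CombinedOutput(); err != nil {
		fmt.Printf("iptables-restore failed: %v: %s\n", err, out)
		return
	}
	fmt.Println("demo chains installed; inspect with: iptables -t nat -nL DEMO-NODEPORTS")
	// To receive real traffic these chains would still need the
	// "-m addrtype --dst-type LOCAL" jump out of PREROUTING/KUBE-SERVICES
	// that we saw earlier; it is left out on purpose here.
}

It needs root to run; afterwards the demo chains can be flushed with iptables -t nat -F DEMO-NODEPORTS and deleted with iptables -t nat -X DEMO-NODEPORTS (and likewise for the other two).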
Port listening
Now for the port-listening question. At first I read the 1.22 source and then the 1.21 source, and simply could not find the code responsible for listening on the port; all I found was the code where kubelet handles hostNetwork, and for a while I suspected the NodePort listener was created by some other component. Only after switching to the 1.20 source did I find it.
Location: pkg/proxy/iptables/proxier.go
In the code that constructs the Proxier we can see portMapper: &listenPortOpener{} — that is what we are after.
// listenPortOpener opens ports by calling bind() and listen().
type listenPortOpener struct{}
// OpenLocalPort holds the given local port open.
func (l *listenPortOpener) OpenLocalPort(lp *utilproxy.LocalPort, isIPv6 bool) (utilproxy.Closeable, error) {
return openLocalPort(lp, isIPv6)
}
func NewProxier(...) {
...
proxier := &Proxier{
portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
serviceMap: make(proxy.ServiceMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointChangeTracker(hostname, newEndpointInfo, ipFamily, recorder, endpointSlicesEnabled, nil),
syncPeriod: syncPeriod,
iptables: ipt,
masqueradeAll: masqueradeAll,
masqueradeMark: masqueradeMark,
exec: exec,
localDetector: localDetector,
hostname: hostname,
nodeIP: nodeIP,
portMapper: &listenPortOpener{},
recorder: recorder,
serviceHealthServer: serviceHealthServer,
healthzServer: healthzServer,
precomputedProbabilities: make([]string, 0, 1001),
iptablesData: bytes.NewBuffer(nil),
existingFilterChainsData: bytes.NewBuffer(nil),
filterChains: bytes.NewBuffer(nil),
filterRules: bytes.NewBuffer(nil),
natChains: bytes.NewBuffer(nil),
natRules: bytes.NewBuffer(nil),
nodePortAddresses: nodePortAddresses,
networkInterfacer: utilproxy.RealNetwork{},
}
...
}
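Note the portsMap field, a map[utilproxy.LocalPort]utilproxy.Closeable: it is where the opened sockets are kept alive between syncs. As we will see in syncProxyRules below, on every sync the ports that are still needed are carried over into a replacement map, and anything left only in the old map ends up being closed. Below is a minimal sketch of that keep-or-close pattern with simplified, hypothetical types and names (portKey, held, sync).

// Sketch of the keep-or-close bookkeeping around portsMap. The types and
// names here are hypothetical; the real proxier keys the map by
// utilproxy.LocalPort and stores utilproxy.Closeable.
package main

import (
	"fmt"
	"io"
	"net"
)

type portKey struct {
	ip   string
	port int
}

// held plays the role of proxier.portsMap: every node port we currently hold,
// mapped to its open socket.
var held = map[portKey]io.Closer{}

// sync opens sockets for the wanted ports, reuses the ones already held,
// and closes whatever is no longer wanted.
func sync(wanted []portKey) {
	replacement := map[portKey]io.Closer{}
	for _, k := range wanted {
		if s, ok := held[k]; ok {
			replacement[k] = s // was open before and is still needed
			continue
		}
		l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", k.ip, k.port))
		if err != nil {
			fmt.Printf("can't open %v, skipping: %v\n", k, err)
			continue
		}
		replacement[k] = l
	}
	// Anything left only in the old map is stale: close it.
	for k, s := range held {
		if replacement[k] == nil {
			s.Close()
		}
	}
	held = replacement
}

func main() {
	sync([]portKey{{ip: "", port: 30661}}) // open and hold 30661
	sync(nil)                              // 30661 no longer wanted: the socket is closed
}

The real syncProxyRules does the same bookkeeping with replacementPortsMap, additionally skips SCTP and handles UDP, and closes the stale entries near the end of the sync.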
Let's keep going and look at syncProxyRules, the part that synchronizes the rules.
func (proxier *Proxier) syncProxyRules() {
...
// serviceMap is keyed by service name + port, with the ServicePort interface as the value
for svcName, svc := range proxier.serviceMap {
...
if svcInfo.NodePort() != 0 {
// Hold the local port open so no other process can open it
// (because the socket might open but it would never work).
if len(nodeAddresses) == 0 {
continue
}
lps := make([]utilproxy.LocalPort, 0)
for address := range nodeAddresses {
lp := utilproxy.LocalPort{
Description: "nodePort for " + svcNameString,
IP: address,
Port: svcInfo.NodePort(),
Protocol: protocol,
}
if utilproxy.IsZeroCIDR(address) {
// Empty IP address means all
lp.IP = ""
lps = append(lps, lp)
// If we encounter a zero CIDR, then there is no point in processing the rest of the addresses.
break
}
lps = append(lps, lp)
}
// For ports on node IPs, open the actual port and hold it.
for _, lp := range lps {
if proxier.portsMap[lp] != nil {
klog.V(4).Infof("Port %s was open before and is still needed", lp.String())
replacementPortsMap[lp] = proxier.portsMap[lp]
} else if svcInfo.Protocol() != v1.ProtocolSCTP {
socket, err := proxier.portMapper.OpenLocalPort(&lp, isIPv6)
if err != nil {
klog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err)
continue
}
replacementPortsMap[lp] = socket
}
}
...
}
...
}
...
}
We can see that it calls proxier.portMapper.OpenLocalPort(&lp, isIPv6) to actually listen on the port. The comment above already says it plainly: Hold the local port open so no other process can open it (because the socket might open but it would never work).
The point is to claim the port on the host up front, so that no other application or process can take it. Otherwise, even if some process did bind the port that had been handed out as the Service's NodePort, iptables would still redirect the traffic to the Service's backend pods and that process would never see it. The code below is what does the listening; its long comment is just a wordier version of what we said above, so we won't go through it.
func openLocalPort(lp *utilproxy.LocalPort, isIPv6 bool) (utilproxy.Closeable, error) {
// For ports on node IPs, open the actual port and hold it, even though we
// use iptables to redirect traffic.
// This ensures a) that it's safe to use that port and b) that (a) stays
// true. The risk is that some process on the node (e.g. sshd or kubelet)
// is using a port and we give that same port out to a Service. That would
// be bad because iptables would silently claim the traffic but the process
// would never know.
// NOTE: We should not need to have a real listen()ing socket - bind()
// should be enough, but I can't figure out a way to e2e test without
// it. Tools like 'ss' and 'netstat' do not show sockets that are
// bind()ed but not listen()ed, and at least the default debian netcat
// has no way to avoid about 10 seconds of retries.
var socket utilproxy.Closeable
switch lp.Protocol {
case "tcp":
network := "tcp4"
if isIPv6 {
network = "tcp6"
}
listener, err := net.Listen(network, net.JoinHostPort(lp.IP, strconv.Itoa(lp.Port)))
if err != nil {
return nil, err
}
socket = listener
case "udp":
network := "udp4"
if isIPv6 {
network = "udp6"
}
addr, err := net.ResolveUDPAddr(network, net.JoinHostPort(lp.IP, strconv.Itoa(lp.Port)))
if err != nil {
return nil, err
}
conn, err := net.ListenUDP(network, addr)
if err != nil {
return nil, err
}
socket = conn
default:
return nil, fmt.Errorf("unknown protocol %q", lp.Protocol)
}
klog.V(2).Infof("Opened local port %s", lp.String())
return socket, nil
}
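To see the effect of this bind-and-hold trick outside of kube-proxy, here is a small standalone sketch of my own (not from the source): it holds a TCP port the same way openLocalPort does for TCP, never accepts anything on it, and then shows that a second attempt to bind the same port fails immediately — which is exactly the conflict kube-proxy 1.20 wanted to surface up front. The port number is just an example.

// Standalone sketch of the bind-and-hold trick (not kube-proxy code).
package main

import (
	"fmt"
	"net"
)

func main() {
	// First bind: for TCP this is all openLocalPort effectively does.
	holder, err := net.Listen("tcp4", ":30661")
	if err != nil {
		fmt.Println("first bind failed:", err)
		return
	}
	defer holder.Close()
	fmt.Println("holding", holder.Addr(), "- nothing will ever be accepted on it")

	// Second bind from "another process" (simulated in the same program):
	// it fails right away, so the conflict is visible up front instead of the
	// other program silently losing its traffic to the iptables DNAT rules.
	if _, err := net.Listen("tcp4", ":30661"); err != nil {
		fmt.Println("second bind failed as expected:", err)
	}
}

While the first listener is held, ss -tnlp on the host shows the port in LISTEN state under this process, which matches what we will see on the 1.20 node below.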
Verification
Let's verify it and see whether the listener really disappears in versions after 1.20.
1.20
Without further ado, let's look straight at the port listeners and the iptables rules.
root@kind-control-plane:/# ss -tnlp
State Recv-Q Send-Q Local Address:Port Peer Address:Port Process
LISTEN 0 4096 127.0.0.1:41695 0.0.0.0:* users:(("containerd",pid=105,fd=13))
LISTEN 0 4096 127.0.0.11:35451 0.0.0.0:*
LISTEN 0 4096 127.0.0.1:10257 0.0.0.0:* users:(("kube-controller",pid=527,fd=7))
LISTEN 0 4096 127.0.0.1:10259 0.0.0.0:* users:(("kube-scheduler",pid=471,fd=7))
LISTEN 0 4096 127.0.0.1:10249 0.0.0.0:* users:(("kube-proxy",pid=903,fd=17))
LISTEN 0 4096 127.0.0.1:10248 0.0.0.0:* users:(("kubelet",pid=678,fd=22))
LISTEN 0 4096 0.0.0.0:30532 0.0.0.0:* users:(("kube-proxy",pid=903,fd=13))
LISTEN 0 4096 127.0.0.1:2379 0.0.0.0:* users:(("etcd",pid=611,fd=5))
LISTEN 0 4096 127.0.0.1:2381 0.0.0.0:* users:(("etcd",pid=611,fd=10))
LISTEN 0 4096 172.18.0.2:2379 0.0.0.0:* users:(("etcd",pid=611,fd=6))
LISTEN 0 4096 172.18.0.2:2380 0.0.0.0:* users:(("etcd",pid=611,fd=3))
LISTEN 0 4096 *:6443 *:* users:(("kube-apiserver",pid=538,fd=7))
LISTEN 0 4096 *:10256 *:* users:(("kube-proxy",pid=903,fd=12))
LISTEN 0 4096 *:10250 *:* users:(("kubelet",pid=678,fd=21))
root@kind-control-plane:/# kubectl get svc -o wide
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE SELECTOR
kubernetes NodePort 10.96.0.1 <none> 443:30532/TCP 95s <none>
root@kind-control-plane:/# kubectl version
Client Version: version.Info{Major:"1", Minor:"20", GitVersion:"v1.20.15", GitCommit:"8f1e5bf0b9729a899b8df86249b56e2c74aebc55", GitTreeState:"clean", BuildDate:"2022-10-26T15:32:46Z", GoVersion:"go1.15.15", Compiler:"gc", Platform:"linux/amd64"}
Server Version: version.Info{Major:"1", Minor:"20", GitVersion:"v1.20.15", GitCommit:"8f1e5bf0b9729a899b8df86249b56e2c74aebc55", GitTreeState:"clean", BuildDate:"2022-10-26T15:31:34Z", GoVersion:"go1.15.15", Compiler:"gc", Platform:"linux/amd64"}
We can see that the host is listening on the NodePort, 30532.
LISTEN 0 4096 0.0.0.0:30532 0.0.0.0:* users:(("kube-proxy",pid=903,fd=13))
1.21
Look straight at the port listeners and the rules. Cleaner than my pockets: the host is not listening on port 31061 at all, which is exactly what we expected. Connections to the NodePort still work, of course, because the DNAT happens in the nat PREROUTING chain before the kernel ever looks for a local socket.
root@kind-worker:/# ss -tnlp
State Recv-Q Send-Q Local Address:Port Peer Address:Port Process
LISTEN 0 4096 127.0.0.11:35149 0.0.0.0:*
LISTEN 0 4096 127.0.0.1:33487 0.0.0.0:* users:(("containerd",pid=107,fd=13))
LISTEN 0 4096 127.0.0.1:10248 0.0.0.0:* users:(("kubelet",pid=181,fd=31))
LISTEN 0 4096 127.0.0.1:10249 0.0.0.0:* users:(("kube-proxy",pid=407,fd=12))
LISTEN 0 4096 *:10256 *:* users:(("kube-proxy",pid=407,fd=18))
LISTEN 0 4096 *:10250 *:* users:(("kubelet",pid=181,fd=28))
root@kind-worker:/# kubectl get svc -o wide
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE SELECTOR
kubernetes NodePort 10.96.0.1 <none> 443:31061/TCP 2m35s <none>
[root@fedora ~]# kubectl version
Client Version: version.Info{Major:"1", Minor:"21", GitVersion:"v1.21.14", GitCommit:"0f77da5bd4809927e15d1658fb4aa8f13ad890a5", GitTreeState:"clean", BuildDate:"2023-05-16T23:15:47Z", GoVersion:"go1.16.15", Compiler:"gc", Platform:"linux/amd64"}
Server Version: version.Info{Major:"1", Minor:"21", GitVersion:"v1.21.14", GitCommit:"0f77da5bd4809927e15d1658fb4aa8f13ad890a5", GitTreeState:"clean", BuildDate:"2023-05-16T23:15:47Z", GoVersion:"go1.16.15", Compiler:"gc", Platform:"linux/amd64"}
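If ss happens not to be available on a node, a quick way to tell which behaviour the local kube-proxy has is simply to try to bind the NodePort yourself. A minimal sketch of my own, using the 31061 port from above; run it on the node:

// Quick probe, not from the kube-proxy source: if the bind succeeds, nothing
// on this node holds the NodePort; if it fails with "address already in use",
// a 1.20-style kube-proxy (or some other process) is holding it.
package main

import (
	"fmt"
	"net"
)

func main() {
	const nodePort = ":31061" // the NodePort of the service above
	l, err := net.Listen("tcp", nodePort)
	if err != nil {
		fmt.Println("port is held:", err)
		return
	}
	l.Close()
	fmt.Println("nothing is listening on", nodePort)
}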
Summary
- Versions after 1.20 no longer open a listening socket on the host for each NodePort.
- The local listener only ever existed to stop other programs from grabbing the port assigned to a Service; if one did, iptables would keep stealing its traffic and it would never be served correctly.
- Either way, the traffic itself is DNATed by iptables and forwarded to the Service's backend pods.