1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
|
package service
import (
"context"
"encoding/json"
"log"
// "strconv"
cf "auto-cordon/conf"
"auto-cordon/pkg/utils"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
)
func AutoCordon(clientset *kubernetes.Clientset) {
var (
pods int
err error
max = cf.Conf.CordonConfig.Max
min = cf.Conf.CordonConfig.Min
)
log.Println("任务开始,请留意相关日志输出.")
// 获取所有节点信息
list, _ := clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
for _, item := range list.Items {
// 获取节点就绪状态
conditions := item.Status.Conditions
status := utils.PharseNodeStatus(conditions)
// 获取pod数量
pods, err = CountPods(item.Name, clientset)
if err != nil {
panic(err)
}
// 获取节点cordon状态
unschedulable := item.Spec.Unschedulable
// 获取节点标签
labels := item.Labels
auto_cordon, donot_cordon := utils.PharseLables(labels)
// 条件判断,是否cordon
if status && pods > max && !donot_cordon && !unschedulable {
// cordon 节点
type patchStringValue struct {
Op string `json:"op"`
Path string `json:"path"`
Value bool `json:"value"`
}
payload := []patchStringValue{{
Op: "replace",
Path: "/spec/unschedulable",
Value: true,
}}
payloadBytes, _ := json.Marshal(payload)
_, err := clientset.CoreV1().Nodes().Patch(context.TODO(), item.Name, types.JSONPatchType, payloadBytes, metav1.PatchOptions{})
log.Printf("节点:%s, pod 数量超过阈值%v, 当前:%v, 自动cordon. \n", item.Name, max, pods)
if err != nil {
panic(err)
}
// 打上自动cordon过的标签
labels["auto_cordon"] = "yes"
lablesData := map[string]interface{} {
"metadata": map[string]map[string]string{
"labels": labels,
},
}
lablesBytes, _ := json.Marshal(lablesData)
_, err = clientset.CoreV1().Nodes().Patch(context.Background(), item.Name, types.StrategicMergePatchType, lablesBytes, metav1.PatchOptions{})
if err != nil {
panic(err)
}
log.Printf("节点:%s, 打上自动cordon过的标签. \n", item.Name)
} else if status && pods < min && auto_cordon && unschedulable {
// uncordon 节点
type patchStringValue struct {
Op string `json:"op"`
Path string `json:"path"`
Value bool `json:"value"`
}
payload := []patchStringValue{{
Op: "replace",
Path: "/spec/unschedulable",
Value: false,
}}
payloadBytes, _ := json.Marshal(payload)
_, err := clientset.CoreV1().Nodes().Patch(context.Background(), item.Name, types.JSONPatchType, payloadBytes,metav1.PatchOptions{})
log.Printf("节点:%s, pod 数量低于阈值%v, 当前:%v, 解除cordon. \n", item.Name, min, pods)
if err != nil {
panic(err)
}
// 删除自动cordon过的标签
lablesData := map[string]interface{} {
"metadata": map[string]map[string]interface{}{
"labels": {
"auto_cordon": nil,
},
},
}
lablesBytes, _ := json.Marshal(lablesData)
_, err = clientset.CoreV1().Nodes().Patch(context.Background(), item.Name, types.StrategicMergePatchType, lablesBytes, metav1.PatchOptions{})
if err != nil {
panic(err)
}
log.Printf("节点:%s, 删除自动cordon过的标签. \n", item.Name)
}
log.Printf("节点:%s, pod 当前数量:%v. \n", item.Name, pods)
}
log.Println("任务结束.")
}
func CountPods(nodeName string, clientset *kubernetes.Clientset) (int, error) {
var count int
oo, err := clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{})
if err != nil {
return 0, err
}
for _, o := range oo.Items {
if o.Spec.NodeName == nodeName {
count ++
}
}
return count, nil
}
|