1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
|
func (e *Controller) syncService(key string) error {
startTime := time.Now()
defer func() {
klog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Since(startTime))
}()
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
service, err := e.serviceLister.Services(namespace).Get(name)
if err != nil {
if !errors.IsNotFound(err) {
return err
}
// Delete the corresponding endpoint, as the service has been deleted.
// TODO: Please note that this will delete an endpoint when a
// service is deleted. However, if we're down at the time when
// the service is deleted, we will miss that deletion, so this
// doesn't completely solve the problem. See #6877.
err = e.client.CoreV1().Endpoints(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{})
if err != nil && !errors.IsNotFound(err) {
return err
}
e.triggerTimeTracker.DeleteService(namespace, name)
return nil
}
if service.Spec.Selector == nil {
// services without a selector receive no endpoints from this controller;
// these services will receive the endpoints that are created out-of-band via the REST API.
return nil
}
klog.V(5).Infof("About to update endpoints for service %q", key)
pods, err := e.podLister.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelectorPreValidated())
if err != nil {
// Since we're getting stuff from a local cache, it is
// basically impossible to get this error.
return err
}
// If the user specified the older (deprecated) annotation, we have to respect it.
tolerateUnreadyEndpoints := service.Spec.PublishNotReadyAddresses
if v, ok := service.Annotations[TolerateUnreadyEndpointsAnnotation]; ok {
b, err := strconv.ParseBool(v)
if err == nil {
tolerateUnreadyEndpoints = b
} else {
utilruntime.HandleError(fmt.Errorf("Failed to parse annotation %v: %v", TolerateUnreadyEndpointsAnnotation, err))
}
}
// We call ComputeEndpointLastChangeTriggerTime here to make sure that the
// state of the trigger time tracker gets updated even if the sync turns out
// to be no-op and we don't update the endpoints object.
endpointsLastChangeTriggerTime := e.triggerTimeTracker.
ComputeEndpointLastChangeTriggerTime(namespace, service, pods)
subsets := []v1.EndpointSubset{}
var totalReadyEps int
var totalNotReadyEps int
for _, pod := range pods {
if !endpointutil.ShouldPodBeInEndpoints(pod, tolerateUnreadyEndpoints) {
klog.V(5).Infof("Pod %s/%s is not included on endpoints for Service %s/%s", pod.Namespace, pod.Name, service.Namespace, service.Name)
continue
}
ep, err := podToEndpointAddressForService(service, pod)
if err != nil {
// this will happen, if the cluster runs with some nodes configured as dual stack and some as not
// such as the case of an upgrade..
klog.V(2).Infof("failed to find endpoint for service:%v with ClusterIP:%v on pod:%v with error:%v", service.Name, service.Spec.ClusterIP, pod.Name, err)
continue
}
epa := *ep
if endpointutil.ShouldSetHostname(pod, service) {
epa.Hostname = pod.Spec.Hostname
}
// Allow headless service not to have ports.
if len(service.Spec.Ports) == 0 {
if service.Spec.ClusterIP == api.ClusterIPNone {
subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(subsets, pod, epa, nil, tolerateUnreadyEndpoints)
// No need to repack subsets for headless service without ports.
}
} else {
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]
portNum, err := podutil.FindPort(pod, servicePort)
if err != nil {
klog.V(4).Infof("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err)
continue
}
epp := endpointPortFromServicePort(servicePort, portNum)
var readyEps, notReadyEps int
subsets, readyEps, notReadyEps = addEndpointSubset(subsets, pod, epa, epp, tolerateUnreadyEndpoints)
totalReadyEps = totalReadyEps + readyEps
totalNotReadyEps = totalNotReadyEps + notReadyEps
}
}
}
subsets = endpoints.RepackSubsets(subsets)
// See if there's actually an update here.
currentEndpoints, err := e.endpointsLister.Endpoints(service.Namespace).Get(service.Name)
if err != nil {
if errors.IsNotFound(err) {
currentEndpoints = &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: service.Name,
Labels: service.Labels,
},
}
} else {
return err
}
}
createEndpoints := len(currentEndpoints.ResourceVersion) == 0
// Compare the sorted subsets and labels
// Remove the HeadlessService label from the endpoints if it exists,
// as this won't be set on the service itself
// and will cause a false negative in this diff check.
// But first check if it has that label to avoid expensive copies.
compareLabels := currentEndpoints.Labels
if _, ok := currentEndpoints.Labels[v1.IsHeadlessService]; ok {
compareLabels = utillabels.CloneAndRemoveLabel(currentEndpoints.Labels, v1.IsHeadlessService)
}
// When comparing the subsets, we ignore the difference in ResourceVersion of Pod to avoid unnecessary Endpoints
// updates caused by Pod updates that we don't care, e.g. annotation update.
if !createEndpoints &&
endpointutil.EndpointSubsetsEqualIgnoreResourceVersion(currentEndpoints.Subsets, subsets) &&
apiequality.Semantic.DeepEqual(compareLabels, service.Labels) &&
capacityAnnotationSetCorrectly(currentEndpoints.Annotations, currentEndpoints.Subsets) {
klog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
return nil
}
newEndpoints := currentEndpoints.DeepCopy()
newEndpoints.Subsets = subsets
newEndpoints.Labels = service.Labels
if newEndpoints.Annotations == nil {
newEndpoints.Annotations = make(map[string]string)
}
if !endpointsLastChangeTriggerTime.IsZero() {
newEndpoints.Annotations[v1.EndpointsLastChangeTriggerTime] =
endpointsLastChangeTriggerTime.UTC().Format(time.RFC3339Nano)
} else { // No new trigger time, clear the annotation.
delete(newEndpoints.Annotations, v1.EndpointsLastChangeTriggerTime)
}
if truncateEndpoints(newEndpoints) {
newEndpoints.Annotations[v1.EndpointsOverCapacity] = truncated
} else {
delete(newEndpoints.Annotations, v1.EndpointsOverCapacity)
}
if newEndpoints.Labels == nil {
newEndpoints.Labels = make(map[string]string)
}
if !helper.IsServiceIPSet(service) {
newEndpoints.Labels = utillabels.CloneAndAddLabel(newEndpoints.Labels, v1.IsHeadlessService, "")
} else {
newEndpoints.Labels = utillabels.CloneAndRemoveLabel(newEndpoints.Labels, v1.IsHeadlessService)
}
klog.V(4).Infof("Update endpoints for %v/%v, ready: %d not ready: %d", service.Namespace, service.Name, totalReadyEps, totalNotReadyEps)
if createEndpoints {
// No previous endpoints, create them
_, err = e.client.CoreV1().Endpoints(service.Namespace).Create(context.TODO(), newEndpoints, metav1.CreateOptions{})
} else {
// Pre-existing
_, err = e.client.CoreV1().Endpoints(service.Namespace).Update(context.TODO(), newEndpoints, metav1.UpdateOptions{})
}
if err != nil {
if createEndpoints && errors.IsForbidden(err) {
// A request is forbidden primarily for two reasons:
// 1. namespace is terminating, endpoint creation is not allowed by default.
// 2. policy is misconfigured, in which case no service would function anywhere.
// Given the frequency of 1, we log at a lower level.
klog.V(5).Infof("Forbidden from creating endpoints: %v", err)
// If the namespace is terminating, creates will continue to fail. Simply drop the item.
if errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
return nil
}
}
if createEndpoints {
e.eventRecorder.Eventf(newEndpoints, v1.EventTypeWarning, "FailedToCreateEndpoint", "Failed to create endpoint for service %v/%v: %v", service.Namespace, service.Name, err)
} else {
e.eventRecorder.Eventf(newEndpoints, v1.EventTypeWarning, "FailedToUpdateEndpoint", "Failed to update endpoint %v/%v: %v", service.Namespace, service.Name, err)
}
return err
}
return nil
}
|