kubernetes/pkg/controller/endpoint/endpoints_controller.go

/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package endpoint
import (
"context"
"fmt"
"math"
"time"
v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/api/v1/endpoints"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
api "k8s.io/kubernetes/pkg/apis/core"
helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/controller"
endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
utillabels "k8s.io/kubernetes/pkg/util/labels"
utilnet "k8s.io/utils/net"
)
const (
// maxRetries is the number of times a service will be retried before it is dropped out of the queue.
// With the current rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers represent the
// sequence of delays between successive queuings of a service.
//
// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
maxRetries = 15
// maxCapacity represents the maximum number of addresses that should be
// stored in an Endpoints resource. Endpoints exceeding this limit are
// truncated by this controller (see truncateEndpoints below).
maxCapacity = 1000
// truncated is a possible value for the `endpoints.kubernetes.io/over-capacity` annotation on an
// Endpoints resource and indicates that the number of endpoints has been truncated to
// maxCapacity.
truncated = "truncated"
)
// NewEndpointController returns a new *Controller.
func NewEndpointController(podInformer coreinformers.PodInformer, serviceInformer coreinformers.ServiceInformer,
endpointsInformer coreinformers.EndpointsInformer, client clientset.Interface, endpointUpdatesBatchPeriod time.Duration) *Controller {
broadcaster := record.NewBroadcaster()
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoint-controller"})
e := &Controller{
client: client,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint"),
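// DefaultControllerRateLimiter (used above) combines per-item exponential backoff,
// starting at 5ms and producing the retry delays documented at maxRetries, with an
// overall token-bucket rate limit.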
workerLoopPeriod: time.Second,
}
serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: e.onServiceUpdate,
UpdateFunc: func(old, cur interface{}) {
e.onServiceUpdate(cur)
},
DeleteFunc: e.onServiceDelete,
})
e.serviceLister = serviceInformer.Lister()
e.servicesSynced = serviceInformer.Informer().HasSynced
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: e.addPod,
UpdateFunc: e.updatePod,
DeleteFunc: e.deletePod,
})
e.podLister = podInformer.Lister()
e.podsSynced = podInformer.Informer().HasSynced
endpointsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: e.onEndpointsDelete,
})
e.endpointsLister = endpointsInformer.Lister()
e.endpointsSynced = endpointsInformer.Informer().HasSynced
e.triggerTimeTracker = endpointutil.NewTriggerTimeTracker()
e.eventBroadcaster = broadcaster
e.eventRecorder = recorder
e.endpointUpdatesBatchPeriod = endpointUpdatesBatchPeriod
e.serviceSelectorCache = endpointutil.NewServiceSelectorCache()
return e
}
// Controller manages selector-based service endpoints.
type Controller struct {
client clientset.Interface
eventBroadcaster record.EventBroadcaster
eventRecorder record.EventRecorder
// serviceLister is able to list/get services and is populated by the shared informer passed to
// NewEndpointController.
serviceLister corelisters.ServiceLister
// servicesSynced returns true if the service shared informer has been synced at least once.
// Added as a member to the struct to allow injection for testing.
servicesSynced cache.InformerSynced
// podLister is able to list/get pods and is populated by the shared informer passed to
// NewEndpointController.
podLister corelisters.PodLister
// podsSynced returns true if the pod shared informer has been synced at least once.
// Added as a member to the struct to allow injection for testing.
podsSynced cache.InformerSynced
// endpointsLister is able to list/get endpoints and is populated by the shared informer passed to
// NewEndpointController.
endpointsLister corelisters.EndpointsLister
// endpointsSynced returns true if the endpoints shared informer has been synced at least once.
// Added as a member to the struct to allow injection for testing.
endpointsSynced cache.InformerSynced
// Services that need to be updated. A channel is inappropriate here,
// because it allows services with lots of pods to be serviced much
// more often than services with few pods; it also would cause a
// service that's inserted multiple times to be processed more than
// necessary.
queue workqueue.RateLimitingInterface
// workerLoopPeriod is the time between worker runs. The workers process the queue of service and pod changes.
workerLoopPeriod time.Duration
// triggerTimeTracker is a utility used to compute and export the EndpointsLastChangeTriggerTime
// annotation.
triggerTimeTracker *endpointutil.TriggerTimeTracker
endpointUpdatesBatchPeriod time.Duration
// serviceSelectorCache is a cache of service selectors to avoid high CPU consumption caused by frequent calls
// to AsSelectorPreValidated (see #73527)
serviceSelectorCache *endpointutil.ServiceSelectorCache
}
// Run will not return until ctx is cancelled. workers determines how many
// endpoints will be handled in parallel.
func (e *Controller) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()
// Start events processing pipeline.
e.eventBroadcaster.StartStructuredLogging(0)
e.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: e.client.CoreV1().Events("")})
defer e.eventBroadcaster.Shutdown()
defer e.queue.ShutDown()
klog.Infof("Starting endpoint controller")
defer klog.Infof("Shutting down endpoint controller")
if !cache.WaitForNamedCacheSync("endpoint", ctx.Done(), e.podsSynced, e.servicesSynced, e.endpointsSynced) {
return
}
for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, e.worker, e.workerLoopPeriod)
}
go func() {
defer utilruntime.HandleCrash()
e.checkLeftoverEndpoints()
}()
<-ctx.Done()
}
// When a pod is added, figure out what services it will be a member of and
// enqueue them. obj must have *v1.Pod type.
func (e *Controller) addPod(obj interface{}) {
pod := obj.(*v1.Pod)
services, err := e.serviceSelectorCache.GetPodServiceMemberships(e.serviceLister, pod)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", pod.Namespace, pod.Name, err))
return
}
for key := range services {
e.queue.AddAfter(key, e.endpointUpdatesBatchPeriod)
}
}
func podToEndpointAddressForService(svc *v1.Service, pod *v1.Pod) (*v1.EndpointAddress, error) {
var endpointIP string
ipFamily := v1.IPv4Protocol
if len(svc.Spec.IPFamilies) > 0 {
// controller is connected to an api-server that correctly sets IPFamilies
ipFamily = svc.Spec.IPFamilies[0] // this works for headful and headless
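// The legacy Endpoints API carries addresses of a single family, so only the
// service's primary IP family is published here; for dual-stack services the
// secondary family is handled by the EndpointSlice controller.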
} else {
// controller is connected to an api server that does not correctly
// set IPFamilies (e.g. old api-server during an upgrade)
// TODO (khenidak): remove by when the possibility of upgrading
// from a cluster that does not support dual stack is nil
if len(svc.Spec.ClusterIP) > 0 && svc.Spec.ClusterIP != v1.ClusterIPNone {
// headful service. detect via service clusterIP
if utilnet.IsIPv6String(svc.Spec.ClusterIP) {
ipFamily = v1.IPv6Protocol
}
} else {
// Since this is a headless service we use podIP to identify the family.
// This assumes that status.PodIP is assigned correctly (follows pod cidr and
// pod cidr list order is same as service cidr list order). The expectation is
// this is *most probably* the case.
// if the family was incorrectly identified then this will be corrected once the
// upgrade is completed (controller connects to api-server that correctly defaults services)
if utilnet.IsIPv6String(pod.Status.PodIP) {
ipFamily = v1.IPv6Protocol
}
}
}
// find an ip that matches the family
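// (an address matches when its IPv6-ness, per utilnet.IsIPv6String, equals
// whether the chosen family is IPv6)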
for _, podIP := range pod.Status.PodIPs {
if (ipFamily == v1.IPv6Protocol) == utilnet.IsIPv6String(podIP.IP) {
endpointIP = podIP.IP
break
}
}
if endpointIP == "" {
return nil, fmt.Errorf("failed to find a matching endpoint for service %v", svc.Name)
}
return &v1.EndpointAddress{
IP: endpointIP,
NodeName: &pod.Spec.NodeName,
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Namespace: pod.ObjectMeta.Namespace,
Name: pod.ObjectMeta.Name,
UID: pod.ObjectMeta.UID,
},
}, nil
}
// When a pod is updated, figure out what services it used to be a member of
// and what services it will be a member of, and enqueue the union of these.
// old and cur must be *v1.Pod types.
func (e *Controller) updatePod(old, cur interface{}) {
services := endpointutil.GetServicesToUpdateOnPodChange(e.serviceLister, e.serviceSelectorCache, old, cur)
for key := range services {
e.queue.AddAfter(key, e.endpointUpdatesBatchPeriod)
}
}
// When a pod is deleted, enqueue the services the pod used to be a member of.
// obj could be an *v1.Pod, or a DeletionFinalStateUnknown marker item.
func (e *Controller) deletePod(obj interface{}) {
pod := endpointutil.GetPodFromDeleteAction(obj)
if pod != nil {
e.addPod(pod)
}
}
// onServiceUpdate updates the Service Selector in the cache and queues the Service for processing.
func (e *Controller) onServiceUpdate(obj interface{}) {
key, err := controller.KeyFunc(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
return
}
_ = e.serviceSelectorCache.Update(key, obj.(*v1.Service).Spec.Selector)
e.queue.Add(key)
}
// onServiceDelete removes the Service Selector from the cache and queues the Service for processing.
func (e *Controller) onServiceDelete(obj interface{}) {
key, err := controller.KeyFunc(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
return
}
e.serviceSelectorCache.Delete(key)
e.queue.Add(key)
}
func (e *Controller) onEndpointsDelete(obj interface{}) {
key, err := controller.KeyFunc(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
return
}
e.queue.Add(key)
}
// worker runs a worker thread that just dequeues items, processes them, and
// marks them done. You may run as many of these in parallel as you wish; the
// workqueue guarantees that they will not end up processing the same service
// at the same time.
func (e *Controller) worker(ctx context.Context) {
for e.processNextWorkItem(ctx) {
}
}
func (e *Controller) processNextWorkItem(ctx context.Context) bool {
eKey, quit := e.queue.Get()
if quit {
return false
}
defer e.queue.Done(eKey)
err := e.syncService(ctx, eKey.(string))
e.handleErr(err, eKey)
return true
}
func (e *Controller) handleErr(err error, key interface{}) {
if err == nil {
e.queue.Forget(key)
return
}
ns, name, keyErr := cache.SplitMetaNamespaceKey(key.(string))
if keyErr != nil {
klog.ErrorS(err, "Failed to split meta namespace cache key", "key", key)
}
if e.queue.NumRequeues(key) < maxRetries {
klog.V(2).InfoS("Error syncing endpoints, retrying", "service", klog.KRef(ns, name), "err", err)
e.queue.AddRateLimited(key)
return
}
klog.Warningf("Dropping service %q out of the queue: %v", key, err)
e.queue.Forget(key)
utilruntime.HandleError(err)
}
func (e *Controller) syncService(ctx context.Context, key string) error {
startTime := time.Now()
defer func() {
klog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Since(startTime))
}()
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
service, err := e.serviceLister.Services(namespace).Get(name)
if err != nil {
if !errors.IsNotFound(err) {
return err
}
// Delete the corresponding endpoint, as the service has been deleted.
// TODO: Please note that this will delete an endpoint when a
// service is deleted. However, if we're down at the time when
// the service is deleted, we will miss that deletion, so this
// doesn't completely solve the problem. See #6877.
err = e.client.CoreV1().Endpoints(namespace).Delete(ctx, name, metav1.DeleteOptions{})
if err != nil && !errors.IsNotFound(err) {
return err
}
e.triggerTimeTracker.DeleteService(namespace, name)
return nil
}
if service.Spec.Selector == nil {
// services without a selector receive no endpoints from this controller;
// these services will receive the endpoints that are created out-of-band via the REST API.
return nil
}
klog.V(5).Infof("About to update endpoints for service %q", key)
pods, err := e.podLister.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelectorPreValidated())
if err != nil {
// Since we're getting stuff from a local cache, it is
// basically impossible to get this error.
return err
}
// We call ComputeEndpointLastChangeTriggerTime here to make sure that the
// state of the trigger time tracker gets updated even if the sync turns out
// to be no-op and we don't update the endpoints object.
endpointsLastChangeTriggerTime := e.triggerTimeTracker.
ComputeEndpointLastChangeTriggerTime(namespace, service, pods)
subsets := []v1.EndpointSubset{}
var totalReadyEps int
var totalNotReadyEps int
for _, pod := range pods {
if !endpointutil.ShouldPodBeInEndpoints(pod, service.Spec.PublishNotReadyAddresses) {
klog.V(5).Infof("Pod %s/%s is not included on endpoints for Service %s/%s", pod.Namespace, pod.Name, service.Namespace, service.Name)
continue
}
ep, err := podToEndpointAddressForService(service, pod)
if err != nil {
// This can happen if the cluster runs with some nodes configured as dual-stack
// and some as single-stack, e.g. during an upgrade.
klog.V(2).Infof("Failed to find endpoint for service:%s with ClusterIP:%s on pod:%s with error:%v", service.Name, service.Spec.ClusterIP, klog.KObj(pod), err)
continue
}
epa := *ep
if endpointutil.ShouldSetHostname(pod, service) {
epa.Hostname = pod.Spec.Hostname
}
// Allow headless service not to have ports.
if len(service.Spec.Ports) == 0 {
if service.Spec.ClusterIP == api.ClusterIPNone {
subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(subsets, pod, epa, nil, service.Spec.PublishNotReadyAddresses)
// No need to repack subsets for headless service without ports.
}
} else {
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]
portNum, err := podutil.FindPort(pod, servicePort)
if err != nil {
klog.V(4).Infof("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err)
continue
}
epp := endpointPortFromServicePort(servicePort, portNum)
var readyEps, notReadyEps int
subsets, readyEps, notReadyEps = addEndpointSubset(subsets, pod, epa, epp, service.Spec.PublishNotReadyAddresses)
totalReadyEps = totalReadyEps + readyEps
totalNotReadyEps = totalNotReadyEps + notReadyEps
}
}
}
subsets = endpoints.RepackSubsets(subsets)
// See if there's actually an update here.
currentEndpoints, err := e.endpointsLister.Endpoints(service.Namespace).Get(service.Name)
if err != nil {
if errors.IsNotFound(err) {
currentEndpoints = &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: service.Name,
Labels: service.Labels,
},
}
} else {
return err
}
}
createEndpoints := len(currentEndpoints.ResourceVersion) == 0
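// (an Endpoints object read from the API server always carries a ResourceVersion, so an
// empty value means currentEndpoints was constructed locally above and must be created
// rather than updated)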
// Compare the sorted subsets and labels
// Remove the HeadlessService label from the endpoints if it exists,
// as this won't be set on the service itself
// and will cause a false negative in this diff check.
// But first check if it has that label to avoid expensive copies.
compareLabels := currentEndpoints.Labels
if _, ok := currentEndpoints.Labels[v1.IsHeadlessService]; ok {
compareLabels = utillabels.CloneAndRemoveLabel(currentEndpoints.Labels, v1.IsHeadlessService)
}
// When comparing the subsets, we ignore the difference in ResourceVersion of Pod to avoid unnecessary Endpoints
// updates caused by Pod updates that we don't care about, e.g. annotation updates.
if !createEndpoints &&
endpointutil.EndpointSubsetsEqualIgnoreResourceVersion(currentEndpoints.Subsets, subsets) &&
apiequality.Semantic.DeepEqual(compareLabels, service.Labels) &&
capacityAnnotationSetCorrectly(currentEndpoints.Annotations, currentEndpoints.Subsets) {
klog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
return nil
2015-04-17 07:18:02 +08:00
}
newEndpoints := currentEndpoints.DeepCopy()
newEndpoints.Subsets = subsets
newEndpoints.Labels = service.Labels
if newEndpoints.Annotations == nil {
newEndpoints.Annotations = make(map[string]string)
}
if !endpointsLastChangeTriggerTime.IsZero() {
newEndpoints.Annotations[v1.EndpointsLastChangeTriggerTime] =
endpointsLastChangeTriggerTime.UTC().Format(time.RFC3339Nano)
} else { // No new trigger time, clear the annotation.
delete(newEndpoints.Annotations, v1.EndpointsLastChangeTriggerTime)
}
if truncateEndpoints(newEndpoints) {
newEndpoints.Annotations[v1.EndpointsOverCapacity] = truncated
} else {
delete(newEndpoints.Annotations, v1.EndpointsOverCapacity)
}
if newEndpoints.Labels == nil {
newEndpoints.Labels = make(map[string]string)
}
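// Headless services (no cluster IP set) get the v1.IsHeadlessService label on their
// Endpoints so that consumers that only care about cluster-IP services, such as
// kube-proxy, can skip them.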
if !helper.IsServiceIPSet(service) {
newEndpoints.Labels = utillabels.CloneAndAddLabel(newEndpoints.Labels, v1.IsHeadlessService, "")
} else {
newEndpoints.Labels = utillabels.CloneAndRemoveLabel(newEndpoints.Labels, v1.IsHeadlessService)
}
klog.V(4).Infof("Update endpoints for %v/%v, ready: %d not ready: %d", service.Namespace, service.Name, totalReadyEps, totalNotReadyEps)
if createEndpoints {
// No previous endpoints, create them
_, err = e.client.CoreV1().Endpoints(service.Namespace).Create(ctx, newEndpoints, metav1.CreateOptions{})
} else {
// Pre-existing
_, err = e.client.CoreV1().Endpoints(service.Namespace).Update(ctx, newEndpoints, metav1.UpdateOptions{})
}
if err != nil {
if createEndpoints && errors.IsForbidden(err) {
// A request is forbidden primarily for two reasons:
// 1. namespace is terminating, endpoint creation is not allowed by default.
// 2. policy is misconfigured, in which case no service would function anywhere.
// Given the frequency of 1, we log at a lower level.
klog.V(5).Infof("Forbidden from creating endpoints: %v", err)
// If the namespace is terminating, creates will continue to fail. Simply drop the item.
if errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
return nil
}
}
if createEndpoints {
e.eventRecorder.Eventf(newEndpoints, v1.EventTypeWarning, "FailedToCreateEndpoint", "Failed to create endpoint for service %v/%v: %v", service.Namespace, service.Name, err)
} else {
e.eventRecorder.Eventf(newEndpoints, v1.EventTypeWarning, "FailedToUpdateEndpoint", "Failed to update endpoint %v/%v: %v", service.Namespace, service.Name, err)
}
return err
}
return nil
}
// checkLeftoverEndpoints lists all currently existing endpoints and adds their
// service to the queue. This will detect endpoints that exist with no
// corresponding service; these endpoints need to be deleted. We only need to
// do this once on startup, because in steady-state these are detected (but
// some stragglers could have been left behind if the endpoint controller
// reboots).
func (e *Controller) checkLeftoverEndpoints() {
list, err := e.endpointsLister.List(labels.Everything())
if err != nil {
utilruntime.HandleError(fmt.Errorf("Unable to list endpoints (%v); orphaned endpoints will not be cleaned up. (They're pretty harmless, but you can restart this component if you want another attempt made.)", err))
return
}
for _, ep := range list {
if _, ok := ep.Annotations[resourcelock.LeaderElectionRecordAnnotationKey]; ok {
// When there are multiple controller-manager instances, deleting the
// leader-election endpoints here has been observed to happen after ~5 minutes
// and to cause re-election, so skip the delete: leader-election uses
// endpoints that intentionally have no corresponding service.
continue
}
key, err := controller.KeyFunc(ep)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Unable to get key for endpoint %#v", ep))
continue
}
e.queue.Add(key)
}
}
// addEndpointSubset adds the endpoint addresses and ports to the EndpointSubset.
// The addresses are added to the corresponding field, ready or not ready, depending
// on the pod status and the Service PublishNotReadyAddresses field value.
// The pod passed to this function must have already been filtered through ShouldPodBeInEndpoints.
func addEndpointSubset(subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.EndpointAddress,
epp *v1.EndpointPort, tolerateUnreadyEndpoints bool) ([]v1.EndpointSubset, int, int) {
var readyEps int
var notReadyEps int
ports := []v1.EndpointPort{}
if epp != nil {
ports = append(ports, *epp)
}
if tolerateUnreadyEndpoints || podutil.IsPodReady(pod) {
subsets = append(subsets, v1.EndpointSubset{
Addresses: []v1.EndpointAddress{epa},
Ports: ports,
})
readyEps++
} else { // if it is not a ready address it has to be not ready
klog.V(5).Infof("Pod is out of service: %s/%s", pod.Namespace, pod.Name)
subsets = append(subsets, v1.EndpointSubset{
NotReadyAddresses: []v1.EndpointAddress{epa},
Ports: ports,
})
notReadyEps++
}
return subsets, readyEps, notReadyEps
}
func endpointPortFromServicePort(servicePort *v1.ServicePort, portNum int) *v1.EndpointPort {
epp := &v1.EndpointPort{
Name: servicePort.Name,
Port: int32(portNum),
Protocol: servicePort.Protocol,
AppProtocol: servicePort.AppProtocol,
}
return epp
}
// capacityAnnotationSetCorrectly returns true only when the over-capacity annotation is
// consistent with the subsets: if the number of endpoints is greater than maxCapacity it
// returns false (they still need to be truncated); otherwise it returns true only when
// the annotation is not set.
func capacityAnnotationSetCorrectly(annotations map[string]string, subsets []v1.EndpointSubset) bool {
numEndpoints := 0
for _, subset := range subsets {
numEndpoints += len(subset.Addresses) + len(subset.NotReadyAddresses)
}
if numEndpoints > maxCapacity {
// If subsets are over capacity, they must be truncated so consider
// the annotation as not set correctly
return false
}
_, ok := annotations[v1.EndpointsOverCapacity]
return !ok
}
// truncateEndpoints truncates the addresses in the Endpoints object to maxCapacity on a
// best-effort basis, distributing the retained addresses across subsets in proportion to
// subset size and prioritizing ready endpoints over not-ready endpoints. It returns true
// if anything was truncated.
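// For example, with 800 ready and 400 not-ready addresses (1200 total, over the 1000
// capacity), all ready addresses are kept and the not-ready addresses are truncated to
// the remaining 200 slots, allocated to each subset in proportion to how many not-ready
// addresses it holds.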
func truncateEndpoints(endpoints *v1.Endpoints) bool {
totalReady := 0
totalNotReady := 0
for _, subset := range endpoints.Subsets {
totalReady += len(subset.Addresses)
totalNotReady += len(subset.NotReadyAddresses)
}
if totalReady+totalNotReady <= maxCapacity {
return false
}
truncateReady := false
max := maxCapacity - totalReady
numTotal := totalNotReady
if totalReady > maxCapacity {
truncateReady = true
max = maxCapacity
numTotal = totalReady
}
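// canBeAdded tracks how many addresses of the kind being truncated can still be kept
// across the remaining subsets.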
canBeAdded := max
for i := range endpoints.Subsets {
subset := endpoints.Subsets[i]
numInSubset := len(subset.Addresses)
if !truncateReady {
numInSubset = len(subset.NotReadyAddresses)
}
// The number of endpoints per subset will be based on the proportion of endpoints
// in this subset versus the total number of endpoints. The proportion is rounded up,
// which most likely will leave the last subset with fewer endpoints than its expected
// share.
toBeAdded := int(math.Ceil((float64(numInSubset) / float64(numTotal)) * float64(max)))
// If there is not enough capacity left for this subset, add only as many
// addresses as remain available.
if toBeAdded > canBeAdded {
toBeAdded = canBeAdded
}
if truncateReady {
// Truncate ready Addresses to allocated proportion and truncate all not ready
// addresses
subset.Addresses = addressSubset(subset.Addresses, toBeAdded)
subset.NotReadyAddresses = []v1.EndpointAddress{}
canBeAdded -= len(subset.Addresses)
} else {
// Only truncate the not ready addresses
subset.NotReadyAddresses = addressSubset(subset.NotReadyAddresses, toBeAdded)
canBeAdded -= len(subset.NotReadyAddresses)
}
endpoints.Subsets[i] = subset
}
return true
}
// addressSubset takes a list of addresses and returns the first maxNum of them if the
// list is longer than maxNum; otherwise the entire list is returned.
func addressSubset(addresses []v1.EndpointAddress, maxNum int) []v1.EndpointAddress {
if len(addresses) <= maxNum {
return addresses
}
return addresses[0:maxNum]
}