2014-06-07 07:40:48 +08:00
/ *
2016-06-03 08:25:58 +08:00
Copyright 2014 The Kubernetes Authors .
2014-06-07 07:40:48 +08:00
Licensed under the Apache License , Version 2.0 ( the "License" ) ;
you may not use this file except in compliance with the License .
You may obtain a copy of the License at
http : //www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing , software
distributed under the License is distributed on an "AS IS" BASIS ,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
See the License for the specific language governing permissions and
limitations under the License .
* /
2014-06-19 07:01:49 +08:00
2015-10-10 11:58:57 +08:00
package endpoint
2014-06-07 07:40:48 +08:00
import (
2016-09-20 11:16:40 +08:00
"fmt"
2015-03-21 05:24:43 +08:00
"reflect"
2016-05-07 04:15:49 +08:00
"strconv"
2015-04-17 07:18:02 +08:00
"time"
2014-06-07 07:40:48 +08:00
2017-06-23 02:24:23 +08:00
"k8s.io/api/core/v1"
2017-05-13 01:01:54 +08:00
apiequality "k8s.io/apimachinery/pkg/api/equality"
2017-01-14 01:48:50 +08:00
"k8s.io/apimachinery/pkg/api/errors"
2017-01-11 22:09:48 +08:00
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
2017-06-24 04:56:37 +08:00
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
2017-07-11 01:54:48 +08:00
"k8s.io/client-go/kubernetes/scheme"
2017-06-24 04:56:37 +08:00
corelisters "k8s.io/client-go/listers/core/v1"
2017-01-24 22:11:51 +08:00
"k8s.io/client-go/tools/cache"
2017-07-08 04:59:32 +08:00
"k8s.io/client-go/tools/leaderelection/resourcelock"
2017-01-27 23:20:40 +08:00
"k8s.io/client-go/util/workqueue"
2017-06-19 23:47:29 +08:00
"k8s.io/kubernetes/pkg/api"
2016-11-19 04:50:17 +08:00
"k8s.io/kubernetes/pkg/api/v1/endpoints"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
2017-04-13 03:49:17 +08:00
"k8s.io/kubernetes/pkg/controller"
2016-04-14 02:38:32 +08:00
"k8s.io/kubernetes/pkg/util/metrics"
2016-09-20 11:16:40 +08:00
"github.com/golang/glog"
2014-06-07 07:40:48 +08:00
)
2015-04-17 07:18:02 +08:00
const (
	// maxRetries caps how many times a service is re-queued after a failed
	// sync before it is dropped from the queue. With the rate limiter
	// currently in use (5ms*2^(maxRetries-1)), successive re-queues of a
	// service are separated by the following delays:
	//
	// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
	maxRetries = 15

	// TolerateUnreadyEndpointsAnnotation is a Service annotation telling the
	// endpoints controller to go ahead and create endpoints for unready pods.
	// It is currently only used by StatefulSets, where the pod must be DNS
	// resolvable during initialization and termination. In that situation a
	// headless Service is created just for the StatefulSet, and clients
	// shouldn't be using this Service for anything, so unready endpoints
	// don't matter. Endpoints of these Services retain their DNS records and
	// continue receiving traffic for the Service from the moment the kubelet
	// starts all containers in the pod and marks it "Running", till the
	// kubelet stops all containers and deletes the pod from the apiserver.
	//
	// This field is deprecated. v1.Service.PublishNotReadyAddresses will
	// replace it in subsequent releases.
	TolerateUnreadyEndpointsAnnotation = "service.alpha.kubernetes.io/tolerate-unready-endpoints"
)
var (
2016-09-15 02:35:38 +08:00
keyFunc = cache . DeletionHandlingMetaNamespaceKeyFunc
2015-04-17 07:18:02 +08:00
)
// NewEndpointController returns a new *EndpointController.
2017-06-19 23:47:29 +08:00
func NewEndpointController ( podInformer coreinformers . PodInformer , serviceInformer coreinformers . ServiceInformer ,
endpointsInformer coreinformers . EndpointsInformer , client clientset . Interface ) * EndpointController {
2016-10-13 20:56:07 +08:00
if client != nil && client . Core ( ) . RESTClient ( ) . GetRateLimiter ( ) != nil {
metrics . RegisterMetricAndTrackRateLimiterUsage ( "endpoint_controller" , client . Core ( ) . RESTClient ( ) . GetRateLimiter ( ) )
2016-04-14 02:38:32 +08:00
}
2015-04-17 07:18:02 +08:00
e := & EndpointController {
2017-06-19 23:47:29 +08:00
client : client ,
queue : workqueue . NewNamedRateLimitingQueue ( workqueue . DefaultControllerRateLimiter ( ) , "endpoint" ) ,
workerLoopPeriod : time . Second ,
2015-04-17 07:18:02 +08:00
}
2017-06-20 20:48:57 +08:00
serviceInformer . Informer ( ) . AddEventHandler ( cache . ResourceEventHandlerFuncs {
AddFunc : e . enqueueService ,
UpdateFunc : func ( old , cur interface { } ) {
e . enqueueService ( cur )
2015-04-17 07:18:02 +08:00
} ,
2017-06-20 20:48:57 +08:00
DeleteFunc : e . enqueueService ,
} )
2017-02-08 09:25:52 +08:00
e . serviceLister = serviceInformer . Lister ( )
e . servicesSynced = serviceInformer . Informer ( ) . HasSynced
2015-04-17 07:18:02 +08:00
2017-02-08 09:25:52 +08:00
podInformer . Informer ( ) . AddEventHandler ( cache . ResourceEventHandlerFuncs {
2016-04-15 02:00:52 +08:00
AddFunc : e . addPod ,
UpdateFunc : e . updatePod ,
DeleteFunc : e . deletePod ,
} )
2017-02-08 09:25:52 +08:00
e . podLister = podInformer . Lister ( )
e . podsSynced = podInformer . Informer ( ) . HasSynced
2015-04-17 07:18:02 +08:00
2017-06-19 23:47:29 +08:00
e . endpointsLister = endpointsInformer . Lister ( )
e . endpointsSynced = endpointsInformer . Informer ( ) . HasSynced
2015-04-17 07:18:02 +08:00
return e
}
2015-02-05 03:21:33 +08:00
// EndpointController manages selector-based service endpoints.
2014-06-07 07:40:48 +08:00
type EndpointController struct {
2016-09-20 21:43:11 +08:00
client clientset . Interface
2015-04-17 07:18:02 +08:00
2017-02-08 09:25:52 +08:00
// serviceLister is able to list/get services and is populated by the shared informer passed to
// NewEndpointController.
serviceLister corelisters . ServiceLister
// servicesSynced returns true if the service shared informer has been synced at least once.
// Added as a member to the struct to allow injection for testing.
servicesSynced cache . InformerSynced
2015-04-17 07:18:02 +08:00
2017-02-08 09:25:52 +08:00
// podLister is able to list/get pods and is populated by the shared informer passed to
// NewEndpointController.
podLister corelisters . PodLister
// podsSynced returns true if the pod shared informer has been synced at least once.
// Added as a member to the struct to allow injection for testing.
podsSynced cache . InformerSynced
2016-04-15 02:00:52 +08:00
2017-06-19 23:47:29 +08:00
// endpointsLister is able to list/get endpoints and is populated by the shared informer passed to
// NewEndpointController.
endpointsLister corelisters . EndpointsLister
// endpointsSynced returns true if the endpoints shared informer has been synced at least once.
// Added as a member to the struct to allow injection for testing.
endpointsSynced cache . InformerSynced
2015-04-17 07:18:02 +08:00
// Services that need to be updated. A channel is inappropriate here,
// because it allows services with lots of pods to be serviced much
// more often than services with few pods; it also would cause a
// service that's inserted multiple times to be processed more than
// necessary.
2016-09-15 18:30:06 +08:00
queue workqueue . RateLimitingInterface
2017-06-19 23:47:29 +08:00
// workerLoopPeriod is the time between worker runs. The workers process the queue of service and pod changes.
workerLoopPeriod time . Duration
2014-06-07 07:40:48 +08:00
}
2015-04-17 07:18:02 +08:00
// Runs e; will not return until stopCh is closed. workers determines how many
// endpoints will be handled in parallel.
func ( e * EndpointController ) Run ( workers int , stopCh <- chan struct { } ) {
2016-01-15 15:32:10 +08:00
defer utilruntime . HandleCrash ( )
2016-09-15 18:30:06 +08:00
defer e . queue . ShutDown ( )
2017-04-13 03:49:17 +08:00
glog . Infof ( "Starting endpoint controller" )
defer glog . Infof ( "Shutting down endpoint controller" )
2017-06-19 23:47:29 +08:00
if ! controller . WaitForCacheSync ( "endpoint" , stopCh , e . podsSynced , e . servicesSynced , e . endpointsSynced ) {
2016-09-15 18:30:06 +08:00
return
}
2015-04-17 07:18:02 +08:00
for i := 0 ; i < workers ; i ++ {
2017-06-19 23:47:29 +08:00
go wait . Until ( e . worker , e . workerLoopPeriod , stopCh )
2014-07-12 15:15:30 +08:00
}
2017-04-13 03:49:17 +08:00
2015-04-25 05:16:27 +08:00
go func ( ) {
2016-01-15 15:32:10 +08:00
defer utilruntime . HandleCrash ( )
2015-04-25 05:16:27 +08:00
e . checkLeftoverEndpoints ( )
} ( )
2016-04-15 02:00:52 +08:00
2015-04-17 07:18:02 +08:00
<- stopCh
2014-07-12 15:15:30 +08:00
}
2016-11-19 04:50:17 +08:00
func ( e * EndpointController ) getPodServiceMemberships ( pod * v1 . Pod ) ( sets . String , error ) {
2015-09-10 01:45:01 +08:00
set := sets . String { }
2017-02-08 09:25:52 +08:00
services , err := e . serviceLister . GetPodServices ( pod )
2014-06-07 07:40:48 +08:00
if err != nil {
2015-04-17 07:18:02 +08:00
// don't log this error because this function makes pointless
// errors when no services match.
return set , nil
2014-06-07 07:40:48 +08:00
}
2015-04-17 07:18:02 +08:00
for i := range services {
2016-09-22 21:55:24 +08:00
key , err := keyFunc ( services [ i ] )
2015-04-17 07:18:02 +08:00
if err != nil {
return nil , err
2014-10-28 08:56:33 +08:00
}
2015-04-17 07:18:02 +08:00
set . Insert ( key )
}
return set , nil
}
// When a pod is added, figure out what services it will be a member of and
2016-11-19 04:50:17 +08:00
// enqueue them. obj must have *v1.Pod type.
2015-04-17 07:18:02 +08:00
func ( e * EndpointController ) addPod ( obj interface { } ) {
2016-11-19 04:50:17 +08:00
pod := obj . ( * v1 . Pod )
2015-04-17 07:18:02 +08:00
services , err := e . getPodServiceMemberships ( pod )
if err != nil {
2016-09-15 18:30:06 +08:00
utilruntime . HandleError ( fmt . Errorf ( "Unable to get pod %v/%v's service memberships: %v" , pod . Namespace , pod . Name , err ) )
2015-04-17 07:18:02 +08:00
return
}
for key := range services {
e . queue . Add ( key )
}
}
2017-08-19 04:09:46 +08:00
// podToEndpointAddress builds the EndpointAddress that represents pod: its
// pod IP, a pointer to its node name, and a TargetRef identifying the pod.
func podToEndpointAddress(pod *v1.Pod) *v1.EndpointAddress {
	target := &v1.ObjectReference{
		Kind:            "Pod",
		Namespace:       pod.ObjectMeta.Namespace,
		Name:            pod.ObjectMeta.Name,
		UID:             pod.ObjectMeta.UID,
		ResourceVersion: pod.ObjectMeta.ResourceVersion,
	}

	address := v1.EndpointAddress{
		IP:        pod.Status.PodIP,
		NodeName:  &pod.Spec.NodeName,
		TargetRef: target,
	}
	return &address
}
2017-08-23 03:19:26 +08:00
func podChanged ( oldPod , newPod * v1 . Pod ) bool {
// If the pod's readiness has changed, the associated endpoint address
// will move from the unready endpoints set to the ready endpoints.
// So for the purposes of an endpoint, a readiness change on a pod
// means we have a changed pod.
if podutil . IsPodReady ( oldPod ) != podutil . IsPodReady ( newPod ) {
return true
}
2017-08-19 04:09:46 +08:00
// Convert the pod to an EndpointAddress, clear inert fields,
// and see if they are the same.
newEndpointAddress := podToEndpointAddress ( newPod )
oldEndpointAddress := podToEndpointAddress ( oldPod )
// Ignore the ResourceVersion because it changes
// with every pod update. This allows the comparison to
// show equality if all other relevant fields match.
newEndpointAddress . TargetRef . ResourceVersion = ""
oldEndpointAddress . TargetRef . ResourceVersion = ""
if reflect . DeepEqual ( newEndpointAddress , oldEndpointAddress ) {
// The pod has not changed in any way that impacts the endpoints
return false
}
return true
}
// determineNeededServiceUpdates returns the services that must be re-synced
// given the pod's old and new service memberships. podChanged says whether
// the pod itself changed in a way that affects endpoints.
func determineNeededServiceUpdates(oldServices, services sets.String, podChanged bool) sets.String {
	if podChanged {
		// Both the labels and the pod changed: every service the pod was or
		// is a member of needs to be updated.
		return services.Union(oldServices)
	}

	// Only the labels changed: just the services that are not common to the
	// old and new sets (i.e. the disjunctive union) need to be updated.
	added := services.Difference(oldServices)
	removed := oldServices.Difference(services)
	return added.Union(removed)
}
2015-04-17 07:18:02 +08:00
// When a pod is updated, figure out what services it used to be a member of
// and what services it will be a member of, and enqueue the union of these.
2016-11-19 04:50:17 +08:00
// old and cur must be *v1.Pod types.
2015-04-17 07:18:02 +08:00
func ( e * EndpointController ) updatePod ( old , cur interface { } ) {
2016-11-19 04:50:17 +08:00
newPod := cur . ( * v1 . Pod )
oldPod := old . ( * v1 . Pod )
2016-08-09 21:57:21 +08:00
if newPod . ResourceVersion == oldPod . ResourceVersion {
// Periodic resync will send update events for all known pods.
// Two different versions of the same pod will always have different RVs.
2015-04-17 07:18:02 +08:00
return
}
2017-08-19 04:09:46 +08:00
2017-08-23 03:19:26 +08:00
podChangedFlag := podChanged ( oldPod , newPod )
2017-08-19 04:09:46 +08:00
// Check if the pod labels have changed, indicating a possibe
// change in the service membership
labelsChanged := false
if ! reflect . DeepEqual ( newPod . Labels , oldPod . Labels ) ||
! hostNameAndDomainAreEqual ( newPod , oldPod ) {
labelsChanged = true
}
// If both the pod and labels are unchanged, no update is needed
2017-08-23 03:19:26 +08:00
if ! podChangedFlag && ! labelsChanged {
2017-08-19 04:09:46 +08:00
return
}
2015-04-17 07:18:02 +08:00
services , err := e . getPodServiceMemberships ( newPod )
if err != nil {
2016-09-15 18:30:06 +08:00
utilruntime . HandleError ( fmt . Errorf ( "Unable to get pod %v/%v's service memberships: %v" , newPod . Namespace , newPod . Name , err ) )
2015-04-17 07:18:02 +08:00
return
}
2014-11-19 01:49:00 +08:00
2017-08-19 04:09:46 +08:00
if labelsChanged {
2015-04-17 07:18:02 +08:00
oldServices , err := e . getPodServiceMemberships ( oldPod )
2014-06-07 07:40:48 +08:00
if err != nil {
2016-09-15 18:30:06 +08:00
utilruntime . HandleError ( fmt . Errorf ( "Unable to get pod %v/%v's service memberships: %v" , oldPod . Namespace , oldPod . Name , err ) )
2015-04-17 07:18:02 +08:00
return
2014-06-07 07:40:48 +08:00
}
2017-08-23 03:19:26 +08:00
services = determineNeededServiceUpdates ( oldServices , services , podChangedFlag )
2015-04-17 07:18:02 +08:00
}
2017-08-19 04:09:46 +08:00
2015-04-17 07:18:02 +08:00
for key := range services {
e . queue . Add ( key )
}
}
2014-11-13 23:52:13 +08:00
2016-11-19 04:50:17 +08:00
func hostNameAndDomainAreEqual ( pod1 , pod2 * v1 . Pod ) bool {
2017-04-06 15:32:21 +08:00
return pod1 . Spec . Hostname == pod2 . Spec . Hostname &&
pod1 . Spec . Subdomain == pod2 . Spec . Subdomain
2016-02-03 02:59:54 +08:00
}
2015-04-17 07:18:02 +08:00
// When a pod is deleted, enqueue the services the pod used to be a member of.
2016-11-19 04:50:17 +08:00
// obj could be an *v1.Pod, or a DeletionFinalStateUnknown marker item.
2015-04-17 07:18:02 +08:00
func ( e * EndpointController ) deletePod ( obj interface { } ) {
2016-11-19 04:50:17 +08:00
if _ , ok := obj . ( * v1 . Pod ) ; ok {
2015-04-17 07:18:02 +08:00
// Enqueue all the services that the pod used to be a member
// of. This happens to be exactly the same thing we do when a
// pod is added.
e . addPod ( obj )
return
}
2017-06-20 20:48:57 +08:00
// If we reached here it means the pod was deleted but its final state is unrecorded.
tombstone , ok := obj . ( cache . DeletedFinalStateUnknown )
if ! ok {
utilruntime . HandleError ( fmt . Errorf ( "Couldn't get object from tombstone %#v" , obj ) )
2016-05-29 20:44:20 +08:00
return
2015-04-17 07:18:02 +08:00
}
2017-06-20 20:48:57 +08:00
pod , ok := tombstone . Obj . ( * v1 . Pod )
if ! ok {
utilruntime . HandleError ( fmt . Errorf ( "Tombstone contained object that is not a Pod: %#v" , obj ) )
return
}
glog . V ( 4 ) . Infof ( "Enqueuing services of deleted pod %s having final state unrecorded" , pod . Name )
e . addPod ( pod )
2015-04-17 07:18:02 +08:00
}
2016-11-19 04:50:17 +08:00
// obj could be an *v1.Service, or a DeletionFinalStateUnknown marker item.
2015-04-17 07:18:02 +08:00
func ( e * EndpointController ) enqueueService ( obj interface { } ) {
key , err := keyFunc ( obj )
if err != nil {
2016-09-15 18:30:06 +08:00
utilruntime . HandleError ( fmt . Errorf ( "Couldn't get key for object %+v: %v" , obj , err ) )
2016-05-29 20:44:20 +08:00
return
2015-04-17 07:18:02 +08:00
}
2015-02-03 02:51:52 +08:00
2015-04-17 07:18:02 +08:00
e . queue . Add ( key )
}
// worker runs a worker thread that just dequeues items, processes them, and
// marks them done. You may run as many of these in parallel as you wish; the
// workqueue guarantees that they will not end up processing the same service
// at the same time.
func ( e * EndpointController ) worker ( ) {
2016-09-15 18:30:06 +08:00
for e . processNextWorkItem ( ) {
2015-04-17 07:18:02 +08:00
}
}
2015-03-21 05:24:43 +08:00
2016-09-15 18:30:06 +08:00
func ( e * EndpointController ) processNextWorkItem ( ) bool {
eKey , quit := e . queue . Get ( )
if quit {
return false
}
defer e . queue . Done ( eKey )
err := e . syncService ( eKey . ( string ) )
2017-06-20 23:50:37 +08:00
e . handleErr ( err , eKey )
return true
}
func ( e * EndpointController ) handleErr ( err error , key interface { } ) {
2016-09-15 18:30:06 +08:00
if err == nil {
2017-06-20 23:50:37 +08:00
e . queue . Forget ( key )
return
2016-09-15 18:30:06 +08:00
}
2017-06-20 23:50:37 +08:00
if e . queue . NumRequeues ( key ) < maxRetries {
glog . V ( 2 ) . Infof ( "Error syncing endpoints for service %q: %v" , key , err )
e . queue . AddRateLimited ( key )
return
}
2016-09-15 18:30:06 +08:00
2017-06-20 23:50:37 +08:00
glog . Warningf ( "Dropping service %q out of the queue: %v" , key , err )
e . queue . Forget ( key )
utilruntime . HandleError ( err )
2016-09-15 18:30:06 +08:00
}
func ( e * EndpointController ) syncService ( key string ) error {
2015-04-17 07:18:02 +08:00
startTime := time . Now ( )
defer func ( ) {
glog . V ( 4 ) . Infof ( "Finished syncing service %q endpoints. (%v)" , key , time . Now ( ) . Sub ( startTime ) )
} ( )
2016-03-16 16:47:30 +08:00
2017-02-08 09:25:52 +08:00
namespace , name , err := cache . SplitMetaNamespaceKey ( key )
if err != nil {
return err
}
service , err := e . serviceLister . Services ( namespace ) . Get ( name )
if err != nil {
2015-04-17 07:18:02 +08:00
// Delete the corresponding endpoint, as the service has been deleted.
// TODO: Please note that this will delete an endpoint when a
// service is deleted. However, if we're down at the time when
// the service is deleted, we will miss that deletion, so this
// doesn't completely solve the problem. See #6877.
2016-09-20 21:43:11 +08:00
err = e . client . Core ( ) . Endpoints ( namespace ) . Delete ( name , nil )
2015-04-17 07:18:02 +08:00
if err != nil && ! errors . IsNotFound ( err ) {
2016-09-15 18:30:06 +08:00
return err
2015-04-17 07:18:02 +08:00
}
2016-09-15 18:30:06 +08:00
return nil
2015-04-17 07:18:02 +08:00
}
if service . Spec . Selector == nil {
// services without a selector receive no endpoints from this controller;
// these services will receive the endpoints that are created out-of-band via the REST API.
2016-09-15 18:30:06 +08:00
return nil
2015-04-17 07:18:02 +08:00
}
glog . V ( 5 ) . Infof ( "About to update endpoints for service %q" , key )
2017-02-08 09:25:52 +08:00
pods , err := e . podLister . Pods ( service . Namespace ) . List ( labels . Set ( service . Spec . Selector ) . AsSelectorPreValidated ( ) )
2015-04-17 07:18:02 +08:00
if err != nil {
// Since we're getting stuff from a local cache, it is
// basically impossible to get this error.
2016-09-15 18:30:06 +08:00
return err
2015-04-17 07:18:02 +08:00
}
2016-05-07 04:15:49 +08:00
var tolerateUnreadyEndpoints bool
if v , ok := service . Annotations [ TolerateUnreadyEndpointsAnnotation ] ; ok {
b , err := strconv . ParseBool ( v )
if err == nil {
tolerateUnreadyEndpoints = b
} else {
2016-09-15 18:30:06 +08:00
utilruntime . HandleError ( fmt . Errorf ( "Failed to parse annotation %v: %v" , TolerateUnreadyEndpointsAnnotation , err ) )
2016-05-07 04:15:49 +08:00
}
}
2017-06-09 23:22:37 +08:00
subsets := [ ] v1 . EndpointSubset { }
var totalReadyEps int = 0
var totalNotReadyEps int = 0
2015-04-17 07:18:02 +08:00
2017-06-09 23:22:37 +08:00
for _ , pod := range pods {
if len ( pod . Status . PodIP ) == 0 {
glog . V ( 5 ) . Infof ( "Failed to find an IP for pod %s/%s" , pod . Namespace , pod . Name )
continue
}
if ! tolerateUnreadyEndpoints && pod . DeletionTimestamp != nil {
glog . V ( 5 ) . Infof ( "Pod is being deleted %s/%s" , pod . Namespace , pod . Name )
continue
}
2015-04-17 07:18:02 +08:00
2017-08-19 04:09:46 +08:00
epa := * podToEndpointAddress ( pod )
2017-06-09 23:22:37 +08:00
hostname := pod . Spec . Hostname
if len ( hostname ) > 0 && pod . Spec . Subdomain == service . Name && service . Namespace == pod . Namespace {
epa . Hostname = hostname
}
2015-04-17 07:18:02 +08:00
2017-06-09 23:22:37 +08:00
// Allow headless service not to have ports.
if len ( service . Spec . Ports ) == 0 {
if service . Spec . ClusterIP == api . ClusterIPNone {
epp := v1 . EndpointPort { Port : 0 , Protocol : v1 . ProtocolTCP }
subsets , totalReadyEps , totalNotReadyEps = addEndpointSubset ( subsets , pod , epa , epp , tolerateUnreadyEndpoints )
2016-04-15 01:45:29 +08:00
}
2017-06-09 23:22:37 +08:00
} else {
for i := range service . Spec . Ports {
servicePort := & service . Spec . Ports [ i ]
portName := servicePort . Name
portProto := servicePort . Protocol
portNum , err := podutil . FindPort ( pod , servicePort )
if err != nil {
glog . V ( 4 ) . Infof ( "Failed to find port for service %s/%s: %v" , service . Namespace , service . Name , err )
continue
}
var readyEps , notReadyEps int
epp := v1 . EndpointPort { Name : portName , Port : int32 ( portNum ) , Protocol : portProto }
subsets , readyEps , notReadyEps = addEndpointSubset ( subsets , pod , epa , epp , tolerateUnreadyEndpoints )
totalReadyEps = totalReadyEps + readyEps
totalNotReadyEps = totalNotReadyEps + notReadyEps
2015-09-10 09:28:53 +08:00
}
2014-09-27 04:34:55 +08:00
}
2015-04-17 07:18:02 +08:00
}
subsets = endpoints . RepackSubsets ( subsets )
2014-09-27 04:34:55 +08:00
2015-04-17 07:18:02 +08:00
// See if there's actually an update here.
2017-06-19 23:47:29 +08:00
currentEndpoints , err := e . endpointsLister . Endpoints ( service . Namespace ) . Get ( service . Name )
2015-04-17 07:18:02 +08:00
if err != nil {
if errors . IsNotFound ( err ) {
2016-11-19 04:50:17 +08:00
currentEndpoints = & v1 . Endpoints {
2017-01-17 11:38:19 +08:00
ObjectMeta : metav1 . ObjectMeta {
2015-04-17 07:18:02 +08:00
Name : service . Name ,
Labels : service . Labels ,
} ,
}
2014-09-27 04:34:55 +08:00
} else {
2016-09-15 18:30:06 +08:00
return err
2014-06-07 07:40:48 +08:00
}
}
2016-02-03 02:59:54 +08:00
2017-05-13 01:01:54 +08:00
createEndpoints := len ( currentEndpoints . ResourceVersion ) == 0
if ! createEndpoints &&
apiequality . Semantic . DeepEqual ( currentEndpoints . Subsets , subsets ) &&
apiequality . Semantic . DeepEqual ( currentEndpoints . Labels , service . Labels ) {
2015-04-17 07:18:02 +08:00
glog . V ( 5 ) . Infof ( "endpoints are equal for %s/%s, skipping update" , service . Namespace , service . Name )
2016-09-15 18:30:06 +08:00
return nil
2015-04-17 07:18:02 +08:00
}
2017-07-11 01:54:48 +08:00
copy , err := scheme . Scheme . DeepCopy ( currentEndpoints )
2017-06-19 23:47:29 +08:00
if err != nil {
return err
}
newEndpoints := copy . ( * v1 . Endpoints )
2015-04-17 07:18:02 +08:00
newEndpoints . Subsets = subsets
newEndpoints . Labels = service . Labels
2016-02-03 02:59:54 +08:00
if newEndpoints . Annotations == nil {
newEndpoints . Annotations = make ( map [ string ] string )
}
2016-08-12 23:39:56 +08:00
2017-06-09 23:22:37 +08:00
glog . V ( 4 ) . Infof ( "Update endpoints for %v/%v, ready: %d not ready: %d" , service . Namespace , service . Name , totalReadyEps , totalNotReadyEps )
2016-08-12 23:39:56 +08:00
if createEndpoints {
2015-04-17 07:18:02 +08:00
// No previous endpoints, create them
2016-09-20 21:43:11 +08:00
_ , err = e . client . Core ( ) . Endpoints ( service . Namespace ) . Create ( newEndpoints )
2015-04-17 07:18:02 +08:00
} else {
// Pre-existing
2016-09-20 21:43:11 +08:00
_ , err = e . client . Core ( ) . Endpoints ( service . Namespace ) . Update ( newEndpoints )
2015-04-17 07:18:02 +08:00
}
if err != nil {
2016-08-12 23:39:56 +08:00
if createEndpoints && errors . IsForbidden ( err ) {
// A request is forbidden primarily for two reasons:
// 1. namespace is terminating, endpoint creation is not allowed by default.
// 2. policy is misconfigured, in which case no service would function anywhere.
// Given the frequency of 1, we log at a lower level.
glog . V ( 5 ) . Infof ( "Forbidden from creating endpoints: %v" , err )
}
2016-09-15 18:30:06 +08:00
return err
2015-04-17 07:18:02 +08:00
}
2016-09-15 18:30:06 +08:00
return nil
2014-06-07 07:40:48 +08:00
}
2014-08-11 15:34:59 +08:00
2015-04-25 05:16:27 +08:00
// checkLeftoverEndpoints lists all currently existing endpoints and adds their
// service to the queue. This will detect endpoints that exist with no
// corresponding service; these endpoints need to be deleted. We only need to
// do this once on startup, because in steady-state these are detected (but
// some stragglers could have been left behind if the endpoint controller
// reboots).
func ( e * EndpointController ) checkLeftoverEndpoints ( ) {
2017-06-19 23:47:29 +08:00
list , err := e . endpointsLister . List ( labels . Everything ( ) )
2015-04-25 05:16:27 +08:00
if err != nil {
2016-09-15 18:30:06 +08:00
utilruntime . HandleError ( fmt . Errorf ( "Unable to list endpoints (%v); orphaned endpoints will not be cleaned up. (They're pretty harmless, but you can restart this component if you want another attempt made.)" , err ) )
2015-04-25 05:16:27 +08:00
return
}
2017-06-19 23:47:29 +08:00
for _ , ep := range list {
2017-05-08 17:16:33 +08:00
if _ , ok := ep . Annotations [ resourcelock . LeaderElectionRecordAnnotationKey ] ; ok {
// when there are multiple controller-manager instances,
// we observe that it will delete leader-election endpoints after 5min
// and cause re-election
// so skip the delete here
// as leader-election only have endpoints without service
continue
}
2015-04-25 05:16:27 +08:00
key , err := keyFunc ( ep )
if err != nil {
2016-09-15 18:30:06 +08:00
utilruntime . HandleError ( fmt . Errorf ( "Unable to get key for endpoint %#v" , ep ) )
2015-04-25 05:16:27 +08:00
continue
}
e . queue . Add ( key )
}
}
2017-06-09 23:22:37 +08:00
func addEndpointSubset ( subsets [ ] v1 . EndpointSubset , pod * v1 . Pod , epa v1 . EndpointAddress ,
epp v1 . EndpointPort , tolerateUnreadyEndpoints bool ) ( [ ] v1 . EndpointSubset , int , int ) {
var readyEps int = 0
var notReadyEps int = 0
if tolerateUnreadyEndpoints || podutil . IsPodReady ( pod ) {
subsets = append ( subsets , v1 . EndpointSubset {
Addresses : [ ] v1 . EndpointAddress { epa } ,
Ports : [ ] v1 . EndpointPort { epp } ,
} )
readyEps ++
2017-06-14 15:54:33 +08:00
} else if shouldPodBeInEndpoints ( pod ) {
2017-06-09 23:22:37 +08:00
glog . V ( 5 ) . Infof ( "Pod is out of service: %v/%v" , pod . Namespace , pod . Name )
subsets = append ( subsets , v1 . EndpointSubset {
NotReadyAddresses : [ ] v1 . EndpointAddress { epa } ,
Ports : [ ] v1 . EndpointPort { epp } ,
} )
notReadyEps ++
}
return subsets , readyEps , notReadyEps
}
2017-06-14 15:54:33 +08:00
// shouldPodBeInEndpoints reports whether pod should still be listed in
// endpoints, based on its restart policy and phase: with RestartPolicyNever a
// Failed or Succeeded pod is excluded, with RestartPolicyOnFailure only a
// Succeeded pod is excluded, and with any other policy the pod is included.
func shouldPodBeInEndpoints(pod *v1.Pod) bool {
	phase := pod.Status.Phase
	policy := pod.Spec.RestartPolicy

	if policy == v1.RestartPolicyNever {
		return phase != v1.PodFailed && phase != v1.PodSucceeded
	}
	if policy == v1.RestartPolicyOnFailure {
		return phase != v1.PodSucceeded
	}
	return true
}