/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package proxy

import (
	v1 "k8s.io/api/core/v1"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/features"
)

// CategorizeEndpoints returns:
//
//   - The service's usable Cluster-traffic-policy endpoints (taking topology into account, if
//     relevant). This will be nil if the service does not ever use Cluster traffic policy.
//
//   - The service's usable Local-traffic-policy endpoints. This will be nil if the
//     service does not ever use Local traffic policy.
//
//   - The combined list of all endpoints reachable from this node (which is the union of the
//     previous two lists, but in the case where it is identical to one or the other, we avoid
//     allocating a separate list).
//
//   - An indication of whether the service has any endpoints reachable from anywhere in the
//     cluster. (This may be true even if allReachableEndpoints is empty.)
//
// "Usable endpoints" means Ready endpoints by default, but will fall back to
// Serving-Terminating endpoints (independently for Cluster and Local) if no Ready
// endpoints are available.
//
// Note: NodeTopologyConfig.handleNodeEvent (pkg/proxy/config) filters topology labels
// before notifying proxiers. If you modify the logic here to watch other endpoint
// types or labels, ensure the filtering logic in NodeTopologyConfig is updated accordingly.
func CategorizeEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeName string, topologyLabels map[string]string) (clusterEndpoints, localEndpoints, allReachableEndpoints []Endpoint, hasAnyEndpoints bool) {
	if len(endpoints) == 0 {
		// If there are no endpoints, we have nothing to categorize.
		return
	}

	var topologyMode string
	var useServingTerminatingEndpoints bool

	if svcInfo.UsesClusterEndpoints() {
		zone := topologyLabels[v1.LabelTopologyZone]
		topologyMode = topologyModeFromHints(svcInfo, endpoints, nodeName, zone)
		clusterEndpoints = filterEndpoints(endpoints, func(ep Endpoint) bool {
			if !ep.IsReady() {
				return false
			}
			if !availableForTopology(ep, topologyMode, nodeName, zone) {
				return false
			}
			return true
		})

		// If we didn't get any endpoints, try again using terminating endpoints.
		// (Note that we would already have chosen to ignore topology if there
		// were no ready endpoints for the given topology, so the problem at this
		// point must be that there are no ready endpoints anywhere.)
		if len(clusterEndpoints) == 0 {
			clusterEndpoints = filterEndpoints(endpoints, func(ep Endpoint) bool {
				return ep.IsServing() && ep.IsTerminating()
			})
		}

		// If there are any Ready endpoints anywhere in the cluster, we are
		// guaranteed to get one in clusterEndpoints.
		if len(clusterEndpoints) > 0 {
			hasAnyEndpoints = true
		}
	}

	if !svcInfo.UsesLocalEndpoints() {
		allReachableEndpoints = clusterEndpoints
		return
	}

	// Pre-scan the endpoints, to figure out which type of endpoint Local
	// traffic policy will use, and also to see if there are any usable
	// endpoints anywhere in the cluster.
	var hasLocalReadyEndpoints, hasLocalServingTerminatingEndpoints bool
	for _, ep := range endpoints {
		if ep.IsReady() {
			hasAnyEndpoints = true
			if ep.IsLocal() {
				hasLocalReadyEndpoints = true
			}
		} else if ep.IsServing() && ep.IsTerminating() {
			hasAnyEndpoints = true
			if ep.IsLocal() {
				hasLocalServingTerminatingEndpoints = true
			}
		}
	}

	if hasLocalReadyEndpoints {
		localEndpoints = filterEndpoints(endpoints, func(ep Endpoint) bool {
			return ep.IsLocal() && ep.IsReady()
		})
	} else if hasLocalServingTerminatingEndpoints {
		useServingTerminatingEndpoints = true
		localEndpoints = filterEndpoints(endpoints, func(ep Endpoint) bool {
			return ep.IsLocal() && ep.IsServing() && ep.IsTerminating()
		})
	}

	if !svcInfo.UsesClusterEndpoints() {
		allReachableEndpoints = localEndpoints
		return
	}

	if topologyMode == "" && !useServingTerminatingEndpoints {
		// !useServingTerminatingEndpoints means that localEndpoints contains only
		// Ready endpoints. topologyMode=="" means that clusterEndpoints contains *every*
		// Ready endpoint. So clusterEndpoints must be a superset of localEndpoints.
		allReachableEndpoints = clusterEndpoints
		return
	}

	// clusterEndpoints may contain remote endpoints that aren't in localEndpoints, while
	// localEndpoints may contain terminating or topologically-unavailable local endpoints
	// that aren't in clusterEndpoints. So we have to merge the two lists.
	endpointsMap := make(map[string]Endpoint, len(clusterEndpoints)+len(localEndpoints))
	for _, ep := range clusterEndpoints {
		endpointsMap[ep.String()] = ep
	}
	for _, ep := range localEndpoints {
		endpointsMap[ep.String()] = ep
	}
	allReachableEndpoints = make([]Endpoint, 0, len(endpointsMap))
	for _, ep := range endpointsMap {
		allReachableEndpoints = append(allReachableEndpoints, ep)
	}

	return
}
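
// The sketch below is illustrative only and is not part of this package's API: it shows
// how a backend proxier might consume CategorizeEndpoints when programming rules for one
// service port. The variable names (svcInfo, nodeName, topologyLabels, endpoints) are
// placeholders standing in for the real callers in the iptables/ipvs/nftables proxiers.
//
//	clusterEndpoints, localEndpoints, allReachable, hasEndpoints :=
//		CategorizeEndpoints(endpoints, svcInfo, nodeName, topologyLabels)
//	switch {
//	case !hasEndpoints:
//		// No usable endpoints anywhere in the cluster.
//	case len(allReachable) == 0:
//		// Endpoints exist in the cluster, but none are reachable from this node
//		// (e.g. a Local-only service with no local endpoints).
//	default:
//		// Program clusterEndpoints for Cluster-traffic-policy rules and
//		// localEndpoints for Local-traffic-policy rules.
//	}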

// topologyModeFromHints returns a topology mode ("", "PreferSameZone", or
// "PreferSameNode") based on the Endpoint hints:
//   - If the PreferSameTrafficDistribution feature gate is enabled, and every ready
//     endpoint has a node hint, and at least one endpoint is hinted for this node, then
//     it returns "PreferSameNode".
//   - Otherwise, if every ready endpoint has a zone hint, and at least one endpoint is
//     hinted for this node's zone, then it returns "PreferSameZone".
//   - Otherwise it returns "" (meaning, no topology / default traffic distribution).
func topologyModeFromHints(svcInfo ServicePort, endpoints []Endpoint, nodeName, zone string) string {
	if len(endpoints) == 0 {
		// The code below assumes at least 1 endpoint; if there are no endpoints,
		// there are no hints.
		return ""
	}

	hasEndpointForNode := false
	allEndpointsHaveNodeHints := true
	hasEndpointForZone := false
	allEndpointsHaveZoneHints := true
	for _, endpoint := range endpoints {
		if !endpoint.IsReady() {
			continue
		}

		if endpoint.NodeHints().Len() == 0 {
			allEndpointsHaveNodeHints = false
		} else if endpoint.NodeHints().Has(nodeName) {
			hasEndpointForNode = true
		}

		if endpoint.ZoneHints().Len() == 0 {
			allEndpointsHaveZoneHints = false
		} else if endpoint.ZoneHints().Has(zone) {
			hasEndpointForZone = true
		}
	}

	if utilfeature.DefaultFeatureGate.Enabled(features.PreferSameTrafficDistribution) {
		if allEndpointsHaveNodeHints {
			if hasEndpointForNode {
				return v1.ServiceTrafficDistributionPreferSameNode
			}
			klog.V(2).InfoS("Ignoring same-node topology hints for service since no hints were provided for node", "service", svcInfo, "node", nodeName)
		} else {
			klog.V(7).InfoS("Ignoring same-node topology hints for service since one or more endpoints is missing a node hint", "service", svcInfo)
		}
	}

	if allEndpointsHaveZoneHints {
		if hasEndpointForZone {
			return v1.ServiceTrafficDistributionPreferSameZone
		}
		if zone == "" {
			klog.V(2).InfoS("Ignoring same-zone topology hints for service since node is missing label", "service", svcInfo, "label", v1.LabelTopologyZone)
		} else {
			klog.V(2).InfoS("Ignoring same-zone topology hints for service since no hints were provided for zone", "service", svcInfo, "zone", zone)
		}
	} else {
		klog.V(7).InfoS("Ignoring same-zone topology hints for service since one or more endpoints is missing a zone hint", "service", svcInfo.String())
	}

	return ""
}
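
// Worked example (illustrative only, not exercised by this file): suppose nodeName is
// "node-a" in zone "zone-1", and the service has three ready endpoints whose
// EndpointSlice hints are:
//
//	ep1: nodes={"node-a"}, zones={"zone-1"}
//	ep2: nodes={"node-b"}, zones={"zone-1"}
//	ep3: nodes={"node-c"}, zones={"zone-2"}
//
// With the PreferSameTrafficDistribution gate enabled, every ready endpoint has a node
// hint and ep1 is hinted for this node, so topologyModeFromHints returns "PreferSameNode".
// If ep1 had no node hint, node-level hints would be ignored, but all endpoints still
// have zone hints and ep1/ep2 cover "zone-1", so the result would fall back to
// "PreferSameZone". If the node were missing the topology.kubernetes.io/zone label
// (zone == ""), the zone check would also fail and the result would be "".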

// availableForTopology checks if this endpoint is available for use on this node when
// using the given topologyMode. (Note that there's no fallback here; the fallback happens
// when deciding which mode to use, not when applying that decision.)
func availableForTopology(endpoint Endpoint, topologyMode, nodeName, zone string) bool {
	switch topologyMode {
	case "":
		return true
	case v1.ServiceTrafficDistributionPreferSameNode:
		return endpoint.NodeHints().Has(nodeName)
	case v1.ServiceTrafficDistributionPreferSameZone:
		return endpoint.ZoneHints().Has(zone)
	default:
		return false
	}
}
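
// Illustrative example (not part of the package API), reusing the names from the worked
// example above:
//
//	availableForTopology(ep, v1.ServiceTrafficDistributionPreferSameNode, "node-a", zone)
//
// returns true only if ep.NodeHints().Has("node-a"), while topologyMode "" accepts every
// endpoint. There is deliberately no fallback at this level: topologyModeFromHints only
// selects a mode when at least one ready endpoint matches it, so applying that mode here
// cannot filter out every ready endpoint.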

// filterEndpoints filters endpoints according to predicate
func filterEndpoints(endpoints []Endpoint, predicate func(Endpoint) bool) []Endpoint {
	filteredEndpoints := make([]Endpoint, 0, len(endpoints))

	for _, ep := range endpoints {
		if predicate(ep) {
			filteredEndpoints = append(filteredEndpoints, ep)
		}
	}

	return filteredEndpoints
}