/*
Copyright 2022 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package proxy

import (
	"context"
	"fmt"
	"net"
	"os"
	"reflect"
	"sync"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/informers"
	v1informers "k8s.io/client-go/informers/core/v1"
	clientset "k8s.io/client-go/kubernetes"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/klog/v2"
	utilnode "k8s.io/kubernetes/pkg/util/node"
)

// NodeManager handles the life cycle of kube-proxy based on the NodeIPs and PodCIDRs:
// it handles node watch events and crashes kube-proxy if there are any changes in
// NodeIPs or PodCIDRs. Note: it only crashes on a PodCIDR change when watchPodCIDRs
// is set to true.
type NodeManager struct {
	nodeInformer  v1informers.NodeInformer
	nodeLister    corelisters.NodeLister
	exitFunc      func(exitCode int)
	watchPodCIDRs bool

	// These are constant after construction time.
	nodeIPs  []net.IP
	podCIDRs []string

	mu   sync.Mutex
	node *v1.Node
}

// NewNodeManager initializes a node informer that selects for the given node, waits for
// cache sync, and returns a NodeManager after waiting some amount of time for the node
// object to exist and have NodeIPs (and PodCIDRs if watchPodCIDRs is true). Note: for
// backward compatibility, NewNodeManager doesn't return an error if it fails to retrieve
// NodeIPs and watchPodCIDRs is false.
func NewNodeManager(ctx context.Context, client clientset.Interface,
	resyncInterval time.Duration, nodeName string, watchPodCIDRs bool,
) (*NodeManager, error) {
	return newNodeManager(ctx, client, resyncInterval, nodeName, watchPodCIDRs, os.Exit, time.Second, 30*time.Second, 5*time.Minute)
}
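
// A minimal usage sketch (illustrative, not part of this package): the client, node name,
// and 15-minute resync interval here are assumptions supplied by the caller; kube-proxy's
// own wiring may differ.
//
//	nodeManager, err := NewNodeManager(ctx, client, 15*time.Minute, nodeName, false)
//	if err != nil {
//		return err
//	}
//	nodeIPs := nodeManager.NodeIPs()   // may be empty if NewNodeManager timed out
//	podCIDRs := nodeManager.PodCIDRs() // populated reliably only when watchPodCIDRs is true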

// newNodeManager implements NewNodeManager with a configurable exit function, poll interval
// and timeouts.
func newNodeManager(ctx context.Context, client clientset.Interface, resyncInterval time.Duration,
	nodeName string, watchPodCIDRs bool, exitFunc func(int),
	pollInterval, nodeIPsTimeout, podCIDRsTimeout time.Duration,
) (*NodeManager, error) {
	// make an informer that selects for the given node
	thisNodeInformerFactory := informers.NewSharedInformerFactoryWithOptions(client, resyncInterval,
		informers.WithTweakListOptions(func(options *metav1.ListOptions) {
			options.FieldSelector = fields.OneTermEqualSelector("metadata.name", nodeName).String()
		}))
	nodeInformer := thisNodeInformerFactory.Core().V1().Nodes()
	nodeLister := nodeInformer.Lister()

	// initialize the informer and wait for cache sync
	thisNodeInformerFactory.Start(wait.NeverStop)
	if !cache.WaitForNamedCacheSyncWithContext(ctx, nodeInformer.Informer().HasSynced) {
		return nil, fmt.Errorf("cannot sync node informer")
	}

	node, nodeIPs, podCIDRs := getNodeInfo(nodeLister, nodeName)

	if len(nodeIPs) == 0 {
		// wait for the node object to exist and have NodeIPs.
		ctx, cancel := context.WithTimeout(ctx, nodeIPsTimeout)
		defer cancel()
		_ = wait.PollUntilContextCancel(ctx, pollInterval, false, func(context.Context) (bool, error) {
			node, nodeIPs, podCIDRs = getNodeInfo(nodeLister, nodeName)
			return len(nodeIPs) != 0, nil
		})
	}

	if watchPodCIDRs && len(podCIDRs) == 0 {
		// wait some additional time for the PodCIDRs.
		ctx, cancel := context.WithTimeout(ctx, podCIDRsTimeout)
		defer cancel()
		_ = wait.PollUntilContextCancel(ctx, pollInterval, false, func(context.Context) (bool, error) {
			node, nodeIPs, podCIDRs = getNodeInfo(nodeLister, nodeName)
			return len(podCIDRs) != 0, nil
		})

		if len(podCIDRs) == 0 {
			if node == nil {
				return nil, fmt.Errorf("timeout waiting for node %q to exist", nodeName)
			}
			return nil, fmt.Errorf("timeout waiting for PodCIDR allocation on node %q", nodeName)
		}
	}

	// For backward-compatibility, we keep going even if we didn't find a node (in
	// non-watchPodCIDRs mode) or it didn't have IPs.
	if node == nil {
		klog.FromContext(ctx).Error(nil, "Timed out waiting for node to exist", "node", nodeName)
	} else if len(nodeIPs) == 0 {
		klog.FromContext(ctx).Error(nil, "Timed out waiting for node to be assigned IPs", "node", nodeName)
	}

	return &NodeManager{
		nodeInformer:  nodeInformer,
		nodeLister:    nodeLister,
		exitFunc:      exitFunc,
		watchPodCIDRs: watchPodCIDRs,

		node:     node,
		nodeIPs:  nodeIPs,
		podCIDRs: podCIDRs,
	}, nil
}

func getNodeInfo(nodeLister corelisters.NodeLister, nodeName string) (*v1.Node, []net.IP, []string) {
	node, _ := nodeLister.Get(nodeName)
	if node == nil {
		return nil, nil, nil
	}
	nodeIPs, _ := utilnode.GetNodeHostIPs(node)
	return node, nodeIPs, node.Spec.PodCIDRs
}

// NodeIPs returns the NodeIPs polled in NewNodeManager(). (This may be empty if
// NewNodeManager timed out without getting any IPs.)
func (n *NodeManager) NodeIPs() []net.IP {
	return n.nodeIPs
}

// PodCIDRs returns the PodCIDRs polled in NewNodeManager().
func (n *NodeManager) PodCIDRs() []string {
	return n.podCIDRs
}

// Node returns a copy of the latest node object, or nil if the Node has not yet been seen.
func (n *NodeManager) Node() *v1.Node {
	n.mu.Lock()
	defer n.mu.Unlock()

	if n.node == nil {
		return nil
	}
	return n.node.DeepCopy()
}

// NodeInformer returns the NodeInformer.
func (n *NodeManager) NodeInformer() v1informers.NodeInformer {
	return n.nodeInformer
}
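
// The wiring below is an illustrative sketch, not something this package does on its own:
// it shows how a caller could register the manager's handlers on the underlying informer
// so that OnNodeChange and OnNodeDelete run on watch events. The nodeManager variable and
// the direct use of cache.ResourceEventHandlerFuncs are assumptions about the caller.
//
//	_, _ = nodeManager.NodeInformer().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
//		AddFunc: func(obj interface{}) {
//			nodeManager.OnNodeChange(obj.(*v1.Node))
//		},
//		UpdateFunc: func(_, newObj interface{}) {
//			nodeManager.OnNodeChange(newObj.(*v1.Node))
//		},
//		DeleteFunc: func(obj interface{}) {
//			if node, ok := obj.(*v1.Node); ok {
//				nodeManager.OnNodeDelete(node)
//			}
//		},
//	})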

// OnNodeChange is a handler for Node creation and update.
func (n *NodeManager) OnNodeChange(node *v1.Node) {
	// update the node object
	n.mu.Lock()
	n.node = node
	n.mu.Unlock()

	// If the node manager is configured with watchPodCIDRs, exit whenever the PodCIDRs
	// received in a node watch event differ from the PodCIDRs detected initially.
	if n.watchPodCIDRs {
		if !reflect.DeepEqual(n.podCIDRs, node.Spec.PodCIDRs) {
			klog.InfoS("PodCIDRs changed for the node",
				"node", klog.KObj(node), "newPodCIDRs", node.Spec.PodCIDRs, "oldPodCIDRs", n.podCIDRs)
			klog.Flush()
			n.exitFunc(1)
		}
	}

	nodeIPs, _ := utilnode.GetNodeHostIPs(node)

	// Exit whenever the NodeIPs received in a node watch event differ from the NodeIPs
	// detected initially.
	if !reflect.DeepEqual(n.nodeIPs, nodeIPs) {
		klog.InfoS("NodeIPs changed for the node",
			"node", klog.KObj(node), "newNodeIPs", nodeIPs, "oldNodeIPs", n.nodeIPs)
		// FIXME: exit
		// klog.Flush()
		// n.exitFunc(1)
	}
}
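
// A minimal sketch of how the exit-on-change behavior could be exercised from a test in
// this package, using the unexported constructor to inject a fake exit function. The fake
// client, the "test-node" name, and the assumption that the fake node already has a
// different PodCIDR assigned at construction are all hypothetical test fixtures.
//
//	exitCode := -1
//	nm, err := newNodeManager(ctx, fakeClient, 0, "test-node", true,
//		func(code int) { exitCode = code },
//		10*time.Millisecond, time.Second, time.Second)
//	if err != nil {
//		t.Fatal(err)
//	}
//	updated := nm.Node()
//	updated.Spec.PodCIDRs = []string{"10.1.0.0/24"} // differs from the CIDR seen at construction
//	nm.OnNodeChange(updated)
//	// exitCode is now 1, because watchPodCIDRs is true and the PodCIDRs changed.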

// OnNodeDelete is a handler for Node deletes.
func (n *NodeManager) OnNodeDelete(node *v1.Node) {
	klog.InfoS("Node is being deleted", "node", klog.KObj(node))
	// FIXME: exit
	// klog.Flush()
	// n.exitFunc(1)
}

// OnNodeSynced is called after the cache is synced and all pre-existing Nodes have been reported.
func (n *NodeManager) OnNodeSynced() {}