/*
Copyright 2015 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package deployment
import (
"fmt"
2016-01-15 10:04:05 +08:00
"reflect"
2016-01-28 14:13:07 +08:00
"sort"
2016-01-13 09:52:18 +08:00
"strconv"
2015-09-03 08:02:22 +08:00
"time"
2015-09-29 23:09:33 +08:00
"github.com/golang/glog"
2015-09-03 08:02:22 +08:00
"k8s.io/kubernetes/pkg/api"
2016-05-05 08:37:03 +08:00
"k8s.io/kubernetes/pkg/api/annotations"
2016-01-28 14:13:07 +08:00
"k8s.io/kubernetes/pkg/api/errors"
2016-03-15 03:07:56 +08:00
"k8s.io/kubernetes/pkg/api/unversioned"
2015-10-10 06:04:41 +08:00
"k8s.io/kubernetes/pkg/apis/extensions"
2015-09-21 15:06:45 +08:00
"k8s.io/kubernetes/pkg/client/cache"
2016-02-06 05:58:03 +08:00
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
2016-03-30 05:52:43 +08:00
unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
2015-09-30 07:55:06 +08:00
"k8s.io/kubernetes/pkg/client/record"
2015-09-21 15:06:45 +08:00
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/runtime"
2015-10-20 05:08:35 +08:00
deploymentutil "k8s.io/kubernetes/pkg/util/deployment"
2016-01-28 14:13:07 +08:00
utilerrors "k8s.io/kubernetes/pkg/util/errors"
2016-02-09 11:31:58 +08:00
"k8s.io/kubernetes/pkg/util/integer"
2016-01-13 07:37:51 +08:00
labelsutil "k8s.io/kubernetes/pkg/util/labels"
2016-04-14 02:38:32 +08:00
"k8s.io/kubernetes/pkg/util/metrics"
2016-01-13 07:37:51 +08:00
podutil "k8s.io/kubernetes/pkg/util/pod"
2016-03-15 03:07:56 +08:00
rsutil "k8s.io/kubernetes/pkg/util/replicaset"
2016-01-15 15:32:10 +08:00
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
2016-02-02 18:57:06 +08:00
"k8s.io/kubernetes/pkg/util/wait"
2015-09-21 15:06:45 +08:00
"k8s.io/kubernetes/pkg/util/workqueue"
"k8s.io/kubernetes/pkg/watch"
)
const (
	// FullDeploymentResyncPeriod means we'll attempt to recompute the required replicas
	// of all deployments.
	// This recomputation happens based on contents in the local caches.
	FullDeploymentResyncPeriod = 30 * time.Second

	// We must avoid creating new replica set / counting pods until the replica set / pods store has synced.
	// If it hasn't synced, to avoid a hot loop, we'll wait this long between checks.
	StoreSyncedPollPeriod = 100 * time.Millisecond
)
// DeploymentController is responsible for synchronizing Deployment objects stored
// in the system with actual running replica sets and pods.
type DeploymentController struct {
	// client is used to talk to the API server.
	client        clientset.Interface
	// eventRecorder publishes deployment lifecycle events (e.g. rollback warnings).
	eventRecorder record.EventRecorder

	// To allow injection of syncDeployment for testing.
	syncHandler func(dKey string) error

	// A store of deployments, populated by the dController
	dStore cache.StoreToDeploymentLister
	// Watches changes to all deployments
	dController *framework.Controller
	// A store of ReplicaSets, populated by the rsController
	rsStore cache.StoreToReplicaSetLister
	// Watches changes to all ReplicaSets
	rsController *framework.Controller
	// rsStoreSynced returns true if the ReplicaSet store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	rsStoreSynced func() bool
	// A store of pods, populated by the podController
	podStore cache.StoreToPodLister
	// Watches changes to all pods
	podController *framework.Controller
	// podStoreSynced returns true if the pod store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	podStoreSynced func() bool

	// Deployments that need to be synced
	queue *workqueue.Type
}
2015-11-19 07:12:11 +08:00
// NewDeploymentController creates a new DeploymentController.
2016-01-15 13:00:58 +08:00
func NewDeploymentController ( client clientset . Interface , resyncPeriod controller . ResyncPeriodFunc ) * DeploymentController {
2015-09-30 07:55:06 +08:00
eventBroadcaster := record . NewBroadcaster ( )
eventBroadcaster . StartLogging ( glog . Infof )
2016-01-15 13:00:58 +08:00
// TODO: remove the wrapper when every clients have moved to use the clientset.
2016-03-24 07:45:24 +08:00
eventBroadcaster . StartRecordingToSink ( & unversionedcore . EventSinkImpl { Interface : client . Core ( ) . Events ( "" ) } )
2015-09-30 07:55:06 +08:00
2016-04-14 02:38:32 +08:00
if client != nil && client . Core ( ) . GetRESTClient ( ) . GetRateLimiter ( ) != nil {
metrics . RegisterMetricAndTrackRateLimiterUsage ( "deployment_controller" , client . Core ( ) . GetRESTClient ( ) . GetRateLimiter ( ) )
}
2015-09-21 15:06:45 +08:00
dc := & DeploymentController {
2016-02-28 10:13:32 +08:00
client : client ,
eventRecorder : eventBroadcaster . NewRecorder ( api . EventSource { Component : "deployment-controller" } ) ,
queue : workqueue . New ( ) ,
2015-09-03 08:02:22 +08:00
}
2015-09-21 15:06:45 +08:00
dc . dStore . Store , dc . dController = framework . NewInformer (
& cache . ListWatch {
2015-12-04 08:00:13 +08:00
ListFunc : func ( options api . ListOptions ) ( runtime . Object , error ) {
2016-01-15 13:00:58 +08:00
return dc . client . Extensions ( ) . Deployments ( api . NamespaceAll ) . List ( options )
2015-09-21 15:06:45 +08:00
} ,
2015-12-04 08:00:13 +08:00
WatchFunc : func ( options api . ListOptions ) ( watch . Interface , error ) {
2016-01-15 13:00:58 +08:00
return dc . client . Extensions ( ) . Deployments ( api . NamespaceAll ) . Watch ( options )
2015-09-21 15:06:45 +08:00
} ,
} ,
& extensions . Deployment { } ,
FullDeploymentResyncPeriod ,
framework . ResourceEventHandlerFuncs {
2016-03-01 10:13:01 +08:00
AddFunc : dc . addDeploymentNotification ,
UpdateFunc : dc . updateDeploymentNotification ,
2015-09-21 15:06:45 +08:00
// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
2016-03-01 10:13:01 +08:00
DeleteFunc : dc . deleteDeploymentNotification ,
2015-09-21 15:06:45 +08:00
} ,
)
2016-01-20 08:40:18 +08:00
dc . rsStore . Store , dc . rsController = framework . NewInformer (
2015-09-21 15:06:45 +08:00
& cache . ListWatch {
2015-12-04 08:00:13 +08:00
ListFunc : func ( options api . ListOptions ) ( runtime . Object , error ) {
2016-01-20 08:40:18 +08:00
return dc . client . Extensions ( ) . ReplicaSets ( api . NamespaceAll ) . List ( options )
2015-09-21 15:06:45 +08:00
} ,
2015-12-04 08:00:13 +08:00
WatchFunc : func ( options api . ListOptions ) ( watch . Interface , error ) {
2016-01-20 08:40:18 +08:00
return dc . client . Extensions ( ) . ReplicaSets ( api . NamespaceAll ) . Watch ( options )
2015-09-21 15:06:45 +08:00
} ,
} ,
2016-01-20 08:40:18 +08:00
& extensions . ReplicaSet { } ,
2015-11-19 07:12:11 +08:00
resyncPeriod ( ) ,
2015-09-21 15:06:45 +08:00
framework . ResourceEventHandlerFuncs {
2016-01-20 08:40:18 +08:00
AddFunc : dc . addReplicaSet ,
UpdateFunc : dc . updateReplicaSet ,
DeleteFunc : dc . deleteReplicaSet ,
2015-09-21 15:06:45 +08:00
} ,
)
2016-04-07 20:15:21 +08:00
dc . podStore . Indexer , dc . podController = framework . NewIndexerInformer (
2015-09-21 15:06:45 +08:00
& cache . ListWatch {
2015-12-04 08:00:13 +08:00
ListFunc : func ( options api . ListOptions ) ( runtime . Object , error ) {
2016-02-04 05:21:05 +08:00
return dc . client . Core ( ) . Pods ( api . NamespaceAll ) . List ( options )
2015-09-21 15:06:45 +08:00
} ,
2015-12-04 08:00:13 +08:00
WatchFunc : func ( options api . ListOptions ) ( watch . Interface , error ) {
2016-02-04 05:21:05 +08:00
return dc . client . Core ( ) . Pods ( api . NamespaceAll ) . Watch ( options )
2015-09-21 15:06:45 +08:00
} ,
} ,
& api . Pod { } ,
2015-11-19 07:12:11 +08:00
resyncPeriod ( ) ,
framework . ResourceEventHandlerFuncs {
2016-02-28 10:13:32 +08:00
AddFunc : dc . addPod ,
2015-11-19 07:12:11 +08:00
UpdateFunc : dc . updatePod ,
2015-12-04 08:00:13 +08:00
DeleteFunc : dc . deletePod ,
2015-11-19 07:12:11 +08:00
} ,
2016-04-07 20:15:21 +08:00
cache . Indexers { cache . NamespaceIndex : cache . MetaNamespaceIndexFunc } ,
2015-09-21 15:06:45 +08:00
)
dc . syncHandler = dc . syncDeployment
2016-01-20 08:40:18 +08:00
dc . rsStoreSynced = dc . rsController . HasSynced
2015-11-19 07:12:11 +08:00
dc . podStoreSynced = dc . podController . HasSynced
2015-09-21 15:06:45 +08:00
return dc
2015-09-03 08:02:22 +08:00
}
2015-11-19 07:12:11 +08:00
// Run begins watching and syncing.
func ( dc * DeploymentController ) Run ( workers int , stopCh <- chan struct { } ) {
2016-01-15 15:32:10 +08:00
defer utilruntime . HandleCrash ( )
2015-11-19 07:12:11 +08:00
go dc . dController . Run ( stopCh )
2016-01-20 08:40:18 +08:00
go dc . rsController . Run ( stopCh )
2015-11-19 07:12:11 +08:00
go dc . podController . Run ( stopCh )
for i := 0 ; i < workers ; i ++ {
2016-02-02 18:57:06 +08:00
go wait . Until ( dc . worker , time . Second , stopCh )
2015-11-19 07:12:11 +08:00
}
<- stopCh
glog . Infof ( "Shutting down deployment controller" )
dc . queue . ShutDown ( )
}
2016-03-01 10:13:01 +08:00
func ( dc * DeploymentController ) addDeploymentNotification ( obj interface { } ) {
d := obj . ( * extensions . Deployment )
glog . V ( 4 ) . Infof ( "Adding deployment %s" , d . Name )
dc . enqueueDeployment ( d )
}
// updateDeploymentNotification enqueues the current version of an updated deployment.
func (dc *DeploymentController) updateDeploymentNotification(old, cur interface{}) {
	previous := old.(*extensions.Deployment)
	glog.V(4).Infof("Updating deployment %s", previous.Name)
	// Resync on deployment object relist.
	dc.enqueueDeployment(cur.(*extensions.Deployment))
}
// deleteDeploymentNotification enqueues a deleted deployment so the sync loop can
// observe its absence. obj may be a DeletedFinalStateUnknown tombstone.
func (dc *DeploymentController) deleteDeploymentNotification(obj interface{}) {
	deployment, ok := obj.(*extensions.Deployment)
	if !ok {
		// The watch may have missed the delete; unwrap the tombstone.
		tombstone, isTombstone := obj.(cache.DeletedFinalStateUnknown)
		if !isTombstone {
			glog.Errorf("Couldn't get object from tombstone %+v", obj)
			return
		}
		if deployment, ok = tombstone.Obj.(*extensions.Deployment); !ok {
			glog.Errorf("Tombstone contained object that is not a Deployment %+v", obj)
			return
		}
	}
	glog.V(4).Infof("Deleting deployment %s", deployment.Name)
	dc.enqueueDeployment(deployment)
}
2016-01-20 08:40:18 +08:00
// addReplicaSet enqueues the deployment that manages a ReplicaSet when the ReplicaSet is created.
func ( dc * DeploymentController ) addReplicaSet ( obj interface { } ) {
rs := obj . ( * extensions . ReplicaSet )
glog . V ( 4 ) . Infof ( "ReplicaSet %s added." , rs . Name )
if d := dc . getDeploymentForReplicaSet ( rs ) ; d != nil {
2015-09-21 15:06:45 +08:00
dc . enqueueDeployment ( d )
}
2015-09-03 08:02:22 +08:00
}
2016-01-20 08:40:18 +08:00
// getDeploymentForReplicaSet returns the deployment managing the given ReplicaSet.
// TODO: Surface that we are ignoring multiple deployments for a given ReplicaSet.
func ( dc * DeploymentController ) getDeploymentForReplicaSet ( rs * extensions . ReplicaSet ) * extensions . Deployment {
deployments , err := dc . dStore . GetDeploymentsForReplicaSet ( rs )
2015-11-19 07:12:11 +08:00
if err != nil || len ( deployments ) == 0 {
2016-01-20 08:40:18 +08:00
glog . V ( 4 ) . Infof ( "Error: %v. No deployment found for ReplicaSet %v, deployment controller will avoid syncing." , err , rs . Name )
2015-09-21 15:06:45 +08:00
return nil
}
2016-01-20 08:40:18 +08:00
// Because all ReplicaSet's belonging to a deployment should have a unique label key,
2015-09-21 15:06:45 +08:00
// there should never be more than one deployment returned by the above method.
// If that happens we should probably dynamically repair the situation by ultimately
// trying to clean up one of the controllers, for now we just return one of the two,
// likely randomly.
return & deployments [ 0 ]
}
2016-01-20 08:40:18 +08:00
// updateReplicaSet figures out what deployment(s) manage a ReplicaSet when the ReplicaSet
// is updated and wake them up. If the anything of the ReplicaSets have changed, we need to
// awaken both the old and new deployments. old and cur must be *extensions.ReplicaSet
// types.
func ( dc * DeploymentController ) updateReplicaSet ( old , cur interface { } ) {
2015-09-21 15:06:45 +08:00
if api . Semantic . DeepEqual ( old , cur ) {
// A periodic relist will send update events for all known controllers.
return
2015-09-03 08:02:22 +08:00
}
2015-09-21 15:06:45 +08:00
// TODO: Write a unittest for this case
2016-01-20 08:40:18 +08:00
curRS := cur . ( * extensions . ReplicaSet )
glog . V ( 4 ) . Infof ( "ReplicaSet %s updated." , curRS . Name )
if d := dc . getDeploymentForReplicaSet ( curRS ) ; d != nil {
2015-09-21 15:06:45 +08:00
dc . enqueueDeployment ( d )
}
// A number of things could affect the old deployment: labels changing,
// pod template changing, etc.
2016-01-20 08:40:18 +08:00
oldRS := old . ( * extensions . ReplicaSet )
if ! api . Semantic . DeepEqual ( oldRS , curRS ) {
if oldD := dc . getDeploymentForReplicaSet ( oldRS ) ; oldD != nil {
2015-09-21 15:06:45 +08:00
dc . enqueueDeployment ( oldD )
}
}
}
2016-01-20 08:40:18 +08:00
// deleteReplicaSet enqueues the deployment that manages a ReplicaSet when
// the ReplicaSet is deleted. obj could be an *extensions.ReplicaSet, or
// a DeletionFinalStateUnknown marker item.
func ( dc * DeploymentController ) deleteReplicaSet ( obj interface { } ) {
rs , ok := obj . ( * extensions . ReplicaSet )
2015-09-21 15:06:45 +08:00
// When a delete is dropped, the relist will notice a pod in the store not
// in the list, leading to the insertion of a tombstone object which contains
2016-01-20 08:40:18 +08:00
// the deleted key/value. Note that this value might be stale. If the ReplicaSet
2015-09-21 15:06:45 +08:00
// changed labels the new deployment will not be woken up till the periodic resync.
if ! ok {
tombstone , ok := obj . ( cache . DeletedFinalStateUnknown )
if ! ok {
2016-01-20 08:40:18 +08:00
glog . Errorf ( "Couldn't get object from tombstone %+v, could take up to %v before a deployment recreates/updates replicasets" , obj , FullDeploymentResyncPeriod )
2015-09-21 15:06:45 +08:00
return
}
2016-01-20 08:40:18 +08:00
rs , ok = tombstone . Obj . ( * extensions . ReplicaSet )
2015-09-21 15:06:45 +08:00
if ! ok {
2016-01-20 08:40:18 +08:00
glog . Errorf ( "Tombstone contained object that is not a ReplicaSet %+v, could take up to %v before a deployment recreates/updates replicasets" , obj , FullDeploymentResyncPeriod )
2015-09-21 15:06:45 +08:00
return
2015-09-03 08:02:22 +08:00
}
}
2016-01-20 08:40:18 +08:00
glog . V ( 4 ) . Infof ( "ReplicaSet %s deleted." , rs . Name )
if d := dc . getDeploymentForReplicaSet ( rs ) ; d != nil {
2015-09-21 15:06:45 +08:00
dc . enqueueDeployment ( d )
}
}
2016-01-20 08:40:18 +08:00
// getDeploymentForPod returns the deployment managing the ReplicaSet that manages the given Pod.
2015-11-19 07:12:11 +08:00
// TODO: Surface that we are ignoring multiple deployments for a given Pod.
func ( dc * DeploymentController ) getDeploymentForPod ( pod * api . Pod ) * extensions . Deployment {
2016-01-20 08:40:18 +08:00
rss , err := dc . rsStore . GetPodReplicaSets ( pod )
2015-11-19 07:12:11 +08:00
if err != nil {
2016-01-20 08:40:18 +08:00
glog . V ( 4 ) . Infof ( "Error: %v. No ReplicaSets found for pod %v, deployment controller will avoid syncing." , err , pod . Name )
2015-11-19 07:12:11 +08:00
return nil
}
2016-01-20 08:40:18 +08:00
for _ , rs := range rss {
deployments , err := dc . dStore . GetDeploymentsForReplicaSet ( & rs )
2015-11-19 07:12:11 +08:00
if err == nil && len ( deployments ) > 0 {
return & deployments [ 0 ]
}
}
glog . V ( 4 ) . Infof ( "No deployments found for pod %v, deployment controller will avoid syncing." , pod . Name )
return nil
}
2016-02-28 10:13:32 +08:00
// When a pod is created, ensure its controller syncs
2016-02-25 14:40:14 +08:00
func ( dc * DeploymentController ) addPod ( obj interface { } ) {
pod , ok := obj . ( * api . Pod )
if ! ok {
return
}
2016-03-05 05:57:34 +08:00
glog . V ( 4 ) . Infof ( "Pod %s created: %+v." , pod . Name , pod )
2016-02-25 14:40:14 +08:00
if d := dc . getDeploymentForPod ( pod ) ; d != nil {
dc . enqueueDeployment ( d )
}
}
2016-01-20 08:40:18 +08:00
// updatePod figures out what deployment(s) manage the ReplicaSet that manages the Pod when the Pod
2015-11-19 07:12:11 +08:00
// is updated and wake them up. If anything of the Pods have changed, we need to awaken both
// the old and new deployments. old and cur must be *api.Pod types.
func ( dc * DeploymentController ) updatePod ( old , cur interface { } ) {
if api . Semantic . DeepEqual ( old , cur ) {
return
}
curPod := cur . ( * api . Pod )
2016-03-05 05:57:34 +08:00
oldPod := old . ( * api . Pod )
2016-06-01 06:59:46 +08:00
glog . V ( 4 ) . Infof ( "Pod %s updated %#v -> %#v." , curPod . Name , oldPod , curPod )
2015-11-19 07:12:11 +08:00
if d := dc . getDeploymentForPod ( curPod ) ; d != nil {
dc . enqueueDeployment ( d )
}
if ! api . Semantic . DeepEqual ( oldPod , curPod ) {
if oldD := dc . getDeploymentForPod ( oldPod ) ; oldD != nil {
dc . enqueueDeployment ( oldD )
}
}
}
2016-02-28 10:13:32 +08:00
// When a pod is deleted, ensure its controller syncs.
2015-12-04 08:00:13 +08:00
// obj could be an *api.Pod, or a DeletionFinalStateUnknown marker item.
func ( dc * DeploymentController ) deletePod ( obj interface { } ) {
pod , ok := obj . ( * api . Pod )
// When a delete is dropped, the relist will notice a pod in the store not
// in the list, leading to the insertion of a tombstone object which contains
// the deleted key/value. Note that this value might be stale. If the pod
2016-01-20 08:40:18 +08:00
// changed labels the new ReplicaSet will not be woken up till the periodic
// resync.
2015-12-04 08:00:13 +08:00
if ! ok {
tombstone , ok := obj . ( cache . DeletedFinalStateUnknown )
if ! ok {
2016-02-25 06:01:48 +08:00
glog . Errorf ( "Couldn't get object from tombstone %+v" , obj )
2015-12-04 08:00:13 +08:00
return
}
pod , ok = tombstone . Obj . ( * api . Pod )
if ! ok {
2016-02-25 06:01:48 +08:00
glog . Errorf ( "Tombstone contained object that is not a pod %+v" , obj )
2015-12-04 08:00:13 +08:00
return
}
}
2016-03-05 05:57:34 +08:00
glog . V ( 4 ) . Infof ( "Pod %s deleted: %+v." , pod . Name , pod )
2015-12-04 08:00:13 +08:00
if d := dc . getDeploymentForPod ( pod ) ; d != nil {
2016-02-25 14:40:14 +08:00
dc . enqueueDeployment ( d )
2015-12-04 08:00:13 +08:00
}
}
2016-02-28 10:13:32 +08:00
func ( dc * DeploymentController ) enqueueDeployment ( deployment * extensions . Deployment ) {
key , err := controller . KeyFunc ( deployment )
2015-09-21 15:06:45 +08:00
if err != nil {
2016-02-28 10:13:32 +08:00
glog . Errorf ( "Couldn't get key for object %+v: %v" , deployment , err )
2015-09-21 15:06:45 +08:00
return
}
// TODO: Handle overlapping deployments better. Either disallow them at admission time or
2016-01-20 08:40:18 +08:00
// deterministically avoid syncing deployments that fight over ReplicaSet's. Currently, we
// only ensure that the same deployment is synced for a given ReplicaSet. When we
// periodically relist all deployments there will still be some ReplicaSet instability. One
// way to handle this is by querying the store for all deployments that this deployment
// overlaps, as well as all deployments that overlap this deployments, and sorting them.
2015-09-21 15:06:45 +08:00
dc . queue . Add ( key )
2015-09-03 08:02:22 +08:00
}
2015-09-21 15:06:45 +08:00
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func ( dc * DeploymentController ) worker ( ) {
for {
func ( ) {
key , quit := dc . queue . Get ( )
if quit {
return
}
defer dc . queue . Done ( key )
err := dc . syncHandler ( key . ( string ) )
if err != nil {
2016-02-25 14:40:14 +08:00
glog . Errorf ( "Error syncing deployment %v: %v" , key , err )
2015-09-21 15:06:45 +08:00
}
} ( )
}
}
// syncDeployment will sync the deployment with the given key.
// This function is not meant to be invoked concurrently with the same key.
func (dc *DeploymentController) syncDeployment(key string) error {
	startTime := time.Now()
	defer func() {
		glog.V(4).Infof("Finished syncing deployment %q (%v)", key, time.Now().Sub(startTime))
	}()

	if !dc.rsStoreSynced() || !dc.podStoreSynced() {
		// Sleep so we give the replica set / pod reflector goroutine a chance to run.
		time.Sleep(StoreSyncedPollPeriod)
		glog.Infof("Waiting for replica set / pod controller to sync, requeuing deployment %s", key)
		dc.queue.Add(key)
		return nil
	}

	obj, exists, err := dc.dStore.Store.GetByKey(key)
	if err != nil {
		glog.Infof("Unable to retrieve deployment %v from store: %v", key, err)
		// Requeue so the lookup is retried.
		dc.queue.Add(key)
		return err
	}
	if !exists {
		// Deployment was deleted; nothing to do.
		glog.Infof("Deployment has been deleted %v", key)
		return nil
	}

	d := obj.(*extensions.Deployment)

	// Refuse to act on a deployment whose selector is empty: it would select
	// every pod in the namespace.
	everything := unversioned.LabelSelector{}
	if reflect.DeepEqual(d.Spec.Selector, &everything) {
		dc.eventRecorder.Eventf(d, api.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
		return nil
	}

	// Paused deployments are only reconciled (scaled / cleaned up), never rolled out.
	if d.Spec.Paused {
		return dc.sync(d)
	}

	// Perform a requested rollback before any rollout work.
	if d.Spec.RollbackTo != nil {
		revision := d.Spec.RollbackTo.Revision
		if _, err = dc.rollback(d, &revision); err != nil {
			return err
		}
	}

	// A pure scaling event is handled by sync() rather than a full rollout.
	if dc.isScalingEvent(d) {
		return dc.sync(d)
	}

	// Dispatch to the strategy-specific rollout implementation.
	switch d.Spec.Strategy.Type {
	case extensions.RecreateDeploymentStrategyType:
		return dc.rolloutRecreate(d)
	case extensions.RollingUpdateDeploymentStrategyType:
		return dc.rolloutRolling(d)
	}
	return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}
2016-01-29 00:35:14 +08:00
// sync is responsible for reconciling deployments on scaling events or when they
// are paused.
func ( dc * DeploymentController ) sync ( deployment * extensions . Deployment ) error {
2016-03-03 09:51:35 +08:00
newRS , oldRSs , err := dc . getAllReplicaSetsAndSyncRevision ( deployment , false )
2016-02-24 16:07:46 +08:00
if err != nil {
return err
}
2016-01-29 00:35:14 +08:00
if err := dc . scale ( deployment , newRS , oldRSs ) ; err != nil {
// If we get an error while trying to scale, the deployment will be requeued
// so we can abort this resync
return err
}
dc . cleanupDeployment ( oldRSs , deployment )
2016-02-24 16:07:46 +08:00
2016-01-29 00:35:14 +08:00
allRSs := append ( oldRSs , newRS )
2016-02-28 10:13:32 +08:00
return dc . syncDeploymentStatus ( allRSs , newRS , deployment )
2016-02-24 16:07:46 +08:00
}
// scale scales proportionally in order to mitigate risk. Otherwise, scaling up can increase the size
// of the new replica set and scaling down can decrease the sizes of the old ones, both of which would
// have the effect of hastening the rollout progress, which could produce a higher proportion of unavailable
// replicas in the event of a problem with the rolled out template. Should run only on scaling events or
// when a deployment is paused and not during the normal rollout process.
func (dc *DeploymentController) scale(deployment *extensions.Deployment, newRS *extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet) error {
	// If there is only one active replica set then we should scale that up to the full count of the
	// deployment. If there is no active replica set, then we should scale up the newest replica set.
	if activeOrLatest := findActiveOrLatest(newRS, oldRSs); activeOrLatest != nil {
		if activeOrLatest.Spec.Replicas == deployment.Spec.Replicas {
			// Already at the desired size; nothing to do.
			return nil
		}
		_, _, err := dc.scaleReplicaSetAndRecordEvent(activeOrLatest, deployment.Spec.Replicas, deployment)
		return err
	}

	// If the new replica set is saturated, old replica sets should be fully scaled down.
	// This case handles replica set adoption during a saturated new replica set.
	if deploymentutil.IsSaturated(deployment, newRS) {
		for _, old := range controller.FilterActiveReplicaSets(oldRSs) {
			if _, _, err := dc.scaleReplicaSetAndRecordEvent(old, 0, deployment); err != nil {
				return err
			}
		}
		return nil
	}

	// There are old replica sets with pods and the new replica set is not saturated.
	// We need to proportionally scale all replica sets (new and old) in case of a
	// rolling deployment.
	if deploymentutil.IsRollingUpdate(deployment) {
		allRSs := controller.FilterActiveReplicaSets(append(oldRSs, newRS))
		allRSsReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs)

		// Total size we may reach: desired replicas plus the strategy's max surge.
		allowedSize := int32(0)
		if deployment.Spec.Replicas > 0 {
			allowedSize = deployment.Spec.Replicas + maxSurge(*deployment)
		}

		// Number of additional replicas that can be either added or removed from the total
		// replicas count. These replicas should be distributed proportionally to the active
		// replica sets.
		deploymentReplicasToAdd := allowedSize - allRSsReplicas

		// The additional replicas should be distributed proportionally amongst the active
		// replica sets from the larger to the smaller in size replica set. Scaling direction
		// drives what happens in case we are trying to scale replica sets of the same size.
		// In such a case when scaling up, we should scale up newer replica sets first, and
		// when scaling down, we should scale down older replica sets first.
		scalingOperation := "up"
		switch {
		case deploymentReplicasToAdd > 0:
			sort.Sort(controller.ReplicaSetsBySizeNewer(allRSs))
		case deploymentReplicasToAdd < 0:
			sort.Sort(controller.ReplicaSetsBySizeOlder(allRSs))
			scalingOperation = "down"
		default: /* deploymentReplicasToAdd == 0 */
			// Nothing to add.
			return nil
		}

		// Iterate over all active replica sets and estimate proportions for each of them.
		// The absolute value of deploymentReplicasAdded should never exceed the absolute
		// value of deploymentReplicasToAdd.
		deploymentReplicasAdded := int32(0)
		for i := range allRSs {
			rs := allRSs[i]
			proportion := getProportion(rs, *deployment, deploymentReplicasToAdd, deploymentReplicasAdded)
			rs.Spec.Replicas += proportion
			deploymentReplicasAdded += proportion
		}

		// Update all replica sets
		for i := range allRSs {
			rs := allRSs[i]
			// Add/remove any leftovers to the largest replica set.
			if i == 0 {
				leftover := deploymentReplicasToAdd - deploymentReplicasAdded
				rs.Spec.Replicas += leftover
				if rs.Spec.Replicas < 0 {
					rs.Spec.Replicas = 0
				}
			}
			if _, err := dc.scaleReplicaSet(rs, rs.Spec.Replicas, deployment, scalingOperation); err != nil {
				// Return as soon as we fail, the deployment is requeued
				return err
			}
		}
	}
	return nil
}
// rollback rolls the deployment back to the given revision; it is a no-op if
// toRevision is the deployment's current revision. A *toRevision of 0 means
// "roll back to the previous revision".
func (dc *DeploymentController) rollback(deployment *extensions.Deployment, toRevision *int64) (*extensions.Deployment, error) {
	newRS, allOldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, true)
	if err != nil {
		return nil, err
	}
	allRSs := append(allOldRSs, newRS)
	// If rollback revision is 0, rollback to the last revision
	if *toRevision == 0 {
		if *toRevision = lastRevision(allRSs); *toRevision == 0 {
			// If we still can't find the last revision, gives up rollback
			dc.emitRollbackWarningEvent(deployment, deploymentutil.RollbackRevisionNotFound, "Unable to find last revision.")
			// Gives up rollback
			return dc.updateDeploymentAndClearRollbackTo(deployment)
		}
	}
	// Look for the replica set carrying the desired revision annotation.
	for _, rs := range allRSs {
		v, err := deploymentutil.Revision(rs)
		if err != nil {
			glog.V(4).Infof("Unable to extract revision from deployment's replica set %q: %v", rs.Name, err)
			continue
		}
		if v == *toRevision {
			glog.V(4).Infof("Found replica set %q with desired revision %d", rs.Name, v)
			// rollback by copying podTemplate.Spec from the replica set, and increment revision number by 1
			// no-op if the spec matches current deployment's podTemplate.Spec
			deployment, performedRollback, err := dc.rollbackToTemplate(deployment, rs)
			if performedRollback && err == nil {
				dc.emitRollbackNormalEvent(deployment, fmt.Sprintf("Rolled back deployment %q to revision %d", deployment.Name, *toRevision))
			}
			return deployment, err
		}
	}
	dc.emitRollbackWarningEvent(deployment, deploymentutil.RollbackRevisionNotFound, "Unable to find the revision to rollback to.")
	// Gives up rollback
	return dc.updateDeploymentAndClearRollbackTo(deployment)
}
// emitRollbackWarningEvent records a warning event on the deployment explaining
// why a requested rollback could not be performed.
func (dc *DeploymentController) emitRollbackWarningEvent(deployment *extensions.Deployment, reason, message string) {
	dc.eventRecorder.Eventf(deployment, api.EventTypeWarning, reason, message)
}
// emitRollbackNormalEvent records a normal event on the deployment announcing a
// completed rollback.
func (dc *DeploymentController) emitRollbackNormalEvent(deployment *extensions.Deployment, message string) {
	dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, deploymentutil.RollbackDone, message)
}
// updateDeploymentAndClearRollbackTo sets .spec.rollbackTo to nil and update the input deployment
// It is used to abandon a rollback that cannot be completed.
func (dc *DeploymentController) updateDeploymentAndClearRollbackTo(deployment *extensions.Deployment) (*extensions.Deployment, error) {
	glog.V(4).Infof("Cleans up rollbackTo of deployment %s", deployment.Name)
	deployment.Spec.RollbackTo = nil
	return dc.updateDeployment(deployment)
}
// rolloutRecreate implements the Recreate strategy: scale down all old replica
// sets to zero before creating and scaling up the new one.
func (dc *DeploymentController) rolloutRecreate(deployment *extensions.Deployment) error {
	// Don't create a new RS if not already existed, so that we avoid scaling up before scaling down
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, false)
	if err != nil {
		return err
	}
	allRSs := append(oldRSs, newRS)

	// scale down old replica sets
	scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(controller.FilterActiveReplicaSets(oldRSs), deployment)
	if err != nil {
		return err
	}
	if scaledDown {
		// Update DeploymentStatus
		return dc.updateDeploymentStatus(allRSs, newRS, deployment)
	}

	// If we need to create a new RS, create it now
	// TODO: Create a new RS without re-listing all RSs.
	if newRS == nil {
		newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(deployment, true)
		if err != nil {
			return err
		}
		allRSs = append(oldRSs, newRS)
	}

	// scale up new replica set
	scaledUp, err := dc.scaleUpNewReplicaSetForRecreate(newRS, deployment)
	if err != nil {
		return err
	}
	if scaledUp {
		// Update DeploymentStatus
		return dc.updateDeploymentStatus(allRSs, newRS, deployment)
	}

	dc.cleanupDeployment(oldRSs, deployment)

	// Sync deployment status
	return dc.syncDeploymentStatus(allRSs, newRS, deployment)
}
2016-01-29 00:35:14 +08:00
// rolloutRolling implements the RollingUpdate deployment strategy: the new
// replica set is scaled up and old replica sets are scaled down incrementally,
// bounded by maxSurge/maxUnavailable. At most one scaling action is taken per
// sync; the deployment is re-synced after each step.
func (dc *DeploymentController) rolloutRolling(deployment *extensions.Deployment) error {
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, true)
	if err != nil {
		return err
	}
	allRSs := append(oldRSs, newRS)

	// Scale up, if we can.
	scaledUp, err := dc.reconcileNewReplicaSet(allRSs, newRS, deployment)
	if err != nil {
		return err
	}
	if scaledUp {
		// Update DeploymentStatus
		return dc.updateDeploymentStatus(allRSs, newRS, deployment)
	}

	// Scale down, if we can.
	scaledDown, err := dc.reconcileOldReplicaSets(allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, deployment)
	if err != nil {
		return err
	}
	if scaledDown {
		// Update DeploymentStatus
		return dc.updateDeploymentStatus(allRSs, newRS, deployment)
	}

	// Best-effort cleanup of old replica sets per revisionHistoryLimit; any
	// result is intentionally not checked here.
	dc.cleanupDeployment(oldRSs, deployment)

	// Sync deployment status
	return dc.syncDeploymentStatus(allRSs, newRS, deployment)
}
// syncDeploymentStatus checks if the status is up-to-date and sync it if necessary
2016-02-28 10:13:32 +08:00
func ( dc * DeploymentController ) syncDeploymentStatus ( allRSs [ ] * extensions . ReplicaSet , newRS * extensions . ReplicaSet , d * extensions . Deployment ) error {
2016-01-29 00:35:14 +08:00
newStatus , err := dc . calculateStatus ( allRSs , newRS , d )
2016-01-09 07:58:52 +08:00
if err != nil {
2016-01-28 06:12:57 +08:00
return err
2016-01-09 07:58:52 +08:00
}
2016-01-29 00:35:14 +08:00
if ! reflect . DeepEqual ( d . Status , newStatus ) {
2016-02-23 22:23:14 +08:00
return dc . updateDeploymentStatus ( allRSs , newRS , d )
2015-11-19 07:12:11 +08:00
}
2015-09-03 08:02:22 +08:00
return nil
}
2016-03-03 09:51:35 +08:00
// getAllReplicaSetsAndSyncRevision returns all the replica sets for the provided deployment (new and all old), with new RS's and deployment's revision updated.
// 1. Get all old RSes this deployment targets, and calculate the max revision number among them (maxOldV).
// 2. Get new RS this deployment targets (whose pod template matches deployment's), and update new RS's revision number to (maxOldV + 1),
//    only if its revision number is smaller than (maxOldV + 1). If this step failed, we'll update it in the next deployment sync loop.
// 3. Copy new RS's revision number to deployment (update deployment's revision). If this step failed, we'll update it in the next deployment sync loop.
// Note that currently the deployment controller is using caches to avoid querying the server for reads.
// This may lead to stale reads of replica sets, thus incorrect deployment status.
func (dc *DeploymentController) getAllReplicaSetsAndSyncRevision(deployment *extensions.Deployment, createIfNotExisted bool) (*extensions.ReplicaSet, []*extensions.ReplicaSet, error) {
	// List the deployment's RSes & Pods and apply pod-template-hash info to deployment's adopted RSes/Pods
	rsList, podList, err := dc.rsAndPodsWithHashKeySynced(deployment)
	if err != nil {
		return nil, nil, fmt.Errorf("error labeling replica sets and pods with pod-template-hash: %v", err)
	}
	_, allOldRSs, err := deploymentutil.FindOldReplicaSets(deployment, rsList, podList)
	if err != nil {
		return nil, nil, err
	}

	// Calculate the max revision number among all old RSes
	maxOldV := maxRevision(allOldRSs)

	// Get new replica set with the updated revision number
	newRS, err := dc.getNewReplicaSet(deployment, rsList, maxOldV, allOldRSs, createIfNotExisted)
	if err != nil {
		return nil, nil, err
	}

	// Sync deployment's revision number with new replica set.
	// Only do so when the new RS carries a non-empty revision annotation that
	// differs from (or is missing on) the deployment.
	if newRS != nil && newRS.Annotations != nil && len(newRS.Annotations[deploymentutil.RevisionAnnotation]) > 0 &&
		(deployment.Annotations == nil || deployment.Annotations[deploymentutil.RevisionAnnotation] != newRS.Annotations[deploymentutil.RevisionAnnotation]) {
		// A failed revision update is tolerated: it is logged and retried on
		// the next sync loop rather than aborting this one.
		if err = dc.updateDeploymentRevision(deployment, newRS.Annotations[deploymentutil.RevisionAnnotation]); err != nil {
			glog.V(4).Infof("Error: %v. Unable to update deployment revision, will retry later.", err)
		}
	}

	return newRS, allOldRSs, nil
}
2016-01-20 08:40:18 +08:00
func maxRevision ( allRSs [ ] * extensions . ReplicaSet ) int64 {
2016-01-15 10:04:05 +08:00
max := int64 ( 0 )
2016-01-20 08:40:18 +08:00
for _ , rs := range allRSs {
if v , err := deploymentutil . Revision ( rs ) ; err != nil {
// Skip the replica sets when it failed to parse their revision information
glog . V ( 4 ) . Infof ( "Error: %v. Couldn't parse revision for replica set %#v, deployment controller will skip it when reconciling revisions." , err , rs )
2016-01-13 09:52:18 +08:00
} else if v > max {
max = v
}
}
return max
}
2016-01-20 08:40:18 +08:00
// lastRevision finds the second max revision number in all replica sets (the last revision)
func lastRevision ( allRSs [ ] * extensions . ReplicaSet ) int64 {
2016-01-15 10:04:05 +08:00
max , secMax := int64 ( 0 ) , int64 ( 0 )
2016-01-20 08:40:18 +08:00
for _ , rs := range allRSs {
if v , err := deploymentutil . Revision ( rs ) ; err != nil {
// Skip the replica sets when it failed to parse their revision information
glog . V ( 4 ) . Infof ( "Error: %v. Couldn't parse revision for replica set %#v, deployment controller will skip it when reconciling revisions." , err , rs )
2016-01-15 10:04:05 +08:00
} else if v >= max {
secMax = max
max = v
} else if v > secMax {
secMax = v
}
}
return secMax
}
2016-03-15 03:07:56 +08:00
// Returns a replica set that matches the intent of the given deployment. Returns nil if the new replica set doesn't exist yet.
// 1. Get existing new RS (the RS that the given deployment targets, whose pod template is the same as deployment's).
// 2. If there's existing new RS, update its revision number if it's smaller than (maxOldRevision + 1), where maxOldRevision is the max revision number among all old RSes.
// 3. If there's no existing new RS and createIfNotExisted is true, create one with appropriate revision number (maxOldRevision + 1) and replicas.
// Note that the pod-template-hash will be added to adopted RSes and pods.
func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployment, rsList []extensions.ReplicaSet, maxOldRevision int64, oldRSs []*extensions.ReplicaSet, createIfNotExisted bool) (*extensions.ReplicaSet, error) {
	// Calculate revision number for this new replica set
	newRevision := strconv.FormatInt(maxOldRevision+1, 10)

	existingNewRS, err := deploymentutil.FindNewReplicaSet(deployment, rsList)
	if err != nil {
		return nil, err
	} else if existingNewRS != nil {
		// Set existing new replica set's annotation; only issue an Update call
		// if the annotations actually changed.
		if setNewReplicaSetAnnotations(deployment, existingNewRS, newRevision, true) {
			return dc.client.Extensions().ReplicaSets(deployment.ObjectMeta.Namespace).Update(existingNewRS)
		}
		return existingNewRS, nil
	}

	if !createIfNotExisted {
		return nil, nil
	}

	// new ReplicaSet does not exist, create one.
	namespace := deployment.ObjectMeta.Namespace
	podTemplateSpecHash := podutil.GetPodTemplateSpecHash(deployment.Spec.Template)
	newRSTemplate := deploymentutil.GetNewReplicaSetTemplate(deployment)
	// Add podTemplateHash label to selector.
	newRSSelector := labelsutil.CloneSelectorAndAddLabel(deployment.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, podTemplateSpecHash)

	// Create new ReplicaSet
	newRS := extensions.ReplicaSet{
		ObjectMeta: api.ObjectMeta{
			// Make the name deterministic, to ensure idempotence
			Name:      deployment.Name + "-" + fmt.Sprintf("%d", podTemplateSpecHash),
			Namespace: namespace,
		},
		Spec: extensions.ReplicaSetSpec{
			// Start at 0; the real replica count is computed below so that
			// maxSurge is respected.
			Replicas: 0,
			Selector: newRSSelector,
			Template: newRSTemplate,
		},
	}
	allRSs := append(oldRSs, &newRS)
	newReplicasCount, err := deploymentutil.NewRSNewReplicas(deployment, allRSs, &newRS)
	if err != nil {
		return nil, err
	}

	newRS.Spec.Replicas = newReplicasCount
	// Set new replica set's annotation
	setNewReplicaSetAnnotations(deployment, &newRS, newRevision, false)
	createdRS, err := dc.client.Extensions().ReplicaSets(namespace).Create(&newRS)
	if err != nil {
		// Re-queue the deployment so creation is retried on a later sync.
		dc.enqueueDeployment(deployment)
		return nil, fmt.Errorf("error creating replica set %v: %v", deployment.Name, err)
	}
	if newReplicasCount > 0 {
		dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", "up", createdRS.Name, newReplicasCount)
	}

	return createdRS, dc.updateDeploymentRevision(deployment, newRevision)
}
2015-09-18 03:41:06 +08:00
2016-03-15 03:07:56 +08:00
// rsAndPodsWithHashKeySynced returns the RSes and pods the given deployment targets, with pod-template-hash information synced.
func ( dc * DeploymentController ) rsAndPodsWithHashKeySynced ( deployment * extensions . Deployment ) ( [ ] extensions . ReplicaSet , * api . PodList , error ) {
rsList , err := deploymentutil . ListReplicaSets ( deployment ,
func ( namespace string , options api . ListOptions ) ( [ ] extensions . ReplicaSet , error ) {
return dc . rsStore . ReplicaSets ( namespace ) . List ( options . LabelSelector )
} )
if err != nil {
return nil , nil , fmt . Errorf ( "error listing ReplicaSets: %v" , err )
}
syncedRSList := [ ] extensions . ReplicaSet { }
for _ , rs := range rsList {
// Add pod-template-hash information if it's not in the RS.
// Otherwise, new RS produced by Deployment will overlap with pre-existing ones
// that aren't constrained by the pod-template-hash.
syncedRS , err := dc . addHashKeyToRSAndPods ( rs )
if err != nil {
return nil , nil , err
}
syncedRSList = append ( syncedRSList , * syncedRS )
}
2016-06-08 07:58:18 +08:00
syncedPodList , err := dc . listPods ( deployment )
2016-03-15 03:07:56 +08:00
if err != nil {
return nil , nil , err
}
return syncedRSList , syncedPodList , nil
}
2016-06-08 07:58:18 +08:00
// listPods returns all pods the given deployment targets, reading from the
// controller's pod store cache rather than the API server.
func (dc *DeploymentController) listPods(deployment *extensions.Deployment) (*api.PodList, error) {
	podLister := func(namespace string, options api.ListOptions) (*api.PodList, error) {
		pods, err := dc.podStore.Pods(namespace).List(options.LabelSelector)
		return &pods, err
	}
	return deploymentutil.ListPods(deployment, podLister)
}
2016-03-15 03:07:56 +08:00
// addHashKeyToRSAndPods adds pod-template-hash information to the given rs, if it's not already there, with the following steps:
// 1. Add hash label to the rs's pod template, and make sure the controller sees this update so that no orphaned pods will be created
// 2. Add hash label to all pods this rs owns, wait until replicaset controller reports rs.Status.FullyLabeledReplicas equal to the desired number of replicas
// 3. Add hash label to the rs's label and selector
// The step ordering is essential: labeling the pod template before the
// selector guarantees that no pod is ever created which the final selector
// would not match (which would orphan it).
func (dc *DeploymentController) addHashKeyToRSAndPods(rs extensions.ReplicaSet) (updatedRS *extensions.ReplicaSet, err error) {
	updatedRS = &rs
	// If the rs already has the new hash label in its selector, it's done syncing
	if labelsutil.SelectorHasLabel(rs.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey) {
		return
	}
	namespace := rs.Namespace
	hash := rsutil.GetPodTemplateSpecHash(rs)
	rsUpdated := false
	// 1. Add hash template label to the rs. This ensures that any newly created pods will have the new label.
	updatedRS, rsUpdated, err = rsutil.UpdateRSWithRetries(dc.client.Extensions().ReplicaSets(namespace), updatedRS,
		func(updated *extensions.ReplicaSet) error {
			// Precondition: the RS doesn't contain the new hash in its pod template label.
			if updated.Spec.Template.Labels[extensions.DefaultDeploymentUniqueLabelKey] == hash {
				return utilerrors.ErrPreconditionViolated
			}
			updated.Spec.Template.Labels = labelsutil.AddLabel(updated.Spec.Template.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash)
			return nil
		})
	if err != nil {
		return nil, fmt.Errorf("error updating %s %s/%s pod template label with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err)
	}
	if !rsUpdated {
		// If RS wasn't updated but didn't return error in step 1, we've hit a RS not found error.
		// Return here and retry in the next sync loop.
		return &rs, nil
	}
	// Make sure rs pod template is updated so that it won't create pods without the new label (orphaned pods).
	if updatedRS.Generation > updatedRS.Status.ObservedGeneration {
		if err = deploymentutil.WaitForReplicaSetUpdated(dc.client, updatedRS.Generation, namespace, updatedRS.Name); err != nil {
			return nil, fmt.Errorf("error waiting for %s %s/%s generation %d observed by controller: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, updatedRS.Generation, err)
		}
	}
	glog.V(4).Infof("Observed the update of %s %s/%s's pod template with hash %s.", rs.Kind, rs.Namespace, rs.Name, hash)
	// 2. Update all pods managed by the rs to have the new hash label, so they will be correctly adopted.
	selector, err := unversioned.LabelSelectorAsSelector(updatedRS.Spec.Selector)
	if err != nil {
		return nil, fmt.Errorf("error in converting selector to label selector for replica set %s: %s", updatedRS.Name, err)
	}
	options := api.ListOptions{LabelSelector: selector}
	// Pods are read from the local cache; a stale read is retried on a later sync.
	podList, err := dc.podStore.Pods(namespace).List(options.LabelSelector)
	if err != nil {
		return nil, fmt.Errorf("error in getting pod list for namespace %s and list options %+v: %s", namespace, options, err)
	}
	allPodsLabeled := false
	if allPodsLabeled, err = deploymentutil.LabelPodsWithHash(&podList, updatedRS, dc.client, namespace, hash); err != nil {
		return nil, fmt.Errorf("error in adding template hash label %s to pods %+v: %s", hash, podList, err)
	}
	// If not all pods are labeled but didn't return error in step 2, we've hit at least one pod not found error.
	// Return here and retry in the next sync loop.
	if !allPodsLabeled {
		return updatedRS, nil
	}
	// We need to wait for the replicaset controller to observe the pods being
	// labeled with pod template hash. Because previously we've called
	// WaitForReplicaSetUpdated, the replicaset controller should have dropped
	// FullyLabeledReplicas to 0 already, we only need to wait it to increase
	// back to the number of replicas in the spec.
	if err = deploymentutil.WaitForPodsHashPopulated(dc.client, updatedRS.Generation, namespace, updatedRS.Name); err != nil {
		return nil, fmt.Errorf("%s %s/%s: error waiting for replicaset controller to observe pods being labeled with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err)
	}
	// 3. Update rs label and selector to include the new hash label
	// Copy the old selector, so that we can scrub out any orphaned pods
	if updatedRS, rsUpdated, err = rsutil.UpdateRSWithRetries(dc.client.Extensions().ReplicaSets(namespace), updatedRS,
		func(updated *extensions.ReplicaSet) error {
			// Precondition: the RS doesn't contain the new hash in its label or selector.
			if updated.Labels[extensions.DefaultDeploymentUniqueLabelKey] == hash && updated.Spec.Selector.MatchLabels[extensions.DefaultDeploymentUniqueLabelKey] == hash {
				return utilerrors.ErrPreconditionViolated
			}
			updated.Labels = labelsutil.AddLabel(updated.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash)
			updated.Spec.Selector = labelsutil.AddLabelToSelector(updated.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, hash)
			return nil
		}); err != nil {
		return nil, fmt.Errorf("error updating %s %s/%s label and selector with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err)
	}
	if rsUpdated {
		glog.V(4).Infof("Updated %s %s/%s's selector and label with hash %s.", rs.Kind, rs.Namespace, rs.Name, hash)
	}
	// If the RS isn't actually updated in step 3, that's okay, we'll retry in the next sync loop since its selector isn't updated yet.
	// TODO: look for orphaned pods and label them in the background somewhere else periodically
	return updatedRS, nil
}
2016-01-20 08:40:18 +08:00
// setNewReplicaSetAnnotations sets new replica set's annotations appropriately by updating its revision and
// copying required deployment annotations to it; it returns true if replica set's annotation is changed.
2016-01-29 00:35:14 +08:00
func setNewReplicaSetAnnotations ( deployment * extensions . Deployment , newRS * extensions . ReplicaSet , newRevision string , exists bool ) bool {
2016-03-03 09:51:35 +08:00
// First, copy deployment's annotations (except for apply and revision annotations)
2016-03-03 03:19:48 +08:00
annotationChanged := copyDeploymentAnnotationsToReplicaSet ( deployment , newRS )
2016-01-20 08:40:18 +08:00
// Then, update replica set's revision annotation
2016-03-03 03:19:48 +08:00
if newRS . Annotations == nil {
newRS . Annotations = make ( map [ string ] string )
}
// The newRS's revision should be the greatest among all RSes. Usually, its revision number is newRevision (the max revision number
// of all old RSes + 1). However, it's possible that some of the old RSes are deleted after the newRS revision being updated, and
// newRevision becomes smaller than newRS's revision. We should only update newRS revision when it's smaller than newRevision.
if newRS . Annotations [ deploymentutil . RevisionAnnotation ] < newRevision {
newRS . Annotations [ deploymentutil . RevisionAnnotation ] = newRevision
2016-02-05 07:26:06 +08:00
annotationChanged = true
2016-01-29 00:35:14 +08:00
glog . V ( 4 ) . Infof ( "Updating replica set %q revision to %s" , newRS . Name , newRevision )
}
if ! exists && setReplicasAnnotations ( newRS , deployment . Spec . Replicas , deployment . Spec . Replicas + maxSurge ( * deployment ) ) {
annotationChanged = true
2016-02-05 07:26:06 +08:00
}
return annotationChanged
2016-01-13 09:52:18 +08:00
}
2016-01-29 00:35:14 +08:00
// annotationsToSkip lists annotations that must never be copied between a
// deployment and its replica sets: the kubectl apply annotation and the
// revision/replicas bookkeeping annotations are maintained per-object.
var annotationsToSkip = map[string]bool{
	annotations.LastAppliedConfigAnnotation:  true,
	deploymentutil.RevisionAnnotation:        true,
	deploymentutil.DesiredReplicasAnnotation: true,
	deploymentutil.MaxReplicasAnnotation:     true,
}
2016-03-18 06:39:05 +08:00
// skipCopyAnnotation returns true if we should skip copying the annotation with the given annotation key
// TODO: How to decide which annotations should / should not be copied?
// See https://github.com/kubernetes/kubernetes/pull/20035#issuecomment-179558615
func skipCopyAnnotation(key string) bool {
	// Unknown keys yield the map's zero value (false), i.e. they are copied.
	return annotationsToSkip[key]
}
// getSkippedAnnotations returns the subset of the given annotations whose keys
// are on the skip list (i.e. annotations that are never copied across objects).
func getSkippedAnnotations(annotations map[string]string) map[string]string {
	kept := make(map[string]string)
	for key, value := range annotations {
		if !skipCopyAnnotation(key) {
			continue
		}
		kept[key] = value
	}
	return kept
}
2016-01-20 08:40:18 +08:00
// copyDeploymentAnnotationsToReplicaSet copies deployment's annotations to replica set's annotations,
2016-03-03 03:19:48 +08:00
// and returns true if replica set's annotation is changed.
// Note that apply and revision annotations are not copied.
2016-01-20 08:40:18 +08:00
func copyDeploymentAnnotationsToReplicaSet ( deployment * extensions . Deployment , rs * extensions . ReplicaSet ) bool {
rsAnnotationsChanged := false
if rs . Annotations == nil {
rs . Annotations = make ( map [ string ] string )
2016-01-15 10:04:05 +08:00
}
2016-02-05 07:26:06 +08:00
for k , v := range deployment . Annotations {
2016-03-03 03:19:48 +08:00
// newRS revision is updated automatically in getNewReplicaSet, and the deployment's revision number is then updated
// by copying its newRS revision number. We should not copy deployment's revision to its newRS, since the update of
// deployment revision number may fail (revision becomes stale) and the revision number in newRS is more reliable.
2016-03-18 06:39:05 +08:00
if skipCopyAnnotation ( k ) || rs . Annotations [ k ] == v {
2016-02-05 07:26:06 +08:00
continue
}
2016-01-20 08:40:18 +08:00
rs . Annotations [ k ] = v
rsAnnotationsChanged = true
2016-02-05 07:26:06 +08:00
}
2016-01-20 08:40:18 +08:00
return rsAnnotationsChanged
2016-02-05 07:26:06 +08:00
}
2016-03-18 06:39:05 +08:00
// setDeploymentAnnotationsTo sets deployment's annotations as given RS's annotations.
// This action should be done if and only if the deployment is rolling back to this rs.
// Note that apply and revision annotations are not changed.
func setDeploymentAnnotationsTo(deployment *extensions.Deployment, rollbackToRS *extensions.ReplicaSet) {
	// Keep only the deployment's own non-copyable annotations, then overlay
	// every copyable annotation from the rollback target.
	deployment.Annotations = getSkippedAnnotations(deployment.Annotations)
	for key, value := range rollbackToRS.Annotations {
		if skipCopyAnnotation(key) {
			continue
		}
		deployment.Annotations[key] = value
	}
}
2016-02-28 10:13:32 +08:00
func ( dc * DeploymentController ) updateDeploymentRevision ( deployment * extensions . Deployment , revision string ) error {
2016-02-05 07:26:06 +08:00
if deployment . Annotations == nil {
deployment . Annotations = make ( map [ string ] string )
}
2016-02-05 10:05:38 +08:00
if deployment . Annotations [ deploymentutil . RevisionAnnotation ] != revision {
deployment . Annotations [ deploymentutil . RevisionAnnotation ] = revision
2016-02-28 10:13:32 +08:00
_ , err := dc . updateDeployment ( deployment )
2016-02-05 10:05:38 +08:00
return err
}
return nil
2016-01-15 10:04:05 +08:00
}
2016-02-28 10:13:32 +08:00
func ( dc * DeploymentController ) reconcileNewReplicaSet ( allRSs [ ] * extensions . ReplicaSet , newRS * extensions . ReplicaSet , deployment * extensions . Deployment ) ( bool , error ) {
2016-01-20 08:40:18 +08:00
if newRS . Spec . Replicas == deployment . Spec . Replicas {
2015-10-08 04:13:18 +08:00
// Scaling not required.
2015-09-18 03:41:06 +08:00
return false , nil
}
2016-01-20 08:40:18 +08:00
if newRS . Spec . Replicas > deployment . Spec . Replicas {
2015-10-08 04:13:18 +08:00
// Scale down.
2016-02-05 10:05:38 +08:00
scaled , _ , err := dc . scaleReplicaSetAndRecordEvent ( newRS , deployment . Spec . Replicas , deployment )
return scaled , err
2015-10-08 04:13:18 +08:00
}
2016-02-28 10:13:32 +08:00
newReplicasCount , err := deploymentutil . NewRSNewReplicas ( deployment , allRSs , newRS )
2015-09-18 03:41:06 +08:00
if err != nil {
2016-02-11 14:35:46 +08:00
return false , err
2015-09-18 03:41:06 +08:00
}
2016-02-05 10:05:38 +08:00
scaled , _ , err := dc . scaleReplicaSetAndRecordEvent ( newRS , newReplicasCount , deployment )
return scaled , err
2015-09-18 03:41:06 +08:00
}
2016-01-29 00:35:14 +08:00
func ( dc * DeploymentController ) getAvailablePodsForReplicaSets ( deployment * extensions . Deployment , rss [ ] * extensions . ReplicaSet ) ( int32 , error ) {
2016-06-08 07:58:18 +08:00
podList , err := dc . listPods ( deployment )
if err != nil {
return 0 , err
}
2016-01-29 00:35:14 +08:00
return deploymentutil . CountAvailablePodsForReplicaSets ( podList , rss , deployment . Spec . MinReadySeconds )
2016-06-08 07:58:18 +08:00
}
2016-02-28 10:13:32 +08:00
func ( dc * DeploymentController ) reconcileOldReplicaSets ( allRSs [ ] * extensions . ReplicaSet , oldRSs [ ] * extensions . ReplicaSet , newRS * extensions . ReplicaSet , deployment * extensions . Deployment ) ( bool , error ) {
2016-01-20 08:40:18 +08:00
oldPodsCount := deploymentutil . GetReplicaCountForReplicaSets ( oldRSs )
2015-09-18 03:41:06 +08:00
if oldPodsCount == 0 {
2016-01-30 15:09:26 +08:00
// Can't scale down further
2015-09-18 03:41:06 +08:00
return false , nil
}
2016-01-30 15:09:26 +08:00
minReadySeconds := deployment . Spec . MinReadySeconds
2016-01-20 08:40:18 +08:00
allPodsCount := deploymentutil . GetReplicaCountForReplicaSets ( allRSs )
2016-06-08 07:58:18 +08:00
// TODO: use dc.getAvailablePodsForReplicaSets instead
newRSAvailablePodCount , err := deploymentutil . GetAvailablePodsForReplicaSets ( dc . client , deployment , [ ] * extensions . ReplicaSet { newRS } , minReadySeconds )
2016-01-30 15:09:26 +08:00
if err != nil {
return false , fmt . Errorf ( "could not find available pods: %v" , err )
}
2016-01-29 00:35:14 +08:00
maxUnavailable := maxUnavailable ( * deployment )
2016-01-30 15:09:26 +08:00
// Check if we can scale down. We can scale down in the following 2 cases:
2016-01-20 08:40:18 +08:00
// * Some old replica sets have unhealthy replicas, we could safely scale down those unhealthy replicas since that won't further
2016-01-30 15:09:26 +08:00
// increase unavailability.
2016-01-20 08:40:18 +08:00
// * New replica set has scaled up and it's replicas becomes ready, then we can scale down old replica sets in a further step.
2016-01-30 15:09:26 +08:00
//
2016-01-20 08:40:18 +08:00
// maxScaledDown := allPodsCount - minAvailable - newReplicaSetPodsUnavailable
2016-01-30 15:09:26 +08:00
// take into account not only maxUnavailable and any surge pods that have been created, but also unavailable pods from
2016-01-20 08:40:18 +08:00
// the newRS, so that the unavailable pods from the newRS would not make us scale down old replica sets in a further
// step(that will increase unavailability).
2016-01-30 15:09:26 +08:00
//
// Concrete example:
//
// * 10 replicas
// * 2 maxUnavailable (absolute number, not percent)
// * 3 maxSurge (absolute number, not percent)
//
// case 1:
2016-01-20 08:40:18 +08:00
// * Deployment is updated, newRS is created with 3 replicas, oldRS is scaled down to 8, and newRS is scaled up to 5.
// * The new replica set pods crashloop and never become available.
// * allPodsCount is 13. minAvailable is 8. newRSPodsUnavailable is 5.
// * A node fails and causes one of the oldRS pods to become unavailable. However, 13 - 8 - 5 = 0, so the oldRS won't be scaled down.
2016-01-30 15:09:26 +08:00
// * The user notices the crashloop and does kubectl rollout undo to rollback.
2016-01-20 08:40:18 +08:00
// * newRSPodsUnavailable is 1, since we rolled back to the good replica set, so maxScaledDown = 13 - 8 - 1 = 4. 4 of the crashlooping pods will be scaled down.
// * The total number of pods will then be 9 and the newRS can be scaled up to 10.
2016-01-30 15:09:26 +08:00
//
// case 2:
// Same example, but pushing a new pod template instead of rolling back (aka "roll over"):
2016-01-20 08:40:18 +08:00
// * The new replica set created must start with 0 replicas because allPodsCount is already at 13.
// * However, newRSPodsUnavailable would also be 0, so the 2 old replica sets could be scaled down by 5 (13 - 8 - 0), which would then
// allow the new replica set to be scaled up by 5.
2015-09-18 03:41:06 +08:00
minAvailable := deployment . Spec . Replicas - maxUnavailable
2016-01-20 08:40:18 +08:00
newRSUnavailablePodCount := newRS . Spec . Replicas - newRSAvailablePodCount
maxScaledDown := allPodsCount - minAvailable - newRSUnavailablePodCount
2016-01-30 15:09:26 +08:00
if maxScaledDown <= 0 {
return false , nil
}
// Clean up unhealthy replicas first, otherwise unhealthy replicas will block deployment
// and cause timeout. See https://github.com/kubernetes/kubernetes/issues/16737
2016-06-21 04:52:19 +08:00
oldRSs , cleanupCount , err := dc . cleanupUnhealthyReplicas ( oldRSs , deployment , deployment . Spec . MinReadySeconds , maxScaledDown )
2015-12-04 08:00:13 +08:00
if err != nil {
2016-01-30 15:09:26 +08:00
return false , nil
2015-12-04 08:00:13 +08:00
}
2016-03-11 07:06:38 +08:00
glog . V ( 4 ) . Infof ( "Cleaned up unhealthy replicas from old RSes by %d" , cleanupCount )
2016-01-30 15:09:26 +08:00
2016-01-20 08:40:18 +08:00
// Scale down old replica sets, need check maxUnavailable to ensure we can scale down
2016-03-11 07:06:38 +08:00
allRSs = append ( oldRSs , newRS )
2016-01-20 08:40:18 +08:00
scaledDownCount , err := dc . scaleDownOldReplicaSetsForRollingUpdate ( allRSs , oldRSs , deployment )
2016-01-30 15:09:26 +08:00
if err != nil {
2015-12-04 08:00:13 +08:00
return false , nil
}
2016-06-01 06:59:46 +08:00
glog . V ( 4 ) . Infof ( "Scaled down old RSes of deployment %s by %d" , deployment . Name , scaledDownCount )
2016-01-30 15:09:26 +08:00
totalScaledDown := cleanupCount + scaledDownCount
return totalScaledDown > 0 , nil
}
2016-01-20 08:40:18 +08:00
// cleanupUnhealthyReplicas will scale down old replica sets with unhealthy replicas, so that all unhealthy replicas will be deleted.
2016-06-21 04:52:19 +08:00
func ( dc * DeploymentController ) cleanupUnhealthyReplicas ( oldRSs [ ] * extensions . ReplicaSet , deployment * extensions . Deployment , minReadySeconds , maxCleanupCount int32 ) ( [ ] * extensions . ReplicaSet , int32 , error ) {
2016-01-20 08:40:18 +08:00
sort . Sort ( controller . ReplicaSetsByCreationTimestamp ( oldRSs ) )
// Safely scale down all old replica sets with unhealthy replicas. Replica set will sort the pods in the order
2016-01-30 15:09:26 +08:00
// such that not-ready < ready, unscheduled < scheduled, and pending < running. This ensures that unhealthy replicas will
// been deleted first and won't increase unavailability.
2016-04-27 12:35:14 +08:00
totalScaledDown := int32 ( 0 )
2016-03-11 07:06:38 +08:00
for i , targetRS := range oldRSs {
2016-01-30 15:09:26 +08:00
if totalScaledDown >= maxCleanupCount {
break
}
2016-01-20 08:40:18 +08:00
if targetRS . Spec . Replicas == 0 {
// cannot scale down this replica set.
2016-01-30 15:09:26 +08:00
continue
}
2016-06-08 07:58:18 +08:00
// TODO: use dc.getAvailablePodsForReplicaSets instead
2016-06-21 04:52:19 +08:00
availablePodCount , err := deploymentutil . GetAvailablePodsForReplicaSets ( dc . client , deployment , [ ] * extensions . ReplicaSet { targetRS } , minReadySeconds )
2016-01-30 15:09:26 +08:00
if err != nil {
2016-03-11 07:06:38 +08:00
return nil , totalScaledDown , fmt . Errorf ( "could not find available pods: %v" , err )
2016-01-30 15:09:26 +08:00
}
2016-06-21 04:52:19 +08:00
if targetRS . Spec . Replicas == availablePodCount {
2016-01-30 15:09:26 +08:00
// no unhealthy replicas found, no scaling required.
continue
}
2016-06-21 04:52:19 +08:00
scaledDownCount := int32 ( integer . IntMin ( int ( maxCleanupCount - totalScaledDown ) , int ( targetRS . Spec . Replicas - availablePodCount ) ) )
2016-01-20 08:40:18 +08:00
newReplicasCount := targetRS . Spec . Replicas - scaledDownCount
2016-03-11 07:06:38 +08:00
if newReplicasCount > targetRS . Spec . Replicas {
return nil , 0 , fmt . Errorf ( "when cleaning up unhealthy replicas, got invalid request to scale down %s/%s %d -> %d" , targetRS . Namespace , targetRS . Name , targetRS . Spec . Replicas , newReplicasCount )
}
_ , updatedOldRS , err := dc . scaleReplicaSetAndRecordEvent ( targetRS , newReplicasCount , deployment )
2016-01-30 15:09:26 +08:00
if err != nil {
2016-03-11 07:06:38 +08:00
return nil , totalScaledDown , err
2016-01-30 15:09:26 +08:00
}
totalScaledDown += scaledDownCount
2016-03-11 07:06:38 +08:00
oldRSs [ i ] = updatedOldRS
2016-01-30 15:09:26 +08:00
}
2016-03-11 07:06:38 +08:00
return oldRSs , totalScaledDown , nil
2016-01-30 15:09:26 +08:00
}
2016-01-20 08:40:18 +08:00
// scaleDownOldReplicaSetsForRollingUpdate scales down old replica sets when deployment strategy is "RollingUpdate".
// Need check maxUnavailable to ensure availability
//
// It returns the total number of replicas removed from the old replica sets, never
// scaling below the availability floor (spec.Replicas - maxUnavailable).
func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(allRSs []*extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet, deployment *extensions.Deployment) (int32, error) {
	maxUnavailable := maxUnavailable(*deployment)

	// Check if we can scale down.
	minAvailable := deployment.Spec.Replicas - maxUnavailable
	minReadySeconds := deployment.Spec.MinReadySeconds
	// Find the number of ready pods.
	// TODO: use dc.getAvailablePodsForReplicaSets instead
	availablePodCount, err := deploymentutil.GetAvailablePodsForReplicaSets(dc.client, deployment, allRSs, minReadySeconds)
	if err != nil {
		return 0, fmt.Errorf("could not find available pods: %v", err)
	}
	if availablePodCount <= minAvailable {
		// Cannot scale down.
		return 0, nil
	}
	glog.V(4).Infof("Found %d available pods in deployment %s, scaling down old RSes", availablePodCount, deployment.Name)

	// Scale down the oldest replica sets first.
	sort.Sort(controller.ReplicaSetsByCreationTimestamp(oldRSs))

	totalScaledDown := int32(0)
	// The surplus of available pods over the availability floor is the total scale-down budget.
	totalScaleDownCount := availablePodCount - minAvailable
	for _, targetRS := range oldRSs {
		if totalScaledDown >= totalScaleDownCount {
			// No further scaling required.
			break
		}
		if targetRS.Spec.Replicas == 0 {
			// cannot scale down this ReplicaSet.
			continue
		}
		// Scale down.
		// Take the smaller of "what this RS has" and "what the budget still allows".
		scaleDownCount := int32(integer.IntMin(int(targetRS.Spec.Replicas), int(totalScaleDownCount-totalScaledDown)))
		newReplicasCount := targetRS.Spec.Replicas - scaleDownCount
		if newReplicasCount > targetRS.Spec.Replicas {
			// Sanity check: this path must never scale a replica set up.
			return 0, fmt.Errorf("when scaling down old RS, got invalid request to scale down %s/%s %d -> %d", targetRS.Namespace, targetRS.Name, targetRS.Spec.Replicas, newReplicasCount)
		}
		_, _, err = dc.scaleReplicaSetAndRecordEvent(targetRS, newReplicasCount, deployment)
		if err != nil {
			return totalScaledDown, err
		}

		totalScaledDown += scaleDownCount
	}
	return totalScaledDown, nil
}
2016-01-20 08:40:18 +08:00
// scaleDownOldReplicaSetsForRecreate scales down old replica sets when deployment strategy is "Recreate"
2016-02-28 10:13:32 +08:00
func ( dc * DeploymentController ) scaleDownOldReplicaSetsForRecreate ( oldRSs [ ] * extensions . ReplicaSet , deployment * extensions . Deployment ) ( bool , error ) {
2016-01-13 11:27:26 +08:00
scaled := false
2016-01-20 08:40:18 +08:00
for _ , rs := range oldRSs {
2016-01-13 11:27:26 +08:00
// Scaling not required.
2016-01-20 08:40:18 +08:00
if rs . Spec . Replicas == 0 {
2016-01-13 11:27:26 +08:00
continue
}
2016-02-05 10:05:38 +08:00
scaledRS , _ , err := dc . scaleReplicaSetAndRecordEvent ( rs , 0 , deployment )
2016-01-13 11:27:26 +08:00
if err != nil {
return false , err
}
2016-02-05 10:05:38 +08:00
if scaledRS {
scaled = true
}
2016-01-13 11:27:26 +08:00
}
return scaled , nil
}
2016-01-20 08:40:18 +08:00
// scaleUpNewReplicaSetForRecreate scales up new replica set when deployment strategy is "Recreate"
2016-02-28 10:13:32 +08:00
func ( dc * DeploymentController ) scaleUpNewReplicaSetForRecreate ( newRS * extensions . ReplicaSet , deployment * extensions . Deployment ) ( bool , error ) {
2016-02-05 10:05:38 +08:00
scaled , _ , err := dc . scaleReplicaSetAndRecordEvent ( newRS , deployment . Spec . Replicas , deployment )
return scaled , err
2016-01-13 11:27:26 +08:00
}
2016-01-29 00:35:14 +08:00
// cleanupDeployment is responsible for cleaning up a deployment ie. retains all but the latest N old replica sets
// where N=d.Spec.RevisionHistoryLimit. Old replica sets are older versions of the podtemplate of a deployment kept
// around by default 1) for historical reasons and 2) for the ability to rollback a deployment.
func ( dc * DeploymentController ) cleanupDeployment ( oldRSs [ ] * extensions . ReplicaSet , deployment * extensions . Deployment ) error {
if deployment . Spec . RevisionHistoryLimit == nil {
return nil
}
2016-04-27 12:35:14 +08:00
diff := int32 ( len ( oldRSs ) ) - * deployment . Spec . RevisionHistoryLimit
2016-01-28 14:13:07 +08:00
if diff <= 0 {
return nil
}
2016-02-06 10:43:02 +08:00
sort . Sort ( controller . ReplicaSetsByCreationTimestamp ( oldRSs ) )
2016-01-28 14:13:07 +08:00
var errList [ ] error
// TODO: This should be parallelized.
2016-04-27 12:35:14 +08:00
for i := int32 ( 0 ) ; i < diff ; i ++ {
2016-02-06 10:43:02 +08:00
rs := oldRSs [ i ]
2016-01-20 08:40:18 +08:00
// Avoid delete replica set with non-zero replica counts
2016-03-01 07:15:55 +08:00
if rs . Status . Replicas != 0 || rs . Spec . Replicas != 0 || rs . Generation > rs . Status . ObservedGeneration {
2016-01-28 14:13:07 +08:00
continue
}
2016-01-20 08:40:18 +08:00
if err := dc . client . Extensions ( ) . ReplicaSets ( rs . Namespace ) . Delete ( rs . Name , nil ) ; err != nil && ! errors . IsNotFound ( err ) {
glog . V ( 2 ) . Infof ( "Failed deleting old replica set %v for deployment %v: %v" , rs . Name , deployment . Name , err )
2016-01-28 14:13:07 +08:00
errList = append ( errList , err )
}
}
return utilerrors . NewAggregate ( errList )
}
2016-02-28 10:13:32 +08:00
func ( dc * DeploymentController ) updateDeploymentStatus ( allRSs [ ] * extensions . ReplicaSet , newRS * extensions . ReplicaSet , deployment * extensions . Deployment ) error {
2016-01-29 00:35:14 +08:00
newStatus , err := dc . calculateStatus ( allRSs , newRS , deployment )
2016-01-09 07:58:52 +08:00
if err != nil {
2016-01-28 06:12:57 +08:00
return err
2016-01-09 07:58:52 +08:00
}
2016-01-29 00:35:14 +08:00
newDeployment := deployment
newDeployment . Status = newStatus
_ , err = dc . client . Extensions ( ) . Deployments ( deployment . Namespace ) . UpdateStatus ( newDeployment )
2015-09-18 03:41:06 +08:00
return err
}
2016-01-29 00:35:14 +08:00
func ( dc * DeploymentController ) calculateStatus ( allRSs [ ] * extensions . ReplicaSet , newRS * extensions . ReplicaSet , deployment * extensions . Deployment ) ( extensions . DeploymentStatus , error ) {
availableReplicas , err := dc . getAvailablePodsForReplicaSets ( deployment , allRSs )
2016-01-28 06:12:57 +08:00
if err != nil {
2016-01-29 00:35:14 +08:00
return deployment . Status , fmt . Errorf ( "failed to count available pods: %v" , err )
2016-01-28 06:12:57 +08:00
}
2016-02-25 02:59:51 +08:00
totalReplicas := deploymentutil . GetReplicaCountForReplicaSets ( allRSs )
2016-01-29 00:35:14 +08:00
return extensions . DeploymentStatus {
// TODO: Ensure that if we start retrying status updates, we won't pick up a new Generation value.
ObservedGeneration : deployment . Generation ,
Replicas : deploymentutil . GetActualReplicaCountForReplicaSets ( allRSs ) ,
UpdatedReplicas : deploymentutil . GetActualReplicaCountForReplicaSets ( [ ] * extensions . ReplicaSet { newRS } ) ,
AvailableReplicas : availableReplicas ,
UnavailableReplicas : totalReplicas - availableReplicas ,
} , nil
2016-01-28 06:12:57 +08:00
}
2016-04-27 12:35:14 +08:00
func ( dc * DeploymentController ) scaleReplicaSetAndRecordEvent ( rs * extensions . ReplicaSet , newScale int32 , deployment * extensions . Deployment ) ( bool , * extensions . ReplicaSet , error ) {
2016-02-05 10:05:38 +08:00
// No need to scale
if rs . Spec . Replicas == newScale {
return false , rs , nil
}
2016-02-25 14:40:14 +08:00
var scalingOperation string
2016-01-20 08:40:18 +08:00
if rs . Spec . Replicas < newScale {
2015-10-08 04:13:18 +08:00
scalingOperation = "up"
2016-02-25 14:40:14 +08:00
} else {
scalingOperation = "down"
2015-10-08 04:13:18 +08:00
}
2016-01-29 00:35:14 +08:00
newRS , err := dc . scaleReplicaSet ( rs , newScale , deployment , scalingOperation )
2016-02-05 10:05:38 +08:00
return true , newRS , err
2015-10-08 04:13:18 +08:00
}
2016-01-29 00:35:14 +08:00
func ( dc * DeploymentController ) scaleReplicaSet ( rs * extensions . ReplicaSet , newScale int32 , deployment * extensions . Deployment , scalingOperation string ) ( * extensions . ReplicaSet , error ) {
2016-02-28 10:13:32 +08:00
// NOTE: This mutates the ReplicaSet passed in. Not sure if that's a good idea.
2016-01-20 08:40:18 +08:00
rs . Spec . Replicas = newScale
2016-01-29 00:35:14 +08:00
setReplicasAnnotations ( rs , deployment . Spec . Replicas , deployment . Spec . Replicas + maxSurge ( * deployment ) )
rs , err := dc . client . Extensions ( ) . ReplicaSets ( rs . ObjectMeta . Namespace ) . Update ( rs )
if err == nil {
dc . eventRecorder . Eventf ( deployment , api . EventTypeNormal , "ScalingReplicaSet" , "Scaled %s replica set %s to %d" , scalingOperation , rs . Name , newScale )
} else {
glog . Warningf ( "Cannot update replica set %q: %v" , rs . Name , err )
dc . enqueueDeployment ( deployment )
}
return rs , err
2015-09-18 03:41:06 +08:00
}
2015-09-21 15:06:45 +08:00
// updateDeployment writes the given deployment back to the API server and
// returns the stored copy.
func (dc *DeploymentController) updateDeployment(deployment *extensions.Deployment) (*extensions.Deployment, error) {
	return dc.client.Extensions().Deployments(deployment.ObjectMeta.Namespace).Update(deployment)
}
2016-01-15 10:04:05 +08:00
2016-01-20 08:40:18 +08:00
func ( dc * DeploymentController ) rollbackToTemplate ( deployment * extensions . Deployment , rs * extensions . ReplicaSet ) ( d * extensions . Deployment , performedRollback bool , err error ) {
2016-03-10 05:11:13 +08:00
if ! reflect . DeepEqual ( deploymentutil . GetNewReplicaSetTemplate ( deployment ) , rs . Spec . Template ) {
2016-01-20 08:40:18 +08:00
glog . Infof ( "Rolling back deployment %s to template spec %+v" , deployment . Name , rs . Spec . Template . Spec )
2016-03-10 05:11:13 +08:00
deploymentutil . SetFromReplicaSetTemplate ( deployment , rs . Spec . Template )
2016-03-18 06:39:05 +08:00
// set RS (the old RS we'll rolling back to) annotations back to the deployment;
// otherwise, the deployment's current annotations (should be the same as current new RS) will be copied to the RS after the rollback.
//
// For example,
// A Deployment has old RS1 with annotation {change-cause:create}, and new RS2 {change-cause:edit}.
// Note that both annotations are copied from Deployment, and the Deployment should be annotated {change-cause:edit} as well.
// Now, rollback Deployment to RS1, we should update Deployment's pod-template and also copy annotation from RS1.
// Deployment is now annotated {change-cause:create}, and we have new RS1 {change-cause:create}, old RS2 {change-cause:edit}.
//
// If we don't copy the annotations back from RS to deployment on rollback, the Deployment will stay as {change-cause:edit},
// and new RS1 becomes {change-cause:edit} (copied from deployment after rollback), old RS2 {change-cause:edit}, which is not correct.
setDeploymentAnnotationsTo ( deployment , rs )
2016-01-15 10:04:05 +08:00
performedRollback = true
} else {
glog . V ( 4 ) . Infof ( "Rolling back to a revision that contains the same template as current deployment %s, skipping rollback..." , deployment . Name )
2016-01-20 06:50:03 +08:00
dc . emitRollbackWarningEvent ( deployment , deploymentutil . RollbackTemplateUnchanged , fmt . Sprintf ( "The rollback revision contains the same template as current deployment %q" , deployment . Name ) )
2016-01-15 10:04:05 +08:00
}
d , err = dc . updateDeploymentAndClearRollbackTo ( deployment )
return
}
2016-01-29 00:35:14 +08:00
// isScalingEvent checks whether the provided deployment has been updated with a scaling event
// by looking at the desired-replicas annotation in the active replica sets of the deployment.
func (dc *DeploymentController) isScalingEvent(d *extensions.Deployment) bool {
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, false)
	if err != nil {
		return false
	}
	// If there is no new replica set matching this deployment and the deployment isn't paused
	// then there is a new rollout that waits to happen
	if newRS == nil && !d.Spec.Paused {
		return false
	}
	allRSs := append(oldRSs, newRS)
	for _, rs := range controller.FilterActiveReplicaSets(allRSs) {
		// A mismatch between a replica set's recorded desired count and the
		// deployment's spec indicates a scaling event.
		if desired, ok := getDesiredReplicasAnnotation(rs); ok && desired != d.Spec.Replicas {
			return true
		}
	}
	return false
}