2022-10-26 03:36:57 +08:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
								// Copyright (c) 2015-2022 MinIO, Inc.
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// This file is part of MinIO Object Storage stack
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// This program is free software: you can redistribute it and/or modify
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// it under the terms of the GNU Affero General Public License as published by
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// the Free Software Foundation, either version 3 of the License, or
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// (at your option) any later version.
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// This program is distributed in the hope that it will be useful
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// but WITHOUT ANY WARRANTY; without even the implied warranty of
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// GNU Affero General Public License for more details.
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// You should have received a copy of the GNU Affero General Public License
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// along with this program.  If not, see <http://www.gnu.org/licenses/>.
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								package  cmd  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  (  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									"context" 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									"encoding/binary" 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									"errors" 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									"fmt" 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									"math" 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									"math/rand" 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									"net/http" 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									"strconv" 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									"strings" 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									"sync" 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									"time" 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									"github.com/lithammer/shortuuid/v4" 
							 
						 
					
						
							
								
									
										
										
										
											2022-12-07 05:46:50 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									"github.com/minio/madmin-go/v2" 
							 
						 
					
						
							
								
									
										
										
										
											2022-10-26 03:36:57 +08:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
									"github.com/minio/minio/internal/hash" 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									"github.com/minio/minio/internal/logger" 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									"github.com/minio/pkg/env" 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								)  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//go:generate msgp -file $GOFILE -unexported
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// rebalanceStats contains per-pool rebalance statistics like number of objects,
								// versions and bytes rebalanced out of a pool.
								//
								// InitFreeSpace, InitCapacity, Buckets, Participating and Info are populated by
								// initRebalanceMeta; the counters and the Bucket/Object cursor are advanced by
								// update.
								type  rebalanceStats  struct  {  
									InitFreeSpace  uint64  ` json:"initFreeSpace" msg:"ifs" `  // Pool free space at the start of rebalance
									InitCapacity   uint64  ` json:"initCapacity" msg:"ic" `    // Pool capacity at the start of rebalance

									Buckets            [ ] string       ` json:"buckets" msg:"bus" `            // Buckets being rebalanced or still to be rebalanced
									RebalancedBuckets  [ ] string       ` json:"rebalancedBuckets" msg:"rbs" `  // Buckets already rebalanced
									Bucket             string         ` json:"bucket" msg:"bu" `              // Last rebalanced bucket
									Object             string         ` json:"object" msg:"ob" `              // Last rebalanced object
									NumObjects         uint64         ` json:"numObjects" msg:"no" `          // Number of objects rebalanced (only versions with IsLatest set are counted)
									NumVersions        uint64         ` json:"numVersions" msg:"nv" `         // Number of versions rebalanced
									Bytes              uint64         ` json:"bytes" msg:"bs" `               // Number of bytes rebalanced, as on-disk size: object size scaled by (data+parity)/data blocks
									Participating      bool           ` json:"participating" msg:"par" `  // True when the pool's initial free-space fraction was below the rebalance goal
									Info               rebalanceInfo  ` json:"info" msg:"inf" `  // Start/end time and status of this pool's rebalance
								}  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2022-12-14 16:15:14 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								func  ( rs  * rebalanceStats )  update ( bucket  string ,  fi  FileInfo )  {  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									if  fi . IsLatest  { 
							 
						 
					
						
							
								
									
										
										
										
											2022-10-26 03:36:57 +08:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
										rs . NumObjects ++ 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									rs . NumVersions ++ 
							 
						 
					
						
							
								
									
										
										
										
											2022-12-14 16:15:14 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									onDiskSz  :=  fi . Size  *  int64 ( fi . Erasure . DataBlocks + fi . Erasure . ParityBlocks )  /  int64 ( fi . Erasure . DataBlocks ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									rs . Bytes  +=  uint64 ( onDiskSz ) 
							 
						 
					
						
							
								
									
										
										
										
											2022-10-26 03:36:57 +08:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
									rs . Bucket  =  bucket 
							 
						 
					
						
							
								
									
										
										
										
											2022-12-14 16:15:14 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									rs . Object  =  fi . Name 
							 
						 
					
						
							
								
									
										
										
										
											2022-10-26 03:36:57 +08:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
								}  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// rstats is a slice of *rebalanceStats, the same element type as
								// rebalanceMeta.PoolStats.
								type  rstats  [ ] * rebalanceStats  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								//go:generate stringer -type=rebalStatus -trimprefix=rebal $GOFILE
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// rebalStatus is the state of a rebalance operation, stored as a uint8.
								type rebalStatus uint8

								// Rebalance states. Values are assigned by iota in declaration order, so
								// rebalNone is the zero value and rebalFailed is 4.
								const (
									rebalNone      = rebalStatus(iota) // zero value of rebalStatus
									rebalStarted                       // assigned by initRebalanceMeta to participating pools
									rebalCompleted
									rebalStopped
									rebalFailed
								)
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// rebalanceInfo records when a rebalance operation started and ended and
								// what state it is in. It is embedded per pool as rebalanceStats.Info.
								type  rebalanceInfo  struct  {  
									StartTime  time . Time    ` msg:"startTs" `  // Time at which rebalance-start was issued
									EndTime    time . Time    ` msg:"stopTs" `   // Time at which rebalance operation completed or rebalance-stop was called
									Status     rebalStatus  ` msg:"status" `   // Current state of rebalance operation. One of Started|Stopped|Completed|Failed; the zero value is rebalNone.
								}  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// rebalanceMeta contains information pertaining to an ongoing rebalance operation.
								//
								// The two unexported fields carry the msg:"-" tag and hold in-memory state only.
								// NOTE(review): msg:"-" is expected to exclude them from the msgp encoding - confirm
								// against the generated code.
								type  rebalanceMeta  struct  {  
									cancel           context . CancelFunc  ` msg:"-" `  // to be invoked on rebalance-stop
									lastRefreshedAt  time . Time           ` msg:"-" `  // NOTE(review): not set in the code visible here; presumably the time this metadata was last reloaded - confirm
									StoppedAt        time . Time           ` msg:"stopTs" `  // Time when rebalance-stop was issued.
									ID               string              ` msg:"id" `      // ID of the ongoing rebalance operation
									PercentFreeGoal  float64             ` msg:"pf" `      // Computed from total free space and capacity at the start of rebalance (free/capacity, a fraction rather than a value out of 100)
									PoolStats        [ ] * rebalanceStats   ` msg:"rss" `     // Per-pool rebalance stats keyed by pool index
								}  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// errRebalanceNotStarted is the sentinel error reporting that a rebalance
								// has not been started.
								var  errRebalanceNotStarted  =  errors . New ( "rebalance not started" )  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// loadRebalanceMeta loads rebalance metadata by calling rebalanceMeta.load
								// with the first server pool and, on success, installs the result as
								// z.rebalMeta while holding z.rebalMu.
								//
								// A load that fails with errConfigNotFound is not treated as an error: nil
								// is returned and z.rebalMeta is left as it was. Any other load failure is
								// returned to the caller unchanged.
								func (z *erasureServerPools) loadRebalanceMeta(ctx context.Context) error {
									meta := new(rebalanceMeta)

									switch loadErr := meta.load(ctx, z.serverPools[0]); {
									case loadErr == nil:
										// Loaded; continue below and publish it.
									case errors.Is(loadErr, errConfigNotFound):
										return nil
									default:
										return loadErr
									}

									z.rebalMu.Lock()
									z.rebalMeta = meta
									z.rebalMu.Unlock()

									return nil
								}
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// initRebalanceMeta initializes rebalance metadata for a new rebalance
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// operation and saves it in the object store.
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								func  ( z  * erasureServerPools )  initRebalanceMeta ( ctx  context . Context ,  buckets  [ ] string )  ( arn  string ,  err  error )  {  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									r  :=  & rebalanceMeta { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										ID :         shortuuid . New ( ) , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										PoolStats :  make ( [ ] * rebalanceStats ,  len ( z . serverPools ) ) , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									// Fetch disk capacity and available space.
 
							 
						 
					
						
							
								
									
										
										
										
											2022-12-02 06:31:35 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									si  :=  z . StorageInfo ( ctx ) 
							 
						 
					
						
							
								
									
										
										
										
											2022-10-26 03:36:57 +08:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
									diskStats  :=  make ( [ ] struct  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										AvailableSpace  uint64 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										TotalSpace      uint64 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									} ,  len ( z . serverPools ) ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									var  totalCap ,  totalFree  uint64 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									for  _ ,  disk  :=  range  si . Disks  { 
							 
						 
					
						
							
								
									
										
										
										
											2023-01-30 21:03:07 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
										// Ignore invalid.
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										if  disk . PoolIndex  <  0  ||  len ( diskStats )  <=  disk . PoolIndex  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											// https://github.com/minio/minio/issues/16500
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											continue 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										} 
							 
						 
					
						
							
								
									
										
										
										
											2022-10-26 03:36:57 +08:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
										totalCap  +=  disk . TotalSpace 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										totalFree  +=  disk . AvailableSpace 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										diskStats [ disk . PoolIndex ] . AvailableSpace  +=  disk . AvailableSpace 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										diskStats [ disk . PoolIndex ] . TotalSpace  +=  disk . TotalSpace 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									r . PercentFreeGoal  =  float64 ( totalFree )  /  float64 ( totalCap ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									now  :=  time . Now ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									for  idx  :=  range  z . serverPools  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										r . PoolStats [ idx ]  =  & rebalanceStats { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											Buckets :            make ( [ ] string ,  len ( buckets ) ) , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											RebalancedBuckets :  make ( [ ] string ,  0 ,  len ( buckets ) ) , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											InitFreeSpace :      diskStats [ idx ] . AvailableSpace , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											InitCapacity :       diskStats [ idx ] . TotalSpace , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										copy ( r . PoolStats [ idx ] . Buckets ,  buckets ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										if  pfi  :=  float64 ( diskStats [ idx ] . AvailableSpace )  /  float64 ( diskStats [ idx ] . TotalSpace ) ;  pfi  <  r . PercentFreeGoal  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											r . PoolStats [ idx ] . Participating  =  true 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											r . PoolStats [ idx ] . Info  =  rebalanceInfo { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												StartTime :  now , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												Status :     rebalStarted , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									err  =  r . save ( ctx ,  z . serverPools [ 0 ] ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									if  err  !=  nil  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										return  arn ,  err 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									z . rebalMeta  =  r 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									return  r . ID ,  nil 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								}  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2022-12-14 16:15:14 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								func  ( z  * erasureServerPools )  updatePoolStats ( poolIdx  int ,  bucket  string ,  fi  FileInfo )  {  
						 
					
						
							
								
									
										
										
										
											2022-10-26 03:36:57 +08:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
									z . rebalMu . Lock ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									defer  z . rebalMu . Unlock ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									r  :=  z . rebalMeta 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									if  r  ==  nil  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										return 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2022-12-14 16:15:14 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									r . PoolStats [ poolIdx ] . update ( bucket ,  fi ) 
							 
						 
					
						
							
								
									
										
										
										
											2022-10-26 03:36:57 +08:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
								}  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// Identity and on-disk header values of the persisted rebalance metadata.
								const (
									// rebalMetaName is the config object name the metadata is read from
									// and saved under.
									rebalMetaName = "rebalance.bin"

									// rebalMetaFmt is the format identifier stored in header bytes 0-1.
									rebalMetaFmt = 1

									// rebalMetaVer is the version identifier stored in header bytes 2-3.
									rebalMetaVer = 1
								)
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// nextRebalBucket returns the first bucket still queued for rebalancing on
								// the pool at poolIdx. The boolean result is false (and the name empty) when
								// no rebalance metadata is loaded, the pool has no stats entry, the pool has
								// completed or is not participating, or its bucket queue is empty.
								func (z *erasureServerPools) nextRebalBucket(poolIdx int) (string, bool) {
									z.rebalMu.RLock()
									defer z.rebalMu.RUnlock()

									meta := z.rebalMeta
									if meta == nil {
										return "", false
									}

									stats := meta.PoolStats[poolIdx]
									switch {
									case stats == nil:
										return "", false
									case !stats.Participating || stats.Info.Status == rebalCompleted:
										// Pool is either excluded from this rebalance or already done.
										return "", false
									case len(stats.Buckets) == 0:
										return "", false
									}

									return stats.Buckets[0], true
								}
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// bucketRebalanceDone moves bucket from the pending list of the pool at
								// poolIdx to its list of rebalanced buckets. It is a no-op when no rebalance
								// metadata is loaded, when the pool has no stats entry, or when bucket is not
								// in the pending list.
								func (z *erasureServerPools) bucketRebalanceDone(bucket string, poolIdx int) {
									z.rebalMu.Lock()
									defer z.rebalMu.Unlock()

									// Guard against missing metadata, as the other rebalMeta accessors do;
									// dereferencing a nil z.rebalMeta would panic.
									r := z.rebalMeta
									if r == nil {
										return
									}

									ps := r.PoolStats[poolIdx]
									if ps == nil {
										return
									}

									for i, b := range ps.Buckets {
										if b == bucket {
											// Only the first match is moved; the loop stops right after
											// the slice is modified.
											ps.Buckets = append(ps.Buckets[:i], ps.Buckets[i+1:]...)
											ps.RebalancedBuckets = append(ps.RebalancedBuckets, bucket)
											break
										}
									}
								}
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// load populates r from store via loadWithOpts, passing zero-value
								// ObjectOptions.
								func (r *rebalanceMeta) load(ctx context.Context, store objectIO) error {
									var defaultOpts ObjectOptions
									return r.loadWithOpts(ctx, store, defaultOpts)
								}
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// loadWithOpts reads the rebalMetaName config object from store with opts and
								// decodes it into r.
								//
								// An empty object is not an error: r is left untouched and nil is returned.
								// Otherwise the data must be longer than the 4-byte header, whose two
								// little-endian uint16 fields must equal rebalMetaFmt and rebalMetaVer; the
								// remaining bytes are passed to r.UnmarshalMsg. On success lastRefreshedAt is
								// set to the current time.
								func (r *rebalanceMeta) loadWithOpts(ctx context.Context, store objectIO, opts ObjectOptions) error {
									data, _, err := readConfigWithMetadata(ctx, store, rebalMetaName, opts)
									if err != nil {
										return err
									}

									switch size := len(data); {
									case size == 0:
										return nil
									case size <= 4:
										// Header only (or less): there is no payload to decode.
										return fmt.Errorf("rebalanceMeta: no data")
									}

									// Read header
									header, payload := data[:4], data[4:]
									if format := binary.LittleEndian.Uint16(header[0:2]); format != rebalMetaFmt {
										return fmt.Errorf("rebalanceMeta: unknown format: %d", format)
									}
									if version := binary.LittleEndian.Uint16(header[2:4]); version != rebalMetaVer {
										return fmt.Errorf("rebalanceMeta: unknown version: %d", version)
									}

									// OK, parse data.
									if _, err := r.UnmarshalMsg(payload); err != nil {
										return err
									}

									r.lastRefreshedAt = time.Now()
									return nil
								}
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// saveWithOpts serializes r and writes it to store under rebalMetaName using
								// opts. The buffer handed to r.MarshalMsg starts with a 4-byte header: two
								// little-endian uint16 values, rebalMetaFmt followed by rebalMetaVer.
								func (r *rebalanceMeta) saveWithOpts(ctx context.Context, store objectIO, opts ObjectOptions) error {
									const headerLen = 4

									// Length covers just the header; capacity is sized up front for the
									// header plus r.Msgsize() bytes.
									buf := make([]byte, headerLen, headerLen+r.Msgsize())

									// Initialize the header.
									binary.LittleEndian.PutUint16(buf[0:2], rebalMetaFmt)
									binary.LittleEndian.PutUint16(buf[2:4], rebalMetaVer)

									// NOTE(review): MarshalMsg presumably appends the encoding after the
									// header (loadWithOpts decodes from byte 4 onward) — confirm.
									encoded, err := r.MarshalMsg(buf)
									if err != nil {
										return err
									}

									return saveConfigWithOpts(ctx, store, rebalMetaName, encoded, opts)
								}
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// save persists r to store via saveWithOpts, passing zero-value
								// ObjectOptions.
								func (r *rebalanceMeta) save(ctx context.Context, store objectIO) error {
									var defaultOpts ObjectOptions
									return r.saveWithOpts(ctx, store, defaultOpts)
								}
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// IsRebalanceStarted reports whether rebalance metadata is loaded and its
								// StoppedAt timestamp is still unset.
								func (z *erasureServerPools) IsRebalanceStarted() bool {
									z.rebalMu.RLock()
									defer z.rebalMu.RUnlock()

									meta := z.rebalMeta
									return meta != nil && meta.StoppedAt.IsZero()
								}
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// IsPoolRebalancing reports whether the pool at poolIndex is taking part in
								// a rebalance that is currently running: metadata is loaded, StoppedAt is
								// unset, and the pool's stats are marked Participating with status
								// rebalStarted. It returns false when the pool has no stats entry.
								func (z *erasureServerPools) IsPoolRebalancing(poolIndex int) bool {
									z.rebalMu.RLock()
									defer z.rebalMu.RUnlock()

									r := z.rebalMeta
									if r == nil || !r.StoppedAt.IsZero() {
										return false
									}

									// A pool's stats entry can be nil (the other accessors check for it);
									// treat that as "not rebalancing" instead of dereferencing it.
									ps := r.PoolStats[poolIndex]
									if ps == nil {
										return false
									}

									return ps.Participating && ps.Info.Status == rebalStarted
								}
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								func  ( z  * erasureServerPools )  rebalanceBuckets ( ctx  context . Context ,  poolIdx  int )  ( err  error )  {  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									doneCh  :=  make ( chan  struct { } ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									defer  close ( doneCh ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									// Save rebalance.bin periodically.
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									go  func ( )  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										// Update rebalance.bin periodically once every 5-10s, chosen randomly
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										// to avoid multiple pool leaders herding to update around the same
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										// time.
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										r  :=  rand . New ( rand . NewSource ( time . Now ( ) . UnixNano ( ) ) ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										randSleepFor  :=  func ( )  time . Duration  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											return  5 * time . Second  +  time . Duration ( float64 ( 5 * time . Second ) * r . Float64 ( ) ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										timer  :=  time . NewTimer ( randSleepFor ( ) ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										defer  timer . Stop ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										var  rebalDone  bool 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										var  traceMsg  string 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										for  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											select  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											case  <- doneCh : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												// rebalance completed for poolIdx
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												now  :=  time . Now ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												z . rebalMu . Lock ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												z . rebalMeta . PoolStats [ poolIdx ] . Info . Status  =  rebalCompleted 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												z . rebalMeta . PoolStats [ poolIdx ] . Info . EndTime  =  now 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												z . rebalMu . Unlock ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												rebalDone  =  true 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												traceMsg  =  fmt . Sprintf ( "completed at %s" ,  now ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											case  <- ctx . Done ( ) : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												// rebalance stopped for poolIdx
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												now  :=  time . Now ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												z . rebalMu . Lock ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												z . rebalMeta . PoolStats [ poolIdx ] . Info . Status  =  rebalStopped 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												z . rebalMeta . PoolStats [ poolIdx ] . Info . EndTime  =  now 
							 
						 
					
						
							
								
									
										
										
										
											2023-01-19 09:24:23 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
												z . rebalMeta . cancel  =  nil  // remove the already used context.CancelFunc
 
							 
						 
					
						
							
								
									
										
										
										
											2022-10-26 03:36:57 +08:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
												z . rebalMu . Unlock ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												rebalDone  =  true 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												traceMsg  =  fmt . Sprintf ( "stopped at %s" ,  now ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											case  <- timer . C : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												traceMsg  =  fmt . Sprintf ( "saved at %s" ,  time . Now ( ) ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											stopFn  :=  globalRebalanceMetrics . log ( rebalanceMetricSaveMetadata ,  poolIdx ,  traceMsg ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											err  :=  z . saveRebalanceStats ( ctx ,  poolIdx ,  rebalSaveStats ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											stopFn ( err ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											logger . LogIf ( ctx ,  err ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											timer . Reset ( randSleepFor ( ) ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											if  rebalDone  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												return 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									} ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									for  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										select  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										case  <- ctx . Done ( ) : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											return 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										default : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										bucket ,  ok  :=  z . nextRebalBucket ( poolIdx ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										if  ! ok  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											// no more buckets to rebalance or target free_space/capacity reached
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											break 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										stopFn  :=  globalRebalanceMetrics . log ( rebalanceMetricRebalanceBucket ,  poolIdx ,  bucket ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										err  =  z . rebalanceBucket ( ctx ,  bucket ,  poolIdx ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										if  err  !=  nil  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											stopFn ( err ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											logger . LogIf ( ctx ,  err ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											return 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										stopFn ( nil ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										z . bucketRebalanceDone ( bucket ,  poolIdx ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									return  err 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								}  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// checkIfRebalanceDone reports whether rebalancing of the pool at poolIdx
								// is finished. It returns true when the pool's status is already
								// rebalCompleted, or when the ratio
								// (InitFreeSpace + Bytes) / InitCapacity lies within 0.05 of the
								// PercentFreeGoal recorded in the rebalance metadata; in the latter case
								// the pool is also marked rebalCompleted and its EndTime is set to now.
								// The whole check runs while holding z.rebalMu.
								func (z *erasureServerPools) checkIfRebalanceDone(poolIdx int) bool {
									z.rebalMu.Lock()
									defer z.rebalMu.Unlock()

									meta := z.rebalMeta
									stats := meta.PoolStats[poolIdx]

									// Nothing to evaluate if the pool was flagged as completed earlier.
									if stats.Info.Status == rebalCompleted {
										return true
									}

									// Free-space ratio computed from the initial free space plus the
									// Bytes counter, relative to the initial capacity.
									reclaimed := stats.InitFreeSpace + stats.Bytes
									freeRatio := float64(reclaimed) / float64(stats.InitCapacity)

									// Keep the "<=" form: a NaN ratio must compare false and therefore
									// leave the pool unfinished.
									goalReached := math.Abs(freeRatio-meta.PercentFreeGoal) <= 0.05
									if !goalReached {
										return false
									}

									// Close enough to the goal: record completion on the shared metadata.
									meta.PoolStats[poolIdx].Info.Status = rebalCompleted
									meta.PoolStats[poolIdx].Info.EndTime = time.Now()
									return true
								}
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// rebalanceBucket rebalances objects under bucket in poolIdx pool
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								func  ( z  * erasureServerPools )  rebalanceBucket ( ctx  context . Context ,  bucket  string ,  poolIdx  int )  error  {  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									ctx  =  logger . SetReqInfo ( ctx ,  & logger . ReqInfo { } ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									vc ,  _  :=  globalBucketVersioningSys . Get ( bucket ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									// Check if the current bucket has a configured lifecycle policy
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									lc ,  _  :=  globalLifecycleSys . Get ( bucket ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									// Check if bucket is object locked.
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									lr ,  _  :=  globalBucketObjectLockSys . Get ( bucket ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									pool  :=  z . serverPools [ poolIdx ] 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									const  envRebalanceWorkers  =  "_MINIO_REBALANCE_WORKERS" 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									wStr  :=  env . Get ( envRebalanceWorkers ,  strconv . Itoa ( len ( pool . sets ) ) ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									workerSize ,  err  :=  strconv . Atoi ( wStr ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									if  err  !=  nil  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										logger . LogIf ( ctx ,  fmt . Errorf ( "invalid %s value: %s err: %v, defaulting to %d" ,  envRebalanceWorkers ,  wStr ,  err ,  len ( pool . sets ) ) ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										workerSize  =  len ( pool . sets ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									workers  :=  make ( chan  struct { } ,  workerSize ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									var  wg  sync . WaitGroup 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									for  _ ,  set  :=  range  pool . sets  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										set  :=  set 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										disks  :=  set . getOnlineDisks ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										if  len ( disks )  ==  0  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											logger . LogIf ( ctx ,  fmt . Errorf ( "no online disks found for set with endpoints %s" , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												set . getEndpoints ( ) ) ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											continue 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										filterLifecycle  :=  func ( bucket ,  object  string ,  fi  FileInfo )  bool  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											if  lc  ==  nil  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												return  false 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											versioned  :=  vc  !=  nil  &&  vc . Versioned ( object ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											objInfo  :=  fi . ToObjectInfo ( bucket ,  object ,  versioned ) 
							 
						 
					
						
							
								
									
										
										
										
											2023-03-10 07:15:30 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											evt  :=  evalActionFromLifecycle ( ctx ,  * lc ,  lr ,  objInfo ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											if  evt . Action . Delete ( )  { 
							 
						 
					
						
							
								
									
										
										
										
											2023-04-27 08:49:00 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
												globalExpiryState . enqueueByDays ( objInfo ,  evt ) 
							 
						 
					
						
							
								
									
										
										
										
											2022-10-26 03:36:57 +08:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
												return  true 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											} 
							 
						 
					
						
							
								
									
										
										
										
											2023-03-10 07:15:30 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2022-10-26 03:36:57 +08:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
											return  false 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										rebalanceEntry  :=  func ( entry  metaCacheEntry )  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											defer  func ( )  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												<- workers 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												wg . Done ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											} ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											if  entry . isDir ( )  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												return 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											// rebalance on poolIdx has reached its goal
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											if  z . checkIfRebalanceDone ( poolIdx )  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												return 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											fivs ,  err  :=  entry . fileInfoVersions ( bucket ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											if  err  !=  nil  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												return 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											// We need a reversed order for rebalance,
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											// to create the appropriate stack.
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											versionsSorter ( fivs . Versions ) . reverse ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											var  rebalanced  int 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											for  _ ,  version  :=  range  fivs . Versions  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												// Skip transitioned objects for now. TBD
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												if  version . IsRemote ( )  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													continue 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												// Apply lifecycle rules on the objects that are expired.
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												if  filterLifecycle ( bucket ,  version . Name ,  version )  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													logger . LogIf ( ctx ,  fmt . Errorf ( "found %s/%s (%s) expired object based on ILM rules, skipping and scheduled for deletion" ,  bucket ,  version . Name ,  version . VersionID ) ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													continue 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												// We will skip rebalancing delete markers
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												// with single version, its as good as there
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												// is no data associated with the object.
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												if  version . Deleted  &&  len ( fivs . Versions )  ==  1  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													logger . LogIf ( ctx ,  fmt . Errorf ( "found %s/%s delete marked object with no other versions, skipping since there is no data to be rebalanced" ,  bucket ,  version . Name ) ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													continue 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												if  version . Deleted  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													_ ,  err  :=  z . DeleteObject ( ctx , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
														bucket , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
														version . Name , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
														ObjectOptions { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
															Versioned :          vc . PrefixEnabled ( version . Name ) , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
															VersionID :          version . VersionID , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
															MTime :              version . ModTime , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
															DeleteReplication :  version . ReplicationState , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
															DeleteMarker :       true ,  // make sure we create a delete marker
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
															SkipRebalancing :    true ,  // make sure we skip the decommissioned pool
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
														} ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													var  failure  bool 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													if  err  !=  nil  &&  ! isErrObjectNotFound ( err )  &&  ! isErrVersionNotFound ( err )  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
														logger . LogIf ( ctx ,  err ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
														failure  =  true 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													if  ! failure  { 
							 
						 
					
						
							
								
									
										
										
										
											2022-12-14 16:15:14 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
														z . updatePoolStats ( poolIdx ,  bucket ,  version ) 
							 
						 
					
						
							
								
									
										
										
										
											2022-10-26 03:36:57 +08:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
														rebalanced ++ 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													continue 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												var  failure  bool 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												for  try  :=  0 ;  try  <  3 ;  try ++  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													// GetObjectReader.Close is called by rebalanceObject
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													gr ,  err  :=  set . GetObjectNInfo ( ctx , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
														bucket , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
														encodeDirObject ( version . Name ) , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
														nil , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
														http . Header { } , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
														ObjectOptions { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
															VersionID :     version . VersionID , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
															NoDecryption :  true , 
							 
						 
					
						
							
								
									
										
										
										
											2023-04-18 03:16:37 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
															NoLock :        true , 
							 
						 
					
						
							
								
									
										
										
										
											2022-10-26 03:36:57 +08:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
														} ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													if  isErrObjectNotFound ( err )  ||  isErrVersionNotFound ( err )  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
														// object deleted by the application, nothing to do here we move on.
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
														return 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													if  err  !=  nil  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
														failure  =  true 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
														logger . LogIf ( ctx ,  err ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
														continue 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													stopFn  :=  globalRebalanceMetrics . log ( rebalanceMetricRebalanceObject ,  poolIdx ,  bucket ,  version . Name ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													if  err  =  z . rebalanceObject ( ctx ,  bucket ,  gr ) ;  err  !=  nil  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
														stopFn ( err ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
														failure  =  true 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
														logger . LogIf ( ctx ,  err ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
														continue 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													stopFn ( nil ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													failure  =  false 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													break 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												if  failure  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													break  // break out on first error
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												} 
							 
						 
					
						
							
								
									
										
										
										
											2022-12-14 16:15:14 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
												z . updatePoolStats ( poolIdx ,  bucket ,  version ) 
							 
						 
					
						
							
								
									
										
										
										
											2022-10-26 03:36:57 +08:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
												rebalanced ++ 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											// if all versions were rebalanced, we can delete the object versions.
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											if  rebalanced  ==  len ( fivs . Versions )  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												stopFn  :=  globalRebalanceMetrics . log ( rebalanceMetricRebalanceRemoveObject ,  poolIdx ,  bucket ,  entry . name ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												_ ,  err  :=  set . DeleteObject ( ctx , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													bucket , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													encodeDirObject ( entry . name ) , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													ObjectOptions { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
														DeletePrefix :  true ,  // use prefix delete to delete all versions at once.
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													} , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												stopFn ( err ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												auditLogRebalance ( ctx ,  "Rebalance:DeleteObject" ,  bucket ,  entry . name ,  "" ,  err ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												if  err  !=  nil  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													logger . LogIf ( ctx ,  err ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										wg . Add ( 1 ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										go  func ( )  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											defer  wg . Done ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											// How to resolve partial results.
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											resolver  :=  metadataResolutionParams { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												dirQuorum :  len ( disks )  /  2 ,  // make sure to capture all quorum ratios
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												objQuorum :  len ( disks )  /  2 ,  // make sure to capture all quorum ratios
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												bucket :     bucket , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											err  :=  listPathRaw ( ctx ,  listPathRawOptions { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												disks :           disks , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												bucket :          bucket , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												recursive :       true , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												forwardTo :       "" , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												minDisks :        len ( disks )  /  2 ,  // to capture all quorum ratios
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												reportNotFound :  false , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												agreed :  func ( entry  metaCacheEntry )  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													workers  <-  struct { } { } 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													wg . Add ( 1 ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													go  rebalanceEntry ( entry ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												} , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												partial :  func ( entries  metaCacheEntries ,  _  [ ] error )  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													entry ,  ok  :=  entries . resolve ( & resolver ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													if  ok  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
														workers  <-  struct { } { } 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
														wg . Add ( 1 ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
														go  rebalanceEntry ( * entry ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
													} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												} , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
												finished :  nil , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											} ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											logger . LogIf ( ctx ,  err ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										} ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									wg . Wait ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									return  nil 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								}  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// rebalSaveOpts selects which piece of the rebalance metadata
								// saveRebalanceStats updates before writing it back.
								type rebalSaveOpts uint8

								const (
									// rebalSaveStats copies the in-memory stats of one pool into the
									// persisted metadata.
									rebalSaveStats = rebalSaveOpts(iota)
									// rebalSaveStoppedAt stamps the persisted metadata with the
									// current time as its StoppedAt value.
									rebalSaveStoppedAt
								)
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								func  ( z  * erasureServerPools )  saveRebalanceStats ( ctx  context . Context ,  poolIdx  int ,  opts  rebalSaveOpts )  error  {  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									lock  :=  z . serverPools [ 0 ] . NewNSLock ( minioMetaBucket ,  rebalMetaName ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									lkCtx ,  err  :=  lock . GetLock ( ctx ,  globalOperationTimeout ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									if  err  !=  nil  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										logger . LogIf ( ctx ,  fmt . Errorf ( "failed to acquire write lock on %s/%s: %w" ,  minioMetaBucket ,  rebalMetaName ,  err ) ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										return  err 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									} 
							 
						 
					
						
							
								
									
										
										
										
											2022-12-24 11:49:07 +08:00 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
									defer  lock . Unlock ( lkCtx ) 
							 
						 
					
						
							
								
									
										
										
										
											2022-10-26 03:36:57 +08:00 
										
									 
								 
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									ctx  =  lkCtx . Context ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									noLockOpts  :=  ObjectOptions { NoLock :  true } 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									r  :=  & rebalanceMeta { } 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									if  err  :=  r . loadWithOpts ( ctx ,  z . serverPools [ 0 ] ,  noLockOpts ) ;  err  !=  nil  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										return  err 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									z . rebalMu . Lock ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									defer  z . rebalMu . Unlock ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									switch  opts  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									case  rebalSaveStoppedAt : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										r . StoppedAt  =  time . Now ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									case  rebalSaveStats : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										r . PoolStats [ poolIdx ]  =  z . rebalMeta . PoolStats [ poolIdx ] 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									z . rebalMeta  =  r 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									err  =  z . rebalMeta . saveWithOpts ( ctx ,  z . serverPools [ 0 ] ,  noLockOpts ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									return  err 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								}  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								func  auditLogRebalance ( ctx  context . Context ,  apiName ,  bucket ,  object ,  versionID  string ,  err  error )  {  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									errStr  :=  "" 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									if  err  !=  nil  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										errStr  =  err . Error ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									auditLogInternal ( ctx ,  AuditLogOptions { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										Event :      "rebalance" , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										APIName :    apiName , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										Bucket :     bucket , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										Object :     object , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										VersionID :  versionID , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										Error :      errStr , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									} ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								}  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// rebalanceObject re-uploads the object version read through gr into bucket
								// using z's own upload APIs, keeping the version ID, modification time, user
								// metadata, ETag(s) and per-part index of the source.
								//
								// Multipart objects are copied part by part through a new multipart upload;
								// all other objects are copied with a single PutObject. gr is always closed
								// and a "RebalanceCopyData" audit entry carrying the final error is emitted
								// when the function returns.
								func (z *erasureServerPools) rebalanceObject(ctx context.Context, bucket string, gr *GetObjectReader) (err error) {
									oi := gr.ObjInfo

									// err is the named result so the deferred audit entry sees the final error.
									defer func() {
										gr.Close()
										auditLogRebalance(ctx, "RebalanceCopyData", oi.Bucket, oi.Name, oi.VersionID, err)
									}()

									actualSize, err := oi.GetActualSize()
									if err != nil {
										// Wrapped like every other failure in this function so the audit
										// entry and logs identify the failing step.
										return fmt.Errorf("rebalanceObject: GetActualSize() %w", err)
									}

									if oi.isMultipart() {
										res, err := z.NewMultipartUpload(ctx, bucket, oi.Name, ObjectOptions{
											VersionID:   oi.VersionID,
											MTime:       oi.ModTime,
											UserDefined: oi.UserDefined,
										})
										if err != nil {
											return fmt.Errorf("rebalanceObject: NewMultipartUpload() %w", err)
										}
										// Best-effort cleanup of the upload on every exit path; its result
										// is intentionally ignored.
										defer z.AbortMultipartUpload(ctx, bucket, oi.Name, res.UploadID, ObjectOptions{})

										parts := make([]CompletePart, len(oi.Parts))
										for i, part := range oi.Parts {
											hr, err := hash.NewReader(gr, part.Size, "", "", part.ActualSize)
											if err != nil {
												return fmt.Errorf("rebalanceObject: hash.NewReader() %w", err)
											}
											// Capture the index by value so the callback does not depend on
											// the loop variable.
											partIndex := part.Index
											pi, err := z.PutObjectPart(ctx, bucket, oi.Name, res.UploadID,
												part.Number,
												NewPutObjReader(hr),
												ObjectOptions{
													PreserveETag: part.ETag, // Preserve original ETag to ensure same metadata.
													IndexCB: func() []byte {
														return partIndex // Preserve part Index to ensure decompression works.
													},
												})
											if err != nil {
												return fmt.Errorf("rebalanceObject: PutObjectPart() %w", err)
											}
											parts[i] = CompletePart{
												ETag:       pi.ETag,
												PartNumber: pi.PartNumber,
											}
										}
										_, err = z.CompleteMultipartUpload(ctx, bucket, oi.Name, res.UploadID, parts, ObjectOptions{
											MTime: oi.ModTime,
										})
										if err != nil {
											err = fmt.Errorf("rebalanceObject: CompleteMultipartUpload() %w", err)
										}
										return err
									}

									hr, err := hash.NewReader(gr, oi.Size, "", "", actualSize)
									if err != nil {
										return fmt.Errorf("rebalanceObject: hash.NewReader() %w", err)
									}
									_, err = z.PutObject(ctx,
										bucket,
										oi.Name,
										NewPutObjReader(hr),
										ObjectOptions{
											VersionID:    oi.VersionID,
											MTime:        oi.ModTime,
											UserDefined:  oi.UserDefined,
											PreserveETag: oi.ETag, // Preserve original ETag to ensure same metadata.
											IndexCB: func() []byte {
												// Guard against an object info without parts instead of
												// panicking on the index expression.
												// NOTE(review): nil is assumed to mean "no index" — confirm.
												if len(oi.Parts) == 0 {
													return nil
												}
												return oi.Parts[0].Index // Preserve part Index to ensure decompression works.
											},
										})
									if err != nil {
										err = fmt.Errorf("rebalanceObject: PutObject() %w", err)
									}
									return err
								}
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								func  ( z  * erasureServerPools )  StartRebalance ( )  {  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									z . rebalMu . Lock ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									if  z . rebalMeta  ==  nil  ||  ! z . rebalMeta . StoppedAt . IsZero ( )  {  // rebalance not running, nothing to do
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										z . rebalMu . Unlock ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										return 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									ctx ,  cancel  :=  context . WithCancel ( GlobalContext ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									z . rebalMeta . cancel  =  cancel  // to be used when rebalance-stop is called
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									z . rebalMu . Unlock ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									z . rebalMu . RLock ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									participants  :=  make ( [ ] bool ,  len ( z . rebalMeta . PoolStats ) ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									for  i ,  ps  :=  range  z . rebalMeta . PoolStats  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										// skip pools which have completed rebalancing
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										if  ps . Info . Status  !=  rebalStarted  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											continue 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										participants [ i ]  =  ps . Participating 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									z . rebalMu . RUnlock ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									for  poolIdx ,  doRebalance  :=  range  participants  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										if  ! doRebalance  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											continue 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										// nothing to do if this node is not pool's first node (i.e pool's rebalance 'leader').
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										if  ! globalEndpoints [ poolIdx ] . Endpoints [ 0 ] . IsLocal  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											continue 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										go  func ( idx  int )  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											stopfn  :=  globalRebalanceMetrics . log ( rebalanceMetricRebalanceBuckets ,  idx ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											err  :=  z . rebalanceBuckets ( ctx ,  idx ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
											stopfn ( err ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
										} ( poolIdx ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								}  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// StopRebalance asks the rebalance goroutine(s) started on this node, if any,
								// to stop. It does so by invoking the context.CancelFunc that was saved in the
								// rebalance metadata at the time of StartRebalance. It always returns nil.
								func (z *erasureServerPools) StopRebalance() error {
									z.rebalMu.Lock()
									defer z.rebalMu.Unlock()

									meta := z.rebalMeta
									// No metadata means rebalance is not running on this node; a nil
									// cancel means this node is not a pool leader (cancel != nil only
									// on pool leaders). Either way there is nothing to stop.
									if meta == nil || meta.cancel == nil {
										return nil
									}

									// Clear the saved CancelFunc before invoking it so that a later
									// call finds nothing left to cancel.
									stop := meta.cancel
									meta.cancel = nil
									stop()
									return nil
								}
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// rebalanceMetrics is an empty type that exists for rebalance trace support;
								// it carries no state of its own.
								type rebalanceMetrics struct{}

								// globalRebalanceMetrics is the package-level rebalanceMetrics value.
								var globalRebalanceMetrics = rebalanceMetrics{}
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// rebalanceMetric identifies a kind of rebalance operation. Its String
								// method is produced by stringer with the "rebalanceMetric" prefix trimmed.
								//
								//go:generate stringer -type=rebalanceMetric -trimprefix=rebalanceMetric $GOFILE
								type rebalanceMetric uint8

								// Known rebalanceMetric values, numbered sequentially from zero. The order
								// must not change without regenerating the stringer output.
								const (
									rebalanceMetricRebalanceBuckets = rebalanceMetric(iota)
									rebalanceMetricRebalanceBucket
									rebalanceMetricRebalanceObject
									rebalanceMetricRebalanceRemoveObject
									rebalanceMetricSaveMetadata
								)
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// rebalanceTrace builds the madmin.TraceInfo record describing one rebalance
								// operation.
								//
								// r and poolIdx are formatted into FuncName as "rebalance.<r> (pool-id=<poolIdx>)";
								// startTime, duration and path are copied through unchanged. Error holds
								// err.Error() when err is non-nil and is left empty otherwise.
								func rebalanceTrace(r rebalanceMetric, poolIdx int, startTime time.Time, duration time.Duration, err error, path string) madmin.TraceInfo {
									info := madmin.TraceInfo{
										TraceType: madmin.TraceRebalance,
										Time:      startTime,
										NodeName:  globalLocalNodeName,
										FuncName:  fmt.Sprintf("rebalance.%s (pool-id=%d)", r.String(), poolIdx),
										Duration:  duration,
										Path:      path,
									}
									if err != nil {
										info.Error = err.Error()
									}
									return info
								}
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// log records the current time and returns a completion callback for the
								// rebalance operation r on pool poolIdx.
								//
								// When the callback is invoked with the operation's outcome, it measures the
								// time elapsed since log was called and, only if someone is subscribed to
								// rebalance traces, publishes a trace entry. The optional paths are joined
								// with a single space to form the entry's path.
								func (p *rebalanceMetrics) log(r rebalanceMetric, poolIdx int, paths ...string) func(err error) {
									begin := time.Now()
									return func(err error) {
										elapsed := time.Since(begin)
										// Skip building the trace entry when nobody is listening.
										if globalTrace.NumSubscribers(madmin.TraceRebalance) <= 0 {
											return
										}
										joined := strings.Join(paths, " ")
										globalTrace.Publish(rebalanceTrace(r, poolIdx, begin, elapsed, err, joined))
									}
								}