| 
									
										
										
										
											2021-04-19 03:41:13 +08:00
										 |  |  | // Copyright (c) 2015-2021 MinIO, Inc.
 | 
					
						
							|  |  |  | //
 | 
					
						
							|  |  |  | // This file is part of MinIO Object Storage stack
 | 
					
						
							|  |  |  | //
 | 
					
						
							|  |  |  | // This program is free software: you can redistribute it and/or modify
 | 
					
						
							|  |  |  | // it under the terms of the GNU Affero General Public License as published by
 | 
					
						
							|  |  |  | // the Free Software Foundation, either version 3 of the License, or
 | 
					
						
							|  |  |  | // (at your option) any later version.
 | 
					
						
							|  |  |  | //
 | 
					
						
							|  |  |  | // This program is distributed in the hope that it will be useful
 | 
					
						
							|  |  |  | // but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
					
						
							|  |  |  | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
					
						
							|  |  |  | // GNU Affero General Public License for more details.
 | 
					
						
							|  |  |  | //
 | 
					
						
							|  |  |  | // You should have received a copy of the GNU Affero General Public License
 | 
					
						
							|  |  |  | // along with this program.  If not, see <http://www.gnu.org/licenses/>.
 | 
					
						
							| 
									
										
										
										
											2021-04-04 00:03:42 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | package cmd | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							|  |  |  | 	"context" | 
					
						
							|  |  |  | 	"sync" | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 	"sync/atomic" | 
					
						
							| 
									
										
										
										
											2021-10-22 09:52:55 +08:00
										 |  |  | 	"time" | 
					
						
							| 
									
										
										
										
											2021-04-04 00:03:42 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-06-02 05:59:40 +08:00
										 |  |  | 	"github.com/minio/minio/internal/bucket/replication" | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 	"github.com/rcrowley/go-metrics" | 
					
						
							| 
									
										
										
										
											2021-04-04 00:03:42 +08:00
										 |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (b *BucketReplicationStats) hasReplicationUsage() bool { | 
					
						
							| 
									
										
										
										
											2021-09-19 04:31:35 +08:00
										 |  |  | 	for _, s := range b.Stats { | 
					
						
							|  |  |  | 		if s.hasReplicationUsage() { | 
					
						
							|  |  |  | 			return true | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return false | 
					
						
							| 
									
										
										
										
											2021-04-04 00:03:42 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // ReplicationStats holds the global in-memory replication stats
 | 
					
						
							|  |  |  | type ReplicationStats struct { | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 	// map of site deployment ID to site replication status
 | 
					
						
							|  |  |  | 	// for site replication - maintain stats at global level
 | 
					
						
							|  |  |  | 	srStats *SRStats | 
					
						
							|  |  |  | 	// active worker stats
 | 
					
						
							|  |  |  | 	workers *ActiveWorkerStat | 
					
						
							|  |  |  | 	// queue stats cache
 | 
					
						
							|  |  |  | 	qCache queueCache | 
					
						
							| 
									
										
										
										
											2024-02-06 14:00:45 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	pCache proxyStatsCache | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 	// mrf backlog stats
 | 
					
						
							|  |  |  | 	mrfStats ReplicationMRFStats | 
					
						
							|  |  |  | 	// for bucket replication, continue to use existing cache
 | 
					
						
							| 
									
										
										
										
											2022-11-05 00:59:14 +08:00
										 |  |  | 	Cache             map[string]*BucketReplicationStats | 
					
						
							|  |  |  | 	mostRecentStats   BucketStatsMap | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 	registry          metrics.Registry | 
					
						
							|  |  |  | 	sync.RWMutex                 // mutex for Cache
 | 
					
						
							|  |  |  | 	mostRecentStatsMu sync.Mutex // mutex for mostRecentStats
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	wlock sync.RWMutex // mutex for active workers
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	movingAvgTicker *time.Ticker // Ticker for calculating moving averages
 | 
					
						
							|  |  |  | 	wTimer          *time.Ticker // ticker for calculating active workers
 | 
					
						
							|  |  |  | 	qTimer          *time.Ticker // ticker for calculating queue stats
 | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (r *ReplicationStats) trackEWMA() { | 
					
						
							|  |  |  | 	for { | 
					
						
							|  |  |  | 		select { | 
					
						
							|  |  |  | 		case <-r.movingAvgTicker.C: | 
					
						
							|  |  |  | 			r.updateMovingAvg() | 
					
						
							|  |  |  | 		case <-GlobalContext.Done(): | 
					
						
							|  |  |  | 			return | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (r *ReplicationStats) updateMovingAvg() { | 
					
						
							|  |  |  | 	r.RLock() | 
					
						
							|  |  |  | 	for _, s := range r.Cache { | 
					
						
							|  |  |  | 		for _, st := range s.Stats { | 
					
						
							|  |  |  | 			st.XferRateLrg.measure.updateExponentialMovingAverage(time.Now()) | 
					
						
							|  |  |  | 			st.XferRateSml.measure.updateExponentialMovingAverage(time.Now()) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	r.RUnlock() | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // ActiveWorkers returns worker stats
 | 
					
						
							|  |  |  | func (r *ReplicationStats) ActiveWorkers() ActiveWorkerStat { | 
					
						
							| 
									
										
										
										
											2024-08-15 20:04:40 +08:00
										 |  |  | 	if r == nil { | 
					
						
							|  |  |  | 		return ActiveWorkerStat{} | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 	r.wlock.RLock() | 
					
						
							|  |  |  | 	defer r.wlock.RUnlock() | 
					
						
							|  |  |  | 	w := r.workers.get() | 
					
						
							|  |  |  | 	return ActiveWorkerStat{ | 
					
						
							|  |  |  | 		Curr: w.Curr, | 
					
						
							|  |  |  | 		Max:  w.Max, | 
					
						
							|  |  |  | 		Avg:  w.Avg, | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (r *ReplicationStats) collectWorkerMetrics(ctx context.Context) { | 
					
						
							|  |  |  | 	if r == nil { | 
					
						
							|  |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	for { | 
					
						
							|  |  |  | 		select { | 
					
						
							|  |  |  | 		case <-ctx.Done(): | 
					
						
							|  |  |  | 			return | 
					
						
							|  |  |  | 		case <-r.wTimer.C: | 
					
						
							|  |  |  | 			r.wlock.Lock() | 
					
						
							|  |  |  | 			r.workers.update() | 
					
						
							|  |  |  | 			r.wlock.Unlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (r *ReplicationStats) collectQueueMetrics(ctx context.Context) { | 
					
						
							|  |  |  | 	if r == nil { | 
					
						
							|  |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	for { | 
					
						
							|  |  |  | 		select { | 
					
						
							|  |  |  | 		case <-ctx.Done(): | 
					
						
							|  |  |  | 			return | 
					
						
							|  |  |  | 		case <-r.qTimer.C: | 
					
						
							|  |  |  | 			r.qCache.update() | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2021-04-04 00:03:42 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Delete deletes in-memory replication statistics for a bucket.
 | 
					
						
							| 
									
										
										
										
											2021-04-05 06:34:33 +08:00
										 |  |  | func (r *ReplicationStats) Delete(bucket string) { | 
					
						
							| 
									
										
										
										
											2021-04-04 00:03:42 +08:00
										 |  |  | 	if r == nil { | 
					
						
							|  |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	r.Lock() | 
					
						
							|  |  |  | 	defer r.Unlock() | 
					
						
							|  |  |  | 	delete(r.Cache, bucket) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-09-19 04:31:35 +08:00
										 |  |  | // UpdateReplicaStat updates in-memory replica statistics with new values.
 | 
					
						
							|  |  |  | func (r *ReplicationStats) UpdateReplicaStat(bucket string, n int64) { | 
					
						
							| 
									
										
										
										
											2021-04-04 00:03:42 +08:00
										 |  |  | 	if r == nil { | 
					
						
							|  |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-09-19 04:31:35 +08:00
										 |  |  | 	r.Lock() | 
					
						
							|  |  |  | 	defer r.Unlock() | 
					
						
							|  |  |  | 	bs, ok := r.Cache[bucket] | 
					
						
							|  |  |  | 	if !ok { | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 		bs = newBucketReplicationStats() | 
					
						
							| 
									
										
										
										
											2021-09-19 04:31:35 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2021-11-18 04:10:57 +08:00
										 |  |  | 	bs.ReplicaSize += n | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 	bs.ReplicaCount++ | 
					
						
							| 
									
										
										
										
											2021-09-19 04:31:35 +08:00
										 |  |  | 	r.Cache[bucket] = bs | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 	r.srUpdateReplicaStat(n) | 
					
						
							| 
									
										
										
										
											2021-09-19 04:31:35 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | func (r *ReplicationStats) srUpdateReplicaStat(sz int64) { | 
					
						
							| 
									
										
										
										
											2021-09-19 04:31:35 +08:00
										 |  |  | 	if r == nil { | 
					
						
							|  |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 	atomic.AddInt64(&r.srStats.ReplicaSize, sz) | 
					
						
							|  |  |  | 	atomic.AddInt64(&r.srStats.ReplicaCount, 1) | 
					
						
							|  |  |  | } | 
					
						
							| 
									
										
										
										
											2021-11-18 04:10:57 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | func (r *ReplicationStats) srUpdate(sr replStat) { | 
					
						
							|  |  |  | 	dID, err := globalSiteReplicationSys.getDeplIDForEndpoint(sr.endpoint()) | 
					
						
							|  |  |  | 	if err == nil { | 
					
						
							|  |  |  | 		r.srStats.update(sr, dID) | 
					
						
							| 
									
										
										
										
											2021-09-19 04:31:35 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Update updates in-memory replication statistics with new values.
 | 
					
						
							|  |  |  | func (r *ReplicationStats) Update(bucket string, ri replicatedTargetInfo, status, prevStatus replication.StatusType) { | 
					
						
							|  |  |  | 	if r == nil { | 
					
						
							|  |  |  | 		return | 
					
						
							| 
									
										
										
										
											2021-04-04 00:03:42 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 	var rs replStat | 
					
						
							| 
									
										
										
										
											2021-04-04 00:03:42 +08:00
										 |  |  | 	switch status { | 
					
						
							| 
									
										
										
										
											2022-09-13 03:40:02 +08:00
										 |  |  | 	case replication.Pending: | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 		if ri.OpType.IsDataReplication() && prevStatus != status { | 
					
						
							|  |  |  | 			rs.set(ri.Arn, ri.Size, 0, status, ri.OpType, ri.endpoint, ri.secure, ri.Err) | 
					
						
							| 
									
										
										
										
											2022-09-13 03:40:02 +08:00
										 |  |  | 		} | 
					
						
							| 
									
										
										
										
											2021-04-04 00:03:42 +08:00
										 |  |  | 	case replication.Completed: | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 		if ri.OpType.IsDataReplication() { | 
					
						
							|  |  |  | 			rs.set(ri.Arn, ri.Size, ri.Duration, status, ri.OpType, ri.endpoint, ri.secure, ri.Err) | 
					
						
							| 
									
										
										
										
											2021-04-04 00:03:42 +08:00
										 |  |  | 		} | 
					
						
							|  |  |  | 	case replication.Failed: | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 		if ri.OpType.IsDataReplication() && prevStatus == replication.Pending { | 
					
						
							|  |  |  | 			rs.set(ri.Arn, ri.Size, ri.Duration, status, ri.OpType, ri.endpoint, ri.secure, ri.Err) | 
					
						
							| 
									
										
										
										
											2021-04-04 00:03:42 +08:00
										 |  |  | 		} | 
					
						
							|  |  |  | 	case replication.Replica: | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 		if ri.OpType == replication.ObjectReplicationType { | 
					
						
							|  |  |  | 			rs.set(ri.Arn, ri.Size, 0, status, ri.OpType, "", false, ri.Err) | 
					
						
							| 
									
										
										
										
											2021-04-04 00:03:42 +08:00
										 |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2021-04-06 23:36:54 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 	// update site-replication in-memory stats
 | 
					
						
							|  |  |  | 	if rs.Completed || rs.Failed { | 
					
						
							|  |  |  | 		r.srUpdate(rs) | 
					
						
							| 
									
										
										
										
											2021-04-06 23:36:54 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	r.Lock() | 
					
						
							|  |  |  | 	defer r.Unlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// update bucket replication in-memory stats
 | 
					
						
							|  |  |  | 	bs, ok := r.Cache[bucket] | 
					
						
							|  |  |  | 	if !ok { | 
					
						
							|  |  |  | 		bs = newBucketReplicationStats() | 
					
						
							|  |  |  | 		r.Cache[bucket] = bs | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	b, ok := bs.Stats[ri.Arn] | 
					
						
							| 
									
										
										
										
											2021-12-18 07:33:13 +08:00
										 |  |  | 	if !ok { | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 		b = &BucketReplicationStat{ | 
					
						
							|  |  |  | 			XferRateLrg: newXferStats(), | 
					
						
							|  |  |  | 			XferRateSml: newXferStats(), | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		bs.Stats[ri.Arn] = b | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	switch { | 
					
						
							|  |  |  | 	case rs.Completed: | 
					
						
							|  |  |  | 		b.ReplicatedSize += rs.TransferSize | 
					
						
							|  |  |  | 		b.ReplicatedCount++ | 
					
						
							|  |  |  | 		if rs.TransferDuration > 0 { | 
					
						
							|  |  |  | 			b.Latency.update(rs.TransferSize, rs.TransferDuration) | 
					
						
							|  |  |  | 			b.updateXferRate(rs.TransferSize, rs.TransferDuration) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	case rs.Failed: | 
					
						
							|  |  |  | 		b.FailStats.addsize(rs.TransferSize, rs.Err) | 
					
						
							|  |  |  | 	case rs.Pending: | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | type replStat struct { | 
					
						
							|  |  |  | 	Arn       string | 
					
						
							|  |  |  | 	Completed bool | 
					
						
							|  |  |  | 	Pending   bool | 
					
						
							|  |  |  | 	Failed    bool | 
					
						
							|  |  |  | 	opType    replication.Type | 
					
						
							|  |  |  | 	// transfer size
 | 
					
						
							|  |  |  | 	TransferSize int64 | 
					
						
							|  |  |  | 	// transfer duration
 | 
					
						
							|  |  |  | 	TransferDuration time.Duration | 
					
						
							|  |  |  | 	Endpoint         string | 
					
						
							|  |  |  | 	Secure           bool | 
					
						
							|  |  |  | 	Err              error | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (rs *replStat) endpoint() string { | 
					
						
							|  |  |  | 	scheme := "http" | 
					
						
							|  |  |  | 	if rs.Secure { | 
					
						
							|  |  |  | 		scheme = "https" | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return scheme + "://" + rs.Endpoint | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (rs *replStat) set(arn string, n int64, duration time.Duration, status replication.StatusType, opType replication.Type, endpoint string, secure bool, err error) { | 
					
						
							|  |  |  | 	rs.Endpoint = endpoint | 
					
						
							|  |  |  | 	rs.Secure = secure | 
					
						
							|  |  |  | 	rs.TransferSize = n | 
					
						
							|  |  |  | 	rs.Arn = arn | 
					
						
							|  |  |  | 	rs.TransferDuration = duration | 
					
						
							|  |  |  | 	rs.opType = opType | 
					
						
							|  |  |  | 	switch status { | 
					
						
							|  |  |  | 	case replication.Completed: | 
					
						
							|  |  |  | 		rs.Completed = true | 
					
						
							|  |  |  | 	case replication.Pending: | 
					
						
							|  |  |  | 		rs.Pending = true | 
					
						
							|  |  |  | 	case replication.Failed: | 
					
						
							|  |  |  | 		rs.Failed = true | 
					
						
							|  |  |  | 		rs.Err = err | 
					
						
							| 
									
										
										
										
											2021-09-19 04:31:35 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2021-04-04 00:03:42 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-05-24 00:15:30 +08:00
										 |  |  | // GetAll returns replication metrics for all buckets at once.
 | 
					
						
							|  |  |  | func (r *ReplicationStats) GetAll() map[string]BucketReplicationStats { | 
					
						
							|  |  |  | 	if r == nil { | 
					
						
							|  |  |  | 		return map[string]BucketReplicationStats{} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	r.RLock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	bucketReplicationStats := make(map[string]BucketReplicationStats, len(r.Cache)) | 
					
						
							|  |  |  | 	for k, v := range r.Cache { | 
					
						
							|  |  |  | 		bucketReplicationStats[k] = v.Clone() | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 	r.RUnlock() | 
					
						
							|  |  |  | 	for k, v := range bucketReplicationStats { | 
					
						
							|  |  |  | 		v.QStat = r.qCache.getBucketStats(k) | 
					
						
							|  |  |  | 		bucketReplicationStats[k] = v | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2022-05-24 00:15:30 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	return bucketReplicationStats | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | func (r *ReplicationStats) getSRMetricsForNode() SRMetricsSummary { | 
					
						
							|  |  |  | 	if r == nil { | 
					
						
							|  |  |  | 		return SRMetricsSummary{} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	m := SRMetricsSummary{ | 
					
						
							|  |  |  | 		Uptime:        UTCNow().Unix() - globalBootTime.Unix(), | 
					
						
							|  |  |  | 		Queued:        r.qCache.getSiteStats(), | 
					
						
							|  |  |  | 		ActiveWorkers: r.ActiveWorkers(), | 
					
						
							|  |  |  | 		Metrics:       r.srStats.get(), | 
					
						
							| 
									
										
										
										
											2024-02-06 14:00:45 +08:00
										 |  |  | 		Proxied:       r.pCache.getSiteStats(), | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 		ReplicaSize:   atomic.LoadInt64(&r.srStats.ReplicaSize), | 
					
						
							|  |  |  | 		ReplicaCount:  atomic.LoadInt64(&r.srStats.ReplicaCount), | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return m | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-04-07 02:32:52 +08:00
										 |  |  | // Get replication metrics for a bucket from this node since this node came up.
 | 
					
						
							| 
									
										
										
										
											2021-04-04 00:03:42 +08:00
										 |  |  | func (r *ReplicationStats) Get(bucket string) BucketReplicationStats { | 
					
						
							|  |  |  | 	if r == nil { | 
					
						
							| 
									
										
										
										
											2021-09-19 04:31:35 +08:00
										 |  |  | 		return BucketReplicationStats{Stats: make(map[string]*BucketReplicationStat)} | 
					
						
							| 
									
										
										
										
											2021-04-04 00:03:42 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	r.RLock() | 
					
						
							|  |  |  | 	defer r.RUnlock() | 
					
						
							| 
									
										
										
										
											2021-04-05 06:34:33 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-04-04 00:03:42 +08:00
										 |  |  | 	st, ok := r.Cache[bucket] | 
					
						
							|  |  |  | 	if !ok { | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 		return BucketReplicationStats{Stats: make(map[string]*BucketReplicationStat)} | 
					
						
							| 
									
										
										
										
											2021-04-04 00:03:42 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2021-09-19 04:31:35 +08:00
										 |  |  | 	return st.Clone() | 
					
						
							| 
									
										
										
										
											2021-04-04 00:03:42 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // NewReplicationStats initialize in-memory replication statistics
 | 
					
						
							|  |  |  | func NewReplicationStats(ctx context.Context, objectAPI ObjectLayer) *ReplicationStats { | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 	r := metrics.NewRegistry() | 
					
						
							|  |  |  | 	rs := ReplicationStats{ | 
					
						
							|  |  |  | 		Cache:           make(map[string]*BucketReplicationStats), | 
					
						
							|  |  |  | 		qCache:          newQueueCache(r), | 
					
						
							| 
									
										
										
										
											2024-02-06 14:00:45 +08:00
										 |  |  | 		pCache:          newProxyStatsCache(), | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 		srStats:         newSRStats(), | 
					
						
							|  |  |  | 		movingAvgTicker: time.NewTicker(2 * time.Second), | 
					
						
							|  |  |  | 		wTimer:          time.NewTicker(2 * time.Second), | 
					
						
							|  |  |  | 		qTimer:          time.NewTicker(2 * time.Second), | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		workers:  newActiveWorkerStat(r), | 
					
						
							|  |  |  | 		registry: r, | 
					
						
							| 
									
										
										
										
											2022-09-27 00:04:54 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 	go rs.collectWorkerMetrics(ctx) | 
					
						
							|  |  |  | 	go rs.collectQueueMetrics(ctx) | 
					
						
							|  |  |  | 	return &rs | 
					
						
							| 
									
										
										
										
											2022-09-13 03:40:02 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | func (r *ReplicationStats) getAllLatest(bucketsUsage map[string]BucketUsageInfo) (bucketsReplicationStats map[string]BucketStats) { | 
					
						
							| 
									
										
										
										
											2024-08-15 20:04:40 +08:00
										 |  |  | 	if r == nil { | 
					
						
							|  |  |  | 		return nil | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2022-09-13 03:40:02 +08:00
										 |  |  | 	peerBucketStatsList := globalNotificationSys.GetClusterAllBucketStats(GlobalContext) | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 	bucketsReplicationStats = make(map[string]BucketStats, len(bucketsUsage)) | 
					
						
							| 
									
										
										
										
											2022-09-13 03:40:02 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 	for bucket := range bucketsUsage { | 
					
						
							| 
									
										
										
										
											2022-09-13 03:40:02 +08:00
										 |  |  | 		bucketStats := make([]BucketStats, len(peerBucketStatsList)) | 
					
						
							|  |  |  | 		for i, peerBucketStats := range peerBucketStatsList { | 
					
						
							|  |  |  | 			bucketStat, ok := peerBucketStats.Stats[bucket] | 
					
						
							|  |  |  | 			if !ok { | 
					
						
							|  |  |  | 				continue | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 			bucketStats[i] = bucketStat | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 		bucketsReplicationStats[bucket] = r.calculateBucketReplicationStats(bucket, bucketStats) | 
					
						
							| 
									
										
										
										
											2022-09-13 03:40:02 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 	return bucketsReplicationStats | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | func (r *ReplicationStats) calculateBucketReplicationStats(bucket string, bucketStats []BucketStats) (bs BucketStats) { | 
					
						
							| 
									
										
										
										
											2022-09-17 08:09:45 +08:00
										 |  |  | 	if r == nil { | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 		bs = BucketStats{ | 
					
						
							|  |  |  | 			ReplicationStats: BucketReplicationStats{ | 
					
						
							|  |  |  | 				Stats: make(map[string]*BucketReplicationStat), | 
					
						
							|  |  |  | 			}, | 
					
						
							|  |  |  | 			QueueStats: ReplicationQueueStats{}, | 
					
						
							| 
									
										
										
										
											2024-02-06 14:00:45 +08:00
										 |  |  | 			ProxyStats: ProxyMetric{}, | 
					
						
							| 
									
										
										
										
											2022-09-17 08:09:45 +08:00
										 |  |  | 		} | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 		return bs | 
					
						
							| 
									
										
										
										
											2022-09-17 08:09:45 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 	var s BucketReplicationStats | 
					
						
							| 
									
										
										
										
											2022-09-13 03:40:02 +08:00
										 |  |  | 	// accumulate cluster bucket stats
 | 
					
						
							|  |  |  | 	stats := make(map[string]*BucketReplicationStat) | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 	var ( | 
					
						
							|  |  |  | 		totReplicaSize, totReplicatedSize   int64 | 
					
						
							|  |  |  | 		totReplicaCount, totReplicatedCount int64 | 
					
						
							|  |  |  | 		totFailed                           RTimedMetrics | 
					
						
							|  |  |  | 		tq                                  InQueueMetric | 
					
						
							|  |  |  | 	) | 
					
						
							| 
									
										
										
										
											2022-09-13 03:40:02 +08:00
										 |  |  | 	for _, bucketStat := range bucketStats { | 
					
						
							|  |  |  | 		totReplicaSize += bucketStat.ReplicationStats.ReplicaSize | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 		totReplicaCount += bucketStat.ReplicationStats.ReplicaCount | 
					
						
							|  |  |  | 		for _, q := range bucketStat.QueueStats.Nodes { | 
					
						
							|  |  |  | 			tq = tq.merge(q.QStats) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-09-13 03:40:02 +08:00
										 |  |  | 		for arn, stat := range bucketStat.ReplicationStats.Stats { | 
					
						
							|  |  |  | 			oldst := stats[arn] | 
					
						
							|  |  |  | 			if oldst == nil { | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 				oldst = &BucketReplicationStat{ | 
					
						
							|  |  |  | 					XferRateLrg: newXferStats(), | 
					
						
							|  |  |  | 					XferRateSml: newXferStats(), | 
					
						
							|  |  |  | 				} | 
					
						
							| 
									
										
										
										
											2022-09-13 03:40:02 +08:00
										 |  |  | 			} | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 			fstats := stat.FailStats.merge(oldst.FailStats) | 
					
						
							|  |  |  | 			lrg := oldst.XferRateLrg.merge(*stat.XferRateLrg) | 
					
						
							|  |  |  | 			sml := oldst.XferRateSml.merge(*stat.XferRateSml) | 
					
						
							| 
									
										
										
										
											2022-09-13 03:40:02 +08:00
										 |  |  | 			stats[arn] = &BucketReplicationStat{ | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 				Failed:          fstats.toMetric(), | 
					
						
							|  |  |  | 				FailStats:       fstats, | 
					
						
							|  |  |  | 				ReplicatedSize:  stat.ReplicatedSize + oldst.ReplicatedSize, | 
					
						
							|  |  |  | 				ReplicatedCount: stat.ReplicatedCount + oldst.ReplicatedCount, | 
					
						
							|  |  |  | 				Latency:         stat.Latency.merge(oldst.Latency), | 
					
						
							|  |  |  | 				XferRateLrg:     &lrg, | 
					
						
							|  |  |  | 				XferRateSml:     &sml, | 
					
						
							| 
									
										
										
										
											2022-09-13 03:40:02 +08:00
										 |  |  | 			} | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 			totReplicatedSize += stat.ReplicatedSize | 
					
						
							|  |  |  | 			totReplicatedCount += stat.ReplicatedCount | 
					
						
							|  |  |  | 			totFailed = totFailed.merge(stat.FailStats) | 
					
						
							| 
									
										
										
										
											2022-09-13 03:40:02 +08:00
										 |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	s = BucketReplicationStats{ | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 		Stats:           stats, | 
					
						
							|  |  |  | 		QStat:           tq, | 
					
						
							|  |  |  | 		ReplicaSize:     totReplicaSize, | 
					
						
							|  |  |  | 		ReplicaCount:    totReplicaCount, | 
					
						
							|  |  |  | 		ReplicatedSize:  totReplicatedSize, | 
					
						
							|  |  |  | 		ReplicatedCount: totReplicatedCount, | 
					
						
							|  |  |  | 		Failed:          totFailed.toMetric(), | 
					
						
							| 
									
										
										
										
											2022-09-13 03:40:02 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2022-12-20 16:07:53 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 	var qs ReplicationQueueStats | 
					
						
							|  |  |  | 	for _, bs := range bucketStats { | 
					
						
							|  |  |  | 		qs.Nodes = append(qs.Nodes, bs.QueueStats.Nodes...) | 
					
						
							| 
									
										
										
										
											2022-09-13 03:40:02 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 	qs.Uptime = UTCNow().Unix() - globalBootTime.Unix() | 
					
						
							| 
									
										
										
										
											2024-02-06 14:00:45 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	var ps ProxyMetric | 
					
						
							|  |  |  | 	for _, bs := range bucketStats { | 
					
						
							|  |  |  | 		ps.add(bs.ProxyStats) | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 	bs = BucketStats{ | 
					
						
							|  |  |  | 		ReplicationStats: s, | 
					
						
							|  |  |  | 		QueueStats:       qs, | 
					
						
							| 
									
										
										
										
											2024-02-06 14:00:45 +08:00
										 |  |  | 		ProxyStats:       ps, | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2022-11-05 00:59:14 +08:00
										 |  |  | 	r.mostRecentStatsMu.Lock() | 
					
						
							| 
									
										
										
										
											2022-09-13 03:40:02 +08:00
										 |  |  | 	if len(r.mostRecentStats.Stats) == 0 { | 
					
						
							|  |  |  | 		r.mostRecentStats = BucketStatsMap{Stats: make(map[string]BucketStats, 1), Timestamp: UTCNow()} | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 	if len(bs.ReplicationStats.Stats) > 0 { | 
					
						
							|  |  |  | 		r.mostRecentStats.Stats[bucket] = bs | 
					
						
							| 
									
										
										
										
											2022-12-20 16:07:53 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2022-09-13 03:40:02 +08:00
										 |  |  | 	r.mostRecentStats.Timestamp = UTCNow() | 
					
						
							| 
									
										
										
										
											2022-11-05 00:59:14 +08:00
										 |  |  | 	r.mostRecentStatsMu.Unlock() | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 	return bs | 
					
						
							| 
									
										
										
										
											2022-09-13 03:40:02 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // get the most current of in-memory replication stats  and data usage info from crawler.
 | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | func (r *ReplicationStats) getLatestReplicationStats(bucket string) (s BucketStats) { | 
					
						
							| 
									
										
										
										
											2024-08-15 20:04:40 +08:00
										 |  |  | 	if r == nil { | 
					
						
							|  |  |  | 		return s | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2022-09-13 03:40:02 +08:00
										 |  |  | 	bucketStats := globalNotificationSys.GetClusterBucketStats(GlobalContext, bucket) | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 	return r.calculateBucketReplicationStats(bucket, bucketStats) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-02-22 14:26:06 +08:00
										 |  |  | func (r *ReplicationStats) incQ(bucket string, sz int64, isDeleteRepl bool, opType replication.Type) { | 
					
						
							| 
									
										
										
										
											2023-08-30 16:00:59 +08:00
										 |  |  | 	r.qCache.Lock() | 
					
						
							|  |  |  | 	defer r.qCache.Unlock() | 
					
						
							|  |  |  | 	v, ok := r.qCache.bucketStats[bucket] | 
					
						
							|  |  |  | 	if !ok { | 
					
						
							|  |  |  | 		v = newInQueueStats(r.registry, bucket) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	atomic.AddInt64(&v.nowBytes, sz) | 
					
						
							|  |  |  | 	atomic.AddInt64(&v.nowCount, 1) | 
					
						
							|  |  |  | 	r.qCache.bucketStats[bucket] = v | 
					
						
							|  |  |  | 	atomic.AddInt64(&r.qCache.srQueueStats.nowBytes, sz) | 
					
						
							|  |  |  | 	atomic.AddInt64(&r.qCache.srQueueStats.nowCount, 1) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (r *ReplicationStats) decQ(bucket string, sz int64, isDelMarker bool, opType replication.Type) { | 
					
						
							|  |  |  | 	r.qCache.Lock() | 
					
						
							|  |  |  | 	defer r.qCache.Unlock() | 
					
						
							|  |  |  | 	v, ok := r.qCache.bucketStats[bucket] | 
					
						
							|  |  |  | 	if !ok { | 
					
						
							|  |  |  | 		v = newInQueueStats(r.registry, bucket) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	atomic.AddInt64(&v.nowBytes, -1*sz) | 
					
						
							|  |  |  | 	atomic.AddInt64(&v.nowCount, -1) | 
					
						
							|  |  |  | 	r.qCache.bucketStats[bucket] = v | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	atomic.AddInt64(&r.qCache.srQueueStats.nowBytes, -1*sz) | 
					
						
							|  |  |  | 	atomic.AddInt64(&r.qCache.srQueueStats.nowCount, -1) | 
					
						
							| 
									
										
										
										
											2021-04-04 00:03:42 +08:00
										 |  |  | } | 
					
						
							| 
									
										
										
										
											2024-02-06 14:00:45 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | // incProxy increments proxy metrics for proxied calls
 | 
					
						
							|  |  |  | func (r *ReplicationStats) incProxy(bucket string, api replProxyAPI, isErr bool) { | 
					
						
							| 
									
										
										
										
											2024-08-15 20:04:40 +08:00
										 |  |  | 	if r != nil { | 
					
						
							|  |  |  | 		r.pCache.inc(bucket, api, isErr) | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-02-06 14:00:45 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (r *ReplicationStats) getProxyStats(bucket string) ProxyMetric { | 
					
						
							| 
									
										
										
										
											2024-08-15 20:04:40 +08:00
										 |  |  | 	if r == nil { | 
					
						
							|  |  |  | 		return ProxyMetric{} | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-02-06 14:00:45 +08:00
										 |  |  | 	return r.pCache.getBucketStats(bucket) | 
					
						
							|  |  |  | } |