mirror of https://github.com/minio/minio.git
// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see <http://www.gnu.org/licenses/>.

package cmd

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/minio/madmin-go/v3"
	"github.com/minio/minio-go/v7"
)

//go:generate msgp -file $GOFILE

// RStat has replication error stats
type RStat struct {
	Count int64 `json:"count"`
	Bytes int64 `json:"bytes"`
}

// RTimedMetrics has replication error stats for various time windows
type RTimedMetrics struct {
	LastHour    ReplicationLastHour `json:"lastHour"`
	SinceUptime RStat               `json:"sinceUptime"`
	LastMinute  ReplicationLastMinute
	// ErrCounts tracks the number of errors seen per error code; currently
	// only credential (AccessDenied) errors are counted.
	ErrCounts map[string]int `json:"errCounts"`
}

func (rt *RTimedMetrics) String() string {
	s := rt.toMetric()
	return fmt.Sprintf("Errors in LastMinute: %v, LastHour: %v, SinceUptime: %v", s.LastMinute.Count, s.LastHour.Count, s.Totals.Count)
}

// toMetric converts the rolling error windows into the madmin wire format,
// copying ErrCounts so callers cannot mutate the live map.
func (rt *RTimedMetrics) toMetric() madmin.TimedErrStats {
	if rt == nil {
		return madmin.TimedErrStats{}
	}
	errCounts := make(map[string]int)
	for k, v := range rt.ErrCounts {
		errCounts[k] = v
	}
	minuteTotals := rt.LastMinute.getTotal()
	hourTotals := rt.LastHour.getTotal()
	return madmin.TimedErrStats{
		LastMinute: madmin.RStat{
			Count: float64(minuteTotals.N),
			Bytes: minuteTotals.Size,
		},
		LastHour: madmin.RStat{
			Count: float64(hourTotals.N),
			Bytes: hourTotals.Size,
		},
		Totals: madmin.RStat{
			Count: float64(rt.SinceUptime.Count),
			Bytes: rt.SinceUptime.Bytes,
		},
		ErrCounts: errCounts,
	}
}

// addsize records a failed replication of `size` bytes in all time windows;
// credential (AccessDenied) failures are additionally tallied in ErrCounts.
func (rt *RTimedMetrics) addsize(size int64, err error) {
	// failures seen since uptime
	atomic.AddInt64(&rt.SinceUptime.Bytes, size)
	atomic.AddInt64(&rt.SinceUptime.Count, 1)
	rt.LastMinute.addsize(size)
	rt.LastHour.addsize(size)
	if err != nil && minio.ToErrorResponse(err).Code == "AccessDenied" {
		if rt.ErrCounts == nil {
			rt.ErrCounts = make(map[string]int)
		}
		rt.ErrCounts["AccessDenied"]++
	}
}
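
// A minimal usage sketch (illustrative, not part of the upstream file):
// recording a 1 MiB transfer that failed with AccessDenied bumps all three
// windows plus the credential-error counter. `accessDeniedErr` below is a
// hypothetical stand-in for an S3 error response returned by minio-go.
//
//	var rt RTimedMetrics
//	rt.addsize(1<<20, accessDeniedErr)
//	fmt.Println(rt.String()) // Errors in LastMinute: 1, LastHour: 1, SinceUptime: 1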

// merge returns the sum of rt and o as a new value, leaving both inputs
// unmodified.
func (rt *RTimedMetrics) merge(o RTimedMetrics) (n RTimedMetrics) {
	n.SinceUptime.Bytes = atomic.LoadInt64(&rt.SinceUptime.Bytes) + atomic.LoadInt64(&o.SinceUptime.Bytes)
	n.SinceUptime.Count = atomic.LoadInt64(&rt.SinceUptime.Count) + atomic.LoadInt64(&o.SinceUptime.Count)

	n.LastMinute = n.LastMinute.merge(rt.LastMinute)
	n.LastMinute = n.LastMinute.merge(o.LastMinute)
	n.LastHour = n.LastHour.merge(rt.LastHour)
	n.LastHour = n.LastHour.merge(o.LastHour)
	n.ErrCounts = make(map[string]int)
	for k, v := range rt.ErrCounts {
		n.ErrCounts[k] = v
	}
	for k, v := range o.ErrCounts {
		n.ErrCounts[k] += v
	}
	return n
}
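
// Illustrative example (not from the upstream file): merging two metrics
// values sums the per-code error counts.
//
//	a := RTimedMetrics{ErrCounts: map[string]int{"AccessDenied": 2}}
//	b := RTimedMetrics{ErrCounts: map[string]int{"AccessDenied": 1}}
//	n := a.merge(b) // n.ErrCounts["AccessDenied"] == 3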

// SRStats has replication stats at the site level
type SRStats struct {
	// Total Replica size in bytes
	ReplicaSize int64 `json:"replicaSize"`
	// Total number of replica operations received
	ReplicaCount int64                `json:"replicaCount"`
	M            map[string]*SRStatus `json:"srStatusMap"`

	movingAvgTicker *time.Ticker // Ticker for calculating moving averages
	lock            sync.RWMutex // mutex for srStats
}

// SRStatus has replication stats at the deployment level
type SRStatus struct {
	ReplicatedSize int64 `json:"completedReplicationSize"`
	// Failed operations, including metadata updates, tracked over various
	// time windows
	Failed RTimedMetrics `json:"failedReplication"`
	// Total number of completed operations
	ReplicatedCount int64 `json:"replicationCount"`
	// Replication latency information
	Latency ReplicationLatency `json:"replicationLatency"`
	// Transfer rate for large uploads
	XferRateLrg *XferStats `json:"largeTransferRate" msg:"lt"`
	// Transfer rate for small uploads
	XferRateSml *XferStats `json:"smallTransferRate" msg:"st"`
	// Endpoint is the replication target endpoint
	Endpoint string `json:"-"`
	// Secure is true if the replication target endpoint is secure
	Secure bool `json:"-"`
}

// update records a single replication event for deployment dID, creating the
// per-deployment entry on first use.
func (sr *SRStats) update(st replStat, dID string) {
	sr.lock.Lock()
	defer sr.lock.Unlock()
	srs, ok := sr.M[dID]
	if !ok {
		srs = &SRStatus{
			XferRateLrg: newXferStats(),
			XferRateSml: newXferStats(),
		}
	}
	srs.Endpoint = st.Endpoint
	srs.Secure = st.Secure
	switch {
	case st.Completed:
		srs.ReplicatedSize += st.TransferSize
		srs.ReplicatedCount++
		if st.TransferDuration > 0 {
			srs.Latency.update(st.TransferSize, st.TransferDuration)
			srs.updateXferRate(st.TransferSize, st.TransferDuration)
		}
	case st.Failed:
		srs.Failed.addsize(st.TransferSize, st.Err)
	case st.Pending:
		// Pending operations are not accounted here.
	}
	sr.M[dID] = srs
}
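
// Note: update is the sole write path for the per-deployment counters, and it
// shares sr.lock with get and updateMovingAvg below, so metric snapshots never
// observe a half-applied event.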

// get returns a point-in-time snapshot of per-deployment replication metrics,
// enriched with endpoint health from globalBucketTargetSys.
func (sr *SRStats) get() map[string]SRMetric {
	epMap := globalBucketTargetSys.healthStats()

	sr.lock.RLock()
	defer sr.lock.RUnlock()
	m := make(map[string]SRMetric, len(sr.M))
	for dID, v := range sr.M {
		t := newXferStats()
		mx := make(map[RMetricName]XferStats)

		if v.XferRateLrg != nil {
			mx[Large] = *v.XferRateLrg.Clone()
			merged := t.merge(*v.XferRateLrg)
			t = &merged
		}
		if v.XferRateSml != nil {
			mx[Small] = *v.XferRateSml.Clone()
			merged := t.merge(*v.XferRateSml)
			t = &merged
		}

		mx[Total] = *t
		metric := SRMetric{
			ReplicatedSize:  v.ReplicatedSize,
			ReplicatedCount: v.ReplicatedCount,
			DeploymentID:    dID,
			Failed:          v.Failed.toMetric(),
			XferStats:       mx,
		}
		epHealth, ok := epMap[v.Endpoint]
		if ok {
			metric.Endpoint = epHealth.Endpoint
			metric.TotalDowntime = epHealth.offlineDuration
			metric.LastOnline = epHealth.lastOnline
			metric.Online = epHealth.Online
			metric.Latency = madmin.LatencyStat{
				Curr: epHealth.latency.curr,
				Avg:  epHealth.latency.avg,
				Max:  epHealth.latency.peak,
			}
		}
		m[dID] = metric
	}
	return m
}

// updateXferRate attributes a completed transfer to the large- or small-object
// rate tracker based on the minLargeObjSize threshold.
func (srs *SRStatus) updateXferRate(sz int64, duration time.Duration) {
	if sz > minLargeObjSize {
		srs.XferRateLrg.addSize(sz, duration)
	} else {
		srs.XferRateSml.addSize(sz, duration)
	}
}
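
// Splitting rate tracking by object size (at the minLargeObjSize boundary)
// presumably keeps bursts of many small transfers from skewing the throughput
// estimate for large objects, and vice versa.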

// newSRStats initializes site replication stats and starts the background
// goroutine that refreshes the transfer-rate moving averages.
func newSRStats() *SRStats {
	s := SRStats{
		M:               make(map[string]*SRStatus),
		movingAvgTicker: time.NewTicker(time.Second * 2),
	}
	go s.trackEWMA()
	return &s
}
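
// A minimal usage sketch (illustrative only; the replStat fields shown are
// those referenced by update above, and "dep-1" is a hypothetical deployment
// ID):
//
//	stats := newSRStats()
//	stats.update(replStat{
//		Completed:        true,
//		TransferSize:     4 << 20,
//		TransferDuration: time.Second,
//		Endpoint:         "peer.example.com",
//	}, "dep-1")
//	for dID, m := range stats.get() {
//		fmt.Println(dID, m.ReplicatedCount, m.ReplicatedSize)
//	}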

// trackEWMA refreshes the exponential moving averages on every tick until the
// server context is cancelled.
func (sr *SRStats) trackEWMA() {
	for {
		select {
		case <-sr.movingAvgTicker.C:
			sr.updateMovingAvg()
		case <-GlobalContext.Done():
			return
		}
	}
}

func (sr *SRStats) updateMovingAvg() {
	sr.lock.Lock()
	defer sr.lock.Unlock()
	now := time.Now()
	for _, s := range sr.M {
		s.XferRateLrg.measure.updateExponentialMovingAverage(now)
		s.XferRateSml.measure.updateExponentialMovingAverage(now)
	}
}

// SRMetric captures replication metrics for a deployment
type SRMetric struct {
	DeploymentID  string             `json:"deploymentID"`
	Endpoint      string             `json:"endpoint"`
	TotalDowntime time.Duration      `json:"totalDowntime"`
	LastOnline    time.Time          `json:"lastOnline"`
	Online        bool               `json:"isOnline"`
	Latency       madmin.LatencyStat `json:"latency"`

	// Total replicated size, rolled up across buckets
	ReplicatedSize int64 `json:"replicatedSize"`
	// Total number of completed operations
	ReplicatedCount int64 `json:"replicatedCount"`
	// Failed captures replication errors in various time windows
	Failed madmin.TimedErrStats `json:"failed,omitempty"`

	XferStats map[RMetricName]XferStats `json:"transferSummary"`
}

// SRMetricsSummary captures a summary of replication counts across buckets on
// this site, along with an op-metrics rollup.
type SRMetricsSummary struct {
	// op metrics roll up
	ActiveWorkers ActiveWorkerStat `json:"activeWorkers"`

	// Total replica size in bytes
	ReplicaSize int64 `json:"replicaSize"`

	// Total number of replicas received
	ReplicaCount int64 `json:"replicaCount"`
	// Queued operations
	Queued InQueueMetric `json:"queued"`
	// Replication metrics summary for each site replication peer
	Metrics map[string]SRMetric `json:"replMetrics"`
	// Uptime of the node being queried for site replication metrics
	Uptime int64 `json:"uptime"`
}
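
// For reference, a summary serialized via the json tags above has roughly this
// shape (all values illustrative):
//
//	{
//	  "activeWorkers": {...},
//	  "replicaSize": 1048576,
//	  "replicaCount": 3,
//	  "queued": {...},
//	  "replMetrics": {"dep-1": {"deploymentID": "dep-1", ...}},
//	  "uptime": 86400
//	}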