mirror of https://github.com/minio/minio.git
				
				
				
			
		
			
	
	
		
			290 lines
		
	
	
		
			8.2 KiB
		
	
	
	
		
			Go
		
	
	
	
		
		
			
		
	
	
			290 lines
		
	
	
		
			8.2 KiB
		
	
	
	
		
			Go
		
	
	
	
| 
								 | 
							
								// Copyright (c) 2015-2023 MinIO, Inc.
							 | 
						||
| 
								 | 
							
								//
							 | 
						||
| 
								 | 
							
								// This file is part of MinIO Object Storage stack
							 | 
						||
| 
								 | 
							
								//
							 | 
						||
| 
								 | 
							
								// This program is free software: you can redistribute it and/or modify
							 | 
						||
| 
								 | 
							
								// it under the terms of the GNU Affero General Public License as published by
							 | 
						||
| 
								 | 
							
								// the Free Software Foundation, either version 3 of the License, or
							 | 
						||
| 
								 | 
							
								// (at your option) any later version.
							 | 
						||
| 
								 | 
							
								//
							 | 
						||
| 
								 | 
							
								// This program is distributed in the hope that it will be useful
							 | 
						||
| 
								 | 
							
								// but WITHOUT ANY WARRANTY; without even the implied warranty of
							 | 
						||
| 
								 | 
							
								// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
							 | 
						||
| 
								 | 
							
								// GNU Affero General Public License for more details.
							 | 
						||
| 
								 | 
							
								//
							 | 
						||
| 
								 | 
							
								// You should have received a copy of the GNU Affero General Public License
							 | 
						||
| 
								 | 
							
								// along with this program.  If not, see <http://www.gnu.org/licenses/>.
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								package cmd
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								import (
							 | 
						||
| 
								 | 
							
									"fmt"
							 | 
						||
| 
								 | 
							
									"sync"
							 | 
						||
| 
								 | 
							
									"sync/atomic"
							 | 
						||
| 
								 | 
							
									"time"
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									"github.com/minio/madmin-go/v3"
							 | 
						||
| 
								 | 
							
									"github.com/minio/minio-go/v7"
							 | 
						||
| 
								 | 
							
								)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								//go:generate msgp -file $GOFILE
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// RStat has replication error stats: a running count of failures and the
								// total number of bytes those failures covered.
								type RStat struct {
									// Count is the number of failures recorded. RTimedMetrics.addsize
									// increments it with sync/atomic, so read it atomically whenever it may
									// be updated concurrently (RTimedMetrics.merge does).
									Count int64 `json:"count"`
									// Bytes is the sum of the sizes passed in for those failures.
									Bytes int64 `json:"bytes"`
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// RTimedMetrics has replication error stats for various time windows:
								// the last minute, the last hour, and everything since uptime.
								type RTimedMetrics struct {
									// LastHour tracks failures in the last-hour window (ReplicationLastHour).
									LastHour    ReplicationLastHour `json:"lastHour"`
									// SinceUptime accumulates every failure recorded since startup; addsize
									// updates its counters with sync/atomic.
									SinceUptime RStat               `json:"sinceUptime"`
									// LastMinute tracks failures in the last-minute window (ReplicationLastMinute).
									// NOTE(review): unlike its siblings this field has no json tag, so
									// encoding/json emits it under the Go field name "LastMinute". Adding a
									// tag would change the JSON output, so confirm intent before touching it.
									LastMinute  ReplicationLastMinute
									// ErrCounts maps an S3 error code to the number of failures that carried
									// it. addsize currently records only "AccessDenied". The map is written
									// without synchronization, so concurrent writers must serialize.
									ErrCounts map[string]int `json:"errCounts"` // Count of credential errors
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (rt *RTimedMetrics) String() string {
							 | 
						||
| 
								 | 
							
									s := rt.toMetric()
							 | 
						||
| 
								 | 
							
									return fmt.Sprintf("Errors in LastMinute: %v, LastHour: %v, SinceUptime: %v", s.LastMinute.Count, s.LastHour.Count, s.Totals.Count)
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (rt *RTimedMetrics) toMetric() madmin.TimedErrStats {
							 | 
						||
| 
								 | 
							
									if rt == nil {
							 | 
						||
| 
								 | 
							
										return madmin.TimedErrStats{}
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
									errCounts := make(map[string]int)
							 | 
						||
| 
								 | 
							
									for k, v := range rt.ErrCounts {
							 | 
						||
| 
								 | 
							
										errCounts[k] = v
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
									minuteTotals := rt.LastMinute.getTotal()
							 | 
						||
| 
								 | 
							
									hourTotals := rt.LastHour.getTotal()
							 | 
						||
| 
								 | 
							
									return madmin.TimedErrStats{
							 | 
						||
| 
								 | 
							
										LastMinute: madmin.RStat{
							 | 
						||
| 
								 | 
							
											Count: float64(minuteTotals.N),
							 | 
						||
| 
								 | 
							
											Bytes: minuteTotals.Size,
							 | 
						||
| 
								 | 
							
										},
							 | 
						||
| 
								 | 
							
										LastHour: madmin.RStat{
							 | 
						||
| 
								 | 
							
											Count: float64(hourTotals.N),
							 | 
						||
| 
								 | 
							
											Bytes: hourTotals.Size,
							 | 
						||
| 
								 | 
							
										},
							 | 
						||
| 
								 | 
							
										Totals: madmin.RStat{
							 | 
						||
| 
								 | 
							
											Count: float64(rt.SinceUptime.Count),
							 | 
						||
| 
								 | 
							
											Bytes: rt.SinceUptime.Bytes,
							 | 
						||
| 
								 | 
							
										},
							 | 
						||
| 
								 | 
							
										ErrCounts: errCounts,
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (rt *RTimedMetrics) addsize(size int64, err error) {
							 | 
						||
| 
								 | 
							
									// failures seen since uptime
							 | 
						||
| 
								 | 
							
									atomic.AddInt64(&rt.SinceUptime.Bytes, size)
							 | 
						||
| 
								 | 
							
									atomic.AddInt64(&rt.SinceUptime.Count, 1)
							 | 
						||
| 
								 | 
							
									rt.LastMinute.addsize(size)
							 | 
						||
| 
								 | 
							
									rt.LastHour.addsize(size)
							 | 
						||
| 
								 | 
							
									if err != nil && minio.ToErrorResponse(err).Code == "AccessDenied" {
							 | 
						||
| 
								 | 
							
										if rt.ErrCounts == nil {
							 | 
						||
| 
								 | 
							
											rt.ErrCounts = make(map[string]int)
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
										rt.ErrCounts["AccessDenied"]++
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (rt *RTimedMetrics) merge(o RTimedMetrics) (n RTimedMetrics) {
							 | 
						||
| 
								 | 
							
									n.SinceUptime.Bytes = atomic.LoadInt64(&rt.SinceUptime.Bytes) + atomic.LoadInt64(&o.SinceUptime.Bytes)
							 | 
						||
| 
								 | 
							
									n.SinceUptime.Count = atomic.LoadInt64(&rt.SinceUptime.Count) + atomic.LoadInt64(&o.SinceUptime.Count)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									n.LastMinute = n.LastMinute.merge(rt.LastMinute)
							 | 
						||
| 
								 | 
							
									n.LastMinute = n.LastMinute.merge(o.LastMinute)
							 | 
						||
| 
								 | 
							
									n.LastHour = n.LastHour.merge(rt.LastHour)
							 | 
						||
| 
								 | 
							
									n.LastHour = n.LastHour.merge(o.LastHour)
							 | 
						||
| 
								 | 
							
									n.ErrCounts = make(map[string]int)
							 | 
						||
| 
								 | 
							
									for k, v := range rt.ErrCounts {
							 | 
						||
| 
								 | 
							
										n.ErrCounts[k] = v
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
									for k, v := range o.ErrCounts {
							 | 
						||
| 
								 | 
							
										n.ErrCounts[k] += v
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
									return n
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// SRStats has replication stats at site level: one SRStatus per peer
								// deployment, plus totals for replicas received.
								type SRStats struct {
									// Total Replica size in bytes
									ReplicaSize int64 `json:"replicaSize"`
									// Total Replica received
									ReplicaCount int64                `json:"replicaCount"`
									// M holds per-deployment stats keyed by deployment ID (see update and
									// get). Access it only while holding lock.
									M            map[string]*SRStatus `json:"srStatusMap"`

									movingAvgTicker *time.Ticker // Ticker for calculating moving averages (created by newSRStats)
									lock            sync.RWMutex // guards M; because of it, SRStats must not be copied
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// SRStatus has replication stats at deployment level, i.e. for one peer
								// deployment tracked in SRStats.M.
								type SRStatus struct {
									// ReplicatedSize is the total bytes (TransferSize) of completed replications.
									ReplicatedSize int64 `json:"completedReplicationSize"`
									// Failed tracks failed operations, including metadata updates, over the
									// last minute, the last hour and since uptime.
									Failed RTimedMetrics `json:"failedReplication"`
									// Total number of completed operations
									ReplicatedCount int64 `json:"replicationCount"`
									// Replication latency information
									Latency ReplicationLatency `json:"replicationLatency"`
									// XferRateLrg is the transfer rate for objects larger than
									// minLargeObjSize. SRStats.get nil-checks it, so treat it as possibly nil.
									XferRateLrg *XferStats `json:"largeTransferRate" msg:"lt"`
									// XferRateSml is the transfer rate for all other (small) objects.
									// SRStats.get nil-checks it, so treat it as possibly nil.
									XferRateSml *XferStats `json:"smallTransferRate" msg:"st"`
									// Endpoint is the replication target endpoint; excluded from JSON.
									Endpoint string `json:"-"`
									// Secure is true if the replication target endpoint is secure; excluded from JSON.
									Secure bool `json:"-"`
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (sr *SRStats) update(st replStat, dID string) {
							 | 
						||
| 
								 | 
							
									sr.lock.Lock()
							 | 
						||
| 
								 | 
							
									defer sr.lock.Unlock()
							 | 
						||
| 
								 | 
							
									srs, ok := sr.M[dID]
							 | 
						||
| 
								 | 
							
									if !ok {
							 | 
						||
| 
								 | 
							
										srs = &SRStatus{
							 | 
						||
| 
								 | 
							
											XferRateLrg: newXferStats(),
							 | 
						||
| 
								 | 
							
											XferRateSml: newXferStats(),
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
									srs.Endpoint = st.Endpoint
							 | 
						||
| 
								 | 
							
									srs.Secure = st.Secure
							 | 
						||
| 
								 | 
							
									switch {
							 | 
						||
| 
								 | 
							
									case st.Completed:
							 | 
						||
| 
								 | 
							
										srs.ReplicatedSize += st.TransferSize
							 | 
						||
| 
								 | 
							
										srs.ReplicatedCount++
							 | 
						||
| 
								 | 
							
										if st.TransferDuration > 0 {
							 | 
						||
| 
								 | 
							
											srs.Latency.update(st.TransferSize, st.TransferDuration)
							 | 
						||
| 
								 | 
							
											srs.updateXferRate(st.TransferSize, st.TransferDuration)
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
									case st.Failed:
							 | 
						||
| 
								 | 
							
										srs.Failed.addsize(st.TransferSize, st.Err)
							 | 
						||
| 
								 | 
							
									case st.Pending:
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
									sr.M[dID] = srs
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (sr *SRStats) get() map[string]SRMetric {
							 | 
						||
| 
								 | 
							
									epMap := globalBucketTargetSys.healthStats()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
									sr.lock.RLock()
							 | 
						||
| 
								 | 
							
									defer sr.lock.RUnlock()
							 | 
						||
| 
								 | 
							
									m := make(map[string]SRMetric, len(sr.M))
							 | 
						||
| 
								 | 
							
									for dID, v := range sr.M {
							 | 
						||
| 
								 | 
							
										t := newXferStats()
							 | 
						||
| 
								 | 
							
										mx := make(map[RMetricName]XferStats)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
										if v.XferRateLrg != nil {
							 | 
						||
| 
								 | 
							
											mx[Large] = *v.XferRateLrg.Clone()
							 | 
						||
| 
								 | 
							
											m := t.merge(*v.XferRateLrg)
							 | 
						||
| 
								 | 
							
											t = &m
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
										if v.XferRateSml != nil {
							 | 
						||
| 
								 | 
							
											mx[Small] = *v.XferRateSml.Clone()
							 | 
						||
| 
								 | 
							
											m := t.merge(*v.XferRateSml)
							 | 
						||
| 
								 | 
							
											t = &m
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
										mx[Total] = *t
							 | 
						||
| 
								 | 
							
										metric := SRMetric{
							 | 
						||
| 
								 | 
							
											ReplicatedSize:  v.ReplicatedSize,
							 | 
						||
| 
								 | 
							
											ReplicatedCount: v.ReplicatedCount,
							 | 
						||
| 
								 | 
							
											DeploymentID:    dID,
							 | 
						||
| 
								 | 
							
											Failed:          v.Failed.toMetric(),
							 | 
						||
| 
								 | 
							
											XferStats:       mx,
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
										epHealth, ok := epMap[v.Endpoint]
							 | 
						||
| 
								 | 
							
										if ok {
							 | 
						||
| 
								 | 
							
											metric.Endpoint = epHealth.Endpoint
							 | 
						||
| 
								 | 
							
											metric.TotalDowntime = epHealth.offlineDuration
							 | 
						||
| 
								 | 
							
											metric.LastOnline = epHealth.lastOnline
							 | 
						||
| 
								 | 
							
											metric.Online = epHealth.Online
							 | 
						||
| 
								 | 
							
											metric.Latency = madmin.LatencyStat{
							 | 
						||
| 
								 | 
							
												Curr: epHealth.latency.curr,
							 | 
						||
| 
								 | 
							
												Avg:  epHealth.latency.avg,
							 | 
						||
| 
								 | 
							
												Max:  epHealth.latency.peak,
							 | 
						||
| 
								 | 
							
											}
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
										m[dID] = metric
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
									return m
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (srs *SRStatus) updateXferRate(sz int64, duration time.Duration) {
							 | 
						||
| 
								 | 
							
									if sz > minLargeObjSize {
							 | 
						||
| 
								 | 
							
										srs.XferRateLrg.addSize(sz, duration)
							 | 
						||
| 
								 | 
							
									} else {
							 | 
						||
| 
								 | 
							
										srs.XferRateSml.addSize(sz, duration)
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func newSRStats() *SRStats {
							 | 
						||
| 
								 | 
							
									s := SRStats{
							 | 
						||
| 
								 | 
							
										M:               make(map[string]*SRStatus),
							 | 
						||
| 
								 | 
							
										movingAvgTicker: time.NewTicker(time.Second * 2),
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
									go s.trackEWMA()
							 | 
						||
| 
								 | 
							
									return &s
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (sr *SRStats) trackEWMA() {
							 | 
						||
| 
								 | 
							
									for {
							 | 
						||
| 
								 | 
							
										select {
							 | 
						||
| 
								 | 
							
										case <-sr.movingAvgTicker.C:
							 | 
						||
| 
								 | 
							
											sr.updateMovingAvg()
							 | 
						||
| 
								 | 
							
										case <-GlobalContext.Done():
							 | 
						||
| 
								 | 
							
											return
							 | 
						||
| 
								 | 
							
										}
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								func (sr *SRStats) updateMovingAvg() {
							 | 
						||
| 
								 | 
							
									sr.lock.Lock()
							 | 
						||
| 
								 | 
							
									defer sr.lock.Unlock()
							 | 
						||
| 
								 | 
							
									for _, s := range sr.M {
							 | 
						||
| 
								 | 
							
										s.XferRateLrg.measure.updateExponentialMovingAverage(time.Now())
							 | 
						||
| 
								 | 
							
										s.XferRateSml.measure.updateExponentialMovingAverage(time.Now())
							 | 
						||
| 
								 | 
							
									}
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// SRMetric captures replication metrics for a deployment
								type SRMetric struct {
									DeploymentID  string             `json:"deploymentID"`
									// Endpoint through Latency describe the target endpoint's health;
									// SRStats.get fills them only when a health record exists for it.
									Endpoint      string             `json:"endpoint"`
									TotalDowntime time.Duration      `json:"totalDowntime"`
									LastOnline    time.Time          `json:"lastOnline"`
									Online        bool               `json:"isOnline"`
									Latency       madmin.LatencyStat `json:"latency"`

									// replication metrics across buckets roll up
									ReplicatedSize int64 `json:"replicatedSize"`
									// Total number of completed operations
									ReplicatedCount int64 `json:"replicatedCount"`
									// Failed captures replication errors in various time windows.
									// NOTE(review): encoding/json ignores omitempty on struct-typed fields,
									// so this field is always emitted.
									Failed madmin.TimedErrStats `json:"failed,omitempty"`

									// XferStats holds transfer-rate stats keyed by size class: Large and
									// Small (when present) and Total (their merge), as built by SRStats.get.
									XferStats map[RMetricName]XferStats `json:"transferSummary"`
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								// SRMetricsSummary captures a summary of replication counts across buckets
								// on a site, along with an op metrics rollup.
								type SRMetricsSummary struct {
									// op metrics roll up
									ActiveWorkers ActiveWorkerStat `json:"activeWorkers"`

									// Total Replica size in bytes
									ReplicaSize int64 `json:"replicaSize"`

									// Total number of replicas received
									ReplicaCount int64 `json:"replicaCount"`
									// Queued operations
									Queued InQueueMetric `json:"queued"`
									// replication metrics summary for each site replication peer
									// (presumably keyed by deployment ID, as SRStats.get returns — confirm
									// with the code that fills it)
									Metrics map[string]SRMetric `json:"replMetrics"`
									// uptime of node being queried for site replication metrics
									// NOTE(review): the unit is not visible here; confirm with the code that sets it.
									Uptime int64 `json:"uptime"`
								}
							 |