| 
									
										
										
										
											2022-03-09 01:54:38 +08:00
										 |  |  | // Copyright (c) 2022 MinIO, Inc.
 | 
					
						
							|  |  |  | //
 | 
					
						
							|  |  |  | // This file is part of MinIO Object Storage stack
 | 
					
						
							|  |  |  | //
 | 
					
						
							|  |  |  | // This program is free software: you can redistribute it and/or modify
 | 
					
						
							|  |  |  | // it under the terms of the GNU Affero General Public License as published by
 | 
					
						
							|  |  |  | // the Free Software Foundation, either version 3 of the License, or
 | 
					
						
							|  |  |  | // (at your option) any later version.
 | 
					
						
							|  |  |  | //
 | 
					
						
							|  |  |  | // This program is distributed in the hope that it will be useful
 | 
					
						
							|  |  |  | // but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
					
						
							|  |  |  | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
					
						
							|  |  |  | // GNU Affero General Public License for more details.
 | 
					
						
							|  |  |  | //
 | 
					
						
							|  |  |  | // You should have received a copy of the GNU Affero General Public License
 | 
					
						
							|  |  |  | // along with this program.  If not, see <http://www.gnu.org/licenses/>.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | package cmd | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							|  |  |  | 	"context" | 
					
						
							|  |  |  | 	"errors" | 
					
						
							|  |  |  | 	"fmt" | 
					
						
							|  |  |  | 	"io" | 
					
						
							|  |  |  | 	"io/ioutil" | 
					
						
							|  |  |  | 	"math/rand" | 
					
						
							| 
									
										
										
										
											2022-04-08 12:20:40 +08:00
										 |  |  | 	"net/http" | 
					
						
							| 
									
										
										
										
											2022-03-09 01:54:38 +08:00
										 |  |  | 	"sync" | 
					
						
							|  |  |  | 	"sync/atomic" | 
					
						
							|  |  |  | 	"time" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	"github.com/dustin/go-humanize" | 
					
						
							|  |  |  | 	"github.com/google/uuid" | 
					
						
							|  |  |  | 	"github.com/minio/madmin-go" | 
					
						
							| 
									
										
										
										
											2022-04-08 12:20:40 +08:00
										 |  |  | 	"github.com/minio/minio-go/v7" | 
					
						
							|  |  |  | 	"github.com/minio/minio-go/v7/pkg/credentials" | 
					
						
							| 
									
										
										
										
											2022-03-09 01:54:38 +08:00
										 |  |  | 	"github.com/minio/pkg/randreader" | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // SpeedtestResult return value of the speedtest function
 | 
					
						
							|  |  |  | type SpeedtestResult struct { | 
					
						
							|  |  |  | 	Endpoint  string | 
					
						
							|  |  |  | 	Uploads   uint64 | 
					
						
							|  |  |  | 	Downloads uint64 | 
					
						
							|  |  |  | 	Error     string | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func newRandomReader(size int) io.Reader { | 
					
						
							|  |  |  | 	return io.LimitReader(randreader.New(), int64(size)) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Runs the speedtest on local MinIO process.
 | 
					
						
							|  |  |  | func selfSpeedtest(ctx context.Context, size, concurrent int, duration time.Duration, storageClass string) (SpeedtestResult, error) { | 
					
						
							|  |  |  | 	objAPI := newObjectLayerFn() | 
					
						
							|  |  |  | 	if objAPI == nil { | 
					
						
							|  |  |  | 		return SpeedtestResult{}, errServerNotInitialized | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	var errOnce sync.Once | 
					
						
							|  |  |  | 	var retError string | 
					
						
							|  |  |  | 	var wg sync.WaitGroup | 
					
						
							|  |  |  | 	var totalBytesWritten uint64 | 
					
						
							|  |  |  | 	var totalBytesRead uint64 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-04-08 12:20:40 +08:00
										 |  |  | 	region := globalSite.Region | 
					
						
							|  |  |  | 	if region == "" { | 
					
						
							|  |  |  | 		region = "us-east-1" | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	client, err := minio.New(globalLocalNodeName, &minio.Options{ | 
					
						
							| 
									
										
										
										
											2022-06-24 02:26:53 +08:00
										 |  |  | 		Creds:     credentials.NewStaticV4(globalActiveCred.AccessKey, globalActiveCred.SecretKey, ""), | 
					
						
							| 
									
										
										
										
											2022-04-08 12:20:40 +08:00
										 |  |  | 		Secure:    globalIsTLS, | 
					
						
							|  |  |  | 		Transport: globalProxyTransport, | 
					
						
							|  |  |  | 		Region:    region, | 
					
						
							|  |  |  | 	}) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return SpeedtestResult{}, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-03-09 01:54:38 +08:00
										 |  |  | 	objCountPerThread := make([]uint64, concurrent) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	uploadsCtx, uploadsCancel := context.WithCancel(context.Background()) | 
					
						
							|  |  |  | 	defer uploadsCancel() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	go func() { | 
					
						
							|  |  |  | 		time.Sleep(duration) | 
					
						
							|  |  |  | 		uploadsCancel() | 
					
						
							|  |  |  | 	}() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-06-15 02:22:07 +08:00
										 |  |  | 	objNamePrefix := uuid.New().String() + SlashSeparator | 
					
						
							| 
									
										
										
										
											2022-04-08 12:20:40 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	userMetadata := make(map[string]string) | 
					
						
							|  |  |  | 	userMetadata[globalObjectPerfUserMetadata] = "true" | 
					
						
							| 
									
										
										
										
											2022-03-09 01:54:38 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	wg.Add(concurrent) | 
					
						
							|  |  |  | 	for i := 0; i < concurrent; i++ { | 
					
						
							|  |  |  | 		go func(i int) { | 
					
						
							|  |  |  | 			defer wg.Done() | 
					
						
							|  |  |  | 			for { | 
					
						
							| 
									
										
										
										
											2022-04-08 12:20:40 +08:00
										 |  |  | 				reader := newRandomReader(size) | 
					
						
							| 
									
										
										
										
											2022-06-15 02:22:07 +08:00
										 |  |  | 				tmpObjName := fmt.Sprintf("%s%d.%d", objNamePrefix, i, objCountPerThread[i]) | 
					
						
							|  |  |  | 				info, err := client.PutObject(uploadsCtx, globalObjectPerfBucket, tmpObjName, reader, int64(size), minio.PutObjectOptions{ | 
					
						
							| 
									
										
										
										
											2022-06-24 02:26:53 +08:00
										 |  |  | 					UserMetadata:         userMetadata, | 
					
						
							|  |  |  | 					DisableContentSha256: true, | 
					
						
							|  |  |  | 					DisableMultipart:     true, | 
					
						
							| 
									
										
										
										
											2022-06-15 02:22:07 +08:00
										 |  |  | 				}) // Bypass S3 API freeze
 | 
					
						
							| 
									
										
										
										
											2022-03-09 01:54:38 +08:00
										 |  |  | 				if err != nil { | 
					
						
							|  |  |  | 					if !contextCanceled(uploadsCtx) && !errors.Is(err, context.Canceled) { | 
					
						
							|  |  |  | 						errOnce.Do(func() { | 
					
						
							|  |  |  | 							retError = err.Error() | 
					
						
							|  |  |  | 						}) | 
					
						
							|  |  |  | 					} | 
					
						
							|  |  |  | 					uploadsCancel() | 
					
						
							|  |  |  | 					return | 
					
						
							|  |  |  | 				} | 
					
						
							| 
									
										
										
										
											2022-04-08 12:20:40 +08:00
										 |  |  | 				atomic.AddUint64(&totalBytesWritten, uint64(info.Size)) | 
					
						
							| 
									
										
										
										
											2022-03-09 01:54:38 +08:00
										 |  |  | 				objCountPerThread[i]++ | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		}(i) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	wg.Wait() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// We already saw write failures, no need to proceed into read's
 | 
					
						
							|  |  |  | 	if retError != "" { | 
					
						
							|  |  |  | 		return SpeedtestResult{Uploads: totalBytesWritten, Downloads: totalBytesRead, Error: retError}, nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	downloadsCtx, downloadsCancel := context.WithCancel(context.Background()) | 
					
						
							|  |  |  | 	defer downloadsCancel() | 
					
						
							|  |  |  | 	go func() { | 
					
						
							|  |  |  | 		time.Sleep(duration) | 
					
						
							|  |  |  | 		downloadsCancel() | 
					
						
							|  |  |  | 	}() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	wg.Add(concurrent) | 
					
						
							|  |  |  | 	for i := 0; i < concurrent; i++ { | 
					
						
							|  |  |  | 		go func(i int) { | 
					
						
							|  |  |  | 			defer wg.Done() | 
					
						
							|  |  |  | 			var j uint64 | 
					
						
							|  |  |  | 			if objCountPerThread[i] == 0 { | 
					
						
							|  |  |  | 				return | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 			for { | 
					
						
							|  |  |  | 				if objCountPerThread[i] == j { | 
					
						
							|  |  |  | 					j = 0 | 
					
						
							|  |  |  | 				} | 
					
						
							| 
									
										
										
										
											2022-04-08 12:20:40 +08:00
										 |  |  | 				opts := minio.GetObjectOptions{} | 
					
						
							|  |  |  | 				opts.Set(globalObjectPerfUserMetadata, "true") // Bypass S3 API freeze
 | 
					
						
							|  |  |  | 				r, err := client.GetObject(downloadsCtx, globalObjectPerfBucket, fmt.Sprintf("%s%d.%d", objNamePrefix, i, j), opts) | 
					
						
							| 
									
										
										
										
											2022-03-09 01:54:38 +08:00
										 |  |  | 				if err != nil { | 
					
						
							| 
									
										
										
										
											2022-04-08 12:20:40 +08:00
										 |  |  | 					errResp, ok := err.(minio.ErrorResponse) | 
					
						
							|  |  |  | 					if ok && errResp.StatusCode == http.StatusNotFound { | 
					
						
							| 
									
										
										
										
											2022-03-09 01:54:38 +08:00
										 |  |  | 						continue | 
					
						
							|  |  |  | 					} | 
					
						
							|  |  |  | 					if !contextCanceled(downloadsCtx) && !errors.Is(err, context.Canceled) { | 
					
						
							|  |  |  | 						errOnce.Do(func() { | 
					
						
							|  |  |  | 							retError = err.Error() | 
					
						
							|  |  |  | 						}) | 
					
						
							|  |  |  | 					} | 
					
						
							|  |  |  | 					downloadsCancel() | 
					
						
							|  |  |  | 					return | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 				n, err := io.Copy(ioutil.Discard, r) | 
					
						
							|  |  |  | 				r.Close() | 
					
						
							|  |  |  | 				if err == nil { | 
					
						
							|  |  |  | 					// Only capture success criteria - do not
 | 
					
						
							|  |  |  | 					// have to capture failed reads, truncated
 | 
					
						
							|  |  |  | 					// reads etc.
 | 
					
						
							|  |  |  | 					atomic.AddUint64(&totalBytesRead, uint64(n)) | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 				if err != nil { | 
					
						
							|  |  |  | 					if !contextCanceled(downloadsCtx) && !errors.Is(err, context.Canceled) { | 
					
						
							|  |  |  | 						errOnce.Do(func() { | 
					
						
							|  |  |  | 							retError = err.Error() | 
					
						
							|  |  |  | 						}) | 
					
						
							|  |  |  | 					} | 
					
						
							|  |  |  | 					downloadsCancel() | 
					
						
							|  |  |  | 					return | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 				j++ | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		}(i) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	wg.Wait() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return SpeedtestResult{Uploads: totalBytesWritten, Downloads: totalBytesRead, Error: retError}, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // To collect RX stats during "mc support perf net"
 | 
					
						
							|  |  |  | // RXSample holds the RX bytes for the duration between
 | 
					
						
							|  |  |  | // the last peer to connect and the first peer to disconnect.
 | 
					
						
							|  |  |  | // This is to improve the RX throughput accuracy.
 | 
					
						
							|  |  |  | type netPerfRX struct { | 
					
						
							|  |  |  | 	RX                uint64    // RX bytes
 | 
					
						
							|  |  |  | 	lastToConnect     time.Time // time at which last peer to connect to us
 | 
					
						
							|  |  |  | 	firstToDisconnect time.Time // time at which the first peer disconnects from us
 | 
					
						
							|  |  |  | 	RXSample          uint64    // RX bytes between lastToConnect and firstToDisconnect
 | 
					
						
							|  |  |  | 	activeConnections uint64 | 
					
						
							|  |  |  | 	sync.RWMutex | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (n *netPerfRX) Connect() { | 
					
						
							|  |  |  | 	n.Lock() | 
					
						
							|  |  |  | 	defer n.Unlock() | 
					
						
							|  |  |  | 	n.activeConnections++ | 
					
						
							|  |  |  | 	atomic.StoreUint64(&globalNetPerfRX.RX, 0) | 
					
						
							|  |  |  | 	n.lastToConnect = time.Now() | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (n *netPerfRX) Disconnect() { | 
					
						
							|  |  |  | 	n.Lock() | 
					
						
							|  |  |  | 	defer n.Unlock() | 
					
						
							|  |  |  | 	n.activeConnections-- | 
					
						
							|  |  |  | 	if n.firstToDisconnect.IsZero() { | 
					
						
							|  |  |  | 		n.RXSample = atomic.LoadUint64(&n.RX) | 
					
						
							|  |  |  | 		n.firstToDisconnect = time.Now() | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (n *netPerfRX) ActiveConnections() uint64 { | 
					
						
							|  |  |  | 	n.RLock() | 
					
						
							|  |  |  | 	defer n.RUnlock() | 
					
						
							|  |  |  | 	return n.activeConnections | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (n *netPerfRX) Reset() { | 
					
						
							|  |  |  | 	n.RLock() | 
					
						
							|  |  |  | 	defer n.RUnlock() | 
					
						
							|  |  |  | 	n.RX = 0 | 
					
						
							|  |  |  | 	n.RXSample = 0 | 
					
						
							|  |  |  | 	n.lastToConnect = time.Time{} | 
					
						
							|  |  |  | 	n.firstToDisconnect = time.Time{} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Reader to read random data.
 | 
					
						
							|  |  |  | type netperfReader struct { | 
					
						
							|  |  |  | 	n   uint64 | 
					
						
							|  |  |  | 	eof chan struct{} | 
					
						
							|  |  |  | 	buf []byte | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (m *netperfReader) Read(b []byte) (int, error) { | 
					
						
							|  |  |  | 	select { | 
					
						
							|  |  |  | 	case <-m.eof: | 
					
						
							|  |  |  | 		return 0, io.EOF | 
					
						
							|  |  |  | 	default: | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	n := copy(b, m.buf) | 
					
						
							|  |  |  | 	atomic.AddUint64(&m.n, uint64(n)) | 
					
						
							|  |  |  | 	return n, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func netperf(ctx context.Context, duration time.Duration) madmin.NetperfNodeResult { | 
					
						
							|  |  |  | 	r := &netperfReader{eof: make(chan struct{})} | 
					
						
							|  |  |  | 	r.buf = make([]byte, 128*humanize.KiByte) | 
					
						
							|  |  |  | 	rand.Read(r.buf) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	connectionsPerPeer := 16 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if len(globalNotificationSys.peerClients) > 16 { | 
					
						
							|  |  |  | 		// For a large cluster it's enough to have 1 connection per peer to saturate the network.
 | 
					
						
							|  |  |  | 		connectionsPerPeer = 1 | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	errStr := "" | 
					
						
							|  |  |  | 	var wg sync.WaitGroup | 
					
						
							|  |  |  | 	for index := range globalNotificationSys.peerClients { | 
					
						
							|  |  |  | 		if globalNotificationSys.peerClients[index] == nil { | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		go func(index int) { | 
					
						
							|  |  |  | 			for i := 0; i < connectionsPerPeer; i++ { | 
					
						
							|  |  |  | 				wg.Add(1) | 
					
						
							|  |  |  | 				go func() { | 
					
						
							|  |  |  | 					defer wg.Done() | 
					
						
							|  |  |  | 					err := globalNotificationSys.peerClients[index].DevNull(ctx, r) | 
					
						
							|  |  |  | 					if err != nil { | 
					
						
							|  |  |  | 						errStr = err.Error() | 
					
						
							|  |  |  | 					} | 
					
						
							|  |  |  | 				}() | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		}(index) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	time.Sleep(duration) | 
					
						
							|  |  |  | 	close(r.eof) | 
					
						
							|  |  |  | 	wg.Wait() | 
					
						
							|  |  |  | 	for { | 
					
						
							|  |  |  | 		if globalNetPerfRX.ActiveConnections() == 0 { | 
					
						
							|  |  |  | 			break | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		time.Sleep(time.Second) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	rx := float64(globalNetPerfRX.RXSample) | 
					
						
							|  |  |  | 	delta := globalNetPerfRX.firstToDisconnect.Sub(globalNetPerfRX.lastToConnect) | 
					
						
							|  |  |  | 	if delta < 0 { | 
					
						
							|  |  |  | 		rx = 0 | 
					
						
							|  |  |  | 		errStr = "network disconnection issues detected" | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	globalNetPerfRX.Reset() | 
					
						
							|  |  |  | 	return madmin.NetperfNodeResult{Endpoint: "", TX: r.n / uint64(duration.Seconds()), RX: uint64(rx / delta.Seconds()), Error: errStr} | 
					
						
							|  |  |  | } |