mirror of https://github.com/minio/minio.git
// Copyright (c) 2022 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see <http://www.gnu.org/licenses/>.

package cmd

import (
	"context"
	"errors"
	"fmt"
	"io"
	"math/rand"
	"net/http"
	"sync"
	"sync/atomic"
	"time"

	"github.com/dustin/go-humanize"
	"github.com/google/uuid"
	"github.com/minio/madmin-go"
	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
	"github.com/minio/pkg/randreader"
)

// SpeedtestResult is the return value of the speedtest function.
type SpeedtestResult struct {
	Endpoint  string
	Uploads   uint64 // total bytes uploaded
	Downloads uint64 // total bytes downloaded
	Error     string
}

// newRandomReader returns a reader that yields size bytes of
// pseudo-random data.
func newRandomReader(size int) io.Reader {
	return io.LimitReader(randreader.New(), int64(size))
}
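
// Note: randreader.New() produces an endless pseudo-random stream, and
// io.LimitReader caps it at exactly size bytes, so each PUT below streams
// its payload without allocating a size-sized buffer. A minimal usage
// sketch (illustrative only, not part of the speedtest flow):
//
//	r := newRandomReader(4 << 20)  // 4 MiB of random payload
//	n, _ := io.Copy(io.Discard, r) // n == 4 << 20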

// selfSpeedtest runs the speedtest on the local MinIO process.
func selfSpeedtest(ctx context.Context, size, concurrent int, duration time.Duration, storageClass string) (SpeedtestResult, error) {
	objAPI := newObjectLayerFn()
	if objAPI == nil {
		return SpeedtestResult{}, errServerNotInitialized
	}

	var errOnce sync.Once
	var retError string
	var wg sync.WaitGroup
	var totalBytesWritten uint64
	var totalBytesRead uint64

	region := globalSite.Region
	if region == "" {
		region = "us-east-1"
	}

	client, err := minio.New(globalLocalNodeName, &minio.Options{
		Creds:     credentials.NewStaticV4(globalActiveCred.AccessKey, globalActiveCred.SecretKey, ""),
		Secure:    globalIsTLS,
		Transport: globalProxyTransport,
		Region:    region,
	})
	if err != nil {
		return SpeedtestResult{}, err
	}

	objCountPerThread := make([]uint64, concurrent)

	uploadsCtx, uploadsCancel := context.WithCancel(context.Background())
	defer uploadsCancel()

	// Cancel all uploads once the configured duration has elapsed.
	go func() {
		time.Sleep(duration)
		uploadsCancel()
	}()

	objNamePrefix := uuid.New().String() + SlashSeparator

	userMetadata := make(map[string]string)
	userMetadata[globalObjectPerfUserMetadata] = "true"

	wg.Add(concurrent)
	for i := 0; i < concurrent; i++ {
		go func(i int) {
			defer wg.Done()
			for {
				reader := newRandomReader(size)
				// Objects are named "<uuid>/<thread>.<seq>" so the download
				// phase can regenerate every key from objCountPerThread.
				tmpObjName := fmt.Sprintf("%s%d.%d", objNamePrefix, i, objCountPerThread[i])
				info, err := client.PutObject(uploadsCtx, globalObjectPerfBucket, tmpObjName, reader, int64(size), minio.PutObjectOptions{
					UserMetadata:     userMetadata, // Bypass S3 API freeze
					StorageClass:     storageClass,
					DisableMultipart: true,
				})
				if err != nil {
					if !contextCanceled(uploadsCtx) && !errors.Is(err, context.Canceled) {
						errOnce.Do(func() {
							retError = err.Error()
						})
					}
					uploadsCancel()
					return
				}
				atomic.AddUint64(&totalBytesWritten, uint64(info.Size))
				objCountPerThread[i]++
			}
		}(i)
	}
	wg.Wait()

	// We already saw write failures; no need to proceed with reads.
	if retError != "" {
		return SpeedtestResult{Uploads: totalBytesWritten, Downloads: totalBytesRead, Error: retError}, nil
	}

	downloadsCtx, downloadsCancel := context.WithCancel(context.Background())
	defer downloadsCancel()

	// Cancel all downloads once the configured duration has elapsed.
	go func() {
		time.Sleep(duration)
		downloadsCancel()
	}()

	wg.Add(concurrent)
	for i := 0; i < concurrent; i++ {
		go func(i int) {
			defer wg.Done()
			var j uint64
			if objCountPerThread[i] == 0 {
				// This thread did not upload any objects; nothing to read back.
				return
			}
			for {
				// Wrap around to this thread's first object once all of
				// its objects have been read.
				if objCountPerThread[i] == j {
					j = 0
				}
				opts := minio.GetObjectOptions{}
				opts.Set(globalObjectPerfUserMetadata, "true") // Bypass S3 API freeze
				r, err := client.GetObject(downloadsCtx, globalObjectPerfBucket, fmt.Sprintf("%s%d.%d", objNamePrefix, i, j), opts)
				if err != nil {
					errResp, ok := err.(minio.ErrorResponse)
					if ok && errResp.StatusCode == http.StatusNotFound {
						continue
					}
					if !contextCanceled(downloadsCtx) && !errors.Is(err, context.Canceled) {
						errOnce.Do(func() {
							retError = err.Error()
						})
					}
					downloadsCancel()
					return
				}
				n, err := io.Copy(io.Discard, r)
				r.Close()
				if err != nil {
					if !contextCanceled(downloadsCtx) && !errors.Is(err, context.Canceled) {
						errOnce.Do(func() {
							retError = err.Error()
						})
					}
					downloadsCancel()
					return
				}
				// Only capture success criteria - do not have to capture
				// failed reads, truncated reads etc.
				atomic.AddUint64(&totalBytesRead, uint64(n))
				j++
			}
		}(i)
	}
	wg.Wait()

	return SpeedtestResult{Uploads: totalBytesWritten, Downloads: totalBytesRead, Error: retError}, nil
}
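
// A usage sketch of selfSpeedtest (illustrative only; the size, concurrency,
// and duration values below are arbitrary, not defaults from this file):
//
//	res, err := selfSpeedtest(ctx, 64*humanize.MiByte, 32, 10*time.Second, "")
//	if err == nil && res.Error == "" {
//		// res.Uploads and res.Downloads are total bytes moved within each
//		// 10-second window, so dividing by 10 approximates bytes/sec.
//	}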

// netPerfRX collects RX stats during "mc support perf net".
// It holds the RX bytes received in the window between the last
// peer to connect and the first peer to disconnect, which
// improves RX throughput accuracy.
type netPerfRX struct {
	RX                uint64    // RX bytes
	lastToConnect     time.Time // time at which the last peer connected to us
	firstToDisconnect time.Time // time at which the first peer disconnected from us
	RXSample          uint64    // RX bytes between lastToConnect and firstToDisconnect
	activeConnections uint64
	sync.RWMutex
}

// Connect records a new peer connection and resets the RX byte
// counter so that counting restarts from the last peer to connect.
func (n *netPerfRX) Connect() {
	n.Lock()
	defer n.Unlock()
	n.activeConnections++
	atomic.StoreUint64(&n.RX, 0)
	n.lastToConnect = time.Now()
}

// Disconnect records a peer disconnect and, for the first peer to
// disconnect, snapshots the RX byte counter.
func (n *netPerfRX) Disconnect() {
	n.Lock()
	defer n.Unlock()
	n.activeConnections--
	if n.firstToDisconnect.IsZero() {
		n.RXSample = atomic.LoadUint64(&n.RX)
		n.firstToDisconnect = time.Now()
	}
}

func (n *netPerfRX) ActiveConnections() uint64 {
	n.RLock()
	defer n.RUnlock()
	return n.activeConnections
}

func (n *netPerfRX) Reset() {
	// Take the write lock; Reset mutates every field.
	n.Lock()
	defer n.Unlock()
	n.RX = 0
	n.RXSample = 0
	n.lastToConnect = time.Time{}
	n.firstToDisconnect = time.Time{}
}
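
// The RX counter itself is incremented elsewhere (presumably by the handler
// that sinks peer traffic; not shown in this file). Connect resets it so only
// bytes received after the last peer joins are counted, and Disconnect
// snapshots it when the first peer leaves. netperf() below then derives the
// RX rate as RXSample / (firstToDisconnect - lastToConnect). Illustrative
// timeline with three peers:
//
//	p1 connects .. p2 connects .. p3 connects ........ p2 disconnects
//	                              ^ lastToConnect      ^ firstToDisconnect
//	                              |<-- RXSample accumulates here -->|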

// netperfReader serves random data from a fixed buffer and counts
// the bytes read until its eof channel is closed.
type netperfReader struct {
	n   uint64 // total bytes served
	eof chan struct{}
	buf []byte
}

func (m *netperfReader) Read(b []byte) (int, error) {
	select {
	case <-m.eof:
		return 0, io.EOF
	default:
	}
	n := copy(b, m.buf)
	atomic.AddUint64(&m.n, uint64(n))
	return n, nil
}
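
// Read never blocks and keeps replaying the same fixed pattern (128 KiB in
// netperf below), so the benchmark measures network throughput rather than
// data generation. A behavioral sketch (illustrative only):
//
//	r := &netperfReader{eof: make(chan struct{}), buf: make([]byte, 4)}
//	n, err := r.Read(make([]byte, 8)) // n == 4, err == nil
//	close(r.eof)
//	n, err = r.Read(make([]byte, 8))  // n == 0, err == io.EOF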

// netperf runs a network throughput test against all peers for the given
// duration and returns this node's TX/RX rates in bytes per second.
func netperf(ctx context.Context, duration time.Duration) madmin.NetperfNodeResult {
	r := &netperfReader{eof: make(chan struct{})}
	r.buf = make([]byte, 128*humanize.KiByte)
	rand.Read(r.buf)

	connectionsPerPeer := 16

	if len(globalNotificationSys.peerClients) > 16 {
		// For a large cluster it's enough to have 1 connection per peer to saturate the network.
		connectionsPerPeer = 1
	}

	errStr := ""
	var errOnce sync.Once
	var wg sync.WaitGroup
	for index := range globalNotificationSys.peerClients {
		if globalNotificationSys.peerClients[index] == nil {
			continue
		}
		for i := 0; i < connectionsPerPeer; i++ {
			// Add before spawning so wg.Wait() below cannot race the goroutine.
			wg.Add(1)
			go func(index int) {
				defer wg.Done()
				err := globalNotificationSys.peerClients[index].DevNull(ctx, r)
				if err != nil {
					// Record only the first error to avoid concurrent writes.
					errOnce.Do(func() {
						errStr = err.Error()
					})
				}
			}(index)
		}
	}

	time.Sleep(duration)
	close(r.eof)
	wg.Wait()
	// Wait for all peers to disconnect so the RX sample window is complete.
	for globalNetPerfRX.ActiveConnections() != 0 {
		time.Sleep(time.Second)
	}
	rx := float64(globalNetPerfRX.RXSample)
	delta := globalNetPerfRX.firstToDisconnect.Sub(globalNetPerfRX.lastToConnect)
	if delta < 0 {
		rx = 0
		errStr = "network disconnection issues detected"
	}

	globalNetPerfRX.Reset()
	// Guard against a zero-length sample window, which would otherwise
	// produce a NaN rate (e.g. when there are no peers at all).
	rxRate := uint64(0)
	if delta > 0 {
		rxRate = uint64(rx / delta.Seconds())
	}
	return madmin.NetperfNodeResult{Endpoint: "", TX: r.n / uint64(duration.Seconds()), RX: rxRate, Error: errStr}
}
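
// The returned rates are in bytes per second. A caller-side conversion to
// gigabits per second (illustrative only) would be:
//
//	gbps := float64(result.TX) * 8 / 1e9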