mirror of https://github.com/minio/minio.git
// Copyright (c) 2022 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see <http://www.gnu.org/licenses/>.

package cmd

import (
	"context"
	"errors"
	"fmt"
	"io"
	"math/rand"
	"net/http"
	"sync"
	"sync/atomic"
	"time"

	"github.com/dustin/go-humanize"
	"github.com/minio/madmin-go/v2"
	"github.com/minio/minio-go/v7"
	"github.com/minio/pkg/randreader"
)

// SpeedTestResult is the return value of the speedtest function.
type SpeedTestResult struct {
	Endpoint      string
	Uploads       uint64
	Downloads     uint64
	UploadTimes   madmin.TimeDurations
	DownloadTimes madmin.TimeDurations
	DownloadTTFB  madmin.TimeDurations
	Error         string
}

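// newRandomReader returns a reader that yields size bytes of random data.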
func newRandomReader(size int) io.Reader {
	return io.LimitReader(randreader.New(), int64(size))
}

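// firstByteRecorder wraps a reader and records the time at which the
// first byte is read, so that time-to-first-byte (TTFB) can be measured.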
type firstByteRecorder struct {
	t *time.Time
	r io.Reader
}

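// Read reads from the underlying reader and records the time of the
// first successfully read byte.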
func (f *firstByteRecorder) Read(p []byte) (n int, err error) {
	if f.t != nil || len(p) == 0 {
		return f.r.Read(p)
	}
	// Read a single byte.
	n, err = f.r.Read(p[:1])
	if n > 0 {
		t := time.Now()
		f.t = &t
	}
	return n, err
}

// selfSpeedTest runs the speedtest on the local MinIO process.
func selfSpeedTest(ctx context.Context, opts speedTestOpts) (SpeedTestResult, error) {
	objAPI := newObjectLayerFn()
	if objAPI == nil {
		return SpeedTestResult{}, errServerNotInitialized
	}

	var wg sync.WaitGroup
	var errOnce sync.Once
	var retError string
	var totalBytesWritten uint64
	var totalBytesRead uint64

	objCountPerThread := make([]uint64, opts.concurrency)

	uploadsCtx, uploadsCancel := context.WithCancel(context.Background())
	defer uploadsCancel()

	go func() {
		time.Sleep(opts.duration)
		uploadsCancel()
	}()

	objNamePrefix := pathJoin(speedTest, mustGetUUID())

	userMetadata := make(map[string]string)
	userMetadata[globalObjectPerfUserMetadata] = "true" // Bypass S3 API freeze
	popts := minio.PutObjectOptions{
		UserMetadata:         userMetadata,
		DisableContentSha256: true,
		DisableMultipart:     true,
	}

	var mu sync.Mutex
	var uploadTimes madmin.TimeDurations
	wg.Add(opts.concurrency)
	for i := 0; i < opts.concurrency; i++ {
		go func(i int) {
			defer wg.Done()
			for {
				t := time.Now()
				reader := newRandomReader(opts.objectSize)
				tmpObjName := pathJoin(objNamePrefix, fmt.Sprintf("%d/%d", i, objCountPerThread[i]))
				info, err := globalMinioClient.PutObject(uploadsCtx, opts.bucketName, tmpObjName, reader, int64(opts.objectSize), popts)
				if err != nil {
					if !contextCanceled(uploadsCtx) && !errors.Is(err, context.Canceled) {
						errOnce.Do(func() {
							retError = err.Error()
						})
					}
					uploadsCancel()
					return
				}
				response := time.Since(t)
				atomic.AddUint64(&totalBytesWritten, uint64(info.Size))
				objCountPerThread[i]++
				mu.Lock()
				uploadTimes = append(uploadTimes, response)
				mu.Unlock()
			}
		}(i)
	}
	wg.Wait()

	// We already saw write failures, no need to proceed with reads.
	if retError != "" {
		return SpeedTestResult{
			Uploads:     totalBytesWritten,
			Downloads:   totalBytesRead,
			UploadTimes: uploadTimes,
			Error:       retError,
		}, nil
	}

	downloadsCtx, downloadsCancel := context.WithCancel(context.Background())
	defer downloadsCancel()
	go func() {
		time.Sleep(opts.duration)
		downloadsCancel()
	}()

	gopts := minio.GetObjectOptions{}
	gopts.Set(globalObjectPerfUserMetadata, "true") // Bypass S3 API freeze

	var downloadTimes madmin.TimeDurations
	var downloadTTFB madmin.TimeDurations
	wg.Add(opts.concurrency)
	for i := 0; i < opts.concurrency; i++ {
		go func(i int) {
			defer wg.Done()
			var j uint64
			if objCountPerThread[i] == 0 {
				return
			}
			for {
				if objCountPerThread[i] == j {
					j = 0
				}
				tmpObjName := pathJoin(objNamePrefix, fmt.Sprintf("%d/%d", i, j))
				t := time.Now()
				r, err := globalMinioClient.GetObject(downloadsCtx, opts.bucketName, tmpObjName, gopts)
				if err != nil {
					errResp, ok := err.(minio.ErrorResponse)
					if ok && errResp.StatusCode == http.StatusNotFound {
						continue
					}
					if !contextCanceled(downloadsCtx) && !errors.Is(err, context.Canceled) {
						errOnce.Do(func() {
							retError = err.Error()
						})
					}
					downloadsCancel()
					return
				}
				fbr := firstByteRecorder{
					r: r,
				}
				n, err := io.Copy(io.Discard, &fbr)
				r.Close()
				if err == nil {
					response := time.Since(t)
					ttfb := time.Since(*fbr.t)
					// Only capture successful reads - failed or
					// truncated reads are not recorded.
					atomic.AddUint64(&totalBytesRead, uint64(n))
					mu.Lock()
					downloadTimes = append(downloadTimes, response)
					downloadTTFB = append(downloadTTFB, ttfb)
					mu.Unlock()
				}
				if err != nil {
					if !contextCanceled(downloadsCtx) && !errors.Is(err, context.Canceled) {
						errOnce.Do(func() {
							retError = err.Error()
						})
					}
					downloadsCancel()
					return
				}
				j++
			}
		}(i)
	}
	wg.Wait()

	return SpeedTestResult{
		Uploads:       totalBytesWritten,
		Downloads:     totalBytesRead,
		UploadTimes:   uploadTimes,
		DownloadTimes: downloadTimes,
		DownloadTTFB:  downloadTTFB,
		Error:         retError,
	}, nil
}

// netPerfRX collects RX stats during "mc support perf net".
// RXSample holds the RX bytes received between the last peer to connect
// and the first peer to disconnect. This improves the accuracy of the
// RX throughput measurement.
type netPerfRX struct {
	RX                uint64    // RX bytes
	lastToConnect     time.Time // time at which the last peer connected to us
	firstToDisconnect time.Time // time at which the first peer disconnected from us
	RXSample          uint64    // RX bytes between lastToConnect and firstToDisconnect
	activeConnections uint64
	sync.RWMutex
}

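// Connect records a new peer connection: it resets the RX counter and
// marks the connect time, so the sampling window starts at the last
// peer to connect.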
func (n *netPerfRX) Connect() {
	n.Lock()
	defer n.Unlock()
	n.activeConnections++
	atomic.StoreUint64(&globalNetPerfRX.RX, 0)
	n.lastToConnect = time.Now()
}

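// Disconnect records a peer disconnect; the first peer to disconnect
// captures the RX sample and closes the sampling window.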
func (n *netPerfRX) Disconnect() {
	n.Lock()
	defer n.Unlock()
	n.activeConnections--
	if n.firstToDisconnect.IsZero() {
		n.RXSample = atomic.LoadUint64(&n.RX)
		n.firstToDisconnect = time.Now()
	}
}

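// ActiveConnections returns the number of currently connected peers.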
func (n *netPerfRX) ActiveConnections() uint64 {
	n.RLock()
	defer n.RUnlock()
	return n.activeConnections
}

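// Reset clears the RX counters and timestamps for the next run.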
func (n *netPerfRX) Reset() {
	n.Lock()
	defer n.Unlock()
	n.RX = 0
	n.RXSample = 0
	n.lastToConnect = time.Time{}
	n.firstToDisconnect = time.Time{}
}

// netperfReader serves random data and counts the bytes read from it.
type netperfReader struct {
	n   uint64
	eof chan struct{}
	buf []byte
}

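// Read copies the pre-filled random buffer into b and counts the bytes
// handed out, returning io.EOF once the eof channel is closed.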
func (m *netperfReader) Read(b []byte) (int, error) {
	select {
	case <-m.eof:
		return 0, io.EOF
	default:
	}
	n := copy(b, m.buf)
	atomic.AddUint64(&m.n, uint64(n))
	return n, nil
}

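// netperf streams random data to every peer for the given duration and
// reports this node's TX and RX throughput in bytes per second.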
func netperf(ctx context.Context, duration time.Duration) madmin.NetperfNodeResult {
	r := &netperfReader{eof: make(chan struct{})}
	r.buf = make([]byte, 128*humanize.KiByte)
	rand.Read(r.buf)

	connectionsPerPeer := 16

	if len(globalNotificationSys.peerClients) > 16 {
		// For a large cluster it's enough to have 1 connection per peer to saturate the network.
		connectionsPerPeer = 1
	}

	errStr := ""
	var wg sync.WaitGroup
	for index := range globalNotificationSys.peerClients {
		if globalNotificationSys.peerClients[index] == nil {
			continue
		}
		go func(index int) {
			for i := 0; i < connectionsPerPeer; i++ {
				wg.Add(1)
				go func() {
					defer wg.Done()
					err := globalNotificationSys.peerClients[index].DevNull(ctx, r)
					if err != nil {
						errStr = err.Error()
					}
				}()
			}
		}(index)
	}

	time.Sleep(duration)
	close(r.eof)
	wg.Wait()
	for {
		if globalNetPerfRX.ActiveConnections() == 0 {
			break
		}
		time.Sleep(time.Second)
	}
	rx := float64(globalNetPerfRX.RXSample)
	delta := globalNetPerfRX.firstToDisconnect.Sub(globalNetPerfRX.lastToConnect)
	if delta < 0 {
		rx = 0
		errStr = "network disconnection issues detected"
	}

	globalNetPerfRX.Reset()
	return madmin.NetperfNodeResult{Endpoint: "", TX: r.n / uint64(duration.Seconds()), RX: uint64(rx / delta.Seconds()), Error: errStr}
}