2022-03-09 01:54:38 +08:00 
										
									 
								 
							 
							
								
							 
							
								 
							 
							
							
								// Copyright (c) 2022 MinIO, Inc.
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								//
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								// This file is part of MinIO Object Storage stack
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								//
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								// This program is free software: you can redistribute it and/or modify
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								// it under the terms of the GNU Affero General Public License as published by
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								// the Free Software Foundation, either version 3 of the License, or
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								// (at your option) any later version.
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								//
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								// This program is distributed in the hope that it will be useful
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								// but WITHOUT ANY WARRANTY; without even the implied warranty of
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								// GNU Affero General Public License for more details.
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								//
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								// You should have received a copy of the GNU Affero General Public License
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								// along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								package  cmd 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								import  ( 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									"context" 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									"errors" 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									"fmt" 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									"io" 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									"io/ioutil" 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									"math/rand" 
							 
						 
					
						
							
								
									
										
										
										
											2022-04-08 12:20:40 +08:00 
										
									 
								 
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
									"net/http" 
							 
						 
					
						
							
								
									
										
										
										
											2022-03-09 01:54:38 +08:00 
										
									 
								 
							 
							
								
							 
							
								 
							 
							
							
									"sync" 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									"sync/atomic" 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									"time" 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									"github.com/dustin/go-humanize" 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									"github.com/google/uuid" 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									"github.com/minio/madmin-go" 
							 
						 
					
						
							
								
									
										
										
										
											2022-04-08 12:20:40 +08:00 
										
									 
								 
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
									"github.com/minio/minio-go/v7" 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									"github.com/minio/minio-go/v7/pkg/credentials" 
							 
						 
					
						
							
								
									
										
										
										
											2022-03-09 01:54:38 +08:00 
										
									 
								 
							 
							
								
							 
							
								 
							 
							
							
									"github.com/minio/pkg/randreader" 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								// SpeedtestResult return value of the speedtest function
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								type  SpeedtestResult  struct  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									Endpoint   string 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									Uploads    uint64 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									Downloads  uint64 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									Error      string 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								func  newRandomReader ( size  int )  io . Reader  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									return  io . LimitReader ( randreader . New ( ) ,  int64 ( size ) ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								// Runs the speedtest on local MinIO process.
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								func  selfSpeedtest ( ctx  context . Context ,  size ,  concurrent  int ,  duration  time . Duration ,  storageClass  string )  ( SpeedtestResult ,  error )  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									objAPI  :=  newObjectLayerFn ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									if  objAPI  ==  nil  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
										return  SpeedtestResult { } ,  errServerNotInitialized 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									var  errOnce  sync . Once 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									var  retError  string 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									var  wg  sync . WaitGroup 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									var  totalBytesWritten  uint64 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									var  totalBytesRead  uint64 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2022-04-08 12:20:40 +08:00 
										
									 
								 
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
									region  :=  globalSite . Region 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									if  region  ==  ""  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
										region  =  "us-east-1" 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									client ,  err  :=  minio . New ( globalLocalNodeName ,  & minio . Options { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
										Creds :      credentials . NewStaticV4 ( globalActiveCred . AccessKey ,  globalActiveCred . SecretKey ,  "" ) , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
										Secure :     globalIsTLS , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
										Transport :  globalProxyTransport , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
										Region :     region , 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									} ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									if  err  !=  nil  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
										return  SpeedtestResult { } ,  err 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2022-03-09 01:54:38 +08:00 
										
									 
								 
							 
							
								
							 
							
								 
							 
							
							
									objCountPerThread  :=  make ( [ ] uint64 ,  concurrent ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									uploadsCtx ,  uploadsCancel  :=  context . WithCancel ( context . Background ( ) ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									defer  uploadsCancel ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									go  func ( )  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
										time . Sleep ( duration ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
										uploadsCancel ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									} ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
											2022-04-08 12:20:40 +08:00 
										
									 
								 
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
									objNamePrefix  :=  uuid . New ( ) . String ( )  +  "/" 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									userMetadata  :=  make ( map [ string ] string ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									userMetadata [ globalObjectPerfUserMetadata ]  =  "true" 
							 
						 
					
						
							
								
									
										
										
										
											2022-03-09 01:54:38 +08:00 
										
									 
								 
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									wg . Add ( concurrent ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									for  i  :=  0 ;  i  <  concurrent ;  i ++  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
										go  func ( i  int )  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
											defer  wg . Done ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
											for  { 
							 
						 
					
						
							
								
									
										
										
										
											2022-04-08 12:20:40 +08:00 
										
									 
								 
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
												reader  :=  newRandomReader ( size ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
												info ,  err  :=  client . PutObject ( uploadsCtx ,  globalObjectPerfBucket ,  fmt . Sprintf ( "%s%d.%d" ,  objNamePrefix ,  i ,  objCountPerThread [ i ] ) ,  reader ,  int64 ( size ) ,  minio . PutObjectOptions { UserMetadata :  userMetadata ,  DisableMultipart :  true } )  // Bypass S3 API freeze
 
							 
						 
					
						
							
								
									
										
										
										
											2022-03-09 01:54:38 +08:00 
										
									 
								 
							 
							
								
							 
							
								 
							 
							
							
												if  err  !=  nil  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
													if  ! contextCanceled ( uploadsCtx )  &&  ! errors . Is ( err ,  context . Canceled )  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
														errOnce . Do ( func ( )  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
															retError  =  err . Error ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
														} ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
													} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
													uploadsCancel ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
													return 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
												} 
							 
						 
					
						
							
								
									
										
										
										
											2022-04-08 12:20:40 +08:00 
										
									 
								 
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
												atomic . AddUint64 ( & totalBytesWritten ,  uint64 ( info . Size ) ) 
							 
						 
					
						
							
								
									
										
										
										
											2022-03-09 01:54:38 +08:00 
										
									 
								 
							 
							
								
							 
							
								 
							 
							
							
												objCountPerThread [ i ] ++ 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
											} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
										} ( i ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									wg . Wait ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									// We already saw write failures, no need to proceed into read's
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									if  retError  !=  ""  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
										return  SpeedtestResult { Uploads :  totalBytesWritten ,  Downloads :  totalBytesRead ,  Error :  retError } ,  nil 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									downloadsCtx ,  downloadsCancel  :=  context . WithCancel ( context . Background ( ) ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									defer  downloadsCancel ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									go  func ( )  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
										time . Sleep ( duration ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
										downloadsCancel ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									} ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									wg . Add ( concurrent ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									for  i  :=  0 ;  i  <  concurrent ;  i ++  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
										go  func ( i  int )  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
											defer  wg . Done ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
											var  j  uint64 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
											if  objCountPerThread [ i ]  ==  0  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
												return 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
											} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
											for  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
												if  objCountPerThread [ i ]  ==  j  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
													j  =  0 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
												} 
							 
						 
					
						
							
								
									
										
										
										
											2022-04-08 12:20:40 +08:00 
										
									 
								 
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
												opts  :=  minio . GetObjectOptions { } 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
												opts . Set ( globalObjectPerfUserMetadata ,  "true" )  // Bypass S3 API freeze
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
												r ,  err  :=  client . GetObject ( downloadsCtx ,  globalObjectPerfBucket ,  fmt . Sprintf ( "%s%d.%d" ,  objNamePrefix ,  i ,  j ) ,  opts ) 
							 
						 
					
						
							
								
									
										
										
										
											2022-03-09 01:54:38 +08:00 
										
									 
								 
							 
							
								
							 
							
								 
							 
							
							
												if  err  !=  nil  { 
							 
						 
					
						
							
								
									
										
										
										
											2022-04-08 12:20:40 +08:00 
										
									 
								 
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
													errResp ,  ok  :=  err . ( minio . ErrorResponse ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
													if  ok  &&  errResp . StatusCode  ==  http . StatusNotFound  { 
							 
						 
					
						
							
								
									
										
										
										
											2022-03-09 01:54:38 +08:00 
										
									 
								 
							 
							
								
							 
							
								 
							 
							
							
														continue 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
													} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
													if  ! contextCanceled ( downloadsCtx )  &&  ! errors . Is ( err ,  context . Canceled )  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
														errOnce . Do ( func ( )  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
															retError  =  err . Error ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
														} ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
													} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
													downloadsCancel ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
													return 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
												} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
												n ,  err  :=  io . Copy ( ioutil . Discard ,  r ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
												r . Close ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
												if  err  ==  nil  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
													// Only capture success criteria - do not
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
													// have to capture failed reads, truncated
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
													// reads etc.
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
													atomic . AddUint64 ( & totalBytesRead ,  uint64 ( n ) ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
												} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
												if  err  !=  nil  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
													if  ! contextCanceled ( downloadsCtx )  &&  ! errors . Is ( err ,  context . Canceled )  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
														errOnce . Do ( func ( )  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
															retError  =  err . Error ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
														} ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
													} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
													downloadsCancel ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
													return 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
												} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
												j ++ 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
											} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
										} ( i ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									wg . Wait ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									return  SpeedtestResult { Uploads :  totalBytesWritten ,  Downloads :  totalBytesRead ,  Error :  retError } ,  nil 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								// To collect RX stats during "mc support perf net"
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								// RXSample holds the RX bytes for the duration between
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								// the last peer to connect and the first peer to disconnect.
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								// This is to improve the RX throughput accuracy.
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								type  netPerfRX  struct  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									RX                 uint64     // RX bytes
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									lastToConnect      time . Time  // time at which last peer to connect to us
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									firstToDisconnect  time . Time  // time at which the first peer disconnects from us
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									RXSample           uint64     // RX bytes between lastToConnect and firstToDisconnect
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									activeConnections  uint64 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									sync . RWMutex 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								func  ( n  * netPerfRX )  Connect ( )  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									n . Lock ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									defer  n . Unlock ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									n . activeConnections ++ 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									atomic . StoreUint64 ( & globalNetPerfRX . RX ,  0 ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									n . lastToConnect  =  time . Now ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								func  ( n  * netPerfRX )  Disconnect ( )  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									n . Lock ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									defer  n . Unlock ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									n . activeConnections -- 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									if  n . firstToDisconnect . IsZero ( )  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
										n . RXSample  =  atomic . LoadUint64 ( & n . RX ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
										n . firstToDisconnect  =  time . Now ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								func  ( n  * netPerfRX )  ActiveConnections ( )  uint64  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									n . RLock ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									defer  n . RUnlock ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									return  n . activeConnections 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								func  ( n  * netPerfRX )  Reset ( )  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									n . RLock ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									defer  n . RUnlock ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									n . RX  =  0 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									n . RXSample  =  0 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									n . lastToConnect  =  time . Time { } 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									n . firstToDisconnect  =  time . Time { } 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								// Reader to read random data.
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								type  netperfReader  struct  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									n    uint64 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									eof  chan  struct { } 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									buf  [ ] byte 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								func  ( m  * netperfReader )  Read ( b  [ ] byte )  ( int ,  error )  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									select  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									case  <- m . eof : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
										return  0 ,  io . EOF 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									default : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									n  :=  copy ( b ,  m . buf ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									atomic . AddUint64 ( & m . n ,  uint64 ( n ) ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									return  n ,  nil 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								func  netperf ( ctx  context . Context ,  duration  time . Duration )  madmin . NetperfNodeResult  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									r  :=  & netperfReader { eof :  make ( chan  struct { } ) } 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									r . buf  =  make ( [ ] byte ,  128 * humanize . KiByte ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									rand . Read ( r . buf ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									connectionsPerPeer  :=  16 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									if  len ( globalNotificationSys . peerClients )  >  16  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
										// For a large cluster it's enough to have 1 connection per peer to saturate the network.
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
										connectionsPerPeer  =  1 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									errStr  :=  "" 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									var  wg  sync . WaitGroup 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									for  index  :=  range  globalNotificationSys . peerClients  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
										if  globalNotificationSys . peerClients [ index ]  ==  nil  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
											continue 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
										} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
										go  func ( index  int )  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
											for  i  :=  0 ;  i  <  connectionsPerPeer ;  i ++  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
												wg . Add ( 1 ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
												go  func ( )  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
													defer  wg . Done ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
													err  :=  globalNotificationSys . peerClients [ index ] . DevNull ( ctx ,  r ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
													if  err  !=  nil  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
														errStr  =  err . Error ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
													} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
												} ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
											} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
										} ( index ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									time . Sleep ( duration ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									close ( r . eof ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									wg . Wait ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									for  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
										if  globalNetPerfRX . ActiveConnections ( )  ==  0  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
											break 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
										} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
										time . Sleep ( time . Second ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									rx  :=  float64 ( globalNetPerfRX . RXSample ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									delta  :=  globalNetPerfRX . firstToDisconnect . Sub ( globalNetPerfRX . lastToConnect ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									if  delta  <  0  { 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
										rx  =  0 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
										errStr  =  "network disconnection issues detected" 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									} 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									globalNetPerfRX . Reset ( ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
									return  madmin . NetperfNodeResult { Endpoint :  "" ,  TX :  r . n  /  uint64 ( duration . Seconds ( ) ) ,  RX :  uint64 ( rx  /  delta . Seconds ( ) ) ,  Error :  errStr } 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								}