| 
									
										
										
										
											2023-08-02 12:52:31 +08:00
										 |  |  | // Copyright (c) 2015-2023 MinIO, Inc.
 | 
					
						
							| 
									
										
										
										
											2021-04-19 03:41:13 +08:00
										 |  |  | //
 | 
					
						
							|  |  |  | // This file is part of MinIO Object Storage stack
 | 
					
						
							|  |  |  | //
 | 
					
						
							|  |  |  | // This program is free software: you can redistribute it and/or modify
 | 
					
						
							|  |  |  | // it under the terms of the GNU Affero General Public License as published by
 | 
					
						
							|  |  |  | // the Free Software Foundation, either version 3 of the License, or
 | 
					
						
							|  |  |  | // (at your option) any later version.
 | 
					
						
							|  |  |  | //
 | 
					
						
							|  |  |  | // This program is distributed in the hope that it will be useful
 | 
					
						
							|  |  |  | // but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
					
						
							|  |  |  | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
					
						
							|  |  |  | // GNU Affero General Public License for more details.
 | 
					
						
							|  |  |  | //
 | 
					
						
							|  |  |  | // You should have received a copy of the GNU Affero General Public License
 | 
					
						
							|  |  |  | // along with this program.  If not, see <http://www.gnu.org/licenses/>.
 | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | package cmd | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							|  |  |  | 	"context" | 
					
						
							|  |  |  | 	"io" | 
					
						
							|  |  |  | 	"sort" | 
					
						
							|  |  |  | 	"strings" | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
											  
											
												perf: websocket grid connectivity for all internode communication (#18461)
This PR adds a WebSocket grid feature that allows servers to communicate via 
a single two-way connection.
There are two request types:
* Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small
  roundtrips with small payloads.
* Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`,
  which allows for different combinations of full two-way streams with an initial payload.
Only a single stream is created between two machines - and there is, as such, no
server/client relation since both sides can initiate and handle requests. Which server
initiates the request is decided deterministically on the server names.
Requests are made through a mux client and server, which handles message
passing, congestion, cancelation, timeouts, etc.
If a connection is lost, all requests are canceled, and the calling server will try
to reconnect. Registered handlers can operate directly on byte 
slices or use a higher-level generics abstraction.
There is no versioning of handlers/clients, and incompatible changes should
be handled by adding new handlers.
The request path can be changed to a new one for any protocol changes.
First, all servers create a "Manager." The manager must know its address 
as well as all remote addresses. This will manage all connections.
To get a connection to any remote, ask the manager to provide it given
the remote address using.
```
func (m *Manager) Connection(host string) *Connection
```
All serverside handlers must also be registered on the manager. This will
make sure that all incoming requests are served. The number of in-flight 
requests and responses must also be given for streaming requests.
The "Connection" returned manages the mux-clients. Requests issued
to the connection will be sent to the remote.
* `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)`
   performs a single request and returns the result. Any deadline provided on the request is
   forwarded to the server, and canceling the context will make the function return at once.
* `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)`
   will initiate a remote call and send the initial payload.
```Go
// A Stream is a two-way stream.
// All responses *must* be read by the caller.
// If the call is canceled through the context,
//The appropriate error will be returned.
type Stream struct {
	// Responses from the remote server.
	// Channel will be closed after an error or when the remote closes.
	// All responses *must* be read by the caller until either an error is returned or the channel is closed.
	// Canceling the context will cause the context cancellation error to be returned.
	Responses <-chan Response
	// Requests sent to the server.
	// If the handler is defined with 0 incoming capacity this will be nil.
	// Channel *must* be closed to signal the end of the stream.
	// If the request context is canceled, the stream will no longer process requests.
	Requests chan<- []byte
}
type Response struct {
	Msg []byte
	Err error
}
```
There are generic versions of the server/client handlers that allow the use of type
safe implementations for data types that support msgpack marshal/unmarshal.
											
										 
											2023-11-21 09:09:35 +08:00
										 |  |  | 	"github.com/minio/minio/internal/grid" | 
					
						
							| 
									
										
										
										
											2021-06-02 05:59:40 +08:00
										 |  |  | 	xioutil "github.com/minio/minio/internal/ioutil" | 
					
						
							|  |  |  | 	"github.com/minio/minio/internal/logger" | 
					
						
							| 
									
										
										
										
											2023-09-01 08:58:48 +08:00
										 |  |  | 	"github.com/valyala/bytebufferpool" | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
											  
											
												perf: websocket grid connectivity for all internode communication (#18461)
This PR adds a WebSocket grid feature that allows servers to communicate via 
a single two-way connection.
There are two request types:
* Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small
  roundtrips with small payloads.
* Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`,
  which allows for different combinations of full two-way streams with an initial payload.
Only a single stream is created between two machines - and there is, as such, no
server/client relation since both sides can initiate and handle requests. Which server
initiates the request is decided deterministically on the server names.
Requests are made through a mux client and server, which handles message
passing, congestion, cancelation, timeouts, etc.
If a connection is lost, all requests are canceled, and the calling server will try
to reconnect. Registered handlers can operate directly on byte 
slices or use a higher-level generics abstraction.
There is no versioning of handlers/clients, and incompatible changes should
be handled by adding new handlers.
The request path can be changed to a new one for any protocol changes.
First, all servers create a "Manager." The manager must know its address 
as well as all remote addresses. This will manage all connections.
To get a connection to any remote, ask the manager to provide it given
the remote address using.
```
func (m *Manager) Connection(host string) *Connection
```
All serverside handlers must also be registered on the manager. This will
make sure that all incoming requests are served. The number of in-flight 
requests and responses must also be given for streaming requests.
The "Connection" returned manages the mux-clients. Requests issued
to the connection will be sent to the remote.
* `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)`
   performs a single request and returns the result. Any deadline provided on the request is
   forwarded to the server, and canceling the context will make the function return at once.
* `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)`
   will initiate a remote call and send the initial payload.
```Go
// A Stream is a two-way stream.
// All responses *must* be read by the caller.
// If the call is canceled through the context,
//The appropriate error will be returned.
type Stream struct {
	// Responses from the remote server.
	// Channel will be closed after an error or when the remote closes.
	// All responses *must* be read by the caller until either an error is returned or the channel is closed.
	// Canceling the context will cause the context cancellation error to be returned.
	Responses <-chan Response
	// Requests sent to the server.
	// If the handler is defined with 0 incoming capacity this will be nil.
	// Channel *must* be closed to signal the end of the stream.
	// If the request context is canceled, the stream will no longer process requests.
	Requests chan<- []byte
}
type Response struct {
	Msg []byte
	Err error
}
```
There are generic versions of the server/client handlers that allow the use of type
safe implementations for data types that support msgpack marshal/unmarshal.
											
										 
											2023-11-21 09:09:35 +08:00
										 |  |  | //go:generate msgp -file $GOFILE
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | // WalkDirOptions provides options for WalkDir operations.
 | 
					
						
							|  |  |  | type WalkDirOptions struct { | 
					
						
							| 
									
										
										
										
											2021-02-27 07:11:42 +08:00
										 |  |  | 	// Bucket to scanner
 | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | 	Bucket string | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Directory inside the bucket.
 | 
					
						
							|  |  |  | 	BaseDir string | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Do a full recursive scan.
 | 
					
						
							|  |  |  | 	Recursive bool | 
					
						
							| 
									
										
										
										
											2020-11-19 02:44:18 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-12-02 04:07:39 +08:00
										 |  |  | 	// ReportNotFound will return errFileNotFound if all disks reports the BaseDir cannot be found.
 | 
					
						
							|  |  |  | 	ReportNotFound bool | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-11-19 02:44:18 +08:00
										 |  |  | 	// FilterPrefix will only return results with given prefix within folder.
 | 
					
						
							|  |  |  | 	// Should never contain a slash.
 | 
					
						
							|  |  |  | 	FilterPrefix string | 
					
						
							| 
									
										
										
										
											2021-02-19 03:06:54 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	// ForwardTo will forward to the given object path.
 | 
					
						
							|  |  |  | 	ForwardTo string | 
					
						
							| 
									
										
										
										
											2022-09-09 23:13:06 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	// Limit the number of returned objects if > 0.
 | 
					
						
							|  |  |  | 	Limit int | 
					
						
							| 
									
										
											  
											
												perf: websocket grid connectivity for all internode communication (#18461)
This PR adds a WebSocket grid feature that allows servers to communicate via 
a single two-way connection.
There are two request types:
* Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small
  roundtrips with small payloads.
* Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`,
  which allows for different combinations of full two-way streams with an initial payload.
Only a single stream is created between two machines - and there is, as such, no
server/client relation since both sides can initiate and handle requests. Which server
initiates the request is decided deterministically on the server names.
Requests are made through a mux client and server, which handles message
passing, congestion, cancelation, timeouts, etc.
If a connection is lost, all requests are canceled, and the calling server will try
to reconnect. Registered handlers can operate directly on byte 
slices or use a higher-level generics abstraction.
There is no versioning of handlers/clients, and incompatible changes should
be handled by adding new handlers.
The request path can be changed to a new one for any protocol changes.
First, all servers create a "Manager." The manager must know its address 
as well as all remote addresses. This will manage all connections.
To get a connection to any remote, ask the manager to provide it given
the remote address using.
```
func (m *Manager) Connection(host string) *Connection
```
All serverside handlers must also be registered on the manager. This will
make sure that all incoming requests are served. The number of in-flight 
requests and responses must also be given for streaming requests.
The "Connection" returned manages the mux-clients. Requests issued
to the connection will be sent to the remote.
* `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)`
   performs a single request and returns the result. Any deadline provided on the request is
   forwarded to the server, and canceling the context will make the function return at once.
* `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)`
   will initiate a remote call and send the initial payload.
```Go
// A Stream is a two-way stream.
// All responses *must* be read by the caller.
// If the call is canceled through the context,
//The appropriate error will be returned.
type Stream struct {
	// Responses from the remote server.
	// Channel will be closed after an error or when the remote closes.
	// All responses *must* be read by the caller until either an error is returned or the channel is closed.
	// Canceling the context will cause the context cancellation error to be returned.
	Responses <-chan Response
	// Requests sent to the server.
	// If the handler is defined with 0 incoming capacity this will be nil.
	// Channel *must* be closed to signal the end of the stream.
	// If the request context is canceled, the stream will no longer process requests.
	Requests chan<- []byte
}
type Response struct {
	Msg []byte
	Err error
}
```
There are generic versions of the server/client handlers that allow the use of type
safe implementations for data types that support msgpack marshal/unmarshal.
											
										 
											2023-11-21 09:09:35 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	// DiskID contains the disk ID of the disk.
 | 
					
						
							|  |  |  | 	// Leave empty to not check disk ID.
 | 
					
						
							|  |  |  | 	DiskID string | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // WalkDir will traverse a directory and return all entries found.
 | 
					
						
							|  |  |  | // On success a sorted meta cache stream will be returned.
 | 
					
						
							| 
									
										
										
										
											2021-03-30 08:00:55 +08:00
										 |  |  | // Metadata has data stripped, if any.
 | 
					
						
							| 
									
										
										
										
											2021-06-16 05:34:26 +08:00
										 |  |  | func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writer) (err error) { | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | 	// Verify if volume is valid and it exists.
 | 
					
						
							|  |  |  | 	volumeDir, err := s.getVolDir(opts.Bucket) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-08-29 06:16:41 +08:00
										 |  |  | 	if !skipAccessChecks(opts.Bucket) { | 
					
						
							|  |  |  | 		// Stat a volume entry.
 | 
					
						
							|  |  |  | 		if err = Access(volumeDir); err != nil { | 
					
						
							| 
									
										
										
										
											2024-01-31 04:43:25 +08:00
										 |  |  | 			return convertAccessError(err, errVolumeAccessDenied) | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-08-02 12:52:31 +08:00
										 |  |  | 	s.RLock() | 
					
						
							|  |  |  | 	legacy := s.formatLegacy | 
					
						
							|  |  |  | 	s.RUnlock() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | 	// Use a small block size to start sending quickly
 | 
					
						
							|  |  |  | 	w := newMetacacheWriter(wr, 16<<10) | 
					
						
							| 
									
										
										
										
											2021-08-13 05:27:22 +08:00
										 |  |  | 	w.reuseBlocks = true // We are not sharing results, so reuse buffers.
 | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | 	defer w.Close() | 
					
						
							|  |  |  | 	out, err := w.stream() | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return err | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-01-29 02:04:17 +08:00
										 |  |  | 	defer xioutil.SafeClose(out) | 
					
						
							| 
									
										
										
										
											2022-09-09 23:13:06 +08:00
										 |  |  | 	var objsReturned int | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-09-15 03:11:27 +08:00
										 |  |  | 	objReturned := func(metadata []byte) { | 
					
						
							|  |  |  | 		if opts.Limit <= 0 { | 
					
						
							|  |  |  | 			return | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2023-01-31 01:13:53 +08:00
										 |  |  | 		if m, _, _ := isIndexedMetaV2(metadata); m != nil && !m.AllHidden(true) { | 
					
						
							| 
									
										
										
										
											2022-09-15 03:11:27 +08:00
										 |  |  | 			objsReturned++ | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2023-07-25 00:30:19 +08:00
										 |  |  | 	send := func(entry metaCacheEntry) error { | 
					
						
							|  |  |  | 		objReturned(entry.metadata) | 
					
						
							|  |  |  | 		select { | 
					
						
							|  |  |  | 		case <-ctx.Done(): | 
					
						
							|  |  |  | 			return ctx.Err() | 
					
						
							|  |  |  | 		case out <- entry: | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		return nil | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2022-09-15 03:11:27 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-02-06 02:12:25 +08:00
										 |  |  | 	// Fast exit track to check if we are listing an object with
 | 
					
						
							|  |  |  | 	// a trailing slash, this will avoid to list the object content.
 | 
					
						
							|  |  |  | 	if HasSuffix(opts.BaseDir, SlashSeparator) { | 
					
						
							| 
									
										
										
										
											2021-09-18 05:11:01 +08:00
										 |  |  | 		metadata, err := s.readMetadata(ctx, pathJoin(volumeDir, | 
					
						
							| 
									
										
										
										
											2021-02-06 02:12:25 +08:00
										 |  |  | 			opts.BaseDir[:len(opts.BaseDir)-1]+globalDirSuffix, | 
					
						
							|  |  |  | 			xlStorageFormatFile)) | 
					
						
							| 
									
										
										
										
											2023-08-02 12:52:31 +08:00
										 |  |  | 		diskHealthCheckOK(ctx, err) | 
					
						
							| 
									
										
										
										
											2021-02-06 02:12:25 +08:00
										 |  |  | 		if err == nil { | 
					
						
							|  |  |  | 			// if baseDir is already a directory object, consider it
 | 
					
						
							| 
									
										
										
										
											2022-09-09 23:13:06 +08:00
										 |  |  | 			// as part of the list call, this is AWS S3 specific
 | 
					
						
							| 
									
										
										
										
											2021-02-06 02:12:25 +08:00
										 |  |  | 			// behavior.
 | 
					
						
							| 
									
										
										
										
											2023-07-25 00:30:19 +08:00
										 |  |  | 			if err := send(metaCacheEntry{ | 
					
						
							| 
									
										
										
										
											2021-02-06 02:12:25 +08:00
										 |  |  | 				name:     opts.BaseDir, | 
					
						
							| 
									
										
										
										
											2021-05-22 02:41:25 +08:00
										 |  |  | 				metadata: metadata, | 
					
						
							| 
									
										
										
										
											2023-07-25 00:30:19 +08:00
										 |  |  | 			}); err != nil { | 
					
						
							|  |  |  | 				return err | 
					
						
							| 
									
										
										
										
											2021-02-06 02:12:25 +08:00
										 |  |  | 			} | 
					
						
							|  |  |  | 		} else { | 
					
						
							| 
									
										
										
										
											2021-07-25 13:03:38 +08:00
										 |  |  | 			st, sterr := Lstat(pathJoin(volumeDir, opts.BaseDir, xlStorageFormatFile)) | 
					
						
							|  |  |  | 			if sterr == nil && st.Mode().IsRegular() { | 
					
						
							| 
									
										
										
										
											2021-02-06 02:12:25 +08:00
										 |  |  | 				return errFileNotFound | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-11-19 02:44:18 +08:00
										 |  |  | 	prefix := opts.FilterPrefix | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | 	var scanDir func(path string) error | 
					
						
							| 
									
										
										
										
											2021-07-06 06:34:41 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | 	scanDir = func(current string) error { | 
					
						
							| 
									
										
										
										
											2021-07-06 06:34:41 +08:00
										 |  |  | 		// Skip forward, if requested...
 | 
					
						
							| 
									
										
										
										
											2023-09-01 08:58:48 +08:00
										 |  |  | 		sb := bytebufferpool.Get() | 
					
						
							|  |  |  | 		defer func() { | 
					
						
							|  |  |  | 			sb.Reset() | 
					
						
							|  |  |  | 			bytebufferpool.Put(sb) | 
					
						
							|  |  |  | 		}() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-07-06 06:34:41 +08:00
										 |  |  | 		forward := "" | 
					
						
							|  |  |  | 		if len(opts.ForwardTo) > 0 && strings.HasPrefix(opts.ForwardTo, current) { | 
					
						
							|  |  |  | 			forward = strings.TrimPrefix(opts.ForwardTo, current) | 
					
						
							| 
									
										
										
										
											2021-11-11 02:41:21 +08:00
										 |  |  | 			// Trim further directories and trailing slash.
 | 
					
						
							| 
									
										
										
										
											2021-07-06 06:34:41 +08:00
										 |  |  | 			if idx := strings.IndexByte(forward, '/'); idx > 0 { | 
					
						
							|  |  |  | 				forward = forward[:idx] | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2021-03-27 02:18:30 +08:00
										 |  |  | 		if contextCanceled(ctx) { | 
					
						
							|  |  |  | 			return ctx.Err() | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2022-09-09 23:13:06 +08:00
										 |  |  | 		if opts.Limit > 0 && objsReturned >= opts.Limit { | 
					
						
							|  |  |  | 			return nil | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2021-09-18 03:14:12 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-07-25 00:30:19 +08:00
										 |  |  | 		if s.walkMu != nil { | 
					
						
							|  |  |  | 			s.walkMu.Lock() | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2024-01-31 04:43:25 +08:00
										 |  |  | 		entries, err := s.ListDir(ctx, "", opts.Bucket, current, -1) | 
					
						
							| 
									
										
										
										
											2023-07-25 00:30:19 +08:00
										 |  |  | 		if s.walkMu != nil { | 
					
						
							|  |  |  | 			s.walkMu.Unlock() | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			// Folder could have gone away in-between
 | 
					
						
							|  |  |  | 			if err != errVolumeNotFound && err != errFileNotFound { | 
					
						
							| 
									
										
										
										
											2023-06-25 11:29:13 +08:00
										 |  |  | 				logger.LogOnceIf(ctx, err, "metacache-walk-scan-dir") | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | 			} | 
					
						
							| 
									
										
										
										
											2020-12-02 04:07:39 +08:00
										 |  |  | 			if opts.ReportNotFound && err == errFileNotFound && current == opts.BaseDir { | 
					
						
							| 
									
										
										
										
											2023-07-25 00:30:19 +08:00
										 |  |  | 				err = errFileNotFound | 
					
						
							|  |  |  | 			} else { | 
					
						
							|  |  |  | 				err = nil | 
					
						
							| 
									
										
										
										
											2020-12-02 04:07:39 +08:00
										 |  |  | 			} | 
					
						
							| 
									
										
										
										
											2023-07-25 00:30:19 +08:00
										 |  |  | 			diskHealthCheckOK(ctx, err) | 
					
						
							|  |  |  | 			return err | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | 		} | 
					
						
							| 
									
										
										
										
											2022-03-10 03:38:54 +08:00
										 |  |  | 		diskHealthCheckOK(ctx, err) | 
					
						
							| 
									
										
										
										
											2021-07-25 13:03:38 +08:00
										 |  |  | 		if len(entries) == 0 { | 
					
						
							|  |  |  | 			return nil | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2020-11-13 05:09:34 +08:00
										 |  |  | 		dirObjects := make(map[string]struct{}) | 
					
						
							| 
									
										
										
										
											2023-04-16 01:25:25 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 		// Avoid a bunch of cleanup when joining.
 | 
					
						
							|  |  |  | 		current = strings.Trim(current, SlashSeparator) | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | 		for i, entry := range entries { | 
					
						
							| 
									
										
										
										
											2022-09-09 23:13:06 +08:00
										 |  |  | 			if opts.Limit > 0 && objsReturned >= opts.Limit { | 
					
						
							|  |  |  | 				return nil | 
					
						
							|  |  |  | 			} | 
					
						
							| 
									
										
										
										
											2021-08-18 22:40:53 +08:00
										 |  |  | 			if len(prefix) > 0 && !strings.HasPrefix(entry, prefix) { | 
					
						
							| 
									
										
										
										
											2022-09-15 03:11:27 +08:00
										 |  |  | 				// Do not retain the file, since it doesn't
 | 
					
						
							| 
									
										
										
										
											2021-08-18 22:40:53 +08:00
										 |  |  | 				// match the prefix.
 | 
					
						
							|  |  |  | 				entries[i] = "" | 
					
						
							|  |  |  | 				continue | 
					
						
							|  |  |  | 			} | 
					
						
							| 
									
										
										
										
											2021-02-19 03:06:54 +08:00
										 |  |  | 			if len(forward) > 0 && entry < forward { | 
					
						
							| 
									
										
										
										
											2022-09-15 03:11:27 +08:00
										 |  |  | 				// Do not retain the file, since its
 | 
					
						
							| 
									
										
										
										
											2021-08-18 22:40:53 +08:00
										 |  |  | 				// lexially smaller than 'forward'
 | 
					
						
							|  |  |  | 				entries[i] = "" | 
					
						
							| 
									
										
										
										
											2021-02-19 03:06:54 +08:00
										 |  |  | 				continue | 
					
						
							|  |  |  | 			} | 
					
						
							| 
									
										
										
										
											2023-04-16 01:25:25 +08:00
										 |  |  | 			if hasSuffixByte(entry, SlashSeparatorChar) { | 
					
						
							| 
									
										
										
										
											2020-11-13 05:09:34 +08:00
										 |  |  | 				if strings.HasSuffix(entry, globalDirSuffixWithSlash) { | 
					
						
							|  |  |  | 					// Add without extension so it is sorted correctly.
 | 
					
						
							|  |  |  | 					entry = strings.TrimSuffix(entry, globalDirSuffixWithSlash) + slashSeparator | 
					
						
							|  |  |  | 					dirObjects[entry] = struct{}{} | 
					
						
							|  |  |  | 					entries[i] = entry | 
					
						
							|  |  |  | 					continue | 
					
						
							|  |  |  | 				} | 
					
						
							| 
									
										
										
										
											2021-11-11 02:41:21 +08:00
										 |  |  | 				// Trim slash, since we don't know if this is folder or object.
 | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | 				entries[i] = entries[i][:len(entry)-1] | 
					
						
							|  |  |  | 				continue | 
					
						
							|  |  |  | 			} | 
					
						
							| 
									
										
										
										
											2022-08-25 00:11:16 +08:00
										 |  |  | 			// Do not retain the file.
 | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | 			entries[i] = "" | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-03-27 02:18:30 +08:00
										 |  |  | 			if contextCanceled(ctx) { | 
					
						
							|  |  |  | 				return ctx.Err() | 
					
						
							|  |  |  | 			} | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | 			// If root was an object return it as such.
 | 
					
						
							|  |  |  | 			if HasSuffix(entry, xlStorageFormatFile) { | 
					
						
							|  |  |  | 				var meta metaCacheEntry | 
					
						
							| 
									
										
										
										
											2023-07-25 00:30:19 +08:00
										 |  |  | 				if s.walkReadMu != nil { | 
					
						
							|  |  |  | 					s.walkReadMu.Lock() | 
					
						
							|  |  |  | 				} | 
					
						
							| 
									
										
										
										
											2023-09-01 08:58:48 +08:00
										 |  |  | 				meta.metadata, err = s.readMetadata(ctx, pathJoinBuf(sb, volumeDir, current, entry)) | 
					
						
							| 
									
										
										
										
											2023-07-25 00:30:19 +08:00
										 |  |  | 				if s.walkReadMu != nil { | 
					
						
							|  |  |  | 					s.walkReadMu.Unlock() | 
					
						
							|  |  |  | 				} | 
					
						
							| 
									
										
										
										
											2022-03-10 03:38:54 +08:00
										 |  |  | 				diskHealthCheckOK(ctx, err) | 
					
						
							| 
									
										
										
										
											2021-05-22 00:10:54 +08:00
										 |  |  | 				if err != nil { | 
					
						
							| 
									
										
										
										
											2022-06-17 23:23:47 +08:00
										 |  |  | 					// It is totally possible that xl.meta was overwritten
 | 
					
						
							|  |  |  | 					// while being concurrently listed at the same time in
 | 
					
						
							|  |  |  | 					// such scenarios the 'xl.meta' might get truncated
 | 
					
						
							|  |  |  | 					if !IsErrIgnored(err, io.EOF, io.ErrUnexpectedEOF) { | 
					
						
							| 
									
										
										
										
											2023-06-25 11:29:13 +08:00
										 |  |  | 						logger.LogOnceIf(ctx, err, "metacache-walk-read-metadata") | 
					
						
							| 
									
										
										
										
											2022-06-17 23:23:47 +08:00
										 |  |  | 					} | 
					
						
							| 
									
										
										
										
											2021-05-22 00:10:54 +08:00
										 |  |  | 					continue | 
					
						
							|  |  |  | 				} | 
					
						
							| 
									
										
										
										
											2020-11-14 08:58:20 +08:00
										 |  |  | 				meta.name = strings.TrimSuffix(entry, xlStorageFormatFile) | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | 				meta.name = strings.TrimSuffix(meta.name, SlashSeparator) | 
					
						
							| 
									
										
										
										
											2023-09-01 08:58:48 +08:00
										 |  |  | 				meta.name = pathJoinBuf(sb, current, meta.name) | 
					
						
							| 
									
										
										
										
											2020-11-13 05:09:34 +08:00
										 |  |  | 				meta.name = decodeDirObject(meta.name) | 
					
						
							| 
									
										
										
										
											2023-10-10 08:27:55 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 				return send(meta) | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | 			} | 
					
						
							|  |  |  | 			// Check legacy.
 | 
					
						
							| 
									
										
										
										
											2023-08-02 12:52:31 +08:00
										 |  |  | 			if HasSuffix(entry, xlStorageFormatFileV1) && legacy { | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | 				var meta metaCacheEntry | 
					
						
							| 
									
										
										
										
											2023-09-01 08:58:48 +08:00
										 |  |  | 				meta.metadata, err = xioutil.ReadFile(pathJoinBuf(sb, volumeDir, current, entry)) | 
					
						
							| 
									
										
										
										
											2022-03-10 03:38:54 +08:00
										 |  |  | 				diskHealthCheckOK(ctx, err) | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | 				if err != nil { | 
					
						
							| 
									
										
										
										
											2022-06-17 23:23:47 +08:00
										 |  |  | 					if !IsErrIgnored(err, io.EOF, io.ErrUnexpectedEOF) { | 
					
						
							|  |  |  | 						logger.LogIf(ctx, err) | 
					
						
							|  |  |  | 					} | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | 					continue | 
					
						
							|  |  |  | 				} | 
					
						
							| 
									
										
										
										
											2020-11-14 08:58:20 +08:00
										 |  |  | 				meta.name = strings.TrimSuffix(entry, xlStorageFormatFileV1) | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | 				meta.name = strings.TrimSuffix(meta.name, SlashSeparator) | 
					
						
							| 
									
										
										
										
											2023-09-01 08:58:48 +08:00
										 |  |  | 				meta.name = pathJoinBuf(sb, current, meta.name) | 
					
						
							| 
									
										
										
										
											2023-10-10 08:27:55 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 				return send(meta) | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | 			} | 
					
						
							|  |  |  | 			// Skip all other files.
 | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		// Process in sort order.
 | 
					
						
							|  |  |  | 		sort.Strings(entries) | 
					
						
							|  |  |  | 		dirStack := make([]string, 0, 5) | 
					
						
							| 
									
										
										
										
											2021-08-18 22:40:53 +08:00
										 |  |  | 		prefix = "" // Remove prefix after first level as we have already filtered the list.
 | 
					
						
							| 
									
										
										
										
											2021-07-06 06:34:41 +08:00
										 |  |  | 		if len(forward) > 0 { | 
					
						
							| 
									
										
										
										
											2021-11-11 02:41:21 +08:00
										 |  |  | 			// Conservative forwarding. Entries may be either objects or prefixes.
 | 
					
						
							|  |  |  | 			for i, entry := range entries { | 
					
						
							|  |  |  | 				if entry >= forward || strings.HasPrefix(forward, entry) { | 
					
						
							|  |  |  | 					entries = entries[i:] | 
					
						
							|  |  |  | 					break | 
					
						
							|  |  |  | 				} | 
					
						
							| 
									
										
										
										
											2021-07-06 06:34:41 +08:00
										 |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2021-02-19 03:06:54 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | 		for _, entry := range entries { | 
					
						
							| 
									
										
										
										
											2022-09-09 23:13:06 +08:00
										 |  |  | 			if opts.Limit > 0 && objsReturned >= opts.Limit { | 
					
						
							|  |  |  | 				return nil | 
					
						
							|  |  |  | 			} | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | 			if entry == "" { | 
					
						
							|  |  |  | 				continue | 
					
						
							|  |  |  | 			} | 
					
						
							| 
									
										
										
										
											2021-03-27 02:18:30 +08:00
										 |  |  | 			if contextCanceled(ctx) { | 
					
						
							|  |  |  | 				return ctx.Err() | 
					
						
							|  |  |  | 			} | 
					
						
							| 
									
										
										
										
											2023-09-01 08:58:48 +08:00
										 |  |  | 			meta := metaCacheEntry{name: pathJoinBuf(sb, current, entry)} | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 			// If directory entry on stack before this, pop it now.
 | 
					
						
							|  |  |  | 			for len(dirStack) > 0 && dirStack[len(dirStack)-1] < meta.name { | 
					
						
							|  |  |  | 				pop := dirStack[len(dirStack)-1] | 
					
						
							| 
									
										
										
										
											2024-02-21 07:00:35 +08:00
										 |  |  | 				select { | 
					
						
							|  |  |  | 				case <-ctx.Done(): | 
					
						
							|  |  |  | 					return ctx.Err() | 
					
						
							|  |  |  | 				case out <- metaCacheEntry{name: pop}: | 
					
						
							|  |  |  | 				} | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | 				if opts.Recursive { | 
					
						
							|  |  |  | 					// Scan folder we found. Should be in correct sort order where we are.
 | 
					
						
							| 
									
										
										
										
											2022-06-17 23:23:47 +08:00
										 |  |  | 					err := scanDir(pop) | 
					
						
							|  |  |  | 					if err != nil && !IsErrIgnored(err, context.Canceled) { | 
					
						
							|  |  |  | 						logger.LogIf(ctx, err) | 
					
						
							|  |  |  | 					} | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | 				} | 
					
						
							|  |  |  | 				dirStack = dirStack[:len(dirStack)-1] | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 			// All objects will be returned as directories, there has been no object check yet.
 | 
					
						
							|  |  |  | 			// Check it by attempting to read metadata.
 | 
					
						
							| 
									
										
										
										
											2020-11-13 05:09:34 +08:00
										 |  |  | 			_, isDirObj := dirObjects[entry] | 
					
						
							|  |  |  | 			if isDirObj { | 
					
						
							|  |  |  | 				meta.name = meta.name[:len(meta.name)-1] + globalDirSuffixWithSlash | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-07-25 00:30:19 +08:00
										 |  |  | 			if s.walkReadMu != nil { | 
					
						
							|  |  |  | 				s.walkReadMu.Lock() | 
					
						
							|  |  |  | 			} | 
					
						
							| 
									
										
										
										
											2023-09-01 08:58:48 +08:00
										 |  |  | 			meta.metadata, err = s.readMetadata(ctx, pathJoinBuf(sb, volumeDir, meta.name, xlStorageFormatFile)) | 
					
						
							| 
									
										
										
										
											2023-07-25 00:30:19 +08:00
										 |  |  | 			if s.walkReadMu != nil { | 
					
						
							|  |  |  | 				s.walkReadMu.Unlock() | 
					
						
							|  |  |  | 			} | 
					
						
							| 
									
										
										
										
											2022-03-10 03:38:54 +08:00
										 |  |  | 			diskHealthCheckOK(ctx, err) | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | 			switch { | 
					
						
							|  |  |  | 			case err == nil: | 
					
						
							|  |  |  | 				// It was an object
 | 
					
						
							| 
									
										
										
										
											2020-11-13 05:09:34 +08:00
										 |  |  | 				if isDirObj { | 
					
						
							|  |  |  | 					meta.name = strings.TrimSuffix(meta.name, globalDirSuffixWithSlash) + slashSeparator | 
					
						
							|  |  |  | 				} | 
					
						
							| 
									
										
										
										
											2023-07-25 00:30:19 +08:00
										 |  |  | 				if err := send(meta); err != nil { | 
					
						
							|  |  |  | 					return err | 
					
						
							|  |  |  | 				} | 
					
						
							| 
									
										
										
										
											2021-08-21 15:12:29 +08:00
										 |  |  | 			case osIsNotExist(err), isSysErrIsDir(err): | 
					
						
							| 
									
										
										
										
											2023-08-02 12:52:31 +08:00
										 |  |  | 				if legacy { | 
					
						
							| 
									
										
										
										
											2023-09-01 08:58:48 +08:00
										 |  |  | 					meta.metadata, err = xioutil.ReadFile(pathJoinBuf(sb, volumeDir, meta.name, xlStorageFormatFileV1)) | 
					
						
							| 
									
										
										
										
											2023-08-02 12:52:31 +08:00
										 |  |  | 					diskHealthCheckOK(ctx, err) | 
					
						
							|  |  |  | 					if err == nil { | 
					
						
							|  |  |  | 						// It was an object
 | 
					
						
							|  |  |  | 						if err := send(meta); err != nil { | 
					
						
							|  |  |  | 							return err | 
					
						
							|  |  |  | 						} | 
					
						
							|  |  |  | 						continue | 
					
						
							| 
									
										
										
										
											2023-07-25 00:30:19 +08:00
										 |  |  | 					} | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | 				} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 				// NOT an object, append to stack (with slash)
 | 
					
						
							| 
									
										
										
										
											2020-11-13 05:09:34 +08:00
										 |  |  | 				// If dirObject, but no metadata (which is unexpected) we skip it.
 | 
					
						
							|  |  |  | 				if !isDirObj { | 
					
						
							| 
									
										
										
										
											2023-09-01 08:58:48 +08:00
										 |  |  | 					if !isDirEmpty(pathJoinBuf(sb, volumeDir, meta.name)) { | 
					
						
							| 
									
										
										
										
											2021-02-04 06:06:54 +08:00
										 |  |  | 						dirStack = append(dirStack, meta.name+slashSeparator) | 
					
						
							|  |  |  | 					} | 
					
						
							| 
									
										
										
										
											2020-11-13 05:09:34 +08:00
										 |  |  | 				} | 
					
						
							| 
									
										
										
										
											2020-11-20 01:15:09 +08:00
										 |  |  | 			case isSysErrNotDir(err): | 
					
						
							|  |  |  | 				// skip
 | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2021-07-25 13:03:38 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | 		// If directory entry left on stack, pop it now.
 | 
					
						
							|  |  |  | 		for len(dirStack) > 0 { | 
					
						
							| 
									
										
										
										
											2022-09-09 23:13:06 +08:00
										 |  |  | 			if opts.Limit > 0 && objsReturned >= opts.Limit { | 
					
						
							|  |  |  | 				return nil | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 			if contextCanceled(ctx) { | 
					
						
							|  |  |  | 				return ctx.Err() | 
					
						
							|  |  |  | 			} | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | 			pop := dirStack[len(dirStack)-1] | 
					
						
							| 
									
										
										
										
											2024-02-21 07:00:35 +08:00
										 |  |  | 			select { | 
					
						
							|  |  |  | 			case <-ctx.Done(): | 
					
						
							|  |  |  | 				return ctx.Err() | 
					
						
							|  |  |  | 			case out <- metaCacheEntry{name: pop}: | 
					
						
							|  |  |  | 			} | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | 			if opts.Recursive { | 
					
						
							|  |  |  | 				// Scan folder we found. Should be in correct sort order where we are.
 | 
					
						
							| 
									
										
										
										
											2021-02-19 03:06:54 +08:00
										 |  |  | 				logger.LogIf(ctx, scanDir(pop)) | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | 			} | 
					
						
							|  |  |  | 			dirStack = dirStack[:len(dirStack)-1] | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		return nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Stream output.
 | 
					
						
							|  |  |  | 	return scanDir(opts.BaseDir) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-03-10 03:38:54 +08:00
										 |  |  | func (p *xlStorageDiskIDCheck) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writer) (err error) { | 
					
						
							| 
									
										
											  
											
												perf: websocket grid connectivity for all internode communication (#18461)
This PR adds a WebSocket grid feature that allows servers to communicate via 
a single two-way connection.
There are two request types:
* Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small
  roundtrips with small payloads.
* Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`,
  which allows for different combinations of full two-way streams with an initial payload.
Only a single stream is created between two machines - and there is, as such, no
server/client relation since both sides can initiate and handle requests. Which server
initiates the request is decided deterministically on the server names.
Requests are made through a mux client and server, which handles message
passing, congestion, cancelation, timeouts, etc.
If a connection is lost, all requests are canceled, and the calling server will try
to reconnect. Registered handlers can operate directly on byte 
slices or use a higher-level generics abstraction.
There is no versioning of handlers/clients, and incompatible changes should
be handled by adding new handlers.
The request path can be changed to a new one for any protocol changes.
First, all servers create a "Manager." The manager must know its address 
as well as all remote addresses. This will manage all connections.
To get a connection to any remote, ask the manager to provide it given
the remote address using.
```
func (m *Manager) Connection(host string) *Connection
```
All serverside handlers must also be registered on the manager. This will
make sure that all incoming requests are served. The number of in-flight 
requests and responses must also be given for streaming requests.
The "Connection" returned manages the mux-clients. Requests issued
to the connection will be sent to the remote.
* `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)`
   performs a single request and returns the result. Any deadline provided on the request is
   forwarded to the server, and canceling the context will make the function return at once.
* `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)`
   will initiate a remote call and send the initial payload.
```Go
// A Stream is a two-way stream.
// All responses *must* be read by the caller.
// If the call is canceled through the context,
//The appropriate error will be returned.
type Stream struct {
	// Responses from the remote server.
	// Channel will be closed after an error or when the remote closes.
	// All responses *must* be read by the caller until either an error is returned or the channel is closed.
	// Canceling the context will cause the context cancellation error to be returned.
	Responses <-chan Response
	// Requests sent to the server.
	// If the handler is defined with 0 incoming capacity this will be nil.
	// Channel *must* be closed to signal the end of the stream.
	// If the request context is canceled, the stream will no longer process requests.
	Requests chan<- []byte
}
type Response struct {
	Msg []byte
	Err error
}
```
There are generic versions of the server/client handlers that allow the use of type
safe implementations for data types that support msgpack marshal/unmarshal.
											
										 
											2023-11-21 09:09:35 +08:00
										 |  |  | 	if err := p.checkID(opts.DiskID); err != nil { | 
					
						
							|  |  |  | 		return err | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2022-03-10 03:38:54 +08:00
										 |  |  | 	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricWalkDir, opts.Bucket, opts.BaseDir) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | 		return err | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2022-03-10 03:38:54 +08:00
										 |  |  | 	defer done(&err) | 
					
						
							| 
									
										
										
										
											2021-09-18 05:11:01 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | 	return p.storage.WalkDir(ctx, opts, wr) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // WalkDir will traverse a directory and return all entries found.
 | 
					
						
							|  |  |  | // On success a meta cache stream will be returned, that should be closed when done.
 | 
					
						
							|  |  |  | func (client *storageRESTClient) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writer) error { | 
					
						
							| 
									
										
											  
											
												perf: websocket grid connectivity for all internode communication (#18461)
This PR adds a WebSocket grid feature that allows servers to communicate via 
a single two-way connection.
There are two request types:
* Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small
  roundtrips with small payloads.
* Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`,
  which allows for different combinations of full two-way streams with an initial payload.
Only a single stream is created between two machines - and there is, as such, no
server/client relation since both sides can initiate and handle requests. Which server
initiates the request is decided deterministically on the server names.
Requests are made through a mux client and server, which handles message
passing, congestion, cancelation, timeouts, etc.
If a connection is lost, all requests are canceled, and the calling server will try
to reconnect. Registered handlers can operate directly on byte 
slices or use a higher-level generics abstraction.
There is no versioning of handlers/clients, and incompatible changes should
be handled by adding new handlers.
The request path can be changed to a new one for any protocol changes.
First, all servers create a "Manager." The manager must know its address 
as well as all remote addresses. This will manage all connections.
To get a connection to any remote, ask the manager to provide it given
the remote address using.
```
func (m *Manager) Connection(host string) *Connection
```
All serverside handlers must also be registered on the manager. This will
make sure that all incoming requests are served. The number of in-flight 
requests and responses must also be given for streaming requests.
The "Connection" returned manages the mux-clients. Requests issued
to the connection will be sent to the remote.
* `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)`
   performs a single request and returns the result. Any deadline provided on the request is
   forwarded to the server, and canceling the context will make the function return at once.
* `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)`
   will initiate a remote call and send the initial payload.
```Go
// A Stream is a two-way stream.
// All responses *must* be read by the caller.
// If the call is canceled through the context,
//The appropriate error will be returned.
type Stream struct {
	// Responses from the remote server.
	// Channel will be closed after an error or when the remote closes.
	// All responses *must* be read by the caller until either an error is returned or the channel is closed.
	// Canceling the context will cause the context cancellation error to be returned.
	Responses <-chan Response
	// Requests sent to the server.
	// If the handler is defined with 0 incoming capacity this will be nil.
	// Channel *must* be closed to signal the end of the stream.
	// If the request context is canceled, the stream will no longer process requests.
	Requests chan<- []byte
}
type Response struct {
	Msg []byte
	Err error
}
```
There are generic versions of the server/client handlers that allow the use of type
safe implementations for data types that support msgpack marshal/unmarshal.
											
										 
											2023-11-21 09:09:35 +08:00
										 |  |  | 	// Ensure remote has the same disk ID.
 | 
					
						
							|  |  |  | 	opts.DiskID = client.diskID | 
					
						
							|  |  |  | 	b, err := opts.MarshalMsg(grid.GetByteBuffer()[:0]) | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | 	if err != nil { | 
					
						
							| 
									
										
										
										
											2024-01-28 08:13:41 +08:00
										 |  |  | 		return toStorageErr(err) | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
											  
											
												perf: websocket grid connectivity for all internode communication (#18461)
This PR adds a WebSocket grid feature that allows servers to communicate via 
a single two-way connection.
There are two request types:
* Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small
  roundtrips with small payloads.
* Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`,
  which allows for different combinations of full two-way streams with an initial payload.
Only a single stream is created between two machines - and there is, as such, no
server/client relation since both sides can initiate and handle requests. Which server
initiates the request is decided deterministically on the server names.
Requests are made through a mux client and server, which handles message
passing, congestion, cancelation, timeouts, etc.
If a connection is lost, all requests are canceled, and the calling server will try
to reconnect. Registered handlers can operate directly on byte 
slices or use a higher-level generics abstraction.
There is no versioning of handlers/clients, and incompatible changes should
be handled by adding new handlers.
The request path can be changed to a new one for any protocol changes.
First, all servers create a "Manager." The manager must know its address 
as well as all remote addresses. This will manage all connections.
To get a connection to any remote, ask the manager to provide it given
the remote address using.
```
func (m *Manager) Connection(host string) *Connection
```
All serverside handlers must also be registered on the manager. This will
make sure that all incoming requests are served. The number of in-flight 
requests and responses must also be given for streaming requests.
The "Connection" returned manages the mux-clients. Requests issued
to the connection will be sent to the remote.
* `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)`
   performs a single request and returns the result. Any deadline provided on the request is
   forwarded to the server, and canceling the context will make the function return at once.
* `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)`
   will initiate a remote call and send the initial payload.
```Go
// A Stream is a two-way stream.
// All responses *must* be read by the caller.
// If the call is canceled through the context,
//The appropriate error will be returned.
type Stream struct {
	// Responses from the remote server.
	// Channel will be closed after an error or when the remote closes.
	// All responses *must* be read by the caller until either an error is returned or the channel is closed.
	// Canceling the context will cause the context cancellation error to be returned.
	Responses <-chan Response
	// Requests sent to the server.
	// If the handler is defined with 0 incoming capacity this will be nil.
	// Channel *must* be closed to signal the end of the stream.
	// If the request context is canceled, the stream will no longer process requests.
	Requests chan<- []byte
}
type Response struct {
	Msg []byte
	Err error
}
```
There are generic versions of the server/client handlers that allow the use of type
safe implementations for data types that support msgpack marshal/unmarshal.
											
										 
											2023-11-21 09:09:35 +08:00
										 |  |  | 	st, err := client.gridConn.NewStream(ctx, grid.HandlerWalkDir, b) | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | 	if err != nil { | 
					
						
							| 
									
										
										
										
											2024-01-28 08:13:41 +08:00
										 |  |  | 		return toStorageErr(err) | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
											  
											
												perf: websocket grid connectivity for all internode communication (#18461)
This PR adds a WebSocket grid feature that allows servers to communicate via 
a single two-way connection.
There are two request types:
* Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small
  roundtrips with small payloads.
* Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`,
  which allows for different combinations of full two-way streams with an initial payload.
Only a single stream is created between two machines - and there is, as such, no
server/client relation since both sides can initiate and handle requests. Which server
initiates the request is decided deterministically on the server names.
Requests are made through a mux client and server, which handles message
passing, congestion, cancelation, timeouts, etc.
If a connection is lost, all requests are canceled, and the calling server will try
to reconnect. Registered handlers can operate directly on byte 
slices or use a higher-level generics abstraction.
There is no versioning of handlers/clients, and incompatible changes should
be handled by adding new handlers.
The request path can be changed to a new one for any protocol changes.
First, all servers create a "Manager." The manager must know its address 
as well as all remote addresses. This will manage all connections.
To get a connection to any remote, ask the manager to provide it given
the remote address using.
```
func (m *Manager) Connection(host string) *Connection
```
All serverside handlers must also be registered on the manager. This will
make sure that all incoming requests are served. The number of in-flight 
requests and responses must also be given for streaming requests.
The "Connection" returned manages the mux-clients. Requests issued
to the connection will be sent to the remote.
* `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)`
   performs a single request and returns the result. Any deadline provided on the request is
   forwarded to the server, and canceling the context will make the function return at once.
* `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)`
   will initiate a remote call and send the initial payload.
```Go
// A Stream is a two-way stream.
// All responses *must* be read by the caller.
// If the call is canceled through the context,
//The appropriate error will be returned.
type Stream struct {
	// Responses from the remote server.
	// Channel will be closed after an error or when the remote closes.
	// All responses *must* be read by the caller until either an error is returned or the channel is closed.
	// Canceling the context will cause the context cancellation error to be returned.
	Responses <-chan Response
	// Requests sent to the server.
	// If the handler is defined with 0 incoming capacity this will be nil.
	// Channel *must* be closed to signal the end of the stream.
	// If the request context is canceled, the stream will no longer process requests.
	Requests chan<- []byte
}
type Response struct {
	Msg []byte
	Err error
}
```
There are generic versions of the server/client handlers that allow the use of type
safe implementations for data types that support msgpack marshal/unmarshal.
											
										 
											2023-11-21 09:09:35 +08:00
										 |  |  | 	return toStorageErr(st.Results(func(in []byte) error { | 
					
						
							|  |  |  | 		_, err := wr.Write(in) | 
					
						
							|  |  |  | 		return err | 
					
						
							|  |  |  | 	})) | 
					
						
							|  |  |  | } | 
					
						
							| 
									
										
										
										
											2020-12-29 02:31:00 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
											  
											
												perf: websocket grid connectivity for all internode communication (#18461)
This PR adds a WebSocket grid feature that allows servers to communicate via 
a single two-way connection.
There are two request types:
* Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small
  roundtrips with small payloads.
* Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`,
  which allows for different combinations of full two-way streams with an initial payload.
Only a single stream is created between two machines - and there is, as such, no
server/client relation since both sides can initiate and handle requests. Which server
initiates the request is decided deterministically on the server names.
Requests are made through a mux client and server, which handles message
passing, congestion, cancelation, timeouts, etc.
If a connection is lost, all requests are canceled, and the calling server will try
to reconnect. Registered handlers can operate directly on byte 
slices or use a higher-level generics abstraction.
There is no versioning of handlers/clients, and incompatible changes should
be handled by adding new handlers.
The request path can be changed to a new one for any protocol changes.
First, all servers create a "Manager." The manager must know its address 
as well as all remote addresses. This will manage all connections.
To get a connection to any remote, ask the manager to provide it given
the remote address using.
```
func (m *Manager) Connection(host string) *Connection
```
All serverside handlers must also be registered on the manager. This will
make sure that all incoming requests are served. The number of in-flight 
requests and responses must also be given for streaming requests.
The "Connection" returned manages the mux-clients. Requests issued
to the connection will be sent to the remote.
* `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)`
   performs a single request and returns the result. Any deadline provided on the request is
   forwarded to the server, and canceling the context will make the function return at once.
* `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)`
   will initiate a remote call and send the initial payload.
```Go
// A Stream is a two-way stream.
// All responses *must* be read by the caller.
// If the call is canceled through the context,
//The appropriate error will be returned.
type Stream struct {
	// Responses from the remote server.
	// Channel will be closed after an error or when the remote closes.
	// All responses *must* be read by the caller until either an error is returned or the channel is closed.
	// Canceling the context will cause the context cancellation error to be returned.
	Responses <-chan Response
	// Requests sent to the server.
	// If the handler is defined with 0 incoming capacity this will be nil.
	// Channel *must* be closed to signal the end of the stream.
	// If the request context is canceled, the stream will no longer process requests.
	Requests chan<- []byte
}
type Response struct {
	Msg []byte
	Err error
}
```
There are generic versions of the server/client handlers that allow the use of type
safe implementations for data types that support msgpack marshal/unmarshal.
											
										 
											2023-11-21 09:09:35 +08:00
										 |  |  | // WalkDirHandler - remote caller to list files and folders in a requested directory path.
 | 
					
						
							|  |  |  | func (s *storageRESTServer) WalkDirHandler(ctx context.Context, payload []byte, _ <-chan []byte, out chan<- []byte) (gerr *grid.RemoteErr) { | 
					
						
							|  |  |  | 	var opts WalkDirOptions | 
					
						
							|  |  |  | 	_, err := opts.UnmarshalMsg(payload) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return grid.NewRemoteErr(err) | 
					
						
							| 
									
										
										
										
											2020-12-29 02:31:00 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-12-02 04:01:14 +08:00
										 |  |  | 	if !s.checkID(opts.DiskID) { | 
					
						
							|  |  |  | 		return grid.NewRemoteErr(errDiskNotFound) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
											  
											
												perf: websocket grid connectivity for all internode communication (#18461)
This PR adds a WebSocket grid feature that allows servers to communicate via 
a single two-way connection.
There are two request types:
* Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small
  roundtrips with small payloads.
* Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`,
  which allows for different combinations of full two-way streams with an initial payload.
Only a single stream is created between two machines - and there is, as such, no
server/client relation since both sides can initiate and handle requests. Which server
initiates the request is decided deterministically on the server names.
Requests are made through a mux client and server, which handles message
passing, congestion, cancelation, timeouts, etc.
If a connection is lost, all requests are canceled, and the calling server will try
to reconnect. Registered handlers can operate directly on byte 
slices or use a higher-level generics abstraction.
There is no versioning of handlers/clients, and incompatible changes should
be handled by adding new handlers.
The request path can be changed to a new one for any protocol changes.
First, all servers create a "Manager." The manager must know its address 
as well as all remote addresses. This will manage all connections.
To get a connection to any remote, ask the manager to provide it given
the remote address using.
```
func (m *Manager) Connection(host string) *Connection
```
All serverside handlers must also be registered on the manager. This will
make sure that all incoming requests are served. The number of in-flight 
requests and responses must also be given for streaming requests.
The "Connection" returned manages the mux-clients. Requests issued
to the connection will be sent to the remote.
* `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)`
   performs a single request and returns the result. Any deadline provided on the request is
   forwarded to the server, and canceling the context will make the function return at once.
* `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)`
   will initiate a remote call and send the initial payload.
```Go
// A Stream is a two-way stream.
// All responses *must* be read by the caller.
// If the call is canceled through the context,
//The appropriate error will be returned.
type Stream struct {
	// Responses from the remote server.
	// Channel will be closed after an error or when the remote closes.
	// All responses *must* be read by the caller until either an error is returned or the channel is closed.
	// Canceling the context will cause the context cancellation error to be returned.
	Responses <-chan Response
	// Requests sent to the server.
	// If the handler is defined with 0 incoming capacity this will be nil.
	// Channel *must* be closed to signal the end of the stream.
	// If the request context is canceled, the stream will no longer process requests.
	Requests chan<- []byte
}
type Response struct {
	Msg []byte
	Err error
}
```
There are generic versions of the server/client handlers that allow the use of type
safe implementations for data types that support msgpack marshal/unmarshal.
											
										 
											2023-11-21 09:09:35 +08:00
										 |  |  | 	ctx, cancel := context.WithCancel(ctx) | 
					
						
							|  |  |  | 	defer cancel() | 
					
						
							| 
									
										
										
										
											2023-12-02 04:01:14 +08:00
										 |  |  | 	return grid.NewRemoteErr(s.getStorage().WalkDir(ctx, opts, grid.WriterToChannel(ctx, out))) | 
					
						
							| 
									
										
										
										
											2020-10-29 00:18:35 +08:00
										 |  |  | } |