Allow to configure maxConcurrentRequests

This commit is contained in:
gm42 2018-05-25 15:43:09 +02:00
parent 1073df2ac4
commit f9330fc78c
1 changed files with 26 additions and 13 deletions

View File

@ -76,6 +76,19 @@ func MaxPacket(size int) ClientOption {
return MaxPacketChecked(size) return MaxPacketChecked(size)
} }
// MaxConcurrentRequestsPerFile sets the maximum concurrent requests allowed for a single file.
//
// The default maximum concurrent requests is 64.
func MaxConcurrentRequestsPerFile(n int) ClientOption {
return func(c *Client) error {
if n < 1 {
return errors.Errorf("n must be greater or equal to 1")
}
c.maxConcurrentRequests = n
return nil
}
}
// NewClient creates a new SFTP client on conn, using zero or more option // NewClient creates a new SFTP client on conn, using zero or more option
// functions. // functions.
func NewClient(conn *ssh.Client, opts ...ClientOption) (*Client, error) { func NewClient(conn *ssh.Client, opts ...ClientOption) (*Client, error) {
@ -110,7 +123,8 @@ func NewClientPipe(rd io.Reader, wr io.WriteCloser, opts ...ClientOption) (*Clie
}, },
inflight: make(map[uint32]chan<- result), inflight: make(map[uint32]chan<- result),
}, },
maxPacket: 1 << 15, maxPacket: 1 << 15,
maxConcurrentRequests: 64,
} }
if err := sftp.applyOptions(opts...); err != nil { if err := sftp.applyOptions(opts...); err != nil {
wr.Close() wr.Close()
@ -137,8 +151,9 @@ func NewClientPipe(rd io.Reader, wr io.WriteCloser, opts ...ClientOption) (*Clie
type Client struct { type Client struct {
clientConn clientConn
maxPacket int // max packet size read or written. maxPacket int // max packet size read or written.
nextid uint32 nextid uint32
maxConcurrentRequests int
} }
// Create creates the named file mode 0666 (before umask), truncating it if it // Create creates the named file mode 0666 (before umask), truncating it if it
@ -759,8 +774,6 @@ func (f *File) Name() string {
return f.path return f.path
} }
const maxConcurrentRequests = 64
// Read reads up to len(b) bytes from the File. It returns the number of bytes // Read reads up to len(b) bytes from the File. It returns the number of bytes
// read and an error, if any. Read follows io.Reader semantics, so when Read // read and an error, if any. Read follows io.Reader semantics, so when Read
// encounters an error or EOF condition after successfully reading n > 0 bytes, // encounters an error or EOF condition after successfully reading n > 0 bytes,
@ -780,7 +793,7 @@ func (f *File) Read(b []byte) (int, error) {
offset := f.offset offset := f.offset
// maxConcurrentRequests buffer to deal with broadcastErr() floods // maxConcurrentRequests buffer to deal with broadcastErr() floods
// also must have a buffer of max value of (desiredInFlight - inFlight) // also must have a buffer of max value of (desiredInFlight - inFlight)
ch := make(chan result, maxConcurrentRequests+1) ch := make(chan result, f.c.maxConcurrentRequests+1)
type inflightRead struct { type inflightRead struct {
b []byte b []byte
offset uint64 offset uint64
@ -845,7 +858,7 @@ func (f *File) Read(b []byte) (int, error) {
if n < len(req.b) { if n < len(req.b) {
sendReq(req.b[l:], req.offset+uint64(l)) sendReq(req.b[l:], req.offset+uint64(l))
} }
if desiredInFlight < maxConcurrentRequests { if desiredInFlight < f.c.maxConcurrentRequests {
desiredInFlight++ desiredInFlight++
} }
default: default:
@ -880,7 +893,7 @@ func (f *File) WriteTo(w io.Writer) (int64, error) {
writeOffset := offset writeOffset := offset
fileSize := uint64(fi.Size()) fileSize := uint64(fi.Size())
// see comment on same line in Read() above // see comment on same line in Read() above
ch := make(chan result, maxConcurrentRequests+1) ch := make(chan result, f.c.maxConcurrentRequests+1)
type inflightRead struct { type inflightRead struct {
b []byte b []byte
offset uint64 offset uint64
@ -960,7 +973,7 @@ func (f *File) WriteTo(w io.Writer) (int64, error) {
switch { switch {
case offset > fileSize: case offset > fileSize:
desiredInFlight = 1 desiredInFlight = 1
case desiredInFlight < maxConcurrentRequests: case desiredInFlight < f.c.maxConcurrentRequests:
desiredInFlight++ desiredInFlight++
} }
writeOffset += uint64(nbytes) writeOffset += uint64(nbytes)
@ -1028,7 +1041,7 @@ func (f *File) Write(b []byte) (int, error) {
desiredInFlight := 1 desiredInFlight := 1
offset := f.offset offset := f.offset
// see comment on same line in Read() above // see comment on same line in Read() above
ch := make(chan result, maxConcurrentRequests+1) ch := make(chan result, f.c.maxConcurrentRequests+1)
var firstErr error var firstErr error
written := len(b) written := len(b)
for len(b) > 0 || inFlight > 0 { for len(b) > 0 || inFlight > 0 {
@ -1064,7 +1077,7 @@ func (f *File) Write(b []byte) (int, error) {
firstErr = err firstErr = err
break break
} }
if desiredInFlight < maxConcurrentRequests { if desiredInFlight < f.c.maxConcurrentRequests {
desiredInFlight++ desiredInFlight++
} }
default: default:
@ -1093,7 +1106,7 @@ func (f *File) ReadFrom(r io.Reader) (int64, error) {
desiredInFlight := 1 desiredInFlight := 1
offset := f.offset offset := f.offset
// see comment on same line in Read() above // see comment on same line in Read() above
ch := make(chan result, maxConcurrentRequests+1) ch := make(chan result, f.c.maxConcurrentRequests+1)
var firstErr error var firstErr error
read := int64(0) read := int64(0)
b := make([]byte, f.c.maxPacket) b := make([]byte, f.c.maxPacket)
@ -1132,7 +1145,7 @@ func (f *File) ReadFrom(r io.Reader) (int64, error) {
firstErr = err firstErr = err
break break
} }
if desiredInFlight < maxConcurrentRequests { if desiredInFlight < f.c.maxConcurrentRequests {
desiredInFlight++ desiredInFlight++
} }
default: default: