// Package sftp implements the SSH File Transfer Protocol as described in
// https://filezilla-project.org/specs/draft-ietf-secsh-filexfer-02.txt
package sftp
import (
"bytes"
2023-11-10 23:46:49 +08:00
"context"
"encoding/binary"
"errors"
"fmt"
"io"
2021-01-23 00:45:57 +08:00
"math"
"os"
2013-11-06 12:40:35 +08:00
"path"
"sync"
"sync/atomic"
"syscall"
"time"
2013-11-11 09:57:03 +08:00
"github.com/kr/fs"
"golang.org/x/crypto/ssh"
"github.com/pkg/sftp/internal/encoding/ssh/filexfer/openssh"
)
var (
// ErrInternalInconsistency indicates the packets sent and the data queued to be
// written to the file don't match up. It is an unusual error and usually is
// caused by bad behavior server side or connection issues. The error is
// limited in scope to the call where it happened, the client object is still
// OK to use as long as the connection is still open.
ErrInternalInconsistency = errors.New("internal inconsistency")
// InternalInconsistency alias for ErrInternalInconsistency.
//
// Deprecated: please use ErrInternalInconsistency
InternalInconsistency = ErrInternalInconsistency
)
2017-07-26 12:04:32 +08:00
// A ClientOption is a function which applies configuration to a Client.
// Options are passed to NewClient/NewClientPipe and are applied in order;
// a non-nil error aborts client construction.
type ClientOption func(*Client) error
// MaxPacketChecked sets the maximum size of the payload, measured in bytes.
// This option only accepts sizes servers should support, ie. <= 32768 bytes.
//
// If you get the error "failed to send packet header: EOF" when copying a
// large file, try lowering this number.
//
// The default packet size is 32768 bytes.
func MaxPacketChecked(size int) ClientOption {
return func(c *Client) error {
if size < 1 {
2021-03-17 19:03:24 +08:00
return errors.New("size must be greater or equal to 1")
}
if size > 32768 {
2021-03-17 19:03:24 +08:00
return errors.New("sizes larger than 32KB might not work with all servers")
}
c.maxPacket = size
return nil
}
}
// MaxPacketUnchecked sets the maximum size of the payload, measured in bytes.
// It accepts sizes larger than the 32768 bytes all servers should support.
// Only use a setting higher than 32768 if your application always connects to
// the same server or after sufficiently broad testing.
//
// If you get the error "failed to send packet header: EOF" when copying a
// large file, try lowering this number.
//
// The default packet size is 32768 bytes.
func MaxPacketUnchecked(size int) ClientOption {
return func(c *Client) error {
if size < 1 {
2021-03-17 19:03:24 +08:00
return errors.New("size must be greater or equal to 1")
}
c.maxPacket = size
return nil
}
}
// MaxPacket sets the maximum size of the payload, measured in bytes.
// This option only accepts sizes servers should support, ie. <= 32768 bytes.
// This is a synonym for MaxPacketChecked that provides backward compatibility.
//
// If you get the error "failed to send packet header: EOF" when copying a
// large file, try lowering this number.
//
// The default packet size is 32768 bytes.
func MaxPacket(size int) ClientOption {
	// Delegate to the checked variant; kept only for API compatibility.
	return MaxPacketChecked(size)
}
// MaxConcurrentRequestsPerFile sets the maximum concurrent requests allowed for a single file.
//
// The default maximum concurrent requests is 64.
func MaxConcurrentRequestsPerFile(n int) ClientOption {
return func(c *Client) error {
if n < 1 {
2021-03-17 19:03:24 +08:00
return errors.New("n must be greater or equal to 1")
}
c.maxConcurrentRequests = n
return nil
}
}
2021-02-22 04:32:09 +08:00
// UseConcurrentWrites allows the Client to perform concurrent Writes.
//
// Using concurrency while doing writes, requires special consideration.
// A write to a later offset in a file after an error,
// could end up with a file length longer than what was successfully written.
//
// When using this option, if you receive an error during `io.Copy` or `io.WriteTo`,
// you may need to `Truncate` the target Writer to avoid “holes” in the data written.
func UseConcurrentWrites(value bool) ClientOption {
return func(c *Client) error {
c.useConcurrentWrites = value
return nil
}
}
// UseConcurrentReads allows the Client to perform concurrent Reads.
//
// Concurrent reads are generally safe to use and not using them will degrade
// performance, so this option is enabled by default.
//
// When enabled, WriteTo will use Stat/Fstat to get the file size and determines
// how many concurrent workers to use.
// Some "read once" servers will delete the file if they receive a stat call on an
// open file and then the download will fail.
// Disabling concurrent reads you will be able to download files from these servers.
// If concurrent reads are disabled, the UseFstat option is ignored.
func UseConcurrentReads(value bool) ClientOption {
	return func(c *Client) error {
		// Stored inverted so the Client zero value keeps concurrent reads on.
		c.disableConcurrentReads = !value
		return nil
	}
}
2021-02-22 04:32:09 +08:00
// UseFstat sets whether to use Fstat or Stat when File.WriteTo is called
// (usually when copying files).
// Some servers limit the amount of open files and calling Stat after opening
// the file will throw an error from the server. Setting this flag will call
// Fstat instead of Stat which is supposed to be called on an open file handle.
//
// It has been found that with IBM Sterling SFTP servers which have
// "extractability" level set to 1 which means only 1 file can be opened at
// any given time.
//
// If the server you are working with still has an issue with both Stat and
// Fstat calls you can always open a file and read it until the end.
//
// Another reason to read the file until its end and Fstat doesn't work is
// that in some servers, reading a full file will automatically delete the
// file as some of these mainframes map the file to a message in a queue.
// Once the file has been read it will get deleted.
func UseFstat(value bool) ClientOption {
	return func(c *Client) error {
		c.useFstat = value
		return nil
	}
}
// Client represents an SFTP session on a *ssh.ClientConn SSH connection.
// Multiple Clients can be active on a single SSH connection, and a Client
// may be called concurrently from multiple Goroutines.
//
// Client implements the github.com/kr/fs.FileSystem interface.
type Client struct {
clientConn
ext map[string]string // Extensions (name -> data).
maxPacket int // max packet size read or written.
maxConcurrentRequests int
nextid uint32
// write concurrency is… error prone.
// Default behavior should be to not use it.
useConcurrentWrites bool
useFstat bool
disableConcurrentReads bool
2021-02-22 04:32:09 +08:00
}
// NewClient creates a new SFTP client on conn, using zero or more option
// functions.
func NewClient(conn *ssh.Client, opts ...ClientOption) (*Client, error) {
s, err := conn.NewSession()
if err != nil {
return nil, err
}
if err := s.RequestSubsystem("sftp"); err != nil {
return nil, err
}
pw, err := s.StdinPipe()
if err != nil {
return nil, err
}
pr, err := s.StdoutPipe()
if err != nil {
return nil, err
}
2014-10-10 02:49:08 +08:00
return NewClientPipe(pr, pw, opts...)
}
// NewClientPipe creates a new SFTP client given a Reader and a WriteCloser.
// This can be used for connecting to an SFTP server over TCP/TLS or by using
// the system's ssh client program (e.g. via exec.Command).
func NewClientPipe(rd io.Reader, wr io.WriteCloser, opts ...ClientOption) (*Client, error) {
sftp := &Client{
clientConn: clientConn{
conn: conn{
Reader: rd,
WriteCloser: wr,
},
inflight: make(map[uint32]chan<- result),
closed: make(chan struct{}),
},
ext: make(map[string]string),
maxPacket: 1 << 15,
maxConcurrentRequests: 64,
}
2021-02-22 04:32:09 +08:00
for _, opt := range opts {
if err := opt(sftp); err != nil {
wr.Close()
return nil, err
}
}
2021-02-22 04:32:09 +08:00
if err := sftp.sendInit(); err != nil {
wr.Close()
return nil, fmt.Errorf("error sending init packet to server: %w", err)
}
Handle recvPacket in a single goroutine. Previously recvPacket would be invoked in several goroutines. This meant that when multiple concurrent requests were in flight there were N goroutines each waiting on recvPacket. For optimal throughput the goal is to send a new request as quickly as possible once a response is received. The previous mechanism worked counter to this because the goroutine sending new requests would be competing against N recvPacket goroutines that may become runnable as data streams in. Having a single goroutine responsible for recvPacket means that the recv and send goroutines will ping-pong back and forth optimizing throughput. This changes shows a ~10-25% increase in throughput in the the *Delay* benchmark tests. $ go test -bench=. -integration PASS BenchmarkRead1k 2 840068631 ns/op 12.48 MB/s BenchmarkRead16k 20 72968548 ns/op 143.70 MB/s BenchmarkRead32k 30 56871347 ns/op 184.38 MB/s BenchmarkRead128k 100 34150953 ns/op 307.05 MB/s BenchmarkRead512k 100 15730685 ns/op 666.59 MB/s BenchmarkRead1MiB 200 10462421 ns/op 1002.24 MB/s BenchmarkRead4MiB 200 7325236 ns/op 1431.47 MB/s BenchmarkRead4MiBDelay10Msec 10 186893765 ns/op 56.11 MB/s BenchmarkRead4MiBDelay50Msec 2 907127114 ns/op 11.56 MB/s BenchmarkRead4MiBDelay150Msec 1 2708025060 ns/op 3.87 MB/s BenchmarkWrite1k 1 1623940932 ns/op 6.46 MB/s BenchmarkWrite16k 10 174293843 ns/op 60.16 MB/s BenchmarkWrite32k 10 120377272 ns/op 87.11 MB/s BenchmarkWrite128k 20 54592205 ns/op 192.08 MB/s BenchmarkWrite512k 50 66449591 ns/op 157.80 MB/s BenchmarkWrite1MiB 50 70965660 ns/op 147.76 MB/s BenchmarkWrite4MiB 50 69234861 ns/op 151.45 MB/s BenchmarkWrite4MiBDelay10Msec 5 276624260 ns/op 37.91 MB/s BenchmarkWrite4MiBDelay50Msec 1 1318396552 ns/op 7.95 MB/s BenchmarkWrite4MiBDelay150Msec 1 3918416658 ns/op 2.68 MB/s BenchmarkCopyDown10MiBDelay10Msec 10 152240808 ns/op 68.88 MB/s BenchmarkCopyDown10MiBDelay50Msec 2 715003188 ns/op 14.67 MB/s BenchmarkCopyDown10MiBDelay150Msec 1 2116878801 ns/op 4.95 
MB/s BenchmarkCopyUp10MiBDelay10Msec 10 192748258 ns/op 54.40 MB/s BenchmarkCopyUp10MiBDelay50Msec 2 691486538 ns/op 15.16 MB/s BenchmarkCopyUp10MiBDelay150Msec 1 1997162991 ns/op 5.25 MB/s BenchmarkMarshalInit 2000000 644 ns/op BenchmarkMarshalOpen 3000000 562 ns/op BenchmarkMarshalWriteWorstCase 20000 75166 ns/op BenchmarkMarshalWrite1k 500000 3862 ns/op ok github.com/pkg/sftp 71.174s
2015-06-03 04:03:39 +08:00
if err := sftp.recvVersion(); err != nil {
wr.Close()
return nil, fmt.Errorf("error receiving version packet from server: %w", err)
Handle recvPacket in a single goroutine. Previously recvPacket would be invoked in several goroutines. This meant that when multiple concurrent requests were in flight there were N goroutines each waiting on recvPacket. For optimal throughput the goal is to send a new request as quickly as possible once a response is received. The previous mechanism worked counter to this because the goroutine sending new requests would be competing against N recvPacket goroutines that may become runnable as data streams in. Having a single goroutine responsible for recvPacket means that the recv and send goroutines will ping-pong back and forth optimizing throughput. This changes shows a ~10-25% increase in throughput in the the *Delay* benchmark tests. $ go test -bench=. -integration PASS BenchmarkRead1k 2 840068631 ns/op 12.48 MB/s BenchmarkRead16k 20 72968548 ns/op 143.70 MB/s BenchmarkRead32k 30 56871347 ns/op 184.38 MB/s BenchmarkRead128k 100 34150953 ns/op 307.05 MB/s BenchmarkRead512k 100 15730685 ns/op 666.59 MB/s BenchmarkRead1MiB 200 10462421 ns/op 1002.24 MB/s BenchmarkRead4MiB 200 7325236 ns/op 1431.47 MB/s BenchmarkRead4MiBDelay10Msec 10 186893765 ns/op 56.11 MB/s BenchmarkRead4MiBDelay50Msec 2 907127114 ns/op 11.56 MB/s BenchmarkRead4MiBDelay150Msec 1 2708025060 ns/op 3.87 MB/s BenchmarkWrite1k 1 1623940932 ns/op 6.46 MB/s BenchmarkWrite16k 10 174293843 ns/op 60.16 MB/s BenchmarkWrite32k 10 120377272 ns/op 87.11 MB/s BenchmarkWrite128k 20 54592205 ns/op 192.08 MB/s BenchmarkWrite512k 50 66449591 ns/op 157.80 MB/s BenchmarkWrite1MiB 50 70965660 ns/op 147.76 MB/s BenchmarkWrite4MiB 50 69234861 ns/op 151.45 MB/s BenchmarkWrite4MiBDelay10Msec 5 276624260 ns/op 37.91 MB/s BenchmarkWrite4MiBDelay50Msec 1 1318396552 ns/op 7.95 MB/s BenchmarkWrite4MiBDelay150Msec 1 3918416658 ns/op 2.68 MB/s BenchmarkCopyDown10MiBDelay10Msec 10 152240808 ns/op 68.88 MB/s BenchmarkCopyDown10MiBDelay50Msec 2 715003188 ns/op 14.67 MB/s BenchmarkCopyDown10MiBDelay150Msec 1 2116878801 ns/op 4.95 
MB/s BenchmarkCopyUp10MiBDelay10Msec 10 192748258 ns/op 54.40 MB/s BenchmarkCopyUp10MiBDelay50Msec 2 691486538 ns/op 15.16 MB/s BenchmarkCopyUp10MiBDelay150Msec 1 1997162991 ns/op 5.25 MB/s BenchmarkMarshalInit 2000000 644 ns/op BenchmarkMarshalOpen 3000000 562 ns/op BenchmarkMarshalWriteWorstCase 20000 75166 ns/op BenchmarkMarshalWrite1k 500000 3862 ns/op ok github.com/pkg/sftp 71.174s
2015-06-03 04:03:39 +08:00
}
2021-02-22 04:32:09 +08:00
2016-06-15 16:23:51 +08:00
sftp.clientConn.wg.Add(1)
go func() {
defer sftp.clientConn.wg.Done()
if err := sftp.clientConn.recv(); err != nil {
sftp.clientConn.broadcastErr(err)
}
}()
2021-02-22 04:32:09 +08:00
return sftp, nil
}
// Create creates the named file mode 0666 (before umask), truncating it if it
// already exists. If successful, methods on the returned File can be used for
// I/O; the associated file descriptor has mode O_RDWR. If you need more
// control over the flags/mode used to open the file see client.OpenFile.
//
// Note that some SFTP servers (eg. AWS Transfer) do not support opening files
// read/write at the same time. For those services you will need to use
// `client.OpenFile(os.O_WRONLY|os.O_CREATE|os.O_TRUNC)`.
2013-11-06 10:04:40 +08:00
func (c *Client) Create(path string) (*File, error) {
return c.open(path, toPflags(os.O_RDWR|os.O_CREATE|os.O_TRUNC))
2013-11-06 10:04:40 +08:00
}
// sftpProtocolVersion is the protocol version sent in SSH_FXP_INIT and
// expected back in SSH_FXP_VERSION.
const sftpProtocolVersion = 3 // https://filezilla-project.org/specs/draft-ietf-secsh-filexfer-02.txt
2014-10-10 02:45:30 +08:00
func (c *Client) sendInit() error {
return c.clientConn.conn.sendPacket(&sshFxInitPacket{
Version: sftpProtocolVersion, // https://filezilla-project.org/specs/draft-ietf-secsh-filexfer-02.txt
})
}
// returns the next value of c.nextid
2016-01-05 05:15:21 +08:00
func (c *Client) nextID() uint32 {
return atomic.AddUint32(&c.nextid, 1)
}
func (c *Client) recvVersion() error {
typ, data, err := c.recvPacket(0)
if err != nil {
if err == io.EOF {
2022-07-11 19:34:22 +08:00
return fmt.Errorf("server unexpectedly closed connection: %w", io.ErrUnexpectedEOF)
}
return err
}
if typ != sshFxpVersion {
return &unexpectedPacketErr{sshFxpVersion, typ}
}
2014-10-10 02:45:30 +08:00
version, data, err := unmarshalUint32Safe(data)
if err != nil {
return err
}
2014-10-10 02:45:30 +08:00
if version != sftpProtocolVersion {
return &unexpectedVersionErr{sftpProtocolVersion, version}
}
for len(data) > 0 {
var ext extensionPair
ext, data, err = unmarshalExtensionPair(data)
if err != nil {
return err
}
c.ext[ext.Name] = ext.Data
}
return nil
}
// HasExtension checks whether the server supports a named extension.
//
// The first return value is the extension data reported by the server
// (typically a version number).
func (c *Client) HasExtension(name string) (string, bool) {
	if data, ok := c.ext[name]; ok {
		return data, true
	}
	return "", false
}
// Walk returns a new Walker rooted at root.
2013-11-07 08:31:46 +08:00
func (c *Client) Walk(root string) *fs.Walker {
2013-11-07 14:23:51 +08:00
return fs.WalkFS(root, c)
}
2023-11-10 23:46:49 +08:00
// ReadDir reads the directory named by p
// and returns a list of directory entries.
2013-11-07 14:23:51 +08:00
func (c *Client) ReadDir(p string) ([]os.FileInfo, error) {
2023-11-10 23:46:49 +08:00
return c.ReadDirContext(context.Background(), p)
}
// ReadDirContext reads the directory named by p
// and returns a list of directory entries.
// The passed context can be used to cancel the operation
// returning all entries listed up to the cancellation.
func (c *Client) ReadDirContext(ctx context.Context, p string) ([]os.FileInfo, error) {
2023-11-13 16:13:33 +08:00
handle, err := c.opendir(ctx, p)
if err != nil {
return nil, err
}
2013-11-08 18:24:50 +08:00
defer c.close(handle) // this has to defer earlier than the lock below
2023-11-10 23:46:49 +08:00
var entries []os.FileInfo
var done = false
for !done {
2016-01-05 05:15:21 +08:00
id := c.nextID()
2023-11-13 16:13:33 +08:00
typ, data, err1 := c.sendPacket(ctx, nil, &sshFxpReaddirPacket{
2016-01-05 05:15:21 +08:00
ID: id,
Handle: handle,
})
if err1 != nil {
err = err1
done = true
break
}
switch typ {
case sshFxpName:
sid, data := unmarshalUint32(data)
if sid != id {
2016-01-05 05:15:21 +08:00
return nil, &unexpectedIDErr{id, sid}
}
count, data := unmarshalUint32(data)
for i := uint32(0); i < count; i++ {
var filename string
filename, data = unmarshalString(data)
_, data = unmarshalString(data) // discard longname
var attr *FileStat
2024-01-19 09:23:22 +08:00
attr, data, err = unmarshalAttrs(data)
if err != nil {
return nil, err
}
if filename == "." || filename == ".." {
continue
}
2023-11-10 23:46:49 +08:00
entries = append(entries, fileInfoFromStat(attr, path.Base(filename)))
}
case sshFxpStatus:
// TODO(dfc) scope warning!
err = normaliseError(unmarshalStatus(id, data))
done = true
default:
return nil, unimplementedPacketErr(typ)
}
}
if err == io.EOF {
err = nil
}
2023-11-10 23:46:49 +08:00
return entries, err
}
2015-12-22 05:45:40 +08:00
2023-11-13 16:13:33 +08:00
func (c *Client) opendir(ctx context.Context, path string) (string, error) {
2016-01-05 05:15:21 +08:00
id := c.nextID()
2023-11-13 16:13:33 +08:00
typ, data, err := c.sendPacket(ctx, nil, &sshFxpOpendirPacket{
2016-01-05 05:15:21 +08:00
ID: id,
Path: path,
})
if err != nil {
return "", err
}
switch typ {
case sshFxpHandle:
sid, data := unmarshalUint32(data)
if sid != id {
2016-01-05 05:15:21 +08:00
return "", &unexpectedIDErr{id, sid}
}
handle, _ := unmarshalString(data)
return handle, nil
case sshFxpStatus:
return "", normaliseError(unmarshalStatus(id, data))
default:
return "", unimplementedPacketErr(typ)
}
}
// Stat returns a FileInfo structure describing the file specified by path 'p'.
// If 'p' is a symbolic link, the returned FileInfo structure describes the referent file.
func (c *Client) Stat(p string) (os.FileInfo, error) {
	// Local renamed from "fs" to avoid shadowing the imported fs package.
	st, err := c.stat(p)
	if err != nil {
		return nil, err
	}
	return fileInfoFromStat(st, path.Base(p)), nil
}
// Lstat returns a FileInfo structure describing the file specified by path 'p'.
// If 'p' is a symbolic link, the returned FileInfo structure describes the symbolic link.
2013-11-06 12:40:35 +08:00
func (c *Client) Lstat(p string) (os.FileInfo, error) {
2016-01-05 05:15:21 +08:00
id := c.nextID()
2023-11-13 16:13:33 +08:00
typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpLstatPacket{
2016-01-05 05:15:21 +08:00
ID: id,
2013-11-06 12:40:35 +08:00
Path: p,
})
if err != nil {
return nil, err
}
switch typ {
case sshFxpAttrs:
sid, data := unmarshalUint32(data)
if sid != id {
2016-01-05 05:15:21 +08:00
return nil, &unexpectedIDErr{id, sid}
}
2024-01-19 09:23:22 +08:00
attr, _, err := unmarshalAttrs(data)
2024-02-06 16:12:54 +08:00
if err != nil {
// avoid returning a valid value from fileInfoFromStats if err != nil.
2024-02-06 16:12:54 +08:00
return nil, err
}
return fileInfoFromStat(attr, path.Base(p)), nil
case sshFxpStatus:
return nil, normaliseError(unmarshalStatus(id, data))
default:
return nil, unimplementedPacketErr(typ)
}
}
2013-11-06 08:04:26 +08:00
2014-09-24 02:42:28 +08:00
// ReadLink reads the target of a symbolic link.
2014-09-23 10:29:20 +08:00
func (c *Client) ReadLink(p string) (string, error) {
2016-01-05 05:15:21 +08:00
id := c.nextID()
2023-11-13 16:13:33 +08:00
typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpReadlinkPacket{
2016-01-05 05:15:21 +08:00
ID: id,
2014-09-23 10:29:20 +08:00
Path: p,
})
if err != nil {
return "", err
}
switch typ {
case sshFxpName:
2014-09-23 10:29:20 +08:00
sid, data := unmarshalUint32(data)
if sid != id {
2016-01-05 05:15:21 +08:00
return "", &unexpectedIDErr{id, sid}
2014-09-23 10:29:20 +08:00
}
count, data := unmarshalUint32(data)
if count != 1 {
2014-09-24 02:42:28 +08:00
return "", unexpectedCount(1, count)
2014-09-23 10:29:20 +08:00
}
2014-09-24 02:42:28 +08:00
filename, _ := unmarshalString(data) // ignore dummy attributes
2014-09-23 10:29:20 +08:00
return filename, nil
case sshFxpStatus:
return "", normaliseError(unmarshalStatus(id, data))
2014-09-23 10:29:20 +08:00
default:
return "", unimplementedPacketErr(typ)
}
}
// Link creates a hard link at 'newname', pointing at the same inode as 'oldname'
func (c *Client) Link(oldname, newname string) error {
id := c.nextID()
2023-11-13 16:13:33 +08:00
typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpHardlinkPacket{
ID: id,
Oldpath: oldname,
Newpath: newname,
})
if err != nil {
return err
}
switch typ {
case sshFxpStatus:
return normaliseError(unmarshalStatus(id, data))
default:
return unimplementedPacketErr(typ)
}
}
2015-09-07 16:05:16 +08:00
// Symlink creates a symbolic link at 'newname', pointing at target 'oldname'
func (c *Client) Symlink(oldname, newname string) error {
2016-01-05 05:15:21 +08:00
id := c.nextID()
2023-11-13 16:13:33 +08:00
typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpSymlinkPacket{
2016-01-05 05:15:21 +08:00
ID: id,
2015-09-07 16:05:16 +08:00
Linkpath: newname,
Targetpath: oldname,
})
if err != nil {
return err
}
switch typ {
case sshFxpStatus:
return normaliseError(unmarshalStatus(id, data))
2015-09-07 16:05:16 +08:00
default:
return unimplementedPacketErr(typ)
}
}
func (c *Client) fsetstat(handle string, flags uint32, attrs interface{}) error {
id := c.nextID()
2023-11-13 16:13:33 +08:00
typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpFsetstatPacket{
ID: id,
Handle: handle,
Flags: flags,
Attrs: attrs,
})
if err != nil {
return err
}
switch typ {
case sshFxpStatus:
return normaliseError(unmarshalStatus(id, data))
default:
return unimplementedPacketErr(typ)
}
}
// setstat is a convience wrapper to allow for changing of various parts of the file descriptor.
func (c *Client) setstat(path string, flags uint32, attrs interface{}) error {
2016-01-05 05:15:21 +08:00
id := c.nextID()
2023-11-13 16:13:33 +08:00
typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpSetstatPacket{
2016-01-05 05:15:21 +08:00
ID: id,
Path: path,
Flags: flags,
Attrs: attrs,
})
if err != nil {
return err
}
switch typ {
case sshFxpStatus:
return normaliseError(unmarshalStatus(id, data))
default:
return unimplementedPacketErr(typ)
}
}
// Chtimes changes the access and modification times of the named file.
func (c *Client) Chtimes(path string, atime time.Time, mtime time.Time) error {
type times struct {
Atime uint32
Mtime uint32
}
2014-06-23 04:01:04 +08:00
attrs := times{uint32(atime.Unix()), uint32(mtime.Unix())}
return c.setstat(path, sshFileXferAttrACmodTime, attrs)
}
// Chown changes the user and group owners of the named file.
func (c *Client) Chown(path string, uid, gid int) error {
type owner struct {
2016-01-05 05:15:21 +08:00
UID uint32
GID uint32
}
2014-06-23 04:01:04 +08:00
attrs := owner{uint32(uid), uint32(gid)}
return c.setstat(path, sshFileXferAttrUIDGID, attrs)
}
// Chmod changes the permissions of the named file.
//
// Chmod does not apply a umask, because even retrieving the umask is not
// possible in a portable way without causing a race condition. Callers
// should mask off umask bits, if desired.
func (c *Client) Chmod(path string, mode os.FileMode) error {
	perm := toChmodPerm(mode)
	return c.setstat(path, sshFileXferAttrPermissions, perm)
}
// Truncate sets the size of the named file. Although it may be safely assumed
// that if the size is less than its current size it will be truncated to fit,
// the SFTP protocol does not specify what behavior the server should do when setting
// size greater than the current size.
func (c *Client) Truncate(path string, size int64) error {
	// The wire attribute is an unsigned 64-bit byte count.
	return c.setstat(path, sshFileXferAttrSize, uint64(size))
}
// SetExtendedData sets extended attributes of the named file. It uses the
// SSH_FILEXFER_ATTR_EXTENDED flag in the setstat request.
//
// This flag provides a general extension mechanism for vendor-specific extensions.
// Names of the attributes should be a string of the format "name@domain", where "domain"
// is a valid, registered domain name and "name" identifies the method. Server
// implementations SHOULD ignore extended data fields that they do not understand.
func (c *Client) SetExtendedData(path string, extended []StatExtended) error {
	return c.setstat(path, sshFileXferAttrExtended, &FileStat{
		Extended: extended,
	})
}
2013-11-06 08:04:26 +08:00
// Open opens the named file for reading. If successful, methods on the
// returned file can be used for reading; the associated file descriptor
// has mode O_RDONLY.
func (c *Client) Open(path string) (*File, error) {
return c.open(path, toPflags(os.O_RDONLY))
2013-11-14 12:32:21 +08:00
}
// OpenFile is the generalized open call; most users will use Open or
// Create instead. It opens the named file with specified flag (O_RDONLY
// etc.). If successful, methods on the returned File can be used for I/O.
func (c *Client) OpenFile(path string, f int) (*File, error) {
return c.open(path, toPflags(f))
2013-11-06 09:53:45 +08:00
}
func (c *Client) open(path string, pflags uint32) (*File, error) {
2016-01-05 05:15:21 +08:00
id := c.nextID()
2023-11-13 16:13:33 +08:00
typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpOpenPacket{
2016-01-05 05:15:21 +08:00
ID: id,
2013-11-06 08:04:26 +08:00
Path: path,
2013-11-06 09:53:45 +08:00
Pflags: pflags,
})
2013-11-06 08:04:26 +08:00
if err != nil {
return nil, err
}
switch typ {
case sshFxpHandle:
2013-11-06 08:04:26 +08:00
sid, data := unmarshalUint32(data)
if sid != id {
2016-01-05 05:15:21 +08:00
return nil, &unexpectedIDErr{id, sid}
2013-11-06 08:04:26 +08:00
}
handle, _ := unmarshalString(data)
2013-11-06 09:36:05 +08:00
return &File{c: c, path: path, handle: handle}, nil
case sshFxpStatus:
return nil, normaliseError(unmarshalStatus(id, data))
2013-11-06 08:04:26 +08:00
default:
return nil, unimplementedPacketErr(typ)
}
}
2013-11-06 08:30:01 +08:00
// close closes a handle handle previously returned in the response
// to SSH_FXP_OPEN or SSH_FXP_OPENDIR. The handle becomes invalid
// immediately after this request has been sent.
func (c *Client) close(handle string) error {
2016-01-05 05:15:21 +08:00
id := c.nextID()
2023-11-13 16:13:33 +08:00
typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpClosePacket{
2016-01-05 05:15:21 +08:00
ID: id,
2013-11-06 08:30:01 +08:00
Handle: handle,
})
2013-11-06 08:30:01 +08:00
if err != nil {
return err
}
switch typ {
case sshFxpStatus:
return normaliseError(unmarshalStatus(id, data))
2013-11-06 08:30:01 +08:00
default:
return unimplementedPacketErr(typ)
}
}
2013-11-06 09:36:05 +08:00
func (c *Client) stat(path string) (*FileStat, error) {
id := c.nextID()
2023-11-13 16:13:33 +08:00
typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpStatPacket{
ID: id,
Path: path,
})
if err != nil {
return nil, err
}
switch typ {
case sshFxpAttrs:
sid, data := unmarshalUint32(data)
if sid != id {
return nil, &unexpectedIDErr{id, sid}
}
2024-01-19 09:23:22 +08:00
attr, _, err := unmarshalAttrs(data)
return attr, err
case sshFxpStatus:
return nil, normaliseError(unmarshalStatus(id, data))
default:
return nil, unimplementedPacketErr(typ)
}
}
func (c *Client) fstat(handle string) (*FileStat, error) {
2016-01-05 05:15:21 +08:00
id := c.nextID()
2023-11-13 16:13:33 +08:00
typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpFstatPacket{
2016-01-05 05:15:21 +08:00
ID: id,
2013-11-06 09:36:05 +08:00
Handle: handle,
})
2013-11-06 09:36:05 +08:00
if err != nil {
return nil, err
}
switch typ {
case sshFxpAttrs:
2013-11-06 09:36:05 +08:00
sid, data := unmarshalUint32(data)
if sid != id {
2016-01-05 05:15:21 +08:00
return nil, &unexpectedIDErr{id, sid}
2013-11-06 09:36:05 +08:00
}
2024-01-19 09:23:22 +08:00
attr, _, err := unmarshalAttrs(data)
return attr, err
case sshFxpStatus:
return nil, normaliseError(unmarshalStatus(id, data))
2013-11-06 09:36:05 +08:00
default:
return nil, unimplementedPacketErr(typ)
}
}
2013-11-06 09:42:14 +08:00
// StatVFS retrieves VFS statistics from a remote host.
//
// It implements the statvfs@openssh.com SSH_FXP_EXTENDED feature
// from http://www.opensource.apple.com/source/OpenSSH/OpenSSH-175/openssh/PROTOCOL?txt.
func (c *Client) StatVFS(path string) (*StatVFS, error) {
// send the StatVFS packet to the server
2016-01-05 05:15:21 +08:00
id := c.nextID()
2023-11-13 16:13:33 +08:00
typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpStatvfsPacket{
2016-01-05 05:15:21 +08:00
ID: id,
Path: path,
})
if err != nil {
return nil, err
}
switch typ {
// server responded with valid data
case sshFxpExtendedReply:
var response StatVFS
err = binary.Read(bytes.NewReader(data), binary.BigEndian, &response)
if err != nil {
return nil, errors.New("can not parse reply")
}
return &response, nil
// the resquest failed
case sshFxpStatus:
return nil, normaliseError(unmarshalStatus(id, data))
default:
return nil, unimplementedPacketErr(typ)
}
}
2013-11-07 14:23:51 +08:00
// Join joins any number of path elements into a single path, adding a
// separating slash if necessary. The result is Cleaned; in particular, all
// empty strings are ignored.
func (c *Client) Join(elem ...string) string { return path.Join(elem...) }
// Remove removes the specified file or directory. An error will be returned if no
// file or directory with the specified path exists, or if the specified directory
// is not empty.
2013-11-06 11:08:26 +08:00
func (c *Client) Remove(path string) error {
errF := c.removeFile(path)
if errF == nil {
return nil
}
errD := c.RemoveDirectory(path)
if errD == nil {
return nil
}
// Both failed: figure out which error to return.
if errF == errD {
// If they are the same error, then just return that.
return errF
}
fi, err := c.Stat(path)
if err != nil {
return err
}
if fi.IsDir() {
return errD
}
return errF
}
func (c *Client) removeFile(path string) error {
2016-01-05 05:15:21 +08:00
id := c.nextID()
2023-11-13 16:13:33 +08:00
typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpRemovePacket{
2016-01-05 05:15:21 +08:00
ID: id,
2013-11-06 11:08:26 +08:00
Filename: path,
})
2013-11-06 11:08:26 +08:00
if err != nil {
return err
}
switch typ {
case sshFxpStatus:
2025-02-27 22:02:29 +08:00
err = normaliseError(unmarshalStatus(id, data))
if err == nil {
return nil
}
return &os.PathError{
Op: "remove",
Path: path,
2025-02-27 22:02:29 +08:00
Err: err,
}
2013-11-06 11:08:26 +08:00
default:
return unimplementedPacketErr(typ)
}
}
2016-10-21 22:04:00 +08:00
// RemoveDirectory removes a directory path.
func (c *Client) RemoveDirectory(path string) error {
2016-01-05 05:15:21 +08:00
id := c.nextID()
2023-11-13 16:13:33 +08:00
typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpRmdirPacket{
2016-01-05 05:15:21 +08:00
ID: id,
Path: path,
})
if err != nil {
return err
}
switch typ {
case sshFxpStatus:
2025-02-27 22:02:29 +08:00
err = normaliseError(unmarshalStatus(id, data))
if err == nil {
return nil
}
return &os.PathError{
Op: "remove",
Path: path,
2025-02-27 22:02:29 +08:00
Err: err,
}
default:
return unimplementedPacketErr(typ)
}
}
2013-11-06 11:15:26 +08:00
// Rename renames a file.
func (c *Client) Rename(oldname, newname string) error {
2016-01-05 05:15:21 +08:00
id := c.nextID()
2023-11-13 16:13:33 +08:00
typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpRenamePacket{
2016-01-05 05:15:21 +08:00
ID: id,
2013-11-06 11:15:26 +08:00
Oldpath: oldname,
Newpath: newname,
})
2013-11-06 11:15:26 +08:00
if err != nil {
return err
}
switch typ {
case sshFxpStatus:
return normaliseError(unmarshalStatus(id, data))
2013-11-06 11:15:26 +08:00
default:
return unimplementedPacketErr(typ)
}
}
// PosixRename renames a file using the posix-rename@openssh.com extension
// which will replace newname if it already exists.
func (c *Client) PosixRename(oldname, newname string) error {
id := c.nextID()
2023-11-13 16:13:33 +08:00
typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpPosixRenamePacket{
ID: id,
Oldpath: oldname,
Newpath: newname,
})
if err != nil {
return err
}
switch typ {
case sshFxpStatus:
return normaliseError(unmarshalStatus(id, data))
default:
return unimplementedPacketErr(typ)
}
}
2021-03-06 17:03:37 +08:00
// RealPath can be used to have the server canonicalize any given path name to an absolute path.
//
// This is useful for converting path names containing ".." components,
// or relative pathnames without a leading slash into absolute paths.
func (c *Client) RealPath(path string) (string, error) {
2016-01-05 05:15:21 +08:00
id := c.nextID()
2023-11-13 16:13:33 +08:00
typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpRealpathPacket{
2016-01-05 05:15:21 +08:00
ID: id,
2015-12-22 05:45:40 +08:00
Path: path,
})
if err != nil {
return "", err
}
switch typ {
case sshFxpName:
2015-12-22 05:45:40 +08:00
sid, data := unmarshalUint32(data)
if sid != id {
2016-01-05 05:15:21 +08:00
return "", &unexpectedIDErr{id, sid}
2015-12-22 05:45:40 +08:00
}
count, data := unmarshalUint32(data)
if count != 1 {
return "", unexpectedCount(1, count)
}
filename, _ := unmarshalString(data) // ignore attributes
return filename, nil
case sshFxpStatus:
return "", normaliseError(unmarshalStatus(id, data))
2015-12-22 05:45:40 +08:00
default:
return "", unimplementedPacketErr(typ)
}
}
// Getwd returns the current working directory of the server. Operations
// involving relative paths will be based at this location.
func (c *Client) Getwd() (string, error) {
	// SFTP has no dedicated "cwd" request; asking the server to
	// canonicalize "." yields its working directory.
	return c.RealPath(".")
}
// Mkdir creates the specified directory. An error will be returned if a file or
// directory with the specified path already exists, or if the directory's
// parent folder does not exist (the method cannot create complete paths).
func (c *Client) Mkdir(path string) error {
2016-01-05 05:15:21 +08:00
id := c.nextID()
2023-11-13 16:13:33 +08:00
typ, data, err := c.sendPacket(context.Background(), nil, &sshFxpMkdirPacket{
2016-01-05 05:15:21 +08:00
ID: id,
Path: path,
})
if err != nil {
return err
}
switch typ {
case sshFxpStatus:
return normaliseError(unmarshalStatus(id, data))
default:
return unimplementedPacketErr(typ)
}
}
2018-04-25 14:58:10 +08:00
// MkdirAll creates a directory named path, along with any necessary parents,
// and returns nil, or else returns an error.
// If path is already a directory, MkdirAll does nothing and returns nil.
// If, while making any directory, that path is found to already be a regular file, an error is returned.
2018-04-25 14:58:10 +08:00
func (c *Client) MkdirAll(path string) error {
// Most of this code mimics https://golang.org/src/os/path.go?s=514:561#L13
// Fast path: if we can tell whether path is a directory or file, stop with success or error.
dir, err := c.Stat(path)
if err == nil {
if dir.IsDir() {
return nil
2018-04-25 14:58:10 +08:00
}
return &os.PathError{Op: "mkdir", Path: path, Err: syscall.ENOTDIR}
}
// Slow path: make sure parent exists and then call Mkdir for path.
i := len(path)
for i > 0 && path[i-1] == '/' { // Skip trailing path separator.
i--
}
j := i
for j > 0 && path[j-1] != '/' { // Scan backward over element.
j--
}
if j > 1 {
// Create parent
err = c.MkdirAll(path[0 : j-1])
2018-04-25 14:58:10 +08:00
if err != nil {
return err
}
}
// Parent now exists; invoke Mkdir and use its result.
err = c.Mkdir(path)
if err != nil {
// Handle arguments like "foo/." by
// double-checking that directory doesn't exist.
dir, err1 := c.Lstat(path)
if err1 == nil && dir.IsDir() {
return nil
}
return err
}
2018-04-25 14:58:10 +08:00
return nil
}
// RemoveAll delete files recursively in the directory and Recursively delete subdirectories.
// An error will be returned if no file or directory with the specified path exists
func (c *Client) RemoveAll(path string) error {
// Get the file/directory information
fi, err := c.Stat(path)
if err != nil {
return err
}
if fi.IsDir() {
// Delete files recursively in the directory
files, err := c.ReadDir(path)
if err != nil {
return err
}
for _, file := range files {
if file.IsDir() {
// Recursively delete subdirectories
err = c.RemoveAll(path + "/" + file.Name())
if err != nil {
return err
}
} else {
// Delete individual files
err = c.Remove(path + "/" + file.Name())
if err != nil {
return err
}
}
}
}
2023-05-19 14:08:25 +08:00
return c.Remove(path)
2023-05-19 01:27:23 +08:00
}
2013-11-06 09:42:14 +08:00
// File represents a remote file.
type File struct {
	c    *Client // client connection this file was opened through
	path string  // remote path as presented to Open or Create

	// mu guards handle and offset. Read-only operations that do not move
	// the offset take the read lock; Close and offset-moving operations
	// take the write lock.
	mu     sync.RWMutex
	handle string // server-issued handle; empty once the file is closed

	offset int64 // current offset within remote file
}
// Close closes the File, rendering it unusable for I/O. It returns an
// error, if any. Calling Close on an already-closed File returns
// os.ErrClosed.
func (f *File) Close() error {
	f.mu.Lock()
	defer f.mu.Unlock()

	if f.handle == "" {
		return os.ErrClosed
	}

	// The design principle here is that when `openssh-portable/sftp-server.c` is doing `handle_close`,
	// it will unconditionally mark the handle as unused,
	// so we need to also unconditionally mark this handle as invalid.
	// By invalidating our local copy of the handle,
	// we ensure that there cannot be any erroneous use-after-close requests sent after Close.

	handle := f.handle
	f.handle = ""

	return f.c.close(handle)
}
// Name returns the name of the file as presented to Open or Create.
// The stored path is not updated by server-side renames.
func (f *File) Name() string {
	return f.path
}
// Read reads up to len(b) bytes from the File. It returns the number of bytes
// read and an error, if any. Read follows io.Reader semantics, so when Read
// encounters an error or EOF condition after successfully reading n > 0 bytes,
// it returns the number of bytes read.
//
// To maximise throughput for transferring the entire file (especially
// over high latency links) it is recommended to use WriteTo rather
// than calling Read multiple times. io.Copy will do this
// automatically.
func (f *File) Read(b []byte) (int, error) {
	f.mu.Lock()
	defer f.mu.Unlock()

	n, err := f.readAt(b, f.offset)

	// Advance the cursor by however much was actually read,
	// even when err is non-nil, per io.Reader semantics.
	f.offset += int64(n)

	return n, err
}
// readChunkAt attempts to read the whole entire length of the buffer from the file starting at the offset.
// It will continue progressively reading into the buffer until it fills the whole buffer, or an error occurs.
func (f *File) readChunkAt(ch chan result, b []byte, off int64) (n int, err error) {
for err == nil && n < len(b) {
id := f.c.nextID()
2023-11-13 16:13:33 +08:00
typ, data, err := f.c.sendPacket(context.Background(), ch, &sshFxpReadPacket{
ID: id,
Handle: f.handle,
Offset: uint64(off) + uint64(n),
Len: uint32(len(b) - n),
})
if err != nil {
return n, err
}
switch typ {
case sshFxpStatus:
return n, normaliseError(unmarshalStatus(id, data))
case sshFxpData:
sid, data := unmarshalUint32(data)
if id != sid {
return n, &unexpectedIDErr{id, sid}
}
l, data := unmarshalUint32(data)
n += copy(b[n:], data[:l])
default:
return n, unimplementedPacketErr(typ)
}
}
return
}
2021-03-06 00:10:39 +08:00
func (f *File) readAtSequential(b []byte, off int64) (read int, err error) {
for read < len(b) {
rb := b[read:]
if len(rb) > f.c.maxPacket {
rb = rb[:f.c.maxPacket]
}
n, err := f.readChunkAt(nil, rb, off+int64(read))
if n < 0 {
panic("sftp.File: returned negative count from readChunkAt")
}
if n > 0 {
2021-03-06 00:10:39 +08:00
read += n
}
if err != nil {
2021-03-06 00:10:39 +08:00
return read, err
}
}
2021-03-06 00:10:39 +08:00
return read, nil
}
// ReadAt reads up to len(b) byte from the File at a given offset `off`. It returns
// the number of bytes read and an error, if any. ReadAt follows io.ReaderAt semantics,
// so the file offset is not altered during the read.
func (f *File) ReadAt(b []byte, off int64) (int, error) {
	// Only the read lock is needed: readAt never touches f.offset,
	// so concurrent ReadAt calls are safe with one another.
	f.mu.RLock()
	defer f.mu.RUnlock()

	return f.readAt(b, off)
}
2024-02-06 16:12:54 +08:00
// readAt must be called while holding either the Read or Write mutex in File.
// This code is concurrent safe with itself, but not with Close.
//
// Large reads are split into concurrent maxPacket-sized requests in a
// slice/map/reduce pipeline; the earliest-offset error (if any) determines
// the return values, per io.ReaderAt semantics.
func (f *File) readAt(b []byte, off int64) (int, error) {
	if f.handle == "" {
		return 0, os.ErrClosed
	}

	if len(b) <= f.c.maxPacket {
		// This should be able to be serviced with 1/2 requests.
		// So, just do it directly.
		return f.readChunkAt(nil, b, off)
	}

	if f.c.disableConcurrentReads {
		return f.readAtSequential(b, off)
	}

	// Split the read into multiple maxPacket-sized concurrent reads bounded by maxConcurrentRequests.
	// This allows writes with a suitably large buffer to transfer data at a much faster rate
	// by overlapping round trip times.
	cancel := make(chan struct{})

	concurrency := len(b)/f.c.maxPacket + 1
	if concurrency > f.c.maxConcurrentRequests || concurrency < 1 {
		concurrency = f.c.maxConcurrentRequests
	}

	resPool := newResChanPool(concurrency)

	type work struct {
		id  uint32
		res chan result

		b   []byte
		off int64
	}
	workCh := make(chan work)

	// Slice: cut up the Read into any number of buffers of length <= f.c.maxPacket, and at appropriate offsets.
	go func() {
		defer close(workCh)

		b := b
		offset := off
		chunkSize := f.c.maxPacket

		for len(b) > 0 {
			rb := b
			if len(rb) > chunkSize {
				rb = rb[:chunkSize]
			}

			id := f.c.nextID()
			res := resPool.Get()

			f.c.dispatchRequest(res, &sshFxpReadPacket{
				ID:     id,
				Handle: f.handle,
				Offset: uint64(offset),
				Len:    uint32(chunkSize),
			})

			select {
			case workCh <- work{id, res, rb, offset}:
			case <-cancel:
				return
			}

			offset += int64(len(rb))
			b = b[len(rb):]
		}
	}()

	type rErr struct {
		off int64
		err error
	}
	errCh := make(chan rErr)

	var wg sync.WaitGroup
	wg.Add(concurrency)
	for i := 0; i < concurrency; i++ {
		// Map_i: each worker gets work, and then performs the Read into its buffer from its respective offset.
		go func() {
			defer wg.Done()

			for packet := range workCh {
				var n int

				s := <-packet.res
				resPool.Put(packet.res)

				err := s.err
				if err == nil {
					switch s.typ {
					case sshFxpStatus:
						err = normaliseError(unmarshalStatus(packet.id, s.data))
					case sshFxpData:
						sid, data := unmarshalUint32(s.data)
						if packet.id != sid {
							err = &unexpectedIDErr{packet.id, sid}
						} else {
							l, data := unmarshalUint32(data)
							n = copy(packet.b, data[:l])

							// For normal disk files, it is guaranteed that this will read
							// the specified number of bytes, or up to end of file.
							// This implies, if we have a short read, that means EOF.
							if n < len(packet.b) {
								err = io.EOF
							}
						}
					default:
						err = unimplementedPacketErr(s.typ)
					}
				}

				if err != nil {
					// return the offset as the start + how much we read before the error.
					errCh <- rErr{packet.off + int64(n), err}

					// DO NOT return.
					// We want to ensure that workCh is drained before wg.Wait returns.
				}
			}
		}()
	}

	// Wait for long tail, before closing results.
	go func() {
		wg.Wait()
		close(errCh)
	}()

	// Reduce: collect all the results into a relevant return: the earliest offset to return an error.
	firstErr := rErr{math.MaxInt64, nil}
	for rErr := range errCh {
		if rErr.off <= firstErr.off {
			firstErr = rErr
		}

		select {
		case <-cancel:
		default:
			// stop any more work from being distributed. (Just in case.)
			close(cancel)
		}
	}

	if firstErr.err != nil {
		// firstErr.err != nil if and only if firstErr.off > our starting offset.
		return int(firstErr.off - off), firstErr.err
	}

	// As per spec for io.ReaderAt, we return nil error if and only if we read everything.
	return len(b), nil
}
2021-02-22 04:32:09 +08:00
// writeToSequential implements WriteTo, but works sequentially with no parallelism.
2021-01-23 00:45:57 +08:00
func (f *File) writeToSequential(w io.Writer) (written int64, err error) {
b := make([]byte, f.c.maxPacket)
ch := make(chan result, 1) // reusable channel
2021-02-22 04:32:09 +08:00
for {
n, err := f.readChunkAt(ch, b, f.offset)
2021-01-23 00:45:57 +08:00
if n < 0 {
panic("sftp.File: returned negative count from readChunkAt")
}
if n > 0 {
2021-02-22 04:32:09 +08:00
f.offset += int64(n)
2021-01-23 00:45:57 +08:00
m, err := w.Write(b[:n])
2021-01-23 00:45:57 +08:00
written += int64(m)
if err != nil {
return written, err
2021-01-23 00:45:57 +08:00
}
}
if err != nil {
if err == io.EOF {
return written, nil // return nil explicitly.
}
return written, err
}
}
}
2021-02-22 04:32:09 +08:00
// WriteTo writes the file to the given Writer.
// The return value is the number of bytes written.
// Any error encountered during the write is also returned.
//
// This method is preferred over calling Read multiple times
// to maximise throughput for transferring the entire file,
// especially over high latency links.
func (f *File) WriteTo(w io.Writer) (written int64, err error) {
	f.mu.Lock()
	defer f.mu.Unlock()

	if f.handle == "" {
		return 0, os.ErrClosed
	}

	if f.c.disableConcurrentReads {
		return f.writeToSequential(w)
	}

	// For concurrency, we want to guess how many concurrent workers we should use.
	var fileStat *FileStat
	if f.c.useFstat {
		fileStat, err = f.c.fstat(f.handle)
	} else {
		fileStat, err = f.c.stat(f.path)
	}
	if err != nil {
		return 0, err
	}

	fileSize := fileStat.Size
	if fileSize <= uint64(f.c.maxPacket) || !isRegular(fileStat.Mode) {
		// only regular files are guaranteed to return (full read) xor (partial read, next error)
		return f.writeToSequential(w)
	}

	concurrency64 := fileSize/uint64(f.c.maxPacket) + 1 // a bad guess, but better than no guess
	if concurrency64 > uint64(f.c.maxConcurrentRequests) || concurrency64 < 1 {
		concurrency64 = uint64(f.c.maxConcurrentRequests)
	}
	// Now that concurrency64 is saturated to an int value, we know this assignment cannot possibly overflow.
	concurrency := int(concurrency64)

	chunkSize := f.c.maxPacket
	pool := newBufPool(concurrency, chunkSize)
	resPool := newResChanPool(concurrency)

	cancel := make(chan struct{})
	var wg sync.WaitGroup
	defer func() {
		// Once the writing Reduce phase has ended, all the feed work needs to unconditionally stop.
		close(cancel)

		// We want to wait until all outstanding goroutines with an `f` or `f.c` reference have completed.
		// Just to be sure we don't orphan any goroutines or leave any hanging references.
		wg.Wait()
	}()

	// writeWork is a completed read, delivered in file order via chained
	// `next` channels so the Reduce phase writes sequentially.
	type writeWork struct {
		b   []byte
		off int64
		err error

		next chan writeWork
	}
	writeCh := make(chan writeWork)

	type readWork struct {
		id  uint32
		res chan result
		off int64

		cur, next chan writeWork
	}
	readCh := make(chan readWork)

	// Slice: hand out chunks of work on demand, with a `cur` and `next` channel built-in for sequencing.
	go func() {
		defer close(readCh)

		off := f.offset

		cur := writeCh
		for {
			id := f.c.nextID()
			res := resPool.Get()

			next := make(chan writeWork)
			readWork := readWork{
				id:  id,
				res: res,
				off: off,

				cur:  cur,
				next: next,
			}

			f.c.dispatchRequest(res, &sshFxpReadPacket{
				ID:     id,
				Handle: f.handle,
				Offset: uint64(off),
				Len:    uint32(chunkSize),
			})

			select {
			case readCh <- readWork:
			case <-cancel:
				return
			}

			off += int64(chunkSize)
			cur = next
		}
	}()

	wg.Add(concurrency)
	for i := 0; i < concurrency; i++ {
		// Map_i: each worker gets readWork, and does the Read into a buffer at the given offset.
		go func() {
			defer wg.Done()

			for readWork := range readCh {
				var b []byte
				var n int

				s := <-readWork.res
				resPool.Put(readWork.res)

				err := s.err
				if err == nil {
					switch s.typ {
					case sshFxpStatus:
						err = normaliseError(unmarshalStatus(readWork.id, s.data))
					case sshFxpData:
						sid, data := unmarshalUint32(s.data)
						if readWork.id != sid {
							err = &unexpectedIDErr{readWork.id, sid}
						} else {
							l, data := unmarshalUint32(data)
							b = pool.Get()[:l]
							n = copy(b, data[:l])
							b = b[:n]
						}
					default:
						err = unimplementedPacketErr(s.typ)
					}
				}

				writeWork := writeWork{
					b:   b,
					off: readWork.off,
					err: err,

					next: readWork.next,
				}

				// Deliver the result on this chunk's `cur` channel;
				// the Reduce phase will hand off to `next` afterwards.
				select {
				case readWork.cur <- writeWork:
				case <-cancel:
				}

				// DO NOT return.
				// We want to ensure that readCh is drained before wg.Wait returns.
			}
		}()
	}

	// Reduce: serialize the results from the reads into sequential writes.
	cur := writeCh
	for {
		packet, ok := <-cur
		if !ok {
			return written, errors.New("sftp.File.WriteTo: unexpectedly closed channel")
		}

		// Because writes are serialized, this will always be the last successfully read byte.
		f.offset = packet.off + int64(len(packet.b))

		if len(packet.b) > 0 {
			n, err := w.Write(packet.b)
			written += int64(n)
			if err != nil {
				return written, err
			}
		}

		if packet.err != nil {
			if packet.err == io.EOF {
				return written, nil
			}

			return written, packet.err
		}

		pool.Put(packet.b)
		cur = packet.next
	}
}
2024-01-19 09:23:22 +08:00
// Stat returns the FileInfo structure describing the file.
// It returns os.ErrClosed if the file has already been closed.
func (f *File) Stat() (os.FileInfo, error) {
	f.mu.RLock()
	defer f.mu.RUnlock()

	if f.handle == "" {
		return nil, os.ErrClosed
	}

	return f.stat()
}
// stat fetches the attributes for the open handle and wraps them in an
// os.FileInfo named after the final element of the file's path.
// Callers must ensure the handle is still valid.
func (f *File) stat() (os.FileInfo, error) {
	fs, err := f.c.fstat(f.handle)
	if err != nil {
		return nil, err
	}
	return fileInfoFromStat(fs, path.Base(f.path)), nil
}
2013-11-06 10:04:40 +08:00
// Write writes len(b) bytes to the File. It returns the number of bytes
// written and an error, if any. Write returns a non-nil error when n !=
// len(b).
//
// To maximise throughput for transferring the entire file (especially
// over high latency links) it is recommended to use ReadFrom rather
// than calling Write multiple times. io.Copy will do this
// automatically.
func (f *File) Write(b []byte) (int, error) {
	f.mu.Lock()
	defer f.mu.Unlock()

	if f.handle == "" {
		return 0, os.ErrClosed
	}

	n, err := f.writeAt(b, f.offset)

	// Advance the cursor by however much was actually written,
	// even when err is non-nil.
	f.offset += int64(n)

	return n, err
}
func (f *File) writeChunkAt(ch chan result, b []byte, off int64) (int, error) {
2023-11-13 16:13:33 +08:00
typ, data, err := f.c.sendPacket(context.Background(), ch, &sshFxpWritePacket{
ID: f.c.nextID(),
Handle: f.handle,
Offset: uint64(off),
Length: uint32(len(b)),
Data: b,
})
if err != nil {
return 0, err
}
switch typ {
case sshFxpStatus:
id, _ := unmarshalUint32(data)
err := normaliseError(unmarshalStatus(id, data))
if err != nil {
return 0, err
}
default:
return 0, unimplementedPacketErr(typ)
}
return len(b), nil
}
2021-02-22 04:32:09 +08:00
// writeAtConcurrent implements WriterAt, but works concurrently rather than sequentially.
func (f *File) writeAtConcurrent(b []byte, off int64) (int, error) {
Increase throughput of Read/Write. Break up Read/Write calls into multiple concurrent requests to allow the roundtrip time to the server to overlap. This provides a roughly 10x throughput increase when using large buffers over a high latency link. This does not help the naive io.Copy case since io.Copy defaults to an 8k buffer. $ go test -bench=. -integration PASS BenchmarkRead1k 20 82017395 ns/op 127.85 MB/s BenchmarkRead16k 100 14634723 ns/op 716.51 MB/s BenchmarkRead32k 100 13706765 ns/op 765.02 MB/s BenchmarkRead128k 200 9614364 ns/op 1090.65 MB/s BenchmarkRead512k 200 5778457 ns/op 1814.65 MB/s BenchmarkRead1MiB 300 5624251 ns/op 1864.41 MB/s BenchmarkRead4MiB 200 5798324 ns/op 1808.43 MB/s BenchmarkRead4MiBDelay10Msec 5 214369945 ns/op 48.91 MB/s BenchmarkRead4MiBDelay50Msec 1 1014850552 ns/op 10.33 MB/s BenchmarkRead4MiBDelay150Msec 1 3016993337 ns/op 3.48 MB/s BenchmarkWrite1k 10 200740041 ns/op 52.24 MB/s BenchmarkWrite16k 50 74597799 ns/op 140.57 MB/s BenchmarkWrite32k 20 63229429 ns/op 165.84 MB/s BenchmarkWrite128k 20 78691019 ns/op 133.25 MB/s BenchmarkWrite512k 20 64372711 ns/op 162.89 MB/s BenchmarkWrite1MiB 20 95393443 ns/op 109.92 MB/s BenchmarkWrite4MiB 20 72211301 ns/op 145.21 MB/s BenchmarkWrite4MiBDelay10Msec 3 335329748 ns/op 31.27 MB/s BenchmarkWrite4MiBDelay50Msec 1 1668562466 ns/op 6.28 MB/s BenchmarkWrite4MiBDelay150Msec 1 4535944414 ns/op 2.31 MB/s BenchmarkCopyDown10MiBDelay10Msec 1 3371273197 ns/op 3.11 MB/s BenchmarkCopyDown10MiBDelay50Msec 1 16250399252 ns/op 0.65 MB/s BenchmarkCopyDown10MiBDelay150Msec 1 48459210755 ns/op 0.22 MB/s BenchmarkCopyUp10MiBDelay10Msec 1 3410202609 ns/op 3.07 MB/s BenchmarkCopyUp10MiBDelay50Msec 1 16291168491 ns/op 0.64 MB/s BenchmarkCopyUp10MiBDelay150Msec 1 48478335678 ns/op 0.22 MB/s BenchmarkMarshalInit 2000000 716 ns/op BenchmarkMarshalOpen 2000000 638 ns/op BenchmarkMarshalWriteWorstCase 20000 61127 ns/op BenchmarkMarshalWrite1k 300000 4724 ns/op ok github.com/pkg/sftp 186.575s
2015-06-03 00:37:35 +08:00
// Split the write into multiple maxPacket sized concurrent writes
// bounded by maxConcurrentRequests. This allows writes with a suitably
// large buffer to transfer data at a much faster rate due to
// overlapping round trip times.
2021-02-22 04:32:09 +08:00
cancel := make(chan struct{})
type work struct {
id uint32
res chan result
off int64
}
workCh := make(chan work)
concurrency := len(b)/f.c.maxPacket + 1
if concurrency > f.c.maxConcurrentRequests || concurrency < 1 {
concurrency = f.c.maxConcurrentRequests
}
pool := newResChanPool(concurrency)
// Slice: cut up the Read into any number of buffers of length <= f.c.maxPacket, and at appropriate offsets.
go func() {
defer close(workCh)
2021-02-22 04:32:09 +08:00
var read int
chunkSize := f.c.maxPacket
2021-02-22 04:32:09 +08:00
for read < len(b) {
wb := b[read:]
if len(wb) > chunkSize {
wb = wb[:chunkSize]
}
id := f.c.nextID()
res := pool.Get()
off := off + int64(read)
f.c.dispatchRequest(res, &sshFxpWritePacket{
ID: id,
Handle: f.handle,
Offset: uint64(off),
Length: uint32(len(wb)),
Data: wb,
})
select {
case workCh <- work{id, res, off}:
case <-cancel:
return
Increase throughput of Read/Write. Break up Read/Write calls into multiple concurrent requests to allow the roundtrip time to the server to overlap. This provides a roughly 10x throughput increase when using large buffers over a high latency link. This does not help the naive io.Copy case since io.Copy defaults to an 8k buffer. $ go test -bench=. -integration PASS BenchmarkRead1k 20 82017395 ns/op 127.85 MB/s BenchmarkRead16k 100 14634723 ns/op 716.51 MB/s BenchmarkRead32k 100 13706765 ns/op 765.02 MB/s BenchmarkRead128k 200 9614364 ns/op 1090.65 MB/s BenchmarkRead512k 200 5778457 ns/op 1814.65 MB/s BenchmarkRead1MiB 300 5624251 ns/op 1864.41 MB/s BenchmarkRead4MiB 200 5798324 ns/op 1808.43 MB/s BenchmarkRead4MiBDelay10Msec 5 214369945 ns/op 48.91 MB/s BenchmarkRead4MiBDelay50Msec 1 1014850552 ns/op 10.33 MB/s BenchmarkRead4MiBDelay150Msec 1 3016993337 ns/op 3.48 MB/s BenchmarkWrite1k 10 200740041 ns/op 52.24 MB/s BenchmarkWrite16k 50 74597799 ns/op 140.57 MB/s BenchmarkWrite32k 20 63229429 ns/op 165.84 MB/s BenchmarkWrite128k 20 78691019 ns/op 133.25 MB/s BenchmarkWrite512k 20 64372711 ns/op 162.89 MB/s BenchmarkWrite1MiB 20 95393443 ns/op 109.92 MB/s BenchmarkWrite4MiB 20 72211301 ns/op 145.21 MB/s BenchmarkWrite4MiBDelay10Msec 3 335329748 ns/op 31.27 MB/s BenchmarkWrite4MiBDelay50Msec 1 1668562466 ns/op 6.28 MB/s BenchmarkWrite4MiBDelay150Msec 1 4535944414 ns/op 2.31 MB/s BenchmarkCopyDown10MiBDelay10Msec 1 3371273197 ns/op 3.11 MB/s BenchmarkCopyDown10MiBDelay50Msec 1 16250399252 ns/op 0.65 MB/s BenchmarkCopyDown10MiBDelay150Msec 1 48459210755 ns/op 0.22 MB/s BenchmarkCopyUp10MiBDelay10Msec 1 3410202609 ns/op 3.07 MB/s BenchmarkCopyUp10MiBDelay50Msec 1 16291168491 ns/op 0.64 MB/s BenchmarkCopyUp10MiBDelay150Msec 1 48478335678 ns/op 0.22 MB/s BenchmarkMarshalInit 2000000 716 ns/op BenchmarkMarshalOpen 2000000 638 ns/op BenchmarkMarshalWriteWorstCase 20000 61127 ns/op BenchmarkMarshalWrite1k 300000 4724 ns/op ok github.com/pkg/sftp 186.575s
2015-06-03 00:37:35 +08:00
}
2021-02-22 04:32:09 +08:00
read += len(wb)
}
}()
2021-02-22 04:32:09 +08:00
type wErr struct {
off int64
err error
}
errCh := make(chan wErr)
var wg sync.WaitGroup
wg.Add(concurrency)
for i := 0; i < concurrency; i++ {
// Map_i: each worker gets work, and does the Write from each buffer to its respective offset.
go func() {
defer wg.Done()
for work := range workCh {
s := <-work.res
pool.Put(work.res)
err := s.err
if err == nil {
switch s.typ {
case sshFxpStatus:
err = normaliseError(unmarshalStatus(work.id, s.data))
default:
err = unimplementedPacketErr(s.typ)
}
}
if err != nil {
errCh <- wErr{work.off, err}
}
Increase throughput of Read/Write. Break up Read/Write calls into multiple concurrent requests to allow the roundtrip time to the server to overlap. This provides a roughly 10x throughput increase when using large buffers over a high latency link. This does not help the naive io.Copy case since io.Copy defaults to an 8k buffer. $ go test -bench=. -integration PASS BenchmarkRead1k 20 82017395 ns/op 127.85 MB/s BenchmarkRead16k 100 14634723 ns/op 716.51 MB/s BenchmarkRead32k 100 13706765 ns/op 765.02 MB/s BenchmarkRead128k 200 9614364 ns/op 1090.65 MB/s BenchmarkRead512k 200 5778457 ns/op 1814.65 MB/s BenchmarkRead1MiB 300 5624251 ns/op 1864.41 MB/s BenchmarkRead4MiB 200 5798324 ns/op 1808.43 MB/s BenchmarkRead4MiBDelay10Msec 5 214369945 ns/op 48.91 MB/s BenchmarkRead4MiBDelay50Msec 1 1014850552 ns/op 10.33 MB/s BenchmarkRead4MiBDelay150Msec 1 3016993337 ns/op 3.48 MB/s BenchmarkWrite1k 10 200740041 ns/op 52.24 MB/s BenchmarkWrite16k 50 74597799 ns/op 140.57 MB/s BenchmarkWrite32k 20 63229429 ns/op 165.84 MB/s BenchmarkWrite128k 20 78691019 ns/op 133.25 MB/s BenchmarkWrite512k 20 64372711 ns/op 162.89 MB/s BenchmarkWrite1MiB 20 95393443 ns/op 109.92 MB/s BenchmarkWrite4MiB 20 72211301 ns/op 145.21 MB/s BenchmarkWrite4MiBDelay10Msec 3 335329748 ns/op 31.27 MB/s BenchmarkWrite4MiBDelay50Msec 1 1668562466 ns/op 6.28 MB/s BenchmarkWrite4MiBDelay150Msec 1 4535944414 ns/op 2.31 MB/s BenchmarkCopyDown10MiBDelay10Msec 1 3371273197 ns/op 3.11 MB/s BenchmarkCopyDown10MiBDelay50Msec 1 16250399252 ns/op 0.65 MB/s BenchmarkCopyDown10MiBDelay150Msec 1 48459210755 ns/op 0.22 MB/s BenchmarkCopyUp10MiBDelay10Msec 1 3410202609 ns/op 3.07 MB/s BenchmarkCopyUp10MiBDelay50Msec 1 16291168491 ns/op 0.64 MB/s BenchmarkCopyUp10MiBDelay150Msec 1 48478335678 ns/op 0.22 MB/s BenchmarkMarshalInit 2000000 716 ns/op BenchmarkMarshalOpen 2000000 638 ns/op BenchmarkMarshalWriteWorstCase 20000 61127 ns/op BenchmarkMarshalWrite1k 300000 4724 ns/op ok github.com/pkg/sftp 186.575s
2015-06-03 00:37:35 +08:00
}
}()
}
// Wait for long tail, before closing results.
go func() {
wg.Wait()
close(errCh)
}()
2021-02-22 04:32:09 +08:00
// Reduce: collect all the results into a relevant return: the earliest offset to return an error.
firstErr := wErr{math.MaxInt64, nil}
for wErr := range errCh {
if wErr.off <= firstErr.off {
firstErr = wErr
}
select {
case <-cancel:
default:
// stop any more work from being distributed. (Just in case.)
close(cancel)
2013-11-08 18:24:50 +08:00
}
}
if firstErr.err != nil {
2021-02-22 04:32:09 +08:00
// firstErr.err != nil if and only if firstErr.off >= our starting offset.
return int(firstErr.off - off), firstErr.err
2013-11-08 18:24:50 +08:00
}
return len(b), nil
2013-11-08 18:24:50 +08:00
}
2021-06-30 23:09:07 +08:00
// WriteAt writes up to len(b) byte to the File at a given offset `off`. It returns
2021-02-22 04:32:09 +08:00
// the number of bytes written and an error, if any. WriteAt follows io.WriterAt semantics,
// so the file offset is not altered during the write.
2021-02-22 09:09:26 +08:00
func (f *File) WriteAt(b []byte, off int64) (written int, err error) {
f.mu.RLock()
defer f.mu.RUnlock()
if f.handle == "" {
return 0, os.ErrClosed
}
return f.writeAt(b, off)
}
2024-02-06 16:12:54 +08:00
// writeAt must be called while holding either the Read or Write mutex in File.
// This code is concurrent safe with itself, but not with Close.
func (f *File) writeAt(b []byte, off int64) (written int, err error) {
2021-02-22 04:32:09 +08:00
if len(b) <= f.c.maxPacket {
// We can do this in one write.
return f.writeChunkAt(nil, b, off)
}
if f.c.useConcurrentWrites {
return f.writeAtConcurrent(b, off)
}
2021-01-23 00:45:57 +08:00
ch := make(chan result, 1) // reusable channel
2021-02-22 04:32:09 +08:00
chunkSize := f.c.maxPacket
for written < len(b) {
wb := b[written:]
if len(wb) > chunkSize {
wb = wb[:chunkSize]
2021-01-23 00:45:57 +08:00
}
2021-02-22 04:32:09 +08:00
n, err := f.writeChunkAt(ch, wb, off+int64(written))
2021-01-23 00:45:57 +08:00
if n > 0 {
2021-02-22 04:32:09 +08:00
written += n
2021-01-23 00:45:57 +08:00
}
if err != nil {
2021-02-22 04:32:09 +08:00
return written, err
2021-01-23 00:45:57 +08:00
}
}
2021-02-22 04:32:09 +08:00
return len(b), nil
}
// ReadFromWithConcurrency implements ReaderFrom,
// but uses the given concurrency to issue multiple requests at the same time.
//
// Giving a concurrency of less than one will default to the Clients max concurrency.
//
// Otherwise, the given concurrency will be capped by the Client's max concurrency.
//
// When one needs to guarantee concurrent reads/writes, this method is preferred
// over ReadFrom.
func (f *File) ReadFromWithConcurrency(r io.Reader, concurrency int) (read int64, err error) {
	f.mu.Lock()
	defer f.mu.Unlock()

	return f.readFromWithConcurrency(r, concurrency)
}
// readFromWithConcurrency is the lock-free core of ReadFromWithConcurrency.
// The caller must hold f.mu (see ReadFromWithConcurrency and ReadFrom).
//
// It pipelines the transfer in three stages: a producer goroutine reads from r
// and dispatches SSH_FXP_WRITE requests, `concurrency` worker goroutines await
// and validate the responses, and the calling goroutine reduces any errors down
// to the one at the earliest offset.
func (f *File) readFromWithConcurrency(r io.Reader, concurrency int) (read int64, err error) {
	if f.handle == "" {
		return 0, os.ErrClosed
	}

	// Split the write into multiple maxPacket sized concurrent writes.
	// This allows writes with a suitably large reader
	// to transfer data at a much faster rate due to overlapping round trip times.

	// cancel is closed by the reducer on the first error, stopping the producer.
	cancel := make(chan struct{})

	type work struct {
		id  uint32
		res chan result

		off int64
	}
	workCh := make(chan work)

	type rwErr struct {
		off int64
		err error
	}
	errCh := make(chan rwErr)

	if concurrency > f.c.maxConcurrentRequests || concurrency < 1 {
		concurrency = f.c.maxConcurrentRequests
	}

	pool := newResChanPool(concurrency)

	// Slice: cut up the Read into any number of buffers of length <= f.c.maxPacket, and at appropriate offsets.
	go func() {
		defer close(workCh)

		b := make([]byte, f.c.maxPacket)
		off := f.offset

		for {
			n, err := r.Read(b)
			if n > 0 {
				read += int64(n)

				id := f.c.nextID()
				res := pool.Get()

				f.c.dispatchRequest(res, &sshFxpWritePacket{
					ID:     id,
					Handle: f.handle,
					Offset: uint64(off),
					Length: uint32(n),
					Data:   b[:n],
				})

				select {
				case workCh <- work{id, res, off}:
				case <-cancel:
					return
				}

				off += int64(n)
			}

			if err != nil {
				// Per io.ReaderFrom, io.EOF from the source is not an error.
				if err != io.EOF {
					errCh <- rwErr{off, err}
				}
				return
			}
		}
	}()

	var wg sync.WaitGroup
	wg.Add(concurrency)
	for i := 0; i < concurrency; i++ {
		// Map_i: each worker gets work, and does the Write from each buffer to its respective offset.
		go func() {
			defer wg.Done()

			for work := range workCh {
				s := <-work.res
				pool.Put(work.res)

				err := s.err
				if err == nil {
					switch s.typ {
					case sshFxpStatus:
						err = normaliseError(unmarshalStatus(work.id, s.data))
					default:
						err = unimplementedPacketErr(s.typ)
					}
				}

				if err != nil {
					errCh <- rwErr{work.off, err}
					// DO NOT return.
					// We want to ensure that workCh is drained before wg.Wait returns.
				}
			}
		}()
	}

	// Wait for long tail, before closing results.
	go func() {
		wg.Wait()
		close(errCh)
	}()

	// Reduce: Collect all the results into a relevant return: the earliest offset to return an error.
	firstErr := rwErr{math.MaxInt64, nil}
	for rwErr := range errCh {
		if rwErr.off <= firstErr.off {
			firstErr = rwErr
		}

		select {
		case <-cancel:
		default:
			// stop any more work from being distributed.
			close(cancel)
		}
	}

	if firstErr.err != nil {
		// firstErr.err != nil if and only if firstErr.off is a valid offset.
		//
		// firstErr.off will then be the lesser of:
		// * the offset of the first error from writing,
		// * the last successfully read offset.
		//
		// This could be less than the last successfully written offset,
		// which is the whole reason for the UseConcurrentWrites() ClientOption.
		//
		// Callers are responsible for truncating any SFTP files to a safe length.
		f.offset = firstErr.off

		// ReadFrom is defined to return the read bytes, regardless of any writer errors.
		return read, firstErr.err
	}

	f.offset += read
	return read, nil
}
2021-02-22 04:32:09 +08:00
// ReadFrom reads data from r until EOF and writes it to the file. The return
// value is the number of bytes read. Any error except io.EOF encountered
// during the read is also returned.
//
// This method is preferred over calling Write multiple times
// to maximise throughput for transferring the entire file,
// especially over high-latency links.
//
// To ensure concurrent writes, the given r needs to implement one of
// the following receiver methods:
//
//	Len() int
//	Size() int64
//	Stat() (os.FileInfo, error)
//
// or be an instance of [io.LimitedReader] to determine the number of possible
// concurrent requests. Otherwise, reads/writes are performed sequentially.
// ReadFromWithConcurrency can be used explicitly to guarantee concurrent
// processing of the reader.
func (f *File) ReadFrom(r io.Reader) (int64, error) {
	f.mu.Lock()
	defer f.mu.Unlock()

	if f.handle == "" {
		return 0, os.ErrClosed
	}

	if f.c.useConcurrentWrites {
		// Estimate how much data the reader will deliver,
		// so we can pick a sensible concurrency. The first matching case wins.
		var remain int64
		switch r := r.(type) {
		case interface{ Len() int }:
			remain = int64(r.Len())

		case interface{ Size() int64 }:
			remain = r.Size()

		case *io.LimitedReader:
			remain = r.N

		case interface{ Stat() (os.FileInfo, error) }:
			info, err := r.Stat()
			if err == nil {
				remain = info.Size()
			}
		}

		if remain < 0 {
			// We can strongly assert that we want default max concurrency here.
			return f.readFromWithConcurrency(r, f.c.maxConcurrentRequests)
		}

		if remain > int64(f.c.maxPacket) {
			// Otherwise, only use concurrency, if it would be at least two packets.

			// This is the best reasonable guess we can make.
			concurrency64 := remain/int64(f.c.maxPacket) + 1

			// We need to cap this value to an `int` size value to avoid overflow on 32-bit machines.
			// So, we may as well pre-cap it to `f.c.maxConcurrentRequests`.
			if concurrency64 > int64(f.c.maxConcurrentRequests) {
				concurrency64 = int64(f.c.maxConcurrentRequests)
			}

			return f.readFromWithConcurrency(r, int(concurrency64))
		}
	}

	// Sequential fallback: one in-flight write at a time.
	ch := make(chan result, 1) // reusable channel

	b := make([]byte, f.c.maxPacket)

	var read int64
	for {
		n, err := r.Read(b)
		if n < 0 {
			panic("sftp.File: reader returned negative count from Read")
		}

		if n > 0 {
			read += int64(n)

			m, err2 := f.writeChunkAt(ch, b[:n], f.offset)
			f.offset += int64(m)

			// A read error is reported only after the data it delivered
			// has been written; the write error takes precedence otherwise.
			if err == nil {
				err = err2
			}
		}

		if err != nil {
			if err == io.EOF {
				return read, nil // return nil explicitly.
			}

			return read, err
		}
	}
}
// Seek implements io.Seeker by setting the client offset for the next Read or
// Write. It returns the next offset read. Seeking before or after the end of
// the file is undefined. Seeking relative to the end calls Stat.
func (f *File) Seek(offset int64, whence int) (int64, error) {
2021-02-22 04:32:09 +08:00
f.mu.Lock()
defer f.mu.Unlock()
if f.handle == "" {
return 0, os.ErrClosed
}
switch whence {
2018-02-16 02:27:33 +08:00
case io.SeekStart:
case io.SeekCurrent:
2021-02-22 04:32:09 +08:00
offset += f.offset
2018-02-16 02:27:33 +08:00
case io.SeekEnd:
fi, err := f.stat()
if err != nil {
2021-02-22 04:32:09 +08:00
return f.offset, err
}
2021-02-22 04:32:09 +08:00
offset += fi.Size()
default:
2021-02-22 04:32:09 +08:00
return f.offset, unimplementedSeekWhence(whence)
}
2021-02-22 04:32:09 +08:00
if offset < 0 {
return f.offset, os.ErrInvalid
}
f.offset = offset
return f.offset, nil
}
// Chown changes the uid/gid of the current file.
func (f *File) Chown(uid, gid int) error {
	f.mu.RLock()
	defer f.mu.RUnlock()

	if f.handle == "" {
		return os.ErrClosed
	}

	// Issue an FSETSTAT against the open handle carrying only ownership bits.
	attrs := &FileStat{
		UID: uint32(uid),
		GID: uint32(gid),
	}
	return f.c.fsetstat(f.handle, sshFileXferAttrUIDGID, attrs)
}
// Chmod changes the permissions of the current file.
//
// See Client.Chmod for details.
func (f *File) Chmod(mode os.FileMode) error {
	f.mu.RLock()
	defer f.mu.RUnlock()

	if f.handle == "" {
		return os.ErrClosed
	}

	// Translate Go FileMode bits into POSIX permission bits before sending.
	perm := toChmodPerm(mode)
	return f.c.fsetstat(f.handle, sshFileXferAttrPermissions, perm)
}
// SetExtendedData sets extended attributes of the current file. It uses the
// SSH_FILEXFER_ATTR_EXTENDED flag in the setstat request.
//
// This flag provides a general extension mechanism for vendor-specific extensions.
// Names of the attributes should be a string of the format "name@domain", where "domain"
// is a valid, registered domain name and "name" identifies the method. Server
// implementations SHOULD ignore extended data fields that they do not understand.
//
// NOTE(review): the path argument is currently ignored; the request is made
// against the open file handle. It is retained for interface compatibility.
func (f *File) SetExtendedData(path string, extended []StatExtended) error {
	f.mu.RLock()
	defer f.mu.RUnlock()

	if f.handle == "" {
		return os.ErrClosed
	}

	return f.c.fsetstat(f.handle, sshFileXferAttrExtended, &FileStat{
		Extended: extended,
	})
}
// Truncate sets the size of the current file. Although it may be safely assumed
// that if the size is less than its current size it will be truncated to fit,
// the SFTP protocol does not specify what behavior the server should do when setting
// size greater than the current size.
// We send a SSH_FXP_FSETSTAT here since we have a file handle
func (f *File) Truncate(size int64) error {
	f.mu.RLock()
	defer f.mu.RUnlock()

	if f.handle == "" {
		return os.ErrClosed
	}

	// Only the size attribute is sent; all other attributes are untouched.
	return f.c.fsetstat(f.handle, sshFileXferAttrSize, uint64(size))
}
// Sync requests a flush of the contents of a File to stable storage.
//
// Sync requires the server to support the fsync@openssh.com extension.
func (f *File) Sync() error {
f.mu.Lock()
defer f.mu.Unlock()
if f.handle == "" {
return os.ErrClosed
}
if data, ok := f.c.HasExtension(openssh.ExtensionFSync().Name); !ok || data != "1" {
return &StatusError{
Code: sshFxOPUnsupported,
msg: "fsync not supported",
}
}
id := f.c.nextID()
2023-11-13 16:13:33 +08:00
typ, data, err := f.c.sendPacket(context.Background(), nil, &sshFxpFsyncPacket{
ID: id,
Handle: f.handle,
})
switch {
case err != nil:
return err
case typ == sshFxpStatus:
return normaliseError(unmarshalStatus(id, data))
default:
return &unexpectedPacketErr{want: sshFxpStatus, got: typ}
}
}
// normaliseError normalises an error into a more standard form that can be
// checked against stdlib errors like io.EOF or os.ErrNotExist.
func normaliseError(err error) error {
switch err := err.(type) {
case *StatusError:
switch err.Code {
case sshFxEOF:
return io.EOF
case sshFxNoSuchFile:
return os.ErrNotExist
case sshFxPermissionDenied:
return os.ErrPermission
case sshFxOk:
return nil
default:
return err
}
default:
return err
2013-11-06 12:00:04 +08:00
}
}
2013-11-14 12:32:21 +08:00
// flags converts the flags passed to OpenFile into ssh flags.
// Unsupported flags are ignored.
func toPflags(f int) uint32 {
2013-11-14 12:32:21 +08:00
var out uint32
2024-04-05 04:19:37 +08:00
switch f & (os.O_RDONLY | os.O_WRONLY | os.O_RDWR) {
2013-11-14 12:32:21 +08:00
case os.O_RDONLY:
out |= sshFxfRead
2024-04-05 04:19:37 +08:00
case os.O_WRONLY:
out |= sshFxfWrite
case os.O_RDWR:
out |= sshFxfRead | sshFxfWrite
2013-11-14 12:32:21 +08:00
}
if f&os.O_APPEND == os.O_APPEND {
out |= sshFxfAppend
2013-11-14 12:32:21 +08:00
}
if f&os.O_CREATE == os.O_CREATE {
out |= sshFxfCreat
2013-11-14 12:32:21 +08:00
}
if f&os.O_TRUNC == os.O_TRUNC {
out |= sshFxfTrunc
2013-11-14 12:32:21 +08:00
}
if f&os.O_EXCL == os.O_EXCL {
out |= sshFxfExcl
2013-11-14 12:32:21 +08:00
}
return out
}
// toChmodPerm converts Go permission bits to POSIX permission bits.
//
// This differs from fromFileMode in that we preserve the POSIX versions of
// setuid, setgid and sticky in m, because we've historically supported those
// bits, and we mask off any non-permission bits.
func toChmodPerm(m os.FileMode) (perm uint32) {
2024-04-06 04:07:40 +08:00
const mask = os.ModePerm | os.FileMode(s_ISUID|s_ISGID|s_ISVTX)
perm = uint32(m & mask)
if m&os.ModeSetuid != 0 {
perm |= s_ISUID
}
if m&os.ModeSetgid != 0 {
perm |= s_ISGID
}
if m&os.ModeSticky != 0 {
perm |= s_ISVTX
}
return perm
}