sftp/client.go


package sftp
import (
"bytes"
"encoding"
"encoding/binary"
"errors"
"fmt"
"io"
"os"
"path"
"sync"
"sync/atomic"
"time"
"github.com/kr/fs"
"golang.org/x/crypto/ssh"
)
// MaxPacket sets the maximum size of the payload.
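//
// A minimal usage sketch: MaxPacket is passed as an option to NewClient.
// It assumes an established *ssh.Client named conn; the 1<<18 value is
// illustrative only.
//
//	client, err := sftp.NewClient(conn, sftp.MaxPacket(1<<18))
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer client.Close()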
func MaxPacket(size int) func(*Client) error {
return func(c *Client) error {
if size < 1<<15 {
return fmt.Errorf("size must be greater than or equal to 32768")
}
c.maxPacket = size
return nil
}
}
// NewClient creates a new SFTP client on conn.
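//
// A minimal sketch of establishing the underlying SSH connection first.
// The address, user and password are placeholders:
//
//	config := &ssh.ClientConfig{
//		User: "user",
//		Auth: []ssh.AuthMethod{ssh.Password("password")},
//	}
//	conn, err := ssh.Dial("tcp", "example.com:22", config)
//	if err != nil {
//		log.Fatal(err)
//	}
//	client, err := sftp.NewClient(conn)
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer client.Close()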
func NewClient(conn *ssh.Client, opts ...func(*Client) error) (*Client, error) {
s, err := conn.NewSession()
if err != nil {
return nil, err
}
if err := s.RequestSubsystem("sftp"); err != nil {
return nil, err
}
pw, err := s.StdinPipe()
if err != nil {
return nil, err
}
pr, err := s.StdoutPipe()
if err != nil {
return nil, err
}
return NewClientPipe(pr, pw, opts...)
}
// NewClientPipe creates a new SFTP client given a Reader and a WriteCloser.
// This can be used for connecting to an SFTP server over TCP/TLS or by using
// the system's ssh client program (e.g. via exec.Command).
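//
// A sketch of driving the system ssh client; the host and flags are
// illustrative and error handling is elided:
//
//	cmd := exec.Command("ssh", "user@example.com", "-s", "sftp")
//	wr, _ := cmd.StdinPipe()
//	rd, _ := cmd.StdoutPipe()
//	if err := cmd.Start(); err != nil {
//		log.Fatal(err)
//	}
//	client, err := sftp.NewClientPipe(rd, wr)
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer client.Close()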
func NewClientPipe(rd io.Reader, wr io.WriteCloser, opts ...func(*Client) error) (*Client, error) {
sftp := &Client{
w: wr,
r: rd,
maxPacket: 1 << 15,
inflight: make(map[uint32]chan<- result),
recvClosed: make(chan struct{}),
}
if err := sftp.applyOptions(opts...); err != nil {
wr.Close()
return nil, err
}
if err := sftp.sendInit(); err != nil {
wr.Close()
return nil, err
}
if err := sftp.recvVersion(); err != nil {
wr.Close()
return nil, err
}
go sftp.recv()
return sftp, nil
}
// Client represents an SFTP session on a *ssh.Client SSH connection.
// Multiple Clients can be active on a single SSH connection, and a Client
// may be called concurrently from multiple Goroutines.
//
// Client implements the github.com/kr/fs.FileSystem interface.
type Client struct {
w io.WriteCloser
r io.Reader
maxPacket int // max packet size read or written.
nextid uint32
mu sync.Mutex // guards inflight and serializes writes to the server
inflight map[uint32]chan<- result // outstanding requests
recvClosed chan struct{} // remote end has closed the connection
}
// Close closes the SFTP session.
func (c *Client) Close() error {
err := c.w.Close()
<-c.recvClosed
return err
}
// Create creates the named file mode 0666 (before umask), truncating it if
// it already exists. If successful, methods on the returned File can be
// used for I/O; the associated file descriptor has mode O_RDWR.
func (c *Client) Create(path string) (*File, error) {
return c.open(path, flags(os.O_RDWR|os.O_CREATE|os.O_TRUNC))
}
const sftpProtocolVersion = 3 // http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02
func (c *Client) sendInit() error {
return sendPacket(c.w, sshFxInitPacket{
Version: sftpProtocolVersion, // http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02
})
}
// returns the next value of c.nextid
func (c *Client) nextId() uint32 {
return atomic.AddUint32(&c.nextid, 1)
}
func (c *Client) recvVersion() error {
typ, data, err := recvPacket(c.r)
if err != nil {
return err
}
if typ != ssh_FXP_VERSION {
return &unexpectedPacketErr{ssh_FXP_VERSION, typ}
}
version, _ := unmarshalUint32(data)
if version != sftpProtocolVersion {
return &unexpectedVersionErr{sftpProtocolVersion, version}
}
return nil
}
// broadcastErr sends an error to all goroutines waiting for a response.
func (c *Client) broadcastErr(err error) {
c.mu.Lock()
listeners := make([]chan<- result, 0, len(c.inflight))
for _, ch := range c.inflight {
listeners = append(listeners, ch)
}
c.mu.Unlock()
for _, ch := range listeners {
ch <- result{err: err}
}
}
// recv continuously reads from the server and forwards responses to the
// appropriate channel.
func (c *Client) recv() {
defer close(c.recvClosed)
for {
typ, data, err := recvPacket(c.r)
if err != nil {
// Return the error to all listeners.
c.broadcastErr(err)
return
}
sid, _ := unmarshalUint32(data)
c.mu.Lock()
ch, ok := c.inflight[sid]
delete(c.inflight, sid)
c.mu.Unlock()
if !ok {
// This is an unexpected occurrence. Send the error
// back to all listeners so that they terminate
// gracefully.
c.broadcastErr(fmt.Errorf("sid: %v not found", sid))
return
}
ch <- result{typ: typ, data: data}
}
}
// Walk returns a new Walker rooted at root.
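//
// A sketch of walking a remote tree with the returned *fs.Walker, assuming a
// *Client named client; the root path is illustrative:
//
//	walker := client.Walk("/var/log")
//	for walker.Step() {
//		if err := walker.Err(); err != nil {
//			continue
//		}
//		fmt.Println(walker.Path())
//	}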
func (c *Client) Walk(root string) *fs.Walker {
return fs.WalkFS(root, c)
}
// ReadDir reads the directory named by dirname and returns a list of
// directory entries.
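//
// A sketch of listing a remote directory, assuming a *Client named client;
// the path is illustrative:
//
//	entries, err := client.ReadDir("/tmp")
//	if err != nil {
//		log.Fatal(err)
//	}
//	for _, fi := range entries {
//		fmt.Println(fi.Name(), fi.Size(), fi.ModTime())
//	}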
func (c *Client) ReadDir(p string) ([]os.FileInfo, error) {
handle, err := c.opendir(p)
if err != nil {
return nil, err
}
defer c.close(handle) // this has to defer earlier than the lock below
var attrs []os.FileInfo
var done = false
for !done {
id := c.nextId()
typ, data, err1 := c.sendRequest(sshFxpReaddirPacket{
Id: id,
Handle: handle,
})
if err1 != nil {
err = err1
done = true
break
}
switch typ {
case ssh_FXP_NAME:
sid, data := unmarshalUint32(data)
if sid != id {
return nil, &unexpectedIdErr{id, sid}
}
count, data := unmarshalUint32(data)
for i := uint32(0); i < count; i++ {
var filename string
filename, data = unmarshalString(data)
_, data = unmarshalString(data) // discard longname
var attr *FileStat
attr, data = unmarshalAttrs(data)
if filename == "." || filename == ".." {
continue
}
attrs = append(attrs, fileInfoFromStat(attr, path.Base(filename)))
}
case ssh_FXP_STATUS:
// TODO(dfc) scope warning!
err = eofOrErr(unmarshalStatus(id, data))
done = true
default:
return nil, unimplementedPacketErr(typ)
}
}
if err == io.EOF {
err = nil
}
return attrs, err
}
func (c *Client) opendir(path string) (string, error) {
id := c.nextId()
typ, data, err := c.sendRequest(sshFxpOpendirPacket{
Id: id,
Path: path,
})
if err != nil {
return "", err
}
switch typ {
case ssh_FXP_HANDLE:
sid, data := unmarshalUint32(data)
if sid != id {
return "", &unexpectedIdErr{id, sid}
}
handle, _ := unmarshalString(data)
return handle, nil
case ssh_FXP_STATUS:
return "", unmarshalStatus(id, data)
default:
return "", unimplementedPacketErr(typ)
}
}
func (c *Client) Stat(p string) (os.FileInfo, error) {
id := c.nextId()
typ, data, err := c.sendRequest(sshFxpStatPacket{
Id: id,
Path: p,
})
if err != nil {
return nil, err
}
switch typ {
case ssh_FXP_ATTRS:
sid, data := unmarshalUint32(data)
if sid != id {
return nil, &unexpectedIdErr{id, sid}
}
attr, _ := unmarshalAttrs(data)
return fileInfoFromStat(attr, path.Base(p)), nil
case ssh_FXP_STATUS:
return nil, unmarshalStatus(id, data)
default:
return nil, unimplementedPacketErr(typ)
}
}
func (c *Client) Lstat(p string) (os.FileInfo, error) {
id := c.nextId()
typ, data, err := c.sendRequest(sshFxpLstatPacket{
Id: id,
Path: p,
})
if err != nil {
return nil, err
}
switch typ {
case ssh_FXP_ATTRS:
sid, data := unmarshalUint32(data)
if sid != id {
return nil, &unexpectedIdErr{id, sid}
}
attr, _ := unmarshalAttrs(data)
return fileInfoFromStat(attr, path.Base(p)), nil
case ssh_FXP_STATUS:
return nil, unmarshalStatus(id, data)
default:
return nil, unimplementedPacketErr(typ)
}
}
// ReadLink reads the target of a symbolic link.
func (c *Client) ReadLink(p string) (string, error) {
id := c.nextId()
typ, data, err := c.sendRequest(sshFxpReadlinkPacket{
Id: id,
Path: p,
})
if err != nil {
return "", err
}
switch typ {
case ssh_FXP_NAME:
sid, data := unmarshalUint32(data)
if sid != id {
return "", &unexpectedIdErr{id, sid}
}
count, data := unmarshalUint32(data)
if count != 1 {
return "", unexpectedCount(1, count)
}
filename, _ := unmarshalString(data) // ignore dummy attributes
return filename, nil
case ssh_FXP_STATUS:
return "", unmarshalStatus(id, data)
default:
return "", unimplementedPacketErr(typ)
}
}
// Symlink creates a symbolic link at 'newname', pointing at target 'oldname'.
func (c *Client) Symlink(oldname, newname string) error {
id := c.nextId()
typ, data, err := c.sendRequest(sshFxpSymlinkPacket{
Id: id,
Linkpath: newname,
Targetpath: oldname,
})
if err != nil {
return err
}
switch typ {
case ssh_FXP_STATUS:
return okOrErr(unmarshalStatus(id, data))
default:
return unimplementedPacketErr(typ)
}
}
// setstat is a convenience wrapper for changing various attributes of the named file.
func (c *Client) setstat(path string, flags uint32, attrs interface{}) error {
id := c.nextId()
typ, data, err := c.sendRequest(sshFxpSetstatPacket{
Id: id,
Path: path,
Flags: flags,
Attrs: attrs,
})
if err != nil {
return err
}
switch typ {
case ssh_FXP_STATUS:
return okOrErr(unmarshalStatus(id, data))
default:
return unimplementedPacketErr(typ)
}
}
// Chtimes changes the access and modification times of the named file.
func (c *Client) Chtimes(path string, atime time.Time, mtime time.Time) error {
type times struct {
Atime uint32
Mtime uint32
}
attrs := times{uint32(atime.Unix()), uint32(mtime.Unix())}
return c.setstat(path, ssh_FILEXFER_ATTR_ACMODTIME, attrs)
}
// Chown changes the user and group owners of the named file.
func (c *Client) Chown(path string, uid, gid int) error {
type owner struct {
Uid uint32
Gid uint32
}
attrs := owner{uint32(uid), uint32(gid)}
return c.setstat(path, ssh_FILEXFER_ATTR_UIDGID, attrs)
}
// Chmod changes the permissions of the named file.
func (c *Client) Chmod(path string, mode os.FileMode) error {
return c.setstat(path, ssh_FILEXFER_ATTR_PERMISSIONS, uint32(mode))
}
// Truncate sets the size of the named file. Although it may be safely assumed
// that if the size is less than its current size it will be truncated to fit,
// the SFTP protocol does not specify how the server should behave when the
// requested size is greater than the current size.
func (c *Client) Truncate(path string, size int64) error {
return c.setstat(path, ssh_FILEXFER_ATTR_SIZE, uint64(size))
}
// Open opens the named file for reading. If successful, methods on the
// returned file can be used for reading; the associated file descriptor
// has mode O_RDONLY.
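//
// A sketch of reading a remote file, assuming a *Client named client; the
// remote path is illustrative:
//
//	f, err := client.Open("/etc/hostname")
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer f.Close()
//	if _, err := io.Copy(os.Stdout, f); err != nil {
//		log.Fatal(err)
//	}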
func (c *Client) Open(path string) (*File, error) {
return c.open(path, flags(os.O_RDONLY))
}
// OpenFile is the generalized open call; most users will use Open or
// Create instead. It opens the named file with specified flag (O_RDONLY
// etc.). If successful, methods on the returned File can be used for I/O.
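//
// A sketch of opening a remote file for appending, assuming a *Client named
// client; the path and flag combination are illustrative:
//
//	f, err := client.OpenFile("/tmp/app.log", os.O_WRONLY|os.O_CREATE|os.O_APPEND)
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer f.Close()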
func (c *Client) OpenFile(path string, f int) (*File, error) {
return c.open(path, flags(f))
}
func (c *Client) open(path string, pflags uint32) (*File, error) {
id := c.nextId()
typ, data, err := c.sendRequest(sshFxpOpenPacket{
2013-11-06 08:04:26 +08:00
Id: id,
Path: path,
Pflags: pflags,
})
if err != nil {
return nil, err
}
switch typ {
case ssh_FXP_HANDLE:
sid, data := unmarshalUint32(data)
if sid != id {
return nil, &unexpectedIdErr{id, sid}
}
handle, _ := unmarshalString(data)
return &File{c: c, path: path, handle: handle}, nil
case ssh_FXP_STATUS:
return nil, unmarshalStatus(id, data)
default:
return nil, unimplementedPacketErr(typ)
}
}
// close closes a handle previously returned in the response
// to SSH_FXP_OPEN or SSH_FXP_OPENDIR. The handle becomes invalid
// immediately after this request has been sent.
func (c *Client) close(handle string) error {
id := c.nextId()
typ, data, err := c.sendRequest(sshFxpClosePacket{
Id: id,
Handle: handle,
})
if err != nil {
return err
}
switch typ {
case ssh_FXP_STATUS:
return okOrErr(unmarshalStatus(id, data))
default:
return unimplementedPacketErr(typ)
}
}
func (c *Client) fstat(handle string) (*FileStat, error) {
id := c.nextId()
typ, data, err := c.sendRequest(sshFxpFstatPacket{
Id: id,
Handle: handle,
})
if err != nil {
return nil, err
}
switch typ {
case ssh_FXP_ATTRS:
sid, data := unmarshalUint32(data)
if sid != id {
return nil, &unexpectedIdErr{id, sid}
}
attr, _ := unmarshalAttrs(data)
return attr, nil
case ssh_FXP_STATUS:
return nil, unmarshalStatus(id, data)
default:
return nil, unimplementedPacketErr(typ)
}
}
// StatVFS retrieves VFS statistics from a remote host. It implements the
// statvfs@openssh.com SSH_FXP_EXTENDED feature described at
// http://www.opensource.apple.com/source/OpenSSH/OpenSSH-175/openssh/PROTOCOL?txt
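//
// A sketch of querying filesystem statistics, assuming a *Client named client
// and a server that supports the statvfs@openssh.com extension:
//
//	stats, err := client.StatVFS("/")
//	if err != nil {
//		log.Fatal(err)
//	}
//	fmt.Printf("%+v\n", stats)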
func (c *Client) StatVFS(path string) (*StatVFS, error) {
// send the StatVFS packet to the server
id := c.nextId()
typ, data, err := c.sendRequest(sshFxpStatvfsPacket{
Id: id,
Path: path,
})
if err != nil {
return nil, err
}
switch typ {
// server responded with valid data
case ssh_FXP_EXTENDED_REPLY:
var response StatVFS
err = binary.Read(bytes.NewReader(data), binary.BigEndian, &response)
if err != nil {
return nil, errors.New("can not parse reply")
}
return &response, nil
// the request failed
case ssh_FXP_STATUS:
return nil, errors.New(fxp(ssh_FXP_STATUS).String())
default:
return nil, unimplementedPacketErr(typ)
}
}
// Join joins any number of path elements into a single path, adding a
// separating slash if necessary. The result is Cleaned; in particular, all
// empty strings are ignored.
func (c *Client) Join(elem ...string) string { return path.Join(elem...) }
// Remove removes the specified file or directory. An error will be returned if no
// file or directory with the specified path exists, or if the specified directory
// is not empty.
func (c *Client) Remove(path string) error {
err := c.removeFile(path)
if status, ok := err.(*StatusError); ok && status.Code == ssh_FX_FAILURE {
err = c.removeDirectory(path)
}
return err
}
func (c *Client) removeFile(path string) error {
id := c.nextId()
typ, data, err := c.sendRequest(sshFxpRemovePacket{
2013-11-06 11:08:26 +08:00
Id: id,
Filename: path,
})
if err != nil {
return err
}
switch typ {
case ssh_FXP_STATUS:
return okOrErr(unmarshalStatus(id, data))
default:
return unimplementedPacketErr(typ)
}
}
func (c *Client) removeDirectory(path string) error {
id := c.nextId()
typ, data, err := c.sendRequest(sshFxpRmdirPacket{
Id: id,
Path: path,
})
if err != nil {
return err
}
switch typ {
case ssh_FXP_STATUS:
return okOrErr(unmarshalStatus(id, data))
default:
return unimplementedPacketErr(typ)
}
}
// Rename renames a file.
func (c *Client) Rename(oldname, newname string) error {
id := c.nextId()
typ, data, err := c.sendRequest(sshFxpRenamePacket{
Id: id,
Oldpath: oldname,
Newpath: newname,
})
if err != nil {
return err
}
switch typ {
case ssh_FXP_STATUS:
return okOrErr(unmarshalStatus(id, data))
default:
return unimplementedPacketErr(typ)
}
}
// result captures the result of receiving a packet from the server
type result struct {
typ byte
data []byte
err error
}
type idmarshaler interface {
id() uint32
encoding.BinaryMarshaler
}
func (c *Client) sendRequest(p idmarshaler) (byte, []byte, error) {
ch := make(chan result, 1)
c.dispatchRequest(ch, p)
s := <-ch
return s.typ, s.data, s.err
}
func (c *Client) dispatchRequest(ch chan<- result, p idmarshaler) {
c.mu.Lock()
c.inflight[p.id()] = ch
if err := sendPacket(c.w, p); err != nil {
delete(c.inflight, p.id())
c.mu.Unlock()
ch <- result{err: err}
return
}
c.mu.Unlock()
}
// Mkdir creates the specified directory. An error will be returned if a file or
// directory with the specified path already exists, or if the directory's
// parent folder does not exist (the method cannot create complete paths).
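//
// Because Mkdir cannot create complete paths, a caller needing nested
// directories can create each element in turn. A sketch, assuming a *Client
// named client; errors (e.g. for directories that already exist) are only
// logged here:
//
//	for _, dir := range []string{"/tmp/a", "/tmp/a/b", "/tmp/a/b/c"} {
//		if err := client.Mkdir(dir); err != nil {
//			log.Println("mkdir", dir, err)
//		}
//	}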
func (c *Client) Mkdir(path string) error {
id := c.nextId()
typ, data, err := c.sendRequest(sshFxpMkdirPacket{
Id: id,
Path: path,
})
if err != nil {
return err
}
switch typ {
case ssh_FXP_STATUS:
return okOrErr(unmarshalStatus(id, data))
default:
return unimplementedPacketErr(typ)
}
}
// applyOptions applies options functions to the Client.
// If an error is encountered, option processing ceases.
func (c *Client) applyOptions(opts ...func(*Client) error) error {
for _, f := range opts {
if err := f(c); err != nil {
return err
}
}
return nil
}
// File represents a remote file.
type File struct {
c *Client
path string
handle string
offset uint64 // current offset within remote file
}
// Close closes the File, rendering it unusable for I/O. It returns an
// error, if any.
func (f *File) Close() error {
return f.c.close(f.handle)
}
const maxConcurrentRequests = 64
// Read reads up to len(b) bytes from the File. It returns the number of
// bytes read and an error, if any. EOF is signaled by a zero count with
// err set to io.EOF.
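//
// Reads are split into concurrent requests internally, so passing a large
// buffer improves throughput on high-latency links. A sketch, assuming an
// opened *File named f; process is a placeholder:
//
//	buf := make([]byte, 1<<20) // 1 MiB per Read call
//	for {
//		n, err := f.Read(buf)
//		if n > 0 {
//			process(buf[:n])
//		}
//		if err != nil {
//			break // io.EOF signals end of file
//		}
//	}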
func (f *File) Read(b []byte) (int, error) {
// Split the read into multiple maxPacket sized concurrent reads
// bounded by maxConcurrentRequests. This allows reads with a suitably
// large buffer to transfer data at a much faster rate due to
// overlapping round trip times.
inFlight := 0
desiredInFlight := 1
offset := f.offset
ch := make(chan result)
type inflightRead struct {
b []byte
offset uint64
}
reqs := map[uint32]inflightRead{}
type offsetErr struct {
offset uint64
err error
}
var firstErr offsetErr
sendReq := func(b []byte, offset uint64) {
reqId := f.c.nextId()
f.c.dispatchRequest(ch, sshFxpReadPacket{
Id: reqId,
Handle: f.handle,
Offset: offset,
Len: uint32(len(b)),
})
inFlight++
reqs[reqId] = inflightRead{b: b, offset: offset}
}
var read int
for len(b) > 0 || inFlight > 0 {
for inFlight < desiredInFlight && len(b) > 0 && firstErr.err == nil {
l := min(len(b), f.c.maxPacket)
rb := b[:l]
sendReq(rb, offset)
offset += uint64(l)
b = b[l:]
}
if inFlight == 0 {
break
}
select {
case res := <-ch:
inFlight--
if res.err != nil {
firstErr = offsetErr{offset: 0, err: res.err}
break
}
reqId, data := unmarshalUint32(res.data)
req, ok := reqs[reqId]
if !ok {
firstErr = offsetErr{offset: 0, err: fmt.Errorf("sid: %v not found", reqId)}
break
}
delete(reqs, reqId)
switch res.typ {
case ssh_FXP_STATUS:
if firstErr.err == nil || req.offset < firstErr.offset {
firstErr = offsetErr{offset: req.offset, err: eofOrErr(unmarshalStatus(reqId, res.data))}
break
}
case ssh_FXP_DATA:
l, data := unmarshalUint32(data)
n := copy(req.b, data[:l])
read += n
if n < len(req.b) {
sendReq(req.b[l:], req.offset+uint64(l))
}
if desiredInFlight < maxConcurrentRequests {
desiredInFlight++
}
default:
firstErr = offsetErr{offset: 0, err: unimplementedPacketErr(res.typ)}
break
}
}
}
// If the error is anything other than EOF, then there
// may be gaps in the data copied to the buffer so it's
// best to return 0 so the caller can't make any
// incorrect assumptions about the state of the buffer.
if firstErr.err != nil && firstErr.err != io.EOF {
read = 0
}
f.offset += uint64(read)
return read, firstErr.err
}
// WriteTo writes the file to w. The return value is the number of bytes
// written. Any error encountered during the write is also returned.
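//
// io.Copy uses this method when the source is a *File, so downloads can be
// written as a plain copy. A sketch, assuming an opened *File named f; the
// local path is illustrative:
//
//	dst, err := os.Create("/tmp/local-copy")
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer dst.Close()
//	if _, err := io.Copy(dst, f); err != nil {
//		log.Fatal(err)
//	}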
func (f *File) WriteTo(w io.Writer) (int64, error) {
fi, err := f.Stat()
if err != nil {
return 0, err
}
inFlight := 0
desiredInFlight := 1
offset := f.offset
writeOffset := offset
fileSize := uint64(fi.Size())
ch := make(chan result)
type inflightRead struct {
b []byte
offset uint64
}
reqs := map[uint32]inflightRead{}
pendingWrites := map[uint64][]byte{}
type offsetErr struct {
offset uint64
err error
}
var firstErr offsetErr
sendReq := func(b []byte, offset uint64) {
reqId := f.c.nextId()
f.c.dispatchRequest(ch, sshFxpReadPacket{
Id: reqId,
Handle: f.handle,
Offset: offset,
Len: uint32(len(b)),
})
inFlight++
reqs[reqId] = inflightRead{b: b, offset: offset}
}
var copied int64
for firstErr.err == nil || inFlight > 0 {
for inFlight < desiredInFlight && firstErr.err == nil {
b := make([]byte, f.c.maxPacket)
sendReq(b, offset)
offset += uint64(f.c.maxPacket)
if offset > fileSize {
desiredInFlight = 1
}
}
if inFlight == 0 {
break
}
select {
case res := <-ch:
inFlight--
if res.err != nil {
firstErr = offsetErr{offset: 0, err: res.err}
break
}
reqId, data := unmarshalUint32(res.data)
req, ok := reqs[reqId]
if !ok {
firstErr = offsetErr{offset: 0, err: fmt.Errorf("sid: %v not found", reqId)}
break
}
delete(reqs, reqId)
switch res.typ {
case ssh_FXP_STATUS:
if firstErr.err == nil || req.offset < firstErr.offset {
firstErr = offsetErr{offset: req.offset, err: eofOrErr(unmarshalStatus(reqId, res.data))}
break
}
case ssh_FXP_DATA:
l, data := unmarshalUint32(data)
if req.offset == writeOffset {
nbytes, err := w.Write(data)
copied += int64(nbytes)
if err != nil {
firstErr = offsetErr{offset: req.offset + uint64(nbytes), err: err}
break
}
if nbytes < int(l) {
firstErr = offsetErr{offset: req.offset + uint64(nbytes), err: io.ErrShortWrite}
break
}
switch {
case offset > fileSize:
desiredInFlight = 1
case desiredInFlight < maxConcurrentRequests:
desiredInFlight++
}
writeOffset += uint64(nbytes)
for pendingData, ok := pendingWrites[writeOffset]; ok; pendingData, ok = pendingWrites[writeOffset] {
nbytes, err := w.Write(pendingData)
if err != nil {
firstErr = offsetErr{offset: writeOffset + uint64(nbytes), err: err}
break
}
if nbytes < len(pendingData) {
firstErr = offsetErr{offset: writeOffset + uint64(nbytes), err: io.ErrShortWrite}
break
}
writeOffset += uint64(nbytes)
inFlight--
}
} else {
// Don't write the data yet because
// this response came in out of order
// and we need to wait for responses
// for earlier segments of the file.
inFlight++ // Pending writes should still be considered inFlight.
pendingWrites[req.offset] = data
}
default:
firstErr = offsetErr{offset: 0, err: unimplementedPacketErr(res.typ)}
break
}
}
}
if firstErr.err != io.EOF {
return copied, firstErr.err
}
return copied, nil
}
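// exampleDownload is a minimal usage sketch, not part of the package API: the
// helper name and paths are hypothetical. It shows how WriteTo is normally
// exercised, since io.Copy prefers the source's io.WriterTo and therefore uses
// the concurrent, chunked transfer above instead of sequential 8k copies.
func exampleDownload(c *Client, remotePath, localPath string) (int64, error) {
	src, err := c.Open(remotePath)
	if err != nil {
		return 0, err
	}
	defer src.Close()
	dst, err := os.Create(localPath)
	if err != nil {
		return 0, err
	}
	defer dst.Close()
	// io.Copy detects that *File implements io.WriterTo and calls src.WriteTo(dst).
	return io.Copy(dst, src)
}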
// Stat returns the FileInfo structure describing file. If there is an error,
// it is returned.
func (f *File) Stat() (os.FileInfo, error) {
fs, err := f.c.fstat(f.handle)
if err != nil {
return nil, err
}
return fileInfoFromStat(fs, path.Base(f.path)), nil
}
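// exampleRemoteSize is a minimal usage sketch, not part of the package API: the
// helper name and path are hypothetical. It shows Stat on an open handle, which
// issues an SSH_FXP_FSTAT and converts the reply into an os.FileInfo.
func exampleRemoteSize(c *Client, remotePath string) (int64, error) {
	f, err := c.Open(remotePath)
	if err != nil {
		return 0, err
	}
	defer f.Close()
	fi, err := f.Stat()
	if err != nil {
		return 0, err
	}
	return fi.Size(), nil
}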
// Write writes len(b) bytes to the File. It returns the number of bytes
// written and an error, if any. Write returns a non-nil error when n !=
// len(b).
func (f *File) Write(b []byte) (int, error) {
// Split the write into multiple maxPacket sized concurrent writes
// bounded by maxConcurrentRequests. This allows writes with a suitably
// large buffer to transfer data at a much faster rate due to
// overlapping round trip times.
inFlight := 0
desiredInFlight := 1
offset := f.offset
ch := make(chan result)
var firstErr error
written := len(b)
for len(b) > 0 || inFlight > 0 {
for inFlight < desiredInFlight && len(b) > 0 && firstErr == nil {
l := min(len(b), f.c.maxPacket)
rb := b[:l]
f.c.dispatchRequest(ch, sshFxpWritePacket{
Id: f.c.nextId(),
Handle: f.handle,
Offset: offset,
Length: uint32(len(rb)),
Data: rb,
})
inFlight++
offset += uint64(l)
b = b[l:]
}
if inFlight == 0 {
break
}
select {
case res := <-ch:
inFlight--
if res.err != nil {
firstErr = res.err
break
}
switch res.typ {
case ssh_FXP_STATUS:
id, _ := unmarshalUint32(res.data)
err := okOrErr(unmarshalStatus(id, res.data))
if err != nil && firstErr == nil {
firstErr = err
break
}
if desiredInFlight < maxConcurrentRequests {
desiredInFlight++
}
default:
firstErr = unimplementedPacketErr(res.typ)
break
}
}
}
	// If the error is non-nil there may be gaps in the data written to the
	// file, so return 0 to prevent the caller from making any incorrect
	// assumptions about the state of the file.
if firstErr != nil {
written = 0
}
f.offset += uint64(written)
return written, firstErr
}
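// examplePut is a minimal usage sketch, not part of the package API: the helper
// name and path are hypothetical. A single Write is enough even for large
// buffers, since the call is split into concurrent maxPacket-sized requests;
// on error it reports 0 bytes written, as documented above.
func examplePut(c *Client, remotePath string, data []byte) error {
	f, err := c.Create(remotePath)
	if err != nil {
		return err
	}
	defer f.Close()
	_, err = f.Write(data)
	return err
}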
// ReadFrom reads data from r until EOF and writes it to the file. The return
// value is the number of bytes read. Any error except io.EOF encountered
// during the read is also returned.
func (f *File) ReadFrom(r io.Reader) (int64, error) {
inFlight := 0
desiredInFlight := 1
offset := f.offset
ch := make(chan result)
var firstErr error
read := int64(0)
b := make([]byte, f.c.maxPacket)
for inFlight > 0 || firstErr == nil {
for inFlight < desiredInFlight && firstErr == nil {
n, err := r.Read(b)
if err != nil {
firstErr = err
}
f.c.dispatchRequest(ch, sshFxpWritePacket{
Id: f.c.nextId(),
Handle: f.handle,
Offset: offset,
Length: uint32(n),
Data: b[:n],
})
inFlight++
offset += uint64(n)
read += int64(n)
}
if inFlight == 0 {
break
}
select {
case res := <-ch:
inFlight--
if res.err != nil {
firstErr = res.err
break
}
switch res.typ {
case ssh_FXP_STATUS:
id, _ := unmarshalUint32(res.data)
err := okOrErr(unmarshalStatus(id, res.data))
if err != nil && firstErr == nil {
firstErr = err
break
}
if desiredInFlight < maxConcurrentRequests {
desiredInFlight++
}
default:
firstErr = unimplementedPacketErr(res.typ)
break
}
}
}
if firstErr == io.EOF {
firstErr = nil
}
	// If the error is non-nil there may be gaps in the data written to the
	// file, so return 0 to prevent the caller from making any incorrect
	// assumptions about the state of the file.
if firstErr != nil {
read = 0
}
f.offset += uint64(read)
return read, firstErr
}
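// exampleUpload is a minimal usage sketch, not part of the package API: the
// helper name and paths are hypothetical. Calling ReadFrom directly makes the
// chunked, pipelined upload path explicit (io.Copy also uses it when the
// source does not provide its own WriteTo).
func exampleUpload(c *Client, localPath, remotePath string) (int64, error) {
	src, err := os.Open(localPath)
	if err != nil {
		return 0, err
	}
	defer src.Close()
	dst, err := c.Create(remotePath)
	if err != nil {
		return 0, err
	}
	defer dst.Close()
	return dst.ReadFrom(src)
}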
// Seek implements io.Seeker by setting the client offset for the next Read or
// Write. It returns the resulting offset. Seeking before the start or beyond
// the end of the file is undefined. Seeking relative to the end calls Stat.
func (f *File) Seek(offset int64, whence int) (int64, error) {
switch whence {
case os.SEEK_SET:
f.offset = uint64(offset)
case os.SEEK_CUR:
f.offset = uint64(int64(f.offset) + offset)
case os.SEEK_END:
fi, err := f.Stat()
if err != nil {
return int64(f.offset), err
}
f.offset = uint64(fi.Size() + offset)
default:
return int64(f.offset), unimplementedSeekWhence(whence)
}
return int64(f.offset), nil
}
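// exampleReadAt is a minimal usage sketch, not part of the package API: the
// helper name, path and offset are hypothetical. Seek only moves the
// client-side offset; the new position is sent with the next read request.
func exampleReadAt(c *Client, remotePath string, offset int64, buf []byte) (int, error) {
	f, err := c.Open(remotePath)
	if err != nil {
		return 0, err
	}
	defer f.Close()
	if _, err := f.Seek(offset, os.SEEK_SET); err != nil {
		return 0, err
	}
	return f.Read(buf)
}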
// Chown changes the uid/gid of the current file.
func (f *File) Chown(uid, gid int) error {
return f.c.Chown(f.path, uid, gid)
}
// Chmod changes the permissions of the current file.
func (f *File) Chmod(mode os.FileMode) error {
return f.c.Chmod(f.path, mode)
}
// Truncate sets the size of the current file. If the new size is less than the
// current size, the file can safely be assumed to be truncated to fit; the SFTP
// protocol does not specify how the server should behave when the new size is
// greater than the current size.
func (f *File) Truncate(size int64) error {
return f.c.Truncate(f.path, size)
}
func min(a, b int) int {
if a > b {
return b
}
return a
}
// okOrErr returns nil if err is a *StatusError with code SSH_FX_OK; otherwise
// it returns err unchanged.
func okOrErr(err error) error {
if err, ok := err.(*StatusError); ok && err.Code == ssh_FX_OK {
return nil
}
return err
}
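// eofOrErr returns io.EOF if err is a *StatusError with code SSH_FX_EOF;
// otherwise it returns err unchanged.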
func eofOrErr(err error) error {
if err, ok := err.(*StatusError); ok && err.Code == ssh_FX_EOF {
return io.EOF
}
return err
}
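// unmarshalStatus decodes an SSH_FXP_STATUS reply into a *StatusError,
// returning an unexpectedIdErr if the packet id does not match the expected
// request id.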
func unmarshalStatus(id uint32, data []byte) error {
sid, data := unmarshalUint32(data)
if sid != id {
return &unexpectedIdErr{id, sid}
}
code, data := unmarshalUint32(data)
msg, data := unmarshalString(data)
lang, _ := unmarshalString(data)
return &StatusError{
Code: code,
msg: msg,
lang: lang,
}
}
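// marshalStatus appends the wire encoding of err (code, message and language
// tag) to b and returns the extended slice.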
func marshalStatus(b []byte, err StatusError) []byte {
b = marshalUint32(b, err.Code)
b = marshalString(b, err.msg)
b = marshalString(b, err.lang)
return b
}
// flags converts the flags passed to OpenFile into ssh flags.
// Unsupported flags are ignored.
func flags(f int) uint32 {
var out uint32
switch f & os.O_WRONLY {
case os.O_WRONLY:
out |= ssh_FXF_WRITE
case os.O_RDONLY:
out |= ssh_FXF_READ
}
if f&os.O_RDWR == os.O_RDWR {
out |= ssh_FXF_READ | ssh_FXF_WRITE
}
if f&os.O_APPEND == os.O_APPEND {
out |= ssh_FXF_APPEND
}
if f&os.O_CREATE == os.O_CREATE {
out |= ssh_FXF_CREAT
}
if f&os.O_TRUNC == os.O_TRUNC {
out |= ssh_FXF_TRUNC
}
if f&os.O_EXCL == os.O_EXCL {
out |= ssh_FXF_EXCL
}
return out
}
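// As a hedged illustration of the mapping above:
//
//	flags(os.O_WRONLY | os.O_CREATE | os.O_TRUNC)
//	// == ssh_FXF_WRITE | ssh_FXF_CREAT | ssh_FXF_TRUNC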