mirror of https://github.com/pkg/sftp.git
simpify Request struct and Method updating
In #192 we normalized all Request use to have pointer receivers. This simplified reasoning and allowed for a simpler Request struct. This change completes that work by eliminating the need for the state/stackLock pointers and also removes the need for a new Request each packet, reducing the pressure on the GC. The main bit is that the RequestServer.openRequests[] map now stores Request pointers and it only creates new Requests (replacing the mapped entry) when the Method changes. It never updates/modifies an existing Request and the packet to Method mapping is now all in place place (requestMethod).
This commit is contained in:
parent
9e9438a691
commit
8722020a70
|
@ -29,7 +29,7 @@ type RequestServer struct {
|
|||
*serverConn
|
||||
Handlers Handlers
|
||||
pktMgr *packetManager
|
||||
openRequests map[string]Request
|
||||
openRequests map[string]*Request
|
||||
openRequestLock sync.RWMutex
|
||||
handleCount int
|
||||
}
|
||||
|
@ -47,26 +47,44 @@ func NewRequestServer(rwc io.ReadWriteCloser, h Handlers) *RequestServer {
|
|||
serverConn: svrConn,
|
||||
Handlers: h,
|
||||
pktMgr: newPktMgr(svrConn),
|
||||
openRequests: make(map[string]Request),
|
||||
openRequests: make(map[string]*Request),
|
||||
}
|
||||
}
|
||||
|
||||
// Note that we are explicitly saving the Request as a value.
|
||||
// New Open packet/Request
|
||||
func (rs *RequestServer) nextRequest(r *Request) string {
|
||||
rs.openRequestLock.Lock()
|
||||
defer rs.openRequestLock.Unlock()
|
||||
rs.handleCount++
|
||||
handle := strconv.Itoa(rs.handleCount)
|
||||
rs.openRequests[handle] = *r
|
||||
rs.openRequests[handle] = r
|
||||
return handle
|
||||
}
|
||||
|
||||
// Returns pointer to new copy of Request object
|
||||
func (rs *RequestServer) getRequest(handle string) (*Request, bool) {
|
||||
// Returns Request from openRequests, bool is false if it is missing
|
||||
// If the method is different, save/return a new Request w/ that Method.
|
||||
//
|
||||
// The Requests in openRequests work essentially as open file descriptors that
|
||||
// you can do different things with. What you are doing with it are denoted by
|
||||
// the first packet of that type (read/write/etc). We create a new Request when
|
||||
// it changes to set the request.Method attribute in a thread safe way.
|
||||
func (rs *RequestServer) getRequest(handle, method string) (*Request, bool) {
|
||||
rs.openRequestLock.RLock()
|
||||
defer rs.openRequestLock.RUnlock()
|
||||
r, ok := rs.openRequests[handle]
|
||||
return &r, ok
|
||||
rs.openRequestLock.RUnlock()
|
||||
if !ok || r.Method == method {
|
||||
return r, ok
|
||||
}
|
||||
// if we make it here we need to replace the request
|
||||
rs.openRequestLock.Lock()
|
||||
defer rs.openRequestLock.Unlock()
|
||||
r, ok = rs.openRequests[handle]
|
||||
if !ok || r.Method == method { // re-check needed b/c lock race
|
||||
return r, ok
|
||||
}
|
||||
r = &Request{Method: method, Filepath: r.Filepath, state: r.state}
|
||||
rs.openRequests[handle] = r
|
||||
return r, ok
|
||||
}
|
||||
|
||||
func (rs *RequestServer) closeRequest(handle string) error {
|
||||
|
@ -138,7 +156,7 @@ func (rs *RequestServer) packetWorker(pktChan chan requestPacket) error {
|
|||
rpkt = sshFxpHandlePacket{pkt.id(), handle}
|
||||
case *sshFxpFstatPacket:
|
||||
handle := pkt.getHandle()
|
||||
request, ok := rs.getRequest(handle)
|
||||
request, ok := rs.getRequest(handle, requestMethod(pkt))
|
||||
if !ok {
|
||||
rpkt = statusFromError(pkt, syscall.EBADF)
|
||||
} else {
|
||||
|
@ -148,7 +166,7 @@ func (rs *RequestServer) packetWorker(pktChan chan requestPacket) error {
|
|||
}
|
||||
case *sshFxpFsetstatPacket:
|
||||
handle := pkt.getHandle()
|
||||
request, ok := rs.getRequest(handle)
|
||||
request, ok := rs.getRequest(handle, requestMethod(pkt))
|
||||
if !ok {
|
||||
rpkt = statusFromError(pkt, syscall.EBADF)
|
||||
} else {
|
||||
|
@ -160,12 +178,8 @@ func (rs *RequestServer) packetWorker(pktChan chan requestPacket) error {
|
|||
}
|
||||
case hasHandle:
|
||||
handle := pkt.getHandle()
|
||||
request, ok := rs.getRequest(handle)
|
||||
uerr := request.updateMethod(pkt)
|
||||
if !ok || uerr != nil {
|
||||
if uerr == nil {
|
||||
uerr = syscall.EBADF
|
||||
}
|
||||
request, ok := rs.getRequest(handle, requestMethod(pkt))
|
||||
if !ok {
|
||||
rpkt = statusFromError(pkt, syscall.EBADF)
|
||||
} else {
|
||||
rpkt = request.call(rs.Handlers, pkt)
|
||||
|
|
|
@ -82,10 +82,10 @@ func TestRequestCache(t *testing.T) {
|
|||
fh := p.svr.nextRequest(foo)
|
||||
bh := p.svr.nextRequest(bar)
|
||||
assert.Len(t, p.svr.openRequests, 2)
|
||||
_foo, ok := p.svr.getRequest(fh)
|
||||
_foo, ok := p.svr.getRequest(fh, "")
|
||||
assert.Equal(t, foo, _foo)
|
||||
assert.True(t, ok)
|
||||
_, ok = p.svr.getRequest("zed")
|
||||
_, ok = p.svr.getRequest("zed", "")
|
||||
assert.False(t, ok)
|
||||
p.svr.closeRequest(fh)
|
||||
p.svr.closeRequest(bh)
|
||||
|
|
40
request.go
40
request.go
|
@ -24,8 +24,8 @@ type Request struct {
|
|||
Attrs []byte // convert to sub-struct
|
||||
Target string // for renames and sym-links
|
||||
// reader/writer/readdir from handlers
|
||||
stateLock *sync.RWMutex
|
||||
state *state
|
||||
stateLock sync.RWMutex
|
||||
state state
|
||||
}
|
||||
|
||||
type state struct {
|
||||
|
@ -62,16 +62,9 @@ func requestFromPacket(pkt hasPath) *Request {
|
|||
return request
|
||||
}
|
||||
|
||||
func newRequest() *Request {
|
||||
return &Request{state: &state{}, stateLock: &sync.RWMutex{}}
|
||||
}
|
||||
|
||||
// NewRequest creates a new Request object.
|
||||
func NewRequest(method, path string) *Request {
|
||||
request := newRequest()
|
||||
request.Method = method
|
||||
request.Filepath = cleanPath(path)
|
||||
return request
|
||||
return &Request{Method: method, Filepath: cleanPath(path)}
|
||||
}
|
||||
|
||||
// Returns current offset for file list
|
||||
|
@ -304,27 +297,18 @@ func filelist(h FileLister, r *Request, pd packet_data) responsePacket {
|
|||
}
|
||||
}
|
||||
|
||||
// file data for additional read/write packets
|
||||
func (r *Request) updateMethod(p hasHandle) error {
|
||||
switch p := p.(type) {
|
||||
case *sshFxpReadPacket:
|
||||
r.Method = "Get"
|
||||
case *sshFxpWritePacket:
|
||||
r.Method = "Put"
|
||||
case *sshFxpReaddirPacket:
|
||||
r.Method = "List"
|
||||
default:
|
||||
return errors.Errorf("unexpected packet type %T", p)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// init attributes of request object from packet data
|
||||
func requestMethod(p hasPath) (method string) {
|
||||
func requestMethod(p requestPacket) (method string) {
|
||||
switch p.(type) {
|
||||
case *sshFxpReadPacket:
|
||||
method = "Get"
|
||||
case *sshFxpWritePacket:
|
||||
method = "Put"
|
||||
case *sshFxpReaddirPacket:
|
||||
method = "List"
|
||||
case *sshFxpOpenPacket, *sshFxpOpendirPacket:
|
||||
method = "Open"
|
||||
case *sshFxpSetstatPacket:
|
||||
case *sshFxpSetstatPacket, *sshFxpFsetstatPacket:
|
||||
method = "Setstat"
|
||||
case *sshFxpRenamePacket:
|
||||
method = "Rename"
|
||||
|
@ -332,7 +316,7 @@ func requestMethod(p hasPath) (method string) {
|
|||
method = "Symlink"
|
||||
case *sshFxpRemovePacket:
|
||||
method = "Remove"
|
||||
case *sshFxpStatPacket, *sshFxpLstatPacket:
|
||||
case *sshFxpStatPacket, *sshFxpLstatPacket, *sshFxpFstatPacket:
|
||||
method = "Stat"
|
||||
case *sshFxpRmdirPacket:
|
||||
method = "Rmdir"
|
||||
|
|
|
@ -1,8 +1,6 @@
|
|||
package sftp
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"bytes"
|
||||
|
@ -62,8 +60,6 @@ func testRequest(method string) *Request {
|
|||
Method: method,
|
||||
Attrs: []byte("foo"),
|
||||
Target: "foo",
|
||||
state: &state{},
|
||||
stateLock: &sync.RWMutex{},
|
||||
}
|
||||
return request
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue