mirror of https://github.com/pkg/sftp.git
Add WithMaxTxPacket server option
Add the WithMaxTxPacket and WithRSMaxTxPacket server options to increase the maximum tx packet size to a value above 32K. This allows to send bigger chunks of data to the client as response to a read request. As the client specifies the wanted length, it should be safe to increase the server maximum value. This in particular allows the implemented Client with the MaxPacketUnchecked option to retrieve data in larger chunks. Signed-off-by: Peter Verraedt <peter@verraedt.be>
This commit is contained in:
parent
06342e8b90
commit
c1f47ba1b9
|
@ -823,7 +823,7 @@ func (p *sshFxpReadPacket) UnmarshalBinary(b []byte) error {
|
||||||
// So, we need: uint32(length) + byte(type) + uint32(id) + uint32(data_length)
|
// So, we need: uint32(length) + byte(type) + uint32(id) + uint32(data_length)
|
||||||
const dataHeaderLen = 4 + 1 + 4 + 4
|
const dataHeaderLen = 4 + 1 + 4 + 4
|
||||||
|
|
||||||
func (p *sshFxpReadPacket) getDataSlice(alloc *allocator, orderID uint32) []byte {
|
func (p *sshFxpReadPacket) getDataSlice(alloc *allocator, orderID uint32, maxTxPacket uint32) []byte {
|
||||||
dataLen := p.Len
|
dataLen := p.Len
|
||||||
if dataLen > maxTxPacket {
|
if dataLen > maxTxPacket {
|
||||||
dataLen = maxTxPacket
|
dataLen = maxTxPacket
|
||||||
|
|
|
@ -10,7 +10,7 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
var maxTxPacket uint32 = 1 << 15
|
const defaultMaxTxPacket uint32 = 1 << 15
|
||||||
|
|
||||||
// Handlers contains the 4 SFTP server request handlers.
|
// Handlers contains the 4 SFTP server request handlers.
|
||||||
type Handlers struct {
|
type Handlers struct {
|
||||||
|
@ -28,6 +28,7 @@ type RequestServer struct {
|
||||||
pktMgr *packetManager
|
pktMgr *packetManager
|
||||||
|
|
||||||
startDirectory string
|
startDirectory string
|
||||||
|
maxTxPacket uint32
|
||||||
|
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
handleCount int
|
handleCount int
|
||||||
|
@ -57,6 +58,22 @@ func WithStartDirectory(startDirectory string) RequestServerOption {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithRSMaxTxPacket sets the maximum size of the payload returned to the client,
|
||||||
|
// measured in bytes. The default value is 32768 bytes, and this option
|
||||||
|
// can only be used to increase it. Setting this option to a larger value
|
||||||
|
// should be safe, because the client decides the size of the requested payload.
|
||||||
|
//
|
||||||
|
// The default maximum packet size is 32768 bytes.
|
||||||
|
func WithRSMaxTxPacket(size uint32) RequestServerOption {
|
||||||
|
return func(rs *RequestServer) {
|
||||||
|
if size < defaultMaxTxPacket {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
rs.maxTxPacket = size
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// NewRequestServer creates/allocates/returns new RequestServer.
|
// NewRequestServer creates/allocates/returns new RequestServer.
|
||||||
// Normally there will be one server per user-session.
|
// Normally there will be one server per user-session.
|
||||||
func NewRequestServer(rwc io.ReadWriteCloser, h Handlers, options ...RequestServerOption) *RequestServer {
|
func NewRequestServer(rwc io.ReadWriteCloser, h Handlers, options ...RequestServerOption) *RequestServer {
|
||||||
|
@ -73,6 +90,7 @@ func NewRequestServer(rwc io.ReadWriteCloser, h Handlers, options ...RequestServ
|
||||||
pktMgr: newPktMgr(svrConn),
|
pktMgr: newPktMgr(svrConn),
|
||||||
|
|
||||||
startDirectory: "/",
|
startDirectory: "/",
|
||||||
|
maxTxPacket: defaultMaxTxPacket,
|
||||||
|
|
||||||
openRequests: make(map[string]*Request),
|
openRequests: make(map[string]*Request),
|
||||||
}
|
}
|
||||||
|
@ -260,7 +278,7 @@ func (rs *RequestServer) packetWorker(ctx context.Context, pktChan chan orderedR
|
||||||
Method: "Stat",
|
Method: "Stat",
|
||||||
Filepath: cleanPathWithBase(rs.startDirectory, request.Filepath),
|
Filepath: cleanPathWithBase(rs.startDirectory, request.Filepath),
|
||||||
}
|
}
|
||||||
rpkt = request.call(rs.Handlers, pkt, rs.pktMgr.alloc, orderID)
|
rpkt = request.call(rs.Handlers, pkt, rs.pktMgr.alloc, orderID, rs.maxTxPacket)
|
||||||
}
|
}
|
||||||
case *sshFxpFsetstatPacket:
|
case *sshFxpFsetstatPacket:
|
||||||
handle := pkt.getHandle()
|
handle := pkt.getHandle()
|
||||||
|
@ -272,7 +290,7 @@ func (rs *RequestServer) packetWorker(ctx context.Context, pktChan chan orderedR
|
||||||
Method: "Setstat",
|
Method: "Setstat",
|
||||||
Filepath: cleanPathWithBase(rs.startDirectory, request.Filepath),
|
Filepath: cleanPathWithBase(rs.startDirectory, request.Filepath),
|
||||||
}
|
}
|
||||||
rpkt = request.call(rs.Handlers, pkt, rs.pktMgr.alloc, orderID)
|
rpkt = request.call(rs.Handlers, pkt, rs.pktMgr.alloc, orderID, rs.maxTxPacket)
|
||||||
}
|
}
|
||||||
case *sshFxpExtendedPacketPosixRename:
|
case *sshFxpExtendedPacketPosixRename:
|
||||||
request := &Request{
|
request := &Request{
|
||||||
|
@ -280,24 +298,24 @@ func (rs *RequestServer) packetWorker(ctx context.Context, pktChan chan orderedR
|
||||||
Filepath: cleanPathWithBase(rs.startDirectory, pkt.Oldpath),
|
Filepath: cleanPathWithBase(rs.startDirectory, pkt.Oldpath),
|
||||||
Target: cleanPathWithBase(rs.startDirectory, pkt.Newpath),
|
Target: cleanPathWithBase(rs.startDirectory, pkt.Newpath),
|
||||||
}
|
}
|
||||||
rpkt = request.call(rs.Handlers, pkt, rs.pktMgr.alloc, orderID)
|
rpkt = request.call(rs.Handlers, pkt, rs.pktMgr.alloc, orderID, rs.maxTxPacket)
|
||||||
case *sshFxpExtendedPacketStatVFS:
|
case *sshFxpExtendedPacketStatVFS:
|
||||||
request := &Request{
|
request := &Request{
|
||||||
Method: "StatVFS",
|
Method: "StatVFS",
|
||||||
Filepath: cleanPathWithBase(rs.startDirectory, pkt.Path),
|
Filepath: cleanPathWithBase(rs.startDirectory, pkt.Path),
|
||||||
}
|
}
|
||||||
rpkt = request.call(rs.Handlers, pkt, rs.pktMgr.alloc, orderID)
|
rpkt = request.call(rs.Handlers, pkt, rs.pktMgr.alloc, orderID, rs.maxTxPacket)
|
||||||
case hasHandle:
|
case hasHandle:
|
||||||
handle := pkt.getHandle()
|
handle := pkt.getHandle()
|
||||||
request, ok := rs.getRequest(handle)
|
request, ok := rs.getRequest(handle)
|
||||||
if !ok {
|
if !ok {
|
||||||
rpkt = statusFromError(pkt.id(), EBADF)
|
rpkt = statusFromError(pkt.id(), EBADF)
|
||||||
} else {
|
} else {
|
||||||
rpkt = request.call(rs.Handlers, pkt, rs.pktMgr.alloc, orderID)
|
rpkt = request.call(rs.Handlers, pkt, rs.pktMgr.alloc, orderID, rs.maxTxPacket)
|
||||||
}
|
}
|
||||||
case hasPath:
|
case hasPath:
|
||||||
request := requestFromPacket(ctx, pkt, rs.startDirectory)
|
request := requestFromPacket(ctx, pkt, rs.startDirectory)
|
||||||
rpkt = request.call(rs.Handlers, pkt, rs.pktMgr.alloc, orderID)
|
rpkt = request.call(rs.Handlers, pkt, rs.pktMgr.alloc, orderID, rs.maxTxPacket)
|
||||||
request.close()
|
request.close()
|
||||||
default:
|
default:
|
||||||
rpkt = statusFromError(pkt.id(), ErrSSHFxOpUnsupported)
|
rpkt = statusFromError(pkt.id(), ErrSSHFxOpUnsupported)
|
||||||
|
|
24
request.go
24
request.go
|
@ -300,14 +300,14 @@ func (r *Request) transferError(err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// called from worker to handle packet/request
|
// called from worker to handle packet/request
|
||||||
func (r *Request) call(handlers Handlers, pkt requestPacket, alloc *allocator, orderID uint32) responsePacket {
|
func (r *Request) call(handlers Handlers, pkt requestPacket, alloc *allocator, orderID uint32, maxTxPacket uint32) responsePacket {
|
||||||
switch r.Method {
|
switch r.Method {
|
||||||
case "Get":
|
case "Get":
|
||||||
return fileget(handlers.FileGet, r, pkt, alloc, orderID)
|
return fileget(handlers.FileGet, r, pkt, alloc, orderID, maxTxPacket)
|
||||||
case "Put":
|
case "Put":
|
||||||
return fileput(handlers.FilePut, r, pkt, alloc, orderID)
|
return fileput(handlers.FilePut, r, pkt, alloc, orderID, maxTxPacket)
|
||||||
case "Open":
|
case "Open":
|
||||||
return fileputget(handlers.FilePut, r, pkt, alloc, orderID)
|
return fileputget(handlers.FilePut, r, pkt, alloc, orderID, maxTxPacket)
|
||||||
case "Setstat", "Rename", "Rmdir", "Mkdir", "Link", "Symlink", "Remove", "PosixRename", "StatVFS":
|
case "Setstat", "Rename", "Rmdir", "Mkdir", "Link", "Symlink", "Remove", "PosixRename", "StatVFS":
|
||||||
return filecmd(handlers.FileCmd, r, pkt)
|
return filecmd(handlers.FileCmd, r, pkt)
|
||||||
case "List":
|
case "List":
|
||||||
|
@ -392,13 +392,13 @@ func (r *Request) opendir(h Handlers, pkt requestPacket) responsePacket {
|
||||||
}
|
}
|
||||||
|
|
||||||
// wrap FileReader handler
|
// wrap FileReader handler
|
||||||
func fileget(h FileReader, r *Request, pkt requestPacket, alloc *allocator, orderID uint32) responsePacket {
|
func fileget(h FileReader, r *Request, pkt requestPacket, alloc *allocator, orderID uint32, maxTxPacket uint32) responsePacket {
|
||||||
rd := r.getReaderAt()
|
rd := r.getReaderAt()
|
||||||
if rd == nil {
|
if rd == nil {
|
||||||
return statusFromError(pkt.id(), errors.New("unexpected read packet"))
|
return statusFromError(pkt.id(), errors.New("unexpected read packet"))
|
||||||
}
|
}
|
||||||
|
|
||||||
data, offset, _ := packetData(pkt, alloc, orderID)
|
data, offset, _ := packetData(pkt, alloc, orderID, maxTxPacket)
|
||||||
|
|
||||||
n, err := rd.ReadAt(data, offset)
|
n, err := rd.ReadAt(data, offset)
|
||||||
// only return EOF error if no data left to read
|
// only return EOF error if no data left to read
|
||||||
|
@ -414,20 +414,20 @@ func fileget(h FileReader, r *Request, pkt requestPacket, alloc *allocator, orde
|
||||||
}
|
}
|
||||||
|
|
||||||
// wrap FileWriter handler
|
// wrap FileWriter handler
|
||||||
func fileput(h FileWriter, r *Request, pkt requestPacket, alloc *allocator, orderID uint32) responsePacket {
|
func fileput(h FileWriter, r *Request, pkt requestPacket, alloc *allocator, orderID uint32, maxTxPacket uint32) responsePacket {
|
||||||
wr := r.getWriterAt()
|
wr := r.getWriterAt()
|
||||||
if wr == nil {
|
if wr == nil {
|
||||||
return statusFromError(pkt.id(), errors.New("unexpected write packet"))
|
return statusFromError(pkt.id(), errors.New("unexpected write packet"))
|
||||||
}
|
}
|
||||||
|
|
||||||
data, offset, _ := packetData(pkt, alloc, orderID)
|
data, offset, _ := packetData(pkt, alloc, orderID, maxTxPacket)
|
||||||
|
|
||||||
_, err := wr.WriteAt(data, offset)
|
_, err := wr.WriteAt(data, offset)
|
||||||
return statusFromError(pkt.id(), err)
|
return statusFromError(pkt.id(), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// wrap OpenFileWriter handler
|
// wrap OpenFileWriter handler
|
||||||
func fileputget(h FileWriter, r *Request, pkt requestPacket, alloc *allocator, orderID uint32) responsePacket {
|
func fileputget(h FileWriter, r *Request, pkt requestPacket, alloc *allocator, orderID uint32, maxTxPacket uint32) responsePacket {
|
||||||
rw := r.getWriterAtReaderAt()
|
rw := r.getWriterAtReaderAt()
|
||||||
if rw == nil {
|
if rw == nil {
|
||||||
return statusFromError(pkt.id(), errors.New("unexpected write and read packet"))
|
return statusFromError(pkt.id(), errors.New("unexpected write and read packet"))
|
||||||
|
@ -435,7 +435,7 @@ func fileputget(h FileWriter, r *Request, pkt requestPacket, alloc *allocator, o
|
||||||
|
|
||||||
switch p := pkt.(type) {
|
switch p := pkt.(type) {
|
||||||
case *sshFxpReadPacket:
|
case *sshFxpReadPacket:
|
||||||
data, offset := p.getDataSlice(alloc, orderID), int64(p.Offset)
|
data, offset := p.getDataSlice(alloc, orderID, maxTxPacket), int64(p.Offset)
|
||||||
|
|
||||||
n, err := rw.ReadAt(data, offset)
|
n, err := rw.ReadAt(data, offset)
|
||||||
// only return EOF error if no data left to read
|
// only return EOF error if no data left to read
|
||||||
|
@ -461,10 +461,10 @@ func fileputget(h FileWriter, r *Request, pkt requestPacket, alloc *allocator, o
|
||||||
}
|
}
|
||||||
|
|
||||||
// file data for additional read/write packets
|
// file data for additional read/write packets
|
||||||
func packetData(p requestPacket, alloc *allocator, orderID uint32) (data []byte, offset int64, length uint32) {
|
func packetData(p requestPacket, alloc *allocator, orderID uint32, maxTxPacket uint32) (data []byte, offset int64, length uint32) {
|
||||||
switch p := p.(type) {
|
switch p := p.(type) {
|
||||||
case *sshFxpReadPacket:
|
case *sshFxpReadPacket:
|
||||||
return p.getDataSlice(alloc, orderID), int64(p.Offset), p.Len
|
return p.getDataSlice(alloc, orderID, maxTxPacket), int64(p.Offset), p.Len
|
||||||
case *sshFxpWritePacket:
|
case *sshFxpWritePacket:
|
||||||
return p.Data, int64(p.Offset), p.Length
|
return p.Data, int64(p.Offset), p.Length
|
||||||
}
|
}
|
||||||
|
|
|
@ -149,7 +149,7 @@ func TestRequestGet(t *testing.T) {
|
||||||
for i, txt := range []string{"file-", "data."} {
|
for i, txt := range []string{"file-", "data."} {
|
||||||
pkt := &sshFxpReadPacket{ID: uint32(i), Handle: "a",
|
pkt := &sshFxpReadPacket{ID: uint32(i), Handle: "a",
|
||||||
Offset: uint64(i * 5), Len: 5}
|
Offset: uint64(i * 5), Len: 5}
|
||||||
rpkt := request.call(handlers, pkt, nil, 0)
|
rpkt := request.call(handlers, pkt, nil, 0, defaultMaxTxPacket)
|
||||||
dpkt := rpkt.(*sshFxpDataPacket)
|
dpkt := rpkt.(*sshFxpDataPacket)
|
||||||
assert.Equal(t, dpkt.id(), uint32(i))
|
assert.Equal(t, dpkt.id(), uint32(i))
|
||||||
assert.Equal(t, string(dpkt.Data), txt)
|
assert.Equal(t, string(dpkt.Data), txt)
|
||||||
|
@ -162,7 +162,7 @@ func TestRequestCustomError(t *testing.T) {
|
||||||
pkt := fakePacket{myid: 1}
|
pkt := fakePacket{myid: 1}
|
||||||
cmdErr := errors.New("stat not supported")
|
cmdErr := errors.New("stat not supported")
|
||||||
handlers.returnError(cmdErr)
|
handlers.returnError(cmdErr)
|
||||||
rpkt := request.call(handlers, pkt, nil, 0)
|
rpkt := request.call(handlers, pkt, nil, 0, defaultMaxTxPacket)
|
||||||
assert.Equal(t, rpkt, statusFromError(pkt.myid, cmdErr))
|
assert.Equal(t, rpkt, statusFromError(pkt.myid, cmdErr))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -173,11 +173,11 @@ func TestRequestPut(t *testing.T) {
|
||||||
request.state.writerAt, _ = handlers.FilePut.Filewrite(request)
|
request.state.writerAt, _ = handlers.FilePut.Filewrite(request)
|
||||||
pkt := &sshFxpWritePacket{ID: 0, Handle: "a", Offset: 0, Length: 5,
|
pkt := &sshFxpWritePacket{ID: 0, Handle: "a", Offset: 0, Length: 5,
|
||||||
Data: []byte("file-")}
|
Data: []byte("file-")}
|
||||||
rpkt := request.call(handlers, pkt, nil, 0)
|
rpkt := request.call(handlers, pkt, nil, 0, defaultMaxTxPacket)
|
||||||
checkOkStatus(t, rpkt)
|
checkOkStatus(t, rpkt)
|
||||||
pkt = &sshFxpWritePacket{ID: 1, Handle: "a", Offset: 5, Length: 5,
|
pkt = &sshFxpWritePacket{ID: 1, Handle: "a", Offset: 5, Length: 5,
|
||||||
Data: []byte("data.")}
|
Data: []byte("data.")}
|
||||||
rpkt = request.call(handlers, pkt, nil, 0)
|
rpkt = request.call(handlers, pkt, nil, 0, defaultMaxTxPacket)
|
||||||
checkOkStatus(t, rpkt)
|
checkOkStatus(t, rpkt)
|
||||||
assert.Equal(t, "file-data.", handlers.getOutString())
|
assert.Equal(t, "file-data.", handlers.getOutString())
|
||||||
}
|
}
|
||||||
|
@ -186,11 +186,11 @@ func TestRequestCmdr(t *testing.T) {
|
||||||
handlers := newTestHandlers()
|
handlers := newTestHandlers()
|
||||||
request := testRequest("Mkdir")
|
request := testRequest("Mkdir")
|
||||||
pkt := fakePacket{myid: 1}
|
pkt := fakePacket{myid: 1}
|
||||||
rpkt := request.call(handlers, pkt, nil, 0)
|
rpkt := request.call(handlers, pkt, nil, 0, defaultMaxTxPacket)
|
||||||
checkOkStatus(t, rpkt)
|
checkOkStatus(t, rpkt)
|
||||||
|
|
||||||
handlers.returnError(errTest)
|
handlers.returnError(errTest)
|
||||||
rpkt = request.call(handlers, pkt, nil, 0)
|
rpkt = request.call(handlers, pkt, nil, 0, defaultMaxTxPacket)
|
||||||
assert.Equal(t, rpkt, statusFromError(pkt.myid, errTest))
|
assert.Equal(t, rpkt, statusFromError(pkt.myid, errTest))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -198,7 +198,7 @@ func TestRequestInfoStat(t *testing.T) {
|
||||||
handlers := newTestHandlers()
|
handlers := newTestHandlers()
|
||||||
request := testRequest("Stat")
|
request := testRequest("Stat")
|
||||||
pkt := fakePacket{myid: 1}
|
pkt := fakePacket{myid: 1}
|
||||||
rpkt := request.call(handlers, pkt, nil, 0)
|
rpkt := request.call(handlers, pkt, nil, 0, defaultMaxTxPacket)
|
||||||
spkt, ok := rpkt.(*sshFxpStatResponse)
|
spkt, ok := rpkt.(*sshFxpStatResponse)
|
||||||
assert.True(t, ok)
|
assert.True(t, ok)
|
||||||
assert.Equal(t, spkt.info.Name(), "request_test.go")
|
assert.Equal(t, spkt.info.Name(), "request_test.go")
|
||||||
|
@ -215,13 +215,13 @@ func TestRequestInfoList(t *testing.T) {
|
||||||
assert.Equal(t, hpkt.Handle, "1")
|
assert.Equal(t, hpkt.Handle, "1")
|
||||||
}
|
}
|
||||||
pkt = fakePacket{myid: 2}
|
pkt = fakePacket{myid: 2}
|
||||||
request.call(handlers, pkt, nil, 0)
|
request.call(handlers, pkt, nil, 0, defaultMaxTxPacket)
|
||||||
}
|
}
|
||||||
func TestRequestInfoReadlink(t *testing.T) {
|
func TestRequestInfoReadlink(t *testing.T) {
|
||||||
handlers := newTestHandlers()
|
handlers := newTestHandlers()
|
||||||
request := testRequest("Readlink")
|
request := testRequest("Readlink")
|
||||||
pkt := fakePacket{myid: 1}
|
pkt := fakePacket{myid: 1}
|
||||||
rpkt := request.call(handlers, pkt, nil, 0)
|
rpkt := request.call(handlers, pkt, nil, 0, defaultMaxTxPacket)
|
||||||
npkt, ok := rpkt.(*sshFxpNamePacket)
|
npkt, ok := rpkt.(*sshFxpNamePacket)
|
||||||
if assert.True(t, ok) {
|
if assert.True(t, ok) {
|
||||||
assert.IsType(t, &sshFxpNameAttr{}, npkt.NameAttrs[0])
|
assert.IsType(t, &sshFxpNameAttr{}, npkt.NameAttrs[0])
|
||||||
|
@ -234,7 +234,7 @@ func TestOpendirHandleReuse(t *testing.T) {
|
||||||
request := testRequest("Stat")
|
request := testRequest("Stat")
|
||||||
request.handle = "1"
|
request.handle = "1"
|
||||||
pkt := fakePacket{myid: 1}
|
pkt := fakePacket{myid: 1}
|
||||||
rpkt := request.call(handlers, pkt, nil, 0)
|
rpkt := request.call(handlers, pkt, nil, 0, defaultMaxTxPacket)
|
||||||
assert.IsType(t, &sshFxpStatResponse{}, rpkt)
|
assert.IsType(t, &sshFxpStatResponse{}, rpkt)
|
||||||
|
|
||||||
request.Method = "List"
|
request.Method = "List"
|
||||||
|
@ -244,6 +244,6 @@ func TestOpendirHandleReuse(t *testing.T) {
|
||||||
hpkt := rpkt.(*sshFxpHandlePacket)
|
hpkt := rpkt.(*sshFxpHandlePacket)
|
||||||
assert.Equal(t, hpkt.Handle, "1")
|
assert.Equal(t, hpkt.Handle, "1")
|
||||||
}
|
}
|
||||||
rpkt = request.call(handlers, pkt, nil, 0)
|
rpkt = request.call(handlers, pkt, nil, 0, defaultMaxTxPacket)
|
||||||
assert.IsType(t, &sshFxpNamePacket{}, rpkt)
|
assert.IsType(t, &sshFxpNamePacket{}, rpkt)
|
||||||
}
|
}
|
||||||
|
|
22
server.go
22
server.go
|
@ -34,6 +34,7 @@ type Server struct {
|
||||||
openFilesLock sync.RWMutex
|
openFilesLock sync.RWMutex
|
||||||
handleCount int
|
handleCount int
|
||||||
workDir string
|
workDir string
|
||||||
|
maxTxPacket uint32
|
||||||
}
|
}
|
||||||
|
|
||||||
func (svr *Server) nextHandle(f *os.File) string {
|
func (svr *Server) nextHandle(f *os.File) string {
|
||||||
|
@ -86,6 +87,7 @@ func NewServer(rwc io.ReadWriteCloser, options ...ServerOption) (*Server, error)
|
||||||
debugStream: ioutil.Discard,
|
debugStream: ioutil.Discard,
|
||||||
pktMgr: newPktMgr(svrConn),
|
pktMgr: newPktMgr(svrConn),
|
||||||
openFiles: make(map[string]*os.File),
|
openFiles: make(map[string]*os.File),
|
||||||
|
maxTxPacket: defaultMaxTxPacket,
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, o := range options {
|
for _, o := range options {
|
||||||
|
@ -139,6 +141,24 @@ func WithServerWorkingDirectory(workDir string) ServerOption {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithMaxTxPacket sets the maximum size of the payload returned to the client,
|
||||||
|
// measured in bytes. The default value is 32768 bytes, and this option
|
||||||
|
// can only be used to increase it. Setting this option to a larger value
|
||||||
|
// should be safe, because the client decides the size of the requested payload.
|
||||||
|
//
|
||||||
|
// The default maximum packet size is 32768 bytes.
|
||||||
|
func WithMaxTxPacket(size uint32) ServerOption {
|
||||||
|
return func(s *Server) error {
|
||||||
|
if size < defaultMaxTxPacket {
|
||||||
|
return errors.New("size must be greater than or equal to 32768")
|
||||||
|
}
|
||||||
|
|
||||||
|
s.maxTxPacket = size
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type rxPacket struct {
|
type rxPacket struct {
|
||||||
pktType fxp
|
pktType fxp
|
||||||
pktBytes []byte
|
pktBytes []byte
|
||||||
|
@ -287,7 +307,7 @@ func handlePacket(s *Server, p orderedRequest) error {
|
||||||
f, ok := s.getHandle(p.Handle)
|
f, ok := s.getHandle(p.Handle)
|
||||||
if ok {
|
if ok {
|
||||||
err = nil
|
err = nil
|
||||||
data := p.getDataSlice(s.pktMgr.alloc, orderID)
|
data := p.getDataSlice(s.pktMgr.alloc, orderID, s.maxTxPacket)
|
||||||
n, _err := f.ReadAt(data, int64(p.Offset))
|
n, _err := f.ReadAt(data, int64(p.Offset))
|
||||||
if _err != nil && (_err != io.EOF || n == 0) {
|
if _err != nil && (_err != io.EOF || n == 0) {
|
||||||
err = _err
|
err = _err
|
||||||
|
|
Loading…
Reference in New Issue