request-server to use new packet ordering code

The 2 servers now use all the same packet managing code to ensure
ordered processing and reply packets.
This commit is contained in:
John Eikenberry 2017-04-23 14:27:25 -07:00
parent 5024cb048c
commit ced55b1bf0
1 changed files with 18 additions and 18 deletions

View File

@ -28,7 +28,6 @@ type Handlers struct {
type RequestServer struct { type RequestServer struct {
serverConn serverConn
Handlers Handlers Handlers Handlers
pktChan chan requestPacket
pktMgr packetManager pktMgr packetManager
openRequests map[string]Request openRequests map[string]Request
openRequestLock sync.RWMutex openRequestLock sync.RWMutex
@ -47,7 +46,6 @@ func NewRequestServer(rwc io.ReadWriteCloser, h Handlers) *RequestServer {
return &RequestServer{ return &RequestServer{
serverConn: svrConn, serverConn: svrConn,
Handlers: h, Handlers: h,
pktChan: make(chan requestPacket, sftpServerWorkerCount),
pktMgr: newPktMgr(&svrConn), pktMgr: newPktMgr(&svrConn),
openRequests: make(map[string]Request), openRequests: make(map[string]Request),
} }
@ -84,15 +82,15 @@ func (rs *RequestServer) Close() error { return rs.conn.Close() }
// Serve requests for user session // Serve requests for user session
func (rs *RequestServer) Serve() error { func (rs *RequestServer) Serve() error {
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(sftpServerWorkerCount) wg.Add(1)
for i := 0; i < sftpServerWorkerCount; i++ { workerFunc := func(ch requestChan) {
go func() { wg.Add(1)
defer wg.Done() defer wg.Done()
if err := rs.packetWorker(); err != nil { if err := rs.packetWorker(ch); err != nil {
rs.conn.Close() // shuts down recvPacket rs.conn.Close() // shuts down recvPacket
} }
}()
} }
pktChan := rs.pktMgr.workerChan(workerFunc)
var err error var err error
var pkt requestPacket var pkt requestPacket
@ -103,24 +101,26 @@ func (rs *RequestServer) Serve() error {
if err != nil { if err != nil {
break break
} }
pkt, err = makePacket(rxPacket{fxp(pktType), pktBytes}) pkt, err = makePacket(rxPacket{fxp(pktType), pktBytes})
if err != nil { if err != nil {
debug("makePacket err: %v", err) debug("makePacket err: %v", err)
rs.conn.Close() // shuts down recvPacket rs.conn.Close() // shuts down recvPacket
break break
} }
rs.pktMgr.incomingPacket(pkt)
rs.pktChan <- pkt
}
close(rs.pktChan) // shuts down sftpServerWorkers pktChan <- pkt
}
wg.Done()
close(pktChan) // shuts down sftpServerWorkers
wg.Wait() // wait for all workers to exit wg.Wait() // wait for all workers to exit
rs.pktMgr.close() // shuts down packetManager
return err return err
} }
func (rs *RequestServer) packetWorker() error { func (rs *RequestServer) packetWorker(pktChan chan requestPacket) error {
for pkt := range rs.pktChan { for pkt := range pktChan {
var rpkt responsePacket var rpkt responsePacket
switch pkt := pkt.(type) { switch pkt := pkt.(type) {
case *sshFxInitPacket: case *sshFxInitPacket: