decouple packet channel from server struct

looking to create 2 pools, so I want to pass the channel in
This commit is contained in:
John Eikenberry 2017-03-25 17:07:47 -07:00
parent cb456b384c
commit 5fd073bcc3
1 changed files with 6 additions and 7 deletions

View File

@ -29,7 +29,6 @@ type Server struct {
serverConn serverConn
debugStream io.Writer debugStream io.Writer
readOnly bool readOnly bool
pktChan chan requestPacket
pktMgr packetManager pktMgr packetManager
openFiles map[string]*os.File openFiles map[string]*os.File
openFilesLock sync.RWMutex openFilesLock sync.RWMutex
@ -85,7 +84,6 @@ func NewServer(rwc io.ReadWriteCloser, options ...ServerOption) (*Server, error)
s := &Server{ s := &Server{
serverConn: svrConn, serverConn: svrConn,
debugStream: ioutil.Discard, debugStream: ioutil.Discard,
pktChan: make(chan requestPacket, sftpServerWorkerCount),
pktMgr: newPktMgr(&svrConn), pktMgr: newPktMgr(&svrConn),
openFiles: make(map[string]*os.File), openFiles: make(map[string]*os.File),
maxTxPacket: 1 << 15, maxTxPacket: 1 << 15,
@ -125,8 +123,8 @@ type rxPacket struct {
} }
// Up to N parallel servers // Up to N parallel servers
func (svr *Server) sftpServerWorker() error { func (svr *Server) sftpServerWorker(pktChan chan requestPacket) error {
for pkt := range svr.pktChan { for pkt := range pktChan {
// readonly checks // readonly checks
readonly := true readonly := true
@ -285,10 +283,11 @@ func handlePacket(s *Server, p interface{}) error {
func (svr *Server) Serve() error { func (svr *Server) Serve() error {
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(sftpServerWorkerCount) wg.Add(sftpServerWorkerCount)
pktChan := make(chan requestPacket, sftpServerWorkerCount)
for i := 0; i < sftpServerWorkerCount; i++ { for i := 0; i < sftpServerWorkerCount; i++ {
go func() { go func() {
defer wg.Done() defer wg.Done()
if err := svr.sftpServerWorker(); err != nil { if err := svr.sftpServerWorker(pktChan); err != nil {
svr.conn.Close() // shuts down recvPacket svr.conn.Close() // shuts down recvPacket
} }
}() }()
@ -312,10 +311,10 @@ func (svr *Server) Serve() error {
} }
svr.pktMgr.incomingPacket(pkt) svr.pktMgr.incomingPacket(pkt)
svr.pktChan <- pkt pktChan <- pkt
} }
close(svr.pktChan) // shuts down sftpServerWorkers close(pktChan) // shuts down sftpServerWorkers
wg.Wait() // wait for all workers to exit wg.Wait() // wait for all workers to exit
svr.pktMgr.close() // shuts down packetManager svr.pktMgr.close() // shuts down packetManager