packet-manager: sort incoming request ID queue

Incoming queue is not guaranteed to contain request IDs in ascending
order. Such state may be achieved by using a single sftp.Client
connection from multiple goroutines. Sort incoming queue to avoid
livelock due to different request/response order, like this:

2017/03/27 18:29:07 incoming: [55 56 54 57 58]
2017/03/27 18:29:07 outgoing: [54 55 56 57 58]

For single-threaded clients request/response order will remain intact
and nothing should break.

Signed-off-by: Pavel Borzenkov <pavel.borzenkov@gmail.com>
This commit is contained in:
Pavel Borzenkov 2017-03-27 18:33:59 +03:00 committed by John Eikenberry
parent f581f81666
commit df2f92e1bf
3 changed files with 21 additions and 2 deletions

View File

@ -14,7 +14,7 @@ type packetManager struct {
requests chan requestPacket
responses chan responsePacket
fini chan struct{}
incoming []uint32
incoming requestPacketIDs
outgoing responsePackets
sender packetSender // connection object
}
@ -55,6 +55,9 @@ func (s *packetManager) worker() {
case pkt := <-s.requests:
debug("incoming id: %v", pkt.id())
s.incoming = append(s.incoming, pkt.id())
if len(s.incoming) > 1 {
s.incoming.Sort()
}
case pkt := <-s.responses:
debug("outgoing pkt: %v", pkt.id())
s.outgoing = append(s.outgoing, pkt)

View File

@ -11,3 +11,11 @@ func (r responsePackets) Sort() {
return r[i].id() < r[j].id()
})
}
type requestPacketIDs []uint32
func (r requestPacketIDs) Sort() {
sort.Slice(r, func(i, j int) bool {
return r[i] < r[j]
})
}

View File

@ -4,10 +4,18 @@ package sftp
import "sort"
// for sorting/ordering incoming/outgoing
// for sorting/ordering outgoing
type responsePackets []responsePacket
func (r responsePackets) Len() int { return len(r) }
func (r responsePackets) Swap(i, j int) { r[i], r[j] = r[j], r[i] }
func (r responsePackets) Less(i, j int) bool { return r[i].id() < r[j].id() }
func (r responsePackets) Sort() { sort.Sort(r) }
// for sorting/ordering incoming
type requestPacketIDs []uint32
func (r requestPacketIDs) Len() int { return len(r) }
func (r requestPacketIDs) Swap(i, j int) { r[i], r[j] = r[j], r[i] }
func (r requestPacketIDs) Less(i, j int) bool { return r[i] < r[j] }
func (r requestPacketIDs) Sort() { sort.Sort(r) }