A simpler bytes buffer pool

This commit is contained in:
Cassondra Foesch 2021-03-12 15:14:27 +00:00
parent e164c76a12
commit c34ea374a2
2 changed files with 37 additions and 16 deletions

View File

@ -1235,11 +1235,7 @@ func (f *File) WriteTo(w io.Writer) (written int64, err error) {
} }
}() }()
pool := sync.Pool{ pool := newBufPool(concurrency, f.c.maxPacket)
New: func() interface{} {
return make([]byte, f.c.maxPacket)
},
}
wg.Add(concurrency) wg.Add(concurrency)
for i := 0; i < concurrency; i++ { for i := 0; i < concurrency; i++ {
@ -1250,7 +1246,7 @@ func (f *File) WriteTo(w io.Writer) (written int64, err error) {
ch := make(chan result, 1) // reusable channel ch := make(chan result, 1) // reusable channel
for readWork := range readCh { for readWork := range readCh {
b := pool.Get().([]byte) b := pool.Get()
n, err := f.readChunkAt(ch, b, readWork.off) n, err := f.readChunkAt(ch, b, readWork.off)
if n < 0 { if n < 0 {
@ -1519,12 +1515,13 @@ func (f *File) readFromConcurrent(r io.Reader, remain int64) (read int64, err er
} }
errCh := make(chan rwErr) errCh := make(chan rwErr)
pool := sync.Pool{ concurrency := int(remain/int64(f.c.maxPacket) + 1) // a bad guess, but better than no guess
New: func() interface{} { if concurrency > f.c.maxConcurrentRequests {
return make([]byte, f.c.maxPacket) concurrency = f.c.maxConcurrentRequests
},
} }
pool := newBufPool(concurrency, f.c.maxPacket)
// Slice: cut up the Read into any number of buffers of length <= f.c.maxPacket, and at appropriate offsets. // Slice: cut up the Read into any number of buffers of length <= f.c.maxPacket, and at appropriate offsets.
go func() { go func() {
defer close(workCh) defer close(workCh)
@ -1532,7 +1529,7 @@ func (f *File) readFromConcurrent(r io.Reader, remain int64) (read int64, err er
off := f.offset off := f.offset
for { for {
b := pool.Get().([]byte) b := pool.Get()
n, err := r.Read(b) n, err := r.Read(b)
if n > 0 { if n > 0 {
@ -1557,11 +1554,6 @@ func (f *File) readFromConcurrent(r io.Reader, remain int64) (read int64, err er
} }
}() }()
concurrency := int(remain/int64(f.c.maxPacket) + 1) // a bad guess, but better than no guess
if concurrency > f.c.maxConcurrentRequests {
concurrency = f.c.maxConcurrentRequests
}
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(concurrency) wg.Add(concurrency)
for i := 0; i < concurrency; i++ { for i := 0; i < concurrency; i++ {

29
pool.go Normal file
View File

@ -0,0 +1,29 @@
package sftp
type bufPool struct {
ch chan []byte
blen int
}
func newBufPool(depth, bufLen int) *bufPool {
return &bufPool{
ch: make(chan []byte, depth),
blen: bufLen,
}
}
func (p *bufPool) Get() []byte {
select {
case b := <-p.ch:
return b
default:
return make([]byte, p.blen)
}
}
func (p *bufPool) Put(b []byte) {
select {
case p.ch <- b:
default:
}
}