| 
									
										
										
										
											2021-04-19 03:41:13 +08:00
										 |  |  | // Copyright (c) 2015-2021 MinIO, Inc.
 | 
					
						
							|  |  |  | //
 | 
					
						
							|  |  |  | // This file is part of MinIO Object Storage stack
 | 
					
						
							|  |  |  | //
 | 
					
						
							|  |  |  | // This program is free software: you can redistribute it and/or modify
 | 
					
						
							|  |  |  | // it under the terms of the GNU Affero General Public License as published by
 | 
					
						
							|  |  |  | // the Free Software Foundation, either version 3 of the License, or
 | 
					
						
							|  |  |  | // (at your option) any later version.
 | 
					
						
							|  |  |  | //
 | 
					
						
							|  |  |  | // This program is distributed in the hope that it will be useful
 | 
					
						
							|  |  |  | // but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
					
						
							|  |  |  | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
					
						
							|  |  |  | // GNU Affero General Public License for more details.
 | 
					
						
							|  |  |  | //
 | 
					
						
							|  |  |  | // You should have received a copy of the GNU Affero General Public License
 | 
					
						
							|  |  |  | // along with this program.  If not, see <http://www.gnu.org/licenses/>.
 | 
					
						
							| 
									
										
										
										
											2019-01-09 08:53:04 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | package s3select | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							| 
									
										
										
										
											2021-10-15 02:11:07 +08:00
										 |  |  | 	"context" | 
					
						
							| 
									
										
										
										
											2019-11-06 06:20:37 +08:00
										 |  |  | 	"errors" | 
					
						
							| 
									
										
										
										
											2019-01-09 08:53:04 +08:00
										 |  |  | 	"fmt" | 
					
						
							|  |  |  | 	"io" | 
					
						
							| 
									
										
										
										
											2021-10-15 02:11:07 +08:00
										 |  |  | 	"runtime" | 
					
						
							| 
									
										
										
										
											2019-11-06 06:20:37 +08:00
										 |  |  | 	"sync" | 
					
						
							| 
									
										
										
										
											2019-01-09 08:53:04 +08:00
										 |  |  | 	"sync/atomic" | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-10-15 02:11:07 +08:00
										 |  |  | 	"github.com/cosnicolaou/pbzip2" | 
					
						
							| 
									
										
										
										
											2021-09-07 00:09:53 +08:00
										 |  |  | 	"github.com/klauspost/compress/s2" | 
					
						
							|  |  |  | 	"github.com/klauspost/compress/zstd" | 
					
						
							| 
									
										
										
										
											2019-01-09 08:53:04 +08:00
										 |  |  | 	gzip "github.com/klauspost/pgzip" | 
					
						
							| 
									
										
										
										
											2021-09-07 00:09:53 +08:00
										 |  |  | 	"github.com/pierrec/lz4" | 
					
						
							| 
									
										
										
										
											2019-01-09 08:53:04 +08:00
										 |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | type countUpReader struct { | 
					
						
							|  |  |  | 	reader    io.Reader | 
					
						
							|  |  |  | 	bytesRead int64 | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-10-18 23:44:36 +08:00
										 |  |  | // Max bzip2 concurrency across calls. 50% of GOMAXPROCS.
 | 
					
						
							|  |  |  | var bz2Limiter = pbzip2.CreateConcurrencyPool((runtime.GOMAXPROCS(0) + 1) / 2) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-01-09 08:53:04 +08:00
										 |  |  | func (r *countUpReader) Read(p []byte) (n int, err error) { | 
					
						
							|  |  |  | 	n, err = r.reader.Read(p) | 
					
						
							|  |  |  | 	atomic.AddInt64(&r.bytesRead, int64(n)) | 
					
						
							|  |  |  | 	return n, err | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (r *countUpReader) BytesRead() int64 { | 
					
						
							| 
									
										
										
										
											2020-02-14 06:03:52 +08:00
										 |  |  | 	if r == nil { | 
					
						
							|  |  |  | 		return 0 | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2019-01-09 08:53:04 +08:00
										 |  |  | 	return atomic.LoadInt64(&r.bytesRead) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func newCountUpReader(reader io.Reader) *countUpReader { | 
					
						
							|  |  |  | 	return &countUpReader{ | 
					
						
							|  |  |  | 		reader: reader, | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | type progressReader struct { | 
					
						
							|  |  |  | 	rc              io.ReadCloser | 
					
						
							|  |  |  | 	scannedReader   *countUpReader | 
					
						
							|  |  |  | 	processedReader *countUpReader | 
					
						
							| 
									
										
										
										
											2019-11-06 06:20:37 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	closedMu sync.Mutex | 
					
						
							| 
									
										
										
										
											2021-09-07 00:09:53 +08:00
										 |  |  | 	closer   io.ReadCloser | 
					
						
							| 
									
										
										
										
											2019-11-06 06:20:37 +08:00
										 |  |  | 	closed   bool | 
					
						
							| 
									
										
										
										
											2019-01-09 08:53:04 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (pr *progressReader) Read(p []byte) (n int, err error) { | 
					
						
							| 
									
										
										
										
											2019-11-06 06:20:37 +08:00
										 |  |  | 	// This ensures that Close will block until Read has completed.
 | 
					
						
							|  |  |  | 	// This allows another goroutine to close the reader.
 | 
					
						
							|  |  |  | 	pr.closedMu.Lock() | 
					
						
							|  |  |  | 	defer pr.closedMu.Unlock() | 
					
						
							|  |  |  | 	if pr.closed { | 
					
						
							|  |  |  | 		return 0, errors.New("progressReader: read after Close") | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2019-01-09 08:53:04 +08:00
										 |  |  | 	return pr.processedReader.Read(p) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (pr *progressReader) Close() error { | 
					
						
							| 
									
										
										
										
											2019-11-06 06:20:37 +08:00
										 |  |  | 	pr.closedMu.Lock() | 
					
						
							|  |  |  | 	defer pr.closedMu.Unlock() | 
					
						
							|  |  |  | 	if pr.closed { | 
					
						
							|  |  |  | 		return nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	pr.closed = true | 
					
						
							| 
									
										
										
										
											2021-09-07 00:09:53 +08:00
										 |  |  | 	if pr.closer != nil { | 
					
						
							|  |  |  | 		pr.closer.Close() | 
					
						
							| 
									
										
										
										
											2021-01-20 09:51:46 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2019-01-09 08:53:04 +08:00
										 |  |  | 	return pr.rc.Close() | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (pr *progressReader) Stats() (bytesScanned, bytesProcessed int64) { | 
					
						
							| 
									
										
										
										
											2020-02-14 06:03:52 +08:00
										 |  |  | 	if pr == nil { | 
					
						
							|  |  |  | 		return 0, 0 | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2019-01-09 08:53:04 +08:00
										 |  |  | 	return pr.scannedReader.BytesRead(), pr.processedReader.BytesRead() | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func newProgressReader(rc io.ReadCloser, compType CompressionType) (*progressReader, error) { | 
					
						
							| 
									
										
										
										
											2021-01-20 09:51:46 +08:00
										 |  |  | 	if rc == nil { | 
					
						
							|  |  |  | 		return nil, errors.New("newProgressReader: nil reader provided") | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2019-01-09 08:53:04 +08:00
										 |  |  | 	scannedReader := newCountUpReader(rc) | 
					
						
							| 
									
										
										
										
											2021-01-20 09:51:46 +08:00
										 |  |  | 	pr := progressReader{ | 
					
						
							|  |  |  | 		rc:            rc, | 
					
						
							|  |  |  | 		scannedReader: scannedReader, | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	var r io.Reader | 
					
						
							| 
									
										
										
										
											2019-01-09 08:53:04 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	switch compType { | 
					
						
							|  |  |  | 	case noneType: | 
					
						
							|  |  |  | 		r = scannedReader | 
					
						
							|  |  |  | 	case gzipType: | 
					
						
							| 
									
										
										
										
											2021-09-07 00:09:53 +08:00
										 |  |  | 		gzr, err := gzip.NewReader(scannedReader) | 
					
						
							| 
									
										
										
										
											2020-03-13 06:34:11 +08:00
										 |  |  | 		if err != nil { | 
					
						
							| 
									
										
										
										
											2020-03-06 11:34:04 +08:00
										 |  |  | 			if errors.Is(err, gzip.ErrHeader) || errors.Is(err, gzip.ErrChecksum) { | 
					
						
							| 
									
										
										
										
											2021-09-07 00:09:53 +08:00
										 |  |  | 				return nil, errInvalidCompression(err, compType) | 
					
						
							| 
									
										
										
										
											2020-03-06 11:34:04 +08:00
										 |  |  | 			} | 
					
						
							| 
									
										
										
										
											2020-03-13 06:34:11 +08:00
										 |  |  | 			return nil, errTruncatedInput(err) | 
					
						
							| 
									
										
										
										
											2019-01-09 08:53:04 +08:00
										 |  |  | 		} | 
					
						
							| 
									
										
										
										
											2021-09-07 00:09:53 +08:00
										 |  |  | 		r = gzr | 
					
						
							|  |  |  | 		pr.closer = gzr | 
					
						
							| 
									
										
										
										
											2019-01-09 08:53:04 +08:00
										 |  |  | 	case bzip2Type: | 
					
						
							| 
									
										
										
										
											2021-10-15 02:11:07 +08:00
										 |  |  | 		ctx, cancel := context.WithCancel(context.Background()) | 
					
						
							| 
									
										
										
										
											2021-10-18 23:44:36 +08:00
										 |  |  | 		r = pbzip2.NewReader(ctx, scannedReader, pbzip2.DecompressionOptions( | 
					
						
							|  |  |  | 			pbzip2.BZConcurrency((runtime.GOMAXPROCS(0)+1)/2), | 
					
						
							|  |  |  | 			pbzip2.BZConcurrencyPool(bz2Limiter), | 
					
						
							|  |  |  | 		)) | 
					
						
							| 
									
										
										
										
											2021-10-15 02:11:07 +08:00
										 |  |  | 		pr.closer = &nopReadCloser{fn: cancel} | 
					
						
							| 
									
										
										
										
											2021-09-07 00:09:53 +08:00
										 |  |  | 	case zstdType: | 
					
						
							|  |  |  | 		// Set a max window of 64MB. More than reasonable.
 | 
					
						
							|  |  |  | 		zr, err := zstd.NewReader(scannedReader, zstd.WithDecoderConcurrency(2), zstd.WithDecoderMaxWindow(64<<20)) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			return nil, errInvalidCompression(err, compType) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		r = zr | 
					
						
							|  |  |  | 		pr.closer = zr.IOReadCloser() | 
					
						
							|  |  |  | 	case lz4Type: | 
					
						
							|  |  |  | 		r = lz4.NewReader(scannedReader) | 
					
						
							|  |  |  | 	case s2Type: | 
					
						
							|  |  |  | 		r = s2.NewReader(scannedReader) | 
					
						
							|  |  |  | 	case snappyType: | 
					
						
							|  |  |  | 		r = s2.NewReader(scannedReader, s2.ReaderMaxBlockSize(64<<10)) | 
					
						
							| 
									
										
										
										
											2019-01-09 08:53:04 +08:00
										 |  |  | 	default: | 
					
						
							|  |  |  | 		return nil, errInvalidCompressionFormat(fmt.Errorf("unknown compression type '%v'", compType)) | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2021-01-20 09:51:46 +08:00
										 |  |  | 	pr.processedReader = newCountUpReader(r) | 
					
						
							| 
									
										
										
										
											2019-01-09 08:53:04 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-01-20 09:51:46 +08:00
										 |  |  | 	return &pr, nil | 
					
						
							| 
									
										
										
										
											2019-01-09 08:53:04 +08:00
										 |  |  | } | 
					
						
							| 
									
										
										
										
											2021-10-15 02:11:07 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | type nopReadCloser struct { | 
					
						
							|  |  |  | 	fn func() | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (n2 *nopReadCloser) Read(p []byte) (n int, err error) { | 
					
						
							|  |  |  | 	panic("should not be called") | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (n2 *nopReadCloser) Close() error { | 
					
						
							|  |  |  | 	if n2.fn != nil { | 
					
						
							|  |  |  | 		n2.fn() | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	n2.fn = nil | 
					
						
							|  |  |  | 	return nil | 
					
						
							|  |  |  | } |