mirror of https://github.com/minio/minio.git
				
				
				
			Revert all GetObjectNInfo related PRs (#6398)
* Revert "Encrypted reader wrapped in NewGetObjectReader should be closed (#6383)" This reverts commit53a0bbeb5b. * Revert "Change SelectAPI to use new GetObjectNInfo API (#6373)" This reverts commit5b05df215a. * Revert "Implement GetObjectNInfo object layer call (#6290)" This reverts commite6d740ce09.
This commit is contained in:
		
							parent
							
								
									fb27388101
								
							
						
					
					
						commit
						4487f70f08
					
				|  | @ -61,7 +61,7 @@ func encodeResponseJSON(response interface{}) []byte { | |||
| } | ||||
| 
 | ||||
| // Write object header
 | ||||
| func setObjectHeaders(w http.ResponseWriter, objInfo ObjectInfo, rs *HTTPRangeSpec) { | ||||
| func setObjectHeaders(w http.ResponseWriter, objInfo ObjectInfo, contentRange *httpRange) { | ||||
| 	// set common headers
 | ||||
| 	setCommonHeaders(w) | ||||
| 
 | ||||
|  | @ -96,9 +96,9 @@ func setObjectHeaders(w http.ResponseWriter, objInfo ObjectInfo, rs *HTTPRangeSp | |||
| 	} | ||||
| 
 | ||||
| 	// for providing ranged content
 | ||||
| 	if rs != nil { | ||||
| 	if contentRange != nil && contentRange.offsetBegin > -1 { | ||||
| 		// Override content-length
 | ||||
| 		w.Header().Set("Content-Length", strconv.FormatInt(rs.GetLength(objInfo.Size), 10)) | ||||
| 		w.Header().Set("Content-Range", rs.ContentRangeString(objInfo.Size)) | ||||
| 		w.Header().Set("Content-Length", strconv.FormatInt(contentRange.getLength(), 10)) | ||||
| 		w.Header().Set("Content-Range", contentRange.String()) | ||||
| 	} | ||||
| } | ||||
|  |  | |||
|  | @ -57,7 +57,6 @@ type cacheObjects struct { | |||
| 	// file path patterns to exclude from cache
 | ||||
| 	exclude []string | ||||
| 	// Object functions pointing to the corresponding functions of backend implementation.
 | ||||
| 	GetObjectNInfoFn          func(ctx context.Context, bucket, object string, rs *HTTPRangeSpec) (objInfo ObjectInfo, reader io.ReadCloser, err error) | ||||
| 	GetObjectFn               func(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string) (err error) | ||||
| 	GetObjectInfoFn           func(ctx context.Context, bucket, object string) (objInfo ObjectInfo, err error) | ||||
| 	PutObjectFn               func(ctx context.Context, bucket, object string, data *hash.Reader, metadata map[string]string) (objInfo ObjectInfo, err error) | ||||
|  | @ -89,7 +88,6 @@ type CacheObjectLayer interface { | |||
| 	ListBuckets(ctx context.Context) (buckets []BucketInfo, err error) | ||||
| 	DeleteBucket(ctx context.Context, bucket string) error | ||||
| 	// Object operations.
 | ||||
| 	GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec) (objInfo ObjectInfo, reader io.ReadCloser, err error) | ||||
| 	GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string) (err error) | ||||
| 	GetObjectInfo(ctx context.Context, bucket, object string) (objInfo ObjectInfo, err error) | ||||
| 	PutObject(ctx context.Context, bucket, object string, data *hash.Reader, metadata map[string]string) (objInfo ObjectInfo, err error) | ||||
|  | @ -177,75 +175,6 @@ func (c cacheObjects) getMetadata(objInfo ObjectInfo) map[string]string { | |||
| 	return metadata | ||||
| } | ||||
| 
 | ||||
| func (c cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec) (oi ObjectInfo, r io.ReadCloser, err error) { | ||||
| 
 | ||||
| 	bkObjInfo, bkReader, bkErr := c.GetObjectNInfoFn(ctx, bucket, object, rs) | ||||
| 
 | ||||
| 	if c.isCacheExclude(bucket, object) { | ||||
| 		return bkObjInfo, bkReader, bkErr | ||||
| 	} | ||||
| 
 | ||||
| 	// fetch cacheFSObjects if object is currently cached or nearest available cache drive
 | ||||
| 	dcache, err := c.cache.getCachedFSLoc(ctx, bucket, object) | ||||
| 	if err != nil { | ||||
| 		return bkObjInfo, bkReader, bkErr | ||||
| 	} | ||||
| 
 | ||||
| 	backendDown := backendDownError(bkErr) | ||||
| 	if bkErr != nil && !backendDown { | ||||
| 		if _, ok := err.(ObjectNotFound); ok { | ||||
| 			// Delete the cached entry if backend object was deleted.
 | ||||
| 			dcache.Delete(ctx, bucket, object) | ||||
| 		} | ||||
| 		return oi, r, bkErr | ||||
| 	} | ||||
| 
 | ||||
| 	if !backendDown && filterFromCache(bkObjInfo.UserDefined) { | ||||
| 		return bkObjInfo, bkReader, bkErr | ||||
| 	} | ||||
| 
 | ||||
| 	cacheObjInfo, cacheReader, cacheErr := dcache.GetObjectNInfo(ctx, bucket, object, rs) | ||||
| 	if cacheErr == nil { | ||||
| 		if backendDown { | ||||
| 			// If the backend is down, serve the request from cache.
 | ||||
| 			return cacheObjInfo, cacheReader, nil | ||||
| 		} | ||||
| 		if cacheObjInfo.ETag == bkObjInfo.ETag && !isStaleCache(bkObjInfo) { | ||||
| 			return cacheObjInfo, cacheReader, nil | ||||
| 		} | ||||
| 		dcache.Delete(ctx, bucket, object) | ||||
| 	} | ||||
| 
 | ||||
| 	if rs != nil { | ||||
| 		// We don't cache partial objects.
 | ||||
| 		return bkObjInfo, bkReader, bkErr | ||||
| 	} | ||||
| 	if !dcache.diskAvailable(bkObjInfo.Size * cacheSizeMultiplier) { | ||||
| 		// cache only objects < 1/100th of disk capacity
 | ||||
| 		return bkObjInfo, bkReader, bkErr | ||||
| 	} | ||||
| 
 | ||||
| 	// Initialize pipe.
 | ||||
| 	pipeReader, pipeWriter := io.Pipe() | ||||
| 	teeReader := io.TeeReader(bkReader, pipeWriter) | ||||
| 	hashReader, herr := hash.NewReader(pipeReader, bkObjInfo.Size, "", "") | ||||
| 	if err != nil { | ||||
| 		return oi, r, herr | ||||
| 	} | ||||
| 
 | ||||
| 	cleanupBackend := func() { bkReader.Close() } | ||||
| 	getObjReader := NewGetObjectReader(teeReader, nil, cleanupBackend) | ||||
| 
 | ||||
| 	go func() { | ||||
| 		putErr := dcache.Put(ctx, bucket, object, hashReader, c.getMetadata(bkObjInfo)) | ||||
| 		// close the write end of the pipe, so the error gets
 | ||||
| 		// propagated to getObjReader
 | ||||
| 		pipeWriter.CloseWithError(putErr) | ||||
| 	}() | ||||
| 
 | ||||
| 	return bkObjInfo, getObjReader, nil | ||||
| } | ||||
| 
 | ||||
| // Uses cached-object to serve the request. If object is not cached it serves the request from the backend and also
 | ||||
| // stores it in the cache for serving subsequent requests.
 | ||||
| func (c cacheObjects) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string) (err error) { | ||||
|  |  | |||
|  | @ -59,10 +59,6 @@ func (api *DummyObjectLayer) ListObjectsV2(ctx context.Context, bucket, prefix, | |||
| 	return | ||||
| } | ||||
| 
 | ||||
| func (api *DummyObjectLayer) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec) (objInfo ObjectInfo, reader io.ReadCloser, err error) { | ||||
| 	return | ||||
| } | ||||
| 
 | ||||
| func (api *DummyObjectLayer) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string) (err error) { | ||||
| 	return | ||||
| } | ||||
|  |  | |||
|  | @ -312,155 +312,6 @@ func newDecryptWriterWithObjectKey(client io.Writer, objectEncryptionKey []byte, | |||
| 	return writer, nil | ||||
| } | ||||
| 
 | ||||
| // Adding support for reader based interface
 | ||||
| 
 | ||||
| // DecryptRequestWithSequenceNumberR - same as
 | ||||
| // DecryptRequestWithSequenceNumber but with a reader
 | ||||
| func DecryptRequestWithSequenceNumberR(client io.Reader, r *http.Request, bucket, object string, seqNumber uint32, metadata map[string]string) (io.Reader, error) { | ||||
| 	if crypto.S3.IsEncrypted(metadata) { | ||||
| 		return newDecryptReader(client, nil, bucket, object, seqNumber, metadata) | ||||
| 	} | ||||
| 
 | ||||
| 	key, err := ParseSSECustomerRequest(r) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	delete(metadata, crypto.SSECKey) // make sure we do not save the key by accident
 | ||||
| 	return newDecryptReader(client, key, bucket, object, seqNumber, metadata) | ||||
| } | ||||
| 
 | ||||
| // DecryptCopyRequestR - same as DecryptCopyRequest, but with a
 | ||||
| // Reader
 | ||||
| func DecryptCopyRequestR(client io.Reader, r *http.Request, bucket, object string, metadata map[string]string) (io.Reader, error) { | ||||
| 	var ( | ||||
| 		key []byte | ||||
| 		err error | ||||
| 	) | ||||
| 	if crypto.SSECopy.IsRequested(r.Header) { | ||||
| 		key, err = ParseSSECopyCustomerRequest(r, metadata) | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 	} | ||||
| 	delete(metadata, crypto.SSECopyKey) // make sure we do not save the key by accident
 | ||||
| 	return newDecryptReader(client, key, bucket, object, 0, metadata) | ||||
| } | ||||
| 
 | ||||
| func newDecryptReader(client io.Reader, key []byte, bucket, object string, seqNumber uint32, metadata map[string]string) (io.Reader, error) { | ||||
| 	objectEncryptionKey, err := decryptObjectInfo(key, bucket, object, metadata) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return newDecryptReaderWithObjectKey(client, objectEncryptionKey, seqNumber, metadata) | ||||
| } | ||||
| 
 | ||||
| func newDecryptReaderWithObjectKey(client io.Reader, objectEncryptionKey []byte, seqNumber uint32, metadata map[string]string) (io.Reader, error) { | ||||
| 	reader, err := sio.DecryptReader(client, sio.Config{ | ||||
| 		Key:            objectEncryptionKey, | ||||
| 		SequenceNumber: seqNumber, | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		return nil, crypto.ErrInvalidCustomerKey | ||||
| 	} | ||||
| 	delete(metadata, crypto.SSEIV) | ||||
| 	delete(metadata, crypto.SSESealAlgorithm) | ||||
| 	delete(metadata, crypto.SSECSealedKey) | ||||
| 	delete(metadata, crypto.SSEMultipart) | ||||
| 	delete(metadata, crypto.S3SealedKey) | ||||
| 	delete(metadata, crypto.S3KMSSealedKey) | ||||
| 	delete(metadata, crypto.S3KMSKeyID) | ||||
| 	return reader, nil | ||||
| } | ||||
| 
 | ||||
| // DecryptBlocksRequestR - same as DecryptBlocksRequest but with a
 | ||||
| // reader
 | ||||
| func DecryptBlocksRequestR(client io.Reader, r *http.Request, bucket, object string, startOffset, length int64, objInfo ObjectInfo, copySource bool) (io.Reader, int64, int64, error) { | ||||
| 	var seqNumber uint32 | ||||
| 	var encStartOffset, encLength int64 | ||||
| 
 | ||||
| 	if len(objInfo.Parts) == 0 || !crypto.IsMultiPart(objInfo.UserDefined) { | ||||
| 		seqNumber, encStartOffset, encLength = getEncryptedSinglePartOffsetLength(startOffset, length, objInfo) | ||||
| 
 | ||||
| 		var reader io.Reader | ||||
| 		var err error | ||||
| 		if copySource { | ||||
| 			reader, err = DecryptCopyRequestR(client, r, bucket, object, objInfo.UserDefined) | ||||
| 		} else { | ||||
| 			reader, err = DecryptRequestWithSequenceNumberR(client, r, bucket, object, seqNumber, objInfo.UserDefined) | ||||
| 		} | ||||
| 		if err != nil { | ||||
| 			return nil, 0, 0, err | ||||
| 		} | ||||
| 		return reader, encStartOffset, encLength, nil | ||||
| 	} | ||||
| 
 | ||||
| 	seqNumber, encStartOffset, encLength = getEncryptedMultipartsOffsetLength(startOffset, length, objInfo) | ||||
| 	var partStartIndex int | ||||
| 	var partStartOffset = startOffset | ||||
| 	// Skip parts until final offset maps to a particular part offset.
 | ||||
| 	for i, part := range objInfo.Parts { | ||||
| 		decryptedSize, err := sio.DecryptedSize(uint64(part.Size)) | ||||
| 		if err != nil { | ||||
| 			return nil, -1, -1, errObjectTampered | ||||
| 		} | ||||
| 
 | ||||
| 		partStartIndex = i | ||||
| 
 | ||||
| 		// Offset is smaller than size we have reached the
 | ||||
| 		// proper part offset, break out we start from
 | ||||
| 		// this part index.
 | ||||
| 		if partStartOffset < int64(decryptedSize) { | ||||
| 			break | ||||
| 		} | ||||
| 
 | ||||
| 		// Continue to look for next part.
 | ||||
| 		partStartOffset -= int64(decryptedSize) | ||||
| 	} | ||||
| 
 | ||||
| 	startSeqNum := partStartOffset / sseDAREPackageBlockSize | ||||
| 	partEncRelOffset := int64(startSeqNum) * (sseDAREPackageBlockSize + sseDAREPackageMetaSize) | ||||
| 
 | ||||
| 	w := &DecryptBlocksReader{ | ||||
| 		reader:            client, | ||||
| 		startSeqNum:       uint32(startSeqNum), | ||||
| 		partEncRelOffset:  partEncRelOffset, | ||||
| 		parts:             objInfo.Parts, | ||||
| 		partIndex:         partStartIndex, | ||||
| 		req:               r, | ||||
| 		bucket:            bucket, | ||||
| 		object:            object, | ||||
| 		customerKeyHeader: r.Header.Get(crypto.SSECKey), | ||||
| 		copySource:        copySource, | ||||
| 	} | ||||
| 
 | ||||
| 	w.metadata = map[string]string{} | ||||
| 	// Copy encryption metadata for internal use.
 | ||||
| 	for k, v := range objInfo.UserDefined { | ||||
| 		w.metadata[k] = v | ||||
| 	} | ||||
| 
 | ||||
| 	// Purge all the encryption headers.
 | ||||
| 	delete(objInfo.UserDefined, crypto.SSEIV) | ||||
| 	delete(objInfo.UserDefined, crypto.SSESealAlgorithm) | ||||
| 	delete(objInfo.UserDefined, crypto.SSECSealedKey) | ||||
| 	delete(objInfo.UserDefined, crypto.SSEMultipart) | ||||
| 
 | ||||
| 	if crypto.S3.IsEncrypted(objInfo.UserDefined) { | ||||
| 		delete(objInfo.UserDefined, crypto.S3SealedKey) | ||||
| 		delete(objInfo.UserDefined, crypto.S3KMSKeyID) | ||||
| 		delete(objInfo.UserDefined, crypto.S3KMSSealedKey) | ||||
| 	} | ||||
| 	if w.copySource { | ||||
| 		w.customerKeyHeader = r.Header.Get(crypto.SSECopyKey) | ||||
| 	} | ||||
| 
 | ||||
| 	if err := w.buildDecrypter(w.parts[w.partIndex].Number); err != nil { | ||||
| 		return nil, 0, 0, err | ||||
| 	} | ||||
| 
 | ||||
| 	return w, encStartOffset, encLength, nil | ||||
| } | ||||
| 
 | ||||
| // DecryptRequestWithSequenceNumber decrypts the object with the client provided key. It also removes
 | ||||
| // the client-side-encryption metadata from the object and sets the correct headers.
 | ||||
| func DecryptRequestWithSequenceNumber(client io.Writer, r *http.Request, bucket, object string, seqNumber uint32, metadata map[string]string) (io.WriteCloser, error) { | ||||
|  | @ -482,123 +333,6 @@ func DecryptRequest(client io.Writer, r *http.Request, bucket, object string, me | |||
| 	return DecryptRequestWithSequenceNumber(client, r, bucket, object, 0, metadata) | ||||
| } | ||||
| 
 | ||||
| // DecryptBlocksReader - decrypts multipart parts, while implementing
 | ||||
| // a io.Reader compatible interface.
 | ||||
| type DecryptBlocksReader struct { | ||||
| 	// Source of the encrypted content that will be decrypted
 | ||||
| 	reader io.Reader | ||||
| 	// Current decrypter for the current encrypted data block
 | ||||
| 	decrypter io.Reader | ||||
| 	// Start sequence number
 | ||||
| 	startSeqNum uint32 | ||||
| 	// Current part index
 | ||||
| 	partIndex int | ||||
| 	// Parts information
 | ||||
| 	parts          []objectPartInfo | ||||
| 	req            *http.Request | ||||
| 	bucket, object string | ||||
| 	metadata       map[string]string | ||||
| 
 | ||||
| 	partEncRelOffset int64 | ||||
| 
 | ||||
| 	copySource bool | ||||
| 	// Customer Key
 | ||||
| 	customerKeyHeader string | ||||
| } | ||||
| 
 | ||||
| func (d *DecryptBlocksReader) buildDecrypter(partID int) error { | ||||
| 	m := make(map[string]string) | ||||
| 	for k, v := range d.metadata { | ||||
| 		m[k] = v | ||||
| 	} | ||||
| 	// Initialize the first decrypter; new decrypters will be
 | ||||
| 	// initialized in Read() operation as needed.
 | ||||
| 	var key []byte | ||||
| 	var err error | ||||
| 	if d.copySource { | ||||
| 		if crypto.SSEC.IsEncrypted(d.metadata) { | ||||
| 			d.req.Header.Set(crypto.SSECopyKey, d.customerKeyHeader) | ||||
| 			key, err = ParseSSECopyCustomerRequest(d.req, d.metadata) | ||||
| 		} | ||||
| 	} else { | ||||
| 		if crypto.SSEC.IsEncrypted(d.metadata) { | ||||
| 			d.req.Header.Set(crypto.SSECKey, d.customerKeyHeader) | ||||
| 			key, err = ParseSSECustomerRequest(d.req) | ||||
| 		} | ||||
| 	} | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	objectEncryptionKey, err := decryptObjectInfo(key, d.bucket, d.object, m) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	var partIDbin [4]byte | ||||
| 	binary.LittleEndian.PutUint32(partIDbin[:], uint32(partID)) // marshal part ID
 | ||||
| 
 | ||||
| 	mac := hmac.New(sha256.New, objectEncryptionKey) // derive part encryption key from part ID and object key
 | ||||
| 	mac.Write(partIDbin[:]) | ||||
| 	partEncryptionKey := mac.Sum(nil) | ||||
| 
 | ||||
| 	// make sure we do not save the key by accident
 | ||||
| 	if d.copySource { | ||||
| 		delete(m, crypto.SSECopyKey) | ||||
| 	} else { | ||||
| 		delete(m, crypto.SSECKey) | ||||
| 	} | ||||
| 
 | ||||
| 	// make sure to provide a NopCloser such that a Close
 | ||||
| 	// on sio.decryptWriter doesn't close the underlying writer's
 | ||||
| 	// close which perhaps can close the stream prematurely.
 | ||||
| 	decrypter, err := newDecryptReaderWithObjectKey(d.reader, partEncryptionKey, d.startSeqNum, m) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	d.decrypter = decrypter | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (d *DecryptBlocksReader) Read(p []byte) (int, error) { | ||||
| 	var err error | ||||
| 	var n1 int | ||||
| 	if int64(len(p)) < d.parts[d.partIndex].Size-d.partEncRelOffset { | ||||
| 		n1, err = d.decrypter.Read(p) | ||||
| 		if err != nil { | ||||
| 			return 0, err | ||||
| 		} | ||||
| 		d.partEncRelOffset += int64(n1) | ||||
| 	} else { | ||||
| 		n1, err = d.decrypter.Read(p[:d.parts[d.partIndex].Size-d.partEncRelOffset]) | ||||
| 		if err != nil { | ||||
| 			return 0, err | ||||
| 		} | ||||
| 
 | ||||
| 		// We should now proceed to next part, reset all
 | ||||
| 		// values appropriately.
 | ||||
| 		d.partEncRelOffset = 0 | ||||
| 		d.startSeqNum = 0 | ||||
| 
 | ||||
| 		d.partIndex++ | ||||
| 
 | ||||
| 		err = d.buildDecrypter(d.partIndex + 1) | ||||
| 		if err != nil { | ||||
| 			return 0, err | ||||
| 		} | ||||
| 
 | ||||
| 		n1, err = d.decrypter.Read(p[n1:]) | ||||
| 		if err != nil { | ||||
| 			return 0, err | ||||
| 		} | ||||
| 
 | ||||
| 		d.partEncRelOffset += int64(n1) | ||||
| 	} | ||||
| 
 | ||||
| 	return len(p), nil | ||||
| } | ||||
| 
 | ||||
| // DecryptBlocksWriter - decrypts multipart parts, while implementing a io.Writer compatible interface.
 | ||||
| type DecryptBlocksWriter struct { | ||||
| 	// Original writer where the plain data will be written
 | ||||
|  |  | |||
							
								
								
									
										91
									
								
								cmd/fs-v1.go
								
								
								
								
							
							
						
						
									
										91
									
								
								cmd/fs-v1.go
								
								
								
								
							|  | @ -17,7 +17,6 @@ | |||
| package cmd | ||||
| 
 | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"context" | ||||
| 	"encoding/hex" | ||||
| 	"io" | ||||
|  | @ -499,96 +498,6 @@ func (fs *FSObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBu | |||
| 	return objInfo, nil | ||||
| } | ||||
| 
 | ||||
| // GetObjectNInfo - returns object info and a reader for object
 | ||||
| // content.
 | ||||
| func (fs *FSObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec) (objInfo ObjectInfo, reader io.ReadCloser, err error) { | ||||
| 
 | ||||
| 	if err = checkGetObjArgs(ctx, bucket, object); err != nil { | ||||
| 		return objInfo, reader, err | ||||
| 	} | ||||
| 
 | ||||
| 	if _, err = fs.statBucketDir(ctx, bucket); err != nil { | ||||
| 		return objInfo, reader, toObjectErr(err, bucket) | ||||
| 	} | ||||
| 
 | ||||
| 	// Lock the object before reading.
 | ||||
| 	lock := fs.nsMutex.NewNSLock(bucket, object) | ||||
| 	if err = lock.GetRLock(globalObjectTimeout); err != nil { | ||||
| 		logger.LogIf(ctx, err) | ||||
| 		return objInfo, reader, err | ||||
| 	} | ||||
| 
 | ||||
| 	// For a directory, we need to send an empty body.
 | ||||
| 	if hasSuffix(object, slashSeparator) { | ||||
| 		// The lock taken above is released when
 | ||||
| 		// objReader.Close() is called by the caller.
 | ||||
| 		objReader := NewGetObjectReader(bytes.NewBuffer(nil), lock, nil) | ||||
| 		return objInfo, objReader, nil | ||||
| 	} | ||||
| 
 | ||||
| 	// Otherwise we get the object info
 | ||||
| 	objInfo, err = fs.getObjectInfo(ctx, bucket, object) | ||||
| 	err = toObjectErr(err, bucket, object) | ||||
| 	if err != nil { | ||||
| 		lock.RUnlock() | ||||
| 		return objInfo, nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	// Take a rwPool lock for NFS gateway type deployment
 | ||||
| 	var cleanUp func() | ||||
| 	if bucket != minioMetaBucket { | ||||
| 		fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, object, fs.metaJSONFile) | ||||
| 		_, err = fs.rwPool.Open(fsMetaPath) | ||||
| 		if err != nil && err != errFileNotFound { | ||||
| 			logger.LogIf(ctx, err) | ||||
| 			lock.RUnlock() | ||||
| 			return objInfo, nil, toObjectErr(err, bucket, object) | ||||
| 		} | ||||
| 		cleanUp = func() { | ||||
| 			// Need to clean up lock after getObject is
 | ||||
| 			// completed.
 | ||||
| 			fs.rwPool.Close(fsMetaPath) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	offset, length := int64(0), objInfo.Size | ||||
| 	if rs != nil { | ||||
| 		offset, length = rs.GetOffsetLength(objInfo.Size) | ||||
| 	} | ||||
| 
 | ||||
| 	// Read the object, doesn't exist returns an s3 compatible error.
 | ||||
| 	fsObjPath := pathJoin(fs.fsPath, bucket, object) | ||||
| 	reader, size, err := fsOpenFile(ctx, fsObjPath, offset) | ||||
| 	if err != nil { | ||||
| 		lock.RUnlock() | ||||
| 		cleanUp() | ||||
| 		return objInfo, nil, toObjectErr(err, bucket, object) | ||||
| 	} | ||||
| 
 | ||||
| 	bufSize := int64(readSizeV1) | ||||
| 	if length > 0 && bufSize > length { | ||||
| 		bufSize = length | ||||
| 	} | ||||
| 
 | ||||
| 	// For negative length we read everything.
 | ||||
| 	if length < 0 { | ||||
| 		length = size - offset | ||||
| 	} | ||||
| 
 | ||||
| 	// Reply back invalid range if the input offset and length
 | ||||
| 	// fall out of range.
 | ||||
| 	if offset > size || offset+length > size { | ||||
| 		err = InvalidRange{offset, length, size} | ||||
| 		logger.LogIf(ctx, err) | ||||
| 		lock.RUnlock() | ||||
| 		cleanUp() | ||||
| 		return objInfo, nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	objReader := NewGetObjectReader(io.LimitReader(reader, length), lock, cleanUp) | ||||
| 	return objInfo, objReader, nil | ||||
| } | ||||
| 
 | ||||
| // GetObject - reads an object from the disk.
 | ||||
| // Supports additional parameters like offset and length
 | ||||
| // which are synonymous with HTTP Range requests.
 | ||||
|  |  | |||
|  | @ -615,27 +615,6 @@ func (a *azureObjects) ListObjectsV2(ctx context.Context, bucket, prefix, contin | |||
| 	return result, nil | ||||
| } | ||||
| 
 | ||||
| func (a *azureObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec) (objInfo minio.ObjectInfo, reader io.ReadCloser, err error) { | ||||
| 	objInfo, err = a.GetObjectInfo(ctx, bucket, object) | ||||
| 	if err != nil { | ||||
| 		return objInfo, reader, err | ||||
| 	} | ||||
| 
 | ||||
| 	startOffset, length := int64(0), objInfo.Size | ||||
| 	if rs != nil { | ||||
| 		startOffset, length = rs.GetOffsetLength(objInfo.Size) | ||||
| 	} | ||||
| 
 | ||||
| 	pr, pw := io.Pipe() | ||||
| 	objReader := minio.NewGetObjectReader(pr, nil, nil) | ||||
| 	go func() { | ||||
| 		err := a.GetObject(ctx, bucket, object, startOffset, length, pw, objInfo.ETag) | ||||
| 		pw.CloseWithError(err) | ||||
| 	}() | ||||
| 
 | ||||
| 	return objInfo, objReader, nil | ||||
| } | ||||
| 
 | ||||
| // GetObject - reads an object from azure. Supports additional
 | ||||
| // parameters like offset and length which are synonymous with
 | ||||
| // HTTP Range requests.
 | ||||
|  |  | |||
|  | @ -394,27 +394,6 @@ func (l *b2Objects) ListObjectsV2(ctx context.Context, bucket, prefix, continuat | |||
| 	return loi, nil | ||||
| } | ||||
| 
 | ||||
| func (l *b2Objects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec) (objInfo minio.ObjectInfo, reader io.ReadCloser, err error) { | ||||
| 	objInfo, err = l.GetObjectInfo(ctx, bucket, object) | ||||
| 	if err != nil { | ||||
| 		return objInfo, reader, err | ||||
| 	} | ||||
| 
 | ||||
| 	startOffset, length := int64(0), objInfo.Size | ||||
| 	if rs != nil { | ||||
| 		startOffset, length = rs.GetOffsetLength(objInfo.Size) | ||||
| 	} | ||||
| 
 | ||||
| 	pr, pw := io.Pipe() | ||||
| 	objReader := minio.NewGetObjectReader(pr, nil, nil) | ||||
| 	go func() { | ||||
| 		err := l.GetObject(ctx, bucket, object, startOffset, length, pw, objInfo.ETag) | ||||
| 		pw.CloseWithError(err) | ||||
| 	}() | ||||
| 
 | ||||
| 	return objInfo, objReader, nil | ||||
| } | ||||
| 
 | ||||
| // GetObject reads an object from B2. Supports additional
 | ||||
| // parameters like offset and length which are synonymous with
 | ||||
| // HTTP Range requests.
 | ||||
|  |  | |||
|  | @ -732,27 +732,6 @@ func (l *gcsGateway) ListObjectsV2(ctx context.Context, bucket, prefix, continua | |||
| 	}, nil | ||||
| } | ||||
| 
 | ||||
| func (l *gcsGateway) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec) (objInfo minio.ObjectInfo, reader io.ReadCloser, err error) { | ||||
| 	objInfo, err = l.GetObjectInfo(ctx, bucket, object) | ||||
| 	if err != nil { | ||||
| 		return objInfo, reader, err | ||||
| 	} | ||||
| 
 | ||||
| 	startOffset, length := int64(0), objInfo.Size | ||||
| 	if rs != nil { | ||||
| 		startOffset, length = rs.GetOffsetLength(objInfo.Size) | ||||
| 	} | ||||
| 
 | ||||
| 	pr, pw := io.Pipe() | ||||
| 	objReader := minio.NewGetObjectReader(pr, nil, nil) | ||||
| 	go func() { | ||||
| 		err := l.GetObject(ctx, bucket, object, startOffset, length, pw, objInfo.ETag) | ||||
| 		pw.CloseWithError(err) | ||||
| 	}() | ||||
| 
 | ||||
| 	return objInfo, objReader, nil | ||||
| } | ||||
| 
 | ||||
| // GetObject - reads an object from GCS. Supports additional
 | ||||
| // parameters like offset and length which are synonymous with
 | ||||
| // HTTP Range requests.
 | ||||
|  |  | |||
|  | @ -497,27 +497,6 @@ func (t *tritonObjects) ListObjectsV2(ctx context.Context, bucket, prefix, conti | |||
| 	return result, nil | ||||
| } | ||||
| 
 | ||||
| func (t *tritonObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec) (objInfo minio.ObjectInfo, reader io.ReadCloser, err error) { | ||||
| 	objInfo, err = t.GetObjectInfo(ctx, bucket, object) | ||||
| 	if err != nil { | ||||
| 		return objInfo, reader, err | ||||
| 	} | ||||
| 
 | ||||
| 	startOffset, length := int64(0), objInfo.Size | ||||
| 	if rs != nil { | ||||
| 		startOffset, length = rs.GetOffsetLength(objInfo.Size) | ||||
| 	} | ||||
| 
 | ||||
| 	pr, pw := io.Pipe() | ||||
| 	objReader := minio.NewGetObjectReader(pr, nil, nil) | ||||
| 	go func() { | ||||
| 		err := t.GetObject(ctx, bucket, object, startOffset, length, pw, objInfo.ETag) | ||||
| 		pw.CloseWithError(err) | ||||
| 	}() | ||||
| 
 | ||||
| 	return objInfo, objReader, nil | ||||
| } | ||||
| 
 | ||||
| // GetObject - Reads an object from Manta. Supports additional parameters like
 | ||||
| // offset and length which are synonymous with HTTP Range requests.
 | ||||
| //
 | ||||
|  |  | |||
|  | @ -69,7 +69,7 @@ ENVIRONMENT VARIABLES: | |||
| 
 | ||||
|   DOMAIN: | ||||
|      MINIO_DOMAIN: To enable virtual-host-style requests, set this value to Minio host domain name. | ||||
| 
 | ||||
| 	 | ||||
|   CACHE: | ||||
|      MINIO_CACHE_DRIVES: List of mounted drives or directories delimited by ";". | ||||
|      MINIO_CACHE_EXCLUDE: List of cache exclusion patterns delimited by ";". | ||||
|  | @ -547,27 +547,6 @@ func ossGetObject(ctx context.Context, client *oss.Client, bucket, key string, s | |||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (l *ossObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec) (objInfo minio.ObjectInfo, reader io.ReadCloser, err error) { | ||||
| 	objInfo, err = l.GetObjectInfo(ctx, bucket, object) | ||||
| 	if err != nil { | ||||
| 		return objInfo, reader, err | ||||
| 	} | ||||
| 
 | ||||
| 	startOffset, length := int64(0), objInfo.Size | ||||
| 	if rs != nil { | ||||
| 		startOffset, length = rs.GetOffsetLength(objInfo.Size) | ||||
| 	} | ||||
| 
 | ||||
| 	pr, pw := io.Pipe() | ||||
| 	objReader := minio.NewGetObjectReader(pr, nil, nil) | ||||
| 	go func() { | ||||
| 		err := l.GetObject(ctx, bucket, object, startOffset, length, pw, objInfo.ETag) | ||||
| 		pw.CloseWithError(err) | ||||
| 	}() | ||||
| 
 | ||||
| 	return objInfo, objReader, nil | ||||
| } | ||||
| 
 | ||||
| // GetObject reads an object on OSS. Supports additional
 | ||||
| // parameters like offset and length which are synonymous with
 | ||||
| // HTTP Range requests.
 | ||||
|  |  | |||
|  | @ -301,27 +301,6 @@ func (l *s3Objects) ListObjectsV2(ctx context.Context, bucket, prefix, continuat | |||
| 	return minio.FromMinioClientListBucketV2Result(bucket, result), nil | ||||
| } | ||||
| 
 | ||||
| func (l *s3Objects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec) (objInfo minio.ObjectInfo, reader io.ReadCloser, err error) { | ||||
| 	objInfo, err = l.GetObjectInfo(ctx, bucket, object) | ||||
| 	if err != nil { | ||||
| 		return objInfo, reader, err | ||||
| 	} | ||||
| 
 | ||||
| 	startOffset, length := int64(0), objInfo.Size | ||||
| 	if rs != nil { | ||||
| 		startOffset, length = rs.GetOffsetLength(objInfo.Size) | ||||
| 	} | ||||
| 
 | ||||
| 	pr, pw := io.Pipe() | ||||
| 	objReader := minio.NewGetObjectReader(pr, nil, nil) | ||||
| 	go func() { | ||||
| 		err := l.GetObject(ctx, bucket, object, startOffset, length, pw, objInfo.ETag) | ||||
| 		pw.CloseWithError(err) | ||||
| 	}() | ||||
| 
 | ||||
| 	return objInfo, objReader, nil | ||||
| } | ||||
| 
 | ||||
| // GetObject reads an object from S3. Supports additional
 | ||||
| // parameters like offset and length which are synonymous with
 | ||||
| // HTTP Range requests.
 | ||||
|  | @ -334,9 +313,6 @@ func (l *s3Objects) GetObject(ctx context.Context, bucket string, key string, st | |||
| 	} | ||||
| 
 | ||||
| 	opts := miniogo.GetObjectOptions{} | ||||
| 	if etag != "" { | ||||
| 		opts.SetMatchETag(etag) | ||||
| 	} | ||||
| 	if startOffset >= 0 && length >= 0 { | ||||
| 		if err := opts.SetRange(startOffset, startOffset+length-1); err != nil { | ||||
| 			logger.LogIf(ctx, err) | ||||
|  |  | |||
|  | @ -431,27 +431,6 @@ func (s *siaObjects) ListObjects(ctx context.Context, bucket string, prefix stri | |||
| 	return loi, nil | ||||
| } | ||||
| 
 | ||||
| func (s *siaObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec) (objInfo minio.ObjectInfo, reader io.ReadCloser, err error) { | ||||
| 	objInfo, err = s.GetObjectInfo(ctx, bucket, object) | ||||
| 	if err != nil { | ||||
| 		return objInfo, reader, err | ||||
| 	} | ||||
| 
 | ||||
| 	startOffset, length := int64(0), objInfo.Size | ||||
| 	if rs != nil { | ||||
| 		startOffset, length = rs.GetOffsetLength(objInfo.Size) | ||||
| 	} | ||||
| 
 | ||||
| 	pr, pw := io.Pipe() | ||||
| 	objReader := minio.NewGetObjectReader(pr, nil, nil) | ||||
| 	go func() { | ||||
| 		err := s.GetObject(ctx, bucket, object, startOffset, length, pw, objInfo.ETag) | ||||
| 		pw.CloseWithError(err) | ||||
| 	}() | ||||
| 
 | ||||
| 	return objInfo, objReader, nil | ||||
| } | ||||
| 
 | ||||
| func (s *siaObjects) GetObject(ctx context.Context, bucket string, object string, startOffset int64, length int64, writer io.Writer, etag string) error { | ||||
| 	dstFile := path.Join(s.TempDir, minio.MustGetUUID()) | ||||
| 	defer os.Remove(dstFile) | ||||
|  |  | |||
							
								
								
									
										105
									
								
								cmd/httprange.go
								
								
								
								
							
							
						
						
									
										105
									
								
								cmd/httprange.go
								
								
								
								
							|  | @ -133,108 +133,3 @@ func parseRequestRange(rangeString string, resourceSize int64) (hrange *httpRang | |||
| 
 | ||||
| 	return &httpRange{offsetBegin, offsetEnd, resourceSize}, nil | ||||
| } | ||||
| 
 | ||||
| // HTTPRangeSpec represents a range specification as supported by S3 GET
 | ||||
| // object request.
 | ||||
| //
 | ||||
| // Case 1: Not present -> represented by a nil RangeSpec
 | ||||
| // Case 2: bytes=1-10 (absolute start and end offsets) -> RangeSpec{false, 1, 10}
 | ||||
| // Case 3: bytes=10- (absolute start offset with end offset unspecified) -> RangeSpec{false, 10, -1}
 | ||||
| // Case 4: bytes=-30 (suffix length specification) -> RangeSpec{true, -30, -1}
 | ||||
| type HTTPRangeSpec struct { | ||||
| 	// Does the range spec refer to a suffix of the object?
 | ||||
| 	IsSuffixLength bool | ||||
| 
 | ||||
| 	// Start and end offset specified in range spec
 | ||||
| 	Start, End int64 | ||||
| } | ||||
| 
 | ||||
| // ContentRangeString populate range stringer interface
 | ||||
| func (h *HTTPRangeSpec) ContentRangeString(resourceSize int64) string { | ||||
| 	start, rangeLength := h.GetOffsetLength(resourceSize) | ||||
| 	return fmt.Sprintf("bytes %d-%d/%d", start, start+rangeLength-1, resourceSize) | ||||
| } | ||||
| 
 | ||||
| // GetLength - get length of range
 | ||||
| func (h *HTTPRangeSpec) GetLength(resourceSize int64) int64 { | ||||
| 	switch { | ||||
| 	case h.IsSuffixLength: | ||||
| 		specifiedLen := -h.Start | ||||
| 		if specifiedLen > resourceSize { | ||||
| 			specifiedLen = resourceSize | ||||
| 		} | ||||
| 		return specifiedLen | ||||
| 	case h.End > -1: | ||||
| 		end := h.End | ||||
| 		if resourceSize < end { | ||||
| 			end = resourceSize - 1 | ||||
| 		} | ||||
| 		return end - h.Start + 1 | ||||
| 	default: | ||||
| 		return resourceSize - h.Start | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // GetOffsetLength computes the start offset and length of the range
 | ||||
| // given the size of the resource
 | ||||
| func (h *HTTPRangeSpec) GetOffsetLength(resourceSize int64) (start int64, length int64) { | ||||
| 	length = h.GetLength(resourceSize) | ||||
| 	start = h.Start | ||||
| 	if h.IsSuffixLength { | ||||
| 		start = resourceSize + h.Start | ||||
| 	} | ||||
| 	return | ||||
| } | ||||
| 
 | ||||
| // Parses a range header value into a HTTPRangeSpec
 | ||||
| func parseRequestRangeSpec(rangeString string) (hrange *HTTPRangeSpec, err error) { | ||||
| 	// Return error if given range string doesn't start with byte range prefix.
 | ||||
| 	if !strings.HasPrefix(rangeString, byteRangePrefix) { | ||||
| 		return nil, fmt.Errorf("'%s' does not start with '%s'", rangeString, byteRangePrefix) | ||||
| 	} | ||||
| 
 | ||||
| 	// Trim byte range prefix.
 | ||||
| 	byteRangeString := strings.TrimPrefix(rangeString, byteRangePrefix) | ||||
| 
 | ||||
| 	// Check if range string contains delimiter '-', else return error. eg. "bytes=8"
 | ||||
| 	sepIndex := strings.Index(byteRangeString, "-") | ||||
| 	if sepIndex == -1 { | ||||
| 		return nil, fmt.Errorf("'%s' does not have a valid range value", rangeString) | ||||
| 	} | ||||
| 
 | ||||
| 	offsetBeginString := byteRangeString[:sepIndex] | ||||
| 	offsetBegin := int64(-1) | ||||
| 	// Convert offsetBeginString only if its not empty.
 | ||||
| 	if len(offsetBeginString) > 0 { | ||||
| 		if offsetBegin, err = strconv.ParseInt(offsetBeginString, 10, 64); err != nil { | ||||
| 			return nil, fmt.Errorf("'%s' does not have a valid first byte position value", rangeString) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	offsetEndString := byteRangeString[sepIndex+1:] | ||||
| 	offsetEnd := int64(-1) | ||||
| 	// Convert offsetEndString only if its not empty.
 | ||||
| 	if len(offsetEndString) > 0 { | ||||
| 		if offsetEnd, err = strconv.ParseInt(offsetEndString, 10, 64); err != nil { | ||||
| 			return nil, fmt.Errorf("'%s' does not have a valid last byte position value", rangeString) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	switch { | ||||
| 	case offsetBegin > -1 && offsetEnd > -1: | ||||
| 		if offsetBegin > offsetEnd { | ||||
| 			return nil, errInvalidRange | ||||
| 		} | ||||
| 		return &HTTPRangeSpec{false, offsetBegin, offsetEnd}, nil | ||||
| 	case offsetBegin > -1: | ||||
| 		return &HTTPRangeSpec{false, offsetBegin, -1}, nil | ||||
| 	case offsetEnd > -1: | ||||
| 		if offsetEnd == 0 { | ||||
| 			return nil, errInvalidRange | ||||
| 		} | ||||
| 		return &HTTPRangeSpec{true, -offsetEnd, -1}, nil | ||||
| 	default: | ||||
| 		// rangeString contains first and last byte positions missing. eg. "bytes=-"
 | ||||
| 		return nil, fmt.Errorf("'%s' does not have valid range value", rangeString) | ||||
| 	} | ||||
| } | ||||
|  |  | |||
|  | @ -40,9 +40,6 @@ type ObjectLayer interface { | |||
| 	ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result ListObjectsV2Info, err error) | ||||
| 
 | ||||
| 	// Object operations.
 | ||||
| 
 | ||||
| 	GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec) (objInfo ObjectInfo, reader io.ReadCloser, err error) | ||||
| 
 | ||||
| 	GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string) (err error) | ||||
| 	GetObjectInfo(ctx context.Context, bucket, object string) (objInfo ObjectInfo, err error) | ||||
| 	PutObject(ctx context.Context, bucket, object string, data *hash.Reader, metadata map[string]string) (objInfo ObjectInfo, err error) | ||||
|  |  | |||
|  | @ -20,12 +20,10 @@ import ( | |||
| 	"context" | ||||
| 	"encoding/hex" | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| 	"math/rand" | ||||
| 	"path" | ||||
| 	"runtime" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| 	"time" | ||||
| 	"unicode/utf8" | ||||
| 
 | ||||
|  | @ -304,56 +302,3 @@ type byBucketName []BucketInfo | |||
| func (d byBucketName) Len() int           { return len(d) } | ||||
| func (d byBucketName) Swap(i, j int)      { d[i], d[j] = d[j], d[i] } | ||||
| func (d byBucketName) Less(i, j int) bool { return d[i].Name < d[j].Name } | ||||
| 
 | ||||
| // GetObjectReader is a type that wraps a reader with a lock to
 | ||||
| // provide a ReadCloser interface that unlocks on Close()
 | ||||
| type GetObjectReader struct { | ||||
| 	lock RWLocker | ||||
| 	pr   io.Reader | ||||
| 
 | ||||
| 	// register any clean up actions (happens before unlocking)
 | ||||
| 	cleanUp func() | ||||
| 
 | ||||
| 	once sync.Once | ||||
| } | ||||
| 
 | ||||
| // NewGetObjectReader creates a new GetObjectReader. The cleanUp
 | ||||
| // action is called on Close() before the lock is unlocked.
 | ||||
| func NewGetObjectReader(reader io.Reader, lock RWLocker, cleanUp func()) io.ReadCloser { | ||||
| 	return &GetObjectReader{ | ||||
| 		lock:    lock, | ||||
| 		pr:      reader, | ||||
| 		cleanUp: cleanUp, | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // Close - calls the cleanup action if provided, and *then* unlocks
 | ||||
| // the object. Calling Close multiple times is safe.
 | ||||
| func (g *GetObjectReader) Close() error { | ||||
| 	// sync.Once is used here to ensure that Close() is
 | ||||
| 	// idempotent.
 | ||||
| 	g.once.Do(func() { | ||||
| 		// Unlocking is defer-red - this ensures that
 | ||||
| 		// unlocking happens even if cleanUp panics.
 | ||||
| 		defer func() { | ||||
| 			if g.lock != nil { | ||||
| 				g.lock.RUnlock() | ||||
| 			} | ||||
| 		}() | ||||
| 		if g.cleanUp != nil { | ||||
| 			g.cleanUp() | ||||
| 		} | ||||
| 	}) | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| // Read - to implement Reader interface.
 | ||||
| func (g *GetObjectReader) Read(p []byte) (n int, err error) { | ||||
| 	n, err = g.pr.Read(p) | ||||
| 	if err != nil { | ||||
| 		// Calling code may not Close() in case of error, so
 | ||||
| 		// we ensure it.
 | ||||
| 		g.Close() | ||||
| 	} | ||||
| 	return | ||||
| } | ||||
|  |  | |||
|  | @ -73,6 +73,10 @@ func setHeadGetRespHeaders(w http.ResponseWriter, reqParams url.Values) { | |||
| // also specify a data serialization format (JSON, CSV) of the object.
 | ||||
| func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r *http.Request) { | ||||
| 	ctx := newContext(r, w, "SelectObject") | ||||
| 	var object, bucket string | ||||
| 	vars := mux.Vars(r) | ||||
| 	bucket = vars["bucket"] | ||||
| 	object = vars["object"] | ||||
| 
 | ||||
| 	// Fetch object stat info.
 | ||||
| 	objectAPI := api.ObjectAPI() | ||||
|  | @ -81,43 +85,27 @@ func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r | |||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	if crypto.S3.IsRequested(r.Header) || crypto.S3KMS.IsRequested(r.Header) { // If SSE-S3 or SSE-KMS present -> AWS fails with undefined error
 | ||||
| 		writeErrorResponse(w, ErrBadRequest, r.URL) | ||||
| 		return | ||||
| 	getObjectInfo := objectAPI.GetObjectInfo | ||||
| 	if api.CacheAPI() != nil { | ||||
| 		getObjectInfo = api.CacheAPI().GetObjectInfo | ||||
| 	} | ||||
| 
 | ||||
| 	vars := mux.Vars(r) | ||||
| 	bucket := vars["bucket"] | ||||
| 	object := vars["object"] | ||||
| 
 | ||||
| 	// Check for auth type to return S3 compatible error.
 | ||||
| 	// type to return the correct error (NoSuchKey vs AccessDenied)
 | ||||
| 	if s3Error := checkRequestAuthType(ctx, r, policy.GetObjectAction, bucket, object); s3Error != ErrNone { | ||||
| 		if getRequestAuthType(r) == authTypeAnonymous { | ||||
| 			// As per "Permission" section in
 | ||||
| 			// https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html
 | ||||
| 			// If the object you request does not exist,
 | ||||
| 			// the error Amazon S3 returns depends on
 | ||||
| 			// whether you also have the s3:ListBucket
 | ||||
| 			// permission.
 | ||||
| 			// * If you have the s3:ListBucket permission
 | ||||
| 			//   on the bucket, Amazon S3 will return an
 | ||||
| 			//   HTTP status code 404 ("no such key")
 | ||||
| 			//   error.
 | ||||
| 			// * if you don’t have the s3:ListBucket
 | ||||
| 			//   permission, Amazon S3 will return an HTTP
 | ||||
| 			//   status code 403 ("access denied") error.`
 | ||||
| 			// https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html If
 | ||||
| 			// the object you request does not exist, the error Amazon S3 returns
 | ||||
| 			// depends on whether you also have the s3:ListBucket permission. * If you
 | ||||
| 			// have the s3:ListBucket permission on the bucket, Amazon S3 will return
 | ||||
| 			// an HTTP status code 404 ("no such key") error. * if you don’t have the
 | ||||
| 			// s3:ListBucket permission, Amazon S3 will return an HTTP status code 403
 | ||||
| 			// ("access denied") error.`
 | ||||
| 			if globalPolicySys.IsAllowed(policy.Args{ | ||||
| 				Action:          policy.ListBucketAction, | ||||
| 				BucketName:      bucket, | ||||
| 				ConditionValues: getConditionValues(r, ""), | ||||
| 				IsOwner:         false, | ||||
| 			}) { | ||||
| 				getObjectInfo := objectAPI.GetObjectInfo | ||||
| 				if api.CacheAPI() != nil { | ||||
| 					getObjectInfo = api.CacheAPI().GetObjectInfo | ||||
| 				} | ||||
| 
 | ||||
| 				_, err := getObjectInfo(ctx, bucket, object) | ||||
| 				if toAPIErrorCode(err) == ErrNoSuchKey { | ||||
| 					s3Error = ErrNoSuchKey | ||||
|  | @ -127,57 +115,27 @@ func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r | |||
| 		writeErrorResponse(w, s3Error, r.URL) | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	// Get request range.
 | ||||
| 	rangeHeader := r.Header.Get("Range") | ||||
| 	if rangeHeader != "" { | ||||
| 		writeErrorResponse(w, ErrUnsupportedRangeHeader, r.URL) | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	if r.ContentLength <= 0 { | ||||
| 		writeErrorResponse(w, ErrEmptyRequestBody, r.URL) | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	var selectReq ObjectSelectRequest | ||||
| 	if err := xmlDecoder(r.Body, &selectReq, r.ContentLength); err != nil { | ||||
| 		writeErrorResponse(w, ErrMalformedXML, r.URL) | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	if !strings.EqualFold(string(selectReq.ExpressionType), "SQL") { | ||||
| 		writeErrorResponse(w, ErrInvalidExpressionType, r.URL) | ||||
| 		return | ||||
| 	} | ||||
| 	if len(selectReq.Expression) >= s3select.MaxExpressionLength { | ||||
| 		writeErrorResponse(w, ErrExpressionTooLong, r.URL) | ||||
| 		return | ||||
| 	} | ||||
| 	if selectReq.InputSerialization.CSV.FileHeaderInfo != CSVFileHeaderInfoUse && | ||||
| 		selectReq.InputSerialization.CSV.FileHeaderInfo != CSVFileHeaderInfoNone && | ||||
| 		selectReq.InputSerialization.CSV.FileHeaderInfo != CSVFileHeaderInfoIgnore && | ||||
| 		selectReq.InputSerialization.CSV.FileHeaderInfo != "" { | ||||
| 		writeErrorResponse(w, ErrInvalidFileHeaderInfo, r.URL) | ||||
| 		return | ||||
| 	} | ||||
| 	if selectReq.OutputSerialization.CSV.QuoteFields != CSVQuoteFieldsAlways && | ||||
| 		selectReq.OutputSerialization.CSV.QuoteFields != CSVQuoteFieldsAsNeeded && | ||||
| 		selectReq.OutputSerialization.CSV.QuoteFields != "" { | ||||
| 		writeErrorResponse(w, ErrInvalidQuoteFields, r.URL) | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	getObjectNInfo := objectAPI.GetObjectNInfo | ||||
| 	if api.CacheAPI() != nil { | ||||
| 		getObjectNInfo = api.CacheAPI().GetObjectNInfo | ||||
| 	} | ||||
| 
 | ||||
| 	objInfo, reader, err := getObjectNInfo(ctx, bucket, object, nil) | ||||
| 	objInfo, err := getObjectInfo(ctx, bucket, object) | ||||
| 	if err != nil { | ||||
| 		writeErrorResponse(w, toAPIErrorCode(err), r.URL) | ||||
| 		return | ||||
| 	} | ||||
| 	// Get request range.
 | ||||
| 	rangeHeader := r.Header.Get("Range") | ||||
| 	if rangeHeader != "" { | ||||
| 		writeErrorResponse(w, ErrUnsupportedRangeHeader, r.URL) | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	if selectReq.InputSerialization.CompressionType == SelectCompressionGZIP { | ||||
| 		if !strings.Contains(objInfo.ContentType, "gzip") { | ||||
|  | @ -199,51 +157,63 @@ func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r | |||
| 			return | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	// If object is encrypted, we avoid the cache layer.
 | ||||
| 	if crypto.IsEncrypted(objInfo.UserDefined) && api.CacheAPI() != nil { | ||||
| 		// Close the existing reader before re-querying the backend
 | ||||
| 		if reader != nil { | ||||
| 			reader.Close() | ||||
| 		} | ||||
| 		// Query the backend
 | ||||
| 		objInfo, reader, err = objectAPI.GetObjectNInfo(ctx, bucket, object, nil) | ||||
| 		if err != nil { | ||||
| 			writeErrorResponse(w, toAPIErrorCode(err), r.URL) | ||||
| 			return | ||||
| 		} | ||||
| 	if !strings.EqualFold(string(selectReq.ExpressionType), "SQL") { | ||||
| 		writeErrorResponse(w, ErrInvalidExpressionType, r.URL) | ||||
| 		return | ||||
| 	} | ||||
| 	if len(selectReq.Expression) >= s3select.MaxExpressionLength { | ||||
| 		writeErrorResponse(w, ErrExpressionTooLong, r.URL) | ||||
| 		return | ||||
| 	} | ||||
| 	if selectReq.InputSerialization.CSV.FileHeaderInfo != CSVFileHeaderInfoUse && | ||||
| 		selectReq.InputSerialization.CSV.FileHeaderInfo != CSVFileHeaderInfoNone && | ||||
| 		selectReq.InputSerialization.CSV.FileHeaderInfo != CSVFileHeaderInfoIgnore && | ||||
| 		selectReq.InputSerialization.CSV.FileHeaderInfo != "" { | ||||
| 		writeErrorResponse(w, ErrInvalidFileHeaderInfo, r.URL) | ||||
| 		return | ||||
| 	} | ||||
| 	if selectReq.OutputSerialization.CSV.QuoteFields != CSVQuoteFieldsAlways && | ||||
| 		selectReq.OutputSerialization.CSV.QuoteFields != CSVQuoteFieldsAsNeeded && | ||||
| 		selectReq.OutputSerialization.CSV.QuoteFields != "" { | ||||
| 		writeErrorResponse(w, ErrInvalidQuoteFields, r.URL) | ||||
| 		return | ||||
| 	} | ||||
| 	defer reader.Close() | ||||
| 
 | ||||
| 	startOffset, length := int64(0), objInfo.Size | ||||
| 	getObject := objectAPI.GetObject | ||||
| 	if api.CacheAPI() != nil && !crypto.SSEC.IsRequested(r.Header) { | ||||
| 		getObject = api.CacheAPI().GetObject | ||||
| 	} | ||||
| 
 | ||||
| 	reader, pipewriter := io.Pipe() | ||||
| 
 | ||||
| 	// Get the object.
 | ||||
| 	var startOffset int64 | ||||
| 	length := objInfo.Size | ||||
| 
 | ||||
| 	var writer io.Writer | ||||
| 	writer = pipewriter | ||||
| 	if objectAPI.IsEncryptionSupported() { | ||||
| 		s3Encrypted := crypto.IsEncrypted(objInfo.UserDefined) | ||||
| 		if s3Encrypted { | ||||
| 			var encReader io.Reader | ||||
| 			encReader, startOffset, length, err = DecryptBlocksRequestR(reader, r, bucket, object, startOffset, length, objInfo, false) | ||||
| 		if crypto.SSEC.IsRequested(r.Header) { | ||||
| 			// Response writer should be limited early on for decryption upto required length,
 | ||||
| 			// additionally also skipping mod(offset)64KiB boundaries.
 | ||||
| 			writer = ioutil.LimitedWriter(writer, startOffset%(64*1024), length) | ||||
| 
 | ||||
| 			writer, startOffset, length, err = DecryptBlocksRequest(writer, r, bucket, | ||||
| 				object, startOffset, length, objInfo, false) | ||||
| 			if err != nil { | ||||
| 				writeErrorResponse(w, toAPIErrorCode(err), r.URL) | ||||
| 				return | ||||
| 			} | ||||
| 			// Resulting reader should be limited early on
 | ||||
| 			// for decryption upto required length,
 | ||||
| 			// additionally also skipping mod(offset)64KiB
 | ||||
| 			// boundaries.
 | ||||
| 			encReader = io.LimitReader(ioutil.NewSkipReader(encReader, startOffset%(64*1024)), length) | ||||
| 			cleanUp := func() { reader.Close() } | ||||
| 			reader = NewGetObjectReader(encReader, nil, cleanUp) | ||||
| 			if reader != nil { | ||||
| 				defer reader.Close() | ||||
| 			} | ||||
| 			if s3Encrypted { | ||||
| 				w.Header().Set(crypto.SSEHeader, crypto.SSEAlgorithmAES256) | ||||
| 			} else { | ||||
| 				w.Header().Set(crypto.SSECAlgorithm, r.Header.Get(crypto.SSECAlgorithm)) | ||||
| 				w.Header().Set(crypto.SSECKeyMD5, r.Header.Get(crypto.SSECKeyMD5)) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 	go func() { | ||||
| 		defer reader.Close() | ||||
| 		if gerr := getObject(ctx, bucket, object, 0, objInfo.Size, writer, objInfo.ETag); gerr != nil { | ||||
| 			pipewriter.CloseWithError(gerr) | ||||
| 			return | ||||
| 		} | ||||
| 		pipewriter.Close() // Close writer explicitly signaling we wrote all data.
 | ||||
| 	}() | ||||
| 
 | ||||
| 	//s3select //Options
 | ||||
| 	if selectReq.OutputSerialization.CSV.FieldDelimiter == "" { | ||||
|  | @ -302,34 +272,23 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req | |||
| 	bucket := vars["bucket"] | ||||
| 	object := vars["object"] | ||||
| 
 | ||||
| 	// Check for auth type to return S3 compatible error.
 | ||||
| 	// type to return the correct error (NoSuchKey vs AccessDenied)
 | ||||
| 	getObjectInfo := objectAPI.GetObjectInfo | ||||
| 	if api.CacheAPI() != nil { | ||||
| 		getObjectInfo = api.CacheAPI().GetObjectInfo | ||||
| 	} | ||||
| 
 | ||||
| 	if s3Error := checkRequestAuthType(ctx, r, policy.GetObjectAction, bucket, object); s3Error != ErrNone { | ||||
| 		if getRequestAuthType(r) == authTypeAnonymous { | ||||
| 			// As per "Permission" section in
 | ||||
| 			// https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html
 | ||||
| 			// If the object you request does not exist,
 | ||||
| 			// the error Amazon S3 returns depends on
 | ||||
| 			// whether you also have the s3:ListBucket
 | ||||
| 			// permission.
 | ||||
| 			// * If you have the s3:ListBucket permission
 | ||||
| 			//   on the bucket, Amazon S3 will return an
 | ||||
| 			//   HTTP status code 404 ("no such key")
 | ||||
| 			//   error.
 | ||||
| 			// * if you don’t have the s3:ListBucket
 | ||||
| 			//   permission, Amazon S3 will return an HTTP
 | ||||
| 			//   status code 403 ("access denied") error.`
 | ||||
| 			// As per "Permission" section in https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html
 | ||||
| 			// If the object you request does not exist, the error Amazon S3 returns depends on whether you also have the s3:ListBucket permission.
 | ||||
| 			// * If you have the s3:ListBucket permission on the bucket, Amazon S3 will return an HTTP status code 404 ("no such key") error.
 | ||||
| 			// * if you don’t have the s3:ListBucket permission, Amazon S3 will return an HTTP status code 403 ("access denied") error.`
 | ||||
| 			if globalPolicySys.IsAllowed(policy.Args{ | ||||
| 				Action:          policy.ListBucketAction, | ||||
| 				BucketName:      bucket, | ||||
| 				ConditionValues: getConditionValues(r, ""), | ||||
| 				IsOwner:         false, | ||||
| 			}) { | ||||
| 				getObjectInfo := objectAPI.GetObjectInfo | ||||
| 				if api.CacheAPI() != nil { | ||||
| 					getObjectInfo = api.CacheAPI().GetObjectInfo | ||||
| 				} | ||||
| 
 | ||||
| 				_, err := getObjectInfo(ctx, bucket, object) | ||||
| 				if toAPIErrorCode(err) == ErrNoSuchKey { | ||||
| 					s3Error = ErrNoSuchKey | ||||
|  | @ -340,20 +299,26 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req | |||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	getObjectNInfo := objectAPI.GetObjectNInfo | ||||
| 	if api.CacheAPI() != nil { | ||||
| 		getObjectNInfo = api.CacheAPI().GetObjectNInfo | ||||
| 	objInfo, err := getObjectInfo(ctx, bucket, object) | ||||
| 	if err != nil { | ||||
| 		writeErrorResponse(w, toAPIErrorCode(err), r.URL) | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	if objectAPI.IsEncryptionSupported() { | ||||
| 		if _, err = DecryptObjectInfo(&objInfo, r.Header); err != nil { | ||||
| 			writeErrorResponse(w, toAPIErrorCode(err), r.URL) | ||||
| 			return | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	// Get request range.
 | ||||
| 	var rs *HTTPRangeSpec | ||||
| 	var hrange *httpRange | ||||
| 	rangeHeader := r.Header.Get("Range") | ||||
| 	if rangeHeader != "" { | ||||
| 		var err error | ||||
| 		if rs, err = parseRequestRangeSpec(rangeHeader); err != nil { | ||||
| 			// Handle only errInvalidRange. Ignore other
 | ||||
| 			// parse error and treat it as regular Get
 | ||||
| 			// request like Amazon S3.
 | ||||
| 		if hrange, err = parseRequestRange(rangeHeader, objInfo.Size); err != nil { | ||||
| 			// Handle only errInvalidRange
 | ||||
| 			// Ignore other parse error and treat it as regular Get request like Amazon S3.
 | ||||
| 			if err == errInvalidRange { | ||||
| 				writeErrorResponse(w, ErrInvalidRange, r.URL) | ||||
| 				return | ||||
|  | @ -364,64 +329,33 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req | |||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	objInfo, reader, err := getObjectNInfo(ctx, bucket, object, rs) | ||||
| 	if err != nil { | ||||
| 		writeErrorResponse(w, toAPIErrorCode(err), r.URL) | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	// If object is encrypted, we avoid the cache layer.
 | ||||
| 	if crypto.IsEncrypted(objInfo.UserDefined) && api.CacheAPI() != nil { | ||||
| 		// Close the existing reader before re-querying the backend
 | ||||
| 		if reader != nil { | ||||
| 			reader.Close() | ||||
| 		} | ||||
| 		// Query the backend
 | ||||
| 		objInfo, reader, err = objectAPI.GetObjectNInfo(ctx, bucket, object, rs) | ||||
| 		if err != nil { | ||||
| 			writeErrorResponse(w, toAPIErrorCode(err), r.URL) | ||||
| 			return | ||||
| 		} | ||||
| 	} | ||||
| 	defer reader.Close() | ||||
| 
 | ||||
| 	if objectAPI.IsEncryptionSupported() { | ||||
| 		if _, err = DecryptObjectInfo(&objInfo, r.Header); err != nil { | ||||
| 			writeErrorResponse(w, toAPIErrorCode(err), r.URL) | ||||
| 			return | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	// Validate pre-conditions if any.
 | ||||
| 	if checkPreconditions(w, r, objInfo) { | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	startOffset, length := int64(0), objInfo.Size | ||||
| 	if rs != nil { | ||||
| 		startOffset, length = rs.GetOffsetLength(objInfo.Size) | ||||
| 	// Get the object.
 | ||||
| 	var startOffset int64 | ||||
| 	length := objInfo.Size | ||||
| 	if hrange != nil { | ||||
| 		startOffset = hrange.offsetBegin | ||||
| 		length = hrange.getLength() | ||||
| 	} | ||||
| 
 | ||||
| 	// Get the object.
 | ||||
| 	var writer io.Writer | ||||
| 	writer = w | ||||
| 	if objectAPI.IsEncryptionSupported() { | ||||
| 		s3Encrypted := crypto.IsEncrypted(objInfo.UserDefined) | ||||
| 		if s3Encrypted { | ||||
| 			var encReader io.Reader | ||||
| 			encReader, startOffset, length, err = DecryptBlocksRequestR(reader, r, bucket, object, startOffset, length, objInfo, false) | ||||
| 		s3Encrypted := crypto.S3.IsEncrypted(objInfo.UserDefined) | ||||
| 		if crypto.SSEC.IsRequested(r.Header) || s3Encrypted { | ||||
| 			// Response writer should be limited early on for decryption upto required length,
 | ||||
| 			// additionally also skipping mod(offset)64KiB boundaries.
 | ||||
| 			writer = ioutil.LimitedWriter(writer, startOffset%(64*1024), length) | ||||
| 
 | ||||
| 			writer, startOffset, length, err = DecryptBlocksRequest(writer, r, bucket, object, startOffset, length, objInfo, false) | ||||
| 			if err != nil { | ||||
| 				writeErrorResponse(w, toAPIErrorCode(err), r.URL) | ||||
| 				return | ||||
| 			} | ||||
| 			// Resulting reader should be limited early on
 | ||||
| 			// for decryption upto required length,
 | ||||
| 			// additionally also skipping mod(offset)64KiB
 | ||||
| 			// boundaries.
 | ||||
| 			encReader = io.LimitReader(ioutil.NewSkipReader(encReader, startOffset%(64*1024)), length) | ||||
| 			cleanUp := func() { reader.Close() } | ||||
| 			reader = NewGetObjectReader(encReader, nil, cleanUp) | ||||
| 			if reader != nil { | ||||
| 				defer reader.Close() | ||||
| 			} | ||||
| 			if s3Encrypted { | ||||
| 				w.Header().Set(crypto.SSEHeader, crypto.SSEAlgorithmAES256) | ||||
| 			} else { | ||||
|  | @ -431,19 +365,24 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req | |||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	setObjectHeaders(w, objInfo, rs) | ||||
| 	setObjectHeaders(w, objInfo, hrange) | ||||
| 	setHeadGetRespHeaders(w, r.URL.Query()) | ||||
| 
 | ||||
| 	statusCodeWritten := false | ||||
| 	httpWriter := ioutil.WriteOnClose(w) | ||||
| 	getObject := objectAPI.GetObject | ||||
| 	if api.CacheAPI() != nil && !crypto.SSEC.IsRequested(r.Header) && !crypto.S3.IsEncrypted(objInfo.UserDefined) { | ||||
| 		getObject = api.CacheAPI().GetObject | ||||
| 	} | ||||
| 
 | ||||
| 	if rs != nil { | ||||
| 	statusCodeWritten := false | ||||
| 	httpWriter := ioutil.WriteOnClose(writer) | ||||
| 
 | ||||
| 	if hrange != nil && hrange.offsetBegin > -1 { | ||||
| 		statusCodeWritten = true | ||||
| 		w.WriteHeader(http.StatusPartialContent) | ||||
| 	} | ||||
| 
 | ||||
| 	// Write object content to response body
 | ||||
| 	if _, err = io.Copy(httpWriter, reader); err != nil { | ||||
| 	// Reads the object at startOffset and writes to mw.
 | ||||
| 	if err = getObject(ctx, bucket, object, startOffset, length, httpWriter, objInfo.ETag); err != nil { | ||||
| 		if !httpWriter.HasWritten() && !statusCodeWritten { // write error response only if no data or headers has been written to client yet
 | ||||
| 			writeErrorResponse(w, toAPIErrorCode(err), r.URL) | ||||
| 		} | ||||
|  |  | |||
|  | @ -550,11 +550,6 @@ func (s *xlSets) ListBuckets(ctx context.Context) (buckets []BucketInfo, err err | |||
| 
 | ||||
| // --- Object Operations ---
 | ||||
| 
 | ||||
| // GetObjectNInfo
 | ||||
| func (s *xlSets) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec) (objInfo ObjectInfo, reader io.ReadCloser, err error) { | ||||
| 	return s.getHashedSet(object).GetObjectNInfo(ctx, bucket, object, rs) | ||||
| } | ||||
| 
 | ||||
| // GetObject - reads an object from the hashedSet based on the object name.
 | ||||
| func (s *xlSets) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string) error { | ||||
| 	return s.getHashedSet(object).GetObject(ctx, bucket, object, startOffset, length, writer, etag) | ||||
|  |  | |||
|  | @ -17,7 +17,6 @@ | |||
| package cmd | ||||
| 
 | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"context" | ||||
| 	"encoding/hex" | ||||
| 	"io" | ||||
|  | @ -163,54 +162,6 @@ func (xl xlObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBuc | |||
| 	return objInfo, nil | ||||
| } | ||||
| 
 | ||||
| func (xl xlObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec) (objInfo ObjectInfo, reader io.ReadCloser, err error) { | ||||
| 
 | ||||
| 	// Acquire lock
 | ||||
| 	lock := xl.nsMutex.NewNSLock(bucket, object) | ||||
| 	if err = lock.GetRLock(globalObjectTimeout); err != nil { | ||||
| 		return objInfo, nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	if err = checkGetObjArgs(ctx, bucket, object); err != nil { | ||||
| 		return objInfo, nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	if hasSuffix(object, slashSeparator) { | ||||
| 		if !xl.isObjectDir(bucket, object) { | ||||
| 			return objInfo, nil, toObjectErr(errFileNotFound, bucket, object) | ||||
| 		} | ||||
| 		var e error | ||||
| 		if objInfo, e = xl.getObjectInfoDir(ctx, bucket, object); e != nil { | ||||
| 			return objInfo, nil, toObjectErr(e, bucket, object) | ||||
| 		} | ||||
| 		objReader := NewGetObjectReader(bytes.NewReader(nil), lock, nil) | ||||
| 		return objInfo, objReader, nil | ||||
| 	} | ||||
| 
 | ||||
| 	objInfo, err = xl.getObjectInfo(ctx, bucket, object) | ||||
| 	if err != nil { | ||||
| 		return objInfo, nil, toObjectErr(err, bucket, object) | ||||
| 	} | ||||
| 
 | ||||
| 	startOffset, readLength := int64(0), objInfo.Size | ||||
| 	if rs != nil { | ||||
| 		startOffset, readLength = rs.GetOffsetLength(objInfo.Size) | ||||
| 	} | ||||
| 
 | ||||
| 	pr, pw := io.Pipe() | ||||
| 	objReader := NewGetObjectReader(pr, lock, nil) | ||||
| 	go func() { | ||||
| 		err := xl.getObject(ctx, bucket, object, startOffset, readLength, pw, "") | ||||
| 		if err != nil { | ||||
| 			pw.CloseWithError(err) | ||||
| 			return | ||||
| 		} | ||||
| 		pw.Close() | ||||
| 	}() | ||||
| 
 | ||||
| 	return objInfo, objReader, nil | ||||
| } | ||||
| 
 | ||||
| // GetObject - reads an object erasured coded across multiple
 | ||||
| // disks. Supports additional parameters like offset and length
 | ||||
| // which are synonymous with HTTP Range requests.
 | ||||
|  |  | |||
|  | @ -126,34 +126,3 @@ func (nopCloser) Close() error { return nil } | |||
| func NopCloser(w io.Writer) io.WriteCloser { | ||||
| 	return nopCloser{w} | ||||
| } | ||||
| 
 | ||||
| // SkipReader skips a given number of bytes and then returns all
 | ||||
| // remaining data.
 | ||||
| type SkipReader struct { | ||||
| 	io.Reader | ||||
| 
 | ||||
| 	skipCount int64 | ||||
| } | ||||
| 
 | ||||
| func (s *SkipReader) Read(p []byte) (int, error) { | ||||
| 	l := int64(len(p)) | ||||
| 	if l == 0 { | ||||
| 		return 0, nil | ||||
| 	} | ||||
| 	for s.skipCount > 0 { | ||||
| 		if l > s.skipCount { | ||||
| 			l = s.skipCount | ||||
| 		} | ||||
| 		n, err := s.Reader.Read(p[:l]) | ||||
| 		if err != nil { | ||||
| 			return 0, err | ||||
| 		} | ||||
| 		s.skipCount -= int64(n) | ||||
| 	} | ||||
| 	return s.Reader.Read(p) | ||||
| } | ||||
| 
 | ||||
| // NewSkipReader - creates a SkipReader
 | ||||
| func NewSkipReader(r io.Reader, n int64) io.Reader { | ||||
| 	return &SkipReader{r, n} | ||||
| } | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue