mirror of https://github.com/minio/minio.git
				
				
				
			
		
			
				
	
	
		
			241 lines
		
	
	
		
			7.7 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			241 lines
		
	
	
		
			7.7 KiB
		
	
	
	
		
			Go
		
	
	
	
| /*
 | |
|  * Minio Cloud Storage, (C) 2016 Minio, Inc.
 | |
|  *
 | |
|  * Licensed under the Apache License, Version 2.0 (the "License");
 | |
|  * you may not use this file except in compliance with the License.
 | |
|  * You may obtain a copy of the License at
 | |
|  *
 | |
|  *     http://www.apache.org/licenses/LICENSE-2.0
 | |
|  *
 | |
|  * Unless required by applicable law or agreed to in writing, software
 | |
|  * distributed under the License is distributed on an "AS IS" BASIS,
 | |
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
|  * See the License for the specific language governing permissions and
 | |
|  * limitations under the License.
 | |
|  */
 | |
| 
 | |
| package cmd
 | |
| 
 | |
| import (
 | |
| 	"errors"
 | |
| 	"io"
 | |
| 	"os"
 | |
| 	"reflect"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| )
 | |
| 
 | |
var (
	// errPartsMissing is sent by the appendParts go-routine when there are
	// holes in the list of parts. For example, if the client uploads part-2
	// before part-1 the go-routine cannot append anything yet and has to wait
	// until part-1 is uploaded, so it reports this error. Currently this error
	// is not used in the caller.
	errPartsMissing = errors.New("required parts missing")

	// errAppendPartsTimeout is sent when the appendParts go-routine has waited
	// long enough and timed out.
	errAppendPartsTimeout = errors.New("appendParts go-routine timeout")

	// appendPartsTimeout is the timeout value for the appendParts go-routine
	// (24 hours).
	appendPartsTimeout = 24 * time.Hour
)
 | |
| 
 | |
| // Holds a map of uploadID->appendParts go-routine
 | |
| type backgroundAppend struct {
 | |
| 	sync.Mutex
 | |
| 	infoMap    map[string]bgAppendPartsInfo
 | |
| 	appendFile io.WriteCloser
 | |
| }
 | |
| 
 | |
| // Input to the appendParts go-routine
 | |
| type bgAppendPartsInput struct {
 | |
| 	meta  fsMetaV1   // list of parts that need to be appended
 | |
| 	errCh chan error // error sent by appendParts go-routine
 | |
| }
 | |
| 
 | |
| // Identifies an appendParts go-routine.
 | |
| type bgAppendPartsInfo struct {
 | |
| 	inputCh    chan bgAppendPartsInput
 | |
| 	timeoutCh  chan struct{} // closed by appendParts go-routine when it timesout
 | |
| 	abortCh    chan struct{} // closed after abort of upload to end the appendParts go-routine
 | |
| 	completeCh chan struct{} // closed after complete of upload to end the appendParts go-routine
 | |
| }
 | |
| 
 | |
| // Called after a part is uploaded so that it can be appended in the background.
 | |
| func (fs fsObjects) append(bucket, object, uploadID string, meta fsMetaV1) chan error {
 | |
| 	fs.bgAppend.Lock()
 | |
| 	info, ok := fs.bgAppend.infoMap[uploadID]
 | |
| 	if !ok {
 | |
| 		// Corresponding appendParts go-routine was not found, create a new one. Would happen when the first
 | |
| 		// part of a multipart upload is uploaded.
 | |
| 		inputCh := make(chan bgAppendPartsInput)
 | |
| 		timeoutCh := make(chan struct{})
 | |
| 		abortCh := make(chan struct{})
 | |
| 		completeCh := make(chan struct{})
 | |
| 
 | |
| 		info = bgAppendPartsInfo{inputCh, timeoutCh, abortCh, completeCh}
 | |
| 		fs.bgAppend.infoMap[uploadID] = info
 | |
| 
 | |
| 		go fs.appendParts(bucket, object, uploadID, info)
 | |
| 	}
 | |
| 	fs.bgAppend.Unlock()
 | |
| 
 | |
| 	errCh := make(chan error)
 | |
| 	go func() {
 | |
| 		// send input in a goroutine as send on the inputCh can block if appendParts go-routine
 | |
| 		// is busy appending a part.
 | |
| 		select {
 | |
| 		case <-info.timeoutCh:
 | |
| 			// This is to handle a rare race condition where we found info in b.infoMap
 | |
| 			// but soon after that appendParts go-routine timed out.
 | |
| 			errCh <- errAppendPartsTimeout
 | |
| 		case info.inputCh <- bgAppendPartsInput{meta, errCh}:
 | |
| 		}
 | |
| 	}()
 | |
| 
 | |
| 	return errCh
 | |
| }
 | |
| 
 | |
| // Called on complete-multipart-upload. Returns nil if the required parts have been appended.
 | |
| func (fs *fsObjects) complete(bucket, object, uploadID string, meta fsMetaV1) error {
 | |
| 	fs.bgAppend.Lock()
 | |
| 	defer fs.bgAppend.Unlock()
 | |
| 
 | |
| 	info, ok := fs.bgAppend.infoMap[uploadID]
 | |
| 	delete(fs.bgAppend.infoMap, uploadID)
 | |
| 	if !ok {
 | |
| 		return errPartsMissing
 | |
| 	}
 | |
| 
 | |
| 	errCh := make(chan error)
 | |
| 
 | |
| 	select {
 | |
| 	case <-info.timeoutCh:
 | |
| 		// This is to handle a rare race condition where we found info in b.infoMap
 | |
| 		// but soon after that appendParts go-routine timedouted out.
 | |
| 		return errAppendPartsTimeout
 | |
| 	case info.inputCh <- bgAppendPartsInput{meta, errCh}:
 | |
| 	}
 | |
| 
 | |
| 	err := <-errCh
 | |
| 
 | |
| 	close(info.completeCh)
 | |
| 
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| // Called after complete-multipart-upload or abort-multipart-upload so that the appendParts go-routine is not left dangling.
 | |
| func (fs fsObjects) abort(uploadID string) {
 | |
| 	fs.bgAppend.Lock()
 | |
| 	defer fs.bgAppend.Unlock()
 | |
| 
 | |
| 	info, ok := fs.bgAppend.infoMap[uploadID]
 | |
| 	if !ok {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	delete(fs.bgAppend.infoMap, uploadID)
 | |
| 
 | |
| 	info.abortCh <- struct{}{}
 | |
| }
 | |
| 
 | |
| // This is run as a go-routine that appends the parts in the background.
 | |
| func (fs fsObjects) appendParts(bucket, object, uploadID string, info bgAppendPartsInfo) {
 | |
| 	appendPath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, uploadID)
 | |
| 	// Holds the list of parts that is already appended to the "append" file.
 | |
| 	appendMeta := fsMetaV1{}
 | |
| 
 | |
| 	// Allocate staging read buffer.
 | |
| 	buf := make([]byte, readSizeV1)
 | |
| 	for {
 | |
| 		select {
 | |
| 		case input := <-info.inputCh:
 | |
| 			// We receive on this channel when new part gets uploaded or when complete-multipart sends
 | |
| 			// a value on this channel to confirm if all the required parts are appended.
 | |
| 			meta := input.meta
 | |
| 
 | |
| 			for {
 | |
| 				// Append should be done such a way that if part-3 and part-2 is uploaded before part-1, we
 | |
| 				// wait till part-1 is uploaded after which we append part-2 and part-3 as well in this for-loop.
 | |
| 				part, appendNeeded := partToAppend(meta, appendMeta)
 | |
| 				if !appendNeeded {
 | |
| 					if reflect.DeepEqual(meta.Parts, appendMeta.Parts) {
 | |
| 						// Sending nil is useful so that the complete-multipart-upload knows that
 | |
| 						// all the required parts have been appended.
 | |
| 						input.errCh <- nil
 | |
| 					} else {
 | |
| 						// Sending error is useful so that complete-multipart-upload can fall-back to
 | |
| 						// its own append process.
 | |
| 						input.errCh <- errPartsMissing
 | |
| 					}
 | |
| 					break
 | |
| 				}
 | |
| 
 | |
| 				if err := fs.appendPart(bucket, object, uploadID, part, buf); err != nil {
 | |
| 					fsRemoveFile(appendPath)
 | |
| 					appendMeta.Parts = nil
 | |
| 					input.errCh <- err
 | |
| 					break
 | |
| 				}
 | |
| 
 | |
| 				appendMeta.AddObjectPart(part.Number, part.Name, part.ETag, part.Size)
 | |
| 			}
 | |
| 		case <-info.abortCh:
 | |
| 			// abort-multipart-upload closed abortCh to end the appendParts go-routine.
 | |
| 			fsRemoveFile(appendPath)
 | |
| 
 | |
| 			// So that any racing PutObjectPart does not leave a dangling go-routine.
 | |
| 			close(info.timeoutCh)
 | |
| 
 | |
| 			return
 | |
| 		case <-info.completeCh:
 | |
| 			// complete-multipart-upload closed completeCh to end the appendParts go-routine.
 | |
| 			close(info.timeoutCh) // So that any racing PutObjectPart does not leave a dangling go-routine.
 | |
| 			return
 | |
| 		case <-time.After(appendPartsTimeout):
 | |
| 			// Timeout the goroutine to garbage collect its resources. This would happen if the client initiates
 | |
| 			// a multipart upload and does not complete/abort it.
 | |
| 			fs.bgAppend.Lock()
 | |
| 			delete(fs.bgAppend.infoMap, uploadID)
 | |
| 			fs.bgAppend.Unlock()
 | |
| 
 | |
| 			// Delete the temporary append file as well.
 | |
| 			fsRemoveFile(appendPath)
 | |
| 
 | |
| 			close(info.timeoutCh)
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Appends the "part" to the append-file inside "tmp/" that finally gets moved to the actual location
 | |
| // upon complete-multipart-upload.
 | |
| func (fs fsObjects) appendPart(bucket, object, uploadID string, part objectPartInfo, buf []byte) error {
 | |
| 	partPath := pathJoin(fs.fsPath, minioMetaMultipartBucket, bucket, object, uploadID, part.Name)
 | |
| 
 | |
| 	offset := int64(0)
 | |
| 	// Read each file part to start writing to the temporary concatenated object.
 | |
| 	file, size, err := fsOpenFile(partPath, offset)
 | |
| 	if err != nil {
 | |
| 		if err == errFileNotFound {
 | |
| 			return errPartsMissing
 | |
| 		}
 | |
| 		return err
 | |
| 	}
 | |
| 	defer file.Close()
 | |
| 
 | |
| 	tmpObjPath := pathJoin(fs.fsPath, minioMetaTmpBucket, fs.fsUUID, uploadID)
 | |
| 	// No need to hold a lock, this is a unique file and will be only written
 | |
| 	// to one one process per uploadID per minio process.
 | |
| 	wfile, err := os.OpenFile(preparePath(tmpObjPath), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	defer wfile.Close()
 | |
| 
 | |
| 	// Fallocate more space as we concatenate.
 | |
| 	if err = fsFAllocate(int(wfile.Fd()), 0, size); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	_, err = io.CopyBuffer(wfile, file, buf)
 | |
| 	return err
 | |
| }
 |