mirror of https://github.com/minio/minio.git
package cmd

import (
	"context"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/djherbis/atime"
	"github.com/minio/minio/cmd/logger"
	"github.com/minio/minio/pkg/wildcard"
)

const (
	cacheBlkSize = int64(1 * 1024 * 1024)
)

// CacheStorageInfo - represents total, free capacity of
// underlying cache storage.
type CacheStorageInfo struct {
	Total uint64 // Total cache disk space.
	Free  uint64 // Free cache available space.
}

// CacheObjectLayer defines the primitives of the cache object API layer.
type CacheObjectLayer interface {
	// Object operations.
	GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error)
	GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error)
	DeleteObject(ctx context.Context, bucket, object string) error
	DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error)
	PutObject(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error)
	// Storage operations.
	StorageInfo(ctx context.Context) CacheStorageInfo
}

// Abstracts disk caching - used by the S3 layer
type cacheObjects struct {
	// slice of cache drives
	cache []*diskCache
	// file path patterns to exclude from cache
	exclude []string
	// to manage cache namespace locks
	nsMutex *nsLockMap

	// if true, migration from cache format v1 to v2 is in progress
	migrating bool
	// mutex to protect the migrating flag
	migMutex sync.Mutex

	// Object functions pointing to the corresponding functions of backend implementation.
	GetObjectNInfoFn func(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error)
	GetObjectInfoFn  func(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error)
	DeleteObjectFn   func(ctx context.Context, bucket, object string) error
	DeleteObjectsFn  func(ctx context.Context, bucket string, objects []string) ([]error, error)
	PutObjectFn      func(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error)
}
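// delete removes a cached object from the given cache drive, serializing
// access through a write lock on the object's cache namespace.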
func (c *cacheObjects) delete(ctx context.Context, dcache *diskCache, bucket, object string) (err error) {
	cLock := c.nsMutex.NewNSLock(ctx, bucket, object)
	if err := cLock.GetLock(globalObjectTimeout); err != nil {
		return err
	}
	defer cLock.Unlock()
	return dcache.Delete(ctx, bucket, object)
}

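// put writes an object to the given cache drive under a write lock on the
// object's cache namespace.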
func (c *cacheObjects) put(ctx context.Context, dcache *diskCache, bucket, object string, data io.Reader, size int64, opts ObjectOptions) error {
	cLock := c.nsMutex.NewNSLock(ctx, bucket, object)
	if err := cLock.GetLock(globalObjectTimeout); err != nil {
		return err
	}
	defer cLock.Unlock()
	return dcache.Put(ctx, bucket, object, data, size, opts)
}

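// get reads a cached object from the given cache drive under a read lock on
// the object's cache namespace.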
func (c *cacheObjects) get(ctx context.Context, dcache *diskCache, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, err error) {
	cLock := c.nsMutex.NewNSLock(ctx, bucket, object)
	if err := cLock.GetRLock(globalObjectTimeout); err != nil {
		return nil, err
	}

	defer cLock.RUnlock()
	return dcache.Get(ctx, bucket, object, rs, h, opts)
}

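// stat returns cached object info from the given cache drive under a read
// lock on the object's cache namespace.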
func (c *cacheObjects) stat(ctx context.Context, dcache *diskCache, bucket, object string) (oi ObjectInfo, err error) {
	cLock := c.nsMutex.NewNSLock(ctx, bucket, object)
	if err := cLock.GetRLock(globalObjectTimeout); err != nil {
		return oi, err
	}

	defer cLock.RUnlock()
	return dcache.Stat(ctx, bucket, object)
}

// DeleteObject clears cache entry if backend delete operation succeeds
func (c *cacheObjects) DeleteObject(ctx context.Context, bucket, object string) (err error) {
	if err = c.DeleteObjectFn(ctx, bucket, object); err != nil {
		return
	}
	if c.isCacheExclude(bucket, object) || c.skipCache() {
		return
	}

	// Cache cleanup is best effort: once the backend delete has succeeded,
	// cache errors are not returned to the caller.
	dcache, cerr := c.getCacheLoc(ctx, bucket, object)
	if cerr != nil {
		return
	}
	if dcache.Exists(ctx, bucket, object) {
		c.delete(ctx, dcache, bucket, object)
	}
	return
}

// DeleteObjects batch deletes the objects in the slice, and clears any cached entries
func (c *cacheObjects) DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error) {
	errs := make([]error, len(objects))
	for idx, object := range objects {
		errs[idx] = c.DeleteObject(ctx, bucket, object)
	}
	return errs, nil
}

// construct a metadata k-v map
func getMetadata(objInfo ObjectInfo) map[string]string {
	metadata := make(map[string]string)
	metadata["etag"] = objInfo.ETag
	metadata["content-type"] = objInfo.ContentType
	if objInfo.ContentEncoding != "" {
		metadata["content-encoding"] = objInfo.ContentEncoding
	}
	if objInfo.Expires != timeSentinel {
		metadata["expires"] = objInfo.Expires.Format(http.TimeFormat)
	}
	for k, v := range objInfo.UserDefined {
		metadata[k] = v
	}
	return metadata
}

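// GetObjectNInfo serves reads from the cache when a fresh copy is available
// and from the backend otherwise, revalidating cached entries against the
// backend ETag when cache-control freshness cannot be established. Cache
// misses populate the cache on the way through.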
func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) {
	if c.isCacheExclude(bucket, object) || c.skipCache() {
		return c.GetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts)
	}
	var cc cacheControl

	// fetch the diskCache where the object is currently cached, or the nearest available cache drive
	dcache, err := c.getCacheToLoc(ctx, bucket, object)
	if err != nil {
		return c.GetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts)
	}

	cacheReader, cacheErr := c.get(ctx, dcache, bucket, object, rs, h, opts)
	if cacheErr == nil {
		cc = cacheControlOpts(cacheReader.ObjInfo)
		if !cc.isEmpty() && !cc.isStale(cacheReader.ObjInfo.ModTime) {
			return cacheReader, nil
		}
	}

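	// Cache entry is absent, lacks cache-control freshness info, or is
	// stale: revalidate against the backend.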
	objInfo, err := c.GetObjectInfoFn(ctx, bucket, object, opts)
	if backendDownError(err) && cacheErr == nil {
		return cacheReader, nil
	} else if err != nil {
		if _, ok := err.(ObjectNotFound); ok {
			if cacheErr == nil {
				cacheReader.Close()
				// Delete cached entry if backend object
				// was deleted.
				dcache.Delete(ctx, bucket, object)
			}
		}
		return nil, err
	}

	if !objInfo.IsCacheable() {
		return c.GetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts)
	}

	if cacheErr == nil {
		// if ETag matches for stale cache entry, serve from cache
		if cacheReader.ObjInfo.ETag == objInfo.ETag {
			// Update metadata in case server-side copy might have changed object metadata
			dcache.updateMetadataIfChanged(ctx, bucket, object, objInfo, cacheReader.ObjInfo)
			return cacheReader, nil
		}
		cacheReader.Close()
		// Object is stale, so delete from cache
		c.delete(ctx, dcache, bucket, object)
	}

	// Since we got here, we are serving the request from backend,
	// and also adding the object to the cache.
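	// Nudge the purge routine with a non-blocking send when cache disk
	// usage is no longer low.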
	if !dcache.diskUsageLow() {
		select {
		case dcache.purgeChan <- struct{}{}:
		default:
		}
	}
	if !dcache.diskAvailable(objInfo.Size) {
		return c.GetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts)
	}

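	// For range requests, serve the requested range straight from the
	// backend and fetch the complete object in the background to populate
	// the cache.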
	if rs != nil {
		go func() {
			// fill cache in the background for range GET requests
			bReader, bErr := c.GetObjectNInfoFn(ctx, bucket, object, nil, h, lockType, opts)
			if bErr != nil {
				return
			}
			defer bReader.Close()
			oi, err := c.stat(ctx, dcache, bucket, object)
			// avoid cache overwrite if another background routine filled cache
			if err != nil || oi.ETag != bReader.ObjInfo.ETag {
				c.put(ctx, dcache, bucket, object, bReader, bReader.ObjInfo.Size, ObjectOptions{UserDefined: getMetadata(bReader.ObjInfo)})
			}
		}()
		return c.GetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts)
	}

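	// For full-object reads, stream the backend response to the caller
	// while teeing it into the cache through a pipe; a failed cache write
	// closes the pipe with the error so it propagates to the returned
	// reader.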
	bkReader, bkErr := c.GetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts)
	if bkErr != nil {
		return nil, bkErr
	}
	// Initialize pipe.
	pipeReader, pipeWriter := io.Pipe()
	teeReader := io.TeeReader(bkReader, pipeWriter)
	go func() {
		putErr := c.put(ctx, dcache, bucket, object, io.LimitReader(pipeReader, bkReader.ObjInfo.Size), bkReader.ObjInfo.Size, ObjectOptions{UserDefined: getMetadata(bkReader.ObjInfo)})
		// close the write end of the pipe, so the error gets
		// propagated to getObjReader
		pipeWriter.CloseWithError(putErr)
	}()
	cleanupBackend := func() { bkReader.Close() }
	cleanupPipe := func() { pipeWriter.Close() }
	return NewGetObjectReaderFromReader(teeReader, bkReader.ObjInfo, opts.CheckCopyPrecondFn, cleanupBackend, cleanupPipe)
}

// Returns ObjectInfo from cache if available.
func (c *cacheObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
	getObjectInfoFn := c.GetObjectInfoFn

	if c.isCacheExclude(bucket, object) || c.skipCache() {
		return getObjectInfoFn(ctx, bucket, object, opts)
	}

	// fetch the diskCache where the object is currently cached, or the nearest available cache drive
	dcache, err := c.getCacheToLoc(ctx, bucket, object)
	if err != nil {
		return getObjectInfoFn(ctx, bucket, object, opts)
	}
	var cc cacheControl
	// if cache control setting is valid, avoid HEAD operation to backend
	cachedObjInfo, cerr := c.stat(ctx, dcache, bucket, object)
	if cerr == nil {
		cc = cacheControlOpts(cachedObjInfo)
		if !cc.isEmpty() && !cc.isStale(cachedObjInfo.ModTime) {
			return cachedObjInfo, nil
		}
	}
	objInfo, err := getObjectInfoFn(ctx, bucket, object, opts)
	if err != nil {
		if _, ok := err.(ObjectNotFound); ok {
			// Delete the cached entry if backend object was deleted.
			c.delete(ctx, dcache, bucket, object)
			return ObjectInfo{}, err
		}
		if !backendDownError(err) {
			return ObjectInfo{}, err
		}
		// backend is down: serve the cached copy if we have one
		if cerr == nil {
			return cachedObjInfo, nil
		}
		return ObjectInfo{}, BackendDown{}
	}

	// when backend is up, do a sanity check on cached object
	if cerr != nil {
		return objInfo, nil
	}
	if cachedObjInfo.ETag != objInfo.ETag {
		// Delete the cached entry if the backend object was replaced.
		c.delete(ctx, dcache, bucket, object)
	}
	return objInfo, nil
}

// StorageInfo - returns underlying storage statistics.
func (c *cacheObjects) StorageInfo(ctx context.Context) (cInfo CacheStorageInfo) {
	var total, free uint64
	for _, cache := range c.cache {
		if cache == nil {
			continue
		}
		info, err := getDiskInfo(cache.dir)
		logger.GetReqInfo(ctx).AppendTags("cachePath", cache.dir)
		logger.LogIf(ctx, err)
		total += info.Total
		free += info.Free
	}
	return CacheStorageInfo{
		Total: total,
		Free:  free,
	}
}

// skipCache returns true if cache migration is in progress
func (c *cacheObjects) skipCache() bool {
	c.migMutex.Lock()
	defer c.migMutex.Unlock()
	return c.migrating
}

// Returns true if object should be excluded from cache
func (c *cacheObjects) isCacheExclude(bucket, object string) bool {
	// exclude directories from cache
	if strings.HasSuffix(object, SlashSeparator) {
		return true
	}
	matchStr := fmt.Sprintf("%s/%s", bucket, object)
	for _, pattern := range c.exclude {
		if ok := wildcard.MatchSimple(pattern, matchStr); ok {
			return true
		}
	}
	return false
}

// choose a cache deterministically based on the hash of bucket and object. The hash index is treated as
// a hint. In the event that the cache drive at the hash index is offline, treat the list of cache drives
// as a circular buffer and walk through them starting at the hash index until an online drive is found.
func (c *cacheObjects) getCacheLoc(ctx context.Context, bucket, object string) (*diskCache, error) {
	index := c.hashIndex(bucket, object)
	numDisks := len(c.cache)
	for k := 0; k < numDisks; k++ {
		i := (index + k) % numDisks
		if c.cache[i] == nil {
			continue
		}
		if c.cache[i].IsOnline() {
			return c.cache[i], nil
		}
	}
	return nil, errDiskNotFound
}

// get the cache disk where the object is currently cached for a GET operation. If the object does not
// exist at that location, treat the list of cache drives as a circular buffer and walk through them
// starting at the hash index until an online drive is found. If the object is not found anywhere, fall
// back to the first online cache drive closest to the hash index, so that the object can be re-cached.
func (c *cacheObjects) getCacheToLoc(ctx context.Context, bucket, object string) (*diskCache, error) {
	index := c.hashIndex(bucket, object)

	numDisks := len(c.cache)
	// save first online cache disk closest to the hint index
	var firstOnlineDisk *diskCache
	for k := 0; k < numDisks; k++ {
		i := (index + k) % numDisks
		if c.cache[i] == nil {
			continue
		}
		if c.cache[i].IsOnline() {
			if firstOnlineDisk == nil {
				firstOnlineDisk = c.cache[i]
			}
			if c.cache[i].Exists(ctx, bucket, object) {
				return c.cache[i], nil
			}
		}
	}

	if firstOnlineDisk != nil {
		return firstOnlineDisk, nil
	}
	return nil, errDiskNotFound
}

// hashIndex computes a deterministic cache drive index for the bucket/object
// path via a CRC hash modulo the number of configured cache drives.
func (c *cacheObjects) hashIndex(bucket, object string) int {
	return crcHashMod(pathJoin(bucket, object), len(c.cache))
}

// newCache initializes the diskCache objects for the "drives" specified in config.json
// or the global env overrides.
func newCache(config CacheConfig) ([]*diskCache, bool, error) {
	var caches []*diskCache
	ctx := logger.SetReqInfo(context.Background(), &logger.ReqInfo{})
	formats, migrating, err := loadAndValidateCacheFormat(ctx, config.Drives)
	if err != nil {
		return nil, false, err
	}
	for i, dir := range config.Drives {
		// skip diskCache creation for cache drives missing a format.json
		if formats[i] == nil {
			caches = append(caches, nil)
			continue
		}
		if err := checkAtimeSupport(dir); err != nil {
			return nil, false, errors.New("Atime support required for disk caching")
		}

		cache, err := newdiskCache(dir, config.Expiry, config.MaxUse)
		if err != nil {
			return nil, false, err
		}
		// Start the purging go-routine for expired entries if no migration is in progress
		if !migrating {
			go cache.purge()
		}

		caches = append(caches, cache)
	}
	return caches, migrating, nil
}

// Return error if Atime is disabled on the O/S
func checkAtimeSupport(dir string) (err error) {
	file, err := ioutil.TempFile(dir, "prefix")
	if err != nil {
		return
	}
	defer os.Remove(file.Name())
	defer file.Close()
	finfo1, err := os.Stat(file.Name())
	if err != nil {
		return
	}
	// add a sleep to ensure atime change is detected
	time.Sleep(10 * time.Millisecond)

	if _, err = io.Copy(ioutil.Discard, file); err != nil {
		return
	}

	finfo2, err := os.Stat(file.Name())
	if err != nil {
		return
	}
	if atime.Get(finfo2).Equal(atime.Get(finfo1)) {
		return errors.New("Atime not supported")
	}
	return
}
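
// migrateCacheFromV1toV2 migrates all cache drives from the v1 on-disk format
// to v2 in parallel, and clears the migrating flag only when every drive has
// migrated successfully.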
func (c *cacheObjects) migrateCacheFromV1toV2(ctx context.Context) {
	logStartupMessage(colorBlue("Cache migration initiated ...."))

	var wg sync.WaitGroup
	errs := make([]error, len(c.cache))
	for i, dc := range c.cache {
		if dc == nil {
			continue
		}
		wg.Add(1)
		// start migration from V1 to V2
		go func(ctx context.Context, dc *diskCache, errs []error, idx int) {
			defer wg.Done()
			if err := migrateOldCache(ctx, dc); err != nil {
				errs[idx] = err
				logger.LogIf(ctx, err)
				return
			}
			// start purge routine after migration completes.
			go dc.purge()
		}(ctx, dc, errs, i)
	}
	wg.Wait()

	errCnt := 0
	for _, err := range errs {
		if err != nil {
			errCnt++
		}
	}
	// keep the migrating flag set if any drive failed to migrate
	if errCnt > 0 {
		return
	}
	// update migration status
	c.migMutex.Lock()
	defer c.migMutex.Unlock()
	c.migrating = false
	logStartupMessage(colorBlue("Cache migration completed successfully."))
}

// PutObject - caches the uploaded object for single Put operations
func (c *cacheObjects) PutObject(ctx context.Context, bucket, object string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
	putObjectFn := c.PutObjectFn
	dcache, err := c.getCacheToLoc(ctx, bucket, object)
	if err != nil {
		// disk cache could not be located, execute backend call.
		return putObjectFn(ctx, bucket, object, r, opts)
	}
	size := r.Size()
	if c.skipCache() {
		return putObjectFn(ctx, bucket, object, r, opts)
	}

	// fetch from backend if there is no space on cache drive
	if !dcache.diskAvailable(size) {
		return putObjectFn(ctx, bucket, object, r, opts)
	}
	// skip caching of encrypted objects
	if opts.ServerSideEncryption != nil {
		return putObjectFn(ctx, bucket, object, r, opts)
	}
	// fetch from backend if cache exclude pattern or cache-control
	// directive set to exclude
	if c.isCacheExclude(bucket, object) {
		dcache.Delete(ctx, bucket, object)
		return putObjectFn(ctx, bucket, object, r, opts)
	}

	objInfo, err = putObjectFn(ctx, bucket, object, r, opts)

	if err == nil {
		go func() {
			// fill cache in the background by reading the object back from the backend
			bReader, bErr := c.GetObjectNInfoFn(ctx, bucket, object, nil, http.Header{}, readLock, ObjectOptions{})
			if bErr != nil {
				return
			}
			defer bReader.Close()
			oi, err := c.stat(ctx, dcache, bucket, object)
			// avoid cache overwrite if another background routine filled cache
			if err != nil || oi.ETag != bReader.ObjInfo.ETag {
				c.put(ctx, dcache, bucket, object, bReader, bReader.ObjInfo.Size, ObjectOptions{UserDefined: getMetadata(bReader.ObjInfo)})
			}
		}()
	}

	return objInfo, err
}

// Returns cacheObjects for use by Server.
func newServerCacheObjects(ctx context.Context, config CacheConfig) (CacheObjectLayer, error) {
	// list of disk caches for cache "drives" specified in config.json or MINIO_CACHE_DRIVES env var.
	cache, migrateSw, err := newCache(config)
	if err != nil {
		return nil, err
	}

	c := &cacheObjects{
		cache:     cache,
		exclude:   config.Exclude,
		nsMutex:   newNSLock(false),
		migrating: migrateSw,
		migMutex:  sync.Mutex{},
		GetObjectInfoFn: func(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
			return newObjectLayerFn().GetObjectInfo(ctx, bucket, object, opts)
		},
		GetObjectNInfoFn: func(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) {
			return newObjectLayerFn().GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts)
		},
		DeleteObjectFn: func(ctx context.Context, bucket, object string) error {
			return newObjectLayerFn().DeleteObject(ctx, bucket, object)
		},
		DeleteObjectsFn: func(ctx context.Context, bucket string, objects []string) ([]error, error) {
			errs := make([]error, len(objects))
			for idx, object := range objects {
				errs[idx] = newObjectLayerFn().DeleteObject(ctx, bucket, object)
			}
			return errs, nil
		},
		PutObjectFn: func(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
			return newObjectLayerFn().PutObject(ctx, bucket, object, data, opts)
		},
	}
	if migrateSw {
		go c.migrateCacheFromV1toV2(ctx)
	}
	return c, nil
}