mirror of https://github.com/minio/minio.git
				
				
				
			
		
			
				
	
	
		
			212 lines
		
	
	
		
			5.6 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			212 lines
		
	
	
		
			5.6 KiB
		
	
	
	
		
			Go
		
	
	
	
| /*
 | |
|  * MinIO Cloud Storage, (C) 2019 MinIO, Inc.
 | |
|  *
 | |
|  * Licensed under the Apache License, Version 2.0 (the "License");
 | |
|  * you may not use this file except in compliance with the License.
 | |
|  * You may obtain a copy of the License at
 | |
|  *
 | |
|  *     http://www.apache.org/licenses/LICENSE-2.0
 | |
|  *
 | |
|  * Unless required by applicable law or agreed to in writing, software
 | |
|  * distributed under the License is distributed on an "AS IS" BASIS,
 | |
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
|  * See the License for the specific language governing permissions and
 | |
|  * limitations under the License.
 | |
|  */
 | |
| 
 | |
| package cmd
 | |
| 
 | |
| import (
 | |
| 	"bytes"
 | |
| 	"context"
 | |
| 	"encoding/json"
 | |
| 	"os"
 | |
| 	"path/filepath"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	jsoniter "github.com/json-iterator/go"
 | |
| 	"github.com/minio/minio/cmd/logger"
 | |
| 	"github.com/minio/minio/pkg/hash"
 | |
| )
 | |
| 
 | |
| const (
 | |
| 	dataUsageObjName       = "data-usage"
 | |
| 	dataUsageCrawlInterval = 12 * time.Hour
 | |
| )
 | |
| 
 | |
| func initDataUsageStats() {
 | |
| 	go runDataUsageInfoUpdateRoutine()
 | |
| }
 | |
| 
 | |
| func runDataUsageInfoUpdateRoutine() {
 | |
| 	// Wait until the object layer is ready
 | |
| 	var objAPI ObjectLayer
 | |
| 	for {
 | |
| 		objAPI = newObjectLayerWithoutSafeModeFn()
 | |
| 		if objAPI == nil {
 | |
| 			time.Sleep(time.Second)
 | |
| 			continue
 | |
| 		}
 | |
| 		break
 | |
| 	}
 | |
| 
 | |
| 	runDataUsageInfo(context.Background(), objAPI, GlobalServiceDoneCh)
 | |
| }
 | |
| 
 | |
| // timeToNextCrawl returns the duration until next crawl should occur
 | |
| // this is validated by verifying the LastUpdate time.
 | |
| func timeToCrawl(ctx context.Context, objAPI ObjectLayer) time.Duration {
 | |
| 	dataUsageInfo, err := loadDataUsageFromBackend(ctx, objAPI)
 | |
| 	if err != nil {
 | |
| 		// Upon an error wait for like 10
 | |
| 		// seconds to start the crawler.
 | |
| 		return 10 * time.Second
 | |
| 	}
 | |
| 	// File indeed doesn't exist when LastUpdate is zero
 | |
| 	// so we have never crawled, start crawl right away.
 | |
| 	if dataUsageInfo.LastUpdate.IsZero() {
 | |
| 		return 1 * time.Second
 | |
| 	}
 | |
| 	waitDuration := dataUsageInfo.LastUpdate.Sub(UTCNow())
 | |
| 	if waitDuration > dataUsageCrawlInterval {
 | |
| 		// Waited long enough start crawl in a 1 second
 | |
| 		return 1 * time.Second
 | |
| 	}
 | |
| 	// No crawling needed, ask the routine to wait until
 | |
| 	// the daily interval 12hrs - delta between last update
 | |
| 	// with current time.
 | |
| 	return dataUsageCrawlInterval - waitDuration
 | |
| }
 | |
| 
 | |
| func runDataUsageInfo(ctx context.Context, objAPI ObjectLayer, endCh <-chan struct{}) {
 | |
| 	locker := objAPI.NewNSLock(ctx, minioMetaBucket, "leader-data-usage-info")
 | |
| 	for {
 | |
| 		err := locker.GetLock(newDynamicTimeout(time.Millisecond, time.Millisecond))
 | |
| 		if err != nil {
 | |
| 			time.Sleep(5 * time.Minute)
 | |
| 			continue
 | |
| 		}
 | |
| 		// Break without unlocking, this node will acquire
 | |
| 		// data usage calculator role for its lifetime.
 | |
| 		break
 | |
| 	}
 | |
| 
 | |
| 	for {
 | |
| 		wait := timeToCrawl(ctx, objAPI)
 | |
| 		select {
 | |
| 		case <-endCh:
 | |
| 			locker.Unlock()
 | |
| 			return
 | |
| 		case <-time.NewTimer(wait).C:
 | |
| 			// Crawl only when no previous crawl has occurred,
 | |
| 			// or its been too long since last crawl.
 | |
| 			err := storeDataUsageInBackend(ctx, objAPI, objAPI.CrawlAndGetDataUsage(ctx, endCh))
 | |
| 			logger.LogIf(ctx, err)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func storeDataUsageInBackend(ctx context.Context, objAPI ObjectLayer, dataUsageInfo DataUsageInfo) error {
 | |
| 	dataUsageJSON, err := json.Marshal(dataUsageInfo)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	size := int64(len(dataUsageJSON))
 | |
| 	r, err := hash.NewReader(bytes.NewReader(dataUsageJSON), size, "", "", size, false)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	_, err = objAPI.PutObject(ctx, minioMetaBackgroundOpsBucket, dataUsageObjName, NewPutObjReader(r, nil, nil), ObjectOptions{})
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| func loadDataUsageFromBackend(ctx context.Context, objAPI ObjectLayer) (DataUsageInfo, error) {
 | |
| 	var dataUsageInfoJSON bytes.Buffer
 | |
| 
 | |
| 	err := objAPI.GetObject(ctx, minioMetaBackgroundOpsBucket, dataUsageObjName, 0, -1, &dataUsageInfoJSON, "", ObjectOptions{})
 | |
| 	if err != nil {
 | |
| 		if isErrObjectNotFound(err) {
 | |
| 			return DataUsageInfo{}, nil
 | |
| 		}
 | |
| 		return DataUsageInfo{}, toObjectErr(err, minioMetaBackgroundOpsBucket, dataUsageObjName)
 | |
| 	}
 | |
| 
 | |
| 	var dataUsageInfo DataUsageInfo
 | |
| 	var json = jsoniter.ConfigCompatibleWithStandardLibrary
 | |
| 	err = json.Unmarshal(dataUsageInfoJSON.Bytes(), &dataUsageInfo)
 | |
| 	if err != nil {
 | |
| 		return DataUsageInfo{}, err
 | |
| 	}
 | |
| 
 | |
| 	return dataUsageInfo, nil
 | |
| }
 | |
| 
 | |
| // Item represents each file while walking.
 | |
| type Item struct {
 | |
| 	Path string
 | |
| 	Typ  os.FileMode
 | |
| }
 | |
| 
 | |
| type getSizeFn func(item Item) (int64, error)
 | |
| 
 | |
| func updateUsage(basePath string, doneCh <-chan struct{}, waitForLowActiveIO func(), getSize getSizeFn) DataUsageInfo {
 | |
| 	var dataUsageInfo = DataUsageInfo{
 | |
| 		BucketsSizes:          make(map[string]uint64),
 | |
| 		ObjectsSizesHistogram: make(map[string]uint64),
 | |
| 	}
 | |
| 
 | |
| 	numWorkers := 4
 | |
| 	var mutex sync.Mutex // Mutex to update dataUsageInfo
 | |
| 
 | |
| 	fastWalk(basePath, numWorkers, doneCh, func(path string, typ os.FileMode) error {
 | |
| 		// Wait for I/O to go down.
 | |
| 		waitForLowActiveIO()
 | |
| 
 | |
| 		bucket, entry := path2BucketObjectWithBasePath(basePath, path)
 | |
| 		if bucket == "" {
 | |
| 			return nil
 | |
| 		}
 | |
| 
 | |
| 		if isReservedOrInvalidBucket(bucket, false) {
 | |
| 			return filepath.SkipDir
 | |
| 		}
 | |
| 
 | |
| 		if entry == "" && typ&os.ModeDir != 0 {
 | |
| 			mutex.Lock()
 | |
| 			dataUsageInfo.BucketsCount++
 | |
| 			dataUsageInfo.BucketsSizes[bucket] = 0
 | |
| 			mutex.Unlock()
 | |
| 			return nil
 | |
| 		}
 | |
| 
 | |
| 		mutex.Lock()
 | |
| 		defer mutex.Unlock()
 | |
| 
 | |
| 		if typ&os.ModeDir != 0 {
 | |
| 			return nil
 | |
| 		}
 | |
| 
 | |
| 		t := time.Now()
 | |
| 		size, err := getSize(Item{path, typ})
 | |
| 		// Use the response time of the getSize call to guess system load.
 | |
| 		// Sleep equivalent time.
 | |
| 		if d := time.Since(t); d > 100*time.Microsecond {
 | |
| 			time.Sleep(d)
 | |
| 		}
 | |
| 		if err != nil {
 | |
| 			return errSkipFile
 | |
| 		}
 | |
| 
 | |
| 		dataUsageInfo.ObjectsCount++
 | |
| 		dataUsageInfo.ObjectsTotalSize += uint64(size)
 | |
| 		dataUsageInfo.BucketsSizes[bucket] += uint64(size)
 | |
| 		dataUsageInfo.ObjectsSizesHistogram[objSizeToHistoInterval(uint64(size))]++
 | |
| 		return nil
 | |
| 	})
 | |
| 
 | |
| 	return dataUsageInfo
 | |
| }
 |