mirror of https://github.com/minio/minio.git
				
				
				
			
		
			
				
	
	
		
			823 lines
		
	
	
		
			20 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			823 lines
		
	
	
		
			20 KiB
		
	
	
	
		
			Go
		
	
	
	
| /*
 | |
|  * MinIO Cloud Storage, (C) 2020 MinIO, Inc.
 | |
|  *
 | |
|  * Licensed under the Apache License, Version 2.0 (the "License");
 | |
|  * you may not use this file except in compliance with the License.
 | |
|  * You may obtain a copy of the License at
 | |
|  *
 | |
|  *     http://www.apache.org/licenses/LICENSE-2.0
 | |
|  *
 | |
|  * Unless required by applicable law or agreed to in writing, software
 | |
|  * distributed under the License is distributed on an "AS IS" BASIS,
 | |
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
|  * See the License for the specific language governing permissions and
 | |
|  * limitations under the License.
 | |
|  */
 | |
| 
 | |
| package cmd
 | |
| 
 | |
| import (
 | |
| 	"bytes"
 | |
| 	"context"
 | |
| 	"encoding/gob"
 | |
| 	"encoding/json"
 | |
| 	"fmt"
 | |
| 	"io"
 | |
| 	"strconv"
 | |
| 	"strings"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/minio/minio/cmd/logger"
 | |
| 	"github.com/minio/minio/pkg/console"
 | |
| 	"github.com/minio/minio/pkg/hash"
 | |
| )
 | |
| 
 | |
| type listPathOptions struct {
 | |
| 	// ID of the listing.
 | |
| 	// This will be used to persist the list.
 | |
| 	ID string
 | |
| 
 | |
| 	// Bucket of the listing.
 | |
| 	Bucket string
 | |
| 
 | |
| 	// Directory inside the bucket.
 | |
| 	BaseDir string
 | |
| 
 | |
| 	// Scan/return only content with prefix.
 | |
| 	Prefix string
 | |
| 
 | |
| 	// Marker to resume listing.
 | |
| 	// The response will be the first entry AFTER this object name.
 | |
| 	Marker string
 | |
| 
 | |
| 	// Limit the number of results.
 | |
| 	Limit int
 | |
| 
 | |
| 	// The number of disks to ask. Special values:
 | |
| 	// 0 uses default number of disks.
 | |
| 	// -1 use at least 50% of disks or at least the default number.
 | |
| 	AskDisks int
 | |
| 
 | |
| 	// InclDeleted will keep all entries where latest version is a delete marker.
 | |
| 	InclDeleted bool
 | |
| 
 | |
| 	// Scan recursively.
 | |
| 	// If false only main directory will be scanned.
 | |
| 	// Should always be true if Separator is n SlashSeparator.
 | |
| 	Recursive bool
 | |
| 
 | |
| 	// Separator to use.
 | |
| 	Separator string
 | |
| 
 | |
| 	// Create indicates that the lister should not attempt to load an existing cache.
 | |
| 	Create bool
 | |
| 
 | |
| 	// CurrentCycle indicates the current bloom cycle.
 | |
| 	// Will be used if a new scan is started.
 | |
| 	CurrentCycle uint64
 | |
| 
 | |
| 	// OldestCycle indicates the oldest cycle acceptable.
 | |
| 	OldestCycle uint64
 | |
| 
 | |
| 	// Include pure directories.
 | |
| 	IncludeDirectories bool
 | |
| 
 | |
| 	// Transient is set if the cache is transient due to an error or being a reserved bucket.
 | |
| 	// This means the cache metadata will not be persisted on disk.
 | |
| 	// A transient result will never be returned from the cache so knowing the list id is required.
 | |
| 	Transient bool
 | |
| 
 | |
| 	// singleObject will assume that prefix refers to an exact single object.
 | |
| 	singleObject bool
 | |
| }
 | |
| 
 | |
| func init() {
 | |
| 	gob.Register(listPathOptions{})
 | |
| }
 | |
| 
 | |
| // newMetacache constructs a new metacache from the options.
 | |
| func (o listPathOptions) newMetacache() metacache {
 | |
| 	return metacache{
 | |
| 		id:           o.ID,
 | |
| 		bucket:       o.Bucket,
 | |
| 		root:         o.BaseDir,
 | |
| 		recursive:    o.Recursive,
 | |
| 		status:       scanStateStarted,
 | |
| 		error:        "",
 | |
| 		started:      UTCNow(),
 | |
| 		lastHandout:  UTCNow(),
 | |
| 		lastUpdate:   UTCNow(),
 | |
| 		ended:        time.Time{},
 | |
| 		startedCycle: o.CurrentCycle,
 | |
| 		endedCycle:   0,
 | |
| 		dataVersion:  metacacheStreamVersion,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // gatherResults will collect all results on the input channel and filter results according to the options.
 | |
| // Caller should close the channel when done.
 | |
| // The returned function will return the results once there is enough or input is closed.
 | |
| func (o *listPathOptions) gatherResults(in <-chan metaCacheEntry) func() (metaCacheEntriesSorted, error) {
 | |
| 	const debugPrint = false
 | |
| 	var resultsDone = make(chan metaCacheEntriesSorted)
 | |
| 	// Copy so we can mutate
 | |
| 	resCh := resultsDone
 | |
| 	resErr := io.EOF
 | |
| 
 | |
| 	go func() {
 | |
| 		var results metaCacheEntriesSorted
 | |
| 		for entry := range in {
 | |
| 			if resCh == nil {
 | |
| 				// past limit
 | |
| 				continue
 | |
| 			}
 | |
| 			if !o.IncludeDirectories && entry.isDir() {
 | |
| 				continue
 | |
| 			}
 | |
| 			if debugPrint {
 | |
| 				console.Infoln("gather got:", entry.name)
 | |
| 			}
 | |
| 			if o.Marker != "" && entry.name <= o.Marker {
 | |
| 				if debugPrint {
 | |
| 					console.Infoln("pre marker")
 | |
| 				}
 | |
| 				continue
 | |
| 			}
 | |
| 			if !strings.HasPrefix(entry.name, o.Prefix) {
 | |
| 				if debugPrint {
 | |
| 					console.Infoln("not in prefix")
 | |
| 				}
 | |
| 				continue
 | |
| 			}
 | |
| 			if !o.Recursive && !entry.isInDir(o.Prefix, o.Separator) {
 | |
| 				if debugPrint {
 | |
| 					console.Infoln("not in dir", o.Prefix, o.Separator)
 | |
| 				}
 | |
| 				continue
 | |
| 			}
 | |
| 			if !o.InclDeleted && entry.isObject() {
 | |
| 				if entry.isLatestDeletemarker() {
 | |
| 					if debugPrint {
 | |
| 						console.Infoln("latest delete")
 | |
| 					}
 | |
| 					continue
 | |
| 				}
 | |
| 			}
 | |
| 			if o.Limit > 0 && results.len() >= o.Limit {
 | |
| 				// We have enough and we have more.
 | |
| 				// Do not return io.EOF
 | |
| 				if resCh != nil {
 | |
| 					resErr = nil
 | |
| 					resCh <- results
 | |
| 					resCh = nil
 | |
| 				}
 | |
| 				continue
 | |
| 			}
 | |
| 			if debugPrint {
 | |
| 				console.Infoln("adding...")
 | |
| 			}
 | |
| 			results.o = append(results.o, entry)
 | |
| 		}
 | |
| 		if resCh != nil {
 | |
| 			resErr = io.EOF
 | |
| 			resCh <- results
 | |
| 		}
 | |
| 	}()
 | |
| 	return func() (metaCacheEntriesSorted, error) {
 | |
| 		return <-resultsDone, resErr
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // findFirstPart will find the part with 0 being the first that corresponds to the marker in the options.
 | |
| // io.ErrUnexpectedEOF is returned if the place containing the marker hasn't been scanned yet.
 | |
| // io.EOF indicates the marker is beyond the end of the stream and does not exist.
 | |
| func (o *listPathOptions) findFirstPart(fi FileInfo) (int, error) {
 | |
| 	search := o.Marker
 | |
| 	if search == "" {
 | |
| 		search = o.Prefix
 | |
| 	}
 | |
| 	if search == "" {
 | |
| 		return 0, nil
 | |
| 	}
 | |
| 	const debugPrint = false
 | |
| 	if debugPrint {
 | |
| 		console.Infoln("searching for ", search)
 | |
| 	}
 | |
| 	var tmp metacacheBlock
 | |
| 	i := 0
 | |
| 	for {
 | |
| 		partKey := fmt.Sprintf("%s-metacache-part-%d", ReservedMetadataPrefixLower, i)
 | |
| 		v, ok := fi.Metadata[partKey]
 | |
| 		if !ok {
 | |
| 			if debugPrint {
 | |
| 				console.Infoln("no match in metadata, waiting")
 | |
| 			}
 | |
| 			return -1, io.ErrUnexpectedEOF
 | |
| 		}
 | |
| 		err := json.Unmarshal([]byte(v), &tmp)
 | |
| 		if !ok {
 | |
| 			logger.LogIf(context.Background(), err)
 | |
| 			return -1, err
 | |
| 		}
 | |
| 		if tmp.First == "" && tmp.Last == "" && tmp.EOS {
 | |
| 			return 0, errFileNotFound
 | |
| 		}
 | |
| 		if tmp.First >= search {
 | |
| 			if debugPrint {
 | |
| 				console.Infoln("First >= search", v)
 | |
| 			}
 | |
| 			return i, nil
 | |
| 		}
 | |
| 		if tmp.Last >= search {
 | |
| 			if debugPrint {
 | |
| 
 | |
| 				console.Infoln("Last >= search", v)
 | |
| 			}
 | |
| 			return i, nil
 | |
| 		}
 | |
| 		if tmp.EOS {
 | |
| 			if debugPrint {
 | |
| 				console.Infoln("no match, at EOS", v)
 | |
| 			}
 | |
| 			return -3, io.EOF
 | |
| 		}
 | |
| 		if debugPrint {
 | |
| 			console.Infoln("First ", tmp.First, "<", search, " search", i)
 | |
| 		}
 | |
| 		i++
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // updateMetacacheListing will update the metacache listing.
 | |
| func (o *listPathOptions) updateMetacacheListing(m metacache, rpc *peerRESTClient) (metacache, error) {
 | |
| 	if o.Transient {
 | |
| 		return localMetacacheMgr.getTransient().updateCacheEntry(m)
 | |
| 	}
 | |
| 	if rpc == nil {
 | |
| 		return localMetacacheMgr.updateCacheEntry(m)
 | |
| 	}
 | |
| 	return rpc.UpdateMetacacheListing(context.Background(), m)
 | |
| }
 | |
| 
 | |
| func getMetacacheBlockInfo(fi FileInfo, block int) (*metacacheBlock, error) {
 | |
| 	var tmp metacacheBlock
 | |
| 	partKey := fmt.Sprintf("%s-metacache-part-%d", ReservedMetadataPrefixLower, block)
 | |
| 	v, ok := fi.Metadata[partKey]
 | |
| 	if !ok {
 | |
| 		return nil, io.ErrUnexpectedEOF
 | |
| 	}
 | |
| 	return &tmp, json.Unmarshal([]byte(v), &tmp)
 | |
| }
 | |
| 
 | |
| func metacachePrefixForID(bucket, id string) string {
 | |
| 	return pathJoin("buckets", bucket, ".metacache", id)
 | |
| }
 | |
| 
 | |
| // objectPath returns the object path of the cache.
 | |
| func (o *listPathOptions) objectPath(block int) string {
 | |
| 	return pathJoin(metacachePrefixForID(o.Bucket, o.ID), "block-"+strconv.Itoa(block)+".s2")
 | |
| }
 | |
| 
 | |
| // filter will apply the options and return the number of objects requested by the limit.
 | |
| // Will return io.EOF if there are no more entries with the same filter.
 | |
| // The last entry can be used as a marker to resume the listing.
 | |
| func (r *metacacheReader) filter(o listPathOptions) (entries metaCacheEntriesSorted, err error) {
 | |
| 	const debugPrint = false
 | |
| 	// Forward to prefix, if any
 | |
| 	err = r.forwardTo(o.Prefix)
 | |
| 	if err != nil {
 | |
| 		return entries, err
 | |
| 	}
 | |
| 	if o.Marker != "" {
 | |
| 		err = r.forwardTo(o.Marker)
 | |
| 		if err != nil {
 | |
| 			return entries, err
 | |
| 		}
 | |
| 		next, err := r.peek()
 | |
| 		if err != nil {
 | |
| 			return entries, err
 | |
| 		}
 | |
| 		if next.name == o.Marker {
 | |
| 			err := r.skip(1)
 | |
| 			if err != nil {
 | |
| 				return entries, err
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	if debugPrint {
 | |
| 		console.Infoln("forwarded to ", o.Prefix, "marker:", o.Marker, "sep:", o.Separator)
 | |
| 	}
 | |
| 	// Filter
 | |
| 	if !o.Recursive {
 | |
| 		entries.o = make(metaCacheEntries, 0, o.Limit)
 | |
| 		pastPrefix := false
 | |
| 		err := r.readFn(func(entry metaCacheEntry) bool {
 | |
| 			if o.Prefix != "" && !strings.HasPrefix(entry.name, o.Prefix) {
 | |
| 				// We are past the prefix, don't continue.
 | |
| 				pastPrefix = true
 | |
| 				return false
 | |
| 			}
 | |
| 			if !o.IncludeDirectories && entry.isDir() {
 | |
| 				return true
 | |
| 			}
 | |
| 			if !entry.isInDir(o.Prefix, o.Separator) {
 | |
| 				return true
 | |
| 			}
 | |
| 			if !o.InclDeleted && entry.isObject() && entry.isLatestDeletemarker() {
 | |
| 				return entries.len() < o.Limit
 | |
| 			}
 | |
| 			entries.o = append(entries.o, entry)
 | |
| 			return entries.len() < o.Limit
 | |
| 		})
 | |
| 		if (err != nil && err.Error() == io.EOF.Error()) || pastPrefix || r.nextEOF() {
 | |
| 			return entries, io.EOF
 | |
| 		}
 | |
| 		return entries, err
 | |
| 	}
 | |
| 
 | |
| 	// We should not need to filter more.
 | |
| 	return r.readN(o.Limit, o.InclDeleted, o.IncludeDirectories, o.Prefix)
 | |
| }
 | |
| 
 | |
| func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOptions) (entries metaCacheEntriesSorted, err error) {
 | |
| 	retries := 0
 | |
| 	const debugPrint = false
 | |
| 	rpc := globalNotificationSys.restClientFromHash(o.Bucket)
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-ctx.Done():
 | |
| 			return entries, ctx.Err()
 | |
| 		default:
 | |
| 		}
 | |
| 
 | |
| 		// If many failures, check the cache state.
 | |
| 		if retries > 10 {
 | |
| 			err := o.checkMetacacheState(ctx, rpc)
 | |
| 			if debugPrint {
 | |
| 				logger.Info("waiting for first part (%s), err: %v", o.objectPath(0), err)
 | |
| 			}
 | |
| 			if err != nil {
 | |
| 				return entries, err
 | |
| 			}
 | |
| 			retries = 1
 | |
| 		}
 | |
| 
 | |
| 		const retryDelay = 500 * time.Millisecond
 | |
| 		// Load first part metadata...
 | |
| 		// All operations are performed without locks, so we must be careful and allow for failures.
 | |
| 		// Read metadata associated with the object from a disk.
 | |
| 		if retries > 0 {
 | |
| 			disks := er.getOnlineDisks()
 | |
| 			if len(disks) == 0 {
 | |
| 				time.Sleep(retryDelay)
 | |
| 				retries++
 | |
| 				continue
 | |
| 			}
 | |
| 
 | |
| 			_, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(0), "", false)
 | |
| 			if err != nil {
 | |
| 				time.Sleep(retryDelay)
 | |
| 				retries++
 | |
| 				continue
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		// Read metadata associated with the object from all disks.
 | |
| 		fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(0), ObjectOptions{})
 | |
| 		if err != nil {
 | |
| 			switch toObjectErr(err, minioMetaBucket, o.objectPath(0)).(type) {
 | |
| 			case ObjectNotFound:
 | |
| 				retries++
 | |
| 				time.Sleep(retryDelay)
 | |
| 				continue
 | |
| 			case InsufficientReadQuorum:
 | |
| 				retries++
 | |
| 				time.Sleep(retryDelay)
 | |
| 				continue
 | |
| 			default:
 | |
| 				if debugPrint {
 | |
| 					console.Infoln("first getObjectFileInfo", o.objectPath(0), "returned err:", err)
 | |
| 					console.Infof("err type: %T\n", err)
 | |
| 				}
 | |
| 				return entries, err
 | |
| 			}
 | |
| 		}
 | |
| 		if fi.Deleted {
 | |
| 			return entries, errFileNotFound
 | |
| 		}
 | |
| 
 | |
| 		partN, err := o.findFirstPart(fi)
 | |
| 		switch err {
 | |
| 		case nil:
 | |
| 		case io.ErrUnexpectedEOF:
 | |
| 			if retries == 10 {
 | |
| 				err := o.checkMetacacheState(ctx, rpc)
 | |
| 				if debugPrint {
 | |
| 					logger.Info("waiting for metadata, err: %v", err)
 | |
| 				}
 | |
| 				if err != nil {
 | |
| 					return entries, err
 | |
| 				}
 | |
| 				retries = -1
 | |
| 			}
 | |
| 			retries++
 | |
| 			time.Sleep(retryDelay)
 | |
| 			continue
 | |
| 		case io.EOF:
 | |
| 			return entries, io.EOF
 | |
| 		}
 | |
| 
 | |
| 		// We got a stream to start at.
 | |
| 		loadedPart := 0
 | |
| 		var buf bytes.Buffer
 | |
| 		for {
 | |
| 			select {
 | |
| 			case <-ctx.Done():
 | |
| 				return entries, ctx.Err()
 | |
| 			default:
 | |
| 			}
 | |
| 
 | |
| 			if partN != loadedPart {
 | |
| 				if retries > 10 {
 | |
| 					err := o.checkMetacacheState(ctx, rpc)
 | |
| 					if debugPrint {
 | |
| 						logger.Info("waiting for part data (%v), err: %v", o.objectPath(partN), err)
 | |
| 					}
 | |
| 					if err != nil {
 | |
| 						return entries, err
 | |
| 					}
 | |
| 					retries = 1
 | |
| 				}
 | |
| 
 | |
| 				if retries > 0 {
 | |
| 					// Load from one disk only
 | |
| 					disks := er.getOnlineDisks()
 | |
| 					if len(disks) == 0 {
 | |
| 						time.Sleep(retryDelay)
 | |
| 						retries++
 | |
| 						continue
 | |
| 					}
 | |
| 
 | |
| 					_, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(partN), "", false)
 | |
| 					if err != nil {
 | |
| 						time.Sleep(retryDelay)
 | |
| 						retries++
 | |
| 						continue
 | |
| 					}
 | |
| 				}
 | |
| 				// Load first part metadata...
 | |
| 				fi, metaArr, onlineDisks, err = er.getObjectFileInfo(ctx, minioMetaBucket, o.objectPath(partN), ObjectOptions{})
 | |
| 				if err != nil {
 | |
| 					time.Sleep(retryDelay)
 | |
| 					retries++
 | |
| 					continue
 | |
| 				}
 | |
| 				loadedPart = partN
 | |
| 				bi, err := getMetacacheBlockInfo(fi, partN)
 | |
| 				logger.LogIf(ctx, err)
 | |
| 				if err == nil {
 | |
| 					if bi.pastPrefix(o.Prefix) {
 | |
| 						return entries, io.EOF
 | |
| 					}
 | |
| 				}
 | |
| 				if fi.Deleted {
 | |
| 					return entries, io.ErrUnexpectedEOF
 | |
| 				}
 | |
| 			}
 | |
| 			buf.Reset()
 | |
| 			err := er.getObjectWithFileInfo(ctx, minioMetaBucket, o.objectPath(partN), 0, fi.Size, &buf, fi, metaArr, onlineDisks)
 | |
| 			if err != nil {
 | |
| 				switch toObjectErr(err, minioMetaBucket, o.objectPath(partN)).(type) {
 | |
| 				case ObjectNotFound:
 | |
| 					retries++
 | |
| 					time.Sleep(retryDelay)
 | |
| 					continue
 | |
| 				case InsufficientReadQuorum:
 | |
| 					retries++
 | |
| 					time.Sleep(retryDelay)
 | |
| 					continue
 | |
| 				default:
 | |
| 					logger.LogIf(ctx, err)
 | |
| 					return entries, err
 | |
| 				}
 | |
| 			}
 | |
| 			tmp, err := newMetacacheReader(&buf)
 | |
| 			if err != nil {
 | |
| 				return entries, err
 | |
| 			}
 | |
| 			e, err := tmp.filter(o)
 | |
| 			entries.o = append(entries.o, e.o...)
 | |
| 			if o.Limit > 0 && entries.len() > o.Limit {
 | |
| 				entries.truncate(o.Limit)
 | |
| 				return entries, nil
 | |
| 			}
 | |
| 			switch err {
 | |
| 			case io.EOF:
 | |
| 				// We finished at the end of the block.
 | |
| 				// And should not expect any more results.
 | |
| 				bi, err := getMetacacheBlockInfo(fi, partN)
 | |
| 				logger.LogIf(ctx, err)
 | |
| 				if err != nil || bi.EOS {
 | |
| 					// We are done and there are no more parts.
 | |
| 					return entries, io.EOF
 | |
| 				}
 | |
| 				if bi.endedPrefix(o.Prefix) {
 | |
| 					// Nothing more for prefix.
 | |
| 					return entries, io.EOF
 | |
| 				}
 | |
| 				partN++
 | |
| 				retries = 0
 | |
| 			case nil:
 | |
| 				// We stopped within the listing, we are done for now...
 | |
| 				return entries, nil
 | |
| 			default:
 | |
| 				return entries, err
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Will return io.EOF if continuing would not yield more results.
 | |
| func (er *erasureObjects) listPath(ctx context.Context, o listPathOptions) (entries metaCacheEntriesSorted, err error) {
 | |
| 	const debugPrint = false
 | |
| 	if debugPrint {
 | |
| 		console.Printf("listPath with options: %#v\n", o)
 | |
| 	}
 | |
| 	// See if we have the listing stored.
 | |
| 	if !o.Create && !o.singleObject {
 | |
| 		entries, err := er.streamMetadataParts(ctx, o)
 | |
| 		switch err {
 | |
| 		case nil, io.EOF, context.Canceled, context.DeadlineExceeded:
 | |
| 			return entries, err
 | |
| 		}
 | |
| 		logger.LogIf(ctx, err)
 | |
| 		return entries, err
 | |
| 	}
 | |
| 
 | |
| 	meta := o.newMetacache()
 | |
| 	rpc := globalNotificationSys.restClientFromHash(o.Bucket)
 | |
| 	var metaMu sync.Mutex
 | |
| 
 | |
| 	if debugPrint {
 | |
| 		console.Println("listPath: scanning bucket:", o.Bucket, "basedir:", o.BaseDir, "prefix:", o.Prefix, "marker:", o.Marker)
 | |
| 	}
 | |
| 
 | |
| 	// Disconnect from call above, but cancel on exit.
 | |
| 	ctx, cancel := context.WithCancel(GlobalContext)
 | |
| 	// We need to ask disks.
 | |
| 	disks := er.getOnlineDisks()
 | |
| 
 | |
| 	defer func() {
 | |
| 		if debugPrint {
 | |
| 			console.Println("listPath returning:", entries.len(), "err:", err)
 | |
| 		}
 | |
| 		if err != nil && err != io.EOF {
 | |
| 			metaMu.Lock()
 | |
| 			if meta.status != scanStateError {
 | |
| 				meta.error = err.Error()
 | |
| 				meta.status = scanStateError
 | |
| 			}
 | |
| 			meta, _ = o.updateMetacacheListing(meta, rpc)
 | |
| 			metaMu.Unlock()
 | |
| 			cancel()
 | |
| 		}
 | |
| 	}()
 | |
| 
 | |
| 	askDisks := o.AskDisks
 | |
| 	if askDisks == -1 {
 | |
| 		askDisks = getReadQuorum(er.SetDriveCount())
 | |
| 	}
 | |
| 
 | |
| 	if len(disks) < askDisks {
 | |
| 		err = InsufficientReadQuorum{}
 | |
| 		if debugPrint {
 | |
| 			console.Errorf("listPath: Insufficient disks, %d of %d needed are available", len(disks), askDisks)
 | |
| 		}
 | |
| 		logger.LogIf(ctx, fmt.Errorf("listPath: Insufficient disks, %d of %d needed are available", len(disks), askDisks))
 | |
| 		cancel()
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	// Select askDisks random disks.
 | |
| 	if len(disks) > askDisks {
 | |
| 		disks = disks[:askDisks]
 | |
| 	}
 | |
| 
 | |
| 	var readers = make([]*metacacheReader, askDisks)
 | |
| 	for i := range disks {
 | |
| 		r, w := io.Pipe()
 | |
| 		d := disks[i]
 | |
| 		readers[i], err = newMetacacheReader(r)
 | |
| 		if err != nil {
 | |
| 			cancel()
 | |
| 			return entries, err
 | |
| 		}
 | |
| 		// Send request to each disk.
 | |
| 		go func() {
 | |
| 			err := d.WalkDir(ctx, WalkDirOptions{Bucket: o.Bucket, BaseDir: o.BaseDir, Recursive: o.Recursive || o.Separator != SlashSeparator}, w)
 | |
| 			w.CloseWithError(err)
 | |
| 			if err != io.EOF {
 | |
| 				logger.LogIf(ctx, err)
 | |
| 			}
 | |
| 		}()
 | |
| 	}
 | |
| 
 | |
| 	// Create output for our results.
 | |
| 	cacheCh := make(chan metaCacheEntry, metacacheBlockSize)
 | |
| 
 | |
| 	// Create filter for results.
 | |
| 	filterCh := make(chan metaCacheEntry, 100)
 | |
| 	filteredResults := o.gatherResults(filterCh)
 | |
| 	closeChannels := func() {
 | |
| 		close(cacheCh)
 | |
| 		close(filterCh)
 | |
| 	}
 | |
| 
 | |
| 	go func() {
 | |
| 		defer cancel()
 | |
| 		// Save continuous updates
 | |
| 		go func() {
 | |
| 			var err error
 | |
| 			ticker := time.NewTicker(10 * time.Second)
 | |
| 			defer ticker.Stop()
 | |
| 			var exit bool
 | |
| 			for !exit {
 | |
| 				select {
 | |
| 				case <-ticker.C:
 | |
| 				case <-ctx.Done():
 | |
| 					exit = true
 | |
| 				}
 | |
| 				metaMu.Lock()
 | |
| 				meta.endedCycle = intDataUpdateTracker.current()
 | |
| 				meta, err = o.updateMetacacheListing(meta, rpc)
 | |
| 				if meta.status == scanStateError {
 | |
| 					cancel()
 | |
| 					exit = true
 | |
| 				}
 | |
| 				metaMu.Unlock()
 | |
| 				logger.LogIf(ctx, err)
 | |
| 			}
 | |
| 		}()
 | |
| 
 | |
| 		const retryDelay = 200 * time.Millisecond
 | |
| 		const maxTries = 10
 | |
| 
 | |
| 		// Write results to disk.
 | |
| 		bw := newMetacacheBlockWriter(cacheCh, func(b *metacacheBlock) error {
 | |
| 			if o.singleObject {
 | |
| 				// Don't save single object listings.
 | |
| 				return nil
 | |
| 			}
 | |
| 			if debugPrint {
 | |
| 				console.Println("listPath: saving block", b.n, "to", o.objectPath(b.n))
 | |
| 			}
 | |
| 			r, err := hash.NewReader(bytes.NewBuffer(b.data), int64(len(b.data)), "", "", int64(len(b.data)), false)
 | |
| 			logger.LogIf(ctx, err)
 | |
| 			custom := b.headerKV()
 | |
| 			_, err = er.putObject(ctx, minioMetaBucket, o.objectPath(b.n), NewPutObjReader(r, nil, nil), ObjectOptions{UserDefined: custom})
 | |
| 			if err != nil {
 | |
| 				metaMu.Lock()
 | |
| 				if meta.error != "" {
 | |
| 					meta.status = scanStateError
 | |
| 					meta.error = err.Error()
 | |
| 				}
 | |
| 				metaMu.Unlock()
 | |
| 				cancel()
 | |
| 				return err
 | |
| 			}
 | |
| 			if b.n == 0 {
 | |
| 				return nil
 | |
| 			}
 | |
| 			// Update block 0 metadata.
 | |
| 			var retries int
 | |
| 			for {
 | |
| 				err := er.updateObjectMeta(ctx, minioMetaBucket, o.objectPath(0), b.headerKV(), ObjectOptions{})
 | |
| 				if err == nil {
 | |
| 					break
 | |
| 				}
 | |
| 				switch err.(type) {
 | |
| 				case ObjectNotFound:
 | |
| 					return err
 | |
| 				case InsufficientReadQuorum:
 | |
| 				default:
 | |
| 					logger.LogIf(ctx, err)
 | |
| 				}
 | |
| 				if retries >= maxTries {
 | |
| 					return err
 | |
| 				}
 | |
| 				retries++
 | |
| 				time.Sleep(retryDelay)
 | |
| 			}
 | |
| 			return nil
 | |
| 		})
 | |
| 
 | |
| 		// How to resolve results.
 | |
| 		resolver := metadataResolutionParams{
 | |
| 			dirQuorum: askDisks - 1,
 | |
| 			objQuorum: askDisks - 1,
 | |
| 			bucket:    o.Bucket,
 | |
| 		}
 | |
| 
 | |
| 		topEntries := make(metaCacheEntries, len(readers))
 | |
| 		for {
 | |
| 			// Get the top entry from each
 | |
| 			var current metaCacheEntry
 | |
| 			var atEOF, agree int
 | |
| 			for i, r := range readers {
 | |
| 				topEntries[i].name = ""
 | |
| 				entry, err := r.peek()
 | |
| 				switch err {
 | |
| 				case io.EOF:
 | |
| 					atEOF++
 | |
| 					continue
 | |
| 				case nil:
 | |
| 				default:
 | |
| 					closeChannels()
 | |
| 					metaMu.Lock()
 | |
| 					meta.status = scanStateError
 | |
| 					meta.error = err.Error()
 | |
| 					metaMu.Unlock()
 | |
| 					return
 | |
| 				}
 | |
| 				// If no current, add it.
 | |
| 				if current.name == "" {
 | |
| 					topEntries[i] = entry
 | |
| 					current = entry
 | |
| 					agree++
 | |
| 					continue
 | |
| 				}
 | |
| 				// If exact match, we agree.
 | |
| 				if current.matches(&entry, o.Bucket) {
 | |
| 					topEntries[i] = entry
 | |
| 					agree++
 | |
| 					continue
 | |
| 				}
 | |
| 				// If only the name matches we didn't agree, but add it for resolution.
 | |
| 				if entry.name == current.name {
 | |
| 					topEntries[i] = entry
 | |
| 					continue
 | |
| 				}
 | |
| 				// We got different entries
 | |
| 				if entry.name > current.name {
 | |
| 					continue
 | |
| 				}
 | |
| 				// We got a new, better current.
 | |
| 				// Clear existing entries.
 | |
| 				for i := range topEntries[:i] {
 | |
| 					topEntries[i] = metaCacheEntry{}
 | |
| 				}
 | |
| 				agree = 1
 | |
| 				current = entry
 | |
| 				topEntries[i] = entry
 | |
| 			}
 | |
| 			// Break if all at EOF.
 | |
| 			if atEOF == len(readers) {
 | |
| 				break
 | |
| 			}
 | |
| 			if agree == len(readers) {
 | |
| 				// Everybody agreed
 | |
| 				for _, r := range readers {
 | |
| 					r.skip(1)
 | |
| 				}
 | |
| 				cacheCh <- topEntries[0]
 | |
| 				filterCh <- topEntries[0]
 | |
| 				continue
 | |
| 			}
 | |
| 
 | |
| 			// Results Disagree :-(
 | |
| 			entry, ok := topEntries.resolve(&resolver)
 | |
| 			if ok {
 | |
| 				cacheCh <- *entry
 | |
| 				filterCh <- *entry
 | |
| 			}
 | |
| 			// Skip the inputs we used.
 | |
| 			for i, r := range readers {
 | |
| 				if topEntries[i].name != "" {
 | |
| 					r.skip(1)
 | |
| 				}
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		// Save success
 | |
| 		metaMu.Lock()
 | |
| 		if meta.error == "" {
 | |
| 			meta.status = scanStateSuccess
 | |
| 			meta.endedCycle = intDataUpdateTracker.current()
 | |
| 		}
 | |
| 		meta, _ = o.updateMetacacheListing(meta, rpc)
 | |
| 		metaMu.Unlock()
 | |
| 
 | |
| 		closeChannels()
 | |
| 		if err := bw.Close(); err != nil {
 | |
| 			metaMu.Lock()
 | |
| 			meta.error = err.Error()
 | |
| 			meta.status = scanStateError
 | |
| 			meta, err = o.updateMetacacheListing(meta, rpc)
 | |
| 			metaMu.Unlock()
 | |
| 		}
 | |
| 	}()
 | |
| 
 | |
| 	return filteredResults()
 | |
| }
 |