diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index b0018fa91..e50c899f7 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -1603,51 +1603,82 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re return err } - if opts.WalkVersions { - go func() { - defer close(results) - - var marker, versionIDMarker string - for { - loi, err := z.ListObjectVersions(ctx, bucket, prefix, marker, versionIDMarker, "", 1000) - if err != nil { - break - } - - for _, obj := range loi.Objects { - results <- obj - } - - if !loi.IsTruncated { - break - } - - marker = loi.NextMarker - versionIDMarker = loi.NextVersionIDMarker - } - }() - return nil - } - + ctx, cancel := context.WithCancel(ctx) go func() { + defer cancel() defer close(results) - var marker string - for { - loi, err := z.ListObjects(ctx, bucket, prefix, marker, "", 1000) - if err != nil { - break - } + for _, erasureSet := range z.serverPools { + var wg sync.WaitGroup + for _, set := range erasureSet.sets { + set := set + wg.Add(1) + go func() { + defer wg.Done() - for _, obj := range loi.Objects { - results <- obj - } + disks, _ := set.getOnlineDisksWithHealing() + if len(disks) == 0 { + cancel() + return + } - if !loi.IsTruncated { - break - } + loadEntry := func(entry metaCacheEntry) { + if entry.isDir() { + return + } - marker = loi.NextMarker + fivs, err := entry.fileInfoVersions(bucket) + if err != nil { + cancel() + return + } + + for _, version := range fivs.Versions { + results <- version.ToObjectInfo(bucket, version.Name) + } + } + + // How to resolve partial results. + resolver := metadataResolutionParams{ + dirQuorum: 1, + objQuorum: 1, + bucket: bucket, + } + + path := baseDirFromPrefix(prefix) + if path == "" { + path = prefix + } + + lopts := listPathRawOptions{ + disks: disks, + bucket: bucket, + path: path, + recursive: true, + forwardTo: "", + minDisks: 1, + reportNotFound: false, + agreed: loadEntry, + partial: func(entries metaCacheEntries, nAgreed int, errs []error) { + entry, ok := entries.resolve(&resolver) + if !ok { + // check if we can get one entry atleast + // proceed to heal nonetheless. + entry, _ = entries.firstFound() + } + + loadEntry(*entry) + }, + finished: nil, + } + + if err := listPathRaw(ctx, lopts); err != nil { + logger.LogIf(ctx, fmt.Errorf("listPathRaw returned %w: opts(%#v)", err, lopts)) + return + } + }() + } + wg.Wait() } }()