diff --git a/cmd/erasure-decode.go b/cmd/erasure-decode.go
index 716f53a65..0e7fb6835 100644
--- a/cmd/erasure-decode.go
+++ b/cmd/erasure-decode.go
@@ -282,3 +282,51 @@ func (e Erasure) Decode(ctx context.Context, writer io.Writer, readers []io.Read
 
 	return bytesWritten, derr
 }
+
+// Heal reads from the readers, reconstructs the shards and writes the data to the writers.
+func (e Erasure) Heal(ctx context.Context, writers []io.Writer, readers []io.ReaderAt, totalLength int64) (derr error) {
+	if len(writers) != e.parityBlocks+e.dataBlocks {
+		return errInvalidArgument
+	}
+
+	reader := newParallelReader(readers, e, 0, totalLength)
+
+	startBlock := int64(0)
+	endBlock := totalLength / e.blockSize
+	if totalLength%e.blockSize != 0 {
+		endBlock++
+	}
+
+	var bufs [][]byte
+	for block := startBlock; block < endBlock; block++ {
+		var err error
+		bufs, err = reader.Read(bufs)
+		if len(bufs) > 0 {
+			if errors.Is(err, errFileNotFound) || errors.Is(err, errFileCorrupt) {
+				if derr == nil {
+					derr = err
+				}
+			}
+		} else if err != nil {
+			return err
+		}
+
+		if err = e.DecodeDataAndParityBlocks(ctx, bufs); err != nil {
+			logger.LogIf(ctx, err)
+			return err
+		}
+
+		w := parallelWriter{
+			writers:     writers,
+			writeQuorum: 1,
+			errs:        make([]error, len(writers)),
+		}
+
+		err = w.Write(ctx, bufs)
+		if err != nil {
+			return err
+		}
+	}
+
+	return derr
+}
diff --git a/cmd/erasure-heal_test.go b/cmd/erasure-heal_test.go
index af1609178..3c51d3ad4 100644
--- a/cmd/erasure-heal_test.go
+++ b/cmd/erasure-heal_test.go
@@ -24,9 +24,6 @@ import (
 	"io"
 	"os"
 	"testing"
-
-	humanize "github.com/dustin/go-humanize"
-	"github.com/minio/minio/internal/bpool"
 )
 
 var erasureHealTests = []struct {
@@ -137,15 +134,8 @@ func TestErasureHeal(t *testing.T) {
 		staleWriters[i] = newBitrotWriter(disk, "testbucket", "testobject",
 			erasure.ShardFileSize(test.size), test.algorithm, erasure.ShardSize())
 	}
-	// Number of buffers, max 2GB
-	n := (2 * humanize.GiByte) / (int(test.blocksize) * 2)
-
-	// Initialize byte pool once for all sets, bpool size is set to
-	// setCount * setDriveCount with each memory upto blockSizeV2.
-	bp := bpool.NewBytePoolCap(n, int(test.blocksize), int(test.blocksize)*2)
-
 	// test case setup is complete - now call Heal()
-	err = erasure.Heal(context.Background(), readers, staleWriters, test.size, bp)
+	err = erasure.Heal(context.Background(), staleWriters, readers, test.size)
 	closeBitrotReaders(readers)
 	closeBitrotWriters(staleWriters)
 	if err != nil && !test.shouldFail {
diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go
index 8616c441b..7f687aa71 100644
--- a/cmd/erasure-healing.go
+++ b/cmd/erasure-healing.go
@@ -457,10 +457,6 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
 	}
 
 	erasureInfo := latestMeta.Erasure
-	bp := er.bp
-	if erasureInfo.BlockSize == blockSizeV1 {
-		bp = er.bpOld
-	}
 	for partIndex := 0; partIndex < len(latestMeta.Parts); partIndex++ {
 		partSize := latestMeta.Parts[partIndex].Size
 		partActualSize := latestMeta.Parts[partIndex].ActualSize
@@ -491,7 +487,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
 				tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize())
 			}
 		}
-		err = erasure.Heal(ctx, readers, writers, partSize, bp)
+		err = erasure.Heal(ctx, writers, readers, partSize)
 		closeBitrotReaders(readers)
 		closeBitrotWriters(writers)
 		if err != nil {
diff --git a/cmd/erasure-healing_test.go b/cmd/erasure-healing_test.go
index 3d00fe6e7..371f0a677 100644
--- a/cmd/erasure-healing_test.go
+++ b/cmd/erasure-healing_test.go
@@ -21,6 +21,8 @@ import (
 	"bytes"
 	"context"
 	"crypto/rand"
+	"crypto/sha256"
+	"io"
 	"os"
 	"path"
 	"reflect"
@@ -649,3 +651,126 @@ func TestHealEmptyDirectoryErasure(t *testing.T) {
 		}
 	}
 }
+
+func TestHealLastDataShard(t *testing.T) {
+	tests := []struct {
+		name     string
+		dataSize int64
+	}{
+		{"4KiB", 4 * humanize.KiByte},
+		{"64KiB", 64 * humanize.KiByte},
+		{"128KiB", 128 * humanize.KiByte},
+		{"1MiB", 1 * humanize.MiByte},
+		{"5MiB", 5 * humanize.MiByte},
+		{"10MiB", 10 * humanize.MiByte},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+
+			nDisks := 16
+			fsDirs, err := getRandomDisks(nDisks)
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			defer removeRoots(fsDirs)
+
+			obj, _, err := initObjectLayer(ctx, mustGetPoolEndpoints(fsDirs...))
+			if err != nil {
+				t.Fatal(err)
+			}
+			bucket := "bucket"
+			object := "object"
+
+			data := make([]byte, test.dataSize)
+			_, err = rand.Read(data)
+			if err != nil {
+				t.Fatal(err)
+			}
+			var opts ObjectOptions
+
+			err = obj.MakeBucketWithLocation(ctx, bucket, BucketOptions{})
+			if err != nil {
+				t.Fatalf("Failed to make a bucket - %v", err)
+			}
+
+			_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), opts)
+
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			actualH := sha256.New()
+			_, err = io.Copy(actualH, bytes.NewReader(data))
+			if err != nil {
+				t.Fatal(err)
+			}
+			actualSha256 := actualH.Sum(nil)
+
+			z := obj.(*erasureServerPools)
+			er := z.serverPools[0].getHashedSet(object)
+
+			disks := er.getDisks()
+			distribution := hashOrder(pathJoin(bucket, object), nDisks)
+			shuffledDisks := shuffleDisks(disks, distribution)
+
+			// remove last data shard
+			err = removeAll(pathJoin(shuffledDisks[11].String(), bucket, object))
+			if err != nil {
+				t.Fatalf("Failed to delete a file - %v", err)
+			}
+			_, err = obj.HealObject(ctx, bucket, object, "", madmin.HealOpts{ScanMode: madmin.HealNormalScan})
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			firstGr, err := obj.GetObjectNInfo(ctx, bucket, object, nil, nil, noLock, ObjectOptions{})
+			if err != nil {
+				t.Fatal(err)
+			}
+			defer firstGr.Close()
+
+			firstHealedH := sha256.New()
+			_, err = io.Copy(firstHealedH, firstGr)
+			if err != nil {
+				t.Fatal(err)
+			}
+			firstHealedDataSha256 := firstHealedH.Sum(nil)
+
+			if !bytes.Equal(actualSha256, firstHealedDataSha256) {
+				t.Fatal("object healed wrong")
+			}
+
+			// remove another data shard
+			err = removeAll(pathJoin(shuffledDisks[1].String(), bucket, object))
+			if err != nil {
+				t.Fatalf("Failed to delete a file - %v", err)
+			}
+
+			_, err = obj.HealObject(ctx, bucket, object, "", madmin.HealOpts{ScanMode: madmin.HealNormalScan})
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			secondGr, err := obj.GetObjectNInfo(ctx, bucket, object, nil, nil, noLock, ObjectOptions{})
+			if err != nil {
+				t.Fatal(err)
+			}
+			defer secondGr.Close()
+
+			secondHealedH := sha256.New()
+			_, err = io.Copy(secondHealedH, secondGr)
+			if err != nil {
+				t.Fatal(err)
+			}
+			secondHealedDataSha256 := secondHealedH.Sum(nil)
+
+			if !bytes.Equal(actualSha256, secondHealedDataSha256) {
+				t.Fatal("object healed wrong")
+			}
+		})
+	}
+}
diff --git a/cmd/erasure-lowlevel-heal.go b/cmd/erasure-lowlevel-heal.go
deleted file mode 100644
index f63253c57..000000000
--- a/cmd/erasure-lowlevel-heal.go
+++ /dev/null
@@ -1,59 +0,0 @@
-// Copyright (c) 2015-2021 MinIO, Inc.
-//
-// This file is part of MinIO Object Storage stack
-//
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-//
-// You should have received a copy of the GNU Affero General Public License
-// along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-package cmd
-
-import (
-	"context"
-	"io"
-
-	"github.com/minio/minio/internal/bpool"
-	xioutil "github.com/minio/minio/internal/ioutil"
-	"github.com/minio/minio/internal/logger"
-)
-
-// Heal heals the shard files on non-nil writers. Note that the quorum passed is 1
-// as healing should continue even if it has been successful healing only one shard file.
-func (e Erasure) Heal(ctx context.Context, readers []io.ReaderAt, writers []io.Writer, size int64, bp *bpool.BytePoolCap) error {
-	r, w := xioutil.WaitPipe()
-	go func() {
-		_, err := e.Decode(ctx, w, readers, 0, size, size, nil)
-		w.CloseWithError(err)
-	}()
-
-	// Fetch buffer for I/O, returns from the pool if not allocates a new one and returns.
-	var buffer []byte
-	switch {
-	case size == 0:
-		buffer = make([]byte, 1) // Allocate atleast a byte to reach EOF
-	case size >= e.blockSize:
-		buffer = bp.Get()
-		defer bp.Put(buffer)
-	case size < e.blockSize:
-		// No need to allocate fully blockSizeV1 buffer if the incoming data is smaller.
-		buffer = make([]byte, size, 2*size+int64(e.parityBlocks+e.dataBlocks-1))
-	}
-
-	// quorum is 1 because CreateFile should continue writing as long as we are writing to even 1 disk.
-	n, err := e.Encode(ctx, r, writers, buffer, 1)
-	if err == nil && n != size {
-		logger.LogIf(ctx, errLessData)
-		err = errLessData
-	}
-	r.CloseWithError(err)
-	return err
-}
diff --git a/go.mod b/go.mod
index 4080e2898..24f3917b4 100644
--- a/go.mod
+++ b/go.mod
@@ -40,7 +40,7 @@ require (
 	github.com/klauspost/cpuid/v2 v2.0.9
 	github.com/klauspost/pgzip v1.2.5
 	github.com/klauspost/readahead v1.3.1
-	github.com/klauspost/reedsolomon v1.9.13
+	github.com/klauspost/reedsolomon v1.9.15
 	github.com/lib/pq v1.9.0
 	github.com/miekg/dns v1.1.43
 	github.com/minio/cli v1.22.0
diff --git a/go.sum b/go.sum
index 9a82873a1..4f727d6d2 100644
--- a/go.sum
+++ b/go.sum
@@ -953,8 +953,8 @@ github.com/klauspost/pgzip v1.2.5 h1:qnWYvvKqedOF2ulHpMG72XQol4ILEJ8k2wwRl/Km8oE
 github.com/klauspost/pgzip v1.2.5/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
 github.com/klauspost/readahead v1.3.1 h1:QqXNYvm+VvqYcbrRT4LojUciM0XrznFRIDrbHiJtu/0=
 github.com/klauspost/readahead v1.3.1/go.mod h1:AH9juHzNH7xqdqFHrMRSHeH2Ps+vFf+kblDqzPFiLJg=
-github.com/klauspost/reedsolomon v1.9.13 h1:Xr0COKf7F0ACTXUNnz2ZFCWlUKlUTAUX3y7BODdUxqU=
-github.com/klauspost/reedsolomon v1.9.13/go.mod h1:eqPAcE7xar5CIzcdfwydOEdcmchAKAP/qs14y4GCBOk=
+github.com/klauspost/reedsolomon v1.9.15 h1:g2erWKD2M6rgnPf89fCji6jNlhMKMdXcuNHMW1SYCIo=
+github.com/klauspost/reedsolomon v1.9.15/go.mod h1:eqPAcE7xar5CIzcdfwydOEdcmchAKAP/qs14y4GCBOk=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
 github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
 github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=