tarFilterer: handle multiple archives

When processing .dockerignore files, we produce mini-archives for each
individual item being copied as part of a directory tree, so if we pass
them through a tarFilterer, it needs to not give up when it hits the end
of the first archive that it sees.

Add a flag to the tarFilterer that it sets when it closes the writing
end of the pipe that it's reading, and if the flag isn't set when the
reader finds an EOF, have the reader gear up to read another archive
instead of just refusing to process any more data.

Signed-off-by: Nalin Dahyabhai <nalin@redhat.com>
This commit is contained in:
Nalin Dahyabhai 2020-07-28 11:11:18 -04:00
parent 03d782c011
commit bee80f0e36
2 changed files with 182 additions and 28 deletions

View File

@ -54,6 +54,8 @@ func (s *simpleDigester) Digest() digest.Digest {
// tarFilterer passes one or more tar archives through to an io.WriteCloser as
// a single archive, filtering headers and contents along the way (see
// newTarFilterer). Its fields are shared between the caller and a reader
// goroutine started by newTarFilterer.
type tarFilterer struct {
	wg         sync.WaitGroup // tracks the goroutine that reads from the pipe
	pipeWriter *io.PipeWriter // writing end of the pipe the reader goroutine consumes
	closedLock sync.Mutex     // guards closed
	closed     bool           // set by Close(); tells the reader not to expect another archive
	err        error          // error recorded by the reader goroutine, returned by Close()
}
@ -62,50 +64,77 @@ func (t *tarFilterer) Write(p []byte) (int, error) {
}
// Close marks the filterer as closed, shuts down the writing end of the pipe
// so the reader goroutine sees EOF, waits for that goroutine to finish, and
// returns any error it recorded.  Calling Close a second time is an error.
func (t *tarFilterer) Close() error {
	t.closedLock.Lock()
	if t.closed {
		t.closedLock.Unlock()
		return errors.Errorf("tar filter is already closed")
	}
	// Set the flag before closing the pipe so the reader goroutine knows the
	// EOF it's about to see is final, not the boundary between two archives.
	t.closed = true
	t.closedLock.Unlock()
	err := t.pipeWriter.Close()
	// Wait for the reader goroutine to drain the pipe and record t.err.
	t.wg.Wait()
	if err != nil {
		// NOTE(review): the original diff residue contained an unreachable
		// bare `return err` ahead of this wrapped return; only the wrapped
		// form (the added line of the commit) is kept.
		return errors.Wrapf(err, "error closing filter pipe")
	}
	return t.err
}
// newTarFilterer passes a tarball through to an io.WriteCloser, potentially
// calling filter to modify headers as it goes.
func newTarFilterer(writeCloser io.WriteCloser, filter func(hdr *tar.Header)) io.WriteCloser {
// newTarFilterer passes one or more tar archives through to an io.WriteCloser
// as a single archive, potentially calling filter to modify headers and
// contents as it goes.
func newTarFilterer(writeCloser io.WriteCloser, filter func(hdr *tar.Header) (skip, replaceContents bool, replacementContents io.Reader)) io.WriteCloser {
pipeReader, pipeWriter := io.Pipe()
tarReader := tar.NewReader(pipeReader)
tarWriter := tar.NewWriter(writeCloser)
filterer := &tarFilterer{
pipeWriter: pipeWriter,
}
filterer.wg.Add(1)
go func() {
hdr, err := tarReader.Next()
for err == nil {
if filter != nil {
filter(hdr)
filterer.closedLock.Lock()
closed := filterer.closed
filterer.closedLock.Unlock()
for !closed {
tarReader := tar.NewReader(pipeReader)
hdr, err := tarReader.Next()
for err == nil {
var skip, replaceContents bool
var replacementContents io.Reader
if filter != nil {
skip, replaceContents, replacementContents = filter(hdr)
}
if !skip {
err = tarWriter.WriteHeader(hdr)
if err != nil {
err = errors.Wrapf(err, "error filtering tar header for %q", hdr.Name)
break
}
if hdr.Size != 0 {
var n int64
var copyErr error
if replaceContents {
n, copyErr = io.CopyN(tarWriter, replacementContents, hdr.Size)
} else {
n, copyErr = io.Copy(tarWriter, tarReader)
}
if copyErr != nil {
err = errors.Wrapf(copyErr, "error copying content for %q", hdr.Name)
break
}
if n != hdr.Size {
err = errors.Errorf("error filtering content for %q: expected %d bytes, got %d bytes", hdr.Name, hdr.Size, n)
break
}
}
}
hdr, err = tarReader.Next()
}
err = tarWriter.WriteHeader(hdr)
if err != nil {
err = errors.Wrapf(err, "error filtering tar header for %q", hdr.Name)
if err != io.EOF {
filterer.err = errors.Wrapf(err, "error reading tar archive")
break
}
if hdr.Size != 0 {
n, copyErr := io.Copy(tarWriter, tarReader)
if copyErr != nil {
err = errors.Wrapf(copyErr, "error filtering content for %q", hdr.Name)
break
}
if n != hdr.Size {
err = errors.Errorf("error filtering content for %q: expected %d bytes, got %d bytes", hdr.Name, hdr.Size, n)
break
}
}
hdr, err = tarReader.Next()
}
if err != io.EOF {
filterer.err = err
filterer.closedLock.Lock()
closed = filterer.closed
filterer.closedLock.Unlock()
}
pipeReader.Close()
tarWriter.Close()

View File

@ -5,6 +5,8 @@ import (
"bytes"
"io"
"io/ioutil"
"strings"
"sync"
"testing"
"time"
@ -121,7 +123,10 @@ func TestCompositeDigester(t *testing.T) {
}
if filtered {
// wrap the WriteCloser in another WriteCloser
hasher = newTarFilterer(hasher, func(hdr *tar.Header) { hdr.ModTime = zero })
hasher = newTarFilterer(hasher, func(hdr *tar.Header) (bool, bool, io.Reader) {
hdr.ModTime = zero
return false, false, nil
})
require.NotNil(t, hasher, "newTarFilterer returned a null WriteCloser?")
}
// write this item as an archive
@ -180,3 +185,123 @@ func TestCompositeDigester(t *testing.T) {
})
}
}
// TestTarFilterer feeds one or more concatenated tar archives through
// newTarFilterer and verifies that the filtered stream reads back as a
// single archive with the expected names and contents.
func TestTarFilterer(t *testing.T) {
	testCases := []struct {
		name          string
		input, output map[string]string
		breakAfter    int
		filter        func(*tar.Header) (bool, bool, io.Reader)
	}{
		{
			name: "none",
			input: map[string]string{
				"file a": "content a",
				"file b": "content b",
			},
			output: map[string]string{
				"file a": "content a",
				"file b": "content b",
			},
			filter: nil,
		},
		{
			name: "plain",
			input: map[string]string{
				"file a": "content a",
				"file b": "content b",
			},
			output: map[string]string{
				"file a": "content a",
				"file b": "content b",
			},
			filter: func(*tar.Header) (bool, bool, io.Reader) { return false, false, nil },
		},
		{
			name: "skip",
			input: map[string]string{
				"file a": "content a",
				"file b": "content b",
			},
			output: map[string]string{
				"file a": "content a",
			},
			filter: func(hdr *tar.Header) (bool, bool, io.Reader) { return hdr.Name == "file b", false, nil },
		},
		{
			name: "replace",
			input: map[string]string{
				"file a": "content a",
				"file b": "content b",
				"file c": "content c",
			},
			output: map[string]string{
				"file a": "content a",
				"file b": "content b+c",
				"file c": "content c",
			},
			breakAfter: 2,
			filter: func(hdr *tar.Header) (bool, bool, io.Reader) {
				if hdr.Name == "file b" {
					content := "content b+c"
					hdr.Size = int64(len(content))
					return false, true, strings.NewReader(content)
				}
				return false, false, nil
			},
		},
	}
	for _, testCase := range testCases {
		t.Run(testCase.name, func(t *testing.T) {
			// Build the input archive (possibly several back-to-back archives,
			// controlled by breakAfter) in memory.
			var archive bytes.Buffer
			writer := tar.NewWriter(&archive)
			written := 0
			for name, body := range testCase.input {
				hdr := tar.Header{
					Name:     name,
					Size:     int64(len(body)),
					Typeflag: tar.TypeReg,
				}
				err := writer.WriteHeader(&hdr)
				require.Nil(t, err, "unexpected error from TarWriter.WriteHeader")
				n, err := io.CopyN(writer, strings.NewReader(body), int64(len(body)))
				require.Nil(t, err, "unexpected error copying to tar writer")
				require.Equal(t, int64(len(body)), n, "unexpected write length")
				written++
				if testCase.breakAfter != 0 && written%testCase.breakAfter == 0 {
					// Terminate the current archive and start a new one in the
					// same buffer; the filterer should present them as one.
					writer.Close()
					writer = tar.NewWriter(&archive)
				}
			}
			writer.Close()
			// Read the filtered stream back on the other end of a pipe.
			output := make(map[string]string)
			pipeReader, pipeWriter := io.Pipe()
			var wg sync.WaitGroup
			wg.Add(1)
			go func() {
				defer wg.Done()
				tr := tar.NewReader(pipeReader)
				hdr, err := tr.Next()
				for err == nil {
					var content bytes.Buffer
					var copied int64
					copied, err = io.Copy(&content, tr)
					require.Nil(t, err, "unexpected error copying from tar reader")
					require.Equal(t, hdr.Size, copied, "unexpected read length")
					output[hdr.Name] = content.String()
					hdr, err = tr.Next()
				}
				require.Equal(t, io.EOF, err, "unexpected error ended our tarstream read")
				pipeReader.Close()
			}()
			filterer := newTarFilterer(pipeWriter, testCase.filter)
			_, err := io.Copy(filterer, &archive)
			require.Nil(t, err, "unexpected error copying archive through filter to reader")
			filterer.Close()
			wg.Wait()
			require.Equal(t, testCase.output, output, "got unexpected results")
		})
	}
}