/*
 * Minio Cloud Storage, (C) 2017 Minio, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cmd

import (
	"context"
	"crypto/sha256"
	"encoding/base64"
	"encoding/hex"
	"errors"
	"fmt"
	"hash"
	"io"
	"strconv"
	"strings"

	"cloud.google.com/go/storage"
	"google.golang.org/api/googleapi"
	"google.golang.org/api/iterator"

	minio "github.com/minio/minio-go"
	"github.com/minio/minio-go/pkg/policy"
)

var (
	// ErrNotValidMultipartIdentifier is returned when the multipart identifier
	// is not in the correct form.
	ErrNotValidMultipartIdentifier = errors.New("Not a valid multipart identifier")
)

const (
	// ZZZZMinioPrefix is used for metadata and multipart objects. The prefix
	// is filtered out of listings, and the ZZZZ naming makes it sort last.
	ZZZZMinioPrefix = "ZZZZ-Minio"
)

// gcsToObjectError converts GCS errors to minio object layer errors.
func gcsToObjectError(err error, params ...string) error {
	if err == nil {
		return nil
	}

	e, ok := err.(*Error)
	if !ok {
		// Code should be fixed if this function is called without doing
		// traceError(); otherwise handling the different situations here
		// becomes complicated.
		errorIf(err, "Expected type *Error")
		return err
	}

	err = e.e

	bucket := ""
	object := ""
	if len(params) >= 1 {
		bucket = params[0]
	}
	if len(params) == 2 {
		object = params[1]
	}

	// In some cases GCS returns a plain error without a googleapi wrapper.
	switch err.Error() {
	case "storage: bucket doesn't exist":
		err = BucketNotFound{
			Bucket: bucket,
		}
		e.e = err
		return e
	case "storage: object doesn't exist":
		err = ObjectNotFound{
			Bucket: bucket,
			Object: object,
		}
		e.e = err
		return e
	}

	googleAPIErr, ok := err.(*googleapi.Error)
	if !ok {
		// We only interpret googleapi errors, whose Reason field lets us
		// map them to object layer errors.
		return e
	}

	if len(googleAPIErr.Errors) == 0 {
		// No item errors to interpret, return the traced error as-is.
		return e
	}

	reason := googleAPIErr.Errors[0].Reason
	message := googleAPIErr.Errors[0].Message

	switch reason {
	case "required":
		// Anonymous users do not have storage.xyz access to project 123.
		fallthrough
	case "keyInvalid":
		fallthrough
	case "forbidden":
		err = PrefixAccessDenied{
			Bucket: bucket,
			Object: object,
		}
	case "invalid":
		err = BucketNameInvalid{
			Bucket: bucket,
		}
	case "notFound":
		if object != "" {
			err = ObjectNotFound{
				Bucket: bucket,
				Object: object,
			}
		} else {
			err = BucketNotFound{
				Bucket: bucket,
			}
		}
	case "conflict":
		if message == "You already own this bucket. Please select another name." {
			err = BucketAlreadyOwnedByYou{
				Bucket: bucket,
			}
		} else if message == "Sorry, that name is not available. Please try a different one." {
			err = BucketAlreadyExists{
				Bucket: bucket,
			}
		} else {
			err = BucketNotEmpty{
				Bucket: bucket,
			}
		}
	default:
		err = fmt.Errorf("Unsupported error reason: %s", reason)
	}

	e.e = err
	return e
}

// gcsGateway - Implements gateway for Minio and GCS compatible object storage servers.
type gcsGateway struct {
	client     *storage.Client
	anonClient *minio.Core
	projectID  string
	ctx        context.Context
}

// newGCSGateway returns a GCS GatewayLayer.
func newGCSGateway(args []string) (GatewayLayer, error) {
	if len(args) != 1 {
		return nil, fmt.Errorf("ProjectID expected")
	}

	profile := "default"
	// TODO: don't know where to set profile yet
	_ = profile

	endpoint := "storage.googleapis.com"
	secure := true

	projectID := args[0]

	ctx := context.Background()

	// Create a storage client. Credentials are resolved through Google
	// Application Default Credentials (e.g. a service account JSON pointed
	// to by GOOGLE_APPLICATION_CREDENTIALS).
	client, err := storage.NewClient(ctx)
	if err != nil {
		return nil, err
	}

	// Anonymous client against GCS's S3-compatible XML API endpoint.
	anonClient, err := minio.NewCore(endpoint, "", "", secure)
	if err != nil {
		return nil, err
	}

	return &gcsGateway{
		client:     client,
		projectID:  projectID,
		ctx:        ctx,
		anonClient: anonClient,
	}, nil
}

// Shutdown - save any gateway metadata to disk
// if necessary and reload upon next restart.
func (l *gcsGateway) Shutdown() error {
	// TODO
	return nil
}

// StorageInfo - Not relevant to GCS backend.
func (l *gcsGateway) StorageInfo() StorageInfo {
	return StorageInfo{}
}

// MakeBucket - Create a new container on GCS backend.
func (l *gcsGateway) MakeBucket(bucket string) error {
	// Never called; it only satisfies the ObjectLayer interface.
	return traceError(NotImplemented{})
}

// MakeBucketWithLocation - Create a new container on GCS backend.
func (l *gcsGateway) MakeBucketWithLocation(bucket, location string) error {
	bkt := l.client.Bucket(bucket)

	// The bucket is created in the region configured on the server, not in
	// the requested location.
	if err := bkt.Create(l.ctx, l.projectID, &storage.BucketAttrs{
		Location: serverConfig.Region,
	}); err != nil {
		return gcsToObjectError(traceError(err), bucket)
	}

	return nil
}

// GetBucketInfo - Get bucket metadata.
func (l *gcsGateway) GetBucketInfo(bucket string) (BucketInfo, error) {
	attrs, err := l.client.Bucket(bucket).Attrs(l.ctx)
	if err != nil {
		return BucketInfo{}, gcsToObjectError(traceError(err), bucket)
	}

	return BucketInfo{
		Name:    attrs.Name,
		Created: attrs.Created,
	}, nil
}

// ListBuckets lists all GCS buckets
func (l *gcsGateway) ListBuckets() ([]BucketInfo, error) {
	it := l.client.Buckets(l.ctx, l.projectID)

	b := []BucketInfo{}
	for {
		attrs, err := it.Next()
		if err == iterator.Done {
			break
		}

		if err != nil {
			return []BucketInfo{}, gcsToObjectError(traceError(err))
		}

		b = append(b, BucketInfo{
			Name:    attrs.Name,
			Created: attrs.Created,
		})
	}

	return b, nil
}

// DeleteBucket deletes a bucket on GCS
func (l *gcsGateway) DeleteBucket(bucket string) error {
	err := l.client.Bucket(bucket).Delete(l.ctx)
	if err != nil {
		return gcsToObjectError(traceError(err), bucket)
	}

	return nil
}

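// toGCSPageToken builds the continuation token GCS uses to resume a listing
// just after the object `name`. The token is a base64-encoded protobuf-style
// message: byte 0xa tags field 1 (wire type 2, length-delimited), followed by
// the length of the name as a varint and the raw name bytes. For example,
// toGCSPageToken("foo") encodes {0x0a, 0x03, 'f', 'o', 'o'} and yields
// "CgNmb28=". The wire format is inferred from observed GCS tokens, not a
// documented contract.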
func toGCSPageToken(name string) string {
	length := uint16(len(name))

	b := []byte{
		0xa, // field 1, wire type 2 (length-delimited)
	}

	// Encode the length as a varint: 7 bits per byte, high bit set on all
	// but the last byte.
	for length >= 0x80 {
		b = append(b, byte(length&0x7f)|0x80)
		length >>= 7
	}
	b = append(b, byte(length))

	b = append(b, []byte(name)...)

	return base64.StdEncoding.EncodeToString(b)
}

// ListObjects - lists all blobs in GCS bucket filtered by prefix
func (l *gcsGateway) ListObjects(bucket string, prefix string, marker string, delimiter string, maxKeys int) (ListObjectsInfo, error) {
	it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Delimiter: delimiter, Prefix: prefix, Versions: false})

	isTruncated := false
	nextMarker := ""
	prefixes := []string{}

	// The marker is a GCS page token (see toGCSPageToken), so the listing
	// continues right after the object it encodes.
	it.PageInfo().Token = marker

	objects := []ObjectInfo{}
	for {
		if len(objects) >= maxKeys {
			// Peek at the next entry: if the iterator is exhausted, or the
			// only remaining entry is our hidden metadata prefix, the
			// listing is complete; otherwise the output is truncated.
			attrs, err := it.Next()
			if err == iterator.Done {
				break
			}
			if attrs != nil && attrs.Prefix == ZZZZMinioPrefix {
				break
			}

			isTruncated = true
			break
		}

		attrs, err := it.Next()
		if err == iterator.Done {
			break
		} else if err != nil {
			return ListObjectsInfo{}, gcsToObjectError(traceError(err), bucket, prefix)
		}

		nextMarker = toGCSPageToken(attrs.Name)

		if attrs.Prefix == ZZZZMinioPrefix {
			// We don't return our metadata prefix.
			continue
		} else if attrs.Prefix != "" {
			prefixes = append(prefixes, attrs.Prefix)
			continue
		}

		objects = append(objects, fromGCSObjectInfo(attrs))
	}

	return ListObjectsInfo{
		IsTruncated: isTruncated,
		NextMarker:  nextMarker,
		Prefixes:    prefixes,
		Objects:     objects,
	}, nil
}

// ListObjectsV2 - lists all blobs in GCS bucket filtered by prefix
func (l *gcsGateway) ListObjectsV2(bucket, prefix, continuationToken string, fetchOwner bool, delimiter string, maxKeys int) (ListObjectsV2Info, error) {
	it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Delimiter: delimiter, Prefix: prefix, Versions: false})

	isTruncated := false
	nextMarker := ""
	prefixes := []string{}

	objects := []ObjectInfo{}
	for {
		if len(objects) >= maxKeys {
			isTruncated = true
			nextMarker = it.PageInfo().Token
			break
		}

		attrs, err := it.Next()
		if err == iterator.Done {
			break
		}

		if err != nil {
			return ListObjectsV2Info{}, gcsToObjectError(traceError(err), bucket, prefix)
		}

		if attrs.Prefix != "" {
			prefixes = append(prefixes, attrs.Prefix)
			continue
		}

		objects = append(objects, fromGCSObjectInfo(attrs))
	}

	return ListObjectsV2Info{
		IsTruncated:           isTruncated,
		ContinuationToken:     continuationToken,
		NextContinuationToken: nextMarker,
		Prefixes:              prefixes,
		Objects:               objects,
	}, nil
}

// GetObject - reads an object from GCS. Supports additional
// parameters like offset and length which are synonymous with
// HTTP Range requests.
//
// startOffset indicates the starting read location of the object.
// length indicates the total length of the object.
func (l *gcsGateway) GetObject(bucket string, key string, startOffset int64, length int64, writer io.Writer) error {
	// To mimic S3 behavior exactly, we need to verify that the bucket
	// exists first; otherwise GCS returns "object not found" even for a
	// non-existing bucket.
	if _, err := l.client.Bucket(bucket).Attrs(l.ctx); err != nil {
		return gcsToObjectError(traceError(err), bucket)
	}

	object := l.client.Bucket(bucket).Object(key)
	r, err := object.NewRangeReader(l.ctx, startOffset, length)
	if err != nil {
		return gcsToObjectError(traceError(err), bucket, key)
	}
	defer r.Close()

	if _, err := io.Copy(writer, r); err != nil {
		return gcsToObjectError(traceError(err), bucket, key)
	}

	return nil
}

// fromGCSObjectInfo converts GCS ObjectAttrs to gateway ObjectInfo
func fromGCSObjectInfo(attrs *storage.ObjectAttrs) ObjectInfo {
	return ObjectInfo{
		Name:            attrs.Name,
		Bucket:          attrs.Bucket,
		ModTime:         attrs.Updated,
		Size:            attrs.Size,
		MD5Sum:          hex.EncodeToString(attrs.MD5),
		UserDefined:     attrs.Metadata,
		ContentType:     attrs.ContentType,
		ContentEncoding: attrs.ContentEncoding,
	}
}

// GetObjectInfo - reads object info and replies back ObjectInfo
func (l *gcsGateway) GetObjectInfo(bucket string, object string) (ObjectInfo, error) {
	// To mimic S3 behavior exactly, we need to verify that the bucket
	// exists first; otherwise GCS returns "object not found" even for a
	// non-existing bucket.
	if _, err := l.client.Bucket(bucket).Attrs(l.ctx); err != nil {
		return ObjectInfo{}, gcsToObjectError(traceError(err), bucket)
	}

	attrs, err := l.client.Bucket(bucket).Object(object).Attrs(l.ctx)
	if err != nil {
		return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, object)
	}

	return fromGCSObjectInfo(attrs), nil
}

// PutObject - Create a new object with the incoming data.
func (l *gcsGateway) PutObject(bucket string, key string, size int64, data io.Reader, metadata map[string]string, sha256sum string) (ObjectInfo, error) {
	// To mimic S3 behavior exactly, we need to verify that the bucket
	// exists first; otherwise GCS returns "object not found" even for a
	// non-existing bucket.
	if _, err := l.client.Bucket(bucket).Attrs(l.ctx); err != nil {
		return ObjectInfo{}, gcsToObjectError(traceError(err), bucket)
	}

	var sha256Writer hash.Hash

	// Tee the incoming data into the sha256 writer if a checksum was
	// supplied, so it can be verified after the upload.
	teeReader := data
	if sha256sum != "" {
		if _, err := hex.DecodeString(sha256sum); err != nil {
			return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
		}
		sha256Writer = sha256.New()
		teeReader = io.TeeReader(data, sha256Writer)
	}

	delete(metadata, "md5Sum")

	object := l.client.Bucket(bucket).Object(key)

	w := object.NewWriter(l.ctx)
	w.ContentType = metadata["content-type"]
	w.ContentEncoding = metadata["content-encoding"]
	w.Metadata = metadata

	if _, err := io.Copy(w, teeReader); err != nil {
		return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
	}

	if err := w.Close(); err != nil {
		return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
	}

	attrs, err := object.Attrs(l.ctx)
	if err != nil {
		return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
	}

	if sha256sum != "" {
		newSHA256sum := hex.EncodeToString(sha256Writer.Sum(nil))
		if newSHA256sum != sha256sum {
			// TODO: the object has already been written at this point; it
			// should be deleted on a checksum mismatch.
			return ObjectInfo{}, traceError(SHA256Mismatch{})
		}
	}

	return fromGCSObjectInfo(attrs), nil
}

// CopyObject - Copies a blob from source container to destination container.
func (l *gcsGateway) CopyObject(srcBucket string, srcObject string, destBucket string, destObject string, metadata map[string]string) (ObjectInfo, error) {
	src := l.client.Bucket(srcBucket).Object(srcObject)
	dst := l.client.Bucket(destBucket).Object(destObject)

	attrs, err := dst.CopierFrom(src).Run(l.ctx)
	if err != nil {
		return ObjectInfo{}, gcsToObjectError(traceError(err), destBucket, destObject)
	}

	return fromGCSObjectInfo(attrs), nil
}

// DeleteObject - Deletes a blob in bucket
func (l *gcsGateway) DeleteObject(bucket string, object string) error {
	err := l.client.Bucket(bucket).Object(object).Delete(l.ctx)
	if err != nil {
		return gcsToObjectError(traceError(err), bucket, object)
	}

	return nil
}

// ListMultipartUploads - lists all multipart uploads.
func (l *gcsGateway) ListMultipartUploads(bucket string, prefix string, keyMarker string, uploadIDMarker string, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
	// TODO: implement prefix and prefixes, how does this work for multiparts?
	// The caller-supplied prefix is ignored; we list our hidden multipart
	// namespace instead.
	prefix = fmt.Sprintf("%s/multipart-", ZZZZMinioPrefix)

	it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Delimiter: delimiter, Prefix: prefix, Versions: false})

	isTruncated := false
	nextMarker := ""

	uploads := []uploadMetadata{}
	for {
		if maxUploads <= len(uploads) {
			isTruncated = true
			nextMarker = it.PageInfo().Token
			break
		}

		attrs, err := it.Next()
		if err == iterator.Done {
			break
		}

		if err != nil {
			return ListMultipartsInfo{}, gcsToObjectError(traceError(err), bucket)
		}

		if attrs.Prefix != "" {
			continue
		}

		objectKey, uploadID, partID, err := fromGCSMultipartKey(attrs.Name)
		if err != nil {
			continue
		} else if partID != 0 {
			continue
		}

		// Only part zero is counted: it is the zero-sized metadata object
		// written by NewMultipartUpload, giving one entry per upload.
		uploads = append(uploads, uploadMetadata{
			Object:    objectKey,
			UploadID:  uploadID,
			Initiated: attrs.Created,
		})
	}

	return ListMultipartsInfo{
		Uploads:     uploads,
		IsTruncated: isTruncated,

		KeyMarker:          nextMarker,
		UploadIDMarker:     nextMarker,
		NextKeyMarker:      nextMarker,
		NextUploadIDMarker: nextMarker,
		MaxUploads:         maxUploads,
	}, nil
}

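// Multipart state is kept as objects under the hidden prefix, with names of
// the form (values hypothetical):
//
//	ZZZZ-Minio/multipart-<key>-<uploadID>-<partID>
//
// e.g. "ZZZZ-Minio/multipart-photo.jpg-abc123-00001". Note that the parser
// below splits on "-", so it assumes neither the key nor the uploadID
// contains a dash; UUID-style upload IDs (see mustGetUUID) would break this.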
func fromGCSMultipartKey(s string) (key, uploadID string, partID int, err error) {
	// Strip the hidden prefix ("ZZZZ-Minio/") before parsing.
	s = strings.TrimPrefix(s, ZZZZMinioPrefix+"/")

	parts := strings.Split(s, "-")
	if len(parts) != 4 {
		return "", "", 0, ErrNotValidMultipartIdentifier
	}

	if parts[0] != "multipart" {
		return "", "", 0, ErrNotValidMultipartIdentifier
	}

	key = parts[1]
	uploadID = parts[2]

	partID, err = strconv.Atoi(parts[3])
	if err != nil {
		return "", "", 0, err
	}

	return
}

func toGCSMultipartKey(key string, uploadID string, partID int) string {
	// We don't use the ETag within the key because the Amazon spec
	// explicitly notes that parts uploaded with the same number overwrite
	// each other.
	//
	// Parts may be numbered from 1 to 10,000 (inclusive); the part ID is
	// zero-padded to five digits so lexical and numeric order agree.
	return fmt.Sprintf("%s/multipart-%s-%s-%05d", ZZZZMinioPrefix, key, uploadID, partID)
}

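// Multipart lifecycle as implemented below: NewMultipartUpload writes a
// zero-sized "part zero" object carrying the upload's metadata;
// PutObjectPart stores each part as a regular object under the hidden
// multipart key; CompleteMultipartUpload composes the parts into the final
// object and deletes the part objects; AbortMultipartUpload deletes
// everything under the upload's prefix.
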
// NewMultipartUpload - upload object in multiple parts
func (l *gcsGateway) NewMultipartUpload(bucket string, key string, metadata map[string]string) (uploadID string, err error) {
	// Generate a new upload id.
	uploadID = mustGetUUID()

	// Generate the name for part zero.
	partZeroKey := toGCSMultipartKey(key, uploadID, 0)

	// Write a zero-sized object under the part-zero key to hold the
	// upload's metadata until CompleteMultipartUpload.
	w := l.client.Bucket(bucket).Object(partZeroKey).NewWriter(l.ctx)
	w.ContentType = metadata["content-type"]
	w.ContentEncoding = metadata["content-encoding"]
	w.Metadata = metadata
	if err = w.Close(); err != nil {
		return "", gcsToObjectError(traceError(err), bucket, key)
	}

	return uploadID, nil
}

// CopyObjectPart - copy part of object to other bucket and object
func (l *gcsGateway) CopyObjectPart(srcBucket string, srcObject string, destBucket string, destObject string, uploadID string, partID int, startOffset int64, length int64) (info PartInfo, err error) {
	return PartInfo{}, traceError(NotSupported{})
}

// PutObjectPart puts a part of object in bucket
func (l *gcsGateway) PutObjectPart(bucket string, key string, uploadID string, partID int, size int64, data io.Reader, md5Hex string, sha256sum string) (PartInfo, error) {
	multipartKey := toGCSMultipartKey(key, uploadID, partID)

	info, err := l.PutObject(bucket, multipartKey, size, data, map[string]string{}, sha256sum)
	if err != nil {
		return PartInfo{}, err
	}

	return PartInfo{
		PartNumber:   partID,
		LastModified: info.ModTime,
		ETag:         info.MD5Sum,
		Size:         info.Size,
	}, nil
}

// ListObjectParts returns all object parts for specified object in specified bucket
func (l *gcsGateway) ListObjectParts(bucket string, key string, uploadID string, partNumberMarker int, maxParts int) (ListPartsInfo, error) {
	// TODO: support partNumberMarker

	prefix := fmt.Sprintf("%s/multipart-%s-%s", ZZZZMinioPrefix, key, uploadID)
	delimiter := "/"

	it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Delimiter: delimiter, Prefix: prefix, Versions: false})

	isTruncated := false

	parts := []PartInfo{}
	for {
		if maxParts <= len(parts) {
			isTruncated = true
			break
		}

		attrs, err := it.Next()
		if err == iterator.Done {
			break
		}

		if err != nil {
			return ListPartsInfo{}, gcsToObjectError(traceError(err), bucket, prefix)
		}

		if attrs.Prefix != "" {
			continue
		}

		_, _, partID, err := fromGCSMultipartKey(attrs.Name)
		if err != nil {
			continue
		} else if partID == 0 {
			// Ignore part zero; it is the zero-sized object that only
			// carries the upload's metadata.
			continue
		}

		parts = append(parts, PartInfo{
			PartNumber:   partID,
			LastModified: attrs.Updated,
			ETag:         hex.EncodeToString(attrs.MD5),
			Size:         attrs.Size,
		})
	}

	return ListPartsInfo{
		IsTruncated: isTruncated,
		Parts:       parts,
	}, nil
}

// AbortMultipartUpload aborts an ongoing multipart upload
func (l *gcsGateway) AbortMultipartUpload(bucket string, key string, uploadID string) error {
	prefix := fmt.Sprintf("%s/multipart-%s-%s", ZZZZMinioPrefix, key, uploadID)
	delimiter := "/"

	// Delete part zero, ignoring errors; we want to clean up all remains.
	_ = l.client.Bucket(bucket).Object(toGCSMultipartKey(key, uploadID, 0)).Delete(l.ctx)

	// Iterate through all parts and delete them.
	it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Delimiter: delimiter, Prefix: prefix, Versions: false})
	for {
		attrs, err := it.Next()
		if err == iterator.Done {
			break
		}

		if err != nil {
			return gcsToObjectError(traceError(err), bucket, prefix)
		}

		// On error, continue deleting the other parts.
		l.client.Bucket(bucket).Object(attrs.Name).Delete(l.ctx)
	}

	return nil
}

// CompleteMultipartUpload completes ongoing multipart upload and finalizes object.
//
// GCS compose constraints to keep in mind:
//   - at most 32 components can be composed in a single operation;
//   - a composite object can have at most 1024 components in total, so
//     each object can be appended to at most 1023 times;
//   - there is a per-project rate limit (currently 200 components per
//     second), counting both components being appended to a composite
//     object and components being copied when the composite object they
//     are part of is copied.
func (l *gcsGateway) CompleteMultipartUpload(bucket string, key string, uploadID string, uploadedParts []completePart) (ObjectInfo, error) {
	partZero := l.client.Bucket(bucket).Object(toGCSMultipartKey(key, uploadID, 0))

	partZeroAttrs, err := partZero.Attrs(l.ctx)
	if err != nil {
		return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
	}

	parts := make([]*storage.ObjectHandle, len(uploadedParts))
	for i, uploadedPart := range uploadedParts {
		// TODO: verify attrs / ETag
		parts[i] = l.client.Bucket(bucket).Object(toGCSMultipartKey(key, uploadID, uploadedPart.PartNumber))
	}

	if len(parts) > 32 {
		// Composing more than 32 parts would have to be split into
		// sub-composes: the first 32 parts compose into composed-object-0,
		// the next batch into composed-object-1, and so on, with a final
		// compose combining the intermediates into the target object.
		// See the sketch after this function.
		return ObjectInfo{}, traceError(NotSupported{})
	}

	dst := l.client.Bucket(bucket).Object(key)

	composer := dst.ComposerFrom(parts...)
	composer.ContentType = partZeroAttrs.ContentType
	composer.Metadata = partZeroAttrs.Metadata

	attrs, err := composer.Run(l.ctx)
	if err != nil {
		return ObjectInfo{}, gcsToObjectError(traceError(err), bucket, key)
	}

	// Cleanup: delete all parts, including part zero.
	for _, uploadedPart := range uploadedParts {
		l.client.Bucket(bucket).Object(toGCSMultipartKey(key, uploadID, uploadedPart.PartNumber)).Delete(l.ctx)
	}
	partZero.Delete(l.ctx)

	return fromGCSObjectInfo(attrs), nil
}
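
// composeBatches is a minimal sketch of how the 32-component compose limit
// could be worked around: compose the parts in batches of up to 32 into
// intermediate objects, then compose those intermediates into the target.
// It is not wired into CompleteMultipartUpload yet; the intermediate naming
// and cleanup behavior are assumptions rather than a final design, and it
// still only covers up to 32*32 = 1024 parts (the composite component limit).
func (l *gcsGateway) composeBatches(bucket, key, uploadID string, parts []*storage.ObjectHandle) (*storage.ObjectAttrs, error) {
	const maxComposeParts = 32

	intermediates := []*storage.ObjectHandle{}
	for i := 0; i < len(parts); i += maxComposeParts {
		end := i + maxComposeParts
		if end > len(parts) {
			end = len(parts)
		}

		// Hypothetical name for the intermediate composed object; it lives
		// under the hidden prefix so listings won't surface it.
		name := fmt.Sprintf("%s/multipart-%s-%s-composed-%d", ZZZZMinioPrefix, key, uploadID, i/maxComposeParts)
		intermediate := l.client.Bucket(bucket).Object(name)
		if _, err := intermediate.ComposerFrom(parts[i:end]...).Run(l.ctx); err != nil {
			return nil, err
		}
		intermediates = append(intermediates, intermediate)
	}

	// Final compose of the (at most 32) intermediate objects into the
	// destination object.
	attrs, err := l.client.Bucket(bucket).Object(key).ComposerFrom(intermediates...).Run(l.ctx)

	// Best-effort cleanup of the intermediate objects.
	for _, o := range intermediates {
		o.Delete(l.ctx)
	}

	return attrs, err
}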
// SetBucketPolicies - Set policy on bucket
func (l *gcsGateway) SetBucketPolicies(bucket string, policyInfo policy.BucketAccessPolicy) error {
	var policies []BucketAccessPolicy

	for prefix, p := range policy.GetPolicies(policyInfo.Statements, bucket) {
		policies = append(policies, BucketAccessPolicy{
			Prefix: prefix,
			Policy: p,
		})
	}

	prefix := bucket + "/*" // For all objects inside the bucket.

	// Only a single policy covering the whole bucket is supported.
	if len(policies) != 1 {
		return traceError(NotImplemented{})
	} else if policies[0].Prefix != prefix {
		return traceError(NotImplemented{})
	}

	acl := l.client.Bucket(bucket).ACL()

	if policies[0].Policy == policy.BucketPolicyNone {
		if err := acl.Delete(l.ctx, storage.AllUsers); err != nil {
			return gcsToObjectError(traceError(err), bucket)
		}

		return nil
	}

	role := storage.RoleReader

	switch policies[0].Policy {
	case policy.BucketPolicyReadOnly:
		role = storage.RoleReader
	case policy.BucketPolicyWriteOnly:
		role = storage.RoleWriter
	case policy.BucketPolicyReadWrite:
		// Not supported: GCS has no combined read-write role short of
		// owner, which we don't hand out.
		return gcsToObjectError(traceError(NotSupported{}), bucket)
	default:
		return gcsToObjectError(traceError(fmt.Errorf("Unknown policy: %s", policies[0].Policy)), bucket)
	}

	if err := acl.Set(l.ctx, storage.AllUsers, role); err != nil {
		return gcsToObjectError(traceError(err), bucket)
	}

	return nil
}

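// For reference, the mapping between allUsers ACL roles on the bucket and
// the S3-style bucket policies used in SetBucketPolicies above and in
// GetBucketPolicies below:
//
//	storage.RoleReader <-> policy.BucketPolicyReadOnly
//	storage.RoleWriter <-> policy.BucketPolicyWriteOnly
//	storage.RoleOwner  --> not supported
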
// GetBucketPolicies - Get policy on bucket
func (l *gcsGateway) GetBucketPolicies(bucket string) (policy.BucketAccessPolicy, error) {
	acl := l.client.Bucket(bucket).ACL()

	rules, err := acl.List(l.ctx)
	if err != nil {
		return policy.BucketAccessPolicy{}, gcsToObjectError(traceError(err), bucket)
	}

	policyInfo := policy.BucketAccessPolicy{Version: "2012-10-17"}

	for _, r := range rules {
		// Only rules granted to allUsers are relevant for the bucket policy.
		if r.Entity != storage.AllUsers {
			continue
		}

		switch r.Role {
		case storage.RoleOwner:
			return policy.BucketAccessPolicy{}, gcsToObjectError(traceError(NotSupported{}), bucket)
		case storage.RoleReader:
			policyInfo.Statements = policy.SetPolicy(policyInfo.Statements, policy.BucketPolicyReadOnly, bucket, "")
		case storage.RoleWriter:
			policyInfo.Statements = policy.SetPolicy(policyInfo.Statements, policy.BucketPolicyWriteOnly, bucket, "")
		}
	}

	return policyInfo, nil
}

// DeleteBucketPolicies - Delete all policies on bucket
func (l *gcsGateway) DeleteBucketPolicies(bucket string) error {
	acl := l.client.Bucket(bucket).ACL()

	// This only removes the storage.AllUsers policies.
	if err := acl.Delete(l.ctx, storage.AllUsers); err != nil {
		return gcsToObjectError(traceError(err), bucket)
	}

	return nil
}