mirror of https://github.com/minio/minio.git
				
				
				
			Prevent overwrites due to rebalance-stop race (#20233)
Rebalance-stop can race with ongoing rebalance operations. This change prevents these operations from overwriting objects by checking the source and destination pool indices are different.
This commit is contained in:
		
							parent
							
								
									49055658a9
								
							
						
					
					
						commit
						4e67a4027e
					
				|  | @ -706,7 +706,13 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string, | |||
| 						continue | ||||
| 					} | ||||
| 
 | ||||
| 					if err = z.rebalanceObject(ctx, bucket, gr); err != nil { | ||||
| 					if err = z.rebalanceObject(ctx, poolIdx, bucket, gr); err != nil { | ||||
| 						// This can happen when rebalance stop races with ongoing rebalance workers.
 | ||||
| 						// These rebalance failures can be ignored.
 | ||||
| 						if isDataMovementOverWriteErr(err) { | ||||
| 							ignore = true | ||||
| 							continue | ||||
| 						} | ||||
| 						failure = true | ||||
| 						rebalanceLogIf(ctx, err) | ||||
| 						stopFn(version.Size, err) | ||||
|  | @ -822,7 +828,7 @@ func auditLogRebalance(ctx context.Context, apiName, bucket, object, versionID s | |||
| 	}) | ||||
| } | ||||
| 
 | ||||
| func (z *erasureServerPools) rebalanceObject(ctx context.Context, bucket string, gr *GetObjectReader) (err error) { | ||||
| func (z *erasureServerPools) rebalanceObject(ctx context.Context, poolIdx int, bucket string, gr *GetObjectReader) (err error) { | ||||
| 	oi := gr.ObjInfo | ||||
| 
 | ||||
| 	defer func() { | ||||
|  | @ -837,9 +843,11 @@ func (z *erasureServerPools) rebalanceObject(ctx context.Context, bucket string, | |||
| 
 | ||||
| 	if oi.isMultipart() { | ||||
| 		res, err := z.NewMultipartUpload(ctx, bucket, oi.Name, ObjectOptions{ | ||||
| 			VersionID:   oi.VersionID, | ||||
| 			UserDefined: oi.UserDefined, | ||||
| 			NoAuditLog:  true, | ||||
| 			VersionID:    oi.VersionID, | ||||
| 			UserDefined:  oi.UserDefined, | ||||
| 			NoAuditLog:   true, | ||||
| 			DataMovement: true, | ||||
| 			SrcPoolIdx:   poolIdx, | ||||
| 		}) | ||||
| 		if err != nil { | ||||
| 			return fmt.Errorf("rebalanceObject: NewMultipartUpload() %w", err) | ||||
|  | @ -891,6 +899,7 @@ func (z *erasureServerPools) rebalanceObject(ctx context.Context, bucket string, | |||
| 		oi.Name, | ||||
| 		NewPutObjReader(hr), | ||||
| 		ObjectOptions{ | ||||
| 			SrcPoolIdx:   poolIdx, | ||||
| 			DataMovement: true, | ||||
| 			VersionID:    oi.VersionID, | ||||
| 			MTime:        oi.ModTime, | ||||
|  | @ -981,6 +990,8 @@ const ( | |||
| 	rebalanceMetricSaveMetadata | ||||
| ) | ||||
| 
 | ||||
| var errDataMovementSrcDstPoolSame = errors.New("source and destination pool are the same") | ||||
| 
 | ||||
| func rebalanceTrace(r rebalanceMetric, poolIdx int, startTime time.Time, duration time.Duration, err error, path string, sz int64) madmin.TraceInfo { | ||||
| 	var errStr string | ||||
| 	if err != nil { | ||||
|  |  | |||
|  | @ -1087,6 +1087,14 @@ func (z *erasureServerPools) PutObject(ctx context.Context, bucket string, objec | |||
| 		return ObjectInfo{}, err | ||||
| 	} | ||||
| 
 | ||||
| 	if opts.DataMovement && idx == opts.SrcPoolIdx { | ||||
| 		return ObjectInfo{}, DataMovementOverwriteErr{ | ||||
| 			Bucket:    bucket, | ||||
| 			Object:    object, | ||||
| 			VersionID: opts.VersionID, | ||||
| 			Err:       errDataMovementSrcDstPoolSame, | ||||
| 		} | ||||
| 	} | ||||
| 	// Overwrite the object at the right pool
 | ||||
| 	return z.serverPools[idx].PutObject(ctx, bucket, object, data, opts) | ||||
| } | ||||
|  | @ -1752,6 +1760,15 @@ func (z *erasureServerPools) NewMultipartUpload(ctx context.Context, bucket, obj | |||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	if opts.DataMovement && idx == opts.SrcPoolIdx { | ||||
| 		return nil, DataMovementOverwriteErr{ | ||||
| 			Bucket:    bucket, | ||||
| 			Object:    object, | ||||
| 			VersionID: opts.VersionID, | ||||
| 			Err:       errDataMovementSrcDstPoolSame, | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return z.serverPools[idx].NewMultipartUpload(ctx, bucket, object, opts) | ||||
| } | ||||
| 
 | ||||
|  |  | |||
|  | @ -788,3 +788,20 @@ func isReplicationPermissionCheck(err error) bool { | |||
| 	_, ok := err.(ReplicationPermissionCheck) | ||||
| 	return ok | ||||
| } | ||||
| 
 | ||||
| // DataMovementOverwriteErr - captures the error when a data movement activity
 | ||||
| // like rebalance incorrectly tries to overwrite an object.
 | ||||
| type DataMovementOverwriteErr GenericError | ||||
| 
 | ||||
| func (de DataMovementOverwriteErr) Error() string { | ||||
| 	objInfoStr := fmt.Sprintf("bucket=%s object=%s", de.Bucket, de.Object) | ||||
| 	if de.VersionID != "" { | ||||
| 		objInfoStr = fmt.Sprintf("%s version-id=%s", objInfoStr, de.VersionID) | ||||
| 	} | ||||
| 	return fmt.Sprintf("invalid data movement operation, source and destination pool are the same for %s", objInfoStr) | ||||
| } | ||||
| 
 | ||||
| func isDataMovementOverWriteErr(err error) bool { | ||||
| 	var de DataMovementOverwriteErr | ||||
| 	return errors.As(err, &de) | ||||
| } | ||||
|  |  | |||
|  | @ -113,6 +113,8 @@ type ObjectOptions struct { | |||
| 	// participating in a rebalance operation. Typically set for 'write' operations.
 | ||||
| 	SkipRebalancing bool | ||||
| 
 | ||||
| 	SrcPoolIdx int // set by PutObject/CompleteMultipart operations due to rebalance; used to prevent rebalance src, dst pools to be the same
 | ||||
| 
 | ||||
| 	DataMovement bool // indicates an going decommisionning or rebalacing
 | ||||
| 
 | ||||
| 	PrefixEnabledFn func(prefix string) bool // function which returns true if versioning is enabled on prefix
 | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue