| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | package resource | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							|  |  |  | 	context "context" | 
					
						
							|  |  |  | 	"encoding/json" | 
					
						
							|  |  |  | 	"fmt" | 
					
						
							|  |  |  | 	"log/slog" | 
					
						
							| 
									
										
										
										
											2024-07-16 22:43:15 +08:00
										 |  |  | 	"net/http" | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 	"sync" | 
					
						
							| 
									
										
										
										
											2024-10-07 16:01:53 +08:00
										 |  |  | 	"sync/atomic" | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 	"time" | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-12-14 06:55:43 +08:00
										 |  |  | 	"github.com/google/uuid" | 
					
						
							| 
									
										
										
										
											2024-10-17 18:18:29 +08:00
										 |  |  | 	"github.com/prometheus/client_golang/prometheus" | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 	"go.opentelemetry.io/otel/trace" | 
					
						
							|  |  |  | 	"go.opentelemetry.io/otel/trace/noop" | 
					
						
							|  |  |  | 	apierrors "k8s.io/apimachinery/pkg/api/errors" | 
					
						
							|  |  |  | 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | 
					
						
							| 
									
										
										
										
											2024-07-17 22:40:03 +08:00
										 |  |  | 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" | 
					
						
							| 
									
										
										
										
											2024-12-14 06:55:43 +08:00
										 |  |  | 	"k8s.io/apimachinery/pkg/types" | 
					
						
							| 
									
										
										
										
											2024-11-09 13:09:46 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-01-21 17:06:55 +08:00
										 |  |  | 	claims "github.com/grafana/authlib/types" | 
					
						
							| 
									
										
										
										
											2024-11-09 13:09:46 +08:00
										 |  |  | 	"github.com/grafana/grafana/pkg/apimachinery/utils" | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-30 18:16:16 +08:00
										 |  |  | // ResourceServer implements all gRPC services
 | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | type ResourceServer interface { | 
					
						
							|  |  |  | 	ResourceStoreServer | 
					
						
							| 
									
										
										
										
											2025-02-12 01:57:46 +08:00
										 |  |  | 	BatchStoreServer | 
					
						
							| 
									
										
										
										
											2024-07-18 01:49:23 +08:00
										 |  |  | 	ResourceIndexServer | 
					
						
							| 
									
										
										
										
											2025-01-11 02:27:10 +08:00
										 |  |  | 	RepositoryIndexServer | 
					
						
							| 
									
										
										
										
											2024-10-17 18:18:29 +08:00
										 |  |  | 	BlobStoreServer | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 	DiagnosticsServer | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-31 17:05:59 +08:00
										 |  |  | type ListIterator interface { | 
					
						
							|  |  |  | 	Next() bool // sql.Rows
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Iterator error (if exts)
 | 
					
						
							|  |  |  | 	Error() error | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// The token that can be used to start iterating *after* this item
 | 
					
						
							|  |  |  | 	ContinueToken() string | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-01-28 22:17:52 +08:00
										 |  |  | 	// The token that can be used to start iterating *before* this item
 | 
					
						
							|  |  |  | 	ContinueTokenWithCurrentRV() string | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-31 17:05:59 +08:00
										 |  |  | 	// ResourceVersion of the current item
 | 
					
						
							|  |  |  | 	ResourceVersion() int64 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Namespace of the current item
 | 
					
						
							|  |  |  | 	// Used for fast(er) authz filtering
 | 
					
						
							|  |  |  | 	Namespace() string | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Name of the current item
 | 
					
						
							|  |  |  | 	// Used for fast(er) authz filtering
 | 
					
						
							|  |  |  | 	Name() string | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-12 19:52:04 +08:00
										 |  |  | 	// Folder of the current item
 | 
					
						
							|  |  |  | 	// Used for fast(er) authz filtering
 | 
					
						
							|  |  |  | 	Folder() string | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-31 17:05:59 +08:00
										 |  |  | 	// Value for the current item
 | 
					
						
							|  |  |  | 	Value() []byte | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-12 19:52:04 +08:00
										 |  |  | type BackendReadResponse struct { | 
					
						
							|  |  |  | 	// Metadata
 | 
					
						
							|  |  |  | 	Key    *ResourceKey | 
					
						
							|  |  |  | 	Folder string | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// The new resource version
 | 
					
						
							|  |  |  | 	ResourceVersion int64 | 
					
						
							|  |  |  | 	// The properties
 | 
					
						
							|  |  |  | 	Value []byte | 
					
						
							|  |  |  | 	// Error details
 | 
					
						
							|  |  |  | 	Error *ErrorResult | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | // The StorageBackend is an internal abstraction that supports interacting with
 | 
					
						
							|  |  |  | // the underlying raw storage medium.  This interface is never exposed directly,
 | 
					
						
							|  |  |  | // it is provided by concrete instances that actually write values.
 | 
					
						
							|  |  |  | type StorageBackend interface { | 
					
						
							|  |  |  | 	// Write a Create/Update/Delete,
 | 
					
						
							|  |  |  | 	// NOTE: the contents of WriteEvent have been validated
 | 
					
						
							|  |  |  | 	// Return the revisionVersion for this event or error
 | 
					
						
							|  |  |  | 	WriteEvent(context.Context, WriteEvent) (int64, error) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-30 18:16:16 +08:00
										 |  |  | 	// Read a resource from storage optionally at an explicit version
 | 
					
						
							| 
									
										
										
										
											2024-11-12 19:52:04 +08:00
										 |  |  | 	ReadResource(context.Context, *ReadRequest) *BackendReadResponse | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-31 17:05:59 +08:00
										 |  |  | 	// When the ResourceServer executes a List request, this iterator will
 | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 	// query the backend for potential results.  All results will be
 | 
					
						
							|  |  |  | 	// checked against the kubernetes requirements before finally returning
 | 
					
						
							|  |  |  | 	// results.  The list options can be used to improve performance
 | 
					
						
							|  |  |  | 	// but are the the final answer.
 | 
					
						
							| 
									
										
										
										
											2024-07-31 17:05:59 +08:00
										 |  |  | 	ListIterator(context.Context, *ListRequest, func(ListIterator) error) (int64, error) | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	// Get all events from the store
 | 
					
						
							|  |  |  | 	// For HA setups, this will be more events than the local WriteEvent above!
 | 
					
						
							|  |  |  | 	WatchWriteEvents(ctx context.Context) (<-chan *WrittenEvent, error) | 
					
						
							| 
									
										
										
										
											2024-11-07 02:58:07 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-12-05 18:58:13 +08:00
										 |  |  | 	// Get resource stats within the storage backend.  When namespace is empty, it will apply to all
 | 
					
						
							|  |  |  | 	GetResourceStats(ctx context.Context, namespace string, minCount int) ([]ResourceStats, error) | 
					
						
							| 
									
										
										
										
											2024-12-04 01:20:27 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | type ResourceStats struct { | 
					
						
							|  |  |  | 	NamespacedResource | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	Count           int64 | 
					
						
							|  |  |  | 	ResourceVersion int64 | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-10-17 18:18:29 +08:00
										 |  |  | // This interface is not exposed to end users directly
 | 
					
						
							|  |  |  | // Access to this interface is already gated by access control
 | 
					
						
							|  |  |  | type BlobSupport interface { | 
					
						
							|  |  |  | 	// Indicates if storage layer supports signed urls
 | 
					
						
							|  |  |  | 	SupportsSignedURLs() bool | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Get the raw blob bytes and metadata -- limited to protobuf message size
 | 
					
						
							|  |  |  | 	// For larger payloads, we should use presigned URLs to upload from the client
 | 
					
						
							|  |  |  | 	PutResourceBlob(context.Context, *PutBlobRequest) (*PutBlobResponse, error) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Get blob contents.  When possible, this will return a signed URL
 | 
					
						
							|  |  |  | 	// For large payloads, signed URLs are required to avoid protobuf message size limits
 | 
					
						
							|  |  |  | 	GetResourceBlob(ctx context.Context, resource *ResourceKey, info *utils.BlobInfo, mustProxy bool) (*GetBlobResponse, error) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// TODO? List+Delete?  This is for admin access
 | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | type BlobConfig struct { | 
					
						
							|  |  |  | 	// The CDK configuration URL
 | 
					
						
							|  |  |  | 	URL string | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Directly implemented blob support
 | 
					
						
							|  |  |  | 	Backend BlobSupport | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-21 13:53:25 +08:00
										 |  |  | // Passed as input to the constructor
 | 
					
						
							|  |  |  | type SearchOptions struct { | 
					
						
							|  |  |  | 	// The raw index backend (eg, bleve, frames, parquet, etc)
 | 
					
						
							|  |  |  | 	Backend SearchBackend | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// The supported resource types
 | 
					
						
							|  |  |  | 	Resources DocumentBuilderSupplier | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// How many threads should build indexes
 | 
					
						
							|  |  |  | 	WorkerThreads int | 
					
						
							| 
									
										
										
										
											2024-12-04 01:20:27 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	// Skip building index on startup for small indexes
 | 
					
						
							|  |  |  | 	InitMinCount int | 
					
						
							| 
									
										
										
										
											2024-11-21 13:53:25 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | type ResourceServerOptions struct { | 
					
						
							|  |  |  | 	// OTel tracer
 | 
					
						
							|  |  |  | 	Tracer trace.Tracer | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Real storage backend
 | 
					
						
							|  |  |  | 	Backend StorageBackend | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-10-17 18:18:29 +08:00
										 |  |  | 	// The blob configuration
 | 
					
						
							|  |  |  | 	Blob BlobConfig | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-21 13:53:25 +08:00
										 |  |  | 	// Search options
 | 
					
						
							|  |  |  | 	Search SearchOptions | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 	// Diagnostics
 | 
					
						
							|  |  |  | 	Diagnostics DiagnosticsServer | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Check if a user has access to write folders
 | 
					
						
							|  |  |  | 	// When this is nil, no resources can have folders configured
 | 
					
						
							| 
									
										
										
										
											2024-11-13 20:17:15 +08:00
										 |  |  | 	WriteHooks WriteAccessHooks | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Link RBAC
 | 
					
						
							| 
									
										
										
										
											2025-01-21 17:06:55 +08:00
										 |  |  | 	AccessClient claims.AccessClient | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	// Callbacks for startup and shutdown
 | 
					
						
							|  |  |  | 	Lifecycle LifecycleHooks | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Get the current time in unix millis
 | 
					
						
							|  |  |  | 	Now func() int64 | 
					
						
							| 
									
										
										
										
											2024-10-17 18:18:29 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	// Registerer to register prometheus Metrics for the Resource server
 | 
					
						
							|  |  |  | 	Reg prometheus.Registerer | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) { | 
					
						
							|  |  |  | 	if opts.Tracer == nil { | 
					
						
							|  |  |  | 		opts.Tracer = noop.NewTracerProvider().Tracer("resource-server") | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if opts.Backend == nil { | 
					
						
							|  |  |  | 		return nil, fmt.Errorf("missing Backend implementation") | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-10-01 03:46:14 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-13 20:17:15 +08:00
										 |  |  | 	if opts.AccessClient == nil { | 
					
						
							| 
									
										
										
										
											2025-01-21 17:06:55 +08:00
										 |  |  | 		opts.AccessClient = claims.FixedAccessClient(true) // everything OK
 | 
					
						
							| 
									
										
										
										
											2024-11-13 20:17:15 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 	if opts.Diagnostics == nil { | 
					
						
							|  |  |  | 		opts.Diagnostics = &noopService{} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if opts.Now == nil { | 
					
						
							|  |  |  | 		opts.Now = func() int64 { | 
					
						
							|  |  |  | 			return time.Now().UnixMilli() | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-10-17 18:18:29 +08:00
										 |  |  | 	// Initialize the blob storage
 | 
					
						
							|  |  |  | 	blobstore := opts.Blob.Backend | 
					
						
							| 
									
										
										
										
											2025-01-09 04:08:10 +08:00
										 |  |  | 	if blobstore == nil { | 
					
						
							|  |  |  | 		if opts.Blob.URL != "" { | 
					
						
							|  |  |  | 			ctx := context.Background() | 
					
						
							|  |  |  | 			bucket, err := OpenBlobBucket(ctx, opts.Blob.URL) | 
					
						
							|  |  |  | 			if err != nil { | 
					
						
							|  |  |  | 				return nil, err | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 			blobstore, err = NewCDKBlobSupport(ctx, CDKBlobSupportOptions{ | 
					
						
							|  |  |  | 				Tracer: opts.Tracer, | 
					
						
							|  |  |  | 				Bucket: NewInstrumentedBucket(bucket, opts.Reg, opts.Tracer), | 
					
						
							|  |  |  | 			}) | 
					
						
							|  |  |  | 			if err != nil { | 
					
						
							|  |  |  | 				return nil, err | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		} else { | 
					
						
							|  |  |  | 			// Check if the backend supports blob storage
 | 
					
						
							|  |  |  | 			blobstore, _ = opts.Backend.(BlobSupport) | 
					
						
							| 
									
										
										
										
											2024-10-17 18:18:29 +08:00
										 |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-10-22 07:15:11 +08:00
										 |  |  | 	logger := slog.Default().With("logger", "resource-server") | 
					
						
							|  |  |  | 	// register metrics
 | 
					
						
							|  |  |  | 	if err := prometheus.Register(NewStorageMetrics()); err != nil { | 
					
						
							|  |  |  | 		logger.Warn("failed to register storage metrics", "error", err) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 	// Make this cancelable
 | 
					
						
							| 
									
										
										
										
											2025-01-24 21:03:23 +08:00
										 |  |  | 	ctx, cancel := context.WithCancel(context.Background()) | 
					
						
							| 
									
										
										
										
											2024-11-07 02:58:07 +08:00
										 |  |  | 	s := &server{ | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 		tracer:      opts.Tracer, | 
					
						
							| 
									
										
										
										
											2024-10-22 07:15:11 +08:00
										 |  |  | 		log:         logger, | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 		backend:     opts.Backend, | 
					
						
							| 
									
										
										
										
											2024-10-17 18:18:29 +08:00
										 |  |  | 		blob:        blobstore, | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 		diagnostics: opts.Diagnostics, | 
					
						
							| 
									
										
										
										
											2024-11-13 20:17:15 +08:00
										 |  |  | 		access:      opts.AccessClient, | 
					
						
							|  |  |  | 		writeHooks:  opts.WriteHooks, | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 		lifecycle:   opts.Lifecycle, | 
					
						
							|  |  |  | 		now:         opts.Now, | 
					
						
							|  |  |  | 		ctx:         ctx, | 
					
						
							|  |  |  | 		cancel:      cancel, | 
					
						
							| 
									
										
										
										
											2024-11-07 02:58:07 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-21 13:53:25 +08:00
										 |  |  | 	if opts.Search.Resources != nil { | 
					
						
							|  |  |  | 		var err error | 
					
						
							| 
									
										
										
										
											2024-11-27 13:57:53 +08:00
										 |  |  | 		s.search, err = newSearchSupport(opts.Search, s.backend, s.access, s.blob, opts.Tracer) | 
					
						
							| 
									
										
										
										
											2024-11-21 13:53:25 +08:00
										 |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			return nil, err | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-12-10 12:32:19 +08:00
										 |  |  | 	err := s.Init(ctx) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							| 
									
										
										
										
											2025-02-12 01:57:46 +08:00
										 |  |  | 		s.log.Error("resource server init failed", "error", err) | 
					
						
							| 
									
										
										
										
											2024-12-10 12:32:19 +08:00
										 |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-07 02:58:07 +08:00
										 |  |  | 	return s, nil | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | var _ ResourceServer = &server{} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | type server struct { | 
					
						
							| 
									
										
										
										
											2024-10-07 16:01:53 +08:00
										 |  |  | 	tracer       trace.Tracer | 
					
						
							|  |  |  | 	log          *slog.Logger | 
					
						
							|  |  |  | 	backend      StorageBackend | 
					
						
							| 
									
										
										
										
											2024-10-17 18:18:29 +08:00
										 |  |  | 	blob         BlobSupport | 
					
						
							| 
									
										
										
										
											2024-11-21 13:53:25 +08:00
										 |  |  | 	search       *searchSupport | 
					
						
							| 
									
										
										
										
											2024-10-07 16:01:53 +08:00
										 |  |  | 	diagnostics  DiagnosticsServer | 
					
						
							| 
									
										
										
										
											2025-01-21 17:06:55 +08:00
										 |  |  | 	access       claims.AccessClient | 
					
						
							| 
									
										
										
										
											2024-11-13 20:17:15 +08:00
										 |  |  | 	writeHooks   WriteAccessHooks | 
					
						
							| 
									
										
										
										
											2024-10-07 16:01:53 +08:00
										 |  |  | 	lifecycle    LifecycleHooks | 
					
						
							|  |  |  | 	now          func() int64 | 
					
						
							|  |  |  | 	mostRecentRV atomic.Int64 // The most recent resource version seen by the server
 | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	// Background watch task -- this has permissions for everything
 | 
					
						
							|  |  |  | 	ctx         context.Context | 
					
						
							|  |  |  | 	cancel      context.CancelFunc | 
					
						
							|  |  |  | 	broadcaster Broadcaster[*WrittenEvent] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// init checking
 | 
					
						
							|  |  |  | 	once    sync.Once | 
					
						
							|  |  |  | 	initErr error | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Init implements ResourceServer.
 | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | func (s *server) Init(ctx context.Context) error { | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 	s.once.Do(func() { | 
					
						
							|  |  |  | 		// Call lifecycle hooks
 | 
					
						
							|  |  |  | 		if s.lifecycle != nil { | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 			err := s.lifecycle.Init(ctx) | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 			if err != nil { | 
					
						
							|  |  |  | 				s.initErr = fmt.Errorf("initialize Resource Server: %w", err) | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-21 13:53:25 +08:00
										 |  |  | 		// initialize the search index
 | 
					
						
							|  |  |  | 		if s.initErr == nil && s.search != nil { | 
					
						
							|  |  |  | 			s.initErr = s.search.init(ctx) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-12-10 12:32:19 +08:00
										 |  |  | 		// Start watching for changes
 | 
					
						
							|  |  |  | 		if s.initErr == nil { | 
					
						
							|  |  |  | 			s.initErr = s.initWatcher() | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 		if s.initErr != nil { | 
					
						
							| 
									
										
										
										
											2025-02-12 01:57:46 +08:00
										 |  |  | 			s.log.Error("error running resource server init", "error", s.initErr) | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 		} | 
					
						
							|  |  |  | 	}) | 
					
						
							|  |  |  | 	return s.initErr | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | func (s *server) Stop(ctx context.Context) error { | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 	s.initErr = fmt.Errorf("service is stopping") | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 	var stopFailed bool | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 	if s.lifecycle != nil { | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 		err := s.lifecycle.Stop(ctx) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			stopFailed = true | 
					
						
							|  |  |  | 			s.initErr = fmt.Errorf("service stopeed with error: %w", err) | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Stops the streaming
 | 
					
						
							|  |  |  | 	s.cancel() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// mark the value as done
 | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 	if stopFailed { | 
					
						
							|  |  |  | 		return s.initErr | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 	s.initErr = fmt.Errorf("service is stopped") | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	return nil | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Old value indicates an update -- otherwise a create
 | 
					
						
							| 
									
										
										
										
											2024-08-12 14:49:34 +08:00
										 |  |  | func (s *server) newEvent(ctx context.Context, user claims.AuthInfo, key *ResourceKey, value, oldValue []byte) (*WriteEvent, *ErrorResult) { | 
					
						
							| 
									
										
										
										
											2024-07-17 22:40:03 +08:00
										 |  |  | 	tmp := &unstructured.Unstructured{} | 
					
						
							| 
									
										
										
										
											2024-08-02 15:27:10 +08:00
										 |  |  | 	err := tmp.UnmarshalJSON(value) | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 	if err != nil { | 
					
						
							| 
									
										
										
										
											2024-08-02 15:27:10 +08:00
										 |  |  | 		return nil, AsErrorResult(err) | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-07-17 22:40:03 +08:00
										 |  |  | 	obj, err := utils.MetaAccessor(tmp) | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 	if err != nil { | 
					
						
							| 
									
										
										
										
											2024-08-02 15:27:10 +08:00
										 |  |  | 		return nil, AsErrorResult(err) | 
					
						
							| 
									
										
										
										
											2024-07-17 22:40:03 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-10-23 16:29:41 +08:00
										 |  |  | 	if obj.GetUID() == "" { | 
					
						
							| 
									
										
										
										
											2024-11-13 00:58:32 +08:00
										 |  |  | 		// TODO! once https://github.com/grafana/grafana/pull/96086 is deployed everywhere
 | 
					
						
							|  |  |  | 		// return nil, NewBadRequestError("object is missing UID")
 | 
					
						
							| 
									
										
										
										
											2024-10-23 16:29:41 +08:00
										 |  |  | 		s.log.Error("object is missing UID", "key", key) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if obj.GetResourceVersion() != "" { | 
					
						
							|  |  |  | 		s.log.Error("object must not include a resource version", "key", key) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-01-17 20:54:25 +08:00
										 |  |  | 	// Make sure the command labels are not saved
 | 
					
						
							|  |  |  | 	for k := range obj.GetLabels() { | 
					
						
							|  |  |  | 		if k == utils.LabelKeyGetHistory || k == utils.LabelKeyGetTrash { | 
					
						
							|  |  |  | 			return nil, NewBadRequestError("can not save label: " + k) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-01-21 17:06:55 +08:00
										 |  |  | 	check := claims.CheckRequest{ | 
					
						
							| 
									
										
										
										
											2025-01-15 16:23:56 +08:00
										 |  |  | 		Verb:      utils.VerbCreate, | 
					
						
							| 
									
										
										
										
											2024-11-13 20:17:15 +08:00
										 |  |  | 		Group:     key.Group, | 
					
						
							|  |  |  | 		Resource:  key.Resource, | 
					
						
							|  |  |  | 		Namespace: key.Namespace, | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-17 22:40:03 +08:00
										 |  |  | 	event := &WriteEvent{ | 
					
						
							|  |  |  | 		Value:  value, | 
					
						
							|  |  |  | 		Key:    key, | 
					
						
							|  |  |  | 		Object: obj, | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if oldValue == nil { | 
					
						
							|  |  |  | 		event.Type = WatchEvent_ADDED | 
					
						
							|  |  |  | 	} else { | 
					
						
							| 
									
										
										
										
											2025-01-15 16:23:56 +08:00
										 |  |  | 		check.Verb = utils.VerbUpdate | 
					
						
							| 
									
										
										
										
											2024-07-17 22:40:03 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 		temp := &unstructured.Unstructured{} | 
					
						
							|  |  |  | 		err = temp.UnmarshalJSON(oldValue) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							| 
									
										
										
										
											2024-08-02 15:27:10 +08:00
										 |  |  | 			return nil, AsErrorResult(err) | 
					
						
							| 
									
										
										
										
											2024-07-17 22:40:03 +08:00
										 |  |  | 		} | 
					
						
							|  |  |  | 		event.ObjectOld, err = utils.MetaAccessor(temp) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							| 
									
										
										
										
											2024-08-02 15:27:10 +08:00
										 |  |  | 			return nil, AsErrorResult(err) | 
					
						
							| 
									
										
										
										
											2024-07-17 22:40:03 +08:00
										 |  |  | 		} | 
					
						
							| 
									
										
										
										
											2024-12-14 06:55:43 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 		// restores will restore with a different k8s uid
 | 
					
						
							|  |  |  | 		if event.ObjectOld.GetUID() != obj.GetUID() { | 
					
						
							|  |  |  | 			event.Type = WatchEvent_ADDED | 
					
						
							|  |  |  | 		} else { | 
					
						
							|  |  |  | 			event.Type = WatchEvent_MODIFIED | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if key.Namespace != obj.GetNamespace() { | 
					
						
							| 
									
										
										
										
											2024-08-02 15:27:10 +08:00
										 |  |  | 		return nil, NewBadRequestError("key/namespace do not match") | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	gvk := obj.GetGroupVersionKind() | 
					
						
							|  |  |  | 	if gvk.Kind == "" { | 
					
						
							| 
									
										
										
										
											2024-08-02 15:27:10 +08:00
										 |  |  | 		return nil, NewBadRequestError("expecting resources with a kind in the body") | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 	if gvk.Version == "" { | 
					
						
							| 
									
										
										
										
											2024-08-02 15:27:10 +08:00
										 |  |  | 		return nil, NewBadRequestError("expecting resources with an apiVersion") | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 	if gvk.Group != "" && gvk.Group != key.Group { | 
					
						
							| 
									
										
										
										
											2024-08-02 15:27:10 +08:00
										 |  |  | 		return nil, NewBadRequestError( | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 			fmt.Sprintf("group in key does not match group in the body (%s != %s)", key.Group, gvk.Group), | 
					
						
							|  |  |  | 		) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// This needs to be a create function
 | 
					
						
							|  |  |  | 	if key.Name == "" { | 
					
						
							|  |  |  | 		if obj.GetName() == "" { | 
					
						
							| 
									
										
										
										
											2024-08-02 15:27:10 +08:00
										 |  |  | 			return nil, NewBadRequestError("missing name") | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 		} | 
					
						
							|  |  |  | 		key.Name = obj.GetName() | 
					
						
							|  |  |  | 	} else if key.Name != obj.GetName() { | 
					
						
							| 
									
										
										
										
											2024-08-02 15:27:10 +08:00
										 |  |  | 		return nil, NewBadRequestError( | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 			fmt.Sprintf("key/name do not match (key: %s, name: %s)", key.Name, obj.GetName())) | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-08-02 15:27:10 +08:00
										 |  |  | 	if err := validateName(obj.GetName()); err != nil { | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-01-15 16:23:56 +08:00
										 |  |  | 	// We only set name for update checks
 | 
					
						
							|  |  |  | 	if check.Verb == utils.VerbUpdate { | 
					
						
							|  |  |  | 		check.Name = key.Name | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-13 20:17:15 +08:00
										 |  |  | 	check.Folder = obj.GetFolder() | 
					
						
							|  |  |  | 	a, err := s.access.Check(ctx, user, check) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return nil, AsErrorResult(err) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if !a.Allowed { | 
					
						
							|  |  |  | 		return nil, &ErrorResult{ | 
					
						
							|  |  |  | 			Code: http.StatusForbidden, | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-11-13 20:17:15 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-15 22:26:14 +08:00
										 |  |  | 	repo, err := obj.GetRepositoryInfo() | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 	if err != nil { | 
					
						
							| 
									
										
										
										
											2024-11-15 22:26:14 +08:00
										 |  |  | 		return nil, NewBadRequestError("invalid repository info") | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-11-15 22:26:14 +08:00
										 |  |  | 	if repo != nil { | 
					
						
							|  |  |  | 		err = s.writeHooks.CanWriteValueFromRepository(ctx, user, repo.Name) | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 		if err != nil { | 
					
						
							| 
									
										
										
										
											2024-08-02 15:27:10 +08:00
										 |  |  | 			return nil, AsErrorResult(err) | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return event, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (s *server) Create(ctx context.Context, req *CreateRequest) (*CreateResponse, error) { | 
					
						
							|  |  |  | 	ctx, span := s.tracer.Start(ctx, "storage_server.Create") | 
					
						
							|  |  |  | 	defer span.End() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	rsp := &CreateResponse{} | 
					
						
							| 
									
										
										
										
											2025-01-21 17:06:55 +08:00
										 |  |  | 	user, ok := claims.AuthInfoFrom(ctx) | 
					
						
							| 
									
										
										
										
											2024-08-12 14:49:34 +08:00
										 |  |  | 	if !ok || user == nil { | 
					
						
							| 
									
										
										
										
											2024-08-02 15:27:10 +08:00
										 |  |  | 		rsp.Error = &ErrorResult{ | 
					
						
							|  |  |  | 			Message: "no user found in context", | 
					
						
							|  |  |  | 			Code:    http.StatusUnauthorized, | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		return rsp, nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-30 18:16:16 +08:00
										 |  |  | 	found := s.backend.ReadResource(ctx, &ReadRequest{Key: req.Key}) | 
					
						
							| 
									
										
										
										
											2024-07-16 22:43:15 +08:00
										 |  |  | 	if found != nil && len(found.Value) > 0 { | 
					
						
							|  |  |  | 		rsp.Error = &ErrorResult{ | 
					
						
							|  |  |  | 			Code:    http.StatusConflict, | 
					
						
							| 
									
										
										
										
											2024-11-13 20:17:15 +08:00
										 |  |  | 			Message: "key already exists", // TODO?? soft delete replace?
 | 
					
						
							| 
									
										
										
										
											2024-07-16 22:43:15 +08:00
										 |  |  | 		} | 
					
						
							|  |  |  | 		return rsp, nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-08-02 15:27:10 +08:00
										 |  |  | 	event, e := s.newEvent(ctx, user, req.Key, req.Value, nil) | 
					
						
							|  |  |  | 	if e != nil { | 
					
						
							|  |  |  | 		rsp.Error = e | 
					
						
							| 
									
										
										
										
											2024-07-30 18:16:16 +08:00
										 |  |  | 		return rsp, nil | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-11-13 20:17:15 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-08-12 14:49:34 +08:00
										 |  |  | 	var err error | 
					
						
							| 
									
										
										
										
											2024-07-17 22:40:03 +08:00
										 |  |  | 	rsp.ResourceVersion, err = s.backend.WriteEvent(ctx, *event) | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 	if err != nil { | 
					
						
							| 
									
										
										
										
											2024-07-30 18:16:16 +08:00
										 |  |  | 		rsp.Error = AsErrorResult(err) | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-10-07 16:01:53 +08:00
										 |  |  | 	s.log.Debug("server.WriteEvent", "type", event.Type, "rv", rsp.ResourceVersion, "previousRV", event.PreviousRV, "group", event.Key.Group, "namespace", event.Key.Namespace, "name", event.Key.Name, "resource", event.Key.Resource) | 
					
						
							| 
									
										
										
										
											2024-07-30 18:16:16 +08:00
										 |  |  | 	return rsp, nil | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (s *server) Update(ctx context.Context, req *UpdateRequest) (*UpdateResponse, error) { | 
					
						
							|  |  |  | 	ctx, span := s.tracer.Start(ctx, "storage_server.Update") | 
					
						
							|  |  |  | 	defer span.End() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	rsp := &UpdateResponse{} | 
					
						
							| 
									
										
										
										
											2025-01-21 17:06:55 +08:00
										 |  |  | 	user, ok := claims.AuthInfoFrom(ctx) | 
					
						
							| 
									
										
										
										
											2024-08-12 14:49:34 +08:00
										 |  |  | 	if !ok || user == nil { | 
					
						
							| 
									
										
										
										
											2024-08-02 15:27:10 +08:00
										 |  |  | 		rsp.Error = &ErrorResult{ | 
					
						
							|  |  |  | 			Message: "no user found in context", | 
					
						
							|  |  |  | 			Code:    http.StatusUnauthorized, | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		return rsp, nil | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 	if req.ResourceVersion < 0 { | 
					
						
							| 
									
										
										
										
											2024-07-30 18:16:16 +08:00
										 |  |  | 		rsp.Error = AsErrorResult(apierrors.NewBadRequest("update must include the previous version")) | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 		return rsp, nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-30 18:16:16 +08:00
										 |  |  | 	latest := s.backend.ReadResource(ctx, &ReadRequest{ | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 		Key: req.Key, | 
					
						
							|  |  |  | 	}) | 
					
						
							| 
									
										
										
										
											2024-07-30 18:16:16 +08:00
										 |  |  | 	if latest.Error != nil { | 
					
						
							|  |  |  | 		return rsp, nil | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 	if latest.Value == nil { | 
					
						
							| 
									
										
										
										
											2024-07-30 18:16:16 +08:00
										 |  |  | 		rsp.Error = NewBadRequestError("current value does not exist") | 
					
						
							|  |  |  | 		return rsp, nil | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-13 07:01:24 +08:00
										 |  |  | 	if req.ResourceVersion > 0 && latest.ResourceVersion != req.ResourceVersion { | 
					
						
							|  |  |  | 		return nil, ErrOptimisticLockingFailed | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-08-02 15:27:10 +08:00
										 |  |  | 	event, e := s.newEvent(ctx, user, req.Key, req.Value, latest.Value) | 
					
						
							|  |  |  | 	if e != nil { | 
					
						
							|  |  |  | 		rsp.Error = e | 
					
						
							| 
									
										
										
										
											2024-08-12 14:49:34 +08:00
										 |  |  | 		return rsp, nil | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	event.Type = WatchEvent_MODIFIED | 
					
						
							|  |  |  | 	event.PreviousRV = latest.ResourceVersion | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-08-12 14:49:34 +08:00
										 |  |  | 	var err error | 
					
						
							| 
									
										
										
										
											2024-07-17 22:40:03 +08:00
										 |  |  | 	rsp.ResourceVersion, err = s.backend.WriteEvent(ctx, *event) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							| 
									
										
										
										
											2024-07-30 18:16:16 +08:00
										 |  |  | 		rsp.Error = AsErrorResult(err) | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-07-30 18:16:16 +08:00
										 |  |  | 	return rsp, nil | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (s *server) Delete(ctx context.Context, req *DeleteRequest) (*DeleteResponse, error) { | 
					
						
							|  |  |  | 	ctx, span := s.tracer.Start(ctx, "storage_server.Delete") | 
					
						
							|  |  |  | 	defer span.End() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	rsp := &DeleteResponse{} | 
					
						
							|  |  |  | 	if req.ResourceVersion < 0 { | 
					
						
							|  |  |  | 		return nil, apierrors.NewBadRequest("update must include the previous version") | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2025-01-21 17:06:55 +08:00
										 |  |  | 	user, ok := claims.AuthInfoFrom(ctx) | 
					
						
							| 
									
										
										
										
											2024-11-13 20:17:15 +08:00
										 |  |  | 	if !ok || user == nil { | 
					
						
							|  |  |  | 		rsp.Error = &ErrorResult{ | 
					
						
							|  |  |  | 			Message: "no user found in context", | 
					
						
							|  |  |  | 			Code:    http.StatusUnauthorized, | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		return rsp, nil | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-30 18:16:16 +08:00
										 |  |  | 	latest := s.backend.ReadResource(ctx, &ReadRequest{ | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 		Key: req.Key, | 
					
						
							|  |  |  | 	}) | 
					
						
							| 
									
										
										
										
											2024-07-30 18:16:16 +08:00
										 |  |  | 	if latest.Error != nil { | 
					
						
							|  |  |  | 		rsp.Error = latest.Error | 
					
						
							|  |  |  | 		return rsp, nil | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 	if req.ResourceVersion > 0 && latest.ResourceVersion != req.ResourceVersion { | 
					
						
							| 
									
										
										
										
											2024-07-30 18:16:16 +08:00
										 |  |  | 		rsp.Error = AsErrorResult(ErrOptimisticLockingFailed) | 
					
						
							|  |  |  | 		return rsp, nil | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-01-21 17:06:55 +08:00
										 |  |  | 	access, err := s.access.Check(ctx, user, claims.CheckRequest{ | 
					
						
							| 
									
										
										
										
											2024-11-13 20:17:15 +08:00
										 |  |  | 		Verb:      "delete", | 
					
						
							|  |  |  | 		Group:     req.Key.Group, | 
					
						
							|  |  |  | 		Resource:  req.Key.Resource, | 
					
						
							|  |  |  | 		Namespace: req.Key.Namespace, | 
					
						
							|  |  |  | 		Name:      req.Key.Name, | 
					
						
							|  |  |  | 		Folder:    latest.Folder, | 
					
						
							|  |  |  | 	}) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		rsp.Error = AsErrorResult(err) | 
					
						
							|  |  |  | 		return rsp, nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if !access.Allowed { | 
					
						
							|  |  |  | 		rsp.Error = &ErrorResult{ | 
					
						
							|  |  |  | 			Code: http.StatusForbidden, | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		return rsp, nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 	now := metav1.NewTime(time.UnixMilli(s.now())) | 
					
						
							|  |  |  | 	event := WriteEvent{ | 
					
						
							|  |  |  | 		Key:        req.Key, | 
					
						
							|  |  |  | 		Type:       WatchEvent_DELETED, | 
					
						
							|  |  |  | 		PreviousRV: latest.ResourceVersion, | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2025-01-21 17:06:55 +08:00
										 |  |  | 	requester, ok := claims.AuthInfoFrom(ctx) | 
					
						
							| 
									
										
										
										
											2024-08-12 14:49:34 +08:00
										 |  |  | 	if !ok { | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 		return nil, apierrors.NewBadRequest("unable to get user") | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2025-01-17 20:54:25 +08:00
										 |  |  | 	marker := &unstructured.Unstructured{} | 
					
						
							| 
									
										
										
										
											2024-11-13 20:17:15 +08:00
										 |  |  | 	err = json.Unmarshal(latest.Value, marker) | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return nil, apierrors.NewBadRequest( | 
					
						
							|  |  |  | 			fmt.Sprintf("unable to read previous object, %v", err)) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	obj, err := utils.MetaAccessor(marker) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	obj.SetDeletionTimestamp(&now) | 
					
						
							|  |  |  | 	obj.SetUpdatedTimestamp(&now.Time) | 
					
						
							|  |  |  | 	obj.SetManagedFields(nil) | 
					
						
							|  |  |  | 	obj.SetFinalizers(nil) | 
					
						
							| 
									
										
										
										
											2024-07-30 13:27:23 +08:00
										 |  |  | 	obj.SetUpdatedBy(requester.GetUID()) | 
					
						
							| 
									
										
										
										
											2025-01-17 20:54:25 +08:00
										 |  |  | 	obj.SetGeneration(utils.DeletedGeneration) | 
					
						
							|  |  |  | 	obj.SetAnnotation(utils.AnnoKeyKubectlLastAppliedConfig, "") // clears it
 | 
					
						
							|  |  |  | 	event.Value, err = marker.MarshalJSON() | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return nil, apierrors.NewBadRequest( | 
					
						
							|  |  |  | 			fmt.Sprintf("unable creating deletion marker, %v", err)) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	rsp.ResourceVersion, err = s.backend.WriteEvent(ctx, event) | 
					
						
							| 
									
										
										
										
											2024-07-30 18:16:16 +08:00
										 |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		rsp.Error = AsErrorResult(err) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return rsp, nil | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (s *server) Read(ctx context.Context, req *ReadRequest) (*ReadResponse, error) { | 
					
						
							| 
									
										
										
										
											2025-01-21 17:06:55 +08:00
										 |  |  | 	user, ok := claims.AuthInfoFrom(ctx) | 
					
						
							| 
									
										
										
										
											2024-11-13 20:17:15 +08:00
										 |  |  | 	if !ok || user == nil { | 
					
						
							|  |  |  | 		return &ReadResponse{ | 
					
						
							|  |  |  | 			Error: &ErrorResult{ | 
					
						
							|  |  |  | 				Message: "no user found in context", | 
					
						
							|  |  |  | 				Code:    http.StatusUnauthorized, | 
					
						
							|  |  |  | 			}}, nil | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	// if req.Key.Group == "" {
 | 
					
						
							| 
									
										
										
										
											2024-07-30 18:16:16 +08:00
										 |  |  | 	// 	status, _ := AsErrorResult(apierrors.NewBadRequest("missing group"))
 | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 	// 	return &ReadResponse{Status: status}, nil
 | 
					
						
							|  |  |  | 	// }
 | 
					
						
							|  |  |  | 	if req.Key.Resource == "" { | 
					
						
							| 
									
										
										
										
											2024-07-30 18:16:16 +08:00
										 |  |  | 		return &ReadResponse{Error: NewBadRequestError("missing resource")}, nil | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-30 18:16:16 +08:00
										 |  |  | 	rsp := s.backend.ReadResource(ctx, req) | 
					
						
							| 
									
										
										
										
											2024-11-13 20:17:15 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-01-21 17:06:55 +08:00
										 |  |  | 	a, err := s.access.Check(ctx, user, claims.CheckRequest{ | 
					
						
							| 
									
										
										
										
											2024-11-13 20:17:15 +08:00
										 |  |  | 		Verb:      "get", | 
					
						
							|  |  |  | 		Group:     req.Key.Group, | 
					
						
							|  |  |  | 		Resource:  req.Key.Resource, | 
					
						
							|  |  |  | 		Namespace: req.Key.Namespace, | 
					
						
							|  |  |  | 		Name:      req.Key.Name, | 
					
						
							|  |  |  | 		Folder:    rsp.Folder, | 
					
						
							|  |  |  | 	}) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return &ReadResponse{Error: AsErrorResult(err)}, nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if !a.Allowed { | 
					
						
							|  |  |  | 		return &ReadResponse{ | 
					
						
							|  |  |  | 			Error: &ErrorResult{ | 
					
						
							|  |  |  | 				Code: http.StatusForbidden, | 
					
						
							|  |  |  | 			}}, nil | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-11-12 19:52:04 +08:00
										 |  |  | 	return &ReadResponse{ | 
					
						
							|  |  |  | 		ResourceVersion: rsp.ResourceVersion, | 
					
						
							|  |  |  | 		Value:           rsp.Value, | 
					
						
							|  |  |  | 		Error:           rsp.Error, | 
					
						
							|  |  |  | 	}, nil | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (s *server) List(ctx context.Context, req *ListRequest) (*ListResponse, error) { | 
					
						
							| 
									
										
										
										
											2024-10-25 05:05:33 +08:00
										 |  |  | 	ctx, span := s.tracer.Start(ctx, "storage_server.List") | 
					
						
							|  |  |  | 	defer span.End() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-01-17 20:54:25 +08:00
										 |  |  | 	// The history + trash queries do not yet support additional filters
 | 
					
						
							|  |  |  | 	if req.Source != ListRequest_STORE { | 
					
						
							|  |  |  | 		if len(req.Options.Fields) > 0 || len(req.Options.Labels) > 0 { | 
					
						
							|  |  |  | 			return &ListResponse{ | 
					
						
							|  |  |  | 				Error: NewBadRequestError("unexpected field/label selector for history query"), | 
					
						
							|  |  |  | 			}, nil | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-01-21 17:06:55 +08:00
										 |  |  | 	user, ok := claims.AuthInfoFrom(ctx) | 
					
						
							| 
									
										
										
										
											2024-11-13 20:17:15 +08:00
										 |  |  | 	if !ok || user == nil { | 
					
						
							|  |  |  | 		return &ListResponse{ | 
					
						
							|  |  |  | 			Error: &ErrorResult{ | 
					
						
							|  |  |  | 				Message: "no user found in context", | 
					
						
							|  |  |  | 				Code:    http.StatusUnauthorized, | 
					
						
							|  |  |  | 			}}, nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-01-17 20:54:25 +08:00
										 |  |  | 	// Do not allow label query for trash/history
 | 
					
						
							|  |  |  | 	for _, v := range req.Options.Labels { | 
					
						
							|  |  |  | 		if v.Key == utils.LabelKeyGetHistory || v.Key == utils.LabelKeyGetTrash { | 
					
						
							|  |  |  | 			return &ListResponse{Error: NewBadRequestError("history and trash must be requested as source")}, nil | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-31 17:05:59 +08:00
										 |  |  | 	if req.Limit < 1 { | 
					
						
							|  |  |  | 		req.Limit = 50 // default max 50 items in a page
 | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	maxPageBytes := 1024 * 1024 * 2 // 2mb/page
 | 
					
						
							|  |  |  | 	pageBytes := 0 | 
					
						
							|  |  |  | 	rsp := &ListResponse{} | 
					
						
							| 
									
										
										
										
											2024-11-13 20:17:15 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	key := req.Options.Key | 
					
						
							| 
									
										
										
										
											2025-01-21 17:06:55 +08:00
										 |  |  | 	checker, err := s.access.Compile(ctx, user, claims.ListRequest{ | 
					
						
							| 
									
										
										
										
											2024-11-13 20:17:15 +08:00
										 |  |  | 		Group:     key.Group, | 
					
						
							|  |  |  | 		Resource:  key.Resource, | 
					
						
							|  |  |  | 		Namespace: key.Namespace, | 
					
						
							| 
									
										
										
										
											2025-01-16 21:11:55 +08:00
										 |  |  | 		Verb:      utils.VerbGet, | 
					
						
							| 
									
										
										
										
											2024-11-13 20:17:15 +08:00
										 |  |  | 	}) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return &ListResponse{Error: AsErrorResult(err)}, nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if checker == nil { | 
					
						
							|  |  |  | 		return &ListResponse{Error: &ErrorResult{ | 
					
						
							|  |  |  | 			Code: http.StatusForbidden, | 
					
						
							|  |  |  | 		}}, nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-31 17:05:59 +08:00
										 |  |  | 	rv, err := s.backend.ListIterator(ctx, req, func(iter ListIterator) error { | 
					
						
							|  |  |  | 		for iter.Next() { | 
					
						
							|  |  |  | 			if err := iter.Error(); err != nil { | 
					
						
							|  |  |  | 				return err | 
					
						
							|  |  |  | 			} | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-31 17:05:59 +08:00
										 |  |  | 			item := &ResourceWrapper{ | 
					
						
							|  |  |  | 				ResourceVersion: iter.ResourceVersion(), | 
					
						
							|  |  |  | 				Value:           iter.Value(), | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-13 20:17:15 +08:00
										 |  |  | 			if !checker(iter.Namespace(), iter.Name(), iter.Folder()) { | 
					
						
							|  |  |  | 				continue | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-31 17:05:59 +08:00
										 |  |  | 			pageBytes += len(item.Value) | 
					
						
							|  |  |  | 			rsp.Items = append(rsp.Items, item) | 
					
						
							|  |  |  | 			if len(rsp.Items) >= int(req.Limit) || pageBytes >= maxPageBytes { | 
					
						
							|  |  |  | 				t := iter.ContinueToken() | 
					
						
							| 
									
										
										
										
											2025-01-28 22:17:52 +08:00
										 |  |  | 				if req.Source == ListRequest_HISTORY { | 
					
						
							|  |  |  | 					// history lists in desc order, so the continue token takes the
 | 
					
						
							|  |  |  | 					// final RV in the list, and then will start from there in the next page,
 | 
					
						
							|  |  |  | 					// rather than the lists first RV
 | 
					
						
							|  |  |  | 					t = iter.ContinueTokenWithCurrentRV() | 
					
						
							|  |  |  | 				} | 
					
						
							| 
									
										
										
										
											2024-07-31 17:05:59 +08:00
										 |  |  | 				if iter.Next() { | 
					
						
							|  |  |  | 					rsp.NextPageToken = t | 
					
						
							|  |  |  | 				} | 
					
						
							| 
									
										
										
										
											2025-01-28 22:17:52 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-31 17:05:59 +08:00
										 |  |  | 				break | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		return nil | 
					
						
							|  |  |  | 	}) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		rsp.Error = AsErrorResult(err) | 
					
						
							|  |  |  | 		return rsp, nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if rv < 1 { | 
					
						
							|  |  |  | 		rsp.Error = &ErrorResult{ | 
					
						
							|  |  |  | 			Code:    http.StatusInternalServerError, | 
					
						
							|  |  |  | 			Message: fmt.Sprintf("invalid resource version for list: %v", rv), | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		return rsp, nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	rsp.ResourceVersion = rv | 
					
						
							|  |  |  | 	return rsp, err | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-12-14 06:55:43 +08:00
										 |  |  | func (s *server) Restore(ctx context.Context, req *RestoreRequest) (*RestoreResponse, error) { | 
					
						
							|  |  |  | 	ctx, span := s.tracer.Start(ctx, "storage_server.List") | 
					
						
							|  |  |  | 	defer span.End() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// check that the user has access
 | 
					
						
							| 
									
										
										
										
											2025-01-21 17:06:55 +08:00
										 |  |  | 	user, ok := claims.AuthInfoFrom(ctx) | 
					
						
							| 
									
										
										
										
											2024-12-14 06:55:43 +08:00
										 |  |  | 	if !ok || user == nil { | 
					
						
							|  |  |  | 		return &RestoreResponse{ | 
					
						
							|  |  |  | 			Error: &ErrorResult{ | 
					
						
							|  |  |  | 				Message: "no user found in context", | 
					
						
							|  |  |  | 				Code:    http.StatusUnauthorized, | 
					
						
							|  |  |  | 			}}, nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if err := s.Init(ctx); err != nil { | 
					
						
							|  |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-01-21 17:06:55 +08:00
										 |  |  | 	checker, err := s.access.Compile(ctx, user, claims.ListRequest{ | 
					
						
							| 
									
										
										
										
											2024-12-14 06:55:43 +08:00
										 |  |  | 		Group:     req.Key.Group, | 
					
						
							|  |  |  | 		Resource:  req.Key.Resource, | 
					
						
							|  |  |  | 		Namespace: req.Key.Namespace, | 
					
						
							| 
									
										
										
										
											2025-01-16 21:11:55 +08:00
										 |  |  | 		Verb:      utils.VerbGet, | 
					
						
							| 
									
										
										
										
											2024-12-14 06:55:43 +08:00
										 |  |  | 	}) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return &RestoreResponse{Error: AsErrorResult(err)}, nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if checker == nil { | 
					
						
							|  |  |  | 		return &RestoreResponse{Error: &ErrorResult{ | 
					
						
							|  |  |  | 			Code: http.StatusForbidden, | 
					
						
							|  |  |  | 		}}, nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// get the asked for resource version to restore
 | 
					
						
							|  |  |  | 	readRsp, err := s.Read(ctx, &ReadRequest{ | 
					
						
							|  |  |  | 		Key:             req.Key, | 
					
						
							|  |  |  | 		ResourceVersion: req.ResourceVersion, | 
					
						
							|  |  |  | 		IncludeDeleted:  true, | 
					
						
							|  |  |  | 	}) | 
					
						
							|  |  |  | 	if err != nil || readRsp == nil || readRsp.Error != nil { | 
					
						
							|  |  |  | 		return &RestoreResponse{ | 
					
						
							|  |  |  | 			Error: &ErrorResult{ | 
					
						
							|  |  |  | 				Code:    http.StatusNotFound, | 
					
						
							|  |  |  | 				Message: fmt.Sprintf("could not find old resource: %s", readRsp.Error.Message), | 
					
						
							|  |  |  | 			}, | 
					
						
							|  |  |  | 		}, nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// generate a new k8s UID when restoring. The name will remain the same
 | 
					
						
							|  |  |  | 	// (for dashboards, this will be the dashboard uid), but since controllers
 | 
					
						
							|  |  |  | 	// will see this as a create event, we do not want the same k8s UID, or
 | 
					
						
							|  |  |  | 	// there may be unintended behavior
 | 
					
						
							|  |  |  | 	newUid := types.UID(uuid.NewString()) | 
					
						
							|  |  |  | 	tmp := &unstructured.Unstructured{} | 
					
						
							|  |  |  | 	err = tmp.UnmarshalJSON(readRsp.Value) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return &RestoreResponse{ | 
					
						
							|  |  |  | 			Error: &ErrorResult{ | 
					
						
							|  |  |  | 				Code:    http.StatusNotFound, | 
					
						
							|  |  |  | 				Message: fmt.Sprintf("could not unmarhsal: %s", err.Error()), | 
					
						
							|  |  |  | 			}, | 
					
						
							|  |  |  | 		}, nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	obj, err := utils.MetaAccessor(tmp) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return &RestoreResponse{ | 
					
						
							|  |  |  | 			Error: &ErrorResult{ | 
					
						
							|  |  |  | 				Code:    http.StatusNotFound, | 
					
						
							|  |  |  | 				Message: fmt.Sprintf("could not get object: %s", err.Error()), | 
					
						
							|  |  |  | 			}, | 
					
						
							|  |  |  | 		}, nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	obj.SetUID(newUid) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	rtObj, ok := obj.GetRuntimeObject() | 
					
						
							|  |  |  | 	if !ok { | 
					
						
							|  |  |  | 		return &RestoreResponse{ | 
					
						
							|  |  |  | 			Error: &ErrorResult{ | 
					
						
							|  |  |  | 				Code:    http.StatusNotFound, | 
					
						
							|  |  |  | 				Message: "could not get runtime object", | 
					
						
							|  |  |  | 			}, | 
					
						
							|  |  |  | 		}, nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	newObj, err := json.Marshal(rtObj) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return &RestoreResponse{ | 
					
						
							|  |  |  | 			Error: &ErrorResult{ | 
					
						
							|  |  |  | 				Code:    http.StatusNotFound, | 
					
						
							|  |  |  | 				Message: fmt.Sprintf("could not marshal object: %s", err.Error()), | 
					
						
							|  |  |  | 			}, | 
					
						
							|  |  |  | 		}, nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// finally, send to the backend to create & update the history of the restored object
 | 
					
						
							|  |  |  | 	event, errRes := s.newEvent(ctx, user, req.Key, newObj, readRsp.Value) | 
					
						
							|  |  |  | 	if errRes != nil { | 
					
						
							|  |  |  | 		return &RestoreResponse{ | 
					
						
							|  |  |  | 			Error: &ErrorResult{ | 
					
						
							|  |  |  | 				Code:    http.StatusInternalServerError, | 
					
						
							|  |  |  | 				Message: fmt.Sprintf("could not create restore resource event: %s", errRes.Message), | 
					
						
							|  |  |  | 			}, | 
					
						
							|  |  |  | 		}, nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	rv, err := s.backend.WriteEvent(ctx, *event) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return &RestoreResponse{ | 
					
						
							|  |  |  | 			Error: &ErrorResult{ | 
					
						
							|  |  |  | 				Code:    http.StatusInternalServerError, | 
					
						
							|  |  |  | 				Message: fmt.Sprintf("could not restore resource: %s", err.Error()), | 
					
						
							|  |  |  | 			}, | 
					
						
							|  |  |  | 		}, nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return &RestoreResponse{ | 
					
						
							|  |  |  | 		Error:           nil, | 
					
						
							|  |  |  | 		ResourceVersion: rv, | 
					
						
							|  |  |  | 	}, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | func (s *server) initWatcher() error { | 
					
						
							|  |  |  | 	var err error | 
					
						
							|  |  |  | 	s.broadcaster, err = NewBroadcaster(s.ctx, func(out chan<- *WrittenEvent) error { | 
					
						
							|  |  |  | 		events, err := s.backend.WatchWriteEvents(s.ctx) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			return err | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		go func() { | 
					
						
							|  |  |  | 			for { | 
					
						
							|  |  |  | 				// pipe all events
 | 
					
						
							|  |  |  | 				v := <-events | 
					
						
							| 
									
										
										
										
											2025-02-12 01:57:46 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 				// Skip events during batch updates
 | 
					
						
							|  |  |  | 				if v.PreviousRV < 0 { | 
					
						
							|  |  |  | 					continue | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-10-07 16:01:53 +08:00
										 |  |  | 				s.log.Debug("Server. Streaming Event", "type", v.Type, "previousRV", v.PreviousRV, "group", v.Key.Group, "namespace", v.Key.Namespace, "resource", v.Key.Resource, "name", v.Key.Name) | 
					
						
							|  |  |  | 				s.mostRecentRV.Store(v.ResourceVersion) | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 				out <- v | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		}() | 
					
						
							|  |  |  | 		return nil | 
					
						
							|  |  |  | 	}) | 
					
						
							|  |  |  | 	return err | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-13 20:17:15 +08:00
										 |  |  | //nolint:gocyclo
 | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | func (s *server) Watch(req *WatchRequest, srv ResourceStore_WatchServer) error { | 
					
						
							| 
									
										
										
										
											2024-07-23 01:08:30 +08:00
										 |  |  | 	ctx := srv.Context() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-01-21 17:06:55 +08:00
										 |  |  | 	user, ok := claims.AuthInfoFrom(ctx) | 
					
						
							| 
									
										
										
										
											2024-11-13 20:17:15 +08:00
										 |  |  | 	if !ok || user == nil { | 
					
						
							|  |  |  | 		return apierrors.NewUnauthorized("no user found in context") | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	key := req.Options.Key | 
					
						
							| 
									
										
										
										
											2025-01-21 17:06:55 +08:00
										 |  |  | 	checker, err := s.access.Compile(ctx, user, claims.ListRequest{ | 
					
						
							| 
									
										
										
										
											2024-11-13 20:17:15 +08:00
										 |  |  | 		Group:     key.Group, | 
					
						
							|  |  |  | 		Resource:  key.Resource, | 
					
						
							|  |  |  | 		Namespace: key.Namespace, | 
					
						
							|  |  |  | 	}) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if checker == nil { | 
					
						
							|  |  |  | 		return apierrors.NewUnauthorized("not allowed to list anything") // ?? or a single error?
 | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-10-07 16:01:53 +08:00
										 |  |  | 	// Start listening -- this will buffer any changes that happen while we backfill.
 | 
					
						
							|  |  |  | 	// If events are generated faster than we can process them, then some events will be dropped.
 | 
					
						
							|  |  |  | 	// TODO: Think of a way to allow the client to catch up.
 | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 	stream, err := s.broadcaster.Subscribe(ctx) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	defer s.broadcaster.Unsubscribe(stream) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-10-07 16:01:53 +08:00
										 |  |  | 	if !req.SendInitialEvents && req.Since == 0 { | 
					
						
							|  |  |  | 		// This is a temporary hack only relevant for tests to ensure that the first events are sent.
 | 
					
						
							|  |  |  | 		// This is required because the SQL backend polls the database every 100ms.
 | 
					
						
							|  |  |  | 		// TODO: Implement a getLatestResourceVersion method in the backend.
 | 
					
						
							|  |  |  | 		time.Sleep(10 * time.Millisecond) | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-10-02 02:45:47 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-10-07 16:01:53 +08:00
										 |  |  | 	mostRecentRV := s.mostRecentRV.Load() // get the latest resource version
 | 
					
						
							|  |  |  | 	var initialEventsRV int64             // resource version coming from the initial events
 | 
					
						
							|  |  |  | 	if req.SendInitialEvents { | 
					
						
							|  |  |  | 		// Backfill the stream by adding every existing entities.
 | 
					
						
							|  |  |  | 		initialEventsRV, err = s.backend.ListIterator(ctx, &ListRequest{Options: req.Options}, func(iter ListIterator) error { | 
					
						
							|  |  |  | 			for iter.Next() { | 
					
						
							|  |  |  | 				if err := iter.Error(); err != nil { | 
					
						
							|  |  |  | 					return err | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 				if err := srv.Send(&WatchEvent{ | 
					
						
							|  |  |  | 					Type: WatchEvent_ADDED, | 
					
						
							|  |  |  | 					Resource: &WatchEvent_Resource{ | 
					
						
							|  |  |  | 						Value:   iter.Value(), | 
					
						
							|  |  |  | 						Version: iter.ResourceVersion(), | 
					
						
							|  |  |  | 					}, | 
					
						
							|  |  |  | 				}); err != nil { | 
					
						
							|  |  |  | 					return err | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 			return nil | 
					
						
							|  |  |  | 		}) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			return err | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if req.SendInitialEvents && req.AllowWatchBookmarks { | 
					
						
							|  |  |  | 		if err := srv.Send(&WatchEvent{ | 
					
						
							|  |  |  | 			Type: WatchEvent_BOOKMARK, | 
					
						
							|  |  |  | 			Resource: &WatchEvent_Resource{ | 
					
						
							|  |  |  | 				Version: initialEventsRV, | 
					
						
							|  |  |  | 			}, | 
					
						
							|  |  |  | 		}); err != nil { | 
					
						
							|  |  |  | 			return err | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-10-07 16:01:53 +08:00
										 |  |  | 	var since int64 // resource version to start watching from
 | 
					
						
							|  |  |  | 	switch { | 
					
						
							|  |  |  | 	case req.SendInitialEvents: | 
					
						
							|  |  |  | 		since = initialEventsRV | 
					
						
							|  |  |  | 	case req.Since == 0: | 
					
						
							|  |  |  | 		since = mostRecentRV | 
					
						
							|  |  |  | 	default: | 
					
						
							|  |  |  | 		since = req.Since | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 	for { | 
					
						
							|  |  |  | 		select { | 
					
						
							|  |  |  | 		case <-ctx.Done(): | 
					
						
							|  |  |  | 			return nil | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		case event, ok := <-stream: | 
					
						
							|  |  |  | 			if !ok { | 
					
						
							|  |  |  | 				s.log.Debug("watch events closed") | 
					
						
							|  |  |  | 				return nil | 
					
						
							|  |  |  | 			} | 
					
						
							| 
									
										
										
										
											2024-10-07 16:01:53 +08:00
										 |  |  | 			s.log.Debug("Server Broadcasting", "type", event.Type, "rv", event.ResourceVersion, "previousRV", event.PreviousRV, "group", event.Key.Group, "namespace", event.Key.Namespace, "resource", event.Key.Resource, "name", event.Key.Name) | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 			if event.ResourceVersion > since && matchesQueryKey(req.Options.Key, event.Key) { | 
					
						
							| 
									
										
										
										
											2024-11-13 20:17:15 +08:00
										 |  |  | 				if !checker(event.Key.Namespace, event.Key.Name, event.Folder) { | 
					
						
							|  |  |  | 					continue | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-10-07 16:01:53 +08:00
										 |  |  | 				value := event.Value | 
					
						
							|  |  |  | 				// remove the delete marker stored in the value for deleted objects
 | 
					
						
							|  |  |  | 				if event.Type == WatchEvent_DELETED { | 
					
						
							|  |  |  | 					value = []byte{} | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 				resp := &WatchEvent{ | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 					Timestamp: event.Timestamp, | 
					
						
							|  |  |  | 					Type:      event.Type, | 
					
						
							|  |  |  | 					Resource: &WatchEvent_Resource{ | 
					
						
							| 
									
										
										
										
											2024-10-07 16:01:53 +08:00
										 |  |  | 						Value:   value, | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 						Version: event.ResourceVersion, | 
					
						
							|  |  |  | 					}, | 
					
						
							| 
									
										
										
										
											2024-10-07 16:01:53 +08:00
										 |  |  | 				} | 
					
						
							|  |  |  | 				if event.PreviousRV > 0 { | 
					
						
							|  |  |  | 					prevObj, err := s.Read(ctx, &ReadRequest{Key: event.Key, ResourceVersion: event.PreviousRV}) | 
					
						
							|  |  |  | 					if err != nil { | 
					
						
							|  |  |  | 						// This scenario should never happen, but if it does, we should log it and continue
 | 
					
						
							|  |  |  | 						// sending the event without the previous object. The client will decide what to do.
 | 
					
						
							|  |  |  | 						s.log.Error("error reading previous object", "key", event.Key, "resource_version", event.PreviousRV, "error", prevObj.Error) | 
					
						
							|  |  |  | 					} else { | 
					
						
							|  |  |  | 						if prevObj.ResourceVersion != event.PreviousRV { | 
					
						
							|  |  |  | 							s.log.Error("resource version mismatch", "key", event.Key, "resource_version", event.PreviousRV, "actual", prevObj.ResourceVersion) | 
					
						
							|  |  |  | 							return fmt.Errorf("resource version mismatch") | 
					
						
							|  |  |  | 						} | 
					
						
							|  |  |  | 						resp.Previous = &WatchEvent_Resource{ | 
					
						
							|  |  |  | 							Value:   prevObj.Value, | 
					
						
							|  |  |  | 							Version: prevObj.ResourceVersion, | 
					
						
							|  |  |  | 						} | 
					
						
							|  |  |  | 					} | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 				if err := srv.Send(resp); err != nil { | 
					
						
							| 
									
										
										
										
											2024-08-05 22:30:14 +08:00
										 |  |  | 					return err | 
					
						
							|  |  |  | 				} | 
					
						
							| 
									
										
										
										
											2024-10-22 07:15:11 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 				// record latency - resource version is a unix timestamp in microseconds so we convert to seconds
 | 
					
						
							|  |  |  | 				latencySeconds := float64(time.Now().UnixMicro()-event.ResourceVersion) / 1e6 | 
					
						
							|  |  |  | 				if latencySeconds > 0 { | 
					
						
							|  |  |  | 					StorageServerMetrics.WatchEventLatency.WithLabelValues(event.Key.Resource).Observe(latencySeconds) | 
					
						
							|  |  |  | 				} | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-27 13:57:53 +08:00
										 |  |  | func (s *server) Search(ctx context.Context, req *ResourceSearchRequest) (*ResourceSearchResponse, error) { | 
					
						
							|  |  |  | 	if s.search == nil { | 
					
						
							| 
									
										
										
										
											2024-11-23 04:26:56 +08:00
										 |  |  | 		return nil, fmt.Errorf("search index not configured") | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-11-27 13:57:53 +08:00
										 |  |  | 	return s.search.Search(ctx, req) | 
					
						
							| 
									
										
										
										
											2024-10-01 03:46:14 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-12-11 02:37:37 +08:00
										 |  |  | // GetStats implements ResourceServer.
 | 
					
						
							|  |  |  | func (s *server) GetStats(ctx context.Context, req *ResourceStatsRequest) (*ResourceStatsResponse, error) { | 
					
						
							|  |  |  | 	if err := s.Init(ctx); err != nil { | 
					
						
							|  |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if s.search == nil { | 
					
						
							|  |  |  | 		// If the backend implements "GetStats", we can use it
 | 
					
						
							|  |  |  | 		srv, ok := s.backend.(ResourceIndexServer) | 
					
						
							|  |  |  | 		if ok { | 
					
						
							|  |  |  | 			return srv.GetStats(ctx, req) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		return nil, fmt.Errorf("search index not configured") | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return s.search.GetStats(ctx, req) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-01-11 02:27:10 +08:00
										 |  |  | func (s *server) ListRepositoryObjects(ctx context.Context, req *ListRepositoryObjectsRequest) (*ListRepositoryObjectsResponse, error) { | 
					
						
							|  |  |  | 	return s.search.ListRepositoryObjects(ctx, req) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (s *server) CountRepositoryObjects(ctx context.Context, req *CountRepositoryObjectsRequest) (*CountRepositoryObjectsResponse, error) { | 
					
						
							|  |  |  | 	return s.search.CountRepositoryObjects(ctx, req) | 
					
						
							| 
									
										
										
										
											2024-10-08 21:43:23 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-10 06:08:13 +08:00
										 |  |  | // IsHealthy implements ResourceServer.
 | 
					
						
							|  |  |  | func (s *server) IsHealthy(ctx context.Context, req *HealthCheckRequest) (*HealthCheckResponse, error) { | 
					
						
							|  |  |  | 	return s.diagnostics.IsHealthy(ctx, req) | 
					
						
							|  |  |  | } | 
					
						
							| 
									
										
										
										
											2024-10-17 18:18:29 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | // GetBlob implements BlobStore.
 | 
					
						
							|  |  |  | func (s *server) PutBlob(ctx context.Context, req *PutBlobRequest) (*PutBlobResponse, error) { | 
					
						
							|  |  |  | 	if s.blob == nil { | 
					
						
							|  |  |  | 		return &PutBlobResponse{Error: &ErrorResult{ | 
					
						
							|  |  |  | 			Message: "blob store not configured", | 
					
						
							|  |  |  | 			Code:    http.StatusNotImplemented, | 
					
						
							|  |  |  | 		}}, nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	rsp, err := s.blob.PutResourceBlob(ctx, req) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		rsp.Error = AsErrorResult(err) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return rsp, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (s *server) getPartialObject(ctx context.Context, key *ResourceKey, rv int64) (utils.GrafanaMetaAccessor, *ErrorResult) { | 
					
						
							| 
									
										
										
										
											2024-11-09 13:09:46 +08:00
										 |  |  | 	if r := verifyRequestKey(key); r != nil { | 
					
						
							|  |  |  | 		return nil, r | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-10-17 18:18:29 +08:00
										 |  |  | 	rsp := s.backend.ReadResource(ctx, &ReadRequest{ | 
					
						
							|  |  |  | 		Key:             key, | 
					
						
							|  |  |  | 		ResourceVersion: rv, | 
					
						
							|  |  |  | 	}) | 
					
						
							|  |  |  | 	if rsp.Error != nil { | 
					
						
							|  |  |  | 		return nil, rsp.Error | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	partial := &metav1.PartialObjectMetadata{} | 
					
						
							|  |  |  | 	err := json.Unmarshal(rsp.Value, partial) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return nil, AsErrorResult(err) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	obj, err := utils.MetaAccessor(partial) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return nil, AsErrorResult(err) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return obj, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // GetBlob implements BlobStore.
 | 
					
						
							|  |  |  | func (s *server) GetBlob(ctx context.Context, req *GetBlobRequest) (*GetBlobResponse, error) { | 
					
						
							|  |  |  | 	if s.blob == nil { | 
					
						
							|  |  |  | 		return &GetBlobResponse{Error: &ErrorResult{ | 
					
						
							|  |  |  | 			Message: "blob store not configured", | 
					
						
							|  |  |  | 			Code:    http.StatusNotImplemented, | 
					
						
							|  |  |  | 		}}, nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// The linked blob is stored in the resource metadata attributes
 | 
					
						
							|  |  |  | 	obj, status := s.getPartialObject(ctx, req.Resource, req.ResourceVersion) | 
					
						
							|  |  |  | 	if status != nil { | 
					
						
							|  |  |  | 		return &GetBlobResponse{Error: status}, nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	info := obj.GetBlob() | 
					
						
							|  |  |  | 	if info == nil || info.UID == "" { | 
					
						
							|  |  |  | 		return &GetBlobResponse{Error: &ErrorResult{ | 
					
						
							|  |  |  | 			Message: "Resource does not have a linked blob", | 
					
						
							|  |  |  | 			Code:    404, | 
					
						
							|  |  |  | 		}}, nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	rsp, err := s.blob.GetResourceBlob(ctx, req.Resource, info, req.MustProxyBytes) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		rsp.Error = AsErrorResult(err) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return rsp, nil | 
					
						
							|  |  |  | } |