| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | package resource | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							|  |  |  | 	context "context" | 
					
						
							|  |  |  | 	"encoding/json" | 
					
						
							|  |  |  | 	"errors" | 
					
						
							|  |  |  | 	"fmt" | 
					
						
							| 
									
										
										
										
											2024-06-15 05:49:06 +08:00
										 |  |  | 	"math/rand" | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 	"sync" | 
					
						
							|  |  |  | 	"time" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	"github.com/bwmarrin/snowflake" | 
					
						
							| 
									
										
										
										
											2024-06-18 21:07:55 +08:00
										 |  |  | 	"github.com/google/uuid" | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 	"go.opentelemetry.io/otel/trace" | 
					
						
							|  |  |  | 	"go.opentelemetry.io/otel/trace/noop" | 
					
						
							|  |  |  | 	apierrors "k8s.io/apimachinery/pkg/api/errors" | 
					
						
							|  |  |  | 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | 
					
						
							| 
									
										
										
										
											2024-06-18 21:07:55 +08:00
										 |  |  | 	"k8s.io/apimachinery/pkg/types" | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	"github.com/grafana/grafana/pkg/apimachinery/identity" | 
					
						
							|  |  |  | 	"github.com/grafana/grafana/pkg/apimachinery/utils" | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							// Sentinel errors shared across the resource package.
// Callers should compare against them with errors.Is.
var (
	// ErrNotFound reports that the requested entity does not exist.
	ErrNotFound = errors.New("entity not found")

	// ErrOptimisticLockingFailed reports that the caller's resource version
	// does not match the latest stored version.
	ErrOptimisticLockingFailed = errors.New("optimistic locking failed")

	// ErrUserNotFoundInContext reports that no requester identity could be
	// resolved from the request context.
	ErrUserNotFoundInContext = errors.New("user not found in context")

	// ErrUnableToReadResourceJSON reports that a resource body could not be read as JSON.
	ErrUnableToReadResourceJSON = errors.New("unable to read resource json")

	// ErrNextPageTokenNotSupported reports that paging tokens are not handled yet.
	ErrNextPageTokenNotSupported = errors.New("nextPageToken not yet supported")

	// ErrLimitNotSupported reports that list limits are not handled yet.
	ErrLimitNotSupported = errors.New("limit not yet supported")

	// ErrNotImplementedYet marks functionality that has not been written.
	ErrNotImplementedYet = errors.New("not implemented yet")
)
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // ResourceServer implements all services
 | 
					
						
							|  |  |  | type ResourceServer interface { | 
					
						
							|  |  |  | 	ResourceStoreServer | 
					
						
							| 
									
										
										
										
											2024-06-14 20:21:17 +08:00
										 |  |  | 	ResourceSearchServer | 
					
						
							|  |  |  | 	DiagnosticsServer | 
					
						
							| 
									
										
										
										
											2024-06-14 21:52:17 +08:00
										 |  |  | 	LifecycleHooks | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | type AppendingStore interface { | 
					
						
							|  |  |  | 	// Write a Create/Update/Delete,
 | 
					
						
							|  |  |  | 	// NOTE: the contents of WriteEvent have been validated
 | 
					
						
							|  |  |  | 	// Return the revisionVersion for this event or error
 | 
					
						
							| 
									
										
										
										
											2024-06-18 21:07:55 +08:00
										 |  |  | 	WriteEvent(context.Context, WriteEvent) (int64, error) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Create new name for a given resource
 | 
					
						
							|  |  |  | 	GenerateName(ctx context.Context, key *ResourceKey, prefix string) (string, error) | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	// Read a value from storage
 | 
					
						
							|  |  |  | 	Read(context.Context, *ReadRequest) (*ReadResponse, error) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Implement List -- this expects the read after write semantics
 | 
					
						
							|  |  |  | 	List(context.Context, *ListRequest) (*ListResponse, error) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Watch for events
 | 
					
						
							| 
									
										
										
										
											2024-06-14 22:41:16 +08:00
										 |  |  | 	Watch(context.Context, *WatchRequest) (chan *WatchEvent, error) | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | type ResourceServerOptions struct { | 
					
						
							|  |  |  | 	// OTel tracer
 | 
					
						
							|  |  |  | 	Tracer trace.Tracer | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// When running in a cluster, each node should have a different ID
 | 
					
						
							|  |  |  | 	// This is used for snowflake generation and log identification
 | 
					
						
							|  |  |  | 	NodeID int64 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Get the next EventID.  When not set, this will default to snowflake IDs
 | 
					
						
							|  |  |  | 	NextEventID func() int64 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Real storage backend
 | 
					
						
							|  |  |  | 	Store AppendingStore | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Real storage backend
 | 
					
						
							|  |  |  | 	Search ResourceSearchServer | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Diagnostics
 | 
					
						
							|  |  |  | 	Diagnostics DiagnosticsServer | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Check if a user has access to write folders
 | 
					
						
							|  |  |  | 	// When this is nil, no resources can have folders configured
 | 
					
						
							|  |  |  | 	WriteAccess WriteAccessHooks | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Callbacks for startup and shutdown
 | 
					
						
							|  |  |  | 	Lifecycle LifecycleHooks | 
					
						
							| 
									
										
										
										
											2024-06-18 21:07:55 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	// Get the current time in unix millis
 | 
					
						
							|  |  |  | 	Now func() int64 | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) { | 
					
						
							|  |  |  | 	if opts.Tracer == nil { | 
					
						
							|  |  |  | 		opts.Tracer = noop.NewTracerProvider().Tracer("resource-server") | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if opts.NextEventID == nil { | 
					
						
							| 
									
										
										
										
											2024-06-15 05:49:06 +08:00
										 |  |  | 		if opts.NodeID == 0 { | 
					
						
							|  |  |  | 			opts.NodeID = rand.Int63n(1024) | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 		eventNode, err := snowflake.NewNode(opts.NodeID) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			return nil, apierrors.NewInternalError( | 
					
						
							|  |  |  | 				fmt.Errorf("error initializing snowflake id generator :: %w", err)) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		opts.NextEventID = func() int64 { | 
					
						
							|  |  |  | 			return eventNode.Generate().Int64() | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if opts.Store == nil { | 
					
						
							|  |  |  | 		return nil, fmt.Errorf("missing AppendingStore implementation") | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if opts.Search == nil { | 
					
						
							| 
									
										
										
										
											2024-06-14 22:41:16 +08:00
										 |  |  | 		opts.Search = &noopService{} | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 	if opts.Diagnostics == nil { | 
					
						
							| 
									
										
										
										
											2024-06-14 22:41:16 +08:00
										 |  |  | 		opts.Search = &noopService{} | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-06-18 21:07:55 +08:00
										 |  |  | 	if opts.Now == nil { | 
					
						
							|  |  |  | 		opts.Now = func() int64 { | 
					
						
							|  |  |  | 			return time.Now().UnixMilli() | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	return &server{ | 
					
						
							|  |  |  | 		tracer:      opts.Tracer, | 
					
						
							|  |  |  | 		nextEventID: opts.NextEventID, | 
					
						
							|  |  |  | 		store:       opts.Store, | 
					
						
							|  |  |  | 		search:      opts.Search, | 
					
						
							|  |  |  | 		diagnostics: opts.Diagnostics, | 
					
						
							|  |  |  | 		access:      opts.WriteAccess, | 
					
						
							|  |  |  | 		lifecycle:   opts.Lifecycle, | 
					
						
							| 
									
										
										
										
											2024-06-18 21:07:55 +08:00
										 |  |  | 		now:         opts.Now, | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 	}, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | var _ ResourceServer = &server{} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | type server struct { | 
					
						
							|  |  |  | 	tracer      trace.Tracer | 
					
						
							|  |  |  | 	nextEventID func() int64 | 
					
						
							|  |  |  | 	store       AppendingStore | 
					
						
							|  |  |  | 	search      ResourceSearchServer | 
					
						
							|  |  |  | 	diagnostics DiagnosticsServer | 
					
						
							|  |  |  | 	access      WriteAccessHooks | 
					
						
							|  |  |  | 	lifecycle   LifecycleHooks | 
					
						
							| 
									
										
										
										
											2024-06-18 21:07:55 +08:00
										 |  |  | 	now         func() int64 | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	// init checking
 | 
					
						
							|  |  |  | 	once    sync.Once | 
					
						
							|  |  |  | 	initErr error | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Init implements ResourceServer.
 | 
					
						
							|  |  |  | func (s *server) Init() error { | 
					
						
							|  |  |  | 	s.once.Do(func() { | 
					
						
							|  |  |  | 		// TODO, setup a broadcaster for watch
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		// Call lifecycle hooks
 | 
					
						
							|  |  |  | 		if s.lifecycle != nil { | 
					
						
							|  |  |  | 			err := s.lifecycle.Init() | 
					
						
							|  |  |  | 			if err != nil { | 
					
						
							|  |  |  | 				s.initErr = fmt.Errorf("initialize Resource Server: %w", err) | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	}) | 
					
						
							|  |  |  | 	return s.initErr | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (s *server) Stop() { | 
					
						
							|  |  |  | 	s.initErr = fmt.Errorf("service is stopping") | 
					
						
							|  |  |  | 	if s.lifecycle != nil { | 
					
						
							|  |  |  | 		s.lifecycle.Stop() | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	s.initErr = fmt.Errorf("service is stopped") | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-06-18 21:07:55 +08:00
										 |  |  | // Old value indicates an update -- otherwise a create
 | 
					
						
							|  |  |  | func (s *server) newEventBuilder(ctx context.Context, key *ResourceKey, value, oldValue []byte) (*writeEventBuilder, error) { | 
					
						
							|  |  |  | 	event, err := newEventFromBytes(value, oldValue) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return nil, err | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-06-18 21:07:55 +08:00
										 |  |  | 	event.EventID = s.nextEventID() | 
					
						
							|  |  |  | 	event.Key = key | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 	event.Requester, err = identity.GetRequester(ctx) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return nil, ErrUserNotFoundInContext | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-06-18 21:07:55 +08:00
										 |  |  | 	obj := event.Meta | 
					
						
							|  |  |  | 	if key.Name != obj.GetName() { | 
					
						
							|  |  |  | 		return nil, apierrors.NewBadRequest("key/name do not match") | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-06-18 21:07:55 +08:00
										 |  |  | 	if key.Namespace != obj.GetNamespace() { | 
					
						
							|  |  |  | 		return nil, apierrors.NewBadRequest("key/namespace do not match") | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-06-18 21:07:55 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 	gvk := obj.GetGroupVersionKind() | 
					
						
							|  |  |  | 	if gvk.Kind == "" { | 
					
						
							|  |  |  | 		return nil, apierrors.NewBadRequest("expecting resources with a kind in the body") | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if gvk.Version == "" { | 
					
						
							|  |  |  | 		return nil, apierrors.NewBadRequest("expecting resources with an apiVersion") | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if gvk.Group != "" && gvk.Group != key.Group { | 
					
						
							|  |  |  | 		return nil, apierrors.NewBadRequest( | 
					
						
							|  |  |  | 			fmt.Sprintf("group in key does not match group in the body (%s != %s)", key.Group, gvk.Group), | 
					
						
							|  |  |  | 		) | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-06-18 21:07:55 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	// This needs to be a create function
 | 
					
						
							|  |  |  | 	if key.Name == "" { | 
					
						
							|  |  |  | 		prefix := obj.GetGenerateName() | 
					
						
							|  |  |  | 		if prefix == "" { | 
					
						
							|  |  |  | 			return nil, apierrors.NewBadRequest("must have name or generate name set") | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		key.Name, err = s.store.GenerateName(ctx, key, prefix) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			return nil, err | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		obj.SetName(key.Name) | 
					
						
							|  |  |  | 		obj.SetGenerateName("") | 
					
						
							|  |  |  | 	} else if obj.GetGenerateName() != "" { | 
					
						
							|  |  |  | 		return nil, apierrors.NewBadRequest("values with a name must not include generate name") | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	err = validateName(obj.GetName()) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 	if obj.GetName() != key.Name { | 
					
						
							|  |  |  | 		return nil, apierrors.NewBadRequest("key name does not match the name in the body") | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	folder := obj.GetFolder() | 
					
						
							|  |  |  | 	if folder != "" { | 
					
						
							|  |  |  | 		err = s.access.CanWriteFolder(ctx, event.Requester, folder) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			return nil, err | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	origin, err := obj.GetOriginInfo() | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return nil, apierrors.NewBadRequest("invalid origin info") | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if origin != nil { | 
					
						
							|  |  |  | 		err = s.access.CanWriteOrigin(ctx, event.Requester, origin.Name) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			return nil, err | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-06-18 21:07:55 +08:00
										 |  |  | 	obj.SetOriginInfo(origin) | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-06-18 21:07:55 +08:00
										 |  |  | 	// Make sure old values do not mutate things they should not
 | 
					
						
							|  |  |  | 	if event.OldMeta != nil { | 
					
						
							|  |  |  | 		old := event.OldMeta | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		if obj.GetUID() != event.OldMeta.GetUID() { | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 			return nil, apierrors.NewBadRequest( | 
					
						
							| 
									
										
										
										
											2024-06-18 21:07:55 +08:00
										 |  |  | 				fmt.Sprintf("UIDs do not match (old: %s, new: %s)", old.GetUID(), obj.GetUID())) | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		// Can not change creation timestamps+user
 | 
					
						
							| 
									
										
										
										
											2024-06-18 21:07:55 +08:00
										 |  |  | 		if obj.GetCreatedBy() != event.OldMeta.GetCreatedBy() { | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 			return nil, apierrors.NewBadRequest( | 
					
						
							| 
									
										
										
										
											2024-06-18 21:07:55 +08:00
										 |  |  | 				fmt.Sprintf("created by changed (old: %s, new: %s)", old.GetCreatedBy(), obj.GetCreatedBy())) | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 		} | 
					
						
							| 
									
										
										
										
											2024-06-18 21:07:55 +08:00
										 |  |  | 		if obj.GetCreationTimestamp() != event.OldMeta.GetCreationTimestamp() { | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 			return nil, apierrors.NewBadRequest( | 
					
						
							| 
									
										
										
										
											2024-06-18 21:07:55 +08:00
										 |  |  | 				fmt.Sprintf("creation timestamp changed (old:%v, new:%v)", old.GetCreationTimestamp(), obj.GetCreationTimestamp())) | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return event, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (s *server) Create(ctx context.Context, req *CreateRequest) (*CreateResponse, error) { | 
					
						
							|  |  |  | 	ctx, span := s.tracer.Start(ctx, "storage_server.Create") | 
					
						
							|  |  |  | 	defer span.End() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if err := s.Init(); err != nil { | 
					
						
							|  |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	rsp := &CreateResponse{} | 
					
						
							| 
									
										
										
										
											2024-06-18 21:07:55 +08:00
										 |  |  | 	builder, err := s.newEventBuilder(ctx, req.Key, req.Value, nil) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		rsp.Status, err = errToStatus(err) | 
					
						
							|  |  |  | 		return rsp, err | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-06-18 21:07:55 +08:00
										 |  |  | 	obj := builder.Meta | 
					
						
							|  |  |  | 	obj.SetCreatedBy(builder.Requester.GetUID().String()) | 
					
						
							|  |  |  | 	obj.SetUpdatedBy("") | 
					
						
							|  |  |  | 	obj.SetUpdatedTimestamp(nil) | 
					
						
							|  |  |  | 	obj.SetCreationTimestamp(metav1.NewTime(time.UnixMilli(s.now()))) | 
					
						
							|  |  |  | 	obj.SetUID(types.UID(uuid.New().String())) | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-06-18 21:07:55 +08:00
										 |  |  | 	event, err := builder.toEvent() | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 	if err != nil { | 
					
						
							| 
									
										
										
										
											2024-06-18 21:07:55 +08:00
										 |  |  | 		rsp.Status, err = errToStatus(err) | 
					
						
							|  |  |  | 		return rsp, err | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-06-18 21:07:55 +08:00
										 |  |  | 	event.Message = req.Message | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	rsp.ResourceVersion, err = s.store.WriteEvent(ctx, event) | 
					
						
							| 
									
										
										
										
											2024-06-18 21:07:55 +08:00
										 |  |  | 	if err == nil { | 
					
						
							|  |  |  | 		rsp.Value = event.Value // with mutated fields
 | 
					
						
							|  |  |  | 	} else { | 
					
						
							|  |  |  | 		rsp.Status, err = errToStatus(err) | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 	return rsp, err | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Convert golang errors to status result errors that can be returned to a client
 | 
					
						
							|  |  |  | func errToStatus(err error) (*StatusResult, error) { | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							| 
									
										
										
										
											2024-06-14 20:21:17 +08:00
										 |  |  | 		apistatus, ok := err.(apierrors.APIStatus) | 
					
						
							|  |  |  | 		if ok { | 
					
						
							|  |  |  | 			s := apistatus.Status() | 
					
						
							|  |  |  | 			return &StatusResult{ | 
					
						
							|  |  |  | 				Status:  s.Status, | 
					
						
							|  |  |  | 				Message: s.Message, | 
					
						
							|  |  |  | 				Reason:  string(s.Reason), | 
					
						
							|  |  |  | 				Code:    s.Code, | 
					
						
							|  |  |  | 			}, nil | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 		// TODO... better conversion!!!
 | 
					
						
							|  |  |  | 		return &StatusResult{ | 
					
						
							|  |  |  | 			Status:  "Failure", | 
					
						
							|  |  |  | 			Message: err.Error(), | 
					
						
							| 
									
										
										
										
											2024-06-14 20:21:17 +08:00
										 |  |  | 			Code:    500, | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 		}, nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return nil, err | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (s *server) Update(ctx context.Context, req *UpdateRequest) (*UpdateResponse, error) { | 
					
						
							|  |  |  | 	ctx, span := s.tracer.Start(ctx, "storage_server.Update") | 
					
						
							|  |  |  | 	defer span.End() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if err := s.Init(); err != nil { | 
					
						
							|  |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	rsp := &UpdateResponse{} | 
					
						
							| 
									
										
										
										
											2024-06-14 21:52:17 +08:00
										 |  |  | 	if req.ResourceVersion < 0 { | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 		rsp.Status, _ = errToStatus(apierrors.NewBadRequest("update must include the previous version")) | 
					
						
							|  |  |  | 		return rsp, nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	latest, err := s.store.Read(ctx, &ReadRequest{ | 
					
						
							| 
									
										
										
										
											2024-06-15 02:30:41 +08:00
										 |  |  | 		Key: req.Key, | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 	}) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if latest.Value == nil { | 
					
						
							|  |  |  | 		return nil, apierrors.NewBadRequest("current value does not exist") | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-06-18 21:07:55 +08:00
										 |  |  | 	builder, err := s.newEventBuilder(ctx, req.Key, req.Value, latest.Value) | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 	if err != nil { | 
					
						
							| 
									
										
										
										
											2024-06-18 21:07:55 +08:00
										 |  |  | 		rsp.Status, err = errToStatus(err) | 
					
						
							|  |  |  | 		return rsp, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	obj := builder.Meta | 
					
						
							|  |  |  | 	obj.SetUpdatedBy(builder.Requester.GetUID().String()) | 
					
						
							|  |  |  | 	obj.SetUpdatedTimestampMillis(time.Now().UnixMilli()) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	event, err := builder.toEvent() | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		rsp.Status, err = errToStatus(err) | 
					
						
							|  |  |  | 		return rsp, err | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-06-18 21:07:55 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 	event.Operation = ResourceOperation_UPDATED | 
					
						
							|  |  |  | 	event.PreviousRV = latest.ResourceVersion | 
					
						
							|  |  |  | 	event.Message = req.Message | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	rsp.ResourceVersion, err = s.store.WriteEvent(ctx, event) | 
					
						
							|  |  |  | 	rsp.Status, err = errToStatus(err) | 
					
						
							| 
									
										
										
										
											2024-06-18 21:07:55 +08:00
										 |  |  | 	if err == nil { | 
					
						
							|  |  |  | 		rsp.Value = event.Value // with mutated fields
 | 
					
						
							|  |  |  | 	} else { | 
					
						
							|  |  |  | 		rsp.Status, err = errToStatus(err) | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 	return rsp, err | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (s *server) Delete(ctx context.Context, req *DeleteRequest) (*DeleteResponse, error) { | 
					
						
							|  |  |  | 	ctx, span := s.tracer.Start(ctx, "storage_server.Delete") | 
					
						
							|  |  |  | 	defer span.End() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if err := s.Init(); err != nil { | 
					
						
							|  |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	rsp := &DeleteResponse{} | 
					
						
							| 
									
										
										
										
											2024-06-14 21:52:17 +08:00
										 |  |  | 	if req.ResourceVersion < 0 { | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 		return nil, apierrors.NewBadRequest("update must include the previous version") | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	latest, err := s.store.Read(ctx, &ReadRequest{ | 
					
						
							| 
									
										
										
										
											2024-06-15 02:30:41 +08:00
										 |  |  | 		Key: req.Key, | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 	}) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-06-14 21:52:17 +08:00
										 |  |  | 	if latest.ResourceVersion != req.ResourceVersion { | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 		return nil, ErrOptimisticLockingFailed | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-06-18 21:07:55 +08:00
										 |  |  | 	now := metav1.NewTime(time.UnixMilli(s.now())) | 
					
						
							|  |  |  | 	event := WriteEvent{ | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 		EventID:    s.nextEventID(), | 
					
						
							|  |  |  | 		Key:        req.Key, | 
					
						
							|  |  |  | 		Operation:  ResourceOperation_DELETED, | 
					
						
							|  |  |  | 		PreviousRV: latest.ResourceVersion, | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-06-18 21:07:55 +08:00
										 |  |  | 	requester, err := identity.GetRequester(ctx) | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return nil, apierrors.NewBadRequest("unable to get user") | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	marker := &DeletedMarker{} | 
					
						
							|  |  |  | 	err = json.Unmarshal(latest.Value, marker) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return nil, apierrors.NewBadRequest( | 
					
						
							|  |  |  | 			fmt.Sprintf("unable to read previous object, %v", err)) | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-06-18 21:07:55 +08:00
										 |  |  | 	obj, err := utils.MetaAccessor(marker) | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-06-18 21:07:55 +08:00
										 |  |  | 	obj.SetDeletionTimestamp(&now) | 
					
						
							|  |  |  | 	obj.SetUpdatedTimestamp(&now.Time) | 
					
						
							|  |  |  | 	obj.SetManagedFields(nil) | 
					
						
							|  |  |  | 	obj.SetFinalizers(nil) | 
					
						
							|  |  |  | 	obj.SetUpdatedBy(requester.GetUID().String()) | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 	marker.TypeMeta = metav1.TypeMeta{ | 
					
						
							|  |  |  | 		Kind:       "DeletedMarker", | 
					
						
							|  |  |  | 		APIVersion: "storage.grafana.app/v0alpha1", // ?? or can we stick this in common?
 | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	marker.Annotations["RestoreResourceVersion"] = fmt.Sprintf("%d", event.PreviousRV) | 
					
						
							|  |  |  | 	event.Value, err = json.Marshal(marker) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return nil, apierrors.NewBadRequest( | 
					
						
							|  |  |  | 			fmt.Sprintf("unable creating deletion marker, %v", err)) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	rsp.ResourceVersion, err = s.store.WriteEvent(ctx, event) | 
					
						
							|  |  |  | 	rsp.Status, err = errToStatus(err) | 
					
						
							|  |  |  | 	return rsp, err | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (s *server) Read(ctx context.Context, req *ReadRequest) (*ReadResponse, error) { | 
					
						
							|  |  |  | 	if err := s.Init(); err != nil { | 
					
						
							|  |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-06-15 03:53:10 +08:00
										 |  |  | 	if req.Key.Group == "" { | 
					
						
							|  |  |  | 		status, _ := errToStatus(apierrors.NewBadRequest("missing group")) | 
					
						
							|  |  |  | 		return &ReadResponse{Status: status}, nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if req.Key.Resource == "" { | 
					
						
							|  |  |  | 		status, _ := errToStatus(apierrors.NewBadRequest("missing resource")) | 
					
						
							|  |  |  | 		return &ReadResponse{Status: status}, nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | 	rsp, err := s.store.Read(ctx, req) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		if rsp == nil { | 
					
						
							|  |  |  | 			rsp = &ReadResponse{} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		rsp.Status, err = errToStatus(err) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return rsp, err | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (s *server) List(ctx context.Context, req *ListRequest) (*ListResponse, error) { | 
					
						
							|  |  |  | 	if err := s.Init(); err != nil { | 
					
						
							|  |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	rsp, err := s.store.List(ctx, req) | 
					
						
							|  |  |  | 	// Status???
 | 
					
						
							|  |  |  | 	return rsp, err | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (s *server) Watch(req *WatchRequest, srv ResourceStore_WatchServer) error { | 
					
						
							|  |  |  | 	if err := s.Init(); err != nil { | 
					
						
							|  |  |  | 		return err | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-06-14 21:52:17 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	// TODO??? can we move any of the common processing here?
 | 
					
						
							|  |  |  | 	stream, err := s.store.Watch(srv.Context(), req) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	for event := range stream { | 
					
						
							|  |  |  | 		srv.Send(event) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return nil | 
					
						
							| 
									
										
										
										
											2024-06-14 19:24:36 +08:00
										 |  |  | } | 
					
						
							| 
									
										
										
										
											2024-06-14 20:21:17 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | // GetBlob implements ResourceServer.
 | 
					
						
							|  |  |  | func (s *server) GetBlob(ctx context.Context, req *GetBlobRequest) (*GetBlobResponse, error) { | 
					
						
							|  |  |  | 	if err := s.Init(); err != nil { | 
					
						
							|  |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	rsp, err := s.search.GetBlob(ctx, req) | 
					
						
							|  |  |  | 	rsp.Status, err = errToStatus(err) | 
					
						
							|  |  |  | 	return rsp, err | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // History implements ResourceServer.
 | 
					
						
							|  |  |  | func (s *server) History(ctx context.Context, req *HistoryRequest) (*HistoryResponse, error) { | 
					
						
							|  |  |  | 	if err := s.Init(); err != nil { | 
					
						
							|  |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return s.search.History(ctx, req) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // IsHealthy implements ResourceServer.
 | 
					
						
							|  |  |  | func (s *server) IsHealthy(ctx context.Context, req *HealthCheckRequest) (*HealthCheckResponse, error) { | 
					
						
							|  |  |  | 	if err := s.Init(); err != nil { | 
					
						
							|  |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return s.diagnostics.IsHealthy(ctx, req) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Origin implements ResourceServer.
 | 
					
						
							|  |  |  | func (s *server) Origin(ctx context.Context, req *OriginRequest) (*OriginResponse, error) { | 
					
						
							|  |  |  | 	if err := s.Init(); err != nil { | 
					
						
							|  |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return s.search.Origin(ctx, req) | 
					
						
							|  |  |  | } |