mirror of https://github.com/grafana/grafana.git
move test files
This commit is contained in:
parent
617dd1b40e
commit
df9baddf53
|
@ -1,166 +0,0 @@
|
||||||
package test
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"strings"
|
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
|
||||||
"k8s.io/apiserver/pkg/endpoints/request"
|
|
||||||
"k8s.io/apiserver/pkg/storage"
|
|
||||||
|
|
||||||
"github.com/grafana/grafana/pkg/infra/appcontext"
|
|
||||||
"github.com/grafana/grafana/pkg/services/user"
|
|
||||||
)
|
|
||||||
|
|
||||||
var _ storage.Interface = &RequestInfoWrapper{}
|
|
||||||
|
|
||||||
type RequestInfoWrapper struct {
|
|
||||||
store storage.Interface
|
|
||||||
gr schema.GroupResource
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *RequestInfoWrapper) setRequestInfo(ctx context.Context, key string) (context.Context, error) {
|
|
||||||
pkey, err := convertToParsedKey(key)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx = appcontext.WithUser(ctx, &user.SignedInUser{
|
|
||||||
Login: "admin",
|
|
||||||
UserID: 1,
|
|
||||||
OrgID: 1,
|
|
||||||
})
|
|
||||||
|
|
||||||
return request.WithRequestInfo(ctx, &request.RequestInfo{
|
|
||||||
APIGroup: pkey.Group,
|
|
||||||
APIVersion: "v1",
|
|
||||||
Resource: pkey.Resource,
|
|
||||||
Subresource: "",
|
|
||||||
Namespace: pkey.Namespace,
|
|
||||||
Name: pkey.Name,
|
|
||||||
Parts: strings.Split(key, "/"),
|
|
||||||
IsResourceRequest: true,
|
|
||||||
}), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *RequestInfoWrapper) Create(ctx context.Context, key string, obj runtime.Object, out runtime.Object, ttl uint64) error {
|
|
||||||
ctx, err := r.setRequestInfo(ctx, key)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return r.store.Create(ctx, key, obj, out, ttl)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *RequestInfoWrapper) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object) error {
|
|
||||||
ctx, err := r.setRequestInfo(ctx, key)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return r.store.Delete(ctx, key, out, preconditions, validateDeletion, cachedExistingObject)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *RequestInfoWrapper) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
|
|
||||||
ctx, err := r.setRequestInfo(ctx, key)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return r.store.Watch(ctx, key, opts)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *RequestInfoWrapper) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
|
|
||||||
ctx, err := r.setRequestInfo(ctx, key)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return r.store.Get(ctx, key, opts, objPtr)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *RequestInfoWrapper) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
|
|
||||||
ctx, err := r.setRequestInfo(ctx, key)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return r.store.GetList(ctx, key, opts, listObj)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *RequestInfoWrapper) GuaranteedUpdate(ctx context.Context, key string, destination runtime.Object, ignoreNotFound bool, preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, cachedExistingObject runtime.Object) error {
|
|
||||||
ctx, err := r.setRequestInfo(ctx, key)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return r.store.GuaranteedUpdate(ctx, key, destination, ignoreNotFound, preconditions, tryUpdate, cachedExistingObject)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *RequestInfoWrapper) Count(key string) (int64, error) {
|
|
||||||
return r.store.Count(key)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *RequestInfoWrapper) Versioner() storage.Versioner {
|
|
||||||
return r.store.Versioner()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *RequestInfoWrapper) RequestWatchProgress(ctx context.Context) error {
|
|
||||||
return r.store.RequestWatchProgress(ctx)
|
|
||||||
}
|
|
||||||
|
|
||||||
type Key struct {
|
|
||||||
Group string
|
|
||||||
Resource string
|
|
||||||
Namespace string
|
|
||||||
Name string
|
|
||||||
}
|
|
||||||
|
|
||||||
func convertToParsedKey(key string) (*Key, error) {
|
|
||||||
// NOTE: the following supports the watcher tests that run against v1/pods
|
|
||||||
// Other than that, there are ambiguities in the key format that only field selector
|
|
||||||
// when set to use metadata.name can be used to bring clarity in the 3-segment case
|
|
||||||
|
|
||||||
// Cases handled below:
|
|
||||||
// namespace scoped:
|
|
||||||
// /<resource>/[<namespace>]/[<name>]
|
|
||||||
// /<resource>/[<namespace>]
|
|
||||||
//
|
|
||||||
// cluster scoped:
|
|
||||||
// /<resource>/[<name>]
|
|
||||||
// /<resource>
|
|
||||||
k := &Key{}
|
|
||||||
|
|
||||||
if !strings.HasPrefix(key, "/") {
|
|
||||||
key = "/" + key
|
|
||||||
}
|
|
||||||
|
|
||||||
parts := strings.SplitN(key, "/", 5)
|
|
||||||
if len(parts) < 2 {
|
|
||||||
return nil, fmt.Errorf("invalid key format: %s", key)
|
|
||||||
}
|
|
||||||
|
|
||||||
k.Resource = parts[1]
|
|
||||||
if len(parts) < 3 {
|
|
||||||
return k, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// figure out whether the key is namespace scoped or cluster scoped
|
|
||||||
if isTestNs(parts[2]) {
|
|
||||||
k.Namespace = parts[2]
|
|
||||||
if len(parts) >= 4 {
|
|
||||||
k.Name = parts[3]
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
k.Name = parts[2]
|
|
||||||
}
|
|
||||||
|
|
||||||
return k, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func isTestNs(part string) bool {
|
|
||||||
return strings.HasPrefix(part, "test-ns-") || strings.HasPrefix(part, "ns-") || strings.Index(part, "-ns") > 0
|
|
||||||
}
|
|
|
@ -1,366 +0,0 @@
|
||||||
// SPDX-License-Identifier: AGPL-3.0-only
|
|
||||||
// Provenance-includes-location: https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go
|
|
||||||
// Provenance-includes-license: Apache-2.0
|
|
||||||
// Provenance-includes-copyright: The Kubernetes Authors.
|
|
||||||
|
|
||||||
package test
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
"k8s.io/apimachinery/pkg/api/apitesting"
|
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
|
||||||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
|
||||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
|
||||||
"k8s.io/apiserver/pkg/apis/example"
|
|
||||||
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
|
|
||||||
"k8s.io/apiserver/pkg/storage"
|
|
||||||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
|
||||||
"k8s.io/apiserver/pkg/storage/storagebackend/factory"
|
|
||||||
storagetesting "k8s.io/apiserver/pkg/storage/testing"
|
|
||||||
|
|
||||||
"github.com/grafana/grafana/pkg/services/apiserver/storage/entity"
|
|
||||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
|
||||||
"github.com/grafana/grafana/pkg/services/sqlstore"
|
|
||||||
entityStore "github.com/grafana/grafana/pkg/services/store/entity"
|
|
||||||
"github.com/grafana/grafana/pkg/services/store/entity/db/dbimpl"
|
|
||||||
"github.com/grafana/grafana/pkg/services/store/entity/sqlstash"
|
|
||||||
"github.com/grafana/grafana/pkg/setting"
|
|
||||||
"github.com/grafana/grafana/pkg/tests/testinfra"
|
|
||||||
"github.com/grafana/grafana/pkg/tests/testsuite"
|
|
||||||
)
|
|
||||||
|
|
||||||
var scheme = runtime.NewScheme()
|
|
||||||
var codecs = serializer.NewCodecFactory(scheme)
|
|
||||||
|
|
||||||
func TestMain(m *testing.M) {
|
|
||||||
testsuite.Run(m)
|
|
||||||
}
|
|
||||||
|
|
||||||
func createTestContext(t *testing.T) (entityStore.EntityStoreClient, factory.DestroyFunc) {
|
|
||||||
t.Helper()
|
|
||||||
|
|
||||||
grafDir, cfgPath := testinfra.CreateGrafDir(t, testinfra.GrafanaOpts{
|
|
||||||
EnableFeatureToggles: []string{
|
|
||||||
featuremgmt.FlagGrpcServer,
|
|
||||||
featuremgmt.FlagUnifiedStorage,
|
|
||||||
},
|
|
||||||
AppModeProduction: false, // required for migrations to run
|
|
||||||
GRPCServerAddress: "127.0.0.1:0", // :0 for choosing the port automatically
|
|
||||||
})
|
|
||||||
|
|
||||||
cfg, err := setting.NewCfgFromArgs(setting.CommandLineArgs{Config: cfgPath, HomePath: grafDir})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
featureManager, err := featuremgmt.ProvideManagerService(cfg)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
featureToggles := featuremgmt.ProvideToggles(featureManager)
|
|
||||||
|
|
||||||
db := sqlstore.InitTestDBWithMigration(t, nil, sqlstore.InitTestDBOpt{EnsureDefaultOrgAndUser: false})
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
eDB, err := dbimpl.ProvideEntityDB(db, cfg, featureToggles, nil)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
err = eDB.Init()
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
traceConfig, err := tracing.ParseTracingConfig(cfg)
|
|
||||||
require.NoError(t, err)
|
|
||||||
tracer, err := tracing.ProvideService(traceConfig)
|
|
||||||
require.NoError(t, err)
|
|
||||||
store, err := sqlstash.ProvideSQLEntityServer(eDB, tracer)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
client := entityStore.NewEntityStoreClientLocal(store)
|
|
||||||
|
|
||||||
return client, func() { store.Stop() }
|
|
||||||
}
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion)
|
|
||||||
utilruntime.Must(example.AddToScheme(scheme))
|
|
||||||
utilruntime.Must(examplev1.AddToScheme(scheme))
|
|
||||||
}
|
|
||||||
|
|
||||||
type setupOptions struct {
|
|
||||||
codec runtime.Codec
|
|
||||||
newFunc func() runtime.Object
|
|
||||||
newListFunc func() runtime.Object
|
|
||||||
prefix string
|
|
||||||
resourcePrefix string
|
|
||||||
groupResource schema.GroupResource
|
|
||||||
}
|
|
||||||
|
|
||||||
type setupOption func(*setupOptions, *testing.T)
|
|
||||||
|
|
||||||
func withDefaults(options *setupOptions, t *testing.T) {
|
|
||||||
options.codec = apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
|
||||||
options.newFunc = newPod
|
|
||||||
options.newListFunc = newPodList
|
|
||||||
options.prefix = t.TempDir()
|
|
||||||
options.resourcePrefix = "/pods"
|
|
||||||
options.groupResource = schema.GroupResource{Resource: "pods"}
|
|
||||||
}
|
|
||||||
|
|
||||||
var _ setupOption = withDefaults
|
|
||||||
|
|
||||||
func testSetup(t *testing.T, opts ...setupOption) (context.Context, storage.Interface, factory.DestroyFunc, error) {
|
|
||||||
setupOpts := setupOptions{}
|
|
||||||
opts = append([]setupOption{withDefaults}, opts...)
|
|
||||||
for _, opt := range opts {
|
|
||||||
opt(&setupOpts, t)
|
|
||||||
}
|
|
||||||
|
|
||||||
config := storagebackend.NewDefaultConfig(setupOpts.prefix, setupOpts.codec)
|
|
||||||
|
|
||||||
client, destroyFunc := createTestContext(t)
|
|
||||||
|
|
||||||
store, _, err := entity.NewStorage(
|
|
||||||
config.ForResource(setupOpts.groupResource),
|
|
||||||
setupOpts.groupResource,
|
|
||||||
client,
|
|
||||||
setupOpts.codec,
|
|
||||||
func(obj runtime.Object) (string, error) {
|
|
||||||
return storage.NamespaceKeyFunc(setupOpts.resourcePrefix, obj)
|
|
||||||
},
|
|
||||||
setupOpts.newFunc,
|
|
||||||
setupOpts.newListFunc,
|
|
||||||
storage.DefaultNamespaceScopedAttr,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx := context.Background()
|
|
||||||
|
|
||||||
wrappedStore := &RequestInfoWrapper{
|
|
||||||
store: store,
|
|
||||||
gr: setupOpts.groupResource,
|
|
||||||
}
|
|
||||||
|
|
||||||
return ctx, wrappedStore, destroyFunc, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestIntegrationWatch(t *testing.T) {
|
|
||||||
if testing.Short() {
|
|
||||||
t.Skip("skipping integration test")
|
|
||||||
}
|
|
||||||
t.Skip("In maintenance")
|
|
||||||
|
|
||||||
ctx, store, destroyFunc, err := testSetup(t)
|
|
||||||
defer destroyFunc()
|
|
||||||
assert.NoError(t, err)
|
|
||||||
storagetesting.RunTestWatch(ctx, t, store)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestIntegrationClusterScopedWatch(t *testing.T) {
|
|
||||||
if testing.Short() {
|
|
||||||
t.Skip("skipping integration test")
|
|
||||||
}
|
|
||||||
t.Skip("In maintenance")
|
|
||||||
|
|
||||||
ctx, store, destroyFunc, err := testSetup(t)
|
|
||||||
defer destroyFunc()
|
|
||||||
assert.NoError(t, err)
|
|
||||||
storagetesting.RunTestClusterScopedWatch(ctx, t, store)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestIntegrationNamespaceScopedWatch(t *testing.T) {
|
|
||||||
if testing.Short() {
|
|
||||||
t.Skip("skipping integration test")
|
|
||||||
}
|
|
||||||
t.Skip("In maintenance")
|
|
||||||
|
|
||||||
ctx, store, destroyFunc, err := testSetup(t)
|
|
||||||
defer destroyFunc()
|
|
||||||
assert.NoError(t, err)
|
|
||||||
storagetesting.RunTestNamespaceScopedWatch(ctx, t, store)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestIntegrationDeleteTriggerWatch(t *testing.T) {
|
|
||||||
if testing.Short() {
|
|
||||||
t.Skip("skipping integration test")
|
|
||||||
}
|
|
||||||
t.Skip("In maintenance")
|
|
||||||
|
|
||||||
ctx, store, destroyFunc, err := testSetup(t)
|
|
||||||
defer destroyFunc()
|
|
||||||
assert.NoError(t, err)
|
|
||||||
storagetesting.RunTestDeleteTriggerWatch(ctx, t, store)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestIntegrationWatchFromZero(t *testing.T) {
|
|
||||||
if testing.Short() {
|
|
||||||
t.Skip("skipping integration test")
|
|
||||||
}
|
|
||||||
t.Skip("In maintenance")
|
|
||||||
|
|
||||||
ctx, store, destroyFunc, err := testSetup(t)
|
|
||||||
defer destroyFunc()
|
|
||||||
assert.NoError(t, err)
|
|
||||||
storagetesting.RunTestWatchFromZero(ctx, t, store, nil)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestWatchFromNonZero tests that
|
|
||||||
// - watch from non-0 should just watch changes after given version
|
|
||||||
func TestIntegrationWatchFromNonZero(t *testing.T) {
|
|
||||||
if testing.Short() {
|
|
||||||
t.Skip("skipping integration test")
|
|
||||||
}
|
|
||||||
t.Skip("In maintenance")
|
|
||||||
|
|
||||||
ctx, store, destroyFunc, err := testSetup(t)
|
|
||||||
defer destroyFunc()
|
|
||||||
assert.NoError(t, err)
|
|
||||||
storagetesting.RunTestWatchFromNonZero(ctx, t, store)
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
// TODO this times out, we need to buffer events
|
|
||||||
func TestIntegrationDelayedWatchDelivery(t *testing.T) {
|
|
||||||
if testing.Short() {
|
|
||||||
t.Skip("skipping integration test")
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx, store, destroyFunc, err := testSetup(t)
|
|
||||||
defer destroyFunc()
|
|
||||||
assert.NoError(t, err)
|
|
||||||
storagetesting.RunTestDelayedWatchDelivery(ctx, t, store)
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
/* func TestIntegrationWatchError(t *testing.T) {
|
|
||||||
if testing.Short() {
|
|
||||||
t.Skip("skipping integration test")
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx, store, _ := testSetup(t)
|
|
||||||
storagetesting.RunTestWatchError(ctx, t, &storeWithPrefixTransformer{store})
|
|
||||||
} */
|
|
||||||
|
|
||||||
func TestIntegrationWatchContextCancel(t *testing.T) {
|
|
||||||
if testing.Short() {
|
|
||||||
t.Skip("skipping integration test")
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx, store, destroyFunc, err := testSetup(t)
|
|
||||||
defer destroyFunc()
|
|
||||||
assert.NoError(t, err)
|
|
||||||
storagetesting.RunTestWatchContextCancel(ctx, t, store)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestIntegrationWatcherTimeout(t *testing.T) {
|
|
||||||
if testing.Short() {
|
|
||||||
t.Skip("skipping integration test")
|
|
||||||
}
|
|
||||||
t.Skip("In maintenance")
|
|
||||||
|
|
||||||
ctx, store, destroyFunc, err := testSetup(t)
|
|
||||||
defer destroyFunc()
|
|
||||||
assert.NoError(t, err)
|
|
||||||
storagetesting.RunTestWatcherTimeout(ctx, t, store)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestIntegrationWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
|
|
||||||
if testing.Short() {
|
|
||||||
t.Skip("skipping integration test")
|
|
||||||
}
|
|
||||||
t.Skip("In maintenance")
|
|
||||||
|
|
||||||
ctx, store, destroyFunc, err := testSetup(t)
|
|
||||||
defer destroyFunc()
|
|
||||||
assert.NoError(t, err)
|
|
||||||
storagetesting.RunTestWatchDeleteEventObjectHaveLatestRV(ctx, t, store)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: enable when we support flow control and priority fairness
|
|
||||||
/* func TestIntegrationWatchInitializationSignal(t *testing.T) {
|
|
||||||
if testing.Short() {
|
|
||||||
t.Skip("skipping integration test")
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx, store, destroyFunc, err := testSetup(t)
|
|
||||||
defer destroyFunc()
|
|
||||||
assert.NoError(t, err)
|
|
||||||
storagetesting.RunTestWatchInitializationSignal(ctx, t, store)
|
|
||||||
} */
|
|
||||||
|
|
||||||
/* func TestIntegrationProgressNotify(t *testing.T) {
|
|
||||||
if testing.Short() {
|
|
||||||
t.Skip("skipping integration test")
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx, store, destroyFunc, err := testSetup(t)
|
|
||||||
defer destroyFunc()
|
|
||||||
assert.NoError(t, err)
|
|
||||||
storagetesting.RunOptionalTestProgressNotify(ctx, t, store)
|
|
||||||
} */
|
|
||||||
|
|
||||||
// TestWatchDispatchBookmarkEvents makes sure that
|
|
||||||
// setting allowWatchBookmarks query param against
|
|
||||||
// etcd implementation doesn't have any effect.
|
|
||||||
func TestIntegrationWatchDispatchBookmarkEvents(t *testing.T) {
|
|
||||||
if testing.Short() {
|
|
||||||
t.Skip("skipping integration test")
|
|
||||||
}
|
|
||||||
t.Skip("In maintenance")
|
|
||||||
|
|
||||||
ctx, store, destroyFunc, err := testSetup(t)
|
|
||||||
defer destroyFunc()
|
|
||||||
assert.NoError(t, err)
|
|
||||||
storagetesting.RunTestWatchDispatchBookmarkEvents(ctx, t, store, false)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestIntegrationSendInitialEventsBackwardCompatibility(t *testing.T) {
|
|
||||||
if testing.Short() {
|
|
||||||
t.Skip("skipping integration test")
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx, store, destroyFunc, err := testSetup(t)
|
|
||||||
defer destroyFunc()
|
|
||||||
assert.NoError(t, err)
|
|
||||||
storagetesting.RunSendInitialEventsBackwardCompatibility(ctx, t, store)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO this test times out
|
|
||||||
func TestIntegrationEtcdWatchSemantics(t *testing.T) {
|
|
||||||
if testing.Short() {
|
|
||||||
t.Skip("skipping integration test")
|
|
||||||
}
|
|
||||||
t.Skip("In maintenance")
|
|
||||||
|
|
||||||
ctx, store, destroyFunc, err := testSetup(t)
|
|
||||||
defer destroyFunc()
|
|
||||||
assert.NoError(t, err)
|
|
||||||
storagetesting.RunWatchSemantics(ctx, t, store)
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
// TODO this test times out
|
|
||||||
func TestIntegrationEtcdWatchSemanticInitialEventsExtended(t *testing.T) {
|
|
||||||
if testing.Short() {
|
|
||||||
t.Skip("skipping integration test")
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx, store, destroyFunc, err := testSetup(t)
|
|
||||||
defer destroyFunc()
|
|
||||||
assert.NoError(t, err)
|
|
||||||
storagetesting.RunWatchSemanticInitialEventsExtended(ctx, t, store)
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
func newPod() runtime.Object {
|
|
||||||
return &example.Pod{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func newPodList() runtime.Object {
|
|
||||||
return &example.PodList{}
|
|
||||||
}
|
|
|
@ -4,10 +4,8 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/fs"
|
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/hack-pad/hackpadfs"
|
"github.com/hack-pad/hackpadfs"
|
||||||
|
@ -153,184 +151,6 @@ func (f *fsStore) open(p string) (*fsEvent, error) {
|
||||||
return evt, err
|
return evt, err
|
||||||
}
|
}
|
||||||
|
|
||||||
type eventTree struct {
|
|
||||||
path string
|
|
||||||
group string
|
|
||||||
resource string
|
|
||||||
namespaces []namespaceEvents
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *eventTree) list(fs *fsStore, rv int64) (*ListResponse, error) {
|
|
||||||
rsp := &ListResponse{}
|
|
||||||
for idx, ns := range t.namespaces {
|
|
||||||
if idx == 0 {
|
|
||||||
rsp.ResourceVersion = ns.version()
|
|
||||||
}
|
|
||||||
err := ns.append(fs, rv, rsp)
|
|
||||||
if err != nil {
|
|
||||||
return rsp, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return rsp, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *eventTree) read(root fs.FS, key *ResourceKey) error {
|
|
||||||
t.group = key.Group
|
|
||||||
t.resource = key.Resource
|
|
||||||
t.path = fmt.Sprintf("%s/%s", t.group, t.resource)
|
|
||||||
|
|
||||||
// Cluster scoped, with an explicit name
|
|
||||||
if key.Namespace == "" {
|
|
||||||
if key.Name != "" {
|
|
||||||
ns := namespaceEvents{
|
|
||||||
path: t.path + "/__cluster__",
|
|
||||||
namespace: "",
|
|
||||||
}
|
|
||||||
err := ns.read(root, key)
|
|
||||||
if err == nil {
|
|
||||||
t.namespaces = append(t.namespaces, ns)
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
files, err := hackpadfs.ReadDir(root, t.path)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
for _, file := range files {
|
|
||||||
ns := namespaceEvents{
|
|
||||||
path: t.path + "/" + file.Name(),
|
|
||||||
namespace: file.Name(),
|
|
||||||
}
|
|
||||||
err = ns.read(root, key)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
t.namespaces = append(t.namespaces, ns)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type namespaceEvents struct {
|
|
||||||
path string
|
|
||||||
namespace string
|
|
||||||
names []nameEvents
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *namespaceEvents) version() int64 {
|
|
||||||
if len(t.names) > 0 {
|
|
||||||
return t.names[0].version()
|
|
||||||
}
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *namespaceEvents) append(fs *fsStore, rv int64, rsp *ListResponse) error {
|
|
||||||
for _, name := range t.names {
|
|
||||||
err := name.append(fs, rv, rsp)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *namespaceEvents) read(root fs.FS, key *ResourceKey) error {
|
|
||||||
if key.Name != "" {
|
|
||||||
vv := nameEvents{
|
|
||||||
path: t.path + "/" + key.Name,
|
|
||||||
name: key.Name,
|
|
||||||
}
|
|
||||||
err := vv.read(root)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
t.names = []nameEvents{vv}
|
|
||||||
}
|
|
||||||
|
|
||||||
files, err := hackpadfs.ReadDir(root, t.path)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
for _, file := range files {
|
|
||||||
ns := nameEvents{
|
|
||||||
path: t.path + "/" + file.Name(),
|
|
||||||
name: file.Name(),
|
|
||||||
}
|
|
||||||
err = ns.read(root)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
t.names = append(t.names, ns)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type nameEvents struct {
|
|
||||||
path string
|
|
||||||
name string
|
|
||||||
versions []resourceEvent
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *nameEvents) version() int64 {
|
|
||||||
if len(t.versions) > 0 {
|
|
||||||
return t.versions[0].rv
|
|
||||||
}
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *nameEvents) append(fs *fsStore, rv int64, rsp *ListResponse) error {
|
|
||||||
for _, rev := range t.versions {
|
|
||||||
val, err := fs.open(t.path + "/" + rev.file)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
wrapper := &ResourceWrapper{
|
|
||||||
ResourceVersion: val.ResourceVersion,
|
|
||||||
Value: val.Value,
|
|
||||||
// Operation: val.Operation,
|
|
||||||
}
|
|
||||||
rsp.Items = append(rsp.Items, wrapper)
|
|
||||||
if true {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *nameEvents) read(root fs.FS) error {
|
|
||||||
var err error
|
|
||||||
files, err := hackpadfs.ReadDir(root, t.path)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
for _, file := range files {
|
|
||||||
p := file.Name()
|
|
||||||
if file.IsDir() || !strings.HasSuffix(p, ".json") {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
base := strings.TrimSuffix(p, ".json")
|
|
||||||
base = strings.TrimPrefix(base, "rv")
|
|
||||||
rr := resourceEvent{file: p}
|
|
||||||
rr.rv, err = strconv.ParseInt(base, 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
t.versions = append(t.versions, rr)
|
|
||||||
}
|
|
||||||
sort.Slice(t.versions, func(i int, j int) bool {
|
|
||||||
return t.versions[i].rv > t.versions[j].rv
|
|
||||||
})
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
type resourceEvent struct {
|
|
||||||
file string // path to the actual file
|
|
||||||
rv int64
|
|
||||||
}
|
|
||||||
|
|
||||||
// List implements AppendingStore.
|
// List implements AppendingStore.
|
||||||
func (f *fsStore) List(ctx context.Context, req *ListRequest) (*ListResponse, error) {
|
func (f *fsStore) List(ctx context.Context, req *ListRequest) (*ListResponse, error) {
|
||||||
tree := eventTree{
|
tree := eventTree{
|
||||||
|
|
|
@ -0,0 +1,194 @@
|
||||||
|
package resource
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io/fs"
|
||||||
|
"sort"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/hack-pad/hackpadfs"
|
||||||
|
)
|
||||||
|
|
||||||
|
// VERY VERY early, hacky file system reader
|
||||||
|
type eventTree struct {
|
||||||
|
path string
|
||||||
|
group string
|
||||||
|
resource string
|
||||||
|
namespaces []namespaceEvents
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *eventTree) list(fs *fsStore, rv int64) (*ListResponse, error) {
|
||||||
|
rsp := &ListResponse{}
|
||||||
|
for idx, ns := range t.namespaces {
|
||||||
|
if idx == 0 {
|
||||||
|
rsp.ResourceVersion = ns.version()
|
||||||
|
}
|
||||||
|
err := ns.append(fs, rv, rsp)
|
||||||
|
if err != nil {
|
||||||
|
return rsp, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return rsp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *eventTree) read(root fs.FS, key *ResourceKey) error {
|
||||||
|
t.group = key.Group
|
||||||
|
t.resource = key.Resource
|
||||||
|
t.path = fmt.Sprintf("%s/%s", t.group, t.resource)
|
||||||
|
|
||||||
|
// Cluster scoped, with an explicit name
|
||||||
|
if key.Namespace == "" {
|
||||||
|
if key.Name != "" {
|
||||||
|
ns := namespaceEvents{
|
||||||
|
path: t.path + "/__cluster__",
|
||||||
|
namespace: "",
|
||||||
|
}
|
||||||
|
err := ns.read(root, key)
|
||||||
|
if err == nil {
|
||||||
|
t.namespaces = append(t.namespaces, ns)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
files, err := hackpadfs.ReadDir(root, t.path)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, file := range files {
|
||||||
|
ns := namespaceEvents{
|
||||||
|
path: t.path + "/" + file.Name(),
|
||||||
|
namespace: file.Name(),
|
||||||
|
}
|
||||||
|
err = ns.read(root, key)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
t.namespaces = append(t.namespaces, ns)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type namespaceEvents struct {
|
||||||
|
path string
|
||||||
|
namespace string
|
||||||
|
names []nameEvents
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *namespaceEvents) version() int64 {
|
||||||
|
if len(t.names) > 0 {
|
||||||
|
return t.names[0].version()
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *namespaceEvents) append(fs *fsStore, rv int64, rsp *ListResponse) error {
|
||||||
|
for _, name := range t.names {
|
||||||
|
err := name.append(fs, rv, rsp)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *namespaceEvents) read(root fs.FS, key *ResourceKey) error {
|
||||||
|
if key.Name != "" {
|
||||||
|
vv := nameEvents{
|
||||||
|
path: t.path + "/" + key.Name,
|
||||||
|
name: key.Name,
|
||||||
|
}
|
||||||
|
err := vv.read(root)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
t.names = []nameEvents{vv}
|
||||||
|
}
|
||||||
|
|
||||||
|
files, err := hackpadfs.ReadDir(root, t.path)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, file := range files {
|
||||||
|
ns := nameEvents{
|
||||||
|
path: t.path + "/" + file.Name(),
|
||||||
|
name: file.Name(),
|
||||||
|
}
|
||||||
|
err = ns.read(root)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
t.names = append(t.names, ns)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type nameEvents struct {
|
||||||
|
path string
|
||||||
|
name string
|
||||||
|
versions []resourceEvent
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *nameEvents) version() int64 {
|
||||||
|
if len(t.versions) > 0 {
|
||||||
|
return t.versions[0].rv
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *nameEvents) append(fs *fsStore, rv int64, rsp *ListResponse) error {
|
||||||
|
if rv > 0 {
|
||||||
|
fmt.Printf("TODO... check explicit rv")
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, rev := range t.versions {
|
||||||
|
val, err := fs.open(t.path + "/" + rev.file)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
wrapper := &ResourceWrapper{
|
||||||
|
ResourceVersion: val.ResourceVersion,
|
||||||
|
Value: val.Value,
|
||||||
|
// Operation: val.Operation,
|
||||||
|
}
|
||||||
|
rsp.Items = append(rsp.Items, wrapper)
|
||||||
|
if true {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *nameEvents) read(root fs.FS) error {
|
||||||
|
var err error
|
||||||
|
files, err := hackpadfs.ReadDir(root, t.path)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, file := range files {
|
||||||
|
p := file.Name()
|
||||||
|
if file.IsDir() || !strings.HasSuffix(p, ".json") {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
base := strings.TrimSuffix(p, ".json")
|
||||||
|
base = strings.TrimPrefix(base, "rv")
|
||||||
|
rr := resourceEvent{file: p}
|
||||||
|
rr.rv, err = strconv.ParseInt(base, 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
t.versions = append(t.versions, rr)
|
||||||
|
}
|
||||||
|
sort.Slice(t.versions, func(i int, j int) bool {
|
||||||
|
return t.versions[i].rv > t.versions[j].rv
|
||||||
|
})
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
type resourceEvent struct {
|
||||||
|
file string // path to the actual file
|
||||||
|
rv int64
|
||||||
|
}
|
Loading…
Reference in New Issue