CloudMigrations: Fix issues with snapshot resource limits (#105425)

* fix bulk inserts

* commit progress so cursor doesn't sabotage me

* add more tests

* get everything working

* rename variable

* update comment

* regen mocks, fix k8s list method maybe

* fix bug with duplicate entries

* lint

* Snapshots: Use slices.Chunk for batching inserts

* remove extra linebreak

---------

Co-authored-by: Matheus Macabu <macabu.matheus@gmail.com>
This commit is contained in:
Michael Mandrus 2025-05-19 11:37:22 -04:00 committed by GitHub
parent 8ee1f2c1fc
commit 6205e126cc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 282 additions and 156 deletions

View File

@ -4,6 +4,7 @@ import (
"context"
"encoding/base64"
"fmt"
"slices"
"strings"
"time"
@ -29,6 +30,8 @@ const (
secretType = "cloudmigration-snapshot-encryption-key"
GetAllSnapshots = -1
GetSnapshotListSortingLatest = "latest"
maxResourceBatchSize = 1000
)
func (ss *sqlStore) GetMigrationSessionByUID(ctx context.Context, orgID int64, uid string) (*cloudmigration.CloudMigrationSession, error) {
@ -192,7 +195,9 @@ func (ss *sqlStore) CreateSnapshot(ctx context.Context, snapshot cloudmigration.
return snapshot.UID, nil
}
// UpdateSnapshot takes a snapshot object containing a uid and updates a subset of features in the database.
// UpdateSnapshot takes a command containing a snapshot uid and any updates to apply to the snapshot.
// When performing multiple updates at once (e.g. updating the status and local resources), they are executed in separate transactions in order to batch insert large datasets.
// The status is the last thing updated, as its status ultimately determines the behavior of the API.
func (ss *sqlStore) UpdateSnapshot(ctx context.Context, update cloudmigration.UpdateSnapshotCmd) error {
if update.UID == "" {
return fmt.Errorf("missing snapshot uid")
@ -200,37 +205,35 @@ func (ss *sqlStore) UpdateSnapshot(ctx context.Context, update cloudmigration.Up
if update.SessionID == "" {
return fmt.Errorf("missing session uid")
}
err := ss.db.InTransaction(ctx, func(ctx context.Context) error {
// Update status if set
if update.Status != "" {
if err := ss.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
rawSQL := "UPDATE cloud_migration_snapshot SET status=? WHERE session_uid=? AND uid=?"
if _, err := sess.Exec(rawSQL, update.Status, update.SessionID, update.UID); err != nil {
return fmt.Errorf("updating snapshot status for uid %s: %w", update.UID, err)
}
return nil
}); err != nil {
return err
}
}
// If local resources are set, it means we have to create them for the first time
if len(update.LocalResourcesToCreate) > 0 {
if err := ss.CreateSnapshotResources(ctx, update.UID, update.LocalResourcesToCreate); err != nil {
return err
}
// If local resources are set, it means we have to create them for the first time
if len(update.LocalResourcesToCreate) > 0 {
if err := ss.CreateSnapshotResources(ctx, update.UID, update.LocalResourcesToCreate); err != nil {
return err
}
// If cloud resources are set, it means we have to update our resource local state
if len(update.CloudResourcesToUpdate) > 0 {
if err := ss.UpdateSnapshotResources(ctx, update.UID, update.CloudResourcesToUpdate); err != nil {
return err
}
}
// If cloud resources are set, it means we have to update our resource local state
if len(update.CloudResourcesToUpdate) > 0 {
if err := ss.UpdateSnapshotResources(ctx, update.UID, update.CloudResourcesToUpdate); err != nil {
return err
}
}
return nil
})
// Update the snapshot status if set
if update.Status != "" {
if err := ss.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
rawSQL := "UPDATE cloud_migration_snapshot SET status=? WHERE session_uid=? AND uid=?"
if _, err := sess.Exec(rawSQL, update.Status, update.SessionID, update.UID); err != nil {
return fmt.Errorf("updating snapshot status for uid %s: %w", update.UID, err)
}
return nil
}); err != nil {
return err
}
}
return err
return nil
}
func (ss *sqlStore) deleteSnapshot(ctx context.Context, snapshotUid string) error {
@ -327,7 +330,18 @@ func (ss *sqlStore) GetSnapshotList(ctx context.Context, query cloudmigration.Li
}
// CreateSnapshotResources initializes the local state of a resources belonging to a snapshot
// Inserting large enough datasets causes SQL errors, so we batch the inserts
func (ss *sqlStore) CreateSnapshotResources(ctx context.Context, snapshotUid string, resources []cloudmigration.CloudMigrationResource) error {
for chunk := range slices.Chunk(resources, maxResourceBatchSize) {
if err := ss.createSnapshotResources(ctx, snapshotUid, chunk); err != nil {
return err
}
}
return nil
}
func (ss *sqlStore) createSnapshotResources(ctx context.Context, snapshotUid string, resources []cloudmigration.CloudMigrationResource) error {
for i := 0; i < len(resources); i++ {
resources[i].UID = util.GenerateShortUID()
// ensure snapshot_uids are consistent so that we can use in conjunction with refID for lookup later
@ -350,7 +364,18 @@ func (ss *sqlStore) CreateSnapshotResources(ctx context.Context, snapshotUid str
// UpdateSnapshotResources updates a migration resource for a snapshot, using snapshot_uid + resource_uid as a lookup
// It does preprocessing on the results in order to minimize the sql queries executed.
// Updating large enough datasets causes SQL errors, so we batch the updates
func (ss *sqlStore) UpdateSnapshotResources(ctx context.Context, snapshotUid string, resources []cloudmigration.CloudMigrationResource) error {
for chunk := range slices.Chunk(resources, maxResourceBatchSize) {
if err := ss.updateSnapshotResources(ctx, snapshotUid, chunk); err != nil {
return err
}
}
return nil
}
func (ss *sqlStore) updateSnapshotResources(ctx context.Context, snapshotUid string, resources []cloudmigration.CloudMigrationResource) error {
// refIds of resources that migrated successfully in order to update in bulk
okIds := make([]any, 0, len(resources))
@ -401,7 +426,6 @@ func (ss *sqlStore) UpdateSnapshotResources(ctx context.Context, snapshotUid str
}
// Execute the minimum number of required statements!
return ss.db.InTransaction(ctx, func(ctx context.Context) error {
err := ss.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
if okUpdateStatement != nil {

View File

@ -3,9 +3,11 @@ package cloudmigrationimpl
import (
"context"
"encoding/base64"
"fmt"
"strconv"
"testing"
"github.com/google/uuid"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/services/cloudmigration"
fakeSecrets "github.com/grafana/grafana/pkg/services/secrets/fakes"
@ -96,25 +98,6 @@ func Test_GetMigrationSessionByUID(t *testing.T) {
})
}
/** rewrite this test using the new functions
func Test_DeleteMigrationSession(t *testing.T) {
_, s := setUpTest(t)
ctx := context.Background()
t.Run("deletes a session from the db", func(t *testing.T) {
uid := "qwerty"
session, snapshots, err := s.DeleteMigrationSessionByUID(ctx, uid)
require.NoError(t, err)
require.Equal(t, uid, session.UID)
require.NotNil(t, snapshots)
// now we try to find it, should return an error
_, err = s.GetMigrationSessionByUID(ctx, uid)
require.ErrorIs(t, cloudmigration.ErrMigrationNotFound, err)
})
}
*/
func Test_SnapshotManagement(t *testing.T) {
t.Parallel()
@ -182,6 +165,92 @@ func Test_SnapshotManagement(t *testing.T) {
require.ErrorIs(t, err, cloudmigration.ErrSnapshotNotFound)
require.Nil(t, snapshot)
})
t.Run("tests a snapshot with a large number of resources", func(t *testing.T) {
session, err := s.CreateMigrationSession(ctx, cloudmigration.CloudMigrationSession{
OrgID: 1,
AuthToken: encodeToken("token"),
})
require.NoError(t, err)
// create a snapshot
snapshotUid, err := s.CreateSnapshot(ctx, cloudmigration.CloudMigrationSnapshot{
SessionUID: session.UID,
Status: cloudmigration.SnapshotStatusCreating,
})
require.NoError(t, err)
require.NotEmpty(t, snapshotUid)
// Generate 50,001 test resources in order to test both update conditions (reached the batch limit or reached the end)
const numResources = 50001
resources := make([]cloudmigration.CloudMigrationResource, numResources)
for i := 0; i < numResources; i++ {
resources[i] = cloudmigration.CloudMigrationResource{
Name: fmt.Sprintf("Resource %d", i),
Type: cloudmigration.DashboardDataType,
RefID: fmt.Sprintf("refid-%d", i),
Status: cloudmigration.ItemStatusPending,
}
}
// Update the snapshot with the resources to create
err = s.UpdateSnapshot(ctx, cloudmigration.UpdateSnapshotCmd{
UID: snapshotUid,
Status: cloudmigration.SnapshotStatusPendingUpload,
SessionID: session.UID,
LocalResourcesToCreate: resources,
})
require.NoError(t, err)
// Get the Snapshot and ensure it's in the right state
snapshot, err := s.GetSnapshotByUID(ctx, 1, session.UID, snapshotUid, cloudmigration.SnapshotResultQueryParams{
ResultPage: 1,
ResultLimit: numResources,
SortColumn: cloudmigration.SortColumnID,
SortOrder: cloudmigration.SortOrderAsc,
})
require.NoError(t, err)
require.Equal(t, cloudmigration.SnapshotStatusPendingUpload, snapshot.Status)
require.Len(t, snapshot.Resources, numResources)
for i, r := range snapshot.Resources {
assert.Equal(t, cloudmigration.ItemStatusPending, r.Status)
if i%2 == 0 {
snapshot.Resources[i].Status = cloudmigration.ItemStatusOK
} else {
snapshot.Resources[i].Status = cloudmigration.ItemStatusError
}
}
// Update the snapshot with the resources to update
err = s.UpdateSnapshot(ctx, cloudmigration.UpdateSnapshotCmd{
UID: snapshotUid,
Status: cloudmigration.SnapshotStatusFinished,
SessionID: session.UID,
CloudResourcesToUpdate: snapshot.Resources,
})
require.NoError(t, err)
// Get the Snapshot and ensure it's in the right state
snapshot, err = s.GetSnapshotByUID(ctx, 1, session.UID, snapshotUid, cloudmigration.SnapshotResultQueryParams{
ResultPage: 1,
ResultLimit: numResources,
SortColumn: cloudmigration.SortColumnID,
SortOrder: cloudmigration.SortOrderAsc,
})
require.NoError(t, err)
require.Equal(t, cloudmigration.SnapshotStatusFinished, snapshot.Status)
for i, r := range snapshot.Resources {
if i%2 == 0 {
assert.Equal(t, cloudmigration.ItemStatusOK, r.Status)
} else {
assert.Equal(t, cloudmigration.ItemStatusError, r.Status)
}
}
})
}
func Test_SnapshotResources(t *testing.T) {
@ -357,6 +426,116 @@ func Test_SnapshotResources(t *testing.T) {
assert.Equal(t, "2", results[0].UID)
})
})
t.Run("test creating and updating a large number of resources", func(t *testing.T) {
// Generate 50,001 test resources in order to test both update conditions (reached the batch limit or reached the end)
const numResources = 50001
resources := make([]cloudmigration.CloudMigrationResource, numResources)
snapshotUid := uuid.New().String()
t.Run("create the resources", func(t *testing.T) {
for i := 0; i < numResources; i++ {
resources[i] = cloudmigration.CloudMigrationResource{
Name: fmt.Sprintf("Resource %d", i),
Type: cloudmigration.DashboardDataType,
RefID: fmt.Sprintf("refid-%d", i),
Status: cloudmigration.ItemStatusPending,
}
}
// Attempt to create all resources at once -- it should batch under the hood
err := s.CreateSnapshotResources(ctx, snapshotUid, resources)
require.NoError(t, err)
// Get the resources and ensure they're all there
resources, err := s.getSnapshotResources(ctx, snapshotUid, cloudmigration.SnapshotResultQueryParams{
ResultPage: 1,
ResultLimit: numResources,
SortColumn: cloudmigration.SortColumnID,
SortOrder: cloudmigration.SortOrderAsc,
})
require.NoError(t, err)
assert.Len(t, resources, numResources)
})
t.Run("update the resources", func(t *testing.T) {
// Initially, update with a mix of ok and error statuses
for i := 0; i < numResources; i++ {
if i%2 == 0 {
resources[i].Status = cloudmigration.ItemStatusOK
} else {
resources[i].Status = cloudmigration.ItemStatusError
resources[i].ErrorCode = "test-error"
resources[i].Error = "test-error-message"
}
}
err := s.UpdateSnapshotResources(ctx, snapshotUid, resources)
require.NoError(t, err)
resources, err := s.getSnapshotResources(ctx, snapshotUid, cloudmigration.SnapshotResultQueryParams{
ResultPage: 1,
ResultLimit: numResources,
SortColumn: cloudmigration.SortColumnID,
SortOrder: cloudmigration.SortOrderAsc,
})
require.NoError(t, err)
assert.Len(t, resources, numResources)
for i, r := range resources {
if i%2 == 0 {
assert.Equal(t, cloudmigration.ItemStatusOK, r.Status)
} else {
assert.Equal(t, cloudmigration.ItemStatusError, r.Status)
assert.Equal(t, "test-error", string(r.ErrorCode))
assert.Equal(t, "test-error-message", r.Error)
}
}
// Now update with only error statuses
for i := 0; i < numResources; i++ {
resources[i].Status = cloudmigration.ItemStatusError
resources[i].ErrorCode = "test-error-2"
resources[i].Error = "test-error-message-2"
}
err = s.UpdateSnapshotResources(ctx, snapshotUid, resources)
require.NoError(t, err)
resources, err = s.getSnapshotResources(ctx, snapshotUid, cloudmigration.SnapshotResultQueryParams{
ResultPage: 1,
ResultLimit: numResources,
SortColumn: cloudmigration.SortColumnID,
SortOrder: cloudmigration.SortOrderAsc,
})
require.NoError(t, err)
assert.Len(t, resources, numResources)
for _, r := range resources {
assert.Equal(t, cloudmigration.ItemStatusError, r.Status)
assert.Equal(t, "test-error-2", string(r.ErrorCode))
assert.Equal(t, "test-error-message-2", r.Error)
}
// Finally, all okay
for i := 0; i < numResources; i++ {
resources[i].Status = cloudmigration.ItemStatusOK
}
err = s.UpdateSnapshotResources(ctx, snapshotUid, resources)
require.NoError(t, err)
resources, err = s.getSnapshotResources(ctx, snapshotUid, cloudmigration.SnapshotResultQueryParams{
ResultPage: 1,
ResultLimit: numResources,
SortColumn: cloudmigration.SortColumnID,
SortOrder: cloudmigration.SortOrderAsc,
})
require.NoError(t, err)
assert.Len(t, resources, numResources)
for _, r := range resources {
assert.Equal(t, cloudmigration.ItemStatusOK, r.Status)
}
})
})
}
func Test_SnapshotResourceCaseInsensitiveSorting(t *testing.T) {

View File

@ -35,7 +35,6 @@ type DashboardService interface {
SaveDashboard(ctx context.Context, dto *SaveDashboardDTO, allowUiUpdate bool) (*Dashboard, error)
SearchDashboards(ctx context.Context, query *FindPersistedDashboardsQuery) (model.HitList, error)
CountInFolders(ctx context.Context, orgID int64, folderUIDs []string, user identity.Requester) (int64, error)
GetAllDashboards(ctx context.Context) ([]*Dashboard, error)
GetAllDashboardsByOrgId(ctx context.Context, orgID int64) ([]*Dashboard, error)
CleanUpDashboard(ctx context.Context, dashboardUID string, dashboardId int64, orgId int64) error
CountDashboardsInOrg(ctx context.Context, orgID int64) (int64, error)
@ -104,6 +103,5 @@ type Store interface {
CountDashboardsInFolders(ctx context.Context, request *CountDashboardsInFolderRequest) (int64, error)
DeleteDashboardsInFolders(ctx context.Context, request *DeleteDashboardsInFolderRequest) error
GetAllDashboards(ctx context.Context) ([]*Dashboard, error)
GetAllDashboardsByOrgId(ctx context.Context, orgID int64) ([]*Dashboard, error)
}

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.52.2. DO NOT EDIT.
// Code generated by mockery v2.53.3. DO NOT EDIT.
package dashboards

View File

@ -188,36 +188,6 @@ func (_m *FakeDashboardService) FindDashboards(ctx context.Context, query *FindP
return r0, r1
}
// GetAllDashboards provides a mock function with given fields: ctx
func (_m *FakeDashboardService) GetAllDashboards(ctx context.Context) ([]*Dashboard, error) {
ret := _m.Called(ctx)
if len(ret) == 0 {
panic("no return value specified for GetAllDashboards")
}
var r0 []*Dashboard
var r1 error
if rf, ok := ret.Get(0).(func(context.Context) ([]*Dashboard, error)); ok {
return rf(ctx)
}
if rf, ok := ret.Get(0).(func(context.Context) []*Dashboard); ok {
r0 = rf(ctx)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*Dashboard)
}
}
if rf, ok := ret.Get(1).(func(context.Context) error); ok {
r1 = rf(ctx)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// GetAllDashboardsByOrgId provides a mock function with given fields: ctx, orgID
func (_m *FakeDashboardService) GetAllDashboardsByOrgId(ctx context.Context, orgID int64) ([]*Dashboard, error) {
ret := _m.Called(ctx, orgID)

View File

@ -1013,21 +1013,6 @@ func (d *dashboardStore) DeleteDashboardsInFolders(
})
}
func (d *dashboardStore) GetAllDashboards(ctx context.Context) ([]*dashboards.Dashboard, error) {
ctx, span := tracer.Start(ctx, "dashboards.database.GetAllDashboards")
defer span.End()
var dashboards = make([]*dashboards.Dashboard, 0)
err := d.store.WithDbSession(ctx, func(session *db.Session) error {
err := session.Find(&dashboards)
return err
})
if err != nil {
return nil, err
}
return dashboards, nil
}
func (d *dashboardStore) GetAllDashboardsByOrgId(ctx context.Context, orgID int64) ([]*dashboards.Dashboard, error) {
ctx, span := tracer.Start(ctx, "dashboards.database.GetAllDashboardsByOrgId")
defer span.End()

View File

@ -80,6 +80,7 @@ const (
k8sDashboardKvNamespace = "dashboard-cleanup"
k8sDashboardKvLastResourceVersionKey = "last-resource-version"
provisioningConcurrencyLimit = 10
listAllDashboardsLimit = 100000
)
type DashboardServiceImpl struct {
@ -1628,18 +1629,6 @@ func (dr *DashboardServiceImpl) SearchDashboards(ctx context.Context, query *das
return hits, nil
}
func (dr *DashboardServiceImpl) GetAllDashboards(ctx context.Context) ([]*dashboards.Dashboard, error) {
if dr.features.IsEnabledGlobally(featuremgmt.FlagKubernetesClientDashboardsFolders) {
requester, err := identity.GetRequester(ctx)
if err != nil {
return nil, err
}
return dr.listDashboardsThroughK8s(ctx, requester.GetOrgID())
}
return dr.dashboardStore.GetAllDashboards(ctx)
}
func (dr *DashboardServiceImpl) GetAllDashboardsByOrgId(ctx context.Context, orgID int64) ([]*dashboards.Dashboard, error) {
if dr.features.IsEnabledGlobally(featuremgmt.FlagKubernetesClientDashboardsFolders) {
return dr.listDashboardsThroughK8s(ctx, orgID)
@ -1935,29 +1924,40 @@ func (dr *DashboardServiceImpl) deleteDashboardThroughK8s(ctx context.Context, c
}
func (dr *DashboardServiceImpl) listDashboardsThroughK8s(ctx context.Context, orgID int64) ([]*dashboards.Dashboard, error) {
out, err := dr.k8sclient.List(ctx, orgID, v1.ListOptions{})
if err != nil {
return nil, err
} else if out == nil {
return nil, dashboards.ErrDashboardNotFound
}
dashes := make([]*dashboards.Dashboard, 0)
// get users ahead of time to do just one db call, rather than 2 per item in the list
users, err := dr.getUsersForList(ctx, out.Items, orgID)
if err != nil {
return nil, err
}
for continueToken := ""; true; {
out, err := dr.k8sclient.List(ctx, orgID, v1.ListOptions{
Limit: listAllDashboardsLimit,
Continue: continueToken,
})
if err != nil {
return nil, err
} else if out == nil {
return nil, dashboards.ErrDashboardNotFound
}
dashboards := make([]*dashboards.Dashboard, 0)
for _, item := range out.Items {
dash, err := dr.unstructuredToLegacyDashboardWithUsers(&item, orgID, users)
// get users ahead of time to do just one db call, rather than 2 per item in the list
users, err := dr.getUsersForList(ctx, out.Items, orgID)
if err != nil {
return nil, err
}
dashboards = append(dashboards, dash)
for _, item := range out.Items {
dash, err := dr.unstructuredToLegacyDashboardWithUsers(&item, orgID, users)
if err != nil {
return nil, err
}
dashes = append(dashes, dash)
}
continueToken = out.GetContinue()
if continueToken == "" {
break
}
}
return dashboards, nil
return dashes, nil
}
func (dr *DashboardServiceImpl) searchDashboardsThroughK8sRaw(ctx context.Context, query *dashboards.FindPersistedDashboardsQuery) (dashboardv0.SearchResults, error) {

View File

@ -424,8 +424,8 @@ func TestGetAllDashboards(t *testing.T) {
t.Run("Should fallback to dashboard store if Kubernetes feature flags are not enabled", func(t *testing.T) {
service.features = featuremgmt.WithFeatures()
fakeStore.On("GetAllDashboards", mock.Anything).Return([]*dashboards.Dashboard{}, nil).Once()
dashboard, err := service.GetAllDashboards(context.Background())
fakeStore.On("GetAllDashboardsByOrgId", mock.Anything, int64(1)).Return([]*dashboards.Dashboard{}, nil).Once()
dashboard, err := service.GetAllDashboardsByOrgId(context.Background(), 1)
require.NoError(t, err)
require.NotNil(t, dashboard)
fakeStore.AssertExpectations(t)
@ -457,7 +457,7 @@ func TestGetAllDashboards(t *testing.T) {
k8sCliMock.On("GetUsersFromMeta", mock.Anything, mock.Anything).Return(map[string]*user.User{}, nil)
k8sCliMock.On("List", mock.Anything, mock.Anything, mock.Anything).Return(&unstructured.UnstructuredList{Items: []unstructured.Unstructured{dashboardUnstructured}}, nil).Once()
dashes, err := service.GetAllDashboards(ctx)
dashes, err := service.GetAllDashboardsByOrgId(ctx, 1)
require.NoError(t, err)
require.NotNil(t, dashes)
k8sCliMock.AssertExpectations(t)

View File

@ -220,36 +220,6 @@ func (_m *FakeDashboardStore) FindDashboards(ctx context.Context, query *FindPer
return r0, r1
}
// GetAllDashboards provides a mock function with given fields: ctx
func (_m *FakeDashboardStore) GetAllDashboards(ctx context.Context) ([]*Dashboard, error) {
ret := _m.Called(ctx)
if len(ret) == 0 {
panic("no return value specified for GetAllDashboards")
}
var r0 []*Dashboard
var r1 error
if rf, ok := ret.Get(0).(func(context.Context) ([]*Dashboard, error)); ok {
return rf(ctx)
}
if rf, ok := ret.Get(0).(func(context.Context) []*Dashboard); ok {
r0 = rf(ctx)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*Dashboard)
}
}
if rf, ok := ret.Get(1).(func(context.Context) error); ok {
r1 = rf(ctx)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// GetAllDashboardsByOrgId provides a mock function with given fields: ctx, orgID
func (_m *FakeDashboardStore) GetAllDashboardsByOrgId(ctx context.Context, orgID int64) ([]*Dashboard, error) {
ret := _m.Called(ctx, orgID)