2024-02-26 21:52:16 +08:00
package cloudmigrationimpl
2024-01-23 00:09:08 +08:00
import (
"context"
2024-03-25 23:43:28 +08:00
"encoding/base64"
"encoding/json"
2024-05-31 21:03:43 +08:00
"errors"
2024-03-25 23:43:28 +08:00
"fmt"
2024-04-03 19:36:13 +08:00
"net/http"
2024-06-19 21:20:52 +08:00
"path/filepath"
"sync"
2024-03-25 23:43:28 +08:00
"time"
2024-01-23 00:09:08 +08:00
2024-07-20 12:02:31 +08:00
"github.com/google/uuid"
2024-04-18 03:43:09 +08:00
"github.com/grafana/grafana/pkg/api/response"
2024-01-23 00:09:08 +08:00
"github.com/grafana/grafana/pkg/api/routing"
"github.com/grafana/grafana/pkg/infra/db"
2024-07-20 12:02:31 +08:00
"github.com/grafana/grafana/pkg/infra/kvstore"
2024-01-23 00:09:08 +08:00
"github.com/grafana/grafana/pkg/infra/log"
2024-03-25 23:43:28 +08:00
"github.com/grafana/grafana/pkg/infra/tracing"
2024-02-26 21:52:16 +08:00
"github.com/grafana/grafana/pkg/services/cloudmigration"
"github.com/grafana/grafana/pkg/services/cloudmigration/api"
2024-06-14 01:58:59 +08:00
"github.com/grafana/grafana/pkg/services/cloudmigration/gmsclient"
2024-07-11 20:32:02 +08:00
"github.com/grafana/grafana/pkg/services/cloudmigration/objectstorage"
2024-04-03 19:36:13 +08:00
"github.com/grafana/grafana/pkg/services/dashboards"
2024-01-23 00:09:08 +08:00
"github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/services/featuremgmt"
2024-04-03 19:36:13 +08:00
"github.com/grafana/grafana/pkg/services/folder"
2024-03-25 23:43:28 +08:00
"github.com/grafana/grafana/pkg/services/gcom"
2024-04-03 19:36:13 +08:00
"github.com/grafana/grafana/pkg/services/secrets"
2024-07-03 21:38:26 +08:00
"github.com/grafana/grafana/pkg/services/user"
2024-01-23 00:09:08 +08:00
"github.com/grafana/grafana/pkg/setting"
2024-06-19 21:20:52 +08:00
"github.com/grafana/grafana/pkg/util"
2024-01-23 00:09:08 +08:00
"github.com/prometheus/client_golang/prometheus"
2024-05-31 21:03:43 +08:00
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
2024-01-23 00:09:08 +08:00
)
2024-03-28 19:50:31 +08:00
// Service Define the cloudmigration.Service Implementation.
2024-02-26 21:52:16 +08:00
type Service struct {
store store
2024-03-25 23:43:28 +08:00
log * log . ConcreteLogger
2024-01-23 00:09:08 +08:00
cfg * setting . Cfg
2024-06-19 21:20:52 +08:00
buildSnapshotMutex sync . Mutex
2024-07-19 00:55:27 +08:00
cancelMutex sync . Mutex
cancelFunc context . CancelFunc
2024-07-11 20:32:02 +08:00
features featuremgmt . FeatureToggles
gmsClient gmsclient . Client
objectStorage objectstorage . ObjectStorage
2024-04-03 19:36:13 +08:00
dsService datasources . DataSourceService
gcomService gcom . Service
dashboardService dashboards . DashboardService
folderService folder . Service
secretsService secrets . Service
2024-07-20 12:02:31 +08:00
kvStore * kvstore . NamespacedKVStore
2024-01-23 00:09:08 +08:00
2024-03-25 23:43:28 +08:00
api * api . CloudMigrationAPI
tracer tracing . Tracer
metrics * Metrics
2024-01-23 00:09:08 +08:00
}
2024-02-26 21:52:16 +08:00
// LogPrefix is the logger name passed to log.New when ProvideService builds the service.
var LogPrefix = "cloudmigration.service"
2024-01-23 00:09:08 +08:00
2024-03-25 23:43:28 +08:00
// Name prefixes for the gcom resources created on behalf of a stack. The stack
// ID is appended to each prefix when the access policy and token are created.
const (
	// cloudMigrationAccessPolicyNamePrefix prefixes the access policy name.
	//nolint:gosec
	cloudMigrationAccessPolicyNamePrefix = "grafana-cloud-migrations"
	// cloudMigrationTokenNamePrefix prefixes the access token name.
	//nolint:gosec
	cloudMigrationTokenNamePrefix = "grafana-cloud-migrations"
)
2024-02-26 21:52:16 +08:00
// Compile-time assertion that *Service satisfies the cloudmigration.Service interface.
var _ cloudmigration . Service = ( * Service ) ( nil )
2024-01-23 00:09:08 +08:00
// ProvideService Factory for method used by wire to inject dependencies.
// builds the service, and api, and configures routes
func ProvideService (
cfg * setting . Cfg ,
features featuremgmt . FeatureToggles ,
2024-02-26 21:52:16 +08:00
db db . DB ,
2024-01-23 00:09:08 +08:00
dsService datasources . DataSourceService ,
2024-04-03 19:36:13 +08:00
secretsService secrets . Service ,
2024-01-23 00:09:08 +08:00
routeRegister routing . RouteRegister ,
prom prometheus . Registerer ,
2024-03-25 23:43:28 +08:00
tracer tracing . Tracer ,
2024-04-03 19:36:13 +08:00
dashboardService dashboards . DashboardService ,
folderService folder . Service ,
2024-07-20 12:02:31 +08:00
kvStore kvstore . KVStore ,
2024-04-18 03:43:09 +08:00
) ( cloudmigration . Service , error ) {
2024-01-23 00:09:08 +08:00
if ! features . IsEnabledGlobally ( featuremgmt . FlagOnPremToCloudMigrations ) {
2024-04-18 03:43:09 +08:00
return & NoopServiceImpl { } , nil
2024-01-23 00:09:08 +08:00
}
2024-02-26 21:52:16 +08:00
s := & Service {
2024-04-03 19:36:13 +08:00
store : & sqlStore { db : db , secretsService : secretsService } ,
log : log . New ( LogPrefix ) ,
cfg : cfg ,
features : features ,
dsService : dsService ,
tracer : tracer ,
metrics : newMetrics ( ) ,
secretsService : secretsService ,
dashboardService : dashboardService ,
folderService : folderService ,
2024-07-20 12:02:31 +08:00
kvStore : kvstore . WithNamespace ( kvStore , 0 , "cloudmigration" ) ,
2024-01-23 00:09:08 +08:00
}
2024-03-25 23:43:28 +08:00
s . api = api . RegisterApi ( routeRegister , s , tracer )
2024-01-23 00:09:08 +08:00
2024-07-11 20:32:02 +08:00
s . objectStorage = objectstorage . NewS3 ( )
2024-04-21 11:51:58 +08:00
if ! cfg . CloudMigration . IsDeveloperMode {
2024-07-15 21:22:57 +08:00
c , err := gmsclient . NewGMSClient ( cfg )
2024-04-21 11:51:58 +08:00
if err != nil {
2024-07-15 21:22:57 +08:00
return nil , fmt . Errorf ( "initializing GMS client: %w" , err )
2024-04-21 11:51:58 +08:00
}
2024-07-15 21:22:57 +08:00
s . gmsClient = c
2024-04-21 11:51:58 +08:00
s . gcomService = gcom . New ( gcom . Config { ApiURL : cfg . GrafanaComAPIURL , Token : cfg . CloudMigration . GcomAPIToken } )
} else {
2024-06-14 01:58:59 +08:00
s . gmsClient = gmsclient . NewInMemoryClient ( )
2024-05-31 20:39:10 +08:00
s . gcomService = & gcomStub { policies : map [ string ] gcom . AccessPolicy { } , token : nil }
2024-04-21 11:51:58 +08:00
s . cfg . StackID = "12345"
2024-04-18 03:43:09 +08:00
}
2024-05-31 21:03:43 +08:00
if err := prom . Register ( s . metrics ) ; err != nil {
var alreadyRegisterErr prometheus . AlreadyRegisteredError
if errors . As ( err , & alreadyRegisterErr ) {
s . log . Warn ( "cloud migration metrics already registered" )
} else {
return s , fmt . Errorf ( "registering cloud migration metrics: %w" , err )
}
2024-01-23 00:09:08 +08:00
}
2024-04-18 03:43:09 +08:00
return s , nil
2024-01-23 00:09:08 +08:00
}
2024-05-31 20:39:10 +08:00
// GetToken returns the existing cloud migration access token of the configured
// stack. It resolves the stack's instance through gcom, lists the tokens that
// match the stack-specific access policy and token names, and returns the one
// whose name matches exactly. cloudmigration.ErrTokenNotFound is returned when
// no such token exists.
func (s *Service) GetToken(ctx context.Context) (gcom.TokenView, error) {
	ctx, span := s.tracer.Start(ctx, "CloudMigrationService.GetToken")
	defer span.End()

	logger := s.log.FromContext(ctx)
	requestID := tracing.TraceIDFromContext(ctx, false)

	// Every remote call gets its own deadline derived from the traced context.
	fetchCtx, cancelFetch := context.WithTimeout(ctx, s.cfg.CloudMigration.FetchInstanceTimeout)
	defer cancelFetch()

	instance, err := s.gcomService.GetInstanceByID(fetchCtx, requestID, s.cfg.StackID)
	if err != nil {
		return gcom.TokenView{}, fmt.Errorf("fetching instance by id: id=%s %w", s.cfg.StackID, err)
	}
	logger.Info("instance found", "slug", instance.Slug)

	policyName := fmt.Sprintf("%s-%s", cloudMigrationAccessPolicyNamePrefix, s.cfg.StackID)
	tokenName := fmt.Sprintf("%s-%s", cloudMigrationTokenNamePrefix, s.cfg.StackID)

	listCtx, cancelList := context.WithTimeout(ctx, s.cfg.CloudMigration.ListTokensTimeout)
	defer cancelList()

	listParams := gcom.ListTokenParams{
		RequestID:        requestID,
		Region:           instance.RegionSlug,
		AccessPolicyName: policyName,
		TokenName:        tokenName,
	}
	tokens, err := s.gcomService.ListTokens(listCtx, listParams)
	if err != nil {
		return gcom.TokenView{}, fmt.Errorf("listing tokens: %w", err)
	}
	logger.Info("found access tokens", "num_tokens", len(tokens))

	// Only an exact name match counts as the migration token.
	for i := range tokens {
		if tokens[i].Name != tokenName {
			continue
		}
		logger.Info("found existing cloud migration token", "tokenID", tokens[i].ID, "accessPolicyID", tokens[i].AccessPolicyID)
		return tokens[i], nil
	}

	logger.Info("cloud migration token not found")
	return gcom.TokenView{}, cloudmigration.ErrTokenNotFound
}
2024-03-25 23:43:28 +08:00
func ( s * Service ) CreateToken ( ctx context . Context ) ( cloudmigration . CreateAccessTokenResponse , error ) {
ctx , span := s . tracer . Start ( ctx , "CloudMigrationService.CreateToken" )
defer span . End ( )
logger := s . log . FromContext ( ctx )
requestID := tracing . TraceIDFromContext ( ctx , false )
timeoutCtx , cancel := context . WithTimeout ( ctx , s . cfg . CloudMigration . FetchInstanceTimeout )
defer cancel ( )
instance , err := s . gcomService . GetInstanceByID ( timeoutCtx , requestID , s . cfg . StackID )
if err != nil {
return cloudmigration . CreateAccessTokenResponse { } , fmt . Errorf ( "fetching instance by id: id=%s %w" , s . cfg . StackID , err )
}
2024-04-11 20:43:46 +08:00
// Add the stack id to the access policy name to ensure access policies in a org have unique names.
accessPolicyName := fmt . Sprintf ( "%s-%s" , cloudMigrationAccessPolicyNamePrefix , s . cfg . StackID )
accessPolicyDisplayName := fmt . Sprintf ( "%s-%s" , s . cfg . Slug , cloudMigrationAccessPolicyNamePrefix )
2024-03-25 23:43:28 +08:00
timeoutCtx , cancel = context . WithTimeout ( ctx , s . cfg . CloudMigration . FetchAccessPolicyTimeout )
defer cancel ( )
2024-04-11 20:43:46 +08:00
existingAccessPolicy , err := s . findAccessPolicyByName ( timeoutCtx , instance . RegionSlug , accessPolicyName )
2024-03-25 23:43:28 +08:00
if err != nil {
2024-04-11 20:43:46 +08:00
return cloudmigration . CreateAccessTokenResponse { } , fmt . Errorf ( "fetching access policy by name: name=%s %w" , accessPolicyName , err )
2024-03-25 23:43:28 +08:00
}
if existingAccessPolicy != nil {
timeoutCtx , cancel := context . WithTimeout ( ctx , s . cfg . CloudMigration . DeleteAccessPolicyTimeout )
defer cancel ( )
if _ , err := s . gcomService . DeleteAccessPolicy ( timeoutCtx , gcom . DeleteAccessPolicyParams {
RequestID : requestID ,
AccessPolicyID : existingAccessPolicy . ID ,
Region : instance . RegionSlug ,
} ) ; err != nil {
return cloudmigration . CreateAccessTokenResponse { } , fmt . Errorf ( "deleting access policy: id=%s region=%s %w" , existingAccessPolicy . ID , instance . RegionSlug , err )
}
logger . Info ( "deleted access policy" , existingAccessPolicy . ID , "name" , existingAccessPolicy . Name )
}
timeoutCtx , cancel = context . WithTimeout ( ctx , s . cfg . CloudMigration . CreateAccessPolicyTimeout )
defer cancel ( )
accessPolicy , err := s . gcomService . CreateAccessPolicy ( timeoutCtx ,
gcom . CreateAccessPolicyParams {
RequestID : requestID ,
Region : instance . RegionSlug ,
} ,
gcom . CreateAccessPolicyPayload {
2024-04-11 20:43:46 +08:00
Name : accessPolicyName ,
DisplayName : accessPolicyDisplayName ,
2024-03-25 23:43:28 +08:00
Realms : [ ] gcom . Realm { { Type : "stack" , Identifier : s . cfg . StackID , LabelPolicies : [ ] gcom . LabelPolicy { } } } ,
Scopes : [ ] string { "cloud-migrations:read" , "cloud-migrations:write" } ,
} )
if err != nil {
return cloudmigration . CreateAccessTokenResponse { } , fmt . Errorf ( "creating access policy: %w" , err )
}
logger . Info ( "created access policy" , "id" , accessPolicy . ID , "name" , accessPolicy . Name )
2024-04-11 20:43:46 +08:00
// Add the stack id to the token name to ensure tokens in a org have unique names.
accessTokenName := fmt . Sprintf ( "%s-%s" , cloudMigrationTokenNamePrefix , s . cfg . StackID )
accessTokenDisplayName := fmt . Sprintf ( "%s-%s" , s . cfg . Slug , cloudMigrationTokenNamePrefix )
2024-03-25 23:43:28 +08:00
timeoutCtx , cancel = context . WithTimeout ( ctx , s . cfg . CloudMigration . CreateTokenTimeout )
defer cancel ( )
2024-04-11 20:43:46 +08:00
2024-03-25 23:43:28 +08:00
token , err := s . gcomService . CreateToken ( timeoutCtx ,
gcom . CreateTokenParams { RequestID : requestID , Region : instance . RegionSlug } ,
gcom . CreateTokenPayload {
AccessPolicyID : accessPolicy . ID ,
2024-04-11 20:43:46 +08:00
Name : accessTokenName ,
DisplayName : accessTokenDisplayName ,
2024-03-25 23:43:28 +08:00
ExpiresAt : time . Now ( ) . Add ( s . cfg . CloudMigration . TokenExpiresAfter ) ,
} )
if err != nil {
return cloudmigration . CreateAccessTokenResponse { } , fmt . Errorf ( "creating access token: %w" , err )
}
logger . Info ( "created access token" , "id" , token . ID , "name" , token . Name )
s . metrics . accessTokenCreated . With ( prometheus . Labels { "slug" : s . cfg . Slug } ) . Inc ( )
2024-03-29 03:02:54 +08:00
bytes , err := json . Marshal ( cloudmigration . Base64EncodedTokenPayload {
Token : token . Token ,
Instance : cloudmigration . Base64HGInstance {
StackID : instance . ID ,
RegionSlug : instance . RegionSlug ,
2024-06-14 01:58:59 +08:00
ClusterSlug : instance . ClusterSlug , // This should be used for routing to GMS
2024-03-29 03:02:54 +08:00
Slug : instance . Slug ,
} ,
2024-03-25 23:43:28 +08:00
} )
if err != nil {
return cloudmigration . CreateAccessTokenResponse { } , fmt . Errorf ( "encoding token: %w" , err )
}
return cloudmigration . CreateAccessTokenResponse { Token : base64 . StdEncoding . EncodeToString ( bytes ) } , nil
}
func ( s * Service ) findAccessPolicyByName ( ctx context . Context , regionSlug , accessPolicyName string ) ( * gcom . AccessPolicy , error ) {
ctx , span := s . tracer . Start ( ctx , "CloudMigrationService.findAccessPolicyByName" )
defer span . End ( )
accessPolicies , err := s . gcomService . ListAccessPolicies ( ctx , gcom . ListAccessPoliciesParams {
RequestID : tracing . TraceIDFromContext ( ctx , false ) ,
Region : regionSlug ,
Name : accessPolicyName ,
} )
if err != nil {
return nil , fmt . Errorf ( "listing access policies: name=%s region=%s :%w" , accessPolicyName , regionSlug , err )
}
for _ , accessPolicy := range accessPolicies {
if accessPolicy . Name == accessPolicyName {
return & accessPolicy , nil
}
}
return nil , nil
2024-01-23 00:09:08 +08:00
}
2024-03-25 20:30:47 +08:00
2024-06-14 01:58:59 +08:00
func ( s * Service ) ValidateToken ( ctx context . Context , cm cloudmigration . CloudMigrationSession ) error {
2024-04-03 19:36:13 +08:00
ctx , span := s . tracer . Start ( ctx , "CloudMigrationService.ValidateToken" )
defer span . End ( )
2024-06-14 01:58:59 +08:00
if err := s . gmsClient . ValidateKey ( ctx , cm ) ; err != nil {
2024-04-18 03:43:09 +08:00
return fmt . Errorf ( "validating key: %w" , err )
2024-04-03 19:36:13 +08:00
}
2024-03-25 20:30:47 +08:00
return nil
}
2024-05-31 21:03:43 +08:00
// DeleteToken deletes the gcom token with the given ID in the region of the
// configured stack. A gcom.ErrTokenNotFound result from gcom is not treated as
// a failure. On success the deleted-token metric is incremented.
func (s *Service) DeleteToken(ctx context.Context, tokenID string) error {
	ctx, span := s.tracer.Start(ctx, "CloudMigrationService.DeleteToken",
		trace.WithAttributes(attribute.String("tokenID", tokenID)),
	)
	defer span.End()

	logger := s.log.FromContext(ctx)
	requestID := tracing.TraceIDFromContext(ctx, false)

	fetchCtx, cancelFetch := context.WithTimeout(ctx, s.cfg.CloudMigration.FetchInstanceTimeout)
	defer cancelFetch()

	instance, err := s.gcomService.GetInstanceByID(fetchCtx, requestID, s.cfg.StackID)
	if err != nil {
		return fmt.Errorf("fetching instance by id: id=%s %w", s.cfg.StackID, err)
	}
	logger.Info("found instance", "instanceID", instance.ID)

	deleteCtx, cancelDelete := context.WithTimeout(ctx, s.cfg.CloudMigration.DeleteTokenTimeout)
	defer cancelDelete()

	deleteParams := gcom.DeleteTokenParams{
		RequestID: requestID,
		Region:    instance.RegionSlug,
		TokenID:   tokenID,
	}
	err = s.gcomService.DeleteToken(deleteCtx, deleteParams)
	// A token that is already gone counts as deleted.
	if err != nil && !errors.Is(err, gcom.ErrTokenNotFound) {
		return fmt.Errorf("deleting cloud migration token: tokenID=%s %w", tokenID, err)
	}

	logger.Info("deleted cloud migration token", "tokenID", tokenID)
	s.metrics.accessTokenDeleted.With(prometheus.Labels{"slug": s.cfg.Slug}).Inc()

	return nil
}
2024-06-14 01:58:59 +08:00
func ( s * Service ) GetSession ( ctx context . Context , uid string ) ( * cloudmigration . CloudMigrationSession , error ) {
2024-04-03 19:36:13 +08:00
ctx , span := s . tracer . Start ( ctx , "CloudMigrationService.GetMigration" )
defer span . End ( )
2024-06-14 01:58:59 +08:00
migration , err := s . store . GetMigrationSessionByUID ( ctx , uid )
2024-04-03 19:36:13 +08:00
if err != nil {
return nil , err
}
2024-03-28 19:50:31 +08:00
2024-04-03 19:36:13 +08:00
return migration , nil
2024-03-25 20:30:47 +08:00
}
2024-06-14 01:58:59 +08:00
func ( s * Service ) GetSessionList ( ctx context . Context ) ( * cloudmigration . CloudMigrationSessionListResponse , error ) {
2024-07-10 20:46:38 +08:00
values , err := s . store . GetCloudMigrationSessionList ( ctx )
2024-03-28 19:50:31 +08:00
if err != nil {
return nil , err
}
2024-06-14 01:58:59 +08:00
migrations := make ( [ ] cloudmigration . CloudMigrationSessionResponse , 0 )
2024-03-28 19:50:31 +08:00
for _ , v := range values {
2024-06-14 01:58:59 +08:00
migrations = append ( migrations , cloudmigration . CloudMigrationSessionResponse {
2024-05-02 00:29:25 +08:00
UID : v . UID ,
2024-06-14 01:58:59 +08:00
Slug : v . Slug ,
2024-03-28 19:50:31 +08:00
Created : v . Created ,
Updated : v . Updated ,
} )
}
2024-06-14 01:58:59 +08:00
return & cloudmigration . CloudMigrationSessionListResponse { Sessions : migrations } , nil
2024-03-25 20:30:47 +08:00
}
2024-06-14 01:58:59 +08:00
func ( s * Service ) CreateSession ( ctx context . Context , cmd cloudmigration . CloudMigrationSessionRequest ) ( * cloudmigration . CloudMigrationSessionResponse , error ) {
2024-03-29 03:02:54 +08:00
ctx , span := s . tracer . Start ( ctx , "CloudMigrationService.createMigration" )
defer span . End ( )
base64Token := cmd . AuthToken
b , err := base64 . StdEncoding . DecodeString ( base64Token )
if err != nil {
return nil , fmt . Errorf ( "token could not be decoded" )
}
var token cloudmigration . Base64EncodedTokenPayload
if err := json . Unmarshal ( b , & token ) ; err != nil {
return nil , fmt . Errorf ( "invalid token" ) // don't want to leak info here
}
2024-03-29 08:55:27 +08:00
migration := token . ToMigration ( )
2024-06-14 01:58:59 +08:00
// validate token against GMS before saving
2024-04-03 19:36:13 +08:00
if err := s . ValidateToken ( ctx , migration ) ; err != nil {
return nil , fmt . Errorf ( "token validation: %w" , err )
}
2024-06-14 01:58:59 +08:00
cm , err := s . store . CreateMigrationSession ( ctx , migration )
2024-04-03 19:36:13 +08:00
if err != nil {
2024-03-29 03:02:54 +08:00
return nil , fmt . Errorf ( "error creating migration: %w" , err )
}
2024-07-20 12:02:31 +08:00
s . report ( ctx , cm , gmsclient . EventConnect , 0 , nil )
2024-06-14 01:58:59 +08:00
return & cloudmigration . CloudMigrationSessionResponse {
2024-05-02 00:29:25 +08:00
UID : cm . UID ,
2024-06-14 01:58:59 +08:00
Slug : token . Instance . Slug ,
2024-04-03 19:36:13 +08:00
Created : cm . Created ,
Updated : cm . Updated ,
2024-03-29 03:02:54 +08:00
} , nil
2024-03-25 20:30:47 +08:00
}
2024-06-14 01:58:59 +08:00
func ( s * Service ) RunMigration ( ctx context . Context , uid string ) ( * cloudmigration . MigrateDataResponse , error ) {
2024-04-18 03:43:09 +08:00
// Get migration to read the auth token
2024-06-14 01:58:59 +08:00
migration , err := s . GetSession ( ctx , uid )
2024-04-18 03:43:09 +08:00
if err != nil {
return nil , fmt . Errorf ( "migration get error: %w" , err )
}
// Get migration data JSON
2024-07-03 21:38:26 +08:00
request , err := s . getMigrationDataJSON ( ctx , & user . SignedInUser { } )
2024-04-18 03:43:09 +08:00
if err != nil {
s . log . Error ( "error getting the json request body for migration run" , "err" , err . Error ( ) )
return nil , fmt . Errorf ( "migration data get error: %w" , err )
}
2024-06-14 01:58:59 +08:00
// Call the gms service
resp , err := s . gmsClient . MigrateData ( ctx , * migration , * request )
2024-04-18 03:43:09 +08:00
if err != nil {
s . log . Error ( "error migrating data: %w" , err )
return nil , fmt . Errorf ( "migrate data error: %w" , err )
}
// save the result of the migration
2024-06-14 01:58:59 +08:00
runUID , err := s . createMigrationRun ( ctx , cloudmigration . CloudMigrationSnapshot {
SessionUID : migration . UID ,
2024-06-25 11:50:07 +08:00
Resources : resp . Items ,
2024-04-18 03:43:09 +08:00
} )
if err != nil {
response . Error ( http . StatusInternalServerError , "migration run save error" , err )
}
2024-05-02 00:29:25 +08:00
resp . RunUID = runUID
2024-04-18 03:43:09 +08:00
return resp , nil
}
2024-06-19 21:20:52 +08:00
func ( s * Service ) createMigrationRun ( ctx context . Context , cmr cloudmigration . CloudMigrationSnapshot ) ( string , error ) {
uid , err := s . store . CreateMigrationRun ( ctx , cmr )
2024-04-03 19:36:13 +08:00
if err != nil {
2024-06-19 21:20:52 +08:00
s . log . Error ( "Failed to save migration run" , "err" , err )
return "" , err
2024-04-03 19:36:13 +08:00
}
2024-06-19 21:20:52 +08:00
return uid , nil
}
2024-06-15 02:16:36 +08:00
2024-06-19 21:20:52 +08:00
func ( s * Service ) GetMigrationStatus ( ctx context . Context , runUID string ) ( * cloudmigration . CloudMigrationSnapshot , error ) {
cmr , err := s . store . GetMigrationStatus ( ctx , runUID )
2024-06-15 02:16:36 +08:00
if err != nil {
2024-06-19 21:20:52 +08:00
return nil , fmt . Errorf ( "retrieving migration status from db: %w" , err )
2024-06-15 02:16:36 +08:00
}
2024-06-19 21:20:52 +08:00
return cmr , nil
}
2024-06-15 02:16:36 +08:00
2024-06-19 21:20:52 +08:00
func ( s * Service ) GetMigrationRunList ( ctx context . Context , migUID string ) ( * cloudmigration . CloudMigrationRunList , error ) {
runs , err := s . store . GetMigrationStatusList ( ctx , migUID )
2024-06-15 02:16:36 +08:00
if err != nil {
2024-06-19 21:20:52 +08:00
return nil , fmt . Errorf ( "retrieving migration statuses from db: %w" , err )
2024-06-15 02:16:36 +08:00
}
2024-06-19 21:20:52 +08:00
runList := & cloudmigration . CloudMigrationRunList { Runs : [ ] cloudmigration . MigrateDataResponseList { } }
for _ , s := range runs {
runList . Runs = append ( runList . Runs , cloudmigration . MigrateDataResponseList {
RunUID : s . UID ,
2024-04-03 19:36:13 +08:00
} )
}
2024-06-19 21:20:52 +08:00
return runList , nil
}
2024-04-03 19:36:13 +08:00
2024-06-19 21:20:52 +08:00
func ( s * Service ) DeleteSession ( ctx context . Context , uid string ) ( * cloudmigration . CloudMigrationSession , error ) {
c , err := s . store . DeleteMigrationSessionByUID ( ctx , uid )
if err != nil {
return c , fmt . Errorf ( "deleting migration from db: %w" , err )
2024-04-03 19:36:13 +08:00
}
2024-07-20 12:02:31 +08:00
s . report ( ctx , c , gmsclient . EventDisconnect , 0 , nil )
2024-06-19 21:20:52 +08:00
return c , nil
}
2024-06-15 02:16:36 +08:00
2024-07-03 21:38:26 +08:00
func ( s * Service ) CreateSnapshot ( ctx context . Context , signedInUser * user . SignedInUser , sessionUid string ) ( * cloudmigration . CloudMigrationSnapshot , error ) {
ctx , span := s . tracer . Start ( ctx , "CloudMigrationService.CreateSnapshot" , trace . WithAttributes (
attribute . String ( "sessionUid" , sessionUid ) ,
) )
2024-06-19 21:20:52 +08:00
defer span . End ( )
// fetch session for the gms auth token
session , err := s . store . GetMigrationSessionByUID ( ctx , sessionUid )
if err != nil {
return nil , fmt . Errorf ( "fetching migration session for uid %s: %w" , sessionUid , err )
2024-04-03 19:36:13 +08:00
}
2024-04-18 03:43:09 +08:00
2024-07-15 21:22:57 +08:00
// query gms to establish new snapshot s.cfg.CloudMigration.StartSnapshotTimeout
initResp , err := s . gmsClient . StartSnapshot ( ctx , * session )
2024-06-19 21:20:52 +08:00
if err != nil {
return nil , fmt . Errorf ( "initializing snapshot with GMS for session %s: %w" , sessionUid , err )
}
2024-04-03 19:36:13 +08:00
2024-07-03 21:38:26 +08:00
if s . cfg . CloudMigration . SnapshotFolder == "" {
return nil , fmt . Errorf ( "snapshot folder is not set" )
2024-04-03 19:36:13 +08:00
}
2024-06-19 21:20:52 +08:00
// save snapshot to the db
snapshot := cloudmigration . CloudMigrationSnapshot {
2024-07-03 21:38:26 +08:00
UID : util . GenerateShortUID ( ) ,
2024-06-19 21:20:52 +08:00
SessionUID : sessionUid ,
2024-07-15 21:22:57 +08:00
Status : cloudmigration . SnapshotStatusCreating ,
2024-06-19 21:20:52 +08:00
EncryptionKey : initResp . EncryptionKey ,
2024-07-03 21:38:26 +08:00
GMSSnapshotUID : initResp . SnapshotID ,
LocalDir : filepath . Join ( s . cfg . CloudMigration . SnapshotFolder , "grafana" , "snapshots" , initResp . SnapshotID ) ,
2024-04-03 19:36:13 +08:00
}
2024-06-19 21:20:52 +08:00
uid , err := s . store . CreateSnapshot ( ctx , snapshot )
2024-04-03 19:36:13 +08:00
if err != nil {
2024-06-19 21:20:52 +08:00
return nil , fmt . Errorf ( "saving snapshot: %w" , err )
2024-04-03 19:36:13 +08:00
}
2024-06-19 21:20:52 +08:00
snapshot . UID = uid
2024-04-03 19:36:13 +08:00
2024-06-19 21:20:52 +08:00
// start building the snapshot asynchronously while we return a success response to the client
2024-07-03 21:38:26 +08:00
go func ( ) {
2024-07-19 00:55:27 +08:00
s . cancelMutex . Lock ( )
defer func ( ) {
s . cancelFunc = nil
s . cancelMutex . Unlock ( )
} ( )
ctx , cancelFunc := context . WithCancel ( context . Background ( ) )
s . cancelFunc = cancelFunc
2024-07-20 12:02:31 +08:00
s . report ( ctx , session , gmsclient . EventStartBuildingSnapshot , 0 , nil )
start := time . Now ( )
2024-07-22 21:11:57 +08:00
err := s . buildSnapshot ( ctx , signedInUser , initResp . MaxItemsPerPartition , initResp . Metadata , snapshot )
2024-07-20 12:02:31 +08:00
if err != nil {
2024-07-03 21:38:26 +08:00
s . log . Error ( "building snapshot" , "err" , err . Error ( ) )
2024-07-18 23:34:28 +08:00
// Update status to error with retries
2024-07-19 00:55:27 +08:00
if err := s . updateSnapshotWithRetries ( context . Background ( ) , cloudmigration . UpdateSnapshotCmd {
2024-07-18 23:34:28 +08:00
UID : snapshot . UID ,
Status : cloudmigration . SnapshotStatusError ,
} ) ; err != nil {
s . log . Error ( "critical failure during snapshot creation - please report any error logs" )
}
2024-07-03 21:38:26 +08:00
}
2024-07-20 12:02:31 +08:00
s . report ( ctx , session , gmsclient . EventDoneBuildingSnapshot , time . Since ( start ) , err )
2024-07-03 21:38:26 +08:00
} ( )
2024-04-03 19:36:13 +08:00
2024-06-19 21:20:52 +08:00
return & snapshot , nil
2024-04-03 19:36:13 +08:00
}
2024-06-19 21:20:52 +08:00
// GetSnapshot returns the on-prem version of a snapshot, supplemented with processing status from GMS
2024-06-25 11:50:07 +08:00
func ( s * Service ) GetSnapshot ( ctx context . Context , query cloudmigration . GetSnapshotsQuery ) ( * cloudmigration . CloudMigrationSnapshot , error ) {
2024-06-19 21:20:52 +08:00
ctx , span := s . tracer . Start ( ctx , "CloudMigrationService.GetSnapshot" )
defer span . End ( )
2024-06-25 11:50:07 +08:00
sessionUid , snapshotUid := query . SessionUID , query . SnapshotUID
snapshot , err := s . store . GetSnapshotByUID ( ctx , snapshotUid , query . ResultPage , query . ResultLimit )
2024-04-03 19:36:13 +08:00
if err != nil {
2024-06-19 21:20:52 +08:00
return nil , fmt . Errorf ( "fetching snapshot for uid %s: %w" , snapshotUid , err )
2024-04-03 19:36:13 +08:00
}
2024-06-19 21:20:52 +08:00
session , err := s . store . GetMigrationSessionByUID ( ctx , sessionUid )
if err != nil {
return nil , fmt . Errorf ( "fetching session for uid %s: %w" , sessionUid , err )
2024-04-03 19:36:13 +08:00
}
2024-06-15 02:16:36 +08:00
2024-07-16 21:04:21 +08:00
// Ask GMS for snapshot status while the source of truth is in the cloud
2024-06-19 21:20:52 +08:00
if snapshot . ShouldQueryGMS ( ) {
2024-07-16 21:04:21 +08:00
// Calculate offset based on how many results we currently have responses for
pending := snapshot . StatsRollup . CountsByStatus [ cloudmigration . ItemStatusPending ]
snapshotMeta , err := s . gmsClient . GetSnapshotStatus ( ctx , * session , * snapshot , snapshot . StatsRollup . Total - pending )
2024-06-19 21:20:52 +08:00
if err != nil {
2024-07-15 21:22:57 +08:00
return snapshot , fmt . Errorf ( "error fetching snapshot status from GMS: sessionUid: %s, snapshotUid: %s" , sessionUid , snapshotUid )
}
if snapshotMeta . State == cloudmigration . SnapshotStateUnknown {
// If a status from Grafana Migration Service is unavailable, return the snapshot as-is
return snapshot , nil
2024-06-19 21:20:52 +08:00
}
2024-04-03 19:36:13 +08:00
2024-07-15 21:22:57 +08:00
localStatus , ok := gmsStateToLocalStatus [ snapshotMeta . State ]
if ! ok {
s . log . Error ( "unexpected GMS snapshot state: %s" , snapshotMeta . State )
return snapshot , nil
}
// We need to update the snapshot in our db before reporting anything
if err := s . store . UpdateSnapshot ( ctx , cloudmigration . UpdateSnapshotCmd {
UID : snapshot . UID ,
Status : localStatus ,
Resources : snapshotMeta . Results ,
} ) ; err != nil {
return nil , fmt . Errorf ( "error updating snapshot status: %w" , err )
2024-06-19 21:20:52 +08:00
}
2024-07-15 21:22:57 +08:00
snapshot . Status = localStatus
snapshot . Resources = append ( snapshot . Resources , snapshotMeta . Results ... )
2024-04-03 19:36:13 +08:00
}
2024-06-19 21:20:52 +08:00
return snapshot , nil
2024-03-25 20:30:47 +08:00
}
2024-07-15 21:22:57 +08:00
// gmsStateToLocalStatus translates a snapshot state reported by GMS into the
// status stored locally for the snapshot.
var gmsStateToLocalStatus = map[cloudmigration.SnapshotState]cloudmigration.SnapshotStatus{
	// GMS has not yet received a notification for the data
	cloudmigration.SnapshotStateInitialized: cloudmigration.SnapshotStatusPendingProcessing,
	// GMS has received a notification and is migrating the data
	cloudmigration.SnapshotStateProcessing: cloudmigration.SnapshotStatusProcessing,
	// GMS has completed the migration - all resources were attempted to be migrated
	cloudmigration.SnapshotStateFinished: cloudmigration.SnapshotStatusFinished,
	// GMS has processed a cancelation request. Snapshot cancelation is not supported yet.
	cloudmigration.SnapshotStateCanceled: cloudmigration.SnapshotStatusCanceled,
	// Something unrecoverable has occurred in the migration process.
	cloudmigration.SnapshotStateError: cloudmigration.SnapshotStatusError,
}
2024-06-19 21:20:52 +08:00
func ( s * Service ) GetSnapshotList ( ctx context . Context , query cloudmigration . ListSnapshotsQuery ) ( [ ] cloudmigration . CloudMigrationSnapshot , error ) {
ctx , span := s . tracer . Start ( ctx , "CloudMigrationService.GetSnapshotList" )
defer span . End ( )
snapshotList , err := s . store . GetSnapshotList ( ctx , query )
2024-04-03 19:36:13 +08:00
if err != nil {
2024-06-19 21:20:52 +08:00
return nil , fmt . Errorf ( "fetching snapshots for session uid %s: %w" , query . SessionUID , err )
2024-04-03 19:36:13 +08:00
}
2024-06-19 21:20:52 +08:00
return snapshotList , nil
2024-03-25 20:30:47 +08:00
}
2024-06-19 21:20:52 +08:00
func ( s * Service ) UploadSnapshot ( ctx context . Context , sessionUid string , snapshotUid string ) error {
2024-07-11 20:32:02 +08:00
ctx , span := s . tracer . Start ( ctx , "CloudMigrationService.UploadSnapshot" ,
trace . WithAttributes (
attribute . String ( "sessionUid" , sessionUid ) ,
attribute . String ( "snapshotUid" , snapshotUid ) ,
) ,
)
2024-06-19 21:20:52 +08:00
defer span . End ( )
2024-07-11 20:32:02 +08:00
// fetch session for the gms auth token
session , err := s . store . GetMigrationSessionByUID ( ctx , sessionUid )
if err != nil {
return fmt . Errorf ( "fetching migration session for uid %s: %w" , sessionUid , err )
}
2024-06-25 11:50:07 +08:00
snapshot , err := s . GetSnapshot ( ctx , cloudmigration . GetSnapshotsQuery {
SnapshotUID : snapshotUid ,
SessionUID : sessionUid ,
} )
2024-04-03 19:36:13 +08:00
if err != nil {
2024-06-19 21:20:52 +08:00
return fmt . Errorf ( "fetching snapshot with uid %s: %w" , snapshotUid , err )
2024-04-03 19:36:13 +08:00
}
2024-04-18 03:43:09 +08:00
2024-07-17 23:53:21 +08:00
uploadUrl , err := s . gmsClient . CreatePresignedUploadUrl ( ctx , * session , * snapshot )
if err != nil {
return fmt . Errorf ( "creating presigned upload url for snapshot %s: %w" , snapshotUid , err )
}
s . log . Info ( "Uploading snapshot in local directory" , "gmsSnapshotUID" , snapshot . GMSSnapshotUID , "localDir" , snapshot . LocalDir , "uploadURL" , uploadUrl )
2024-04-18 03:43:09 +08:00
2024-06-19 21:20:52 +08:00
// start uploading the snapshot asynchronously while we return a success response to the client
2024-07-11 20:32:02 +08:00
go func ( ) {
2024-07-19 00:55:27 +08:00
s . cancelMutex . Lock ( )
defer func ( ) {
s . cancelFunc = nil
s . cancelMutex . Unlock ( )
} ( )
ctx , cancelFunc := context . WithCancel ( context . Background ( ) )
s . cancelFunc = cancelFunc
2024-07-20 12:02:31 +08:00
s . report ( ctx , session , gmsclient . EventStartUploadingSnapshot , 0 , nil )
start := time . Now ( )
err := s . uploadSnapshot ( ctx , session , snapshot , uploadUrl )
if err != nil {
2024-07-18 23:34:28 +08:00
s . log . Error ( "uploading snapshot" , "err" , err . Error ( ) )
// Update status to error with retries
2024-07-19 00:55:27 +08:00
if err := s . updateSnapshotWithRetries ( context . Background ( ) , cloudmigration . UpdateSnapshotCmd {
2024-07-18 23:34:28 +08:00
UID : snapshot . UID ,
Status : cloudmigration . SnapshotStatusError ,
} ) ; err != nil {
s . log . Error ( "critical failure during snapshot upload - please report any error logs" )
}
2024-07-11 20:32:02 +08:00
}
2024-07-20 12:02:31 +08:00
s . report ( ctx , session , gmsclient . EventDoneUploadingSnapshot , time . Since ( start ) , err )
2024-07-11 20:32:02 +08:00
} ( )
2024-03-25 20:30:47 +08:00
2024-06-19 21:20:52 +08:00
return nil
2024-03-25 20:30:47 +08:00
}
2024-07-19 00:55:27 +08:00
func ( s * Service ) CancelSnapshot ( ctx context . Context , sessionUid string , snapshotUid string ) ( err error ) {
// The cancel func itself is protected by a mutex in the async threads, so it may or may not be set by the time CancelSnapshot is called
// Attempt to cancel and recover from the panic if the cancel function is nil
defer func ( ) {
if r := recover ( ) ; r != nil {
err = fmt . Errorf ( "nothing to cancel" )
}
} ( )
s . cancelFunc ( )
// Canceling will ensure that any goroutines holding the lock finish and release the lock
s . cancelMutex . Lock ( )
defer s . cancelMutex . Unlock ( )
s . cancelFunc = nil
if err := s . updateSnapshotWithRetries ( ctx , cloudmigration . UpdateSnapshotCmd {
UID : snapshotUid ,
Status : cloudmigration . SnapshotStatusCanceled ,
} ) ; err != nil {
s . log . Error ( "critical failure during snapshot cancelation - please report any error logs" )
}
s . log . Info ( "canceled snapshot" , "sessionUid" , sessionUid , "snapshotUid" , snapshotUid )
return nil
2024-06-21 21:35:15 +08:00
}
2024-07-20 12:02:31 +08:00
// report sends a local event of type t for the given session to GMS. The
// event carries the local event ID, the duration d, and the message of evtErr
// when it is non-nil. If the local event ID cannot be obtained the failure is
// logged and nothing is sent.
func (s *Service) report(
	ctx context.Context,
	sess *cloudmigration.CloudMigrationSession,
	t gmsclient.LocalEventType,
	d time.Duration,
	evtErr error,
) {
	localID, err := s.getLocalEventId(ctx)
	if err != nil {
		s.log.Error("failed to report event", "type", t, "error", err.Error())
		return
	}

	// A zero duration leaves DurationIfFinished at its zero value.
	event := gmsclient.EventRequestDTO{
		Event:              t,
		LocalID:            localID,
		DurationIfFinished: d,
	}
	if evtErr != nil {
		event.Error = evtErr.Error()
	}

	s.gmsClient.ReportEvent(ctx, *sess, event)
}
// getLocalEventId returns the value stored under "anonymous_id" in the
// service's key-value store. When no value is stored yet, a new UUID is
// generated, stored under that key, and returned.
func (s *Service) getLocalEventId(ctx context.Context) (string, error) {
	const key = "anonymous_id"

	stored, found, err := s.kvStore.Get(ctx, key)
	switch {
	case err != nil:
		return "", fmt.Errorf("failed to get usage stats id: %w", err)
	case found:
		return stored, nil
	}

	// Nothing stored yet: mint an ID and persist it for later calls.
	fresh := uuid.NewString()
	if err := s.kvStore.Set(ctx, key, fresh); err != nil {
		s.log.Error("Failed to store usage stats id", "error", err)
		return "", fmt.Errorf("failed to store usage stats id: %w", err)
	}

	return fresh, nil
}