Outbox: Migrate storage to sql templates

This commit is contained in:
Matheus Macabu 2025-04-28 21:45:46 +02:00
parent 7bd1c4ef4f
commit b76711205b
26 changed files with 620 additions and 83 deletions

View File

@ -0,0 +1,31 @@
INSERT INTO {{ .Ident "secret_secure_value_outbox" }} (
{{ .Ident "uid" }},
{{ .Ident "message_type" }},
{{ .Ident "name" }},
{{ .Ident "namespace" }},
{{ if .Row.EncryptedSecret.Valid }}
{{ .Ident "encrypted_secret" }},
{{ end }}
{{ if .Row.KeeperName.Valid }}
{{ .Ident "keeper_name" }},
{{ end }}
{{ if .Row.ExternalID.Valid }}
{{ .Ident "external_id" }},
{{ end }}
{{ .Ident "created" }}
) VALUES (
{{ .Arg .Row.MessageID }},
{{ .Arg .Row.MessageType }},
{{ .Arg .Row.Name }},
{{ .Arg .Row.Namespace }},
{{ if .Row.EncryptedSecret.Valid }}
{{ .Arg .Row.EncryptedSecret.String }},
{{ end }}
{{ if .Row.KeeperName.Valid }}
{{ .Arg .Row.KeeperName.String }},
{{ end }}
{{ if .Row.ExternalID.Valid }}
{{ .Arg .Row.ExternalID.String }},
{{ end }}
{{ .Arg .Row.Created }}
);

View File

@ -0,0 +1,5 @@
DELETE FROM
{{ .Ident "secret_secure_value_outbox" }}
WHERE
{{ .Ident "uid" }} = {{ .Arg .MessageID }}
;

View File

@ -0,0 +1,17 @@
SELECT
{{ .Ident "uid" }},
{{ .Ident "message_type" }},
{{ .Ident "name" }},
{{ .Ident "namespace" }},
{{ .Ident "encrypted_secret" }},
{{ .Ident "keeper_name" }},
{{ .Ident "external_id" }},
{{ .Ident "created" }}
FROM
{{ .Ident "secret_secure_value_outbox" }}
ORDER BY
{{ .Ident "created" }} ASC
LIMIT
{{ .Arg .ReceiveLimit }}
{{ .SelectFor "UPDATE SKIP LOCKED" }}
;

View File

@ -2,112 +2,200 @@ package metadata
import (
"context"
"database/sql"
"fmt"
"time"
"github.com/google/uuid"
"github.com/grafana/grafana/pkg/apis/secret/v0alpha1"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/registry/apis/secret/assert"
"github.com/grafana/grafana/pkg/registry/apis/secret/contracts"
"github.com/grafana/grafana/pkg/services/sqlstore"
"github.com/grafana/grafana/pkg/storage/secret/migrator"
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
)
// Implements contracts.OutboxQueue
type outboxStore struct {
db db.DB
db contracts.Database
dialect sqltemplate.Dialect
}
func ProvideOutboxQueue(db db.DB) contracts.OutboxQueue {
return &outboxStore{db: db}
func ProvideOutboxQueue(db contracts.Database) contracts.OutboxQueue {
return &outboxStore{
db: db,
dialect: sqltemplate.DialectForDriver(db.DriverName()),
}
}
type outboxMessageDB struct {
MessageID string `xorm:"pk 'uid'"`
MessageType contracts.OutboxMessageType `xorm:"message_type"`
Name string `xorm:"name"`
Namespace string `xorm:"namespace"`
EncryptedSecret string `xorm:"encrypted_secret"`
KeeperName *string `xorm:"keeper_name"`
ExternalID *string `xorm:"external_id"`
Created int64 `xorm:"created"`
}
func (*outboxMessageDB) TableName() string {
return migrator.TableNameSecureValueOutbox
MessageID string
MessageType contracts.OutboxMessageType
Name string
Namespace string
EncryptedSecret sql.NullString
KeeperName sql.NullString
ExternalID sql.NullString
Created int64
}
func (s *outboxStore) Append(ctx context.Context, input contracts.AppendOutboxMessage) (string, error) {
assert.True(input.Type != "", "outboxStore.Append: outbox message type is required")
var messageID string
err := s.db.InTransaction(ctx, func(ctx context.Context) error {
return s.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
outboxDB := outboxMessageDB{
MessageID: uuid.New().String(),
MessageType: input.Type,
Name: input.Name,
Namespace: input.Namespace,
KeeperName: input.KeeperName,
ExternalID: input.ExternalID,
Created: time.Now().UTC().UnixMilli(),
}
if input.Type == contracts.CreateSecretOutboxMessage || input.Type == contracts.UpdateSecretOutboxMessage {
outboxDB.EncryptedSecret = input.EncryptedSecret.DangerouslyExposeAndConsumeValue()
}
_, err := sess.Table(migrator.TableNameSecureValueOutbox).Insert(outboxDB)
if err != nil {
return fmt.Errorf("inserting message into secure value outbox table: %+w", err)
}
keeperName := sql.NullString{}
if input.KeeperName != nil {
keeperName = sql.NullString{
Valid: true,
String: *input.KeeperName,
}
}
messageID = outboxDB.MessageID
return nil
})
})
return messageID, err
externalID := sql.NullString{}
if input.ExternalID != nil {
externalID = sql.NullString{
Valid: true,
String: *input.ExternalID,
}
}
encryptedSecret := sql.NullString{}
if input.Type == contracts.CreateSecretOutboxMessage || input.Type == contracts.UpdateSecretOutboxMessage {
encryptedSecret = sql.NullString{
Valid: true,
// TODO: this type does not need to be exposed when encrypted (maybe []byte or string)
String: input.EncryptedSecret.DangerouslyExposeAndConsumeValue(),
}
}
messageID := uuid.New().String()
req := appendSecureValueOutbox{
SQLTemplate: sqltemplate.New(s.dialect),
Row: &outboxMessageDB{
MessageID: messageID,
MessageType: input.Type,
Name: input.Name,
Namespace: input.Namespace,
EncryptedSecret: encryptedSecret,
KeeperName: keeperName,
ExternalID: externalID,
Created: time.Now().UTC().UnixMilli(),
},
}
query, err := sqltemplate.Execute(sqlSecureValueOutboxAppend, req)
if err != nil {
return "", fmt.Errorf("execute template %q: %w", sqlSecureValueOutboxAppend.Name(), err)
}
result, err := s.db.ExecContext(ctx, query, req.GetArgs()...)
if err != nil {
return "", fmt.Errorf("inserting message into secure value outbox table: %w", err)
}
rowsAffected, err := result.RowsAffected()
if err != nil {
return "", fmt.Errorf("get rows affected: %w", err)
}
if rowsAffected != 1 {
return "", fmt.Errorf("expected to affect 1 row, but affected %d", rowsAffected)
}
return messageID, nil
}
func (s *outboxStore) ReceiveN(ctx context.Context, n uint) ([]contracts.OutboxMessage, error) {
req := receiveNSecureValueOutbox{
SQLTemplate: sqltemplate.New(s.dialect),
ReceiveLimit: n,
}
query, err := sqltemplate.Execute(sqlSecureValueOutboxReceiveN, req)
if err != nil {
return nil, fmt.Errorf("execute template %q: %w", sqlSecureValueOutboxReceiveN.Name(), err)
}
rows, err := s.db.QueryContext(ctx, query, req.GetArgs()...)
if err != nil {
return nil, fmt.Errorf("fetching rows from secure value outbox table: %w", err)
}
defer func() { _ = rows.Close() }()
messages := make([]contracts.OutboxMessage, 0)
err := s.db.InTransaction(ctx, func(ctx context.Context) error {
return s.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
rows := make([]outboxMessageDB, 0)
// TODO: skip locked rows
if err := sess.Table(migrator.TableNameSecureValueOutbox).ForUpdate().OrderBy("secret_secure_value_outbox.created ASC").Limit(int(n)).Find(&rows); err != nil {
return fmt.Errorf("fetching rows from secure value outbox table: %w", err)
}
for _, row := range rows {
msg := contracts.OutboxMessage{
Type: row.MessageType,
MessageID: row.MessageID,
Name: row.Name,
Namespace: row.Namespace,
KeeperName: row.KeeperName,
ExternalID: row.ExternalID,
}
if row.MessageType != contracts.DeleteSecretOutboxMessage {
msg.EncryptedSecret = v0alpha1.ExposedSecureValue(row.EncryptedSecret)
}
messages = append(messages, msg)
}
return nil
})
})
return messages, err
for rows.Next() {
var row outboxMessageDB
if err := rows.Scan(
&row.MessageID,
&row.MessageType,
&row.Name,
&row.Namespace,
&row.EncryptedSecret,
&row.KeeperName,
&row.ExternalID,
&row.Created,
); err != nil {
return nil, fmt.Errorf("scanning row from secure value outbox table: %w", err)
}
var keeperName *string
if row.KeeperName.Valid {
keeperName = &row.KeeperName.String
}
var externalID *string
if row.ExternalID.Valid {
externalID = &row.ExternalID.String
}
msg := contracts.OutboxMessage{
Type: row.MessageType,
MessageID: row.MessageID,
Name: row.Name,
Namespace: row.Namespace,
KeeperName: keeperName,
ExternalID: externalID,
}
if row.MessageType != contracts.DeleteSecretOutboxMessage && row.EncryptedSecret.Valid {
// TODO: dont do this because it is encrypted!
msg.EncryptedSecret = v0alpha1.ExposedSecureValue(row.EncryptedSecret.String)
}
messages = append(messages, msg)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("iterating over rows: %w", err)
}
return messages, nil
}
func (s *outboxStore) Delete(ctx context.Context, messageID string) error {
return s.db.InTransaction(ctx, func(ctx context.Context) error {
return s.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
deleted, err := sess.Delete(&outboxMessageDB{MessageID: messageID})
if err != nil {
return fmt.Errorf("deleting message from outbox table: messageID=%+v %w", messageID, err)
}
if deleted > 1 {
return fmt.Errorf("bug: deleted more than one row from the outbox table, should delete only one at a time: deleted=%+v", deleted)
}
return nil
})
})
assert.True(messageID != "", "outboxStore.Delete: messageID is required")
req := deleteSecureValueOutbox{
SQLTemplate: sqltemplate.New(s.dialect),
MessageID: messageID,
}
query, err := sqltemplate.Execute(sqlSecureValueOutboxDelete, req)
if err != nil {
return fmt.Errorf("execute template %q: %w", sqlSecureValueOutboxDelete.Name(), err)
}
result, err := s.db.ExecContext(ctx, query, req.GetArgs()...)
if err != nil {
return fmt.Errorf("deleting message id=%v from secure value outbox table: %w", messageID, err)
}
rowsAffected, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("get rows affected: %w", err)
}
if rowsAffected != 1 {
return fmt.Errorf("bug: deleted more than one row from the outbox table, should delete only one at a time: deleted=%v", rowsAffected)
}
return nil
}

View File

@ -11,6 +11,7 @@ import (
secretv0alpha1 "github.com/grafana/grafana/pkg/apis/secret/v0alpha1"
"github.com/grafana/grafana/pkg/registry/apis/secret/contracts"
"github.com/grafana/grafana/pkg/services/sqlstore"
"github.com/grafana/grafana/pkg/storage/secret/database"
"github.com/grafana/grafana/pkg/storage/secret/migrator"
"github.com/stretchr/testify/require"
)
@ -109,7 +110,7 @@ func TestOutboxStore(t *testing.T) {
ctx := context.Background()
outbox := ProvideOutboxQueue(testDB)
outbox := ProvideOutboxQueue(database.ProvideDatabase(testDB))
m1 := contracts.AppendOutboxMessage{
Type: contracts.CreateSecretOutboxMessage,
@ -178,7 +179,7 @@ func TestOutboxStoreProperty(t *testing.T) {
testDB := sqlstore.NewTestStore(t)
require.NoError(t, migrator.MigrateSecretSQL(testDB.GetEngine(), nil))
outbox := ProvideOutboxQueue(testDB)
outbox := ProvideOutboxQueue(database.ProvideDatabase(testDB))
model := newOutboxStoreModel()

View File

@ -39,6 +39,10 @@ var (
// sqlSecureValueUpdate = mustTemplate("secure_value_update.sql")
sqlSecureValueUpdateExternalId = mustTemplate("secure_value_updateExternalId.sql")
sqlSecureValueUpdateStatus = mustTemplate("secure_value_updateStatus.sql")
sqlSecureValueOutboxAppend = mustTemplate("secure_value_outbox_append.sql")
sqlSecureValueOutboxReceiveN = mustTemplate("secure_value_outbox_receiveN.sql")
sqlSecureValueOutboxDelete = mustTemplate("secure_value_outbox_delete.sql")
)
func mustTemplate(filename string) *template.Template {
@ -222,3 +226,27 @@ type updateStatusSecureValue struct {
func (r updateStatusSecureValue) Validate() error {
return nil // TODO
}
/*************************************/
/**-- Secure Value Outbox Queries --**/
/*************************************/
type appendSecureValueOutbox struct {
sqltemplate.SQLTemplate
Row *outboxMessageDB
}
func (appendSecureValueOutbox) Validate() error { return nil }
type receiveNSecureValueOutbox struct {
sqltemplate.SQLTemplate
ReceiveLimit uint
}
func (receiveNSecureValueOutbox) Validate() error { return nil }
type deleteSecureValueOutbox struct {
sqltemplate.SQLTemplate
MessageID string
}
func (deleteSecureValueOutbox) Validate() error { return nil }

View File

@ -1,6 +1,7 @@
package metadata
import (
"database/sql"
"testing"
"text/template"
@ -233,3 +234,94 @@ func TestSecureValueQueries(t *testing.T) {
},
})
}
func TestSecureValueOutboxQueries(t *testing.T) {
mocks.CheckQuerySnapshots(t, mocks.TemplateTestSetup{
RootDir: "testdata",
Templates: map[*template.Template][]mocks.TemplateTestCase{
sqlSecureValueOutboxAppend: {
{
Name: "no-encrypted-secret",
Data: &appendSecureValueOutbox{
SQLTemplate: mocks.NewTestingSQLTemplate(),
Row: &outboxMessageDB{
MessageID: "my-uuid",
MessageType: "some-type",
Name: "name",
Namespace: "namespace",
ExternalID: sql.NullString{Valid: true, String: "external-id"},
KeeperName: sql.NullString{Valid: true, String: "keeper"},
Created: 1234,
},
},
},
{
Name: "no-external-id",
Data: &appendSecureValueOutbox{
SQLTemplate: mocks.NewTestingSQLTemplate(),
Row: &outboxMessageDB{
MessageID: "my-uuid",
MessageType: "some-type",
Name: "name",
Namespace: "namespace",
EncryptedSecret: sql.NullString{Valid: true, String: "encrypted"},
KeeperName: sql.NullString{Valid: true, String: "keeper"},
Created: 1234,
},
},
},
{
Name: "no-keeper-name",
Data: &appendSecureValueOutbox{
SQLTemplate: mocks.NewTestingSQLTemplate(),
Row: &outboxMessageDB{
MessageID: "my-uuid",
MessageType: "some-type",
Name: "name",
Namespace: "namespace",
EncryptedSecret: sql.NullString{Valid: true, String: "encrypted"},
ExternalID: sql.NullString{Valid: true, String: "external-id"},
Created: 1234,
},
},
},
{
Name: "all-fields-present",
Data: &appendSecureValueOutbox{
SQLTemplate: mocks.NewTestingSQLTemplate(),
Row: &outboxMessageDB{
MessageID: "my-uuid",
MessageType: "some-type",
Name: "name",
Namespace: "namespace",
EncryptedSecret: sql.NullString{Valid: true, String: "encrypted"},
ExternalID: sql.NullString{Valid: true, String: ""}, // can be empty string
KeeperName: sql.NullString{Valid: true, String: "keeper"},
Created: 1234,
},
},
},
},
sqlSecureValueOutboxReceiveN: {
{
Name: "basic",
Data: &receiveNSecureValueOutbox{
SQLTemplate: mocks.NewTestingSQLTemplate(),
ReceiveLimit: 10,
},
},
},
sqlSecureValueOutboxDelete: {
{
Name: "basic",
Data: &deleteSecureValueOutbox{
SQLTemplate: mocks.NewTestingSQLTemplate(),
MessageID: "my-uuid",
},
},
},
},
})
}

View File

@ -0,0 +1,19 @@
INSERT INTO `secret_secure_value_outbox` (
`uid`,
`message_type`,
`name`,
`namespace`,
`encrypted_secret`,
`keeper_name`,
`external_id`,
`created`
) VALUES (
'my-uuid',
'some-type',
'name',
'namespace',
'encrypted',
'keeper',
'',
1234
);

View File

@ -0,0 +1,17 @@
INSERT INTO `secret_secure_value_outbox` (
`uid`,
`message_type`,
`name`,
`namespace`,
`keeper_name`,
`external_id`,
`created`
) VALUES (
'my-uuid',
'some-type',
'name',
'namespace',
'keeper',
'external-id',
1234
);

View File

@ -0,0 +1,17 @@
INSERT INTO `secret_secure_value_outbox` (
`uid`,
`message_type`,
`name`,
`namespace`,
`encrypted_secret`,
`keeper_name`,
`created`
) VALUES (
'my-uuid',
'some-type',
'name',
'namespace',
'encrypted',
'keeper',
1234
);

View File

@ -0,0 +1,17 @@
INSERT INTO `secret_secure_value_outbox` (
`uid`,
`message_type`,
`name`,
`namespace`,
`encrypted_secret`,
`external_id`,
`created`
) VALUES (
'my-uuid',
'some-type',
'name',
'namespace',
'encrypted',
'external-id',
1234
);

View File

@ -0,0 +1,5 @@
DELETE FROM
`secret_secure_value_outbox`
WHERE
`uid` = 'my-uuid'
;

View File

@ -0,0 +1,17 @@
SELECT
`uid`,
`message_type`,
`name`,
`namespace`,
`encrypted_secret`,
`keeper_name`,
`external_id`,
`created`
FROM
`secret_secure_value_outbox`
ORDER BY
`created` ASC
LIMIT
10
FOR UPDATE SKIP LOCKED
;

View File

@ -0,0 +1,19 @@
INSERT INTO "secret_secure_value_outbox" (
"uid",
"message_type",
"name",
"namespace",
"encrypted_secret",
"keeper_name",
"external_id",
"created"
) VALUES (
'my-uuid',
'some-type',
'name',
'namespace',
'encrypted',
'keeper',
'',
1234
);

View File

@ -0,0 +1,17 @@
INSERT INTO "secret_secure_value_outbox" (
"uid",
"message_type",
"name",
"namespace",
"keeper_name",
"external_id",
"created"
) VALUES (
'my-uuid',
'some-type',
'name',
'namespace',
'keeper',
'external-id',
1234
);

View File

@ -0,0 +1,17 @@
INSERT INTO "secret_secure_value_outbox" (
"uid",
"message_type",
"name",
"namespace",
"encrypted_secret",
"keeper_name",
"created"
) VALUES (
'my-uuid',
'some-type',
'name',
'namespace',
'encrypted',
'keeper',
1234
);

View File

@ -0,0 +1,17 @@
INSERT INTO "secret_secure_value_outbox" (
"uid",
"message_type",
"name",
"namespace",
"encrypted_secret",
"external_id",
"created"
) VALUES (
'my-uuid',
'some-type',
'name',
'namespace',
'encrypted',
'external-id',
1234
);

View File

@ -0,0 +1,5 @@
DELETE FROM
"secret_secure_value_outbox"
WHERE
"uid" = 'my-uuid'
;

View File

@ -0,0 +1,17 @@
SELECT
"uid",
"message_type",
"name",
"namespace",
"encrypted_secret",
"keeper_name",
"external_id",
"created"
FROM
"secret_secure_value_outbox"
ORDER BY
"created" ASC
LIMIT
10
FOR UPDATE SKIP LOCKED
;

View File

@ -0,0 +1,19 @@
INSERT INTO "secret_secure_value_outbox" (
"uid",
"message_type",
"name",
"namespace",
"encrypted_secret",
"keeper_name",
"external_id",
"created"
) VALUES (
'my-uuid',
'some-type',
'name',
'namespace',
'encrypted',
'keeper',
'',
1234
);

View File

@ -0,0 +1,17 @@
INSERT INTO "secret_secure_value_outbox" (
"uid",
"message_type",
"name",
"namespace",
"keeper_name",
"external_id",
"created"
) VALUES (
'my-uuid',
'some-type',
'name',
'namespace',
'keeper',
'external-id',
1234
);

View File

@ -0,0 +1,17 @@
INSERT INTO "secret_secure_value_outbox" (
"uid",
"message_type",
"name",
"namespace",
"encrypted_secret",
"keeper_name",
"created"
) VALUES (
'my-uuid',
'some-type',
'name',
'namespace',
'encrypted',
'keeper',
1234
);

View File

@ -0,0 +1,17 @@
INSERT INTO "secret_secure_value_outbox" (
"uid",
"message_type",
"name",
"namespace",
"encrypted_secret",
"external_id",
"created"
) VALUES (
'my-uuid',
'some-type',
'name',
'namespace',
'encrypted',
'external-id',
1234
);

View File

@ -0,0 +1,5 @@
DELETE FROM
"secret_secure_value_outbox"
WHERE
"uid" = 'my-uuid'
;

View File

@ -0,0 +1,16 @@
SELECT
"uid",
"message_type",
"name",
"namespace",
"encrypted_secret",
"keeper_name",
"external_id",
"created"
FROM
"secret_secure_value_outbox"
ORDER BY
"created" ASC
LIMIT
10
;

View File

@ -124,7 +124,7 @@ func initSecretStore(mg *migrator.Migrator) string {
{Name: "message_type", Type: migrator.DB_NVarchar, Length: 16, Nullable: false},
{Name: "name", Type: migrator.DB_NVarchar, Length: 253, Nullable: false}, // Limit enforced by K8s.
{Name: "namespace", Type: migrator.DB_NVarchar, Length: 253, Nullable: false}, // Limit enforced by K8s.
{Name: "encrypted_secret", Type: migrator.DB_Blob, Nullable: false},
{Name: "encrypted_secret", Type: migrator.DB_Blob, Nullable: true},
{Name: "keeper_name", Type: migrator.DB_NVarchar, Length: 253, Nullable: true}, // Keeper name, if not set, use default keeper.
{Name: "external_id", Type: migrator.DB_NVarchar, Length: 36, Nullable: true}, // Fixed size of a UUID.
{Name: "created", Type: migrator.DB_BigInt, Nullable: false},