mirror of https://github.com/grafana/grafana.git
				
				
				
			
		
			
				
	
	
		
			344 lines
		
	
	
		
			8.7 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			344 lines
		
	
	
		
			8.7 KiB
		
	
	
	
		
			Go
		
	
	
	
| package sql
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"fmt"
 | |
| 	"net/http"
 | |
| 	"os"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/google/uuid"
 | |
| 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 | |
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | |
| 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 | |
| 
 | |
| 	"github.com/grafana/grafana-app-sdk/logging"
 | |
| 
 | |
| 	"github.com/grafana/grafana/pkg/apimachinery/utils"
 | |
| 	"github.com/grafana/grafana/pkg/storage/unified/parquet"
 | |
| 	"github.com/grafana/grafana/pkg/storage/unified/resource"
 | |
| 	"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
 | |
| 	"github.com/grafana/grafana/pkg/storage/unified/sql/db"
 | |
| 	"github.com/grafana/grafana/pkg/storage/unified/sql/dbutil"
 | |
| 	"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
 | |
| )
 | |
| 
 | |
| var (
 | |
| 	_ resource.BulkProcessingBackend = (*backend)(nil)
 | |
| )
 | |
| 
 | |
// bulkRV generates synthetic resource versions while a bulk import is running.
type bulkRV struct {
	// max is the largest timestamp component (microseconds since the Unix
	// epoch, aligned to a 10 second boundary) that a generated value may use.
	max int64
	// counter is a running sequence number added to the timestamp component.
	counter int64
}

// newBulkRV returns a generator whose upper bound is the current time rounded
// down to a 10 second boundary and whose counter starts at zero.
//
// When executing a bulk import we can fake the RV values.
func newBulkRV() *bulkRV {
	// 10 seconds expressed in microseconds (10_000_000).
	const window = int64(10 * time.Second / time.Microsecond)

	now := time.Now().Truncate(10 * time.Second)

	rv := new(bulkRV)
	rv.max = now.UnixMicro() / window * window
	return rv
}
 | |
| 
 | |
| func (x *bulkRV) next(obj metav1.Object) int64 {
 | |
| 	ts := obj.GetCreationTimestamp().UnixMicro()
 | |
| 	anno := obj.GetAnnotations()
 | |
| 	if anno != nil {
 | |
| 		v := anno[utils.AnnoKeyUpdatedTimestamp]
 | |
| 		t, err := time.Parse(time.RFC3339, v)
 | |
| 		if err == nil {
 | |
| 			ts = t.UnixMicro()
 | |
| 		}
 | |
| 	}
 | |
| 	if ts > x.max || ts < 10000000 {
 | |
| 		ts = x.max
 | |
| 	}
 | |
| 	x.counter++
 | |
| 	return (ts/10000000)*10000000 + x.counter
 | |
| }
 | |
| 
 | |
// bulkLock records which collections currently have a bulk operation running.
type bulkLock struct {
	// running is keyed by the collection id; an entry is present while a bulk
	// operation for that collection is in progress.
	running map[string]bool
	// mu must be held while reading or writing running.
	mu sync.Mutex
}
 | |
| 
 | |
| func (x *bulkLock) Start(keys []*resourcepb.ResourceKey) error {
 | |
| 	x.mu.Lock()
 | |
| 	defer x.mu.Unlock()
 | |
| 
 | |
| 	// First verify that it is not already running
 | |
| 	ids := make([]string, len(keys))
 | |
| 	for i, k := range keys {
 | |
| 		id := resource.NSGR(k)
 | |
| 		if x.running[id] {
 | |
| 			return &apierrors.StatusError{ErrStatus: metav1.Status{
 | |
| 				Code:    http.StatusPreconditionFailed,
 | |
| 				Message: "bulk export is already running",
 | |
| 			}}
 | |
| 		}
 | |
| 		ids[i] = id
 | |
| 	}
 | |
| 
 | |
| 	// Then add the keys to the lock
 | |
| 	for _, k := range ids {
 | |
| 		x.running[k] = true
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (x *bulkLock) Finish(keys []*resourcepb.ResourceKey) {
 | |
| 	x.mu.Lock()
 | |
| 	defer x.mu.Unlock()
 | |
| 	for _, k := range keys {
 | |
| 		delete(x.running, resource.NSGR(k))
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (x *bulkLock) Active() bool {
 | |
| 	x.mu.Lock()
 | |
| 	defer x.mu.Unlock()
 | |
| 	return len(x.running) > 0
 | |
| }
 | |
| 
 | |
| func (b *backend) ProcessBulk(ctx context.Context, setting resource.BulkSettings, iter resource.BulkRequestIterator) *resourcepb.BulkResponse {
 | |
| 	err := b.bulkLock.Start(setting.Collection)
 | |
| 	if err != nil {
 | |
| 		return &resourcepb.BulkResponse{
 | |
| 			Error: resource.AsErrorResult(err),
 | |
| 		}
 | |
| 	}
 | |
| 	defer b.bulkLock.Finish(setting.Collection)
 | |
| 
 | |
| 	// We may want to first write parquet, then read parquet
 | |
| 	if b.dialect.DialectName() == "sqlite" {
 | |
| 		file, err := os.CreateTemp("", "grafana-bulk-export-*.parquet")
 | |
| 		if err != nil {
 | |
| 			return &resourcepb.BulkResponse{
 | |
| 				Error: resource.AsErrorResult(err),
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		writer, err := parquet.NewParquetWriter(file)
 | |
| 		if err != nil {
 | |
| 			return &resourcepb.BulkResponse{
 | |
| 				Error: resource.AsErrorResult(err),
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		// write bulk to parquet
 | |
| 		rsp := writer.ProcessBulk(ctx, setting, iter)
 | |
| 		if rsp.Error != nil {
 | |
| 			return rsp
 | |
| 		}
 | |
| 
 | |
| 		b.log.Info("using parquet buffer", "parquet", file)
 | |
| 
 | |
| 		// Replace the iterator with one from parquet
 | |
| 		iter, err = parquet.NewParquetReader(file.Name(), 50)
 | |
| 		if err != nil {
 | |
| 			return &resourcepb.BulkResponse{
 | |
| 				Error: resource.AsErrorResult(err),
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return b.processBulk(ctx, setting, iter)
 | |
| }
 | |
| 
 | |
| // internal bulk process
 | |
| func (b *backend) processBulk(ctx context.Context, setting resource.BulkSettings, iter resource.BulkRequestIterator) *resourcepb.BulkResponse {
 | |
| 	rsp := &resourcepb.BulkResponse{}
 | |
| 	err := b.db.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error {
 | |
| 		rollbackWithError := func(err error) error {
 | |
| 			txerr := tx.Rollback()
 | |
| 			if txerr != nil {
 | |
| 				b.log.Warn("rollback", "error", txerr)
 | |
| 			} else {
 | |
| 				b.log.Info("rollback")
 | |
| 			}
 | |
| 			return err
 | |
| 		}
 | |
| 		bulk := &bulkWroker{
 | |
| 			ctx:     ctx,
 | |
| 			tx:      tx,
 | |
| 			dialect: b.dialect,
 | |
| 			logger:  logging.FromContext(ctx),
 | |
| 		}
 | |
| 
 | |
| 		// Calculate the RV based on incoming request timestamps
 | |
| 		rv := newBulkRV()
 | |
| 
 | |
| 		summaries := make(map[string]*resourcepb.BulkResponse_Summary, len(setting.Collection)*4)
 | |
| 
 | |
| 		// First clear everything in the transaction
 | |
| 		if setting.RebuildCollection {
 | |
| 			for _, key := range setting.Collection {
 | |
| 				summary, err := bulk.deleteCollection(key)
 | |
| 				if err != nil {
 | |
| 					return rollbackWithError(err)
 | |
| 				}
 | |
| 				summaries[resource.NSGR(key)] = summary
 | |
| 				rsp.Summary = append(rsp.Summary, summary)
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		obj := &unstructured.Unstructured{}
 | |
| 
 | |
| 		// Write each event into the history
 | |
| 		for iter.Next() {
 | |
| 			if iter.RollbackRequested() {
 | |
| 				return rollbackWithError(nil)
 | |
| 			}
 | |
| 			req := iter.Request()
 | |
| 			if req == nil {
 | |
| 				return rollbackWithError(fmt.Errorf("missing request"))
 | |
| 			}
 | |
| 			rsp.Processed++
 | |
| 
 | |
| 			if req.Action == resourcepb.BulkRequest_UNKNOWN {
 | |
| 				rsp.Rejected = append(rsp.Rejected, &resourcepb.BulkResponse_Rejected{
 | |
| 					Key:    req.Key,
 | |
| 					Action: req.Action,
 | |
| 					Error:  "unknown action",
 | |
| 				})
 | |
| 				continue
 | |
| 			}
 | |
| 
 | |
| 			err := obj.UnmarshalJSON(req.Value)
 | |
| 			if err != nil {
 | |
| 				rsp.Rejected = append(rsp.Rejected, &resourcepb.BulkResponse_Rejected{
 | |
| 					Key:    req.Key,
 | |
| 					Action: req.Action,
 | |
| 					Error:  "unable to unmarshal json",
 | |
| 				})
 | |
| 				continue
 | |
| 			}
 | |
| 
 | |
| 			// Write the event to history
 | |
| 			if _, err := dbutil.Exec(ctx, tx, sqlResourceHistoryInsert, sqlResourceRequest{
 | |
| 				SQLTemplate: sqltemplate.New(b.dialect),
 | |
| 				WriteEvent: resource.WriteEvent{
 | |
| 					Key:        req.Key,
 | |
| 					Type:       resourcepb.WatchEvent_Type(req.Action),
 | |
| 					Value:      req.Value,
 | |
| 					PreviousRV: -1, // Used for WATCH, but we want to skip watch events
 | |
| 				},
 | |
| 				Folder:          req.Folder,
 | |
| 				GUID:            uuid.New().String(),
 | |
| 				ResourceVersion: rv.next(obj),
 | |
| 			}); err != nil {
 | |
| 				return rollbackWithError(fmt.Errorf("insert into resource history: %w", err))
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		// Now update the resource table from history
 | |
| 		for _, key := range setting.Collection {
 | |
| 			k := fmt.Sprintf("%s/%s/%s", key.Namespace, key.Group, key.Resource)
 | |
| 			summary := summaries[k]
 | |
| 			if summary == nil {
 | |
| 				return rollbackWithError(fmt.Errorf("missing summary key for: %s", k))
 | |
| 			}
 | |
| 
 | |
| 			err := bulk.syncCollection(key, summary)
 | |
| 			if err != nil {
 | |
| 				return err
 | |
| 			}
 | |
| 
 | |
| 			// Make sure the collection RV is above our last written event
 | |
| 			_, err = b.rvManager.ExecWithRV(ctx, key, func(tx db.Tx) (string, error) {
 | |
| 				return "", nil
 | |
| 			})
 | |
| 			if err != nil {
 | |
| 				b.log.Warn("error increasing RV", "error", err)
 | |
| 			}
 | |
| 		}
 | |
| 		return nil
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		rsp.Error = resource.AsErrorResult(err)
 | |
| 	}
 | |
| 	return rsp
 | |
| }
 | |
| 
 | |
| type bulkWroker struct {
 | |
| 	ctx     context.Context
 | |
| 	tx      db.ContextExecer
 | |
| 	dialect sqltemplate.Dialect
 | |
| 	logger  logging.Logger
 | |
| }
 | |
| 
 | |
| // This will remove everything from the `resource` and `resource_history` table for a given namespace/group/resource
 | |
| func (w *bulkWroker) deleteCollection(key *resourcepb.ResourceKey) (*resourcepb.BulkResponse_Summary, error) {
 | |
| 	summary := &resourcepb.BulkResponse_Summary{
 | |
| 		Namespace: key.Namespace,
 | |
| 		Group:     key.Group,
 | |
| 		Resource:  key.Resource,
 | |
| 	}
 | |
| 
 | |
| 	// First delete history
 | |
| 	res, err := dbutil.Exec(w.ctx, w.tx, sqlResourceHistoryDelete, &sqlResourceHistoryDeleteRequest{
 | |
| 		SQLTemplate: sqltemplate.New(w.dialect),
 | |
| 		Namespace:   key.Namespace,
 | |
| 		Group:       key.Group,
 | |
| 		Resource:    key.Resource,
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	summary.PreviousHistory, err = res.RowsAffected()
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	// Next delete the active resource table
 | |
| 	res, err = dbutil.Exec(w.ctx, w.tx, sqlResourceDelete, &sqlResourceRequest{
 | |
| 		SQLTemplate: sqltemplate.New(w.dialect),
 | |
| 		WriteEvent: resource.WriteEvent{
 | |
| 			Key: key,
 | |
| 		},
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	summary.PreviousCount, err = res.RowsAffected()
 | |
| 	return summary, err
 | |
| }
 | |
| 
 | |
| // Copy the latest value from history into the active resource table
 | |
| func (w *bulkWroker) syncCollection(key *resourcepb.ResourceKey, summary *resourcepb.BulkResponse_Summary) error {
 | |
| 	w.logger.Info("synchronize collection", "key", resource.NSGR(key))
 | |
| 	_, err := dbutil.Exec(w.ctx, w.tx, sqlResourceInsertFromHistory, &sqlResourceInsertFromHistoryRequest{
 | |
| 		SQLTemplate: sqltemplate.New(w.dialect),
 | |
| 		Key:         key,
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	w.logger.Info("get stats (still in transaction)", "key", resource.NSGR(key))
 | |
| 	rows, err := dbutil.QueryRows(w.ctx, w.tx, sqlResourceStats, &sqlStatsRequest{
 | |
| 		SQLTemplate: sqltemplate.New(w.dialect),
 | |
| 		Namespace:   key.Namespace,
 | |
| 		Group:       key.Group,
 | |
| 		Resource:    key.Resource,
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	if rows != nil {
 | |
| 		defer func() {
 | |
| 			_ = rows.Close()
 | |
| 		}()
 | |
| 	}
 | |
| 	if rows.Next() {
 | |
| 		row := resource.ResourceStats{}
 | |
| 		return rows.Scan(&row.Namespace, &row.Group, &row.Resource,
 | |
| 			&summary.Count,
 | |
| 			&summary.ResourceVersion)
 | |
| 	}
 | |
| 	return err
 | |
| }
 |