mirror of https://github.com/grafana/grafana.git
				
				
				
			
		
			
				
	
	
		
			460 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			460 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Go
		
	
	
	
| package runstream
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"math"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/grafana/grafana-plugin-sdk-go/backend"
 | |
| 
 | |
| 	"github.com/grafana/grafana/pkg/apimachinery/identity"
 | |
| 	"github.com/grafana/grafana/pkg/infra/log"
 | |
| 	"github.com/grafana/grafana/pkg/plugins"
 | |
| )
 | |
| 
 | |
| var (
 | |
| 	logger = log.New("live.runstream")
 | |
| )
 | |
| 
 | |
| //go:generate mockgen -destination=mock.go -package=runstream github.com/grafana/grafana/pkg/services/live/runstream ChannelLocalPublisher,NumLocalSubscribersGetter,StreamRunner,PluginContextGetter
 | |
| 
 | |
| type ChannelLocalPublisher interface {
 | |
| 	PublishLocal(channel string, data []byte) error
 | |
| }
 | |
| 
 | |
| type PluginContextGetter interface {
 | |
| 	GetPluginContext(ctx context.Context, user identity.Requester, pluginID string, datasourceUID string, skipCache bool) (backend.PluginContext, error)
 | |
| }
 | |
| 
 | |
| type NumLocalSubscribersGetter interface {
 | |
| 	// GetNumSubscribers returns number of channel subscribers throughout all nodes.
 | |
| 	GetNumLocalSubscribers(channel string) (int, error)
 | |
| }
 | |
| 
 | |
| type StreamRunner interface {
 | |
| 	RunStream(ctx context.Context, request *backend.RunStreamRequest, sender *backend.StreamSender) error
 | |
| }
 | |
| 
 | |
| type packetSender struct {
 | |
| 	channelLocalPublisher ChannelLocalPublisher
 | |
| 	channel               string
 | |
| }
 | |
| 
 | |
| func (p *packetSender) Send(packet *backend.StreamPacket) error {
 | |
| 	return p.channelLocalPublisher.PublishLocal(p.channel, packet.Data)
 | |
| }
 | |
| 
 | |
| // Manager manages streams from Grafana to plugins (i.e. RunStream method).
 | |
| type Manager struct {
 | |
| 	mu                      sync.RWMutex
 | |
| 	baseCtx                 context.Context
 | |
| 	streams                 map[string]streamContext
 | |
| 	datasourceStreams       map[string]map[string]struct{}
 | |
| 	presenceGetter          NumLocalSubscribersGetter
 | |
| 	pluginContextGetter     PluginContextGetter
 | |
| 	channelSender           ChannelLocalPublisher
 | |
| 	registerCh              chan submitRequest
 | |
| 	closedCh                chan struct{}
 | |
| 	checkInterval           time.Duration
 | |
| 	maxChecks               int
 | |
| 	datasourceCheckInterval time.Duration
 | |
| }
 | |
| 
 | |
| // ManagerOption modifies Manager behavior (used for tests for example).
 | |
| type ManagerOption func(*Manager)
 | |
| 
 | |
| // WithCheckConfig allows setting custom check rules.
 | |
| func WithCheckConfig(interval time.Duration, maxChecks int) ManagerOption {
 | |
| 	return func(sm *Manager) {
 | |
| 		sm.checkInterval = interval
 | |
| 		sm.maxChecks = maxChecks
 | |
| 	}
 | |
| }
 | |
| 
 | |
| const (
 | |
| 	defaultCheckInterval           = 5 * time.Second
 | |
| 	defaultDatasourceCheckInterval = time.Minute
 | |
| 	defaultMaxChecks               = 3
 | |
| )
 | |
| 
 | |
| // NewManager creates new Manager.
 | |
| func NewManager(channelSender ChannelLocalPublisher, presenceGetter NumLocalSubscribersGetter, pluginContextGetter PluginContextGetter, opts ...ManagerOption) *Manager {
 | |
| 	sm := &Manager{
 | |
| 		streams:                 make(map[string]streamContext),
 | |
| 		datasourceStreams:       map[string]map[string]struct{}{},
 | |
| 		channelSender:           channelSender,
 | |
| 		presenceGetter:          presenceGetter,
 | |
| 		pluginContextGetter:     pluginContextGetter,
 | |
| 		registerCh:              make(chan submitRequest),
 | |
| 		closedCh:                make(chan struct{}),
 | |
| 		checkInterval:           defaultCheckInterval,
 | |
| 		maxChecks:               defaultMaxChecks,
 | |
| 		datasourceCheckInterval: defaultDatasourceCheckInterval,
 | |
| 	}
 | |
| 	for _, opt := range opts {
 | |
| 		opt(sm)
 | |
| 	}
 | |
| 	return sm
 | |
| }
 | |
| 
 | |
| func (s *Manager) HandleDatasourceDelete(orgID int64, dsUID string) error {
 | |
| 	return s.handleDatasourceEvent(orgID, dsUID, false)
 | |
| }
 | |
| 
 | |
| func (s *Manager) HandleDatasourceUpdate(orgID int64, dsUID string) error {
 | |
| 	return s.handleDatasourceEvent(orgID, dsUID, true)
 | |
| }
 | |
| 
 | |
| func (s *Manager) handleDatasourceEvent(orgID int64, dsUID string, resubmit bool) error {
 | |
| 	dsKey := datasourceKey(orgID, dsUID)
 | |
| 	s.mu.RLock()
 | |
| 	dsStreams, ok := s.datasourceStreams[dsKey]
 | |
| 	if !ok {
 | |
| 		s.mu.RUnlock()
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	resubmitRequests := make([]streamRequest, 0, len(dsStreams))
 | |
| 	waitChannels := make([]chan struct{}, 0, len(dsStreams))
 | |
| 	for channel := range dsStreams {
 | |
| 		streamCtx, ok := s.streams[channel]
 | |
| 		if !ok {
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		streamCtx.cancelFn()
 | |
| 
 | |
| 		waitChannels = append(waitChannels, streamCtx.CloseCh)
 | |
| 		resubmitRequests = append(resubmitRequests, streamCtx.streamRequest)
 | |
| 	}
 | |
| 
 | |
| 	s.mu.RUnlock()
 | |
| 
 | |
| 	// Wait for all streams to stop.
 | |
| 	for _, ch := range waitChannels {
 | |
| 		<-ch
 | |
| 	}
 | |
| 
 | |
| 	if resubmit {
 | |
| 		// Re-submit streams.
 | |
| 		for _, sr := range resubmitRequests {
 | |
| 			_, err := s.SubmitStream(s.baseCtx, sr.user, sr.Channel, sr.Path, sr.Data, sr.PluginContext, sr.StreamRunner, true)
 | |
| 			if err != nil {
 | |
| 				// Log error but do not prevent execution of caller routine.
 | |
| 				logger.Error("Error re-submitting stream", "path", sr.Path, "error", err)
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func datasourceKey(orgID int64, dsUID string) string {
 | |
| 	return fmt.Sprintf("%d_%s", orgID, dsUID)
 | |
| }
 | |
| 
 | |
| func (s *Manager) stopStream(sr streamRequest, cancelFn func()) {
 | |
| 	s.mu.Lock()
 | |
| 	defer s.mu.Unlock()
 | |
| 	streamCtx, ok := s.streams[sr.Channel]
 | |
| 	if !ok {
 | |
| 		return
 | |
| 	}
 | |
| 	closeCh := streamCtx.CloseCh
 | |
| 	delete(s.streams, sr.Channel)
 | |
| 	if sr.PluginContext.DataSourceInstanceSettings != nil {
 | |
| 		dsUID := sr.PluginContext.DataSourceInstanceSettings.UID
 | |
| 		dsKey := datasourceKey(sr.PluginContext.OrgID, dsUID)
 | |
| 		delete(s.datasourceStreams[dsKey], sr.Channel)
 | |
| 	}
 | |
| 	cancelFn()
 | |
| 	close(closeCh)
 | |
| }
 | |
| 
 | |
| func (s *Manager) watchStream(ctx context.Context, cancelFn func(), sr streamRequest) {
 | |
| 	numNoSubscribersChecks := 0
 | |
| 	presenceTicker := time.NewTicker(s.checkInterval)
 | |
| 	defer presenceTicker.Stop()
 | |
| 	datasourceTicker := time.NewTicker(s.datasourceCheckInterval)
 | |
| 	defer datasourceTicker.Stop()
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-ctx.Done():
 | |
| 			return
 | |
| 		case <-datasourceTicker.C:
 | |
| 			if sr.PluginContext.DataSourceInstanceSettings != nil {
 | |
| 				dsUID := sr.PluginContext.DataSourceInstanceSettings.UID
 | |
| 				pCtx, err := s.pluginContextGetter.GetPluginContext(ctx, sr.user, sr.PluginContext.PluginID, dsUID, false)
 | |
| 				if err != nil {
 | |
| 					if errors.Is(err, plugins.ErrPluginNotRegistered) {
 | |
| 						logger.Debug("Datasource not found, stop stream", "channel", sr.Channel, "path", sr.Path)
 | |
| 						return
 | |
| 					}
 | |
| 					logger.Error("Error getting datasource context", "channel", sr.Channel, "path", sr.Path, "error", err)
 | |
| 					continue
 | |
| 				}
 | |
| 				if pCtx.DataSourceInstanceSettings.Updated != sr.PluginContext.DataSourceInstanceSettings.Updated {
 | |
| 					logger.Debug("Datasource changed, re-establish stream", "channel", sr.Channel, "path", sr.Path)
 | |
| 					err := s.HandleDatasourceUpdate(pCtx.OrgID, dsUID)
 | |
| 					if err != nil {
 | |
| 						logger.Error("Error re-establishing stream", "channel", sr.Channel, "path", sr.Path, "error", err)
 | |
| 						continue
 | |
| 					}
 | |
| 					return
 | |
| 				}
 | |
| 			}
 | |
| 		case <-presenceTicker.C:
 | |
| 			numSubscribers, err := s.presenceGetter.GetNumLocalSubscribers(sr.Channel)
 | |
| 			if err != nil {
 | |
| 				logger.Error("Error checking num subscribers", "channel", sr.Channel, "path", sr.Path, "error", err)
 | |
| 				continue
 | |
| 			}
 | |
| 			if numSubscribers > 0 {
 | |
| 				// reset counter since channel has active subscribers.
 | |
| 				numNoSubscribersChecks = 0
 | |
| 				continue
 | |
| 			}
 | |
| 			numNoSubscribersChecks++
 | |
| 			if numNoSubscribersChecks >= s.maxChecks {
 | |
| 				logger.Debug("Stop stream since no active subscribers", "channel", sr.Channel, "path", sr.Path)
 | |
| 				s.stopStream(sr, cancelFn)
 | |
| 				return
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| const streamDurationThreshold = 100 * time.Millisecond
 | |
| const coolDownDelay = 100 * time.Millisecond
 | |
| const maxDelay = 5 * time.Second
 | |
| 
 | |
| func getDelay(numErrors int) time.Duration {
 | |
| 	if numErrors == 0 {
 | |
| 		return 0
 | |
| 	}
 | |
| 	delay := coolDownDelay * time.Duration(math.Pow(2, float64(numErrors)))
 | |
| 	if delay > maxDelay {
 | |
| 		return maxDelay
 | |
| 	}
 | |
| 	return delay
 | |
| }
 | |
| 
 | |
| // run stream until context canceled or stream finished without an error.
 | |
| func (s *Manager) runStream(ctx context.Context, cancelFn func(), sr streamRequest) {
 | |
| 	defer func() { s.stopStream(sr, cancelFn) }()
 | |
| 	var numFastErrors int
 | |
| 	var delay time.Duration
 | |
| 	var isReconnect bool
 | |
| 	startTime := time.Now()
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-ctx.Done():
 | |
| 			return
 | |
| 		default:
 | |
| 		}
 | |
| 
 | |
| 		pluginCtx := sr.PluginContext
 | |
| 
 | |
| 		if isReconnect {
 | |
| 			// Best effort to cool down re-establishment process. We don't have a
 | |
| 			// nice way to understand whether we really need to wait here - so relying
 | |
| 			// on duration time of running a stream.
 | |
| 			if time.Since(startTime) < streamDurationThreshold {
 | |
| 				if delay < maxDelay {
 | |
| 					// Due to not calling getDelay after we have delay larger than maxDelay
 | |
| 					// we avoid possible float overflow errors while calculating delay duration
 | |
| 					// based on numFastErrors.
 | |
| 					delay = getDelay(numFastErrors)
 | |
| 				}
 | |
| 				numFastErrors++
 | |
| 			} else {
 | |
| 				// Assuming that stream successfully started.
 | |
| 				delay = 0
 | |
| 				numFastErrors = 0
 | |
| 			}
 | |
| 			select {
 | |
| 			case <-ctx.Done():
 | |
| 				return
 | |
| 			case <-time.After(delay):
 | |
| 			}
 | |
| 			startTime = time.Now()
 | |
| 
 | |
| 			// Resolve new plugin context as it could be modified since last call.
 | |
| 			// We are using the same user here which initiated stream originally.
 | |
| 			var datasourceUID string
 | |
| 			if pluginCtx.DataSourceInstanceSettings != nil {
 | |
| 				datasourceUID = pluginCtx.DataSourceInstanceSettings.UID
 | |
| 			}
 | |
| 			newPluginCtx, err := s.pluginContextGetter.GetPluginContext(ctx, sr.user, pluginCtx.PluginID, datasourceUID, false)
 | |
| 			if err != nil {
 | |
| 				if errors.Is(err, plugins.ErrPluginNotRegistered) {
 | |
| 					logger.Info("No plugin context found, stopping stream", "path", sr.Path)
 | |
| 					return
 | |
| 				}
 | |
| 				logger.Error("Error getting plugin context", "path", sr.Path, "error", err)
 | |
| 				isReconnect = true
 | |
| 				continue
 | |
| 			}
 | |
| 			pluginCtx = newPluginCtx
 | |
| 		}
 | |
| 
 | |
| 		err := sr.StreamRunner.RunStream(
 | |
| 			ctx,
 | |
| 			&backend.RunStreamRequest{
 | |
| 				PluginContext: pluginCtx,
 | |
| 				Path:          sr.Path,
 | |
| 				Data:          sr.Data,
 | |
| 			},
 | |
| 			backend.NewStreamSender(&packetSender{channelLocalPublisher: s.channelSender, channel: sr.Channel}),
 | |
| 		)
 | |
| 		if err != nil {
 | |
| 			if errors.Is(ctx.Err(), context.Canceled) {
 | |
| 				logger.Debug("Stream cleanly finished", "path", sr.Path)
 | |
| 				return
 | |
| 			}
 | |
| 			logger.Error("Error running stream, re-establishing", "path", sr.Path, "error", err, "wait", delay)
 | |
| 			isReconnect = true
 | |
| 			continue
 | |
| 		}
 | |
| 		logger.Debug("Stream finished without error, stopping it", "path", sr.Path)
 | |
| 		return
 | |
| 	}
 | |
| }
 | |
| 
 | |
| var errClosed = errors.New("stream manager closed")
 | |
| 
 | |
| type streamContext struct {
 | |
| 	CloseCh       chan struct{}
 | |
| 	cancelFn      func()
 | |
| 	streamRequest streamRequest
 | |
| }
 | |
| 
 | |
| func (s *Manager) registerStream(ctx context.Context, sr submitRequest) {
 | |
| 	s.mu.Lock()
 | |
| 	if streamCtx, ok := s.streams[sr.streamRequest.Channel]; ok {
 | |
| 		s.mu.Unlock()
 | |
| 		sr.responseCh <- submitResponse{Result: submitResult{StreamExists: true, CloseNotify: streamCtx.CloseCh}}
 | |
| 		return
 | |
| 	}
 | |
| 	ctx, cancel := context.WithCancel(ctx)
 | |
| 	defer cancel()
 | |
| 	closeCh := make(chan struct{})
 | |
| 	s.streams[sr.streamRequest.Channel] = streamContext{
 | |
| 		CloseCh:       closeCh,
 | |
| 		cancelFn:      cancel,
 | |
| 		streamRequest: sr.streamRequest,
 | |
| 	}
 | |
| 	if sr.streamRequest.PluginContext.DataSourceInstanceSettings != nil {
 | |
| 		dsUID := sr.streamRequest.PluginContext.DataSourceInstanceSettings.UID
 | |
| 		dsKey := datasourceKey(sr.streamRequest.PluginContext.OrgID, dsUID)
 | |
| 		if _, ok := s.datasourceStreams[dsKey]; !ok {
 | |
| 			s.datasourceStreams[dsKey] = map[string]struct{}{}
 | |
| 		}
 | |
| 		s.datasourceStreams[dsKey][sr.streamRequest.Channel] = struct{}{}
 | |
| 	}
 | |
| 	s.mu.Unlock()
 | |
| 	sr.responseCh <- submitResponse{Result: submitResult{StreamExists: false, CloseNotify: closeCh}}
 | |
| 	go s.watchStream(ctx, cancel, sr.streamRequest)
 | |
| 	s.runStream(ctx, cancel, sr.streamRequest)
 | |
| }
 | |
| 
 | |
| // Run Manager till context canceled.
 | |
| func (s *Manager) Run(ctx context.Context) error {
 | |
| 	s.baseCtx = ctx
 | |
| 	for {
 | |
| 		select {
 | |
| 		case sr := <-s.registerCh:
 | |
| 			go s.registerStream(ctx, sr)
 | |
| 		case <-ctx.Done():
 | |
| 			close(s.closedCh)
 | |
| 			return ctx.Err()
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| type streamRequest struct {
 | |
| 	Channel       string
 | |
| 	Path          string
 | |
| 	user          identity.Requester
 | |
| 	PluginContext backend.PluginContext
 | |
| 	StreamRunner  StreamRunner
 | |
| 	Data          []byte
 | |
| }
 | |
| 
 | |
| type submitRequest struct {
 | |
| 	responseCh    chan submitResponse
 | |
| 	streamRequest streamRequest
 | |
| }
 | |
| 
 | |
| type submitResult struct {
 | |
| 	// StreamExists tells whether stream have been already opened.
 | |
| 	StreamExists bool
 | |
| 	// CloseNotify will be closed as soon as stream cleanly exited.
 | |
| 	CloseNotify chan struct{}
 | |
| }
 | |
| 
 | |
| type submitResponse struct {
 | |
| 	Error  error
 | |
| 	Result submitResult
 | |
| }
 | |
| 
 | |
| var errDatasourceNotFound = errors.New("datasource not found")
 | |
| 
 | |
| // SubmitStream submits stream handler in Manager to manage.
 | |
| // The stream will be opened and kept till channel has active subscribers.
 | |
| func (s *Manager) SubmitStream(ctx context.Context, user identity.Requester, channel string, path string, data []byte, pCtx backend.PluginContext, streamRunner StreamRunner, isResubmit bool) (*submitResult, error) {
 | |
| 	if isResubmit {
 | |
| 		// Resolve new plugin context as it could be modified since last call.
 | |
| 		var datasourceUID string
 | |
| 		if pCtx.DataSourceInstanceSettings != nil {
 | |
| 			datasourceUID = pCtx.DataSourceInstanceSettings.UID
 | |
| 		}
 | |
| 		newPluginCtx, err := s.pluginContextGetter.GetPluginContext(ctx, user, pCtx.PluginID, datasourceUID, false)
 | |
| 		if err != nil {
 | |
| 			if errors.Is(err, plugins.ErrPluginNotRegistered) {
 | |
| 				return nil, errDatasourceNotFound
 | |
| 			}
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		pCtx = newPluginCtx
 | |
| 	}
 | |
| 
 | |
| 	req := submitRequest{
 | |
| 		responseCh: make(chan submitResponse, 1),
 | |
| 		streamRequest: streamRequest{
 | |
| 			user:          user,
 | |
| 			Channel:       channel,
 | |
| 			Path:          path,
 | |
| 			PluginContext: pCtx,
 | |
| 			StreamRunner:  streamRunner,
 | |
| 			Data:          data,
 | |
| 		},
 | |
| 	}
 | |
| 
 | |
| 	// Send submit request.
 | |
| 	select {
 | |
| 	case s.registerCh <- req:
 | |
| 	case <-s.closedCh:
 | |
| 		close(s.registerCh)
 | |
| 		return nil, errClosed
 | |
| 	case <-ctx.Done():
 | |
| 		return nil, ctx.Err()
 | |
| 	}
 | |
| 
 | |
| 	// Wait for submit response.
 | |
| 	select {
 | |
| 	case resp := <-req.responseCh:
 | |
| 		if resp.Error != nil {
 | |
| 			return nil, resp.Error
 | |
| 		}
 | |
| 		return &resp.Result, nil
 | |
| 	case <-s.closedCh:
 | |
| 		return nil, errClosed
 | |
| 	case <-ctx.Done():
 | |
| 		return nil, ctx.Err()
 | |
| 	}
 | |
| }
 |