Live: support streaming results out-of-the-box (#32821)

This commit is contained in:
Ryan McKinley 2021-04-09 12:17:22 -07:00 committed by GitHub
parent 2d7e980da7
commit b96e45299d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 179 additions and 242 deletions

2
go.mod
View File

@ -45,7 +45,7 @@ require (
github.com/grafana/grafana-aws-sdk v0.4.0 github.com/grafana/grafana-aws-sdk v0.4.0
github.com/grafana/grafana-live-sdk v0.0.4 github.com/grafana/grafana-live-sdk v0.0.4
github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4 github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4
github.com/grafana/grafana-plugin-sdk-go v0.91.0 github.com/grafana/grafana-plugin-sdk-go v0.92.0
github.com/grafana/loki v1.6.2-0.20201026154740-6978ee5d7387 github.com/grafana/loki v1.6.2-0.20201026154740-6978ee5d7387
github.com/grpc-ecosystem/go-grpc-middleware v1.2.2 github.com/grpc-ecosystem/go-grpc-middleware v1.2.2
github.com/hashicorp/go-hclog v0.15.0 github.com/hashicorp/go-hclog v0.15.0

14
go.sum
View File

@ -816,16 +816,6 @@ github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0U
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gosimple/slug v1.9.0 h1:r5vDcYrFz9BmfIAMC829un9hq7hKM4cHUrsv36LbEqs= github.com/gosimple/slug v1.9.0 h1:r5vDcYrFz9BmfIAMC829un9hq7hKM4cHUrsv36LbEqs=
github.com/gosimple/slug v1.9.0/go.mod h1:AMZ+sOVe65uByN3kgEyf9WEBKBCSS+dJjMX9x4vDJbg= github.com/gosimple/slug v1.9.0/go.mod h1:AMZ+sOVe65uByN3kgEyf9WEBKBCSS+dJjMX9x4vDJbg=
github.com/grafana/alerting-api v0.0.0-20210331135037-3294563b51bb h1:Hj25Whc/TRv0hSLm5VN0FJ5R4yZ6M4ycRcBgu7bsEAc=
github.com/grafana/alerting-api v0.0.0-20210331135037-3294563b51bb/go.mod h1:5IppnPguSHcCbVLGCVzVjBvuQZNbYgVJ4KyXXjhCyWY=
github.com/grafana/alerting-api v0.0.0-20210405171311-97906879c771 h1:CTmKHUu2n0O9fPTSXb+s5FO8Em9Atw57Z7mvw7lt6IM=
github.com/grafana/alerting-api v0.0.0-20210405171311-97906879c771/go.mod h1:5IppnPguSHcCbVLGCVzVjBvuQZNbYgVJ4KyXXjhCyWY=
github.com/grafana/alerting-api v0.0.0-20210407150830-64bd267999d1 h1:pbG8BsRHezUvUjMxwq+uZsx1ZMEQsfSj26KSd/H3A9g=
github.com/grafana/alerting-api v0.0.0-20210407150830-64bd267999d1/go.mod h1:Ce2PwraBlFMa+P0ArBzubfB/BXZV35mfYWQjM8C/BSE=
github.com/grafana/alerting-api v0.0.0-20210408130544-2969b61275a5 h1:r33Ruhf3TNvT4Y+acR8R0zBtNULnThnJRlAje2AFt6c=
github.com/grafana/alerting-api v0.0.0-20210408130544-2969b61275a5/go.mod h1:Ce2PwraBlFMa+P0ArBzubfB/BXZV35mfYWQjM8C/BSE=
github.com/grafana/alerting-api v0.0.0-20210408135630-bc10f737ceff h1:3l/P+TIOXJy1zX/kZB6dFAMPQrpwpiN3cLMVr8B8u4Q=
github.com/grafana/alerting-api v0.0.0-20210408135630-bc10f737ceff/go.mod h1:Ce2PwraBlFMa+P0ArBzubfB/BXZV35mfYWQjM8C/BSE=
github.com/grafana/alerting-api v0.0.0-20210409134845-c36ac1eae41b h1:QG52Et3EVCxPoYZifm91bRPVknccfjQURcpi7zXVut8= github.com/grafana/alerting-api v0.0.0-20210409134845-c36ac1eae41b h1:QG52Et3EVCxPoYZifm91bRPVknccfjQURcpi7zXVut8=
github.com/grafana/alerting-api v0.0.0-20210409134845-c36ac1eae41b/go.mod h1:Ce2PwraBlFMa+P0ArBzubfB/BXZV35mfYWQjM8C/BSE= github.com/grafana/alerting-api v0.0.0-20210409134845-c36ac1eae41b/go.mod h1:Ce2PwraBlFMa+P0ArBzubfB/BXZV35mfYWQjM8C/BSE=
github.com/grafana/go-mssqldb v0.0.0-20210326084033-d0ce3c521036 h1:GplhUk6Xes5JIhUUrggPcPBhOn+eT8+WsHiebvq7GgA= github.com/grafana/go-mssqldb v0.0.0-20210326084033-d0ce3c521036 h1:GplhUk6Xes5JIhUUrggPcPBhOn+eT8+WsHiebvq7GgA=
@ -840,8 +830,9 @@ github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4 h1:SP
github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4/go.mod h1:nc0XxBzjeGcrMltCDw269LoWF9S8ibhgxolCdA1R8To= github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4/go.mod h1:nc0XxBzjeGcrMltCDw269LoWF9S8ibhgxolCdA1R8To=
github.com/grafana/grafana-plugin-sdk-go v0.79.0/go.mod h1:NvxLzGkVhnoBKwzkst6CFfpMFKwAdIUZ1q8ssuLeF60= github.com/grafana/grafana-plugin-sdk-go v0.79.0/go.mod h1:NvxLzGkVhnoBKwzkst6CFfpMFKwAdIUZ1q8ssuLeF60=
github.com/grafana/grafana-plugin-sdk-go v0.88.0/go.mod h1:PTALh0lz+Y7k0+OMczAABTpeocL63aw6FVOBptp5GVo= github.com/grafana/grafana-plugin-sdk-go v0.88.0/go.mod h1:PTALh0lz+Y7k0+OMczAABTpeocL63aw6FVOBptp5GVo=
github.com/grafana/grafana-plugin-sdk-go v0.91.0 h1:kiPS3NqR+KOvHrc32EkX7D40JON3+GYZ6Nm2WOtCElQ=
github.com/grafana/grafana-plugin-sdk-go v0.91.0/go.mod h1:Ot3k7nY7P6DXmUsDgKvNB7oG1v7PRyTdmnYVoS554bU= github.com/grafana/grafana-plugin-sdk-go v0.91.0/go.mod h1:Ot3k7nY7P6DXmUsDgKvNB7oG1v7PRyTdmnYVoS554bU=
github.com/grafana/grafana-plugin-sdk-go v0.92.0 h1:Wim+Ey7BaA0BO7+wBHeTPrGotKPRznKBXYArSAJL3W8=
github.com/grafana/grafana-plugin-sdk-go v0.92.0/go.mod h1:UwW5HV5HUzRKCieDfC4J31H4PBQngn2wXjJXJmn2zjw=
github.com/grafana/loki v1.6.2-0.20201026154740-6978ee5d7387 h1:iwcM8lkYJ3EhytGLJ2BvRSwutb0QWoI7EWbYv3yJRsY= github.com/grafana/loki v1.6.2-0.20201026154740-6978ee5d7387 h1:iwcM8lkYJ3EhytGLJ2BvRSwutb0QWoI7EWbYv3yJRsY=
github.com/grafana/loki v1.6.2-0.20201026154740-6978ee5d7387/go.mod h1:jHA1OHnPsuj3LLgMXmFopsKDt4ARHHUhrmT3JrGf71g= github.com/grafana/loki v1.6.2-0.20201026154740-6978ee5d7387/go.mod h1:jHA1OHnPsuj3LLgMXmFopsKDt4ARHHUhrmT3JrGf71g=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
@ -1051,7 +1042,6 @@ github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E
github.com/jung-kurt/gofpdf v1.16.2 h1:jgbatWHfRlPYiK85qgevsZTHviWXKwB1TTiKdz5PtRc= github.com/jung-kurt/gofpdf v1.16.2 h1:jgbatWHfRlPYiK85qgevsZTHviWXKwB1TTiKdz5PtRc=
github.com/jung-kurt/gofpdf v1.16.2/go.mod h1:1hl7y57EsiPAkLbOwzpzqgx1A30nQCk/YmFV8S2vmK0= github.com/jung-kurt/gofpdf v1.16.2/go.mod h1:1hl7y57EsiPAkLbOwzpzqgx1A30nQCk/YmFV8S2vmK0=
github.com/jwilder/encoding v0.0.0-20170811194829-b4e1701a28ef/go.mod h1:Ct9fl0F6iIOGgxJ5npU/IUOhOhqlVrGjyIZc8/MagT0= github.com/jwilder/encoding v0.0.0-20170811194829-b4e1701a28ef/go.mod h1:Ct9fl0F6iIOGgxJ5npU/IUOhOhqlVrGjyIZc8/MagT0=
github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88 h1:uC1QfSlInpQF+M0ao65imhwqKnz3Q2z/d8PWZRMQvDM=
github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88/go.mod h1:3w7q1U84EfirKl04SVQ/s7nPm1ZPhiXd34z40TNz36k= github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88/go.mod h1:3w7q1U84EfirKl04SVQ/s7nPm1ZPhiXd34z40TNz36k=
github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213/go.mod h1:vNUNkEQ1e29fT/6vq2aBdFsgNPmy8qMdSay1npru+Sw= github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213/go.mod h1:vNUNkEQ1e29fT/6vq2aBdFsgNPmy8qMdSay1npru+Sw=
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0/go.mod h1:1NbS8ALrpOvjt0rHPNLyCIeMtbizbir8U//inJ+zuB8= github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0/go.mod h1:1NbS8ALrpOvjt0rHPNLyCIeMtbizbir8U//inJ+zuB8=

View File

@ -42,6 +42,9 @@ export interface QueryResultMeta {
/** Currently used to show results in Explore only in preferred visualisation option */ /** Currently used to show results in Explore only in preferred visualisation option */
preferredVisualisationType?: PreferredVisualisationType; preferredVisualisationType?: PreferredVisualisationType;
/** The path for live stream updates for this frame */
channel?: string;
/** /**
* Optionally identify which topic the frame should be assigned to. * Optionally identify which topic the frame should be assigned to.
* A value specified in the response will override what the request asked for. * A value specified in the response will override what the request asked for.

View File

@ -6,7 +6,7 @@
export * from './services'; export * from './services';
export * from './config'; export * from './config';
export * from './types'; export * from './types';
export * from './measurement'; export * from './utils/liveQuery';
export { loadPluginCss, SystemJS, PluginCssOptions } from './utils/plugin'; export { loadPluginCss, SystemJS, PluginCssOptions } from './utils/plugin';
export { reportMetaAnalytics } from './utils/analytics'; export { reportMetaAnalytics } from './utils/analytics';
export { logInfo, logDebug, logWarning, logError } from './utils/logging'; export { logInfo, logDebug, logWarning, logError } from './utils/logging';

View File

@ -1 +0,0 @@
export * from './query';

View File

@ -1,6 +1,13 @@
import { BackendSrv, BackendSrvRequest } from 'src/services'; import { BackendSrv, BackendSrvRequest } from 'src/services';
import { DataSourceWithBackend } from './DataSourceWithBackend'; import { DataSourceWithBackend, toStreamingDataResponse } from './DataSourceWithBackend';
import { DataSourceJsonData, DataQuery, DataSourceInstanceSettings, DataQueryRequest } from '@grafana/data'; import {
DataSourceJsonData,
DataQuery,
DataSourceInstanceSettings,
DataQueryRequest,
DataQueryResponseData,
MutableDataFrame,
} from '@grafana/data';
import { of } from 'rxjs'; import { of } from 'rxjs';
class MyDataSource extends DataSourceWithBackend<DataQuery, DataSourceJsonData> { class MyDataSource extends DataSourceWithBackend<DataQuery, DataSourceJsonData> {
@ -73,4 +80,26 @@ describe('DataSourceWithBackend', () => {
} }
`); `);
}); });
test('it converts results with channels to streaming queries', () => {
const request: DataQueryRequest = {
intervalMs: 100,
} as DataQueryRequest;
const rsp: DataQueryResponseData = {
data: [],
};
// Simple empty query
let obs = toStreamingDataResponse(request, rsp);
expect(obs).toBeDefined();
let frame = new MutableDataFrame();
frame.meta = {
channel: 'a/b/c',
};
rsp.data = [frame];
obs = toStreamingDataResponse(request, rsp);
expect(obs).toBeDefined();
});
}); });

View File

@ -7,11 +7,15 @@ import {
DataSourceJsonData, DataSourceJsonData,
ScopedVars, ScopedVars,
makeClassES5Compatible, makeClassES5Compatible,
DataFrame,
parseLiveChannelAddress,
StreamingFrameOptions,
} from '@grafana/data'; } from '@grafana/data';
import { Observable, of } from 'rxjs'; import { merge, Observable, of } from 'rxjs';
import { map, catchError } from 'rxjs/operators'; import { catchError, switchMap } from 'rxjs/operators';
import { getBackendSrv, getDataSourceSrv } from '../services'; import { getBackendSrv, getDataSourceSrv } from '../services';
import { BackendDataSourceResponse, toDataQueryResponse } from './queryResponse'; import { BackendDataSourceResponse, toDataQueryResponse } from './queryResponse';
import { getLiveDataStream } from './liveQuery';
const ExpressionDatasourceID = '__expr__'; const ExpressionDatasourceID = '__expr__';
@ -132,8 +136,13 @@ class DataSourceWithBackend<
requestId, requestId,
}) })
.pipe( .pipe(
map((rsp) => { switchMap((raw) => {
return toDataQueryResponse(rsp, queries as DataQuery[]); const rsp = toDataQueryResponse(raw, queries as DataQuery[]);
// Check if any response should subscribe to a live stream
if (rsp.data?.length && rsp.data.find((f: DataFrame) => f.meta?.channel)) {
return toStreamingDataResponse(request, rsp);
}
return of(rsp);
}), }),
catchError((err) => { catchError((err) => {
return of(toDataQueryResponse(err)); return of(toDataQueryResponse(err));
@ -209,6 +218,44 @@ class DataSourceWithBackend<
} }
} }
export function toStreamingDataResponse(
request: DataQueryRequest,
rsp: DataQueryResponse
): Observable<DataQueryResponse> {
const buffer: StreamingFrameOptions = {
maxLength: request.maxDataPoints ?? 500,
};
// For recent queries, clamp to the current time range
if (request.rangeRaw?.to === 'now') {
buffer.maxDelta = request.range.to.valueOf() - request.range.from.valueOf();
}
const staticdata: DataFrame[] = [];
const streams: Array<Observable<DataQueryResponse>> = [];
for (const frame of rsp.data) {
const addr = parseLiveChannelAddress(frame.meta?.channel);
if (addr) {
streams.push(
getLiveDataStream({
addr,
buffer,
frame: frame as DataFrame,
})
);
} else {
staticdata.push(frame);
}
}
if (staticdata.length) {
streams.push(of({ ...rsp, data: staticdata }));
}
if (streams.length === 1) {
return streams[0]; // avoid merge wrapper
}
return merge(...streams);
}
//@ts-ignore //@ts-ignore
DataSourceWithBackend = makeClassES5Compatible(DataSourceWithBackend); DataSourceWithBackend = makeClassES5Compatible(DataSourceWithBackend);

View File

@ -1,6 +1,7 @@
import { import {
DataFrame, DataFrame,
DataFrameJSON, DataFrameJSON,
dataFrameToJSON,
DataQueryResponse, DataQueryResponse,
isLiveChannelMessageEvent, isLiveChannelMessageEvent,
isLiveChannelStatusEvent, isLiveChannelStatusEvent,
@ -15,7 +16,7 @@ import {
import { getGrafanaLiveSrv } from '../services/live'; import { getGrafanaLiveSrv } from '../services/live';
import { Observable, of } from 'rxjs'; import { Observable, of } from 'rxjs';
import { toDataQueryError } from '../utils/queryResponse'; import { toDataQueryError } from './queryResponse';
import { perf } from './perf'; import { perf } from './perf';
export interface LiveDataFilter { export interface LiveDataFilter {
@ -28,6 +29,7 @@ export interface LiveDataFilter {
export interface LiveDataStreamOptions { export interface LiveDataStreamOptions {
key?: string; key?: string;
addr: LiveChannelAddress; addr: LiveChannelAddress;
frame?: DataFrame; // initial results
buffer?: StreamingFrameOptions; buffer?: StreamingFrameOptions;
filter?: LiveDataFilter; filter?: LiveDataFilter;
} }
@ -39,8 +41,13 @@ export interface LiveDataStreamOptions {
*/ */
export function getLiveDataStream(options: LiveDataStreamOptions): Observable<DataQueryResponse> { export function getLiveDataStream(options: LiveDataStreamOptions): Observable<DataQueryResponse> {
if (!isValidLiveChannelAddress(options.addr)) { if (!isValidLiveChannelAddress(options.addr)) {
return of({ error: toDataQueryError('invalid address'), data: [] }); return of({
error: toDataQueryError(`invalid channel address: ${JSON.stringify(options.addr)}`),
state: LoadingState.Error,
data: options.frame ? [options.frame] : [],
});
} }
const live = getGrafanaLiveSrv(); const live = getGrafanaLiveSrv();
if (!live) { if (!live) {
return of({ error: toDataQueryError('grafana live is not initalized'), data: [] }); return of({ error: toDataQueryError('grafana live is not initalized'), data: [] });
@ -50,8 +57,16 @@ export function getLiveDataStream(options: LiveDataStreamOptions): Observable<Da
let data: StreamingDataFrame | undefined = undefined; let data: StreamingDataFrame | undefined = undefined;
let filtered: DataFrame | undefined = undefined; let filtered: DataFrame | undefined = undefined;
let state = LoadingState.Loading; let state = LoadingState.Loading;
const { key, filter } = options; let { key } = options;
let last = perf.last; let last = perf.last;
if (options.frame) {
const msg = dataFrameToJSON(options.frame);
data = new StreamingDataFrame(msg, options.buffer);
state = LoadingState.Streaming;
}
if (!key) {
key = `xstr/${streamCounter++}`;
}
const process = (msg: DataFrameJSON) => { const process = (msg: DataFrameJSON) => {
if (!data) { if (!data) {
@ -61,14 +76,17 @@ export function getLiveDataStream(options: LiveDataStreamOptions): Observable<Da
} }
state = LoadingState.Streaming; state = LoadingState.Streaming;
// Select the fields we are actually looking at // Filter out fields
if (!filtered || msg.schema) { if (!filtered || msg.schema) {
filtered = data; filtered = data;
if (filter?.fields?.length) { if (options.filter) {
filtered = { const { fields } = options.filter;
...data, if (fields?.length) {
fields: data.fields.filter((f) => filter.fields!.includes(f.name)), filtered = {
}; ...data,
fields: data.fields.filter((f) => fields.includes(f.name)),
};
}
} }
} }
@ -85,15 +103,17 @@ export function getLiveDataStream(options: LiveDataStreamOptions): Observable<Da
.getStream() .getStream()
.subscribe({ .subscribe({
error: (err: any) => { error: (err: any) => {
console.log('LiveQuery [error]', { err }, options.addr);
state = LoadingState.Error; state = LoadingState.Error;
subscriber.next({ state, data: [data], key }); subscriber.next({ state, data: [data], key, error: toDataQueryError(err) });
sub.unsubscribe(); // close after error sub.unsubscribe(); // close after error
}, },
complete: () => { complete: () => {
console.log('LiveQuery [complete]', options.addr);
if (state !== LoadingState.Error) { if (state !== LoadingState.Error) {
state = LoadingState.Done; state = LoadingState.Done;
} }
subscriber.next({ state, data: [data], key }); // or track errors? subscriber.next({ state, data: [data], key });
subscriber.complete(); subscriber.complete();
sub.unsubscribe(); sub.unsubscribe();
}, },
@ -103,14 +123,19 @@ export function getLiveDataStream(options: LiveDataStreamOptions): Observable<Da
return; return;
} }
if (isLiveChannelStatusEvent(evt)) { if (isLiveChannelStatusEvent(evt)) {
if ( if (evt.error) {
let error = toDataQueryError(evt.error);
error.message = `Streaming channel error: ${error.message}`;
state = LoadingState.Error;
subscriber.next({ state, data: [data], key, error });
return;
} else if (
evt.state === LiveChannelConnectionState.Connected || evt.state === LiveChannelConnectionState.Connected ||
evt.state === LiveChannelConnectionState.Pending evt.state === LiveChannelConnectionState.Pending
) { ) {
if (evt.message) { if (evt.message) {
process(evt.message); process(evt.message);
} }
return;
} }
console.log('ignore state', evt); console.log('ignore state', evt);
} }
@ -122,3 +147,6 @@ export function getLiveDataStream(options: LiveDataStreamOptions): Observable<Da
}; };
}); });
} }
// incremet the stream ids
let streamCounter = 10;

View File

@ -1,60 +0,0 @@
package live
import (
"strings"
)
// Channel is the channel ID split by parts.
type Channel struct {
// Scope is one of available channel scopes:
// like ScopeGrafana, ScopePlugin, ScopeDatasource, ScopeStream.
Scope string `json:"scope,omitempty"`
// Namespace meaning depends on the scope.
// * when ScopeGrafana, namespace is a "feature"
// * when ScopePlugin, namespace is the plugin name
// * when ScopeDatasource, namespace is the datasource uid
// * when ScopeStream, namespace is the stream ID.
Namespace string `json:"namespace,omitempty"`
// Within each namespace, the handler can process the path as needed.
Path string `json:"path,omitempty"`
}
// ParseChannel parses the parts from a channel ID:
// ${scope} / ${namespace} / ${path}.
func ParseChannel(chID string) Channel {
addr := Channel{}
parts := strings.SplitN(chID, "/", 3)
length := len(parts)
if length > 0 {
addr.Scope = parts[0]
}
if length > 1 {
addr.Namespace = parts[1]
}
if length > 2 {
addr.Path = parts[2]
}
return addr
}
func (c Channel) String() string {
ch := c.Scope
if c.Namespace != "" {
ch += "/" + c.Namespace
}
if c.Path != "" {
ch += "/" + c.Path
}
return ch
}
// IsValid checks if all parts of the address are valid.
func (c *Channel) IsValid() bool {
if c.Scope == ScopePush {
// Push scope channels supposed to be like push/{$stream_id}.
return c.Namespace != "" && c.Path == ""
}
return c.Scope != "" && c.Namespace != "" && c.Path != ""
}

View File

@ -1,110 +0,0 @@
package live
import (
"testing"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"
)
func TestParseChannel(t *testing.T) {
addr := ParseChannel("aaa/bbb/ccc/ddd")
require.True(t, addr.IsValid())
ex := Channel{
Scope: "aaa",
Namespace: "bbb",
Path: "ccc/ddd",
}
if diff := cmp.Diff(addr, ex); diff != "" {
t.Fatalf("Result mismatch (-want +got):\n%s", diff)
}
}
func TestParseChannel_IsValid(t *testing.T) {
tests := []struct {
name string
id string
isValid bool
}{
{
name: "valid",
id: "stream/cpu/test",
isValid: true,
},
{
name: "valid_long_path",
id: "stream/cpu/test/other",
isValid: true,
},
{
name: "invalid_no_path",
id: "grafana/bbb",
isValid: false,
},
{
name: "invalid_only_scope",
id: "grafana",
isValid: false,
},
{
name: "push_scope_no_path_valid",
id: "push/telegraf",
isValid: true,
},
{
name: "push_scope_with_path_invalid",
id: "push/telegraf/test",
isValid: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := ParseChannel(tt.id); got.IsValid() != tt.isValid {
t.Errorf("unexpected isValid result for %s", tt.id)
}
})
}
}
func TestChannel_String(t *testing.T) {
type fields struct {
Scope string
Namespace string
Path string
}
tests := []struct {
name string
fields fields
want string
}{
{
"with_all_parts",
fields{Scope: ScopeStream, Namespace: "telegraf", Path: "test"},
"stream/telegraf/test",
},
{
"with_scope_and_namespace",
fields{Scope: ScopeStream, Namespace: "telegraf"},
"stream/telegraf",
},
{
"with_scope_only",
fields{Scope: ScopeStream},
"stream",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := Channel{
Scope: tt.fields.Scope,
Namespace: tt.fields.Namespace,
Path: tt.fields.Path,
}.String()
if got != tt.want {
t.Errorf("String() = %v, want %v", got, tt.want)
}
})
}
}

View File

@ -100,7 +100,7 @@ func (r *PluginPathRunner) OnSubscribe(ctx context.Context, user *models.SignedI
Path: r.path, Path: r.path,
}) })
if err != nil { if err != nil {
logger.Error("Plugin CanSubscribeToStream call error", "error", err, "path", r.path) logger.Error("Plugin OnSubscribe call error", "error", err, "path", r.path)
return models.SubscribeReply{}, 0, err return models.SubscribeReply{}, 0, err
} }
if resp.Status != backend.SubscribeStreamStatusOK { if resp.Status != backend.SubscribeStreamStatusOK {
@ -144,7 +144,7 @@ func (r *PluginPathRunner) OnPublish(ctx context.Context, user *models.SignedInU
Data: e.Data, Data: e.Data,
}) })
if err != nil { if err != nil {
logger.Error("Plugin CanSubscribeToStream call error", "error", err, "path", r.path) logger.Error("Plugin OnPublish call error", "error", err, "path", r.path)
return models.PublishReply{}, 0, err return models.PublishReply{}, 0, err
} }
if resp.Status != backend.PublishStreamStatusOK { if resp.Status != backend.PublishStreamStatusOK {

View File

@ -4,12 +4,14 @@ import (
"context" "context"
"fmt" "fmt"
"net/http" "net/http"
"strconv"
"sync" "sync"
"time" "time"
"github.com/centrifugal/centrifuge" "github.com/centrifugal/centrifuge"
"github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana-plugin-sdk-go/live"
"github.com/grafana/grafana/pkg/api/dtos" "github.com/grafana/grafana/pkg/api/dtos"
"github.com/grafana/grafana/pkg/api/response" "github.com/grafana/grafana/pkg/api/response"
@ -306,11 +308,11 @@ func publishStatusToHTTPError(status backend.PublishStreamStatus) (int, string)
} }
// GetChannelHandler gives thread-safe access to the channel. // GetChannelHandler gives thread-safe access to the channel.
func (g *GrafanaLive) GetChannelHandler(user *models.SignedInUser, channel string) (models.ChannelHandler, Channel, error) { func (g *GrafanaLive) GetChannelHandler(user *models.SignedInUser, channel string) (models.ChannelHandler, live.Channel, error) {
// Parse the identifier ${scope}/${namespace}/${path} // Parse the identifier ${scope}/${namespace}/${path}
addr := ParseChannel(channel) addr := live.ParseChannel(channel)
if !addr.IsValid() { if !addr.IsValid() {
return nil, Channel{}, fmt.Errorf("invalid channel: %q", channel) return nil, live.Channel{}, fmt.Errorf("invalid channel: %q", channel)
} }
g.channelsMu.RLock() g.channelsMu.RLock()
@ -349,15 +351,15 @@ func (g *GrafanaLive) GetChannelHandler(user *models.SignedInUser, channel strin
// It gives thread-safe access to the channel. // It gives thread-safe access to the channel.
func (g *GrafanaLive) GetChannelHandlerFactory(user *models.SignedInUser, scope string, namespace string) (models.ChannelHandlerFactory, error) { func (g *GrafanaLive) GetChannelHandlerFactory(user *models.SignedInUser, scope string, namespace string) (models.ChannelHandlerFactory, error) {
switch scope { switch scope {
case ScopeGrafana: case live.ScopeGrafana:
return g.handleGrafanaScope(user, namespace) return g.handleGrafanaScope(user, namespace)
case ScopePlugin: case live.ScopePlugin:
return g.handlePluginScope(user, namespace) return g.handlePluginScope(user, namespace)
case ScopeDatasource: case live.ScopeDatasource:
return g.handleDatasourceScope(user, namespace) return g.handleDatasourceScope(user, namespace)
case ScopeStream: case live.ScopeStream:
return g.handleStreamScope(user, namespace) return g.handleStreamScope(user, namespace)
case ScopePush: case live.ScopePush:
return g.handlePushScope(user, namespace) return g.handlePushScope(user, namespace)
default: default:
return nil, fmt.Errorf("invalid scope: %q", scope) return nil, fmt.Errorf("invalid scope: %q", scope)
@ -403,7 +405,14 @@ func (g *GrafanaLive) handlePushScope(_ *models.SignedInUser, namespace string)
func (g *GrafanaLive) handleDatasourceScope(user *models.SignedInUser, namespace string) (models.ChannelHandlerFactory, error) { func (g *GrafanaLive) handleDatasourceScope(user *models.SignedInUser, namespace string) (models.ChannelHandlerFactory, error) {
ds, err := g.DatasourceCache.GetDatasourceByUID(namespace, user, false) ds, err := g.DatasourceCache.GetDatasourceByUID(namespace, user, false)
if err != nil { if err != nil {
return nil, fmt.Errorf("error getting datasource: %w", err) // the namespace may be an ID
id, _ := strconv.ParseInt(namespace, 10, 64)
if id > 0 {
ds, err = g.DatasourceCache.GetDatasource(id, user, false)
}
if err != nil {
return nil, fmt.Errorf("error getting datasource: %w", err)
}
} }
streamHandler, err := g.getStreamPlugin(ds.Type) streamHandler, err := g.getStreamPlugin(ds.Type)
if err != nil { if err != nil {
@ -430,7 +439,7 @@ func (g *GrafanaLive) IsEnabled() bool {
} }
func (g *GrafanaLive) HandleHTTPPublish(ctx *models.ReqContext, cmd dtos.LivePublishCmd) response.Response { func (g *GrafanaLive) HandleHTTPPublish(ctx *models.ReqContext, cmd dtos.LivePublishCmd) response.Response {
addr := ParseChannel(cmd.Channel) addr := live.ParseChannel(cmd.Channel)
if !addr.IsValid() { if !addr.IsValid() {
return response.Error(http.StatusBadRequest, "Bad channel address", nil) return response.Error(http.StatusBadRequest, "Bad channel address", nil)
} }

View File

@ -8,6 +8,7 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana-plugin-sdk-go/live"
"github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/util" "github.com/grafana/grafana/pkg/util"
) )
@ -111,7 +112,7 @@ func (s *ManagedStream) Push(path string, frame *data.Frame) error {
} }
// The channel this will be posted into. // The channel this will be posted into.
channel := Channel{Scope: ScopeStream, Namespace: s.id, Path: path}.String() channel := live.Channel{Scope: live.ScopeStream, Namespace: s.id, Path: path}.String()
logger.Debug("Publish data to channel", "channel", channel, "dataLength", len(frameJSON)) logger.Debug("Publish data to channel", "channel", channel, "dataLength", len(frameJSON))
return s.publisher(channel, frameJSON) return s.publisher(channel, frameJSON)
} }

View File

@ -1,14 +0,0 @@
package live
const (
// ScopeGrafana contains builtin features of Grafana Core.
ScopeGrafana = "grafana"
// ScopePlugin passes control to a plugin.
ScopePlugin = "plugin"
// ScopeDatasource passes control to a datasource plugin.
ScopeDatasource = "ds"
// ScopeStream is a managed data frame stream.
ScopeStream = "stream"
// ScopePush allows sending data into managed streams. It does not support subscriptions.
ScopePush = "push"
)

View File

@ -178,6 +178,7 @@ export class CentrifugeLiveChannel<TMessage = any, TPublish = any> implements Li
shutdownWithError(err: string) { shutdownWithError(err: string) {
this.currentStatus.error = err; this.currentStatus.error = err;
this.sendStatus();
this.disconnect(); this.disconnect();
} }
} }

View File

@ -1,7 +1,7 @@
import Centrifuge from 'centrifuge/dist/centrifuge'; import Centrifuge from 'centrifuge/dist/centrifuge';
import { GrafanaLiveSrv, setGrafanaLiveSrv, getGrafanaLiveSrv, config } from '@grafana/runtime'; import { GrafanaLiveSrv, setGrafanaLiveSrv, getGrafanaLiveSrv, config } from '@grafana/runtime';
import { BehaviorSubject } from 'rxjs'; import { BehaviorSubject } from 'rxjs';
import { LiveChannel, LiveChannelScope, LiveChannelAddress } from '@grafana/data'; import { LiveChannel, LiveChannelScope, LiveChannelAddress, LiveChannelConnectionState } from '@grafana/data';
import { CentrifugeLiveChannel, getErrorChannel } from './channel'; import { CentrifugeLiveChannel, getErrorChannel } from './channel';
import { import {
GrafanaLiveScope, GrafanaLiveScope,
@ -104,7 +104,10 @@ export class CentrifugeSrv implements GrafanaLiveSrv {
// Initialize the channel in the background // Initialize the channel in the background
this.initChannel(scope, channel).catch((err) => { this.initChannel(scope, channel).catch((err) => {
channel?.shutdownWithError(err); if (channel) {
channel.currentStatus.state = LiveChannelConnectionState.Invalid;
channel.shutdownWithError(err);
}
this.open.delete(id); this.open.delete(id);
}); });
@ -116,7 +119,7 @@ export class CentrifugeSrv implements GrafanaLiveSrv {
const { addr } = channel; const { addr } = channel;
const support = await scope.getChannelSupport(addr.namespace); const support = await scope.getChannelSupport(addr.namespace);
if (!support) { if (!support) {
throw new Error(channel.addr.namespace + 'does not support streaming'); throw new Error(channel.addr.namespace + ' does not support streaming');
} }
const config = support.getChannelConfig(addr.path); const config = support.getChannelConfig(addr.path);
if (!config) { if (!config) {

View File

@ -73,7 +73,10 @@ export class GrafanaLiveDataSourceScope extends GrafanaLiveScope {
*/ */
async getChannelSupport(namespace: string) { async getChannelSupport(namespace: string) {
const ds = await getDataSourceSrv().get(namespace); const ds = await getDataSourceSrv().get(namespace);
return ds.channelSupport; if (ds.channelSupport) {
return ds.channelSupport;
}
return new LiveMeasurementsSupport(); // default support?
} }
/** /**
@ -119,10 +122,13 @@ export class GrafanaLivePluginScope extends GrafanaLiveScope {
*/ */
async getChannelSupport(namespace: string) { async getChannelSupport(namespace: string) {
const plugin = await loadPlugin(namespace); const plugin = await loadPlugin(namespace);
if (!plugin.channelSupport) { if (!plugin) {
throw new Error('Unknown plugin: ' + namespace); throw new Error('Unknown streaming plugin: ' + namespace);
} }
return plugin.channelSupport; if (plugin.channelSupport) {
return plugin.channelSupport; // explicit
}
throw new Error('Plugin does not support streaming: ' + namespace);
} }
/** /**

View File

@ -21,6 +21,7 @@ export class DatasourceSrv implements DataSourceService {
private datasources: Record<string, DataSourceApi> = {}; private datasources: Record<string, DataSourceApi> = {};
private settingsMapByName: Record<string, DataSourceInstanceSettings> = {}; private settingsMapByName: Record<string, DataSourceInstanceSettings> = {};
private settingsMapByUid: Record<string, DataSourceInstanceSettings> = {}; private settingsMapByUid: Record<string, DataSourceInstanceSettings> = {};
private settingsMapById: Record<string, DataSourceInstanceSettings> = {};
private defaultName = ''; private defaultName = '';
/** @ngInject */ /** @ngInject */
@ -38,6 +39,7 @@ export class DatasourceSrv implements DataSourceService {
for (const dsSettings of Object.values(settingsMapByName)) { for (const dsSettings of Object.values(settingsMapByName)) {
this.settingsMapByUid[dsSettings.uid] = dsSettings; this.settingsMapByUid[dsSettings.uid] = dsSettings;
this.settingsMapById[dsSettings.id] = dsSettings;
} }
} }
@ -115,9 +117,12 @@ export class DatasourceSrv implements DataSourceService {
return Promise.resolve(expressionDatasource); return Promise.resolve(expressionDatasource);
} }
const dsConfig = this.settingsMapByName[name]; let dsConfig = this.settingsMapByName[name];
if (!dsConfig) { if (!dsConfig) {
return Promise.reject({ message: `Datasource named ${name} was not found` }); dsConfig = this.settingsMapById[name];
if (!dsConfig) {
return Promise.reject({ message: `Datasource named ${name} was not found` });
}
} }
try { try {

View File

@ -16,7 +16,7 @@ import {
import { TestDataQuery, StreamingQuery } from './types'; import { TestDataQuery, StreamingQuery } from './types';
import { getRandomLine } from './LogIpsum'; import { getRandomLine } from './LogIpsum';
import { perf } from '@grafana/runtime/src/measurement/perf'; // not exported import { perf } from '@grafana/runtime/src/utils/perf'; // not exported
export const defaultStreamQuery: StreamingQuery = { export const defaultStreamQuery: StreamingQuery = {
type: 'signal', type: 'signal',