diff --git a/pkg/expr/nodes.go b/pkg/expr/nodes.go index 83212715625..300488ae643 100644 --- a/pkg/expr/nodes.go +++ b/pkg/expr/nodes.go @@ -407,23 +407,11 @@ func (dn *DSNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s return mathexp.Results{}, MakeQueryError(dn.refID, dn.datasource.UID, err) } } else { - // transform request from backend.QueryDataRequest to k8s request - k8sReq := &data.QueryDataRequest{ - TimeRange: data.TimeRange{ - From: req.Queries[0].TimeRange.From.Format(time.RFC3339), - To: req.Queries[0].TimeRange.To.Format(time.RFC3339), - }, + k8sReq, err := ConvertBackendRequestToDataRequest(req) + if err != nil { + return mathexp.Results{}, MakeQueryError(dn.refID, dn.datasource.UID, err) } - for _, q := range req.Queries { - var dataQuery data.DataQuery - err := json.Unmarshal(q.JSON, &dataQuery) - if err != nil { - return mathexp.Results{}, MakeQueryError(dn.refID, dn.datasource.UID, err) - } - k8sReq.Queries = append(k8sReq.Queries, dataQuery) - } - var err error // make the query with a mt client resp, err = mtDSClient.QueryData(ctx, *k8sReq) diff --git a/pkg/expr/query_convert.go b/pkg/expr/query_convert.go new file mode 100644 index 00000000000..cb8f6b2e08f --- /dev/null +++ b/pkg/expr/query_convert.go @@ -0,0 +1,51 @@ +package expr + +import ( + "encoding/json" + "strconv" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + data "github.com/grafana/grafana-plugin-sdk-go/experimental/apis/data/v0alpha1" +) + +func convertBackendQueryToDataQuery(q backend.DataQuery) (data.DataQuery, error) { + // we first restore it from the raw json data, + // this should take care of all datasource-specific (for example, specific + // for prometheus or loki) fields + var dataQuery data.DataQuery + err := json.Unmarshal(q.JSON, &dataQuery) + if err != nil { + return data.DataQuery{}, err + } + + // then we override the result with values that are available as fields in backend.DataQuery + dataQuery.RefID = q.RefID + dataQuery.QueryType = q.QueryType + dataQuery.MaxDataPoints = q.MaxDataPoints + dataQuery.IntervalMS = float64(q.Interval.Nanoseconds()) / 1000000.0 + dataQuery.TimeRange = &data.TimeRange{ + From: strconv.FormatInt(q.TimeRange.From.UnixMilli(), 10), + To: strconv.FormatInt(q.TimeRange.To.UnixMilli(), 10), + } + + return dataQuery, nil +} + +func ConvertBackendRequestToDataRequest(req *backend.QueryDataRequest) (*data.QueryDataRequest, error) { + k8sReq := &data.QueryDataRequest{ + // `backend.QueryDataRequest` does not have a concept of a top-level global from/to. + // timeRanges are always inside the queries, + // so we leave them empty here too. + } + + for _, q := range req.Queries { + dataQuery, err := convertBackendQueryToDataQuery(q) + if err != nil { + return nil, err + } + + k8sReq.Queries = append(k8sReq.Queries, dataQuery) + } + + return k8sReq, nil +} diff --git a/pkg/expr/query_convert_test.go b/pkg/expr/query_convert_test.go new file mode 100644 index 00000000000..1278f734a40 --- /dev/null +++ b/pkg/expr/query_convert_test.go @@ -0,0 +1,50 @@ +package expr + +import ( + "testing" + "time" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + data "github.com/grafana/grafana-plugin-sdk-go/experimental/apis/data/v0alpha1" + "github.com/stretchr/testify/require" +) + +func TestConvertBackendRequestToDataRequest(t *testing.T) { + input1 := backend.DataQuery{ + RefID: "A", + QueryType: "large", + MaxDataPoints: 42, + Interval: time.Millisecond * 10, + TimeRange: backend.TimeRange{ + From: time.UnixMilli(1753959290000), + To: time.UnixMilli(1753959390000), + }, + JSON: []byte(`{ "field1": "value1" }`), + } + + result1 := data.DataQuery{ + CommonQueryProperties: data.CommonQueryProperties{ + RefID: "A", + QueryType: "large", + MaxDataPoints: 42, + IntervalMS: 10.0, + TimeRange: &data.TimeRange{ + From: "1753959290000", + To: "1753959390000", + }, + }, + } + result1.Set("field1", "value1") + + req := backend.QueryDataRequest{ + Queries: []backend.DataQuery{input1}, + } + + expected := data.QueryDataRequest{ + Queries: []data.DataQuery{result1}, + } + + result, err := ConvertBackendRequestToDataRequest(&req) + require.NoError(t, err) + require.Equal(t, &expected, result) +} diff --git a/pkg/services/query/query.go b/pkg/services/query/query.go index 73612d41f22..7aaafdea105 100644 --- a/pkg/services/query/query.go +++ b/pkg/services/query/query.go @@ -2,7 +2,6 @@ package query import ( "context" - "encoding/json" "errors" "fmt" "net/http" @@ -14,7 +13,6 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/backend/gtime" "golang.org/x/sync/errgroup" - data "github.com/grafana/grafana-plugin-sdk-go/experimental/apis/data/v0alpha1" "github.com/grafana/grafana/pkg/api/dtos" "github.com/grafana/grafana/pkg/apimachinery/errutil" "github.com/grafana/grafana/pkg/apimachinery/identity" @@ -88,6 +86,7 @@ type ServiceImpl struct { concurrentQueryLimit int mtDatasourceClientBuilder mtdsclient.MTDatasourceClientBuilder headers map[string]string + supportLocalTimeRange bool } // Run ServiceImpl. @@ -105,7 +104,7 @@ func (s *ServiceImpl) QueryData(ctx context.Context, user identity.Requester, sk } } // Parse the request into parsed queries grouped by datasource uid - parsedReq, err := s.parseMetricRequest(ctx, user, skipDSCache, reqDTO) + parsedReq, err := s.parseMetricRequest(ctx, user, skipDSCache, reqDTO, s.supportLocalTimeRange) if err != nil { return nil, err } @@ -224,6 +223,7 @@ func QueryData(ctx context.Context, log log.Logger, dscache datasources.CacheSer mtDatasourceClientBuilder: mtDatasourceClientBuilder, headers: headers, concurrentQueryLimit: 16, // TODO: make it configurable + supportLocalTimeRange: true, } return s.QueryData(ctx, nil, false, reqDTO) } @@ -303,32 +303,32 @@ func (s *ServiceImpl) handleQuerySingleDatasource(ctx context.Context, user iden return s.pluginClient.QueryData(ctx, req) } else { // multi tenant flow // transform request from backend.QueryDataRequest to k8s request - k8sReq := &data.QueryDataRequest{ - TimeRange: data.TimeRange{ - From: req.Queries[0].TimeRange.From.Format(time.RFC3339), - To: req.Queries[0].TimeRange.To.Format(time.RFC3339), - }, - } - for _, q := range req.Queries { - var dataQuery data.DataQuery - err := json.Unmarshal(q.JSON, &dataQuery) - if err != nil { - return nil, err - } - - k8sReq.Queries = append(k8sReq.Queries, dataQuery) + k8sReq, err := expr.ConvertBackendRequestToDataRequest(req) + if err != nil { + return nil, err } return mtDsClient.QueryData(ctx, *k8sReq) } } +func getTimeRange(query *simplejson.Json, globalFrom string, globalTo string) gtime.TimeRange { + from := query.Get("timeRange").Get("from").MustString("") + to := query.Get("timeRange").Get("to").MustString("") + + if (from == "") && (to == "") { + from = globalFrom + to = globalTo + } + + return gtime.NewTimeRange(from, to) +} + // parseRequest parses a request into parsed queries grouped by datasource uid -func (s *ServiceImpl) parseMetricRequest(ctx context.Context, user identity.Requester, skipDSCache bool, reqDTO dtos.MetricRequest) (*parsedRequest, error) { +func (s *ServiceImpl) parseMetricRequest(ctx context.Context, user identity.Requester, skipDSCache bool, reqDTO dtos.MetricRequest, supportLocalTimeRange bool) (*parsedRequest, error) { if len(reqDTO.Queries) == 0 { return nil, ErrNoQueriesFound } - timeRange := gtime.NewTimeRange(reqDTO.From, reqDTO.To) req := &parsedRequest{ hasExpression: false, parsedQueries: make(map[string][]parsedQuery), @@ -357,6 +357,13 @@ func (s *ServiceImpl) parseMetricRequest(ctx context.Context, user identity.Requ req.parsedQueries[ds.UID] = []parsedQuery{} } + var timeRange gtime.TimeRange + if supportLocalTimeRange { + timeRange = getTimeRange(query, reqDTO.From, reqDTO.To) + } else { + timeRange = gtime.NewTimeRange(reqDTO.From, reqDTO.To) + } + modelJSON, err := query.MarshalJSON() if err != nil { return nil, err diff --git a/pkg/services/query/query_test.go b/pkg/services/query/query_test.go index 66c4ff17f85..7187312da50 100644 --- a/pkg/services/query/query_test.go +++ b/pkg/services/query/query_test.go @@ -69,7 +69,7 @@ func TestIntegrationParseMetricRequest(t *testing.T) { "type": "postgres" } }`) - parsedReq, err := tc.queryService.parseMetricRequest(context.Background(), tc.signedInUser, true, mr) + parsedReq, err := tc.queryService.parseMetricRequest(context.Background(), tc.signedInUser, true, mr, false) require.NoError(t, err) require.NotNil(t, parsedReq) assert.False(t, parsedReq.hasExpression) @@ -95,7 +95,7 @@ func TestIntegrationParseMetricRequest(t *testing.T) { }`) mr.From = "" mr.To = "" - parsedReq, err := tc.queryService.parseMetricRequest(context.Background(), tc.signedInUser, true, mr) + parsedReq, err := tc.queryService.parseMetricRequest(context.Background(), tc.signedInUser, true, mr, false) require.NoError(t, err) require.NotNil(t, parsedReq) assert.False(t, parsedReq.hasExpression) @@ -128,7 +128,7 @@ func TestIntegrationParseMetricRequest(t *testing.T) { "type": "math", "expression": "$A - 50" }`) - parsedReq, err := tc.queryService.parseMetricRequest(context.Background(), tc.signedInUser, true, mr) + parsedReq, err := tc.queryService.parseMetricRequest(context.Background(), tc.signedInUser, true, mr, false) require.NoError(t, err) require.NotNil(t, parsedReq) require.True(t, parsedReq.hasExpression) @@ -171,7 +171,7 @@ func TestIntegrationParseMetricRequest(t *testing.T) { "type": "testdata" } }`) - parsedReq, err := tc.queryService.parseMetricRequest(context.Background(), tc.signedInUser, true, mr) + parsedReq, err := tc.queryService.parseMetricRequest(context.Background(), tc.signedInUser, true, mr, false) require.NoError(t, err) require.NotNil(t, parsedReq) assert.False(t, parsedReq.hasExpression) @@ -231,7 +231,7 @@ func TestIntegrationParseMetricRequest(t *testing.T) { "type": "math", "expression": "$A_resample + $B_resample" }`) - parsedReq, err := tc.queryService.parseMetricRequest(context.Background(), tc.signedInUser, true, mr) + parsedReq, err := tc.queryService.parseMetricRequest(context.Background(), tc.signedInUser, true, mr, false) require.NoError(t, err) require.NotNil(t, parsedReq) assert.True(t, parsedReq.hasExpression) @@ -271,18 +271,18 @@ func TestIntegrationParseMetricRequest(t *testing.T) { reqCtx.Req = httpreq httpreq.Header.Add("X-Datasource-Uid", "gIEkMvIVz") - _, err = tc.queryService.parseMetricRequest(httpreq.Context(), tc.signedInUser, true, mr) + _, err = tc.queryService.parseMetricRequest(httpreq.Context(), tc.signedInUser, true, mr, false) require.NoError(t, err) // With the second value it is OK httpreq.Header.Add("X-Datasource-Uid", "sEx6ZvSVk") - _, err = tc.queryService.parseMetricRequest(httpreq.Context(), tc.signedInUser, true, mr) + _, err = tc.queryService.parseMetricRequest(httpreq.Context(), tc.signedInUser, true, mr, false) require.NoError(t, err) // Single header with comma syntax httpreq, _ = http.NewRequest(http.MethodPost, "http://localhost/", bytes.NewReader([]byte{})) httpreq.Header.Set("X-Datasource-Uid", "gIEkMvIVz, sEx6ZvSVk") - _, err = tc.queryService.parseMetricRequest(httpreq.Context(), tc.signedInUser, true, mr) + _, err = tc.queryService.parseMetricRequest(httpreq.Context(), tc.signedInUser, true, mr, false) require.NoError(t, err) }) @@ -301,9 +301,132 @@ func TestIntegrationParseMetricRequest(t *testing.T) { "type": "postgres" } }`) - _, err := tc.queryService.parseMetricRequest(context.Background(), tc.signedInUser, true, mr) + _, err := tc.queryService.parseMetricRequest(context.Background(), tc.signedInUser, true, mr, false) require.Error(t, err) }) + + t.Run("Test a datasource query with global time range", func(t *testing.T) { + tc := setup(t, false, nil) + mr := metricRequestWithQueries(t, `{ + "refId": "A", + "datasource": { + "uid": "gIEkMvIVz", + "type": "postgres" + } + }`, `{ + "refId": "B", + "datasource": { + "uid": "gIEkMvIVz", + "type": "postgres" + } + }`) + mr.From = "1753944628000" + mr.To = "1753944629000" + parsedReq, err := tc.queryService.parseMetricRequest(context.Background(), tc.signedInUser, true, mr, false) + require.NoError(t, err) + require.NotNil(t, parsedReq) + assert.Len(t, parsedReq.parsedQueries, 1) + assert.Contains(t, parsedReq.parsedQueries, "gIEkMvIVz") + queries := parsedReq.getFlattenedQueries() + assert.Len(t, queries, 2) + for _, q := range queries { + assert.Equal(t, int64(1753944628000), q.query.TimeRange.From.UnixMilli()) + assert.Equal(t, int64(1753944629000), q.query.TimeRange.To.UnixMilli()) + } + }) + t.Run("Test a datasource query with local time range", func(t *testing.T) { + tc := setup(t, false, nil) + mr := metricRequestWithQueries(t, `{ + "refId": "A", + "datasource": { + "uid": "gIEkMvIVz", + "type": "postgres" + }, + "timeRange": { + "from": "1753944618000", + "to": "1753944619000" + } + }`, `{ + "refId": "B", + "datasource": { + "uid": "gIEkMvIVz", + "type": "postgres" + }, + "timeRange": { + "from": "1753944628000", + "to": "1753944629000" + } + }`) + mr.From = "" + mr.To = "" + + verifyTimestamps := func(parsedReq *parsedRequest, ts1 int64, ts2 int64, ts3 int64, ts4 int64) { + require.NotNil(t, parsedReq) + assert.Len(t, parsedReq.parsedQueries, 1) + assert.Contains(t, parsedReq.parsedQueries, "gIEkMvIVz") + queries := parsedReq.getFlattenedQueries() + assert.Len(t, queries, 2) + + assert.Equal(t, ts1, queries[0].query.TimeRange.From.UnixMilli()) + assert.Equal(t, ts2, queries[0].query.TimeRange.To.UnixMilli()) + + assert.Equal(t, ts3, queries[1].query.TimeRange.From.UnixMilli()) + assert.Equal(t, ts4, queries[1].query.TimeRange.To.UnixMilli()) + } + + // with flag enabled + parsedReq, err := tc.queryService.parseMetricRequest(context.Background(), tc.signedInUser, true, mr, true) + require.NoError(t, err) + + verifyTimestamps(parsedReq, + int64(1753944618000), + int64(1753944619000), + int64(1753944628000), + int64(1753944629000)) + + // with flag disabled + parsedReq2, err := tc.queryService.parseMetricRequest(context.Background(), tc.signedInUser, true, mr, false) + require.NoError(t, err) + + verifyTimestamps(parsedReq2, int64(0), int64(0), int64(0), int64(0)) + }) + + t.Run("Test a datasource query with malformed local time range", func(t *testing.T) { + tc := setup(t, false, nil) + mr := metricRequestWithQueries(t, `{ + "refId": "A", + "datasource": { + "uid": "gIEkMvIVz", + "type": "postgres" + }, + "timeRange": { + "from": "1753944618000", + "not_to": "1753944619000" + } + }`, `{ + "refId": "B", + "datasource": { + "uid": "gIEkMvIVz", + "type": "postgres" + }, + "timeRange": 42 + }`) + mr.From = "" + mr.To = "" + parsedReq, err := tc.queryService.parseMetricRequest(context.Background(), tc.signedInUser, true, mr, true) + require.NoError(t, err) + require.NotNil(t, parsedReq) + assert.Len(t, parsedReq.parsedQueries, 1) + assert.Contains(t, parsedReq.parsedQueries, "gIEkMvIVz") + queries := parsedReq.getFlattenedQueries() + assert.Len(t, queries, 2) + + assert.Equal(t, int64(1753944618000), queries[0].query.TimeRange.From.UnixMilli()) + assert.Equal(t, int64(0), queries[0].query.TimeRange.To.UnixMilli()) + + assert.Equal(t, int64(0), queries[1].query.TimeRange.From.UnixMilli()) + assert.Equal(t, int64(0), queries[1].query.TimeRange.To.UnixMilli()) + }) } func TestIntegrationQueryDataMultipleSources(t *testing.T) { @@ -515,17 +638,13 @@ func TestIntegrationQueryDataWithMTDSClient(t *testing.T) { "type": "postgres" } }`) - mr.From = "2022-01-01" - mr.To = "2022-01-02" + mr.From = "1754309340000" + mr.To = "1754309370000" ctx := context.Background() _, err := tc.queryService.QueryData(ctx, tc.signedInUser, true, mr) require.NoError(t, err) assert.Equal(t, data.QueryDataRequest{ - TimeRange: data.TimeRange{ - From: "2022-01-01T00:00:00Z", - To: "2022-01-02T00:00:00Z", - }, Queries: []data.DataQuery{ { CommonQueryProperties: data.CommonQueryProperties{ @@ -534,6 +653,12 @@ func TestIntegrationQueryDataWithMTDSClient(t *testing.T) { Type: "postgres", UID: "gIEkMvIVz", }, + TimeRange: &data.TimeRange{ + From: "1754309340000", + To: "1754309370000", + }, + MaxDataPoints: 100, + IntervalMS: 1000, }, }, { @@ -543,6 +668,12 @@ func TestIntegrationQueryDataWithMTDSClient(t *testing.T) { Type: "postgres", UID: "gIEkMvIVz", }, + TimeRange: &data.TimeRange{ + From: "1754309340000", + To: "1754309370000", + }, + MaxDataPoints: 100, + IntervalMS: 1000, }, }, },