datasources: querier: better handling of timestamps (#108973)

* query: support query-local timestamps
* make it configurable
* lint fix
* corrected unit test
* fixed unit test

Author: Gábor Farkas, 2025-08-05 08:35:57 +02:00 (committed by GitHub)
Parent: a7ba662b02
Commit: 0ccfdbdfae
5 changed files with 276 additions and 49 deletions
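
For orientation (this example is not part of the commit itself): after this change, each query in a metric request may carry its own timeRange, and the parser falls back to the request-level From/To when a query has none and the new supportLocalTimeRange flag is enabled; with the flag disabled, the request-level range is always used. A rough sketch of such a request using Grafana's dtos.MetricRequest and simplejson types, with invented field values:

// Illustrative only: query B carries its own time range, query A does not.
package main

import (
	"fmt"

	"github.com/grafana/grafana/pkg/api/dtos"
	"github.com/grafana/grafana/pkg/components/simplejson"
)

func main() {
	qA, _ := simplejson.NewJson([]byte(`{
		"refId": "A",
		"datasource": { "uid": "gIEkMvIVz", "type": "postgres" }
	}`))
	qB, _ := simplejson.NewJson([]byte(`{
		"refId": "B",
		"datasource": { "uid": "gIEkMvIVz", "type": "postgres" },
		"timeRange": { "from": "1753944628000", "to": "1753944629000" }
	}`))

	mr := dtos.MetricRequest{
		From:    "now-6h", // request-level range, used by A
		To:      "now",
		Queries: []*simplejson.Json{qA, qB},
	}
	fmt.Println(len(mr.Queries), mr.From, mr.To)
}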

(modified file in package expr)

@@ -407,23 +407,11 @@ func (dn *DSNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s
 			return mathexp.Results{}, MakeQueryError(dn.refID, dn.datasource.UID, err)
 		}
 	} else {
-		// transform request from backend.QueryDataRequest to k8s request
-		k8sReq := &data.QueryDataRequest{
-			TimeRange: data.TimeRange{
-				From: req.Queries[0].TimeRange.From.Format(time.RFC3339),
-				To:   req.Queries[0].TimeRange.To.Format(time.RFC3339),
-			},
-		}
-		for _, q := range req.Queries {
-			var dataQuery data.DataQuery
-			err := json.Unmarshal(q.JSON, &dataQuery)
+		k8sReq, err := ConvertBackendRequestToDataRequest(req)
 		if err != nil {
 			return mathexp.Results{}, MakeQueryError(dn.refID, dn.datasource.UID, err)
 		}
-			k8sReq.Queries = append(k8sReq.Queries, dataQuery)
-		}
-		var err error
 		// make the query with a mt client
 		resp, err = mtDSClient.QueryData(ctx, *k8sReq)

pkg/expr/query_convert.go (new file, +51 lines)

@@ -0,0 +1,51 @@
package expr

import (
	"encoding/json"
	"strconv"

	"github.com/grafana/grafana-plugin-sdk-go/backend"
	data "github.com/grafana/grafana-plugin-sdk-go/experimental/apis/data/v0alpha1"
)

func convertBackendQueryToDataQuery(q backend.DataQuery) (data.DataQuery, error) {
	// We first restore it from the raw JSON data; this should take care of all
	// datasource-specific fields (for example, Prometheus- or Loki-specific ones).
	var dataQuery data.DataQuery
	err := json.Unmarshal(q.JSON, &dataQuery)
	if err != nil {
		return data.DataQuery{}, err
	}

	// Then we override the result with values that are available as fields in backend.DataQuery.
	dataQuery.RefID = q.RefID
	dataQuery.QueryType = q.QueryType
	dataQuery.MaxDataPoints = q.MaxDataPoints
	dataQuery.IntervalMS = float64(q.Interval.Nanoseconds()) / 1000000.0
	dataQuery.TimeRange = &data.TimeRange{
		From: strconv.FormatInt(q.TimeRange.From.UnixMilli(), 10),
		To:   strconv.FormatInt(q.TimeRange.To.UnixMilli(), 10),
	}

	return dataQuery, nil
}

func ConvertBackendRequestToDataRequest(req *backend.QueryDataRequest) (*data.QueryDataRequest, error) {
	k8sReq := &data.QueryDataRequest{
		// `backend.QueryDataRequest` does not have a concept of a top-level global from/to;
		// time ranges always live inside the queries, so we leave them empty here too.
	}

	for _, q := range req.Queries {
		dataQuery, err := convertBackendQueryToDataQuery(q)
		if err != nil {
			return nil, err
		}
		k8sReq.Queries = append(k8sReq.Queries, dataQuery)
	}

	return k8sReq, nil
}
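
As a quick usage sketch (not part of the diff; all query values are invented for illustration): the converter turns the SDK's Go types into the k8s-style request, so the time range becomes a pair of epoch-millisecond strings, Interval becomes IntervalMS, and datasource-specific fields in the raw JSON are preserved. This assumes it runs inside the Grafana module so the pkg/expr import resolves:

// Illustrative only: converting an SDK request into the k8s-style
// data.QueryDataRequest used by the multi-tenant datasource client.
package main

import (
	"fmt"
	"time"

	"github.com/grafana/grafana-plugin-sdk-go/backend"
	"github.com/grafana/grafana/pkg/expr"
)

func main() {
	req := &backend.QueryDataRequest{
		Queries: []backend.DataQuery{{
			RefID:         "A",
			MaxDataPoints: 100,
			Interval:      time.Second, // becomes IntervalMS: 1000
			TimeRange: backend.TimeRange{
				From: time.UnixMilli(1754309340000),
				To:   time.UnixMilli(1754309370000),
			},
			// datasource-specific fields survive via the raw JSON
			JSON: []byte(`{"rawSql": "select 1"}`),
		}},
	}

	k8sReq, err := expr.ConvertBackendRequestToDataRequest(req)
	if err != nil {
		panic(err)
	}

	// the time range is serialized as epoch-millisecond strings
	fmt.Println(k8sReq.Queries[0].TimeRange.From) // "1754309340000"
	fmt.Println(k8sReq.Queries[0].IntervalMS)     // 1000
}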

(new test file in package expr, +50 lines)

@@ -0,0 +1,50 @@
package expr

import (
	"testing"
	"time"

	"github.com/grafana/grafana-plugin-sdk-go/backend"
	data "github.com/grafana/grafana-plugin-sdk-go/experimental/apis/data/v0alpha1"
	"github.com/stretchr/testify/require"
)

func TestConvertBackendRequestToDataRequest(t *testing.T) {
	input1 := backend.DataQuery{
		RefID:         "A",
		QueryType:     "large",
		MaxDataPoints: 42,
		Interval:      time.Millisecond * 10,
		TimeRange: backend.TimeRange{
			From: time.UnixMilli(1753959290000),
			To:   time.UnixMilli(1753959390000),
		},
		JSON: []byte(`{ "field1": "value1" }`),
	}

	result1 := data.DataQuery{
		CommonQueryProperties: data.CommonQueryProperties{
			RefID:         "A",
			QueryType:     "large",
			MaxDataPoints: 42,
			IntervalMS:    10.0,
			TimeRange: &data.TimeRange{
				From: "1753959290000",
				To:   "1753959390000",
			},
		},
	}
	result1.Set("field1", "value1")

	req := backend.QueryDataRequest{
		Queries: []backend.DataQuery{input1},
	}
	expected := data.QueryDataRequest{
		Queries: []data.DataQuery{result1},
	}

	result, err := ConvertBackendRequestToDataRequest(&req)
	require.NoError(t, err)
	require.Equal(t, &expected, result)
}

(modified file: query service, package query)

@@ -2,7 +2,6 @@ package query

 import (
 	"context"
-	"encoding/json"
 	"errors"
 	"fmt"
 	"net/http"
@@ -14,7 +13,6 @@ import (
 	"github.com/grafana/grafana-plugin-sdk-go/backend/gtime"
 	"golang.org/x/sync/errgroup"

-	data "github.com/grafana/grafana-plugin-sdk-go/experimental/apis/data/v0alpha1"
 	"github.com/grafana/grafana/pkg/api/dtos"
 	"github.com/grafana/grafana/pkg/apimachinery/errutil"
 	"github.com/grafana/grafana/pkg/apimachinery/identity"
@@ -88,6 +86,7 @@ type ServiceImpl struct {
 	concurrentQueryLimit      int
 	mtDatasourceClientBuilder mtdsclient.MTDatasourceClientBuilder
 	headers                   map[string]string
+	supportLocalTimeRange     bool
 }

 // Run ServiceImpl.
@@ -105,7 +104,7 @@ func (s *ServiceImpl) QueryData(ctx context.Context, user identity.Requester, sk
 		}
 	}
 	// Parse the request into parsed queries grouped by datasource uid
-	parsedReq, err := s.parseMetricRequest(ctx, user, skipDSCache, reqDTO)
+	parsedReq, err := s.parseMetricRequest(ctx, user, skipDSCache, reqDTO, s.supportLocalTimeRange)
 	if err != nil {
 		return nil, err
 	}
@@ -224,6 +223,7 @@ func QueryData(ctx context.Context, log log.Logger, dscache datasources.CacheSer
 		mtDatasourceClientBuilder: mtDatasourceClientBuilder,
 		headers:                   headers,
 		concurrentQueryLimit:      16, // TODO: make it configurable
+		supportLocalTimeRange:     true,
 	}
 	return s.QueryData(ctx, nil, false, reqDTO)
 }
@@ -303,32 +303,32 @@ func (s *ServiceImpl) handleQuerySingleDatasource(ctx context.Context, user iden
 		return s.pluginClient.QueryData(ctx, req)
 	} else { // multi tenant flow
 		// transform request from backend.QueryDataRequest to k8s request
-		k8sReq := &data.QueryDataRequest{
-			TimeRange: data.TimeRange{
-				From: req.Queries[0].TimeRange.From.Format(time.RFC3339),
-				To:   req.Queries[0].TimeRange.To.Format(time.RFC3339),
-			},
-		}
-		for _, q := range req.Queries {
-			var dataQuery data.DataQuery
-			err := json.Unmarshal(q.JSON, &dataQuery)
+		k8sReq, err := expr.ConvertBackendRequestToDataRequest(req)
 		if err != nil {
 			return nil, err
 		}
-			k8sReq.Queries = append(k8sReq.Queries, dataQuery)
-		}
 		return mtDsClient.QueryData(ctx, *k8sReq)
 	}
 }

+func getTimeRange(query *simplejson.Json, globalFrom string, globalTo string) gtime.TimeRange {
+	from := query.Get("timeRange").Get("from").MustString("")
+	to := query.Get("timeRange").Get("to").MustString("")
+
+	if (from == "") && (to == "") {
+		from = globalFrom
+		to = globalTo
+	}
+
+	return gtime.NewTimeRange(from, to)
+}
+
 // parseRequest parses a request into parsed queries grouped by datasource uid
-func (s *ServiceImpl) parseMetricRequest(ctx context.Context, user identity.Requester, skipDSCache bool, reqDTO dtos.MetricRequest) (*parsedRequest, error) {
+func (s *ServiceImpl) parseMetricRequest(ctx context.Context, user identity.Requester, skipDSCache bool, reqDTO dtos.MetricRequest, supportLocalTimeRange bool) (*parsedRequest, error) {
 	if len(reqDTO.Queries) == 0 {
 		return nil, ErrNoQueriesFound
 	}

-	timeRange := gtime.NewTimeRange(reqDTO.From, reqDTO.To)
 	req := &parsedRequest{
 		hasExpression: false,
 		parsedQueries: make(map[string][]parsedQuery),
@@ -357,6 +357,13 @@ func (s *ServiceImpl) parseMetricRequest(ctx context.Context, user identity.Requ
 			req.parsedQueries[ds.UID] = []parsedQuery{}
 		}

+		var timeRange gtime.TimeRange
+		if supportLocalTimeRange {
+			timeRange = getTimeRange(query, reqDTO.From, reqDTO.To)
+		} else {
+			timeRange = gtime.NewTimeRange(reqDTO.From, reqDTO.To)
+		}
+
 		modelJSON, err := query.MarshalJSON()
 		if err != nil {
 			return nil, err
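
To illustrate the fallback behavior introduced above (this sketch is not part of the diff; pickTimeRange is a local copy of the unexported getTimeRange logic, since that helper cannot be called from outside package query): a query carrying its own "timeRange" keeps it, while one without it inherits the request-level from/to.

// Illustrative only: mirrors the per-query time-range fallback.
package main

import (
	"fmt"

	"github.com/grafana/grafana-plugin-sdk-go/backend/gtime"
	"github.com/grafana/grafana/pkg/components/simplejson"
)

func pickTimeRange(query *simplejson.Json, globalFrom, globalTo string) gtime.TimeRange {
	from := query.Get("timeRange").Get("from").MustString("")
	to := query.Get("timeRange").Get("to").MustString("")
	if from == "" && to == "" {
		from = globalFrom
		to = globalTo
	}
	return gtime.NewTimeRange(from, to)
}

func main() {
	local, _ := simplejson.NewJson([]byte(`{"refId":"A","timeRange":{"from":"1753944618000","to":"1753944619000"}}`))
	global, _ := simplejson.NewJson([]byte(`{"refId":"B"}`))

	// query-local range wins over the request-level one
	fmt.Printf("%+v\n", pickTimeRange(local, "1753944628000", "1753944629000"))
	// no local range: falls back to the request-level from/to
	fmt.Printf("%+v\n", pickTimeRange(global, "1753944628000", "1753944629000"))
}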

(modified test file: query service, package query)

@@ -69,7 +69,7 @@ func TestIntegrationParseMetricRequest(t *testing.T) {
 				"type": "postgres"
 			}
 		}`)
-		parsedReq, err := tc.queryService.parseMetricRequest(context.Background(), tc.signedInUser, true, mr)
+		parsedReq, err := tc.queryService.parseMetricRequest(context.Background(), tc.signedInUser, true, mr, false)
 		require.NoError(t, err)
 		require.NotNil(t, parsedReq)
 		assert.False(t, parsedReq.hasExpression)
@@ -95,7 +95,7 @@ func TestIntegrationParseMetricRequest(t *testing.T) {
 		}`)
 		mr.From = ""
 		mr.To = ""
-		parsedReq, err := tc.queryService.parseMetricRequest(context.Background(), tc.signedInUser, true, mr)
+		parsedReq, err := tc.queryService.parseMetricRequest(context.Background(), tc.signedInUser, true, mr, false)
 		require.NoError(t, err)
 		require.NotNil(t, parsedReq)
 		assert.False(t, parsedReq.hasExpression)
@@ -128,7 +128,7 @@ func TestIntegrationParseMetricRequest(t *testing.T) {
 			"type": "math",
 			"expression": "$A - 50"
 		}`)
-		parsedReq, err := tc.queryService.parseMetricRequest(context.Background(), tc.signedInUser, true, mr)
+		parsedReq, err := tc.queryService.parseMetricRequest(context.Background(), tc.signedInUser, true, mr, false)
 		require.NoError(t, err)
 		require.NotNil(t, parsedReq)
 		require.True(t, parsedReq.hasExpression)
@@ -171,7 +171,7 @@ func TestIntegrationParseMetricRequest(t *testing.T) {
 				"type": "testdata"
 			}
 		}`)
-		parsedReq, err := tc.queryService.parseMetricRequest(context.Background(), tc.signedInUser, true, mr)
+		parsedReq, err := tc.queryService.parseMetricRequest(context.Background(), tc.signedInUser, true, mr, false)
 		require.NoError(t, err)
 		require.NotNil(t, parsedReq)
 		assert.False(t, parsedReq.hasExpression)
@@ -231,7 +231,7 @@ func TestIntegrationParseMetricRequest(t *testing.T) {
 			"type": "math",
 			"expression": "$A_resample + $B_resample"
 		}`)
-		parsedReq, err := tc.queryService.parseMetricRequest(context.Background(), tc.signedInUser, true, mr)
+		parsedReq, err := tc.queryService.parseMetricRequest(context.Background(), tc.signedInUser, true, mr, false)
 		require.NoError(t, err)
 		require.NotNil(t, parsedReq)
 		assert.True(t, parsedReq.hasExpression)
@@ -271,18 +271,18 @@ func TestIntegrationParseMetricRequest(t *testing.T) {
 		reqCtx.Req = httpreq

 		httpreq.Header.Add("X-Datasource-Uid", "gIEkMvIVz")
-		_, err = tc.queryService.parseMetricRequest(httpreq.Context(), tc.signedInUser, true, mr)
+		_, err = tc.queryService.parseMetricRequest(httpreq.Context(), tc.signedInUser, true, mr, false)
 		require.NoError(t, err)

 		// With the second value it is OK
 		httpreq.Header.Add("X-Datasource-Uid", "sEx6ZvSVk")
-		_, err = tc.queryService.parseMetricRequest(httpreq.Context(), tc.signedInUser, true, mr)
+		_, err = tc.queryService.parseMetricRequest(httpreq.Context(), tc.signedInUser, true, mr, false)
 		require.NoError(t, err)

 		// Single header with comma syntax
 		httpreq, _ = http.NewRequest(http.MethodPost, "http://localhost/", bytes.NewReader([]byte{}))
 		httpreq.Header.Set("X-Datasource-Uid", "gIEkMvIVz, sEx6ZvSVk")
-		_, err = tc.queryService.parseMetricRequest(httpreq.Context(), tc.signedInUser, true, mr)
+		_, err = tc.queryService.parseMetricRequest(httpreq.Context(), tc.signedInUser, true, mr, false)
 		require.NoError(t, err)
 	})
@@ -301,9 +301,132 @@ func TestIntegrationParseMetricRequest(t *testing.T) {
 			"type": "postgres"
 			}
 		}`)
-		_, err := tc.queryService.parseMetricRequest(context.Background(), tc.signedInUser, true, mr)
+		_, err := tc.queryService.parseMetricRequest(context.Background(), tc.signedInUser, true, mr, false)
 		require.Error(t, err)
 	})
+
+	t.Run("Test a datasource query with global time range", func(t *testing.T) {
+		tc := setup(t, false, nil)
+		mr := metricRequestWithQueries(t, `{
+			"refId": "A",
+			"datasource": {
+				"uid": "gIEkMvIVz",
+				"type": "postgres"
+			}
+		}`, `{
+			"refId": "B",
+			"datasource": {
+				"uid": "gIEkMvIVz",
+				"type": "postgres"
+			}
+		}`)
+		mr.From = "1753944628000"
+		mr.To = "1753944629000"
+		parsedReq, err := tc.queryService.parseMetricRequest(context.Background(), tc.signedInUser, true, mr, false)
+		require.NoError(t, err)
+		require.NotNil(t, parsedReq)
+		assert.Len(t, parsedReq.parsedQueries, 1)
+		assert.Contains(t, parsedReq.parsedQueries, "gIEkMvIVz")
+		queries := parsedReq.getFlattenedQueries()
+		assert.Len(t, queries, 2)
+		for _, q := range queries {
+			assert.Equal(t, int64(1753944628000), q.query.TimeRange.From.UnixMilli())
+			assert.Equal(t, int64(1753944629000), q.query.TimeRange.To.UnixMilli())
+		}
+	})
+
+	t.Run("Test a datasource query with local time range", func(t *testing.T) {
+		tc := setup(t, false, nil)
+		mr := metricRequestWithQueries(t, `{
+			"refId": "A",
+			"datasource": {
+				"uid": "gIEkMvIVz",
+				"type": "postgres"
+			},
+			"timeRange": {
+				"from": "1753944618000",
+				"to": "1753944619000"
+			}
+		}`, `{
+			"refId": "B",
+			"datasource": {
+				"uid": "gIEkMvIVz",
+				"type": "postgres"
+			},
+			"timeRange": {
+				"from": "1753944628000",
+				"to": "1753944629000"
+			}
+		}`)
+		mr.From = ""
+		mr.To = ""
+
+		verifyTimestamps := func(parsedReq *parsedRequest, ts1 int64, ts2 int64, ts3 int64, ts4 int64) {
+			require.NotNil(t, parsedReq)
+			assert.Len(t, parsedReq.parsedQueries, 1)
+			assert.Contains(t, parsedReq.parsedQueries, "gIEkMvIVz")
+			queries := parsedReq.getFlattenedQueries()
+			assert.Len(t, queries, 2)
+			assert.Equal(t, ts1, queries[0].query.TimeRange.From.UnixMilli())
+			assert.Equal(t, ts2, queries[0].query.TimeRange.To.UnixMilli())
+			assert.Equal(t, ts3, queries[1].query.TimeRange.From.UnixMilli())
+			assert.Equal(t, ts4, queries[1].query.TimeRange.To.UnixMilli())
+		}
+
+		// with flag enabled
+		parsedReq, err := tc.queryService.parseMetricRequest(context.Background(), tc.signedInUser, true, mr, true)
+		require.NoError(t, err)
+		verifyTimestamps(parsedReq,
+			int64(1753944618000),
+			int64(1753944619000),
+			int64(1753944628000),
+			int64(1753944629000))

+		// with flag disabled
+		parsedReq2, err := tc.queryService.parseMetricRequest(context.Background(), tc.signedInUser, true, mr, false)
+		require.NoError(t, err)
+		verifyTimestamps(parsedReq2, int64(0), int64(0), int64(0), int64(0))
+	})
+
+	t.Run("Test a datasource query with malformed local time range", func(t *testing.T) {
+		tc := setup(t, false, nil)
+		mr := metricRequestWithQueries(t, `{
+			"refId": "A",
+			"datasource": {
+				"uid": "gIEkMvIVz",
+				"type": "postgres"
+			},
+			"timeRange": {
+				"from": "1753944618000",
+				"not_to": "1753944619000"
+			}
+		}`, `{
+			"refId": "B",
+			"datasource": {
+				"uid": "gIEkMvIVz",
+				"type": "postgres"
+			},
+			"timeRange": 42
+		}`)
+		mr.From = ""
+		mr.To = ""
+		parsedReq, err := tc.queryService.parseMetricRequest(context.Background(), tc.signedInUser, true, mr, true)
+		require.NoError(t, err)
+		require.NotNil(t, parsedReq)
+		assert.Len(t, parsedReq.parsedQueries, 1)
+		assert.Contains(t, parsedReq.parsedQueries, "gIEkMvIVz")
+		queries := parsedReq.getFlattenedQueries()
+		assert.Len(t, queries, 2)
+		assert.Equal(t, int64(1753944618000), queries[0].query.TimeRange.From.UnixMilli())
+		assert.Equal(t, int64(0), queries[0].query.TimeRange.To.UnixMilli())
+		assert.Equal(t, int64(0), queries[1].query.TimeRange.From.UnixMilli())
+		assert.Equal(t, int64(0), queries[1].query.TimeRange.To.UnixMilli())
+	})
 }

 func TestIntegrationQueryDataMultipleSources(t *testing.T) {
@@ -515,17 +638,13 @@ func TestIntegrationQueryDataWithMTDSClient(t *testing.T) {
 			"type": "postgres"
 		}
 	}`)
-	mr.From = "2022-01-01"
-	mr.To = "2022-01-02"
+	mr.From = "1754309340000"
+	mr.To = "1754309370000"

 	ctx := context.Background()
 	_, err := tc.queryService.QueryData(ctx, tc.signedInUser, true, mr)
 	require.NoError(t, err)

 	assert.Equal(t, data.QueryDataRequest{
-		TimeRange: data.TimeRange{
-			From: "2022-01-01T00:00:00Z",
-			To:   "2022-01-02T00:00:00Z",
-		},
 		Queries: []data.DataQuery{
 			{
 				CommonQueryProperties: data.CommonQueryProperties{
@@ -534,6 +653,12 @@ func TestIntegrationQueryDataWithMTDSClient(t *testing.T) {
 						Type: "postgres",
 						UID:  "gIEkMvIVz",
 					},
+					TimeRange: &data.TimeRange{
+						From: "1754309340000",
+						To:   "1754309370000",
+					},
+					MaxDataPoints: 100,
+					IntervalMS:    1000,
 				},
 			},
 			{
@@ -543,6 +668,12 @@ func TestIntegrationQueryDataWithMTDSClient(t *testing.T) {
 						Type: "postgres",
 						UID:  "gIEkMvIVz",
 					},
+					TimeRange: &data.TimeRange{
+						From: "1754309340000",
+						To:   "1754309370000",
+					},
+					MaxDataPoints: 100,
+					IntervalMS:    1000,
 				},
 			},
 		},