SQL Expressions: Query Service Support (#101955)

---------

Co-authored-by: Adam Simpson <adam@adamsimpson.net>
Co-authored-by: Sarah Zinger <sarah.zinger@grafana.com>
This commit is contained in:
Kyle Brandt 2025-04-03 09:36:02 -04:00 committed by GitHub
parent 370f4c2bcd
commit 4a0ec27e5d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 118 additions and 57 deletions

View File

@ -23,11 +23,17 @@ type ResultConverter struct {
func (c *ResultConverter) Convert(ctx context.Context,
datasourceType string,
frames data.Frames,
forSqlInput bool,
) (string, mathexp.Results, error) {
if len(frames) == 0 {
return "no-data", mathexp.Results{Values: mathexp.Values{mathexp.NewNoData()}}, nil
}
if forSqlInput {
results, err := handleSqlInput(frames)
return "sql input", results, err
}
var dt data.FrameType
dt, useDataplane, _ := shouldUseDataplane(frames, logger, c.Features.IsEnabled(ctx, featuremgmt.FlagDisableSSEDataplane))
if useDataplane {
@ -120,6 +126,43 @@ func (c *ResultConverter) Convert(ctx context.Context,
}, nil
}
// copied from pkg/expr/nodes.go from within the Execute method
func handleSqlInput(dataFrames data.Frames) (mathexp.Results, error) {
var result mathexp.Results
var needsConversion bool
// Convert it if Multi:
if len(dataFrames) > 1 {
needsConversion = true
}
// Convert it if Wide (has labels):
if len(dataFrames) == 1 {
for _, field := range dataFrames[0].Fields {
if len(field.Labels) > 0 {
needsConversion = true
break
}
}
}
if needsConversion {
convertedFrames, err := ConvertToFullLong(dataFrames)
if err != nil {
return result, fmt.Errorf("failed to convert data frames to long format for sql: %w", err)
}
result.Values = mathexp.Values{
mathexp.TableData{Frame: convertedFrames[0]},
}
return result, nil
}
// Otherwise it is already Long format; return as is
result.Values = mathexp.Values{
mathexp.TableData{Frame: dataFrames[0]},
}
return result, nil
}
func getResponseFrame(logger *log.ConcreteLogger, resp *backend.QueryDataResponse, refID string) (data.Frames, error) {
response, ok := resp.Responses[refID]
if !ok {

View File

@ -40,7 +40,7 @@ func TestConvertDataFramesToResults(t *testing.T) {
for _, dtype := range supported {
t.Run(dtype, func(t *testing.T) {
resultType, res, err := converter.Convert(context.Background(), dtype, frames)
resultType, res, err := converter.Convert(context.Background(), dtype, frames, false)
require.NoError(t, err)
assert.Equal(t, "single frame series", resultType)
require.Len(t, res.Values, 2)
@ -68,7 +68,7 @@ func TestConvertDataFramesToResults(t *testing.T) {
for _, dtype := range supported {
t.Run(dtype, func(t *testing.T) {
resultType, res, err := converter.Convert(context.Background(), dtype, frames)
resultType, res, err := converter.Convert(context.Background(), dtype, frames, false)
require.NoError(t, err)
assert.Equal(t, "multi frame series", resultType)
require.Len(t, res.Values, 2)
@ -101,7 +101,7 @@ func TestConvertDataFramesToResults(t *testing.T) {
for _, dtype := range supported {
t.Run(dtype, func(t *testing.T) {
resultType, res, err := converter.Convert(context.Background(), dtype, frames)
resultType, res, err := converter.Convert(context.Background(), dtype, frames, false)
require.NoError(t, err)
assert.Equal(t, "multi frame series", resultType)
require.Len(t, res.Values, 2)

View File

@ -130,7 +130,7 @@ func (m *MLNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s *
}
// process the response the same way DSNode does. Use plugin ID as data source type. Semantically, they are the same.
responseType, result, err = s.converter.Convert(ctx, mlPluginID, dataFrames)
responseType, result, err = s.converter.Convert(ctx, mlPluginID, dataFrames, false)
return result, err
}

View File

@ -341,7 +341,7 @@ func executeDSNodesGrouped(ctx context.Context, now time.Time, vars mathexp.Vars
}
var result mathexp.Results
responseType, result, err := s.converter.Convert(ctx, dn.datasource.Type, dataFrames)
responseType, result, err := s.converter.Convert(ctx, dn.datasource.Type, dataFrames, dn.isInputToSQLExpr)
if err != nil {
result.Error = makeConversionError(dn.RefID(), err)
}
@ -409,44 +409,9 @@ func (dn *DSNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s
}
var result mathexp.Results
// If the datasource node is an input to a SQL expression,
// the data must be in the Long format
if dn.isInputToSQLExpr {
var needsConversion bool
// Convert it if Multi:
if len(dataFrames) > 1 {
needsConversion = true
}
// Convert it if Wide (has labels):
if len(dataFrames) == 1 {
for _, field := range dataFrames[0].Fields {
if len(field.Labels) > 0 {
needsConversion = true
break
}
}
}
responseType, result, err = s.converter.Convert(ctx, dn.datasource.Type, dataFrames, dn.isInputToSQLExpr)
if needsConversion {
convertedFrames, err := ConvertToFullLong(dataFrames)
if err != nil {
return result, fmt.Errorf("failed to convert data frames to long format for sql: %w", err)
}
result.Values = mathexp.Values{
mathexp.TableData{Frame: convertedFrames[0]},
}
return result, nil
}
// Otherwise it is already Long format; return as is
result.Values = mathexp.Values{
mathexp.TableData{Frame: dataFrames[0]},
}
return result, nil
}
responseType, result, err = s.converter.Convert(ctx, dn.datasource.Type, dataFrames)
if err != nil {
err = makeConversionError(dn.refID, err)
}

View File

@ -126,9 +126,8 @@ func (h *ExpressionQueryReader) ReadQuery(
}
case QueryTypeSQL:
enabled := enableSqlExpressions(h)
if !enabled {
return eq, fmt.Errorf("sqlExpressions is not implemented")
if !h.features.IsEnabledGlobally(featuremgmt.FlagSqlExpressions) {
return eq, fmt.Errorf("sql expressions are disabled")
}
q := &SQLExpression{}
err = iter.ReadVal(q)
@ -192,11 +191,3 @@ func getReferenceVar(exp string, refId string) (string, error) {
}
return exp, nil
}
func enableSqlExpressions(h *ExpressionQueryReader) bool {
enabled := !h.features.IsEnabledGlobally(featuremgmt.FlagSqlExpressions)
if enabled {
return false
}
return false
}

View File

@ -43,6 +43,9 @@ type parsedRequestInfo struct {
// Hidden queries used as dependencies
HideBeforeReturn []string `json:"hide,omitempty"`
// SQL Inputs
SqlInputs map[string]struct{} `json:"sqlInputs,omitempty"`
}
type queryParser struct {
@ -71,6 +74,7 @@ func (p *queryParser) parseRequest(ctx context.Context, input *query.QueryDataRe
index := make(map[string]int) // index lookup
rsp := parsedRequestInfo{
RefIDTypes: make(map[string]string, len(input.Queries)),
SqlInputs: make(map[string]struct{}),
}
for _, q := range input.Queries {
@ -90,6 +94,7 @@ func (p *queryParser) parseRequest(ctx context.Context, input *query.QueryDataRe
}
// Process each query
// check if ds is expression
if expr.IsDataSource(ds.UID) {
// In order to process the query as a typed expression query, we
// are writing it back to JSON and parsing again. Alternatively we
@ -149,20 +154,40 @@ func (p *queryParser) parseRequest(ctx context.Context, input *query.QueryDataRe
// Build the graph for a request
dg := simple.NewDirectedGraph()
dg.AddNode(queryNode)
for _, exp := range expressions {
dg.AddNode(exp)
}
for _, exp := range expressions {
vars := exp.Command.NeedsVars()
for _, refId := range vars {
target := queryNode
q, ok := queryRefIDs[refId]
if !ok {
_, isSQLCMD := target.Command.(*expr.SQLCommand)
if isSQLCMD {
continue
} else {
target, ok = expressions[refId]
if !ok {
return rsp, makeDependencyError(exp.RefID, refId)
}
}
}
// If the input is SQL, conversion is handled differently
if _, isSqlExp := exp.Command.(*expr.SQLCommand); isSqlExp {
if _, ifDepIsAlsoExpression := expressions[refId]; ifDepIsAlsoExpression {
// Only allow data source nodes as SQL expression inputs for now
return rsp, fmt.Errorf("only data source queries may be inputs to a sql expression, %v is the input for %v", refId, exp.RefID)
} else {
rsp.SqlInputs[refId] = struct{}{}
}
}
// Do not hide queries used in variables
if q != nil && q.Hide {
q.Hide = false

View File

@ -166,6 +166,40 @@ func TestQuerySplitting(t *testing.T) {
})
}
func TestSqlInputs(t *testing.T) {
parser := newQueryParser(
expr.NewExpressionQueryReader(featuremgmt.WithFeatures(featuremgmt.FlagSqlExpressions)),
nil,
tracing.InitializeTracerForTest(),
log.NewNopLogger(),
)
parsedRequestInfo, err := parser.parseRequest(context.Background(), &query.QueryDataRequest{
QueryDataRequest: data.QueryDataRequest{
Queries: []data.DataQuery{
data.NewDataQuery(map[string]any{
"refId": "A",
"datasource": &data.DataSourceRef{
Type: "prometheus",
UID: "local-prom",
},
}),
data.NewDataQuery(map[string]any{
"refId": "B",
"datasource": &data.DataSourceRef{
Type: "__expr__",
UID: "__expr__",
},
"type": "sql",
"expression": "Select time, value + 10 from A",
}),
},
},
})
require.NoError(t, err)
require.Equal(t, parsedRequestInfo.SqlInputs["B"], struct{}{})
}
type legacyDataSourceRetriever struct{}
func (s *legacyDataSourceRetriever) GetDataSourceFromDeprecatedFields(ctx context.Context, name string, id int64) (*data.DataSourceRef, error) {

View File

@ -381,11 +381,14 @@ func (b *QueryAPIBuilder) handleExpressions(ctx context.Context, req parsedReque
if !ok {
dr, ok := qdr.Responses[refId]
if ok {
_, res, err := b.converter.Convert(ctx, req.RefIDTypes[refId], dr.Frames)
_, isSqlInput := req.SqlInputs[refId]
_, res, err := b.converter.Convert(ctx, req.RefIDTypes[refId], dr.Frames, isSqlInput)
if err != nil {
expressionsLogger.Error("error converting frames for expressions", "error", err)
res.Error = err
}
vars[refId] = res
} else {
expressionsLogger.Error("missing variable in handle expressions", "refId", refId, "expressionRefId", expression.RefID)
@ -427,7 +430,7 @@ func (b *QueryAPIBuilder) convertQueryWithoutExpression(ctx context.Context, req
return nil, fmt.Errorf("refID '%s' does not exist", refID)
}
frames := qdr.Responses[refID].Frames
_, results, err := b.converter.Convert(ctx, req.PluginId, frames)
_, results, err := b.converter.Convert(ctx, req.PluginId, frames, false)
if err != nil {
results.Error = err
}