| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | package query | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							|  |  |  | 	"context" | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 	"errors" | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 	"fmt" | 
					
						
							|  |  |  | 	"net/http" | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 	"time" | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	"github.com/grafana/grafana-plugin-sdk-go/backend" | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 	"github.com/grafana/grafana-plugin-sdk-go/experimental/apis/data/v0alpha1" | 
					
						
							|  |  |  | 	"go.opentelemetry.io/otel/attribute" | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 	"golang.org/x/sync/errgroup" | 
					
						
							| 
									
										
										
										
											2024-05-11 02:01:17 +08:00
										 |  |  | 	errorsK8s "k8s.io/apimachinery/pkg/api/errors" | 
					
						
							|  |  |  | 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | 
					
						
							|  |  |  | 	"k8s.io/apimachinery/pkg/runtime" | 
					
						
							|  |  |  | 	"k8s.io/apimachinery/pkg/runtime/schema" | 
					
						
							|  |  |  | 	"k8s.io/apiserver/pkg/registry/rest" | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 	query "github.com/grafana/grafana/pkg/apis/query/v0alpha1" | 
					
						
							|  |  |  | 	"github.com/grafana/grafana/pkg/expr/mathexp" | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 	"github.com/grafana/grafana/pkg/infra/log" | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 	"github.com/grafana/grafana/pkg/services/datasources" | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 	"github.com/grafana/grafana/pkg/web" | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-05-11 02:01:17 +08:00
										 |  |  | type queryREST struct { | 
					
						
							|  |  |  | 	builder *QueryAPIBuilder | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | var ( | 
					
						
							|  |  |  | 	_ rest.Storage              = (*queryREST)(nil) | 
					
						
							|  |  |  | 	_ rest.SingularNameProvider = (*queryREST)(nil) | 
					
						
							|  |  |  | 	_ rest.Connecter            = (*queryREST)(nil) | 
					
						
							|  |  |  | 	_ rest.Scoper               = (*queryREST)(nil) | 
					
						
							|  |  |  | 	_ rest.StorageMetadata      = (*queryREST)(nil) | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (r *queryREST) New() runtime.Object { | 
					
						
							|  |  |  | 	// This is added as the "ResponseType" regarless what ProducesObject() says :)
 | 
					
						
							|  |  |  | 	return &query.QueryDataResponse{} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (r *queryREST) Destroy() {} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (r *queryREST) NamespaceScoped() bool { | 
					
						
							|  |  |  | 	return true | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (r *queryREST) GetSingularName() string { | 
					
						
							|  |  |  | 	return "QueryResults" // Used for the
 | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (r *queryREST) ProducesMIMETypes(verb string) []string { | 
					
						
							|  |  |  | 	return []string{"application/json"} // and parquet!
 | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (r *queryREST) ProducesObject(verb string) interface{} { | 
					
						
							|  |  |  | 	return &query.QueryDataResponse{} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (r *queryREST) ConnectMethods() []string { | 
					
						
							|  |  |  | 	return []string{"POST"} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (r *queryREST) NewConnectOptions() (runtime.Object, bool, string) { | 
					
						
							|  |  |  | 	return nil, false, "" // true means you can use the trailing path as a variable
 | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (r *queryREST) Connect(ctx context.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) { | 
					
						
							|  |  |  | 	// See: /pkg/apiserver/builder/helper.go#L34
 | 
					
						
							|  |  |  | 	// The name is set with a rewriter hack
 | 
					
						
							|  |  |  | 	if name != "name" { | 
					
						
							|  |  |  | 		return nil, errorsK8s.NewNotFound(schema.GroupResource{}, name) | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-05-11 02:01:17 +08:00
										 |  |  | 	b := r.builder | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-05-11 02:01:17 +08:00
										 |  |  | 	return http.HandlerFunc(func(w http.ResponseWriter, httpreq *http.Request) { | 
					
						
							|  |  |  | 		ctx, span := b.tracer.Start(httpreq.Context(), "QueryService.Query") | 
					
						
							|  |  |  | 		defer span.End() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		raw := &query.QueryDataRequest{} | 
					
						
							|  |  |  | 		err := web.Bind(httpreq, raw) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			err = errorsK8s.NewBadRequest("error reading query") | 
					
						
							|  |  |  | 			// TODO: can we wrap the error so details are not lost?!
 | 
					
						
							|  |  |  | 			// errutil.BadRequest(
 | 
					
						
							|  |  |  | 			// 	"query.bind",
 | 
					
						
							|  |  |  | 			// 	errutil.WithPublicMessage("Error reading query")).
 | 
					
						
							|  |  |  | 			// 	Errorf("error reading: %w", err)
 | 
					
						
							|  |  |  | 			responder.Error(err) | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 			return | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-05-11 02:01:17 +08:00
										 |  |  | 		// Parses the request and splits it into multiple sub queries (if necessary)
 | 
					
						
							|  |  |  | 		req, err := b.parser.parseRequest(ctx, raw) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			if errors.Is(err, datasources.ErrDataSourceNotFound) { | 
					
						
							|  |  |  | 				// TODO, can we wrap the error somehow?
 | 
					
						
							|  |  |  | 				err = &errorsK8s.StatusError{ErrStatus: metav1.Status{ | 
					
						
							|  |  |  | 					Status:  metav1.StatusFailure, | 
					
						
							|  |  |  | 					Code:    http.StatusBadRequest, // the URL is found, but includes bad requests
 | 
					
						
							|  |  |  | 					Reason:  metav1.StatusReasonNotFound, | 
					
						
							|  |  |  | 					Message: "datasource not found", | 
					
						
							|  |  |  | 				}} | 
					
						
							|  |  |  | 			} | 
					
						
							| 
									
										
										
										
											2024-05-20 23:11:37 +08:00
										 |  |  | 			responder.Error(err) | 
					
						
							| 
									
										
										
										
											2024-05-11 02:01:17 +08:00
										 |  |  | 			return | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		// Actually run the query
 | 
					
						
							|  |  |  | 		rsp, err := b.execute(ctx, req) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							| 
									
										
										
										
											2024-05-20 23:11:37 +08:00
										 |  |  | 			responder.Error(err) | 
					
						
							| 
									
										
										
										
											2024-05-11 02:01:17 +08:00
										 |  |  | 			return | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-05-11 02:01:17 +08:00
										 |  |  | 		responder.Object(query.GetResponseCode(rsp), &query.QueryDataResponse{ | 
					
						
							|  |  |  | 			QueryDataResponse: *rsp, // wrap the backend response as a QueryDataResponse
 | 
					
						
							|  |  |  | 		}) | 
					
						
							|  |  |  | 	}), nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | func (b *QueryAPIBuilder) execute(ctx context.Context, req parsedRequestInfo) (qdr *backend.QueryDataResponse, err error) { | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 	switch len(req.Requests) { | 
					
						
							|  |  |  | 	case 0: | 
					
						
							|  |  |  | 		break // nothing to do
 | 
					
						
							|  |  |  | 	case 1: | 
					
						
							|  |  |  | 		qdr, err = b.handleQuerySingleDatasource(ctx, req.Requests[0]) | 
					
						
							|  |  |  | 	default: | 
					
						
							|  |  |  | 		qdr, err = b.executeConcurrentQueries(ctx, req.Requests) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if len(req.Expressions) > 0 { | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 		qdr, err = b.handleExpressions(ctx, req, qdr) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Remove hidden results
 | 
					
						
							|  |  |  | 	for _, refId := range req.HideBeforeReturn { | 
					
						
							|  |  |  | 		r, ok := qdr.Responses[refId] | 
					
						
							|  |  |  | 		if ok && r.Error == nil { | 
					
						
							|  |  |  | 			delete(qdr.Responses, refId) | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 	return | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Process a single request
 | 
					
						
							|  |  |  | // See: https://github.com/grafana/grafana/blob/v10.2.3/pkg/services/query/query.go#L242
 | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | func (b *QueryAPIBuilder) handleQuerySingleDatasource(ctx context.Context, req datasourceRequest) (*backend.QueryDataResponse, error) { | 
					
						
							|  |  |  | 	ctx, span := b.tracer.Start(ctx, "Query.handleQuerySingleDatasource") | 
					
						
							|  |  |  | 	defer span.End() | 
					
						
							|  |  |  | 	span.SetAttributes( | 
					
						
							|  |  |  | 		attribute.String("datasource.type", req.PluginId), | 
					
						
							|  |  |  | 		attribute.String("datasource.uid", req.UID), | 
					
						
							|  |  |  | 	) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	allHidden := true | 
					
						
							|  |  |  | 	for idx := range req.Request.Queries { | 
					
						
							|  |  |  | 		if !req.Request.Queries[idx].Hide { | 
					
						
							|  |  |  | 			allHidden = false | 
					
						
							|  |  |  | 			break | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if allHidden { | 
					
						
							|  |  |  | 		return &backend.QueryDataResponse{}, nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-03-19 21:52:15 +08:00
										 |  |  | 	// Add user headers... here or in client.QueryData
 | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 	client, err := b.client.GetDataSourceClient(ctx, v0alpha1.DataSourceRef{ | 
					
						
							|  |  |  | 		Type: req.PluginId, | 
					
						
							|  |  |  | 		UID:  req.UID, | 
					
						
							|  |  |  | 	}) | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-03-25 22:22:34 +08:00
										 |  |  | 	code, rsp, err := client.QueryData(ctx, *req.Request) | 
					
						
							| 
									
										
										
										
											2024-03-19 21:52:15 +08:00
										 |  |  | 	if err == nil && rsp != nil { | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 		for _, q := range req.Request.Queries { | 
					
						
							|  |  |  | 			if q.ResultAssertions != nil { | 
					
						
							|  |  |  | 				result, ok := rsp.Responses[q.RefID] | 
					
						
							|  |  |  | 				if ok && result.Error == nil { | 
					
						
							|  |  |  | 					err = q.ResultAssertions.Validate(result.Frames) | 
					
						
							|  |  |  | 					if err != nil { | 
					
						
							|  |  |  | 						result.Error = err | 
					
						
							|  |  |  | 						result.ErrorSource = backend.ErrorSourceDownstream | 
					
						
							|  |  |  | 						rsp.Responses[q.RefID] = result | 
					
						
							|  |  |  | 					} | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-03-25 22:22:34 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	// Create a response object with the error when missing (happens for client errors like 404)
 | 
					
						
							|  |  |  | 	if rsp == nil && err != nil { | 
					
						
							|  |  |  | 		rsp = &backend.QueryDataResponse{Responses: make(backend.Responses)} | 
					
						
							|  |  |  | 		for _, q := range req.Request.Queries { | 
					
						
							|  |  |  | 			rsp.Responses[q.RefID] = backend.DataResponse{ | 
					
						
							|  |  |  | 				Status: backend.Status(code), | 
					
						
							|  |  |  | 				Error:  err, | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 	return rsp, err | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // buildErrorResponses applies the provided error to each query response in the list. These queries should all belong to the same datasource.
 | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | func buildErrorResponse(err error, req datasourceRequest) *backend.QueryDataResponse { | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 	rsp := backend.NewQueryDataResponse() | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 	for _, query := range req.Request.Queries { | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 		rsp.Responses[query.RefID] = backend.DataResponse{ | 
					
						
							|  |  |  | 			Error: err, | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return rsp | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // executeConcurrentQueries executes queries to multiple datasources concurrently and returns the aggregate result.
 | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | func (b *QueryAPIBuilder) executeConcurrentQueries(ctx context.Context, requests []datasourceRequest) (*backend.QueryDataResponse, error) { | 
					
						
							|  |  |  | 	ctx, span := b.tracer.Start(ctx, "Query.executeConcurrentQueries") | 
					
						
							|  |  |  | 	defer span.End() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 	g, ctx := errgroup.WithContext(ctx) | 
					
						
							|  |  |  | 	g.SetLimit(b.concurrentQueryLimit) // prevent too many concurrent requests
 | 
					
						
							|  |  |  | 	rchan := make(chan *backend.QueryDataResponse, len(requests)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Create panic recovery function for loop below
 | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 	recoveryFn := func(req datasourceRequest) { | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 		if r := recover(); r != nil { | 
					
						
							|  |  |  | 			var err error | 
					
						
							|  |  |  | 			b.log.Error("query datasource panic", "error", r, "stack", log.Stack(1)) | 
					
						
							|  |  |  | 			if theErr, ok := r.(error); ok { | 
					
						
							|  |  |  | 				err = theErr | 
					
						
							|  |  |  | 			} else if theErrString, ok := r.(string); ok { | 
					
						
							|  |  |  | 				err = fmt.Errorf(theErrString) | 
					
						
							|  |  |  | 			} else { | 
					
						
							|  |  |  | 				err = fmt.Errorf("unexpected error - %s", b.userFacingDefaultError) | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 			// Due to the panic, there is no valid response for any query for this datasource. Append an error for each one.
 | 
					
						
							|  |  |  | 			rchan <- buildErrorResponse(err, req) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Query each datasource concurrently
 | 
					
						
							|  |  |  | 	for idx := range requests { | 
					
						
							|  |  |  | 		req := requests[idx] | 
					
						
							|  |  |  | 		g.Go(func() error { | 
					
						
							|  |  |  | 			defer recoveryFn(req) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 			dqr, err := b.handleQuerySingleDatasource(ctx, req) | 
					
						
							|  |  |  | 			if err == nil { | 
					
						
							|  |  |  | 				rchan <- dqr | 
					
						
							|  |  |  | 			} else { | 
					
						
							|  |  |  | 				rchan <- buildErrorResponse(err, req) | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 			return nil | 
					
						
							|  |  |  | 		}) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if err := g.Wait(); err != nil { | 
					
						
							|  |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	close(rchan) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Merge the results from each response
 | 
					
						
							|  |  |  | 	resp := backend.NewQueryDataResponse() | 
					
						
							|  |  |  | 	for result := range rchan { | 
					
						
							|  |  |  | 		for refId, dataResponse := range result.Responses { | 
					
						
							|  |  |  | 			resp.Responses[refId] = dataResponse | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return resp, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | // Unlike the implementation in expr/node.go, all datasource queries have been processed first
 | 
					
						
							|  |  |  | func (b *QueryAPIBuilder) handleExpressions(ctx context.Context, req parsedRequestInfo, data *backend.QueryDataResponse) (qdr *backend.QueryDataResponse, err error) { | 
					
						
							|  |  |  | 	start := time.Now() | 
					
						
							|  |  |  | 	ctx, span := b.tracer.Start(ctx, "SSE.handleExpressions") | 
					
						
							|  |  |  | 	defer func() { | 
					
						
							|  |  |  | 		var respStatus string | 
					
						
							|  |  |  | 		switch { | 
					
						
							|  |  |  | 		case err == nil: | 
					
						
							|  |  |  | 			respStatus = "success" | 
					
						
							|  |  |  | 		default: | 
					
						
							|  |  |  | 			respStatus = "failure" | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		duration := float64(time.Since(start).Nanoseconds()) / float64(time.Millisecond) | 
					
						
							|  |  |  | 		b.metrics.expressionsQuerySummary.WithLabelValues(respStatus).Observe(duration) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		span.End() | 
					
						
							|  |  |  | 	}() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	qdr = data | 
					
						
							|  |  |  | 	if qdr == nil { | 
					
						
							|  |  |  | 		qdr = &backend.QueryDataResponse{} | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-03-25 22:22:34 +08:00
										 |  |  | 	if qdr.Responses == nil { | 
					
						
							|  |  |  | 		qdr.Responses = make(backend.Responses) // avoid NPE for lookup
 | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 	now := start // <<< this should come from the original query parser
 | 
					
						
							|  |  |  | 	vars := make(mathexp.Vars) | 
					
						
							|  |  |  | 	for _, expression := range req.Expressions { | 
					
						
							|  |  |  | 		// Setup the variables
 | 
					
						
							|  |  |  | 		for _, refId := range expression.Command.NeedsVars() { | 
					
						
							|  |  |  | 			_, ok := vars[refId] | 
					
						
							|  |  |  | 			if !ok { | 
					
						
							|  |  |  | 				dr, ok := qdr.Responses[refId] | 
					
						
							|  |  |  | 				if ok { | 
					
						
							|  |  |  | 					allowLongFrames := false // TODO -- depends on input type and only if SQL?
 | 
					
						
							|  |  |  | 					_, res, err := b.converter.Convert(ctx, req.RefIDTypes[refId], dr.Frames, allowLongFrames) | 
					
						
							|  |  |  | 					if err != nil { | 
					
						
							|  |  |  | 						res.Error = err | 
					
						
							|  |  |  | 					} | 
					
						
							|  |  |  | 					vars[refId] = res | 
					
						
							|  |  |  | 				} else { | 
					
						
							|  |  |  | 					// This should error in the parsing phase
 | 
					
						
							|  |  |  | 					err := fmt.Errorf("missing variable %s for %s", refId, expression.RefID) | 
					
						
							|  |  |  | 					qdr.Responses[refId] = backend.DataResponse{ | 
					
						
							|  |  |  | 						Error: err, | 
					
						
							|  |  |  | 					} | 
					
						
							|  |  |  | 					return qdr, err | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		refId := expression.RefID | 
					
						
							|  |  |  | 		results, err := expression.Command.Execute(ctx, now, vars, b.tracer) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			results.Error = err | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		qdr.Responses[refId] = backend.DataResponse{ | 
					
						
							|  |  |  | 			Error:  results.Error, | 
					
						
							|  |  |  | 			Frames: results.Values.AsDataFrames(refId), | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return qdr, nil | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | } |