| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | package query | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							|  |  |  | 	"context" | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 	"errors" | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 	"fmt" | 
					
						
							|  |  |  | 	"net/http" | 
					
						
							| 
									
										
										
										
											2024-05-21 19:07:47 +08:00
										 |  |  | 	"strconv" | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 	"time" | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	"github.com/grafana/grafana-plugin-sdk-go/backend" | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 	"github.com/grafana/grafana-plugin-sdk-go/experimental/apis/data/v0alpha1" | 
					
						
							| 
									
										
										
										
											2024-09-20 01:20:39 +08:00
										 |  |  | 	"github.com/grafana/grafana/pkg/services/ngalert/models" | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 	"go.opentelemetry.io/otel/attribute" | 
					
						
							| 
									
										
										
										
											2024-05-21 19:07:47 +08:00
										 |  |  | 	"go.opentelemetry.io/otel/codes" | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 	"golang.org/x/sync/errgroup" | 
					
						
							| 
									
										
										
										
											2024-05-11 02:01:17 +08:00
										 |  |  | 	errorsK8s "k8s.io/apimachinery/pkg/api/errors" | 
					
						
							|  |  |  | 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | 
					
						
							|  |  |  | 	"k8s.io/apimachinery/pkg/runtime" | 
					
						
							|  |  |  | 	"k8s.io/apimachinery/pkg/runtime/schema" | 
					
						
							| 
									
										
										
										
											2024-06-22 01:05:55 +08:00
										 |  |  | 	"k8s.io/apiserver/pkg/endpoints/request" | 
					
						
							| 
									
										
										
										
											2024-05-11 02:01:17 +08:00
										 |  |  | 	"k8s.io/apiserver/pkg/registry/rest" | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 	query "github.com/grafana/grafana/pkg/apis/query/v0alpha1" | 
					
						
							|  |  |  | 	"github.com/grafana/grafana/pkg/expr/mathexp" | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 	"github.com/grafana/grafana/pkg/infra/log" | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 	"github.com/grafana/grafana/pkg/services/datasources" | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 	"github.com/grafana/grafana/pkg/web" | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-05-11 02:01:17 +08:00
										 |  |  | type queryREST struct { | 
					
						
							| 
									
										
										
										
											2024-05-21 19:07:47 +08:00
										 |  |  | 	logger  log.Logger | 
					
						
							| 
									
										
										
										
											2024-05-11 02:01:17 +08:00
										 |  |  | 	builder *QueryAPIBuilder | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | var ( | 
					
						
							|  |  |  | 	_ rest.Storage              = (*queryREST)(nil) | 
					
						
							|  |  |  | 	_ rest.SingularNameProvider = (*queryREST)(nil) | 
					
						
							|  |  |  | 	_ rest.Connecter            = (*queryREST)(nil) | 
					
						
							|  |  |  | 	_ rest.Scoper               = (*queryREST)(nil) | 
					
						
							|  |  |  | 	_ rest.StorageMetadata      = (*queryREST)(nil) | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-05-21 19:07:47 +08:00
										 |  |  | func newQueryREST(builder *QueryAPIBuilder) *queryREST { | 
					
						
							|  |  |  | 	return &queryREST{ | 
					
						
							|  |  |  | 		logger:  log.New("query"), | 
					
						
							|  |  |  | 		builder: builder, | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-05-11 02:01:17 +08:00
										 |  |  | func (r *queryREST) New() runtime.Object { | 
					
						
							| 
									
										
										
										
											2024-08-30 00:47:38 +08:00
										 |  |  | 	// This is added as the "ResponseType" regardless what ProducesObject() says :)
 | 
					
						
							| 
									
										
										
										
											2024-05-11 02:01:17 +08:00
										 |  |  | 	return &query.QueryDataResponse{} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (r *queryREST) Destroy() {} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (r *queryREST) NamespaceScoped() bool { | 
					
						
							|  |  |  | 	return true | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (r *queryREST) GetSingularName() string { | 
					
						
							|  |  |  | 	return "QueryResults" // Used for the
 | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (r *queryREST) ProducesMIMETypes(verb string) []string { | 
					
						
							|  |  |  | 	return []string{"application/json"} // and parquet!
 | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (r *queryREST) ProducesObject(verb string) interface{} { | 
					
						
							|  |  |  | 	return &query.QueryDataResponse{} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (r *queryREST) ConnectMethods() []string { | 
					
						
							|  |  |  | 	return []string{"POST"} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (r *queryREST) NewConnectOptions() (runtime.Object, bool, string) { | 
					
						
							|  |  |  | 	return nil, false, "" // true means you can use the trailing path as a variable
 | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-06-22 01:05:55 +08:00
										 |  |  | func (r *queryREST) Connect(connectCtx context.Context, name string, _ runtime.Object, incomingResponder rest.Responder) (http.Handler, error) { | 
					
						
							| 
									
										
										
										
											2024-07-01 23:42:34 +08:00
										 |  |  | 	// See: /pkg/services/apiserver/builder/helper.go#L34
 | 
					
						
							| 
									
										
										
										
											2024-05-11 02:01:17 +08:00
										 |  |  | 	// The name is set with a rewriter hack
 | 
					
						
							|  |  |  | 	if name != "name" { | 
					
						
							|  |  |  | 		return nil, errorsK8s.NewNotFound(schema.GroupResource{}, name) | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-05-11 02:01:17 +08:00
										 |  |  | 	b := r.builder | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-05-11 02:01:17 +08:00
										 |  |  | 	return http.HandlerFunc(func(w http.ResponseWriter, httpreq *http.Request) { | 
					
						
							|  |  |  | 		ctx, span := b.tracer.Start(httpreq.Context(), "QueryService.Query") | 
					
						
							|  |  |  | 		defer span.End() | 
					
						
							| 
									
										
										
										
											2024-06-22 01:05:55 +08:00
										 |  |  | 		ctx = request.WithNamespace(ctx, request.NamespaceValue(connectCtx)) | 
					
						
							| 
									
										
										
										
											2024-05-11 02:01:17 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-05-21 19:07:47 +08:00
										 |  |  | 		responder := newResponderWrapper(incomingResponder, | 
					
						
							|  |  |  | 			func(statusCode int, obj runtime.Object) { | 
					
						
							|  |  |  | 				if statusCode >= 400 { | 
					
						
							|  |  |  | 					span.SetStatus(codes.Error, fmt.Sprintf("error with HTTP status code %s", strconv.Itoa(statusCode))) | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 			}, | 
					
						
							|  |  |  | 			func(err error) { | 
					
						
							| 
									
										
										
										
											2024-05-22 23:56:34 +08:00
										 |  |  | 				span.SetStatus(codes.Error, "query error") | 
					
						
							| 
									
										
										
										
											2024-05-21 19:07:47 +08:00
										 |  |  | 				if err == nil { | 
					
						
							|  |  |  | 					return | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 				span.RecordError(err) | 
					
						
							|  |  |  | 			}) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-05-11 02:01:17 +08:00
										 |  |  | 		raw := &query.QueryDataRequest{} | 
					
						
							|  |  |  | 		err := web.Bind(httpreq, raw) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			err = errorsK8s.NewBadRequest("error reading query") | 
					
						
							|  |  |  | 			// TODO: can we wrap the error so details are not lost?!
 | 
					
						
							|  |  |  | 			// errutil.BadRequest(
 | 
					
						
							|  |  |  | 			// 	"query.bind",
 | 
					
						
							|  |  |  | 			// 	errutil.WithPublicMessage("Error reading query")).
 | 
					
						
							|  |  |  | 			// 	Errorf("error reading: %w", err)
 | 
					
						
							|  |  |  | 			responder.Error(err) | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 			return | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 		} | 
					
						
							| 
									
										
										
										
											2024-05-11 02:01:17 +08:00
										 |  |  | 		// Parses the request and splits it into multiple sub queries (if necessary)
 | 
					
						
							|  |  |  | 		req, err := b.parser.parseRequest(ctx, raw) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							| 
									
										
										
										
											2024-10-10 05:28:27 +08:00
										 |  |  | 			reason := metav1.StatusReasonInvalid | 
					
						
							|  |  |  | 			message := err.Error() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-05-11 02:01:17 +08:00
										 |  |  | 			if errors.Is(err, datasources.ErrDataSourceNotFound) { | 
					
						
							| 
									
										
										
										
											2024-10-10 05:28:27 +08:00
										 |  |  | 				reason = metav1.StatusReasonNotFound | 
					
						
							| 
									
										
										
										
											2024-05-11 02:01:17 +08:00
										 |  |  | 				// TODO, can we wrap the error somehow?
 | 
					
						
							| 
									
										
										
										
											2024-10-10 05:28:27 +08:00
										 |  |  | 				message = "datasource not found" | 
					
						
							| 
									
										
										
										
											2024-05-11 02:01:17 +08:00
										 |  |  | 			} | 
					
						
							| 
									
										
										
										
											2024-10-10 05:28:27 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 			err = &errorsK8s.StatusError{ErrStatus: metav1.Status{ | 
					
						
							|  |  |  | 				Status:  metav1.StatusFailure, | 
					
						
							|  |  |  | 				Code:    http.StatusBadRequest, | 
					
						
							|  |  |  | 				Reason:  reason, | 
					
						
							|  |  |  | 				Message: message, | 
					
						
							|  |  |  | 			}} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-05-20 23:11:37 +08:00
										 |  |  | 			responder.Error(err) | 
					
						
							| 
									
										
										
										
											2024-05-11 02:01:17 +08:00
										 |  |  | 			return | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-08-30 00:47:38 +08:00
										 |  |  | 		for i := range req.Requests { | 
					
						
							| 
									
										
										
										
											2024-09-04 00:01:27 +08:00
										 |  |  | 			req.Requests[i].Headers = ExtractKnownHeaders(httpreq.Header) | 
					
						
							| 
									
										
										
										
											2024-08-30 00:47:38 +08:00
										 |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-05-11 02:01:17 +08:00
										 |  |  | 		// Actually run the query
 | 
					
						
							|  |  |  | 		rsp, err := b.execute(ctx, req) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							| 
									
										
										
										
											2024-05-20 23:11:37 +08:00
										 |  |  | 			responder.Error(err) | 
					
						
							| 
									
										
										
										
											2024-05-11 02:01:17 +08:00
										 |  |  | 			return | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-05-11 02:01:17 +08:00
										 |  |  | 		responder.Object(query.GetResponseCode(rsp), &query.QueryDataResponse{ | 
					
						
							|  |  |  | 			QueryDataResponse: *rsp, // wrap the backend response as a QueryDataResponse
 | 
					
						
							|  |  |  | 		}) | 
					
						
							|  |  |  | 	}), nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | func (b *QueryAPIBuilder) execute(ctx context.Context, req parsedRequestInfo) (qdr *backend.QueryDataResponse, err error) { | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 	switch len(req.Requests) { | 
					
						
							|  |  |  | 	case 0: | 
					
						
							| 
									
										
										
										
											2024-07-03 03:44:29 +08:00
										 |  |  | 		qdr = &backend.QueryDataResponse{} | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 	case 1: | 
					
						
							|  |  |  | 		qdr, err = b.handleQuerySingleDatasource(ctx, req.Requests[0]) | 
					
						
							| 
									
										
										
										
											2024-09-20 01:20:39 +08:00
										 |  |  | 		if alertQueryWithoutExpression(req) { | 
					
						
							|  |  |  | 			qdr, err = b.convertQueryWithoutExpression(ctx, req.Requests[0], qdr) | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 	default: | 
					
						
							|  |  |  | 		qdr, err = b.executeConcurrentQueries(ctx, req.Requests) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if len(req.Expressions) > 0 { | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 		qdr, err = b.handleExpressions(ctx, req, qdr) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Remove hidden results
 | 
					
						
							|  |  |  | 	for _, refId := range req.HideBeforeReturn { | 
					
						
							|  |  |  | 		r, ok := qdr.Responses[refId] | 
					
						
							|  |  |  | 		if ok && r.Error == nil { | 
					
						
							|  |  |  | 			delete(qdr.Responses, refId) | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 	return | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Process a single request
 | 
					
						
							|  |  |  | // See: https://github.com/grafana/grafana/blob/v10.2.3/pkg/services/query/query.go#L242
 | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | func (b *QueryAPIBuilder) handleQuerySingleDatasource(ctx context.Context, req datasourceRequest) (*backend.QueryDataResponse, error) { | 
					
						
							|  |  |  | 	ctx, span := b.tracer.Start(ctx, "Query.handleQuerySingleDatasource") | 
					
						
							|  |  |  | 	defer span.End() | 
					
						
							|  |  |  | 	span.SetAttributes( | 
					
						
							|  |  |  | 		attribute.String("datasource.type", req.PluginId), | 
					
						
							|  |  |  | 		attribute.String("datasource.uid", req.UID), | 
					
						
							|  |  |  | 	) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	allHidden := true | 
					
						
							|  |  |  | 	for idx := range req.Request.Queries { | 
					
						
							|  |  |  | 		if !req.Request.Queries[idx].Hide { | 
					
						
							|  |  |  | 			allHidden = false | 
					
						
							|  |  |  | 			break | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if allHidden { | 
					
						
							|  |  |  | 		return &backend.QueryDataResponse{}, nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-08-30 00:47:38 +08:00
										 |  |  | 	client, err := b.client.GetDataSourceClient( | 
					
						
							|  |  |  | 		ctx, | 
					
						
							|  |  |  | 		v0alpha1.DataSourceRef{ | 
					
						
							|  |  |  | 			Type: req.PluginId, | 
					
						
							|  |  |  | 			UID:  req.UID, | 
					
						
							|  |  |  | 		}, | 
					
						
							|  |  |  | 		req.Headers, | 
					
						
							|  |  |  | 	) | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-03-25 22:22:34 +08:00
										 |  |  | 	code, rsp, err := client.QueryData(ctx, *req.Request) | 
					
						
							| 
									
										
										
										
											2024-03-19 21:52:15 +08:00
										 |  |  | 	if err == nil && rsp != nil { | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 		for _, q := range req.Request.Queries { | 
					
						
							|  |  |  | 			if q.ResultAssertions != nil { | 
					
						
							|  |  |  | 				result, ok := rsp.Responses[q.RefID] | 
					
						
							|  |  |  | 				if ok && result.Error == nil { | 
					
						
							|  |  |  | 					err = q.ResultAssertions.Validate(result.Frames) | 
					
						
							|  |  |  | 					if err != nil { | 
					
						
							|  |  |  | 						result.Error = err | 
					
						
							|  |  |  | 						result.ErrorSource = backend.ErrorSourceDownstream | 
					
						
							|  |  |  | 						rsp.Responses[q.RefID] = result | 
					
						
							|  |  |  | 					} | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-03-25 22:22:34 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	// Create a response object with the error when missing (happens for client errors like 404)
 | 
					
						
							|  |  |  | 	if rsp == nil && err != nil { | 
					
						
							|  |  |  | 		rsp = &backend.QueryDataResponse{Responses: make(backend.Responses)} | 
					
						
							|  |  |  | 		for _, q := range req.Request.Queries { | 
					
						
							|  |  |  | 			rsp.Responses[q.RefID] = backend.DataResponse{ | 
					
						
							|  |  |  | 				Status: backend.Status(code), | 
					
						
							|  |  |  | 				Error:  err, | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 	return rsp, err | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // buildErrorResponses applies the provided error to each query response in the list. These queries should all belong to the same datasource.
 | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | func buildErrorResponse(err error, req datasourceRequest) *backend.QueryDataResponse { | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 	rsp := backend.NewQueryDataResponse() | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 	for _, query := range req.Request.Queries { | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 		rsp.Responses[query.RefID] = backend.DataResponse{ | 
					
						
							|  |  |  | 			Error: err, | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return rsp | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // executeConcurrentQueries executes queries to multiple datasources concurrently and returns the aggregate result.
 | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | func (b *QueryAPIBuilder) executeConcurrentQueries(ctx context.Context, requests []datasourceRequest) (*backend.QueryDataResponse, error) { | 
					
						
							|  |  |  | 	ctx, span := b.tracer.Start(ctx, "Query.executeConcurrentQueries") | 
					
						
							|  |  |  | 	defer span.End() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 	g, ctx := errgroup.WithContext(ctx) | 
					
						
							|  |  |  | 	g.SetLimit(b.concurrentQueryLimit) // prevent too many concurrent requests
 | 
					
						
							|  |  |  | 	rchan := make(chan *backend.QueryDataResponse, len(requests)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Create panic recovery function for loop below
 | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 	recoveryFn := func(req datasourceRequest) { | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 		if r := recover(); r != nil { | 
					
						
							|  |  |  | 			var err error | 
					
						
							|  |  |  | 			b.log.Error("query datasource panic", "error", r, "stack", log.Stack(1)) | 
					
						
							|  |  |  | 			if theErr, ok := r.(error); ok { | 
					
						
							|  |  |  | 				err = theErr | 
					
						
							|  |  |  | 			} else if theErrString, ok := r.(string); ok { | 
					
						
							| 
									
										
										
										
											2024-08-21 23:40:42 +08:00
										 |  |  | 				err = errors.New(theErrString) | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 			} else { | 
					
						
							|  |  |  | 				err = fmt.Errorf("unexpected error - %s", b.userFacingDefaultError) | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 			// Due to the panic, there is no valid response for any query for this datasource. Append an error for each one.
 | 
					
						
							|  |  |  | 			rchan <- buildErrorResponse(err, req) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Query each datasource concurrently
 | 
					
						
							|  |  |  | 	for idx := range requests { | 
					
						
							|  |  |  | 		req := requests[idx] | 
					
						
							|  |  |  | 		g.Go(func() error { | 
					
						
							|  |  |  | 			defer recoveryFn(req) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 			dqr, err := b.handleQuerySingleDatasource(ctx, req) | 
					
						
							|  |  |  | 			if err == nil { | 
					
						
							|  |  |  | 				rchan <- dqr | 
					
						
							|  |  |  | 			} else { | 
					
						
							|  |  |  | 				rchan <- buildErrorResponse(err, req) | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 			return nil | 
					
						
							|  |  |  | 		}) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if err := g.Wait(); err != nil { | 
					
						
							|  |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	close(rchan) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Merge the results from each response
 | 
					
						
							|  |  |  | 	resp := backend.NewQueryDataResponse() | 
					
						
							|  |  |  | 	for result := range rchan { | 
					
						
							|  |  |  | 		for refId, dataResponse := range result.Responses { | 
					
						
							|  |  |  | 			resp.Responses[refId] = dataResponse | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return resp, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | // Unlike the implementation in expr/node.go, all datasource queries have been processed first
 | 
					
						
							|  |  |  | func (b *QueryAPIBuilder) handleExpressions(ctx context.Context, req parsedRequestInfo, data *backend.QueryDataResponse) (qdr *backend.QueryDataResponse, err error) { | 
					
						
							|  |  |  | 	start := time.Now() | 
					
						
							| 
									
										
										
										
											2024-05-22 23:56:34 +08:00
										 |  |  | 	ctx, span := b.tracer.Start(ctx, "Query.handleExpressions") | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 	defer func() { | 
					
						
							|  |  |  | 		var respStatus string | 
					
						
							|  |  |  | 		switch { | 
					
						
							|  |  |  | 		case err == nil: | 
					
						
							|  |  |  | 			respStatus = "success" | 
					
						
							|  |  |  | 		default: | 
					
						
							|  |  |  | 			respStatus = "failure" | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		duration := float64(time.Since(start).Nanoseconds()) / float64(time.Millisecond) | 
					
						
							|  |  |  | 		b.metrics.expressionsQuerySummary.WithLabelValues(respStatus).Observe(duration) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		span.End() | 
					
						
							|  |  |  | 	}() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	qdr = data | 
					
						
							|  |  |  | 	if qdr == nil { | 
					
						
							|  |  |  | 		qdr = &backend.QueryDataResponse{} | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-03-25 22:22:34 +08:00
										 |  |  | 	if qdr.Responses == nil { | 
					
						
							|  |  |  | 		qdr.Responses = make(backend.Responses) // avoid NPE for lookup
 | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 	now := start // <<< this should come from the original query parser
 | 
					
						
							|  |  |  | 	vars := make(mathexp.Vars) | 
					
						
							|  |  |  | 	for _, expression := range req.Expressions { | 
					
						
							|  |  |  | 		// Setup the variables
 | 
					
						
							|  |  |  | 		for _, refId := range expression.Command.NeedsVars() { | 
					
						
							|  |  |  | 			_, ok := vars[refId] | 
					
						
							|  |  |  | 			if !ok { | 
					
						
							|  |  |  | 				dr, ok := qdr.Responses[refId] | 
					
						
							|  |  |  | 				if ok { | 
					
						
							|  |  |  | 					allowLongFrames := false // TODO -- depends on input type and only if SQL?
 | 
					
						
							|  |  |  | 					_, res, err := b.converter.Convert(ctx, req.RefIDTypes[refId], dr.Frames, allowLongFrames) | 
					
						
							|  |  |  | 					if err != nil { | 
					
						
							|  |  |  | 						res.Error = err | 
					
						
							|  |  |  | 					} | 
					
						
							|  |  |  | 					vars[refId] = res | 
					
						
							|  |  |  | 				} else { | 
					
						
							|  |  |  | 					// This should error in the parsing phase
 | 
					
						
							|  |  |  | 					err := fmt.Errorf("missing variable %s for %s", refId, expression.RefID) | 
					
						
							|  |  |  | 					qdr.Responses[refId] = backend.DataResponse{ | 
					
						
							|  |  |  | 						Error: err, | 
					
						
							|  |  |  | 					} | 
					
						
							|  |  |  | 					return qdr, err | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		refId := expression.RefID | 
					
						
							|  |  |  | 		results, err := expression.Command.Execute(ctx, now, vars, b.tracer) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			results.Error = err | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		qdr.Responses[refId] = backend.DataResponse{ | 
					
						
							|  |  |  | 			Error:  results.Error, | 
					
						
							|  |  |  | 			Frames: results.Values.AsDataFrames(refId), | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return qdr, nil | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | } | 
					
						
							| 
									
										
										
										
											2024-05-21 19:07:47 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-09-20 01:20:39 +08:00
										 |  |  | func (b *QueryAPIBuilder) convertQueryWithoutExpression(ctx context.Context, req datasourceRequest, | 
					
						
							|  |  |  | 	qdr *backend.QueryDataResponse) (*backend.QueryDataResponse, error) { | 
					
						
							|  |  |  | 	if len(req.Request.Queries) == 0 { | 
					
						
							|  |  |  | 		return nil, errors.New("no queries to convert") | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if qdr == nil { | 
					
						
							|  |  |  | 		return nil, errors.New("queryDataResponse is nil") | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	allowLongFrames := false | 
					
						
							|  |  |  | 	refID := req.Request.Queries[0].RefID | 
					
						
							|  |  |  | 	if _, exist := qdr.Responses[refID]; !exist { | 
					
						
							|  |  |  | 		return nil, fmt.Errorf("refID '%s' does not exist", refID) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	frames := qdr.Responses[refID].Frames | 
					
						
							|  |  |  | 	_, results, err := b.converter.Convert(ctx, req.PluginId, frames, allowLongFrames) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		results.Error = err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	qdr = &backend.QueryDataResponse{ | 
					
						
							|  |  |  | 		Responses: map[string]backend.DataResponse{ | 
					
						
							|  |  |  | 			refID: { | 
					
						
							|  |  |  | 				Frames: results.Values.AsDataFrames(refID), | 
					
						
							|  |  |  | 				Error:  results.Error, | 
					
						
							|  |  |  | 			}, | 
					
						
							|  |  |  | 		}, | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return qdr, err | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-05-21 19:07:47 +08:00
										 |  |  | type responderWrapper struct { | 
					
						
							|  |  |  | 	wrapped    rest.Responder | 
					
						
							|  |  |  | 	onObjectFn func(statusCode int, obj runtime.Object) | 
					
						
							|  |  |  | 	onErrorFn  func(err error) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func newResponderWrapper(responder rest.Responder, onObjectFn func(statusCode int, obj runtime.Object), onErrorFn func(err error)) *responderWrapper { | 
					
						
							|  |  |  | 	return &responderWrapper{ | 
					
						
							|  |  |  | 		wrapped:    responder, | 
					
						
							|  |  |  | 		onObjectFn: onObjectFn, | 
					
						
							|  |  |  | 		onErrorFn:  onErrorFn, | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (r responderWrapper) Object(statusCode int, obj runtime.Object) { | 
					
						
							| 
									
										
										
										
											2024-05-22 23:56:34 +08:00
										 |  |  | 	if r.onObjectFn != nil { | 
					
						
							|  |  |  | 		r.onObjectFn(statusCode, obj) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-05-21 19:07:47 +08:00
										 |  |  | 	r.wrapped.Object(statusCode, obj) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (r responderWrapper) Error(err error) { | 
					
						
							| 
									
										
										
										
											2024-05-22 23:56:34 +08:00
										 |  |  | 	if r.onErrorFn != nil { | 
					
						
							|  |  |  | 		r.onErrorFn(err) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-05-21 19:07:47 +08:00
										 |  |  | 	r.wrapped.Error(err) | 
					
						
							|  |  |  | } | 
					
						
							| 
									
										
										
										
											2024-09-20 01:20:39 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | // Checks if the request only contains a single query and not expression.
 | 
					
						
							|  |  |  | func alertQueryWithoutExpression(req parsedRequestInfo) bool { | 
					
						
							|  |  |  | 	if len(req.Requests) != 1 { | 
					
						
							|  |  |  | 		return false | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	headers := req.Requests[0].Headers | 
					
						
							|  |  |  | 	_, exist := headers[models.FromAlertHeaderName] | 
					
						
							|  |  |  | 	if exist && len(req.Requests[0].Request.Queries) == 1 && len(req.Expressions) == 0 { | 
					
						
							|  |  |  | 		return true | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return false | 
					
						
							|  |  |  | } |