| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | package query | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							|  |  |  | 	"context" | 
					
						
							|  |  |  | 	"encoding/json" | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 	"errors" | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 	"fmt" | 
					
						
							|  |  |  | 	"net/http" | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 	"time" | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	"github.com/grafana/grafana-plugin-sdk-go/backend" | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 	"github.com/grafana/grafana-plugin-sdk-go/experimental/apis/data/v0alpha1" | 
					
						
							|  |  |  | 	"go.opentelemetry.io/otel/attribute" | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 	"golang.org/x/sync/errgroup" | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 	query "github.com/grafana/grafana/pkg/apis/query/v0alpha1" | 
					
						
							|  |  |  | 	"github.com/grafana/grafana/pkg/expr/mathexp" | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 	"github.com/grafana/grafana/pkg/infra/log" | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 	"github.com/grafana/grafana/pkg/services/datasources" | 
					
						
							|  |  |  | 	"github.com/grafana/grafana/pkg/util/errutil" | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 	"github.com/grafana/grafana/pkg/util/errutil/errhttp" | 
					
						
							|  |  |  | 	"github.com/grafana/grafana/pkg/web" | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | // The query method (not really a create)
 | 
					
						
							|  |  |  | func (b *QueryAPIBuilder) doQuery(w http.ResponseWriter, r *http.Request) { | 
					
						
							|  |  |  | 	ctx, span := b.tracer.Start(r.Context(), "QueryService.Query") | 
					
						
							|  |  |  | 	defer span.End() | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 	raw := &query.QueryDataRequest{} | 
					
						
							|  |  |  | 	err := web.Bind(r, raw) | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 	if err != nil { | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 		errhttp.Write(ctx, errutil.BadRequest( | 
					
						
							|  |  |  | 			"query.bind", | 
					
						
							|  |  |  | 			errutil.WithPublicMessage("Error reading query")). | 
					
						
							|  |  |  | 			Errorf("error reading: %w", err), w) | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 	// Parses the request and splits it into multiple sub queries (if necessary)
 | 
					
						
							|  |  |  | 	req, err := b.parser.parseRequest(ctx, raw) | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 	if err != nil { | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 		if errors.Is(err, datasources.ErrDataSourceNotFound) { | 
					
						
							|  |  |  | 			errhttp.Write(ctx, errutil.BadRequest( | 
					
						
							|  |  |  | 				"query.datasource.notfound", | 
					
						
							|  |  |  | 				errutil.WithPublicMessage(err.Error())), w) | 
					
						
							|  |  |  | 			return | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 		} | 
					
						
							| 
									
										
										
										
											2024-03-19 21:52:15 +08:00
										 |  |  | 		errhttp.Write(ctx, err, w) | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 		return | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 	// Actually run the query
 | 
					
						
							|  |  |  | 	rsp, err := b.execute(ctx, req) | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 	if err != nil { | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 		errhttp.Write(ctx, errutil.Internal( | 
					
						
							|  |  |  | 			"query.execution", | 
					
						
							|  |  |  | 			errutil.WithPublicMessage("Error executing query")). | 
					
						
							|  |  |  | 			Errorf("execution error: %w", err), w) | 
					
						
							|  |  |  | 		return | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-03-19 21:52:15 +08:00
										 |  |  | 	w.Header().Set("Content-Type", "application/json") | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 	w.WriteHeader(query.GetResponseCode(rsp)) | 
					
						
							|  |  |  | 	_ = json.NewEncoder(w).Encode(rsp) | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | func (b *QueryAPIBuilder) execute(ctx context.Context, req parsedRequestInfo) (qdr *backend.QueryDataResponse, err error) { | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 	switch len(req.Requests) { | 
					
						
							|  |  |  | 	case 0: | 
					
						
							|  |  |  | 		break // nothing to do
 | 
					
						
							|  |  |  | 	case 1: | 
					
						
							|  |  |  | 		qdr, err = b.handleQuerySingleDatasource(ctx, req.Requests[0]) | 
					
						
							|  |  |  | 	default: | 
					
						
							|  |  |  | 		qdr, err = b.executeConcurrentQueries(ctx, req.Requests) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if len(req.Expressions) > 0 { | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 		qdr, err = b.handleExpressions(ctx, req, qdr) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Remove hidden results
 | 
					
						
							|  |  |  | 	for _, refId := range req.HideBeforeReturn { | 
					
						
							|  |  |  | 		r, ok := qdr.Responses[refId] | 
					
						
							|  |  |  | 		if ok && r.Error == nil { | 
					
						
							|  |  |  | 			delete(qdr.Responses, refId) | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 	return | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Process a single request
 | 
					
						
							|  |  |  | // See: https://github.com/grafana/grafana/blob/v10.2.3/pkg/services/query/query.go#L242
 | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | func (b *QueryAPIBuilder) handleQuerySingleDatasource(ctx context.Context, req datasourceRequest) (*backend.QueryDataResponse, error) { | 
					
						
							|  |  |  | 	ctx, span := b.tracer.Start(ctx, "Query.handleQuerySingleDatasource") | 
					
						
							|  |  |  | 	defer span.End() | 
					
						
							|  |  |  | 	span.SetAttributes( | 
					
						
							|  |  |  | 		attribute.String("datasource.type", req.PluginId), | 
					
						
							|  |  |  | 		attribute.String("datasource.uid", req.UID), | 
					
						
							|  |  |  | 	) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	allHidden := true | 
					
						
							|  |  |  | 	for idx := range req.Request.Queries { | 
					
						
							|  |  |  | 		if !req.Request.Queries[idx].Hide { | 
					
						
							|  |  |  | 			allHidden = false | 
					
						
							|  |  |  | 			break | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if allHidden { | 
					
						
							|  |  |  | 		return &backend.QueryDataResponse{}, nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-03-19 21:52:15 +08:00
										 |  |  | 	// Add user headers... here or in client.QueryData
 | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 	client, err := b.client.GetDataSourceClient(ctx, v0alpha1.DataSourceRef{ | 
					
						
							|  |  |  | 		Type: req.PluginId, | 
					
						
							|  |  |  | 		UID:  req.UID, | 
					
						
							|  |  |  | 	}) | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	_, rsp, err := client.QueryData(ctx, *req.Request) | 
					
						
							| 
									
										
										
										
											2024-03-19 21:52:15 +08:00
										 |  |  | 	if err == nil && rsp != nil { | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 		for _, q := range req.Request.Queries { | 
					
						
							|  |  |  | 			if q.ResultAssertions != nil { | 
					
						
							|  |  |  | 				result, ok := rsp.Responses[q.RefID] | 
					
						
							|  |  |  | 				if ok && result.Error == nil { | 
					
						
							|  |  |  | 					err = q.ResultAssertions.Validate(result.Frames) | 
					
						
							|  |  |  | 					if err != nil { | 
					
						
							|  |  |  | 						result.Error = err | 
					
						
							|  |  |  | 						result.ErrorSource = backend.ErrorSourceDownstream | 
					
						
							|  |  |  | 						rsp.Responses[q.RefID] = result | 
					
						
							|  |  |  | 					} | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return rsp, err | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // buildErrorResponses applies the provided error to each query response in the list. These queries should all belong to the same datasource.
 | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | func buildErrorResponse(err error, req datasourceRequest) *backend.QueryDataResponse { | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 	rsp := backend.NewQueryDataResponse() | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 	for _, query := range req.Request.Queries { | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 		rsp.Responses[query.RefID] = backend.DataResponse{ | 
					
						
							|  |  |  | 			Error: err, | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return rsp | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // executeConcurrentQueries executes queries to multiple datasources concurrently and returns the aggregate result.
 | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | func (b *QueryAPIBuilder) executeConcurrentQueries(ctx context.Context, requests []datasourceRequest) (*backend.QueryDataResponse, error) { | 
					
						
							|  |  |  | 	ctx, span := b.tracer.Start(ctx, "Query.executeConcurrentQueries") | 
					
						
							|  |  |  | 	defer span.End() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 	g, ctx := errgroup.WithContext(ctx) | 
					
						
							|  |  |  | 	g.SetLimit(b.concurrentQueryLimit) // prevent too many concurrent requests
 | 
					
						
							|  |  |  | 	rchan := make(chan *backend.QueryDataResponse, len(requests)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Create panic recovery function for loop below
 | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | 	recoveryFn := func(req datasourceRequest) { | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | 		if r := recover(); r != nil { | 
					
						
							|  |  |  | 			var err error | 
					
						
							|  |  |  | 			b.log.Error("query datasource panic", "error", r, "stack", log.Stack(1)) | 
					
						
							|  |  |  | 			if theErr, ok := r.(error); ok { | 
					
						
							|  |  |  | 				err = theErr | 
					
						
							|  |  |  | 			} else if theErrString, ok := r.(string); ok { | 
					
						
							|  |  |  | 				err = fmt.Errorf(theErrString) | 
					
						
							|  |  |  | 			} else { | 
					
						
							|  |  |  | 				err = fmt.Errorf("unexpected error - %s", b.userFacingDefaultError) | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 			// Due to the panic, there is no valid response for any query for this datasource. Append an error for each one.
 | 
					
						
							|  |  |  | 			rchan <- buildErrorResponse(err, req) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Query each datasource concurrently
 | 
					
						
							|  |  |  | 	for idx := range requests { | 
					
						
							|  |  |  | 		req := requests[idx] | 
					
						
							|  |  |  | 		g.Go(func() error { | 
					
						
							|  |  |  | 			defer recoveryFn(req) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 			dqr, err := b.handleQuerySingleDatasource(ctx, req) | 
					
						
							|  |  |  | 			if err == nil { | 
					
						
							|  |  |  | 				rchan <- dqr | 
					
						
							|  |  |  | 			} else { | 
					
						
							|  |  |  | 				rchan <- buildErrorResponse(err, req) | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 			return nil | 
					
						
							|  |  |  | 		}) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if err := g.Wait(); err != nil { | 
					
						
							|  |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	close(rchan) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Merge the results from each response
 | 
					
						
							|  |  |  | 	resp := backend.NewQueryDataResponse() | 
					
						
							|  |  |  | 	for result := range rchan { | 
					
						
							|  |  |  | 		for refId, dataResponse := range result.Responses { | 
					
						
							|  |  |  | 			resp.Responses[refId] = dataResponse | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return resp, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-03-09 00:12:59 +08:00
										 |  |  | // Unlike the implementation in expr/node.go, all datasource queries have been processed first
 | 
					
						
							|  |  |  | func (b *QueryAPIBuilder) handleExpressions(ctx context.Context, req parsedRequestInfo, data *backend.QueryDataResponse) (qdr *backend.QueryDataResponse, err error) { | 
					
						
							|  |  |  | 	start := time.Now() | 
					
						
							|  |  |  | 	ctx, span := b.tracer.Start(ctx, "SSE.handleExpressions") | 
					
						
							|  |  |  | 	defer func() { | 
					
						
							|  |  |  | 		var respStatus string | 
					
						
							|  |  |  | 		switch { | 
					
						
							|  |  |  | 		case err == nil: | 
					
						
							|  |  |  | 			respStatus = "success" | 
					
						
							|  |  |  | 		default: | 
					
						
							|  |  |  | 			respStatus = "failure" | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		duration := float64(time.Since(start).Nanoseconds()) / float64(time.Millisecond) | 
					
						
							|  |  |  | 		b.metrics.expressionsQuerySummary.WithLabelValues(respStatus).Observe(duration) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		span.End() | 
					
						
							|  |  |  | 	}() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	qdr = data | 
					
						
							|  |  |  | 	if qdr == nil { | 
					
						
							|  |  |  | 		qdr = &backend.QueryDataResponse{} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	now := start // <<< this should come from the original query parser
 | 
					
						
							|  |  |  | 	vars := make(mathexp.Vars) | 
					
						
							|  |  |  | 	for _, expression := range req.Expressions { | 
					
						
							|  |  |  | 		// Setup the variables
 | 
					
						
							|  |  |  | 		for _, refId := range expression.Command.NeedsVars() { | 
					
						
							|  |  |  | 			_, ok := vars[refId] | 
					
						
							|  |  |  | 			if !ok { | 
					
						
							|  |  |  | 				dr, ok := qdr.Responses[refId] | 
					
						
							|  |  |  | 				if ok { | 
					
						
							|  |  |  | 					allowLongFrames := false // TODO -- depends on input type and only if SQL?
 | 
					
						
							|  |  |  | 					_, res, err := b.converter.Convert(ctx, req.RefIDTypes[refId], dr.Frames, allowLongFrames) | 
					
						
							|  |  |  | 					if err != nil { | 
					
						
							|  |  |  | 						res.Error = err | 
					
						
							|  |  |  | 					} | 
					
						
							|  |  |  | 					vars[refId] = res | 
					
						
							|  |  |  | 				} else { | 
					
						
							|  |  |  | 					// This should error in the parsing phase
 | 
					
						
							|  |  |  | 					err := fmt.Errorf("missing variable %s for %s", refId, expression.RefID) | 
					
						
							|  |  |  | 					qdr.Responses[refId] = backend.DataResponse{ | 
					
						
							|  |  |  | 						Error: err, | 
					
						
							|  |  |  | 					} | 
					
						
							|  |  |  | 					return qdr, err | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		refId := expression.RefID | 
					
						
							|  |  |  | 		results, err := expression.Command.Execute(ctx, now, vars, b.tracer) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			results.Error = err | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		qdr.Responses[refId] = backend.DataResponse{ | 
					
						
							|  |  |  | 			Error:  results.Error, | 
					
						
							|  |  |  | 			Frames: results.Values.AsDataFrames(refId), | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return qdr, nil | 
					
						
							| 
									
										
										
										
											2024-02-01 02:36:51 +08:00
										 |  |  | } |