package querydata

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"regexp"
	"sync"
	"time"

	"github.com/grafana/dskit/concurrency"
	"github.com/grafana/grafana-plugin-sdk-go/backend"
	"github.com/grafana/grafana-plugin-sdk-go/backend/log"
	"github.com/grafana/grafana-plugin-sdk-go/backend/tracing"
	"github.com/grafana/grafana-plugin-sdk-go/data"
	"github.com/grafana/grafana-plugin-sdk-go/data/utils/maputil"
	"github.com/grafana/grafana-plugin-sdk-go/experimental/status"
	"go.opentelemetry.io/otel/trace"

	"github.com/grafana/grafana/pkg/promlib/client"
	"github.com/grafana/grafana/pkg/promlib/intervalv2"
	"github.com/grafana/grafana/pkg/promlib/models"
	"github.com/grafana/grafana/pkg/promlib/querydata/exemplar"
	"github.com/grafana/grafana/pkg/promlib/utils"
)
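
// legendFormatAuto and legendFormatRegexp support legend (series name)
// templating: "__auto" selects the automatic legend, and the regexp matches
// {{label}} placeholders in a legend format string. They are presumably
// consumed by the response parsing code elsewhere in this package.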
const legendFormatAuto = "__auto"

var legendFormatRegexp = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`)
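
// ExemplarEvent is a single exemplar sample: the time it was observed, its
// value, and the labels attached to it.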
type ExemplarEvent struct {
	Time   time.Time
	Value  float64
	Labels map[string]string
}

// QueryData handles querying Prometheus. Unlike the buffered package, it uses
// a custom client instead of the default Go Prometheus client.
type QueryData struct {
	intervalCalculator intervalv2.Calculator
	tracer             trace.Tracer
	client             *client.Client
	log                log.Logger
	ID                 int64
	URL                string
	TimeInterval       string
	exemplarSampler    func() exemplar.Sampler
	featureToggles     backend.FeatureToggles
}
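
// New builds a QueryData instance for a single Prometheus data source: it
// reads httpMethod, timeInterval and queryTimeout from the data source JSON
// settings, wraps the provided *http.Client in the package's client, and
// defaults to the standard deviation exemplar sampler.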
func New(
	httpClient *http.Client,
	settings backend.DataSourceInstanceSettings,
	plog log.Logger,
	featureToggles backend.FeatureToggles,
) (*QueryData, error) {
	jsonData, err := utils.GetJsonData(settings)
	if err != nil {
		return nil, err
	}
	httpMethod, _ := maputil.GetStringOptional(jsonData, "httpMethod")
	if httpMethod == "" {
		httpMethod = http.MethodPost
	}

	timeInterval, err := maputil.GetStringOptional(jsonData, "timeInterval")
	if err != nil {
		return nil, err
	}

	queryTimeout, err := maputil.GetStringOptional(jsonData, "queryTimeout")
	if err != nil {
		return nil, err
	}

	promClient := client.NewClient(httpClient, httpMethod, settings.URL, queryTimeout)

	// standard deviation sampler is the default for backwards compatibility
	exemplarSampler := exemplar.NewStandardDeviationSampler

	return &QueryData{
		intervalCalculator: intervalv2.NewCalculator(),
		tracer:             tracing.DefaultTracer(),
		log:                plog,
		client:             promClient,
		TimeInterval:       timeInterval,
		ID:                 settings.ID,
		URL:                settings.URL,
		exemplarSampler:    exemplarSampler,
		featureToggles:     featureToggles,
	}, nil
}
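
// Execute runs every query in the request concurrently, bounded by the
// Grafana config's concurrent query count (falling back to 10 when it cannot
// be read), and collects the per-RefID responses into a single
// backend.QueryDataResponse.
//
// A minimal caller sketch; qd, ctx, pluginCtx and queryJSON are hypothetical
// names assumed to be prepared by the caller:
//
//	resp, err := qd.Execute(ctx, &backend.QueryDataRequest{
//		PluginContext: pluginCtx,
//		Queries:       []backend.DataQuery{{RefID: "A", JSON: queryJSON}},
//	})
//	if err == nil {
//		frames := resp.Responses["A"].Frames
//		_ = frames
//	}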
func (s *QueryData) Execute(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
	fromAlert := req.Headers["FromAlert"] == "true"
	logger := s.log.FromContext(ctx)
	logger.Debug("Begin query execution", "fromAlert", fromAlert)
	result := backend.QueryDataResponse{
		Responses: backend.Responses{},
	}

	var (
		hasPromQLScopeFeatureFlag = s.featureToggles.IsEnabled("promQLScope")
		m                         sync.Mutex
	)

	concurrentQueryCount, err := req.PluginContext.GrafanaConfig.ConcurrentQueryCount()
	if err != nil {
		logger.Debug(fmt.Sprintf("Concurrent Query Count read/parse error: %v", err), "feature", "prometheusRunQueriesInParallel")
		concurrentQueryCount = 10
	}

	_ = concurrency.ForEachJob(ctx, len(req.Queries), concurrentQueryCount, func(ctx context.Context, idx int) error {
		query := req.Queries[idx]
		r := s.handleQuery(ctx, query, fromAlert, hasPromQLScopeFeatureFlag)
		if r != nil {
			m.Lock()
			result.Responses[query.RefID] = *r
			m.Unlock()
		}
		return nil
	})

	return &result, nil
}
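
// handleQuery parses a single backend.DataQuery into a Prometheus query model
// inside its own trace span and fetches the results. Parse errors are returned
// as a DataResponse with only the Error field set.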
func (s *QueryData) handleQuery(ctx context.Context, bq backend.DataQuery, fromAlert,
	hasPromQLScopeFeatureFlag bool) *backend.DataResponse {
	traceCtx, span := s.tracer.Start(ctx, "datasource.prometheus")
	defer span.End()
	query, err := models.Parse(span, bq, s.TimeInterval, s.intervalCalculator, fromAlert, hasPromQLScopeFeatureFlag)
	if err != nil {
		return &backend.DataResponse{
			Error: err,
		}
	}

	r := s.fetch(traceCtx, s.client, query)
	if r == nil {
		s.log.FromContext(ctx).Debug("Received nil response from runQuery", "query", query.Expr)
	}
	return r
}
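
// fetch fans out the instant, range, and exemplar requests for a parsed query
// in parallel goroutines and merges their results into one DataResponse under
// a mutex. Exemplar failures are only logged and do not fail the response.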
func (s *QueryData) fetch(traceCtx context.Context, client *client.Client, q *models.Query) *backend.DataResponse {
	logger := s.log.FromContext(traceCtx)
	logger.Debug("Sending query", "start", q.Start, "end", q.End, "step", q.Step, "query", q.Expr)

	dr := &backend.DataResponse{
		Frames: data.Frames{},
		Error:  nil,
	}

	var (
		wg sync.WaitGroup
		m  sync.Mutex
	)

	if q.InstantQuery {
		wg.Add(1)
		go func() {
			defer wg.Done()
			res := s.instantQuery(traceCtx, client, q)
			m.Lock()
			addDataResponse(&res, dr)
			m.Unlock()
		}()
	}

	if q.RangeQuery {
		wg.Add(1)
		go func() {
			defer wg.Done()
			res := s.rangeQuery(traceCtx, client, q)
			m.Lock()
			addDataResponse(&res, dr)
			m.Unlock()
		}()
	}

	if q.ExemplarQuery {
		wg.Add(1)
		go func() {
			defer wg.Done()
			res := s.exemplarQuery(traceCtx, client, q)
			m.Lock()
			if res.Error != nil {
				// If the exemplar query fails, only log the error and keep
				// processing the other results.
				logger.Error("Exemplar query failed", "query", q.Expr, "err", res.Error)
			}
			dr.Frames = append(dr.Frames, res.Frames...)
			m.Unlock()
		}()
	}
	wg.Wait()

	return dr
}
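
// rangeQuery sends the range query to Prometheus and parses the HTTP response
// into frames; transport errors are converted into an error-source-aware
// response by addErrorSourceToDataResponse.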
func (s *QueryData) rangeQuery(ctx context.Context, c *client.Client, q *models.Query) backend.DataResponse {
	res, err := c.QueryRange(ctx, q)
	if err != nil {
		return addErrorSourceToDataResponse(err)
	}

	defer func() {
		err := res.Body.Close()
		if err != nil {
			s.log.Warn("Failed to close query range response body", "error", err)
		}
	}()

	return s.parseResponse(ctx, q, res)
}
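
// instantQuery sends the instant query to Prometheus and parses the HTTP
// response into frames. A non-OK status for the internal "__healthcheck__"
// RefID is returned directly so the health check fallback can report it.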
func (s *QueryData) instantQuery(ctx context.Context, c *client.Client, q *models.Query) backend.DataResponse {
	res, err := c.QueryInstant(ctx, q)
	if err != nil {
		return addErrorSourceToDataResponse(err)
	}

	// This path is only for the health check fallback scenario.
	if res.StatusCode != http.StatusOK && q.RefId == "__healthcheck__" {
		return backend.DataResponse{
			Error:       errors.New(res.Status),
			ErrorSource: backend.ErrorSourceFromHTTPStatus(res.StatusCode),
		}
	}

	defer func() {
		err := res.Body.Close()
		if err != nil {
			s.log.Warn("Failed to close response body", "error", err)
		}
	}()

	return s.parseResponse(ctx, q, res)
}
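
// exemplarQuery sends the exemplar query to Prometheus and parses the HTTP
// response into frames; downstream HTTP errors are tagged with the downstream
// error source.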
func (s *QueryData) exemplarQuery(ctx context.Context, c *client.Client, q *models.Query) backend.DataResponse {
	res, err := c.QueryExemplars(ctx, q)
	if err != nil {
		response := backend.DataResponse{
			Error: err,
		}

		if backend.IsDownstreamHTTPError(err) {
			response.ErrorSource = backend.ErrorSourceDownstream
		}
		return response
	}

	defer func() {
		err := res.Body.Close()
		if err != nil {
			s.log.Warn("Failed to close response body", "error", err)
		}
	}()
	return s.parseResponse(ctx, q, res)
}
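
// addDataResponse merges a single sub-query response into the combined
// response: errors are wrapped onto any existing error, the error source is
// derived from the HTTP status, and frames are always appended. Callers are
// expected to hold the mutex that guards dr.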
func addDataResponse(res *backend.DataResponse, dr *backend.DataResponse) {
	if res.Error != nil {
		if dr.Error == nil {
			dr.Error = res.Error
		} else {
			dr.Error = fmt.Errorf("%v %w", dr.Error, res.Error)
		}
		dr.ErrorSource = status.SourceFromHTTPStatus(int(res.Status))
		dr.Status = res.Status
	}
	dr.Frames = append(dr.Frames, res.Frames...)
}
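
// addErrorSourceToDataResponse wraps a transport error in a DataResponse with
// StatusBadGateway and, when the error is a downstream HTTP error, marks the
// error source as downstream.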
func addErrorSourceToDataResponse(err error) backend.DataResponse {
	response := backend.DataResponse{
		Error:  err,
		Status: backend.StatusBadGateway,
	}

	if backend.IsDownstreamHTTPError(err) {
		response.ErrorSource = backend.ErrorSourceDownstream
	}
	return response
}