DS-Querier: support group queries (#109879)

This commit is contained in:
Sarah Zinger 2025-08-22 14:59:30 -04:00 committed by GitHub
parent 333a21f19d
commit d51e6a16bc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 38 additions and 13 deletions

View File

@ -274,31 +274,23 @@ func executeDSNodesGrouped(ctx context.Context, now time.Time, vars mathexp.Vars
func() {
ctx, span := s.tracer.Start(ctx, "SSE.ExecuteDatasourceQuery")
defer span.End()
firstNode := nodeGroup[0]
pCtx, err := s.pCtxProvider.GetWithDataSource(ctx, firstNode.datasource.Type, firstNode.request.User, firstNode.datasource)
if err != nil {
for _, dn := range nodeGroup {
vars[dn.refID] = mathexp.Results{Error: datasources.ErrDataSourceNotFound}
}
return
}
firstNode := nodeGroup[0]
logger := logger.FromContext(ctx).New("datasourceType", firstNode.datasource.Type,
"queryRefId", firstNode.refID,
"datasourceUid", firstNode.datasource.UID,
"datasourceVersion", firstNode.datasource.Version,
)
span.SetAttributes(
attribute.String("datasource.type", firstNode.datasource.Type),
attribute.String("datasource.uid", firstNode.datasource.UID),
)
req := &backend.QueryDataRequest{
PluginContext: pCtx,
Headers: firstNode.request.Headers,
}
// add all the queries from the node group to the request
for _, dn := range nodeGroup {
req.Queries = append(req.Queries, backend.DataQuery{
RefID: dn.refID,
@ -324,15 +316,48 @@ func executeDSNodesGrouped(ctx context.Context, now time.Time, vars mathexp.Vars
s.metrics.DSRequests.WithLabelValues(respStatus, fmt.Sprintf("%t", useDataplane), firstNode.datasource.Type).Inc()
}
resp, err := s.dataService.QueryData(ctx, req)
var resp *backend.QueryDataResponse
// get the new client if it exists
qsDSClient, ok, err := s.qsDatasourceClientBuilder.BuildClient(firstNode.datasource.Type, firstNode.datasource.UID)
if err != nil {
for _, dn := range nodeGroup {
vars[dn.refID] = mathexp.Results{Error: MakeQueryError(firstNode.refID, firstNode.datasource.UID, err)}
vars[dn.refID] = mathexp.Results{Error: datasources.ErrDataSourceNotFound}
}
instrument(err, "")
return
}
var queryErr error
if !ok { // legacy flow
pCtx, err := s.pCtxProvider.GetWithDataSource(ctx, firstNode.datasource.Type, firstNode.request.User, firstNode.datasource)
if err != nil {
for _, dn := range nodeGroup {
vars[dn.refID] = mathexp.Results{Error: datasources.ErrDataSourceNotFound}
}
return
}
req.PluginContext = pCtx
resp, queryErr = s.dataService.QueryData(ctx, req)
} else { // new query service flow
k8sReq, err := ConvertBackendRequestToDataRequest(req)
if err != nil {
for _, dn := range nodeGroup {
vars[dn.refID] = mathexp.Results{Error: datasources.ErrDataSourceNotFound}
}
return
}
resp, queryErr = qsDSClient.QueryData(ctx, *k8sReq)
}
if queryErr != nil {
for _, dn := range nodeGroup {
vars[dn.refID] = mathexp.Results{Error: MakeQueryError(firstNode.refID, firstNode.datasource.UID, queryErr)}
}
instrument(queryErr, "")
return
}
for _, dn := range nodeGroup {
dataFrames, err := getResponseFrame(logger, resp, dn.refID)
if err != nil {