diff --git a/docs/sources/setup-grafana/configure-grafana/feature-toggles/index.md b/docs/sources/setup-grafana/configure-grafana/feature-toggles/index.md index 8c25a54b410..faaa612f662 100644 --- a/docs/sources/setup-grafana/configure-grafana/feature-toggles/index.md +++ b/docs/sources/setup-grafana/configure-grafana/feature-toggles/index.md @@ -233,6 +233,7 @@ Experimental features might be changed or removed without prior notice. | `k8SFolderMove` | Enable folder's api server move | | `teamHttpHeadersMimir` | Enables LBAC for datasources for Mimir to apply LBAC filtering of metrics to the client requests for users in teams | | `queryLibraryDashboards` | Enables Query Library feature in Dashboards | +| `elasticsearchImprovedParsing` | Enables less memory intensive Elasticsearch result parsing | ## Development feature toggles diff --git a/packages/grafana-data/src/types/featureToggles.gen.ts b/packages/grafana-data/src/types/featureToggles.gen.ts index f1dff9d6a38..6d52f81aeb7 100644 --- a/packages/grafana-data/src/types/featureToggles.gen.ts +++ b/packages/grafana-data/src/types/featureToggles.gen.ts @@ -253,4 +253,5 @@ export interface FeatureToggles { ABTestFeatureToggleA?: boolean; ABTestFeatureToggleB?: boolean; queryLibraryDashboards?: boolean; + elasticsearchImprovedParsing?: boolean; } diff --git a/pkg/services/featuremgmt/registry.go b/pkg/services/featuremgmt/registry.go index f68c5134c7b..95889efea55 100644 --- a/pkg/services/featuremgmt/registry.go +++ b/pkg/services/featuremgmt/registry.go @@ -1750,6 +1750,12 @@ var ( Owner: grafanaFrontendPlatformSquad, AllowSelfServe: false, }, + { + Name: "elasticsearchImprovedParsing", + Description: "Enables less memory intensive Elasticsearch result parsing", + Stage: FeatureStageExperimental, + Owner: awsDatasourcesSquad, + }, } ) diff --git a/pkg/services/featuremgmt/toggles_gen.csv b/pkg/services/featuremgmt/toggles_gen.csv index 2eba503f6f5..7df8b6681cf 100644 --- a/pkg/services/featuremgmt/toggles_gen.csv +++ b/pkg/services/featuremgmt/toggles_gen.csv @@ -234,3 +234,4 @@ teamHttpHeadersMimir,experimental,@grafana/identity-access-team,false,false,fals ABTestFeatureToggleA,experimental,@grafana/sharing-squad,false,false,false ABTestFeatureToggleB,experimental,@grafana/sharing-squad,false,false,false queryLibraryDashboards,experimental,@grafana/grafana-frontend-platform,false,false,false +elasticsearchImprovedParsing,experimental,@grafana/aws-datasources,false,false,false diff --git a/pkg/services/featuremgmt/toggles_gen.go b/pkg/services/featuremgmt/toggles_gen.go index f74b6bcdb13..c309f0e85fb 100644 --- a/pkg/services/featuremgmt/toggles_gen.go +++ b/pkg/services/featuremgmt/toggles_gen.go @@ -946,4 +946,8 @@ const ( // FlagQueryLibraryDashboards // Enables Query Library feature in Dashboards FlagQueryLibraryDashboards = "queryLibraryDashboards" + + // FlagElasticsearchImprovedParsing + // Enables less memory intensive Elasticsearch result parsing + FlagElasticsearchImprovedParsing = "elasticsearchImprovedParsing" ) diff --git a/pkg/services/featuremgmt/toggles_gen.json b/pkg/services/featuremgmt/toggles_gen.json index 0ee59b47d73..8d95b8d6279 100644 --- a/pkg/services/featuremgmt/toggles_gen.json +++ b/pkg/services/featuremgmt/toggles_gen.json @@ -1321,6 +1321,21 @@ "codeowner": "@grafana/aws-datasources" } }, + { + "metadata": { + "name": "elasticsearchImprovedParsing", + "resourceVersion": "1736808262603", + "creationTimestamp": "2025-01-13T20:32:35Z", + "annotations": { + "grafana.app/updatedTimestamp": "2025-01-13 22:44:22.603729 +0000 UTC" + } + }, + "spec": { + "description": "Enables less memory intensive Elasticsearch result parsing", + "stage": "experimental", + "codeowner": "@grafana/aws-datasources" + } + }, { "metadata": { "name": "enableDatagridEditing", diff --git a/pkg/tsdb/elasticsearch/client/client.go b/pkg/tsdb/elasticsearch/client/client.go index 9ab71a8bc84..6bffc08ba72 100644 --- a/pkg/tsdb/elasticsearch/client/client.go +++ b/pkg/tsdb/elasticsearch/client/client.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "io" "net/http" "net/url" "path" @@ -20,6 +21,7 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend/log" "github.com/grafana/grafana-plugin-sdk-go/backend/tracing" + "github.com/grafana/grafana/pkg/services/featuremgmt" ) // Used in logging to mark a stage @@ -202,8 +204,6 @@ func (c *baseClientImpl) ExecuteMultisearch(r *MultiSearchRequest) (*MultiSearch c.logger.Info("Response received from Elasticsearch", "status", "ok", "statusCode", res.StatusCode, "contentLength", res.ContentLength, "duration", time.Since(start), "stage", StageDatabaseRequest) start = time.Now() - var msr MultiSearchResponse - dec := json.NewDecoder(res.Body) _, resSpan := tracing.DefaultTracer().Start(c.ctx, "datasource.elasticsearch.queryData.executeMultisearch.decodeResponse") defer func() { if err != nil { @@ -212,19 +212,218 @@ func (c *baseClientImpl) ExecuteMultisearch(r *MultiSearchRequest) (*MultiSearch } resSpan.End() }() - err = dec.Decode(&msr) + + var msr MultiSearchResponse + improvedParsingEnabled := isFeatureEnabled(c.ctx, featuremgmt.FlagElasticsearchImprovedParsing) + if improvedParsingEnabled { + err = StreamMultiSearchResponse(res.Body, &msr) + } else { + dec := json.NewDecoder(res.Body) + err = dec.Decode(&msr) + } if err != nil { - c.logger.Error("Failed to decode response from Elasticsearch", "error", err, "duration", time.Since(start)) + c.logger.Error("Failed to decode response from Elasticsearch", "error", err, "duration", time.Since(start), "improvedParsingEnabled", improvedParsingEnabled) return nil, err } - c.logger.Debug("Completed decoding of response from Elasticsearch", "duration", time.Since(start)) + c.logger.Debug("Completed decoding of response from Elasticsearch", "duration", time.Since(start), "improvedParsingEnabled", improvedParsingEnabled) msr.Status = res.StatusCode return &msr, nil } +// StreamMultiSearchResponse processes the JSON response in a streaming fashion +func StreamMultiSearchResponse(body io.Reader, msr *MultiSearchResponse) error { + dec := json.NewDecoder(body) + + _, err := dec.Token() // reads the `{` opening brace + if err != nil { + return err + } + + for dec.More() { + tok, err := dec.Token() + if err != nil { + return err + } + + if tok == "responses" { + _, err := dec.Token() // reads the `[` opening bracket for responses array + if err != nil { + return err + } + + for dec.More() { + var sr SearchResponse + + _, err := dec.Token() // reads `{` for each SearchResponse + if err != nil { + return err + } + + for dec.More() { + field, err := dec.Token() + if err != nil { + return err + } + + switch field { + case "hits": + sr.Hits = &SearchResponseHits{} + err := processHits(dec, &sr) + if err != nil { + return err + } + case "aggregations": + err := dec.Decode(&sr.Aggregations) + if err != nil { + return err + } + case "error": + err := dec.Decode(&sr.Error) + if err != nil { + return err + } + default: + // skip over unknown fields + err := skipUnknownField(dec) + if err != nil { + return err + } + } + } + + msr.Responses = append(msr.Responses, &sr) + + _, err = dec.Token() // reads `}` closing for each SearchResponse + if err != nil { + return err + } + } + + _, err = dec.Token() // reads the `]` closing bracket for responses array + if err != nil { + return err + } + } else { + err := skipUnknownField(dec) + if err != nil { + return err + } + } + } + + _, err = dec.Token() // reads the `}` closing brace for the entire JSON + return err +} + +// processHits processes the hits in the JSON response incrementally. +func processHits(dec *json.Decoder, sr *SearchResponse) error { + tok, err := dec.Token() // reads the `{` opening brace for the hits object + if err != nil { + return err + } + + if tok != json.Delim('{') { + return fmt.Errorf("expected '{' for hits object, got %v", tok) + } + + for dec.More() { + tok, err := dec.Token() + if err != nil { + return err + } + + if tok == "hits" { + if err := streamHitsArray(dec, sr); err != nil { + return err + } + } else { + // ignore these fields as they are not used in the current implementation + err := skipUnknownField(dec) + if err != nil { + return err + } + } + } + + // read the closing `}` for the hits object + _, err = dec.Token() + if err != nil { + return err + } + + return nil +} + +// streamHitsArray processes the hits array field incrementally. +func streamHitsArray(dec *json.Decoder, sr *SearchResponse) error { + tok, err := dec.Token() + if err != nil { + return err + } + + // read the opening `[` for the hits array + if tok != json.Delim('[') { + return fmt.Errorf("expected '[' for hits array, got %v", tok) + } + + for dec.More() { + var hit map[string]interface{} + err = dec.Decode(&hit) + if err != nil { + return err + } + + sr.Hits.Hits = append(sr.Hits.Hits, hit) + } + + // read the closing bracket `]` for the hits array + tok, err = dec.Token() + if err != nil { + return err + } + + if tok != json.Delim(']') { + return fmt.Errorf("expected ']' for closing hits array, got %v", tok) + } + + return nil +} + +// skipUnknownField skips over an unknown JSON field's value in the stream. +func skipUnknownField(dec *json.Decoder) error { + tok, err := dec.Token() + if err != nil { + return err + } + + switch tok { + case json.Delim('{'): + // skip everything inside the object until we reach the closing `}` + for dec.More() { + if err := skipUnknownField(dec); err != nil { + return err + } + } + _, err = dec.Token() // read the closing `}` + return err + case json.Delim('['): + // skip everything inside the array until we reach the closing `]` + for dec.More() { + if err := skipUnknownField(dec); err != nil { + return err + } + } + _, err = dec.Token() // read the closing `]` + return err + default: + // no further action needed for primitives + return nil + } +} + func (c *baseClientImpl) createMultiSearchRequests(searchRequests []*SearchRequest) []*multiRequest { multiRequests := []*multiRequest{} @@ -264,3 +463,7 @@ func (c *baseClientImpl) getMultiSearchQueryParameters() string { func (c *baseClientImpl) MultiSearch() *MultiSearchRequestBuilder { return NewMultiSearchRequestBuilder() } + +func isFeatureEnabled(ctx context.Context, feature string) bool { + return backend.GrafanaConfigFromContext(ctx).FeatureToggles().IsEnabled(feature) +} diff --git a/pkg/tsdb/elasticsearch/client/client_test.go b/pkg/tsdb/elasticsearch/client/client_test.go index 086379d0805..447a9bb09e0 100644 --- a/pkg/tsdb/elasticsearch/client/client_test.go +++ b/pkg/tsdb/elasticsearch/client/client_test.go @@ -6,6 +6,7 @@ import ( "io" "net/http" "net/http/httptest" + "strings" "testing" "time" @@ -286,6 +287,129 @@ func TestClient_Index(t *testing.T) { } } +func TestStreamMultiSearchResponse_Success(t *testing.T) { + jsonBody := ` + { + "responses": [ + { "hits": { "hits": [] } }, + { "hits": { "hits": [] } } + ] + }` + + msr := &MultiSearchResponse{} + err := StreamMultiSearchResponse(strings.NewReader(jsonBody), msr) + + if err != nil { + t.Fatalf("expected no error, got %v", err) + } + + if len(msr.Responses) != 2 { + t.Errorf("expected 2 responses, got %d", len(msr.Responses)) + } +} + +func TestStreamMultiSearchResponse_MalformedJSON(t *testing.T) { + jsonBody := ` + { + "responses": [ + { "hits": { "hits": [] } } + ` // Missing closing braces + + msr := &MultiSearchResponse{} + err := StreamMultiSearchResponse(strings.NewReader(jsonBody), msr) + + if err == nil { + t.Fatalf("expected an error, got none") + } +} + +func TestStreamMultiSearchResponse_MissingResponses(t *testing.T) { + jsonBody := ` + { + "something_else": [ + { "hits": { "hits": [] } } + ] + }` + + msr := &MultiSearchResponse{} + err := StreamMultiSearchResponse(strings.NewReader(jsonBody), msr) + + if err != nil { + t.Fatalf("expected no error, got %v", err) + } + + if len(msr.Responses) != 0 { + t.Errorf("expected 0 responses, got %d", len(msr.Responses)) + } +} + +func TestStreamMultiSearchResponse_EmptyBody(t *testing.T) { + jsonBody := `{}` + + msr := &MultiSearchResponse{} + err := StreamMultiSearchResponse(strings.NewReader(jsonBody), msr) + + if err != nil { + t.Fatalf("expected no error, got %v", err) + } + + if len(msr.Responses) != 0 { + t.Errorf("expected 0 responses, got %d", len(msr.Responses)) + } +} + +func TestStreamMultiSearchResponse_InvalidJSONStart(t *testing.T) { + jsonBody := `invalid_json` + + msr := &MultiSearchResponse{} + err := StreamMultiSearchResponse(strings.NewReader(jsonBody), msr) + + if err == nil { + t.Fatalf("expected an error due to invalid JSON, got none") + } +} + +func TestStreamMultiSearchResponse_InvalidHitsField(t *testing.T) { + jsonBody := ` + { + "responses": [ + { "hits": "invalid_string_value" } + ] + }` + + msr := &MultiSearchResponse{} + err := StreamMultiSearchResponse(strings.NewReader(jsonBody), msr) + + if err == nil { + t.Fatalf("expected an error due to invalid 'hits' field, got none") + } + + if err.Error() != "expected '{' for hits object, got invalid_string_value" { + t.Errorf("unexpected error message: %v", err) + } +} + +func TestStreamMultiSearchResponse_InvalidHitElement(t *testing.T) { + jsonBody := ` + { + "responses": [ + { "hits": { "hits": ["invalid_element"] } } + ] + }` + + msr := &MultiSearchResponse{} + err := StreamMultiSearchResponse(strings.NewReader(jsonBody), msr) + + if err == nil { + t.Fatalf("expected an error due to invalid element in 'hits' array, got none") + } + + expected := "json: cannot unmarshal string into Go value of type map[string]interface {}" + if err.Error() != expected { + t.Errorf("unexpected error message: expected %v, got %v", expected, err) + } +} + func createMultisearchForTest(t *testing.T, c Client, timeRange backend.TimeRange) (*MultiSearchRequest, error) { t.Helper()