package es

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"path"
	"strconv"
	"strings"
	"time"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"
	"go.opentelemetry.io/otel/trace"

	"github.com/grafana/grafana-plugin-sdk-go/backend"
	"github.com/grafana/grafana-plugin-sdk-go/backend/log"
	"github.com/grafana/grafana-plugin-sdk-go/backend/tracing"

	"github.com/grafana/grafana/pkg/services/featuremgmt"
)

// Used in logging to mark a stage
const (
	StagePrepareRequest  = "prepareRequest"
	StageDatabaseRequest = "databaseRequest"
	StageParseResponse   = "parseResponse"
)
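
// DatasourceInfo holds the per-data-source settings needed to build and send
// Elasticsearch requests.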
type DatasourceInfo struct {
	ID                         int64
	HTTPClient                 *http.Client
	URL                        string
	Database                   string
	ConfiguredFields           ConfiguredFields
	Interval                   string
	MaxConcurrentShardRequests int64
	IncludeFrozen              bool
}
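
// ConfiguredFields holds the field names configured on the data source for
// time, log message, and log level.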
type ConfiguredFields struct {
	TimeField       string
	LogMessageField string
	LogLevelField   string
}

// Client represents a client which can interact with the Elasticsearch API
type Client interface {
	GetConfiguredFields() ConfiguredFields
	ExecuteMultisearch(r *MultiSearchRequest) (*MultiSearchResponse, error)
	MultiSearch() *MultiSearchRequestBuilder
}
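
// A minimal usage sketch (illustrative only; the exact builder methods depend
// on MultiSearchRequestBuilder, which is defined elsewhere in this package):
//
//	client, err := NewClient(ctx, ds, logger)
//	if err != nil {
//		return err
//	}
//	msb := client.MultiSearch()
//	// ... add search requests to the builder ...
//	req, err := msb.Build()
//	if err != nil {
//		return err
//	}
//	res, err := client.ExecuteMultisearch(req)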

// NewClient creates a new Elasticsearch client
var NewClient = func(ctx context.Context, ds *DatasourceInfo, logger log.Logger) (Client, error) {
	logger = logger.FromContext(ctx).With("entity", "client")

	ip, err := NewIndexPattern(ds.Interval, ds.Database)
	if err != nil {
		logger.Error("Failed creating index pattern", "error", err, "interval", ds.Interval, "index", ds.Database)
		return nil, err
	}

	logger.Debug("Creating new client", "configuredFields", fmt.Sprintf("%#v", ds.ConfiguredFields), "interval", ds.Interval, "index", ds.Database)

	return &baseClientImpl{
		logger:           logger,
		ctx:              ctx,
		ds:               ds,
		configuredFields: ds.ConfiguredFields,
		indexPattern:     ip,
	}, nil
}
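
// baseClientImpl is the default Client implementation, bound to a single
// Elasticsearch data source.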
type baseClientImpl struct {
	ctx              context.Context
	ds               *DatasourceInfo
	configuredFields ConfiguredFields
	indexPattern     IndexPattern
	logger           log.Logger
}

func (c *baseClientImpl) GetConfiguredFields() ConfiguredFields {
	return c.configuredFields
}
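
// multiRequest is a single entry of an _msearch payload: a header line, a body
// line, and the interval used for interpolating $__interval variables.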
type multiRequest struct {
	header   map[string]any
	body     any
	interval time.Duration
}
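
// executeBatchRequest encodes the given requests as an NDJSON payload and
// POSTs it to the given path.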
func (c *baseClientImpl) executeBatchRequest(uriPath, uriQuery string, requests []*multiRequest) (*http.Response, error) {
	bytes, err := c.encodeBatchRequests(requests)
	if err != nil {
		return nil, err
	}
	return c.executeRequest(http.MethodPost, uriPath, uriQuery, bytes)
}
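
// encodeBatchRequests serializes the requests into the newline-delimited JSON
// payload expected by the _msearch API, interpolating $__interval_ms and
// $__interval in each body ($__interval_ms is replaced first so that the
// shorter $__interval does not clobber it). The result looks roughly like this
// (illustrative values):
//
//	{"ignore_unavailable":true,"index":"logs-2024.05.01","search_type":"query_then_fetch"}
//	{"size":500,"query":{...}}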
func (c *baseClientImpl) encodeBatchRequests(requests []*multiRequest) ([]byte, error) {
	start := time.Now()

	payload := bytes.Buffer{}
	for _, r := range requests {
		reqHeader, err := json.Marshal(r.header)
		if err != nil {
			return nil, err
		}
		payload.WriteString(string(reqHeader) + "\n")

		reqBody, err := json.Marshal(r.body)
		if err != nil {
			return nil, err
		}

		body := string(reqBody)
		body = strings.ReplaceAll(body, "$__interval_ms", strconv.FormatInt(r.interval.Milliseconds(), 10))
		body = strings.ReplaceAll(body, "$__interval", r.interval.String())

		payload.WriteString(body + "\n")
	}

	elapsed := time.Since(start)
	c.logger.Debug("Completed encoding of batch requests to json", "duration", elapsed)

	return payload.Bytes(), nil
}
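
// executeRequest builds and sends a single HTTP request against the data
// source URL; callers are responsible for closing the response body.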
func (c *baseClientImpl) executeRequest(method, uriPath, uriQuery string, body []byte) (*http.Response, error) {
	c.logger.Debug("Sending request to Elasticsearch", "url", c.ds.URL)
	u, err := url.Parse(c.ds.URL)
	if err != nil {
		return nil, backend.DownstreamError(fmt.Errorf("URL could not be parsed: %w", err))
	}
	u.Path = path.Join(u.Path, uriPath)
	u.RawQuery = uriQuery

	var req *http.Request
	if method == http.MethodPost {
		req, err = http.NewRequestWithContext(c.ctx, http.MethodPost, u.String(), bytes.NewBuffer(body))
	} else {
		req, err = http.NewRequestWithContext(c.ctx, http.MethodGet, u.String(), nil)
	}
	if err != nil {
		return nil, err
	}

	req.Header.Set("Content-Type", "application/x-ndjson")

	//nolint:bodyclose
	resp, err := c.ds.HTTPClient.Do(req)
	if err != nil {
		return nil, err
	}
	return resp, nil
}
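
// ExecuteMultisearch sends the given requests as a single _msearch call,
// decodes the response, and records tracing spans for the database request and
// the response decoding.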
func (c *baseClientImpl) ExecuteMultisearch(r *MultiSearchRequest) (*MultiSearchResponse, error) {
	var err error
	multiRequests := c.createMultiSearchRequests(r.Requests)
	queryParams := c.getMultiSearchQueryParameters()
	_, span := tracing.DefaultTracer().Start(c.ctx, "datasource.elasticsearch.queryData.executeMultisearch", trace.WithAttributes(
		attribute.String("queryParams", queryParams),
		attribute.String("url", c.ds.URL),
	))
	defer func() {
		if err != nil {
			span.RecordError(err)
			span.SetStatus(codes.Error, err.Error())
		}
		span.End()
	}()

	start := time.Now()
	clientRes, err := c.executeBatchRequest("_msearch", queryParams, multiRequests)
	if err != nil {
		status := "error"
		if errors.Is(err, context.Canceled) {
			status = "cancelled"
		}
		lp := []any{"error", err, "status", status, "duration", time.Since(start), "stage", StageDatabaseRequest}
		sourceErr := backend.ErrorWithSource{}
		if errors.As(err, &sourceErr) {
			lp = append(lp, "statusSource", sourceErr.ErrorSource())
		}
		if clientRes != nil {
			lp = append(lp, "statusCode", clientRes.StatusCode)
		}
		c.logger.Error("Error received from Elasticsearch", lp...)
		return nil, err
	}

	res := clientRes
	defer func() {
		if err := res.Body.Close(); err != nil {
			c.logger.Warn("Failed to close response body", "error", err)
		}
	}()

	c.logger.Info("Response received from Elasticsearch", "status", "ok", "statusCode", res.StatusCode, "contentLength", res.ContentLength, "duration", time.Since(start), "stage", StageDatabaseRequest)

	start = time.Now()
	_, resSpan := tracing.DefaultTracer().Start(c.ctx, "datasource.elasticsearch.queryData.executeMultisearch.decodeResponse")
	defer func() {
		if err != nil {
			resSpan.RecordError(err)
			resSpan.SetStatus(codes.Error, err.Error())
		}
		resSpan.End()
	}()

	var msr MultiSearchResponse
	improvedParsingEnabled := isFeatureEnabled(c.ctx, featuremgmt.FlagElasticsearchImprovedParsing)
	if improvedParsingEnabled {
		err = StreamMultiSearchResponse(res.Body, &msr)
	} else {
		dec := json.NewDecoder(res.Body)
		err = dec.Decode(&msr)
	}
	if err != nil {
		c.logger.Error("Failed to decode response from Elasticsearch", "error", err, "duration", time.Since(start), "improvedParsingEnabled", improvedParsingEnabled)
		return nil, err
	}

	c.logger.Debug("Completed decoding of response from Elasticsearch", "duration", time.Since(start), "improvedParsingEnabled", improvedParsingEnabled)

	msr.Status = res.StatusCode

	return &msr, nil
}

// StreamMultiSearchResponse processes the JSON response in a streaming
// fashion, decoding one search response and one hit at a time instead of
// decoding the whole body in a single call.
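// The expected response shape, sketched from the fields handled below (all
// other fields are skipped):
//
//	{
//	  "responses": [
//	    {"hits": {"hits": [...]}, "aggregations": {...}, "error": {...}}
//	  ]
//	}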
func StreamMultiSearchResponse(body io.Reader, msr *MultiSearchResponse) error {
	dec := json.NewDecoder(body)
	_, err := dec.Token() // reads the `{` opening brace
	if err != nil {
		return err
	}

	for dec.More() {
		tok, err := dec.Token()
		if err != nil {
			return err
		}

		if tok == "responses" {
			_, err := dec.Token() // reads the `[` opening bracket for the responses array
			if err != nil {
				return err
			}

			for dec.More() {
				var sr SearchResponse

				_, err := dec.Token() // reads `{` for each SearchResponse
				if err != nil {
					return err
				}

				for dec.More() {
					field, err := dec.Token()
					if err != nil {
						return err
					}

					switch field {
					case "hits":
						sr.Hits = &SearchResponseHits{}
						err := processHits(dec, &sr)
						if err != nil {
							return err
						}
					case "aggregations":
						err := dec.Decode(&sr.Aggregations)
						if err != nil {
							return err
						}
					case "error":
						err := dec.Decode(&sr.Error)
						if err != nil {
							return err
						}
					default:
						// skip over unknown fields
						err := skipUnknownField(dec)
						if err != nil {
							return err
						}
					}
				}

				msr.Responses = append(msr.Responses, &sr)

				_, err = dec.Token() // reads `}` closing each SearchResponse
				if err != nil {
					return err
				}
			}

			_, err = dec.Token() // reads the `]` closing bracket for the responses array
			if err != nil {
				return err
			}
		} else {
			err := skipUnknownField(dec)
			if err != nil {
				return err
			}
		}
	}

	_, err = dec.Token() // reads the `}` closing brace for the entire JSON
	return err
}

// processHits processes the hits in the JSON response incrementally.
func processHits(dec *json.Decoder, sr *SearchResponse) error {
	tok, err := dec.Token() // reads the `{` opening brace for the hits object
	if err != nil {
		return err
	}
	if tok != json.Delim('{') {
		return fmt.Errorf("expected '{' for hits object, got %v", tok)
	}

	for dec.More() {
		tok, err := dec.Token()
		if err != nil {
			return err
		}

		if tok == "hits" {
			if err := streamHitsArray(dec, sr); err != nil {
				return err
			}
		} else {
			// ignore these fields as they are not used in the current implementation
			err := skipUnknownField(dec)
			if err != nil {
				return err
			}
		}
	}

	// read the closing `}` for the hits object
	_, err = dec.Token()
	if err != nil {
		return err
	}

	return nil
}

// streamHitsArray processes the hits array field incrementally.
func streamHitsArray(dec *json.Decoder, sr *SearchResponse) error {
	// read the opening `[` for the hits array
	tok, err := dec.Token()
	if err != nil {
		return err
	}
	if tok != json.Delim('[') {
		return fmt.Errorf("expected '[' for hits array, got %v", tok)
	}

	for dec.More() {
		var hit map[string]interface{}
		err = dec.Decode(&hit)
		if err != nil {
			return err
		}
		sr.Hits.Hits = append(sr.Hits.Hits, hit)
	}

	// read the closing bracket `]` for the hits array
	tok, err = dec.Token()
	if err != nil {
		return err
	}
	if tok != json.Delim(']') {
		return fmt.Errorf("expected ']' for closing hits array, got %v", tok)
	}

	return nil
}

// skipUnknownField skips over an unknown JSON field's value in the stream.
func skipUnknownField(dec *json.Decoder) error {
	tok, err := dec.Token()
	if err != nil {
		return err
	}

	switch tok {
	case json.Delim('{'):
		// skip everything inside the object until we reach the closing `}`
		for dec.More() {
			if err := skipUnknownField(dec); err != nil {
				return err
			}
		}
		_, err = dec.Token() // read the closing `}`
		return err
	case json.Delim('['):
		// skip everything inside the array until we reach the closing `]`
		for dec.More() {
			if err := skipUnknownField(dec); err != nil {
				return err
			}
		}
		_, err = dec.Token() // read the closing `]`
		return err
	default:
		// no further action needed for primitives
		return nil
	}
}
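
// createMultiSearchRequests resolves the index pattern for each search
// request's time range and pairs every request body with its _msearch header.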
func (c *baseClientImpl) createMultiSearchRequests(searchRequests []*SearchRequest) []*multiRequest {
	multiRequests := []*multiRequest{}

	for _, searchReq := range searchRequests {
		indices, err := c.indexPattern.GetIndices(searchReq.TimeRange)
		if err != nil {
			c.logger.Error("Failed to get indices from index pattern", "error", err)
			continue
		}

		mr := multiRequest{
			header: map[string]any{
				"search_type":        "query_then_fetch",
				"ignore_unavailable": true,
				"index":              strings.Join(indices, ","),
			},
			body:     searchReq,
			interval: searchReq.Interval,
		}

		multiRequests = append(multiRequests, &mr)
	}

	return multiRequests
}
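
// getMultiSearchQueryParameters builds the query string for the _msearch call,
// e.g. "max_concurrent_shard_requests=5&ignore_throttled=false" (values are
// illustrative).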
func (c *baseClientImpl) getMultiSearchQueryParameters() string {
	var qs []string

	qs = append(qs, fmt.Sprintf("max_concurrent_shard_requests=%d", c.ds.MaxConcurrentShardRequests))

	if c.ds.IncludeFrozen {
		qs = append(qs, "ignore_throttled=false")
	}

	return strings.Join(qs, "&")
}
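
// MultiSearch returns a new builder for composing a multi search request.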
func (c *baseClientImpl) MultiSearch() *MultiSearchRequestBuilder {
	return NewMultiSearchRequestBuilder()
}
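
// isFeatureEnabled reports whether the given feature toggle is enabled in the
// Grafana configuration carried on the context.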
func isFeatureEnabled(ctx context.Context, feature string) bool {
	return backend.GrafanaConfigFromContext(ctx).FeatureToggles().IsEnabled(feature)
}