2018-05-23 20:59:12 +08:00
package es
import (
"bytes"
"context"
"encoding/json"
2023-09-07 19:54:16 +08:00
"errors"
2018-05-23 20:59:12 +08:00
"fmt"
"net/http"
"net/url"
"path"
2018-06-01 01:02:20 +08:00
"strconv"
2018-05-23 20:59:12 +08:00
"strings"
"time"
2021-07-15 22:45:59 +08:00
"github.com/grafana/grafana-plugin-sdk-go/backend"
2022-11-02 22:03:50 +08:00
2019-05-13 14:45:54 +08:00
"github.com/grafana/grafana/pkg/infra/log"
2018-05-23 20:59:12 +08:00
)
2021-07-15 22:45:59 +08:00
type DatasourceInfo struct {
ID int64
2022-09-26 20:27:46 +08:00
HTTPClient * http . Client
2021-07-15 22:45:59 +08:00
URL string
Database string
2023-03-01 18:50:56 +08:00
ConfiguredFields ConfiguredFields
2021-07-15 22:45:59 +08:00
Interval string
TimeInterval string
MaxConcurrentShardRequests int64
IncludeFrozen bool
XPack bool
}
2023-03-01 18:50:56 +08:00
type ConfiguredFields struct {
TimeField string
LogMessageField string
LogLevelField string
}
2018-05-23 20:59:12 +08:00
// Client represents a client which can interact with elasticsearch api
type Client interface {
2023-03-01 18:50:56 +08:00
GetConfiguredFields ( ) ConfiguredFields
2018-05-23 20:59:12 +08:00
ExecuteMultisearch ( r * MultiSearchRequest ) ( * MultiSearchResponse , error )
MultiSearch ( ) * MultiSearchRequestBuilder
}
// NewClient creates a new elasticsearch client
2023-09-07 19:54:16 +08:00
var NewClient = func ( ctx context . Context , ds * DatasourceInfo , timeRange backend . TimeRange , logger log . Logger ) ( Client , error ) {
logger = logger . New ( "entity" , "client" )
2021-07-15 22:45:59 +08:00
ip , err := newIndexPattern ( ds . Interval , ds . Database )
2018-05-23 20:59:12 +08:00
if err != nil {
2023-09-07 19:54:16 +08:00
logger . Error ( "Failed creating index pattern" , "error" , err , "interval" , ds . Interval , "index" , ds . Database )
2018-05-23 20:59:12 +08:00
return nil , err
}
indices , err := ip . GetIndices ( timeRange )
if err != nil {
return nil , err
}
2023-09-07 19:54:16 +08:00
logger . Debug ( "Creating new client" , "configuredFields" , fmt . Sprintf ( "%#v" , ds . ConfiguredFields ) , "indices" , strings . Join ( indices , ", " ) , "interval" , ds . Interval , "index" , ds . Database )
2018-05-23 20:59:12 +08:00
2021-05-11 16:44:00 +08:00
return & baseClientImpl {
2023-03-01 18:50:56 +08:00
logger : logger ,
ctx : ctx ,
ds : ds ,
configuredFields : ds . ConfiguredFields ,
indices : indices ,
timeRange : timeRange ,
2021-05-11 16:44:00 +08:00
} , nil
2018-05-23 20:59:12 +08:00
}
type baseClientImpl struct {
2023-03-01 18:50:56 +08:00
ctx context . Context
ds * DatasourceInfo
configuredFields ConfiguredFields
indices [ ] string
timeRange backend . TimeRange
logger log . Logger
2018-05-23 20:59:12 +08:00
}
2023-03-01 18:50:56 +08:00
func ( c * baseClientImpl ) GetConfiguredFields ( ) ConfiguredFields {
return c . configuredFields
2018-05-23 20:59:12 +08:00
}
type multiRequest struct {
2023-08-30 23:46:47 +08:00
header map [ string ] any
body any
2023-01-05 19:26:27 +08:00
interval time . Duration
2018-05-23 20:59:12 +08:00
}
2022-12-05 17:21:15 +08:00
func ( c * baseClientImpl ) executeBatchRequest ( uriPath , uriQuery string , requests [ ] * multiRequest ) ( * http . Response , error ) {
2018-06-01 01:02:20 +08:00
bytes , err := c . encodeBatchRequests ( requests )
if err != nil {
return nil , err
}
2019-04-25 15:41:13 +08:00
return c . executeRequest ( http . MethodPost , uriPath , uriQuery , bytes )
2018-06-01 01:02:20 +08:00
}
func ( c * baseClientImpl ) encodeBatchRequests ( requests [ ] * multiRequest ) ( [ ] byte , error ) {
2018-05-25 16:31:56 +08:00
start := time . Now ( )
2018-05-23 20:59:12 +08:00
payload := bytes . Buffer { }
for _ , r := range requests {
reqHeader , err := json . Marshal ( r . header )
if err != nil {
return nil , err
}
payload . WriteString ( string ( reqHeader ) + "\n" )
reqBody , err := json . Marshal ( r . body )
if err != nil {
return nil , err
}
2018-06-01 01:02:20 +08:00
body := string ( reqBody )
2020-09-22 22:22:19 +08:00
body = strings . ReplaceAll ( body , "$__interval_ms" , strconv . FormatInt ( r . interval . Milliseconds ( ) , 10 ) )
2023-01-05 19:26:27 +08:00
body = strings . ReplaceAll ( body , "$__interval" , r . interval . String ( ) )
2018-06-01 01:02:20 +08:00
payload . WriteString ( body + "\n" )
2018-05-23 20:59:12 +08:00
}
2018-09-19 05:36:02 +08:00
elapsed := time . Since ( start )
2023-09-07 19:54:16 +08:00
c . logger . Debug ( "Completed encoding of batch requests to json" , "duration" , elapsed )
2018-05-25 16:31:56 +08:00
2018-06-01 01:02:20 +08:00
return payload . Bytes ( ) , nil
2018-05-23 20:59:12 +08:00
}
2022-12-05 17:21:15 +08:00
func ( c * baseClientImpl ) executeRequest ( method , uriPath , uriQuery string , body [ ] byte ) ( * http . Response , error ) {
2023-09-07 19:54:16 +08:00
c . logger . Debug ( "Sending request to Elasticsearch" , "url" , c . ds . URL )
2021-07-15 22:45:59 +08:00
u , err := url . Parse ( c . ds . URL )
2020-06-22 22:34:40 +08:00
if err != nil {
return nil , err
}
2018-05-23 20:59:12 +08:00
u . Path = path . Join ( u . Path , uriPath )
2019-04-25 15:41:13 +08:00
u . RawQuery = uriQuery
2018-05-23 20:59:12 +08:00
var req * http . Request
if method == http . MethodPost {
2022-04-12 02:20:10 +08:00
req , err = http . NewRequestWithContext ( c . ctx , http . MethodPost , u . String ( ) , bytes . NewBuffer ( body ) )
2018-05-23 20:59:12 +08:00
} else {
2022-04-12 02:20:10 +08:00
req , err = http . NewRequestWithContext ( c . ctx , http . MethodGet , u . String ( ) , nil )
2018-05-23 20:59:12 +08:00
}
if err != nil {
return nil , err
}
2018-05-25 16:31:56 +08:00
2021-03-30 06:41:45 +08:00
req . Header . Set ( "Content-Type" , "application/x-ndjson" )
2018-05-23 20:59:12 +08:00
2020-06-29 20:08:32 +08:00
//nolint:bodyclose
2022-09-26 20:27:46 +08:00
resp , err := c . ds . HTTPClient . Do ( req )
2020-06-29 20:08:32 +08:00
if err != nil {
return nil , err
}
2022-12-05 17:21:15 +08:00
return resp , nil
2018-05-23 20:59:12 +08:00
}
func ( c * baseClientImpl ) ExecuteMultisearch ( r * MultiSearchRequest ) ( * MultiSearchResponse , error ) {
multiRequests := c . createMultiSearchRequests ( r . Requests )
2019-04-25 15:41:13 +08:00
queryParams := c . getMultiSearchQueryParameters ( )
2023-09-07 19:54:16 +08:00
start := time . Now ( )
2019-06-25 14:52:17 +08:00
clientRes , err := c . executeBatchRequest ( "_msearch" , queryParams , multiRequests )
2018-05-23 20:59:12 +08:00
if err != nil {
2023-09-07 19:54:16 +08:00
status := "error"
if errors . Is ( err , context . Canceled ) {
status = "cancelled"
}
c . logger . Error ( "Error received from Elasticsearch" , "error" , err , "status" , status , "statusCode" , clientRes . StatusCode , "duration" , time . Since ( start ) , "action" , "databaseRequest" )
2018-05-23 20:59:12 +08:00
return nil , err
}
2022-12-05 17:21:15 +08:00
res := clientRes
2020-12-15 16:32:06 +08:00
defer func ( ) {
if err := res . Body . Close ( ) ; err != nil {
2023-09-07 19:54:16 +08:00
c . logger . Warn ( "Failed to close response body" , "error" , err )
2020-12-15 16:32:06 +08:00
}
} ( )
2018-05-23 20:59:12 +08:00
2023-09-07 19:54:16 +08:00
c . logger . Info ( "Response received from Elasticsearch" , "status" , "ok" , "statusCode" , res . StatusCode , "contentLength" , res . ContentLength , "duration" , time . Since ( start ) , "action" , "databaseRequest" )
2018-05-25 16:31:56 +08:00
2023-09-07 19:54:16 +08:00
start = time . Now ( )
2018-05-23 20:59:12 +08:00
var msr MultiSearchResponse
dec := json . NewDecoder ( res . Body )
err = dec . Decode ( & msr )
if err != nil {
2023-09-07 19:54:16 +08:00
c . logger . Error ( "Failed to decode response from Elasticsearch" , "error" , err , "duration" , time . Since ( start ) )
2018-05-23 20:59:12 +08:00
return nil , err
}
2023-09-07 19:54:16 +08:00
c . logger . Debug ( "Completed decoding of response from Elasticsearch" , "duration" , time . Since ( start ) )
2018-05-23 20:59:12 +08:00
2018-07-10 14:25:32 +08:00
msr . Status = res . StatusCode
2018-05-23 20:59:12 +08:00
return & msr , nil
}
func ( c * baseClientImpl ) createMultiSearchRequests ( searchRequests [ ] * SearchRequest ) [ ] * multiRequest {
multiRequests := [ ] * multiRequest { }
for _ , searchReq := range searchRequests {
2018-06-01 01:02:20 +08:00
mr := multiRequest {
2023-08-30 23:46:47 +08:00
header : map [ string ] any {
2018-05-23 20:59:12 +08:00
"search_type" : "query_then_fetch" ,
"ignore_unavailable" : true ,
2023-07-11 15:47:16 +08:00
"index" : strings . Join ( c . indices , "," ) ,
2018-05-23 20:59:12 +08:00
} ,
2018-06-01 01:02:20 +08:00
body : searchReq ,
interval : searchReq . Interval ,
}
2018-05-23 20:59:12 +08:00
2018-06-01 01:02:20 +08:00
multiRequests = append ( multiRequests , & mr )
2018-05-23 20:59:12 +08:00
}
return multiRequests
}
2019-04-25 15:41:13 +08:00
func ( c * baseClientImpl ) getMultiSearchQueryParameters ( ) string {
2021-07-15 21:52:02 +08:00
var qs [ ] string
2022-08-22 22:25:20 +08:00
maxConcurrentShardRequests := c . ds . MaxConcurrentShardRequests
if maxConcurrentShardRequests == 0 {
maxConcurrentShardRequests = 5
2021-07-15 21:52:02 +08:00
}
2022-08-22 22:25:20 +08:00
qs = append ( qs , fmt . Sprintf ( "max_concurrent_shard_requests=%d" , maxConcurrentShardRequests ) )
2021-07-15 21:52:02 +08:00
2022-08-22 22:25:20 +08:00
if c . ds . IncludeFrozen && c . ds . XPack {
2021-07-15 21:52:02 +08:00
qs = append ( qs , "ignore_throttled=false" )
2019-04-25 15:41:13 +08:00
}
2021-07-15 21:52:02 +08:00
return strings . Join ( qs , "&" )
2019-04-25 15:41:13 +08:00
}
2018-05-23 20:59:12 +08:00
func ( c * baseClientImpl ) MultiSearch ( ) * MultiSearchRequestBuilder {
2022-08-22 22:25:20 +08:00
return NewMultiSearchRequestBuilder ( )
2018-05-23 20:59:12 +08:00
}