2018-03-23 23:50:16 +08:00
package elasticsearch
import (
2023-05-04 00:09:18 +08:00
"bytes"
2018-03-23 23:50:16 +08:00
"context"
2021-07-15 22:45:59 +08:00
"encoding/json"
"errors"
2018-03-23 23:50:16 +08:00
"fmt"
2023-05-04 00:09:18 +08:00
"io"
"net/http"
"net/url"
"path"
2022-04-11 16:29:49 +08:00
"strconv"
2023-05-05 17:35:30 +08:00
"strings"
2023-09-07 19:54:31 +08:00
"time"
2018-05-23 20:36:41 +08:00
2021-07-15 22:45:59 +08:00
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
2022-11-02 22:03:50 +08:00
2021-05-20 05:53:41 +08:00
"github.com/grafana/grafana/pkg/infra/httpclient"
2021-07-15 22:45:59 +08:00
"github.com/grafana/grafana/pkg/infra/log"
2023-09-18 16:49:12 +08:00
"github.com/grafana/grafana/pkg/infra/tracing"
2023-09-07 19:54:16 +08:00
ngalertmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
2019-10-02 19:59:05 +08:00
es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
2018-03-23 23:50:16 +08:00
)
2021-07-15 22:45:59 +08:00
// eslog is the package-level logger, created under the name "tsdb.elasticsearch".
var eslog = log . New ( "tsdb.elasticsearch" )
type Service struct {
2022-01-21 01:16:22 +08:00
httpClientProvider httpclient . Provider
2021-08-25 21:11:22 +08:00
im instancemgmt . InstanceManager
2023-09-18 16:49:12 +08:00
tracer tracing . Tracer
logger * log . ConcreteLogger
2018-03-23 23:50:16 +08:00
}
2023-09-18 16:49:12 +08:00
func ProvideService ( httpClientProvider httpclient . Provider , tracer tracing . Tracer ) * Service {
2021-07-15 22:45:59 +08:00
return & Service {
2022-09-26 20:27:46 +08:00
im : datasource . NewInstanceManager ( newInstanceSettings ( httpClientProvider ) ) ,
2022-01-21 01:16:22 +08:00
httpClientProvider : httpClientProvider ,
2023-09-18 16:49:12 +08:00
tracer : tracer ,
logger : eslog ,
2018-03-26 19:48:57 +08:00
}
2021-07-15 22:45:59 +08:00
}
2018-03-26 19:48:57 +08:00
2021-07-15 22:45:59 +08:00
func ( s * Service ) QueryData ( ctx context . Context , req * backend . QueryDataRequest ) ( * backend . QueryDataResponse , error ) {
2023-05-24 16:19:34 +08:00
dsInfo , err := s . getDSInfo ( ctx , req . PluginContext )
2023-09-07 19:54:16 +08:00
_ , fromAlert := req . Headers [ ngalertmodels . FromAlertHeaderName ]
2023-09-18 16:49:12 +08:00
logger := s . logger . FromContext ( ctx ) . New ( "fromAlert" , fromAlert )
2023-09-07 19:54:16 +08:00
2018-03-23 23:50:16 +08:00
if err != nil {
2023-09-07 19:54:16 +08:00
logger . Error ( "Failed to get data source info" , "error" , err )
2021-07-15 22:45:59 +08:00
return & backend . QueryDataResponse { } , err
2018-03-23 23:50:16 +08:00
}
2023-09-18 16:49:12 +08:00
return queryData ( ctx , req . Queries , dsInfo , logger , s . tracer )
2022-11-28 21:59:57 +08:00
}
// separate function to allow testing the whole transformation and query flow
2023-09-18 16:49:12 +08:00
func queryData ( ctx context . Context , queries [ ] backend . DataQuery , dsInfo * es . DatasourceInfo , logger log . Logger , tracer tracing . Tracer ) ( * backend . QueryDataResponse , error ) {
2022-11-28 21:59:57 +08:00
if len ( queries ) == 0 {
2022-05-05 22:16:34 +08:00
return & backend . QueryDataResponse { } , fmt . Errorf ( "query contains no queries" )
}
2023-09-18 16:49:12 +08:00
client , err := es . NewClient ( ctx , dsInfo , queries [ 0 ] . TimeRange , logger , tracer )
2021-07-15 22:45:59 +08:00
if err != nil {
return & backend . QueryDataResponse { } , err
2019-06-25 14:52:17 +08:00
}
2023-09-18 16:49:12 +08:00
query := newElasticsearchDataQuery ( ctx , client , queries , logger , tracer )
2018-05-23 21:09:58 +08:00
return query . execute ( )
2018-03-23 23:50:16 +08:00
}
2021-07-15 22:45:59 +08:00
2022-09-26 20:27:46 +08:00
func newInstanceSettings ( httpClientProvider httpclient . Provider ) datasource . InstanceFactoryFunc {
2023-10-14 00:35:16 +08:00
return func ( _ context . Context , settings backend . DataSourceInstanceSettings ) ( instancemgmt . Instance , error ) {
2023-08-30 23:46:47 +08:00
jsonData := map [ string ] any { }
2021-07-15 22:45:59 +08:00
err := json . Unmarshal ( settings . JSONData , & jsonData )
if err != nil {
return nil , fmt . Errorf ( "error reading settings: %w" , err )
}
2023-10-14 00:35:16 +08:00
httpCliOpts , err := settings . HTTPClientOptions ( )
2021-07-15 22:45:59 +08:00
if err != nil {
return nil , fmt . Errorf ( "error getting http options: %w" , err )
}
2021-09-27 20:32:19 +08:00
// Set SigV4 service namespace
if httpCliOpts . SigV4 != nil {
httpCliOpts . SigV4 . Service = "es"
}
2023-01-23 23:43:55 +08:00
httpCli , err := httpClientProvider . New ( httpCliOpts )
if err != nil {
return nil , err
}
2023-03-28 22:04:56 +08:00
// we used to have a field named `esVersion`, please do not use this name in the future.
2021-07-15 22:45:59 +08:00
timeField , ok := jsonData [ "timeField" ] . ( string )
if ! ok {
return nil , errors . New ( "timeField cannot be cast to string" )
}
if timeField == "" {
return nil , errors . New ( "elasticsearch time field name is required" )
}
2023-03-01 18:50:56 +08:00
logLevelField , ok := jsonData [ "logLevelField" ] . ( string )
if ! ok {
logLevelField = ""
}
logMessageField , ok := jsonData [ "logMessageField" ] . ( string )
if ! ok {
logMessageField = ""
}
2021-07-15 22:45:59 +08:00
interval , ok := jsonData [ "interval" ] . ( string )
if ! ok {
interval = ""
}
timeInterval , ok := jsonData [ "timeInterval" ] . ( string )
if ! ok {
timeInterval = ""
}
2023-04-19 16:30:09 +08:00
index , ok := jsonData [ "index" ] . ( string )
if ! ok {
index = ""
}
if index == "" {
index = settings . Database
}
2022-04-11 16:29:49 +08:00
var maxConcurrentShardRequests float64
switch v := jsonData [ "maxConcurrentShardRequests" ] . ( type ) {
case float64 :
maxConcurrentShardRequests = v
case string :
maxConcurrentShardRequests , err = strconv . ParseFloat ( v , 64 )
if err != nil {
maxConcurrentShardRequests = 256
}
default :
2021-07-15 22:45:59 +08:00
maxConcurrentShardRequests = 256
}
includeFrozen , ok := jsonData [ "includeFrozen" ] . ( bool )
if ! ok {
includeFrozen = false
}
xpack , ok := jsonData [ "xpack" ] . ( bool )
if ! ok {
xpack = false
}
2023-03-01 18:50:56 +08:00
configuredFields := es . ConfiguredFields {
TimeField : timeField ,
LogLevelField : logLevelField ,
LogMessageField : logMessageField ,
}
2021-07-15 22:45:59 +08:00
model := es . DatasourceInfo {
ID : settings . ID ,
URL : settings . URL ,
2022-09-26 20:27:46 +08:00
HTTPClient : httpCli ,
2023-04-19 16:30:09 +08:00
Database : index ,
2021-07-15 22:45:59 +08:00
MaxConcurrentShardRequests : int64 ( maxConcurrentShardRequests ) ,
2023-03-01 18:50:56 +08:00
ConfiguredFields : configuredFields ,
2021-07-15 22:45:59 +08:00
Interval : interval ,
TimeInterval : timeInterval ,
IncludeFrozen : includeFrozen ,
XPack : xpack ,
}
return model , nil
}
}
2023-05-24 16:19:34 +08:00
func ( s * Service ) getDSInfo ( ctx context . Context , pluginCtx backend . PluginContext ) ( * es . DatasourceInfo , error ) {
i , err := s . im . Get ( ctx , pluginCtx )
2021-07-15 22:45:59 +08:00
if err != nil {
return nil , err
}
instance := i . ( es . DatasourceInfo )
return & instance , nil
}
2023-05-04 00:09:18 +08:00
func ( s * Service ) CallResource ( ctx context . Context , req * backend . CallResourceRequest , sender backend . CallResourceResponseSender ) error {
logger := eslog . FromContext ( ctx )
// allowed paths for resource calls:
// - empty string for fetching db version
2023-05-06 16:00:43 +08:00
// - ?/_mapping for fetching index mapping
// - _msearch for executing getTerms queries
if req . Path != "" && ! strings . HasSuffix ( req . Path , "/_mapping" ) && req . Path != "_msearch" {
2023-09-07 19:54:31 +08:00
logger . Error ( "Invalid resource path" , "path" , req . Path )
2023-05-04 00:09:18 +08:00
return fmt . Errorf ( "invalid resource URL: %s" , req . Path )
}
2023-05-24 16:19:34 +08:00
ds , err := s . getDSInfo ( ctx , req . PluginContext )
2023-05-04 00:09:18 +08:00
if err != nil {
2023-09-07 19:54:31 +08:00
logger . Error ( "Failed to get data source info" , "error" , err )
2023-05-04 00:09:18 +08:00
return err
}
esUrl , err := url . Parse ( ds . URL )
if err != nil {
2023-09-07 19:54:31 +08:00
logger . Error ( "Failed to parse data source URL" , "error" , err , "url" , ds . URL )
2023-05-04 00:09:18 +08:00
return err
}
resourcePath , err := url . Parse ( req . Path )
if err != nil {
2023-09-07 19:54:31 +08:00
logger . Error ( "Failed to parse data source path" , "error" , err , "url" , req . Path )
2023-05-04 00:09:18 +08:00
return err
}
// We take the path and the query-string only
esUrl . RawQuery = resourcePath . RawQuery
esUrl . Path = path . Join ( esUrl . Path , resourcePath . Path )
request , err := http . NewRequestWithContext ( ctx , req . Method , esUrl . String ( ) , bytes . NewBuffer ( req . Body ) )
if err != nil {
2023-09-07 19:54:31 +08:00
logger . Error ( "Failed to create request" , "error" , err , "url" , esUrl . String ( ) )
2023-05-04 00:09:18 +08:00
return err
}
2023-09-07 19:54:31 +08:00
logger . Debug ( "Sending request to Elasticsearch" , "resourcePath" , req . Path )
start := time . Now ( )
2023-05-04 00:09:18 +08:00
response , err := ds . HTTPClient . Do ( request )
if err != nil {
2023-09-07 19:54:31 +08:00
status := "error"
if errors . Is ( err , context . Canceled ) {
status = "cancelled"
}
2023-09-08 00:15:24 +08:00
lp := [ ] any { "error" , err , "status" , status , "duration" , time . Since ( start ) , "stage" , es . StageDatabaseRequest , "resourcePath" , req . Path }
if response != nil {
lp = append ( lp , "statusCode" , response . StatusCode )
}
logger . Error ( "Error received from Elasticsearch" , lp ... )
2023-05-04 00:09:18 +08:00
return err
}
2023-09-08 00:15:24 +08:00
logger . Info ( "Response received from Elasticsearch" , "statusCode" , response . StatusCode , "status" , "ok" , "duration" , time . Since ( start ) , "stage" , es . StageDatabaseRequest , "contentLength" , response . Header . Get ( "Content-Length" ) , "resourcePath" , req . Path )
2023-05-04 00:09:18 +08:00
defer func ( ) {
if err := response . Body . Close ( ) ; err != nil {
2023-09-07 19:54:16 +08:00
logger . Warn ( "Failed to close response body" , "error" , err )
2023-05-04 00:09:18 +08:00
}
} ( )
body , err := io . ReadAll ( response . Body )
if err != nil {
2023-09-07 19:54:31 +08:00
logger . Error ( "Error reading response body bytes" , "error" , err )
2023-05-04 00:09:18 +08:00
return err
}
responseHeaders := map [ string ] [ ] string {
"content-type" : { "application/json" } ,
}
if response . Header . Get ( "Content-Encoding" ) != "" {
responseHeaders [ "content-encoding" ] = [ ] string { response . Header . Get ( "Content-Encoding" ) }
}
return sender . Send ( & backend . CallResourceResponse {
Status : response . StatusCode ,
Headers : responseHeaders ,
Body : body ,
} )
}