package elasticsearch

import (
    "encoding/json"
    "errors"
    "regexp"
    "sort"
    "strconv"
    "strings"
    "time"

    "github.com/grafana/grafana-plugin-sdk-go/backend"
    "github.com/grafana/grafana-plugin-sdk-go/data"

    "github.com/grafana/grafana/pkg/components/simplejson"
    es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
)

const (
    // Metric types
    countType         = "count"
    percentilesType   = "percentiles"
    extendedStatsType = "extended_stats"
    topMetricsType    = "top_metrics"
    // Bucket types
    dateHistType    = "date_histogram"
    nestedType      = "nested"
    histogramType   = "histogram"
    filtersType     = "filters"
    termsType       = "terms"
    geohashGridType = "geohash_grid"
    // Document types
    rawDocumentType = "raw_document"
    rawDataType     = "raw_data"
    // Logs type
    logsType = "logs"
)
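
// parseResponse converts Elasticsearch search responses into a
// backend.QueryDataResponse, routing each target to raw-data, raw-document,
// logs, or metric processing.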
func parseResponse(responses []*es.SearchResponse, targets []*Query, configuredFields es.ConfiguredFields) (*backend.QueryDataResponse, error) {
    result := backend.QueryDataResponse{
        Responses: backend.Responses{},
    }
    if responses == nil {
        return &result, nil
    }

    for i, res := range responses {
        target := targets[i]

        if res.Error != nil {
            errResult := getErrorFromElasticResponse(res)
            result.Responses[target.RefID] = backend.DataResponse{
                Error: errors.New(errResult),
            }
            continue
        }

        queryRes := backend.DataResponse{}

        if isRawDataQuery(target) {
            err := processRawDataResponse(res, target, configuredFields, &queryRes)
            if err != nil {
                return &backend.QueryDataResponse{}, err
            }
            result.Responses[target.RefID] = queryRes
        } else if isRawDocumentQuery(target) {
            err := processRawDocumentResponse(res, target, &queryRes)
            if err != nil {
                return &backend.QueryDataResponse{}, err
            }
            result.Responses[target.RefID] = queryRes
        } else if isLogsQuery(target) {
            err := processLogsResponse(res, target, configuredFields, &queryRes)
            if err != nil {
                return &backend.QueryDataResponse{}, err
            }
            result.Responses[target.RefID] = queryRes
        } else {
            // Process as metric query result
            props := make(map[string]string)
            err := processBuckets(res.Aggregations, target, &queryRes, props, 0)
            if err != nil {
                return &backend.QueryDataResponse{}, err
            }
            nameFields(queryRes, target)
            trimDatapoints(queryRes, target)

            result.Responses[target.RefID] = queryRes
        }
    }
    return &result, nil
}
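
// processLogsResponse builds a single data frame from the hits of a logs
// query and marks it with the "logs" preferred visualization type. The
// configured log level field, when present, is also exposed as "level".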
func processLogsResponse(res *es.SearchResponse, target *Query, configuredFields es.ConfiguredFields, queryRes *backend.DataResponse) error {
    propNames := make(map[string]bool)
    docs := make([]map[string]interface{}, len(res.Hits.Hits))

    for hitIdx, hit := range res.Hits.Hits {
        var flattened map[string]interface{}
        if hit["_source"] != nil {
            flattened = flatten(hit["_source"].(map[string]interface{}))
        }

        doc := map[string]interface{}{
            "_id":       hit["_id"],
            "_type":     hit["_type"],
            "_index":    hit["_index"],
            "sort":      hit["sort"],
            "highlight": hit["highlight"],
            "_source":   flattened,
        }

        for k, v := range flattened {
            if configuredFields.LogLevelField != "" && k == configuredFields.LogLevelField {
                doc["level"] = v
            } else {
                doc[k] = v
            }
        }

        for key := range doc {
            propNames[key] = true
        }

        // TODO: Implement highlighting
        docs[hitIdx] = doc
    }

    sortedPropNames := sortPropNames(propNames, configuredFields, true)
    fields := processDocsToDataFrameFields(docs, sortedPropNames, configuredFields)

    frames := data.Frames{}
    frame := data.NewFrame("", fields...)
    setPreferredVisType(frame, "logs")
    frames = append(frames, frame)

    queryRes.Frames = frames
    return nil
}
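
// processRawDataResponse builds a single data frame from the hits of a
// raw-data query, flattening nested _source objects into dot-notation keys
// and keeping hit metadata (_id, _type, _index, sort, highlight).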
func processRawDataResponse(res *es.SearchResponse, target *Query, configuredFields es.ConfiguredFields, queryRes *backend.DataResponse) error {
    propNames := make(map[string]bool)
    docs := make([]map[string]interface{}, len(res.Hits.Hits))

    for hitIdx, hit := range res.Hits.Hits {
        var flattened map[string]interface{}
        if hit["_source"] != nil {
            flattened = flatten(hit["_source"].(map[string]interface{}))
        }

        doc := map[string]interface{}{
            "_id":       hit["_id"],
            "_type":     hit["_type"],
            "_index":    hit["_index"],
            "sort":      hit["sort"],
            "highlight": hit["highlight"],
        }

        for k, v := range flattened {
            doc[k] = v
        }

        for key := range doc {
            propNames[key] = true
        }

        docs[hitIdx] = doc
    }

    sortedPropNames := sortPropNames(propNames, configuredFields, false)
    fields := processDocsToDataFrameFields(docs, sortedPropNames, configuredFields)

    frames := data.Frames{}
    frame := data.NewFrame("", fields...)
    frames = append(frames, frame)

    queryRes.Frames = frames
    return nil
}
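
// processRawDocumentResponse builds a one-field data frame in which each
// row is an entire hit serialized as raw JSON.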
func processRawDocumentResponse(res *es.SearchResponse, target *Query, queryRes *backend.DataResponse) error {
    docs := make([]map[string]interface{}, len(res.Hits.Hits))
    for hitIdx, hit := range res.Hits.Hits {
        doc := map[string]interface{}{
            "_id":       hit["_id"],
            "_type":     hit["_type"],
            "_index":    hit["_index"],
            "sort":      hit["sort"],
            "highlight": hit["highlight"],
        }

        if hit["_source"] != nil {
            source, ok := hit["_source"].(map[string]interface{})
            if ok {
                for k, v := range source {
                    doc[k] = v
                }
            }
        }

        if hit["fields"] != nil {
            source, ok := hit["fields"].(map[string]interface{})
            if ok {
                for k, v := range source {
                    doc[k] = v
                }
            }
        }

        docs[hitIdx] = doc
    }

    fieldVector := make([]*json.RawMessage, len(res.Hits.Hits))
    for i, doc := range docs {
        bytes, err := json.Marshal(doc)
        if err != nil {
            // We skip docs that can't be marshalled
            // should not happen
            continue
        }
        value := json.RawMessage(bytes)
        fieldVector[i] = &value
    }

    isFilterable := true
    field := data.NewField(target.RefID, nil, fieldVector)
    field.Config = &data.FieldConfig{Filterable: &isFilterable}

    frames := data.Frames{}
    frame := data.NewFrame(target.RefID, field)
    frames = append(frames, frame)

    queryRes.Frames = frames
    return nil
}
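
// processDocsToDataFrameFields builds one data frame field per property
// name: the configured time field is parsed as RFC3339, other fields are
// typed after their first non-nil value (float64, int, bool, or string),
// and everything else falls back to json.RawMessage.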
func processDocsToDataFrameFields(docs []map[string]interface{}, propNames []string, configuredFields es.ConfiguredFields) []*data.Field {
    size := len(docs)
    isFilterable := true
    allFields := make([]*data.Field, len(propNames))

    for propNameIdx, propName := range propNames {
        // Special handling for the time field
        if propName == configuredFields.TimeField {
            timeVector := make([]*time.Time, size)
            for i, doc := range docs {
                timeString, ok := doc[configuredFields.TimeField].(string)
                if !ok {
                    continue
                }
                timeValue, err := time.Parse(time.RFC3339Nano, timeString)
                if err != nil {
                    // We skip time values that cannot be parsed
                    continue
                }
                timeVector[i] = &timeValue
            }
            field := data.NewField(configuredFields.TimeField, nil, timeVector)
            field.Config = &data.FieldConfig{Filterable: &isFilterable}
            allFields[propNameIdx] = field
            continue
        }

        propNameValue := findTheFirstNonNilDocValueForPropName(docs, propName)
        switch propNameValue.(type) {
        // We are checking for the default data type values (float64, int, bool, string)
        // and default to json.RawMessage if we cannot find any of them
        case float64:
            allFields[propNameIdx] = createFieldOfType[float64](docs, propName, size, isFilterable)
        case int:
            allFields[propNameIdx] = createFieldOfType[int](docs, propName, size, isFilterable)
        case string:
            allFields[propNameIdx] = createFieldOfType[string](docs, propName, size, isFilterable)
        case bool:
            allFields[propNameIdx] = createFieldOfType[bool](docs, propName, size, isFilterable)
        default:
            fieldVector := make([]*json.RawMessage, size)
            for i, doc := range docs {
                bytes, err := json.Marshal(doc[propName])
                if err != nil {
                    // We skip values that cannot be marshalled
                    continue
                }
                value := json.RawMessage(bytes)
                fieldVector[i] = &value
            }
            field := data.NewField(propName, nil, fieldVector)
            field.Config = &data.FieldConfig{Filterable: &isFilterable}
            allFields[propNameIdx] = field
        }
    }

    return allFields
}
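
// processBuckets recursively walks the aggregation tree. Nested aggregations
// are descended into directly; at the deepest level, date histograms are
// processed as metrics and other aggregations as aggregation docs. Bucket
// keys encountered on the way down are accumulated into props.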
func processBuckets(aggs map[string]interface{}, target *Query,
    queryResult *backend.DataResponse, props map[string]string, depth int) error {
    var err error
    maxDepth := len(target.BucketAggs) - 1

    aggIDs := make([]string, 0)
    for k := range aggs {
        aggIDs = append(aggIDs, k)
    }
    sort.Strings(aggIDs)
    for _, aggID := range aggIDs {
        v := aggs[aggID]
        aggDef, _ := findAgg(target, aggID)
        esAgg := simplejson.NewFromAny(v)
        if aggDef == nil {
            continue
        }

        if aggDef.Type == nestedType {
            err = processBuckets(esAgg.MustMap(), target, queryResult, props, depth+1)
            if err != nil {
                return err
            }
            continue
        }

        if depth == maxDepth {
            if aggDef.Type == dateHistType {
                err = processMetrics(esAgg, target, queryResult, props)
            } else {
                err = processAggregationDocs(esAgg, aggDef, target, queryResult, props)
            }
            if err != nil {
                return err
            }
        } else {
            for _, b := range esAgg.Get("buckets").MustArray() {
                bucket := simplejson.NewFromAny(b)
                newProps := make(map[string]string)

                for k, v := range props {
                    newProps[k] = v
                }

                if key, err := bucket.Get("key").String(); err == nil {
                    newProps[aggDef.Field] = key
                } else if key, err := bucket.Get("key").Int64(); err == nil {
                    newProps[aggDef.Field] = strconv.FormatInt(key, 10)
                }

                if key, err := bucket.Get("key_as_string").String(); err == nil {
                    newProps[aggDef.Field] = key
                }

                err = processBuckets(bucket.MustMap(), target, queryResult, newProps, depth+1)
                if err != nil {
                    return err
                }
            }

            buckets := esAgg.Get("buckets").MustMap()
            bucketKeys := make([]string, 0)
            for k := range buckets {
                bucketKeys = append(bucketKeys, k)
            }
            sort.Strings(bucketKeys)
            for _, bucketKey := range bucketKeys {
                bucket := simplejson.NewFromAny(buckets[bucketKey])
                newProps := make(map[string]string)

                for k, v := range props {
                    newProps[k] = v
                }

                newProps["filter"] = bucketKey

                err = processBuckets(bucket.MustMap(), target, queryResult, newProps, depth+1)
                if err != nil {
                    return err
                }
            }
        }
    }
    return nil
}
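
// newTimeSeriesFrame builds a frame with a "time" and a "value" field and
// marks it as a multi time series; tags become the value field's labels.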
func newTimeSeriesFrame(timeData []time.Time, tags map[string]string, values []*float64) *data.Frame {
    frame := data.NewFrame("",
        data.NewField("time", nil, timeData),
        data.NewField("value", tags, values))
    frame.Meta = &data.FrameMeta{
        Type: data.FrameTypeTimeSeriesMulti,
    }
    return frame
}
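
// processMetrics converts date histogram buckets into time series frames,
// one per metric and series: count, percentiles, top_metrics, and
// extended_stats get dedicated handling, and every other metric type is
// read from the bucket's "value" (or "normalized_value") path.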
// nolint:gocyclo
func processMetrics(esAgg *simplejson.Json, target *Query, query *backend.DataResponse,
    props map[string]string) error {
    frames := data.Frames{}
    esAggBuckets := esAgg.Get("buckets").MustArray()

    for _, metric := range target.Metrics {
        if metric.Hide {
            continue
        }

        tags := make(map[string]string, len(props))
        timeVector := make([]time.Time, 0, len(esAggBuckets))
        values := make([]*float64, 0, len(esAggBuckets))

        switch metric.Type {
        case countType:
            for _, v := range esAggBuckets {
                bucket := simplejson.NewFromAny(v)
                value := castToFloat(bucket.Get("doc_count"))
                key := castToFloat(bucket.Get("key"))
                timeVector = append(timeVector, time.Unix(int64(*key)/1000, 0).UTC())
                values = append(values, value)
            }

            for k, v := range props {
                tags[k] = v
            }
            tags["metric"] = countType
            frames = append(frames, newTimeSeriesFrame(timeVector, tags, values))
        case percentilesType:
            buckets := esAggBuckets
            if len(buckets) == 0 {
                break
            }

            firstBucket := simplejson.NewFromAny(buckets[0])
            percentiles := firstBucket.GetPath(metric.ID, "values").MustMap()

            percentileKeys := make([]string, 0)
            for k := range percentiles {
                percentileKeys = append(percentileKeys, k)
            }
            sort.Strings(percentileKeys)
            for _, percentileName := range percentileKeys {
                tags := make(map[string]string, len(props))
                timeVector := make([]time.Time, 0, len(esAggBuckets))
                values := make([]*float64, 0, len(esAggBuckets))

                for k, v := range props {
                    tags[k] = v
                }
                tags["metric"] = "p" + percentileName
                tags["field"] = metric.Field
                for _, v := range buckets {
                    bucket := simplejson.NewFromAny(v)
                    value := castToFloat(bucket.GetPath(metric.ID, "values", percentileName))
                    key := castToFloat(bucket.Get("key"))
                    timeVector = append(timeVector, time.Unix(int64(*key)/1000, 0).UTC())
                    values = append(values, value)
                }
                frames = append(frames, newTimeSeriesFrame(timeVector, tags, values))
            }
        case topMetricsType:
            buckets := esAggBuckets
            metrics := metric.Settings.Get("metrics").MustArray()

            for _, metricField := range metrics {
                tags := make(map[string]string, len(props))
                timeVector := make([]time.Time, 0, len(esAggBuckets))
                values := make([]*float64, 0, len(esAggBuckets))
                for k, v := range props {
                    tags[k] = v
                }

                tags["field"] = metricField.(string)
                tags["metric"] = "top_metrics"

                for _, v := range buckets {
                    bucket := simplejson.NewFromAny(v)
                    stats := bucket.GetPath(metric.ID, "top")
                    key := castToFloat(bucket.Get("key"))

                    timeVector = append(timeVector, time.Unix(int64(*key)/1000, 0).UTC())

                    for _, stat := range stats.MustArray() {
                        stat := stat.(map[string]interface{})

                        metrics, hasMetrics := stat["metrics"]
                        if hasMetrics {
                            metrics := metrics.(map[string]interface{})
                            metricValue, hasMetricValue := metrics[metricField.(string)]

                            if hasMetricValue && metricValue != nil {
                                v := metricValue.(float64)
                                values = append(values, &v)
                            }
                        }
                    }
                }

                frames = append(frames, newTimeSeriesFrame(timeVector, tags, values))
            }
        case extendedStatsType:
            buckets := esAggBuckets

            metaKeys := make([]string, 0)
            meta := metric.Meta.MustMap()
            for k := range meta {
                metaKeys = append(metaKeys, k)
            }
            sort.Strings(metaKeys)
            for _, statName := range metaKeys {
                v := meta[statName]
                if enabled, ok := v.(bool); !ok || !enabled {
                    continue
                }

                tags := make(map[string]string, len(props))
                timeVector := make([]time.Time, 0, len(esAggBuckets))
                values := make([]*float64, 0, len(esAggBuckets))

                for k, v := range props {
                    tags[k] = v
                }
                tags["metric"] = statName
                tags["field"] = metric.Field

                for _, v := range buckets {
                    bucket := simplejson.NewFromAny(v)
                    key := castToFloat(bucket.Get("key"))
                    var value *float64
                    switch statName {
                    case "std_deviation_bounds_upper":
                        value = castToFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "upper"))
                    case "std_deviation_bounds_lower":
                        value = castToFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "lower"))
                    default:
                        value = castToFloat(bucket.GetPath(metric.ID, statName))
                    }
                    timeVector = append(timeVector, time.Unix(int64(*key)/1000, 0).UTC())
                    values = append(values, value)
                }
                labels := tags
                frames = append(frames, newTimeSeriesFrame(timeVector, labels, values))
            }
        default:
            for k, v := range props {
                tags[k] = v
            }

            tags["metric"] = metric.Type
            tags["field"] = metric.Field
            tags["metricId"] = metric.ID
            for _, v := range esAggBuckets {
                bucket := simplejson.NewFromAny(v)
                key := castToFloat(bucket.Get("key"))
                valueObj, err := bucket.Get(metric.ID).Map()
                if err != nil {
                    continue
                }
                var value *float64
                if _, ok := valueObj["normalized_value"]; ok {
                    value = castToFloat(bucket.GetPath(metric.ID, "normalized_value"))
                } else {
                    value = castToFloat(bucket.GetPath(metric.ID, "value"))
                }
                timeVector = append(timeVector, time.Unix(int64(*key)/1000, 0).UTC())
                values = append(values, value)
            }
            frames = append(frames, newTimeSeriesFrame(timeVector, tags, values))
        }
    }

    if query.Frames != nil {
        oldFrames := query.Frames
        frames = append(oldFrames, frames...)
    }

    query.Frames = frames
    return nil
}
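
// processAggregationDocs renders non-date-histogram buckets as a single
// table-like frame: one field per accumulated prop and bucket key, plus one
// field per metric.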
func processAggregationDocs(esAgg *simplejson.Json, aggDef *BucketAgg, target *Query,
    queryResult *backend.DataResponse, props map[string]string) error {
    propKeys := make([]string, 0)
    for k := range props {
        propKeys = append(propKeys, k)
    }
    sort.Strings(propKeys)
    frames := data.Frames{}
    var fields []*data.Field

    if queryResult.Frames == nil {
        for _, propKey := range propKeys {
            fields = append(fields, data.NewField(propKey, nil, []*string{}))
        }
    }

    addMetricValue := func(values []interface{}, metricName string, value *float64) {
        index := -1
        for i, f := range fields {
            if f.Name == metricName {
                index = i
                break
            }
        }

        var field data.Field
        if index == -1 {
            field = *data.NewField(metricName, nil, []*float64{})
            fields = append(fields, &field)
        } else {
            field = *fields[index]
        }
        field.Append(value)
    }
    for _, v := range esAgg.Get("buckets").MustArray() {
        bucket := simplejson.NewFromAny(v)
        var values []interface{}

        found := false
        for _, e := range fields {
            for _, propKey := range propKeys {
                if e.Name == propKey {
                    e.Append(props[propKey])
                }
            }
            if e.Name == aggDef.Field {
                found = true
                if key, err := bucket.Get("key").String(); err == nil {
                    e.Append(&key)
                } else {
                    f, err := bucket.Get("key").Float64()
                    if err != nil {
                        return err
                    }
                    e.Append(&f)
                }
            }
        }

        if !found {
            var aggDefField *data.Field
            if key, err := bucket.Get("key").String(); err == nil {
                aggDefField = extractDataField(aggDef.Field, &key)
                aggDefField.Append(&key)
            } else {
                f, err := bucket.Get("key").Float64()
                if err != nil {
                    return err
                }
                aggDefField = extractDataField(aggDef.Field, &f)
                aggDefField.Append(&f)
            }
            fields = append(fields, aggDefField)
        }

        for _, metric := range target.Metrics {
            switch metric.Type {
            case countType:
                addMetricValue(values, getMetricName(metric.Type), castToFloat(bucket.Get("doc_count")))
            case extendedStatsType:
                metaKeys := make([]string, 0)
                meta := metric.Meta.MustMap()
                for k := range meta {
                    metaKeys = append(metaKeys, k)
                }
                sort.Strings(metaKeys)
                for _, statName := range metaKeys {
                    v := meta[statName]
                    if enabled, ok := v.(bool); !ok || !enabled {
                        continue
                    }

                    var value *float64
                    switch statName {
                    case "std_deviation_bounds_upper":
                        value = castToFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "upper"))
                    case "std_deviation_bounds_lower":
                        value = castToFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "lower"))
                    default:
                        value = castToFloat(bucket.GetPath(metric.ID, statName))
                    }

                    addMetricValue(values, getMetricName(metric.Type), value)
                    break
                }
            default:
                metricName := getMetricName(metric.Type)
                otherMetrics := make([]*MetricAgg, 0)

                for _, m := range target.Metrics {
                    if m.Type == metric.Type {
                        otherMetrics = append(otherMetrics, m)
                    }
                }

                if len(otherMetrics) > 1 {
                    metricName += " " + metric.Field
                    if metric.Type == "bucket_script" {
                        // Use the formula in the column name
                        metricName = metric.Settings.Get("script").MustString("")
                    }
                }

                addMetricValue(values, metricName, castToFloat(bucket.GetPath(metric.ID, "value")))
            }
        }

        var dataFields []*data.Field
        dataFields = append(dataFields, fields...)

        frames = data.Frames{
            &data.Frame{
                Fields: dataFields,
            }}
    }

    queryResult.Frames = frames
    return nil
}
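
// extractDataField creates an empty field named after the bucket key's
// type: *string and *float64 are supported, anything else yields an empty
// field.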
func extractDataField(name string, v interface{}) *data.Field {
    switch v.(type) {
    case *string:
        return data.NewField(name, nil, []*string{})
    case *float64:
        return data.NewField(name, nil, []*float64{})
    default:
        return &data.Field{}
    }
}
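
// trimDatapoints removes the first and last trimEdges values from every
// field when the query's date histogram has a "trimEdges" setting.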
func trimDatapoints(queryResult backend.DataResponse, target *Query) {
    var histogram *BucketAgg
    for _, bucketAgg := range target.BucketAggs {
        if bucketAgg.Type == dateHistType {
            histogram = bucketAgg
            break
        }
    }

    if histogram == nil {
        return
    }

    trimEdges, err := castToInt(histogram.Settings.Get("trimEdges"))
    if err != nil {
        return
    }

    frames := queryResult.Frames

    for _, frame := range frames {
        for _, field := range frame.Fields {
            if field.Len() > trimEdges*2 {
                // first we delete the first "trim" items
                for i := 0; i < trimEdges; i++ {
                    field.Delete(0)
                }

                // then we delete the last "trim" items
                for i := 0; i < trimEdges; i++ {
                    field.Delete(field.Len() - 1)
                }
            }
        }
    }
}

// getSortedLabelValues sorts the label pairs by key and returns the label
// values in that order.
func getSortedLabelValues(labels data.Labels) []string {
    var keys []string
    for key := range labels {
        keys = append(keys, key)
    }
    sort.Strings(keys)

    var values []string
    for _, key := range keys {
        values = append(values, labels[key])
    }

    return values
}
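
// nameFields sets the display name of the value field of every time series
// frame, based on the number of distinct metric types and the labels
// collected while processing the response.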
func nameFields(queryResult backend.DataResponse, target *Query) {
    set := make(map[string]struct{})
    frames := queryResult.Frames
    for _, v := range frames {
        for _, vv := range v.Fields {
            if metricType, exists := vv.Labels["metric"]; exists {
                if _, ok := set[metricType]; !ok {
                    set[metricType] = struct{}{}
                }
            }
        }
    }
    metricTypeCount := len(set)
    for _, frame := range frames {
        if frame.Meta != nil && frame.Meta.Type == data.FrameTypeTimeSeriesMulti {
            // if it is a time-series-multi, it means it has two columns, one is "time",
            // the other is "number"
            valueField := frame.Fields[1]
            fieldName := getFieldName(*valueField, target, metricTypeCount)
            if fieldName != "" {
                valueField.SetConfig(&data.FieldConfig{DisplayNameFromDS: fieldName})
            }
        }
    }
}
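
// aliasPatternRegex matches {{...}} placeholders in a target's alias,
// e.g. {{metric}}, {{field}}, or {{term some_field}}.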
var aliasPatternRegex = regexp.MustCompile(`\{\{([\s\S]+?)\}\}`)
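
// getFieldName builds the display name for a value field: when the target
// has an alias, its {{...}} placeholders are substituted; otherwise the
// name is derived from the metric type, the field, and any leftover labels.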
func getFieldName(dataField data.Field, target *Query, metricTypeCount int) string {
    metricType := dataField.Labels["metric"]
    metricName := getMetricName(metricType)
    delete(dataField.Labels, "metric")

    field := ""
    if v, ok := dataField.Labels["field"]; ok {
        field = v
        delete(dataField.Labels, "field")
    }

    if target.Alias != "" {
        frameName := target.Alias

        subMatches := aliasPatternRegex.FindAllStringSubmatch(target.Alias, -1)
        for _, subMatch := range subMatches {
            group := subMatch[0]

            if len(subMatch) > 1 {
                group = subMatch[1]
            }

            if strings.Index(group, "term ") == 0 {
                frameName = strings.Replace(frameName, subMatch[0], dataField.Labels[group[5:]], 1)
            }
            if v, ok := dataField.Labels[group]; ok {
                frameName = strings.Replace(frameName, subMatch[0], v, 1)
            }
            if group == "metric" {
                frameName = strings.Replace(frameName, subMatch[0], metricName, 1)
            }
            if group == "field" {
                frameName = strings.Replace(frameName, subMatch[0], field, 1)
            }
        }

        return frameName
    }

    // TODO: if field and pipelineAgg
    if isPipelineAgg(metricType) {
        if metricType != "" && isPipelineAggWithMultipleBucketPaths(metricType) {
            metricID := ""
            if v, ok := dataField.Labels["metricId"]; ok {
                metricID = v
            }

            for _, metric := range target.Metrics {
                if metric.ID == metricID {
                    metricName = metric.Settings.Get("script").MustString()
                    for name, pipelineAgg := range metric.PipelineVariables {
                        for _, m := range target.Metrics {
                            if m.ID == pipelineAgg {
                                metricName = strings.ReplaceAll(metricName, "params."+name, describeMetric(m.Type, m.Field))
                            }
                        }
                    }
                }
            }
        } else {
            if field != "" {
                found := false
                for _, metric := range target.Metrics {
                    if metric.ID == field {
                        metricName += " " + describeMetric(metric.Type, field)
                        found = true
                    }
                }
                if !found {
                    metricName = "Unset"
                }
            }
        }
    } else if field != "" {
        metricName += " " + field
    }

    delete(dataField.Labels, "metricId")

    if len(dataField.Labels) == 0 {
        return metricName
    }

    name := ""
    for _, v := range getSortedLabelValues(dataField.Labels) {
        name += v + " "
    }

    if metricTypeCount == 1 {
        return strings.TrimSpace(name)
    }

    return strings.TrimSpace(name) + " " + metricName
}
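
// getMetricName maps a metric type to its human-readable name, falling back
// to the raw type string.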
func getMetricName(metric string) string {
    if text, ok := metricAggType[metric]; ok {
        return text
    }

    if text, ok := extendedStats[metric]; ok {
        return text
    }

    return metric
}
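
// castToInt reads the JSON value as an int, also accepting numeric strings.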
func castToInt(j *simplejson.Json) (int, error) {
    i, err := j.Int()
    if err == nil {
        return i, nil
    }

    s, err := j.String()
    if err != nil {
        return 0, err
    }

    v, err := strconv.Atoi(s)
    if err != nil {
        return 0, err
    }

    return v, nil
}
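
// castToFloat reads the JSON value as a *float64, also accepting numeric
// strings; "NaN" and unparsable values yield nil.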
func castToFloat(j *simplejson.Json) *float64 {
    f, err := j.Float64()
    if err == nil {
        return &f
    }

    if s, err := j.String(); err == nil {
        if strings.ToLower(s) == "nan" {
            return nil
        }

        if v, err := strconv.ParseFloat(s, 64); err == nil {
            return &v
        }
    }

    return nil
}
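
// findAgg returns the bucket aggregation in the target whose ID matches
// aggID.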
func findAgg(target *Query, aggID string) (*BucketAgg, error) {
    for _, v := range target.BucketAggs {
        if aggID == v.ID {
            return v, nil
        }
    }
    return nil, errors.New("could not find aggDef, aggID:" + aggID)
}
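
// getErrorFromElasticResponse extracts a human-readable message from an
// Elasticsearch error response, preferring root_cause, then reason, then
// caused_by.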
func getErrorFromElasticResponse(response *es.SearchResponse) string {
    var errorString string
    json := simplejson.NewFromAny(response.Error)
    reason := json.Get("reason").MustString()
    rootCauseReason := json.Get("root_cause").GetIndex(0).Get("reason").MustString()
    causedByReason := json.Get("caused_by").Get("reason").MustString()

    switch {
    case rootCauseReason != "":
        errorString = rootCauseReason
    case reason != "":
        errorString = reason
    case causedByReason != "":
        errorString = causedByReason
    default:
        errorString = "Unknown elasticsearch error response"
    }

    return errorString
}

// flatten flattens multi-level objects to single-level objects, using dot
// notation to join keys.
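// For example, {"a": {"b": 1}} becomes {"a.b": 1}.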
func flatten(target map[string]interface{}) map[string]interface{} {
    // On the frontend maxDepth wasn't used, but as we are processing on the
    // backend we put a limit in place to avoid an infinite loop. 10 was
    // chosen arbitrarily.
    maxDepth := 10
    currentDepth := 0
    delimiter := ""
    output := make(map[string]interface{})

    var step func(object map[string]interface{}, prev string)
    step = func(object map[string]interface{}, prev string) {
        for key, value := range object {
            if prev == "" {
                delimiter = ""
            } else {
                delimiter = "."
            }
            newKey := prev + delimiter + key

            v, ok := value.(map[string]interface{})
            shouldStepInside := ok && len(v) > 0 && currentDepth < maxDepth
            if shouldStepInside {
                currentDepth++
                step(v, newKey)
            } else {
                output[newKey] = value
            }
        }
    }

    step(target, "")
    return output
}

// sortPropNames orders propNames so that the time field (if present) comes
// first, the log message field comes second (when shouldSortLogMessageField
// is true), and the remaining propNames follow in alphabetical order.
func sortPropNames(propNames map[string]bool, configuredFields es.ConfiguredFields, shouldSortLogMessageField bool) []string {
    hasTimeField := false
    hasLogMessageField := false

    var sortedPropNames []string
    for k := range propNames {
        if configuredFields.TimeField != "" && k == configuredFields.TimeField {
            hasTimeField = true
        } else if shouldSortLogMessageField && configuredFields.LogMessageField != "" && k == configuredFields.LogMessageField {
            hasLogMessageField = true
        } else {
            sortedPropNames = append(sortedPropNames, k)
        }
    }

    sort.Strings(sortedPropNames)

    if hasLogMessageField {
        sortedPropNames = append([]string{configuredFields.LogMessageField}, sortedPropNames...)
    }

    if hasTimeField {
        sortedPropNames = append([]string{configuredFields.TimeField}, sortedPropNames...)
    }

    return sortedPropNames
}

// findTheFirstNonNilDocValueForPropName finds the first non-nil value for
// propName in docs. If none of the values are non-nil, it returns the value
// of propName in the first doc.
func findTheFirstNonNilDocValueForPropName(docs []map[string]interface{}, propName string) interface{} {
    for _, doc := range docs {
        if doc[propName] != nil {
            return doc[propName]
        }
    }
    return docs[0][propName]
}
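
// createFieldOfType builds a nullable field of type T, leaving nil entries
// for docs whose value is missing or has a different type.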
func createFieldOfType[T int | float64 | bool | string](docs []map[string]interface{}, propName string, size int, isFilterable bool) *data.Field {
    fieldVector := make([]*T, size)
    for i, doc := range docs {
        value, ok := doc[propName].(T)
        if !ok {
            continue
        }
        fieldVector[i] = &value
    }
    field := data.NewField(propName, nil, fieldVector)
    field.Config = &data.FieldConfig{Filterable: &isFilterable}
    return field
}
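
// setPreferredVisType records the preferred visualization type in the
// frame's metadata, creating the metadata when absent.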
func setPreferredVisType(frame *data.Frame, visType data.VisType) {
    if frame.Meta == nil {
        frame.Meta = &data.FrameMeta{}
    }

    frame.Meta.PreferredVisualization = visType
}