package elasticsearch

import (
	"encoding/json"
	"errors"
	"fmt"
	"regexp"
	"sort"
	"strconv"
	"strings"
	"time"

	"github.com/grafana/grafana-plugin-sdk-go/backend"
	"github.com/grafana/grafana-plugin-sdk-go/data"

	"github.com/grafana/grafana/pkg/components/simplejson"
	es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
)

const (
	// Metric types
	countType         = "count"
	percentilesType   = "percentiles"
	extendedStatsType = "extended_stats"
	topMetricsType    = "top_metrics"
	// Bucket types
	dateHistType    = "date_histogram"
	nestedType      = "nested"
	histogramType   = "histogram"
	filtersType     = "filters"
	termsType       = "terms"
	geohashGridType = "geohash_grid"
	// Document types
	rawDocumentType = "raw_document"
	rawDataType     = "raw_data"
	// Logs type
	logsType = "logs"
)

// searchWordsRegex extracts the terms wrapped in the highlight pre/post tags
// that the search request asks Elasticsearch to emit.
var searchWordsRegex = regexp.MustCompile(regexp.QuoteMeta(es.HighlightPreTagsString) + `(.*?)` + regexp.QuoteMeta(es.HighlightPostTagsString))
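
// As a sketch of what this matches: if the highlight tags were, say, "@HIGHLIGHT@"
// and "@/HIGHLIGHT@" (the real values come from the es client package), a fragment
// like "an @HIGHLIGHT@error@/HIGHLIGHT@ occurred" yields the search word "error".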

func parseResponse(responses []*es.SearchResponse, targets []*Query, configuredFields es.ConfiguredFields) (*backend.QueryDataResponse, error) {
	result := backend.QueryDataResponse{
		Responses: backend.Responses{},
	}
	if responses == nil {
		return &result, nil
	}

	for i, res := range responses {
		target := targets[i]

		if res.Error != nil {
			errResult := getErrorFromElasticResponse(res)
			result.Responses[target.RefID] = backend.DataResponse{
				Error: errors.New(errResult),
			}
			continue
		}

		queryRes := backend.DataResponse{}

		if isRawDataQuery(target) {
			err := processRawDataResponse(res, target, configuredFields, &queryRes)
			if err != nil {
				return &backend.QueryDataResponse{}, err
			}
			result.Responses[target.RefID] = queryRes
		} else if isRawDocumentQuery(target) {
			err := processRawDocumentResponse(res, target, &queryRes)
			if err != nil {
				return &backend.QueryDataResponse{}, err
			}
			result.Responses[target.RefID] = queryRes
		} else if isLogsQuery(target) {
			err := processLogsResponse(res, target, configuredFields, &queryRes)
			if err != nil {
				return &backend.QueryDataResponse{}, err
			}
			result.Responses[target.RefID] = queryRes
		} else {
			// Process as metric query result
			props := make(map[string]string)
			err := processBuckets(res.Aggregations, target, &queryRes, props, 0)
			if err != nil {
				return &backend.QueryDataResponse{}, err
			}
			nameFields(queryRes, target)
			trimDatapoints(queryRes, target)

			result.Responses[target.RefID] = queryRes
		}
	}
	return &result, nil
}

func processLogsResponse(res *es.SearchResponse, target *Query, configuredFields es.ConfiguredFields, queryRes *backend.DataResponse) error {
	propNames := make(map[string]bool)
	docs := make([]map[string]interface{}, len(res.Hits.Hits))
	searchWords := make(map[string]bool)

	for hitIdx, hit := range res.Hits.Hits {
		var flattened map[string]interface{}
		if hit["_source"] != nil {
			flattened = flatten(hit["_source"].(map[string]interface{}))
		}

		doc := map[string]interface{}{
			"_id":       hit["_id"],
			"_type":     hit["_type"],
			"_index":    hit["_index"],
			"sort":      hit["sort"],
			"highlight": hit["highlight"],
			"_source":   flattened,
		}

		for k, v := range flattened {
			if configuredFields.LogLevelField != "" && k == configuredFields.LogLevelField {
				doc["level"] = v
			} else {
				doc[k] = v
			}
		}

		if hit["fields"] != nil {
			source, ok := hit["fields"].(map[string]interface{})
			if ok {
				for k, v := range source {
					doc[k] = v
				}
			}
		}

		for key := range doc {
			propNames[key] = true
		}

		// Collect the highlighted terms into searchWords
		if highlights, ok := doc["highlight"].(map[string]interface{}); ok {
			for _, highlight := range highlights {
				if highlightList, ok := highlight.([]interface{}); ok {
					for _, highlightValue := range highlightList {
						str := fmt.Sprintf("%v", highlightValue)
						matches := searchWordsRegex.FindAllStringSubmatch(str, -1)
						for _, v := range matches {
							searchWords[v[1]] = true
						}
					}
				}
			}
		}

		docs[hitIdx] = doc
	}

	sortedPropNames := sortPropNames(propNames, configuredFields, true)
	fields := processDocsToDataFrameFields(docs, sortedPropNames, configuredFields)

	frames := data.Frames{}
	frame := data.NewFrame("", fields...)
	setPreferredVisType(frame, data.VisTypeLogs)
	setSearchWords(frame, searchWords)
	frames = append(frames, frame)

	queryRes.Frames = frames
	return nil
}

func processRawDataResponse(res *es.SearchResponse, target *Query, configuredFields es.ConfiguredFields, queryRes *backend.DataResponse) error {
	propNames := make(map[string]bool)
	docs := make([]map[string]interface{}, len(res.Hits.Hits))

	for hitIdx, hit := range res.Hits.Hits {
		var flattened map[string]interface{}
		if hit["_source"] != nil {
			flattened = flatten(hit["_source"].(map[string]interface{}))
		}

		doc := map[string]interface{}{
			"_id":       hit["_id"],
			"_type":     hit["_type"],
			"_index":    hit["_index"],
			"sort":      hit["sort"],
			"highlight": hit["highlight"],
		}

		for k, v := range flattened {
			doc[k] = v
		}

		for key := range doc {
			propNames[key] = true
		}

		docs[hitIdx] = doc
	}

	sortedPropNames := sortPropNames(propNames, configuredFields, false)
	fields := processDocsToDataFrameFields(docs, sortedPropNames, configuredFields)

	frames := data.Frames{}
	frame := data.NewFrame("", fields...)
	frames = append(frames, frame)

	queryRes.Frames = frames
	return nil
}

func processRawDocumentResponse(res *es.SearchResponse, target *Query, queryRes *backend.DataResponse) error {
	docs := make([]map[string]interface{}, len(res.Hits.Hits))
	for hitIdx, hit := range res.Hits.Hits {
		doc := map[string]interface{}{
			"_id":       hit["_id"],
			"_type":     hit["_type"],
			"_index":    hit["_index"],
			"sort":      hit["sort"],
			"highlight": hit["highlight"],
		}

		if hit["_source"] != nil {
			source, ok := hit["_source"].(map[string]interface{})
			if ok {
				for k, v := range source {
					doc[k] = v
				}
			}
		}

		if hit["fields"] != nil {
			source, ok := hit["fields"].(map[string]interface{})
			if ok {
				for k, v := range source {
					doc[k] = v
				}
			}
		}

		docs[hitIdx] = doc
	}

	fieldVector := make([]*json.RawMessage, len(res.Hits.Hits))
	for i, doc := range docs {
		bytes, err := json.Marshal(doc)
		if err != nil {
			// We skip docs that can't be marshalled; this should not happen
			continue
		}
		value := json.RawMessage(bytes)
		fieldVector[i] = &value
	}

	isFilterable := true
	field := data.NewField(target.RefID, nil, fieldVector)
	field.Config = &data.FieldConfig{Filterable: &isFilterable}

	frames := data.Frames{}
	frame := data.NewFrame(target.RefID, field)
	frames = append(frames, frame)

	queryRes.Frames = frames
	return nil
}

func processDocsToDataFrameFields(docs []map[string]interface{}, propNames []string, configuredFields es.ConfiguredFields) []*data.Field {
	size := len(docs)
	isFilterable := true
	allFields := make([]*data.Field, len(propNames))

	for propNameIdx, propName := range propNames {
		// Special handling for time field
		if propName == configuredFields.TimeField {
			timeVector := make([]*time.Time, size)
			for i, doc := range docs {
				// Check if the time field is a string
				timeString, timeStringOk := doc[configuredFields.TimeField].(string)
				// If not, it might be an array with one time string
				if !timeStringOk {
					timeList, ok := doc[configuredFields.TimeField].([]interface{})
					if !ok || len(timeList) != 1 {
						continue
					}
					// Check if the first element is a string
					timeString, timeStringOk = timeList[0].(string)
					if !timeStringOk {
						continue
					}
				}
				timeValue, err := time.Parse(time.RFC3339Nano, timeString)
				if err != nil {
					// We skip time values that cannot be parsed
					continue
				}
				timeVector[i] = &timeValue
			}
			field := data.NewField(configuredFields.TimeField, nil, timeVector)
			field.Config = &data.FieldConfig{Filterable: &isFilterable}
			allFields[propNameIdx] = field
			continue
		}

		propNameValue := findTheFirstNonNilDocValueForPropName(docs, propName)
		switch propNameValue.(type) {
		// We check for the default data type values (float64, int, bool, string)
		// and fall back to json.RawMessage if we cannot find any of them
		case float64:
			allFields[propNameIdx] = createFieldOfType[float64](docs, propName, size, isFilterable)
		case int:
			allFields[propNameIdx] = createFieldOfType[int](docs, propName, size, isFilterable)
		case string:
			allFields[propNameIdx] = createFieldOfType[string](docs, propName, size, isFilterable)
		case bool:
			allFields[propNameIdx] = createFieldOfType[bool](docs, propName, size, isFilterable)
		default:
			fieldVector := make([]*json.RawMessage, size)
			for i, doc := range docs {
				bytes, err := json.Marshal(doc[propName])
				if err != nil {
					// We skip values that cannot be marshalled
					continue
				}
				value := json.RawMessage(bytes)
				fieldVector[i] = &value
			}
			field := data.NewField(propName, nil, fieldVector)
			field.Config = &data.FieldConfig{Filterable: &isFilterable}
			allFields[propNameIdx] = field
		}
	}

	return allFields
}

func processBuckets(aggs map[string]interface{}, target *Query,
	queryResult *backend.DataResponse, props map[string]string, depth int) error {
	var err error
	maxDepth := len(target.BucketAggs) - 1

	aggIDs := make([]string, 0)
	for k := range aggs {
		aggIDs = append(aggIDs, k)
	}
	sort.Strings(aggIDs)
	for _, aggID := range aggIDs {
		v := aggs[aggID]
		aggDef, _ := findAgg(target, aggID)
		esAgg := simplejson.NewFromAny(v)
		if aggDef == nil {
			continue
		}

		if aggDef.Type == nestedType {
			err = processBuckets(esAgg.MustMap(), target, queryResult, props, depth+1)
			if err != nil {
				return err
			}
			continue
		}

		if depth == maxDepth {
			if aggDef.Type == dateHistType {
				err = processMetrics(esAgg, target, queryResult, props)
			} else {
				err = processAggregationDocs(esAgg, aggDef, target, queryResult, props)
			}
			if err != nil {
				return err
			}
		} else {
			for _, b := range esAgg.Get("buckets").MustArray() {
				bucket := simplejson.NewFromAny(b)
				newProps := make(map[string]string)

				for k, v := range props {
					newProps[k] = v
				}

				if key, err := bucket.Get("key").String(); err == nil {
					newProps[aggDef.Field] = key
				} else if key, err := bucket.Get("key").Int64(); err == nil {
					newProps[aggDef.Field] = strconv.FormatInt(key, 10)
				}

				if key, err := bucket.Get("key_as_string").String(); err == nil {
					newProps[aggDef.Field] = key
				}
				err = processBuckets(bucket.MustMap(), target, queryResult, newProps, depth+1)
				if err != nil {
					return err
				}
			}

			buckets := esAgg.Get("buckets").MustMap()
			bucketKeys := make([]string, 0)
			for k := range buckets {
				bucketKeys = append(bucketKeys, k)
			}
			sort.Strings(bucketKeys)

			for _, bucketKey := range bucketKeys {
				bucket := simplejson.NewFromAny(buckets[bucketKey])
				newProps := make(map[string]string)

				for k, v := range props {
					newProps[k] = v
				}

				newProps["filter"] = bucketKey

				err = processBuckets(bucket.MustMap(), target, queryResult, newProps, depth+1)
				if err != nil {
					return err
				}
			}
		}
	}
	return nil
}

func newTimeSeriesFrame(timeData []time.Time, tags map[string]string, values []*float64) *data.Frame {
	frame := data.NewFrame("",
		data.NewField(data.TimeSeriesTimeFieldName, nil, timeData),
		data.NewField(data.TimeSeriesValueFieldName, tags, values))
	frame.Meta = &data.FrameMeta{
		Type: data.FrameTypeTimeSeriesMulti,
	}
	return frame
}
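
// For instance, newTimeSeriesFrame(times, map[string]string{"metric": "count"}, values)
// produces a two-field frame (a time field plus a labeled value field), which is the
// shape implied by the "time series multi" frame type set above.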

func processCountMetric(buckets []*simplejson.Json, props map[string]string) (data.Frames, error) {
	tags := make(map[string]string, len(props))
	timeVector := make([]time.Time, 0, len(buckets))
	values := make([]*float64, 0, len(buckets))

	for _, bucket := range buckets {
		value := castToFloat(bucket.Get("doc_count"))
		timeValue, err := getAsTime(bucket.Get("key"))
		if err != nil {
			return nil, err
		}
		timeVector = append(timeVector, timeValue)
		values = append(values, value)
	}

	for k, v := range props {
		tags[k] = v
	}
	tags["metric"] = countType
	return data.Frames{newTimeSeriesFrame(timeVector, tags, values)}, nil
}

func processPercentilesMetric(metric *MetricAgg, buckets []*simplejson.Json, props map[string]string) (data.Frames, error) {
	if len(buckets) == 0 {
		return data.Frames{}, nil
	}

	firstBucket := buckets[0]
	percentiles := firstBucket.GetPath(metric.ID, "values").MustMap()

	percentileKeys := make([]string, 0)
	for k := range percentiles {
		percentileKeys = append(percentileKeys, k)
	}
	sort.Strings(percentileKeys)

	frames := data.Frames{}

	for _, percentileName := range percentileKeys {
		tags := make(map[string]string, len(props))
		timeVector := make([]time.Time, 0, len(buckets))
		values := make([]*float64, 0, len(buckets))

		for k, v := range props {
			tags[k] = v
		}
		tags["metric"] = "p" + percentileName
		tags["field"] = metric.Field
		for _, bucket := range buckets {
			value := castToFloat(bucket.GetPath(metric.ID, "values", percentileName))
			key := bucket.Get("key")
			timeValue, err := getAsTime(key)
			if err != nil {
				return nil, err
			}
			timeVector = append(timeVector, timeValue)
			values = append(values, value)
		}
		frames = append(frames, newTimeSeriesFrame(timeVector, tags, values))
	}

	return frames, nil
}

func processTopMetricsMetric(metric *MetricAgg, buckets []*simplejson.Json, props map[string]string) (data.Frames, error) {
	metrics := metric.Settings.Get("metrics").MustArray()

	frames := data.Frames{}

	for _, metricField := range metrics {
		tags := make(map[string]string, len(props))
		timeVector := make([]time.Time, 0, len(buckets))
		values := make([]*float64, 0, len(buckets))
		for k, v := range props {
			tags[k] = v
		}

		tags["field"] = metricField.(string)
		tags["metric"] = "top_metrics"

		for _, bucket := range buckets {
			stats := bucket.GetPath(metric.ID, "top")
			timeValue, err := getAsTime(bucket.Get("key"))
			if err != nil {
				return nil, err
			}
			timeVector = append(timeVector, timeValue)

			for _, stat := range stats.MustArray() {
				stat := stat.(map[string]interface{})

				metrics, hasMetrics := stat["metrics"]
				if hasMetrics {
					metrics := metrics.(map[string]interface{})
					metricValue, hasMetricValue := metrics[metricField.(string)]

					if hasMetricValue && metricValue != nil {
						v := metricValue.(float64)
						values = append(values, &v)
					}
				}
			}
		}

		frames = append(frames, newTimeSeriesFrame(timeVector, tags, values))
	}

	return frames, nil
}

func processExtendedStatsMetric(metric *MetricAgg, buckets []*simplejson.Json, props map[string]string) (data.Frames, error) {
	metaKeys := make([]string, 0)
	meta := metric.Meta.MustMap()
	for k := range meta {
		metaKeys = append(metaKeys, k)
	}
	sort.Strings(metaKeys)

	frames := data.Frames{}

	for _, statName := range metaKeys {
		v := meta[statName]
		if enabled, ok := v.(bool); !ok || !enabled {
			continue
		}

		tags := make(map[string]string, len(props))
		timeVector := make([]time.Time, 0, len(buckets))
		values := make([]*float64, 0, len(buckets))

		for k, v := range props {
			tags[k] = v
		}
		tags["metric"] = statName
		tags["field"] = metric.Field

		for _, bucket := range buckets {
			timeValue, err := getAsTime(bucket.Get("key"))
			if err != nil {
				return nil, err
			}
			var value *float64
			switch statName {
			case "std_deviation_bounds_upper":
				value = castToFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "upper"))
			case "std_deviation_bounds_lower":
				value = castToFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "lower"))
			default:
				value = castToFloat(bucket.GetPath(metric.ID, statName))
			}
			timeVector = append(timeVector, timeValue)
			values = append(values, value)
		}
		frames = append(frames, newTimeSeriesFrame(timeVector, tags, values))
	}

	return frames, nil
}

func processDefaultMetric(metric *MetricAgg, buckets []*simplejson.Json, props map[string]string) (data.Frames, error) {
	tags := make(map[string]string, len(props))
	timeVector := make([]time.Time, 0, len(buckets))
	values := make([]*float64, 0, len(buckets))

	for k, v := range props {
		tags[k] = v
	}

	tags["metric"] = metric.Type
	tags["field"] = metric.Field
	tags["metricId"] = metric.ID
	for _, bucket := range buckets {
		timeValue, err := getAsTime(bucket.Get("key"))
		if err != nil {
			return nil, err
		}
		valueObj, err := bucket.Get(metric.ID).Map()
		if err != nil {
			continue
		}
		var value *float64
		if _, ok := valueObj["normalized_value"]; ok {
			value = castToFloat(bucket.GetPath(metric.ID, "normalized_value"))
		} else {
			value = castToFloat(bucket.GetPath(metric.ID, "value"))
		}
		timeVector = append(timeVector, timeValue)
		values = append(values, value)
	}
	return data.Frames{newTimeSeriesFrame(timeVector, tags, values)}, nil
}

// nolint:gocyclo
func processMetrics(esAgg *simplejson.Json, target *Query, query *backend.DataResponse,
	props map[string]string) error {
	frames := data.Frames{}
	esAggBuckets := esAgg.Get("buckets").MustArray()

	jsonBuckets := make([]*simplejson.Json, len(esAggBuckets))
	for i, v := range esAggBuckets {
		jsonBuckets[i] = simplejson.NewFromAny(v)
	}

	for _, metric := range target.Metrics {
		if metric.Hide {
			continue
		}

		switch metric.Type {
		case countType:
			countFrames, err := processCountMetric(jsonBuckets, props)
			if err != nil {
				return err
			}
			frames = append(frames, countFrames...)
		case percentilesType:
			percentileFrames, err := processPercentilesMetric(metric, jsonBuckets, props)
			if err != nil {
				return err
			}
			frames = append(frames, percentileFrames...)
		case topMetricsType:
			topMetricsFrames, err := processTopMetricsMetric(metric, jsonBuckets, props)
			if err != nil {
				return err
			}
			frames = append(frames, topMetricsFrames...)
		case extendedStatsType:
			extendedStatsFrames, err := processExtendedStatsMetric(metric, jsonBuckets, props)
			if err != nil {
				return err
			}
			frames = append(frames, extendedStatsFrames...)
		default:
			defaultFrames, err := processDefaultMetric(metric, jsonBuckets, props)
			if err != nil {
				return err
			}
			frames = append(frames, defaultFrames...)
		}
	}

	if query.Frames != nil {
		oldFrames := query.Frames
		frames = append(oldFrames, frames...)
	}
	query.Frames = frames
	return nil
}

func processAggregationDocs(esAgg *simplejson.Json, aggDef *BucketAgg, target *Query,
	queryResult *backend.DataResponse, props map[string]string) error {
	propKeys := createPropKeys(props)
	frames := data.Frames{}
	fields := createFields(queryResult.Frames, propKeys)

	for _, v := range esAgg.Get("buckets").MustArray() {
		bucket := simplejson.NewFromAny(v)
		var values []interface{}

		found := false
		for _, field := range fields {
			for _, propKey := range propKeys {
				if field.Name == propKey {
					value := props[propKey]
					field.Append(&value)
				}
			}
			if field.Name == aggDef.Field {
				found = true
				if key, err := bucket.Get("key").String(); err == nil {
					field.Append(&key)
				} else {
					f, err := bucket.Get("key").Float64()
					if err != nil {
						return err
					}
					field.Append(&f)
				}
			}
		}

		if !found {
			var aggDefField *data.Field
			if key, err := bucket.Get("key").String(); err == nil {
				aggDefField = extractDataField(aggDef.Field, &key)
				aggDefField.Append(&key)
			} else {
				f, err := bucket.Get("key").Float64()
				if err != nil {
					return err
				}
				aggDefField = extractDataField(aggDef.Field, &f)
				aggDefField.Append(&f)
			}
			fields = append(fields, aggDefField)
		}

		for _, metric := range target.Metrics {
			switch metric.Type {
			case countType:
				addMetricValueToFields(&fields, values, getMetricName(metric.Type), castToFloat(bucket.Get("doc_count")))
			case extendedStatsType:
				addExtendedStatsToFields(&fields, bucket, metric, values)
			case percentilesType:
				addPercentilesToFields(&fields, bucket, metric, values)
			case topMetricsType:
				addTopMetricsToFields(&fields, bucket, metric, values)
			default:
				addOtherMetricsToFields(&fields, bucket, metric, values, target)
			}
		}

		var dataFields []*data.Field
		dataFields = append(dataFields, fields...)

		frames = data.Frames{
			&data.Frame{
				Fields: dataFields,
			}}
	}

	queryResult.Frames = frames
	return nil
}

func extractDataField(name string, v interface{}) *data.Field {
	var field *data.Field
	switch v.(type) {
	case *string:
		field = data.NewField(name, nil, []*string{})
	case *float64:
		field = data.NewField(name, nil, []*float64{})
	default:
		field = &data.Field{}
	}
	isFilterable := true
	field.Config = &data.FieldConfig{Filterable: &isFilterable}
	return field
}

func trimDatapoints(queryResult backend.DataResponse, target *Query) {
	var histogram *BucketAgg
	for _, bucketAgg := range target.BucketAggs {
		if bucketAgg.Type == dateHistType {
			histogram = bucketAgg
			break
		}
	}

	if histogram == nil {
		return
	}

	trimEdges, err := castToInt(histogram.Settings.Get("trimEdges"))
	if err != nil {
		return
	}

	frames := queryResult.Frames

	for _, frame := range frames {
		for _, field := range frame.Fields {
			if field.Len() > trimEdges*2 {
				// first we delete the first "trim" items
				for i := 0; i < trimEdges; i++ {
					field.Delete(0)
				}

				// then we delete the last "trim" items
				for i := 0; i < trimEdges; i++ {
					field.Delete(field.Len() - 1)
				}
			}
		}
	}
}

// getSortedLabelValues sorts the label pairs by label key
// and returns the label values in that order
func getSortedLabelValues(labels data.Labels) []string {
	var keys []string
	for key := range labels {
		keys = append(keys, key)
	}
	sort.Strings(keys)

	var values []string
	for _, key := range keys {
		values = append(values, labels[key])
	}

	return values
}
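
// For example, getSortedLabelValues(data.Labels{"b": "2", "a": "1"}) returns
// []string{"1", "2"}: the values ordered by their keys, which keeps generated
// series names stable despite Go's randomized map iteration order.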

func nameFields(queryResult backend.DataResponse, target *Query) {
	set := make(map[string]struct{})
	frames := queryResult.Frames
	for _, v := range frames {
		for _, vv := range v.Fields {
			if metricType, exists := vv.Labels["metric"]; exists {
				if _, ok := set[metricType]; !ok {
					set[metricType] = struct{}{}
				}
			}
		}
	}
	metricTypeCount := len(set)
	for _, frame := range frames {
		if frame.Meta != nil && frame.Meta.Type == data.FrameTypeTimeSeriesMulti {
			// if it is a time-series-multi, it means it has two columns: one is "time",
			// the other is "number"
			valueField := frame.Fields[1]
			fieldName := getFieldName(*valueField, target, metricTypeCount)
			if valueField.Config == nil {
				valueField.Config = &data.FieldConfig{}
			}
			valueField.Config.DisplayNameFromDS = fieldName
		}
	}
}

var aliasPatternRegex = regexp.MustCompile(`\{\{([\s\S]+?)\}\}`)

func getFieldName(dataField data.Field, target *Query, metricTypeCount int) string {
	metricType := dataField.Labels["metric"]
	metricName := getMetricName(metricType)
	delete(dataField.Labels, "metric")

	field := ""
	if v, ok := dataField.Labels["field"]; ok {
		field = v
		delete(dataField.Labels, "field")
	}

	if target.Alias != "" {
		frameName := target.Alias

		subMatches := aliasPatternRegex.FindAllStringSubmatch(target.Alias, -1)
		for _, subMatch := range subMatches {
			group := subMatch[0]

			if len(subMatch) > 1 {
				group = subMatch[1]
			}

			if strings.HasPrefix(group, "term ") {
				frameName = strings.Replace(frameName, subMatch[0], dataField.Labels[group[5:]], 1)
			}
			if v, ok := dataField.Labels[group]; ok {
				frameName = strings.Replace(frameName, subMatch[0], v, 1)
			}
			if group == "metric" {
				frameName = strings.Replace(frameName, subMatch[0], metricName, 1)
			}
			if group == "field" {
				frameName = strings.Replace(frameName, subMatch[0], field, 1)
			}
		}

		return frameName
	}

	// todo, if field and pipelineAgg
	if isPipelineAgg(metricType) {
		if metricType != "" && isPipelineAggWithMultipleBucketPaths(metricType) {
			metricID := ""
			if v, ok := dataField.Labels["metricId"]; ok {
				metricID = v
			}

			for _, metric := range target.Metrics {
				if metric.ID == metricID {
					metricName = metric.Settings.Get("script").MustString()
					for name, pipelineAgg := range metric.PipelineVariables {
						for _, m := range target.Metrics {
							if m.ID == pipelineAgg {
								metricName = strings.ReplaceAll(metricName, "params."+name, describeMetric(m.Type, m.Field))
							}
						}
					}
				}
			}
		} else {
			if field != "" {
				found := false
				for _, metric := range target.Metrics {
					if metric.ID == field {
						metricName += " " + describeMetric(metric.Type, metric.Field)
						found = true
					}
				}
				if !found {
					metricName = "Unset"
				}
			}
		}
	} else if field != "" {
		metricName += " " + field
	}

	delete(dataField.Labels, "metricId")

	if len(dataField.Labels) == 0 {
		return metricName
	}

	name := ""
	for _, v := range getSortedLabelValues(dataField.Labels) {
		name += v + " "
	}

	if metricTypeCount == 1 {
		return strings.TrimSpace(name)
	}

	return strings.TrimSpace(name) + " " + metricName
}

func getMetricName(metric string) string {
	if text, ok := metricAggType[metric]; ok {
		return text
	}

	if text, ok := extendedStats[metric]; ok {
		return text
	}

	return metric
}

func castToInt(j *simplejson.Json) (int, error) {
	i, err := j.Int()
	if err == nil {
		return i, nil
	}

	s, err := j.String()
	if err != nil {
		return 0, err
	}

	v, err := strconv.Atoi(s)
	if err != nil {
		return 0, err
	}

	return v, nil
}
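
// castToInt accepts both JSON numbers and numeric strings because settings such
// as trimEdges may be stored either way, e.g. 1 or "1" both yield 1.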

func castToFloat(j *simplejson.Json) *float64 {
	f, err := j.Float64()
	if err == nil {
		return &f
	}

	if s, err := j.String(); err == nil {
		if strings.ToLower(s) == "nan" {
			return nil
		}

		if v, err := strconv.ParseFloat(s, 64); err == nil {
			return &v
		}
	}

	return nil
}
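
// Elasticsearch can encode a missing stat as the string "NaN", which is why
// castToFloat maps it to nil; numeric strings are parsed, so both 2.5 and
// "2.5" come back as a pointer to 2.5.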

func getAsTime(j *simplejson.Json) (time.Time, error) {
	// these are stored as numbers
	number, err := j.Float64()
	if err != nil {
		return time.Time{}, err
	}

	return time.UnixMilli(int64(number)).UTC(), nil
}
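
// date_histogram bucket keys are epoch milliseconds, so for example a key of
// 1609459200000 becomes 2021-01-01T00:00:00Z.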

func findAgg(target *Query, aggID string) (*BucketAgg, error) {
	for _, v := range target.BucketAggs {
		if aggID == v.ID {
			return v, nil
		}
	}
	return nil, errors.New("could not find aggDef, aggID: " + aggID)
}

func getErrorFromElasticResponse(response *es.SearchResponse) string {
	var errorString string
	json := simplejson.NewFromAny(response.Error)
	reason := json.Get("reason").MustString()
	rootCauseReason := json.Get("root_cause").GetIndex(0).Get("reason").MustString()
	causedByReason := json.Get("caused_by").Get("reason").MustString()

	switch {
	case rootCauseReason != "":
		errorString = rootCauseReason
	case reason != "":
		errorString = reason
	case causedByReason != "":
		errorString = causedByReason
	default:
		errorString = "Unknown elasticsearch error response"
	}

	return errorString
}
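
// A hypothetical error body such as
//
//	{"reason": "all shards failed", "root_cause": [{"reason": "parse error"}]}
//
// therefore surfaces "parse error": the root cause is preferred over the
// top-level reason, with caused_by as a last resort before the generic message.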

// flatten flattens multi-level objects to single level objects. It uses dot notation to join keys.
func flatten(target map[string]interface{}) map[string]interface{} {
	// On the frontend maxDepth wasn't used, but as we are processing on the backend,
	// let's set a limit to avoid infinite loops. 10 was chosen arbitrarily.
	maxDepth := 10
	currentDepth := 0
	delimiter := ""
	output := make(map[string]interface{})

	var step func(object map[string]interface{}, prev string)
	step = func(object map[string]interface{}, prev string) {
		for key, value := range object {
			if prev == "" {
				delimiter = ""
			} else {
				delimiter = "."
			}
			newKey := prev + delimiter + key

			v, ok := value.(map[string]interface{})
			shouldStepInside := ok && len(v) > 0 && currentDepth < maxDepth
			if shouldStepInside {
				currentDepth++
				step(v, newKey)
			} else {
				output[newKey] = value
			}
		}
	}

	step(target, "")
	return output
}
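
// A minimal example of the flattening:
//
//	flatten(map[string]interface{}{"a": map[string]interface{}{"b": 1}})
//	// => map[string]interface{}{"a.b": 1}
//
// Objects nested past the depth limit are kept as-is under their joined key.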

// sortPropNames orders propNames so that the time field comes first (if it exists), the
// log message field comes second (if shouldSortLogMessageField is true), and the rest of
// the propNames are ordered alphabetically
func sortPropNames(propNames map[string]bool, configuredFields es.ConfiguredFields, shouldSortLogMessageField bool) []string {
	hasTimeField := false
	hasLogMessageField := false

	var sortedPropNames []string
	for k := range propNames {
		if configuredFields.TimeField != "" && k == configuredFields.TimeField {
			hasTimeField = true
		} else if shouldSortLogMessageField && configuredFields.LogMessageField != "" && k == configuredFields.LogMessageField {
			hasLogMessageField = true
		} else {
			sortedPropNames = append(sortedPropNames, k)
		}
	}

	sort.Strings(sortedPropNames)

	if hasLogMessageField {
		sortedPropNames = append([]string{configuredFields.LogMessageField}, sortedPropNames...)
	}

	if hasTimeField {
		sortedPropNames = append([]string{configuredFields.TimeField}, sortedPropNames...)
	}

	return sortedPropNames
}
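
// For example, with TimeField "@timestamp" and LogMessageField "line", the prop
// names {"level", "line", "@timestamp"} sort to ["@timestamp", "line", "level"]
// when shouldSortLogMessageField is true, and to ["@timestamp", "level", "line"]
// when it is false.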

// findTheFirstNonNilDocValueForPropName finds the first non-nil value for propName in docs.
// If all values are nil, it returns the value of propName in the first doc.
func findTheFirstNonNilDocValueForPropName(docs []map[string]interface{}, propName string) interface{} {
	for _, doc := range docs {
		if doc[propName] != nil {
			return doc[propName]
		}
	}
	return docs[0][propName]
}
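
// The value found here only drives type detection in processDocsToDataFrameFields:
// its concrete Go type decides whether the column becomes a float64, int, string,
// bool, or raw JSON field.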

func createFieldOfType[T int | float64 | bool | string](docs []map[string]interface{}, propName string, size int, isFilterable bool) *data.Field {
	fieldVector := make([]*T, size)
	for i, doc := range docs {
		value, ok := doc[propName].(T)
		if !ok {
			continue
		}
		fieldVector[i] = &value
	}
	field := data.NewField(propName, nil, fieldVector)
	field.Config = &data.FieldConfig{Filterable: &isFilterable}
	return field
}
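
// For example, createFieldOfType[string](docs, "host.name", len(docs), true)
// builds a nullable string column; docs where "host.name" is missing or has a
// different type simply keep a nil entry at their index.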

func setPreferredVisType(frame *data.Frame, visType data.VisType) {
	if frame.Meta == nil {
		frame.Meta = &data.FrameMeta{}
	}

	frame.Meta.PreferredVisualization = visType
}

func setSearchWords(frame *data.Frame, searchWords map[string]bool) {
	i := 0
	searchWordsList := make([]string, len(searchWords))
	for searchWord := range searchWords {
		searchWordsList[i] = searchWord
		i++
	}
	sort.Strings(searchWordsList)

	if frame.Meta == nil {
		frame.Meta = &data.FrameMeta{}
	}

	frame.Meta.Custom = map[string]interface{}{
		"searchWords": searchWordsList,
	}
}

func createFields(frames data.Frames, propKeys []string) []*data.Field {
	var fields []*data.Field
	// If we have existing frames, use their fields
	if frames != nil {
		for _, frame := range frames {
			fields = append(fields, frame.Fields...)
		}
		// Otherwise we create fields from propKeys
	} else {
		for _, propKey := range propKeys {
			fields = append(fields, data.NewField(propKey, nil, []*string{}))
		}
	}
	return fields
}

func getSortedKeys(data map[string]interface{}) []string {
	keys := make([]string, 0, len(data))
	for k := range data {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func createPropKeys(props map[string]string) []string {
	propKeys := make([]string, 0)
	for k := range props {
		propKeys = append(propKeys, k)
	}
	sort.Strings(propKeys)
	return propKeys
}

func addMetricValueToFields(fields *[]*data.Field, values []interface{}, metricName string, value *float64) {
	index := -1
	for i, f := range *fields {
		if f.Name == metricName {
			index = i
			break
		}
	}

	var field *data.Field
	if index == -1 {
		field = data.NewField(metricName, nil, []*float64{})
		*fields = append(*fields, field)
	} else {
		field = (*fields)[index]
	}
	field.Append(value)
}

func addPercentilesToFields(fields *[]*data.Field, bucket *simplejson.Json, metric *MetricAgg, values []interface{}) {
	percentiles := bucket.GetPath(metric.ID, "values")
	for _, percentileName := range getSortedKeys(percentiles.MustMap()) {
		percentileValue := percentiles.Get(percentileName).MustFloat64()
		addMetricValueToFields(fields, values, fmt.Sprintf("p%v %v", percentileName, metric.Field), &percentileValue)
	}
}

func addExtendedStatsToFields(fields *[]*data.Field, bucket *simplejson.Json, metric *MetricAgg, values []interface{}) {
	metaKeys := make([]string, 0)
	meta := metric.Meta.MustMap()
	for k := range meta {
		metaKeys = append(metaKeys, k)
	}
	sort.Strings(metaKeys)

	for _, statName := range metaKeys {
		v := meta[statName]
		if enabled, ok := v.(bool); !ok || !enabled {
			continue
		}

		var value *float64
		switch statName {
		case "std_deviation_bounds_upper":
			value = castToFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "upper"))
		case "std_deviation_bounds_lower":
			value = castToFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "lower"))
		default:
			value = castToFloat(bucket.GetPath(metric.ID, statName))
		}

		// Note: only the first enabled stat is added to the fields
		addMetricValueToFields(fields, values, getMetricName(metric.Type), value)
		break
	}
}

func addTopMetricsToFields(fields *[]*data.Field, bucket *simplejson.Json, metric *MetricAgg, values []interface{}) {
	baseName := getMetricName(metric.Type)
	metrics := metric.Settings.Get("metrics").MustStringArray()
	for _, metricField := range metrics {
		// If we selected more than one metric we also add each metric name
		metricName := baseName
		if len(metrics) > 1 {
			metricName += " " + metricField
		}
		top := bucket.GetPath(metric.ID, "top").MustArray()
		// Guard against an empty top array to avoid indexing past its end
		if len(top) == 0 {
			continue
		}
		metrics, hasMetrics := top[0].(map[string]interface{})["metrics"]
		if hasMetrics {
			metrics := metrics.(map[string]interface{})
			metricValue, hasMetricValue := metrics[metricField]
			if hasMetricValue && metricValue != nil {
				v := metricValue.(float64)
				addMetricValueToFields(fields, values, metricName, &v)
			}
		}
	}
}

func addOtherMetricsToFields(fields *[]*data.Field, bucket *simplejson.Json, metric *MetricAgg, values []interface{}, target *Query) {
	metricName := getMetricName(metric.Type)
	otherMetrics := make([]*MetricAgg, 0)

	for _, m := range target.Metrics {
		// otherMetrics collects metrics of the same type that are not the current metric
		if m.ID != metric.ID && m.Type == metric.Type {
			otherMetrics = append(otherMetrics, m)
		}
	}

	if len(otherMetrics) > 0 {
		metricName += " " + metric.Field

		// We check if there is a metric with the same type and the same field name.
		// If so, we append metric.ID to the metric name to disambiguate.
		for _, m := range otherMetrics {
			if m.Field == metric.Field {
				metricName += " " + metric.ID
				break
			}
		}

		if metric.Type == "bucket_script" {
			// Use the formula in the column name
			metricName = metric.Settings.Get("script").MustString("")
		}
	}
	addMetricValueToFields(fields, values, metricName, castToFloat(bucket.GetPath(metric.ID, "value")))
}