package sqleng

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"net"
	"regexp"
	"runtime/debug"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/grafana/grafana-plugin-sdk-go/backend"
	"github.com/grafana/grafana-plugin-sdk-go/backend/gtime"
	"github.com/grafana/grafana-plugin-sdk-go/backend/log"
	"github.com/grafana/grafana-plugin-sdk-go/data"
	"github.com/grafana/grafana-plugin-sdk-go/data/sqlutil"
)

// MetaKeyExecutedQueryString is the key where the executed query should get stored
const MetaKeyExecutedQueryString = "executedQueryString"

// SQLMacroEngine interpolates macros into sql. It takes in the Query to have access to query context and
// timeRange to be able to generate queries that use from and to.
type SQLMacroEngine interface {
	Interpolate(query *backend.DataQuery, timeRange backend.TimeRange, sql string) (string, error)
}

// SqlQueryResultTransformer transforms a query result row to RowValues with proper types.
type SqlQueryResultTransformer interface {
	// TransformQueryError transforms a query error.
	TransformQueryError(logger log.Logger, err error) error
	GetConverterList() []sqlutil.StringConverter
}
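
// JsonData models the SQL data source settings stored in the data source's JSON configuration:
// connection pool limits, TLS/SSL options, proxy flags, and driver-specific options such as the
// default time interval.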
type JsonData struct {
	MaxOpenConns            int    `json:"maxOpenConns"`
	MaxIdleConns            int    `json:"maxIdleConns"`
	ConnMaxLifetime         int    `json:"connMaxLifetime"`
	ConnectionTimeout       int    `json:"connectionTimeout"`
	Timescaledb             bool   `json:"timescaledb"`
	Mode                    string `json:"sslmode"`
	ConfigurationMethod     string `json:"tlsConfigurationMethod"`
	TlsSkipVerify           bool   `json:"tlsSkipVerify"`
	RootCertFile            string `json:"sslRootCertFile"`
	CertFile                string `json:"sslCertFile"`
	CertKeyFile             string `json:"sslKeyFile"`
	Timezone                string `json:"timezone"`
	Encrypt                 string `json:"encrypt"`
	Servername              string `json:"servername"`
	TimeInterval            string `json:"timeInterval"`
	Database                string `json:"database"`
	SecureDSProxy           bool   `json:"enableSecureSocksProxy"`
	SecureDSProxyUsername   string `json:"secureSocksProxyUsername"`
	AllowCleartextPasswords bool   `json:"allowCleartextPasswords"`
	AuthenticationType      string `json:"authenticationType"`
}
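
// DataSourceInfo describes the data source instance a handler is built for, including its
// JsonData settings and the decrypted secure JSON data.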
type DataSourceInfo struct {
	JsonData                JsonData
	URL                     string
	User                    string
	Database                string
	ID                      int64
	Updated                 time.Time
	UID                     string
	DecryptedSecureJSONData map[string]string
}
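
// DataPluginConfiguration bundles what is needed to construct a DataSourceHandler: the data source
// info, the column names and types used to detect time and metric columns, and the row limit
// applied to query results.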
type DataPluginConfiguration struct {
	DSInfo            DataSourceInfo
	TimeColumnNames   []string
	MetricColumnTypes []string
	RowLimit          int64
}
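
// DataSourceHandler executes SQL queries against a database/sql handle and converts the results
// into data frames.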
type DataSourceHandler struct {
	macroEngine            SQLMacroEngine
	queryResultTransformer SqlQueryResultTransformer
	db                     *sql.DB
	timeColumnNames        []string
	metricColumnTypes      []string
	log                    log.Logger
	dsInfo                 DataSourceInfo
	rowLimit               int64
	userError              string
}
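
// QueryJson is the JSON model of an incoming query: the raw SQL, the requested format
// ("time_series" or "table"), and the fill parameters set by SetupFillmode during interpolation.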
type QueryJson struct {
	RawSql       string  `json:"rawSql"`
	Fill         bool    `json:"fill"`
	FillInterval float64 `json:"fillInterval"`
	FillMode     string  `json:"fillMode"`
	FillValue    float64 `json:"fillValue"`
	Format       string  `json:"format"`
}

func (e *DataSourceHandler) TransformQueryError(logger log.Logger, err error) error {
	// OpError is the error type usually returned by functions in the net
	// package. It describes the operation, network type, and address of
	// an error. We log this error rather than return it to the client
	// for security purposes.
	var opErr *net.OpError
	if errors.As(err, &opErr) {
		logger.Error("Query error", "err", err)
		return fmt.Errorf("failed to connect to server - %s", e.userError)
	}

	return e.queryResultTransformer.TransformQueryError(logger, err)
}
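
// NewQueryDataHandler creates a DataSourceHandler for the given database handle and plugin
// configuration. userFacingDefaultError is the generic message returned to clients when the real
// error should not be exposed.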
func NewQueryDataHandler(userFacingDefaultError string, db *sql.DB, config DataPluginConfiguration, queryResultTransformer SqlQueryResultTransformer,
	macroEngine SQLMacroEngine, log log.Logger) (*DataSourceHandler, error) {
	queryDataHandler := DataSourceHandler{
		queryResultTransformer: queryResultTransformer,
		macroEngine:            macroEngine,
		timeColumnNames:        []string{"time"},
		log:                    log,
		dsInfo:                 config.DSInfo,
		rowLimit:               config.RowLimit,
		userError:              userFacingDefaultError,
	}

	if len(config.TimeColumnNames) > 0 {
		queryDataHandler.timeColumnNames = config.TimeColumnNames
	}

	if len(config.MetricColumnTypes) > 0 {
		queryDataHandler.metricColumnTypes = config.MetricColumnTypes
	}

	queryDataHandler.db = db
	return &queryDataHandler, nil
}
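
// DBDataResponse pairs a query's backend.DataResponse with its refID so results can be collected
// from the result channel in any order.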
type DBDataResponse struct {
	dataResponse backend.DataResponse
	refID        string
}
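
// Dispose closes the underlying database handle.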
func (e *DataSourceHandler) Dispose() {
	e.log.Debug("Disposing DB...")

	if e.db != nil {
		if err := e.db.Close(); err != nil {
			e.log.Error("Failed to dispose db", "error", err)
		}
	}

	e.log.Debug("DB disposed")
}
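
// QueryData handles a backend.QueryDataRequest by running each query in its own goroutine and
// collecting the per-query responses from a channel.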
func (e *DataSourceHandler) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
	result := backend.NewQueryDataResponse()
	ch := make(chan DBDataResponse, len(req.Queries))
	var wg sync.WaitGroup

	// Execute each query in a goroutine and wait for them to finish afterwards
	for _, query := range req.Queries {
		queryjson := QueryJson{
			Fill:   false,
			Format: "time_series",
		}
		err := json.Unmarshal(query.JSON, &queryjson)
		if err != nil {
			return nil, fmt.Errorf("error unmarshal query json: %w", err)
		}

		// The fill parameters are only set internally during query interpolation (see SetupFillmode);
		// we do not accept them from the outside.
		if queryjson.Fill || queryjson.FillInterval != 0.0 || queryjson.FillMode != "" || queryjson.FillValue != 0.0 {
			return nil, fmt.Errorf("query fill-parameters not supported")
		}

		if queryjson.RawSql == "" {
			continue
		}

		wg.Add(1)
		go e.executeQuery(query, &wg, ctx, ch, queryjson)
	}

	wg.Wait()

	// Read results from channels
	close(ch)
	result.Responses = make(map[string]backend.DataResponse)
	for queryResult := range ch {
		result.Responses[queryResult.refID] = queryResult.dataResponse
	}

	return result, nil
}
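
// executeQuery interpolates, runs, and post-processes a single query, sending the result (or any
// error) to ch. A deferred recover keeps a panic in one query from crashing the whole request.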
func (e *DataSourceHandler) executeQuery(query backend.DataQuery, wg *sync.WaitGroup, queryContext context.Context,
	ch chan DBDataResponse, queryJson QueryJson) {
	defer wg.Done()
	queryResult := DBDataResponse{
		dataResponse: backend.DataResponse{},
		refID:        query.RefID,
	}

	logger := e.log.FromContext(queryContext)

	defer func() {
		if r := recover(); r != nil {
			logger.Error("ExecuteQuery panic", "error", r, "stack", string(debug.Stack()))
			if theErr, ok := r.(error); ok {
				queryResult.dataResponse.Error = theErr
				queryResult.dataResponse.ErrorSource = backend.ErrorSourcePlugin
			} else if theErrString, ok := r.(string); ok {
				queryResult.dataResponse.Error = errors.New(theErrString)
				queryResult.dataResponse.ErrorSource = backend.ErrorSourcePlugin
			} else {
				queryResult.dataResponse.Error = fmt.Errorf("unexpected error - %s", e.userError)
				queryResult.dataResponse.ErrorSource = backend.ErrorSourceDownstream
			}
			ch <- queryResult
		}
	}()

	if queryJson.RawSql == "" {
		panic("Query model property rawSql should not be empty at this point")
	}

	timeRange := query.TimeRange

	errAppendDebug := func(frameErr string, err error, query string, source backend.ErrorSource) {
		var emptyFrame data.Frame
		emptyFrame.SetMeta(&data.FrameMeta{
			ExecutedQueryString: query,
		})
		queryResult.dataResponse.Error = fmt.Errorf("%s: %w", frameErr, err)
		queryResult.dataResponse.ErrorSource = source
		queryResult.dataResponse.Frames = data.Frames{&emptyFrame}
		ch <- queryResult
	}

	// global substitutions
	interpolatedQuery := Interpolate(query, timeRange, e.dsInfo.JsonData.TimeInterval, queryJson.RawSql)

	// data source specific substitutions
	interpolatedQuery, err := e.macroEngine.Interpolate(&query, timeRange, interpolatedQuery)
	if err != nil {
		errAppendDebug("interpolation failed", e.TransformQueryError(logger, err), interpolatedQuery, backend.ErrorSourcePlugin)
		return
	}

	rows, err := e.db.QueryContext(queryContext, interpolatedQuery)
	if err != nil {
		errAppendDebug("db query error", e.TransformQueryError(logger, err), interpolatedQuery, backend.ErrorSourceDownstream)
		return
	}
	defer func() {
		if err := rows.Close(); err != nil {
			logger.Warn("Failed to close rows", "err", err)
		}
	}()

	qm, err := e.newProcessCfg(query, queryContext, rows, interpolatedQuery)
	if err != nil {
		errAppendDebug("failed to get configurations", err, interpolatedQuery, backend.ErrorSourcePlugin)
		return
	}

	// Convert row.Rows to dataframe
	stringConverters := e.queryResultTransformer.GetConverterList()
	frame, err := sqlutil.FrameFromRows(rows, e.rowLimit, sqlutil.ToConverters(stringConverters...)...)
	if err != nil {
		errAppendDebug("convert frame from rows error", err, interpolatedQuery, backend.ErrorSourcePlugin)
		return
	}

	if frame.Meta == nil {
		frame.Meta = &data.FrameMeta{}
	}
	frame.Meta.ExecutedQueryString = interpolatedQuery

	// If no rows were returned, replace any previously set Fields with an empty data.Field slice and
	// return the single frame as-is. This ensures 1) the visualization doesn't display unwanted empty
	// fields, and 2) the rest of the frame data (such as the executed query in the metadata) stays
	// intact and is passed on to the visualization.
	if frame.Rows() == 0 {
		frame.Fields = []*data.Field{}
		queryResult.dataResponse.Frames = data.Frames{frame}
		ch <- queryResult
		return
	}

	if err := convertSQLTimeColumnsToEpochMS(frame, qm); err != nil {
		errAppendDebug("converting time columns failed", err, interpolatedQuery, backend.ErrorSourcePlugin)
		return
	}

	if qm.Format == dataQueryFormatSeries {
		// a time series must have a time column
		if qm.timeIndex == -1 {
			errAppendDebug("db has no time column", errors.New("time column is missing; make sure your data includes a time column for time series format or switch to a table format that doesn't require it"), interpolatedQuery, backend.ErrorSourceDownstream)
			return
		}

		// Make sure to name the time field 'Time' to be backward compatible with Grafana pre-v8.
		frame.Fields[qm.timeIndex].Name = data.TimeSeriesTimeFieldName

		for i := range qm.columnNames {
			if i == qm.timeIndex || i == qm.metricIndex {
				continue
			}

			if t := frame.Fields[i].Type(); t == data.FieldTypeString || t == data.FieldTypeNullableString {
				continue
			}

			var err error
			if frame, err = convertSQLValueColumnToFloat(frame, i); err != nil {
				errAppendDebug("convert value to float failed", err, interpolatedQuery, backend.ErrorSourcePlugin)
				return
			}
		}

		tsSchema := frame.TimeSeriesSchema()
		if tsSchema.Type == data.TimeSeriesTypeLong {
			var err error
			originalData := frame
			frame, err = data.LongToWide(frame, qm.FillMissing)
			if err != nil {
				errAppendDebug("failed to convert long to wide series when converting from dataframe", err, interpolatedQuery, backend.ErrorSourcePlugin)
				return
			}

			// Before 8x, a special metric column was used to name time series. LongToWide transforms that
			// into a metric label on the value field. But that makes the series name contain both the value
			// column name AND the metric name, so we remove the metric label and move it to the field name
			// to keep the same series naming as pre v8.
			if len(originalData.Fields) == 3 {
				for _, field := range frame.Fields {
					if len(field.Labels) == 1 { // 7x only supported one label
						name, ok := field.Labels["metric"]
						if ok {
							field.Name = name
							field.Labels = nil
						}
					}
				}
			}
		}

		if qm.FillMissing != nil {
			// align the start time to the fill interval
			startUnixTime := qm.TimeRange.From.Unix() / int64(qm.Interval.Seconds()) * int64(qm.Interval.Seconds())
			alignedTimeRange := backend.TimeRange{
				From: time.Unix(startUnixTime, 0),
				To:   qm.TimeRange.To,
			}

			var err error
			frame, err = sqlutil.ResampleWideFrame(frame, qm.FillMissing, alignedTimeRange, qm.Interval)
			if err != nil {
				logger.Error("Failed to resample dataframe", "err", err)
				frame.AppendNotices(data.Notice{Text: "Failed to resample dataframe", Severity: data.NoticeSeverityWarning})
			}
		}
	}

	queryResult.dataResponse.Frames = data.Frames{frame}
	ch <- queryResult
}

// Interpolate provides global macros/substitutions for all sql datasources.
var Interpolate = func(query backend.DataQuery, timeRange backend.TimeRange, timeInterval string, sql string) string {
	interval := query.Interval

	sql = strings.ReplaceAll(sql, "$__interval_ms", strconv.FormatInt(interval.Milliseconds(), 10))
	sql = strings.ReplaceAll(sql, "$__interval", gtime.FormatInterval(interval))
	sql = strings.ReplaceAll(sql, "$__unixEpochFrom()", fmt.Sprintf("%d", timeRange.From.UTC().Unix()))
	sql = strings.ReplaceAll(sql, "$__unixEpochTo()", fmt.Sprintf("%d", timeRange.To.UTC().Unix()))

	return sql
}
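
// newProcessCfg builds the dataQueryModel for a query from the result set's columns and the query
// JSON: the detected time/timeend/metric column indexes, the fill settings, the format, and the
// time range.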
func (e *DataSourceHandler) newProcessCfg(query backend.DataQuery, queryContext context.Context,
	rows *sql.Rows, interpolatedQuery string) (*dataQueryModel, error) {
	columnNames, err := rows.Columns()
	if err != nil {
		return nil, err
	}
	columnTypes, err := rows.ColumnTypes()
	if err != nil {
		return nil, err
	}

	qm := &dataQueryModel{
		columnTypes:  columnTypes,
		columnNames:  columnNames,
		timeIndex:    -1,
		timeEndIndex: -1,
		metricIndex:  -1,
		metricPrefix: false,
		queryContext: queryContext,
	}

	queryJson := QueryJson{}
	err = json.Unmarshal(query.JSON, &queryJson)
	if err != nil {
		return nil, err
	}

	if queryJson.Fill {
		qm.FillMissing = &data.FillMissing{}
		qm.Interval = time.Duration(queryJson.FillInterval * float64(time.Second))
		switch strings.ToLower(queryJson.FillMode) {
		case "null":
			qm.FillMissing.Mode = data.FillModeNull
		case "previous":
			qm.FillMissing.Mode = data.FillModePrevious
		case "value":
			qm.FillMissing.Mode = data.FillModeValue
			qm.FillMissing.Value = queryJson.FillValue
		default:
		}
	}

	qm.TimeRange.From = query.TimeRange.From.UTC()
	qm.TimeRange.To = query.TimeRange.To.UTC()

	switch queryJson.Format {
	case "time_series":
		qm.Format = dataQueryFormatSeries
	case "table":
		qm.Format = dataQueryFormatTable
	default:
		panic(fmt.Sprintf("Unrecognized query model format: %q", queryJson.Format))
	}

	for i, col := range qm.columnNames {
		for _, tc := range e.timeColumnNames {
			if col == tc {
				qm.timeIndex = i
				break
			}
		}

		if qm.Format == dataQueryFormatTable && col == "timeend" {
			qm.timeEndIndex = i
			continue
		}

		switch col {
		case "metric":
			qm.metricIndex = i
		default:
			if qm.metricIndex == -1 {
				columnType := qm.columnTypes[i].DatabaseTypeName()
				for _, mct := range e.metricColumnTypes {
					if columnType == mct {
						qm.metricIndex = i
						continue
					}
				}
			}
		}
	}

	qm.InterpolatedQuery = interpolatedQuery
	return qm, nil
}

// dataQueryFormat is the type of query.
type dataQueryFormat string

const (
	// dataQueryFormatTable identifies a table query (default).
	dataQueryFormatTable dataQueryFormat = "table"
	// dataQueryFormatSeries identifies a time series query.
	dataQueryFormatSeries dataQueryFormat = "time_series"
)
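
// dataQueryModel carries the per-query state derived by newProcessCfg and used while converting
// rows to frames.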
type dataQueryModel struct {
	InterpolatedQuery string // property not set until after Interpolate()
	Format            dataQueryFormat
	TimeRange         backend.TimeRange
	FillMissing       *data.FillMissing // property not set until after Interpolate()
	Interval          time.Duration
	columnNames       []string
	columnTypes       []*sql.ColumnType
	timeIndex         int
	timeEndIndex      int
	metricIndex       int
	metricPrefix      bool
	queryContext      context.Context
}
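
// convertSQLTimeColumnsToEpochMS converts the detected time and timeend columns of the frame to
// time fields.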
func convertSQLTimeColumnsToEpochMS(frame *data.Frame, qm *dataQueryModel) error {
	if qm.timeIndex != -1 {
		if err := convertSQLTimeColumnToEpochMS(frame, qm.timeIndex); err != nil {
			return fmt.Errorf("%v: %w", "failed to convert time column", err)
		}
	}

	if qm.timeEndIndex != -1 {
		if err := convertSQLTimeColumnToEpochMS(frame, qm.timeEndIndex); err != nil {
			return fmt.Errorf("%v: %w", "failed to convert timeend column", err)
		}
	}

	return nil
}

// convertSQLTimeColumnToEpochMS converts the column at timeIndex to a nullable time field,
// interpreting numeric values as Unix epoch timestamps (normalized to milliseconds), so that
// native datetime types and epoch dates work in annotation and table queries.
func convertSQLTimeColumnToEpochMS(frame *data.Frame, timeIndex int) error {
	if timeIndex < 0 || timeIndex >= len(frame.Fields) {
		return fmt.Errorf("timeIndex %d is out of range", timeIndex)
	}

	origin := frame.Fields[timeIndex]
	valueType := origin.Type()
	if valueType == data.FieldTypeTime || valueType == data.FieldTypeNullableTime {
		return nil
	}

	newField := data.NewFieldFromFieldType(data.FieldTypeNullableTime, 0)
	newField.Name = origin.Name
	newField.Labels = origin.Labels

	valueLength := origin.Len()
	for i := 0; i < valueLength; i++ {
		v, err := origin.NullableFloatAt(i)
		if err != nil {
			return fmt.Errorf("unable to convert data to a time field")
		}
		if v == nil {
			newField.Append(nil)
		} else {
			timestamp := time.Unix(0, int64(epochPrecisionToMS(*v))*int64(time.Millisecond))
			newField.Append(&timestamp)
		}
	}

	frame.Fields[timeIndex] = newField

	return nil
}

// convertSQLValueColumnToFloat converts a time series value column to float.
func convertSQLValueColumnToFloat(frame *data.Frame, Index int) (*data.Frame, error) {
	if Index < 0 || Index >= len(frame.Fields) {
		return frame, fmt.Errorf("metricIndex %d is out of range", Index)
	}

	origin := frame.Fields[Index]
	valueType := origin.Type()
	if valueType == data.FieldTypeFloat64 || valueType == data.FieldTypeNullableFloat64 {
		return frame, nil
	}

	newField := data.NewFieldFromFieldType(data.FieldTypeNullableFloat64, origin.Len())
	newField.Name = origin.Name
	newField.Labels = origin.Labels

	for i := 0; i < origin.Len(); i++ {
		v, err := origin.NullableFloatAt(i)
		if err != nil {
			return frame, err
		}
		newField.Set(i, v)
	}

	frame.Fields[Index] = newField

	return frame, nil
}
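
// SetupFillmode rewrites query.JSON so that fill is enabled: it sets fill=true, the fill interval
// in seconds, and a fill mode of "null", "previous", or "value" (with the parsed numeric fill
// value) depending on the fillmode argument.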
func SetupFillmode(query *backend.DataQuery, interval time.Duration, fillmode string) error {
	rawQueryProp := make(map[string]any)
	queryBytes, err := query.JSON.MarshalJSON()
	if err != nil {
		return err
	}
	err = json.Unmarshal(queryBytes, &rawQueryProp)
	if err != nil {
		return err
	}
	rawQueryProp["fill"] = true
	rawQueryProp["fillInterval"] = interval.Seconds()

	switch fillmode {
	case "NULL":
		rawQueryProp["fillMode"] = "null"
	case "previous":
		rawQueryProp["fillMode"] = "previous"
	default:
		rawQueryProp["fillMode"] = "value"
		floatVal, err := strconv.ParseFloat(fillmode, 64)
		if err != nil {
			return fmt.Errorf("error parsing fill value %v", fillmode)
		}
		rawQueryProp["fillValue"] = floatVal
	}

	query.JSON, err = json.Marshal(rawQueryProp)
	if err != nil {
		return err
	}

	return nil
}
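
// SQLMacroEngineBase provides shared helpers for SQLMacroEngine implementations; it is meant to be
// embedded by the per-database macro engines.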
type SQLMacroEngineBase struct{}

func NewSQLMacroEngineBase() *SQLMacroEngineBase {
	return &SQLMacroEngineBase{}
}
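
// ReplaceAllStringSubmatchFunc works like regexp.ReplaceAllStringFunc, but passes the full match
// and its capture groups (as a slice) to repl.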
func (m *SQLMacroEngineBase) ReplaceAllStringSubmatchFunc(re *regexp.Regexp, str string, repl func([]string) string) string {
	result := ""
	lastIndex := 0

	for _, v := range re.FindAllStringSubmatchIndex(str, -1) {
		groups := []string{}
		for i := 0; i < len(v); i += 2 {
			groups = append(groups, str[v[i]:v[i+1]])
		}

		result += str[lastIndex:v[0]] + repl(groups)
		lastIndex = v[1]
	}

	return result + str[lastIndex:]
}

// epochPrecisionToMS normalizes an epoch timestamp to millisecond precision. Values that look like
// seconds (~1e9) are scaled up and values that look like nanoseconds (~1e18) are scaled down;
// anything else is returned unchanged.
func epochPrecisionToMS(value float64) float64 {
	s := strconv.FormatFloat(value, 'e', -1, 64)
	if strings.HasSuffix(s, "e+09") {
		return value * float64(1e3)
	}

	if strings.HasSuffix(s, "e+18") {
		return value / float64(time.Millisecond)
	}

	return value
}