| 
									
										
										
										
											2017-10-10 21:19:14 +08:00
										 |  |  | package tsdb | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							|  |  |  | 	"context" | 
					
						
							| 
									
										
										
										
											2018-04-25 01:50:14 +08:00
										 |  |  | 	"fmt" | 
					
						
							| 
									
										
										
										
											2017-10-10 21:19:14 +08:00
										 |  |  | 	"sync" | 
					
						
							| 
									
										
										
										
											2018-03-21 02:40:10 +08:00
										 |  |  | 	"time" | 
					
						
							| 
									
										
										
										
											2017-10-10 21:19:14 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-04-25 01:50:14 +08:00
										 |  |  | 	"github.com/grafana/grafana/pkg/components/null" | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-10-10 21:19:14 +08:00
										 |  |  | 	"github.com/go-xorm/core" | 
					
						
							|  |  |  | 	"github.com/go-xorm/xorm" | 
					
						
							|  |  |  | 	"github.com/grafana/grafana/pkg/components/simplejson" | 
					
						
							|  |  |  | 	"github.com/grafana/grafana/pkg/models" | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // SqlEngine is a wrapper class around xorm for relational database data sources.
 | 
					
						
							|  |  |  | type SqlEngine interface { | 
					
						
							|  |  |  | 	InitEngine(driverName string, dsInfo *models.DataSource, cnnstr string) error | 
					
						
							|  |  |  | 	Query( | 
					
						
							|  |  |  | 		ctx context.Context, | 
					
						
							|  |  |  | 		ds *models.DataSource, | 
					
						
							|  |  |  | 		query *TsdbQuery, | 
					
						
							| 
									
										
										
										
											2017-12-10 03:35:00 +08:00
										 |  |  | 		transformToTimeSeries func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error, | 
					
						
							|  |  |  | 		transformToTable func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error, | 
					
						
							| 
									
										
										
										
											2017-10-10 21:19:14 +08:00
										 |  |  | 	) (*Response, error) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-12-09 06:04:17 +08:00
										 |  |  | // SqlMacroEngine interpolates macros into sql. It takes in the Query to have access to query context and
 | 
					
						
							|  |  |  | // timeRange to be able to generate queries that use from and to.
 | 
					
						
							| 
									
										
										
										
											2017-10-10 21:19:14 +08:00
										 |  |  | type SqlMacroEngine interface { | 
					
						
							| 
									
										
										
										
											2017-12-09 06:04:17 +08:00
										 |  |  | 	Interpolate(query *Query, timeRange *TimeRange, sql string) (string, error) | 
					
						
							| 
									
										
										
										
											2017-10-10 21:19:14 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | type DefaultSqlEngine struct { | 
					
						
							|  |  |  | 	MacroEngine SqlMacroEngine | 
					
						
							|  |  |  | 	XormEngine  *xorm.Engine | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | type engineCacheType struct { | 
					
						
							|  |  |  | 	cache    map[int64]*xorm.Engine | 
					
						
							|  |  |  | 	versions map[int64]int | 
					
						
							|  |  |  | 	sync.Mutex | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | var engineCache = engineCacheType{ | 
					
						
							|  |  |  | 	cache:    make(map[int64]*xorm.Engine), | 
					
						
							|  |  |  | 	versions: make(map[int64]int), | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // InitEngine creates the db connection and inits the xorm engine or loads it from the engine cache
 | 
					
						
							|  |  |  | func (e *DefaultSqlEngine) InitEngine(driverName string, dsInfo *models.DataSource, cnnstr string) error { | 
					
						
							|  |  |  | 	engineCache.Lock() | 
					
						
							|  |  |  | 	defer engineCache.Unlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if engine, present := engineCache.cache[dsInfo.Id]; present { | 
					
						
							| 
									
										
										
										
											2018-04-17 02:42:17 +08:00
										 |  |  | 		if version := engineCache.versions[dsInfo.Id]; version == dsInfo.Version { | 
					
						
							| 
									
										
										
										
											2017-10-10 21:19:14 +08:00
										 |  |  | 			e.XormEngine = engine | 
					
						
							|  |  |  | 			return nil | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	engine, err := xorm.NewEngine(driverName, cnnstr) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-11-15 18:07:35 +08:00
										 |  |  | 	engine.SetMaxOpenConns(10) | 
					
						
							|  |  |  | 	engine.SetMaxIdleConns(10) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-10-10 21:19:14 +08:00
										 |  |  | 	engineCache.cache[dsInfo.Id] = engine | 
					
						
							|  |  |  | 	e.XormEngine = engine | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Query is a default implementation of the Query method for an SQL data source.
 | 
					
						
							|  |  |  | // The caller of this function must implement transformToTimeSeries and transformToTable and
 | 
					
						
							|  |  |  | // pass them in as parameters.
 | 
					
						
							|  |  |  | func (e *DefaultSqlEngine) Query( | 
					
						
							|  |  |  | 	ctx context.Context, | 
					
						
							|  |  |  | 	dsInfo *models.DataSource, | 
					
						
							|  |  |  | 	tsdbQuery *TsdbQuery, | 
					
						
							| 
									
										
										
										
											2017-12-10 03:35:00 +08:00
										 |  |  | 	transformToTimeSeries func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error, | 
					
						
							|  |  |  | 	transformToTable func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error, | 
					
						
							| 
									
										
										
										
											2017-10-10 21:19:14 +08:00
										 |  |  | ) (*Response, error) { | 
					
						
							|  |  |  | 	result := &Response{ | 
					
						
							|  |  |  | 		Results: make(map[string]*QueryResult), | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	session := e.XormEngine.NewSession() | 
					
						
							|  |  |  | 	defer session.Close() | 
					
						
							|  |  |  | 	db := session.DB() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	for _, query := range tsdbQuery.Queries { | 
					
						
							|  |  |  | 		rawSql := query.Model.Get("rawSql").MustString() | 
					
						
							|  |  |  | 		if rawSql == "" { | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		queryResult := &QueryResult{Meta: simplejson.New(), RefId: query.RefId} | 
					
						
							|  |  |  | 		result.Results[query.RefId] = queryResult | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-12-09 06:04:17 +08:00
										 |  |  | 		rawSql, err := e.MacroEngine.Interpolate(query, tsdbQuery.TimeRange, rawSql) | 
					
						
							| 
									
										
										
										
											2017-10-10 21:19:14 +08:00
										 |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			queryResult.Error = err | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		queryResult.Meta.Set("sql", rawSql) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		rows, err := db.Query(rawSql) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			queryResult.Error = err | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		defer rows.Close() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		format := query.Model.Get("format").MustString("time_series") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		switch format { | 
					
						
							|  |  |  | 		case "time_series": | 
					
						
							| 
									
										
										
										
											2017-12-10 03:35:00 +08:00
										 |  |  | 			err := transformToTimeSeries(query, rows, queryResult, tsdbQuery) | 
					
						
							| 
									
										
										
										
											2017-10-10 21:19:14 +08:00
										 |  |  | 			if err != nil { | 
					
						
							|  |  |  | 				queryResult.Error = err | 
					
						
							|  |  |  | 				continue | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		case "table": | 
					
						
							| 
									
										
										
										
											2017-12-10 03:35:00 +08:00
										 |  |  | 			err := transformToTable(query, rows, queryResult, tsdbQuery) | 
					
						
							| 
									
										
										
										
											2017-10-10 21:19:14 +08:00
										 |  |  | 			if err != nil { | 
					
						
							|  |  |  | 				queryResult.Error = err | 
					
						
							|  |  |  | 				continue | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return result, nil | 
					
						
							|  |  |  | } | 
					
						
							| 
									
										
										
										
											2018-03-21 02:40:10 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-04-10 16:32:30 +08:00
										 |  |  | // ConvertSqlTimeColumnToEpochMs converts column named time to unix timestamp in milliseconds
 | 
					
						
							| 
									
										
										
										
											2018-03-21 02:40:10 +08:00
										 |  |  | // to make native datetime types and epoch dates work in annotation and table queries.
 | 
					
						
							|  |  |  | func ConvertSqlTimeColumnToEpochMs(values RowValues, timeIndex int) { | 
					
						
							|  |  |  | 	if timeIndex >= 0 { | 
					
						
							|  |  |  | 		switch value := values[timeIndex].(type) { | 
					
						
							|  |  |  | 		case time.Time: | 
					
						
							| 
									
										
										
										
											2018-04-10 16:32:30 +08:00
										 |  |  | 			values[timeIndex] = EpochPrecisionToMs(float64(value.UnixNano())) | 
					
						
							| 
									
										
										
										
											2018-03-21 02:40:10 +08:00
										 |  |  | 		case *time.Time: | 
					
						
							|  |  |  | 			if value != nil { | 
					
						
							| 
									
										
										
										
											2018-04-10 16:32:30 +08:00
										 |  |  | 				values[timeIndex] = EpochPrecisionToMs(float64((*value).UnixNano())) | 
					
						
							| 
									
										
										
										
											2018-03-21 02:40:10 +08:00
										 |  |  | 			} | 
					
						
							|  |  |  | 		case int64: | 
					
						
							|  |  |  | 			values[timeIndex] = int64(EpochPrecisionToMs(float64(value))) | 
					
						
							|  |  |  | 		case *int64: | 
					
						
							|  |  |  | 			if value != nil { | 
					
						
							|  |  |  | 				values[timeIndex] = int64(EpochPrecisionToMs(float64(*value))) | 
					
						
							|  |  |  | 			} | 
					
						
							| 
									
										
										
										
											2018-04-10 16:32:30 +08:00
										 |  |  | 		case uint64: | 
					
						
							|  |  |  | 			values[timeIndex] = int64(EpochPrecisionToMs(float64(value))) | 
					
						
							|  |  |  | 		case *uint64: | 
					
						
							|  |  |  | 			if value != nil { | 
					
						
							|  |  |  | 				values[timeIndex] = int64(EpochPrecisionToMs(float64(*value))) | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		case int32: | 
					
						
							|  |  |  | 			values[timeIndex] = int64(EpochPrecisionToMs(float64(value))) | 
					
						
							|  |  |  | 		case *int32: | 
					
						
							|  |  |  | 			if value != nil { | 
					
						
							|  |  |  | 				values[timeIndex] = int64(EpochPrecisionToMs(float64(*value))) | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		case uint32: | 
					
						
							|  |  |  | 			values[timeIndex] = int64(EpochPrecisionToMs(float64(value))) | 
					
						
							|  |  |  | 		case *uint32: | 
					
						
							|  |  |  | 			if value != nil { | 
					
						
							|  |  |  | 				values[timeIndex] = int64(EpochPrecisionToMs(float64(*value))) | 
					
						
							|  |  |  | 			} | 
					
						
							| 
									
										
										
										
											2018-03-21 02:40:10 +08:00
										 |  |  | 		case float64: | 
					
						
							|  |  |  | 			values[timeIndex] = EpochPrecisionToMs(value) | 
					
						
							|  |  |  | 		case *float64: | 
					
						
							|  |  |  | 			if value != nil { | 
					
						
							|  |  |  | 				values[timeIndex] = EpochPrecisionToMs(*value) | 
					
						
							|  |  |  | 			} | 
					
						
							| 
									
										
										
										
											2018-04-10 16:32:30 +08:00
										 |  |  | 		case float32: | 
					
						
							|  |  |  | 			values[timeIndex] = EpochPrecisionToMs(float64(value)) | 
					
						
							|  |  |  | 		case *float32: | 
					
						
							|  |  |  | 			if value != nil { | 
					
						
							|  |  |  | 				values[timeIndex] = EpochPrecisionToMs(float64(*value)) | 
					
						
							|  |  |  | 			} | 
					
						
							| 
									
										
										
										
											2018-03-21 02:40:10 +08:00
										 |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							| 
									
										
										
										
											2018-04-25 01:50:14 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | // ConvertSqlValueColumnToFloat converts timeseries value column to float.
 | 
					
						
							|  |  |  | func ConvertSqlValueColumnToFloat(columnName string, columnValue interface{}) (null.Float, error) { | 
					
						
							|  |  |  | 	var value null.Float | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	switch typedValue := columnValue.(type) { | 
					
						
							|  |  |  | 	case int: | 
					
						
							|  |  |  | 		value = null.FloatFrom(float64(typedValue)) | 
					
						
							|  |  |  | 	case *int: | 
					
						
							|  |  |  | 		if typedValue == nil { | 
					
						
							|  |  |  | 			value.Valid = false | 
					
						
							|  |  |  | 		} else { | 
					
						
							|  |  |  | 			value = null.FloatFrom(float64(*typedValue)) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	case int64: | 
					
						
							|  |  |  | 		value = null.FloatFrom(float64(typedValue)) | 
					
						
							|  |  |  | 	case *int64: | 
					
						
							|  |  |  | 		if typedValue == nil { | 
					
						
							|  |  |  | 			value.Valid = false | 
					
						
							|  |  |  | 		} else { | 
					
						
							|  |  |  | 			value = null.FloatFrom(float64(*typedValue)) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	case int32: | 
					
						
							|  |  |  | 		value = null.FloatFrom(float64(typedValue)) | 
					
						
							|  |  |  | 	case *int32: | 
					
						
							|  |  |  | 		if typedValue == nil { | 
					
						
							|  |  |  | 			value.Valid = false | 
					
						
							|  |  |  | 		} else { | 
					
						
							|  |  |  | 			value = null.FloatFrom(float64(*typedValue)) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	case int16: | 
					
						
							|  |  |  | 		value = null.FloatFrom(float64(typedValue)) | 
					
						
							|  |  |  | 	case *int16: | 
					
						
							|  |  |  | 		if typedValue == nil { | 
					
						
							|  |  |  | 			value.Valid = false | 
					
						
							|  |  |  | 		} else { | 
					
						
							|  |  |  | 			value = null.FloatFrom(float64(*typedValue)) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	case int8: | 
					
						
							|  |  |  | 		value = null.FloatFrom(float64(typedValue)) | 
					
						
							|  |  |  | 	case *int8: | 
					
						
							|  |  |  | 		if typedValue == nil { | 
					
						
							|  |  |  | 			value.Valid = false | 
					
						
							|  |  |  | 		} else { | 
					
						
							|  |  |  | 			value = null.FloatFrom(float64(*typedValue)) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	case uint: | 
					
						
							|  |  |  | 		value = null.FloatFrom(float64(typedValue)) | 
					
						
							|  |  |  | 	case *uint: | 
					
						
							|  |  |  | 		if typedValue == nil { | 
					
						
							|  |  |  | 			value.Valid = false | 
					
						
							|  |  |  | 		} else { | 
					
						
							|  |  |  | 			value = null.FloatFrom(float64(*typedValue)) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	case uint64: | 
					
						
							|  |  |  | 		value = null.FloatFrom(float64(typedValue)) | 
					
						
							|  |  |  | 	case *uint64: | 
					
						
							|  |  |  | 		if typedValue == nil { | 
					
						
							|  |  |  | 			value.Valid = false | 
					
						
							|  |  |  | 		} else { | 
					
						
							|  |  |  | 			value = null.FloatFrom(float64(*typedValue)) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	case uint32: | 
					
						
							|  |  |  | 		value = null.FloatFrom(float64(typedValue)) | 
					
						
							|  |  |  | 	case *uint32: | 
					
						
							|  |  |  | 		if typedValue == nil { | 
					
						
							|  |  |  | 			value.Valid = false | 
					
						
							|  |  |  | 		} else { | 
					
						
							|  |  |  | 			value = null.FloatFrom(float64(*typedValue)) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	case uint16: | 
					
						
							|  |  |  | 		value = null.FloatFrom(float64(typedValue)) | 
					
						
							|  |  |  | 	case *uint16: | 
					
						
							|  |  |  | 		if typedValue == nil { | 
					
						
							|  |  |  | 			value.Valid = false | 
					
						
							|  |  |  | 		} else { | 
					
						
							|  |  |  | 			value = null.FloatFrom(float64(*typedValue)) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	case uint8: | 
					
						
							|  |  |  | 		value = null.FloatFrom(float64(typedValue)) | 
					
						
							|  |  |  | 	case *uint8: | 
					
						
							|  |  |  | 		if typedValue == nil { | 
					
						
							|  |  |  | 			value.Valid = false | 
					
						
							|  |  |  | 		} else { | 
					
						
							|  |  |  | 			value = null.FloatFrom(float64(*typedValue)) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	case float64: | 
					
						
							|  |  |  | 		value = null.FloatFrom(typedValue) | 
					
						
							|  |  |  | 	case *float64: | 
					
						
							|  |  |  | 		value = null.FloatFromPtr(typedValue) | 
					
						
							|  |  |  | 	case float32: | 
					
						
							|  |  |  | 		value = null.FloatFrom(float64(typedValue)) | 
					
						
							|  |  |  | 	case *float32: | 
					
						
							|  |  |  | 		if typedValue == nil { | 
					
						
							|  |  |  | 			value.Valid = false | 
					
						
							|  |  |  | 		} else { | 
					
						
							|  |  |  | 			value = null.FloatFrom(float64(*typedValue)) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	case nil: | 
					
						
							|  |  |  | 		value.Valid = false | 
					
						
							|  |  |  | 	default: | 
					
						
							|  |  |  | 		return null.NewFloat(0, false), fmt.Errorf("Value column must have numeric datatype, column: %s type: %T value: %v", columnName, typedValue, typedValue) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return value, nil | 
					
						
							|  |  |  | } |