mirror of https://github.com/grafana/grafana.git
				
				
				
			
		
			
				
	
	
		
			297 lines
		
	
	
		
			7.7 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			297 lines
		
	
	
		
			7.7 KiB
		
	
	
	
		
			Go
		
	
	
	
| package tsdb
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"fmt"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/grafana/grafana/pkg/components/null"
 | |
| 
 | |
| 	"github.com/go-xorm/core"
 | |
| 	"github.com/go-xorm/xorm"
 | |
| 	"github.com/grafana/grafana/pkg/components/simplejson"
 | |
| 	"github.com/grafana/grafana/pkg/models"
 | |
| )
 | |
| 
 | |
// SqlEngine is a wrapper class around xorm for relational database data sources.
//
// *DefaultSqlEngine in this file provides both methods.
type SqlEngine interface {
	// InitEngine prepares the engine for the data source dsInfo, using
	// driverName and the connection string cnnstr.
	InitEngine(driverName string, dsInfo *models.DataSource, cnnstr string) error
	// Query runs the queries contained in query against ds and returns the
	// collected response. The two callbacks turn the raw result rows of a
	// single query into its QueryResult: transformToTimeSeries for time
	// series output and transformToTable for table output.
	Query(
		ctx context.Context,
		ds *models.DataSource,
		query *TsdbQuery,
		transformToTimeSeries func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error,
		transformToTable func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error,
	) (*Response, error)
}
 | |
| 
 | |
// SqlMacroEngine interpolates macros into sql. It takes in the Query to have access to query context and
// timeRange to be able to generate queries that use from and to.
type SqlMacroEngine interface {
	// Interpolate returns sql with its macros expanded, or an error if the
	// expansion fails.
	Interpolate(query *Query, timeRange *TimeRange, sql string) (string, error)
}
 | |
| 
 | |
// DefaultSqlEngine is the SqlEngine implementation provided by this file.
type DefaultSqlEngine struct {
	// MacroEngine expands macros in each raw SQL string before it is executed.
	MacroEngine SqlMacroEngine
	// XormEngine is the xorm engine queries are run on; InitEngine assigns it.
	XormEngine *xorm.Engine
}
 | |
| 
 | |
// engineCacheType holds xorm engines shared between DefaultSqlEngine
// instances. Both maps are keyed by data source id.
type engineCacheType struct {
	// cache maps a data source id to the engine created for it.
	cache map[int64]*xorm.Engine
	// versions maps a data source id to the data source version that
	// InitEngine compares against dsInfo.Version before reusing a cached engine.
	versions map[int64]int
	// Mutex is locked by InitEngine while it reads and writes the maps.
	sync.Mutex
}
 | |
| 
 | |
// engineCache is the package-level engine cache consulted by InitEngine.
// Both maps are allocated up front so they can be written to without a nil check.
var engineCache = engineCacheType{
	cache:    make(map[int64]*xorm.Engine),
	versions: make(map[int64]int),
}
 | |
| 
 | |
| // InitEngine creates the db connection and inits the xorm engine or loads it from the engine cache
 | |
| func (e *DefaultSqlEngine) InitEngine(driverName string, dsInfo *models.DataSource, cnnstr string) error {
 | |
| 	engineCache.Lock()
 | |
| 	defer engineCache.Unlock()
 | |
| 
 | |
| 	if engine, present := engineCache.cache[dsInfo.Id]; present {
 | |
| 		if version := engineCache.versions[dsInfo.Id]; version == dsInfo.Version {
 | |
| 			e.XormEngine = engine
 | |
| 			return nil
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	engine, err := xorm.NewEngine(driverName, cnnstr)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	engine.SetMaxOpenConns(10)
 | |
| 	engine.SetMaxIdleConns(10)
 | |
| 
 | |
| 	engineCache.cache[dsInfo.Id] = engine
 | |
| 	e.XormEngine = engine
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Query is a default implementation of the Query method for an SQL data source.
 | |
| // The caller of this function must implement transformToTimeSeries and transformToTable and
 | |
| // pass them in as parameters.
 | |
| func (e *DefaultSqlEngine) Query(
 | |
| 	ctx context.Context,
 | |
| 	dsInfo *models.DataSource,
 | |
| 	tsdbQuery *TsdbQuery,
 | |
| 	transformToTimeSeries func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error,
 | |
| 	transformToTable func(query *Query, rows *core.Rows, result *QueryResult, tsdbQuery *TsdbQuery) error,
 | |
| ) (*Response, error) {
 | |
| 	result := &Response{
 | |
| 		Results: make(map[string]*QueryResult),
 | |
| 	}
 | |
| 
 | |
| 	session := e.XormEngine.NewSession()
 | |
| 	defer session.Close()
 | |
| 	db := session.DB()
 | |
| 
 | |
| 	for _, query := range tsdbQuery.Queries {
 | |
| 		rawSql := query.Model.Get("rawSql").MustString()
 | |
| 		if rawSql == "" {
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		queryResult := &QueryResult{Meta: simplejson.New(), RefId: query.RefId}
 | |
| 		result.Results[query.RefId] = queryResult
 | |
| 
 | |
| 		rawSql, err := e.MacroEngine.Interpolate(query, tsdbQuery.TimeRange, rawSql)
 | |
| 		if err != nil {
 | |
| 			queryResult.Error = err
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		queryResult.Meta.Set("sql", rawSql)
 | |
| 
 | |
| 		rows, err := db.Query(rawSql)
 | |
| 		if err != nil {
 | |
| 			queryResult.Error = err
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		defer rows.Close()
 | |
| 
 | |
| 		format := query.Model.Get("format").MustString("time_series")
 | |
| 
 | |
| 		switch format {
 | |
| 		case "time_series":
 | |
| 			err := transformToTimeSeries(query, rows, queryResult, tsdbQuery)
 | |
| 			if err != nil {
 | |
| 				queryResult.Error = err
 | |
| 				continue
 | |
| 			}
 | |
| 		case "table":
 | |
| 			err := transformToTable(query, rows, queryResult, tsdbQuery)
 | |
| 			if err != nil {
 | |
| 				queryResult.Error = err
 | |
| 				continue
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return result, nil
 | |
| }
 | |
| 
 | |
| // ConvertSqlTimeColumnToEpochMs converts column named time to unix timestamp in milliseconds
 | |
| // to make native datetime types and epoch dates work in annotation and table queries.
 | |
| func ConvertSqlTimeColumnToEpochMs(values RowValues, timeIndex int) {
 | |
| 	if timeIndex >= 0 {
 | |
| 		switch value := values[timeIndex].(type) {
 | |
| 		case time.Time:
 | |
| 			values[timeIndex] = float64(value.UnixNano()) / float64(time.Millisecond)
 | |
| 		case *time.Time:
 | |
| 			if value != nil {
 | |
| 				values[timeIndex] = float64((*value).UnixNano()) / float64(time.Millisecond)
 | |
| 			}
 | |
| 		case int64:
 | |
| 			values[timeIndex] = int64(EpochPrecisionToMs(float64(value)))
 | |
| 		case *int64:
 | |
| 			if value != nil {
 | |
| 				values[timeIndex] = int64(EpochPrecisionToMs(float64(*value)))
 | |
| 			}
 | |
| 		case uint64:
 | |
| 			values[timeIndex] = int64(EpochPrecisionToMs(float64(value)))
 | |
| 		case *uint64:
 | |
| 			if value != nil {
 | |
| 				values[timeIndex] = int64(EpochPrecisionToMs(float64(*value)))
 | |
| 			}
 | |
| 		case int32:
 | |
| 			values[timeIndex] = int64(EpochPrecisionToMs(float64(value)))
 | |
| 		case *int32:
 | |
| 			if value != nil {
 | |
| 				values[timeIndex] = int64(EpochPrecisionToMs(float64(*value)))
 | |
| 			}
 | |
| 		case uint32:
 | |
| 			values[timeIndex] = int64(EpochPrecisionToMs(float64(value)))
 | |
| 		case *uint32:
 | |
| 			if value != nil {
 | |
| 				values[timeIndex] = int64(EpochPrecisionToMs(float64(*value)))
 | |
| 			}
 | |
| 		case float64:
 | |
| 			values[timeIndex] = EpochPrecisionToMs(value)
 | |
| 		case *float64:
 | |
| 			if value != nil {
 | |
| 				values[timeIndex] = EpochPrecisionToMs(*value)
 | |
| 			}
 | |
| 		case float32:
 | |
| 			values[timeIndex] = EpochPrecisionToMs(float64(value))
 | |
| 		case *float32:
 | |
| 			if value != nil {
 | |
| 				values[timeIndex] = EpochPrecisionToMs(float64(*value))
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // ConvertSqlValueColumnToFloat converts timeseries value column to float.
 | |
| func ConvertSqlValueColumnToFloat(columnName string, columnValue interface{}) (null.Float, error) {
 | |
| 	var value null.Float
 | |
| 
 | |
| 	switch typedValue := columnValue.(type) {
 | |
| 	case int:
 | |
| 		value = null.FloatFrom(float64(typedValue))
 | |
| 	case *int:
 | |
| 		if typedValue == nil {
 | |
| 			value.Valid = false
 | |
| 		} else {
 | |
| 			value = null.FloatFrom(float64(*typedValue))
 | |
| 		}
 | |
| 	case int64:
 | |
| 		value = null.FloatFrom(float64(typedValue))
 | |
| 	case *int64:
 | |
| 		if typedValue == nil {
 | |
| 			value.Valid = false
 | |
| 		} else {
 | |
| 			value = null.FloatFrom(float64(*typedValue))
 | |
| 		}
 | |
| 	case int32:
 | |
| 		value = null.FloatFrom(float64(typedValue))
 | |
| 	case *int32:
 | |
| 		if typedValue == nil {
 | |
| 			value.Valid = false
 | |
| 		} else {
 | |
| 			value = null.FloatFrom(float64(*typedValue))
 | |
| 		}
 | |
| 	case int16:
 | |
| 		value = null.FloatFrom(float64(typedValue))
 | |
| 	case *int16:
 | |
| 		if typedValue == nil {
 | |
| 			value.Valid = false
 | |
| 		} else {
 | |
| 			value = null.FloatFrom(float64(*typedValue))
 | |
| 		}
 | |
| 	case int8:
 | |
| 		value = null.FloatFrom(float64(typedValue))
 | |
| 	case *int8:
 | |
| 		if typedValue == nil {
 | |
| 			value.Valid = false
 | |
| 		} else {
 | |
| 			value = null.FloatFrom(float64(*typedValue))
 | |
| 		}
 | |
| 	case uint:
 | |
| 		value = null.FloatFrom(float64(typedValue))
 | |
| 	case *uint:
 | |
| 		if typedValue == nil {
 | |
| 			value.Valid = false
 | |
| 		} else {
 | |
| 			value = null.FloatFrom(float64(*typedValue))
 | |
| 		}
 | |
| 	case uint64:
 | |
| 		value = null.FloatFrom(float64(typedValue))
 | |
| 	case *uint64:
 | |
| 		if typedValue == nil {
 | |
| 			value.Valid = false
 | |
| 		} else {
 | |
| 			value = null.FloatFrom(float64(*typedValue))
 | |
| 		}
 | |
| 	case uint32:
 | |
| 		value = null.FloatFrom(float64(typedValue))
 | |
| 	case *uint32:
 | |
| 		if typedValue == nil {
 | |
| 			value.Valid = false
 | |
| 		} else {
 | |
| 			value = null.FloatFrom(float64(*typedValue))
 | |
| 		}
 | |
| 	case uint16:
 | |
| 		value = null.FloatFrom(float64(typedValue))
 | |
| 	case *uint16:
 | |
| 		if typedValue == nil {
 | |
| 			value.Valid = false
 | |
| 		} else {
 | |
| 			value = null.FloatFrom(float64(*typedValue))
 | |
| 		}
 | |
| 	case uint8:
 | |
| 		value = null.FloatFrom(float64(typedValue))
 | |
| 	case *uint8:
 | |
| 		if typedValue == nil {
 | |
| 			value.Valid = false
 | |
| 		} else {
 | |
| 			value = null.FloatFrom(float64(*typedValue))
 | |
| 		}
 | |
| 	case float64:
 | |
| 		value = null.FloatFrom(typedValue)
 | |
| 	case *float64:
 | |
| 		value = null.FloatFromPtr(typedValue)
 | |
| 	case float32:
 | |
| 		value = null.FloatFrom(float64(typedValue))
 | |
| 	case *float32:
 | |
| 		if typedValue == nil {
 | |
| 			value.Valid = false
 | |
| 		} else {
 | |
| 			value = null.FloatFrom(float64(*typedValue))
 | |
| 		}
 | |
| 	case nil:
 | |
| 		value.Valid = false
 | |
| 	default:
 | |
| 		return null.NewFloat(0, false), fmt.Errorf("Value column must have numeric datatype, column: %s type: %T value: %v", columnName, typedValue, typedValue)
 | |
| 	}
 | |
| 
 | |
| 	return value, nil
 | |
| }
 |