Mirror of https://github.com/grafana/grafana.git

PostgreSQL: Decouple plugin (#111620)

parent 4b2bb46930
commit 7055a879ba
@@ -209,7 +209,7 @@
 "path": "public/plugins/grafana-azure-monitor-datasource/img/azure_monitor_cpu.png"
 }
 ],
-"version": "12.2.0-pre",
+"version": "12.3.0-pre",
 "updated": "",
 "keywords": [
 "azure",
@@ -880,7 +880,7 @@
 },
 "build": {},
 "screenshots": null,
-"version": "12.2.0-pre",
+"version": "12.3.0-pre",
 "updated": "",
 "keywords": null
 },
@@ -934,7 +934,7 @@
 },
 "build": {},
 "screenshots": null,
-"version": "12.2.0-pre",
+"version": "12.3.0-pre",
 "updated": "",
 "keywords": [
 "grafana",
@@ -1000,7 +1000,7 @@
 },
 "build": {},
 "screenshots": null,
-"version": "12.2.0-pre",
+"version": "12.3.0-pre",
 "updated": "",
 "keywords": null
 },
@@ -1217,7 +1217,7 @@
 },
 "build": {},
 "screenshots": null,
-"version": "12.2.0-pre",
+"version": "12.3.0-pre",
 "updated": "",
 "keywords": null
 },
@@ -1325,7 +1325,7 @@
 },
 "build": {},
 "screenshots": null,
-"version": "12.2.0-pre",
+"version": "12.3.0-pre",
 "updated": "",
 "keywords": null
 },
@@ -1375,7 +1375,7 @@
 },
 "build": {},
 "screenshots": null,
-"version": "12.2.0-pre",
+"version": "12.3.0-pre",
 "updated": "",
 "keywords": null
 },
@@ -1425,7 +1425,7 @@
 },
 "build": {},
 "screenshots": null,
-"version": "12.2.0-pre",
+"version": "12.3.0-pre",
 "updated": "",
 "keywords": null
 },
@@ -1629,7 +1629,7 @@
 },
 "build": {},
 "screenshots": null,
-"version": "12.2.0-pre",
+"version": "12.3.0-pre",
 "updated": "",
 "keywords": [
 "grafana",
@@ -1734,12 +1734,12 @@
 },
 "build": {},
 "screenshots": null,
-"version": "12.2.0-pre",
+"version": "12.3.0-pre",
 "updated": "",
 "keywords": null
 },
 "dependencies": {
-"grafanaDependency": "",
+"grafanaDependency": "\u003e=11.6.0",
 "grafanaVersion": "*",
 "plugins": [],
 "extensions": {
@@ -2042,7 +2042,7 @@
 },
 "build": {},
 "screenshots": null,
-"version": "12.2.0-pre",
+"version": "12.3.0-pre",
 "updated": "",
 "keywords": null
 },
@@ -2092,7 +2092,7 @@
 },
 "build": {},
 "screenshots": null,
-"version": "12.2.0-pre",
+"version": "12.3.0-pre",
 "updated": "",
 "keywords": null
 },
@@ -2445,7 +2445,7 @@
 },
 "build": {},
 "screenshots": null,
-"version": "12.2.0-pre",
+"version": "12.3.0-pre",
 "updated": "",
 "keywords": null
 },
@@ -0,0 +1,117 @@
+package pgx
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"net"
+	"strings"
+
+	"github.com/grafana/grafana-plugin-sdk-go/backend"
+	"github.com/grafana/grafana-plugin-sdk-go/backend/log"
+	"github.com/grafana/grafana/pkg/tsdb/grafana-postgresql-datasource/sqleng"
+)
+
+func (e *DataSourceHandler) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
+	err := e.Ping(ctx)
+	if err != nil {
+		logCheckHealthError(ctx, e.dsInfo, err)
+		if strings.EqualFold(req.PluginContext.User.Role, "Admin") {
+			return ErrToHealthCheckResult(err)
+		}
+		errResponse := &backend.CheckHealthResult{
+			Status:  backend.HealthStatusError,
+			Message: e.TransformQueryError(e.log, err).Error(),
+		}
+		return errResponse, nil
+	}
+	return &backend.CheckHealthResult{Status: backend.HealthStatusOk, Message: "Database Connection OK"}, nil
+}
+
+// ErrToHealthCheckResult converts error into user friendly health check message
+// This should be called with non nil error. If the err parameter is empty, we will send Internal Server Error
+func ErrToHealthCheckResult(err error) (*backend.CheckHealthResult, error) {
+	if err == nil {
+		return &backend.CheckHealthResult{Status: backend.HealthStatusError, Message: "Internal Server Error"}, nil
+	}
+	res := &backend.CheckHealthResult{Status: backend.HealthStatusError, Message: err.Error()}
+	details := map[string]string{
+		"verboseMessage":   err.Error(),
+		"errorDetailsLink": "https://grafana.com/docs/grafana/latest/datasources/postgres",
+	}
+	var opErr *net.OpError
+	if errors.As(err, &opErr) {
+		res.Message = "Network error: Failed to connect to the server"
+		if opErr != nil && opErr.Err != nil {
+			errMessage := opErr.Err.Error()
+			if strings.HasSuffix(opErr.Err.Error(), "no such host") {
+				errMessage = "no such host"
+			}
+			if strings.HasSuffix(opErr.Err.Error(), "unknown port") {
+				errMessage = "unknown port"
+			}
+			if strings.HasSuffix(opErr.Err.Error(), "invalid port") {
+				errMessage = "invalid port"
+			}
+			if strings.HasSuffix(opErr.Err.Error(), "missing port in address") {
+				errMessage = "missing port in address"
+			}
+			if strings.HasSuffix(opErr.Err.Error(), "invalid syntax") {
+				errMessage = "invalid syntax found in the address"
+			}
+			res.Message += fmt.Sprintf(". Error message: %s", errMessage)
+		}
+	}
+
+	if errors.Is(err, sqleng.ErrParsingPostgresURL) {
+		res.Message = fmt.Sprintf("Connection string error: %s", sqleng.ErrParsingPostgresURL.Error())
+		if unwrappedErr := errors.Unwrap(err); unwrappedErr != nil {
+			details["verboseMessage"] = unwrappedErr.Error()
+		}
+	}
+	detailBytes, marshalErr := json.Marshal(details)
+	if marshalErr != nil {
+		return res, nil
+	}
+	res.JSONDetails = detailBytes
+	return res, nil
+}
+
+func logCheckHealthError(ctx context.Context, dsInfo sqleng.DataSourceInfo, err error) {
+	logger := log.DefaultLogger.FromContext(ctx)
+	configSummary := map[string]any{
+		"config_url_length":                 len(dsInfo.URL),
+		"config_user_length":                len(dsInfo.User),
+		"config_database_length":            len(dsInfo.Database),
+		"config_json_data_database_length":  len(dsInfo.JsonData.Database),
+		"config_max_open_conns":             dsInfo.JsonData.MaxOpenConns,
+		"config_max_idle_conns":             dsInfo.JsonData.MaxIdleConns,
+		"config_conn_max_life_time":         dsInfo.JsonData.ConnMaxLifetime,
+		"config_conn_timeout":               dsInfo.JsonData.ConnectionTimeout,
+		"config_timescaledb":                dsInfo.JsonData.Timescaledb,
+		"config_ssl_mode":                   dsInfo.JsonData.Mode,
+		"config_tls_configuration_method":   dsInfo.JsonData.ConfigurationMethod,
+		"config_tls_skip_verify":            dsInfo.JsonData.TlsSkipVerify,
+		"config_timezone":                   dsInfo.JsonData.Timezone,
+		"config_time_interval":              dsInfo.JsonData.TimeInterval,
+		"config_enable_secure_proxy":        dsInfo.JsonData.SecureDSProxy,
+		"config_allow_clear_text_passwords": dsInfo.JsonData.AllowCleartextPasswords,
+		"config_authentication_type":        dsInfo.JsonData.AuthenticationType,
+		"config_ssl_root_cert_file_length":  len(dsInfo.JsonData.RootCertFile),
+		"config_ssl_cert_file_length":       len(dsInfo.JsonData.CertFile),
+		"config_ssl_key_file_length":        len(dsInfo.JsonData.CertKeyFile),
+		"config_encrypt_length":             len(dsInfo.JsonData.Encrypt),
+		"config_server_name_length":         len(dsInfo.JsonData.Servername),
+		"config_password_length":            len(dsInfo.DecryptedSecureJSONData["password"]),
+		"config_tls_ca_cert_length":         len(dsInfo.DecryptedSecureJSONData["tlsCACert"]),
+		"config_tls_client_cert_length":     len(dsInfo.DecryptedSecureJSONData["tlsClientCert"]),
+		"config_tls_client_key_length":      len(dsInfo.DecryptedSecureJSONData["tlsClientKey"]),
+	}
+	configSummaryJSON, marshalError := json.Marshal(configSummary)
+	if marshalError != nil {
+		logger.Error("Check health failed", "error", err, "message_type", "ds_config_health_check_error")
+		return
+	}
+	logger.Error("Check health failed", "error", err, "message_type", "ds_config_health_check_error_detailed", "details", string(configSummaryJSON))
+}
@@ -0,0 +1,61 @@
+package pgx
+
+import (
+	"errors"
+	"fmt"
+	"net"
+	"testing"
+
+	"github.com/grafana/grafana-plugin-sdk-go/backend"
+	"github.com/grafana/grafana/pkg/tsdb/grafana-postgresql-datasource/sqleng"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestErrToHealthCheckResult(t *testing.T) {
+	tests := []struct {
+		name string
+		err  error
+		want *backend.CheckHealthResult
+	}{
+		{
+			name: "without error",
+			want: &backend.CheckHealthResult{Status: backend.HealthStatusError, Message: "Internal Server Error"},
+		},
+		{
+			name: "network error",
+			err:  errors.Join(errors.New("foo"), &net.OpError{Op: "read", Net: "tcp", Err: errors.New("some op")}),
+			want: &backend.CheckHealthResult{
+				Status:      backend.HealthStatusError,
+				Message:     "Network error: Failed to connect to the server. Error message: some op",
+				JSONDetails: []byte(`{"errorDetailsLink":"https://grafana.com/docs/grafana/latest/datasources/postgres","verboseMessage":"foo\nread tcp: some op"}`),
+			},
+		},
+		{
+			name: "regular error",
+			err:  errors.New("internal server error"),
+			want: &backend.CheckHealthResult{
+				Status:      backend.HealthStatusError,
+				Message:     "internal server error",
+				JSONDetails: []byte(`{"errorDetailsLink":"https://grafana.com/docs/grafana/latest/datasources/postgres","verboseMessage":"internal server error"}`),
+			},
+		},
+		{
+			name: "invalid port specifier error",
+			err:  fmt.Errorf("%w %q: %w", sqleng.ErrParsingPostgresURL, `"foo.bar.co"`, errors.New(`strconv.Atoi: parsing "foo.bar.co": invalid syntax`)),
+			want: &backend.CheckHealthResult{
+				Status:      backend.HealthStatusError,
+				Message:     "Connection string error: error parsing postgres url",
+				JSONDetails: []byte(`{"errorDetailsLink":"https://grafana.com/docs/grafana/latest/datasources/postgres","verboseMessage":"error parsing postgres url \"\\\"foo.bar.co\\\"\": strconv.Atoi: parsing \"foo.bar.co\": invalid syntax"}`),
			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got, err := ErrToHealthCheckResult(tt.err)
+			require.Nil(t, err)
+			assert.Equal(t, string(tt.want.JSONDetails), string(got.JSONDetails))
+			require.Equal(t, tt.want, got)
+		})
+	}
+}
@@ -1,25 +1,109 @@
-package sqleng
+package pgx

 import (
 	"context"
 	"encoding/json"
 	"errors"
 	"fmt"
 	"net"
 	"runtime/debug"
 	"strconv"
 	"strings"
 	"sync"
 	"time"

 	"github.com/grafana/grafana-plugin-sdk-go/backend"
 	"github.com/grafana/grafana-plugin-sdk-go/backend/gtime"
 	"github.com/grafana/grafana-plugin-sdk-go/backend/log"
 	"github.com/grafana/grafana-plugin-sdk-go/data"
 	"github.com/grafana/grafana-plugin-sdk-go/data/sqlutil"
+	"github.com/grafana/grafana/pkg/tsdb/grafana-postgresql-datasource/sqleng"
 	"github.com/jackc/pgx/v5/pgconn"
 	"github.com/jackc/pgx/v5/pgtype"
 	"github.com/jackc/pgx/v5/pgxpool"
 )

-func NewQueryDataHandlerPGX(userFacingDefaultError string, p *pgxpool.Pool, config DataPluginConfiguration, queryResultTransformer SqlQueryResultTransformer,
+// MetaKeyExecutedQueryString is the key where the executed query should get stored
+const MetaKeyExecutedQueryString = "executedQueryString"
+
+// SQLMacroEngine interpolates macros into sql. It takes in the Query to have access to query context and
+// timeRange to be able to generate queries that use from and to.
+type SQLMacroEngine interface {
+	Interpolate(query *backend.DataQuery, timeRange backend.TimeRange, sql string) (string, error)
+}
+
+// SqlQueryResultTransformer transforms a query result row to RowValues with proper types.
+type SqlQueryResultTransformer interface {
+	// TransformQueryError transforms a query error.
+	TransformQueryError(logger log.Logger, err error) error
+	GetConverterList() []sqlutil.StringConverter
+}
+
+type JsonData struct {
+	MaxOpenConns            int    `json:"maxOpenConns"`
+	MaxIdleConns            int    `json:"maxIdleConns"`
+	ConnMaxLifetime         int    `json:"connMaxLifetime"`
+	ConnectionTimeout       int    `json:"connectionTimeout"`
+	Timescaledb             bool   `json:"timescaledb"`
+	Mode                    string `json:"sslmode"`
+	ConfigurationMethod     string `json:"tlsConfigurationMethod"`
+	TlsSkipVerify           bool   `json:"tlsSkipVerify"`
+	RootCertFile            string `json:"sslRootCertFile"`
+	CertFile                string `json:"sslCertFile"`
+	CertKeyFile             string `json:"sslKeyFile"`
+	Timezone                string `json:"timezone"`
+	Encrypt                 string `json:"encrypt"`
+	Servername              string `json:"servername"`
+	TimeInterval            string `json:"timeInterval"`
+	Database                string `json:"database"`
+	SecureDSProxy           bool   `json:"enableSecureSocksProxy"`
+	SecureDSProxyUsername   string `json:"secureSocksProxyUsername"`
+	AllowCleartextPasswords bool   `json:"allowCleartextPasswords"`
+	AuthenticationType      string `json:"authenticationType"`
+}
+
+type DataPluginConfiguration struct {
+	DSInfo            sqleng.DataSourceInfo
+	TimeColumnNames   []string
+	MetricColumnTypes []string
+	RowLimit          int64
+}
+
+type DataSourceHandler struct {
+	macroEngine            SQLMacroEngine
+	queryResultTransformer SqlQueryResultTransformer
+	timeColumnNames        []string
+	metricColumnTypes      []string
+	log                    log.Logger
+	dsInfo                 sqleng.DataSourceInfo
+	rowLimit               int64
+	userError              string
+	pool                   *pgxpool.Pool
+}
+
+type QueryJson struct {
+	RawSql       string  `json:"rawSql"`
+	Fill         bool    `json:"fill"`
+	FillInterval float64 `json:"fillInterval"`
+	FillMode     string  `json:"fillMode"`
+	FillValue    float64 `json:"fillValue"`
+	Format       string  `json:"format"`
+}
+
+func (e *DataSourceHandler) TransformQueryError(logger log.Logger, err error) error {
+	// OpError is the error type usually returned by functions in the net
+	// package. It describes the operation, network type, and address of
+	// an error. We log this error rather than return it to the client
+	// for security purposes.
+	var opErr *net.OpError
+	if errors.As(err, &opErr) {
+		return fmt.Errorf("failed to connect to server - %s", e.userError)
+	}
+
+	return e.queryResultTransformer.TransformQueryError(logger, err)
+}
+
+func NewQueryDataHandler(userFacingDefaultError string, p *pgxpool.Pool, config DataPluginConfiguration, queryResultTransformer SqlQueryResultTransformer,
 	macroEngine SQLMacroEngine, log log.Logger) (*DataSourceHandler, error) {
 	queryDataHandler := DataSourceHandler{
 		queryResultTransformer: queryResultTransformer,
@@ -43,7 +127,12 @@ func NewQueryDataHandlerPGX(userFacingDefaultError string, p *pgxpool.Pool, conf
 	return &queryDataHandler, nil
 }

-func (e *DataSourceHandler) DisposePGX() {
+type DBDataResponse struct {
+	dataResponse backend.DataResponse
+	refID        string
+}
+
+func (e *DataSourceHandler) Dispose() {
 	e.log.Debug("Disposing DB...")

 	if e.pool != nil {
@@ -53,11 +142,11 @@ func (e *DataSourceHandler) DisposePGX() {
 	e.log.Debug("DB disposed")
 }

-func (e *DataSourceHandler) PingPGX(ctx context.Context) error {
+func (e *DataSourceHandler) Ping(ctx context.Context) error {
 	return e.pool.Ping(ctx)
 }

-func (e *DataSourceHandler) QueryDataPGX(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
+func (e *DataSourceHandler) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
 	result := backend.NewQueryDataResponse()
 	ch := make(chan DBDataResponse, len(req.Queries))
 	var wg sync.WaitGroup
@@ -83,7 +172,7 @@ func (e *DataSourceHandler) QueryDataPGX(ctx context.Context, req *backend.Query
 		}

 		wg.Add(1)
-		go e.executeQueryPGX(ctx, query, &wg, ch, queryjson)
+		go e.executeQuery(ctx, query, &wg, ch, queryjson)
 	}

 	wg.Wait()
@@ -101,7 +190,7 @@ func (e *DataSourceHandler) QueryDataPGX(ctx context.Context, req *backend.Query
 func (e *DataSourceHandler) handleQueryError(frameErr string, err error, query string, source backend.ErrorSource, ch chan DBDataResponse, queryResult DBDataResponse) {
 	var emptyFrame data.Frame
 	emptyFrame.SetMeta(&data.FrameMeta{ExecutedQueryString: query})
-	if backend.IsDownstreamError(err) {
+	if isDownstreamError(err) {
 		source = backend.ErrorSourceDownstream
 	}
 	queryResult.dataResponse.Error = fmt.Errorf("%s: %w", frameErr, err)
@@ -127,6 +216,18 @@ func (e *DataSourceHandler) handlePanic(logger log.Logger, queryResult *DBDataRe
 	}
 }

+// Interpolate provides global macros/substitutions for all sql datasources.
+var Interpolate = func(query backend.DataQuery, timeRange backend.TimeRange, timeInterval string, sql string) string {
+	interval := query.Interval
+
+	sql = strings.ReplaceAll(sql, "$__interval_ms", strconv.FormatInt(interval.Milliseconds(), 10))
+	sql = strings.ReplaceAll(sql, "$__interval", gtime.FormatInterval(interval))
+	sql = strings.ReplaceAll(sql, "$__unixEpochFrom()", fmt.Sprintf("%d", timeRange.From.UTC().Unix()))
+	sql = strings.ReplaceAll(sql, "$__unixEpochTo()", fmt.Sprintf("%d", timeRange.To.UTC().Unix()))
+
+	return sql
+}
+
 func (e *DataSourceHandler) execQuery(ctx context.Context, query string) ([]*pgconn.Result, error) {
 	c, err := e.pool.Acquire(ctx)
 	if err != nil {
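As a quick illustration of the Interpolate variable added above (the dates and SQL text here are invented for the example, not taken from this commit): note that "$__interval_ms" is replaced before "$__interval", so the shorter macro never clobbers the longer one.

	// Hypothetical input: a 5-minute interval over a one-hour range starting 2024-01-01 UTC.
	from := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	query := backend.DataQuery{Interval: 5 * time.Minute}
	timeRange := backend.TimeRange{From: from, To: from.Add(time.Hour)}
	sql := Interpolate(query, timeRange, "", "GROUP BY $__interval -- $__interval_ms ms, from $__unixEpochFrom()")
	// => "GROUP BY 5m -- 300000 ms, from 1704067200"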
@@ -140,7 +241,7 @@ func (e *DataSourceHandler) execQuery(ctx context.Context, query string) ([]*pgc
 	return mrr.ReadAll()
 }

-func (e *DataSourceHandler) executeQueryPGX(queryContext context.Context, query backend.DataQuery, wg *sync.WaitGroup,
+func (e *DataSourceHandler) executeQuery(queryContext context.Context, query backend.DataQuery, wg *sync.WaitGroup,
 	ch chan DBDataResponse, queryJSON QueryJson) {
 	defer wg.Done()
 	queryResult := DBDataResponse{
@@ -171,7 +272,7 @@ func (e *DataSourceHandler) executeQueryPGX(queryContext context.Context, query
 		return
 	}

-	qm, err := e.newProcessCfgPGX(queryContext, query, results, interpolatedQuery)
+	qm, err := e.newProcessCfg(queryContext, query, results, interpolatedQuery)
 	if err != nil {
 		e.handleQueryError("failed to get configurations", err, interpolatedQuery, backend.ErrorSourceDownstream, ch, queryResult)
 		return
@@ -186,6 +287,47 @@ func (e *DataSourceHandler) executeQueryPGX(queryContext context.Context, query
 	e.processFrame(frame, qm, queryResult, ch, logger)
 }

+// dataQueryFormat is the type of query.
+type dataQueryFormat string
+
+const (
+	// dataQueryFormatTable identifies a table query (default).
+	dataQueryFormatTable dataQueryFormat = "table"
+	// dataQueryFormatSeries identifies a time series query.
+	dataQueryFormatSeries dataQueryFormat = "time_series"
+)
+
+type dataQueryModel struct {
+	InterpolatedQuery string // property not set until after Interpolate()
+	Format            dataQueryFormat
+	TimeRange         backend.TimeRange
+	FillMissing       *data.FillMissing // property not set until after Interpolate()
+	Interval          time.Duration
+	columnNames       []string
+	columnTypes       []string
+	timeIndex         int
+	timeEndIndex      int
+	metricIndex       int
+	metricPrefix      bool
+	queryContext      context.Context
+}
+
+func convertSQLTimeColumnsToEpochMS(frame *data.Frame, qm *dataQueryModel) error {
+	if qm.timeIndex != -1 {
+		if err := convertSQLTimeColumnToEpochMS(frame, qm.timeIndex); err != nil {
+			return fmt.Errorf("%v: %w", "failed to convert time column", err)
+		}
+	}
+
+	if qm.timeEndIndex != -1 {
+		if err := convertSQLTimeColumnToEpochMS(frame, qm.timeEndIndex); err != nil {
+			return fmt.Errorf("%v: %w", "failed to convert timeend column", err)
+		}
+	}
+
+	return nil
+}
+
 func (e *DataSourceHandler) processFrame(frame *data.Frame, qm *dataQueryModel, queryResult DBDataResponse, ch chan DBDataResponse, logger log.Logger) {
 	if frame.Meta == nil {
 		frame.Meta = &data.FrameMeta{}
@@ -281,10 +423,10 @@ func (e *DataSourceHandler) processFrame(frame *data.Frame, qm *dataQueryModel,
 	ch <- queryResult
 }

-func (e *DataSourceHandler) newProcessCfgPGX(queryContext context.Context, query backend.DataQuery,
+func (e *DataSourceHandler) newProcessCfg(queryContext context.Context, query backend.DataQuery,
 	results []*pgconn.Result, interpolatedQuery string) (*dataQueryModel, error) {
 	columnNames := []string{}
-	columnTypesPGX := []string{}
+	columnTypes := []string{}

 	// The results will contain column information in the metadata
 	for _, result := range results {
@@ -296,26 +438,26 @@ func (e *DataSourceHandler) newProcessCfgPGX(queryContext context.Context, query
 				// Handle special cases for field types
 				switch field.DataTypeOID {
 				case pgtype.TimetzOID:
-					columnTypesPGX = append(columnTypesPGX, "timetz")
+					columnTypes = append(columnTypes, "timetz")
 				case 790:
-					columnTypesPGX = append(columnTypesPGX, "money")
+					columnTypes = append(columnTypes, "money")
 				default:
-					columnTypesPGX = append(columnTypesPGX, "unknown")
+					columnTypes = append(columnTypes, "unknown")
 				}
 			} else {
-				columnTypesPGX = append(columnTypesPGX, pqtype.Name)
+				columnTypes = append(columnTypes, pqtype.Name)
 			}
 		}
 	}

 	qm := &dataQueryModel{
-		columnTypesPGX: columnTypesPGX,
-		columnNames:    columnNames,
-		timeIndex:      -1,
-		timeEndIndex:   -1,
-		metricIndex:    -1,
-		metricPrefix:   false,
-		queryContext:   queryContext,
+		columnTypes:  columnTypes,
+		columnNames:  columnNames,
+		timeIndex:    -1,
+		timeEndIndex: -1,
+		metricIndex:  -1,
+		metricPrefix: false,
+		queryContext: queryContext,
 	}

 	queryJSON := QueryJson{}
@@ -370,7 +512,7 @@ func (e *DataSourceHandler) newProcessCfgPGX(queryContext context.Context, query
 			qm.metricIndex = i
 		default:
 			if qm.metricIndex == -1 {
-				columnType := qm.columnTypesPGX[i]
+				columnType := qm.columnTypes[i]
 				for _, mct := range e.metricColumnTypes {
 					if columnType == mct {
 						qm.metricIndex = i
@@ -596,3 +738,99 @@ func getFieldTypesFromDescriptions(fieldDescriptions []pgconn.FieldDescription,
 	}
 	return fieldTypes, nil
 }
+
+// convertSQLTimeColumnToEpochMS converts column named time to unix timestamp in milliseconds
+// to make native datetime types and epoch dates work in annotation and table queries.
+func convertSQLTimeColumnToEpochMS(frame *data.Frame, timeIndex int) error {
+	if timeIndex < 0 || timeIndex >= len(frame.Fields) {
+		return fmt.Errorf("timeIndex %d is out of range", timeIndex)
+	}
+
+	origin := frame.Fields[timeIndex]
+	valueType := origin.Type()
+	if valueType == data.FieldTypeTime || valueType == data.FieldTypeNullableTime {
+		return nil
+	}
+
+	newField := data.NewFieldFromFieldType(data.FieldTypeNullableTime, 0)
+	newField.Name = origin.Name
+	newField.Labels = origin.Labels
+
+	valueLength := origin.Len()
+	for i := 0; i < valueLength; i++ {
+		v, err := origin.NullableFloatAt(i)
+		if err != nil {
+			return fmt.Errorf("unable to convert data to a time field")
+		}
+		if v == nil {
+			newField.Append(nil)
+		} else {
+			timestamp := time.Unix(0, int64(epochPrecisionToMS(*v))*int64(time.Millisecond))
+			newField.Append(&timestamp)
+		}
+	}
+	frame.Fields[timeIndex] = newField
+
+	return nil
+}
+
+// convertSQLValueColumnToFloat converts timeseries value column to float.
+func convertSQLValueColumnToFloat(frame *data.Frame, Index int) (*data.Frame, error) {
+	if Index < 0 || Index >= len(frame.Fields) {
+		return frame, fmt.Errorf("metricIndex %d is out of range", Index)
+	}
+
+	origin := frame.Fields[Index]
+	valueType := origin.Type()
+	if valueType == data.FieldTypeFloat64 || valueType == data.FieldTypeNullableFloat64 {
+		return frame, nil
+	}
+
+	newField := data.NewFieldFromFieldType(data.FieldTypeNullableFloat64, origin.Len())
+	newField.Name = origin.Name
+	newField.Labels = origin.Labels
+
+	for i := 0; i < origin.Len(); i++ {
+		v, err := origin.NullableFloatAt(i)
+		if err != nil {
+			return frame, err
+		}
+		newField.Set(i, v)
+	}
+
+	frame.Fields[Index] = newField
+
+	return frame, nil
+}
+
+// epochPrecisionToMS converts epoch precision to millisecond, if needed.
+// Only seconds to milliseconds supported right now
+func epochPrecisionToMS(value float64) float64 {
+	s := strconv.FormatFloat(value, 'e', -1, 64)
+	if strings.HasSuffix(s, "e+09") {
+		return value * float64(1e3)
+	}
+
+	if strings.HasSuffix(s, "e+18") {
+		return value / float64(time.Millisecond)
+	}
+
+	return value
+}
+
+func isDownstreamError(err error) bool {
+	if backend.IsDownstreamError(err) {
+		return true
+	}
+	resultProcessingDownstreamErrors := []error{
+		data.ErrorInputFieldsWithoutRows,
+		data.ErrorSeriesUnsorted,
+		data.ErrorNullTimeValues,
+	}
+	for _, e := range resultProcessingDownstreamErrors {
+		if errors.Is(err, e) {
+			return true
+		}
+	}
+	return false
+}
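A worked example of the epochPrecisionToMS heuristic above (the values are invented for illustration): strconv.FormatFloat exposes the magnitude's exponent, and only an exponent of exactly 9 (epoch seconds for dates from 2001 onward) or 18 (epoch nanoseconds in the same era) triggers a conversion; everything else is assumed to already be milliseconds.

	epochPrecisionToMS(1.5e9)  // formats as "1.5e+09" -> treated as seconds, returns 1.5e12 ms
	epochPrecisionToMS(1.5e18) // formats as "1.5e+18" -> treated as nanoseconds, returns 1.5e12 ms
	epochPrecisionToMS(1.5e12) // formats as "1.5e+12" -> assumed milliseconds, returned unchanged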
@@ -0,0 +1,681 @@
+package pgx
+
+import (
+	"fmt"
+	"net"
+	"testing"
+	"time"
+
+	"github.com/grafana/grafana-plugin-sdk-go/backend"
+	"github.com/grafana/grafana-plugin-sdk-go/data"
+	"github.com/grafana/grafana-plugin-sdk-go/data/sqlutil"
+	"github.com/jackc/pgx/v5/pgconn"
+	"github.com/jackc/pgx/v5/pgtype"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"github.com/grafana/grafana-plugin-sdk-go/backend/log"
+)
+
+func Pointer[T any](v T) *T { return &v }
+
+func TestSQLEngine(t *testing.T) {
+	dt := time.Date(2018, 3, 14, 21, 20, 6, int(527345*time.Microsecond), time.UTC)
+
+	t.Run("Handle interpolating $__interval and $__interval_ms", func(t *testing.T) {
+		from := time.Date(2018, 4, 12, 18, 0, 0, 0, time.UTC)
+		to := from.Add(5 * time.Minute)
+		timeRange := backend.TimeRange{From: from, To: to}
+
+		text := "$__interval $__timeGroupAlias(time,$__interval) $__interval_ms"
+
+		t.Run("interpolate 10 minutes $__interval", func(t *testing.T) {
+			query := backend.DataQuery{JSON: []byte("{}"), MaxDataPoints: 1500, Interval: time.Minute * 10}
+			sql := Interpolate(query, timeRange, "", text)
+			require.Equal(t, "10m $__timeGroupAlias(time,10m) 600000", sql)
+		})
+
+		t.Run("interpolate 4seconds $__interval", func(t *testing.T) {
+			query := backend.DataQuery{JSON: []byte("{}"), MaxDataPoints: 1500, Interval: time.Second * 4}
+			sql := Interpolate(query, timeRange, "", text)
+			require.Equal(t, "4s $__timeGroupAlias(time,4s) 4000", sql)
+		})
+
+		t.Run("interpolate 200 milliseconds $__interval", func(t *testing.T) {
+			query := backend.DataQuery{JSON: []byte("{}"), MaxDataPoints: 1500, Interval: time.Millisecond * 200}
+			sql := Interpolate(query, timeRange, "", text)
+			require.Equal(t, "200ms $__timeGroupAlias(time,200ms) 200", sql)
+		})
+	})
+
+	t.Run("Given a time range between 2018-04-12 00:00 and 2018-04-12 00:05", func(t *testing.T) {
+		from := time.Date(2018, 4, 12, 18, 0, 0, 0, time.UTC)
+		to := from.Add(5 * time.Minute)
+		timeRange := backend.TimeRange{From: from, To: to}
+		query := backend.DataQuery{JSON: []byte("{}"), MaxDataPoints: 1500, Interval: time.Second * 60}
+
+		t.Run("interpolate __unixEpochFrom function", func(t *testing.T) {
+			sql := Interpolate(query, timeRange, "", "select $__unixEpochFrom()")
+			require.Equal(t, fmt.Sprintf("select %d", from.Unix()), sql)
+		})
+
+		t.Run("interpolate __unixEpochTo function", func(t *testing.T) {
+			sql := Interpolate(query, timeRange, "", "select $__unixEpochTo()")
+			require.Equal(t, fmt.Sprintf("select %d", to.Unix()), sql)
+		})
+	})
+
+	t.Run("Given row values with int64 as time columns", func(t *testing.T) {
+		tSeconds := dt.Unix()
+		tMilliseconds := dt.UnixNano() / 1e6
+		tNanoSeconds := dt.UnixNano()
+		var nilPointer *int64
+
+		originFrame := data.NewFrame("",
+			data.NewField("time1", nil, []int64{
+				tSeconds,
+			}),
+			data.NewField("time2", nil, []*int64{
+				Pointer(tSeconds),
+			}),
+			data.NewField("time3", nil, []int64{
+				tMilliseconds,
+			}),
+			data.NewField("time4", nil, []*int64{
+				Pointer(tMilliseconds),
+			}),
+			data.NewField("time5", nil, []int64{
+				tNanoSeconds,
+			}),
+			data.NewField("time6", nil, []*int64{
+				Pointer(tNanoSeconds),
+			}),
+			data.NewField("time7", nil, []*int64{
+				nilPointer,
+			}),
+		)
+
+		for i := 0; i < len(originFrame.Fields); i++ {
+			err := convertSQLTimeColumnToEpochMS(originFrame, i)
+			require.NoError(t, err)
+		}
+
+		require.Equal(t, dt.Unix(), (*originFrame.Fields[0].At(0).(*time.Time)).Unix())
+		require.Equal(t, dt.Unix(), (*originFrame.Fields[1].At(0).(*time.Time)).Unix())
+		require.Equal(t, dt.Unix(), (*originFrame.Fields[2].At(0).(*time.Time)).Unix())
+		require.Equal(t, dt.Unix(), (*originFrame.Fields[3].At(0).(*time.Time)).Unix())
+		require.Equal(t, dt.Unix(), (*originFrame.Fields[4].At(0).(*time.Time)).Unix())
+		require.Equal(t, dt.Unix(), (*originFrame.Fields[5].At(0).(*time.Time)).Unix())
+		require.Nil(t, originFrame.Fields[6].At(0))
+	})
+
+	t.Run("Given row values with uint64 as time columns", func(t *testing.T) {
+		tSeconds := uint64(dt.Unix())
+		tMilliseconds := uint64(dt.UnixNano() / 1e6)
+		tNanoSeconds := uint64(dt.UnixNano())
+		var nilPointer *uint64
+
+		originFrame := data.NewFrame("",
+			data.NewField("time1", nil, []uint64{
+				tSeconds,
+			}),
+			data.NewField("time2", nil, []*uint64{
+				Pointer(tSeconds),
+			}),
+			data.NewField("time3", nil, []uint64{
+				tMilliseconds,
+			}),
+			data.NewField("time4", nil, []*uint64{
+				Pointer(tMilliseconds),
+			}),
+			data.NewField("time5", nil, []uint64{
+				tNanoSeconds,
+			}),
+			data.NewField("time6", nil, []*uint64{
+				Pointer(tNanoSeconds),
+			}),
+			data.NewField("time7", nil, []*uint64{
+				nilPointer,
+			}),
+		)
+
+		for i := 0; i < len(originFrame.Fields); i++ {
+			err := convertSQLTimeColumnToEpochMS(originFrame, i)
+			require.NoError(t, err)
+		}
+
+		require.Equal(t, dt.Unix(), (*originFrame.Fields[0].At(0).(*time.Time)).Unix())
+		require.Equal(t, dt.Unix(), (*originFrame.Fields[1].At(0).(*time.Time)).Unix())
+		require.Equal(t, dt.Unix(), (*originFrame.Fields[2].At(0).(*time.Time)).Unix())
+		require.Equal(t, dt.Unix(), (*originFrame.Fields[3].At(0).(*time.Time)).Unix())
+		require.Equal(t, dt.Unix(), (*originFrame.Fields[4].At(0).(*time.Time)).Unix())
+		require.Equal(t, dt.Unix(), (*originFrame.Fields[5].At(0).(*time.Time)).Unix())
+		require.Nil(t, originFrame.Fields[6].At(0))
+	})
+
+	t.Run("Given row values with int32 as time columns", func(t *testing.T) {
+		tSeconds := int32(dt.Unix())
+		var nilInt *int32
+
+		originFrame := data.NewFrame("",
+			data.NewField("time1", nil, []int32{
+				tSeconds,
+			}),
+			data.NewField("time2", nil, []*int32{
+				Pointer(tSeconds),
+			}),
+			data.NewField("time7", nil, []*int32{
+				nilInt,
+			}),
+		)
+		for i := 0; i < 3; i++ {
+			err := convertSQLTimeColumnToEpochMS(originFrame, i)
+			require.NoError(t, err)
+		}
+
+		require.Equal(t, dt.Unix(), (*originFrame.Fields[0].At(0).(*time.Time)).Unix())
+		require.Equal(t, dt.Unix(), (*originFrame.Fields[1].At(0).(*time.Time)).Unix())
+		require.Nil(t, originFrame.Fields[2].At(0))
+	})
+
+	t.Run("Given row values with uint32 as time columns", func(t *testing.T) {
+		tSeconds := uint32(dt.Unix())
+		var nilInt *uint32
+
+		originFrame := data.NewFrame("",
+			data.NewField("time1", nil, []uint32{
+				tSeconds,
+			}),
+			data.NewField("time2", nil, []*uint32{
+				Pointer(tSeconds),
+			}),
+			data.NewField("time7", nil, []*uint32{
+				nilInt,
+			}),
+		)
+		for i := 0; i < len(originFrame.Fields); i++ {
+			err := convertSQLTimeColumnToEpochMS(originFrame, i)
+			require.NoError(t, err)
+		}
+		require.Equal(t, dt.Unix(), (*originFrame.Fields[0].At(0).(*time.Time)).Unix())
+		require.Equal(t, dt.Unix(), (*originFrame.Fields[1].At(0).(*time.Time)).Unix())
+		require.Nil(t, originFrame.Fields[2].At(0))
+	})
+
+	t.Run("Given row values with float64 as time columns", func(t *testing.T) {
+		tSeconds := float64(dt.UnixNano()) / float64(time.Second)
+		tMilliseconds := float64(dt.UnixNano()) / float64(time.Millisecond)
+		tNanoSeconds := float64(dt.UnixNano())
+		var nilPointer *float64
+
+		originFrame := data.NewFrame("",
+			data.NewField("time1", nil, []float64{
+				tSeconds,
+			}),
+			data.NewField("time2", nil, []*float64{
+				Pointer(tSeconds),
+			}),
+			data.NewField("time3", nil, []float64{
+				tMilliseconds,
+			}),
+			data.NewField("time4", nil, []*float64{
+				Pointer(tMilliseconds),
+			}),
+			data.NewField("time5", nil, []float64{
+				tNanoSeconds,
+			}),
+			data.NewField("time6", nil, []*float64{
+				Pointer(tNanoSeconds),
+			}),
+			data.NewField("time7", nil, []*float64{
+				nilPointer,
+			}),
+		)
+
+		for i := 0; i < len(originFrame.Fields); i++ {
+			err := convertSQLTimeColumnToEpochMS(originFrame, i)
+			require.NoError(t, err)
+		}
+
+		require.Equal(t, dt.Unix(), (*originFrame.Fields[0].At(0).(*time.Time)).Unix())
+		require.Equal(t, dt.Unix(), (*originFrame.Fields[1].At(0).(*time.Time)).Unix())
+		require.Equal(t, dt.Unix(), (*originFrame.Fields[2].At(0).(*time.Time)).Unix())
+		require.Equal(t, dt.Unix(), (*originFrame.Fields[3].At(0).(*time.Time)).Unix())
+		require.Equal(t, dt.Unix(), (*originFrame.Fields[4].At(0).(*time.Time)).Unix())
+		require.Equal(t, dt.Unix(), (*originFrame.Fields[5].At(0).(*time.Time)).Unix())
+		require.Nil(t, originFrame.Fields[6].At(0))
+	})
+
+	t.Run("Given row values with float32 as time columns", func(t *testing.T) {
+		tSeconds := float32(dt.Unix())
+		var nilInt *float32
+
+		originFrame := data.NewFrame("",
+			data.NewField("time1", nil, []float32{
+				tSeconds,
+			}),
+			data.NewField("time2", nil, []*float32{
+				Pointer(tSeconds),
+			}),
+			data.NewField("time7", nil, []*float32{
+				nilInt,
+			}),
+		)
+		for i := 0; i < len(originFrame.Fields); i++ {
+			err := convertSQLTimeColumnToEpochMS(originFrame, i)
+			require.NoError(t, err)
+		}
+		require.Equal(t, int64(tSeconds), (*originFrame.Fields[0].At(0).(*time.Time)).Unix())
+		require.Equal(t, int64(tSeconds), (*originFrame.Fields[1].At(0).(*time.Time)).Unix())
+		require.Nil(t, originFrame.Fields[2].At(0))
+	})
+
+	t.Run("Given row with value columns, would be converted to float64", func(t *testing.T) {
+		originFrame := data.NewFrame("",
+			data.NewField("value1", nil, []int64{
+				int64(1),
+			}),
+			data.NewField("value2", nil, []*int64{
+				Pointer(int64(1)),
+			}),
+			data.NewField("value3", nil, []int32{
+				int32(1),
+			}),
+			data.NewField("value4", nil, []*int32{
+				Pointer(int32(1)),
+			}),
+			data.NewField("value5", nil, []int16{
+				int16(1),
+			}),
+			data.NewField("value6", nil, []*int16{
+				Pointer(int16(1)),
+			}),
+			data.NewField("value7", nil, []int8{
+				int8(1),
+			}),
+			data.NewField("value8", nil, []*int8{
+				Pointer(int8(1)),
+			}),
+			data.NewField("value9", nil, []float64{
+				float64(1),
+			}),
+			data.NewField("value10", nil, []*float64{
+				Pointer(1.0),
+			}),
+			data.NewField("value11", nil, []float32{
+				float32(1),
+			}),
+			data.NewField("value12", nil, []*float32{
+				Pointer(float32(1)),
+			}),
+			data.NewField("value13", nil, []uint64{
+				uint64(1),
+			}),
+			data.NewField("value14", nil, []*uint64{
+				Pointer(uint64(1)),
+			}),
+			data.NewField("value15", nil, []uint32{
+				uint32(1),
+			}),
+			data.NewField("value16", nil, []*uint32{
+				Pointer(uint32(1)),
+			}),
+			data.NewField("value17", nil, []uint16{
+				uint16(1),
+			}),
+			data.NewField("value18", nil, []*uint16{
+				Pointer(uint16(1)),
+			}),
+			data.NewField("value19", nil, []uint8{
+				uint8(1),
+			}),
+			data.NewField("value20", nil, []*uint8{
+				Pointer(uint8(1)),
+			}),
+		)
+		for i := 0; i < len(originFrame.Fields); i++ {
+			_, err := convertSQLValueColumnToFloat(originFrame, i)
+			require.NoError(t, err)
+			if i == 8 {
+				require.Equal(t, float64(1), originFrame.Fields[i].At(0).(float64))
+			} else {
+				require.NotNil(t, originFrame.Fields[i].At(0).(*float64))
+				require.Equal(t, float64(1), *originFrame.Fields[i].At(0).(*float64))
+			}
+		}
+	})
+
+	t.Run("Given row with nil value columns", func(t *testing.T) {
+		var int64NilPointer *int64
+		var int32NilPointer *int32
+		var int16NilPointer *int16
+		var int8NilPointer *int8
+		var float64NilPointer *float64
+		var float32NilPointer *float32
+		var uint64NilPointer *uint64
+		var uint32NilPointer *uint32
+		var uint16NilPointer *uint16
+		var uint8NilPointer *uint8
+
+		originFrame := data.NewFrame("",
+			data.NewField("value1", nil, []*int64{
+				int64NilPointer,
+			}),
+			data.NewField("value2", nil, []*int32{
+				int32NilPointer,
+			}),
+			data.NewField("value3", nil, []*int16{
+				int16NilPointer,
+			}),
+			data.NewField("value4", nil, []*int8{
+				int8NilPointer,
+			}),
+			data.NewField("value5", nil, []*float64{
+				float64NilPointer,
+			}),
+			data.NewField("value6", nil, []*float32{
+				float32NilPointer,
+			}),
+			data.NewField("value7", nil, []*uint64{
+				uint64NilPointer,
+			}),
+			data.NewField("value8", nil, []*uint32{
+				uint32NilPointer,
+			}),
+			data.NewField("value9", nil, []*uint16{
+				uint16NilPointer,
+			}),
+			data.NewField("value10", nil, []*uint8{
+				uint8NilPointer,
+			}),
+		)
+		for i := 0; i < len(originFrame.Fields); i++ {
+			t.Run("", func(t *testing.T) {
+				_, err := convertSQLValueColumnToFloat(originFrame, i)
+				require.NoError(t, err)
+				require.Nil(t, originFrame.Fields[i].At(0))
+			})
+		}
+	})
+
+	t.Run("Should not return raw connection errors", func(t *testing.T) {
+		err := net.OpError{Op: "Dial", Err: fmt.Errorf("inner-error")}
+		transformer := &testQueryResultTransformer{}
+		dp := DataSourceHandler{
+			log:                    backend.NewLoggerWith("logger", "test"),
+			queryResultTransformer: transformer,
+		}
+		resultErr := dp.TransformQueryError(dp.log, &err)
+		assert.False(t, transformer.transformQueryErrorWasCalled)
+		errorText := resultErr.Error()
+		assert.NotEqual(t, err, resultErr)
+		assert.NotContains(t, errorText, "inner-error")
+		assert.Contains(t, errorText, "failed to connect to server")
+	})
+
+	t.Run("Should return non-connection errors unmodified", func(t *testing.T) {
+		err := fmt.Errorf("normal error")
+		transformer := &testQueryResultTransformer{}
+		dp := DataSourceHandler{
+			log:                    backend.NewLoggerWith("logger", "test"),
+			queryResultTransformer: transformer,
+		}
+		resultErr := dp.TransformQueryError(dp.log, err)
+		assert.True(t, transformer.transformQueryErrorWasCalled)
+		assert.Equal(t, err, resultErr)
+		assert.ErrorIs(t, err, resultErr)
+	})
+}
+
+func TestConvertResultsToFrame(t *testing.T) {
+	// Import the pgx packages needed for testing
+	// These imports are included in the main file but need to be accessible for tests
+	t.Run("convertResultsToFrame with single result", func(t *testing.T) {
+		// Create mock field descriptions
+		fieldDescs := []pgconn.FieldDescription{
+			{Name: "id", DataTypeOID: pgtype.Int4OID},
+			{Name: "name", DataTypeOID: pgtype.TextOID},
+			{Name: "value", DataTypeOID: pgtype.Float8OID},
+		}
+
+		// Create mock result data
+		mockRows := [][][]byte{
+			{[]byte("1"), []byte("test1"), []byte("10.5")},
+			{[]byte("2"), []byte("test2"), []byte("20.7")},
+		}
+
+		// Create mock result
+		result := &pgconn.Result{
+			FieldDescriptions: fieldDescs,
+			Rows:              mockRows,
+		}
+		result.CommandTag = pgconn.NewCommandTag("SELECT 2")
+
+		results := []*pgconn.Result{result}
+
+		frame, err := convertResultsToFrame(results, 1000)
+		require.NoError(t, err)
+		require.NotNil(t, frame)
+		require.Equal(t, 3, len(frame.Fields))
+		require.Equal(t, 2, frame.Rows())
+
+		// Verify field names
+		require.Equal(t, "id", frame.Fields[0].Name)
+		require.Equal(t, "name", frame.Fields[1].Name)
+		require.Equal(t, "value", frame.Fields[2].Name)
+	})
+
+	t.Run("convertResultsToFrame with multiple compatible results", func(t *testing.T) {
+		// Create mock field descriptions (same structure for both results)
+		fieldDescs := []pgconn.FieldDescription{
+			{Name: "id", DataTypeOID: pgtype.Int4OID},
+			{Name: "name", DataTypeOID: pgtype.TextOID},
+		}
+
+		// Create first result
+		mockRows1 := [][][]byte{
+			{[]byte("1"), []byte("test1")},
+			{[]byte("2"), []byte("test2")},
+		}
+		result1 := &pgconn.Result{
+			FieldDescriptions: fieldDescs,
+			Rows:              mockRows1,
+		}
+		result1.CommandTag = pgconn.NewCommandTag("SELECT 2")
+
+		// Create second result with same structure
+		mockRows2 := [][][]byte{
+			{[]byte("3"), []byte("test3")},
+			{[]byte("4"), []byte("test4")},
+		}
+		result2 := &pgconn.Result{
+			FieldDescriptions: fieldDescs,
+			Rows:              mockRows2,
+		}
+		result2.CommandTag = pgconn.NewCommandTag("SELECT 2")
+
+		results := []*pgconn.Result{result1, result2}
+
+		frame, err := convertResultsToFrame(results, 1000)
+		require.NoError(t, err)
+		require.NotNil(t, frame)
+		require.Equal(t, 2, len(frame.Fields))
+		require.Equal(t, 4, frame.Rows()) // Should have rows from both results
+
+		// Verify field names
+		require.Equal(t, "id", frame.Fields[0].Name)
+		require.Equal(t, "name", frame.Fields[1].Name)
+	})
+
+	t.Run("convertResultsToFrame with row limit", func(t *testing.T) {
+		// Create mock field descriptions
+		fieldDescs := []pgconn.FieldDescription{
+			{Name: "id", DataTypeOID: pgtype.Int4OID},
+		}
+
+		// Create mock result data with 3 rows
+		mockRows := [][][]byte{
+			{[]byte("1")},
+			{[]byte("2")},
+			{[]byte("3")},
+		}
+
+		result := &pgconn.Result{
+			FieldDescriptions: fieldDescs,
+			Rows:              mockRows,
+		}
+		result.CommandTag = pgconn.NewCommandTag("SELECT 3")
+
+		results := []*pgconn.Result{result}
+
+		// Set row limit to 2
+		frame, err := convertResultsToFrame(results, 2)
+		require.NoError(t, err)
+		require.NotNil(t, frame)
+		require.Equal(t, 1, len(frame.Fields))
+		require.Equal(t, 2, frame.Rows()) // Should be limited to 2 rows
+
+		// Should have a notice about the limit
+		require.NotNil(t, frame.Meta)
+		require.Len(t, frame.Meta.Notices, 1)
+		require.Contains(t, frame.Meta.Notices[0].Text, "Results have been limited to 2")
+	})
+
+	t.Run("convertResultsToFrame with mixed SELECT and non-SELECT results", func(t *testing.T) {
+		// Create a non-SELECT result (should be skipped)
+		nonSelectResult := &pgconn.Result{}
+		nonSelectResult.CommandTag = pgconn.NewCommandTag("UPDATE 1")
+
+		// Create a SELECT result
+		fieldDescs := []pgconn.FieldDescription{
+			{Name: "id", DataTypeOID: pgtype.Int4OID},
+		}
+		mockRows := [][][]byte{
+			{[]byte("1")},
+		}
+		selectResult := &pgconn.Result{
+			FieldDescriptions: fieldDescs,
+			Rows:              mockRows,
+		}
+		selectResult.CommandTag = pgconn.NewCommandTag("SELECT 1")
+
+		results := []*pgconn.Result{nonSelectResult, selectResult}
+
+		frame, err := convertResultsToFrame(results, 1000)
+		require.NoError(t, err)
+		require.NotNil(t, frame)
+		require.Equal(t, 1, len(frame.Fields))
+		require.Equal(t, 1, frame.Rows())
+	})
+
+	t.Run("convertResultsToFrame with no SELECT results", func(t *testing.T) {
+		// Create only non-SELECT results
+		result1 := &pgconn.Result{}
+		result1.CommandTag = pgconn.NewCommandTag("UPDATE 1")
+
+		result2 := &pgconn.Result{}
+		result2.CommandTag = pgconn.NewCommandTag("INSERT 1")
+
+		results := []*pgconn.Result{result1, result2}
+
+		frame, err := convertResultsToFrame(results, 1000)
+		require.NoError(t, err)
+		require.NotNil(t, frame)
+		require.Equal(t, 0, len(frame.Fields))
+		require.Equal(t, 0, frame.Rows())
+	})
+
+	t.Run("convertResultsToFrame with multiple results and row limit per result", func(t *testing.T) {
+		// Create mock field descriptions (same structure for both results)
+		fieldDescs := []pgconn.FieldDescription{
+			{Name: "id", DataTypeOID: pgtype.Int4OID},
+		}
+
+		// Create first result with 3 rows
+		mockRows1 := [][][]byte{
+			{[]byte("1")},
+			{[]byte("2")},
+			{[]byte("3")},
+		}
+		result1 := &pgconn.Result{
+			FieldDescriptions: fieldDescs,
+			Rows:              mockRows1,
+		}
+		result1.CommandTag = pgconn.NewCommandTag("SELECT 3")
+
+		// Create second result with 3 rows
+		mockRows2 := [][][]byte{
+			{[]byte("4")},
+			{[]byte("5")},
+			{[]byte("6")},
+		}
+		result2 := &pgconn.Result{
+			FieldDescriptions: fieldDescs,
+			Rows:              mockRows2,
+		}
+		result2.CommandTag = pgconn.NewCommandTag("SELECT 3")
+
+		results := []*pgconn.Result{result1, result2}
+
+		// Set row limit to 2 (should limit each result to 2 rows)
+		frame, err := convertResultsToFrame(results, 2)
+		require.NoError(t, err)
+		require.NotNil(t, frame)
+		require.Equal(t, 1, len(frame.Fields))
+		require.Equal(t, 4, frame.Rows()) // 2 rows from each result
+
+		// Should have notices about the limit from both results
+		require.NotNil(t, frame.Meta)
+		require.Len(t, frame.Meta.Notices, 2)
+		require.Contains(t, frame.Meta.Notices[0].Text, "Results have been limited to 2")
+		require.Contains(t, frame.Meta.Notices[1].Text, "Results have been limited to 2")
+	})
+
+	t.Run("convertResultsToFrame handles null values correctly", func(t *testing.T) {
+		// Create mock field descriptions
+		fieldDescs := []pgconn.FieldDescription{
+			{Name: "id", DataTypeOID: pgtype.Int4OID},
+			{Name: "name", DataTypeOID: pgtype.TextOID},
+		}
+
+		// Create mock result data with null values
+		mockRows := [][][]byte{
+			{[]byte("1"), nil},     // null name
+			{nil, []byte("test2")}, // null id
+		}
+
+		result := &pgconn.Result{
+			FieldDescriptions: fieldDescs,
+			Rows:              mockRows,
+		}
+		result.CommandTag = pgconn.NewCommandTag("SELECT 2")
+
+		results := []*pgconn.Result{result}
+
+		frame, err := convertResultsToFrame(results, 1000)
+		require.NoError(t, err)
+		require.NotNil(t, frame)
+		require.Equal(t, 2, len(frame.Fields))
+		require.Equal(t, 2, frame.Rows())
+
+		// Check that null values are handled correctly
+		// The exact representation depends on the field type, but should not panic
+		require.NotPanics(t, func() {
+			frame.Fields[0].At(1) // null id
+			frame.Fields[1].At(0) // null name
+		})
+	})
+}
+
+type testQueryResultTransformer struct {
+	transformQueryErrorWasCalled bool
+}
+
+func (t *testQueryResultTransformer) TransformQueryError(_ log.Logger, err error) error {
+	t.transformQueryErrorWasCalled = true
+	return err
+}
+
+func (t *testQueryResultTransformer) GetConverterList() []sqlutil.StringConverter {
+	return nil
+}
@ -16,56 +16,14 @@ import (
|
|||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data/sqlutil"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
"github.com/lib/pq"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
|
||||
sqlengpgx "github.com/grafana/grafana/pkg/tsdb/grafana-postgresql-datasource/pgx"
|
||||
"github.com/grafana/grafana/pkg/tsdb/grafana-postgresql-datasource/sqleng"
|
||||
)
|
||||
|
||||
func ProvideService(cfg *setting.Cfg, features featuremgmt.FeatureToggles) *Service {
|
||||
logger := backend.NewLoggerWith("logger", "tsdb.postgres")
|
||||
s := &Service{
|
||||
tlsManager: newTLSManager(logger, cfg.DataPath),
|
||||
pgxTlsManager: newPgxTlsManager(logger),
|
||||
logger: logger,
|
||||
features: features,
|
||||
}
|
||||
s.im = datasource.NewInstanceManager(s.newInstanceSettings())
|
||||
return s
|
||||
}
|
||||
|
||||
type Service struct {
|
||||
tlsManager tlsSettingsProvider
|
||||
pgxTlsManager *pgxTlsManager
|
||||
im instancemgmt.InstanceManager
|
||||
logger log.Logger
|
||||
features featuremgmt.FeatureToggles
|
||||
}
|
||||
|
||||
func (s *Service) getDSInfo(ctx context.Context, pluginCtx backend.PluginContext) (*sqleng.DataSourceHandler, error) {
|
||||
i, err := s.im.Get(ctx, pluginCtx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
instance := i.(*sqleng.DataSourceHandler)
|
||||
return instance, nil
|
||||
}
|
||||
|
||||
func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
|
||||
dsInfo, err := s.getDSInfo(ctx, req.PluginContext)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if s.features.IsEnabled(ctx, featuremgmt.FlagPostgresDSUsePGX) {
|
||||
return dsInfo.QueryDataPGX(ctx, req)
|
||||
}
|
||||
|
||||
return dsInfo.QueryData(ctx, req)
|
||||
}
|
||||
|
||||
func newPostgres(ctx context.Context, userFacingDefaultError string, rowLimit int64, dsInfo sqleng.DataSourceInfo, cnnstr string, logger log.Logger, settings backend.DataSourceInstanceSettings) (*sql.DB, *sqleng.DataSourceHandler, error) {
|
||||
connector, err := pq.NewConnector(cnnstr)
|
||||
if err != nil {
|
||||
|
@ -115,7 +73,7 @@ func newPostgres(ctx context.Context, userFacingDefaultError string, rowLimit in
|
|||
return db, handler, nil
|
||||
}
|
||||
|
||||
func newPostgresPGX(ctx context.Context, userFacingDefaultError string, rowLimit int64, dsInfo sqleng.DataSourceInfo, cnnstr string, logger log.Logger, settings backend.DataSourceInstanceSettings) (*pgxpool.Pool, *sqleng.DataSourceHandler, error) {
|
||||
func newPostgresPGX(ctx context.Context, userFacingDefaultError string, rowLimit int64, dsInfo sqleng.DataSourceInfo, cnnstr string, logger log.Logger, settings backend.DataSourceInstanceSettings) (*pgxpool.Pool, *sqlengpgx.DataSourceHandler, error) {
|
||||
pgxConf, err := pgxpool.ParseConfig(cnnstr)
|
||||
if err != nil {
|
||||
logger.Error("postgres config creation failed", "error", err)
|
||||
|
@ -144,7 +102,7 @@ func newPostgresPGX(ctx context.Context, userFacingDefaultError string, rowLimit
|
|||
return []string{host}, nil
|
||||
}
|
||||
|
||||
config := sqleng.DataPluginConfiguration{
|
||||
config := sqlengpgx.DataPluginConfiguration{
|
||||
DSInfo: dsInfo,
|
||||
MetricColumnTypes: []string{"unknown", "text", "varchar", "char", "bpchar"},
|
||||
RowLimit: rowLimit,
|
||||
|
@ -160,7 +118,7 @@ func newPostgresPGX(ctx context.Context, userFacingDefaultError string, rowLimit
|
|||
return nil, nil, err
|
||||
}
|
||||
|
||||
handler, err := sqleng.NewQueryDataHandlerPGX(userFacingDefaultError, p, config, &queryResultTransformer, newPostgresMacroEngine(dsInfo.JsonData.Timescaledb),
|
||||
handler, err := sqlengpgx.NewQueryDataHandler(userFacingDefaultError, p, config, &queryResultTransformer, newPostgresMacroEngine(dsInfo.JsonData.Timescaledb),
|
||||
logger)
|
||||
if err != nil {
|
||||
logger.Error("Failed connecting to Postgres", "err", err)
|
||||
|
@ -171,8 +129,7 @@ func newPostgresPGX(ctx context.Context, userFacingDefaultError string, rowLimit
|
|||
return p, handler, nil
|
||||
}

-func (s *Service) newInstanceSettings() datasource.InstanceFactoryFunc {
-	logger := s.logger
+func NewInstanceSettings(logger log.Logger, features featuremgmt.FeatureToggles, dataPath string) datasource.InstanceFactoryFunc {
 	return func(ctx context.Context, settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
 		cfg := backend.GrafanaConfigFromContext(ctx)
 		sqlCfg, err := cfg.SQL()
@@ -210,49 +167,53 @@ func (s *Service) newInstanceSettings() datasource.InstanceFactoryFunc {
 			DecryptedSecureJSONData: settings.DecryptedSecureJSONData,
 		}

-		isPGX := s.features.IsEnabled(ctx, featuremgmt.FlagPostgresDSUsePGX)
+		isPGX := features.IsEnabled(ctx, featuremgmt.FlagPostgresDSUsePGX)

 		userFacingDefaultError, err := cfg.UserFacingDefaultError()
 		if err != nil {
 			return nil, err
 		}

-		var handler instancemgmt.Instance
 		if isPGX {
-			pgxTlsSettings, err := s.pgxTlsManager.getTLSSettings(dsInfo)
+			pgxlogger := logger.FromContext(ctx).With("driver", "pgx")
+			pgxTlsManager := newPgxTlsManager(pgxlogger)
+			pgxTlsSettings, err := pgxTlsManager.getTLSSettings(dsInfo)
 			if err != nil {
 				return "", err
 			}

 			// Ensure cleanupCertFiles is called after the connection is opened
-			defer s.pgxTlsManager.cleanupCertFiles(pgxTlsSettings)
-			cnnstr, err := s.generateConnectionString(dsInfo, pgxTlsSettings, isPGX)
+			defer pgxTlsManager.cleanupCertFiles(pgxTlsSettings)
+			cnnstr, err := generateConnectionString(dsInfo, pgxTlsSettings, isPGX, pgxlogger)
 			if err != nil {
 				return "", err
 			}
-			_, handler, err = newPostgresPGX(ctx, userFacingDefaultError, sqlCfg.RowLimit, dsInfo, cnnstr, logger, settings)
+			_, handler, err := newPostgresPGX(ctx, userFacingDefaultError, sqlCfg.RowLimit, dsInfo, cnnstr, pgxlogger, settings)
 			if err != nil {
-				logger.Error("Failed connecting to Postgres", "err", err)
+				pgxlogger.Error("Failed connecting to Postgres", "err", err)
 				return nil, err
 			}
+			pgxlogger.Debug("Successfully connected to Postgres")
+			return handler, nil
 		} else {
-			tlsSettings, err := s.tlsManager.getTLSSettings(dsInfo)
+			pqlogger := logger.FromContext(ctx).With("driver", "libpq")
+			tlsManager := newTLSManager(pqlogger, dataPath)
+			tlsSettings, err := tlsManager.getTLSSettings(dsInfo)
 			if err != nil {
 				return "", err
 			}
-			cnnstr, err := s.generateConnectionString(dsInfo, tlsSettings, isPGX)
+			cnnstr, err := generateConnectionString(dsInfo, tlsSettings, isPGX, pqlogger)
 			if err != nil {
 				return nil, err
 			}
-			_, handler, err = newPostgres(ctx, userFacingDefaultError, sqlCfg.RowLimit, dsInfo, cnnstr, logger, settings)
+			_, handler, err := newPostgres(ctx, userFacingDefaultError, sqlCfg.RowLimit, dsInfo, cnnstr, pqlogger, settings)
 			if err != nil {
-				logger.Error("Failed connecting to Postgres", "err", err)
+				pqlogger.Error("Failed connecting to Postgres", "err", err)
 				return nil, err
 			}
+			pqlogger.Debug("Successfully connected to Postgres")
+			return handler, nil
 		}
-
-		logger.Debug("Successfully connected to Postgres")
-		return handler, nil
 	}
 }
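
The shape of this refactor is worth noting: the factory is now a free function whose dependencies (logger, feature toggles, data path) arrive as arguments, so the same code can back both Grafana core and the standalone plugin binary. A reduced sketch of that pattern (myInstance and NewFactory are stand-ins, not names from this commit):

package example

import (
	"context"

	"github.com/grafana/grafana-plugin-sdk-go/backend"
	"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
	"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
	"github.com/grafana/grafana-plugin-sdk-go/backend/log"
)

type myInstance struct{} // stand-in for the real handler type

func NewFactory(logger log.Logger) datasource.InstanceFactoryFunc {
	return func(ctx context.Context, settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
		// Everything the instance needs comes from the arguments, not from
		// receiver state, which is what makes the factory reusable.
		logger.FromContext(ctx).Debug("creating instance", "uid", settings.UID)
		return &myInstance{}, nil
	}
}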

@@ -342,9 +303,7 @@ func buildBaseConnectionString(params connectionParams) string {
 	return connStr
 }

-func (s *Service) generateConnectionString(dsInfo sqleng.DataSourceInfo, tlsSettings tlsSettings, isPGX bool) (string, error) {
-	logger := s.logger
-
+func generateConnectionString(dsInfo sqleng.DataSourceInfo, tlsSettings tlsSettings, isPGX bool, logger log.Logger) (string, error) {
 	params, err := parseConnectionParams(dsInfo, logger)
 	if err != nil {
 		return "", err
@@ -387,15 +346,6 @@ func (t *postgresQueryResultTransformer) TransformQueryError(_ log.Logger, err e
 	return err
 }

-// CheckHealth pings the connected SQL database
-func (s *Service) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
-	dsHandler, err := s.getDSInfo(ctx, req.PluginContext)
-	if err != nil {
-		return sqleng.ErrToHealthCheckResult(err)
-	}
-	return dsHandler.CheckHealth(ctx, req, s.features)
-}
-
 func (t *postgresQueryResultTransformer) GetConverterList() []sqlutil.StringConverter {
 	return []sqlutil.StringConverter{
 		{
@@ -186,7 +186,7 @@ func TestIntegrationPostgresPGXSnapshots(t *testing.T) {

 			query := makeQuery(rawSQL, test.format)

-			result, err := handler.QueryDataPGX(context.Background(), &query)
+			result, err := handler.QueryData(context.Background(), &query)
 			require.Len(t, result.Responses, 1)
 			response, found := result.Responses["A"]
 			require.True(t, found)
@@ -151,10 +151,6 @@ func TestIntegrationGenerateConnectionStringPGX(t *testing.T) {
 	}
 	for _, tt := range testCases {
 		t.Run(tt.desc, func(t *testing.T) {
-			svc := Service{
-				logger: backend.NewLoggerWith("logger", "tsdb.postgres"),
-			}
-
 			ds := sqleng.DataSourceInfo{
 				URL:  tt.host,
 				User: tt.user,
@@ -162,8 +158,9 @@ func TestIntegrationGenerateConnectionStringPGX(t *testing.T) {
 				Database: tt.database,
 				UID:      tt.uid,
 			}
+			logger := backend.NewLoggerWith("logger", "tsdb.postgres")

-			connStr, err := svc.generateConnectionString(ds, tt.tlsSettings, false)
+			connStr, err := generateConnectionString(ds, tt.tlsSettings, false, logger)

 			if tt.expErr == "" {
 				require.NoError(t, err, tt.desc)
@@ -284,7 +281,7 @@ func TestIntegrationPostgresPGX(t *testing.T) {
 				},
 			},
 		}
-		resp, err := exe.QueryDataPGX(t.Context(), query)
+		resp, err := exe.QueryData(t.Context(), query)
 		require.NoError(t, err)
 		queryResult := resp.Responses["A"]
 		require.NoError(t, queryResult.Error)

@@ -383,7 +380,7 @@ func TestIntegrationPostgresPGX(t *testing.T) {
 			},
 		}

-		resp, err := exe.QueryDataPGX(t.Context(), query)
+		resp, err := exe.QueryData(t.Context(), query)
 		require.NoError(t, err)
 		queryResult := resp.Responses["A"]
 		require.NoError(t, queryResult.Error)

@@ -426,7 +423,7 @@ func TestIntegrationPostgresPGX(t *testing.T) {
 				},
 			},
 		}
-		resp, err := exe.QueryDataPGX(t.Context(), query)
+		resp, err := exe.QueryData(t.Context(), query)
 		require.NoError(t, err)
 		queryResult := resp.Responses["A"]
 		require.NoError(t, queryResult.Error)

@@ -460,7 +457,7 @@ func TestIntegrationPostgresPGX(t *testing.T) {
 			},
 		}

-		resp, err := exe.QueryDataPGX(t.Context(), query)
+		resp, err := exe.QueryData(t.Context(), query)
 		require.NoError(t, err)
 		queryResult := resp.Responses["A"]
 		frames := queryResult.Frames

@@ -488,7 +485,7 @@ func TestIntegrationPostgresPGX(t *testing.T) {
 			},
 		}

-		resp, err := exe.QueryDataPGX(t.Context(), query)
+		resp, err := exe.QueryData(t.Context(), query)
 		require.NoError(t, err)
 		queryResult := resp.Responses["A"]
 		require.NoError(t, queryResult.Error)

@@ -542,7 +539,7 @@ func TestIntegrationPostgresPGX(t *testing.T) {
 			},
 		}

-		resp, err := exe.QueryDataPGX(t.Context(), query)
+		resp, err := exe.QueryData(t.Context(), query)
 		require.NoError(t, err)
 		queryResult := resp.Responses["A"]
 		require.NoError(t, queryResult.Error)

@@ -589,7 +586,7 @@ func TestIntegrationPostgresPGX(t *testing.T) {
 			},
 		}

-		resp, err := exe.QueryDataPGX(t.Context(), query)
+		resp, err := exe.QueryData(t.Context(), query)
 		require.NoError(t, err)
 		queryResult := resp.Responses["A"]
 		require.NoError(t, queryResult.Error)

@@ -624,7 +621,7 @@ func TestIntegrationPostgresPGX(t *testing.T) {
 			},
 		}

-		resp, err := exe.QueryDataPGX(t.Context(), query)
+		resp, err := exe.QueryData(t.Context(), query)
 		require.NoError(t, err)
 		queryResult := resp.Responses["A"]
 		require.NoError(t, queryResult.Error)

@@ -741,7 +738,7 @@ func TestIntegrationPostgresPGX(t *testing.T) {
 			},
 		}

-		resp, err := exe.QueryDataPGX(t.Context(), query)
+		resp, err := exe.QueryData(t.Context(), query)
 		require.NoError(t, err)
 		queryResult := resp.Responses["A"]
 		require.NoError(t, queryResult.Error)

@@ -765,7 +762,7 @@ func TestIntegrationPostgresPGX(t *testing.T) {
 			},
 		}

-		resp, err := exe.QueryDataPGX(t.Context(), query)
+		resp, err := exe.QueryData(t.Context(), query)
 		require.NoError(t, err)
 		queryResult := resp.Responses["A"]
 		require.NoError(t, queryResult.Error)

@@ -789,7 +786,7 @@ func TestIntegrationPostgresPGX(t *testing.T) {
 			},
 		}

-		resp, err := exe.QueryDataPGX(t.Context(), query)
+		resp, err := exe.QueryData(t.Context(), query)
 		require.NoError(t, err)
 		queryResult := resp.Responses["A"]
 		require.NoError(t, queryResult.Error)

@@ -813,7 +810,7 @@ func TestIntegrationPostgresPGX(t *testing.T) {
 			},
 		}

-		resp, err := exe.QueryDataPGX(t.Context(), query)
+		resp, err := exe.QueryData(t.Context(), query)
 		require.NoError(t, err)
 		queryResult := resp.Responses["A"]
 		require.NoError(t, queryResult.Error)

@@ -837,7 +834,7 @@ func TestIntegrationPostgresPGX(t *testing.T) {
 			},
 		}

-		resp, err := exe.QueryDataPGX(t.Context(), query)
+		resp, err := exe.QueryData(t.Context(), query)
 		require.NoError(t, err)
 		queryResult := resp.Responses["A"]
 		require.NoError(t, queryResult.Error)

@@ -861,7 +858,7 @@ func TestIntegrationPostgresPGX(t *testing.T) {
 			},
 		}

-		resp, err := exe.QueryDataPGX(t.Context(), query)
+		resp, err := exe.QueryData(t.Context(), query)
 		require.NoError(t, err)
 		queryResult := resp.Responses["A"]
 		require.NoError(t, queryResult.Error)

@@ -885,7 +882,7 @@ func TestIntegrationPostgresPGX(t *testing.T) {
 			},
 		}

-		resp, err := exe.QueryDataPGX(t.Context(), query)
+		resp, err := exe.QueryData(t.Context(), query)
 		require.NoError(t, err)
 		queryResult := resp.Responses["A"]
 		require.NoError(t, queryResult.Error)

@@ -910,7 +907,7 @@ func TestIntegrationPostgresPGX(t *testing.T) {
 			},
 		}

-		resp, err := exe.QueryDataPGX(t.Context(), query)
+		resp, err := exe.QueryData(t.Context(), query)
 		require.NoError(t, err)
 		queryResult := resp.Responses["A"]
 		require.NoError(t, queryResult.Error)

@@ -934,7 +931,7 @@ func TestIntegrationPostgresPGX(t *testing.T) {
 			},
 		}

-		resp, err := exe.QueryDataPGX(t.Context(), query)
+		resp, err := exe.QueryData(t.Context(), query)
 		require.NoError(t, err)
 		queryResult := resp.Responses["A"]
 		require.NoError(t, queryResult.Error)

@@ -959,7 +956,7 @@ func TestIntegrationPostgresPGX(t *testing.T) {
 			},
 		}

-		resp, err := exe.QueryDataPGX(t.Context(), query)
+		resp, err := exe.QueryData(t.Context(), query)
 		require.NoError(t, err)
 		queryResult := resp.Responses["A"]
 		require.NoError(t, queryResult.Error)

@@ -991,7 +988,7 @@ func TestIntegrationPostgresPGX(t *testing.T) {
 			},
 		}

-		resp, err := exe.QueryDataPGX(t.Context(), query)
+		resp, err := exe.QueryData(t.Context(), query)
 		require.NoError(t, err)
 		queryResult := resp.Responses["A"]
 		require.NoError(t, queryResult.Error)

@@ -1026,7 +1023,7 @@ func TestIntegrationPostgresPGX(t *testing.T) {
 			},
 		}

-		resp, err := exe.QueryDataPGX(t.Context(), query)
+		resp, err := exe.QueryData(t.Context(), query)
 		require.NoError(t, err)
 		queryResult := resp.Responses["A"]
 		require.NoError(t, queryResult.Error)

@@ -1086,7 +1083,7 @@ func TestIntegrationPostgresPGX(t *testing.T) {
 			},
 		}

-		resp, err := exe.QueryDataPGX(t.Context(), query)
+		resp, err := exe.QueryData(t.Context(), query)
 		require.NoError(t, err)

 		queryResult := resp.Responses["Deploys"]

@@ -1113,7 +1110,7 @@ func TestIntegrationPostgresPGX(t *testing.T) {
 			},
 		}

-		resp, err := exe.QueryDataPGX(t.Context(), query)
+		resp, err := exe.QueryData(t.Context(), query)
 		require.NoError(t, err)

 		queryResult := resp.Responses["Tickets"]

@@ -1136,7 +1133,7 @@ func TestIntegrationPostgresPGX(t *testing.T) {
 			},
 		}

-		resp, err := exe.QueryDataPGX(t.Context(), query)
+		resp, err := exe.QueryData(t.Context(), query)
 		require.NoError(t, err)
 		queryResult := resp.Responses["A"]
 		require.NoError(t, queryResult.Error)

@@ -1161,7 +1158,7 @@ func TestIntegrationPostgresPGX(t *testing.T) {
 			},
 		}

-		resp, err := exe.QueryDataPGX(t.Context(), query)
+		resp, err := exe.QueryData(t.Context(), query)
 		require.NoError(t, err)
 		queryResult := resp.Responses["A"]
 		require.NoError(t, queryResult.Error)

@@ -1186,7 +1183,7 @@ func TestIntegrationPostgresPGX(t *testing.T) {
 			},
 		}

-		resp, err := exe.QueryDataPGX(t.Context(), query)
+		resp, err := exe.QueryData(t.Context(), query)
 		require.NoError(t, err)
 		queryResult := resp.Responses["A"]
 		require.NoError(t, queryResult.Error)

@@ -1212,7 +1209,7 @@ func TestIntegrationPostgresPGX(t *testing.T) {
 			},
 		}

-		resp, err := exe.QueryDataPGX(t.Context(), query)
+		resp, err := exe.QueryData(t.Context(), query)
 		require.NoError(t, err)
 		queryResult := resp.Responses["A"]
 		require.NoError(t, queryResult.Error)

@@ -1238,7 +1235,7 @@ func TestIntegrationPostgresPGX(t *testing.T) {
 			},
 		}

-		resp, err := exe.QueryDataPGX(t.Context(), query)
+		resp, err := exe.QueryData(t.Context(), query)
 		require.NoError(t, err)
 		queryResult := resp.Responses["A"]
 		require.NoError(t, queryResult.Error)

@@ -1264,7 +1261,7 @@ func TestIntegrationPostgresPGX(t *testing.T) {
 			},
 		}

-		resp, err := exe.QueryDataPGX(t.Context(), query)
+		resp, err := exe.QueryData(t.Context(), query)
 		require.NoError(t, err)
 		queryResult := resp.Responses["A"]
 		require.NoError(t, queryResult.Error)

@@ -1290,7 +1287,7 @@ func TestIntegrationPostgresPGX(t *testing.T) {
 			},
 		}

-		resp, err := exe.QueryDataPGX(t.Context(), query)
+		resp, err := exe.QueryData(t.Context(), query)
 		require.NoError(t, err)
 		queryResult := resp.Responses["A"]
 		require.NoError(t, queryResult.Error)

@@ -1338,7 +1335,7 @@ func TestIntegrationPostgresPGX(t *testing.T) {
 			},
 		}

-		resp, err := handler.QueryDataPGX(t.Context(), query)
+		resp, err := handler.QueryData(t.Context(), query)
 		require.NoError(t, err)
 		queryResult := resp.Responses["A"]
 		require.NoError(t, queryResult.Error)

@@ -1368,7 +1365,7 @@ func TestIntegrationPostgresPGX(t *testing.T) {
 			},
 		}

-		resp, err := handler.QueryDataPGX(t.Context(), query)
+		resp, err := handler.QueryData(t.Context(), query)
 		require.NoError(t, err)
 		queryResult := resp.Responses["A"]
 		require.NoError(t, queryResult.Error)

@@ -1406,7 +1403,7 @@ func TestIntegrationPostgresPGX(t *testing.T) {
 			},
 		}

-		resp, err := exe.QueryDataPGX(t.Context(), query)
+		resp, err := exe.QueryData(t.Context(), query)
 		require.NoError(t, err)
 		queryResult := resp.Responses["A"]

@@ -1453,7 +1450,7 @@ func TestIntegrationPostgresPGX(t *testing.T) {
 		}

 		// This should not panic and should work correctly
-		resp, err := exe.QueryDataPGX(t.Context(), query)
+		resp, err := exe.QueryData(t.Context(), query)
 		require.NoError(t, err)
 		queryResult := resp.Responses["A"]
 		require.NoError(t, queryResult.Error)

@@ -1488,7 +1485,7 @@ func TestIntegrationPostgresPGX(t *testing.T) {
 		}

 		// This should not panic anymore, but should return an error instead
-		resp, err := exe.QueryDataPGX(t.Context(), query)
+		resp, err := exe.QueryData(t.Context(), query)
 		require.NoError(t, err)
 		queryResult := resp.Responses["A"]

@@ -1517,7 +1514,7 @@ func TestIntegrationPostgresPGX(t *testing.T) {
 		}

 		// This should not panic anymore, but should return an error instead
-		resp, err := exe.QueryDataPGX(t.Context(), query)
+		resp, err := exe.QueryData(t.Context(), query)
 		require.NoError(t, err)
 		queryResult := resp.Responses["A"]

@@ -1546,7 +1543,7 @@ func TestIntegrationPostgresPGX(t *testing.T) {
 		}

 		// This should not panic
-		resp, err := exe.QueryDataPGX(t.Context(), query)
+		resp, err := exe.QueryData(t.Context(), query)
 		require.NoError(t, err)
 		queryResult := resp.Responses["A"]
 		require.NoError(t, queryResult.Error)
@@ -0,0 +1,80 @@
+package postgres
+
+import (
+	"context"
+
+	"github.com/grafana/grafana-plugin-sdk-go/backend"
+	"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
+	"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
+
+	"github.com/grafana/grafana/pkg/services/featuremgmt"
+	"github.com/grafana/grafana/pkg/setting"
+	sqlengpgx "github.com/grafana/grafana/pkg/tsdb/grafana-postgresql-datasource/pgx"
+	"github.com/grafana/grafana/pkg/tsdb/grafana-postgresql-datasource/sqleng"
+)
+
+type Service struct {
+	im       instancemgmt.InstanceManager
+	features featuremgmt.FeatureToggles
+}
+
+func ProvideService(cfg *setting.Cfg, features featuremgmt.FeatureToggles) *Service {
+	logger := backend.NewLoggerWith("logger", "tsdb.postgres")
+	s := &Service{
+		im:       datasource.NewInstanceManager(NewInstanceSettings(logger, features, cfg.DataPath)),
+		features: features,
+	}
+	return s
+}
+
+// NOTE: do not put any business logic into this method; its whole job is to forward the call "inside".
+func (s *Service) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
+	if s.features.IsEnabled(ctx, featuremgmt.FlagPostgresDSUsePGX) {
+		dsHandler, err := s.getDSInfoPGX(ctx, req.PluginContext)
+		if err != nil {
+			return sqlengpgx.ErrToHealthCheckResult(err)
+		}
+		return dsHandler.CheckHealth(ctx, req)
+	} else {
+		dsHandler, err := s.getDSInfo(ctx, req.PluginContext)
+		if err != nil {
+			return sqleng.ErrToHealthCheckResult(err)
+		}
+		return dsHandler.CheckHealth(ctx, req)
+	}
+}
+
+// NOTE: do not put any business logic into this method; its whole job is to forward the call "inside".
+func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
+	if s.features.IsEnabled(ctx, featuremgmt.FlagPostgresDSUsePGX) {
+		dsInfo, err := s.getDSInfoPGX(ctx, req.PluginContext)
+		if err != nil {
+			return nil, err
+		}
+		return dsInfo.QueryData(ctx, req)
+	} else {
+		dsInfo, err := s.getDSInfo(ctx, req.PluginContext)
+		if err != nil {
+			return nil, err
+		}
+		return dsInfo.QueryData(ctx, req)
+	}
+}
+
+func (s *Service) getDSInfo(ctx context.Context, pluginCtx backend.PluginContext) (*sqleng.DataSourceHandler, error) {
+	i, err := s.im.Get(ctx, pluginCtx)
+	if err != nil {
+		return nil, err
+	}
+	instance := i.(*sqleng.DataSourceHandler)
+	return instance, nil
+}
+
+func (s *Service) getDSInfoPGX(ctx context.Context, pluginCtx backend.PluginContext) (*sqlengpgx.DataSourceHandler, error) {
+	i, err := s.im.Get(ctx, pluginCtx)
+	if err != nil {
+		return nil, err
+	}
+	instance := i.(*sqlengpgx.DataSourceHandler)
+	return instance, nil
+}
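
Both getDSInfo variants use a bare type assertion, which panics if the instance manager ever returns the other driver's handler (conceivable around a feature-flag flip before instances are recycled). A defensive variant, as a sketch only (getDSInfoSafe is a hypothetical name; it would also require importing fmt):

func (s *Service) getDSInfoSafe(ctx context.Context, pluginCtx backend.PluginContext) (*sqleng.DataSourceHandler, error) {
	i, err := s.im.Get(ctx, pluginCtx)
	if err != nil {
		return nil, err
	}
	// Comma-ok assertion: a mismatched handler type becomes an error, not a panic.
	instance, ok := i.(*sqleng.DataSourceHandler)
	if !ok {
		return nil, fmt.Errorf("unexpected instance type %T", i)
	}
	return instance, nil
}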

@@ -156,10 +156,7 @@ func TestIntegrationGenerateConnectionString(t *testing.T) {
 	}
 	for _, tt := range testCases {
 		t.Run(tt.desc, func(t *testing.T) {
-			svc := Service{
-				tlsManager: &tlsTestManager{settings: tt.tlsSettings},
-				logger:     backend.NewLoggerWith("logger", "tsdb.postgres"),
-			}
+			logger := backend.NewLoggerWith("logger", "tsdb.postgres")

 			ds := sqleng.DataSourceInfo{
 				URL: tt.host,
@@ -169,7 +166,7 @@ func TestIntegrationGenerateConnectionString(t *testing.T) {
 				UID: tt.uid,
 			}

-			connStr, err := svc.generateConnectionString(ds, tt.tlsSettings, false)
+			connStr, err := generateConnectionString(ds, tt.tlsSettings, false, logger)

 			if tt.expErr == "" {
 				require.NoError(t, err, tt.desc)
@@ -1409,14 +1406,6 @@ func genTimeRangeByInterval(from time.Time, duration time.Duration, interval tim
 	return timeRange
 }

-type tlsTestManager struct {
-	settings tlsSettings
-}
-
-func (m *tlsTestManager) getTLSSettings(dsInfo sqleng.DataSourceInfo) (tlsSettings, error) {
-	return m.settings, nil
-}
-
 func isTestDbPostgres() bool {
 	if db, present := os.LookupEnv("GRAFANA_TEST_DB"); present {
 		return db == "postgres"
@@ -10,17 +10,11 @@ import (

 	"github.com/grafana/grafana-plugin-sdk-go/backend"
 	"github.com/grafana/grafana-plugin-sdk-go/backend/log"
-	"github.com/grafana/grafana/pkg/services/featuremgmt"
-	"github.com/lib/pq"
 )

-func (e *DataSourceHandler) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest, features featuremgmt.FeatureToggles) (*backend.CheckHealthResult, error) {
-	var err error
-	if features.IsEnabled(ctx, featuremgmt.FlagPostgresDSUsePGX) {
-		err = e.PingPGX(ctx)
-	} else {
-		err = e.Ping()
-	}
+func (e *DataSourceHandler) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
+	err := e.Ping()
 	if err != nil {
 		logCheckHealthError(ctx, e.dsInfo, err)
 		if strings.EqualFold(req.PluginContext.User.Role, "Admin") {
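
On the pgx side, the equivalent probe lives in the new pgx package and reduces to a pool-level ping; a minimal sketch, assuming the handler keeps its *pgxpool.Pool in a field named pool:

func (e *DataSourceHandler) Ping(ctx context.Context) error {
	// pgxpool.Pool.Ping acquires a connection and executes an empty statement,
	// so it exercises the real connect path rather than just the pool state.
	return e.pool.Ping(ctx)
}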
@@ -19,7 +19,6 @@ import (
 	"github.com/grafana/grafana-plugin-sdk-go/backend/log"
 	"github.com/grafana/grafana-plugin-sdk-go/data"
 	"github.com/grafana/grafana-plugin-sdk-go/data/sqlutil"
-	"github.com/jackc/pgx/v5/pgxpool"
 )

 // MetaKeyExecutedQueryString is the key where the executed query should get stored
@@ -89,7 +88,6 @@ type DataSourceHandler struct {
 	dsInfo    DataSourceInfo
 	rowLimit  int64
 	userError string
-	pool      *pgxpool.Pool
 }

 type QueryJson struct {
@@ -490,7 +488,6 @@ type dataQueryModel struct {
 	Interval       time.Duration
 	columnNames    []string
 	columnTypes    []*sql.ColumnType
-	columnTypesPGX []string
 	timeIndex      int
 	timeEndIndex   int
 	metricIndex    int
@@ -9,8 +9,6 @@ import (
 	"github.com/grafana/grafana-plugin-sdk-go/backend"
 	"github.com/grafana/grafana-plugin-sdk-go/data"
 	"github.com/grafana/grafana-plugin-sdk-go/data/sqlutil"
-	"github.com/jackc/pgx/v5/pgconn"
-	"github.com/jackc/pgx/v5/pgtype"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"

@@ -427,246 +425,6 @@ func TestSQLEngine(t *testing.T) {
 	})
 }

-func TestConvertResultsToFrame(t *testing.T) {
-	// Import the pgx packages needed for testing
-	// These imports are included in the main file but need to be accessible for tests
-	t.Run("convertResultsToFrame with single result", func(t *testing.T) {
-		// Create mock field descriptions
-		fieldDescs := []pgconn.FieldDescription{
-			{Name: "id", DataTypeOID: pgtype.Int4OID},
-			{Name: "name", DataTypeOID: pgtype.TextOID},
-			{Name: "value", DataTypeOID: pgtype.Float8OID},
-		}
-
-		// Create mock result data
-		mockRows := [][][]byte{
-			{[]byte("1"), []byte("test1"), []byte("10.5")},
-			{[]byte("2"), []byte("test2"), []byte("20.7")},
-		}
-
-		// Create mock result
-		result := &pgconn.Result{
-			FieldDescriptions: fieldDescs,
-			Rows:              mockRows,
-		}
-		result.CommandTag = pgconn.NewCommandTag("SELECT 2")
-
-		results := []*pgconn.Result{result}
-
-		frame, err := convertResultsToFrame(results, 1000)
-		require.NoError(t, err)
-		require.NotNil(t, frame)
-		require.Equal(t, 3, len(frame.Fields))
-		require.Equal(t, 2, frame.Rows())
-
-		// Verify field names
-		require.Equal(t, "id", frame.Fields[0].Name)
-		require.Equal(t, "name", frame.Fields[1].Name)
-		require.Equal(t, "value", frame.Fields[2].Name)
-	})
-
-	t.Run("convertResultsToFrame with multiple compatible results", func(t *testing.T) {
-		// Create mock field descriptions (same structure for both results)
-		fieldDescs := []pgconn.FieldDescription{
-			{Name: "id", DataTypeOID: pgtype.Int4OID},
-			{Name: "name", DataTypeOID: pgtype.TextOID},
-		}
-
-		// Create first result
-		mockRows1 := [][][]byte{
-			{[]byte("1"), []byte("test1")},
-			{[]byte("2"), []byte("test2")},
-		}
-		result1 := &pgconn.Result{
-			FieldDescriptions: fieldDescs,
-			Rows:              mockRows1,
-		}
-		result1.CommandTag = pgconn.NewCommandTag("SELECT 2")
-
-		// Create second result with same structure
-		mockRows2 := [][][]byte{
-			{[]byte("3"), []byte("test3")},
-			{[]byte("4"), []byte("test4")},
-		}
-		result2 := &pgconn.Result{
-			FieldDescriptions: fieldDescs,
-			Rows:              mockRows2,
-		}
-		result2.CommandTag = pgconn.NewCommandTag("SELECT 2")
-
-		results := []*pgconn.Result{result1, result2}
-
-		frame, err := convertResultsToFrame(results, 1000)
-		require.NoError(t, err)
-		require.NotNil(t, frame)
-		require.Equal(t, 2, len(frame.Fields))
-		require.Equal(t, 4, frame.Rows()) // Should have rows from both results
-
-		// Verify field names
-		require.Equal(t, "id", frame.Fields[0].Name)
-		require.Equal(t, "name", frame.Fields[1].Name)
-	})
-
-	t.Run("convertResultsToFrame with row limit", func(t *testing.T) {
-		// Create mock field descriptions
-		fieldDescs := []pgconn.FieldDescription{
-			{Name: "id", DataTypeOID: pgtype.Int4OID},
-		}
-
-		// Create mock result data with 3 rows
-		mockRows := [][][]byte{
-			{[]byte("1")},
-			{[]byte("2")},
-			{[]byte("3")},
-		}
-
-		result := &pgconn.Result{
-			FieldDescriptions: fieldDescs,
-			Rows:              mockRows,
-		}
-		result.CommandTag = pgconn.NewCommandTag("SELECT 3")
-
-		results := []*pgconn.Result{result}
-
-		// Set row limit to 2
-		frame, err := convertResultsToFrame(results, 2)
-		require.NoError(t, err)
-		require.NotNil(t, frame)
-		require.Equal(t, 1, len(frame.Fields))
-		require.Equal(t, 2, frame.Rows()) // Should be limited to 2 rows
-
-		// Should have a notice about the limit
-		require.NotNil(t, frame.Meta)
-		require.Len(t, frame.Meta.Notices, 1)
-		require.Contains(t, frame.Meta.Notices[0].Text, "Results have been limited to 2")
-	})
-
-	t.Run("convertResultsToFrame with mixed SELECT and non-SELECT results", func(t *testing.T) {
-		// Create a non-SELECT result (should be skipped)
-		nonSelectResult := &pgconn.Result{}
-		nonSelectResult.CommandTag = pgconn.NewCommandTag("UPDATE 1")
-
-		// Create a SELECT result
-		fieldDescs := []pgconn.FieldDescription{
-			{Name: "id", DataTypeOID: pgtype.Int4OID},
-		}
-		mockRows := [][][]byte{
-			{[]byte("1")},
-		}
-		selectResult := &pgconn.Result{
-			FieldDescriptions: fieldDescs,
-			Rows:              mockRows,
-		}
-		selectResult.CommandTag = pgconn.NewCommandTag("SELECT 1")
-
-		results := []*pgconn.Result{nonSelectResult, selectResult}
-
-		frame, err := convertResultsToFrame(results, 1000)
-		require.NoError(t, err)
-		require.NotNil(t, frame)
-		require.Equal(t, 1, len(frame.Fields))
-		require.Equal(t, 1, frame.Rows())
-	})
-
-	t.Run("convertResultsToFrame with no SELECT results", func(t *testing.T) {
-		// Create only non-SELECT results
-		result1 := &pgconn.Result{}
-		result1.CommandTag = pgconn.NewCommandTag("UPDATE 1")
-
-		result2 := &pgconn.Result{}
-		result2.CommandTag = pgconn.NewCommandTag("INSERT 1")
-
-		results := []*pgconn.Result{result1, result2}
-
-		frame, err := convertResultsToFrame(results, 1000)
-		require.NoError(t, err)
-		require.NotNil(t, frame)
-		require.Equal(t, 0, len(frame.Fields))
-		require.Equal(t, 0, frame.Rows())
-	})
-
-	t.Run("convertResultsToFrame with multiple results and row limit per result", func(t *testing.T) {
-		// Create mock field descriptions (same structure for both results)
-		fieldDescs := []pgconn.FieldDescription{
-			{Name: "id", DataTypeOID: pgtype.Int4OID},
-		}
-
-		// Create first result with 3 rows
-		mockRows1 := [][][]byte{
-			{[]byte("1")},
-			{[]byte("2")},
-			{[]byte("3")},
-		}
-		result1 := &pgconn.Result{
-			FieldDescriptions: fieldDescs,
-			Rows:              mockRows1,
-		}
-		result1.CommandTag = pgconn.NewCommandTag("SELECT 3")
-
-		// Create second result with 3 rows
-		mockRows2 := [][][]byte{
-			{[]byte("4")},
-			{[]byte("5")},
-			{[]byte("6")},
-		}
-		result2 := &pgconn.Result{
-			FieldDescriptions: fieldDescs,
-			Rows:              mockRows2,
-		}
-		result2.CommandTag = pgconn.NewCommandTag("SELECT 3")
-
-		results := []*pgconn.Result{result1, result2}
-
-		// Set row limit to 2 (should limit each result to 2 rows)
-		frame, err := convertResultsToFrame(results, 2)
-		require.NoError(t, err)
-		require.NotNil(t, frame)
-		require.Equal(t, 1, len(frame.Fields))
-		require.Equal(t, 4, frame.Rows()) // 2 rows from each result
-
-		// Should have notices about the limit from both results
-		require.NotNil(t, frame.Meta)
-		require.Len(t, frame.Meta.Notices, 2)
-		require.Contains(t, frame.Meta.Notices[0].Text, "Results have been limited to 2")
-		require.Contains(t, frame.Meta.Notices[1].Text, "Results have been limited to 2")
-	})
-
-	t.Run("convertResultsToFrame handles null values correctly", func(t *testing.T) {
-		// Create mock field descriptions
-		fieldDescs := []pgconn.FieldDescription{
-			{Name: "id", DataTypeOID: pgtype.Int4OID},
-			{Name: "name", DataTypeOID: pgtype.TextOID},
-		}
-
-		// Create mock result data with null values
-		mockRows := [][][]byte{
-			{[]byte("1"), nil},     // null name
-			{nil, []byte("test2")}, // null id
-		}
-
-		result := &pgconn.Result{
-			FieldDescriptions: fieldDescs,
-			Rows:              mockRows,
-		}
-		result.CommandTag = pgconn.NewCommandTag("SELECT 2")
-
-		results := []*pgconn.Result{result}
-
-		frame, err := convertResultsToFrame(results, 1000)
-		require.NoError(t, err)
-		require.NotNil(t, frame)
-		require.Equal(t, 2, len(frame.Fields))
-		require.Equal(t, 2, frame.Rows())
-
-		// Check that null values are handled correctly
-		// The exact representation depends on the field type, but should not panic
-		require.NotPanics(t, func() {
-			frame.Fields[0].At(1) // null id
-			frame.Fields[1].At(0) // null name
-		})
-	})
-}
-
 type testQueryResultTransformer struct {
 	transformQueryErrorWasCalled bool
 }
@@ -0,0 +1,25 @@
+package main
+
+import (
+	"os"
+
+	"github.com/grafana/grafana-plugin-sdk-go/backend"
+	"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
+	"github.com/grafana/grafana-plugin-sdk-go/backend/log"
+	"github.com/grafana/grafana/pkg/services/featuremgmt"
+	"github.com/grafana/grafana/pkg/setting"
+	postgres "github.com/grafana/grafana/pkg/tsdb/grafana-postgresql-datasource"
+)
+
+func main() {
+	// No need to pass a logger name; it will be set by the plugin SDK.
+	logger := backend.NewLoggerWith()
+	// TODO: get rid of setting.NewCfg() and featuremgmt.FeatureToggles once PostgresDSUsePGX is removed.
+	cfg := setting.NewCfg()
+	// We want to enable the feature toggle for the API server.
+	features := featuremgmt.WithFeatures(featuremgmt.FlagPostgresDSUsePGX)
+	if err := datasource.Manage("grafana-postgresql-datasource", postgres.NewInstanceSettings(logger, features, cfg.DataPath), datasource.ManageOpts{}); err != nil {
+		log.DefaultLogger.Error(err.Error())
+		os.Exit(1)
+	}
+}
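
datasource.Manage starts the plugin's gRPC serving loop and routes each request to the instance the factory returns, dispatching on the SDK handler interfaces that instance implements. A sketch of the minimal surface an instance needs for queries and health checks (myDatasource is illustrative, assuming the SDK's interface-based dispatch):

package example

import (
	"context"

	"github.com/grafana/grafana-plugin-sdk-go/backend"
)

type myDatasource struct{}

func (d *myDatasource) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
	return backend.NewQueryDataResponse(), nil
}

func (d *myDatasource) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
	return &backend.CheckHealthResult{Status: backend.HealthStatusOk}, nil
}

// Compile-time checks that the instance satisfies the SDK handler interfaces.
var (
	_ backend.QueryDataHandler   = (*myDatasource)(nil)
	_ backend.CheckHealthHandler = (*myDatasource)(nil)
)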

@@ -2,6 +2,7 @@
   "type": "datasource",
   "name": "PostgreSQL",
   "id": "grafana-postgresql-datasource",
+  "executable": "gpx_grafana-postgresql-datasource",
   "aliasIDs": ["postgres"],
   "category": "sql",

@@ -21,6 +22,9 @@
       { "name": "Documentation", "url": "https://grafana.com/docs/grafana/latest/datasources/postgres/" }
     ]
   },
+  "dependencies": {
+    "grafanaDependency": ">=11.6.0"
+  },

   "alerting": true,
   "annotations": true,