| 
									
										
										
										
											2018-03-23 23:50:16 +08:00
										 |  |  | package elasticsearch | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							|  |  |  | 	"context" | 
					
						
							| 
									
										
										
										
											2021-07-15 22:45:59 +08:00
										 |  |  | 	"encoding/json" | 
					
						
							|  |  |  | 	"errors" | 
					
						
							| 
									
										
										
										
											2018-03-23 23:50:16 +08:00
										 |  |  | 	"fmt" | 
					
						
							| 
									
										
										
										
											2018-05-23 20:36:41 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-07-15 22:45:59 +08:00
										 |  |  | 	"github.com/Masterminds/semver" | 
					
						
							|  |  |  | 	"github.com/grafana/grafana-plugin-sdk-go/backend" | 
					
						
							|  |  |  | 	"github.com/grafana/grafana-plugin-sdk-go/backend/datasource" | 
					
						
							|  |  |  | 	"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt" | 
					
						
							| 
									
										
										
										
											2021-05-20 05:53:41 +08:00
										 |  |  | 	"github.com/grafana/grafana/pkg/infra/httpclient" | 
					
						
							| 
									
										
										
										
											2021-07-15 22:45:59 +08:00
										 |  |  | 	"github.com/grafana/grafana/pkg/infra/log" | 
					
						
							| 
									
										
										
										
											2019-10-02 19:59:05 +08:00
										 |  |  | 	es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client" | 
					
						
							| 
									
										
										
										
											2021-09-07 15:35:37 +08:00
										 |  |  | 	"github.com/grafana/grafana/pkg/tsdb/intervalv2" | 
					
						
							| 
									
										
										
										
											2018-03-23 23:50:16 +08:00
										 |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-07-15 22:45:59 +08:00
// eslog is the package-level logger, created under the name "tsdb.elasticsearch".
var eslog = log.New("tsdb.elasticsearch")
					
						
							|  |  |  | 
 | 
					
						
// Service bundles the dependencies that QueryData needs to run queries
// against an Elasticsearch data source.
type Service struct {
	// httpClientProvider is handed to es.NewClient when a client is built for a request.
	httpClientProvider httpclient.Provider
	// intervalCalculator is handed to newTimeSeriesQuery for each request.
	intervalCalculator intervalv2.Calculator
	// im resolves the data source instance for a plugin context (see getDSInfo).
	im instancemgmt.InstanceManager
}
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-01-21 01:16:22 +08:00
										 |  |  | func ProvideService(httpClientProvider httpclient.Provider) *Service { | 
					
						
							| 
									
										
										
										
											2021-07-15 22:45:59 +08:00
										 |  |  | 	eslog.Debug("initializing") | 
					
						
							| 
									
										
										
										
											2021-08-25 21:11:22 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-07-15 22:45:59 +08:00
										 |  |  | 	return &Service{ | 
					
						
							| 
									
										
										
										
											2022-01-21 01:16:22 +08:00
										 |  |  | 		im:                 datasource.NewInstanceManager(newInstanceSettings()), | 
					
						
							|  |  |  | 		httpClientProvider: httpClientProvider, | 
					
						
							| 
									
										
										
										
											2021-09-07 15:35:37 +08:00
										 |  |  | 		intervalCalculator: intervalv2.NewCalculator(), | 
					
						
							| 
									
										
										
										
											2018-03-26 19:48:57 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2021-07-15 22:45:59 +08:00
										 |  |  | } | 
					
						
							| 
									
										
										
										
											2018-03-26 19:48:57 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-07-15 22:45:59 +08:00
										 |  |  | func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { | 
					
						
							|  |  |  | 	if len(req.Queries) == 0 { | 
					
						
							|  |  |  | 		return &backend.QueryDataResponse{}, fmt.Errorf("query contains no queries") | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	dsInfo, err := s.getDSInfo(req.PluginContext) | 
					
						
							| 
									
										
										
										
											2018-03-23 23:50:16 +08:00
										 |  |  | 	if err != nil { | 
					
						
							| 
									
										
										
										
											2021-07-15 22:45:59 +08:00
										 |  |  | 		return &backend.QueryDataResponse{}, err | 
					
						
							| 
									
										
										
										
											2018-03-23 23:50:16 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-01-21 01:16:22 +08:00
										 |  |  | 	client, err := es.NewClient(ctx, s.httpClientProvider, dsInfo, req.Queries[0].TimeRange) | 
					
						
							| 
									
										
										
										
											2021-07-15 22:45:59 +08:00
										 |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return &backend.QueryDataResponse{}, err | 
					
						
							| 
									
										
										
										
											2019-06-25 14:52:17 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-07-15 22:45:59 +08:00
										 |  |  | 	query := newTimeSeriesQuery(client, req.Queries, s.intervalCalculator) | 
					
						
							| 
									
										
										
										
											2018-05-23 21:09:58 +08:00
										 |  |  | 	return query.execute() | 
					
						
							| 
									
										
										
										
											2018-03-23 23:50:16 +08:00
										 |  |  | } | 
					
						
							| 
									
										
										
										
											2021-07-15 22:45:59 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | func newInstanceSettings() datasource.InstanceFactoryFunc { | 
					
						
							|  |  |  | 	return func(settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { | 
					
						
							|  |  |  | 		jsonData := map[string]interface{}{} | 
					
						
							|  |  |  | 		err := json.Unmarshal(settings.JSONData, &jsonData) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			return nil, fmt.Errorf("error reading settings: %w", err) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		httpCliOpts, err := settings.HTTPClientOptions() | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			return nil, fmt.Errorf("error getting http options: %w", err) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-09-27 20:32:19 +08:00
										 |  |  | 		// Set SigV4 service namespace
 | 
					
						
							|  |  |  | 		if httpCliOpts.SigV4 != nil { | 
					
						
							|  |  |  | 			httpCliOpts.SigV4.Service = "es" | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-07-15 22:45:59 +08:00
										 |  |  | 		version, err := coerceVersion(jsonData["esVersion"]) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			return nil, fmt.Errorf("elasticsearch version is required, err=%v", err) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		timeField, ok := jsonData["timeField"].(string) | 
					
						
							|  |  |  | 		if !ok { | 
					
						
							|  |  |  | 			return nil, errors.New("timeField cannot be cast to string") | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		if timeField == "" { | 
					
						
							|  |  |  | 			return nil, errors.New("elasticsearch time field name is required") | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		interval, ok := jsonData["interval"].(string) | 
					
						
							|  |  |  | 		if !ok { | 
					
						
							|  |  |  | 			interval = "" | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		timeInterval, ok := jsonData["timeInterval"].(string) | 
					
						
							|  |  |  | 		if !ok { | 
					
						
							|  |  |  | 			timeInterval = "" | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		maxConcurrentShardRequests, ok := jsonData["maxConcurrentShardRequests"].(float64) | 
					
						
							|  |  |  | 		if !ok { | 
					
						
							|  |  |  | 			maxConcurrentShardRequests = 256 | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		includeFrozen, ok := jsonData["includeFrozen"].(bool) | 
					
						
							|  |  |  | 		if !ok { | 
					
						
							|  |  |  | 			includeFrozen = false | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		xpack, ok := jsonData["xpack"].(bool) | 
					
						
							|  |  |  | 		if !ok { | 
					
						
							|  |  |  | 			xpack = false | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		model := es.DatasourceInfo{ | 
					
						
							|  |  |  | 			ID:                         settings.ID, | 
					
						
							|  |  |  | 			URL:                        settings.URL, | 
					
						
							|  |  |  | 			HTTPClientOpts:             httpCliOpts, | 
					
						
							|  |  |  | 			Database:                   settings.Database, | 
					
						
							|  |  |  | 			MaxConcurrentShardRequests: int64(maxConcurrentShardRequests), | 
					
						
							|  |  |  | 			ESVersion:                  version, | 
					
						
							|  |  |  | 			TimeField:                  timeField, | 
					
						
							|  |  |  | 			Interval:                   interval, | 
					
						
							|  |  |  | 			TimeInterval:               timeInterval, | 
					
						
							|  |  |  | 			IncludeFrozen:              includeFrozen, | 
					
						
							|  |  |  | 			XPack:                      xpack, | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		return model, nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (s *Service) getDSInfo(pluginCtx backend.PluginContext) (*es.DatasourceInfo, error) { | 
					
						
							|  |  |  | 	i, err := s.im.Get(pluginCtx) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	instance := i.(es.DatasourceInfo) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return &instance, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func coerceVersion(v interface{}) (*semver.Version, error) { | 
					
						
							|  |  |  | 	versionString, ok := v.(string) | 
					
						
							|  |  |  | 	if ok { | 
					
						
							|  |  |  | 		return semver.NewVersion(versionString) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	versionNumber, ok := v.(float64) | 
					
						
							|  |  |  | 	if !ok { | 
					
						
							|  |  |  | 		return nil, fmt.Errorf("elasticsearch version %v, cannot be cast to int", v) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Legacy version numbers (before Grafana 8)
 | 
					
						
							|  |  |  | 	// valid values were 2,5,56,60,70
 | 
					
						
							|  |  |  | 	switch int64(versionNumber) { | 
					
						
							|  |  |  | 	case 2: | 
					
						
							|  |  |  | 		return semver.NewVersion("2.0.0") | 
					
						
							|  |  |  | 	case 5: | 
					
						
							|  |  |  | 		return semver.NewVersion("5.0.0") | 
					
						
							|  |  |  | 	case 56: | 
					
						
							|  |  |  | 		return semver.NewVersion("5.6.0") | 
					
						
							|  |  |  | 	case 60: | 
					
						
							|  |  |  | 		return semver.NewVersion("6.0.0") | 
					
						
							|  |  |  | 	case 70: | 
					
						
							|  |  |  | 		return semver.NewVersion("7.0.0") | 
					
						
							|  |  |  | 	default: | 
					
						
							|  |  |  | 		return nil, fmt.Errorf("elasticsearch version=%d is not supported", int64(versionNumber)) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } |