mirror of https://github.com/grafana/grafana.git
				
				
				
			Elasticsearch: Add Support for Serial Differencing Pipeline Aggregation (#28618)
* Elasticsearch: Add support for serial diff pipeline aggregation * Removing settings transsforms * Removing unused deps * removing unused dep * Fixing type in test * Adding backend support for serial_diff
This commit is contained in:
		
							parent
							
								
									ccf2e255fe
								
							
						
					
					
						commit
						cf1c01dd8b
					
				|  | @ -48,6 +48,7 @@ var metricAggType = map[string]string{ | ||||||
| 	"moving_fn":      "Moving Function", | 	"moving_fn":      "Moving Function", | ||||||
| 	"cumulative_sum": "Cumulative Sum", | 	"cumulative_sum": "Cumulative Sum", | ||||||
| 	"derivative":     "Derivative", | 	"derivative":     "Derivative", | ||||||
|  | 	"serial_diff":    "Serial Difference", | ||||||
| 	"bucket_script":  "Bucket Script", | 	"bucket_script":  "Bucket Script", | ||||||
| 	"raw_document":   "Raw Document", | 	"raw_document":   "Raw Document", | ||||||
| } | } | ||||||
|  | @ -68,6 +69,7 @@ var pipelineAggType = map[string]string{ | ||||||
| 	"moving_fn":      "moving_fn", | 	"moving_fn":      "moving_fn", | ||||||
| 	"cumulative_sum": "cumulative_sum", | 	"cumulative_sum": "cumulative_sum", | ||||||
| 	"derivative":     "derivative", | 	"derivative":     "derivative", | ||||||
|  | 	"serial_diff":    "serial_diff", | ||||||
| 	"bucket_script":  "bucket_script", | 	"bucket_script":  "bucket_script", | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -650,6 +650,64 @@ func TestExecuteTimeSeriesQuery(t *testing.T) { | ||||||
| 			So(plAgg.BucketPath, ShouldEqual, "_count") | 			So(plAgg.BucketPath, ShouldEqual, "_count") | ||||||
| 		}) | 		}) | ||||||
| 
 | 
 | ||||||
|  | 		Convey("With serial_diff", func() { | ||||||
|  | 			c := newFakeClient(5) | ||||||
|  | 			_, err := executeTsdbQuery(c, `{ | ||||||
|  | 				"timeField": "@timestamp", | ||||||
|  | 				"bucketAggs": [ | ||||||
|  | 					{ "type": "date_histogram", "field": "@timestamp", "id": "4" } | ||||||
|  | 				], | ||||||
|  | 				"metrics": [ | ||||||
|  | 					{ "id": "3", "type": "sum", "field": "@value" }, | ||||||
|  | 					{ | ||||||
|  | 						"id": "2", | ||||||
|  | 						"type": "serial_diff", | ||||||
|  | 						"pipelineAgg": "3" | ||||||
|  | 					} | ||||||
|  | 				] | ||||||
|  | 			}`, from, to, 15*time.Second) | ||||||
|  | 			So(err, ShouldBeNil) | ||||||
|  | 			sr := c.multisearchRequests[0].Requests[0] | ||||||
|  | 
 | ||||||
|  | 			firstLevel := sr.Aggs[0] | ||||||
|  | 			So(firstLevel.Key, ShouldEqual, "4") | ||||||
|  | 			So(firstLevel.Aggregation.Type, ShouldEqual, "date_histogram") | ||||||
|  | 
 | ||||||
|  | 			serialDiffAgg := firstLevel.Aggregation.Aggs[1] | ||||||
|  | 			So(serialDiffAgg.Key, ShouldEqual, "2") | ||||||
|  | 			plAgg := serialDiffAgg.Aggregation.Aggregation.(*es.PipelineAggregation) | ||||||
|  | 			So(plAgg.BucketPath, ShouldEqual, "3") | ||||||
|  | 		}) | ||||||
|  | 
 | ||||||
|  | 		Convey("With serial_diff doc count", func() { | ||||||
|  | 			c := newFakeClient(5) | ||||||
|  | 			_, err := executeTsdbQuery(c, `{ | ||||||
|  | 				"timeField": "@timestamp", | ||||||
|  | 				"bucketAggs": [ | ||||||
|  | 					{ "type": "date_histogram", "field": "@timestamp", "id": "4" } | ||||||
|  | 				], | ||||||
|  | 				"metrics": [ | ||||||
|  | 					{ "id": "3", "type": "count", "field": "select field" }, | ||||||
|  | 					{ | ||||||
|  | 						"id": "2", | ||||||
|  | 						"type": "serial_diff", | ||||||
|  | 						"pipelineAgg": "3" | ||||||
|  | 					} | ||||||
|  | 				] | ||||||
|  | 			}`, from, to, 15*time.Second) | ||||||
|  | 			So(err, ShouldBeNil) | ||||||
|  | 			sr := c.multisearchRequests[0].Requests[0] | ||||||
|  | 
 | ||||||
|  | 			firstLevel := sr.Aggs[0] | ||||||
|  | 			So(firstLevel.Key, ShouldEqual, "4") | ||||||
|  | 			So(firstLevel.Aggregation.Type, ShouldEqual, "date_histogram") | ||||||
|  | 
 | ||||||
|  | 			serialDiffAgg := firstLevel.Aggregation.Aggs[0] | ||||||
|  | 			So(serialDiffAgg.Key, ShouldEqual, "2") | ||||||
|  | 			plAgg := serialDiffAgg.Aggregation.Aggregation.(*es.PipelineAggregation) | ||||||
|  | 			So(plAgg.BucketPath, ShouldEqual, "_count") | ||||||
|  | 		}) | ||||||
|  | 
 | ||||||
| 		Convey("With bucket_script", func() { | 		Convey("With bucket_script", func() { | ||||||
| 			c := newFakeClient(5) | 			c := newFakeClient(5) | ||||||
| 			_, err := executeTsdbQuery(c, `{ | 			_, err := executeTsdbQuery(c, `{ | ||||||
|  |  | ||||||
|  | @ -35,6 +35,15 @@ export const SettingsEditor: FunctionComponent<Props> = ({ metric, previousMetri | ||||||
|     <SettingsEditorContainer label={description} hidden={metric.hide}> |     <SettingsEditorContainer label={description} hidden={metric.hide}> | ||||||
|       {metric.type === 'derivative' && <SettingField label="Unit" metric={metric} settingName="unit" />} |       {metric.type === 'derivative' && <SettingField label="Unit" metric={metric} settingName="unit" />} | ||||||
| 
 | 
 | ||||||
|  |       {metric.type === 'serial_diff' && ( | ||||||
|  |         <InlineField label="Lag"> | ||||||
|  |           <Input | ||||||
|  |             onBlur={e => dispatch(changeMetricSetting(metric, 'lag', parseInt(e.target.value, 10)))} | ||||||
|  |             defaultValue={metric.settings?.lag} | ||||||
|  |           /> | ||||||
|  |         </InlineField> | ||||||
|  |       )} | ||||||
|  | 
 | ||||||
|       {metric.type === 'cumulative_sum' && <SettingField label="Format" metric={metric} settingName="format" />} |       {metric.type === 'cumulative_sum' && <SettingField label="Format" metric={metric} settingName="format" />} | ||||||
| 
 | 
 | ||||||
|       {metric.type === 'moving_avg' && <MovingAverageSettingsEditor metric={metric} />} |       {metric.type === 'moving_avg' && <MovingAverageSettingsEditor metric={metric} />} | ||||||
|  |  | ||||||
|  | @ -4,6 +4,7 @@ export type PipelineMetricAggregationType = | ||||||
|   | 'moving_avg' |   | 'moving_avg' | ||||||
|   | 'moving_fn' |   | 'moving_fn' | ||||||
|   | 'derivative' |   | 'derivative' | ||||||
|  |   | 'serial_diff' | ||||||
|   | 'cumulative_sum' |   | 'cumulative_sum' | ||||||
|   | 'bucket_script'; |   | 'bucket_script'; | ||||||
| 
 | 
 | ||||||
|  | @ -247,6 +248,13 @@ export interface Derivative extends BasePipelineMetricAggregation { | ||||||
|   }; |   }; | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | export interface SerialDiff extends BasePipelineMetricAggregation { | ||||||
|  |   type: 'serial_diff'; | ||||||
|  |   settings?: { | ||||||
|  |     lag?: number; | ||||||
|  |   }; | ||||||
|  | } | ||||||
|  | 
 | ||||||
| interface CumulativeSum extends BasePipelineMetricAggregation { | interface CumulativeSum extends BasePipelineMetricAggregation { | ||||||
|   type: 'cumulative_sum'; |   type: 'cumulative_sum'; | ||||||
|   settings?: { |   settings?: { | ||||||
|  | @ -267,6 +275,7 @@ export type MetricAggregationWithSettings = | ||||||
|   | BucketScript |   | BucketScript | ||||||
|   | CumulativeSum |   | CumulativeSum | ||||||
|   | Derivative |   | Derivative | ||||||
|  |   | SerialDiff | ||||||
|   | RawData |   | RawData | ||||||
|   | RawDocument |   | RawDocument | ||||||
|   | UniqueCount |   | UniqueCount | ||||||
|  | @ -336,6 +345,7 @@ export const METRIC_AGGREGATION_TYPES = [ | ||||||
|   'moving_avg', |   'moving_avg', | ||||||
|   'moving_fn', |   'moving_fn', | ||||||
|   'derivative', |   'derivative', | ||||||
|  |   'serial_diff', | ||||||
|   'cumulative_sum', |   'cumulative_sum', | ||||||
|   'bucket_script', |   'bucket_script', | ||||||
| ]; | ]; | ||||||
|  |  | ||||||
|  | @ -147,6 +147,18 @@ export const metricAggregationConfig: MetricsConfiguration = { | ||||||
|     hasMeta: false, |     hasMeta: false, | ||||||
|     defaults: {}, |     defaults: {}, | ||||||
|   }, |   }, | ||||||
|  |   serial_diff: { | ||||||
|  |     label: 'Serial Difference', | ||||||
|  |     requiresField: true, | ||||||
|  |     isPipelineAgg: true, | ||||||
|  |     minVersion: 2, | ||||||
|  |     supportsMissing: false, | ||||||
|  |     supportsMultipleBucketPaths: false, | ||||||
|  |     hasSettings: true, | ||||||
|  |     supportsInlineScript: false, | ||||||
|  |     hasMeta: false, | ||||||
|  |     defaults: {}, | ||||||
|  |   }, | ||||||
|   cumulative_sum: { |   cumulative_sum: { | ||||||
|     label: 'Cumulative Sum', |     label: 'Cumulative Sum', | ||||||
|     requiresField: true, |     requiresField: true, | ||||||
|  | @ -236,6 +248,7 @@ export const pipelineOptions: PipelineOptions = { | ||||||
|   ], |   ], | ||||||
|   moving_fn: [{ label: 'window', default: 5 }, { label: 'script' }], |   moving_fn: [{ label: 'window', default: 5 }, { label: 'script' }], | ||||||
|   derivative: [{ label: 'unit' }], |   derivative: [{ label: 'unit' }], | ||||||
|  |   serial_diff: [{ label: 'lag' }], | ||||||
|   cumulative_sum: [{ label: 'format' }], |   cumulative_sum: [{ label: 'format' }], | ||||||
|   bucket_script: [], |   bucket_script: [], | ||||||
| }; | }; | ||||||
|  |  | ||||||
|  | @ -387,6 +387,35 @@ describe('ElasticQueryBuilder', () => { | ||||||
|         expect(firstLevel.aggs['2'].derivative.buckets_path).toBe('_count'); |         expect(firstLevel.aggs['2'].derivative.buckets_path).toBe('_count'); | ||||||
|       }); |       }); | ||||||
| 
 | 
 | ||||||
|  |       it('with serial_diff', () => { | ||||||
|  |         const query = builder.build({ | ||||||
|  |           refId: 'A', | ||||||
|  |           metrics: [ | ||||||
|  |             { | ||||||
|  |               id: '3', | ||||||
|  |               type: 'max', | ||||||
|  |               field: '@value', | ||||||
|  |             }, | ||||||
|  |             { | ||||||
|  |               id: '2', | ||||||
|  |               type: 'serial_diff', | ||||||
|  |               field: '3', | ||||||
|  |               settings: { | ||||||
|  |                 lag: 5, | ||||||
|  |               }, | ||||||
|  |             }, | ||||||
|  |           ], | ||||||
|  |           bucketAggs: [{ type: 'date_histogram', field: '@timestamp', id: '3' }], | ||||||
|  |         }); | ||||||
|  | 
 | ||||||
|  |         const firstLevel = query.aggs['3']; | ||||||
|  | 
 | ||||||
|  |         expect(firstLevel.aggs['2']).not.toBe(undefined); | ||||||
|  |         expect(firstLevel.aggs['2'].serial_diff).not.toBe(undefined); | ||||||
|  |         expect(firstLevel.aggs['2'].serial_diff.buckets_path).toBe('3'); | ||||||
|  |         expect(firstLevel.aggs['2'].serial_diff.lag).toBe(5); | ||||||
|  |       }); | ||||||
|  | 
 | ||||||
|       it('with bucket_script', () => { |       it('with bucket_script', () => { | ||||||
|         const query = builder.build({ |         const query = builder.build({ | ||||||
|           refId: 'A', |           refId: 'A', | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue