[BUGFIX] PromQL: pass Context so spans parent correctly
Assigning to `evaluator.ctx` in `eval()` broke the parent-child relationship. Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
This commit is contained in:
		
							parent
							
								
									4ffb74dae4
								
							
						
					
					
						commit
						8742077498
					
				
							
								
								
									
										103
									
								
								promql/engine.go
								
								
								
								
							
							
						
						
									
										103
									
								
								promql/engine.go
								
								
								
								
							|  | @ -713,7 +713,6 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval | |||
| 			startTimestamp:           start, | ||||
| 			endTimestamp:             start, | ||||
| 			interval:                 1, | ||||
| 			ctx:                      ctxInnerEval, | ||||
| 			maxSamples:               ng.maxSamplesPerQuery, | ||||
| 			logger:                   ng.logger, | ||||
| 			lookbackDelta:            s.LookbackDelta, | ||||
|  | @ -723,7 +722,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval | |||
| 		} | ||||
| 		query.sampleStats.InitStepTracking(start, start, 1) | ||||
| 
 | ||||
| 		val, warnings, err := evaluator.Eval(s.Expr) | ||||
| 		val, warnings, err := evaluator.Eval(ctxInnerEval, s.Expr) | ||||
| 
 | ||||
| 		evalSpanTimer.Finish() | ||||
| 
 | ||||
|  | @ -772,7 +771,6 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval | |||
| 		startTimestamp:           timeMilliseconds(s.Start), | ||||
| 		endTimestamp:             timeMilliseconds(s.End), | ||||
| 		interval:                 durationMilliseconds(s.Interval), | ||||
| 		ctx:                      ctxInnerEval, | ||||
| 		maxSamples:               ng.maxSamplesPerQuery, | ||||
| 		logger:                   ng.logger, | ||||
| 		lookbackDelta:            s.LookbackDelta, | ||||
|  | @ -781,7 +779,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval | |||
| 		enableDelayedNameRemoval: ng.enableDelayedNameRemoval, | ||||
| 	} | ||||
| 	query.sampleStats.InitStepTracking(evaluator.startTimestamp, evaluator.endTimestamp, evaluator.interval) | ||||
| 	val, warnings, err := evaluator.Eval(s.Expr) | ||||
| 	val, warnings, err := evaluator.Eval(ctxInnerEval, s.Expr) | ||||
| 
 | ||||
| 	evalSpanTimer.Finish() | ||||
| 
 | ||||
|  | @ -1029,8 +1027,6 @@ func (e errWithWarnings) Error() string { return e.err.Error() } | |||
| // querier and reports errors. On timeout or cancellation of its context it
 | ||||
| // terminates.
 | ||||
| type evaluator struct { | ||||
| 	ctx context.Context | ||||
| 
 | ||||
| 	startTimestamp int64 // Start time in milliseconds.
 | ||||
| 	endTimestamp   int64 // End time in milliseconds.
 | ||||
| 	interval       int64 // Interval in milliseconds.
 | ||||
|  | @ -1079,10 +1075,10 @@ func (ev *evaluator) recover(expr parser.Expr, ws *annotations.Annotations, errp | |||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (ev *evaluator) Eval(expr parser.Expr) (v parser.Value, ws annotations.Annotations, err error) { | ||||
| func (ev *evaluator) Eval(ctx context.Context, expr parser.Expr) (v parser.Value, ws annotations.Annotations, err error) { | ||||
| 	defer ev.recover(expr, &ws, &err) | ||||
| 
 | ||||
| 	v, ws = ev.eval(expr) | ||||
| 	v, ws = ev.eval(ctx, expr) | ||||
| 	if ev.enableDelayedNameRemoval { | ||||
| 		ev.cleanupMetricLabels(v) | ||||
| 	} | ||||
|  | @ -1133,7 +1129,7 @@ func (enh *EvalNodeHelper) resetBuilder(lbls labels.Labels) { | |||
| // function call results.
 | ||||
| // The prepSeries function (if provided) can be used to prepare the helper
 | ||||
| // for each series, then passed to each call funcCall.
 | ||||
| func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper), funcCall func([]parser.Value, [][]EvalSeriesHelper, *EvalNodeHelper) (Vector, annotations.Annotations), exprs ...parser.Expr) (Matrix, annotations.Annotations) { | ||||
| func (ev *evaluator) rangeEval(ctx context.Context, prepSeries func(labels.Labels, *EvalSeriesHelper), funcCall func([]parser.Value, [][]EvalSeriesHelper, *EvalNodeHelper) (Vector, annotations.Annotations), exprs ...parser.Expr) (Matrix, annotations.Annotations) { | ||||
| 	numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1 | ||||
| 	matrixes := make([]Matrix, len(exprs)) | ||||
| 	origMatrixes := make([]Matrix, len(exprs)) | ||||
|  | @ -1144,7 +1140,7 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper) | |||
| 		// Functions will take string arguments from the expressions, not the values.
 | ||||
| 		if e != nil && e.Type() != parser.ValueTypeString { | ||||
| 			// ev.currentSamples will be updated to the correct value within the ev.eval call.
 | ||||
| 			val, ws := ev.eval(e) | ||||
| 			val, ws := ev.eval(ctx, e) | ||||
| 			warnings.Merge(ws) | ||||
| 			matrixes[i] = val.(Matrix) | ||||
| 
 | ||||
|  | @ -1196,7 +1192,7 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper) | |||
| 	} | ||||
| 
 | ||||
| 	for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval { | ||||
| 		if err := contextDone(ev.ctx, "expression evaluation"); err != nil { | ||||
| 		if err := contextDone(ctx, "expression evaluation"); err != nil { | ||||
| 			ev.error(err) | ||||
| 		} | ||||
| 		// Reset number of samples in memory after each timestamp.
 | ||||
|  | @ -1306,7 +1302,7 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper) | |||
| 	return mat, warnings | ||||
| } | ||||
| 
 | ||||
| func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping []string, inputMatrix Matrix, param float64) (Matrix, annotations.Annotations) { | ||||
| func (ev *evaluator) rangeEvalAgg(ctx context.Context, aggExpr *parser.AggregateExpr, sortedGrouping []string, inputMatrix Matrix, param float64) (Matrix, annotations.Annotations) { | ||||
| 	// Keep a copy of the original point slice so that it can be returned to the pool.
 | ||||
| 	origMatrix := slices.Clone(inputMatrix) | ||||
| 	defer func() { | ||||
|  | @ -1386,7 +1382,7 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping | |||
| 	} | ||||
| 
 | ||||
| 	for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval { | ||||
| 		if err := contextDone(ev.ctx, "expression evaluation"); err != nil { | ||||
| 		if err := contextDone(ctx, "expression evaluation"); err != nil { | ||||
| 			ev.error(err) | ||||
| 		} | ||||
| 		// Reset number of samples in memory after each timestamp.
 | ||||
|  | @ -1437,11 +1433,11 @@ func (ev *evaluator) rangeEvalAgg(aggExpr *parser.AggregateExpr, sortedGrouping | |||
| 
 | ||||
| // evalSubquery evaluates given SubqueryExpr and returns an equivalent
 | ||||
| // evaluated MatrixSelector in its place. Note that the Name and LabelMatchers are not set.
 | ||||
| func (ev *evaluator) evalSubquery(subq *parser.SubqueryExpr) (*parser.MatrixSelector, int, annotations.Annotations) { | ||||
| func (ev *evaluator) evalSubquery(ctx context.Context, subq *parser.SubqueryExpr) (*parser.MatrixSelector, int, annotations.Annotations) { | ||||
| 	samplesStats := ev.samplesStats | ||||
| 	// Avoid double counting samples when running a subquery, those samples will be counted in later stage.
 | ||||
| 	ev.samplesStats = ev.samplesStats.NewChild() | ||||
| 	val, ws := ev.eval(subq) | ||||
| 	val, ws := ev.eval(ctx, subq) | ||||
| 	// But do incorporate the peak from the subquery
 | ||||
| 	samplesStats.UpdatePeakFromSubquery(ev.samplesStats) | ||||
| 	ev.samplesStats = samplesStats | ||||
|  | @ -1468,17 +1464,16 @@ func (ev *evaluator) evalSubquery(subq *parser.SubqueryExpr) (*parser.MatrixSele | |||
| } | ||||
| 
 | ||||
| // eval evaluates the given expression as the given AST expression node requires.
 | ||||
| func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotations) { | ||||
| func (ev *evaluator) eval(ctx context.Context, expr parser.Expr) (parser.Value, annotations.Annotations) { | ||||
| 	// This is the top-level evaluation method.
 | ||||
| 	// Thus, we check for timeout/cancellation here.
 | ||||
| 	if err := contextDone(ev.ctx, "expression evaluation"); err != nil { | ||||
| 	if err := contextDone(ctx, "expression evaluation"); err != nil { | ||||
| 		ev.error(err) | ||||
| 	} | ||||
| 	numSteps := int((ev.endTimestamp-ev.startTimestamp)/ev.interval) + 1 | ||||
| 
 | ||||
| 	// Create a new span to help investigate inner evaluation performances.
 | ||||
| 	ctxWithSpan, span := otel.Tracer("").Start(ev.ctx, stats.InnerEvalTime.SpanOperation()+" eval "+reflect.TypeOf(expr).String()) | ||||
| 	ev.ctx = ctxWithSpan | ||||
| 	ctx, span := otel.Tracer("").Start(ctx, stats.InnerEvalTime.SpanOperation()+" eval "+reflect.TypeOf(expr).String()) | ||||
| 	defer span.End() | ||||
| 
 | ||||
| 	switch e := expr.(type) { | ||||
|  | @ -1500,7 +1495,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio | |||
| 				sortedGrouping = append(sortedGrouping, valueLabel.Val) | ||||
| 				slices.Sort(sortedGrouping) | ||||
| 			} | ||||
| 			return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { | ||||
| 			return ev.rangeEval(ctx, nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { | ||||
| 				return ev.aggregationCountValues(e, sortedGrouping, valueLabel.Val, v[0].(Vector), enh) | ||||
| 			}, e.Expr) | ||||
| 		} | ||||
|  | @ -1510,16 +1505,16 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio | |||
| 		// param is the number k for topk/bottomk, or q for quantile.
 | ||||
| 		var fParam float64 | ||||
| 		if param != nil { | ||||
| 			val, ws := ev.eval(param) | ||||
| 			val, ws := ev.eval(ctx, param) | ||||
| 			warnings.Merge(ws) | ||||
| 			fParam = val.(Matrix)[0].Floats[0].F | ||||
| 		} | ||||
| 		// Now fetch the data to be aggregated.
 | ||||
| 		val, ws := ev.eval(e.Expr) | ||||
| 		val, ws := ev.eval(ctx, e.Expr) | ||||
| 		warnings.Merge(ws) | ||||
| 		inputMatrix := val.(Matrix) | ||||
| 
 | ||||
| 		result, ws := ev.rangeEvalAgg(e, sortedGrouping, inputMatrix, fParam) | ||||
| 		result, ws := ev.rangeEvalAgg(ctx, e, sortedGrouping, inputMatrix, fParam) | ||||
| 		warnings.Merge(ws) | ||||
| 		ev.currentSamples = originalNumSamples + result.TotalSamples() | ||||
| 		ev.samplesStats.UpdatePeak(ev.currentSamples) | ||||
|  | @ -1537,7 +1532,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio | |||
| 			unwrapParenExpr(&arg) | ||||
| 			vs, ok := arg.(*parser.VectorSelector) | ||||
| 			if ok { | ||||
| 				return ev.rangeEvalTimestampFunctionOverVectorSelector(vs, call, e) | ||||
| 				return ev.rangeEvalTimestampFunctionOverVectorSelector(ctx, vs, call, e) | ||||
| 			} | ||||
| 		} | ||||
| 
 | ||||
|  | @ -1561,7 +1556,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio | |||
| 				matrixArgIndex = i | ||||
| 				matrixArg = true | ||||
| 				// Replacing parser.SubqueryExpr with parser.MatrixSelector.
 | ||||
| 				val, totalSamples, ws := ev.evalSubquery(subq) | ||||
| 				val, totalSamples, ws := ev.evalSubquery(ctx, subq) | ||||
| 				e.Args[i] = val | ||||
| 				warnings.Merge(ws) | ||||
| 				defer func() { | ||||
|  | @ -1576,14 +1571,14 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio | |||
| 		// Special handling for functions that work on series not samples.
 | ||||
| 		switch e.Func.Name { | ||||
| 		case "label_replace": | ||||
| 			return ev.evalLabelReplace(e.Args) | ||||
| 			return ev.evalLabelReplace(ctx, e.Args) | ||||
| 		case "label_join": | ||||
| 			return ev.evalLabelJoin(e.Args) | ||||
| 			return ev.evalLabelJoin(ctx, e.Args) | ||||
| 		} | ||||
| 
 | ||||
| 		if !matrixArg { | ||||
| 			// Does not have a matrix argument.
 | ||||
| 			return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { | ||||
| 			return ev.rangeEval(ctx, nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { | ||||
| 				vec, annos := call(v, e.Args, enh) | ||||
| 				return vec, warnings.Merge(annos) | ||||
| 			}, e.Args...) | ||||
|  | @ -1595,7 +1590,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio | |||
| 		otherInArgs := make([]Vector, len(e.Args)) | ||||
| 		for i, e := range e.Args { | ||||
| 			if i != matrixArgIndex { | ||||
| 				val, ws := ev.eval(e) | ||||
| 				val, ws := ev.eval(ctx, e) | ||||
| 				otherArgs[i] = val.(Matrix) | ||||
| 				otherInArgs[i] = Vector{Sample{}} | ||||
| 				inArgs[i] = otherInArgs[i] | ||||
|  | @ -1609,7 +1604,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio | |||
| 		sel := arg.(*parser.MatrixSelector) | ||||
| 		selVS := sel.VectorSelector.(*parser.VectorSelector) | ||||
| 
 | ||||
| 		ws, err := checkAndExpandSeriesSet(ev.ctx, sel) | ||||
| 		ws, err := checkAndExpandSeriesSet(ctx, sel) | ||||
| 		warnings.Merge(ws) | ||||
| 		if err != nil { | ||||
| 			ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), warnings}) | ||||
|  | @ -1639,7 +1634,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio | |||
| 		dropName := e.Func.Name != "last_over_time" | ||||
| 
 | ||||
| 		for i, s := range selVS.Series { | ||||
| 			if err := contextDone(ev.ctx, "expression evaluation"); err != nil { | ||||
| 			if err := contextDone(ctx, "expression evaluation"); err != nil { | ||||
| 				ev.error(err) | ||||
| 			} | ||||
| 			ev.currentSamples -= len(floats) + totalHPointSize(histograms) | ||||
|  | @ -1785,10 +1780,10 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio | |||
| 		return mat, warnings | ||||
| 
 | ||||
| 	case *parser.ParenExpr: | ||||
| 		return ev.eval(e.Expr) | ||||
| 		return ev.eval(ctx, e.Expr) | ||||
| 
 | ||||
| 	case *parser.UnaryExpr: | ||||
| 		val, ws := ev.eval(e.Expr) | ||||
| 		val, ws := ev.eval(ctx, e.Expr) | ||||
| 		mat := val.(Matrix) | ||||
| 		if e.Op == parser.SUB { | ||||
| 			for i := range mat { | ||||
|  | @ -1809,7 +1804,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio | |||
| 	case *parser.BinaryExpr: | ||||
| 		switch lt, rt := e.LHS.Type(), e.RHS.Type(); { | ||||
| 		case lt == parser.ValueTypeScalar && rt == parser.ValueTypeScalar: | ||||
| 			return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { | ||||
| 			return ev.rangeEval(ctx, nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { | ||||
| 				val := scalarBinop(e.Op, v[0].(Vector)[0].F, v[1].(Vector)[0].F) | ||||
| 				return append(enh.Out, Sample{F: val}), nil | ||||
| 			}, e.LHS, e.RHS) | ||||
|  | @ -1822,39 +1817,39 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio | |||
| 			} | ||||
| 			switch e.Op { | ||||
| 			case parser.LAND: | ||||
| 				return ev.rangeEval(initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { | ||||
| 				return ev.rangeEval(ctx, initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { | ||||
| 					return ev.VectorAnd(v[0].(Vector), v[1].(Vector), e.VectorMatching, sh[0], sh[1], enh), nil | ||||
| 				}, e.LHS, e.RHS) | ||||
| 			case parser.LOR: | ||||
| 				return ev.rangeEval(initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { | ||||
| 				return ev.rangeEval(ctx, initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { | ||||
| 					return ev.VectorOr(v[0].(Vector), v[1].(Vector), e.VectorMatching, sh[0], sh[1], enh), nil | ||||
| 				}, e.LHS, e.RHS) | ||||
| 			case parser.LUNLESS: | ||||
| 				return ev.rangeEval(initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { | ||||
| 				return ev.rangeEval(ctx, initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { | ||||
| 					return ev.VectorUnless(v[0].(Vector), v[1].(Vector), e.VectorMatching, sh[0], sh[1], enh), nil | ||||
| 				}, e.LHS, e.RHS) | ||||
| 			default: | ||||
| 				return ev.rangeEval(initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { | ||||
| 				return ev.rangeEval(ctx, initSignatures, func(v []parser.Value, sh [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { | ||||
| 					vec, err := ev.VectorBinop(e.Op, v[0].(Vector), v[1].(Vector), e.VectorMatching, e.ReturnBool, sh[0], sh[1], enh) | ||||
| 					return vec, handleVectorBinopError(err, e) | ||||
| 				}, e.LHS, e.RHS) | ||||
| 			} | ||||
| 
 | ||||
| 		case lt == parser.ValueTypeVector && rt == parser.ValueTypeScalar: | ||||
| 			return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { | ||||
| 			return ev.rangeEval(ctx, nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { | ||||
| 				vec, err := ev.VectorscalarBinop(e.Op, v[0].(Vector), Scalar{V: v[1].(Vector)[0].F}, false, e.ReturnBool, enh) | ||||
| 				return vec, handleVectorBinopError(err, e) | ||||
| 			}, e.LHS, e.RHS) | ||||
| 
 | ||||
| 		case lt == parser.ValueTypeScalar && rt == parser.ValueTypeVector: | ||||
| 			return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { | ||||
| 			return ev.rangeEval(ctx, nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { | ||||
| 				vec, err := ev.VectorscalarBinop(e.Op, v[1].(Vector), Scalar{V: v[0].(Vector)[0].F}, true, e.ReturnBool, enh) | ||||
| 				return vec, handleVectorBinopError(err, e) | ||||
| 			}, e.LHS, e.RHS) | ||||
| 		} | ||||
| 
 | ||||
| 	case *parser.NumberLiteral: | ||||
| 		return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { | ||||
| 		return ev.rangeEval(ctx, nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { | ||||
| 			return append(enh.Out, Sample{F: e.Val, Metric: labels.EmptyLabels()}), nil | ||||
| 		}) | ||||
| 
 | ||||
|  | @ -1862,7 +1857,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio | |||
| 		return String{V: e.Val, T: ev.startTimestamp}, nil | ||||
| 
 | ||||
| 	case *parser.VectorSelector: | ||||
| 		ws, err := checkAndExpandSeriesSet(ev.ctx, e) | ||||
| 		ws, err := checkAndExpandSeriesSet(ctx, e) | ||||
| 		if err != nil { | ||||
| 			ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws}) | ||||
| 		} | ||||
|  | @ -1871,7 +1866,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio | |||
| 		it := storage.NewMemoizedEmptyIterator(durationMilliseconds(ev.lookbackDelta)) | ||||
| 		var chkIter chunkenc.Iterator | ||||
| 		for i, s := range e.Series { | ||||
| 			if err := contextDone(ev.ctx, "expression evaluation"); err != nil { | ||||
| 			if err := contextDone(ctx, "expression evaluation"); err != nil { | ||||
| 				ev.error(err) | ||||
| 			} | ||||
| 			chkIter = s.Iterator(chkIter) | ||||
|  | @ -1922,14 +1917,13 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio | |||
| 		if ev.startTimestamp != ev.endTimestamp { | ||||
| 			panic(errors.New("cannot do range evaluation of matrix selector")) | ||||
| 		} | ||||
| 		return ev.matrixSelector(e) | ||||
| 		return ev.matrixSelector(ctx, e) | ||||
| 
 | ||||
| 	case *parser.SubqueryExpr: | ||||
| 		offsetMillis := durationMilliseconds(e.Offset) | ||||
| 		rangeMillis := durationMilliseconds(e.Range) | ||||
| 		newEv := &evaluator{ | ||||
| 			endTimestamp:             ev.endTimestamp - offsetMillis, | ||||
| 			ctx:                      ev.ctx, | ||||
| 			currentSamples:           ev.currentSamples, | ||||
| 			maxSamples:               ev.maxSamples, | ||||
| 			logger:                   ev.logger, | ||||
|  | @ -1959,7 +1953,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio | |||
| 			setOffsetForAtModifier(newEv.startTimestamp, e.Expr) | ||||
| 		} | ||||
| 
 | ||||
| 		res, ws := newEv.eval(e.Expr) | ||||
| 		res, ws := newEv.eval(ctx, e.Expr) | ||||
| 		ev.currentSamples = newEv.currentSamples | ||||
| 		ev.samplesStats.UpdatePeakFromSubquery(newEv.samplesStats) | ||||
| 		ev.samplesStats.IncrementSamplesAtTimestamp(ev.endTimestamp, newEv.samplesStats.TotalSamples) | ||||
|  | @ -1967,14 +1961,13 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio | |||
| 	case *parser.StepInvariantExpr: | ||||
| 		switch ce := e.Expr.(type) { | ||||
| 		case *parser.StringLiteral, *parser.NumberLiteral: | ||||
| 			return ev.eval(ce) | ||||
| 			return ev.eval(ctx, ce) | ||||
| 		} | ||||
| 
 | ||||
| 		newEv := &evaluator{ | ||||
| 			startTimestamp:           ev.startTimestamp, | ||||
| 			endTimestamp:             ev.startTimestamp, // Always a single evaluation.
 | ||||
| 			interval:                 ev.interval, | ||||
| 			ctx:                      ev.ctx, | ||||
| 			currentSamples:           ev.currentSamples, | ||||
| 			maxSamples:               ev.maxSamples, | ||||
| 			logger:                   ev.logger, | ||||
|  | @ -1983,7 +1976,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio | |||
| 			noStepSubqueryIntervalFn: ev.noStepSubqueryIntervalFn, | ||||
| 			enableDelayedNameRemoval: ev.enableDelayedNameRemoval, | ||||
| 		} | ||||
| 		res, ws := newEv.eval(e.Expr) | ||||
| 		res, ws := newEv.eval(ctx, e.Expr) | ||||
| 		ev.currentSamples = newEv.currentSamples | ||||
| 		ev.samplesStats.UpdatePeakFromSubquery(newEv.samplesStats) | ||||
| 		for ts, step := ev.startTimestamp, -1; ts <= ev.endTimestamp; ts += ev.interval { | ||||
|  | @ -2059,8 +2052,8 @@ func reuseOrGetFPointSlices(prevSS *Series, numSteps int) (r []FPoint) { | |||
| 	return getFPointSlice(numSteps) | ||||
| } | ||||
| 
 | ||||
| func (ev *evaluator) rangeEvalTimestampFunctionOverVectorSelector(vs *parser.VectorSelector, call FunctionCall, e *parser.Call) (parser.Value, annotations.Annotations) { | ||||
| 	ws, err := checkAndExpandSeriesSet(ev.ctx, vs) | ||||
| func (ev *evaluator) rangeEvalTimestampFunctionOverVectorSelector(ctx context.Context, vs *parser.VectorSelector, call FunctionCall, e *parser.Call) (parser.Value, annotations.Annotations) { | ||||
| 	ws, err := checkAndExpandSeriesSet(ctx, vs) | ||||
| 	if err != nil { | ||||
| 		ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws}) | ||||
| 	} | ||||
|  | @ -2071,7 +2064,7 @@ func (ev *evaluator) rangeEvalTimestampFunctionOverVectorSelector(vs *parser.Vec | |||
| 		seriesIterators[i] = storage.NewMemoizedIterator(it, durationMilliseconds(ev.lookbackDelta)) | ||||
| 	} | ||||
| 
 | ||||
| 	return ev.rangeEval(nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { | ||||
| 	return ev.rangeEval(ctx, nil, func(v []parser.Value, _ [][]EvalSeriesHelper, enh *EvalNodeHelper) (Vector, annotations.Annotations) { | ||||
| 		if vs.Timestamp != nil { | ||||
| 			// This is a special case for "timestamp()" when the @ modifier is used, to ensure that
 | ||||
| 			// we return a point for each time step in this case.
 | ||||
|  | @ -2207,7 +2200,7 @@ func putMatrixSelectorHPointSlice(p []HPoint) { | |||
| } | ||||
| 
 | ||||
| // matrixSelector evaluates a *parser.MatrixSelector expression.
 | ||||
| func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) (Matrix, annotations.Annotations) { | ||||
| func (ev *evaluator) matrixSelector(ctx context.Context, node *parser.MatrixSelector) (Matrix, annotations.Annotations) { | ||||
| 	var ( | ||||
| 		vs = node.VectorSelector.(*parser.VectorSelector) | ||||
| 
 | ||||
|  | @ -2218,7 +2211,7 @@ func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) (Matrix, annota | |||
| 
 | ||||
| 		it = storage.NewBuffer(durationMilliseconds(node.Range)) | ||||
| 	) | ||||
| 	ws, err := checkAndExpandSeriesSet(ev.ctx, node) | ||||
| 	ws, err := checkAndExpandSeriesSet(ctx, node) | ||||
| 	if err != nil { | ||||
| 		ev.error(errWithWarnings{fmt.Errorf("expanding series: %w", err), ws}) | ||||
| 	} | ||||
|  | @ -2226,7 +2219,7 @@ func (ev *evaluator) matrixSelector(node *parser.MatrixSelector) (Matrix, annota | |||
| 	var chkIter chunkenc.Iterator | ||||
| 	series := vs.Series | ||||
| 	for i, s := range series { | ||||
| 		if err := contextDone(ev.ctx, "expression evaluation"); err != nil { | ||||
| 		if err := contextDone(ctx, "expression evaluation"); err != nil { | ||||
| 			ev.error(err) | ||||
| 		} | ||||
| 		chkIter = s.Iterator(chkIter) | ||||
|  |  | |||
|  | @ -14,6 +14,7 @@ | |||
| package promql | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"math" | ||||
|  | @ -1463,7 +1464,7 @@ func funcChanges(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelp | |||
| } | ||||
| 
 | ||||
| // label_replace function operates only on series; does not look at timestamps or values.
 | ||||
| func (ev *evaluator) evalLabelReplace(args parser.Expressions) (parser.Value, annotations.Annotations) { | ||||
| func (ev *evaluator) evalLabelReplace(ctx context.Context, args parser.Expressions) (parser.Value, annotations.Annotations) { | ||||
| 	var ( | ||||
| 		dst      = stringFromArg(args[1]) | ||||
| 		repl     = stringFromArg(args[2]) | ||||
|  | @ -1479,7 +1480,7 @@ func (ev *evaluator) evalLabelReplace(args parser.Expressions) (parser.Value, an | |||
| 		panic(fmt.Errorf("invalid destination label name in label_replace(): %s", dst)) | ||||
| 	} | ||||
| 
 | ||||
| 	val, ws := ev.eval(args[0]) | ||||
| 	val, ws := ev.eval(ctx, args[0]) | ||||
| 	matrix := val.(Matrix) | ||||
| 	lb := labels.NewBuilder(labels.EmptyLabels()) | ||||
| 
 | ||||
|  | @ -1520,7 +1521,7 @@ func funcVector(vals []parser.Value, args parser.Expressions, enh *EvalNodeHelpe | |||
| } | ||||
| 
 | ||||
| // label_join function operates only on series; does not look at timestamps or values.
 | ||||
| func (ev *evaluator) evalLabelJoin(args parser.Expressions) (parser.Value, annotations.Annotations) { | ||||
| func (ev *evaluator) evalLabelJoin(ctx context.Context, args parser.Expressions) (parser.Value, annotations.Annotations) { | ||||
| 	var ( | ||||
| 		dst       = stringFromArg(args[1]) | ||||
| 		sep       = stringFromArg(args[2]) | ||||
|  | @ -1537,7 +1538,7 @@ func (ev *evaluator) evalLabelJoin(args parser.Expressions) (parser.Value, annot | |||
| 		panic(fmt.Errorf("invalid destination label name in label_join(): %s", dst)) | ||||
| 	} | ||||
| 
 | ||||
| 	val, ws := ev.eval(args[0]) | ||||
| 	val, ws := ev.eval(ctx, args[0]) | ||||
| 	matrix := val.(Matrix) | ||||
| 	srcVals := make([]string, len(srcLabels)) | ||||
| 	lb := labels.NewBuilder(labels.EmptyLabels()) | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue