diff --git a/pkg/expr/graph.go b/pkg/expr/graph.go index 58919d1b72c..2b09143110b 100644 --- a/pkg/expr/graph.go +++ b/pkg/expr/graph.go @@ -292,3 +292,23 @@ func buildGraphEdges(dp *simple.DirectedGraph, registry map[string]Node) error { } return nil } + +// GetCommandsFromPipeline traverses the pipeline and extracts all CMDNode commands that match the type +func GetCommandsFromPipeline[T Command](pipeline DataPipeline) []T { + var results []T + for _, p := range pipeline { + if p.NodeType() != TypeCMDNode { + continue + } + switch cmd := p.(type) { + case *CMDNode: + switch r := cmd.Command.(type) { + case T: + results = append(results, r) + } + default: + continue + } + } + return results +} diff --git a/pkg/expr/graph_test.go b/pkg/expr/graph_test.go index 25dbc57eca8..11be8e5a352 100644 --- a/pkg/expr/graph_test.go +++ b/pkg/expr/graph_test.go @@ -246,6 +246,41 @@ func TestServicebuildPipeLine(t *testing.T) { } } +func TestGetCommandsFromPipeline(t *testing.T) { + pipeline := DataPipeline{ + &MLNode{}, + &DSNode{}, + &CMDNode{ + baseNode: baseNode{}, + CMDType: 0, + Command: &ReduceCommand{}, + }, + &CMDNode{ + baseNode: baseNode{}, + CMDType: 0, + Command: &ReduceCommand{}, + }, + &CMDNode{ + baseNode: baseNode{}, + CMDType: 0, + Command: &HysteresisCommand{}, + }, + } + t.Run("should find command that exists", func(t *testing.T) { + cmds := GetCommandsFromPipeline[*HysteresisCommand](pipeline) + require.Len(t, cmds, 1) + require.Equal(t, pipeline[4].(*CMDNode).Command, cmds[0]) + }) + t.Run("should find all commands that exist", func(t *testing.T) { + cmds := GetCommandsFromPipeline[*ReduceCommand](pipeline) + require.Len(t, cmds, 2) + }) + t.Run("should not find all command that does not exist", func(t *testing.T) { + cmds := GetCommandsFromPipeline[*MathCommand](pipeline) + require.Len(t, cmds, 0) + }) +} + func getRefIDOrder(nodes []Node) []string { ids := make([]string, 0, len(nodes)) for _, n := range nodes { diff --git a/pkg/expr/hysteresis.go b/pkg/expr/hysteresis.go index 9526f835715..51ddcf37d5b 100644 --- a/pkg/expr/hysteresis.go +++ b/pkg/expr/hysteresis.go @@ -120,3 +120,17 @@ func FingerprintsFromFrame(frame *data.Frame) (Fingerprints, error) { } return result, nil } + +// FingerprintsToFrame converts Fingerprints to data.Frame. +func FingerprintsToFrame(fingerprints Fingerprints) *data.Frame { + fp := make([]uint64, 0, len(fingerprints)) + for fingerprint := range fingerprints { + fp = append(fp, uint64(fingerprint)) + } + frame := data.NewFrame("", data.NewField("fingerprints", nil, fp)) + frame.SetMeta(&data.FrameMeta{ + Type: "fingerprints", + TypeVersion: data.FrameTypeVersion{1, 0}, + }) + return frame +} diff --git a/pkg/expr/hysteresis_test.go b/pkg/expr/hysteresis_test.go index 957e045174c..49d75ac7c66 100644 --- a/pkg/expr/hysteresis_test.go +++ b/pkg/expr/hysteresis_test.go @@ -186,3 +186,36 @@ func TestLoadedDimensionsFromFrame(t *testing.T) { }) } } + +func TestFingerprintsToFrame(t *testing.T) { + testCases := []struct { + name string + input Fingerprints + expected Fingerprints + expectedError bool + }{ + { + name: "when empty map", + input: Fingerprints{}, + expected: Fingerprints{}, + }, + { + name: "when nil", + input: nil, + expected: Fingerprints{}, + }, + { + name: "when has values", + input: Fingerprints{1: {}, 2: {}, 3: {}, 4: {}, 5: {}}, + expected: Fingerprints{1: {}, 2: {}, 3: {}, 4: {}, 5: {}}, + }, + } + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + frame := FingerprintsToFrame(testCase.input) + actual, err := FingerprintsFromFrame(frame) + require.NoError(t, err) + require.EqualValues(t, testCase.expected, actual) + }) + } +} diff --git a/pkg/expr/nodes.go b/pkg/expr/nodes.go index 87579f94262..32883d305d6 100644 --- a/pkg/expr/nodes.go +++ b/pkg/expr/nodes.go @@ -3,6 +3,7 @@ package expr import ( "context" "encoding/json" + "errors" "fmt" "strings" "time" @@ -45,10 +46,10 @@ type rawNode struct { idx int64 } -func (rn *rawNode) GetCommandType() (c CommandType, err error) { - rawType, ok := rn.Query["type"] +func GetExpressionCommandType(rawQuery map[string]any) (c CommandType, err error) { + rawType, ok := rawQuery["type"] if !ok { - return c, fmt.Errorf("no expression command type in query for refId %v", rn.RefID) + return c, errors.New("no expression command type in query") } typeString, ok := rawType.(string) if !ok { @@ -97,7 +98,7 @@ func (gn *CMDNode) Execute(ctx context.Context, now time.Time, vars mathexp.Vars } func buildCMDNode(rn *rawNode, toggles featuremgmt.FeatureToggles) (*CMDNode, error) { - commandType, err := rn.GetCommandType() + commandType, err := GetExpressionCommandType(rn.Query) if err != nil { return nil, fmt.Errorf("invalid command type in expression '%v': %w", rn.RefID, err) } diff --git a/pkg/expr/threshold.go b/pkg/expr/threshold.go index 2fa6e750243..f6fd093a27d 100644 --- a/pkg/expr/threshold.go +++ b/pkg/expr/threshold.go @@ -3,6 +3,7 @@ package expr import ( "context" "encoding/json" + "errors" "fmt" "strings" "time" @@ -163,3 +164,64 @@ type ThresholdConditionJSON struct { UnloadEvaluator *ConditionEvalJSON `json:"unloadEvaluator"` LoadedDimensions *data.Frame `json:"loadedDimensions"` } + +// IsHysteresisExpression returns true if the raw model describes a hysteresis command: +// - field 'type' has value "threshold", +// - field 'conditions' is array of objects and has exactly one element +// - field 'conditions[0].unloadEvaluator is not nil +func IsHysteresisExpression(query map[string]any) bool { + c, err := getConditionForHysteresisCommand(query) + if err != nil { + return false + } + return c != nil +} + +// SetLoadedDimensionsToHysteresisCommand mutates the input map and sets field "conditions[0].loadedMetrics" with the data frame created from the provided fingerprints. +func SetLoadedDimensionsToHysteresisCommand(query map[string]any, fingerprints Fingerprints) error { + condition, err := getConditionForHysteresisCommand(query) + if err != nil { + return err + } + if condition == nil { + return errors.New("not a hysteresis command") + } + fr := FingerprintsToFrame(fingerprints) + condition["loadedDimensions"] = fr + return nil +} + +func getConditionForHysteresisCommand(query map[string]any) (map[string]any, error) { + t, err := GetExpressionCommandType(query) + if err != nil { + return nil, err + } + if t != TypeThreshold { + return nil, errors.New("not a threshold command") + } + + c, ok := query["conditions"] + if !ok { + return nil, errors.New("invalid threshold command: expected field \"condition\"") + } + var condition map[string]any + switch arr := c.(type) { + case []any: + if len(arr) != 1 { + return nil, errors.New("invalid threshold command: field \"condition\" expected to have exactly 1 field") + } + switch m := arr[0].(type) { + case map[string]any: + condition = m + default: + return nil, errors.New("invalid threshold command: value of the first element of field \"condition\" expected to be an object") + } + default: + return nil, errors.New("invalid threshold command: field \"condition\" expected to be an array of objects") + } + _, ok = condition["unloadEvaluator"] + if !ok { + return nil, nil + } + return condition, nil +} diff --git a/pkg/expr/threshold_test.go b/pkg/expr/threshold_test.go index 5368ae33486..18d1a19fb6f 100644 --- a/pkg/expr/threshold_test.go +++ b/pkg/expr/threshold_test.go @@ -3,6 +3,7 @@ package expr import ( "encoding/json" "fmt" + "math" "sort" "testing" @@ -162,7 +163,7 @@ func TestUnmarshalThresholdCommand(t *testing.T) { ], "type": "lt" }, - "loadedDimensions": {"schema":{"name":"test","meta":{"type":"fingerprints","typeVersion":[1,0]},"fields":[{"name":"fingerprints","type":"number","typeInfo":{"frame":"uint64"}}]},"data":{"values":[[1,2,3,4,5]]}} + "loadedDimensions": {"schema":{"name":"test","meta":{"type":"fingerprints","typeVersion":[1,0]},"fields":[{"name":"fingerprints","type":"number","typeInfo":{"frame":"uint64"}}]},"data":{"values":[[18446744073709551615,2,3,4,5]]}} } ] }`, @@ -186,7 +187,7 @@ func TestUnmarshalThresholdCommand(t *testing.T) { return actual[i] < actual[j] }) - require.EqualValues(t, []uint64{1, 2, 3, 4, 5}, actual) + require.EqualValues(t, []uint64{2, 3, 4, 5, 18446744073709551615}, actual) }, }, } @@ -330,3 +331,115 @@ func TestIsSupportedThresholdFunc(t *testing.T) { }) } } + +func TestIsHysteresisExpression(t *testing.T) { + cases := []struct { + name string + input json.RawMessage + expected bool + }{ + { + name: "false if it's empty", + input: json.RawMessage(`{}`), + expected: false, + }, + { + name: "false if it is not threshold type", + input: json.RawMessage(`{ "type": "reduce" }`), + expected: false, + }, + { + name: "false if no conditions", + input: json.RawMessage(`{ "type": "threshold" }`), + expected: false, + }, + { + name: "false if many conditions", + input: json.RawMessage(`{ "type": "threshold", "conditions": [{}, {}] }`), + expected: false, + }, + { + name: "false if condition is not an object", + input: json.RawMessage(`{ "type": "threshold", "conditions": ["test"] }`), + expected: false, + }, + { + name: "false if condition is does not have unloadEvaluator", + input: json.RawMessage(`{ "type": "threshold", "conditions": [{}] }`), + expected: false, + }, + { + name: "true type is threshold and a single condition has unloadEvaluator field", + input: json.RawMessage(`{ "type": "threshold", "conditions": [{ "unloadEvaluator" : {}}] }`), + expected: true, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + query := map[string]any{} + require.NoError(t, json.Unmarshal(tc.input, &query)) + require.Equal(t, tc.expected, IsHysteresisExpression(query)) + }) + } +} + +func TestSetLoadedDimensionsToHysteresisCommand(t *testing.T) { + cases := []struct { + name string + input json.RawMessage + }{ + { + name: "error if model is empty", + input: json.RawMessage(`{}`), + }, + { + name: "error if is not a threshold type", + input: json.RawMessage(`{ "type": "reduce" }`), + }, + { + name: "error if threshold but no conditions", + input: json.RawMessage(`{ "type": "threshold" }`), + }, + { + name: "error if threshold and many conditions", + input: json.RawMessage(`{ "type": "threshold", "conditions": [{}, {}] }`), + }, + { + name: "error if condition is not an object", + input: json.RawMessage(`{ "type": "threshold", "conditions": ["test"] }`), + }, + { + name: "error if condition does not have unloadEvaluator", + input: json.RawMessage(`{ "type": "threshold", "conditions": [{ "evaluator": { "params": [5], "type": "gt"}}], "expression": "A" }`), + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + query := map[string]any{} + require.NoError(t, json.Unmarshal(tc.input, &query)) + err := SetLoadedDimensionsToHysteresisCommand(query, Fingerprints{math.MaxUint64: {}, 2: {}, 3: {}}) + require.Error(t, err) + }) + } + + t.Run("when unloadEvaluator is set, mutates query with loaded dimensions", func(t *testing.T) { + fingerprints := Fingerprints{math.MaxUint64: {}, 2: {}, 3: {}} + input := json.RawMessage(`{ "type": "threshold", "conditions": [{ "evaluator": { "params": [5], "type": "gt" }, "unloadEvaluator" : {"params": [2], "type": "lt"}}], "expression": "A" }`) + query := map[string]any{} + require.NoError(t, json.Unmarshal(input, &query)) + require.NoError(t, SetLoadedDimensionsToHysteresisCommand(query, fingerprints)) + raw, err := json.Marshal(query) + require.NoError(t, err) + + // Assert the query is set by unmarshalling the query because it's the easiest way to assert Fingerprints + cmd, err := UnmarshalThresholdCommand(&rawNode{ + RefID: "B", + QueryRaw: raw, + }, featuremgmt.WithFeatures(featuremgmt.FlagRecoveryThreshold)) + require.NoError(t, err) + + require.Equal(t, fingerprints, cmd.(*HysteresisCommand).LoadedDimensions) + }) +}