diff --git a/prompb/io/prometheus/write/v2/codec.go b/prompb/io/prometheus/write/v2/codec.go index 8f119d6d01..71196edb88 100644 --- a/prompb/io/prometheus/write/v2/codec.go +++ b/prompb/io/prometheus/write/v2/codec.go @@ -25,7 +25,7 @@ import ( // NOTE(bwplotka): This file's code is tested in /prompb/rwcommon. // ToLabels return model labels.Labels from timeseries' remote labels. -func (m TimeSeries) ToLabels(b *labels.ScratchBuilder, symbols []string) labels.Labels { +func (m TimeSeries) ToLabels(b *labels.ScratchBuilder, symbols []string) (labels.Labels, error) { return desymbolizeLabels(b, m.GetLabelsRefs(), symbols) } @@ -207,13 +207,18 @@ func spansToSpansProto(s []histogram.Span) []BucketSpan { return spans } -func (m Exemplar) ToExemplar(b *labels.ScratchBuilder, symbols []string) exemplar.Exemplar { +func (m Exemplar) ToExemplar(b *labels.ScratchBuilder, symbols []string) (exemplar.Exemplar, error) { timestamp := m.Timestamp + lbls, err := desymbolizeLabels(b, m.LabelsRefs, symbols) + if err != nil { + return exemplar.Exemplar{}, err + } + return exemplar.Exemplar{ - Labels: desymbolizeLabels(b, m.LabelsRefs, symbols), + Labels: lbls, Value: m.Value, Ts: timestamp, HasTs: timestamp != 0, - } + }, nil } diff --git a/prompb/io/prometheus/write/v2/symbols.go b/prompb/io/prometheus/write/v2/symbols.go index f316a976f2..7c7feca239 100644 --- a/prompb/io/prometheus/write/v2/symbols.go +++ b/prompb/io/prometheus/write/v2/symbols.go @@ -13,7 +13,11 @@ package writev2 -import "github.com/prometheus/prometheus/model/labels" +import ( + "fmt" + + "github.com/prometheus/prometheus/model/labels" +) // SymbolsTable implements table for easy symbol use. type SymbolsTable struct { @@ -73,11 +77,22 @@ func (t *SymbolsTable) Reset() { } // desymbolizeLabels decodes label references, with given symbols to labels. -func desymbolizeLabels(b *labels.ScratchBuilder, labelRefs []uint32, symbols []string) labels.Labels { +// This function requires labelRefs to have an even number of elements (name-value pairs) and +// all references must be valid indices within the symbols table. It will return an error if +// these invariants are violated. +func desymbolizeLabels(b *labels.ScratchBuilder, labelRefs []uint32, symbols []string) (labels.Labels, error) { + if len(labelRefs)%2 != 0 { + return labels.EmptyLabels(), fmt.Errorf("invalid labelRefs length %d", len(labelRefs)) + } + b.Reset() for i := 0; i < len(labelRefs); i += 2 { - b.Add(symbols[labelRefs[i]], symbols[labelRefs[i+1]]) + nameRef, valueRef := labelRefs[i], labelRefs[i+1] + if int(nameRef) >= len(symbols) || int(valueRef) >= len(symbols) { + return labels.EmptyLabels(), fmt.Errorf("labelRefs %d (name) = %d (value) outside of symbols table (size %d)", nameRef, valueRef, len(symbols)) + } + b.Add(symbols[nameRef], symbols[valueRef]) } b.Sort() - return b.Labels() + return b.Labels(), nil } diff --git a/prompb/io/prometheus/write/v2/symbols_test.go b/prompb/io/prometheus/write/v2/symbols_test.go index 3d852e88f1..7e7c7cb0bd 100644 --- a/prompb/io/prometheus/write/v2/symbols_test.go +++ b/prompb/io/prometheus/write/v2/symbols_test.go @@ -50,7 +50,8 @@ func TestSymbolsTable(t *testing.T) { encoded := s.SymbolizeLabels(ls, nil) require.Equal(t, []uint32{1, 3, 4, 5}, encoded) b := labels.NewScratchBuilder(len(encoded)) - decoded := desymbolizeLabels(&b, encoded, s.Symbols()) + decoded, err := desymbolizeLabels(&b, encoded, s.Symbols()) + require.NoError(t, err) require.Equal(t, ls, decoded) // Different buf. diff --git a/prompb/rwcommon/codec_test.go b/prompb/rwcommon/codec_test.go index b91355c51c..73a8196fa8 100644 --- a/prompb/rwcommon/codec_test.go +++ b/prompb/rwcommon/codec_test.go @@ -40,7 +40,9 @@ func TestToLabels(t *testing.T) { v2Symbols := []string{"", "__name__", "metric1", "foo", "bar"} ts := writev2.TimeSeries{LabelsRefs: []uint32{1, 2, 3, 4}} b := labels.NewScratchBuilder(2) - require.Equal(t, expected, ts.ToLabels(&b, v2Symbols)) + result, err := ts.ToLabels(&b, v2Symbols) + require.NoError(t, err) + require.Equal(t, expected, result) // No need for FromLabels in our prod code as we use symbol table to do so. }) } diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index d117349a86..7a051656d5 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -1245,7 +1245,11 @@ func v2RequestToWriteRequest(v2Req *writev2.Request) (*prompb.WriteRequest, erro } b := labels.NewScratchBuilder(0) for i, rts := range v2Req.Timeseries { - rts.ToLabels(&b, v2Req.Symbols).Range(func(l labels.Label) { + lbls, err := rts.ToLabels(&b, v2Req.Symbols) + if err != nil { + return nil, fmt.Errorf("failed to convert labels: %w", err) + } + lbls.Range(func(l labels.Label) { req.Timeseries[i].Labels = append(req.Timeseries[i].Labels, prompb.Label{ Name: l.Name, Value: l.Value, @@ -1256,7 +1260,11 @@ func v2RequestToWriteRequest(v2Req *writev2.Request) (*prompb.WriteRequest, erro for j, e := range rts.Exemplars { exemplars[j].Value = e.Value exemplars[j].Timestamp = e.Timestamp - e.ToExemplar(&b, v2Req.Symbols).Labels.Range(func(l labels.Label) { + ex, err := e.ToExemplar(&b, v2Req.Symbols) + if err != nil { + return nil, fmt.Errorf("failed to convert exemplar: %w", err) + } + ex.Labels.Range(func(l labels.Label) { exemplars[j].Labels = append(exemplars[j].Labels, prompb.Label{ Name: l.Name, Value: l.Value, @@ -1282,7 +1290,10 @@ func v2RequestToWriteRequest(v2Req *writev2.Request) (*prompb.WriteRequest, erro // Convert v2 metadata to v1 format. if rts.Metadata.Type != writev2.Metadata_METRIC_TYPE_UNSPECIFIED { - labels := rts.ToLabels(&b, v2Req.Symbols) + labels, err := rts.ToLabels(&b, v2Req.Symbols) + if err != nil { + return nil, fmt.Errorf("failed to convert metadata labels: %w", err) + } metadata := rts.ToMetadata(v2Req.Symbols) metricFamilyName := labels.String() diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index ce4c569715..7681655e61 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -388,7 +388,12 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs * b = labels.NewScratchBuilder(0) ) for _, ts := range req.Timeseries { - ls := ts.ToLabels(&b, req.Symbols) + ls, err := ts.ToLabels(&b, req.Symbols) + if err != nil { + badRequestErrs = append(badRequestErrs, fmt.Errorf("parsing labels for series %v: %w", ts.LabelsRefs, err)) + samplesWithInvalidLabels += len(ts.Samples) + len(ts.Histograms) + continue + } // Validate series labels early. // NOTE(bwplotka): While spec allows UTF-8, Prometheus Receiver may impose // specific limits and follow https://prometheus.io/docs/specs/remote_write_spec_2_0/#invalid-samples case. @@ -474,7 +479,11 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs * // Exemplars. for _, ep := range ts.Exemplars { - e := ep.ToExemplar(&b, req.Symbols) + e, err := ep.ToExemplar(&b, req.Symbols) + if err != nil { + badRequestErrs = append(badRequestErrs, fmt.Errorf("parsing exemplar for series %v: %w", ls.String(), err)) + continue + } ref, err = app.AppendExemplar(ref, ls, e) if err == nil { rs.Exemplars++ diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index f50106b3d4..5631e80732 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -402,6 +402,22 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) { expectedCode: http.StatusBadRequest, expectedRespBody: "invalid labels for series, labels {__name__=\"test_metric1\", test_metric1=\"test_metric1\", test_metric1=\"test_metric1\"}, duplicated label test_metric1\n", }, + { + desc: "Partial write; first series with odd number of label refs", + input: append( + []writev2.TimeSeries{{LabelsRefs: []uint32{1, 2, 3}, Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}}}, + writeV2RequestFixture.Timeseries...), + expectedCode: http.StatusBadRequest, + expectedRespBody: "parsing labels for series [1 2 3]: invalid labelRefs length 3\n", + }, + { + desc: "Partial write; first series with out-of-bounds symbol references", + input: append( + []writev2.TimeSeries{{LabelsRefs: []uint32{1, 999}, Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}}}, + writeV2RequestFixture.Timeseries...), + expectedCode: http.StatusBadRequest, + expectedRespBody: "parsing labels for series [1 999]: labelRefs 1 (name) = 999 (value) outside of symbols table (size 18)\n", + }, { desc: "Partial write; first series with one OOO sample", input: func() []writev2.TimeSeries { @@ -543,7 +559,8 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) { for _, ts := range writeV2RequestFixture.Timeseries { zeroHistogramIngested := false zeroFloatHistogramIngested := false - ls := ts.ToLabels(&b, writeV2RequestFixture.Symbols) + ls, err := ts.ToLabels(&b, writeV2RequestFixture.Symbols) + require.NoError(t, err) for _, s := range ts.Samples { if ts.CreatedTimestamp != 0 && tc.ingestCTZeroSample { @@ -579,7 +596,9 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) { } if tc.appendExemplarErr == nil { for _, e := range ts.Exemplars { - exemplarLabels := e.ToExemplar(&b, writeV2RequestFixture.Symbols).Labels + ex, err := e.ToExemplar(&b, writeV2RequestFixture.Symbols) + require.NoError(t, err) + exemplarLabels := ex.Labels requireEqual(t, mockExemplar{ls, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j]) j++ }