Handle error gracefully for the `desymbolizeLabels` function in prompb/io/prometheus/write/v2/symbols.go (#17160)
Signed-off-by: pipiland <user123@Minhs-Macbook.local> --------- Signed-off-by: pipiland <user123@Minhs-Macbook.local> Co-authored-by: pipiland <user123@Minhs-Macbook.local>
This commit is contained in:
parent
acd9aa0afb
commit
0fc2547740
|
@ -25,7 +25,7 @@ import (
|
||||||
// NOTE(bwplotka): This file's code is tested in /prompb/rwcommon.
|
// NOTE(bwplotka): This file's code is tested in /prompb/rwcommon.
|
||||||
|
|
||||||
// ToLabels return model labels.Labels from timeseries' remote labels.
|
// ToLabels return model labels.Labels from timeseries' remote labels.
|
||||||
func (m TimeSeries) ToLabels(b *labels.ScratchBuilder, symbols []string) labels.Labels {
|
func (m TimeSeries) ToLabels(b *labels.ScratchBuilder, symbols []string) (labels.Labels, error) {
|
||||||
return desymbolizeLabels(b, m.GetLabelsRefs(), symbols)
|
return desymbolizeLabels(b, m.GetLabelsRefs(), symbols)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -207,13 +207,18 @@ func spansToSpansProto(s []histogram.Span) []BucketSpan {
|
||||||
return spans
|
return spans
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m Exemplar) ToExemplar(b *labels.ScratchBuilder, symbols []string) exemplar.Exemplar {
|
func (m Exemplar) ToExemplar(b *labels.ScratchBuilder, symbols []string) (exemplar.Exemplar, error) {
|
||||||
timestamp := m.Timestamp
|
timestamp := m.Timestamp
|
||||||
|
|
||||||
|
lbls, err := desymbolizeLabels(b, m.LabelsRefs, symbols)
|
||||||
|
if err != nil {
|
||||||
|
return exemplar.Exemplar{}, err
|
||||||
|
}
|
||||||
|
|
||||||
return exemplar.Exemplar{
|
return exemplar.Exemplar{
|
||||||
Labels: desymbolizeLabels(b, m.LabelsRefs, symbols),
|
Labels: lbls,
|
||||||
Value: m.Value,
|
Value: m.Value,
|
||||||
Ts: timestamp,
|
Ts: timestamp,
|
||||||
HasTs: timestamp != 0,
|
HasTs: timestamp != 0,
|
||||||
}
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,7 +13,11 @@
|
||||||
|
|
||||||
package writev2
|
package writev2
|
||||||
|
|
||||||
import "github.com/prometheus/prometheus/model/labels"
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/prometheus/prometheus/model/labels"
|
||||||
|
)
|
||||||
|
|
||||||
// SymbolsTable implements table for easy symbol use.
|
// SymbolsTable implements table for easy symbol use.
|
||||||
type SymbolsTable struct {
|
type SymbolsTable struct {
|
||||||
|
@ -73,11 +77,22 @@ func (t *SymbolsTable) Reset() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// desymbolizeLabels decodes label references, with given symbols to labels.
|
// desymbolizeLabels decodes label references, with given symbols to labels.
|
||||||
func desymbolizeLabels(b *labels.ScratchBuilder, labelRefs []uint32, symbols []string) labels.Labels {
|
// This function requires labelRefs to have an even number of elements (name-value pairs) and
|
||||||
|
// all references must be valid indices within the symbols table. It will return an error if
|
||||||
|
// these invariants are violated.
|
||||||
|
func desymbolizeLabels(b *labels.ScratchBuilder, labelRefs []uint32, symbols []string) (labels.Labels, error) {
|
||||||
|
if len(labelRefs)%2 != 0 {
|
||||||
|
return labels.EmptyLabels(), fmt.Errorf("invalid labelRefs length %d", len(labelRefs))
|
||||||
|
}
|
||||||
|
|
||||||
b.Reset()
|
b.Reset()
|
||||||
for i := 0; i < len(labelRefs); i += 2 {
|
for i := 0; i < len(labelRefs); i += 2 {
|
||||||
b.Add(symbols[labelRefs[i]], symbols[labelRefs[i+1]])
|
nameRef, valueRef := labelRefs[i], labelRefs[i+1]
|
||||||
|
if int(nameRef) >= len(symbols) || int(valueRef) >= len(symbols) {
|
||||||
|
return labels.EmptyLabels(), fmt.Errorf("labelRefs %d (name) = %d (value) outside of symbols table (size %d)", nameRef, valueRef, len(symbols))
|
||||||
|
}
|
||||||
|
b.Add(symbols[nameRef], symbols[valueRef])
|
||||||
}
|
}
|
||||||
b.Sort()
|
b.Sort()
|
||||||
return b.Labels()
|
return b.Labels(), nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -50,7 +50,8 @@ func TestSymbolsTable(t *testing.T) {
|
||||||
encoded := s.SymbolizeLabels(ls, nil)
|
encoded := s.SymbolizeLabels(ls, nil)
|
||||||
require.Equal(t, []uint32{1, 3, 4, 5}, encoded)
|
require.Equal(t, []uint32{1, 3, 4, 5}, encoded)
|
||||||
b := labels.NewScratchBuilder(len(encoded))
|
b := labels.NewScratchBuilder(len(encoded))
|
||||||
decoded := desymbolizeLabels(&b, encoded, s.Symbols())
|
decoded, err := desymbolizeLabels(&b, encoded, s.Symbols())
|
||||||
|
require.NoError(t, err)
|
||||||
require.Equal(t, ls, decoded)
|
require.Equal(t, ls, decoded)
|
||||||
|
|
||||||
// Different buf.
|
// Different buf.
|
||||||
|
|
|
@ -40,7 +40,9 @@ func TestToLabels(t *testing.T) {
|
||||||
v2Symbols := []string{"", "__name__", "metric1", "foo", "bar"}
|
v2Symbols := []string{"", "__name__", "metric1", "foo", "bar"}
|
||||||
ts := writev2.TimeSeries{LabelsRefs: []uint32{1, 2, 3, 4}}
|
ts := writev2.TimeSeries{LabelsRefs: []uint32{1, 2, 3, 4}}
|
||||||
b := labels.NewScratchBuilder(2)
|
b := labels.NewScratchBuilder(2)
|
||||||
require.Equal(t, expected, ts.ToLabels(&b, v2Symbols))
|
result, err := ts.ToLabels(&b, v2Symbols)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, expected, result)
|
||||||
// No need for FromLabels in our prod code as we use symbol table to do so.
|
// No need for FromLabels in our prod code as we use symbol table to do so.
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -1245,7 +1245,11 @@ func v2RequestToWriteRequest(v2Req *writev2.Request) (*prompb.WriteRequest, erro
|
||||||
}
|
}
|
||||||
b := labels.NewScratchBuilder(0)
|
b := labels.NewScratchBuilder(0)
|
||||||
for i, rts := range v2Req.Timeseries {
|
for i, rts := range v2Req.Timeseries {
|
||||||
rts.ToLabels(&b, v2Req.Symbols).Range(func(l labels.Label) {
|
lbls, err := rts.ToLabels(&b, v2Req.Symbols)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to convert labels: %w", err)
|
||||||
|
}
|
||||||
|
lbls.Range(func(l labels.Label) {
|
||||||
req.Timeseries[i].Labels = append(req.Timeseries[i].Labels, prompb.Label{
|
req.Timeseries[i].Labels = append(req.Timeseries[i].Labels, prompb.Label{
|
||||||
Name: l.Name,
|
Name: l.Name,
|
||||||
Value: l.Value,
|
Value: l.Value,
|
||||||
|
@ -1256,7 +1260,11 @@ func v2RequestToWriteRequest(v2Req *writev2.Request) (*prompb.WriteRequest, erro
|
||||||
for j, e := range rts.Exemplars {
|
for j, e := range rts.Exemplars {
|
||||||
exemplars[j].Value = e.Value
|
exemplars[j].Value = e.Value
|
||||||
exemplars[j].Timestamp = e.Timestamp
|
exemplars[j].Timestamp = e.Timestamp
|
||||||
e.ToExemplar(&b, v2Req.Symbols).Labels.Range(func(l labels.Label) {
|
ex, err := e.ToExemplar(&b, v2Req.Symbols)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to convert exemplar: %w", err)
|
||||||
|
}
|
||||||
|
ex.Labels.Range(func(l labels.Label) {
|
||||||
exemplars[j].Labels = append(exemplars[j].Labels, prompb.Label{
|
exemplars[j].Labels = append(exemplars[j].Labels, prompb.Label{
|
||||||
Name: l.Name,
|
Name: l.Name,
|
||||||
Value: l.Value,
|
Value: l.Value,
|
||||||
|
@ -1282,7 +1290,10 @@ func v2RequestToWriteRequest(v2Req *writev2.Request) (*prompb.WriteRequest, erro
|
||||||
|
|
||||||
// Convert v2 metadata to v1 format.
|
// Convert v2 metadata to v1 format.
|
||||||
if rts.Metadata.Type != writev2.Metadata_METRIC_TYPE_UNSPECIFIED {
|
if rts.Metadata.Type != writev2.Metadata_METRIC_TYPE_UNSPECIFIED {
|
||||||
labels := rts.ToLabels(&b, v2Req.Symbols)
|
labels, err := rts.ToLabels(&b, v2Req.Symbols)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to convert metadata labels: %w", err)
|
||||||
|
}
|
||||||
metadata := rts.ToMetadata(v2Req.Symbols)
|
metadata := rts.ToMetadata(v2Req.Symbols)
|
||||||
|
|
||||||
metricFamilyName := labels.String()
|
metricFamilyName := labels.String()
|
||||||
|
|
|
@ -388,7 +388,12 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs *
|
||||||
b = labels.NewScratchBuilder(0)
|
b = labels.NewScratchBuilder(0)
|
||||||
)
|
)
|
||||||
for _, ts := range req.Timeseries {
|
for _, ts := range req.Timeseries {
|
||||||
ls := ts.ToLabels(&b, req.Symbols)
|
ls, err := ts.ToLabels(&b, req.Symbols)
|
||||||
|
if err != nil {
|
||||||
|
badRequestErrs = append(badRequestErrs, fmt.Errorf("parsing labels for series %v: %w", ts.LabelsRefs, err))
|
||||||
|
samplesWithInvalidLabels += len(ts.Samples) + len(ts.Histograms)
|
||||||
|
continue
|
||||||
|
}
|
||||||
// Validate series labels early.
|
// Validate series labels early.
|
||||||
// NOTE(bwplotka): While spec allows UTF-8, Prometheus Receiver may impose
|
// NOTE(bwplotka): While spec allows UTF-8, Prometheus Receiver may impose
|
||||||
// specific limits and follow https://prometheus.io/docs/specs/remote_write_spec_2_0/#invalid-samples case.
|
// specific limits and follow https://prometheus.io/docs/specs/remote_write_spec_2_0/#invalid-samples case.
|
||||||
|
@ -474,7 +479,11 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs *
|
||||||
|
|
||||||
// Exemplars.
|
// Exemplars.
|
||||||
for _, ep := range ts.Exemplars {
|
for _, ep := range ts.Exemplars {
|
||||||
e := ep.ToExemplar(&b, req.Symbols)
|
e, err := ep.ToExemplar(&b, req.Symbols)
|
||||||
|
if err != nil {
|
||||||
|
badRequestErrs = append(badRequestErrs, fmt.Errorf("parsing exemplar for series %v: %w", ls.String(), err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
ref, err = app.AppendExemplar(ref, ls, e)
|
ref, err = app.AppendExemplar(ref, ls, e)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
rs.Exemplars++
|
rs.Exemplars++
|
||||||
|
|
|
@ -402,6 +402,22 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) {
|
||||||
expectedCode: http.StatusBadRequest,
|
expectedCode: http.StatusBadRequest,
|
||||||
expectedRespBody: "invalid labels for series, labels {__name__=\"test_metric1\", test_metric1=\"test_metric1\", test_metric1=\"test_metric1\"}, duplicated label test_metric1\n",
|
expectedRespBody: "invalid labels for series, labels {__name__=\"test_metric1\", test_metric1=\"test_metric1\", test_metric1=\"test_metric1\"}, duplicated label test_metric1\n",
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
desc: "Partial write; first series with odd number of label refs",
|
||||||
|
input: append(
|
||||||
|
[]writev2.TimeSeries{{LabelsRefs: []uint32{1, 2, 3}, Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}}},
|
||||||
|
writeV2RequestFixture.Timeseries...),
|
||||||
|
expectedCode: http.StatusBadRequest,
|
||||||
|
expectedRespBody: "parsing labels for series [1 2 3]: invalid labelRefs length 3\n",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "Partial write; first series with out-of-bounds symbol references",
|
||||||
|
input: append(
|
||||||
|
[]writev2.TimeSeries{{LabelsRefs: []uint32{1, 999}, Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}}},
|
||||||
|
writeV2RequestFixture.Timeseries...),
|
||||||
|
expectedCode: http.StatusBadRequest,
|
||||||
|
expectedRespBody: "parsing labels for series [1 999]: labelRefs 1 (name) = 999 (value) outside of symbols table (size 18)\n",
|
||||||
|
},
|
||||||
{
|
{
|
||||||
desc: "Partial write; first series with one OOO sample",
|
desc: "Partial write; first series with one OOO sample",
|
||||||
input: func() []writev2.TimeSeries {
|
input: func() []writev2.TimeSeries {
|
||||||
|
@ -543,7 +559,8 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) {
|
||||||
for _, ts := range writeV2RequestFixture.Timeseries {
|
for _, ts := range writeV2RequestFixture.Timeseries {
|
||||||
zeroHistogramIngested := false
|
zeroHistogramIngested := false
|
||||||
zeroFloatHistogramIngested := false
|
zeroFloatHistogramIngested := false
|
||||||
ls := ts.ToLabels(&b, writeV2RequestFixture.Symbols)
|
ls, err := ts.ToLabels(&b, writeV2RequestFixture.Symbols)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
for _, s := range ts.Samples {
|
for _, s := range ts.Samples {
|
||||||
if ts.CreatedTimestamp != 0 && tc.ingestCTZeroSample {
|
if ts.CreatedTimestamp != 0 && tc.ingestCTZeroSample {
|
||||||
|
@ -579,7 +596,9 @@ func TestRemoteWriteHandler_V2Message(t *testing.T) {
|
||||||
}
|
}
|
||||||
if tc.appendExemplarErr == nil {
|
if tc.appendExemplarErr == nil {
|
||||||
for _, e := range ts.Exemplars {
|
for _, e := range ts.Exemplars {
|
||||||
exemplarLabels := e.ToExemplar(&b, writeV2RequestFixture.Symbols).Labels
|
ex, err := e.ToExemplar(&b, writeV2RequestFixture.Symbols)
|
||||||
|
require.NoError(t, err)
|
||||||
|
exemplarLabels := ex.Labels
|
||||||
requireEqual(t, mockExemplar{ls, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j])
|
requireEqual(t, mockExemplar{ls, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j])
|
||||||
j++
|
j++
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue