fix(nativehistograms): validation should fail on unsupported schemas
Histogram.Validate and FloatHistogram.Validate now return error on unsupported schemas. Scrape and remote-write handler reduces the schema to the maximum allowed if it is above the maximum, but below theoretical maximum of 52. For scrape the maximum is a configuration option, for remote-write it is 8. Note: OTLP endpont already does the reduction, without checking that it is below 52 as the spec does not specify a maximum. Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
This commit is contained in:
parent
594f9d63a5
commit
bdf547ae9c
|
@ -798,7 +798,8 @@ func (h *FloatHistogram) AllReverseBucketIterator() BucketIterator[float64] {
|
|||
// create false positives here.
|
||||
func (h *FloatHistogram) Validate() error {
|
||||
var nCount, pCount float64
|
||||
if h.UsesCustomBuckets() {
|
||||
switch {
|
||||
case IsCustomBucketsSchema(h.Schema):
|
||||
if err := checkHistogramCustomBounds(h.CustomValues, h.PositiveSpans, len(h.PositiveBuckets)); err != nil {
|
||||
return fmt.Errorf("custom buckets: %w", err)
|
||||
}
|
||||
|
@ -814,7 +815,7 @@ func (h *FloatHistogram) Validate() error {
|
|||
if len(h.NegativeBuckets) > 0 {
|
||||
return errors.New("custom buckets: must not have negative buckets")
|
||||
}
|
||||
} else {
|
||||
case IsExponentialSchema(h.Schema):
|
||||
if err := checkHistogramSpans(h.PositiveSpans, len(h.PositiveBuckets)); err != nil {
|
||||
return fmt.Errorf("positive side: %w", err)
|
||||
}
|
||||
|
@ -828,6 +829,8 @@ func (h *FloatHistogram) Validate() error {
|
|||
if h.CustomValues != nil {
|
||||
return errors.New("histogram with exponential schema must not have custom bounds")
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("schema %d: %w", h.Schema, ErrHistogramsInvalidSchema)
|
||||
}
|
||||
err := checkHistogramBuckets(h.PositiveBuckets, &pCount, false)
|
||||
if err != nil {
|
||||
|
|
|
@ -22,7 +22,9 @@ import (
|
|||
|
||||
const (
|
||||
ExponentialSchemaMax int32 = 8
|
||||
ExponentialSchemaMaxReserved int32 = 52
|
||||
ExponentialSchemaMin int32 = -4
|
||||
ExponentialSchemaMinReserved int32 = -9
|
||||
CustomBucketsSchema int32 = -53
|
||||
)
|
||||
|
||||
|
@ -37,6 +39,7 @@ var (
|
|||
ErrHistogramCustomBucketsInfinite = errors.New("histogram custom bounds must be finite")
|
||||
ErrHistogramsIncompatibleSchema = errors.New("cannot apply this operation on histograms with a mix of exponential and custom bucket schemas")
|
||||
ErrHistogramsIncompatibleBounds = errors.New("cannot apply this operation on custom buckets histograms with different custom bounds")
|
||||
ErrHistogramsInvalidSchema = errors.New("histogram has an invalid schema, which must be between -4 and 8 for exponential buckets, or -53 for custom buckets")
|
||||
)
|
||||
|
||||
func IsCustomBucketsSchema(s int32) bool {
|
||||
|
@ -47,6 +50,10 @@ func IsExponentialSchema(s int32) bool {
|
|||
return s >= ExponentialSchemaMin && s <= ExponentialSchemaMax
|
||||
}
|
||||
|
||||
func IsExponentialSchemaReserved(s int32) bool {
|
||||
return s >= ExponentialSchemaMinReserved && s <= ExponentialSchemaMaxReserved
|
||||
}
|
||||
|
||||
// BucketCount is a type constraint for the count in a bucket, which can be
|
||||
// float64 (for type FloatHistogram) or uint64 (for type Histogram).
|
||||
type BucketCount interface {
|
||||
|
|
|
@ -425,7 +425,8 @@ func resize[T any](items []T, n int) []T {
|
|||
// the total h.Count).
|
||||
func (h *Histogram) Validate() error {
|
||||
var nCount, pCount uint64
|
||||
if h.UsesCustomBuckets() {
|
||||
switch {
|
||||
case IsCustomBucketsSchema(h.Schema):
|
||||
if err := checkHistogramCustomBounds(h.CustomValues, h.PositiveSpans, len(h.PositiveBuckets)); err != nil {
|
||||
return fmt.Errorf("custom buckets: %w", err)
|
||||
}
|
||||
|
@ -441,7 +442,7 @@ func (h *Histogram) Validate() error {
|
|||
if len(h.NegativeBuckets) > 0 {
|
||||
return errors.New("custom buckets: must not have negative buckets")
|
||||
}
|
||||
} else {
|
||||
case IsExponentialSchema(h.Schema):
|
||||
if err := checkHistogramSpans(h.PositiveSpans, len(h.PositiveBuckets)); err != nil {
|
||||
return fmt.Errorf("positive side: %w", err)
|
||||
}
|
||||
|
@ -455,6 +456,8 @@ func (h *Histogram) Validate() error {
|
|||
if h.CustomValues != nil {
|
||||
return errors.New("histogram with exponential schema must not have custom bounds")
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("schema %d: %w", h.Schema, ErrHistogramsInvalidSchema)
|
||||
}
|
||||
err := checkHistogramBuckets(h.PositiveBuckets, &pCount, true)
|
||||
if err != nil {
|
||||
|
|
|
@ -1565,6 +1565,18 @@ func TestHistogramValidation(t *testing.T) {
|
|||
CustomValues: []float64{1, 2, 3, 4, 5, 6, 7, 8},
|
||||
},
|
||||
},
|
||||
"schema too high": {
|
||||
h: &Histogram{
|
||||
Schema: 10,
|
||||
},
|
||||
errMsg: `schema 10: histogram has an invalid schema, which must be between -4 and 8 for exponential buckets, or -53 for custom buckets`,
|
||||
},
|
||||
"schema too low": {
|
||||
h: &Histogram{
|
||||
Schema: -10,
|
||||
},
|
||||
errMsg: `schema -10: histogram has an invalid schema, which must be between -4 and 8 for exponential buckets, or -53 for custom buckets`,
|
||||
},
|
||||
}
|
||||
|
||||
for testName, tc := range tests {
|
||||
|
|
|
@ -414,12 +414,12 @@ type maxSchemaAppender struct {
|
|||
|
||||
func (app *maxSchemaAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
|
||||
if h != nil {
|
||||
if histogram.IsExponentialSchema(h.Schema) && h.Schema > app.maxSchema {
|
||||
if histogram.IsExponentialSchemaReserved(h.Schema) && h.Schema > app.maxSchema {
|
||||
h = h.ReduceResolution(app.maxSchema)
|
||||
}
|
||||
}
|
||||
if fh != nil {
|
||||
if histogram.IsExponentialSchema(fh.Schema) && fh.Schema > app.maxSchema {
|
||||
if histogram.IsExponentialSchemaReserved(fh.Schema) && fh.Schema > app.maxSchema {
|
||||
fh = fh.ReduceResolution(app.maxSchema)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -86,16 +86,16 @@ func (c *PrometheusConverter) addExponentialHistogramDataPoints(ctx context.Cont
|
|||
func exponentialToNativeHistogram(p pmetric.ExponentialHistogramDataPoint, temporality pmetric.AggregationTemporality) (*histogram.Histogram, annotations.Annotations, error) {
|
||||
var annots annotations.Annotations
|
||||
scale := p.Scale()
|
||||
if scale < -4 {
|
||||
if scale < histogram.ExponentialSchemaMin {
|
||||
return nil, annots,
|
||||
fmt.Errorf("cannot convert exponential to native histogram."+
|
||||
" Scale must be >= -4, was %d", scale)
|
||||
" Scale must be >= %d, was %d", histogram.ExponentialSchemaMin, scale)
|
||||
}
|
||||
|
||||
var scaleDown int32
|
||||
if scale > 8 {
|
||||
scaleDown = scale - 8
|
||||
scale = 8
|
||||
if scale > histogram.ExponentialSchemaMax {
|
||||
scaleDown = scale - histogram.ExponentialSchemaMax
|
||||
scale = histogram.ExponentialSchemaMax
|
||||
}
|
||||
|
||||
pSpans, pDeltas := convertBucketsLayout(p.Positive().BucketCounts().AsRaw(), p.Positive().Offset(), scaleDown, true)
|
||||
|
|
|
@ -229,7 +229,7 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
|
|||
samplesWithInvalidLabels := 0
|
||||
samplesAppended := 0
|
||||
|
||||
app := &timeLimitAppender{
|
||||
app := &remoteWriteAppender{
|
||||
Appender: h.appendable.Appender(ctx),
|
||||
maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)),
|
||||
}
|
||||
|
@ -344,7 +344,7 @@ func (h *writeHandler) appendV1Histograms(app storage.Appender, hh []prompb.Hist
|
|||
// NOTE(bwplotka): TSDB storage is NOT idempotent, so we don't allow "partial retry-able" errors.
|
||||
// Once we have 5xx type of error, we immediately stop and rollback all appends.
|
||||
func (h *writeHandler) writeV2(ctx context.Context, req *writev2.Request) (_ WriteResponseStats, errHTTPCode int, _ error) {
|
||||
app := &timeLimitAppender{
|
||||
app := &remoteWriteAppender{
|
||||
Appender: h.appendable.Appender(ctx),
|
||||
maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)),
|
||||
}
|
||||
|
@ -616,7 +616,7 @@ type rwExporter struct {
|
|||
|
||||
func (rw *rwExporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
|
||||
otlpCfg := rw.config().OTLPConfig
|
||||
app := &timeLimitAppender{
|
||||
app := &remoteWriteAppender{
|
||||
Appender: rw.appendable.Appender(ctx),
|
||||
maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)),
|
||||
}
|
||||
|
@ -719,13 +719,13 @@ func hasDelta(md pmetric.Metrics) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
type timeLimitAppender struct {
|
||||
type remoteWriteAppender struct {
|
||||
storage.Appender
|
||||
|
||||
maxTime int64
|
||||
}
|
||||
|
||||
func (app *timeLimitAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
|
||||
func (app *remoteWriteAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
|
||||
if t > app.maxTime {
|
||||
return 0, fmt.Errorf("%w: timestamp is too far in the future", storage.ErrOutOfBounds)
|
||||
}
|
||||
|
@ -737,11 +737,18 @@ func (app *timeLimitAppender) Append(ref storage.SeriesRef, lset labels.Labels,
|
|||
return ref, nil
|
||||
}
|
||||
|
||||
func (app *timeLimitAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
|
||||
func (app *remoteWriteAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
|
||||
if t > app.maxTime {
|
||||
return 0, fmt.Errorf("%w: timestamp is too far in the future", storage.ErrOutOfBounds)
|
||||
}
|
||||
|
||||
if h != nil && histogram.IsExponentialSchemaReserved(h.Schema) && h.Schema > histogram.ExponentialSchemaMax {
|
||||
h = h.ReduceResolution(histogram.ExponentialSchemaMax)
|
||||
}
|
||||
if fh != nil && histogram.IsExponentialSchemaReserved(fh.Schema) && fh.Schema > histogram.ExponentialSchemaMax {
|
||||
fh = fh.ReduceResolution(histogram.ExponentialSchemaMax)
|
||||
}
|
||||
|
||||
ref, err := app.Appender.AppendHistogram(ref, l, t, h, fh)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
|
@ -749,7 +756,7 @@ func (app *timeLimitAppender) AppendHistogram(ref storage.SeriesRef, l labels.La
|
|||
return ref, nil
|
||||
}
|
||||
|
||||
func (app *timeLimitAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
|
||||
func (app *remoteWriteAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
|
||||
if e.Ts > app.maxTime {
|
||||
return 0, fmt.Errorf("%w: timestamp is too far in the future", storage.ErrOutOfBounds)
|
||||
}
|
||||
|
|
|
@ -1134,3 +1134,100 @@ func (m *mockAppendable) AppendCTZeroSample(_ storage.SeriesRef, l labels.Labels
|
|||
m.samples = append(m.samples, mockSample{l, ct, 0})
|
||||
return storage.SeriesRef(hash), nil
|
||||
}
|
||||
|
||||
var (
|
||||
highSchemaHistogram = &histogram.Histogram{
|
||||
Schema: 10,
|
||||
PositiveSpans: []histogram.Span{
|
||||
{
|
||||
Offset: -1,
|
||||
Length: 2,
|
||||
},
|
||||
},
|
||||
PositiveBuckets: []int64{1, 2},
|
||||
NegativeSpans: []histogram.Span{
|
||||
{
|
||||
Offset: 0,
|
||||
Length: 1,
|
||||
},
|
||||
},
|
||||
NegativeBuckets: []int64{1},
|
||||
}
|
||||
reducedSchemaHistogram = &histogram.Histogram{
|
||||
Schema: 8,
|
||||
PositiveSpans: []histogram.Span{
|
||||
{
|
||||
Offset: 0,
|
||||
Length: 1,
|
||||
},
|
||||
},
|
||||
PositiveBuckets: []int64{4},
|
||||
NegativeSpans: []histogram.Span{
|
||||
{
|
||||
Offset: 0,
|
||||
Length: 1,
|
||||
},
|
||||
},
|
||||
NegativeBuckets: []int64{1},
|
||||
}
|
||||
)
|
||||
|
||||
func TestHistogramsReduction(t *testing.T) {
|
||||
for _, protoMsg := range []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1, config.RemoteWriteProtoMsgV2} {
|
||||
t.Run(string(protoMsg), func(t *testing.T) {
|
||||
appendable := &mockAppendable{}
|
||||
handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{protoMsg}, false)
|
||||
|
||||
var (
|
||||
err error
|
||||
payload []byte
|
||||
)
|
||||
|
||||
if protoMsg == config.RemoteWriteProtoMsgV1 {
|
||||
payload, _, _, err = buildWriteRequest(nil, []prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric1"}},
|
||||
Histograms: []prompb.Histogram{prompb.FromIntHistogram(1, highSchemaHistogram)},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric2"}},
|
||||
Histograms: []prompb.Histogram{prompb.FromFloatHistogram(2, highSchemaHistogram.ToFloat(nil))},
|
||||
},
|
||||
}, nil, nil, nil, nil, "snappy")
|
||||
} else {
|
||||
payload, _, _, err = buildV2WriteRequest(promslog.NewNopLogger(), []writev2.TimeSeries{
|
||||
{
|
||||
LabelsRefs: []uint32{0, 1},
|
||||
Histograms: []writev2.Histogram{writev2.FromIntHistogram(1, highSchemaHistogram)},
|
||||
},
|
||||
{
|
||||
LabelsRefs: []uint32{0, 2},
|
||||
Histograms: []writev2.Histogram{writev2.FromFloatHistogram(2, highSchemaHistogram.ToFloat(nil))},
|
||||
},
|
||||
}, []string{"__name__", "test_metric1", "test_metric2"},
|
||||
nil, nil, nil, "snappy")
|
||||
}
|
||||
require.NoError(t, err)
|
||||
|
||||
req, err := http.NewRequest("", "", bytes.NewReader(payload))
|
||||
require.NoError(t, err)
|
||||
|
||||
req.Header.Set("Content-Type", remoteWriteContentTypeHeaders[protoMsg])
|
||||
|
||||
recorder := httptest.NewRecorder()
|
||||
handler.ServeHTTP(recorder, req)
|
||||
|
||||
resp := recorder.Result()
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, http.StatusNoContent, resp.StatusCode)
|
||||
require.Empty(t, body)
|
||||
|
||||
require.Len(t, appendable.histograms, 2)
|
||||
require.Equal(t, int64(1), appendable.histograms[0].t)
|
||||
require.Equal(t, reducedSchemaHistogram, appendable.histograms[0].h)
|
||||
require.Equal(t, int64(2), appendable.histograms[1].t)
|
||||
require.Equal(t, reducedSchemaHistogram.ToFloat(nil), appendable.histograms[1].fh)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue