feat(storage): add new AppenderV2 interface and compatibility layer
Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
parent
d7e9a2ffb0
commit
656092e3a6
|
|
@ -21,3 +21,8 @@ type Metadata struct {
|
||||||
Unit string `json:"unit"`
|
Unit string `json:"unit"`
|
||||||
Help string `json:"help"`
|
Help string `json:"help"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IsEmpty returns true if metadata structure is empty, including unknown type case.
|
||||||
|
func (m Metadata) IsEmpty() bool {
|
||||||
|
return (m.Type == "" || m.Type == model.MetricTypeUnknown) && m.Unit == "" && m.Help == ""
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,221 @@
|
||||||
|
// Copyright 2025 The Prometheus Authors
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"log/slog"
|
||||||
|
|
||||||
|
"github.com/prometheus/prometheus/model/exemplar"
|
||||||
|
"github.com/prometheus/prometheus/model/histogram"
|
||||||
|
"github.com/prometheus/prometheus/model/labels"
|
||||||
|
"github.com/prometheus/prometheus/model/metadata"
|
||||||
|
)
|
||||||
|
|
||||||
|
// AsAppender exposes new V2 appender implementation as old appender.
|
||||||
|
//
|
||||||
|
// Because old appender interface allow exemplar, metadata and CT updates to
|
||||||
|
// be disjointed from sample and histogram updates, you need extra appender pieces
|
||||||
|
// to handle the old behaviour.
|
||||||
|
func AsAppender(appender AppenderV2, exemplar ExemplarAppender, metadata MetadataUpdater, ct combinedCTAppender) Appender {
|
||||||
|
return &asAppender{app: appender, exemplar: exemplar, metadata: metadata, ct: ct}
|
||||||
|
}
|
||||||
|
|
||||||
|
// For historical reason this interface is not consistent
|
||||||
|
// (HistogramAppender has CTZero but Appender does not have for Sample).
|
||||||
|
// No need to fix this elsewhere, given we plan to deprecate old interface and those methods.
|
||||||
|
type combinedCTAppender interface {
|
||||||
|
CreatedTimestampAppender
|
||||||
|
AppendHistogramCTZeroSample(ref SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type asAppender struct {
|
||||||
|
opts AppendV2Options
|
||||||
|
app AppenderV2
|
||||||
|
|
||||||
|
exemplar ExemplarAppender
|
||||||
|
metadata MetadataUpdater
|
||||||
|
ct combinedCTAppender
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *asAppender) Append(ref SeriesRef, l labels.Labels, t int64, v float64) (SeriesRef, error) {
|
||||||
|
return a.app.AppendSample(ref, l, Metadata{}, 0, t, v, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *asAppender) AppendHistogram(ref SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error) {
|
||||||
|
return a.app.AppendHistogram(ref, l, Metadata{}, 0, t, h, fh, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *asAppender) Commit() error {
|
||||||
|
return a.app.Commit()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *asAppender) Rollback() error {
|
||||||
|
return a.app.Rollback()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *asAppender) SetOptions(opts *AppendOptions) {
|
||||||
|
if opts == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
a.opts.DiscardOutOfOrder = opts.DiscardOutOfOrder
|
||||||
|
a.app.SetOptions(&a.opts)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *asAppender) AppendCTZeroSample(ref SeriesRef, l labels.Labels, t, ct int64) (SeriesRef, error) {
|
||||||
|
// We could consider using AppendSample with AppendCTAsZero true, but this option is stateful,
|
||||||
|
// so use compatibility ct interface.
|
||||||
|
return a.ct.AppendCTZeroSample(ref, l, t, ct)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *asAppender) AppendHistogramCTZeroSample(ref SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error) {
|
||||||
|
// We could consider using AppendHistogram with AppendCTAsZero true, but this option is stateful,
|
||||||
|
// so use compatibility ct interface.
|
||||||
|
return a.ct.AppendHistogramCTZeroSample(ref, l, t, ct, h, fh)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *asAppender) AppendExemplar(ref SeriesRef, l labels.Labels, e exemplar.Exemplar) (SeriesRef, error) {
|
||||||
|
return a.exemplar.AppendExemplar(ref, l, e)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *asAppender) UpdateMetadata(ref SeriesRef, l labels.Labels, m metadata.Metadata) (SeriesRef, error) {
|
||||||
|
return a.metadata.UpdateMetadata(ref, l, m)
|
||||||
|
}
|
||||||
|
|
||||||
|
// AsAppenderV2 exposes old appender implementation as a new v2 appender.
|
||||||
|
func AsAppenderV2(log *slog.Logger, appender Appender) AppenderV2 {
|
||||||
|
if log == nil {
|
||||||
|
log = slog.Default()
|
||||||
|
}
|
||||||
|
return &asAppenderV2{app: appender, log: log}
|
||||||
|
}
|
||||||
|
|
||||||
|
type asAppenderV2 struct {
|
||||||
|
app Appender
|
||||||
|
opts AppendV2Options
|
||||||
|
|
||||||
|
log *slog.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *asAppenderV2) AppendSample(ref SeriesRef, ls labels.Labels, meta Metadata, ct, t int64, v float64, es []exemplar.Exemplar) (_ SeriesRef, err error) {
|
||||||
|
if ct != 0 && a.opts.AppendCTAsZero {
|
||||||
|
ref, err = a.app.AppendCTZeroSample(ref, ls, t, ct)
|
||||||
|
// Skip the error in general (mimic the behaviour of the scrape loop).
|
||||||
|
// https://github.com/prometheus/prometheus/blob/913cc8f72b8a4f6ae4beb1d168c16a88ca4705ab/scrape/scrape.go#L1787C5-L1791C7
|
||||||
|
if err != nil && !errors.Is(err, ErrOutOfOrderCT) { // OOO is a common case, ignoring completely for now.
|
||||||
|
// CT is an experimental feature. For now, we don't need to fail the
|
||||||
|
// scrape on errors updating the created timestamp, log debug.
|
||||||
|
a.log.Debug("error when appending CT in scrape loop", "series", meta, "ct", ct, "t", t, "err", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ref, err = a.app.Append(ref, ls, t, v)
|
||||||
|
if err != nil {
|
||||||
|
return ref, err
|
||||||
|
}
|
||||||
|
|
||||||
|
outOfOrderExemplars := 0
|
||||||
|
for _, e := range es {
|
||||||
|
_, exemplarErr := a.app.AppendExemplar(ref, ls, e)
|
||||||
|
// Skip the error in general (mimic the behaviour of the scrape loop).
|
||||||
|
// https://github.com/prometheus/prometheus/blob/913cc8f72b8a4f6ae4beb1d168c16a88ca4705ab/scrape/scrape.go#L1862C3-L1881C4
|
||||||
|
switch {
|
||||||
|
case exemplarErr == nil:
|
||||||
|
// Do nothing.
|
||||||
|
case errors.Is(exemplarErr, ErrOutOfOrderExemplar):
|
||||||
|
outOfOrderExemplars++
|
||||||
|
default:
|
||||||
|
// Since exemplar storage is still experimental, we don't fail the scrape on ingestion errors.
|
||||||
|
a.log.Debug("error while adding exemplar in AppendExemplar", "exemplar", fmt.Sprintf("%+v", e), "err", exemplarErr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if outOfOrderExemplars > 0 && outOfOrderExemplars == len(es) {
|
||||||
|
// Only report out of order exemplars if all are out of order, otherwise this was a partial update
|
||||||
|
// to some existing set of exemplars.
|
||||||
|
// TODO: Decide what to do with scrape logic for exemplar error handling.
|
||||||
|
// appErrs.numExemplarOutOfOrder += outOfOrderExemplars
|
||||||
|
a.log.Debug("Out of order exemplars", "count", outOfOrderExemplars, "latest", fmt.Sprintf("%+v", es[len(es)-1]))
|
||||||
|
// sl.metrics.targetScrapeExemplarOutOfOrder.Add(float64(outOfOrderExemplars))
|
||||||
|
}
|
||||||
|
|
||||||
|
if !meta.IsEmpty() {
|
||||||
|
ref, err = a.app.UpdateMetadata(ref, ls, meta.Metadata)
|
||||||
|
if err != nil {
|
||||||
|
return ref, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ref, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *asAppenderV2) AppendHistogram(ref SeriesRef, ls labels.Labels, meta Metadata, ct, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram, es []exemplar.Exemplar) (_ SeriesRef, err error) {
|
||||||
|
if ct != 0 && a.opts.AppendCTAsZero {
|
||||||
|
ref, err = a.app.AppendHistogramCTZeroSample(ref, ls, t, ct, h, fh)
|
||||||
|
// Skip the error in general (mimic the behaviour of the scrape loop).
|
||||||
|
// https://github.com/prometheus/prometheus/blob/913cc8f72b8a4f6ae4beb1d168c16a88ca4705ab/scrape/scrape.go#L1787C5-L1791C7
|
||||||
|
if err != nil && !errors.Is(err, ErrOutOfOrderCT) { // OOO is a common case, ignoring completely for now.
|
||||||
|
// CT is an experimental feature. For now, we don't need to fail the
|
||||||
|
// scrape on errors updating the created timestamp, log debug.
|
||||||
|
a.log.Debug("error when appending CT in scrape loop", "series", meta, "ct", ct, "t", t, "err", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ref, err = a.app.AppendHistogram(ref, ls, t, h, fh)
|
||||||
|
if err != nil {
|
||||||
|
return ref, err
|
||||||
|
}
|
||||||
|
|
||||||
|
outOfOrderExemplars := 0
|
||||||
|
for _, e := range es {
|
||||||
|
_, exemplarErr := a.app.AppendExemplar(ref, ls, e)
|
||||||
|
// Skip the error in general (mimic the behaviour of the scrape loop).
|
||||||
|
// https://github.com/prometheus/prometheus/blob/913cc8f72b8a4f6ae4beb1d168c16a88ca4705ab/scrape/scrape.go#L1862C3-L1881C4
|
||||||
|
switch {
|
||||||
|
case exemplarErr == nil:
|
||||||
|
// Do nothing.
|
||||||
|
case errors.Is(exemplarErr, ErrOutOfOrderExemplar):
|
||||||
|
outOfOrderExemplars++
|
||||||
|
default:
|
||||||
|
// Since exemplar storage is still experimental, we don't fail the scrape on ingestion errors.
|
||||||
|
a.log.Debug("error while adding exemplar in AppendExemplar", "exemplar", fmt.Sprintf("%+v", e), "err", exemplarErr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if outOfOrderExemplars > 0 && outOfOrderExemplars == len(es) {
|
||||||
|
// Only report out of order exemplars if all are out of order, otherwise this was a partial update
|
||||||
|
// to some existing set of exemplars.
|
||||||
|
// TODO: Decide what to do with scrape logic for exemplar error handling.
|
||||||
|
// appErrs.numExemplarOutOfOrder += outOfOrderExemplars
|
||||||
|
a.log.Debug("Out of order exemplars", "count", outOfOrderExemplars, "latest", fmt.Sprintf("%+v", es[len(es)-1]))
|
||||||
|
// sl.metrics.targetScrapeExemplarOutOfOrder.Add(float64(outOfOrderExemplars))
|
||||||
|
}
|
||||||
|
|
||||||
|
if !meta.IsEmpty() {
|
||||||
|
ref, err = a.app.UpdateMetadata(ref, ls, meta.Metadata)
|
||||||
|
if err != nil {
|
||||||
|
return ref, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ref, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *asAppenderV2) Commit() error { return a.app.Commit() }
|
||||||
|
|
||||||
|
func (a *asAppenderV2) Rollback() error { return a.app.Rollback() }
|
||||||
|
|
||||||
|
func (a *asAppenderV2) SetOptions(opts *AppendV2Options) {
|
||||||
|
if opts == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
a.opts = *opts
|
||||||
|
a.app.SetOptions(&AppendOptions{DiscardOutOfOrder: opts.DiscardOutOfOrder})
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,269 @@
|
||||||
|
// Copyright 2025 The Prometheus Authors
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/prometheus/prometheus/model/exemplar"
|
||||||
|
"github.com/prometheus/prometheus/model/histogram"
|
||||||
|
"github.com/prometheus/prometheus/model/labels"
|
||||||
|
"github.com/prometheus/prometheus/model/metadata"
|
||||||
|
)
|
||||||
|
|
||||||
|
type mockAppender struct {
|
||||||
|
t *testing.T
|
||||||
|
|
||||||
|
// For asserting calls.
|
||||||
|
gotLabels []labels.Labels
|
||||||
|
gotTs []int64
|
||||||
|
gotSamples []float64
|
||||||
|
gotHistograms []*histogram.Histogram
|
||||||
|
gotFloatHistograms []*histogram.FloatHistogram
|
||||||
|
gotExemplars []exemplar.Exemplar
|
||||||
|
gotMetadata []metadata.Metadata
|
||||||
|
|
||||||
|
committed bool
|
||||||
|
rolledBack bool
|
||||||
|
opts *AppendOptions
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockAppender) Append(ref SeriesRef, l labels.Labels, t int64, v float64) (SeriesRef, error) {
|
||||||
|
m.gotLabels = append(m.gotLabels, l)
|
||||||
|
m.gotTs = append(m.gotTs, t)
|
||||||
|
m.gotSamples = append(m.gotSamples, v)
|
||||||
|
return ref, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockAppender) AppendHistogram(ref SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error) {
|
||||||
|
m.gotLabels = append(m.gotLabels, l)
|
||||||
|
m.gotTs = append(m.gotTs, t)
|
||||||
|
m.gotHistograms = append(m.gotHistograms, h)
|
||||||
|
m.gotFloatHistograms = append(m.gotFloatHistograms, fh)
|
||||||
|
return ref, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockAppender) Commit() error {
|
||||||
|
m.committed = true
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockAppender) Rollback() error {
|
||||||
|
m.rolledBack = true
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockAppender) SetOptions(opts *AppendOptions) {
|
||||||
|
m.opts = opts
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockAppender) AppendCTZeroSample(ref SeriesRef, l labels.Labels, _, ct int64) (SeriesRef, error) {
|
||||||
|
return m.Append(ref, l, ct, 0) // Mimic the desired implementation.
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
zeroHistogram = &histogram.Histogram{
|
||||||
|
// The CTZeroSample represents a counter reset by definition.
|
||||||
|
CounterResetHint: histogram.CounterReset,
|
||||||
|
}
|
||||||
|
zeroFloatHistogram = &histogram.FloatHistogram{
|
||||||
|
// The CTZeroSample represents a counter reset by definition.
|
||||||
|
CounterResetHint: histogram.CounterReset,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
func (m *mockAppender) AppendHistogramCTZeroSample(ref SeriesRef, l labels.Labels, _, ct int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (SeriesRef, error) {
|
||||||
|
return m.AppendHistogram(ref, l, ct, zeroHistogram, zeroFloatHistogram) // Mimic the desired implementation.
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockAppender) AppendExemplar(ref SeriesRef, l labels.Labels, e exemplar.Exemplar) (SeriesRef, error) {
|
||||||
|
m.gotLabels = append(m.gotLabels, l)
|
||||||
|
m.gotExemplars = append(m.gotExemplars, e)
|
||||||
|
return ref, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockAppender) UpdateMetadata(ref SeriesRef, l labels.Labels, md metadata.Metadata) (SeriesRef, error) {
|
||||||
|
m.gotLabels = append(m.gotLabels, l)
|
||||||
|
m.gotMetadata = append(m.gotMetadata, md)
|
||||||
|
return ref, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAsAppenderV2(t *testing.T) {
|
||||||
|
testAsAppenderV2(t, func(appender Appender) AppenderV2 {
|
||||||
|
return AsAppenderV2(nil, appender)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAsAppender(t *testing.T) {
|
||||||
|
// To minimize the tests and mocks to maintain, we AsAppender between AsAppenderV2.
|
||||||
|
testAsAppenderV2(t, func(appender Appender) AppenderV2 {
|
||||||
|
return AsAppenderV2(nil,
|
||||||
|
AsAppender(
|
||||||
|
AsAppenderV2(nil, appender),
|
||||||
|
appender.(ExemplarAppender),
|
||||||
|
appender.(MetadataUpdater),
|
||||||
|
appender.(combinedCTAppender),
|
||||||
|
))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func testAsAppenderV2(t *testing.T, asAppenderV2Fn func(Appender) AppenderV2) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
ls := labels.FromStrings("a", "b")
|
||||||
|
meta := Metadata{Metadata: metadata.Metadata{Type: "counter"}}
|
||||||
|
es := []exemplar.Exemplar{{Value: 1}, {Value: 2}}
|
||||||
|
h := &histogram.Histogram{Count: 1}
|
||||||
|
fh := &histogram.FloatHistogram{Count: 2}
|
||||||
|
t.Run("method=AppendSample", func(t *testing.T) {
|
||||||
|
t.Run("basic", func(t *testing.T) {
|
||||||
|
mockApp := &mockAppender{t: t}
|
||||||
|
app := asAppenderV2Fn(mockApp)
|
||||||
|
|
||||||
|
_, err := app.AppendSample(1, ls, Metadata{}, 0, 100, 1.23, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
expectedMockState := mockAppender{
|
||||||
|
t: t,
|
||||||
|
gotLabels: []labels.Labels{ls},
|
||||||
|
gotSamples: []float64{1.23},
|
||||||
|
gotTs: []int64{100},
|
||||||
|
}
|
||||||
|
require.Equal(t, expectedMockState, *mockApp)
|
||||||
|
})
|
||||||
|
t.Run("appendCTAsZero=true", func(t *testing.T) {
|
||||||
|
mockApp := &mockAppender{t: t}
|
||||||
|
app := asAppenderV2Fn(mockApp)
|
||||||
|
app.SetOptions(&AppendV2Options{AppendCTAsZero: true})
|
||||||
|
|
||||||
|
_, err := app.AppendSample(1, ls, Metadata{}, 0, 100, 1.23, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
expectedMockState := mockAppender{
|
||||||
|
t: t,
|
||||||
|
gotLabels: []labels.Labels{ls},
|
||||||
|
gotSamples: []float64{1.23},
|
||||||
|
gotTs: []int64{100},
|
||||||
|
opts: &AppendOptions{DiscardOutOfOrder: false},
|
||||||
|
}
|
||||||
|
require.Equal(t, expectedMockState, *mockApp)
|
||||||
|
|
||||||
|
_, err = app.AppendSample(1, ls, Metadata{}, 101, 102, 3.45, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
expectedMockState = mockAppender{
|
||||||
|
t: t,
|
||||||
|
gotLabels: []labels.Labels{ls, ls, ls},
|
||||||
|
gotSamples: []float64{1.23, 0, 3.45},
|
||||||
|
gotTs: []int64{100, 101, 102},
|
||||||
|
opts: &AppendOptions{DiscardOutOfOrder: false},
|
||||||
|
}
|
||||||
|
require.Equal(t, expectedMockState, *mockApp)
|
||||||
|
})
|
||||||
|
t.Run("with extra", func(t *testing.T) {
|
||||||
|
mockApp := &mockAppender{t: t}
|
||||||
|
app := asAppenderV2Fn(mockApp)
|
||||||
|
|
||||||
|
_, err := app.AppendSample(1, ls, meta, 0, 100, 1.23, es)
|
||||||
|
require.NoError(t, err)
|
||||||
|
expectedMockState := mockAppender{
|
||||||
|
t: t,
|
||||||
|
gotLabels: []labels.Labels{ls, ls, ls, ls}, // AppendSample, AppendExemplar, AppendExemplar, UpdateMetadata.
|
||||||
|
gotSamples: []float64{1.23},
|
||||||
|
gotTs: []int64{100},
|
||||||
|
gotExemplars: es,
|
||||||
|
gotMetadata: []metadata.Metadata{meta.Metadata},
|
||||||
|
}
|
||||||
|
require.Equal(t, expectedMockState, *mockApp)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
t.Run("method=AppendHistogram", func(t *testing.T) {
|
||||||
|
t.Run("basic", func(t *testing.T) {
|
||||||
|
mockApp := &mockAppender{t: t}
|
||||||
|
app := asAppenderV2Fn(mockApp)
|
||||||
|
|
||||||
|
_, err := app.AppendHistogram(1, ls, Metadata{}, 0, 102, h, nil, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
expectedMockState := mockAppender{
|
||||||
|
t: t,
|
||||||
|
gotLabels: []labels.Labels{ls},
|
||||||
|
gotHistograms: []*histogram.Histogram{h},
|
||||||
|
gotFloatHistograms: []*histogram.FloatHistogram{nil},
|
||||||
|
gotTs: []int64{102},
|
||||||
|
}
|
||||||
|
require.Equal(t, expectedMockState, *mockApp)
|
||||||
|
})
|
||||||
|
t.Run("appendCTAsZero=true", func(t *testing.T) {
|
||||||
|
mockApp := &mockAppender{t: t}
|
||||||
|
app := asAppenderV2Fn(mockApp)
|
||||||
|
app.SetOptions(&AppendV2Options{AppendCTAsZero: true})
|
||||||
|
|
||||||
|
_, err := app.AppendHistogram(1, ls, Metadata{}, 0, 102, h, nil, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
expectedMockState := mockAppender{
|
||||||
|
t: t,
|
||||||
|
gotLabels: []labels.Labels{ls},
|
||||||
|
gotHistograms: []*histogram.Histogram{h},
|
||||||
|
gotFloatHistograms: []*histogram.FloatHistogram{nil},
|
||||||
|
gotTs: []int64{102},
|
||||||
|
opts: &AppendOptions{DiscardOutOfOrder: false},
|
||||||
|
}
|
||||||
|
require.Equal(t, expectedMockState, *mockApp)
|
||||||
|
|
||||||
|
_, err = app.AppendHistogram(1, ls, Metadata{}, 103, 104, h, nil, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
expectedMockState = mockAppender{
|
||||||
|
t: t,
|
||||||
|
gotLabels: []labels.Labels{ls, ls, ls},
|
||||||
|
gotHistograms: []*histogram.Histogram{h, zeroHistogram, h},
|
||||||
|
gotFloatHistograms: []*histogram.FloatHistogram{nil, zeroFloatHistogram, nil},
|
||||||
|
gotTs: []int64{102, 103, 104},
|
||||||
|
opts: &AppendOptions{DiscardOutOfOrder: false},
|
||||||
|
}
|
||||||
|
require.Equal(t, expectedMockState, *mockApp)
|
||||||
|
})
|
||||||
|
t.Run("with extra", func(t *testing.T) {
|
||||||
|
mockApp := &mockAppender{t: t}
|
||||||
|
app := asAppenderV2Fn(mockApp)
|
||||||
|
|
||||||
|
_, err := app.AppendHistogram(1, ls, meta, 0, 102, nil, fh, es)
|
||||||
|
require.NoError(t, err)
|
||||||
|
expectedMockState := mockAppender{
|
||||||
|
t: t,
|
||||||
|
gotLabels: []labels.Labels{ls, ls, ls, ls}, // AppendSample, AppendExemplar, AppendExemplar, UpdateMetadata.
|
||||||
|
gotHistograms: []*histogram.Histogram{nil},
|
||||||
|
gotFloatHistograms: []*histogram.FloatHistogram{fh},
|
||||||
|
gotTs: []int64{102},
|
||||||
|
gotExemplars: es,
|
||||||
|
gotMetadata: []metadata.Metadata{meta.Metadata},
|
||||||
|
}
|
||||||
|
require.Equal(t, expectedMockState, *mockApp)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("method=Commit/Rollback/SetOptions", func(t *testing.T) {
|
||||||
|
mockApp := &mockAppender{t: t}
|
||||||
|
app := asAppenderV2Fn(mockApp)
|
||||||
|
|
||||||
|
require.NoError(t, app.Commit())
|
||||||
|
require.True(t, mockApp.committed)
|
||||||
|
|
||||||
|
require.NoError(t, app.Rollback())
|
||||||
|
require.True(t, mockApp.rolledBack)
|
||||||
|
|
||||||
|
opts := &AppendV2Options{DiscardOutOfOrder: true}
|
||||||
|
app.SetOptions(opts)
|
||||||
|
require.NotNil(t, mockApp.opts)
|
||||||
|
require.True(t, mockApp.opts.DiscardOutOfOrder)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
@ -255,6 +255,83 @@ func (f QueryableFunc) Querier(mint, maxt int64) (Querier, error) {
|
||||||
return f(mint, maxt)
|
return f(mint, maxt)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AppendV2Options provides options for implementations of the
|
||||||
|
// AppenderV2 interface.
|
||||||
|
type AppendV2Options struct {
|
||||||
|
DiscardOutOfOrder bool // TODO: Pick better name? Make it an append option?
|
||||||
|
|
||||||
|
// AppendCTAsZero ensures that CT is appended as fake zero sample on all append methods.
|
||||||
|
// TODO(bwplotka): This option might be removed in future.
|
||||||
|
AppendCTAsZero bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// Metadata stores a series' metadata information.
|
||||||
|
type Metadata struct {
|
||||||
|
metadata.Metadata
|
||||||
|
|
||||||
|
// MetricFamilyName (optional) stores metric family name of the series that
|
||||||
|
// this metadata relates too. If the client of the AppenderV2 has this information
|
||||||
|
// (e.g. from scrape), it's recommended to pass it to the appender.
|
||||||
|
// Some implementation store Metadata per metric family and this information
|
||||||
|
// allows them to avoid slow and prone to error metric family detection.
|
||||||
|
//
|
||||||
|
// NOTE(krajorama): Example purpose is highlighted in OTLP ingestion: OTLP calculates the
|
||||||
|
// metric family name for all metrics and uses it for generating summary,
|
||||||
|
// histogram series by adding the magic suffixes. The metric family name is
|
||||||
|
// passed down to the appender in case the storage needs it for metadata updates.
|
||||||
|
// Known user of this is Mimir that implements /api/v1/metadata and uses
|
||||||
|
// Remote-Write 1.0 for this. Might be removed later if no longer
|
||||||
|
// needed by any downstream project.
|
||||||
|
MetricFamilyName string
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsEmpty returns true if metadata structure is empty, including unknown type case.
|
||||||
|
func (m Metadata) IsEmpty() bool {
|
||||||
|
return m.Metadata.IsEmpty() && m.MetricFamilyName == ""
|
||||||
|
}
|
||||||
|
|
||||||
|
// AppenderV2 provides batched appends against a storage for either float or histogram samples.
|
||||||
|
// It must be completed with a call to Commit or Rollback and must not be reused afterwards.
|
||||||
|
//
|
||||||
|
// Operations on the Appender interface are not goroutine-safe.
|
||||||
|
//
|
||||||
|
// The type of samples (float64, histogram, etc) appended for a given series must remain same within an Appender.
|
||||||
|
// The behaviour is undefined if samples of different types are appended to the same series in a single Commit().
|
||||||
|
// TODO(krajorama): Undefined behaviour might change in https://github.com/prometheus/prometheus/issues/15177
|
||||||
|
//
|
||||||
|
// NOTE(bwplotka): This interface is experimental, migration of Prometheus pieces is in progress.
|
||||||
|
// TODO(bwplotka): Add complex error for partial error cases.
|
||||||
|
// TODO(bwplotka): Looking on all usaes, we could propose even simpler interface with just one method
|
||||||
|
// Append(ref SeriesRef, ls labels.Labels, meta Metadata, ct, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, es []exemplar.Exemplar) (SeriesRef, error).
|
||||||
|
type AppenderV2 interface {
|
||||||
|
// AppendSample appends a float sample and related exemplars, metadata, and created timestamp to the storage.
|
||||||
|
//
|
||||||
|
// Implementations MUST attempt to append sample even if metadata, exemplar or CT appends fail.
|
||||||
|
// TODO(bwplotka): Raname to Append or AppendFloat?
|
||||||
|
AppendSample(ref SeriesRef, ls labels.Labels, meta Metadata, ct, t int64, v float64, es []exemplar.Exemplar) (SeriesRef, error)
|
||||||
|
|
||||||
|
// AppendHistogram appends a float or int histogram sample and related exemplars, metadata, and created timestamp to the storage.
|
||||||
|
//
|
||||||
|
// Implementations MUST attempt to append sample even if metadata, exemplar or CT appends fail.
|
||||||
|
AppendHistogram(ref SeriesRef, ls labels.Labels, meta Metadata, ct, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram, es []exemplar.Exemplar) (SeriesRef, error)
|
||||||
|
|
||||||
|
// Commit submits the collected samples and purges the batch. If Commit
|
||||||
|
// returns a non-nil error, it also rolls back all modifications made in
|
||||||
|
// the appender so far, as Rollback would do. In any case, an Appender
|
||||||
|
// must not be used anymore after Commit has been called.
|
||||||
|
Commit() error
|
||||||
|
|
||||||
|
// Rollback rolls back all modifications made in the appender so far.
|
||||||
|
// Appender has to be discarded after rollback.
|
||||||
|
Rollback() error
|
||||||
|
|
||||||
|
// SetOptions configures the appender with specific append options such as
|
||||||
|
// discarding out-of-order samples even if out-of-order is enabled in the TSDB.
|
||||||
|
SetOptions(opts *AppendV2Options)
|
||||||
|
}
|
||||||
|
|
||||||
|
// AppendOptions provides options for implementations of the
|
||||||
|
// Appender interface.
|
||||||
type AppendOptions struct {
|
type AppendOptions struct {
|
||||||
DiscardOutOfOrder bool
|
DiscardOutOfOrder bool
|
||||||
}
|
}
|
||||||
|
|
@ -266,6 +343,8 @@ type AppendOptions struct {
|
||||||
//
|
//
|
||||||
// The type of samples (float64, histogram, etc) appended for a given series must remain same within an Appender.
|
// The type of samples (float64, histogram, etc) appended for a given series must remain same within an Appender.
|
||||||
// The behaviour is undefined if samples of different types are appended to the same series in a single Commit().
|
// The behaviour is undefined if samples of different types are appended to the same series in a single Commit().
|
||||||
|
//
|
||||||
|
// WARNING(bwplotka): This interface might be deprecated soon, try AppenderV2 instead.
|
||||||
type Appender interface {
|
type Appender interface {
|
||||||
// Append adds a sample pair for the given series.
|
// Append adds a sample pair for the given series.
|
||||||
// An optional series reference can be provided to accelerate calls.
|
// An optional series reference can be provided to accelerate calls.
|
||||||
|
|
@ -300,7 +379,7 @@ type Appender interface {
|
||||||
// GetRef is an extra interface on Appenders used by downstream projects
|
// GetRef is an extra interface on Appenders used by downstream projects
|
||||||
// (e.g. Cortex) to avoid maintaining a parallel set of references.
|
// (e.g. Cortex) to avoid maintaining a parallel set of references.
|
||||||
type GetRef interface {
|
type GetRef interface {
|
||||||
// Returns reference number that can be used to pass to Appender.Append(),
|
// GetRef returns reference number that can be used to pass to Appender.Append(),
|
||||||
// and a set of labels that will not cause another copy when passed to Appender.Append().
|
// and a set of labels that will not cause another copy when passed to Appender.Append().
|
||||||
// 0 means the appender does not have a reference to this series.
|
// 0 means the appender does not have a reference to this series.
|
||||||
// hash should be a hash of lset.
|
// hash should be a hash of lset.
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue