feat: Use appenderV2 in OTLP ingestion

Depends on https://github.com/prometheus/prometheus/pull/17104

Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
bwplotka 2025-09-11 09:23:35 +01:00
parent 656092e3a6
commit aad50c1dd0
1 changed files with 28 additions and 40 deletions

View File

@ -31,30 +31,6 @@ import (
"github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage"
) )
// Metadata extends metadata.Metadata with the metric family name.
// OTLP calculates the metric family name for all metrics and uses
// it for generating summary, histogram series by adding the magic
// suffixes. The metric family name is passed down to the appender
// in case the storage needs it for metadata updates.
// Known user is Mimir that implements /api/v1/metadata and uses
// Remote-Write 1.0 for this. Might be removed later if no longer
// needed by any downstream project.
type Metadata struct {
metadata.Metadata
MetricFamilyName string
}
// CombinedAppender is similar to storage.Appender, but combines updates to
// metadata, created timestamps, exemplars and samples into a single call.
type CombinedAppender interface {
// AppendSample appends a sample and related exemplars, metadata, and
// created timestamp to the storage.
AppendSample(ls labels.Labels, meta Metadata, ct, t int64, v float64, es []exemplar.Exemplar) error
// AppendHistogram appends a histogram and related exemplars, metadata, and
// created timestamp to the storage.
AppendHistogram(ls labels.Labels, meta Metadata, ct, t int64, h *histogram.Histogram, es []exemplar.Exemplar) error
}
// CombinedAppenderMetrics is for the metrics observed by the // CombinedAppenderMetrics is for the metrics observed by the
// combinedAppender implementation. // combinedAppender implementation.
type CombinedAppenderMetrics struct { type CombinedAppenderMetrics struct {
@ -82,11 +58,11 @@ func NewCombinedAppenderMetrics(reg prometheus.Registerer) CombinedAppenderMetri
// NewCombinedAppender creates a combined appender that sets start times and // NewCombinedAppender creates a combined appender that sets start times and
// updates metadata for each series only once, and appends samples and // updates metadata for each series only once, and appends samples and
// exemplars for each call. // exemplars for each call.
func NewCombinedAppender(app storage.Appender, logger *slog.Logger, ingestCTZeroSample bool, metrics CombinedAppenderMetrics) CombinedAppender { // TODO(bwplotka): Rename to OTLP specific appender
func NewCombinedAppender(app storage.Appender, logger *slog.Logger, metrics CombinedAppenderMetrics) storage.AppenderV2 {
return &combinedAppender{ return &combinedAppender{
app: app, Appender: app,
logger: logger, logger: logger,
ingestCTZeroSample: ingestCTZeroSample,
refs: make(map[uint64]seriesRef), refs: make(map[uint64]seriesRef),
samplesAppendedWithoutMetadata: metrics.samplesAppendedWithoutMetadata, samplesAppendedWithoutMetadata: metrics.samplesAppendedWithoutMetadata,
outOfOrderExemplars: metrics.outOfOrderExemplars, outOfOrderExemplars: metrics.outOfOrderExemplars,
@ -101,7 +77,9 @@ type seriesRef struct {
} }
type combinedAppender struct { type combinedAppender struct {
app storage.Appender storage.Appender
opts storage.AppendV2Options
logger *slog.Logger logger *slog.Logger
samplesAppendedWithoutMetadata prometheus.Counter samplesAppendedWithoutMetadata prometheus.Counter
outOfOrderExemplars prometheus.Counter outOfOrderExemplars prometheus.Counter
@ -112,23 +90,33 @@ type combinedAppender struct {
refs map[uint64]seriesRef refs map[uint64]seriesRef
} }
func (b *combinedAppender) AppendSample(ls labels.Labels, meta Metadata, ct, t int64, v float64, es []exemplar.Exemplar) (err error) { func (b *combinedAppender) SetOptions(opts *storage.AppendV2Options) {
b.opts = *opts
}
func (b *combinedAppender) AppendSample(_ storage.SeriesRef, ls labels.Labels, meta storage.Metadata, ct, t int64, v float64, es []exemplar.Exemplar) (_ storage.SeriesRef, err error) {
return b.appendFloatOrHistogram(ls, meta.Metadata, ct, t, v, nil, es) return b.appendFloatOrHistogram(ls, meta.Metadata, ct, t, v, nil, es)
} }
func (b *combinedAppender) AppendHistogram(ls labels.Labels, meta Metadata, ct, t int64, h *histogram.Histogram, es []exemplar.Exemplar) (err error) { func (b *combinedAppender) AppendHistogram(_ storage.SeriesRef, ls labels.Labels, meta storage.Metadata, ct, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram, es []exemplar.Exemplar) (_ storage.SeriesRef, err error) {
if h == nil { if h == nil {
// Sanity check, we should never get here with a nil histogram. // Sanity check, we should never get here with a nil histogram.
b.logger.Error("Received nil histogram in CombinedAppender.AppendHistogram", "series", ls.String()) b.logger.Error("Received nil histogram in CombinedAppender.AppendHistogram", "series", ls.String())
return errors.New("internal error, attempted to append nil histogram") return 0, errors.New("internal error, attempted to append nil histogram")
}
if fh != nil {
// Sanity check, we should never get here with a otlp appender.
b.logger.Error("Received not nil float histogram in CombinedAppender.AppendHistogram", "series", ls.String())
return 0, errors.New("internal error, attempted to append float histogram; works only for nil histograms")
} }
return b.appendFloatOrHistogram(ls, meta.Metadata, ct, t, 0, h, es) return b.appendFloatOrHistogram(ls, meta.Metadata, ct, t, 0, h, es)
} }
func (b *combinedAppender) appendFloatOrHistogram(ls labels.Labels, meta metadata.Metadata, ct, t int64, v float64, h *histogram.Histogram, es []exemplar.Exemplar) (err error) { func (b *combinedAppender) appendFloatOrHistogram(ls labels.Labels, meta metadata.Metadata, ct, t int64, v float64, h *histogram.Histogram, es []exemplar.Exemplar) (ref storage.SeriesRef, err error) {
// TODO: Use compatibility layer here and adopt this code benefits (checking collisions) or move to some pre-layer.
hash := ls.Hash() hash := ls.Hash()
series, exists := b.refs[hash] series, exists := b.refs[hash]
ref := series.ref ref = series.ref
if exists && !labels.Equal(series.ls, ls) { if exists && !labels.Equal(series.ls, ls) {
// Hash collision. The series reference we stored is pointing to a // Hash collision. The series reference we stored is pointing to a
// different series so we cannot use it, we need to reset the // different series so we cannot use it, we need to reset the
@ -142,9 +130,9 @@ func (b *combinedAppender) appendFloatOrHistogram(ls labels.Labels, meta metadat
if updateRefs && ct != 0 && b.ingestCTZeroSample { if updateRefs && ct != 0 && b.ingestCTZeroSample {
var newRef storage.SeriesRef var newRef storage.SeriesRef
if h != nil { if h != nil {
newRef, err = b.app.AppendHistogramCTZeroSample(ref, ls, t, ct, h, nil) newRef, err = b.Appender.AppendHistogramCTZeroSample(ref, ls, t, ct, h, nil)
} else { } else {
newRef, err = b.app.AppendCTZeroSample(ref, ls, t, ct) newRef, err = b.Appender.AppendCTZeroSample(ref, ls, t, ct)
} }
if err != nil { if err != nil {
if !errors.Is(err, storage.ErrOutOfOrderCT) { if !errors.Is(err, storage.ErrOutOfOrderCT) {
@ -162,9 +150,9 @@ func (b *combinedAppender) appendFloatOrHistogram(ls labels.Labels, meta metadat
{ {
var newRef storage.SeriesRef var newRef storage.SeriesRef
if h != nil { if h != nil {
newRef, err = b.app.AppendHistogram(ref, ls, t, h, nil) newRef, err = b.Appender.AppendHistogram(ref, ls, t, h, nil)
} else { } else {
newRef, err = b.app.Append(ref, ls, t, v) newRef, err = b.Appender.Append(ref, ls, t, v)
} }
if err != nil { if err != nil {
// Although Append does not currently return ErrDuplicateSampleForTimestamp there is // Although Append does not currently return ErrDuplicateSampleForTimestamp there is
@ -188,7 +176,7 @@ func (b *combinedAppender) appendFloatOrHistogram(ls labels.Labels, meta metadat
if !exists || series.meta.Help != meta.Help || series.meta.Type != meta.Type || series.meta.Unit != meta.Unit { if !exists || series.meta.Help != meta.Help || series.meta.Type != meta.Type || series.meta.Unit != meta.Unit {
updateRefs = true updateRefs = true
// If this is the first time we see this series, set the metadata. // If this is the first time we see this series, set the metadata.
_, err := b.app.UpdateMetadata(ref, ls, meta) _, err := b.Appender.UpdateMetadata(ref, ls, meta)
if err != nil { if err != nil {
b.samplesAppendedWithoutMetadata.Add(1) b.samplesAppendedWithoutMetadata.Add(1)
b.logger.Warn("Error while updating metadata from OTLP", "err", err) b.logger.Warn("Error while updating metadata from OTLP", "err", err)
@ -206,7 +194,7 @@ func (b *combinedAppender) appendFloatOrHistogram(ls labels.Labels, meta metadat
b.appendExemplars(ref, ls, es) b.appendExemplars(ref, ls, es)
return return ref, err // TODO(bwplotka): implement partial errors
} }
func sampleType(h *histogram.Histogram) string { func sampleType(h *histogram.Histogram) string {
@ -219,7 +207,7 @@ func sampleType(h *histogram.Histogram) string {
func (b *combinedAppender) appendExemplars(ref storage.SeriesRef, ls labels.Labels, es []exemplar.Exemplar) storage.SeriesRef { func (b *combinedAppender) appendExemplars(ref storage.SeriesRef, ls labels.Labels, es []exemplar.Exemplar) storage.SeriesRef {
var err error var err error
for _, e := range es { for _, e := range es {
if ref, err = b.app.AppendExemplar(ref, ls, e); err != nil { if ref, err = b.Appender.AppendExemplar(ref, ls, e); err != nil {
switch { switch {
case errors.Is(err, storage.ErrOutOfOrderExemplar): case errors.Is(err, storage.ErrOutOfOrderExemplar):
b.outOfOrderExemplars.Add(1) b.outOfOrderExemplars.Add(1)