From aad50c1dd02c23058fe07fbf57a592cafe2a659f Mon Sep 17 00:00:00 2001 From: bwplotka Date: Thu, 11 Sep 2025 09:23:35 +0100 Subject: [PATCH] feat: Use appenderV2 in OTLP ingestion Depends on https://github.com/prometheus/prometheus/pull/17104 Signed-off-by: bwplotka --- .../combined_appender.go | 68 ++++++++----------- 1 file changed, 28 insertions(+), 40 deletions(-) diff --git a/storage/remote/otlptranslator/prometheusremotewrite/combined_appender.go b/storage/remote/otlptranslator/prometheusremotewrite/combined_appender.go index de2c65962d..31c6114fe2 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/combined_appender.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/combined_appender.go @@ -31,30 +31,6 @@ import ( "github.com/prometheus/prometheus/storage" ) -// Metadata extends metadata.Metadata with the metric family name. -// OTLP calculates the metric family name for all metrics and uses -// it for generating summary, histogram series by adding the magic -// suffixes. The metric family name is passed down to the appender -// in case the storage needs it for metadata updates. -// Known user is Mimir that implements /api/v1/metadata and uses -// Remote-Write 1.0 for this. Might be removed later if no longer -// needed by any downstream project. -type Metadata struct { - metadata.Metadata - MetricFamilyName string -} - -// CombinedAppender is similar to storage.Appender, but combines updates to -// metadata, created timestamps, exemplars and samples into a single call. -type CombinedAppender interface { - // AppendSample appends a sample and related exemplars, metadata, and - // created timestamp to the storage. - AppendSample(ls labels.Labels, meta Metadata, ct, t int64, v float64, es []exemplar.Exemplar) error - // AppendHistogram appends a histogram and related exemplars, metadata, and - // created timestamp to the storage. - AppendHistogram(ls labels.Labels, meta Metadata, ct, t int64, h *histogram.Histogram, es []exemplar.Exemplar) error -} - // CombinedAppenderMetrics is for the metrics observed by the // combinedAppender implementation. type CombinedAppenderMetrics struct { @@ -82,11 +58,11 @@ func NewCombinedAppenderMetrics(reg prometheus.Registerer) CombinedAppenderMetri // NewCombinedAppender creates a combined appender that sets start times and // updates metadata for each series only once, and appends samples and // exemplars for each call. -func NewCombinedAppender(app storage.Appender, logger *slog.Logger, ingestCTZeroSample bool, metrics CombinedAppenderMetrics) CombinedAppender { +// TODO(bwplotka): Rename to OTLP specific appender +func NewCombinedAppender(app storage.Appender, logger *slog.Logger, metrics CombinedAppenderMetrics) storage.AppenderV2 { return &combinedAppender{ - app: app, + Appender: app, logger: logger, - ingestCTZeroSample: ingestCTZeroSample, refs: make(map[uint64]seriesRef), samplesAppendedWithoutMetadata: metrics.samplesAppendedWithoutMetadata, outOfOrderExemplars: metrics.outOfOrderExemplars, @@ -101,7 +77,9 @@ type seriesRef struct { } type combinedAppender struct { - app storage.Appender + storage.Appender + + opts storage.AppendV2Options logger *slog.Logger samplesAppendedWithoutMetadata prometheus.Counter outOfOrderExemplars prometheus.Counter @@ -112,23 +90,33 @@ type combinedAppender struct { refs map[uint64]seriesRef } -func (b *combinedAppender) AppendSample(ls labels.Labels, meta Metadata, ct, t int64, v float64, es []exemplar.Exemplar) (err error) { +func (b *combinedAppender) SetOptions(opts *storage.AppendV2Options) { + b.opts = *opts +} + +func (b *combinedAppender) AppendSample(_ storage.SeriesRef, ls labels.Labels, meta storage.Metadata, ct, t int64, v float64, es []exemplar.Exemplar) (_ storage.SeriesRef, err error) { return b.appendFloatOrHistogram(ls, meta.Metadata, ct, t, v, nil, es) } -func (b *combinedAppender) AppendHistogram(ls labels.Labels, meta Metadata, ct, t int64, h *histogram.Histogram, es []exemplar.Exemplar) (err error) { +func (b *combinedAppender) AppendHistogram(_ storage.SeriesRef, ls labels.Labels, meta storage.Metadata, ct, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram, es []exemplar.Exemplar) (_ storage.SeriesRef, err error) { if h == nil { // Sanity check, we should never get here with a nil histogram. b.logger.Error("Received nil histogram in CombinedAppender.AppendHistogram", "series", ls.String()) - return errors.New("internal error, attempted to append nil histogram") + return 0, errors.New("internal error, attempted to append nil histogram") + } + if fh != nil { + // Sanity check, we should never get here with a otlp appender. + b.logger.Error("Received not nil float histogram in CombinedAppender.AppendHistogram", "series", ls.String()) + return 0, errors.New("internal error, attempted to append float histogram; works only for nil histograms") } return b.appendFloatOrHistogram(ls, meta.Metadata, ct, t, 0, h, es) } -func (b *combinedAppender) appendFloatOrHistogram(ls labels.Labels, meta metadata.Metadata, ct, t int64, v float64, h *histogram.Histogram, es []exemplar.Exemplar) (err error) { +func (b *combinedAppender) appendFloatOrHistogram(ls labels.Labels, meta metadata.Metadata, ct, t int64, v float64, h *histogram.Histogram, es []exemplar.Exemplar) (ref storage.SeriesRef, err error) { + // TODO: Use compatibility layer here and adopt this code benefits (checking collisions) or move to some pre-layer. hash := ls.Hash() series, exists := b.refs[hash] - ref := series.ref + ref = series.ref if exists && !labels.Equal(series.ls, ls) { // Hash collision. The series reference we stored is pointing to a // different series so we cannot use it, we need to reset the @@ -142,9 +130,9 @@ func (b *combinedAppender) appendFloatOrHistogram(ls labels.Labels, meta metadat if updateRefs && ct != 0 && b.ingestCTZeroSample { var newRef storage.SeriesRef if h != nil { - newRef, err = b.app.AppendHistogramCTZeroSample(ref, ls, t, ct, h, nil) + newRef, err = b.Appender.AppendHistogramCTZeroSample(ref, ls, t, ct, h, nil) } else { - newRef, err = b.app.AppendCTZeroSample(ref, ls, t, ct) + newRef, err = b.Appender.AppendCTZeroSample(ref, ls, t, ct) } if err != nil { if !errors.Is(err, storage.ErrOutOfOrderCT) { @@ -162,9 +150,9 @@ func (b *combinedAppender) appendFloatOrHistogram(ls labels.Labels, meta metadat { var newRef storage.SeriesRef if h != nil { - newRef, err = b.app.AppendHistogram(ref, ls, t, h, nil) + newRef, err = b.Appender.AppendHistogram(ref, ls, t, h, nil) } else { - newRef, err = b.app.Append(ref, ls, t, v) + newRef, err = b.Appender.Append(ref, ls, t, v) } if err != nil { // Although Append does not currently return ErrDuplicateSampleForTimestamp there is @@ -188,7 +176,7 @@ func (b *combinedAppender) appendFloatOrHistogram(ls labels.Labels, meta metadat if !exists || series.meta.Help != meta.Help || series.meta.Type != meta.Type || series.meta.Unit != meta.Unit { updateRefs = true // If this is the first time we see this series, set the metadata. - _, err := b.app.UpdateMetadata(ref, ls, meta) + _, err := b.Appender.UpdateMetadata(ref, ls, meta) if err != nil { b.samplesAppendedWithoutMetadata.Add(1) b.logger.Warn("Error while updating metadata from OTLP", "err", err) @@ -206,7 +194,7 @@ func (b *combinedAppender) appendFloatOrHistogram(ls labels.Labels, meta metadat b.appendExemplars(ref, ls, es) - return + return ref, err // TODO(bwplotka): implement partial errors } func sampleType(h *histogram.Histogram) string { @@ -219,7 +207,7 @@ func sampleType(h *histogram.Histogram) string { func (b *combinedAppender) appendExemplars(ref storage.SeriesRef, ls labels.Labels, es []exemplar.Exemplar) storage.SeriesRef { var err error for _, e := range es { - if ref, err = b.app.AppendExemplar(ref, ls, e); err != nil { + if ref, err = b.Appender.AppendExemplar(ref, ls, e); err != nil { switch { case errors.Is(err, storage.ErrOutOfOrderExemplar): b.outOfOrderExemplars.Add(1)