diff --git a/storage/remote/otlptranslator/prometheusremotewrite/combined_appender.go b/storage/remote/otlptranslator/prometheusremotewrite/combined_appender.go index b14d9d64dc..9193e18398 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/combined_appender.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/combined_appender.go @@ -48,7 +48,7 @@ func NewCombinedAppender(app storage.Appender, logger *slog.Logger, reg promethe app: app, logger: logger, ingestCTZeroSample: ingestCTZeroSample, - refs: make(map[uint64]storage.SeriesRef), + refs: make(map[uint64]labelsRef), samplesAppendedWithoutMetadata: promauto.With(reg).NewCounter(prometheus.CounterOpts{ Namespace: "prometheus", Subsystem: "api", @@ -64,6 +64,11 @@ func NewCombinedAppender(app storage.Appender, logger *slog.Logger, reg promethe } } +type labelsRef struct { + ref storage.SeriesRef + ls modelLabels.Labels +} + type combinedAppender struct { app storage.Appender logger *slog.Logger @@ -71,13 +76,20 @@ type combinedAppender struct { outOfOrderExemplars prometheus.Counter ingestCTZeroSample bool // Used to ensure we only update metadata and created timestamps once, and to share storage.SeriesRefs. - refs map[uint64]storage.SeriesRef + // To detect hash collision it also stores the labels. + // There is no overflow/conflict list, the TSDB will handle that part. + refs map[uint64]labelsRef } func (b *combinedAppender) AppendSample(_ string, rawls labels.Labels, meta metadata.Metadata, t, ct int64, v float64, es []exemplar.Exemplar) (err error) { ls := modelLabels.NewFromSorted(rawls) hash := ls.Hash() - ref, exists := b.refs[hash] + lref, exists := b.refs[hash] + ref := lref.ref + if exists && !modelLabels.Equal(lref.ls, ls) { + // Hash collision, this is a new series. + exists = false + } if !exists { ref, err = b.app.UpdateMetadata(0, ls, meta) if err != nil { @@ -105,14 +117,24 @@ func (b *combinedAppender) AppendSample(_ string, rawls labels.Labels, meta meta } } ref = b.appendExemplars(ref, ls, es) - b.refs[hash] = ref + if !exists { + b.refs[hash] = labelsRef{ + ref: ref, + ls: ls, + } + } return } func (b *combinedAppender) AppendHistogram(_ string, rawls labels.Labels, meta metadata.Metadata, t, ct int64, h *histogram.Histogram, es []exemplar.Exemplar) (err error) { ls := modelLabels.NewFromSorted(rawls) hash := ls.Hash() - ref, exists := b.refs[hash] + lref, exists := b.refs[hash] + ref := lref.ref + if exists && !modelLabels.Equal(lref.ls, ls) { + // Hash collision, this is a new series. + exists = false + } if !exists { ref, err = b.app.UpdateMetadata(0, ls, meta) if err != nil { @@ -140,7 +162,12 @@ func (b *combinedAppender) AppendHistogram(_ string, rawls labels.Labels, meta m } } ref = b.appendExemplars(ref, ls, es) - b.refs[hash] = ref + if !exists { + b.refs[hash] = labelsRef{ + ref: ref, + ls: ls, + } + } return }