Add hash collision detection
CI / Go tests (push) Waiting to run Details
CI / More Go tests (push) Waiting to run Details
CI / Go tests with previous Go version (push) Waiting to run Details
CI / UI tests (push) Waiting to run Details
CI / Go tests on Windows (push) Waiting to run Details
CI / Mixins tests (push) Waiting to run Details
CI / Build Prometheus for common architectures (0) (push) Waiting to run Details
CI / Build Prometheus for common architectures (1) (push) Waiting to run Details
CI / Build Prometheus for common architectures (2) (push) Waiting to run Details
CI / Build Prometheus for all architectures (0) (push) Waiting to run Details
CI / Build Prometheus for all architectures (1) (push) Waiting to run Details
CI / Build Prometheus for all architectures (10) (push) Waiting to run Details
CI / Build Prometheus for all architectures (11) (push) Waiting to run Details
CI / Build Prometheus for all architectures (2) (push) Waiting to run Details
CI / Build Prometheus for all architectures (3) (push) Waiting to run Details
CI / Build Prometheus for all architectures (4) (push) Waiting to run Details
CI / Build Prometheus for all architectures (5) (push) Waiting to run Details
CI / Build Prometheus for all architectures (6) (push) Waiting to run Details
CI / Build Prometheus for all architectures (7) (push) Waiting to run Details
CI / Build Prometheus for all architectures (8) (push) Waiting to run Details
CI / Build Prometheus for all architectures (9) (push) Waiting to run Details
CI / Report status of build Prometheus for all architectures (push) Blocked by required conditions Details
CI / Check generated parser (push) Waiting to run Details
CI / golangci-lint (push) Waiting to run Details
CI / fuzzing (push) Waiting to run Details
CI / codeql (push) Waiting to run Details
CI / Publish main branch artifacts (push) Blocked by required conditions Details
CI / Publish release artefacts (push) Blocked by required conditions Details
CI / Publish UI on npm Registry (push) Blocked by required conditions Details

Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
This commit is contained in:
György Krajcsovits 2025-07-30 16:37:46 +02:00
parent b008f23d67
commit 4b89cd56b0
No known key found for this signature in database
GPG Key ID: 47A8F9CE80FD7C7F
1 changed files with 33 additions and 6 deletions

View File

@ -48,7 +48,7 @@ func NewCombinedAppender(app storage.Appender, logger *slog.Logger, reg promethe
app: app, app: app,
logger: logger, logger: logger,
ingestCTZeroSample: ingestCTZeroSample, ingestCTZeroSample: ingestCTZeroSample,
refs: make(map[uint64]storage.SeriesRef), refs: make(map[uint64]labelsRef),
samplesAppendedWithoutMetadata: promauto.With(reg).NewCounter(prometheus.CounterOpts{ samplesAppendedWithoutMetadata: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Namespace: "prometheus", Namespace: "prometheus",
Subsystem: "api", Subsystem: "api",
@ -64,6 +64,11 @@ func NewCombinedAppender(app storage.Appender, logger *slog.Logger, reg promethe
} }
} }
type labelsRef struct {
ref storage.SeriesRef
ls modelLabels.Labels
}
type combinedAppender struct { type combinedAppender struct {
app storage.Appender app storage.Appender
logger *slog.Logger logger *slog.Logger
@ -71,13 +76,20 @@ type combinedAppender struct {
outOfOrderExemplars prometheus.Counter outOfOrderExemplars prometheus.Counter
ingestCTZeroSample bool ingestCTZeroSample bool
// Used to ensure we only update metadata and created timestamps once, and to share storage.SeriesRefs. // Used to ensure we only update metadata and created timestamps once, and to share storage.SeriesRefs.
refs map[uint64]storage.SeriesRef // To detect hash collision it also stores the labels.
// There is no overflow/conflict list, the TSDB will handle that part.
refs map[uint64]labelsRef
} }
func (b *combinedAppender) AppendSample(_ string, rawls labels.Labels, meta metadata.Metadata, t, ct int64, v float64, es []exemplar.Exemplar) (err error) { func (b *combinedAppender) AppendSample(_ string, rawls labels.Labels, meta metadata.Metadata, t, ct int64, v float64, es []exemplar.Exemplar) (err error) {
ls := modelLabels.NewFromSorted(rawls) ls := modelLabels.NewFromSorted(rawls)
hash := ls.Hash() hash := ls.Hash()
ref, exists := b.refs[hash] lref, exists := b.refs[hash]
ref := lref.ref
if exists && !modelLabels.Equal(lref.ls, ls) {
// Hash collision, this is a new series.
exists = false
}
if !exists { if !exists {
ref, err = b.app.UpdateMetadata(0, ls, meta) ref, err = b.app.UpdateMetadata(0, ls, meta)
if err != nil { if err != nil {
@ -105,14 +117,24 @@ func (b *combinedAppender) AppendSample(_ string, rawls labels.Labels, meta meta
} }
} }
ref = b.appendExemplars(ref, ls, es) ref = b.appendExemplars(ref, ls, es)
b.refs[hash] = ref if !exists {
b.refs[hash] = labelsRef{
ref: ref,
ls: ls,
}
}
return return
} }
func (b *combinedAppender) AppendHistogram(_ string, rawls labels.Labels, meta metadata.Metadata, t, ct int64, h *histogram.Histogram, es []exemplar.Exemplar) (err error) { func (b *combinedAppender) AppendHistogram(_ string, rawls labels.Labels, meta metadata.Metadata, t, ct int64, h *histogram.Histogram, es []exemplar.Exemplar) (err error) {
ls := modelLabels.NewFromSorted(rawls) ls := modelLabels.NewFromSorted(rawls)
hash := ls.Hash() hash := ls.Hash()
ref, exists := b.refs[hash] lref, exists := b.refs[hash]
ref := lref.ref
if exists && !modelLabels.Equal(lref.ls, ls) {
// Hash collision, this is a new series.
exists = false
}
if !exists { if !exists {
ref, err = b.app.UpdateMetadata(0, ls, meta) ref, err = b.app.UpdateMetadata(0, ls, meta)
if err != nil { if err != nil {
@ -140,7 +162,12 @@ func (b *combinedAppender) AppendHistogram(_ string, rawls labels.Labels, meta m
} }
} }
ref = b.appendExemplars(ref, ls, es) ref = b.appendExemplars(ref, ls, es)
b.refs[hash] = ref if !exists {
b.refs[hash] = labelsRef{
ref: ref,
ls: ls,
}
}
return return
} }