diff --git a/tsdb/db_test.go b/tsdb/db_test.go index b118d3deb3..eb48435e84 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -4838,11 +4838,6 @@ func TestMetadataAssertInMemoryData(t *testing.T) { require.Equal(t, *reopenDB.head.series.getByHash(s4.Hash(), s4).meta, m4) } -// TestMultipleEncodingsCommitOrder mainly serves to demonstrate when happens when committing a batch of samples for the -// same series when there are multiple encodings. Commit() will process all float samples before histogram samples. This -// means that if histograms are appended before floats, the histograms could be marked as OOO when they are committed. -// While possible, this shouldn't happen very often - you need the same series to be ingested as both a float and a -// histogram in a single write request. func TestMultipleEncodingsCommitOrder(t *testing.T) { opts := DefaultOptions() opts.OutOfOrderCapMax = 30 @@ -4916,26 +4911,18 @@ func TestMultipleEncodingsCommitOrder(t *testing.T) { s := addSample(app, int64(i), chunkenc.ValFloat) expSamples = append(expSamples, s) } - // These samples will be marked as OOO as their timestamps are less than the max timestamp for float samples in the - // same batch. for i := 110; i < 120; i++ { s := addSample(app, int64(i), chunkenc.ValHistogram) expSamples = append(expSamples, s) } - // These samples will be marked as OOO as their timestamps are less than the max timestamp for float samples in the - // same batch. for i := 120; i < 130; i++ { s := addSample(app, int64(i), chunkenc.ValFloatHistogram) expSamples = append(expSamples, s) } - // These samples will be marked as in-order as their timestamps are greater than the max timestamp for float - // samples in the same batch. for i := 140; i < 150; i++ { s := addSample(app, int64(i), chunkenc.ValFloatHistogram) expSamples = append(expSamples, s) } - // These samples will be marked as in-order, even though they're appended after the float histograms from ts 140-150 - // because float samples are processed first and these samples are in-order wrt to the float samples in the batch. for i := 130; i < 135; i++ { s := addSample(app, int64(i), chunkenc.ValFloat) expSamples = append(expSamples, s) @@ -4947,8 +4934,7 @@ func TestMultipleEncodingsCommitOrder(t *testing.T) { return expSamples[i].T() < expSamples[j].T() }) - // oooCount = 20 because the histograms from 120 - 130 and float histograms from 120 - 130 are detected as OOO. - verifySamples(100, 150, expSamples, 20) + verifySamples(100, 150, expSamples, 5) // Append and commit some in-order histograms by themselves. app = db.Appender(context.Background()) @@ -4958,8 +4944,8 @@ func TestMultipleEncodingsCommitOrder(t *testing.T) { } require.NoError(t, app.Commit()) - // oooCount remains at 20 as no new OOO samples have been added. - verifySamples(100, 160, expSamples, 20) + // oooCount remains at 5 as no new OOO samples have been added. + verifySamples(100, 160, expSamples, 5) // Append and commit samples for all encoding types. This time all samples will be treated as OOO because samples // with newer timestamps have already been committed. @@ -4987,8 +4973,8 @@ func TestMultipleEncodingsCommitOrder(t *testing.T) { return expSamples[i].T() < expSamples[j].T() }) - // oooCount = 50 as we've added 30 more OOO samples. - verifySamples(50, 160, expSamples, 50) + // oooCount = 35 as we've added 30 more OOO samples. + verifySamples(50, 160, expSamples, 35) } // TODO(codesome): test more samples incoming once compaction has started. To verify new samples after the start diff --git a/tsdb/head.go b/tsdb/head.go index 7763d272b7..c612ee8e93 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -78,20 +78,18 @@ type Head struct { // This should be typecasted to chunks.ChunkDiskMapperRef after loading. minOOOMmapRef atomic.Uint64 - metrics *headMetrics - opts *HeadOptions - wal, wbl *wlog.WL - exemplarMetrics *ExemplarMetrics - exemplars ExemplarStorage - logger *slog.Logger - appendPool zeropool.Pool[[]record.RefSample] - exemplarsPool zeropool.Pool[[]exemplarWithSeriesRef] - histogramsPool zeropool.Pool[[]record.RefHistogramSample] - floatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample] - metadataPool zeropool.Pool[[]record.RefMetadata] - seriesPool zeropool.Pool[[]*memSeries] - bytesPool zeropool.Pool[[]byte] - memChunkPool sync.Pool + metrics *headMetrics + opts *HeadOptions + wal, wbl *wlog.WL + exemplarMetrics *ExemplarMetrics + exemplars ExemplarStorage + logger *slog.Logger + appendPool zeropool.Pool[[]refUnionSample] + exemplarsPool zeropool.Pool[[]exemplarWithSeriesRef] + metadataPool zeropool.Pool[[]record.RefMetadata] + seriesPool zeropool.Pool[[]*memSeries] + bytesPool zeropool.Pool[[]byte] + memChunkPool sync.Pool // These pools are only used during WAL/WBL replay and are reset at the end. // NOTE: Adjust resetWLReplayResources() upon changes to the pools. diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 03800b2455..2e85f113c4 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -19,6 +19,7 @@ import ( "fmt" "log/slog" "math" + "unsafe" "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/histogram" @@ -181,8 +182,6 @@ func (h *Head) appender() *headAppender { samples: h.getAppendBuffer(), sampleSeries: h.getSeriesBuffer(), exemplars: exemplarsBuf, - histograms: h.getHistogramBuffer(), - floatHistograms: h.getFloatHistogramBuffer(), metadata: h.getMetadataBuffer(), appendID: appendID, cleanupAppendIDsBelow: cleanupAppendIDsBelow, @@ -213,15 +212,15 @@ func (h *Head) AppendableMinValidTime() (int64, bool) { return h.appendableMinValidTime(), true } -func (h *Head) getAppendBuffer() []record.RefSample { +func (h *Head) getAppendBuffer() []refUnionSample { b := h.appendPool.Get() if b == nil { - return make([]record.RefSample, 0, 512) + return make([]refUnionSample, 0, 512) } return b } -func (h *Head) putAppendBuffer(b []record.RefSample) { +func (h *Head) putAppendBuffer(b []refUnionSample) { h.appendPool.Put(b[:0]) } @@ -244,30 +243,6 @@ func (h *Head) putExemplarBuffer(b []exemplarWithSeriesRef) { h.exemplarsPool.Put(b[:0]) } -func (h *Head) getHistogramBuffer() []record.RefHistogramSample { - b := h.histogramsPool.Get() - if b == nil { - return make([]record.RefHistogramSample, 0, 512) - } - return b -} - -func (h *Head) putHistogramBuffer(b []record.RefHistogramSample) { - h.histogramsPool.Put(b[:0]) -} - -func (h *Head) getFloatHistogramBuffer() []record.RefFloatHistogramSample { - b := h.floatHistogramsPool.Get() - if b == nil { - return make([]record.RefFloatHistogramSample, 0, 512) - } - return b -} - -func (h *Head) putFloatHistogramBuffer(b []record.RefFloatHistogramSample) { - h.floatHistogramsPool.Put(b[:0]) -} - func (h *Head) getMetadataBuffer() []record.RefMetadata { b := h.metadataPool.Get() if b == nil { @@ -312,6 +287,60 @@ type exemplarWithSeriesRef struct { exemplar exemplar.Exemplar } +// refUnionSample is a union type that can hold either a float sample, a histogram sample, or a float histogram sample +// for the WAL. Should move into the record package when it supports union types in a new value record. +type refUnionSample struct { + Ref chunks.HeadSeriesRef + T int64 + V float64 // For float samples. + ptr unsafe.Pointer // For complex type. +} + +const ( + unionSampleHistogram = 2.0 + unionSampleFloatHistogram = 3.0 +) + +func (u *refUnionSample) Type() chunkenc.ValueType { + if u.ptr == nil { + return chunkenc.ValFloat // This is a float sample. + } + switch u.V { + case unionSampleHistogram: + return chunkenc.ValHistogram // This is a histogram sample. + case unionSampleFloatHistogram: + return chunkenc.ValFloatHistogram // This is a float histogram sample. + default: + return chunkenc.ValNone + } +} + +func newRefUnionHistogramSample(ref chunks.HeadSeriesRef, t int64, h *histogram.Histogram) refUnionSample { + return refUnionSample{ + Ref: ref, + T: t, + V: unionSampleHistogram, + ptr: unsafe.Pointer(h), + } +} + +func newRefUnionFloatHistogramSample(ref chunks.HeadSeriesRef, t int64, fh *histogram.FloatHistogram) refUnionSample { + return refUnionSample{ + Ref: ref, + T: t, + V: unionSampleFloatHistogram, + ptr: unsafe.Pointer(fh), + } +} + +func (u *refUnionSample) Histogram() *histogram.Histogram { + return (*histogram.Histogram)(u.ptr) +} + +func (u *refUnionSample) FloatHistogram() *histogram.FloatHistogram { + return (*histogram.FloatHistogram)(u.ptr) +} + type headAppender struct { head *Head minValidTime int64 // No samples below this timestamp are allowed. @@ -319,17 +348,16 @@ type headAppender struct { headMaxt int64 // We track it here to not take the lock for every sample appended. oooTimeWindow int64 // Use the same for the entire append, and don't load the atomic for each sample. - seriesRefs []record.RefSeries // New series records held by this appender. - series []*memSeries // New series held by this appender (using corresponding slices indexes from seriesRefs) - samples []record.RefSample // New float samples held by this appender. - sampleSeries []*memSeries // Float series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once). - histograms []record.RefHistogramSample // New histogram samples held by this appender. - histogramSeries []*memSeries // HistogramSamples series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once). - floatHistograms []record.RefFloatHistogramSample // New float histogram samples held by this appender. - floatHistogramSeries []*memSeries // FloatHistogramSamples series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once). - metadata []record.RefMetadata // New metadata held by this appender. - metadataSeries []*memSeries // Series corresponding to the metadata held by this appender. - exemplars []exemplarWithSeriesRef // New exemplars held by this appender. + seriesRefs []record.RefSeries // New series records held by this appender. + series []*memSeries // New series held by this appender (using corresponding slices indexes from seriesRefs) + samples []refUnionSample // New float samples held by this appender. + sampleSeries []*memSeries // Float series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once). + floatCount int + histogramCount int + floatHistogramCount int + metadata []record.RefMetadata // New metadata held by this appender. + metadataSeries []*memSeries // Series corresponding to the metadata held by this appender. + exemplars []exemplarWithSeriesRef // New exemplars held by this appender. appendID, cleanupAppendIDsBelow uint64 closed bool @@ -348,32 +376,14 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64 return 0, storage.ErrOutOfBounds } - s := a.head.series.getByID(chunks.HeadSeriesRef(ref)) - if s == nil { - var err error - s, _, err = a.getOrCreate(lset) - if err != nil { - return 0, err - } + s, err := a.getOrCreateByRef(ref, lset) + if err != nil { + return 0, err } s.Lock() - if value.IsStaleNaN(v) { - // TODO(krajorama): reorganize Commit() to handle samples in append order - // not floats first and then histograms. Then we could do this conversion - // in commit. This code should move into Commit(). - switch { - case s.lastHistogramValue != nil: - s.Unlock() - return a.AppendHistogram(ref, lset, t, &histogram.Histogram{Sum: v}, nil) - case s.lastFloatHistogramValue != nil: - s.Unlock() - return a.AppendHistogram(ref, lset, t, nil, &histogram.FloatHistogram{Sum: v}) - } - } - defer s.Unlock() - // TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise + // TODO(codesome): If we definitely know at this point that the sample is ooo, then optimize // to skip that sample from the WAL and write only in the WBL. isOOO, delta, err := s.appendable(t, v, a.headMaxt, a.minValidTime, a.oooTimeWindow) if err == nil { @@ -403,11 +413,12 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64 a.maxt = t } - a.samples = append(a.samples, record.RefSample{ + a.samples = append(a.samples, refUnionSample{ Ref: s.ref, T: t, V: v, }) + a.floatCount++ a.sampleSeries = append(a.sampleSeries, s) return storage.SeriesRef(s.ref), nil } @@ -420,13 +431,9 @@ func (a *headAppender) AppendCTZeroSample(ref storage.SeriesRef, lset labels.Lab return 0, storage.ErrCTNewerThanSample } - s := a.head.series.getByID(chunks.HeadSeriesRef(ref)) - if s == nil { - var err error - s, _, err = a.getOrCreate(lset) - if err != nil { - return 0, err - } + s, err := a.getOrCreateByRef(ref, lset) + if err != nil { + return 0, err } // Check if CT wouldn't be OOO vs samples we already might have for this series. @@ -448,23 +455,33 @@ func (a *headAppender) AppendCTZeroSample(ref storage.SeriesRef, lset labels.Lab if ct > a.maxt { a.maxt = ct } - a.samples = append(a.samples, record.RefSample{Ref: s.ref, T: ct, V: 0.0}) + a.samples = append(a.samples, refUnionSample{Ref: s.ref, T: ct, V: 0.0}) + a.floatCount++ a.sampleSeries = append(a.sampleSeries, s) return storage.SeriesRef(s.ref), nil } -func (a *headAppender) getOrCreate(lset labels.Labels) (s *memSeries, created bool, err error) { +func (a *headAppender) getOrCreateByRef(ref storage.SeriesRef, lset labels.Labels) (*memSeries, error) { + s := a.head.series.getByID(chunks.HeadSeriesRef(ref)) + if s != nil { + return s, nil + } + return a.getOrCreate(lset) +} + +func (a *headAppender) getOrCreate(lset labels.Labels) (s *memSeries, err error) { // Ensure no empty labels have gotten through. lset = lset.WithoutEmpty() if lset.IsEmpty() { - return nil, false, fmt.Errorf("empty labelset: %w", ErrInvalidSample) + return nil, fmt.Errorf("empty labelset: %w", ErrInvalidSample) } if l, dup := lset.HasDuplicateLabelNames(); dup { - return nil, false, fmt.Errorf(`label name "%s" is not unique: %w`, l, ErrInvalidSample) + return nil, fmt.Errorf(`label name "%s" is not unique: %w`, l, ErrInvalidSample) } + var created bool s, created, err = a.head.getOrCreate(lset.Hash(), lset, true) if err != nil { - return nil, false, err + return nil, err } if created { a.seriesRefs = append(a.seriesRefs, record.RefSeries{ @@ -473,7 +490,7 @@ func (a *headAppender) getOrCreate(lset labels.Labels) (s *memSeries, created bo }) a.series = append(a.series, s) } - return s, created, nil + return s, nil } // appendable checks whether the given sample is valid for appending to the series. @@ -496,11 +513,21 @@ func (s *memSeries) appendable(t int64, v float64, headMaxt, minValidTime, oooTi // like federation and erroring out at that time would be extremely noisy. // This only checks against the latest in-order sample. // The OOO headchunk has its own method to detect these duplicates. - if s.lastHistogramValue != nil || s.lastFloatHistogramValue != nil { - return false, 0, storage.NewDuplicateHistogramToFloatErr(t, v) - } - if math.Float64bits(s.lastValue) != math.Float64bits(v) { - return false, 0, storage.NewDuplicateFloatErr(t, s.lastValue, v) + // Exact duplicates include the case when the float is a stale + // marker and we have the equivalent histogram or float histogram stale marker. + switch { + case s.lastHistogramValue != nil: + if !value.IsStaleNaN(v) || !value.IsStaleNaN(s.lastHistogramValue.Sum) { + return false, 0, storage.NewDuplicateHistogramToFloatErr(t, v) + } + case s.lastFloatHistogramValue != nil: + if !value.IsStaleNaN(v) || !value.IsStaleNaN(s.lastFloatHistogramValue.Sum) { + return false, 0, storage.NewDuplicateHistogramToFloatErr(t, v) + } + default: + if math.Float64bits(s.lastValue) != math.Float64bits(v) { + return false, 0, storage.NewDuplicateFloatErr(t, s.lastValue, v) + } } // Sample is identical (ts + value) with most current (highest ts) sample in sampleBuf. return false, 0, nil @@ -541,8 +568,21 @@ func (s *memSeries) appendableHistogram(t int64, h *histogram.Histogram, headMax // like federation and erroring out at that time would be extremely noisy. // This only checks against the latest in-order sample. // The OOO headchunk has its own method to detect these duplicates. - if !h.Equals(s.lastHistogramValue) { - return false, 0, storage.ErrDuplicateSampleForTimestamp + // Exact duplicates include the case when the float is a stale + // marker and we have the equivalent histogram or float histogram stale marker. + switch { + case s.lastHistogramValue != nil: + if !h.Equals(s.lastHistogramValue) { + return false, 0, storage.ErrDuplicateSampleForTimestamp + } + case s.lastFloatHistogramValue != nil: + if !value.IsStaleNaN(h.Sum) || !value.IsStaleNaN(s.lastFloatHistogramValue.Sum) { + return false, 0, storage.ErrDuplicateSampleForTimestamp + } + default: + if !value.IsStaleNaN(h.Sum) || !value.IsStaleNaN(s.lastValue) { + return false, 0, storage.NewDuplicateHistogramToFloatErr(t, s.lastValue) + } } // Sample is identical (ts + value) with most current (highest ts) sample in sampleBuf. return false, 0, nil @@ -583,8 +623,22 @@ func (s *memSeries) appendableFloatHistogram(t int64, fh *histogram.FloatHistogr // like federation and erroring out at that time would be extremely noisy. // This only checks against the latest in-order sample. // The OOO headchunk has its own method to detect these duplicates. - if !fh.Equals(s.lastFloatHistogramValue) { - return false, 0, storage.ErrDuplicateSampleForTimestamp + // Exact duplicates include the case when the float is a stale + // marker and we have the equivalent histogram or float histogram stale marker. + switch { + case s.lastFloatHistogramValue != nil: + if !fh.Equals(s.lastFloatHistogramValue) { + return false, 0, storage.ErrDuplicateSampleForTimestamp + } + + case s.lastHistogramValue != nil: + if !value.IsStaleNaN(fh.Sum) || !value.IsStaleNaN(s.lastHistogramValue.Sum) { + return false, 0, storage.ErrDuplicateSampleForTimestamp + } + default: + if !value.IsStaleNaN(fh.Sum) || !value.IsStaleNaN(s.lastValue) { + return false, 0, storage.NewDuplicateHistogramToFloatErr(t, s.lastValue) + } } // Sample is identical (ts + value) with most current (highest ts) sample in sampleBuf. return false, 0, nil @@ -667,27 +721,14 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels } } - var created bool - s := a.head.series.getByID(chunks.HeadSeriesRef(ref)) - if s == nil { - var err error - s, created, err = a.getOrCreate(lset) - if err != nil { - return 0, err - } + s, err := a.getOrCreateByRef(ref, lset) + if err != nil { + return 0, err } switch { case h != nil: s.Lock() - - // TODO(krajorama): reorganize Commit() to handle samples in append order - // not floats first and then histograms. Then we would not need to do this. - // This whole "if" should be removed. - if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil { - s.lastHistogramValue = &histogram.Histogram{} - } - // TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise // to skip that sample from the WAL and write only in the WBL. _, delta, err := s.appendableHistogram(t, h, a.headMaxt, a.minValidTime, a.oooTimeWindow) @@ -707,22 +748,11 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels } return 0, err } - a.histograms = append(a.histograms, record.RefHistogramSample{ - Ref: s.ref, - T: t, - H: h, - }) - a.histogramSeries = append(a.histogramSeries, s) + a.samples = append(a.samples, newRefUnionHistogramSample(s.ref, t, h)) + a.histogramCount++ + a.sampleSeries = append(a.sampleSeries, s) case fh != nil: s.Lock() - - // TODO(krajorama): reorganize Commit() to handle samples in append order - // not floats first and then histograms. Then we would not need to do this. - // This whole "if" should be removed. - if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil { - s.lastFloatHistogramValue = &histogram.FloatHistogram{} - } - // TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise // to skip that sample from the WAL and write only in the WBL. _, delta, err := s.appendableFloatHistogram(t, fh, a.headMaxt, a.minValidTime, a.oooTimeWindow) @@ -742,12 +772,9 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels } return 0, err } - a.floatHistograms = append(a.floatHistograms, record.RefFloatHistogramSample{ - Ref: s.ref, - T: t, - FH: fh, - }) - a.floatHistogramSeries = append(a.floatHistogramSeries, s) + a.samples = append(a.samples, newRefUnionFloatHistogramSample(s.ref, t, fh)) + a.floatHistogramCount++ + a.sampleSeries = append(a.sampleSeries, s) } if t < a.mint { @@ -769,28 +796,15 @@ func (a *headAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, lset l return 0, storage.ErrCTNewerThanSample } - var created bool - s := a.head.series.getByID(chunks.HeadSeriesRef(ref)) - if s == nil { - var err error - s, created, err = a.getOrCreate(lset) - if err != nil { - return 0, err - } + s, err := a.getOrCreateByRef(ref, lset) + if err != nil { + return 0, err } switch { case h != nil: zeroHistogram := &histogram.Histogram{} s.Lock() - - // TODO(krajorama): reorganize Commit() to handle samples in append order - // not floats first and then histograms. Then we would not need to do this. - // This whole "if" should be removed. - if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil { - s.lastHistogramValue = zeroHistogram - } - // For CTZeroSamples OOO is not allowed. // We set it to true to make this implementation as close as possible to the float implementation. isOOO, _, err := s.appendableHistogram(ct, zeroHistogram, a.headMaxt, a.minValidTime, a.oooTimeWindow) @@ -808,23 +822,12 @@ func (a *headAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, lset l } s.pendingCommit = true s.Unlock() - a.histograms = append(a.histograms, record.RefHistogramSample{ - Ref: s.ref, - T: ct, - H: zeroHistogram, - }) - a.histogramSeries = append(a.histogramSeries, s) + a.samples = append(a.samples, newRefUnionHistogramSample(s.ref, ct, zeroHistogram)) + a.histogramCount++ + a.sampleSeries = append(a.sampleSeries, s) case fh != nil: zeroFloatHistogram := &histogram.FloatHistogram{} s.Lock() - - // TODO(krajorama): reorganize Commit() to handle samples in append order - // not floats first and then histograms. Then we would not need to do this. - // This whole "if" should be removed. - if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil { - s.lastFloatHistogramValue = zeroFloatHistogram - } - // We set it to true to make this implementation as close as possible to the float implementation. isOOO, _, err := s.appendableFloatHistogram(ct, zeroFloatHistogram, a.headMaxt, a.minValidTime, a.oooTimeWindow) // OOO is not allowed for CTZeroSamples. if err != nil { @@ -841,12 +844,9 @@ func (a *headAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, lset l } s.pendingCommit = true s.Unlock() - a.floatHistograms = append(a.floatHistograms, record.RefFloatHistogramSample{ - Ref: s.ref, - T: ct, - FH: zeroFloatHistogram, - }) - a.floatHistogramSeries = append(a.floatHistogramSeries, s) + a.samples = append(a.samples, newRefUnionFloatHistogramSample(s.ref, ct, zeroFloatHistogram)) + a.floatHistogramCount++ + a.sampleSeries = append(a.sampleSeries, s) } if ct > a.maxt { @@ -925,17 +925,39 @@ func (a *headAppender) log() error { return fmt.Errorf("log metadata: %w", err) } } - if len(a.samples) > 0 { - rec = enc.Samples(a.samples, buf) + if a.floatCount > 0 { + // TODO(krajorama): rewrite the WAL to handle mixed types. + samples := make([]record.RefSample, 0, a.floatCount) + for i := range a.samples { + if a.samples[i].Type() == chunkenc.ValFloat { + samples = append(samples, record.RefSample{ + Ref: a.samples[i].Ref, + T: a.samples[i].T, + V: a.samples[i].V, + }) + } + } + rec = enc.Samples(samples, buf) buf = rec[:0] if err := a.head.wal.Log(rec); err != nil { return fmt.Errorf("log samples: %w", err) } } - if len(a.histograms) > 0 { + if a.histogramCount > 0 { + // TODO(krajorama): rewrite the WAL to handle mixed types. + histograms := make([]record.RefHistogramSample, 0, a.histogramCount) + for i := range a.samples { + if a.samples[i].Type() == chunkenc.ValHistogram { + histograms = append(histograms, record.RefHistogramSample{ + Ref: a.samples[i].Ref, + T: a.samples[i].T, + H: a.samples[i].Histogram(), + }) + } + } var customBucketsHistograms []record.RefHistogramSample - rec, customBucketsHistograms = enc.HistogramSamples(a.histograms, buf) + rec, customBucketsHistograms = enc.HistogramSamples(histograms, buf) buf = rec[:0] if len(rec) > 0 { if err := a.head.wal.Log(rec); err != nil { @@ -950,9 +972,21 @@ func (a *headAppender) log() error { } } } - if len(a.floatHistograms) > 0 { + if a.floatHistogramCount > 0 { + // TODO(krajorama): rewrite the WAL to handle mixed types. + floatHistograms := make([]record.RefFloatHistogramSample, 0, a.floatHistogramCount) + for i := range a.samples { + if a.samples[i].Type() == chunkenc.ValFloatHistogram { + floatHistograms = append(floatHistograms, record.RefFloatHistogramSample{ + Ref: a.samples[i].Ref, + T: a.samples[i].T, + FH: a.samples[i].FloatHistogram(), + }) + } + } + var customBucketsFloatHistograms []record.RefFloatHistogramSample - rec, customBucketsFloatHistograms = enc.FloatHistogramSamples(a.floatHistograms, buf) + rec, customBucketsFloatHistograms = enc.FloatHistogramSamples(floatHistograms, buf) buf = rec[:0] if len(rec) > 0 { if err := a.head.wal.Log(rec); err != nil { @@ -1008,21 +1042,22 @@ type appenderCommitContext struct { floatTooOldRejected int histoTooOldRejected int // Number of samples rejected due to: out of bounds: with t < minValidTime (OOO support disabled). - floatOOBRejected int - histoOOBRejected int - inOrderMint int64 - inOrderMaxt int64 - oooMinT int64 - oooMaxT int64 - wblSamples []record.RefSample - wblHistograms []record.RefHistogramSample - wblFloatHistograms []record.RefFloatHistogramSample - oooMmapMarkers map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef - oooMmapMarkersCount int - oooRecords [][]byte - oooCapMax int64 - appendChunkOpts chunkOpts - enc record.Encoder + floatOOBRejected int + histoOOBRejected int + inOrderMint int64 + inOrderMaxt int64 + oooMinT int64 + oooMaxT int64 + wblSamples []refUnionSample + wblFloatCount int + wblHistogramCount int + wblFloatHistogramCount int + oooMmapMarkers map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef + oooMmapMarkersCount int + oooRecords [][]byte + oooCapMax int64 + appendChunkOpts chunkOpts + enc record.Encoder } // commitExemplars adds all exemplars from headAppender to the head's exemplar storage. @@ -1050,8 +1085,6 @@ func (acc *appenderCommitContext) collectOOORecords(a *headAppender) { if a.head.wbl == nil { // WBL is not enabled. So no need to collect. acc.wblSamples = nil - acc.wblHistograms = nil - acc.wblFloatHistograms = nil acc.oooMmapMarkers = nil acc.oooMmapMarkersCount = 0 return @@ -1076,12 +1109,35 @@ func (acc *appenderCommitContext) collectOOORecords(a *headAppender) { acc.oooRecords = append(acc.oooRecords, r) } - if len(acc.wblSamples) > 0 { - r := acc.enc.Samples(acc.wblSamples, a.head.getBytesBuffer()) + if acc.wblFloatCount > 0 { + // TODO(krajorama): rewrite the WAL to handle mixed types. + samples := make([]record.RefSample, 0, acc.wblFloatCount) + for i := range acc.wblSamples { + if acc.wblSamples[i].Type() == chunkenc.ValFloat { + samples = append(samples, record.RefSample{ + Ref: acc.wblSamples[i].Ref, + T: acc.wblSamples[i].T, + V: acc.wblSamples[i].V, + }) + } + } + + r := acc.enc.Samples(samples, a.head.getBytesBuffer()) acc.oooRecords = append(acc.oooRecords, r) } - if len(acc.wblHistograms) > 0 { - r, customBucketsHistograms := acc.enc.HistogramSamples(acc.wblHistograms, a.head.getBytesBuffer()) + if acc.wblHistogramCount > 0 { + // TODO(krajorama): rewrite the WAL to handle mixed types. + histograms := make([]record.RefHistogramSample, 0, acc.wblHistogramCount) + for i := range acc.wblSamples { + if acc.wblSamples[i].Type() == chunkenc.ValHistogram { + histograms = append(histograms, record.RefHistogramSample{ + Ref: acc.wblSamples[i].Ref, + T: acc.wblSamples[i].T, + H: acc.wblSamples[i].Histogram(), + }) + } + } + r, customBucketsHistograms := acc.enc.HistogramSamples(histograms, a.head.getBytesBuffer()) if len(r) > 0 { acc.oooRecords = append(acc.oooRecords, r) } @@ -1090,8 +1146,19 @@ func (acc *appenderCommitContext) collectOOORecords(a *headAppender) { acc.oooRecords = append(acc.oooRecords, r) } } - if len(acc.wblFloatHistograms) > 0 { - r, customBucketsFloatHistograms := acc.enc.FloatHistogramSamples(acc.wblFloatHistograms, a.head.getBytesBuffer()) + if acc.wblFloatHistogramCount > 0 { + // TODO(krajorama): rewrite the WAL to handle mixed types. + floatHistograms := make([]record.RefFloatHistogramSample, 0, acc.wblFloatHistogramCount) + for i := range acc.wblSamples { + if acc.wblSamples[i].Type() == chunkenc.ValFloatHistogram { + floatHistograms = append(floatHistograms, record.RefFloatHistogramSample{ + Ref: acc.wblSamples[i].Ref, + T: acc.wblSamples[i].T, + FH: acc.wblSamples[i].FloatHistogram(), + }) + } + } + r, customBucketsFloatHistograms := acc.enc.FloatHistogramSamples(floatHistograms, a.head.getBytesBuffer()) if len(r) > 0 { acc.oooRecords = append(acc.oooRecords, r) } @@ -1102,8 +1169,6 @@ func (acc *appenderCommitContext) collectOOORecords(a *headAppender) { } acc.wblSamples = nil - acc.wblHistograms = nil - acc.wblFloatHistograms = nil acc.oooMmapMarkers = nil } @@ -1159,9 +1224,65 @@ func (a *headAppender) commitSamples(acc *appenderCommitContext) { series = a.sampleSeries[i] series.Lock() - oooSample, _, err := series.appendable(s.T, s.V, a.headMaxt, a.minValidTime, a.oooTimeWindow) - if err != nil { - handleAppendableError(err, &acc.floatsAppended, &acc.floatOOORejected, &acc.floatOOBRejected, &acc.floatTooOldRejected) + var ( + oooSample bool + err error + v float64 + h *histogram.Histogram + fh *histogram.FloatHistogram + sType = s.Type() + ) + + // Load the value based on the type and handle stale NaN cases. + switch sType { + case chunkenc.ValFloat: + v = s.V + if value.IsStaleNaN(s.V) { + switch { + case series.lastHistogramValue != nil: + sType = chunkenc.ValHistogram + v = 0 + h = &histogram.Histogram{ + Sum: math.Float64frombits(value.StaleNaN), + } + case series.lastFloatHistogramValue != nil: + sType = chunkenc.ValFloatHistogram + v = 0 + fh = &histogram.FloatHistogram{ + Sum: math.Float64frombits(value.StaleNaN), + } + } + } + case chunkenc.ValHistogram: + h = s.Histogram() + case chunkenc.ValFloatHistogram: + fh = s.FloatHistogram() + default: + // This should never happen, as we only append samples of known types. + series.Unlock() + continue + } + + switch sType { + case chunkenc.ValFloat: + oooSample, _, err = series.appendable(s.T, v, a.headMaxt, a.minValidTime, a.oooTimeWindow) + if err != nil { + handleAppendableError(err, &acc.floatsAppended, &acc.floatOOORejected, &acc.floatOOBRejected, &acc.floatTooOldRejected) + } + case chunkenc.ValHistogram: + oooSample, _, err = series.appendableHistogram(s.T, h, a.headMaxt, a.minValidTime, a.oooTimeWindow) + if err != nil { + handleAppendableError(err, &acc.histogramsAppended, &acc.histoOOORejected, &acc.histoOOBRejected, &acc.histoTooOldRejected) + } + case chunkenc.ValFloatHistogram: + oooSample, _, err = series.appendableFloatHistogram(s.T, fh, a.headMaxt, a.minValidTime, a.oooTimeWindow) + if err != nil { + handleAppendableError(err, &acc.histogramsAppended, &acc.histoOOORejected, &acc.histoOOBRejected, &acc.histoTooOldRejected) + } + default: + // This should never happen, as we only append samples of known types. + series.Unlock() + continue } switch { @@ -1171,7 +1292,7 @@ func (a *headAppender) commitSamples(acc *appenderCommitContext) { // Sample is OOO and OOO handling is enabled // and the delta is within the OOO tolerance. var mmapRefs []chunks.ChunkDiskMapperRef - ok, chunkCreated, mmapRefs = series.insert(s.T, s.V, nil, nil, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger) + ok, chunkCreated, mmapRefs = series.insert(s.T, v, h, fh, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger) if chunkCreated { r, ok := acc.oooMmapMarkers[series.ref] if !ok || r != nil { @@ -1198,208 +1319,79 @@ func (a *headAppender) commitSamples(acc *appenderCommitContext) { } if ok { acc.wblSamples = append(acc.wblSamples, s) + switch sType { + case chunkenc.ValFloat: + acc.wblFloatCount++ + acc.oooFloatsAccepted++ + case chunkenc.ValHistogram: + acc.wblHistogramCount++ + acc.oooHistogramAccepted++ + case chunkenc.ValFloatHistogram: + acc.wblFloatHistogramCount++ + acc.oooHistogramAccepted++ + } if s.T < acc.oooMinT { acc.oooMinT = s.T } if s.T > acc.oooMaxT { acc.oooMaxT = s.T } - acc.oooFloatsAccepted++ } else { // Sample is an exact duplicate of the last sample. // NOTE: We can only detect updates if they clash with a sample in the OOOHeadChunk, // not with samples in already flushed OOO chunks. // TODO(codesome): Add error reporting? It depends on addressing https://github.com/prometheus/prometheus/discussions/10305. - acc.floatsAppended-- - } - default: - ok, chunkCreated = series.append(s.T, s.V, a.appendID, acc.appendChunkOpts) - if ok { - if s.T < acc.inOrderMint { - acc.inOrderMint = s.T - } - if s.T > acc.inOrderMaxt { - acc.inOrderMaxt = s.T - } - } else { - // The sample is an exact duplicate, and should be silently dropped. - acc.floatsAppended-- - } - } - - if chunkCreated { - a.head.metrics.chunks.Inc() - a.head.metrics.chunksCreated.Inc() - } - - series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) - series.pendingCommit = false - series.Unlock() - } -} - -// For details on the commitHistograms function, see the commitSamples docs. -func (a *headAppender) commitHistograms(acc *appenderCommitContext) { - var ok, chunkCreated bool - var series *memSeries - - for i, s := range a.histograms { - series = a.histogramSeries[i] - series.Lock() - - oooSample, _, err := series.appendableHistogram(s.T, s.H, a.headMaxt, a.minValidTime, a.oooTimeWindow) - if err != nil { - handleAppendableError(err, &acc.histogramsAppended, &acc.histoOOORejected, &acc.histoOOBRejected, &acc.histoTooOldRejected) - } - - switch { - case err != nil: - // Do nothing here. - case oooSample: - // Sample is OOO and OOO handling is enabled - // and the delta is within the OOO tolerance. - var mmapRefs []chunks.ChunkDiskMapperRef - ok, chunkCreated, mmapRefs = series.insert(s.T, 0, s.H, nil, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger) - if chunkCreated { - r, ok := acc.oooMmapMarkers[series.ref] - if !ok || r != nil { - // !ok means there are no markers collected for these samples yet. So we first flush the samples - // before setting this m-map marker. - - // r != 0 means we have already m-mapped a chunk for this series in the same Commit(). - // Hence, before we m-map again, we should add the samples and m-map markers - // seen till now to the WBL records. - acc.collectOOORecords(a) - } - - if acc.oooMmapMarkers == nil { - acc.oooMmapMarkers = make(map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef) - } - if len(mmapRefs) > 0 { - acc.oooMmapMarkers[series.ref] = mmapRefs - acc.oooMmapMarkersCount += len(mmapRefs) + if sType == chunkenc.ValFloat { + acc.floatsAppended-- } else { - // No chunk was written to disk, so we need to set an initial marker for this series. - acc.oooMmapMarkers[series.ref] = []chunks.ChunkDiskMapperRef{0} - acc.oooMmapMarkersCount++ + acc.histogramsAppended-- } } - if ok { - acc.wblHistograms = append(acc.wblHistograms, s) - if s.T < acc.oooMinT { - acc.oooMinT = s.T - } - if s.T > acc.oooMaxT { - acc.oooMaxT = s.T - } - acc.oooHistogramAccepted++ - } else { - // Sample is an exact duplicate of the last sample. - // NOTE: We can only detect updates if they clash with a sample in the OOOHeadChunk, - // not with samples in already flushed OOO chunks. - // TODO(codesome): Add error reporting? It depends on addressing https://github.com/prometheus/prometheus/discussions/10305. - acc.histogramsAppended-- - } default: - ok, chunkCreated = series.appendHistogram(s.T, s.H, a.appendID, acc.appendChunkOpts) - if ok { - if s.T < acc.inOrderMint { - acc.inOrderMint = s.T - } - if s.T > acc.inOrderMaxt { - acc.inOrderMaxt = s.T - } - } else { - acc.histogramsAppended-- - acc.histoOOORejected++ - } - } - - if chunkCreated { - a.head.metrics.chunks.Inc() - a.head.metrics.chunksCreated.Inc() - } - - series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) - series.pendingCommit = false - series.Unlock() - } -} - -// For details on the commitFloatHistograms function, see the commitSamples docs. -func (a *headAppender) commitFloatHistograms(acc *appenderCommitContext) { - var ok, chunkCreated bool - var series *memSeries - - for i, s := range a.floatHistograms { - series = a.floatHistogramSeries[i] - series.Lock() - - oooSample, _, err := series.appendableFloatHistogram(s.T, s.FH, a.headMaxt, a.minValidTime, a.oooTimeWindow) - if err != nil { - handleAppendableError(err, &acc.histogramsAppended, &acc.histoOOORejected, &acc.histoOOBRejected, &acc.histoTooOldRejected) - } - - switch { - case err != nil: - // Do nothing here. - case oooSample: - // Sample is OOO and OOO handling is enabled - // and the delta is within the OOO tolerance. - var mmapRefs []chunks.ChunkDiskMapperRef - ok, chunkCreated, mmapRefs = series.insert(s.T, 0, nil, s.FH, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger) - if chunkCreated { - r, ok := acc.oooMmapMarkers[series.ref] - if !ok || r != nil { - // !ok means there are no markers collected for these samples yet. So we first flush the samples - // before setting this m-map marker. - - // r != 0 means we have already m-mapped a chunk for this series in the same Commit(). - // Hence, before we m-map again, we should add the samples and m-map markers - // seen till now to the WBL records. - acc.collectOOORecords(a) - } - - if acc.oooMmapMarkers == nil { - acc.oooMmapMarkers = make(map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef) - } - if len(mmapRefs) > 0 { - acc.oooMmapMarkers[series.ref] = mmapRefs - acc.oooMmapMarkersCount += len(mmapRefs) + switch sType { + case chunkenc.ValFloat: + ok, chunkCreated = series.append(s.T, s.V, a.appendID, acc.appendChunkOpts) + if ok { + if s.T < acc.inOrderMint { + acc.inOrderMint = s.T + } + if s.T > acc.inOrderMaxt { + acc.inOrderMaxt = s.T + } } else { - // No chunk was written to disk, so we need to set an initial marker for this series. - acc.oooMmapMarkers[series.ref] = []chunks.ChunkDiskMapperRef{0} - acc.oooMmapMarkersCount++ + // The sample is an exact duplicate, and should be silently dropped. + acc.floatsAppended-- } - } - if ok { - acc.wblFloatHistograms = append(acc.wblFloatHistograms, s) - if s.T < acc.oooMinT { - acc.oooMinT = s.T + case chunkenc.ValHistogram: + ok, chunkCreated = series.appendHistogram(s.T, h, a.appendID, acc.appendChunkOpts) + if ok { + if s.T < acc.inOrderMint { + acc.inOrderMint = s.T + } + if s.T > acc.inOrderMaxt { + acc.inOrderMaxt = s.T + } + } else { + acc.histogramsAppended-- + acc.histoOOORejected++ } - if s.T > acc.oooMaxT { - acc.oooMaxT = s.T + case chunkenc.ValFloatHistogram: + ok, chunkCreated = series.appendFloatHistogram(s.T, fh, a.appendID, acc.appendChunkOpts) + if ok { + if s.T < acc.inOrderMint { + acc.inOrderMint = s.T + } + if s.T > acc.inOrderMaxt { + acc.inOrderMaxt = s.T + } + } else { + acc.histogramsAppended-- + acc.histoOOORejected++ } - acc.oooHistogramAccepted++ - } else { - // Sample is an exact duplicate of the last sample. - // NOTE: We can only detect updates if they clash with a sample in the OOOHeadChunk, - // not with samples in already flushed OOO chunks. - // TODO(codesome): Add error reporting? It depends on addressing https://github.com/prometheus/prometheus/discussions/10305. - acc.histogramsAppended-- - } - default: - ok, chunkCreated = series.appendFloatHistogram(s.T, s.FH, a.appendID, acc.appendChunkOpts) - if ok { - if s.T < acc.inOrderMint { - acc.inOrderMint = s.T - } - if s.T > acc.inOrderMaxt { - acc.inOrderMaxt = s.T - } - } else { - acc.histogramsAppended-- - acc.histoOOORejected++ + default: + // This should never happen, as we only append samples of known types. + series.Unlock() + continue } } @@ -1459,14 +1451,12 @@ func (a *headAppender) Commit() (err error) { defer a.head.putAppendBuffer(a.samples) defer a.head.putSeriesBuffer(a.sampleSeries) defer a.head.putExemplarBuffer(a.exemplars) - defer a.head.putHistogramBuffer(a.histograms) - defer a.head.putFloatHistogramBuffer(a.floatHistograms) defer a.head.putMetadataBuffer(a.metadata) defer a.head.iso.closeAppend(a.appendID) acc := &appenderCommitContext{ - floatsAppended: len(a.samples), - histogramsAppended: len(a.histograms) + len(a.floatHistograms), + floatsAppended: a.floatCount, + histogramsAppended: a.histogramCount + a.floatHistogramCount, inOrderMint: math.MaxInt64, inOrderMaxt: math.MinInt64, oooMinT: math.MaxInt64, @@ -1486,8 +1476,6 @@ func (a *headAppender) Commit() (err error) { }() a.commitSamples(acc) - a.commitHistograms(acc) - a.commitFloatHistograms(acc) a.commitMetadata() // Unmark all series as pending commit after all samples have been committed. a.unmarkCreatedSeriesAsPendingCommit() @@ -1974,21 +1962,14 @@ func (a *headAppender) Rollback() (err error) { series.pendingCommit = false series.Unlock() } - for i := range a.histograms { - series = a.histogramSeries[i] - series.Lock() - series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) - series.pendingCommit = false - series.Unlock() - } a.head.putAppendBuffer(a.samples) a.head.putExemplarBuffer(a.exemplars) - a.head.putHistogramBuffer(a.histograms) - a.head.putFloatHistogramBuffer(a.floatHistograms) a.head.putMetadataBuffer(a.metadata) a.samples = nil + a.floatCount = 0 + a.histogramCount = 0 + a.floatHistogramCount = 0 a.exemplars = nil - a.histograms = nil a.metadata = nil // Series are created in the head memory regardless of rollback. Thus we have diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 485b8b7b1f..21324febf7 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -5083,8 +5083,6 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) { samples []chunks.Sample expChunks int err error - // If this is empty, samples above will be taken instead of this. - addToExp []chunks.Sample }{ // Histograms that end up in the expected samples are copied here so that we // can independently set the CounterResetHint later. @@ -5125,42 +5123,30 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) { err: storage.ErrOutOfOrderSample, }, { - // Combination of histograms and float64 in the same commit. The behaviour is undefined, but we want to also - // verify how TSDB would behave. Here the histogram is appended at the end, hence will be considered as out of order. + // Combination of histograms and float64 in the same commit. samples: []chunks.Sample{ sample{t: 400, f: 4}, - sample{t: 500, h: hists[5]}, // This won't be committed. + sample{t: 500, h: hists[5]}, sample{t: 600, f: 6}, }, - addToExp: []chunks.Sample{ - sample{t: 400, f: 4}, - sample{t: 600, f: 6}, - }, - expChunks: 7, // Only 1 new chunk for float64. + expChunks: 9, }, { - // Here the histogram is appended at the end, hence the first histogram is out of order. + // Combination of histograms and float64 in the same commit. samples: []chunks.Sample{ - sample{t: 700, h: hists[7]}, // Out of order w.r.t. the next float64 sample that is appended first. + sample{t: 700, h: hists[7]}, sample{t: 800, f: 8}, sample{t: 900, h: hists[9]}, }, - addToExp: []chunks.Sample{ - sample{t: 800, f: 8}, - sample{t: 900, h: hists[9].Copy()}, - }, - expChunks: 8, // float64 added to old chunk, only 1 new for histograms. + expChunks: 12, }, { - // Float histogram is appended at the end. + // Combination of different histograms in the same commit. samples: []chunks.Sample{ - sample{t: 1000, fh: floatHists[7]}, // Out of order w.r.t. the next histogram. + sample{t: 1000, fh: floatHists[7]}, sample{t: 1100, h: hists[9]}, }, - addToExp: []chunks.Sample{ - sample{t: 1100, h: hists[9].Copy()}, - }, - expChunks: 8, + expChunks: 14, }, } @@ -5178,11 +5164,7 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) { if a.err == nil { require.NoError(t, app.Commit()) - if len(a.addToExp) > 0 { - expResult = append(expResult, a.addToExp...) - } else { - expResult = append(expResult, a.samples...) - } + expResult = append(expResult, a.samples...) checkExpChunks(a.expChunks) } else { require.NoError(t, app.Rollback())