Track stale series in the Head

Signed-off-by: Ganesh Vernekar <ganesh.vernekar@reddit.com>
Author: Ganesh Vernekar
Date:   2025-07-24 17:38:18 -07:00
Parent: 44b0fbba1e
Commit: 7a947d3629
2 changed files with 56 additions and 3 deletions
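
In short: the Head gets an atomic numStaleSeries counter exposed as a prometheus_tsdb_head_stale_series gauge; the appender's commit paths adjust it whenever a series flips between stale and non-stale, and garbage collection decrements it when a stale series is dropped from the Head. A series counts as stale when its most recent value is a staleness marker, a specific NaN bit pattern recognized by value.IsStaleNaN. Below is a minimal, self-contained sketch of that check, with the marker constant inlined for illustration (the real constant lives in github.com/prometheus/prometheus/model/value):

package main

import (
	"fmt"
	"math"
)

// staleNaN is the exact bit pattern Prometheus uses as a staleness marker.
// Inlined here so the sketch compiles on its own.
const staleNaN uint64 = 0x7ff0000000000002

// isStaleNaN mirrors value.IsStaleNaN: only this exact bit pattern counts;
// an ordinary NaN does not.
func isStaleNaN(v float64) bool {
	return math.Float64bits(v) == staleNaN
}

func main() {
	marker := math.Float64frombits(staleNaN)
	fmt.Println(isStaleNaN(marker))     // true: staleness marker
	fmt.Println(isStaleNaN(math.NaN())) // false: plain NaN
	fmt.Println(isStaleNaN(42))         // false: normal sample value
}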

@@ -36,6 +36,7 @@ import (
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
@@ -68,6 +69,7 @@ var (
type Head struct {
chunkRange atomic.Int64
numSeries atomic.Uint64
numStaleSeries atomic.Uint64
minOOOTime, maxOOOTime atomic.Int64 // TODO(jesusvazquez) These should be updated after garbage collection.
minTime, maxTime atomic.Int64 // Current min and max of the samples included in the head. TODO(jesusvazquez) Ensure these are properly tracked.
minValidTime atomic.Int64 // Mint allowed to be added to the head. It shouldn't be lower than the maxt of the last persisted block.
@@ -360,6 +362,7 @@ func (h *Head) resetWLReplayResources() {
type headMetrics struct {
activeAppenders prometheus.Gauge
series prometheus.GaugeFunc
staleSeries prometheus.GaugeFunc
seriesCreated prometheus.Counter
seriesRemoved prometheus.Counter
seriesNotFound prometheus.Counter
@@ -406,6 +409,12 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
}, func() float64 {
return float64(h.NumSeries())
}),
staleSeries: prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "prometheus_tsdb_head_stale_series",
Help: "Total number of stale series in the head block.",
}, func() float64 {
return float64(h.NumStaleSeries())
}),
seriesCreated: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_head_series_created_total",
Help: "Total number of series created in the head",
@@ -532,6 +541,7 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
r.MustRegister(
m.activeAppenders,
m.series,
m.staleSeries,
m.chunks,
m.chunksCreated,
m.chunksRemoved,
@@ -1607,7 +1617,7 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) {
// Drop old chunks and remember series IDs and hashes if they can be
// deleted entirely.
- deleted, affected, chunksRemoved, actualInOrderMint, minOOOTime, minMmapFile := h.series.gc(mint, minOOOMmapRef)
+ deleted, affected, chunksRemoved, actualInOrderMint, minOOOTime, minMmapFile := h.series.gc(mint, minOOOMmapRef, &h.numStaleSeries)
seriesRemoved := len(deleted)
h.metrics.seriesRemoved.Add(float64(seriesRemoved))
@@ -1645,11 +1655,16 @@ func (h *Head) Tombstones() (tombstones.Reader, error) {
return h.tombstones, nil
}
- // NumSeries returns the number of active series in the head.
+ // NumSeries returns the number of series tracked in the head.
func (h *Head) NumSeries() uint64 {
return h.numSeries.Load()
}
// NumStaleSeries returns the number of stale series in the head.
func (h *Head) NumStaleSeries() uint64 {
return h.numStaleSeries.Load()
}
var headULID = ulid.MustParse("0000000000XXXXXXXXXXXXHEAD")
// Meta returns meta information about the head.
@@ -1929,7 +1944,7 @@ func newStripeSeries(stripeSize int, seriesCallback SeriesLifecycleCallback) *stripeSeries {
// but the returned map goes into postings.Delete() which expects a map[storage.SeriesRef]struct
// and there's no easy way to cast maps.
// minMmapFile is the min mmap file number seen in the series (in-order and out-of-order) after gc'ing the series.
- func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (_ map[storage.SeriesRef]struct{}, _ map[labels.Label]struct{}, _ int, _, _ int64, minMmapFile int) {
+ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef, numStaleSeries *atomic.Uint64) (_ map[storage.SeriesRef]struct{}, _ map[labels.Label]struct{}, _ int, _, _ int64, minMmapFile int) {
var (
deleted = map[storage.SeriesRef]struct{}{}
affected = map[labels.Label]struct{}{}
@@ -1987,6 +2002,12 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (
defer s.locks[refShard].Unlock()
}
if value.IsStaleNaN(series.lastValue) ||
(series.lastHistogramValue != nil && value.IsStaleNaN(series.lastHistogramValue.Sum)) ||
(series.lastFloatHistogramValue != nil && value.IsStaleNaN(series.lastFloatHistogramValue.Sum)) {
numStaleSeries.Dec()
}
deleted[storage.SeriesRef(series.ref)] = struct{}{}
series.lset.Range(func(l labels.Label) { affected[l] = struct{}{} })
s.hashes[hashShard].del(hash, series.ref)

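That is the Head side of the change: the new numStaleSeries field, a GaugeFunc-backed prometheus_tsdb_head_stale_series metric, the NumStaleSeries accessor, and a decrement in stripeSeries.gc for stale series that are dropped entirely. Because the metric is a GaugeFunc, its closure is evaluated at scrape time and no explicit Set call is ever needed. A rough, self-contained sketch of that pattern follows; it uses the standard library's sync/atomic (the Head's counter appears to be go.uber.org/atomic's Uint64, which has the Inc and Dec the diff calls, while the stdlib type only has Add, hence the wrapping decrement below):

package main

import (
	"fmt"
	"sync/atomic"

	"github.com/prometheus/client_golang/prometheus"
)

// numStaleSeries stands in for Head.numStaleSeries in this sketch.
var numStaleSeries atomic.Uint64

func main() {
	staleSeries := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
		Name: "prometheus_tsdb_head_stale_series",
		Help: "Total number of stale series in the head block.",
	}, func() float64 {
		// Read lazily on every scrape; the gauge always reflects the counter.
		return float64(numStaleSeries.Load())
	})

	reg := prometheus.NewRegistry()
	reg.MustRegister(staleSeries)

	numStaleSeries.Add(3)          // three series went stale
	numStaleSeries.Add(^uint64(0)) // one recovered: decrement by wrapping

	mfs, _ := reg.Gather()
	fmt.Println(mfs[0].GetName(), mfs[0].GetMetric()[0].GetGauge().GetValue()) // prometheus_tsdb_head_stale_series 2
}

The remaining hunks are in the appender's commit path (presumably tsdb/head_append.go upstream), where the counter moves on stale/non-stale transitions as samples are committed.
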
@@ -1222,6 +1222,8 @@ func (a *headAppender) commitSamples(acc *appenderCommitContext) {
acc.floatsAppended--
}
default:
newlyStale := !value.IsStaleNaN(series.lastValue) && value.IsStaleNaN(s.V)
staleToNonStale := value.IsStaleNaN(series.lastValue) && !value.IsStaleNaN(s.V)
ok, chunkCreated = series.append(s.T, s.V, a.appendID, acc.appendChunkOpts)
if ok {
if s.T < acc.inOrderMint {
@@ -1230,6 +1232,12 @@ func (a *headAppender) commitSamples(acc *appenderCommitContext) {
if s.T > acc.inOrderMaxt {
acc.inOrderMaxt = s.T
}
if newlyStale {
a.head.numStaleSeries.Inc()
}
if staleToNonStale {
a.head.numStaleSeries.Dec()
}
} else {
// The sample is an exact duplicate, and should be silently dropped.
acc.floatsAppended--
@@ -1310,6 +1318,12 @@ func (a *headAppender) commitHistograms(acc *appenderCommitContext) {
acc.histogramsAppended--
}
default:
newlyStale := value.IsStaleNaN(s.H.Sum)
staleToNonStale := false
if series.lastHistogramValue != nil {
newlyStale = newlyStale && !value.IsStaleNaN(series.lastHistogramValue.Sum)
staleToNonStale = value.IsStaleNaN(series.lastHistogramValue.Sum) && !value.IsStaleNaN(s.H.Sum)
}
ok, chunkCreated = series.appendHistogram(s.T, s.H, a.appendID, acc.appendChunkOpts)
if ok {
if s.T < acc.inOrderMint {
@@ -1318,6 +1332,12 @@ func (a *headAppender) commitHistograms(acc *appenderCommitContext) {
if s.T > acc.inOrderMaxt {
acc.inOrderMaxt = s.T
}
if newlyStale {
a.head.numStaleSeries.Inc()
}
if staleToNonStale {
a.head.numStaleSeries.Dec()
}
} else {
acc.histogramsAppended--
acc.histoOOORejected++
@@ -1398,6 +1418,12 @@ func (a *headAppender) commitFloatHistograms(acc *appenderCommitContext) {
acc.histogramsAppended--
}
default:
newlyStale := value.IsStaleNaN(s.FH.Sum)
staleToNonStale := false
if series.lastFloatHistogramValue != nil {
newlyStale = newlyStale && !value.IsStaleNaN(series.lastFloatHistogramValue.Sum)
staleToNonStale = value.IsStaleNaN(series.lastFloatHistogramValue.Sum) && !value.IsStaleNaN(s.FH.Sum)
}
ok, chunkCreated = series.appendFloatHistogram(s.T, s.FH, a.appendID, acc.appendChunkOpts)
if ok {
if s.T < acc.inOrderMint {
@@ -1406,6 +1432,12 @@ func (a *headAppender) commitFloatHistograms(acc *appenderCommitContext) {
if s.T > acc.inOrderMaxt {
acc.inOrderMaxt = s.T
}
if newlyStale {
a.head.numStaleSeries.Inc()
}
if staleToNonStale {
a.head.numStaleSeries.Dec()
}
} else {
acc.histogramsAppended--
acc.histoOOORejected++
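
All three commit paths (float samples, histograms, float histograms) follow the same pattern: before appending, compare the staleness of the series' last committed value with the incoming sample (for histograms the Sum field carries the marker), and adjust Head.numStaleSeries only if the in-order append actually succeeds. A condensed sketch of that bookkeeping for the float path; staleTransition is a hypothetical helper for illustration, not something the commit adds:

package main

import (
	"fmt"
	"math"
)

// isStaleNaN: the same staleness-marker check as in the earlier sketch.
func isStaleNaN(v float64) bool { return math.Float64bits(v) == 0x7ff0000000000002 }

// staleTransition reports how the stale-series counter should move when a
// series whose last committed value was prev receives next:
// +1 when the series just went stale, -1 when it recovered, 0 otherwise.
func staleTransition(prev, next float64) int {
	switch {
	case !isStaleNaN(prev) && isStaleNaN(next):
		return +1 // newlyStale: Head.numStaleSeries.Inc()
	case isStaleNaN(prev) && !isStaleNaN(next):
		return -1 // staleToNonStale: Head.numStaleSeries.Dec()
	default:
		return 0
	}
}

func main() {
	stale := math.Float64frombits(0x7ff0000000000002)
	fmt.Println(staleTransition(1.5, stale))   // +1
	fmt.Println(staleTransition(stale, 2.0))   // -1
	fmt.Println(staleTransition(stale, stale)) // 0
	fmt.Println(staleTransition(1.5, 2.0))     // 0
}

Together with the decrement in stripeSeries.gc above, this keeps the gauge consistent as series go stale, recover, or age out of the Head.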