Take the stale series compaction config from the config file
CI / Go tests (push) Has been cancelled Details
CI / More Go tests (push) Has been cancelled Details
CI / Go tests with previous Go version (push) Has been cancelled Details
CI / UI tests (push) Has been cancelled Details
CI / Go tests on Windows (push) Has been cancelled Details
CI / Mixins tests (push) Has been cancelled Details
CI / Build Prometheus for common architectures (0) (push) Has been cancelled Details
CI / Build Prometheus for common architectures (1) (push) Has been cancelled Details
CI / Build Prometheus for common architectures (2) (push) Has been cancelled Details
CI / Build Prometheus for all architectures (0) (push) Has been cancelled Details
CI / Build Prometheus for all architectures (1) (push) Has been cancelled Details
CI / Build Prometheus for all architectures (10) (push) Has been cancelled Details
CI / Build Prometheus for all architectures (11) (push) Has been cancelled Details
CI / Build Prometheus for all architectures (2) (push) Has been cancelled Details
CI / Build Prometheus for all architectures (3) (push) Has been cancelled Details
CI / Build Prometheus for all architectures (4) (push) Has been cancelled Details
CI / Build Prometheus for all architectures (5) (push) Has been cancelled Details
CI / Build Prometheus for all architectures (6) (push) Has been cancelled Details
CI / Build Prometheus for all architectures (7) (push) Has been cancelled Details
CI / Build Prometheus for all architectures (8) (push) Has been cancelled Details
CI / Build Prometheus for all architectures (9) (push) Has been cancelled Details
CI / Check generated parser (push) Has been cancelled Details
CI / golangci-lint (push) Has been cancelled Details
CI / fuzzing (push) Has been cancelled Details
CI / codeql (push) Has been cancelled Details
CI / Report status of build Prometheus for all architectures (push) Has been cancelled Details
CI / Publish main branch artifacts (push) Has been cancelled Details
CI / Publish release artefacts (push) Has been cancelled Details
CI / Publish UI on npm Registry (push) Has been cancelled Details

Signed-off-by: Ganesh Vernekar <ganesh.vernekar@reddit.com>
This commit is contained in:
Ganesh Vernekar 2025-08-25 14:44:07 -07:00
parent 3b79fb207e
commit 5b1e6fe398
4 changed files with 126 additions and 56 deletions

View File

@ -668,6 +668,9 @@ func main() {
}
if cfgFile.StorageConfig.TSDBConfig != nil {
cfg.tsdb.OutOfOrderTimeWindow = cfgFile.StorageConfig.TSDBConfig.OutOfOrderTimeWindow
cfg.tsdb.StaleSeriesCompactionInterval = time.Duration(cfgFile.StorageConfig.TSDBConfig.StaleSeriesCompactionInterval)
cfg.tsdb.StaleSeriesCompactionThreshold = cfgFile.StorageConfig.TSDBConfig.StaleSeriesCompactionThreshold
cfg.tsdb.StaleSeriesImmediateCompactionThreshold = cfgFile.StorageConfig.TSDBConfig.StaleSeriesImmediateCompactionThreshold
}
// Set Go runtime parameters before we get too far into initialization.
@ -1857,50 +1860,56 @@ func (rm *readyScrapeManager) Get() (*scrape.Manager, error) {
// tsdbOptions is tsdb.Option version with defined units.
// This is required as tsdb.Option fields are unit agnostic (time).
type tsdbOptions struct {
	WALSegmentSize           units.Base2Bytes
	MaxBlockChunkSegmentSize units.Base2Bytes
	RetentionDuration        model.Duration
	MaxBytes                 units.Base2Bytes
	NoLockfile               bool
	WALCompressionType       compression.Type
	HeadChunksWriteQueueSize int
	SamplesPerChunk          int
	StripeSize               int
	MinBlockDuration         model.Duration
	MaxBlockDuration         model.Duration
	OutOfOrderTimeWindow     int64
	EnableExemplarStorage    bool
	MaxExemplars             int64
	EnableMemorySnapshotOnShutdown bool
	EnableNativeHistograms         bool
	EnableDelayedCompaction        bool
	CompactionDelayMaxPercent      int
	EnableOverlappingCompaction    bool
	UseUncachedIO                  bool
	// StaleSeriesCompactionInterval is how often to consider running a stale
	// series compaction once the stale-series ratio crosses
	// StaleSeriesCompactionThreshold.
	StaleSeriesCompactionInterval time.Duration
	// StaleSeriesCompactionThreshold is the fraction (0.0-1.0) of stale series
	// in the Head above which a scheduled stale series compaction runs.
	StaleSeriesCompactionThreshold float64
	// StaleSeriesImmediateCompactionThreshold is the fraction (0.0-1.0) of
	// stale series in the Head above which compaction runs immediately.
	StaleSeriesImmediateCompactionThreshold float64
}
// ToTSDBOptions converts the unit-aware CLI/config options into tsdb.Options.
// Durations expressed as model.Duration are converted to milliseconds, as
// expected by the unit-agnostic tsdb.Options fields.
func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
	return tsdb.Options{
		WALSegmentSize:                 int(opts.WALSegmentSize),
		MaxBlockChunkSegmentSize:       int64(opts.MaxBlockChunkSegmentSize),
		RetentionDuration:              int64(time.Duration(opts.RetentionDuration) / time.Millisecond),
		MaxBytes:                       int64(opts.MaxBytes),
		NoLockfile:                     opts.NoLockfile,
		WALCompression:                 opts.WALCompressionType,
		HeadChunksWriteQueueSize:       opts.HeadChunksWriteQueueSize,
		SamplesPerChunk:                opts.SamplesPerChunk,
		StripeSize:                     opts.StripeSize,
		MinBlockDuration:               int64(time.Duration(opts.MinBlockDuration) / time.Millisecond),
		MaxBlockDuration:               int64(time.Duration(opts.MaxBlockDuration) / time.Millisecond),
		EnableExemplarStorage:          opts.EnableExemplarStorage,
		MaxExemplars:                   opts.MaxExemplars,
		EnableMemorySnapshotOnShutdown: opts.EnableMemorySnapshotOnShutdown,
		EnableNativeHistograms:         opts.EnableNativeHistograms,
		OutOfOrderTimeWindow:           opts.OutOfOrderTimeWindow,
		EnableDelayedCompaction:        opts.EnableDelayedCompaction,
		CompactionDelayMaxPercent:      opts.CompactionDelayMaxPercent,
		EnableOverlappingCompaction:    opts.EnableOverlappingCompaction,
		UseUncachedIO:                  opts.UseUncachedIO,
		// Stale series compaction settings are passed through unchanged;
		// the interval is already a time.Duration and the thresholds are ratios.
		StaleSeriesCompactionInterval:           opts.StaleSeriesCompactionInterval,
		StaleSeriesCompactionThreshold:          opts.StaleSeriesCompactionThreshold,
		StaleSeriesImmediateCompactionThreshold: opts.StaleSeriesImmediateCompactionThreshold,
	}
}

View File

@ -1021,6 +1021,19 @@ type TSDBConfig struct {
// During unmarshal, this is converted into milliseconds and stored in OutOfOrderTimeWindow.
// This should not be used directly and must be converted into OutOfOrderTimeWindow.
OutOfOrderTimeWindowFlag model.Duration `yaml:"out_of_order_time_window,omitempty"`
// StaleSeriesCompactionInterval tells at what interval to attempt stale series compaction
// if the number of stale series crosses the given threshold.
StaleSeriesCompactionInterval model.Duration `yaml:"stale_series_compaction_interval,omitempty"`
// StaleSeriesCompactionThreshold is a number between 0.0-1.0 indicating the % of stale series in
// the in-memory Head block. If the % of stale series crosses this threshold, stale series
// compaction will be run in the next stale series compaction interval.
StaleSeriesCompactionThreshold float64 `yaml:"stale_series_compaction_threshold,omitempty"`
// StaleSeriesImmediateCompactionThreshold is a number between 0.0-1.0 indicating the % of stale series in
// the in-memory Head block. If the % of stale series crosses this threshold, stale series compaction is run immediately.
StaleSeriesImmediateCompactionThreshold float64 `yaml:"stale_series_immediate_compaction_threshold,omitempty"`
}
// UnmarshalYAML implements the yaml.Unmarshaler interface.

View File

@ -98,6 +98,16 @@ func DefaultOptions() *Options {
// Options of the DB storage.
type Options struct {
// staleSeriesCompactionInterval is same as below option with same name, but is atomic so that we can do live updates without locks.
// This is the one that must be used by the code.
staleSeriesCompactionInterval atomic.Int64
// staleSeriesCompactionThreshold is same as below option with same name, but is atomic so that we can do live updates without locks.
// This is the one that must be used by the code.
staleSeriesCompactionThreshold atomic.Float64
// staleSeriesImmediateCompactionThreshold is same as below option with same name, but is atomic so that we can do live updates without locks.
// This is the one that must be used by the code.
staleSeriesImmediateCompactionThreshold atomic.Float64
// Segments (wal files) max size.
// WALSegmentSize = 0, segment size is default size.
// WALSegmentSize > 0, segment size is WALSegmentSize.
@ -832,6 +842,10 @@ func validateOpts(opts *Options, rngs []int64) (*Options, []int64) {
rngs = ExponentialBlockRanges(opts.MinBlockDuration, 10, 3)
}
opts.staleSeriesCompactionInterval.Store(int64(opts.StaleSeriesCompactionInterval))
opts.staleSeriesCompactionThreshold.Store(opts.StaleSeriesCompactionThreshold)
opts.staleSeriesImmediateCompactionThreshold.Store(opts.StaleSeriesImmediateCompactionThreshold)
return opts, rngs
}
@ -1103,15 +1117,13 @@ func (db *DB) run(ctx context.Context) {
backoff := time.Duration(0)
nextStaleSeriesCompactionTime := time.Now().Round(db.opts.StaleSeriesCompactionInterval)
if nextStaleSeriesCompactionTime.Before(time.Now()) {
nextStaleSeriesCompactionTime = nextStaleSeriesCompactionTime.Add(db.opts.StaleSeriesCompactionInterval)
}
staleSeriesWaitDur := time.Until(nextStaleSeriesCompactionTime)
if db.opts.StaleSeriesCompactionInterval <= 0 {
// Long enough interval so that we don't schedule a stale series compaction.
staleSeriesWaitDur = 365 * 24 * time.Hour
staleSeriesCompactionInterval := time.Duration(db.opts.staleSeriesCompactionInterval.Load())
nextStaleSeriesCompactionTime := nextStepAlignedTime(staleSeriesCompactionInterval)
timedStaleSeriesCompactionActive := true
if staleSeriesCompactionInterval <= 0 {
// Far enough so that we don't schedule a stale series compaction.
timedStaleSeriesCompactionActive = false
nextStaleSeriesCompactionTime = time.Now().Add(365 * 24 * time.Hour)
}
for {
@ -1121,6 +1133,11 @@ func (db *DB) run(ctx context.Context) {
case <-time.After(backoff):
}
staleSeriesWaitDur := time.Until(nextStaleSeriesCompactionTime)
if staleSeriesWaitDur < 0 {
staleSeriesWaitDur = 0
}
select {
case <-time.After(1 * time.Minute):
db.cmtx.Lock()
@ -1132,8 +1149,8 @@ func (db *DB) run(ctx context.Context) {
// TODO: check if normal compaction is soon, and don't run stale series compaction if it is soon.
numStaleSeries, numSeries := db.Head().NumStaleSeries(), db.Head().NumSeries()
staleSeriesRatio := float64(numStaleSeries) / float64(numSeries)
if db.autoCompact && db.opts.StaleSeriesImmediateCompactionThreshold > 0 &&
staleSeriesRatio >= db.opts.StaleSeriesImmediateCompactionThreshold {
if db.autoCompact && db.opts.staleSeriesImmediateCompactionThreshold.Load() > 0 &&
staleSeriesRatio >= db.opts.staleSeriesImmediateCompactionThreshold.Load() {
if err := db.CompactStaleHead(); err != nil {
db.logger.Error("immediate stale series compaction failed", "err", err)
}
@ -1145,6 +1162,14 @@ func (db *DB) run(ctx context.Context) {
}
// We attempt mmapping of head chunks regularly.
db.head.mmapHeadChunks()
staleSeriesCompactionInterval := time.Duration(db.opts.staleSeriesCompactionInterval.Load())
if !timedStaleSeriesCompactionActive && staleSeriesCompactionInterval > 0 {
// The config was updated in realtime.
timedStaleSeriesCompactionActive = true
nextStaleSeriesCompactionTime = nextStepAlignedTime(staleSeriesCompactionInterval)
}
case <-db.compactc:
db.metrics.compactionsTriggered.Inc()
@ -1161,18 +1186,26 @@ func (db *DB) run(ctx context.Context) {
}
db.autoCompactMtx.Unlock()
case <-time.After(staleSeriesWaitDur):
staleSeriesCompactionInterval := time.Duration(db.opts.staleSeriesCompactionInterval.Load())
if staleSeriesCompactionInterval <= 0 {
// The config was updated in realtime.
// Far enough so that we don't schedule a stale series compaction.
timedStaleSeriesCompactionActive = false
nextStaleSeriesCompactionTime = time.Now().Add(365 * 24 * time.Hour)
continue
}
// TODO: check if normal compaction is soon, and don't run stale series compaction if it is soon.
numStaleSeries, numSeries := db.Head().NumStaleSeries(), db.Head().NumSeries()
staleSeriesRatio := float64(numStaleSeries) / float64(numSeries)
if db.autoCompact && db.opts.StaleSeriesCompactionThreshold > 0 &&
staleSeriesRatio >= db.opts.StaleSeriesCompactionThreshold {
if db.autoCompact && db.opts.staleSeriesCompactionThreshold.Load() > 0 &&
staleSeriesRatio >= db.opts.staleSeriesCompactionThreshold.Load() {
if err := db.CompactStaleHead(); err != nil {
db.logger.Error("scheduled stale series compaction failed", "err", err)
}
}
nextStaleSeriesCompactionTime = nextStaleSeriesCompactionTime.Add(db.opts.StaleSeriesCompactionInterval)
staleSeriesWaitDur = time.Until(nextStaleSeriesCompactionTime)
nextStaleSeriesCompactionTime = nextStepAlignedTime(db.opts.StaleSeriesCompactionInterval)
case <-db.stopc:
return
@ -1180,6 +1213,14 @@ func (db *DB) run(ctx context.Context) {
}
}
func nextStepAlignedTime(step time.Duration) (next time.Time) {
next = time.Now().Round(step)
if next.Before(time.Now()) {
next = next.Add(step)
}
return
}
// Appender opens a new appender against the database.
func (db *DB) Appender(ctx context.Context) storage.Appender {
return dbAppender{db: db, Appender: db.head.Appender(ctx)}
@ -1206,6 +1247,13 @@ func (db *DB) ApplyConfig(conf *config.Config) error {
oooTimeWindow := int64(0)
if conf.StorageConfig.TSDBConfig != nil {
oooTimeWindow = conf.StorageConfig.TSDBConfig.OutOfOrderTimeWindow
db.opts.staleSeriesCompactionInterval.Store(int64(conf.StorageConfig.TSDBConfig.StaleSeriesCompactionInterval))
db.opts.staleSeriesCompactionThreshold.Store(conf.StorageConfig.TSDBConfig.StaleSeriesCompactionThreshold)
db.opts.staleSeriesImmediateCompactionThreshold.Store(conf.StorageConfig.TSDBConfig.StaleSeriesImmediateCompactionThreshold)
} else {
db.opts.staleSeriesCompactionInterval.Store(0)
db.opts.staleSeriesCompactionThreshold.Store(0)
db.opts.staleSeriesImmediateCompactionThreshold.Store(0)
}
if oooTimeWindow < 0 {
oooTimeWindow = 0

View File

@ -1756,7 +1756,7 @@ func (h *Head) gcStaleSeries(seriesRefs []storage.SeriesRef, maxt int64) {
// that reads the WAL, wouldn't be able to use those
// samples since we would have no labels for that ref ID.
for ref := range deleted {
h.walExpiries[chunks.HeadSeriesRef(ref)] = last
h.walExpiries[chunks.HeadSeriesRef(ref)] = int64(last)
}
h.walExpiriesMtx.Unlock()
}