Ignore compacted stale series from WAL
Signed-off-by: Ganesh Vernekar <ganesh.vernekar@reddit.com>
This commit is contained in:
parent
fe9adb0a98
commit
3b79fb207e
|
@ -44,7 +44,6 @@ import (
|
||||||
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
|
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
|
||||||
"github.com/prometheus/prometheus/tsdb/fileutil"
|
"github.com/prometheus/prometheus/tsdb/fileutil"
|
||||||
_ "github.com/prometheus/prometheus/tsdb/goversion" // Load the package into main to make sure minimum Go version is met.
|
_ "github.com/prometheus/prometheus/tsdb/goversion" // Load the package into main to make sure minimum Go version is met.
|
||||||
"github.com/prometheus/prometheus/tsdb/index"
|
|
||||||
"github.com/prometheus/prometheus/tsdb/tsdbutil"
|
"github.com/prometheus/prometheus/tsdb/tsdbutil"
|
||||||
"github.com/prometheus/prometheus/tsdb/wlog"
|
"github.com/prometheus/prometheus/tsdb/wlog"
|
||||||
"github.com/prometheus/prometheus/util/compression"
|
"github.com/prometheus/prometheus/util/compression"
|
||||||
|
@ -1574,7 +1573,9 @@ func (db *DB) CompactStaleHead() error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
db.head.truncateStaleSeries(index.NewListPostings(staleSeriesRefs), maxt)
|
if err := db.head.truncateStaleSeries(staleSeriesRefs, maxt); err != nil {
|
||||||
|
return fmt.Errorf("head truncate: %w", err)
|
||||||
|
}
|
||||||
db.head.RebuildSymbolTable(db.logger)
|
db.head.RebuildSymbolTable(db.logger)
|
||||||
|
|
||||||
db.logger.Info("Ending stale series compaction")
|
db.logger.Info("Ending stale series compaction")
|
||||||
|
|
|
@ -9673,7 +9673,9 @@ func TestStaleSeriesCompaction(t *testing.T) {
|
||||||
nonFirstFH.CounterResetHint = histogram.NotCounterReset
|
nonFirstFH.CounterResetHint = histogram.NotCounterReset
|
||||||
|
|
||||||
// Verify head block.
|
// Verify head block.
|
||||||
{
|
verifyHeadBlock := func() {
|
||||||
|
require.Equal(t, uint64(3), db.head.NumSeries())
|
||||||
|
require.Equal(t, uint64(0), db.head.NumStaleSeries())
|
||||||
|
|
||||||
expHeadQuery := make(map[string][]chunks.Sample)
|
expHeadQuery := make(map[string][]chunks.Sample)
|
||||||
for i := 0; i < numSeriesPerCategory; i++ {
|
for i := 0; i < numSeriesPerCategory; i++ {
|
||||||
|
@ -9697,6 +9699,8 @@ func TestStaleSeriesCompaction(t *testing.T) {
|
||||||
require.Equal(t, expHeadQuery, seriesSet)
|
require.Equal(t, expHeadQuery, seriesSet)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
verifyHeadBlock()
|
||||||
|
|
||||||
// Verify blocks from stale series.
|
// Verify blocks from stale series.
|
||||||
{
|
{
|
||||||
expBlockQuery := make(map[string][]chunks.Sample)
|
expBlockQuery := make(map[string][]chunks.Sample)
|
||||||
|
@ -9769,4 +9773,14 @@ func TestStaleSeriesCompaction(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
// Restart DB and verify that stale series were discarded from WAL replay.
|
||||||
|
require.NoError(t, db.Close())
|
||||||
|
var err error
|
||||||
|
db, err = Open(db.Dir(), db.logger, db.registerer, db.opts, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
verifyHeadBlock()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
90
tsdb/head.go
90
tsdb/head.go
|
@ -1198,18 +1198,33 @@ func (h *Head) truncateMemory(mint int64) (err error) {
|
||||||
return h.truncateSeriesAndChunkDiskMapper("truncateMemory")
|
return h.truncateSeriesAndChunkDiskMapper("truncateMemory")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Head) truncateStaleSeries(p index.Postings, maxt int64) {
|
func (h *Head) truncateStaleSeries(seriesRefs []storage.SeriesRef, maxt int64) error {
|
||||||
h.chunkSnapshotMtx.Lock()
|
h.chunkSnapshotMtx.Lock()
|
||||||
defer h.chunkSnapshotMtx.Unlock()
|
defer h.chunkSnapshotMtx.Unlock()
|
||||||
|
|
||||||
if h.MinTime() >= maxt {
|
// Record these stale series refs in the WAL so that we can ignore them during replay.
|
||||||
return
|
if h.wal != nil {
|
||||||
|
stones := make([]tombstones.Stone, 0, len(seriesRefs))
|
||||||
|
for _, ref := range seriesRefs {
|
||||||
|
stones = append(stones, tombstones.Stone{
|
||||||
|
Ref: ref,
|
||||||
|
Intervals: tombstones.Intervals{{Mint: math.MinInt64, Maxt: math.MaxInt64}},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
var enc record.Encoder
|
||||||
|
if err := h.wal.Log(enc.Tombstones(stones, nil)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if h.MinTime() >= maxt {
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: this will block all queries. See if we can do better.
|
|
||||||
h.WaitForPendingReadersInTimeRange(h.MinTime(), maxt)
|
h.WaitForPendingReadersInTimeRange(h.MinTime(), maxt)
|
||||||
|
|
||||||
h.gcStaleSeries(p, maxt)
|
h.gcStaleSeries(seriesRefs, maxt)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// WaitForPendingReadersInTimeRange waits for queries overlapping with given range to finish querying.
|
// WaitForPendingReadersInTimeRange waits for queries overlapping with given range to finish querying.
|
||||||
|
@ -1713,10 +1728,10 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) {
|
||||||
|
|
||||||
// gcStaleSeries removes all the stale series provided given that they are still stale
|
// gcStaleSeries removes all the stale series provided given that they are still stale
|
||||||
// and the series maxt is <= the given max.
|
// and the series maxt is <= the given max.
|
||||||
func (h *Head) gcStaleSeries(p index.Postings, maxt int64) {
|
func (h *Head) gcStaleSeries(seriesRefs []storage.SeriesRef, maxt int64) {
|
||||||
// Drop old chunks and remember series IDs and hashes if they can be
|
// Drop old chunks and remember series IDs and hashes if they can be
|
||||||
// deleted entirely.
|
// deleted entirely.
|
||||||
deleted, affected, chunksRemoved := h.series.gcStaleSeries(p, maxt)
|
deleted, affected, chunksRemoved := h.series.gcStaleSeries(seriesRefs, maxt)
|
||||||
seriesRemoved := len(deleted)
|
seriesRemoved := len(deleted)
|
||||||
|
|
||||||
h.metrics.seriesRemoved.Add(float64(seriesRemoved))
|
h.metrics.seriesRemoved.Add(float64(seriesRemoved))
|
||||||
|
@ -2122,8 +2137,64 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (
|
||||||
return deleted, affected, rmChunks, staleSeriesDeleted, actualMint, minOOOTime, minMmapFile
|
return deleted, affected, rmChunks, staleSeriesDeleted, actualMint, minOOOTime, minMmapFile
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// deleteSeriesByID deletes the series with the given reference.
|
||||||
|
// Only used for WAL replay.
|
||||||
|
func (h *Head) deleteSeriesByID(refs []chunks.HeadSeriesRef) {
|
||||||
|
var (
|
||||||
|
deleted = map[storage.SeriesRef]struct{}{}
|
||||||
|
affected = map[labels.Label]struct{}{}
|
||||||
|
staleSeriesDeleted = 0
|
||||||
|
chunksRemoved = 0
|
||||||
|
)
|
||||||
|
|
||||||
|
for _, ref := range refs {
|
||||||
|
refShard := int(ref) & (h.series.size - 1)
|
||||||
|
h.series.locks[refShard].Lock()
|
||||||
|
|
||||||
|
// Copying getByID here to avoid locking and unlocking twice.
|
||||||
|
series := h.series.series[refShard][ref]
|
||||||
|
if series == nil {
|
||||||
|
h.series.locks[refShard].Unlock()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if value.IsStaleNaN(series.lastValue) ||
|
||||||
|
(series.lastHistogramValue != nil && value.IsStaleNaN(series.lastHistogramValue.Sum)) ||
|
||||||
|
(series.lastFloatHistogramValue != nil && value.IsStaleNaN(series.lastFloatHistogramValue.Sum)) {
|
||||||
|
staleSeriesDeleted++
|
||||||
|
}
|
||||||
|
|
||||||
|
hash := series.lset.Hash()
|
||||||
|
hashShard := int(hash) & (h.series.size - 1)
|
||||||
|
|
||||||
|
chunksRemoved += len(series.mmappedChunks)
|
||||||
|
if series.headChunks != nil {
|
||||||
|
chunksRemoved += series.headChunks.len()
|
||||||
|
}
|
||||||
|
|
||||||
|
deleted[storage.SeriesRef(series.ref)] = struct{}{}
|
||||||
|
series.lset.Range(func(l labels.Label) { affected[l] = struct{}{} })
|
||||||
|
h.series.hashes[hashShard].del(hash, series.ref)
|
||||||
|
delete(h.series.series[refShard], series.ref)
|
||||||
|
|
||||||
|
h.series.locks[refShard].Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
h.metrics.seriesRemoved.Add(float64(len(deleted)))
|
||||||
|
h.metrics.chunksRemoved.Add(float64(chunksRemoved))
|
||||||
|
h.metrics.chunks.Sub(float64(chunksRemoved))
|
||||||
|
h.numSeries.Sub(uint64(len(deleted)))
|
||||||
|
h.numStaleSeries.Sub(uint64(staleSeriesDeleted))
|
||||||
|
|
||||||
|
// Remove deleted series IDs from the postings lists.
|
||||||
|
h.postings.Delete(deleted, affected)
|
||||||
|
|
||||||
|
// Remove tombstones referring to the deleted series.
|
||||||
|
h.tombstones.DeleteTombstones(deleted)
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: add comments.
|
// TODO: add comments.
|
||||||
func (s *stripeSeries) gcStaleSeries(p index.Postings, maxt int64) (_ map[storage.SeriesRef]struct{}, _ map[labels.Label]struct{}, _ int) {
|
func (s *stripeSeries) gcStaleSeries(seriesRefs []storage.SeriesRef, maxt int64) (_ map[storage.SeriesRef]struct{}, _ map[labels.Label]struct{}, _ int) {
|
||||||
var (
|
var (
|
||||||
deleted = map[storage.SeriesRef]struct{}{}
|
deleted = map[storage.SeriesRef]struct{}{}
|
||||||
affected = map[labels.Label]struct{}{}
|
affected = map[labels.Label]struct{}{}
|
||||||
|
@ -2131,8 +2202,7 @@ func (s *stripeSeries) gcStaleSeries(p index.Postings, maxt int64) (_ map[storag
|
||||||
)
|
)
|
||||||
|
|
||||||
staleSeriesMap := map[storage.SeriesRef]struct{}{}
|
staleSeriesMap := map[storage.SeriesRef]struct{}{}
|
||||||
for p.Next() {
|
for _, ref := range seriesRefs {
|
||||||
ref := p.At()
|
|
||||||
staleSeriesMap[ref] = struct{}{}
|
staleSeriesMap[ref] = struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -308,7 +308,15 @@ Outer:
|
||||||
}
|
}
|
||||||
h.wlReplaySamplesPool.Put(v)
|
h.wlReplaySamplesPool.Put(v)
|
||||||
case []tombstones.Stone:
|
case []tombstones.Stone:
|
||||||
|
// Tombstone records will be fairly rare, so not trying to optimise the allocations here.
|
||||||
|
deleteSeriesShards := make([][]chunks.HeadSeriesRef, concurrency)
|
||||||
for _, s := range v {
|
for _, s := range v {
|
||||||
|
if len(s.Intervals) == 1 && s.Intervals[0].Mint == math.MinInt64 && s.Intervals[0].Maxt == math.MaxInt64 {
|
||||||
|
// This series was fully deleted at this point.
|
||||||
|
mod := uint64(s.Ref) % uint64(concurrency)
|
||||||
|
deleteSeriesShards[mod] = append(deleteSeriesShards[mod], chunks.HeadSeriesRef(s.Ref))
|
||||||
|
continue
|
||||||
|
}
|
||||||
for _, itv := range s.Intervals {
|
for _, itv := range s.Intervals {
|
||||||
if itv.Maxt < h.minValidTime.Load() {
|
if itv.Maxt < h.minValidTime.Load() {
|
||||||
continue
|
continue
|
||||||
|
@ -326,6 +334,14 @@ Outer:
|
||||||
h.tombstones.AddInterval(s.Ref, itv)
|
h.tombstones.AddInterval(s.Ref, itv)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for i := 0; i < concurrency; i++ {
|
||||||
|
if len(deleteSeriesShards[i]) > 0 {
|
||||||
|
processors[i].input <- walSubsetProcessorInputItem{deletedSeriesRefs: deleteSeriesShards[i]}
|
||||||
|
deleteSeriesShards[i] = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
h.wlReplaytStonesPool.Put(v)
|
h.wlReplaytStonesPool.Put(v)
|
||||||
case []record.RefExemplar:
|
case []record.RefExemplar:
|
||||||
for _, e := range v {
|
for _, e := range v {
|
||||||
|
@ -558,10 +574,11 @@ type walSubsetProcessor struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type walSubsetProcessorInputItem struct {
|
type walSubsetProcessorInputItem struct {
|
||||||
samples []record.RefSample
|
samples []record.RefSample
|
||||||
histogramSamples []histogramRecord
|
histogramSamples []histogramRecord
|
||||||
existingSeries *memSeries
|
existingSeries *memSeries
|
||||||
walSeriesRef chunks.HeadSeriesRef
|
walSeriesRef chunks.HeadSeriesRef
|
||||||
|
deletedSeriesRefs []chunks.HeadSeriesRef
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wp *walSubsetProcessor) setup() {
|
func (wp *walSubsetProcessor) setup() {
|
||||||
|
@ -712,6 +729,10 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
|
||||||
case wp.histogramsOutput <- in.histogramSamples:
|
case wp.histogramsOutput <- in.histogramSamples:
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(in.deletedSeriesRefs) > 0 {
|
||||||
|
h.deleteSeriesByID(in.deletedSeriesRefs)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
h.updateMinMaxTime(mint, maxt)
|
h.updateMinMaxTime(mint, maxt)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue