Merge pull request #17029 from pr00se/wal-checkpoint-dropped-samples

TSDB: use timestamps rather than WAL segment numbers to track how long deleted series should be retained in checkpoints
Bryan Boreham 2025-08-20 11:15:10 +01:00 committed by GitHub
commit 498f63e60b
8 changed files with 337 additions and 79 deletions
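At its core, the change swaps the keep-predicate passed to wlog.Checkpoint from "keep deleted series while their recorded WAL segment number is newer than the last checkpointed segment" to "keep deleted series while their recorded timestamp is at or after the truncation time mint". A minimal, self-contained Go sketch of that idea, using simplified stand-in types rather than the actual Prometheus code:

```go
package main

import "fmt"

// seriesRef is a stand-in for chunks.HeadSeriesRef.
type seriesRef uint64

// head is a simplified stand-in for the TSDB head: live series plus a map of
// deleted series to the timestamp up to which their series records must
// survive checkpointing (previously this map held WAL segment numbers).
type head struct {
	live        map[seriesRef]struct{}
	walExpiries map[seriesRef]int64
}

// keepSeriesInWALCheckpointFn mirrors the shape of the new API: it closes over
// mint (the time before which WAL data is being truncated) and returns the
// predicate handed to the checkpointing code.
func (h *head) keepSeriesInWALCheckpointFn(mint int64) func(id seriesRef) bool {
	return func(id seriesRef) bool {
		if _, ok := h.live[id]; ok {
			return true // Series still exists in the head.
		}
		// Deleted series: keep the record while data at or after mint may still reference it.
		keepUntil, ok := h.walExpiries[id]
		return ok && keepUntil >= mint
	}
}

func main() {
	h := &head{
		live:        map[seriesRef]struct{}{1: {}},
		walExpiries: map[seriesRef]int64{2: 500}, // series 2 deleted; last referencing record at t=500
	}
	keep := h.keepSeriesInWALCheckpointFn(400)
	fmt.Println(keep(1), keep(2), keep(3)) // true true false
}
```

Tracking a timestamp instead of a segment number makes the retention decision independent of how the WAL happens to be segmented: a deleted series record is kept exactly as long as the WAL still contains records that reference it.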


@@ -631,17 +631,20 @@ Loop:
 	}
 }
 
-// keepSeriesInWALCheckpoint is used to determine whether a series record should be kept in the checkpoint
+// keepSeriesInWALCheckpointFn returns a function that is used to determine whether a series record should be kept in the checkpoint.
 // last is the last WAL segment that was considered for checkpointing.
-func (db *DB) keepSeriesInWALCheckpoint(id chunks.HeadSeriesRef, last int) bool {
-	// Keep the record if the series exists in the db.
-	if db.series.GetByID(id) != nil {
-		return true
-	}
+// NOTE: the agent implementation here is different from the Prometheus implementation, in that it uses WAL segment numbers instead of timestamps.
+func (db *DB) keepSeriesInWALCheckpointFn(last int) func(id chunks.HeadSeriesRef) bool {
+	return func(id chunks.HeadSeriesRef) bool {
+		// Keep the record if the series exists in the db.
+		if db.series.GetByID(id) != nil {
+			return true
+		}
 
-	// Keep the record if the series was recently deleted.
-	seg, ok := db.deleted[id]
-	return ok && seg > last
+		// Keep the record if the series was recently deleted.
+		seg, ok := db.deleted[id]
+		return ok && seg > last
+	}
 }
 
 func (db *DB) truncate(mint int64) error {

@@ -678,7 +681,7 @@ func (db *DB) truncate(mint int64) error {
 
 	db.metrics.checkpointCreationTotal.Inc()
-	if _, err = wlog.Checkpoint(db.logger, db.wal, first, last, db.keepSeriesInWALCheckpoint, mint); err != nil {
+	if _, err = wlog.Checkpoint(db.logger, db.wal, first, last, db.keepSeriesInWALCheckpointFn(last), mint); err != nil {
 		db.metrics.checkpointCreationFail.Inc()
 		var cerr *wlog.CorruptionErr
 		if errors.As(err, &cerr) {


@@ -1557,7 +1557,7 @@ func TestSizeRetention(t *testing.T) {
 	// Create a WAL checkpoint, and compare sizes.
 	first, last, err := wlog.Segments(db.Head().wal.Dir())
 	require.NoError(t, err)
-	_, err = wlog.Checkpoint(promslog.NewNopLogger(), db.Head().wal, first, last-1, func(chunks.HeadSeriesRef, int) bool { return false }, 0)
+	_, err = wlog.Checkpoint(promslog.NewNopLogger(), db.Head().wal, first, last-1, func(chunks.HeadSeriesRef) bool { return false }, 0)
 	require.NoError(t, err)
 	blockSize = int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) // Use the actual internal metrics.
 	walSize, err = db.Head().wal.Size()
@@ -4738,7 +4738,7 @@ func TestMetadataCheckpointingOnlyKeepsLatestEntry(t *testing.T) {
 	// Let's create a checkpoint.
 	first, last, err := wlog.Segments(w.Dir())
 	require.NoError(t, err)
-	keep := func(id chunks.HeadSeriesRef, _ int) bool {
+	keep := func(id chunks.HeadSeriesRef) bool {
 		return id != 3
 	}
 	_, err = wlog.Checkpoint(promslog.NewNopLogger(), w, first, last-1, keep, 0)


@@ -110,7 +110,7 @@ type Head struct {
 	series *stripeSeries
 
 	walExpiriesMtx sync.Mutex
-	walExpiries    map[chunks.HeadSeriesRef]int // Series no longer in the head, and what WAL segment they must be kept until.
+	walExpiries    map[chunks.HeadSeriesRef]int64 // Series no longer in the head, and what time they must be kept until.
 
 	// TODO(codesome): Extend MemPostings to return only OOOPostings, Set OOOStatus, ... Like an additional map of ooo postings.
 	postings *index.MemPostings // Postings lists for terms.
@@ -337,7 +337,7 @@ func (h *Head) resetInMemoryState() error {
 	h.exemplars = es
 	h.postings = index.NewUnorderedMemPostings()
 	h.tombstones = tombstones.NewMemTombstones()
-	h.walExpiries = map[chunks.HeadSeriesRef]int{}
+	h.walExpiries = map[chunks.HeadSeriesRef]int64{}
 	h.chunkRange.Store(h.opts.ChunkRange)
 	h.minTime.Store(math.MaxInt64)
 	h.maxTime.Store(math.MinInt64)
@@ -791,7 +791,7 @@ func (h *Head) Init(minValidTime int64) error {
 		// A corrupted checkpoint is a hard error for now and requires user
 		// intervention. There's likely little data that can be recovered anyway.
-		if err := h.loadWAL(wlog.NewReader(sr), syms, multiRef, mmappedChunks, oooMmappedChunks, endAt); err != nil {
+		if err := h.loadWAL(wlog.NewReader(sr), syms, multiRef, mmappedChunks, oooMmappedChunks); err != nil {
 			return fmt.Errorf("backfill checkpoint: %w", err)
 		}
 		h.updateWALReplayStatusRead(startFrom)
@@ -825,7 +825,7 @@ func (h *Head) Init(minValidTime int64) error {
 		if err != nil {
 			return fmt.Errorf("segment reader (offset=%d): %w", offset, err)
 		}
-		err = h.loadWAL(wlog.NewReader(sr), syms, multiRef, mmappedChunks, oooMmappedChunks, endAt)
+		err = h.loadWAL(wlog.NewReader(sr), syms, multiRef, mmappedChunks, oooMmappedChunks)
 		if err := sr.Close(); err != nil {
 			h.logger.Warn("Error while closing the wal segments reader", "err", err)
 		}
@@ -1282,7 +1282,7 @@ func (h *Head) IsQuerierCollidingWithTruncation(querierMint, querierMaxt int64)
 	return false, false, 0
 }
 
-func (h *Head) getWALExpiry(id chunks.HeadSeriesRef) (int, bool) {
+func (h *Head) getWALExpiry(id chunks.HeadSeriesRef) (int64, bool) {
 	h.walExpiriesMtx.Lock()
 	defer h.walExpiriesMtx.Unlock()
@@ -1290,24 +1290,27 @@ func (h *Head) getWALExpiry(id chunks.HeadSeriesRef) (int, bool) {
 	return keepUntil, ok
 }
 
-func (h *Head) setWALExpiry(id chunks.HeadSeriesRef, keepUntil int) {
+// updateWALExpiry updates the WAL expiry for a series, keeping the higher of the current value and keepUntil.
+func (h *Head) updateWALExpiry(id chunks.HeadSeriesRef, keepUntil int64) {
 	h.walExpiriesMtx.Lock()
 	defer h.walExpiriesMtx.Unlock()
 
-	h.walExpiries[id] = keepUntil
+	h.walExpiries[id] = max(keepUntil, h.walExpiries[id])
 }
 
-// keepSeriesInWALCheckpoint is used to determine whether a series record should be kept in the checkpoint
-// last is the last WAL segment that was considered for checkpointing.
-func (h *Head) keepSeriesInWALCheckpoint(id chunks.HeadSeriesRef, last int) bool {
-	// Keep the record if the series exists in the head.
-	if h.series.getByID(id) != nil {
-		return true
-	}
+// keepSeriesInWALCheckpointFn returns a function that is used to determine whether a series record should be kept in the checkpoint.
+// mint is the time before which data in the WAL is being truncated.
+func (h *Head) keepSeriesInWALCheckpointFn(mint int64) func(id chunks.HeadSeriesRef) bool {
+	return func(id chunks.HeadSeriesRef) bool {
+		// Keep the record if the series exists in the head.
+		if h.series.getByID(id) != nil {
+			return true
+		}
 
-	// Keep the record if the series has an expiry set.
-	keepUntil, ok := h.getWALExpiry(id)
-	return ok && keepUntil > last
+		// Keep the record if the series has an expiry set.
+		keepUntil, ok := h.getWALExpiry(id)
+		return ok && keepUntil >= mint
+	}
 }
 
 // truncateWAL removes old data before mint from the WAL.
@@ -1344,7 +1347,7 @@ func (h *Head) truncateWAL(mint int64) error {
 	}
 
 	h.metrics.checkpointCreationTotal.Inc()
-	if _, err = wlog.Checkpoint(h.logger, h.wal, first, last, h.keepSeriesInWALCheckpoint, mint); err != nil {
+	if _, err = wlog.Checkpoint(h.logger, h.wal, first, last, h.keepSeriesInWALCheckpointFn(mint), mint); err != nil {
 		h.metrics.checkpointCreationFail.Inc()
 		var cerr *chunks.CorruptionErr
 		if errors.As(err, &cerr) {
@@ -1359,11 +1362,10 @@ func (h *Head) truncateWAL(mint int64) error {
 		h.logger.Error("truncating segments failed", "err", err)
 	}
 
-	// The checkpoint is written and segments before it is truncated, so stop
-	// tracking expired series.
+	// The checkpoint is written and data before mint is truncated, so stop tracking expired series.
 	h.walExpiriesMtx.Lock()
-	for ref, segment := range h.walExpiries {
-		if segment <= last {
+	for ref, keepUntil := range h.walExpiries {
+		if keepUntil < mint {
 			delete(h.walExpiries, ref)
 		}
 	}
@@ -1633,16 +1635,13 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) {
 	h.tombstones.TruncateBefore(mint)
 
 	if h.wal != nil {
-		_, last, _ := wlog.Segments(h.wal.Dir())
 		h.walExpiriesMtx.Lock()
-		// Keep series records until we're past segment 'last'
-		// because the WAL will still have samples records with
-		// this ref ID. If we didn't keep these series records then
-		// on start up when we replay the WAL, or any other code
-		// that reads the WAL, wouldn't be able to use those
-		// samples since we would have no labels for that ref ID.
+		// Samples for deleted series are likely still in the WAL, so flag that the deleted series records should be kept during
+		// WAL checkpointing while the WAL contains data through actualInOrderMint.
+		// If we didn't keep these series records then on start up when we replay the WAL, or any other code that reads the WAL,
+		// wouldn't be able to use those samples since we would have no labels for that ref ID.
 		for ref := range deleted {
-			h.walExpiries[chunks.HeadSeriesRef(ref)] = last
+			h.walExpiries[chunks.HeadSeriesRef(ref)] = actualInOrderMint
 		}
 		h.walExpiriesMtx.Unlock()
 	}


@@ -162,6 +162,10 @@ func populateTestWL(t testing.TB, w *wlog.WL, recs []interface{}, buf []byte) []
 			buf = enc.Tombstones(v, buf)
 		case []record.RefExemplar:
 			buf = enc.Exemplars(v, buf)
+		case []record.RefHistogramSample:
+			buf, _ = enc.HistogramSamples(v, buf)
+		case []record.RefFloatHistogramSample:
+			buf, _ = enc.FloatHistogramSamples(v, buf)
 		case []record.RefMmapMarker:
 			buf = enc.MmapMarkers(v, buf)
 		case []record.RefMetadata:
@@ -766,9 +770,7 @@ func TestHead_ReadWAL(t *testing.T) {
 	// But it should have a WAL expiry set.
 	keepUntil, ok := head.getWALExpiry(101)
 	require.True(t, ok)
-	_, last, err := wlog.Segments(w.Dir())
-	require.NoError(t, err)
-	require.Equal(t, last, keepUntil)
+	require.Equal(t, int64(101), keepUntil)
 	// Only the duplicate series record should have a WAL expiry set.
 	_, ok = head.getWALExpiry(50)
 	require.False(t, ok)
@@ -886,17 +888,265 @@ func TestHead_WALMultiRef(t *testing.T) {
 	}}, series)
 }
func TestHead_WALCheckpointMultiRef(t *testing.T) {
cases := []struct {
name string
walEntries []interface{}
expectedWalExpiry int64
walTruncateMinT int64
expectedWalEntries []interface{}
}{
{
name: "Samples only; keep needed duplicate series record",
walEntries: []interface{}{
[]record.RefSeries{
{Ref: 1, Labels: labels.FromStrings("a", "1")},
{Ref: 2, Labels: labels.FromStrings("a", "1")},
},
[]record.RefSample{
{Ref: 1, T: 100, V: 1},
{Ref: 2, T: 200, V: 2},
{Ref: 2, T: 500, V: 3},
},
},
expectedWalExpiry: 500,
walTruncateMinT: 500,
expectedWalEntries: []interface{}{
[]record.RefSeries{
{Ref: 1, Labels: labels.FromStrings("a", "1")},
{Ref: 2, Labels: labels.FromStrings("a", "1")},
},
[]record.RefSample{
{Ref: 2, T: 500, V: 3},
},
},
},
{
name: "Tombstones only; keep needed duplicate series record",
walEntries: []interface{}{
[]record.RefSeries{
{Ref: 1, Labels: labels.FromStrings("a", "1")},
{Ref: 2, Labels: labels.FromStrings("a", "1")},
},
[]tombstones.Stone{
{Ref: 1, Intervals: []tombstones.Interval{{Mint: 0, Maxt: 100}}},
{Ref: 2, Intervals: []tombstones.Interval{{Mint: 0, Maxt: 200}}},
{Ref: 2, Intervals: []tombstones.Interval{{Mint: 0, Maxt: 500}}},
},
},
expectedWalExpiry: 500,
walTruncateMinT: 500,
expectedWalEntries: []interface{}{
[]record.RefSeries{
{Ref: 1, Labels: labels.FromStrings("a", "1")},
{Ref: 2, Labels: labels.FromStrings("a", "1")},
},
[]tombstones.Stone{
{Ref: 2, Intervals: []tombstones.Interval{{Mint: 0, Maxt: 500}}},
},
},
},
{
name: "Exemplars only; keep needed duplicate series record",
walEntries: []interface{}{
[]record.RefSeries{
{Ref: 1, Labels: labels.FromStrings("a", "1")},
{Ref: 2, Labels: labels.FromStrings("a", "1")},
},
[]record.RefExemplar{
{Ref: 1, T: 100, V: 1, Labels: labels.FromStrings("trace_id", "asdf")},
{Ref: 2, T: 200, V: 2, Labels: labels.FromStrings("trace_id", "asdf")},
{Ref: 2, T: 500, V: 3, Labels: labels.FromStrings("trace_id", "asdf")},
},
},
expectedWalExpiry: 500,
walTruncateMinT: 500,
expectedWalEntries: []interface{}{
[]record.RefSeries{
{Ref: 1, Labels: labels.FromStrings("a", "1")},
{Ref: 2, Labels: labels.FromStrings("a", "1")},
},
[]record.RefExemplar{
{Ref: 2, T: 500, V: 3, Labels: labels.FromStrings("trace_id", "asdf")},
},
},
},
{
name: "Histograms only; keep needed duplicate series record",
walEntries: []interface{}{
[]record.RefSeries{
{Ref: 1, Labels: labels.FromStrings("a", "1")},
{Ref: 2, Labels: labels.FromStrings("a", "1")},
},
[]record.RefHistogramSample{
{Ref: 1, T: 100, H: &histogram.Histogram{}},
{Ref: 2, T: 200, H: &histogram.Histogram{}},
{Ref: 2, T: 500, H: &histogram.Histogram{}},
},
},
expectedWalExpiry: 500,
walTruncateMinT: 500,
expectedWalEntries: []interface{}{
[]record.RefSeries{
{Ref: 1, Labels: labels.FromStrings("a", "1")},
{Ref: 2, Labels: labels.FromStrings("a", "1")},
},
[]record.RefHistogramSample{
{Ref: 2, T: 500, H: &histogram.Histogram{}},
},
},
},
{
name: "Float histograms only; keep needed duplicate series record",
walEntries: []interface{}{
[]record.RefSeries{
{Ref: 1, Labels: labels.FromStrings("a", "1")},
{Ref: 2, Labels: labels.FromStrings("a", "1")},
},
[]record.RefFloatHistogramSample{
{Ref: 1, T: 100, FH: &histogram.FloatHistogram{}},
{Ref: 2, T: 200, FH: &histogram.FloatHistogram{}},
{Ref: 2, T: 500, FH: &histogram.FloatHistogram{}},
},
},
expectedWalExpiry: 500,
walTruncateMinT: 500,
expectedWalEntries: []interface{}{
[]record.RefSeries{
{Ref: 1, Labels: labels.FromStrings("a", "1")},
{Ref: 2, Labels: labels.FromStrings("a", "1")},
},
[]record.RefFloatHistogramSample{
{Ref: 2, T: 500, FH: &histogram.FloatHistogram{}},
},
},
},
{
name: "All record types; keep needed duplicate series record until last record",
// Series with 2 refs and samples for both
walEntries: []interface{}{
[]record.RefSeries{
{Ref: 1, Labels: labels.FromStrings("a", "1")},
{Ref: 2, Labels: labels.FromStrings("a", "1")},
},
[]record.RefSample{
{Ref: 2, T: 500, V: 3},
},
[]tombstones.Stone{
{Ref: 2, Intervals: []tombstones.Interval{{Mint: 0, Maxt: 500}}},
},
[]record.RefExemplar{
{Ref: 2, T: 800, V: 2, Labels: labels.FromStrings("trace_id", "asdf")},
},
[]record.RefHistogramSample{
{Ref: 2, T: 500, H: &histogram.Histogram{}},
},
[]record.RefFloatHistogramSample{
{Ref: 2, T: 500, FH: &histogram.FloatHistogram{}},
},
},
expectedWalExpiry: 800,
walTruncateMinT: 700,
expectedWalEntries: []interface{}{
[]record.RefSeries{
{Ref: 1, Labels: labels.FromStrings("a", "1")},
{Ref: 2, Labels: labels.FromStrings("a", "1")},
},
[]record.RefExemplar{
{Ref: 2, T: 800, V: 2, Labels: labels.FromStrings("trace_id", "asdf")},
},
},
},
{
name: "All record types; drop expired duplicate series record",
// Series with 2 refs and samples for both
walEntries: []interface{}{
[]record.RefSeries{
{Ref: 1, Labels: labels.FromStrings("a", "1")},
{Ref: 2, Labels: labels.FromStrings("a", "1")},
},
[]record.RefSample{
{Ref: 2, T: 500, V: 2},
{Ref: 1, T: 900, V: 3},
},
[]tombstones.Stone{
{Ref: 2, Intervals: []tombstones.Interval{{Mint: 0, Maxt: 750}}},
},
[]record.RefExemplar{
{Ref: 2, T: 800, V: 2, Labels: labels.FromStrings("trace_id", "asdf")},
},
[]record.RefHistogramSample{
{Ref: 2, T: 600, H: &histogram.Histogram{}},
},
[]record.RefFloatHistogramSample{
{Ref: 2, T: 700, FH: &histogram.FloatHistogram{}},
},
},
expectedWalExpiry: 800,
walTruncateMinT: 900,
expectedWalEntries: []interface{}{
[]record.RefSeries{
{Ref: 1, Labels: labels.FromStrings("a", "1")},
},
[]record.RefSample{
{Ref: 1, T: 900, V: 3},
},
},
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
h, w := newTestHead(t, 1000, compression.None, false)
t.Cleanup(func() {
require.NoError(t, h.Close())
})
populateTestWL(t, w, tc.walEntries, nil)
first, _, err := wlog.Segments(w.Dir())
require.NoError(t, err)
require.NoError(t, h.Init(0))
keepUntil, ok := h.getWALExpiry(2)
require.True(t, ok)
require.Equal(t, tc.expectedWalExpiry, keepUntil)
// Each truncation creates a new segment, so attempt truncations until a checkpoint is created
for {
h.lastWALTruncationTime.Store(0) // Reset so that it's always time to truncate the WAL
err := h.truncateWAL(tc.walTruncateMinT)
require.NoError(t, err)
f, _, err := wlog.Segments(w.Dir())
require.NoError(t, err)
if f > first {
break
}
}
// Read back the records: checkpoint contents first, then the remaining WAL
checkpointDir, _, err := wlog.LastCheckpoint(w.Dir())
require.NoError(t, err)
cprecs := readTestWAL(t, checkpointDir)
recs := readTestWAL(t, w.Dir())
recs = append(cprecs, recs...)
// Use testutil.RequireEqual which handles labels properly with dedupelabels
testutil.RequireEqual(t, tc.expectedWalEntries, recs)
})
}
}
 func TestHead_KeepSeriesInWALCheckpoint(t *testing.T) {
 	existingRef := 1
 	existingLbls := labels.FromStrings("foo", "bar")
-	deletedKeepUntil := 10
+	keepUntil := int64(10)
 
 	cases := []struct {
 		name     string
 		prepare  func(t *testing.T, h *Head)
-		seriesRef chunks.HeadSeriesRef
-		last      int
-		expected  bool
+		mint     int64
+		expected bool
 	}{
 		{
 			name: "keep series still in the head",
@@ -904,26 +1154,22 @@ func TestHead_KeepSeriesInWALCheckpoint(t *testing.T) {
 				_, _, err := h.getOrCreateWithID(chunks.HeadSeriesRef(existingRef), existingLbls.Hash(), existingLbls, false)
 				require.NoError(t, err)
 			},
-			seriesRef: chunks.HeadSeriesRef(existingRef),
-			expected:  true,
+			expected: true,
 		},
 		{
-			name: "keep deleted series with keepUntil > last",
-			prepare: func(_ *testing.T, h *Head) {
-				h.setWALExpiry(chunks.HeadSeriesRef(existingRef), deletedKeepUntil)
-			},
-			seriesRef: chunks.HeadSeriesRef(existingRef),
-			last:      deletedKeepUntil - 1,
-			expected:  true,
+			name:     "keep series with keepUntil > mint",
+			mint:     keepUntil - 1,
+			expected: true,
 		},
 		{
-			name: "drop deleted series with keepUntil <= last",
-			prepare: func(_ *testing.T, h *Head) {
-				h.setWALExpiry(chunks.HeadSeriesRef(existingRef), deletedKeepUntil)
-			},
-			seriesRef: chunks.HeadSeriesRef(existingRef),
-			last:      deletedKeepUntil,
-			expected:  false,
+			name:     "keep series with keepUntil = mint",
+			mint:     keepUntil,
+			expected: true,
+		},
+		{
+			name:     "drop series with keepUntil < mint",
+			mint:     keepUntil + 1,
+			expected: false,
 		},
 	}
@@ -936,10 +1182,12 @@ func TestHead_KeepSeriesInWALCheckpoint(t *testing.T) {
 			if tc.prepare != nil {
 				tc.prepare(t, h)
+			} else {
+				h.updateWALExpiry(chunks.HeadSeriesRef(existingRef), keepUntil)
 			}
 
-			kept := h.keepSeriesInWALCheckpoint(tc.seriesRef, tc.last)
-			require.Equal(t, tc.expected, kept)
+			keep := h.keepSeriesInWALCheckpointFn(tc.mint)
+			require.Equal(t, tc.expected, keep(chunks.HeadSeriesRef(existingRef)))
 		})
 	}
 }


@@ -76,7 +76,7 @@ func counterAddNonZero(v *prometheus.CounterVec, value float64, lvs ...string) {
 	}
 }
 
-func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk, lastSegment int) (err error) {
+func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (err error) {
 	// Track number of missing series records that were referenced by other records.
 	unknownSeriesRefs := &seriesRefSet{refs: make(map[chunks.HeadSeriesRef]struct{}), mtx: sync.Mutex{}}
 	// Track number of different records that referenced a series we don't know about
@@ -266,8 +266,6 @@ Outer:
 			}
 			if !created {
 				multiRef[walSeries.Ref] = mSeries.ref
-				// Set the WAL expiry for the duplicate series, so it is kept in subsequent WAL checkpoints.
-				h.setWALExpiry(walSeries.Ref, lastSegment)
 			}
 
 			idx := uint64(mSeries.ref) % uint64(concurrency)
@@ -293,6 +291,8 @@ Outer:
 					continue // Before minValidTime: discard.
 				}
 				if r, ok := multiRef[sam.Ref]; ok {
+					// This is a sample for a duplicate series, so we need to keep the series record at least until this record's timestamp.
+					h.updateWALExpiry(sam.Ref, sam.T)
 					sam.Ref = r
 				}
 				mod := uint64(sam.Ref) % uint64(concurrency)
@@ -314,6 +314,8 @@ Outer:
 					continue
 				}
 				if r, ok := multiRef[chunks.HeadSeriesRef(s.Ref)]; ok {
+					// This is a tombstone for a duplicate series, so we need to keep the series record at least until this record's timestamp.
+					h.updateWALExpiry(chunks.HeadSeriesRef(s.Ref), itv.Maxt)
 					s.Ref = storage.SeriesRef(r)
 				}
 				if m := h.series.getByID(chunks.HeadSeriesRef(s.Ref)); m == nil {
@@ -331,6 +333,8 @@ Outer:
 					continue
 				}
 				if r, ok := multiRef[e.Ref]; ok {
+					// This is an exemplar for a duplicate series, so we need to keep the series record at least until this record's timestamp.
+					h.updateWALExpiry(e.Ref, e.T)
 					e.Ref = r
 				}
 				exemplarsInput <- e
@@ -355,6 +359,8 @@ Outer:
 					continue // Before minValidTime: discard.
 				}
 				if r, ok := multiRef[sam.Ref]; ok {
+					// This is a histogram sample for a duplicate series, so we need to keep the series record at least until this record's timestamp.
+					h.updateWALExpiry(sam.Ref, sam.T)
 					sam.Ref = r
 				}
 				mod := uint64(sam.Ref) % uint64(concurrency)
@@ -388,6 +394,8 @@ Outer:
 					continue // Before minValidTime: discard.
 				}
 				if r, ok := multiRef[sam.Ref]; ok {
+					// This is a float histogram sample for a duplicate series, so we need to keep the series record at least until this record's timestamp.
+					h.updateWALExpiry(sam.Ref, sam.T)
 					sam.Ref = r
 				}
 				mod := uint64(sam.Ref) % uint64(concurrency)


@@ -93,7 +93,7 @@ const CheckpointPrefix = "checkpoint."
 // segmented format as the original WAL itself.
 // This makes it easy to read it through the WAL package and concatenate
 // it with the original WAL.
-func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.HeadSeriesRef, last int) bool, mint int64) (*CheckpointStats, error) {
+func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.HeadSeriesRef) bool, mint int64) (*CheckpointStats, error) {
 	stats := &CheckpointStats{}
 	var sgmReader io.ReadCloser
@@ -181,7 +181,7 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He
 			// Drop irrelevant series in place.
 			repl := series[:0]
 			for _, s := range series {
-				if keep(s.Ref, to) {
+				if keep(s.Ref) {
 					repl = append(repl, s)
 				}
 			}
@@ -323,7 +323,7 @@ func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.He
 			// Only keep reference to the latest found metadata for each refID.
 			repl := 0
 			for _, m := range metadata {
-				if keep(m.Ref, to) {
+				if keep(m.Ref) {
 					if _, ok := latestMetadataMap[m.Ref]; !ok {
 						repl++
 					}


@@ -292,7 +292,7 @@ func TestCheckpoint(t *testing.T) {
 			}
 			require.NoError(t, w.Close())
 
-			stats, err := Checkpoint(promslog.NewNopLogger(), w, 100, 106, func(x chunks.HeadSeriesRef, _ int) bool {
+			stats, err := Checkpoint(promslog.NewNopLogger(), w, 100, 106, func(x chunks.HeadSeriesRef) bool {
 				return x%2 == 0
 			}, last/2)
 			require.NoError(t, err)


@@ -400,7 +400,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) {
 		}
 	}
 
-	Checkpoint(promslog.NewNopLogger(), w, 0, 1, func(chunks.HeadSeriesRef, int) bool { return true }, 0)
+	Checkpoint(promslog.NewNopLogger(), w, 0, 1, func(chunks.HeadSeriesRef) bool { return true }, 0)
 	w.Truncate(1)
 
 	// Write more records after checkpointing.
@@ -492,7 +492,7 @@ func TestReadCheckpoint(t *testing.T) {
 	}
 	_, err = w.NextSegmentSync()
 	require.NoError(t, err)
-	_, err = Checkpoint(promslog.NewNopLogger(), w, 30, 31, func(chunks.HeadSeriesRef, int) bool { return true }, 0)
+	_, err = Checkpoint(promslog.NewNopLogger(), w, 30, 31, func(chunks.HeadSeriesRef) bool { return true }, 0)
 	require.NoError(t, err)
 	require.NoError(t, w.Truncate(32))
@@ -655,7 +655,7 @@ func TestCheckpointSeriesReset(t *testing.T) {
 		return wt.checkNumSeries() == seriesCount
 	}, 10*time.Second, 1*time.Second)
 
-	_, err = Checkpoint(promslog.NewNopLogger(), w, 2, 4, func(chunks.HeadSeriesRef, int) bool { return true }, 0)
+	_, err = Checkpoint(promslog.NewNopLogger(), w, 2, 4, func(chunks.HeadSeriesRef) bool { return true }, 0)
 	require.NoError(t, err)
 	err = w.Truncate(5)