fix(tsdb): appender does not honor append order
Fixes: #15177 Instead of 3 separate lists, keep a single list of samples with series reference. To support different data types I did a union type where floats are handled separately from complex types, but complex types are kept in one place. Not optimized, in particular the WAL format is kept and there's a conversion from the union type to separate record types for the WAL/WBL. Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
This commit is contained in:
parent
f62d0e0385
commit
17b70cf123
|
@ -4838,11 +4838,6 @@ func TestMetadataAssertInMemoryData(t *testing.T) {
|
|||
require.Equal(t, *reopenDB.head.series.getByHash(s4.Hash(), s4).meta, m4)
|
||||
}
|
||||
|
||||
// TestMultipleEncodingsCommitOrder mainly serves to demonstrate when happens when committing a batch of samples for the
|
||||
// same series when there are multiple encodings. Commit() will process all float samples before histogram samples. This
|
||||
// means that if histograms are appended before floats, the histograms could be marked as OOO when they are committed.
|
||||
// While possible, this shouldn't happen very often - you need the same series to be ingested as both a float and a
|
||||
// histogram in a single write request.
|
||||
func TestMultipleEncodingsCommitOrder(t *testing.T) {
|
||||
opts := DefaultOptions()
|
||||
opts.OutOfOrderCapMax = 30
|
||||
|
@ -4916,26 +4911,18 @@ func TestMultipleEncodingsCommitOrder(t *testing.T) {
|
|||
s := addSample(app, int64(i), chunkenc.ValFloat)
|
||||
expSamples = append(expSamples, s)
|
||||
}
|
||||
// These samples will be marked as OOO as their timestamps are less than the max timestamp for float samples in the
|
||||
// same batch.
|
||||
for i := 110; i < 120; i++ {
|
||||
s := addSample(app, int64(i), chunkenc.ValHistogram)
|
||||
expSamples = append(expSamples, s)
|
||||
}
|
||||
// These samples will be marked as OOO as their timestamps are less than the max timestamp for float samples in the
|
||||
// same batch.
|
||||
for i := 120; i < 130; i++ {
|
||||
s := addSample(app, int64(i), chunkenc.ValFloatHistogram)
|
||||
expSamples = append(expSamples, s)
|
||||
}
|
||||
// These samples will be marked as in-order as their timestamps are greater than the max timestamp for float
|
||||
// samples in the same batch.
|
||||
for i := 140; i < 150; i++ {
|
||||
s := addSample(app, int64(i), chunkenc.ValFloatHistogram)
|
||||
expSamples = append(expSamples, s)
|
||||
}
|
||||
// These samples will be marked as in-order, even though they're appended after the float histograms from ts 140-150
|
||||
// because float samples are processed first and these samples are in-order wrt to the float samples in the batch.
|
||||
for i := 130; i < 135; i++ {
|
||||
s := addSample(app, int64(i), chunkenc.ValFloat)
|
||||
expSamples = append(expSamples, s)
|
||||
|
@ -4947,8 +4934,7 @@ func TestMultipleEncodingsCommitOrder(t *testing.T) {
|
|||
return expSamples[i].T() < expSamples[j].T()
|
||||
})
|
||||
|
||||
// oooCount = 20 because the histograms from 120 - 130 and float histograms from 120 - 130 are detected as OOO.
|
||||
verifySamples(100, 150, expSamples, 20)
|
||||
verifySamples(100, 150, expSamples, 5)
|
||||
|
||||
// Append and commit some in-order histograms by themselves.
|
||||
app = db.Appender(context.Background())
|
||||
|
@ -4958,8 +4944,8 @@ func TestMultipleEncodingsCommitOrder(t *testing.T) {
|
|||
}
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
// oooCount remains at 20 as no new OOO samples have been added.
|
||||
verifySamples(100, 160, expSamples, 20)
|
||||
// oooCount remains at 5 as no new OOO samples have been added.
|
||||
verifySamples(100, 160, expSamples, 5)
|
||||
|
||||
// Append and commit samples for all encoding types. This time all samples will be treated as OOO because samples
|
||||
// with newer timestamps have already been committed.
|
||||
|
@ -4987,8 +4973,8 @@ func TestMultipleEncodingsCommitOrder(t *testing.T) {
|
|||
return expSamples[i].T() < expSamples[j].T()
|
||||
})
|
||||
|
||||
// oooCount = 50 as we've added 30 more OOO samples.
|
||||
verifySamples(50, 160, expSamples, 50)
|
||||
// oooCount = 35 as we've added 30 more OOO samples.
|
||||
verifySamples(50, 160, expSamples, 35)
|
||||
}
|
||||
|
||||
// TODO(codesome): test more samples incoming once compaction has started. To verify new samples after the start
|
||||
|
|
26
tsdb/head.go
26
tsdb/head.go
|
@ -78,20 +78,18 @@ type Head struct {
|
|||
// This should be typecasted to chunks.ChunkDiskMapperRef after loading.
|
||||
minOOOMmapRef atomic.Uint64
|
||||
|
||||
metrics *headMetrics
|
||||
opts *HeadOptions
|
||||
wal, wbl *wlog.WL
|
||||
exemplarMetrics *ExemplarMetrics
|
||||
exemplars ExemplarStorage
|
||||
logger *slog.Logger
|
||||
appendPool zeropool.Pool[[]record.RefSample]
|
||||
exemplarsPool zeropool.Pool[[]exemplarWithSeriesRef]
|
||||
histogramsPool zeropool.Pool[[]record.RefHistogramSample]
|
||||
floatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample]
|
||||
metadataPool zeropool.Pool[[]record.RefMetadata]
|
||||
seriesPool zeropool.Pool[[]*memSeries]
|
||||
bytesPool zeropool.Pool[[]byte]
|
||||
memChunkPool sync.Pool
|
||||
metrics *headMetrics
|
||||
opts *HeadOptions
|
||||
wal, wbl *wlog.WL
|
||||
exemplarMetrics *ExemplarMetrics
|
||||
exemplars ExemplarStorage
|
||||
logger *slog.Logger
|
||||
appendPool zeropool.Pool[[]refUnionSample]
|
||||
exemplarsPool zeropool.Pool[[]exemplarWithSeriesRef]
|
||||
metadataPool zeropool.Pool[[]record.RefMetadata]
|
||||
seriesPool zeropool.Pool[[]*memSeries]
|
||||
bytesPool zeropool.Pool[[]byte]
|
||||
memChunkPool sync.Pool
|
||||
|
||||
// These pools are only used during WAL/WBL replay and are reset at the end.
|
||||
// NOTE: Adjust resetWLReplayResources() upon changes to the pools.
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -5083,8 +5083,6 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
|
|||
samples []chunks.Sample
|
||||
expChunks int
|
||||
err error
|
||||
// If this is empty, samples above will be taken instead of this.
|
||||
addToExp []chunks.Sample
|
||||
}{
|
||||
// Histograms that end up in the expected samples are copied here so that we
|
||||
// can independently set the CounterResetHint later.
|
||||
|
@ -5125,42 +5123,30 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
|
|||
err: storage.ErrOutOfOrderSample,
|
||||
},
|
||||
{
|
||||
// Combination of histograms and float64 in the same commit. The behaviour is undefined, but we want to also
|
||||
// verify how TSDB would behave. Here the histogram is appended at the end, hence will be considered as out of order.
|
||||
// Combination of histograms and float64 in the same commit.
|
||||
samples: []chunks.Sample{
|
||||
sample{t: 400, f: 4},
|
||||
sample{t: 500, h: hists[5]}, // This won't be committed.
|
||||
sample{t: 500, h: hists[5]},
|
||||
sample{t: 600, f: 6},
|
||||
},
|
||||
addToExp: []chunks.Sample{
|
||||
sample{t: 400, f: 4},
|
||||
sample{t: 600, f: 6},
|
||||
},
|
||||
expChunks: 7, // Only 1 new chunk for float64.
|
||||
expChunks: 9,
|
||||
},
|
||||
{
|
||||
// Here the histogram is appended at the end, hence the first histogram is out of order.
|
||||
// Combination of histograms and float64 in the same commit.
|
||||
samples: []chunks.Sample{
|
||||
sample{t: 700, h: hists[7]}, // Out of order w.r.t. the next float64 sample that is appended first.
|
||||
sample{t: 700, h: hists[7]},
|
||||
sample{t: 800, f: 8},
|
||||
sample{t: 900, h: hists[9]},
|
||||
},
|
||||
addToExp: []chunks.Sample{
|
||||
sample{t: 800, f: 8},
|
||||
sample{t: 900, h: hists[9].Copy()},
|
||||
},
|
||||
expChunks: 8, // float64 added to old chunk, only 1 new for histograms.
|
||||
expChunks: 12,
|
||||
},
|
||||
{
|
||||
// Float histogram is appended at the end.
|
||||
// Combination of different histograms in the same commit.
|
||||
samples: []chunks.Sample{
|
||||
sample{t: 1000, fh: floatHists[7]}, // Out of order w.r.t. the next histogram.
|
||||
sample{t: 1000, fh: floatHists[7]},
|
||||
sample{t: 1100, h: hists[9]},
|
||||
},
|
||||
addToExp: []chunks.Sample{
|
||||
sample{t: 1100, h: hists[9].Copy()},
|
||||
},
|
||||
expChunks: 8,
|
||||
expChunks: 14,
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -5178,11 +5164,7 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
|
|||
|
||||
if a.err == nil {
|
||||
require.NoError(t, app.Commit())
|
||||
if len(a.addToExp) > 0 {
|
||||
expResult = append(expResult, a.addToExp...)
|
||||
} else {
|
||||
expResult = append(expResult, a.samples...)
|
||||
}
|
||||
expResult = append(expResult, a.samples...)
|
||||
checkExpChunks(a.expChunks)
|
||||
} else {
|
||||
require.NoError(t, app.Rollback())
|
||||
|
|
Loading…
Reference in New Issue