tsdb: Fix commit order for mixed-typed series
Fixes https://github.com/prometheus/prometheus/issues/15177 The basic idea here is to divide the samples to be commited into (sub) batches whenever we detect that the same series receives a sample of a type different from the previous one. We then commit those batches one after another, and we log them to the WAL one after another, so that we hit both birds with the same stone. The cost of the stone is that we have to track the sample type of each series in a map. Given the amount of things we already track in the appender, I hope that it won't make a dent. Note that this even addresses the NHCB special case in the WAL. This does a few other things that I could not resist to pick up on the go: - It adds more zeropool.Pools and uses the existing ones more consistently. My understanding is that this was merely an oversight. Maybe the additional pool usage will compensate for the increased memory demand of the map. - Create the synthetic zero sample for histograms a bit more carefully. So far, we created a sample that always went into its own chunk. Now we create a sample that is compatible enough with the following sample to go into the same chunk. This changed the test results quite a bit. But IMHO it makes much more sense now. - Continuing past efforts, I changed more namings of `Samples` into `Floats` to keep things consistent and less confusing. (Histogram samples are also samples.) I still avoided changing names in other packages. - I added a few shortcuts `h := a.head`, saving many characters. TODOs: - Address @krajorama's TODOs about commit order and staleness handling. Signed-off-by: beorn7 <beorn@grafana.com>
This commit is contained in:
parent
46cfc9fb99
commit
7e82bdb75b
|
@ -4924,10 +4924,7 @@ func TestMetadataAssertInMemoryData(t *testing.T) {
|
|||
}
|
||||
|
||||
// TestMultipleEncodingsCommitOrder mainly serves to demonstrate when happens when committing a batch of samples for the
|
||||
// same series when there are multiple encodings. Commit() will process all float samples before histogram samples. This
|
||||
// means that if histograms are appended before floats, the histograms could be marked as OOO when they are committed.
|
||||
// While possible, this shouldn't happen very often - you need the same series to be ingested as both a float and a
|
||||
// histogram in a single write request.
|
||||
// same series when there are multiple encodings. With issue #15177 fixed, this now all works as expected.
|
||||
func TestMultipleEncodingsCommitOrder(t *testing.T) {
|
||||
opts := DefaultOptions()
|
||||
opts.OutOfOrderCapMax = 30
|
||||
|
@ -5001,26 +4998,19 @@ func TestMultipleEncodingsCommitOrder(t *testing.T) {
|
|||
s := addSample(app, int64(i), chunkenc.ValFloat)
|
||||
expSamples = append(expSamples, s)
|
||||
}
|
||||
// These samples will be marked as OOO as their timestamps are less than the max timestamp for float samples in the
|
||||
// same batch.
|
||||
for i := 110; i < 120; i++ {
|
||||
s := addSample(app, int64(i), chunkenc.ValHistogram)
|
||||
expSamples = append(expSamples, s)
|
||||
}
|
||||
// These samples will be marked as OOO as their timestamps are less than the max timestamp for float samples in the
|
||||
// same batch.
|
||||
for i := 120; i < 130; i++ {
|
||||
s := addSample(app, int64(i), chunkenc.ValFloatHistogram)
|
||||
expSamples = append(expSamples, s)
|
||||
}
|
||||
// These samples will be marked as in-order as their timestamps are greater than the max timestamp for float
|
||||
// samples in the same batch.
|
||||
for i := 140; i < 150; i++ {
|
||||
s := addSample(app, int64(i), chunkenc.ValFloatHistogram)
|
||||
expSamples = append(expSamples, s)
|
||||
}
|
||||
// These samples will be marked as in-order, even though they're appended after the float histograms from ts 140-150
|
||||
// because float samples are processed first and these samples are in-order wrt to the float samples in the batch.
|
||||
// These samples will be marked as out-of-order.
|
||||
for i := 130; i < 135; i++ {
|
||||
s := addSample(app, int64(i), chunkenc.ValFloat)
|
||||
expSamples = append(expSamples, s)
|
||||
|
@ -5032,8 +5022,8 @@ func TestMultipleEncodingsCommitOrder(t *testing.T) {
|
|||
return expSamples[i].T() < expSamples[j].T()
|
||||
})
|
||||
|
||||
// oooCount = 20 because the histograms from 120 - 130 and float histograms from 120 - 130 are detected as OOO.
|
||||
verifySamples(100, 150, expSamples, 20)
|
||||
// oooCount = 5 for the samples 130 to 134.
|
||||
verifySamples(100, 150, expSamples, 5)
|
||||
|
||||
// Append and commit some in-order histograms by themselves.
|
||||
app = db.Appender(context.Background())
|
||||
|
@ -5043,8 +5033,8 @@ func TestMultipleEncodingsCommitOrder(t *testing.T) {
|
|||
}
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
// oooCount remains at 20 as no new OOO samples have been added.
|
||||
verifySamples(100, 160, expSamples, 20)
|
||||
// oooCount remains at 5.
|
||||
verifySamples(100, 160, expSamples, 5)
|
||||
|
||||
// Append and commit samples for all encoding types. This time all samples will be treated as OOO because samples
|
||||
// with newer timestamps have already been committed.
|
||||
|
@ -5072,8 +5062,8 @@ func TestMultipleEncodingsCommitOrder(t *testing.T) {
|
|||
return expSamples[i].T() < expSamples[j].T()
|
||||
})
|
||||
|
||||
// oooCount = 50 as we've added 30 more OOO samples.
|
||||
verifySamples(50, 160, expSamples, 50)
|
||||
// oooCount = 35 as we've added 30 more OOO samples.
|
||||
verifySamples(50, 160, expSamples, 35)
|
||||
}
|
||||
|
||||
// TODO(codesome): test more samples incoming once compaction has started. To verify new samples after the start
|
||||
|
|
|
@ -86,7 +86,8 @@ type Head struct {
|
|||
exemplarMetrics *ExemplarMetrics
|
||||
exemplars ExemplarStorage
|
||||
logger *slog.Logger
|
||||
appendPool zeropool.Pool[[]record.RefSample]
|
||||
refSeriesPool zeropool.Pool[[]record.RefSeries]
|
||||
floatsPool zeropool.Pool[[]record.RefSample]
|
||||
exemplarsPool zeropool.Pool[[]exemplarWithSeriesRef]
|
||||
histogramsPool zeropool.Pool[[]record.RefHistogramSample]
|
||||
floatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample]
|
||||
|
|
|
@ -164,13 +164,6 @@ func (h *Head) Appender(context.Context) storage.Appender {
|
|||
func (h *Head) appender() *headAppender {
|
||||
minValidTime := h.appendableMinValidTime()
|
||||
appendID, cleanupAppendIDsBelow := h.iso.newAppendID(minValidTime) // Every appender gets an ID that is cleared upon commit/rollback.
|
||||
|
||||
// Allocate the exemplars buffer only if exemplars are enabled.
|
||||
var exemplarsBuf []exemplarWithSeriesRef
|
||||
if h.opts.EnableExemplarStorage {
|
||||
exemplarsBuf = h.getExemplarBuffer()
|
||||
}
|
||||
|
||||
return &headAppender{
|
||||
head: h,
|
||||
minValidTime: minValidTime,
|
||||
|
@ -178,12 +171,9 @@ func (h *Head) appender() *headAppender {
|
|||
maxt: math.MinInt64,
|
||||
headMaxt: h.MaxTime(),
|
||||
oooTimeWindow: h.opts.OutOfOrderTimeWindow.Load(),
|
||||
samples: h.getAppendBuffer(),
|
||||
sampleSeries: h.getSeriesBuffer(),
|
||||
exemplars: exemplarsBuf,
|
||||
histograms: h.getHistogramBuffer(),
|
||||
floatHistograms: h.getFloatHistogramBuffer(),
|
||||
metadata: h.getMetadataBuffer(),
|
||||
seriesRefs: h.getRefSeriesBuffer(),
|
||||
series: h.getSeriesBuffer(),
|
||||
typesInBatch: map[chunks.HeadSeriesRef]sampleType{},
|
||||
appendID: appendID,
|
||||
cleanupAppendIDsBelow: cleanupAppendIDsBelow,
|
||||
}
|
||||
|
@ -213,16 +203,28 @@ func (h *Head) AppendableMinValidTime() (int64, bool) {
|
|||
return h.appendableMinValidTime(), true
|
||||
}
|
||||
|
||||
func (h *Head) getAppendBuffer() []record.RefSample {
|
||||
b := h.appendPool.Get()
|
||||
func (h *Head) getRefSeriesBuffer() []record.RefSeries {
|
||||
b := h.refSeriesPool.Get()
|
||||
if b == nil {
|
||||
return make([]record.RefSeries, 0, 512)
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
func (h *Head) putRefSeriesBuffer(b []record.RefSeries) {
|
||||
h.refSeriesPool.Put(b[:0])
|
||||
}
|
||||
|
||||
func (h *Head) getFloatBuffer() []record.RefSample {
|
||||
b := h.floatsPool.Get()
|
||||
if b == nil {
|
||||
return make([]record.RefSample, 0, 512)
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
func (h *Head) putAppendBuffer(b []record.RefSample) {
|
||||
h.appendPool.Put(b[:0])
|
||||
func (h *Head) putFloatBuffer(b []record.RefSample) {
|
||||
h.floatsPool.Put(b[:0])
|
||||
}
|
||||
|
||||
func (h *Head) getExemplarBuffer() []exemplarWithSeriesRef {
|
||||
|
@ -312,17 +314,30 @@ type exemplarWithSeriesRef struct {
|
|||
exemplar exemplar.Exemplar
|
||||
}
|
||||
|
||||
type headAppender struct {
|
||||
head *Head
|
||||
minValidTime int64 // No samples below this timestamp are allowed.
|
||||
mint, maxt int64
|
||||
headMaxt int64 // We track it here to not take the lock for every sample appended.
|
||||
oooTimeWindow int64 // Use the same for the entire append, and don't load the atomic for each sample.
|
||||
// sampleType describes sample types we need to distinguish for append batching.
|
||||
// We need separate types for everything that goes into a different WAL record
|
||||
// type or into a different chunk encoding.
|
||||
type sampleType byte
|
||||
|
||||
seriesRefs []record.RefSeries // New series records held by this appender.
|
||||
series []*memSeries // New series held by this appender (using corresponding slices indexes from seriesRefs)
|
||||
samples []record.RefSample // New float samples held by this appender.
|
||||
sampleSeries []*memSeries // Float series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once).
|
||||
const (
|
||||
stNone sampleType = iota // To mark that the sample type does not matter.
|
||||
stFloat // All simple floats (counters, gauges, untyped). Goes to `floats`.
|
||||
stHistogram // Native integer histograms with a standard exponential schema. Goes to `histograms`.
|
||||
stCustomBucketHistogram // Native integer histograms with custom bucket boundaries. Goes to `histograms`.
|
||||
stFloatHistogram // Native float histograms. Goes to `floatHistograms`.
|
||||
stCustomBucketFloatHistogram // Native float histograms with custom bucket boundaries. Goes to `floatHistograms`.
|
||||
)
|
||||
|
||||
// appendBatch is used to partition all the appended data into batches that are
|
||||
// "type clean", i.e. every series receives only samples of one type within the
|
||||
// batch. Types in this regard are defined by the sampleType enum above.
|
||||
// TODO(beorn7): The same concept could be extended to make sure every series in
|
||||
// the batch has at most one metadata record. This is currently not implemented
|
||||
// because it is unclear if it is needed at all. (Maybe we will remove metadata
|
||||
// records altogether, see issue #15911.)
|
||||
type appendBatch struct {
|
||||
floats []record.RefSample // New float samples held by this appender.
|
||||
floatSeries []*memSeries // Float series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once).
|
||||
histograms []record.RefHistogramSample // New histogram samples held by this appender.
|
||||
histogramSeries []*memSeries // HistogramSamples series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once).
|
||||
floatHistograms []record.RefFloatHistogramSample // New float histogram samples held by this appender.
|
||||
|
@ -330,6 +345,42 @@ type headAppender struct {
|
|||
metadata []record.RefMetadata // New metadata held by this appender.
|
||||
metadataSeries []*memSeries // Series corresponding to the metadata held by this appender.
|
||||
exemplars []exemplarWithSeriesRef // New exemplars held by this appender.
|
||||
}
|
||||
|
||||
// close returns all the slices to the pools in Head and nil's them.
|
||||
func (b *appendBatch) close(h *Head) {
|
||||
h.putFloatBuffer(b.floats)
|
||||
b.floats = nil
|
||||
h.putSeriesBuffer(b.floatSeries)
|
||||
b.floatSeries = nil
|
||||
h.putHistogramBuffer(b.histograms)
|
||||
b.histograms = nil
|
||||
h.putSeriesBuffer(b.histogramSeries)
|
||||
b.histogramSeries = nil
|
||||
h.putFloatHistogramBuffer(b.floatHistograms)
|
||||
b.floatHistograms = nil
|
||||
h.putSeriesBuffer(b.floatHistogramSeries)
|
||||
b.floatHistogramSeries = nil
|
||||
h.putMetadataBuffer(b.metadata)
|
||||
b.metadata = nil
|
||||
h.putSeriesBuffer(b.metadataSeries)
|
||||
b.metadataSeries = nil
|
||||
h.putExemplarBuffer(b.exemplars)
|
||||
b.exemplars = nil
|
||||
}
|
||||
|
||||
type headAppender struct {
|
||||
head *Head
|
||||
minValidTime int64 // No samples below this timestamp are allowed.
|
||||
mint, maxt int64
|
||||
headMaxt int64 // We track it here to not take the lock for every sample appended.
|
||||
oooTimeWindow int64 // Use the same for the entire append, and don't load the atomic for each sample.
|
||||
|
||||
seriesRefs []record.RefSeries // New series records held by this appender.
|
||||
series []*memSeries // New series held by this appender (using corresponding slices indexes from seriesRefs)
|
||||
batches []*appendBatch // Holds all the other data to append. (In regular cases, there should be only one of these.)
|
||||
|
||||
typesInBatch map[chunks.HeadSeriesRef]sampleType // Which (one) sample type each series holds in the most recent batch.
|
||||
|
||||
appendID, cleanupAppendIDsBelow uint64
|
||||
closed bool
|
||||
|
@ -403,12 +454,13 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64
|
|||
a.maxt = t
|
||||
}
|
||||
|
||||
a.samples = append(a.samples, record.RefSample{
|
||||
b := a.getCurrentBatch(stFloat, s.ref)
|
||||
b.floats = append(b.floats, record.RefSample{
|
||||
Ref: s.ref,
|
||||
T: t,
|
||||
V: v,
|
||||
})
|
||||
a.sampleSeries = append(a.sampleSeries, s)
|
||||
b.floatSeries = append(b.floatSeries, s)
|
||||
return storage.SeriesRef(s.ref), nil
|
||||
}
|
||||
|
||||
|
@ -448,8 +500,9 @@ func (a *headAppender) AppendCTZeroSample(ref storage.SeriesRef, lset labels.Lab
|
|||
if ct > a.maxt {
|
||||
a.maxt = ct
|
||||
}
|
||||
a.samples = append(a.samples, record.RefSample{Ref: s.ref, T: ct, V: 0.0})
|
||||
a.sampleSeries = append(a.sampleSeries, s)
|
||||
b := a.getCurrentBatch(stFloat, s.ref)
|
||||
b.floats = append(b.floats, record.RefSample{Ref: s.ref, T: ct, V: 0.0})
|
||||
b.floatSeries = append(b.floatSeries, s)
|
||||
return storage.SeriesRef(s.ref), nil
|
||||
}
|
||||
|
||||
|
@ -476,6 +529,65 @@ func (a *headAppender) getOrCreate(lset labels.Labels) (s *memSeries, created bo
|
|||
return s, created, nil
|
||||
}
|
||||
|
||||
// getCurrentBatch returns the current batch if it fits the provided sampleType
|
||||
// for the provided series. Otherwise, it adds a new batch and returns it.
|
||||
func (a *headAppender) getCurrentBatch(st sampleType, s chunks.HeadSeriesRef) *appendBatch {
|
||||
h := a.head
|
||||
|
||||
newBatch := func() *appendBatch {
|
||||
b := appendBatch{
|
||||
floats: h.getFloatBuffer(),
|
||||
floatSeries: h.getSeriesBuffer(),
|
||||
histograms: h.getHistogramBuffer(),
|
||||
histogramSeries: h.getSeriesBuffer(),
|
||||
floatHistograms: h.getFloatHistogramBuffer(),
|
||||
floatHistogramSeries: h.getSeriesBuffer(),
|
||||
metadata: h.getMetadataBuffer(),
|
||||
metadataSeries: h.getSeriesBuffer(),
|
||||
}
|
||||
|
||||
// Allocate the exemplars buffer only if exemplars are enabled.
|
||||
if h.opts.EnableExemplarStorage {
|
||||
b.exemplars = h.getExemplarBuffer()
|
||||
}
|
||||
clear(a.typesInBatch)
|
||||
if st != stNone {
|
||||
a.typesInBatch[s] = st
|
||||
}
|
||||
a.batches = append(a.batches, &b)
|
||||
return &b
|
||||
}
|
||||
|
||||
// First batch ever. Create it.
|
||||
if len(a.batches) == 0 {
|
||||
return newBatch()
|
||||
}
|
||||
|
||||
// TODO(beorn7): If we ever see that the a.typesInBatch map grows so
|
||||
// large that it matters for total memory consumption, we could limit
|
||||
// the batch size here, i.e. cut a new batch even without a type change.
|
||||
// Something like:
|
||||
// if len(a.typesInBatch > limit) {
|
||||
// return newBatch()
|
||||
// }
|
||||
|
||||
lastBatch := a.batches[len(a.batches)-1]
|
||||
if st == stNone {
|
||||
// Type doesn't matter, last batch will always do.
|
||||
return lastBatch
|
||||
}
|
||||
prevST, ok := a.typesInBatch[s]
|
||||
switch {
|
||||
case !ok: // New series. Add it to map and return current batch.
|
||||
a.typesInBatch[s] = st
|
||||
return lastBatch
|
||||
case prevST == st: // Old series, same type. Just return batch.
|
||||
return lastBatch
|
||||
}
|
||||
// An old series got a new type. Start new batch.
|
||||
return newBatch()
|
||||
}
|
||||
|
||||
// appendable checks whether the given sample is valid for appending to the series.
|
||||
// If the sample is valid and in-order, it returns false with no error.
|
||||
// If the sample belongs to the out-of-order chunk, it returns true with no error.
|
||||
|
@ -638,7 +750,8 @@ func (a *headAppender) AppendExemplar(ref storage.SeriesRef, lset labels.Labels,
|
|||
return 0, err
|
||||
}
|
||||
|
||||
a.exemplars = append(a.exemplars, exemplarWithSeriesRef{ref, e})
|
||||
b := a.getCurrentBatch(stNone, chunks.HeadSeriesRef(ref))
|
||||
b.exemplars = append(b.exemplars, exemplarWithSeriesRef{ref, e})
|
||||
|
||||
return storage.SeriesRef(s.ref), nil
|
||||
}
|
||||
|
@ -707,12 +820,17 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels
|
|||
}
|
||||
return 0, err
|
||||
}
|
||||
a.histograms = append(a.histograms, record.RefHistogramSample{
|
||||
st := stHistogram
|
||||
if h.UsesCustomBuckets() {
|
||||
st = stCustomBucketHistogram
|
||||
}
|
||||
b := a.getCurrentBatch(st, s.ref)
|
||||
b.histograms = append(b.histograms, record.RefHistogramSample{
|
||||
Ref: s.ref,
|
||||
T: t,
|
||||
H: h,
|
||||
})
|
||||
a.histogramSeries = append(a.histogramSeries, s)
|
||||
b.histogramSeries = append(b.histogramSeries, s)
|
||||
case fh != nil:
|
||||
s.Lock()
|
||||
|
||||
|
@ -742,12 +860,17 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels
|
|||
}
|
||||
return 0, err
|
||||
}
|
||||
a.floatHistograms = append(a.floatHistograms, record.RefFloatHistogramSample{
|
||||
st := stFloatHistogram
|
||||
if fh.UsesCustomBuckets() {
|
||||
st = stCustomBucketFloatHistogram
|
||||
}
|
||||
b := a.getCurrentBatch(st, s.ref)
|
||||
b.floatHistograms = append(b.floatHistograms, record.RefFloatHistogramSample{
|
||||
Ref: s.ref,
|
||||
T: t,
|
||||
FH: fh,
|
||||
})
|
||||
a.floatHistogramSeries = append(a.floatHistogramSeries, s)
|
||||
b.floatHistogramSeries = append(b.floatHistogramSeries, s)
|
||||
}
|
||||
|
||||
if t < a.mint {
|
||||
|
@ -784,6 +907,10 @@ func (a *headAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, lset l
|
|||
zeroHistogram := &histogram.Histogram{
|
||||
// The CTZeroSample represents a counter reset by definition.
|
||||
CounterResetHint: histogram.CounterReset,
|
||||
// Replicate other fields to avoid needless chunk creation.
|
||||
Schema: h.Schema,
|
||||
ZeroThreshold: h.ZeroThreshold,
|
||||
CustomValues: h.CustomValues,
|
||||
}
|
||||
s.Lock()
|
||||
|
||||
|
@ -815,16 +942,25 @@ func (a *headAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, lset l
|
|||
|
||||
s.pendingCommit = true
|
||||
s.Unlock()
|
||||
a.histograms = append(a.histograms, record.RefHistogramSample{
|
||||
st := stHistogram
|
||||
if h.UsesCustomBuckets() {
|
||||
st = stCustomBucketHistogram
|
||||
}
|
||||
b := a.getCurrentBatch(st, s.ref)
|
||||
b.histograms = append(b.histograms, record.RefHistogramSample{
|
||||
Ref: s.ref,
|
||||
T: ct,
|
||||
H: zeroHistogram,
|
||||
})
|
||||
a.histogramSeries = append(a.histogramSeries, s)
|
||||
b.histogramSeries = append(b.histogramSeries, s)
|
||||
case fh != nil:
|
||||
zeroFloatHistogram := &histogram.FloatHistogram{
|
||||
// The CTZeroSample represents a counter reset by definition.
|
||||
CounterResetHint: histogram.CounterReset,
|
||||
// Replicate other fields to avoid needless chunk creation.
|
||||
Schema: fh.Schema,
|
||||
ZeroThreshold: fh.ZeroThreshold,
|
||||
CustomValues: fh.CustomValues,
|
||||
}
|
||||
s.Lock()
|
||||
|
||||
|
@ -855,12 +991,17 @@ func (a *headAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, lset l
|
|||
|
||||
s.pendingCommit = true
|
||||
s.Unlock()
|
||||
a.floatHistograms = append(a.floatHistograms, record.RefFloatHistogramSample{
|
||||
st := stFloatHistogram
|
||||
if fh.UsesCustomBuckets() {
|
||||
st = stCustomBucketFloatHistogram
|
||||
}
|
||||
b := a.getCurrentBatch(st, s.ref)
|
||||
b.floatHistograms = append(b.floatHistograms, record.RefFloatHistogramSample{
|
||||
Ref: s.ref,
|
||||
T: ct,
|
||||
FH: zeroFloatHistogram,
|
||||
})
|
||||
a.floatHistogramSeries = append(a.floatHistogramSeries, s)
|
||||
b.floatHistogramSeries = append(b.floatHistogramSeries, s)
|
||||
}
|
||||
|
||||
if ct > a.maxt {
|
||||
|
@ -889,13 +1030,14 @@ func (a *headAppender) UpdateMetadata(ref storage.SeriesRef, lset labels.Labels,
|
|||
s.Unlock()
|
||||
|
||||
if hasNewMetadata {
|
||||
a.metadata = append(a.metadata, record.RefMetadata{
|
||||
b := a.getCurrentBatch(stNone, s.ref)
|
||||
b.metadata = append(b.metadata, record.RefMetadata{
|
||||
Ref: s.ref,
|
||||
Type: record.GetMetricType(meta.Type),
|
||||
Unit: meta.Unit,
|
||||
Help: meta.Help,
|
||||
})
|
||||
a.metadataSeries = append(a.metadataSeries, s)
|
||||
b.metadataSeries = append(b.metadataSeries, s)
|
||||
}
|
||||
|
||||
return ref, nil
|
||||
|
@ -932,66 +1074,68 @@ func (a *headAppender) log() error {
|
|||
return fmt.Errorf("log series: %w", err)
|
||||
}
|
||||
}
|
||||
if len(a.metadata) > 0 {
|
||||
rec = enc.Metadata(a.metadata, buf)
|
||||
buf = rec[:0]
|
||||
for _, b := range a.batches {
|
||||
if len(b.metadata) > 0 {
|
||||
rec = enc.Metadata(b.metadata, buf)
|
||||
buf = rec[:0]
|
||||
|
||||
if err := a.head.wal.Log(rec); err != nil {
|
||||
return fmt.Errorf("log metadata: %w", err)
|
||||
}
|
||||
}
|
||||
if len(a.samples) > 0 {
|
||||
rec = enc.Samples(a.samples, buf)
|
||||
buf = rec[:0]
|
||||
|
||||
if err := a.head.wal.Log(rec); err != nil {
|
||||
return fmt.Errorf("log samples: %w", err)
|
||||
}
|
||||
}
|
||||
if len(a.histograms) > 0 {
|
||||
var customBucketsHistograms []record.RefHistogramSample
|
||||
rec, customBucketsHistograms = enc.HistogramSamples(a.histograms, buf)
|
||||
buf = rec[:0]
|
||||
if len(rec) > 0 {
|
||||
if err := a.head.wal.Log(rec); err != nil {
|
||||
return fmt.Errorf("log histograms: %w", err)
|
||||
return fmt.Errorf("log metadata: %w", err)
|
||||
}
|
||||
}
|
||||
if len(b.floats) > 0 {
|
||||
rec = enc.Samples(b.floats, buf)
|
||||
buf = rec[:0]
|
||||
|
||||
if len(customBucketsHistograms) > 0 {
|
||||
rec = enc.CustomBucketsHistogramSamples(customBucketsHistograms, buf)
|
||||
if err := a.head.wal.Log(rec); err != nil {
|
||||
return fmt.Errorf("log custom buckets histograms: %w", err)
|
||||
return fmt.Errorf("log samples: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(a.floatHistograms) > 0 {
|
||||
var customBucketsFloatHistograms []record.RefFloatHistogramSample
|
||||
rec, customBucketsFloatHistograms = enc.FloatHistogramSamples(a.floatHistograms, buf)
|
||||
buf = rec[:0]
|
||||
if len(rec) > 0 {
|
||||
if err := a.head.wal.Log(rec); err != nil {
|
||||
return fmt.Errorf("log float histograms: %w", err)
|
||||
if len(b.histograms) > 0 {
|
||||
var customBucketsHistograms []record.RefHistogramSample
|
||||
rec, customBucketsHistograms = enc.HistogramSamples(b.histograms, buf)
|
||||
buf = rec[:0]
|
||||
if len(rec) > 0 {
|
||||
if err := a.head.wal.Log(rec); err != nil {
|
||||
return fmt.Errorf("log histograms: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if len(customBucketsHistograms) > 0 {
|
||||
rec = enc.CustomBucketsHistogramSamples(customBucketsHistograms, buf)
|
||||
if err := a.head.wal.Log(rec); err != nil {
|
||||
return fmt.Errorf("log custom buckets histograms: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(b.floatHistograms) > 0 {
|
||||
var customBucketsFloatHistograms []record.RefFloatHistogramSample
|
||||
rec, customBucketsFloatHistograms = enc.FloatHistogramSamples(b.floatHistograms, buf)
|
||||
buf = rec[:0]
|
||||
if len(rec) > 0 {
|
||||
if err := a.head.wal.Log(rec); err != nil {
|
||||
return fmt.Errorf("log float histograms: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if len(customBucketsFloatHistograms) > 0 {
|
||||
rec = enc.CustomBucketsFloatHistogramSamples(customBucketsFloatHistograms, buf)
|
||||
if err := a.head.wal.Log(rec); err != nil {
|
||||
return fmt.Errorf("log custom buckets float histograms: %w", err)
|
||||
if len(customBucketsFloatHistograms) > 0 {
|
||||
rec = enc.CustomBucketsFloatHistogramSamples(customBucketsFloatHistograms, buf)
|
||||
if err := a.head.wal.Log(rec); err != nil {
|
||||
return fmt.Errorf("log custom buckets float histograms: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Exemplars should be logged after samples (float/native histogram/etc),
|
||||
// otherwise it might happen that we send the exemplars in a remote write
|
||||
// batch before the samples, which in turn means the exemplar is rejected
|
||||
// for missing series, since series are created due to samples.
|
||||
if len(a.exemplars) > 0 {
|
||||
rec = enc.Exemplars(exemplarsForEncoding(a.exemplars), buf)
|
||||
buf = rec[:0]
|
||||
// Exemplars should be logged after samples (float/native histogram/etc),
|
||||
// otherwise it might happen that we send the exemplars in a remote write
|
||||
// batch before the samples, which in turn means the exemplar is rejected
|
||||
// for missing series, since series are created due to samples.
|
||||
if len(b.exemplars) > 0 {
|
||||
rec = enc.Exemplars(exemplarsForEncoding(b.exemplars), buf)
|
||||
buf = rec[:0]
|
||||
|
||||
if err := a.head.wal.Log(rec); err != nil {
|
||||
return fmt.Errorf("log exemplars: %w", err)
|
||||
if err := a.head.wal.Log(rec); err != nil {
|
||||
return fmt.Errorf("log exemplars: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
@ -1040,10 +1184,10 @@ type appenderCommitContext struct {
|
|||
enc record.Encoder
|
||||
}
|
||||
|
||||
// commitExemplars adds all exemplars from headAppender to the head's exemplar storage.
|
||||
func (a *headAppender) commitExemplars() {
|
||||
// commitExemplars adds all exemplars from the provided batch to the head's exemplar storage.
|
||||
func (a *headAppender) commitExemplars(b *appendBatch) {
|
||||
// No errors logging to WAL, so pass the exemplars along to the in memory storage.
|
||||
for _, e := range a.exemplars {
|
||||
for _, e := range b.exemplars {
|
||||
s := a.head.series.getByID(chunks.HeadSeriesRef(e.ref))
|
||||
if s == nil {
|
||||
// This is very unlikely to happen, but we have seen it in the wild.
|
||||
|
@ -1147,9 +1291,9 @@ func handleAppendableError(err error, appended, oooRejected, oobRejected, tooOld
|
|||
}
|
||||
}
|
||||
|
||||
// commitSamples processes and commits the samples in the headAppender to the series.
|
||||
// It handles both in-order and out-of-order samples, updating the appenderCommitContext
|
||||
// with the results of the append operations.
|
||||
// commitFloats processes and commits the samples in the provided batch to the
|
||||
// series. It handles both in-order and out-of-order samples, updating the
|
||||
// appenderCommitContext with the results of the append operations.
|
||||
//
|
||||
// The function iterates over the samples in the headAppender and attempts to append each sample
|
||||
// to its corresponding series. It handles various error cases such as out-of-order samples,
|
||||
|
@ -1166,12 +1310,12 @@ func handleAppendableError(err error, appended, oooRejected, oobRejected, tooOld
|
|||
// operations on the series after appending the samples.
|
||||
//
|
||||
// There are also specific functions to commit histograms and float histograms.
|
||||
func (a *headAppender) commitSamples(acc *appenderCommitContext) {
|
||||
func (a *headAppender) commitFloats(b *appendBatch, acc *appenderCommitContext) {
|
||||
var ok, chunkCreated bool
|
||||
var series *memSeries
|
||||
|
||||
for i, s := range a.samples {
|
||||
series = a.sampleSeries[i]
|
||||
for i, s := range b.floats {
|
||||
series = b.floatSeries[i]
|
||||
series.Lock()
|
||||
|
||||
oooSample, _, err := series.appendable(s.T, s.V, a.headMaxt, a.minValidTime, a.oooTimeWindow)
|
||||
|
@ -1261,13 +1405,13 @@ func (a *headAppender) commitSamples(acc *appenderCommitContext) {
|
|||
}
|
||||
}
|
||||
|
||||
// For details on the commitHistograms function, see the commitSamples docs.
|
||||
func (a *headAppender) commitHistograms(acc *appenderCommitContext) {
|
||||
// For details on the commitHistograms function, see the commitFloats docs.
|
||||
func (a *headAppender) commitHistograms(b *appendBatch, acc *appenderCommitContext) {
|
||||
var ok, chunkCreated bool
|
||||
var series *memSeries
|
||||
|
||||
for i, s := range a.histograms {
|
||||
series = a.histogramSeries[i]
|
||||
for i, s := range b.histograms {
|
||||
series = b.histogramSeries[i]
|
||||
series.Lock()
|
||||
|
||||
oooSample, _, err := series.appendableHistogram(s.T, s.H, a.headMaxt, a.minValidTime, a.oooTimeWindow)
|
||||
|
@ -1361,13 +1505,13 @@ func (a *headAppender) commitHistograms(acc *appenderCommitContext) {
|
|||
}
|
||||
}
|
||||
|
||||
// For details on the commitFloatHistograms function, see the commitSamples docs.
|
||||
func (a *headAppender) commitFloatHistograms(acc *appenderCommitContext) {
|
||||
// For details on the commitFloatHistograms function, see the commitFloats docs.
|
||||
func (a *headAppender) commitFloatHistograms(b *appendBatch, acc *appenderCommitContext) {
|
||||
var ok, chunkCreated bool
|
||||
var series *memSeries
|
||||
|
||||
for i, s := range a.floatHistograms {
|
||||
series = a.floatHistogramSeries[i]
|
||||
for i, s := range b.floatHistograms {
|
||||
series = b.floatHistogramSeries[i]
|
||||
series.Lock()
|
||||
|
||||
oooSample, _, err := series.appendableFloatHistogram(s.T, s.FH, a.headMaxt, a.minValidTime, a.oooTimeWindow)
|
||||
|
@ -1461,14 +1605,14 @@ func (a *headAppender) commitFloatHistograms(acc *appenderCommitContext) {
|
|||
}
|
||||
}
|
||||
|
||||
// commitMetadata commits the metadata for each series in the headAppender.
|
||||
// commitMetadata commits the metadata for each series in the provided batch.
|
||||
// It iterates over the metadata slice and updates the corresponding series
|
||||
// with the new metadata information. The series is locked during the update
|
||||
// to ensure thread safety.
|
||||
func (a *headAppender) commitMetadata() {
|
||||
func commitMetadata(b *appendBatch) {
|
||||
var series *memSeries
|
||||
for i, m := range a.metadata {
|
||||
series = a.metadataSeries[i]
|
||||
for i, m := range b.metadata {
|
||||
series = b.metadataSeries[i]
|
||||
series.Lock()
|
||||
series.meta = &metadata.Metadata{Type: record.ToMetricType(m.Type), Unit: m.Unit, Help: m.Help}
|
||||
series.Unlock()
|
||||
|
@ -1489,75 +1633,80 @@ func (a *headAppender) Commit() (err error) {
|
|||
if a.closed {
|
||||
return ErrAppenderClosed
|
||||
}
|
||||
defer func() { a.closed = true }()
|
||||
|
||||
h := a.head
|
||||
|
||||
defer func() {
|
||||
h.putRefSeriesBuffer(a.seriesRefs)
|
||||
h.putSeriesBuffer(a.series)
|
||||
a.closed = true
|
||||
}()
|
||||
|
||||
if err := a.log(); err != nil {
|
||||
_ = a.Rollback() // Most likely the same error will happen again.
|
||||
return fmt.Errorf("write to WAL: %w", err)
|
||||
}
|
||||
|
||||
if a.head.writeNotified != nil {
|
||||
a.head.writeNotified.Notify()
|
||||
if h.writeNotified != nil {
|
||||
h.writeNotified.Notify()
|
||||
}
|
||||
|
||||
a.commitExemplars()
|
||||
|
||||
defer a.head.metrics.activeAppenders.Dec()
|
||||
defer a.head.putAppendBuffer(a.samples)
|
||||
defer a.head.putSeriesBuffer(a.sampleSeries)
|
||||
defer a.head.putExemplarBuffer(a.exemplars)
|
||||
defer a.head.putHistogramBuffer(a.histograms)
|
||||
defer a.head.putFloatHistogramBuffer(a.floatHistograms)
|
||||
defer a.head.putMetadataBuffer(a.metadata)
|
||||
defer a.head.iso.closeAppend(a.appendID)
|
||||
|
||||
acc := &appenderCommitContext{
|
||||
floatsAppended: len(a.samples),
|
||||
histogramsAppended: len(a.histograms) + len(a.floatHistograms),
|
||||
inOrderMint: math.MaxInt64,
|
||||
inOrderMaxt: math.MinInt64,
|
||||
oooMinT: math.MaxInt64,
|
||||
oooMaxT: math.MinInt64,
|
||||
oooCapMax: a.head.opts.OutOfOrderCapMax.Load(),
|
||||
inOrderMint: math.MaxInt64,
|
||||
inOrderMaxt: math.MinInt64,
|
||||
oooMinT: math.MaxInt64,
|
||||
oooMaxT: math.MinInt64,
|
||||
oooCapMax: h.opts.OutOfOrderCapMax.Load(),
|
||||
appendChunkOpts: chunkOpts{
|
||||
chunkDiskMapper: a.head.chunkDiskMapper,
|
||||
chunkRange: a.head.chunkRange.Load(),
|
||||
samplesPerChunk: a.head.opts.SamplesPerChunk,
|
||||
chunkDiskMapper: h.chunkDiskMapper,
|
||||
chunkRange: h.chunkRange.Load(),
|
||||
samplesPerChunk: h.opts.SamplesPerChunk,
|
||||
},
|
||||
}
|
||||
|
||||
for _, b := range a.batches {
|
||||
acc.floatsAppended += len(b.floats)
|
||||
acc.histogramsAppended += len(b.histograms) + len(b.floatHistograms)
|
||||
a.commitExemplars(b)
|
||||
defer b.close(h)
|
||||
}
|
||||
defer h.metrics.activeAppenders.Dec()
|
||||
defer h.iso.closeAppend(a.appendID)
|
||||
|
||||
defer func() {
|
||||
for i := range acc.oooRecords {
|
||||
a.head.putBytesBuffer(acc.oooRecords[i][:0])
|
||||
h.putBytesBuffer(acc.oooRecords[i][:0])
|
||||
}
|
||||
}()
|
||||
|
||||
a.commitSamples(acc)
|
||||
a.commitHistograms(acc)
|
||||
a.commitFloatHistograms(acc)
|
||||
a.commitMetadata()
|
||||
for _, b := range a.batches {
|
||||
a.commitFloats(b, acc)
|
||||
a.commitHistograms(b, acc)
|
||||
a.commitFloatHistograms(b, acc)
|
||||
commitMetadata(b)
|
||||
}
|
||||
// Unmark all series as pending commit after all samples have been committed.
|
||||
a.unmarkCreatedSeriesAsPendingCommit()
|
||||
|
||||
a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatOOORejected))
|
||||
a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.histoOOORejected))
|
||||
a.head.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatOOBRejected))
|
||||
a.head.metrics.tooOldSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatTooOldRejected))
|
||||
a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatsAppended))
|
||||
a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.histogramsAppended))
|
||||
a.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.oooFloatsAccepted))
|
||||
a.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.oooHistogramAccepted))
|
||||
a.head.updateMinMaxTime(acc.inOrderMint, acc.inOrderMaxt)
|
||||
a.head.updateMinOOOMaxOOOTime(acc.oooMinT, acc.oooMaxT)
|
||||
h.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatOOORejected))
|
||||
h.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.histoOOORejected))
|
||||
h.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatOOBRejected))
|
||||
h.metrics.tooOldSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatTooOldRejected))
|
||||
h.metrics.samplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatsAppended))
|
||||
h.metrics.samplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.histogramsAppended))
|
||||
h.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.oooFloatsAccepted))
|
||||
h.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.oooHistogramAccepted))
|
||||
h.updateMinMaxTime(acc.inOrderMint, acc.inOrderMaxt)
|
||||
h.updateMinOOOMaxOOOTime(acc.oooMinT, acc.oooMaxT)
|
||||
|
||||
acc.collectOOORecords(a)
|
||||
if a.head.wbl != nil {
|
||||
if err := a.head.wbl.Log(acc.oooRecords...); err != nil {
|
||||
if h.wbl != nil {
|
||||
if err := h.wbl.Log(acc.oooRecords...); err != nil {
|
||||
// TODO(codesome): Currently WBL logging of ooo samples is best effort here since we cannot try logging
|
||||
// until we have found what samples become OOO. We can try having a metric for this failure.
|
||||
// Returning the error here is not correct because we have already put the samples into the memory,
|
||||
// hence the append/insert was a success.
|
||||
a.head.logger.Error("Failed to log out of order samples into the WAL", "err", err)
|
||||
h.logger.Error("Failed to log out of order samples into the WAL", "err", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
@ -2007,37 +2156,36 @@ func (a *headAppender) Rollback() (err error) {
|
|||
if a.closed {
|
||||
return ErrAppenderClosed
|
||||
}
|
||||
defer func() { a.closed = true }()
|
||||
defer a.head.metrics.activeAppenders.Dec()
|
||||
defer a.head.iso.closeAppend(a.appendID)
|
||||
defer a.head.putSeriesBuffer(a.sampleSeries)
|
||||
defer a.unmarkCreatedSeriesAsPendingCommit()
|
||||
h := a.head
|
||||
defer func() {
|
||||
a.unmarkCreatedSeriesAsPendingCommit()
|
||||
h.iso.closeAppend(a.appendID)
|
||||
h.metrics.activeAppenders.Dec()
|
||||
a.closed = true
|
||||
h.putRefSeriesBuffer(a.seriesRefs)
|
||||
h.putSeriesBuffer(a.series)
|
||||
}()
|
||||
|
||||
var series *memSeries
|
||||
for i := range a.samples {
|
||||
series = a.sampleSeries[i]
|
||||
series.Lock()
|
||||
series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
|
||||
series.pendingCommit = false
|
||||
series.Unlock()
|
||||
fmt.Println("ROLLBACK")
|
||||
for _, b := range a.batches {
|
||||
for i := range b.floats {
|
||||
series = b.floatSeries[i]
|
||||
series.Lock()
|
||||
series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
|
||||
series.pendingCommit = false
|
||||
series.Unlock()
|
||||
}
|
||||
for i := range b.histograms {
|
||||
series = b.histogramSeries[i]
|
||||
series.Lock()
|
||||
series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
|
||||
series.pendingCommit = false
|
||||
series.Unlock()
|
||||
}
|
||||
b.close(h)
|
||||
}
|
||||
for i := range a.histograms {
|
||||
series = a.histogramSeries[i]
|
||||
series.Lock()
|
||||
series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
|
||||
series.pendingCommit = false
|
||||
series.Unlock()
|
||||
}
|
||||
a.head.putAppendBuffer(a.samples)
|
||||
a.head.putExemplarBuffer(a.exemplars)
|
||||
a.head.putHistogramBuffer(a.histograms)
|
||||
a.head.putFloatHistogramBuffer(a.floatHistograms)
|
||||
a.head.putMetadataBuffer(a.metadata)
|
||||
a.samples = nil
|
||||
a.exemplars = nil
|
||||
a.histograms = nil
|
||||
a.metadata = nil
|
||||
|
||||
a.batches = a.batches[:0]
|
||||
// Series are created in the head memory regardless of rollback. Thus we have
|
||||
// to log them to the WAL in any case.
|
||||
return a.log()
|
||||
|
|
|
@ -5336,8 +5336,6 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
|
|||
samples []chunks.Sample
|
||||
expChunks int
|
||||
err error
|
||||
// If this is empty, samples above will be taken instead of this.
|
||||
addToExp []chunks.Sample
|
||||
}{
|
||||
// Histograms that end up in the expected samples are copied here so that we
|
||||
// can independently set the CounterResetHint later.
|
||||
|
@ -5377,43 +5375,29 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
|
|||
samples: []chunks.Sample{sample{t: 100, fh: floatHists[4].Copy()}},
|
||||
err: storage.ErrOutOfOrderSample,
|
||||
},
|
||||
// The three next tests all failed before #15177 was fixed.
|
||||
{
|
||||
// Combination of histograms and float64 in the same commit. The behaviour is undefined, but we want to also
|
||||
// verify how TSDB would behave. Here the histogram is appended at the end, hence will be considered as out of order.
|
||||
samples: []chunks.Sample{
|
||||
sample{t: 400, f: 4},
|
||||
sample{t: 500, h: hists[5]}, // This won't be committed.
|
||||
sample{t: 500, h: hists[5]},
|
||||
sample{t: 600, f: 6},
|
||||
},
|
||||
addToExp: []chunks.Sample{
|
||||
sample{t: 400, f: 4},
|
||||
sample{t: 600, f: 6},
|
||||
},
|
||||
expChunks: 7, // Only 1 new chunk for float64.
|
||||
expChunks: 9, // Each of the three samples above creates a new chunk because the type changes.
|
||||
},
|
||||
{
|
||||
// Here the histogram is appended at the end, hence the first histogram is out of order.
|
||||
samples: []chunks.Sample{
|
||||
sample{t: 700, h: hists[7]}, // Out of order w.r.t. the next float64 sample that is appended first.
|
||||
sample{t: 700, h: hists[7]},
|
||||
sample{t: 800, f: 8},
|
||||
sample{t: 900, h: hists[9]},
|
||||
},
|
||||
addToExp: []chunks.Sample{
|
||||
sample{t: 800, f: 8},
|
||||
sample{t: 900, h: hists[9].Copy()},
|
||||
},
|
||||
expChunks: 8, // float64 added to old chunk, only 1 new for histograms.
|
||||
expChunks: 12, // Again each sample creates a new chunk.
|
||||
},
|
||||
{
|
||||
// Float histogram is appended at the end.
|
||||
samples: []chunks.Sample{
|
||||
sample{t: 1000, fh: floatHists[7]}, // Out of order w.r.t. the next histogram.
|
||||
sample{t: 1000, fh: floatHists[7]},
|
||||
sample{t: 1100, h: hists[9]},
|
||||
},
|
||||
addToExp: []chunks.Sample{
|
||||
sample{t: 1100, h: hists[9].Copy()},
|
||||
},
|
||||
expChunks: 8,
|
||||
expChunks: 14, // Even changes between float and integer histogram create new chunks.
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -5431,11 +5415,7 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
|
|||
|
||||
if a.err == nil {
|
||||
require.NoError(t, app.Commit())
|
||||
if len(a.addToExp) > 0 {
|
||||
expResult = append(expResult, a.addToExp...)
|
||||
} else {
|
||||
expResult = append(expResult, a.samples...)
|
||||
}
|
||||
expResult = append(expResult, a.samples...)
|
||||
checkExpChunks(a.expChunks)
|
||||
} else {
|
||||
require.NoError(t, app.Rollback())
|
||||
|
@ -6751,7 +6731,27 @@ func TestHeadAppender_AppendFloatWithSameTimestampAsPreviousHistogram(t *testing
|
|||
|
||||
func TestHeadAppender_AppendCT(t *testing.T) {
|
||||
testHistogram := tsdbutil.GenerateTestHistogram(1)
|
||||
testHistogram.CounterResetHint = histogram.NotCounterReset
|
||||
testFloatHistogram := tsdbutil.GenerateTestFloatHistogram(1)
|
||||
testFloatHistogram.CounterResetHint = histogram.NotCounterReset
|
||||
// TODO(beorn7): Once issue #15346 is fixed, the CounterResetHint of the
|
||||
// following two zero histograms should be histogram.CounterReset.
|
||||
testZeroHistogram := &histogram.Histogram{
|
||||
Schema: testHistogram.Schema,
|
||||
ZeroThreshold: testHistogram.ZeroThreshold,
|
||||
PositiveSpans: testHistogram.PositiveSpans,
|
||||
NegativeSpans: testHistogram.NegativeSpans,
|
||||
PositiveBuckets: []int64{0, 0, 0, 0},
|
||||
NegativeBuckets: []int64{0, 0, 0, 0},
|
||||
}
|
||||
testZeroFloatHistogram := &histogram.FloatHistogram{
|
||||
Schema: testFloatHistogram.Schema,
|
||||
ZeroThreshold: testFloatHistogram.ZeroThreshold,
|
||||
PositiveSpans: testFloatHistogram.PositiveSpans,
|
||||
NegativeSpans: testFloatHistogram.NegativeSpans,
|
||||
PositiveBuckets: []float64{0, 0, 0, 0},
|
||||
NegativeBuckets: []float64{0, 0, 0, 0},
|
||||
}
|
||||
type appendableSamples struct {
|
||||
ts int64
|
||||
fSample float64
|
||||
|
@ -6783,12 +6783,10 @@ func TestHeadAppender_AppendCT(t *testing.T) {
|
|||
{ts: 101, h: testHistogram, ct: 1},
|
||||
},
|
||||
expectedSamples: func() []chunks.Sample {
|
||||
hNoCounterReset := *testHistogram
|
||||
hNoCounterReset.CounterResetHint = histogram.NotCounterReset
|
||||
return []chunks.Sample{
|
||||
sample{t: 1, h: &histogram.Histogram{}},
|
||||
sample{t: 1, h: testZeroHistogram},
|
||||
sample{t: 100, h: testHistogram},
|
||||
sample{t: 101, h: &hNoCounterReset},
|
||||
sample{t: 101, h: testHistogram},
|
||||
}
|
||||
}(),
|
||||
},
|
||||
|
@ -6799,12 +6797,10 @@ func TestHeadAppender_AppendCT(t *testing.T) {
|
|||
{ts: 101, fh: testFloatHistogram, ct: 1},
|
||||
},
|
||||
expectedSamples: func() []chunks.Sample {
|
||||
fhNoCounterReset := *testFloatHistogram
|
||||
fhNoCounterReset.CounterResetHint = histogram.NotCounterReset
|
||||
return []chunks.Sample{
|
||||
sample{t: 1, fh: &histogram.FloatHistogram{}},
|
||||
sample{t: 1, fh: testZeroFloatHistogram},
|
||||
sample{t: 100, fh: testFloatHistogram},
|
||||
sample{t: 101, fh: &fhNoCounterReset},
|
||||
sample{t: 101, fh: testFloatHistogram},
|
||||
}
|
||||
}(),
|
||||
},
|
||||
|
@ -6827,12 +6823,10 @@ func TestHeadAppender_AppendCT(t *testing.T) {
|
|||
{ts: 101, h: testHistogram, ct: 1},
|
||||
},
|
||||
expectedSamples: func() []chunks.Sample {
|
||||
hNoCounterReset := *testHistogram
|
||||
hNoCounterReset.CounterResetHint = histogram.NotCounterReset
|
||||
return []chunks.Sample{
|
||||
sample{t: 1, h: &histogram.Histogram{}},
|
||||
sample{t: 1, h: testZeroHistogram},
|
||||
sample{t: 100, h: testHistogram},
|
||||
sample{t: 101, h: &hNoCounterReset},
|
||||
sample{t: 101, h: testHistogram},
|
||||
}
|
||||
}(),
|
||||
},
|
||||
|
@ -6843,12 +6837,10 @@ func TestHeadAppender_AppendCT(t *testing.T) {
|
|||
{ts: 101, fh: testFloatHistogram, ct: 1},
|
||||
},
|
||||
expectedSamples: func() []chunks.Sample {
|
||||
fhNoCounterReset := *testFloatHistogram
|
||||
fhNoCounterReset.CounterResetHint = histogram.NotCounterReset
|
||||
return []chunks.Sample{
|
||||
sample{t: 1, fh: &histogram.FloatHistogram{}},
|
||||
sample{t: 1, fh: testZeroFloatHistogram},
|
||||
sample{t: 100, fh: testFloatHistogram},
|
||||
sample{t: 101, fh: &fhNoCounterReset},
|
||||
sample{t: 101, fh: testFloatHistogram},
|
||||
}
|
||||
}(),
|
||||
},
|
||||
|
@ -6872,9 +6864,9 @@ func TestHeadAppender_AppendCT(t *testing.T) {
|
|||
{ts: 102, h: testHistogram, ct: 101},
|
||||
},
|
||||
expectedSamples: []chunks.Sample{
|
||||
sample{t: 1, h: &histogram.Histogram{}},
|
||||
sample{t: 1, h: testZeroHistogram},
|
||||
sample{t: 100, h: testHistogram},
|
||||
sample{t: 101, h: &histogram.Histogram{CounterResetHint: histogram.UnknownCounterReset}},
|
||||
sample{t: 101, h: testZeroHistogram},
|
||||
sample{t: 102, h: testHistogram},
|
||||
},
|
||||
},
|
||||
|
@ -6885,9 +6877,9 @@ func TestHeadAppender_AppendCT(t *testing.T) {
|
|||
{ts: 102, fh: testFloatHistogram, ct: 101},
|
||||
},
|
||||
expectedSamples: []chunks.Sample{
|
||||
sample{t: 1, fh: &histogram.FloatHistogram{}},
|
||||
sample{t: 1, fh: testZeroFloatHistogram},
|
||||
sample{t: 100, fh: testFloatHistogram},
|
||||
sample{t: 101, fh: &histogram.FloatHistogram{CounterResetHint: histogram.UnknownCounterReset}},
|
||||
sample{t: 101, fh: testZeroFloatHistogram},
|
||||
sample{t: 102, fh: testFloatHistogram},
|
||||
},
|
||||
},
|
||||
|
@ -6910,12 +6902,10 @@ func TestHeadAppender_AppendCT(t *testing.T) {
|
|||
{ts: 101, h: testHistogram, ct: 100},
|
||||
},
|
||||
expectedSamples: func() []chunks.Sample {
|
||||
hNoCounterReset := *testHistogram
|
||||
hNoCounterReset.CounterResetHint = histogram.NotCounterReset
|
||||
return []chunks.Sample{
|
||||
sample{t: 1, h: &histogram.Histogram{}},
|
||||
sample{t: 1, h: testZeroHistogram},
|
||||
sample{t: 100, h: testHistogram},
|
||||
sample{t: 101, h: &hNoCounterReset},
|
||||
sample{t: 101, h: testHistogram},
|
||||
}
|
||||
}(),
|
||||
},
|
||||
|
@ -6926,12 +6916,10 @@ func TestHeadAppender_AppendCT(t *testing.T) {
|
|||
{ts: 101, fh: testFloatHistogram, ct: 100},
|
||||
},
|
||||
expectedSamples: func() []chunks.Sample {
|
||||
fhNoCounterReset := *testFloatHistogram
|
||||
fhNoCounterReset.CounterResetHint = histogram.NotCounterReset
|
||||
return []chunks.Sample{
|
||||
sample{t: 1, fh: &histogram.FloatHistogram{}},
|
||||
sample{t: 1, fh: testZeroFloatHistogram},
|
||||
sample{t: 100, fh: testFloatHistogram},
|
||||
sample{t: 101, fh: &fhNoCounterReset},
|
||||
sample{t: 101, fh: testFloatHistogram},
|
||||
}
|
||||
}(),
|
||||
},
|
||||
|
|
Loading…
Reference in New Issue