Merge pull request #17291 from prometheus/krajo/pick-pr-17290
perf(tsdb): reuse map of sample types to speed up head appender
This commit is contained in:
commit
077abc5cca
|
@ -93,6 +93,7 @@ type Head struct {
|
||||||
floatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample]
|
floatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample]
|
||||||
metadataPool zeropool.Pool[[]record.RefMetadata]
|
metadataPool zeropool.Pool[[]record.RefMetadata]
|
||||||
seriesPool zeropool.Pool[[]*memSeries]
|
seriesPool zeropool.Pool[[]*memSeries]
|
||||||
|
typeMapPool zeropool.Pool[map[chunks.HeadSeriesRef]sampleType]
|
||||||
bytesPool zeropool.Pool[[]byte]
|
bytesPool zeropool.Pool[[]byte]
|
||||||
memChunkPool sync.Pool
|
memChunkPool sync.Pool
|
||||||
|
|
||||||
|
|
|
@ -173,7 +173,7 @@ func (h *Head) appender() *headAppender {
|
||||||
oooTimeWindow: h.opts.OutOfOrderTimeWindow.Load(),
|
oooTimeWindow: h.opts.OutOfOrderTimeWindow.Load(),
|
||||||
seriesRefs: h.getRefSeriesBuffer(),
|
seriesRefs: h.getRefSeriesBuffer(),
|
||||||
series: h.getSeriesBuffer(),
|
series: h.getSeriesBuffer(),
|
||||||
typesInBatch: map[chunks.HeadSeriesRef]sampleType{},
|
typesInBatch: h.getTypeMap(),
|
||||||
appendID: appendID,
|
appendID: appendID,
|
||||||
cleanupAppendIDsBelow: cleanupAppendIDsBelow,
|
cleanupAppendIDsBelow: cleanupAppendIDsBelow,
|
||||||
}
|
}
|
||||||
|
@ -297,6 +297,19 @@ func (h *Head) putSeriesBuffer(b []*memSeries) {
|
||||||
h.seriesPool.Put(b[:0])
|
h.seriesPool.Put(b[:0])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *Head) getTypeMap() map[chunks.HeadSeriesRef]sampleType {
|
||||||
|
b := h.typeMapPool.Get()
|
||||||
|
if b == nil {
|
||||||
|
return make(map[chunks.HeadSeriesRef]sampleType)
|
||||||
|
}
|
||||||
|
return b
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Head) putTypeMap(b map[chunks.HeadSeriesRef]sampleType) {
|
||||||
|
clear(b)
|
||||||
|
h.typeMapPool.Put(b)
|
||||||
|
}
|
||||||
|
|
||||||
func (h *Head) getBytesBuffer() []byte {
|
func (h *Head) getBytesBuffer() []byte {
|
||||||
b := h.bytesPool.Get()
|
b := h.bytesPool.Get()
|
||||||
if b == nil {
|
if b == nil {
|
||||||
|
@ -1687,8 +1700,13 @@ func (a *headAppender) Commit() (err error) {
|
||||||
h := a.head
|
h := a.head
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
|
if a.closed {
|
||||||
|
// Don't double-close in case Rollback() was called.
|
||||||
|
return
|
||||||
|
}
|
||||||
h.putRefSeriesBuffer(a.seriesRefs)
|
h.putRefSeriesBuffer(a.seriesRefs)
|
||||||
h.putSeriesBuffer(a.series)
|
h.putSeriesBuffer(a.series)
|
||||||
|
h.putTypeMap(a.typesInBatch)
|
||||||
a.closed = true
|
a.closed = true
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
@ -2216,6 +2234,7 @@ func (a *headAppender) Rollback() (err error) {
|
||||||
a.closed = true
|
a.closed = true
|
||||||
h.putRefSeriesBuffer(a.seriesRefs)
|
h.putRefSeriesBuffer(a.seriesRefs)
|
||||||
h.putSeriesBuffer(a.series)
|
h.putSeriesBuffer(a.series)
|
||||||
|
h.putTypeMap(a.typesInBatch)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
var series *memSeries
|
var series *memSeries
|
||||||
|
|
|
@ -19,7 +19,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
func Test(t *testing.T, f func(t *testing.T)) {
|
func Test(t *testing.T, _ func(t *testing.T)) {
|
||||||
t.Skip("goexperiment.synctest is not enabled")
|
t.Skip("goexperiment.synctest is not enabled")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue