promql: Make HistogramStatsIterator.AtFloatHistogram idempotent

Previously, multiple calls returned a wrong counter reset hint.

This commit also includes a bunch of refactorings that partially have
value on their own. However, the need for them was triggered by the
additional work needed for idempotency, so I included them in this
commit.

Signed-off-by: beorn7 <beorn@grafana.com>
This commit is contained in:
beorn7 2025-09-04 19:05:14 +02:00
parent 0746f388b0
commit 48c6c1a692
2 changed files with 69 additions and 38 deletions

View File

@ -29,15 +29,16 @@ import (
type HistogramStatsIterator struct { type HistogramStatsIterator struct {
chunkenc.Iterator chunkenc.Iterator
currentFH *histogram.FloatHistogram current *histogram.FloatHistogram
lastFH *histogram.FloatHistogram last *histogram.FloatHistogram
lastIsCurrent bool
} }
// NewHistogramStatsIterator creates a new HistogramStatsIterator. // NewHistogramStatsIterator creates a new HistogramStatsIterator.
func NewHistogramStatsIterator(it chunkenc.Iterator) *HistogramStatsIterator { func NewHistogramStatsIterator(it chunkenc.Iterator) *HistogramStatsIterator {
return &HistogramStatsIterator{ return &HistogramStatsIterator{
Iterator: it, Iterator: it,
currentFH: &histogram.FloatHistogram{}, current: &histogram.FloatHistogram{},
} }
} }
@ -45,12 +46,14 @@ func NewHistogramStatsIterator(it chunkenc.Iterator) *HistogramStatsIterator {
// objects already allocated where possible. // objects already allocated where possible.
func (hsi *HistogramStatsIterator) Reset(it chunkenc.Iterator) { func (hsi *HistogramStatsIterator) Reset(it chunkenc.Iterator) {
hsi.Iterator = it hsi.Iterator = it
hsi.lastFH = nil hsi.last = nil
hsi.lastIsCurrent = false
} }
// Next mostly relays to the underlying iterator, but changes a ValHistogram // Next mostly relays to the underlying iterator, but changes a ValHistogram
// return into a ValFloatHistogram return. // return into a ValFloatHistogram return.
func (hsi *HistogramStatsIterator) Next() chunkenc.ValueType { func (hsi *HistogramStatsIterator) Next() chunkenc.ValueType {
hsi.lastIsCurrent = false
vt := hsi.Iterator.Next() vt := hsi.Iterator.Next()
if vt == chunkenc.ValHistogram { if vt == chunkenc.ValHistogram {
return chunkenc.ValFloatHistogram return chunkenc.ValFloatHistogram
@ -62,9 +65,10 @@ func (hsi *HistogramStatsIterator) Next() chunkenc.ValueType {
// return into a ValFloatHistogram return. // return into a ValFloatHistogram return.
func (hsi *HistogramStatsIterator) Seek(t int64) chunkenc.ValueType { func (hsi *HistogramStatsIterator) Seek(t int64) chunkenc.ValueType {
// If the Seek is going to move the iterator, we have to forget the // If the Seek is going to move the iterator, we have to forget the
// lastFH. // lastFH and mark the currentFH as not current anymore.
if t > hsi.AtT() { if t > hsi.AtT() {
hsi.lastFH = nil hsi.last = nil
hsi.lastIsCurrent = false
} }
vt := hsi.Iterator.Seek(t) vt := hsi.Iterator.Seek(t)
if vt == chunkenc.ValHistogram { if vt == chunkenc.ValHistogram {
@ -83,47 +87,65 @@ func (*HistogramStatsIterator) AtHistogram(*histogram.Histogram) (int64, *histog
// hint (not UnknownCounterReset) if the previous sample has been accessed with // hint (not UnknownCounterReset) if the previous sample has been accessed with
// the same iterator. // the same iterator.
func (hsi *HistogramStatsIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { func (hsi *HistogramStatsIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
var t int64 populateFH := func(src *histogram.FloatHistogram, detectReset bool) {
t, hsi.currentFH = hsi.Iterator.AtFloatHistogram(hsi.currentFH) h := histogram.FloatHistogram{
if value.IsStaleNaN(hsi.currentFH.Sum) { CounterResetHint: src.CounterResetHint,
return t, &histogram.FloatHistogram{Sum: hsi.currentFH.Sum} Count: src.Count,
Sum: src.Sum,
}
if detectReset {
h.CounterResetHint = hsi.getResetHint(src.CounterResetHint)
}
if fh == nil {
// Note that we cannot simply write `fh = &h` here
// because that would let h escape to the heap.
fh = &histogram.FloatHistogram{}
*fh = h
} else {
h.CopyTo(fh)
}
} }
if fh == nil { if hsi.lastIsCurrent {
fh = &histogram.FloatHistogram{ // Nothing changed since last AtFloatHistogram call. Return a
CounterResetHint: hsi.getFloatResetHint(hsi.currentFH.CounterResetHint), // copy of the stored last histogram rather than doing counter
Count: hsi.currentFH.Count, // reset detection again (which would yield a potentially wrong
Sum: hsi.currentFH.Sum, // result of "no counter reset").
} populateFH(hsi.last, false)
hsi.setLastFH(hsi.currentFH) return hsi.AtT(), fh
}
var t int64
t, hsi.current = hsi.Iterator.AtFloatHistogram(hsi.current)
if value.IsStaleNaN(hsi.current.Sum) {
populateFH(hsi.current, false)
return t, fh return t, fh
} }
populateFH(hsi.current, true)
returnValue := histogram.FloatHistogram{ hsi.setLastFromCurrent(fh.CounterResetHint)
CounterResetHint: hsi.getFloatResetHint(hsi.currentFH.CounterResetHint),
Count: hsi.currentFH.Count,
Sum: hsi.currentFH.Sum,
}
returnValue.CopyTo(fh)
hsi.setLastFH(hsi.currentFH)
return t, fh return t, fh
} }
func (hsi *HistogramStatsIterator) setLastFH(fh *histogram.FloatHistogram) { // setLastFromCurrent stores a copy of hsi.current as hsi.last. The
if hsi.lastFH == nil { // CounterResetHint of hsi.last is set to the provided value, though. This is
hsi.lastFH = fh.Copy() // meant to store the value we have calculated on the fly so that we can return
// the same without re-calculation in case AtFloatHistogram is called multiple
// times.
func (hsi *HistogramStatsIterator) setLastFromCurrent(hint histogram.CounterResetHint) {
if hsi.last == nil {
hsi.last = hsi.current.Copy()
} else { } else {
fh.CopyTo(hsi.lastFH) hsi.current.CopyTo(hsi.last)
} }
hsi.last.CounterResetHint = hint
hsi.lastIsCurrent = true
} }
func (hsi *HistogramStatsIterator) getFloatResetHint(hint histogram.CounterResetHint) histogram.CounterResetHint { func (hsi *HistogramStatsIterator) getResetHint(hint histogram.CounterResetHint) histogram.CounterResetHint {
if hint != histogram.UnknownCounterReset { if hint != histogram.UnknownCounterReset {
return hint return hint
} }
prevFH := hsi.lastFH if hsi.last == nil {
if prevFH == nil {
// We don't know if there's a counter reset. Note that this // We don't know if there's a counter reset. Note that this
// generally will trigger an explicit counter reset detection by // generally will trigger an explicit counter reset detection by
// the PromQL engine, which in turn isn't as reliable in this // the PromQL engine, which in turn isn't as reliable in this
@ -134,7 +156,7 @@ func (hsi *HistogramStatsIterator) getFloatResetHint(hint histogram.CounterReset
// place. // place.
return histogram.UnknownCounterReset return histogram.UnknownCounterReset
} }
if hsi.currentFH.DetectReset(prevFH) { if hsi.current.DetectReset(hsi.last) {
return histogram.CounterReset return histogram.CounterReset
} }
return histogram.NotCounterReset return histogram.NotCounterReset

View File

@ -116,10 +116,16 @@ func TestHistogramStatsDecoding(t *testing.T) {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
check := func(statsIterator *HistogramStatsIterator) { check := func(statsIterator *HistogramStatsIterator) {
decodedStats := make([]*histogram.FloatHistogram, 0) decodedStats := make([]*histogram.FloatHistogram, 0)
for statsIterator.Next() != chunkenc.ValNone { for typ := statsIterator.Next(); typ != chunkenc.ValNone; typ = statsIterator.Next() {
_, h := statsIterator.AtFloatHistogram(nil) require.Equal(t, chunkenc.ValFloatHistogram, typ)
decodedStats = append(decodedStats, h) t1, h1 := statsIterator.AtFloatHistogram(nil)
// Call AtFloatHistogram again to check for idempotency.
t2, h2 := statsIterator.AtFloatHistogram(nil)
require.Equal(t, t1, t2)
require.True(t, h1.Equals(h2)) // require.Equal does not work with sum=NaN.
decodedStats = append(decodedStats, h1)
} }
require.NoError(t, statsIterator.Err())
for i := 0; i < len(tc.histograms); i++ { for i := 0; i < len(tc.histograms); i++ {
require.Equal(t, tc.expectedHints[i], decodedStats[i].CounterResetHint) require.Equal(t, tc.expectedHints[i], decodedStats[i].CounterResetHint)
fh := tc.histograms[i].ToFloat(nil) fh := tc.histograms[i].ToFloat(nil)
@ -170,6 +176,9 @@ func TestHistogramStatsMixedUse(t *testing.T) {
typ = statsIterator.Next() typ = statsIterator.Next()
require.Equal(t, chunkenc.ValFloatHistogram, typ) require.Equal(t, chunkenc.ValFloatHistogram, typ)
_, h = statsIterator.AtFloatHistogram(nil) _, h = statsIterator.AtFloatHistogram(nil)
// We call AtFloatHistogram here again "randomly" to check idempotency.
_, h2 := statsIterator.AtFloatHistogram(nil)
require.True(t, h.Equals(h2))
actualHints[1] = h.CounterResetHint actualHints[1] = h.CounterResetHint
typ = statsIterator.Next() typ = statsIterator.Next()
require.Equal(t, chunkenc.ValFloatHistogram, typ) require.Equal(t, chunkenc.ValFloatHistogram, typ)