storage/remote: compute highestTimestamp and dataIn at QueueManager level

Because of relabelling, an endpoint may only select a subset of the
series that go through WriteStorage.

Computing highestTimestamp at the WriteStorage level therefore yields
misleading values when the corresponding samples never make it into a
remote queue.

PrometheusRemoteWriteBehind is currently based on that metric and can
fire spuriously when an endpoint only keeps a subset of series that
take time to appear.

A "prometheus_remote_storage_queue_highest_timestamp_seconds" that only
takes into account samples in the queue is introduced, and used in
PrometheusRemoteWriteBehind and dashboards in documentation/prometheus-mixin

The same applies to samplesIn/dataIn: the QueueManager knows best when
to update them, namely when data is enqueued.

That makes dataDropped unnecessary, which in turn simplifies the logic
in QueueManager.calculateDesiredShards().
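
For reference, the resulting sizing logic reduces to roughly the following
shape (a simplified sketch with made-up inputs, not the exact implementation):

package main

import "fmt"

// calculateDesiredShards sketches the simplified sizing logic: since dataIn
// now only counts enqueued samples, no dataKeptRatio correction is needed.
func calculateDesiredShards(dataInRate, dataOutRate, dataOutDurationSec, highestRecvSec, highestSentSec float64) float64 {
	delay := highestRecvSec - highestSentSec
	dataPending := delay * dataInRate    // backlog estimated from enqueued data only
	backlogCatchup := 0.05 * dataPending // aim to drain 5% of the backlog per tick
	timePerSample := dataOutDurationSec / dataOutRate
	return timePerSample * (dataInRate + backlogCatchup)
}

func main() {
	// Example: 1000 samples/s in, 800 samples/s out, 0.5s spent sending per second,
	// highest enqueued timestamp 170s vs highest sent 120s.
	fmt.Println(calculateDesiredShards(1000, 800, 0.5, 170.0, 120.0))
}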

Signed-off-by: machine424 <ayoubmrini424@gmail.com>
machine424 2025-08-20 23:08:47 +02:00
parent 2cbeef6d95
commit 184c7eb918
No known key found for this signature in database
GPG Key ID: A4B001A4FDEE017D
6 changed files with 149 additions and 74 deletions


@@ -21,6 +21,7 @@ import (
 	"io"
 	"math"
 	"net/http"
+	"net/http/httptest"
 	"os"
 	"os/exec"
 	"path/filepath"
@@ -881,3 +882,86 @@ scrape_configs:
 		})
 	}
 }
+
+// This test verifies that metrics for the highest timestamps per queue account for relabelling.
+// See: https://github.com/prometheus/prometheus/pull/17065.
+func TestRemoteWrite_PerQueueMetricsAfterRelabeling(t *testing.T) {
+	t.Parallel()
+
+	tmpDir := t.TempDir()
+	configFile := filepath.Join(tmpDir, "prometheus.yml")
+
+	port := testutil.RandomUnprivilegedPort(t)
+	targetPort := testutil.RandomUnprivilegedPort(t)
+
+	server := httptest.NewServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {
+		panic("should never be reached")
+	}))
+	t.Cleanup(server.Close)
+
+	// Simulate a remote write relabeling that doesn't yield any series.
+	config := fmt.Sprintf(`
+global:
+  scrape_interval: 1s
+scrape_configs:
+  - job_name: 'self'
+    static_configs:
+      - targets: ['localhost:%d']
+  - job_name: 'target'
+    static_configs:
+      - targets: ['localhost:%d']
+remote_write:
+  - url: %s
+    write_relabel_configs:
+      - source_labels: [job,__name__]
+        regex: 'target,special_metric'
+        action: keep
+`, port, targetPort, server.URL)
+	require.NoError(t, os.WriteFile(configFile, []byte(config), 0o777))
+
+	prom := prometheusCommandWithLogging(
+		t,
+		configFile,
+		port,
+		fmt.Sprintf("--storage.tsdb.path=%s", tmpDir),
+	)
+	require.NoError(t, prom.Start())
+
+	require.Eventually(t, func() bool {
+		r, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d/metrics", port))
+		if err != nil {
+			return false
+		}
+		defer r.Body.Close()
+		if r.StatusCode != http.StatusOK {
+			return false
+		}
+		metrics, err := io.ReadAll(r.Body)
+		if err != nil {
+			return false
+		}
+
+		gHighestTimestamp, err := getMetricValue(t, bytes.NewReader(metrics), model.MetricTypeGauge, "prometheus_remote_storage_highest_timestamp_in_seconds")
+		// The highest timestamp at storage level sees all samples; it should also consider the ones that are filtered out by relabeling.
+		if err != nil || gHighestTimestamp == 0 {
+			return false
+		}
+
+		// The queue shouldn't see or send any sample: all samples are dropped due to relabeling, and the metrics should reflect that.
+		droppedSamples, err := getMetricValue(t, bytes.NewReader(metrics), model.MetricTypeCounter, "prometheus_remote_storage_samples_dropped_total")
+		if err != nil || droppedSamples == 0 {
+			return false
+		}
+		highestTimestamp, err := getMetricValue(t, bytes.NewReader(metrics), model.MetricTypeGauge, "prometheus_remote_storage_queue_highest_timestamp_seconds")
+		require.NoError(t, err)
+		require.Zero(t, highestTimestamp)
+		highestSentTimestamp, err := getMetricValue(t, bytes.NewReader(metrics), model.MetricTypeGauge, "prometheus_remote_storage_queue_highest_sent_timestamp_seconds")
+		require.NoError(t, err)
+		require.Zero(t, highestSentTimestamp)
+		return true
+	}, 10*time.Second, 100*time.Millisecond)
+}


@@ -212,8 +212,8 @@
           # Without max_over_time, failed scrapes could create false negatives, see
           # https://www.robustperception.io/alerting-on-gauges-in-prometheus-2-0 for details.
           (
-            max_over_time(prometheus_remote_storage_highest_timestamp_in_seconds{%(prometheusSelector)s}[5m])
-          - ignoring(remote_name, url) group_right
+            max_over_time(prometheus_remote_storage_queue_highest_timestamp_seconds{%(prometheusSelector)s}[5m])
+          -
             max_over_time(prometheus_remote_storage_queue_highest_sent_timestamp_seconds{%(prometheusSelector)s}[5m])
           )
           > 120


@@ -527,7 +527,7 @@ local row = panel.row;
   ;

   local timestampComparison =
-    panel.timeSeries.new('Highest Timestamp In vs. Highest Timestamp Sent')
+    panel.timeSeries.new('Highest Enqueued Timestamp vs. Highest Timestamp Sent')
     + panelTimeSeriesStdOptions
     + panel.timeSeries.standardOptions.withUnit('short')
     + panel.timeSeries.queryOptions.withTargets([
@@ -535,9 +535,9 @@ local row = panel.row;
         '$datasource',
         |||
           (
-            prometheus_remote_storage_highest_timestamp_in_seconds{%(clusterLabel)s=~"$cluster", instance=~"$instance"}
+            prometheus_remote_storage_queue_highest_timestamp_seconds{%(clusterLabel)s=~"$cluster", instance=~"$instance", url=~"$url"}
           -
-            ignoring(remote_name, url) group_right(instance) (prometheus_remote_storage_queue_highest_sent_timestamp_seconds{%(clusterLabel)s=~"$cluster", instance=~"$instance", url=~"$url"} != 0)
+            prometheus_remote_storage_queue_highest_sent_timestamp_seconds{%(clusterLabel)s=~"$cluster", instance=~"$instance", url=~"$url"}
           )
         ||| % $._config
       )
@@ -555,9 +555,9 @@ local row = panel.row;
         '$datasource',
         |||
           clamp_min(
-            rate(prometheus_remote_storage_highest_timestamp_in_seconds{%(clusterLabel)s=~"$cluster", instance=~"$instance"}[5m])
+            rate(prometheus_remote_storage_queue_highest_timestamp_seconds{%(clusterLabel)s=~"$cluster", instance=~"$instance", url=~"$url"}[5m])
           -
-            ignoring (remote_name, url) group_right(instance) rate(prometheus_remote_storage_queue_highest_sent_timestamp_seconds{%(clusterLabel)s=~"$cluster", instance=~"$instance", url=~"$url"}[5m])
+            rate(prometheus_remote_storage_queue_highest_sent_timestamp_seconds{%(clusterLabel)s=~"$cluster", instance=~"$instance", url=~"$url"}[5m])
           , 0)
         ||| % $._config
       )


@@ -82,6 +82,7 @@ type queueManagerMetrics struct {
 	droppedHistogramsTotal *prometheus.CounterVec
 	enqueueRetriesTotal    prometheus.Counter
 	sentBatchDuration      prometheus.Histogram
+	highestTimestamp       *maxTimestamp
 	highestSentTimestamp   *maxTimestamp
 	pendingSamples         prometheus.Gauge
 	pendingExemplars       prometheus.Gauge
@@ -228,12 +229,21 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager
 		NativeHistogramMaxBucketNumber:  100,
 		NativeHistogramMinResetDuration: 1 * time.Hour,
 	})
+	m.highestTimestamp = &maxTimestamp{
+		Gauge: prometheus.NewGauge(prometheus.GaugeOpts{
+			Namespace:   namespace,
+			Subsystem:   subsystem,
+			Name:        "queue_highest_timestamp_seconds",
+			Help:        "Highest timestamp that was enqueued, in seconds since epoch. Initialized to 0 when no data has been received yet.",
+			ConstLabels: constLabels,
+		}),
+	}
 	m.highestSentTimestamp = &maxTimestamp{
 		Gauge: prometheus.NewGauge(prometheus.GaugeOpts{
 			Namespace:   namespace,
 			Subsystem:   subsystem,
 			Name:        "queue_highest_sent_timestamp_seconds",
-			Help:        "Timestamp from a WAL sample, the highest timestamp successfully sent by this queue, in seconds since epoch. Initialized to 0 when no data has been sent yet.",
+			Help:        "Highest timestamp successfully sent by this queue, in seconds since epoch. Initialized to 0 when no data has been sent yet.",
 			ConstLabels: constLabels,
 		}),
 	}
@@ -338,6 +348,7 @@ func (m *queueManagerMetrics) register() {
 			m.droppedHistogramsTotal,
 			m.enqueueRetriesTotal,
 			m.sentBatchDuration,
+			m.highestTimestamp,
 			m.highestSentTimestamp,
 			m.pendingSamples,
 			m.pendingExemplars,
@@ -373,6 +384,7 @@ func (m *queueManagerMetrics) unregister() {
 		m.reg.Unregister(m.droppedHistogramsTotal)
 		m.reg.Unregister(m.enqueueRetriesTotal)
 		m.reg.Unregister(m.sentBatchDuration)
+		m.reg.Unregister(m.highestTimestamp)
 		m.reg.Unregister(m.highestSentTimestamp)
 		m.reg.Unregister(m.pendingSamples)
 		m.reg.Unregister(m.pendingExemplars)
@@ -440,11 +452,10 @@ type QueueManager struct {
 	quit chan struct{}
 	wg   sync.WaitGroup

-	dataIn, dataDropped, dataOut, dataOutDuration *ewmaRate
+	dataIn, dataOut, dataOutDuration *ewmaRate

 	metrics              *queueManagerMetrics
 	interner             *pool
-	highestRecvTimestamp *maxTimestamp
 }

 // NewQueueManager builds a new QueueManager and starts a new
@@ -458,7 +469,6 @@ func NewQueueManager(
 	readerMetrics *wlog.LiveReaderMetrics,
 	logger *slog.Logger,
 	dir string,
-	samplesIn *ewmaRate,
 	cfg config.QueueConfig,
 	mCfg config.MetadataConfig,
 	externalLabels labels.Labels,
@@ -466,7 +476,6 @@ func NewQueueManager(
 	client WriteClient,
 	flushDeadline time.Duration,
 	interner *pool,
-	highestRecvTimestamp *maxTimestamp,
 	sm ReadyScrapeManager,
 	enableExemplarRemoteWrite bool,
 	enableNativeHistogramRemoteWrite bool,
@@ -506,14 +515,12 @@ func NewQueueManager(
 		reshardChan: make(chan int),
 		quit:        make(chan struct{}),

-		dataIn:          samplesIn,
-		dataDropped:     newEWMARate(ewmaWeight, shardUpdateDuration),
+		dataIn:          newEWMARate(ewmaWeight, shardUpdateDuration),
 		dataOut:         newEWMARate(ewmaWeight, shardUpdateDuration),
 		dataOutDuration: newEWMARate(ewmaWeight, shardUpdateDuration),

 		metrics:  metrics,
 		interner: interner,
-		highestRecvTimestamp: highestRecvTimestamp,

 		protoMsg: protoMsg,
 		compr:    compression.Snappy, // Hardcoded for now, but scaffolding exists for likely future use.
@@ -703,7 +710,6 @@ outer:
 		t.seriesMtx.Lock()
 		lbls, ok := t.seriesLabels[s.Ref]
 		if !ok {
-			t.dataDropped.incr(1)
 			if _, ok := t.droppedSeries[s.Ref]; !ok {
 				t.logger.Info("Dropped sample for series that was not explicitly dropped via relabelling", "ref", s.Ref)
 				t.metrics.droppedSamplesTotal.WithLabelValues(reasonUnintentionalDroppedSeries).Inc()
@@ -765,8 +771,6 @@ outer:
 		t.seriesMtx.Lock()
 		lbls, ok := t.seriesLabels[e.Ref]
 		if !ok {
-			// Track dropped exemplars in the same EWMA for sharding calc.
-			t.dataDropped.incr(1)
 			if _, ok := t.droppedSeries[e.Ref]; !ok {
 				t.logger.Info("Dropped exemplar for series that was not explicitly dropped via relabelling", "ref", e.Ref)
 				t.metrics.droppedExemplarsTotal.WithLabelValues(reasonUnintentionalDroppedSeries).Inc()
@@ -822,7 +826,6 @@ outer:
 		t.seriesMtx.Lock()
 		lbls, ok := t.seriesLabels[h.Ref]
 		if !ok {
-			t.dataDropped.incr(1)
 			if _, ok := t.droppedSeries[h.Ref]; !ok {
 				t.logger.Info("Dropped histogram for series that was not explicitly dropped via relabelling", "ref", h.Ref)
 				t.metrics.droppedHistogramsTotal.WithLabelValues(reasonUnintentionalDroppedSeries).Inc()
@@ -877,7 +880,6 @@ outer:
 		t.seriesMtx.Lock()
 		lbls, ok := t.seriesLabels[h.Ref]
 		if !ok {
-			t.dataDropped.incr(1)
 			if _, ok := t.droppedSeries[h.Ref]; !ok {
 				t.logger.Info("Dropped histogram for series that was not explicitly dropped via relabelling", "ref", h.Ref)
 				t.metrics.droppedHistogramsTotal.WithLabelValues(reasonUnintentionalDroppedSeries).Inc()
@@ -1107,8 +1109,8 @@ func (t *QueueManager) shouldReshard(desiredShards int) bool {
 // outlined in this functions implementation. It is up to the caller to reshard, or not,
 // based on the return value.
 func (t *QueueManager) calculateDesiredShards() int {
+	t.dataIn.tick()
 	t.dataOut.tick()
-	t.dataDropped.tick()
 	t.dataOutDuration.tick()

 	// We use the number of incoming samples as a prediction of how much work we
@@ -1118,13 +1120,12 @@ func (t *QueueManager) calculateDesiredShards() int {
 	var (
 		dataInRate      = t.dataIn.rate()
 		dataOutRate     = t.dataOut.rate()
-		dataKeptRatio   = dataOutRate / (t.dataDropped.rate() + dataOutRate)
 		dataOutDuration = t.dataOutDuration.rate() / float64(time.Second)
-		dataPendingRate = dataInRate*dataKeptRatio - dataOutRate
+		dataPendingRate = dataInRate - dataOutRate
 		highestSent     = t.metrics.highestSentTimestamp.Get()
-		highestRecv     = t.highestRecvTimestamp.Get()
+		highestRecv     = t.metrics.highestTimestamp.Get()
 		delay           = highestRecv - highestSent
-		dataPending     = delay * dataInRate * dataKeptRatio
+		dataPending     = delay * dataInRate
 	)

 	if dataOutRate <= 0 {
@@ -1136,13 +1137,12 @@ func (t *QueueManager) calculateDesiredShards() int {
 		backlogCatchup = 0.05 * dataPending
 		// Calculate Time to send one sample, averaged across all sends done this tick.
 		timePerSample = dataOutDuration / dataOutRate
-		desiredShards = timePerSample * (dataInRate*dataKeptRatio + backlogCatchup)
+		desiredShards = timePerSample * (dataInRate + backlogCatchup)
 	)
 	t.metrics.desiredNumShards.Set(desiredShards)
 	t.logger.Debug("QueueManager.calculateDesiredShards",
 		"dataInRate", dataInRate,
 		"dataOutRate", dataOutRate,
-		"dataKeptRatio", dataKeptRatio,
 		"dataPendingRate", dataPendingRate,
 		"dataPending", dataPending,
 		"dataOutDuration", dataOutDuration,
@@ -1331,7 +1331,11 @@ func (s *shards) enqueue(ref chunks.HeadSeriesRef, data timeSeries) bool {
 		case tHistogram, tFloatHistogram:
 			s.qm.metrics.pendingHistograms.Inc()
 			s.enqueuedHistograms.Inc()
+		default:
+			return true
 		}
+		s.qm.metrics.highestTimestamp.Set(float64(data.timestamp / 1000))
+		s.qm.dataIn.incr(1)
 		return true
 	}
 }


@@ -53,17 +53,6 @@ import (
 const defaultFlushDeadline = 1 * time.Minute

-func newHighestTimestampMetric() *maxTimestamp {
-	return &maxTimestamp{
-		Gauge: prometheus.NewGauge(prometheus.GaugeOpts{
-			Namespace: namespace,
-			Subsystem: subsystem,
-			Name:      "highest_timestamp_in_seconds",
-			Help:      "Highest timestamp that has come into the remote storage via the Appender interface, in seconds since epoch. Initialized to 0 when no data has been received yet",
-		}),
-	}
-}
-
 func TestBasicContentNegotiation(t *testing.T) {
 	t.Parallel()
 	queueConfig := config.DefaultQueueConfig
@@ -323,7 +312,7 @@ func newTestClientAndQueueManager(t testing.TB, flushDeadline time.Duration, pro
 func newTestQueueManager(t testing.TB, cfg config.QueueConfig, mcfg config.MetadataConfig, deadline time.Duration, c WriteClient, protoMsg config.RemoteWriteProtoMsg) *QueueManager {
 	dir := t.TempDir()
 	metrics := newQueueManagerMetrics(nil, "", "")
-	m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, protoMsg)
+	m := NewQueueManager(metrics, nil, nil, nil, dir, cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), nil, false, false, false, protoMsg)
 	return m
 }
@@ -783,7 +772,7 @@ func TestDisableReshardOnRetry(t *testing.T) {
 		}
 	)

-	m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newPool(), newHighestTimestampMetric(), nil, false, false, false, config.RemoteWriteProtoMsgV1)
+	m := NewQueueManager(metrics, nil, nil, nil, "", cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newPool(), nil, false, false, false, config.RemoteWriteProtoMsgV1)
 	m.StoreSeries(fakeSeries, 0)

 	// Attempt to samples while the manager is running. We immediately stop the
@@ -1460,7 +1449,8 @@ func BenchmarkStoreSeries(b *testing.B) {
 			cfg := config.DefaultQueueConfig
 			mcfg := config.DefaultMetadataConfig
 			metrics := newQueueManagerMetrics(nil, "", "")
-			m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, config.RemoteWriteProtoMsgV1)
+			m := NewQueueManager(metrics, nil, nil, nil, dir, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), nil, false, false, false, config.RemoteWriteProtoMsgV1)
 			m.externalLabels = tc.externalLabels
 			m.relabelConfigs = tc.relabelConfigs
@@ -1560,9 +1550,8 @@ func TestCalculateDesiredShards(t *testing.T) {
 	addSamples := func(s int64, ts time.Duration) {
 		pendingSamples += s
 		samplesIn.incr(s)
-		samplesIn.tick()
-		m.highestRecvTimestamp.Set(float64(startedAt.Add(ts).Unix()))
+		m.metrics.highestTimestamp.Set(float64(startedAt.Add(ts).Unix()))
 	}

 	// helper function for sending samples.
@@ -1619,7 +1608,6 @@ func TestCalculateDesiredShardsDetail(t *testing.T) {
 		prevShards      int
 		dataIn          int64 // Quantities normalised to seconds.
 		dataOut         int64
-		dataDropped     int64
 		dataOutDuration float64
 		backlog         float64
 		expectedShards  int
@@ -1766,11 +1754,9 @@
 		t.Run(tc.name, func(t *testing.T) {
 			m.numShards = tc.prevShards
 			forceEMWA(samplesIn, tc.dataIn*int64(shardUpdateDuration/time.Second))
-			samplesIn.tick()
 			forceEMWA(m.dataOut, tc.dataOut*int64(shardUpdateDuration/time.Second))
-			forceEMWA(m.dataDropped, tc.dataDropped*int64(shardUpdateDuration/time.Second))
 			forceEMWA(m.dataOutDuration, int64(tc.dataOutDuration*float64(shardUpdateDuration)))
-			m.highestRecvTimestamp.value = tc.backlog // Not Set() because it can only increase value.
+			m.metrics.highestTimestamp.value = tc.backlog // Not Set() because it can only increase value.

 			require.Equal(t, tc.expectedShards, m.calculateDesiredShards())
 		})
@@ -2481,3 +2467,27 @@ func TestPopulateV2TimeSeries_TypeAndUnitLabels(t *testing.T) {
 		})
 	}
 }
+
+func TestHighestTimestampOnAppend(t *testing.T) {
+	for _, protoMsg := range []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1, config.RemoteWriteProtoMsgV2} {
+		t.Run(fmt.Sprint(protoMsg), func(t *testing.T) {
+			nSamples := 11 * config.DefaultQueueConfig.Capacity
+			nSeries := 3
+			samples, series := createTimeseries(nSamples, nSeries)
+
+			_, m := newTestClientAndQueueManager(t, defaultFlushDeadline, protoMsg)
+			m.Start()
+			defer m.Stop()
+
+			require.Equal(t, 0.0, m.metrics.highestTimestamp.Get())
+
+			m.StoreSeries(series, 0)
+			require.True(t, m.Append(samples))
+
+			// Check that Append sets the highest timestamp correctly.
+			highestTs := float64((nSamples - 1) / 1000)
+			require.Greater(t, highestTs, 0.0)
+			require.Equal(t, highestTs, m.metrics.highestTimestamp.Get())
+		})
+	}
+}


@@ -66,11 +66,9 @@ type WriteStorage struct {
 	externalLabels labels.Labels
 	dir            string
 	queues         map[string]*QueueManager
-	samplesIn      *ewmaRate
 	flushDeadline  time.Duration
 	interner       *pool
 	scraper        ReadyScrapeManager
-	quit           chan struct{}

 	// For timestampTracker.
 	highestTimestamp *maxTimestamp
@@ -89,11 +87,9 @@ func NewWriteStorage(logger *slog.Logger, reg prometheus.Registerer, dir string,
 		logger:        logger,
 		reg:           reg,
 		flushDeadline: flushDeadline,
-		samplesIn:     newEWMARate(ewmaWeight, shardUpdateDuration),
 		dir:           dir,
 		interner:      newPool(),
 		scraper:       sm,
-		quit:          make(chan struct{}),
 		highestTimestamp: &maxTimestamp{
 			Gauge: prometheus.NewGauge(prometheus.GaugeOpts{
 				Namespace: namespace,
@@ -107,23 +103,9 @@ func NewWriteStorage(logger *slog.Logger, reg prometheus.Registerer, dir string,
 	if reg != nil {
 		reg.MustRegister(rws.highestTimestamp)
 	}
-	go rws.run()
 	return rws
 }

-func (rws *WriteStorage) run() {
-	ticker := time.NewTicker(shardUpdateDuration)
-	defer ticker.Stop()
-	for {
-		select {
-		case <-ticker.C:
-			rws.samplesIn.tick()
-		case <-rws.quit:
-			return
-		}
-	}
-}
-
 func (rws *WriteStorage) Notify() {
 	rws.mtx.Lock()
 	defer rws.mtx.Unlock()
@@ -201,7 +183,6 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
 			rws.liveReaderMetrics,
 			rws.logger,
 			rws.dir,
-			rws.samplesIn,
 			rwConf.QueueConfig,
 			rwConf.MetadataConfig,
 			conf.GlobalConfig.ExternalLabels,
@@ -209,7 +190,6 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error {
 			c,
 			rws.flushDeadline,
 			rws.interner,
-			rws.highestTimestamp,
 			rws.scraper,
 			rwConf.SendExemplars,
 			rwConf.SendNativeHistograms,
@@ -270,7 +250,6 @@ func (rws *WriteStorage) Close() error {
 	for _, q := range rws.queues {
 		q.Stop()
 	}
-	close(rws.quit)

 	rws.watcherMetrics.Unregister()
 	rws.liveReaderMetrics.Unregister()
@@ -346,8 +325,6 @@ func (*timestampTracker) UpdateMetadata(storage.SeriesRef, labels.Labels, metada

 // Commit implements storage.Appender.
 func (t *timestampTracker) Commit() error {
-	t.writeStorage.samplesIn.incr(t.samples + t.exemplars + t.histograms)
-
 	samplesIn.Add(float64(t.samples))
 	exemplarsIn.Add(float64(t.exemplars))
 	histogramsIn.Add(float64(t.histograms))