Pass last known ref ID when injecting staleness markers
Currently all staleness markers are appended for any sample that disappears from the scrape cache, even if that sample was never appended to TSDB. When staleness markers are appended they always use ref=0 as the SeriesRef, so the downstream appender doesn't know if the sample is for a known series or not. This changes the scrape cache so the map used for staleness tracking stores the cache entry instead of only the label set. Having the cache entry means: - we can ignore stale samples that didn't end up in TSDB (not in the scrape cache) - we can append them to TSDB using the correct ref value, so the appender knows if they are for known or unknown series Signed-off-by: Lukasz Mierzwa <l.mierzwa@gmail.com>
This commit is contained in:
parent
1f7a23cced
commit
872f03766c
|
@ -977,8 +977,8 @@ type scrapeCache struct {
|
|||
// seriesCur and seriesPrev store the labels of series that were seen
|
||||
// in the current and previous scrape.
|
||||
// We hold two maps and swap them out to save allocations.
|
||||
seriesCur map[uint64]labels.Labels
|
||||
seriesPrev map[uint64]labels.Labels
|
||||
seriesCur map[uint64]*cacheEntry
|
||||
seriesPrev map[uint64]*cacheEntry
|
||||
|
||||
// TODO(bwplotka): Consider moving Metadata API to use WAL instead of scrape loop to
|
||||
// avoid locking (using metadata API can block scraping).
|
||||
|
@ -1005,8 +1005,8 @@ func newScrapeCache(metrics *scrapeMetrics) *scrapeCache {
|
|||
return &scrapeCache{
|
||||
series: map[string]*cacheEntry{},
|
||||
droppedSeries: map[string]*uint64{},
|
||||
seriesCur: map[uint64]labels.Labels{},
|
||||
seriesPrev: map[uint64]labels.Labels{},
|
||||
seriesCur: map[uint64]*cacheEntry{},
|
||||
seriesPrev: map[uint64]*cacheEntry{},
|
||||
metadata: map[string]*metaEntry{},
|
||||
metrics: metrics,
|
||||
}
|
||||
|
@ -1075,11 +1075,13 @@ func (c *scrapeCache) get(met []byte) (*cacheEntry, bool, bool) {
|
|||
return e, true, alreadyScraped
|
||||
}
|
||||
|
||||
func (c *scrapeCache) addRef(met []byte, ref storage.SeriesRef, lset labels.Labels, hash uint64) {
|
||||
func (c *scrapeCache) addRef(met []byte, ref storage.SeriesRef, lset labels.Labels, hash uint64) (ce *cacheEntry) {
|
||||
if ref == 0 {
|
||||
return
|
||||
return nil
|
||||
}
|
||||
c.series[string(met)] = &cacheEntry{ref: ref, lastIter: c.iter, lset: lset, hash: hash}
|
||||
ce = &cacheEntry{ref: ref, lastIter: c.iter, lset: lset, hash: hash}
|
||||
c.series[string(met)] = ce
|
||||
return ce
|
||||
}
|
||||
|
||||
func (c *scrapeCache) addDropped(met []byte) {
|
||||
|
@ -1095,14 +1097,17 @@ func (c *scrapeCache) getDropped(met []byte) bool {
|
|||
return ok
|
||||
}
|
||||
|
||||
func (c *scrapeCache) trackStaleness(hash uint64, lset labels.Labels) {
|
||||
c.seriesCur[hash] = lset
|
||||
func (c *scrapeCache) trackStaleness(hash uint64, ce *cacheEntry) {
|
||||
c.seriesCur[hash] = ce
|
||||
}
|
||||
|
||||
func (c *scrapeCache) forEachStale(f func(labels.Labels) bool) {
|
||||
for h, lset := range c.seriesPrev {
|
||||
func (c *scrapeCache) forEachStale(f func(storage.SeriesRef, labels.Labels) bool) {
|
||||
for h, ce := range c.seriesPrev {
|
||||
if ce == nil {
|
||||
continue
|
||||
}
|
||||
if _, ok := c.seriesCur[h]; !ok {
|
||||
if !f(lset) {
|
||||
if !f(ce.ref, ce.lset) {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
@ -1600,10 +1605,10 @@ type appendErrors struct {
|
|||
|
||||
// Update the stale markers.
|
||||
func (sl *scrapeLoop) updateStaleMarkers(app storage.Appender, defTime int64) (err error) {
|
||||
sl.cache.forEachStale(func(lset labels.Labels) bool {
|
||||
sl.cache.forEachStale(func(ref storage.SeriesRef, lset labels.Labels) bool {
|
||||
// Series no longer exposed, mark it stale.
|
||||
app.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: true})
|
||||
_, err = app.Append(0, lset, defTime, math.Float64frombits(value.StaleNaN))
|
||||
_, err = app.Append(ref, lset, defTime, math.Float64frombits(value.StaleNaN))
|
||||
app.SetOptions(nil)
|
||||
switch {
|
||||
case errors.Is(err, storage.ErrOutOfOrderSample), errors.Is(err, storage.ErrDuplicateSampleForTimestamp):
|
||||
|
@ -1800,7 +1805,7 @@ loop:
|
|||
|
||||
if err == nil {
|
||||
if (parsedTimestamp == nil || sl.trackTimestampsStaleness) && ce != nil {
|
||||
sl.cache.trackStaleness(ce.hash, ce.lset)
|
||||
sl.cache.trackStaleness(ce.hash, ce)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1813,11 +1818,11 @@ loop:
|
|||
}
|
||||
|
||||
if !seriesCached {
|
||||
ce = sl.cache.addRef(met, ref, lset, hash)
|
||||
if parsedTimestamp == nil || sl.trackTimestampsStaleness {
|
||||
// Bypass staleness logic if there is an explicit timestamp.
|
||||
sl.cache.trackStaleness(hash, lset)
|
||||
sl.cache.trackStaleness(hash, ce)
|
||||
}
|
||||
sl.cache.addRef(met, ref, lset, hash)
|
||||
if sampleAdded && sampleLimitErr == nil && bucketLimitErr == nil {
|
||||
seriesAdded++
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue