diff --git a/scrape/scrape.go b/scrape/scrape.go
index 4d23efdbc8..830463982d 100644
--- a/scrape/scrape.go
+++ b/scrape/scrape.go
@@ -977,8 +977,8 @@ type scrapeCache struct {
 	// seriesCur and seriesPrev store the labels of series that were seen
 	// in the current and previous scrape.
 	// We hold two maps and swap them out to save allocations.
-	seriesCur  map[uint64]labels.Labels
-	seriesPrev map[uint64]labels.Labels
+	seriesCur  map[uint64]*cacheEntry
+	seriesPrev map[uint64]*cacheEntry
 
 	// TODO(bwplotka): Consider moving Metadata API to use WAL instead of scrape loop to
 	// avoid locking (using metadata API can block scraping).
@@ -1005,8 +1005,8 @@ func newScrapeCache(metrics *scrapeMetrics) *scrapeCache {
 	return &scrapeCache{
 		series:        map[string]*cacheEntry{},
 		droppedSeries: map[string]*uint64{},
-		seriesCur:     map[uint64]labels.Labels{},
-		seriesPrev:    map[uint64]labels.Labels{},
+		seriesCur:     map[uint64]*cacheEntry{},
+		seriesPrev:    map[uint64]*cacheEntry{},
 		metadata:      map[string]*metaEntry{},
 		metrics:       metrics,
 	}
@@ -1075,11 +1075,13 @@ func (c *scrapeCache) get(met []byte) (*cacheEntry, bool, bool) {
 	return e, true, alreadyScraped
 }
 
-func (c *scrapeCache) addRef(met []byte, ref storage.SeriesRef, lset labels.Labels, hash uint64) {
+func (c *scrapeCache) addRef(met []byte, ref storage.SeriesRef, lset labels.Labels, hash uint64) (ce *cacheEntry) {
 	if ref == 0 {
-		return
+		return nil
 	}
-	c.series[string(met)] = &cacheEntry{ref: ref, lastIter: c.iter, lset: lset, hash: hash}
+	ce = &cacheEntry{ref: ref, lastIter: c.iter, lset: lset, hash: hash}
+	c.series[string(met)] = ce
+	return ce
 }
 
 func (c *scrapeCache) addDropped(met []byte) {
@@ -1095,14 +1097,17 @@ func (c *scrapeCache) getDropped(met []byte) bool {
 	return ok
 }
 
-func (c *scrapeCache) trackStaleness(hash uint64, lset labels.Labels) {
-	c.seriesCur[hash] = lset
+func (c *scrapeCache) trackStaleness(hash uint64, ce *cacheEntry) {
+	c.seriesCur[hash] = ce
 }
 
-func (c *scrapeCache) forEachStale(f func(labels.Labels) bool) {
-	for h, lset := range c.seriesPrev {
+func (c *scrapeCache) forEachStale(f func(storage.SeriesRef, labels.Labels) bool) {
+	for h, ce := range c.seriesPrev {
+		if ce == nil {
+			continue
+		}
 		if _, ok := c.seriesCur[h]; !ok {
-			if !f(lset) {
+			if !f(ce.ref, ce.lset) {
 				break
 			}
 		}
@@ -1600,10 +1605,10 @@ type appendErrors struct {
 
 // Update the stale markers.
 func (sl *scrapeLoop) updateStaleMarkers(app storage.Appender, defTime int64) (err error) {
-	sl.cache.forEachStale(func(lset labels.Labels) bool {
+	sl.cache.forEachStale(func(ref storage.SeriesRef, lset labels.Labels) bool {
 		// Series no longer exposed, mark it stale.
 		app.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: true})
-		_, err = app.Append(0, lset, defTime, math.Float64frombits(value.StaleNaN))
+		_, err = app.Append(ref, lset, defTime, math.Float64frombits(value.StaleNaN))
 		app.SetOptions(nil)
 		switch {
 		case errors.Is(err, storage.ErrOutOfOrderSample), errors.Is(err, storage.ErrDuplicateSampleForTimestamp):
@@ -1800,7 +1805,7 @@ loop:
 
 		if err == nil {
 			if (parsedTimestamp == nil || sl.trackTimestampsStaleness) && ce != nil {
-				sl.cache.trackStaleness(ce.hash, ce.lset)
+				sl.cache.trackStaleness(ce.hash, ce)
 			}
 		}
 
@@ -1813,11 +1818,11 @@ loop:
 		}
 
 		if !seriesCached {
+			ce = sl.cache.addRef(met, ref, lset, hash)
 			if parsedTimestamp == nil || sl.trackTimestampsStaleness {
 				// Bypass staleness logic if there is an explicit timestamp.
-				sl.cache.trackStaleness(hash, lset)
+				sl.cache.trackStaleness(hash, ce)
 			}
-			sl.cache.addRef(met, ref, lset, hash)
 			if sampleAdded && sampleLimitErr == nil && bucketLimitErr == nil {
 				seriesAdded++
 			}
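
Note (not part of the patch): the reason seriesCur/seriesPrev now hold *cacheEntry rather than labels.Labels is that updateStaleMarkers can pass the cached storage.SeriesRef to app.Append instead of 0; with a zero ref the appender must resolve the series from its labels again, while a valid ref skips that lookup. That also explains the reordering in the scrape loop (addRef now runs before trackStaleness so the fresh entry can be tracked) and the nil guard in forEachStale (addRef returns nil when ref == 0). Below is a minimal, self-contained Go sketch of the two-map staleness pattern; SeriesRef, entry, and staleCache are simplified stand-ins invented for this sketch, not the Prometheus types.

package main

import "fmt"

// SeriesRef is a simplified stand-in for storage.SeriesRef.
type SeriesRef uint64

// entry is a simplified stand-in for cacheEntry: it pairs a series
// ref with its label set (flattened to a string for this sketch).
type entry struct {
	ref  SeriesRef
	lset string
}

// staleCache models the seriesCur/seriesPrev two-map swap from scrapeCache:
// series seen in the previous scrape but absent from the current one are stale.
type staleCache struct {
	cur, prev map[uint64]*entry
}

func newStaleCache() *staleCache {
	return &staleCache{
		cur:  map[uint64]*entry{},
		prev: map[uint64]*entry{},
	}
}

// track records a series seen in the current scrape, keyed by its label hash.
func (c *staleCache) track(hash uint64, e *entry) { c.cur[hash] = e }

// forEachStale calls f with the cached ref and labels of every series that was
// present in the previous scrape but not in the current one. Handing the ref
// to the caller is what lets the appender skip the by-labels series lookup.
func (c *staleCache) forEachStale(f func(SeriesRef, string) bool) {
	for h, e := range c.prev {
		if e == nil {
			continue // entry from an addRef that returned nil (ref == 0)
		}
		if _, ok := c.cur[h]; !ok {
			if !f(e.ref, e.lset) {
				break
			}
		}
	}
}

// swap makes the current scrape's series the baseline for the next scrape,
// reusing the old map instead of allocating a fresh one each scrape.
func (c *staleCache) swap() {
	c.prev, c.cur = c.cur, c.prev
	clear(c.cur)
}

func main() {
	c := newStaleCache()

	// Scrape 1 sees two series.
	c.track(1, &entry{ref: 101, lset: `up{job="a"}`})
	c.track(2, &entry{ref: 102, lset: `up{job="b"}`})
	c.swap()

	// Scrape 2 only sees the first one.
	c.track(1, &entry{ref: 101, lset: `up{job="a"}`})

	// The second series is reported stale, with its ref ready to be
	// passed straight to an Append call.
	c.forEachStale(func(ref SeriesRef, lset string) bool {
		fmt.Printf("stale: ref=%d lset=%s\n", ref, lset)
		return true
	})
}

The map swap in swap() is the same allocation-saving trick the comment on seriesCur/seriesPrev in scrapeCache describes.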