commit edd716e63c (parent 041aa59623)

Fix the embarrassing bug introduced in commit 0851945.

In that commit, the 'maintainSeries' call was accidentally removed. This commit
refactors things a bit so that there is now a clean 'maintainMemorySeries' and a
'maintainArchivedSeries' call. It also straightens out the nomenclature a bit:
'drop' is now used consistently for chunks, 'purge' for series/metrics. Finally,
the annoying 'Completed maintenance sweep through archived fingerprints' message
is no longer logged if there were no archived fingerprints to do maintenance on.
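
To make the drop/purge distinction concrete: 'drop' removes individual chunks
that have fallen out of the retention window, while 'purge' removes a whole
series/metric once none of its chunks are left. Below is a minimal,
self-contained Go sketch of that idea. Everything in it (the series type, the
map-based registry, maintainSeries) is a simplified, hypothetical stand-in for
illustration only, not the actual storage types touched in the diff further down.

package main

import (
	"fmt"
	"time"
)

// series is a hypothetical stand-in for a stored time series; it only keeps
// the end times of its chunks.
type series struct {
	chunkTimes []time.Time
}

// dropChunks removes ("drops") all chunks that end before the cut-off and
// reports how many were dropped and whether none are left.
func (s *series) dropChunks(before time.Time) (dropped int, all bool) {
	kept := s.chunkTimes[:0]
	for _, t := range s.chunkTimes {
		if t.Before(before) {
			dropped++
			continue
		}
		kept = append(kept, t)
	}
	s.chunkTimes = kept
	return dropped, len(s.chunkTimes) == 0
}

// maintainSeries drops old chunks and, only if nothing remains afterwards,
// "purges" the whole series from the registry.
func maintainSeries(registry map[string]*series, name string, before time.Time) {
	s, ok := registry[name]
	if !ok {
		return // Already purged or never existed.
	}
	if _, allDropped := s.dropChunks(before); allDropped {
		delete(registry, name)
	}
}

func main() {
	now := time.Now()
	registry := map[string]*series{
		"up": {chunkTimes: []time.Time{now.Add(-48 * time.Hour), now}},
	}
	maintainSeries(registry, "up", now.Add(-40*time.Hour))
	// Only the 48h-old chunk was dropped; the series itself is not purged.
	fmt.Println(len(registry["up"].chunkTimes)) // Prints 1.
}

The actual change splits this work into maintainMemorySeries and
maintainArchivedSeries, but both follow the same drop-then-maybe-purge shape.
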
@@ -78,8 +78,8 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[clientmodel.Finge
 				// Oops, head chunk was persisted, but nothing on disk.
 				// Thus, we lost that series completely. Clean up the remnants.
 				delete(fingerprintToSeries, fp)
-				if err := p.dropArchivedMetric(fp); err != nil {
-					// Dropping the archived metric didn't work, so try
+				if err := p.purgeArchivedMetric(fp); err != nil {
+					// Purging the archived metric didn't work, so try
 					// to unindex it, just in case it's in the indexes.
 					p.unindexMetric(fp, s.metric)
 				}

@@ -67,7 +67,7 @@ const (
 	unpin           = "unpin" // Excluding the unpin on persisting.
 	clone           = "clone"
 	transcode       = "transcode"
-	purge           = "purge"
+	drop            = "drop"
 
 	// Op-types for chunkOps and chunkDescOps.
 	evict = "evict"

@@ -771,7 +771,7 @@ func (p *persistence) dropChunks(fp clientmodel.Fingerprint, beforeTime clientmo
 		if err == io.EOF {
 			// We ran into the end of the file without finding any chunks that should
 			// be kept. Remove the whole file.
-			chunkOps.WithLabelValues(purge).Add(float64(i))
+			chunkOps.WithLabelValues(drop).Add(float64(i))
 			if err := os.Remove(f.Name()); err != nil {
 				return 0, 0, true, err
 			}
@@ -783,7 +783,7 @@ func (p *persistence) dropChunks(fp clientmodel.Fingerprint, beforeTime clientmo
 		lastTime := clientmodel.Timestamp(binary.LittleEndian.Uint64(timeBuf[8:]))
 		if !lastTime.Before(beforeTime) {
 			firstTime = clientmodel.Timestamp(binary.LittleEndian.Uint64(timeBuf))
-			chunkOps.WithLabelValues(purge).Add(float64(i))
+			chunkOps.WithLabelValues(drop).Add(float64(i))
 			break
 		}
 	}
@@ -824,7 +824,7 @@ func (p *persistence) indexMetric(fp clientmodel.Fingerprint, m clientmodel.Metr
 // indexes used for getFingerprintsForLabelPair, getLabelValuesForLabelName, and
 // getFingerprintsModifiedBefore. The index of fingerprints to archived metrics
 // is not affected by this removal. (In fact, never call this method for an
-// archived metric. To drop an archived metric, call dropArchivedFingerprint.)
+// archived metric. To purge an archived metric, call purgeArchivedFingerprint.)
 // If the queue is full, this method blocks until the metric can be queued. This
 // method is goroutine-safe.
 func (p *persistence) unindexMetric(fp clientmodel.Fingerprint, m clientmodel.Metric) {
@@ -910,11 +910,11 @@ func (p *persistence) getArchivedMetric(fp clientmodel.Fingerprint) (clientmodel
 	return metric, err
 }
 
-// dropArchivedMetric deletes an archived fingerprint and its corresponding
+// purgeArchivedMetric deletes an archived fingerprint and its corresponding
 // metric entirely. It also queues the metric for un-indexing (no need to call
-// unindexMetric for the deleted metric.)  The caller must have locked the
-// fingerprint.
-func (p *persistence) dropArchivedMetric(fp clientmodel.Fingerprint) (err error) {
+// unindexMetric for the deleted metric.) It does not touch the series file,
+// though. The caller must have locked the fingerprint.
+func (p *persistence) purgeArchivedMetric(fp clientmodel.Fingerprint) (err error) {
 	defer func() {
 		if err != nil {
 			p.setDirty(true)
@@ -944,7 +944,7 @@ func (p *persistence) dropArchivedMetric(fp clientmodel.Fingerprint) (err error)
 }
 
 // unarchiveMetric deletes an archived fingerprint and its metric, but (in
-// contrast to dropArchivedMetric) does not un-index the metric.  If a metric
+// contrast to purgeArchivedMetric) does not un-index the metric.  If a metric
 // was actually deleted, the method returns true and the first time of the
 // deleted metric. The caller must have locked the fingerprint.
 func (p *persistence) unarchiveMetric(fp clientmodel.Fingerprint) (

@@ -349,11 +349,11 @@ func TestDropArchivedMetric(t *testing.T) {
 		t.Error("want FP 2 archived")
 	}
 
-	if err != p.dropArchivedMetric(1) {
+	if err != p.purgeArchivedMetric(1) {
 		t.Fatal(err)
 	}
-	if err != p.dropArchivedMetric(3) {
-		// Dropping something that has not beet archived is not an error.
+	if err != p.purgeArchivedMetric(3) {
+		// Purging something that has not beet archived is not an error.
 		t.Fatal(err)
 	}
 	p.waitForIndexing()

@@ -242,11 +242,11 @@ func (s *memorySeries) evictChunkDescs(iOldestNotEvicted int) {
 	}
 }
 
-// purgeOlderThan removes chunkDescs older than t. It returns the number of
-// purged chunkDescs and true if all chunkDescs have been purged.
+// dropChunks removes chunkDescs older than t. It returns the number of dropped
+// chunkDescs and true if all chunkDescs have been dropped.
 //
 // The caller must have locked the fingerprint of the series.
-func (s *memorySeries) purgeOlderThan(t clientmodel.Timestamp) (int, bool) {
+func (s *memorySeries) dropChunks(t clientmodel.Timestamp) (int, bool) {
 	keepIdx := len(s.chunkDescs)
 	for i, cd := range s.chunkDescs {
 		if !cd.lastTime().Before(t) {

@@ -68,7 +68,7 @@ type memorySeriesStorage struct {
 
 	loopStopping, loopStopped  chan struct{}
 	maxMemoryChunks            int
-	purgeAfter                 time.Duration
+	dropAfter                  time.Duration
 	checkpointInterval         time.Duration
 	checkpointDirtySeriesLimit int
 
@@ -96,7 +96,6 @@ type memorySeriesStorage struct {
 	seriesOps                   *prometheus.CounterVec
 	ingestedSamplesCount        prometheus.Counter
 	invalidPreloadRequestsCount prometheus.Counter
-	purgeDuration               prometheus.Gauge
 }
 
 // MemorySeriesStorageOptions contains options needed by
@@ -105,7 +104,7 @@ type memorySeriesStorage struct {
 type MemorySeriesStorageOptions struct {
 	MemoryChunks               int           // How many chunks to keep in memory.
 	PersistenceStoragePath     string        // Location of persistence files.
-	PersistenceRetentionPeriod time.Duration // Chunks at least that old are purged.
+	PersistenceRetentionPeriod time.Duration // Chunks at least that old are dropped.
 	PersistenceQueueCapacity   int           // Capacity of queue for chunks to be persisted.
 	CheckpointInterval         time.Duration // How often to checkpoint the series map and head chunks.
 	CheckpointDirtySeriesLimit int           // How many dirty series will trigger an early checkpoint.
@@ -140,7 +139,7 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
 		loopStopping:               make(chan struct{}),
 		loopStopped:                make(chan struct{}),
 		maxMemoryChunks:            o.MemoryChunks,
-		purgeAfter:                 o.PersistenceRetentionPeriod,
+		dropAfter:                  o.PersistenceRetentionPeriod,
 		checkpointInterval:         o.CheckpointInterval,
 		checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit,
 
@@ -670,8 +669,8 @@ func (s *memorySeriesStorage) persistChunks(fp clientmodel.Fingerprint, cds []*c
 
 // waitForNextFP waits an estimated duration, after which we want to process
 // another fingerprint so that we will process all fingerprints in a tenth of
-// s.purgeAfter assuming that the system is doing nothing else, e.g. if we want
-// to purge after 40h, we want to cycle through all fingerprints within
+// s.dropAfter assuming that the system is doing nothing else, e.g. if we want
+// to drop chunks after 40h, we want to cycle through all fingerprints within
 // 4h. However, the maximum sweep time is capped at fpMaxSweepTime. Furthermore,
 // this method will always wait for at least fpMinWaitDuration and never longer
 // than fpMaxWaitDuration. If s.loopStopped is closed, it will return false
@@ -680,7 +679,7 @@ func (s *memorySeriesStorage) persistChunks(fp clientmodel.Fingerprint, cds []*c
 func (s *memorySeriesStorage) waitForNextFP(numberOfFPs int) bool {
 	d := fpMaxWaitDuration
 	if numberOfFPs != 0 {
-		sweepTime := s.purgeAfter / 10
+		sweepTime := s.dropAfter / 10
 		if sweepTime > fpMaxSweepTime {
 			sweepTime = fpMaxSweepTime
 		}
@@ -725,6 +724,7 @@ func (s *memorySeriesStorage) cycleThroughMemoryFingerprints() chan clientmodel.
 			}
 			begin := time.Now()
 			fpIter = s.fpToSeries.fpIter()
+			count := 0
 			for fp := range fpIter {
 				select {
 				case memoryFingerprints <- fp:
@@ -732,8 +732,14 @@ func (s *memorySeriesStorage) cycleThroughMemoryFingerprints() chan clientmodel.
 					return
 				}
 				s.waitForNextFP(s.fpToSeries.length())
+				count++
 			}
-			glog.Infof("Completed maintenance sweep through in-memory fingerprints in %v.", time.Since(begin))
+			if count > 0 {
+				glog.Infof(
+					"Completed maintenance sweep through %d in-memory fingerprints in %v.",
+					count, time.Since(begin),
+				)
+			}
 		}
 	}()
 
@@ -750,7 +756,7 @@ func (s *memorySeriesStorage) cycleThroughArchivedFingerprints() chan clientmode
 
 		for {
 			archivedFPs, err := s.persistence.getFingerprintsModifiedBefore(
-				clientmodel.TimestampFromTime(time.Now()).Add(-1 * s.purgeAfter),
+				clientmodel.TimestampFromTime(time.Now()).Add(-s.dropAfter),
 			)
 			if err != nil {
 				glog.Error("Failed to lookup archived fingerprint ranges: ", err)
@@ -770,7 +776,12 @@ func (s *memorySeriesStorage) cycleThroughArchivedFingerprints() chan clientmode
 				}
 				s.waitForNextFP(len(archivedFPs))
 			}
-			glog.Infof("Completed maintenance sweep through archived fingerprints in %v.", time.Since(begin))
+			if len(archivedFPs) > 0 {
+				glog.Infof(
+					"Completed maintenance sweep through %d archived fingerprints in %v.",
+					len(archivedFPs), time.Since(begin),
+				)
+			}
 		}
 	}()
 	return archivedFingerprints
@@ -807,11 +818,9 @@ loop:
 			headChunksPersistedSinceLastCheckpoint = 0
 			checkpointTimer.Reset(s.checkpointInterval)
 		case fp := <-memoryFingerprints:
-			s.purgeSeries(fp, clientmodel.TimestampFromTime(time.Now()).Add(-1*s.purgeAfter))
-			s.seriesOps.WithLabelValues(memoryMaintenance).Inc()
+			s.maintainMemorySeries(fp, clientmodel.TimestampFromTime(time.Now()).Add(-s.dropAfter))
 		case fp := <-archivedFingerprints:
-			s.purgeSeries(fp, clientmodel.TimestampFromTime(time.Now()).Add(-1*s.purgeAfter))
-			s.seriesOps.WithLabelValues(archiveMaintenance).Inc()
+			s.maintainArchivedSeries(fp, clientmodel.TimestampFromTime(time.Now()).Add(-s.dropAfter))
 		case <-s.countPersistedHeadChunks:
 			headChunksPersistedSinceLastCheckpoint++
 			// Check if we have enough "dirty" series so that we need an early checkpoint.
@@ -835,10 +844,11 @@ loop:
 	}
 }
 
-// maintainSeries closes the head chunk if not touched in a while. It archives a
-// series if all chunks are evicted. It evicts chunkDescs if there are too
-// many.
-func (s *memorySeriesStorage) maintainSeries(fp clientmodel.Fingerprint) {
+// maintainMemorySeries first purges the series from old chunks. If the series
+// still exists after that, it proceeds with the following steps: It closes the
+// head chunk if it was not touched in a while. It archives a series if all
+// chunks are evicted. It evicts chunkDescs if there are too many.
+func (s *memorySeriesStorage) maintainMemorySeries(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) {
 	var headChunkToPersist *chunkDesc
 	s.fpLocker.Lock(fp)
 	defer func() {
@@ -846,19 +856,28 @@ func (s *memorySeriesStorage) maintainSeries(fp clientmodel.Fingerprint) {
 		// Queue outside of lock!
 		if headChunkToPersist != nil {
 			s.persistQueue <- persistRequest{fp, headChunkToPersist}
-		}
-		// Count that a head chunk was persisted, but only best effort, i.e. we
-		// don't want to block here.
-		select {
-		case s.countPersistedHeadChunks <- struct{}{}: // Counted.
-		default: // Meh...
+			// Count that a head chunk was persisted, but only best effort, i.e. we
+			// don't want to block here.
+			select {
+			case s.countPersistedHeadChunks <- struct{}{}: // Counted.
+			default: // Meh...
+			}
 		}
 	}()
 
 	series, ok := s.fpToSeries.get(fp)
 	if !ok {
 		// Series is actually not in memory, perhaps archived or dropped in the meantime.
 		return
 	}
 
+	defer s.seriesOps.WithLabelValues(memoryMaintenance).Inc()
+
+	if s.purgeMemorySeries(fp, series, beforeTime) {
+		// Series is gone now, we are done.
+		return
+	}
+
 	iOldestNotEvicted := -1
 	for i, cd := range series.chunkDescs {
 		if !cd.isEvicted() {
@@ -876,7 +895,10 @@ func (s *memorySeriesStorage) maintainSeries(fp clientmodel.Fingerprint) {
 		if len(series.chunkDescs) == 0 {
 			cds, err := s.loadChunkDescs(fp, clientmodel.Latest)
 			if err != nil {
-				glog.Errorf("Could not load chunk descriptors prior to archiving metric %v, metric will not be archived: %v", series.metric, err)
+				glog.Errorf(
+					"Could not load chunk descriptors prior to archiving metric %v, metric will not be archived: %v",
+					series.metric, err,
+				)
 				return
 			}
 			series.chunkDescs = cds
@@ -902,38 +924,43 @@ func (s *memorySeriesStorage) maintainSeries(fp clientmodel.Fingerprint) {
 	}
 }
 
-// purgeSeries purges chunks older than beforeTime from a series. If the series
-// contains no chunks after the purge, it is dropped entirely.
-func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) {
+// purgeMemorySeries drops chunks older than beforeTime from the provided memory
+// series. The caller must have locked fp. If the series contains no chunks
+// after dropping old chunks, it is purged entirely. In that case, the method
+// returns true.
+func (s *memorySeriesStorage) purgeMemorySeries(fp clientmodel.Fingerprint, series *memorySeries, beforeTime clientmodel.Timestamp) bool {
+	if !series.firstTime().Before(beforeTime) {
+		// Oldest sample not old enough.
+		return false
+	}
+	newFirstTime, numDroppedFromPersistence, allDroppedFromPersistence, err := s.persistence.dropChunks(fp, beforeTime)
+	if err != nil {
+		glog.Error("Error dropping persisted chunks: ", err)
+	}
+	numDroppedFromMemory, allDroppedFromMemory := series.dropChunks(beforeTime)
+	if allDroppedFromPersistence && allDroppedFromMemory {
+		s.fpToSeries.del(fp)
+		s.numSeries.Dec()
+		s.seriesOps.WithLabelValues(memoryPurge).Inc()
+		s.persistence.unindexMetric(fp, series.metric)
+		return true
+	}
+	if series.chunkDescsOffset != -1 {
+		series.savedFirstTime = newFirstTime
+		series.chunkDescsOffset += numDroppedFromMemory - numDroppedFromPersistence
+		if series.chunkDescsOffset < 0 {
+			panic("dropped more chunks from persistence than from memory")
+		}
+	}
+	return false
+}
+
+// maintainArchivedSeries drops chunks older than beforeTime from an archived
+// series. If the series contains no chunks after that, it is purged entirely.
+func (s *memorySeriesStorage) maintainArchivedSeries(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) {
 	s.fpLocker.Lock(fp)
 	defer s.fpLocker.Unlock(fp)
 
-	if series, ok := s.fpToSeries.get(fp); ok {
-		// Deal with series in memory.
-		if !series.firstTime().Before(beforeTime) {
-			// Oldest sample not old enough.
-			return
-		}
-		newFirstTime, numDropped, allDropped, err := s.persistence.dropChunks(fp, beforeTime)
-		if err != nil {
-			glog.Error("Error purging persisted chunks: ", err)
-		}
-		numPurged, allPurged := series.purgeOlderThan(beforeTime)
-		if allPurged && allDropped {
-			s.fpToSeries.del(fp)
-			s.numSeries.Dec()
-			s.seriesOps.WithLabelValues(memoryPurge).Inc()
-			s.persistence.unindexMetric(fp, series.metric)
-		} else if series.chunkDescsOffset != -1 {
-			series.savedFirstTime = newFirstTime
-			series.chunkDescsOffset += numPurged - numDropped
-			if series.chunkDescsOffset < 0 {
-				panic("dropped more chunks from persistence than from memory")
-			}
-		}
-		return
-	}
-	// Deal with archived series.
 	has, firstTime, lastTime, err := s.persistence.hasArchivedMetric(fp)
 	if err != nil {
 		glog.Error("Error looking up archived time range: ", err)
@@ -944,13 +971,15 @@ func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime
 		return
 	}
 
+	defer s.seriesOps.WithLabelValues(archiveMaintenance).Inc()
+
 	newFirstTime, _, allDropped, err := s.persistence.dropChunks(fp, beforeTime)
 	if err != nil {
-		glog.Error("Error purging persisted chunks: ", err)
+		glog.Error("Error dropping persisted chunks: ", err)
 	}
 	if allDropped {
-		if err := s.persistence.dropArchivedMetric(fp); err != nil {
-			glog.Errorf("Error dropping archived metric for fingerprint %v: %v", fp, err)
+		if err := s.persistence.purgeArchivedMetric(fp); err != nil {
+			glog.Errorf("Error purging archived metric for fingerprint %v: %v", fp, err)
 			return
 		}
 		s.seriesOps.WithLabelValues(archivePurge).Inc()

@@ -36,6 +36,9 @@ func TestGetFingerprintsForLabelMatchers(t *testing.T) {
 // TestLoop is just a smoke test for the loop method, if we can switch it on and
 // off without disaster.
 func TestLoop(t *testing.T) {
+	if testing.Short() {
+		t.Skip("Skipping test in short mode.")
+	}
 	samples := make(clientmodel.Samples, 1000)
 	for i := range samples {
 		samples[i] = &clientmodel.Sample{
@@ -57,8 +60,18 @@ func TestLoop(t *testing.T) {
 	}
 	storage.Start()
 	storage.AppendSamples(samples)
-	time.Sleep(time.Second)
+	storage.WaitForIndexing()
+	series, _ := storage.(*memorySeriesStorage).fpToSeries.get(clientmodel.Metric{}.Fingerprint())
+	cdsBefore := len(series.chunkDescs)
+	time.Sleep(fpMaxWaitDuration + time.Second) // TODO(beorn7): Ugh, need to wait for maintenance to kick in.
+	cdsAfter := len(series.chunkDescs)
 	storage.Stop()
+	if cdsBefore <= cdsAfter {
+		t.Errorf(
+			"Number of chunk descriptors should have gone down by now. Got before %d, after %d.",
+			cdsBefore, cdsAfter,
+		)
+	}
 }
 
 func TestChunk(t *testing.T) {
@@ -337,15 +350,15 @@ func TestEvictAndPurgeSeries(t *testing.T) {
 	s, closer := NewTestStorage(t)
 	defer closer.Close()
 
-	ms := s.(*memorySeriesStorage) // Going to test the internal purgeSeries method.
+	ms := s.(*memorySeriesStorage) // Going to test the internal maintain.*Series methods.
 
 	s.AppendSamples(samples)
 	s.WaitForIndexing()
 
 	fp := clientmodel.Metric{}.Fingerprint()
 
-	// Purge ~half of the chunks.
-	ms.purgeSeries(fp, 1000)
+	// Drop ~half of the chunks.
+	ms.maintainMemorySeries(fp, 1000)
 	it := s.NewIterator(fp)
 	actual := it.GetBoundaryValues(metric.Interval{
 		OldestInclusive: 0,
@@ -362,8 +375,8 @@ func TestEvictAndPurgeSeries(t *testing.T) {
 		t.Errorf("2nd timestamp: want %v, got %v", want, actual[1].Timestamp)
 	}
 
-	// Purge everything.
-	ms.purgeSeries(fp, 10000)
+	// Drop everything.
+	ms.maintainMemorySeries(fp, 10000)
 	it = s.NewIterator(fp)
 	actual = it.GetBoundaryValues(metric.Interval{
 		OldestInclusive: 0,
@@ -403,18 +416,18 @@ func TestEvictAndPurgeSeries(t *testing.T) {
 		t.Fatal("not archived")
 	}
 
-	// Purge ~half of the chunks of an archived series.
-	ms.purgeSeries(fp, 1000)
+	// Drop ~half of the chunks of an archived series.
+	ms.maintainArchivedSeries(fp, 1000)
 	archived, _, _, err = ms.persistence.hasArchivedMetric(fp)
 	if err != nil {
 		t.Fatal(err)
 	}
 	if !archived {
-		t.Fatal("archived series dropped although only half of the chunks purged")
+		t.Fatal("archived series purged although only half of the chunks dropped")
 	}
 
-	// Purge everything.
-	ms.purgeSeries(fp, 10000)
+	// Drop everything.
+	ms.maintainArchivedSeries(fp, 10000)
 	archived, _, _, err = ms.persistence.hasArchivedMetric(fp)
 	if err != nil {
 		t.Fatal(err)
