493 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			493 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Go
		
	
	
	
| // Copyright 2014 Prometheus Team
 | |
| // Licensed under the Apache License, Version 2.0 (the "License");
 | |
| // you may not use this file except in compliance with the License.
 | |
| // You may obtain a copy of the License at
 | |
| //
 | |
| // http://www.apache.org/licenses/LICENSE-2.0
 | |
| //
 | |
| // Unless required by applicable law or agreed to in writing, software
 | |
| // distributed under the License is distributed on an "AS IS" BASIS,
 | |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| // See the License for the specific language governing permissions and
 | |
| // limitations under the License.
 | |
| 
 | |
| // Package local contains the local time series storage used by Prometheus.
 | |
| package local
 | |
| 
 | |
| import (
 | |
| 	"fmt"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/golang/glog"
 | |
| 
 | |
| 	clientmodel "github.com/prometheus/client_golang/model"
 | |
| 	"github.com/prometheus/client_golang/prometheus"
 | |
| 	"github.com/prometheus/prometheus/storage/metric"
 | |
| )
 | |
| 
 | |
// persistQueueCap is the buffer capacity of the channel on which chunks are
// queued for persistence (see memorySeriesStorage.persistQueue).
const persistQueueCap = 1024
 | |
| 
 | |
// storageState enumerates the states a storage can be in.
// NOTE(review): nothing in the visible part of this file reads or writes a
// storageState; confirm whether it is used elsewhere in the package.
type storageState uint

// The numeric values follow declaration order, starting at zero.
const (
	storageStarting storageState = iota
	storageServing
	storageStopping
)
 | |
| 
 | |
| type memorySeriesStorage struct {
 | |
| 	fpLocker *fingerprintLocker
 | |
| 
 | |
| 	persistDone chan bool
 | |
| 	stopServing chan chan<- bool
 | |
| 
 | |
| 	fingerprintToSeries *seriesMap
 | |
| 
 | |
| 	memoryEvictionInterval time.Duration
 | |
| 	memoryRetentionPeriod  time.Duration
 | |
| 
 | |
| 	persistencePurgeInterval   time.Duration
 | |
| 	persistenceRetentionPeriod time.Duration
 | |
| 
 | |
| 	persistQueue chan *persistRequest
 | |
| 	persistence  *persistence
 | |
| }
 | |
| 
 | |
// MemorySeriesStorageOptions bundles the settings required by
// NewMemorySeriesStorage. None of the fields may safely be left at its zero
// value.
type MemorySeriesStorageOptions struct {
	// MemoryEvictionInterval is how often to check for memory eviction.
	MemoryEvictionInterval time.Duration
	// MemoryRetentionPeriod is the age from which on chunks are evicted from
	// memory.
	MemoryRetentionPeriod time.Duration
	// PersistenceStoragePath is the location of the persistence files.
	PersistenceStoragePath string
	// PersistencePurgeInterval is how often to check for purging.
	PersistencePurgeInterval time.Duration
	// PersistenceRetentionPeriod is the age from which on chunks are purged.
	PersistenceRetentionPeriod time.Duration
}
 | |
| 
 | |
| // NewMemorySeriesStorage returns a newly allocated Storage. Storage.Serve still
 | |
| // has to be called to start the storage.
 | |
| func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
 | |
| 	p, err := newPersistence(o.PersistenceStoragePath, 1024)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	glog.Info("Loading series map and head chunks...")
 | |
| 	fingerprintToSeries, err := p.loadSeriesMapAndHeads()
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	numSeries.Set(float64(fingerprintToSeries.length()))
 | |
| 
 | |
| 	return &memorySeriesStorage{
 | |
| 		fpLocker: newFingerprintLocker(100), // TODO: Tweak value.
 | |
| 
 | |
| 		fingerprintToSeries: fingerprintToSeries,
 | |
| 		persistDone:         make(chan bool),
 | |
| 		stopServing:         make(chan chan<- bool),
 | |
| 
 | |
| 		memoryEvictionInterval: o.MemoryEvictionInterval,
 | |
| 		memoryRetentionPeriod:  o.MemoryRetentionPeriod,
 | |
| 
 | |
| 		persistencePurgeInterval:   o.PersistencePurgeInterval,
 | |
| 		persistenceRetentionPeriod: o.PersistenceRetentionPeriod,
 | |
| 
 | |
| 		persistQueue: make(chan *persistRequest, persistQueueCap),
 | |
| 		persistence:  p,
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
| type persistRequest struct {
 | |
| 	fingerprint clientmodel.Fingerprint
 | |
| 	chunkDesc   *chunkDesc
 | |
| }
 | |
| 
 | |
| // AppendSamples implements Storage.
 | |
| func (s *memorySeriesStorage) AppendSamples(samples clientmodel.Samples) {
 | |
| 	for _, sample := range samples {
 | |
| 		s.appendSample(sample)
 | |
| 	}
 | |
| 
 | |
| 	numSamples.Add(float64(len(samples)))
 | |
| }
 | |
| 
 | |
| func (s *memorySeriesStorage) appendSample(sample *clientmodel.Sample) {
 | |
| 	fp := sample.Metric.Fingerprint()
 | |
| 	s.fpLocker.Lock(fp)
 | |
| 	defer s.fpLocker.Unlock(fp)
 | |
| 
 | |
| 	series := s.getOrCreateSeries(fp, sample.Metric)
 | |
| 	series.add(fp, &metric.SamplePair{
 | |
| 		Value:     sample.Value,
 | |
| 		Timestamp: sample.Timestamp,
 | |
| 	}, s.persistQueue)
 | |
| }
 | |
| 
 | |
| func (s *memorySeriesStorage) getOrCreateSeries(fp clientmodel.Fingerprint, m clientmodel.Metric) *memorySeries {
 | |
| 	series, ok := s.fingerprintToSeries.get(fp)
 | |
| 	if !ok {
 | |
| 		unarchived, err := s.persistence.unarchiveMetric(fp)
 | |
| 		if err != nil {
 | |
| 			glog.Errorf("Error unarchiving fingerprint %v: %v", fp, err)
 | |
| 		}
 | |
| 		if !unarchived {
 | |
| 			// This was a genuinely new series, so index the metric.
 | |
| 			s.persistence.indexMetric(m, fp)
 | |
| 		}
 | |
| 		series = newMemorySeries(m, !unarchived)
 | |
| 		s.fingerprintToSeries.put(fp, series)
 | |
| 		numSeries.Set(float64(s.fingerprintToSeries.length()))
 | |
| 
 | |
| 	}
 | |
| 	return series
 | |
| }
 | |
| 
 | |
| /*
 | |
| func (s *memorySeriesStorage) preloadChunksAtTime(fp clientmodel.Fingerprint, ts clientmodel.Timestamp) (chunkDescs, error) {
 | |
| 	series, ok := s.fingerprintToSeries.get(fp)
 | |
| 	if !ok {
 | |
| 		panic("requested preload for non-existent series")
 | |
| 	}
 | |
| 	return series.preloadChunksAtTime(ts, s.persistence)
 | |
| }
 | |
| */
 | |
| 
 | |
| func (s *memorySeriesStorage) preloadChunksForRange(fp clientmodel.Fingerprint, from clientmodel.Timestamp, through clientmodel.Timestamp) (chunkDescs, error) {
 | |
| 	stalenessDelta := 300 * time.Second // TODO: Turn into parameter.
 | |
| 	s.fpLocker.Lock(fp)
 | |
| 	defer s.fpLocker.Unlock(fp)
 | |
| 
 | |
| 	series, ok := s.fingerprintToSeries.get(fp)
 | |
| 	if !ok {
 | |
| 		has, first, last, err := s.persistence.hasArchivedMetric(fp)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		if !has {
 | |
| 			return nil, fmt.Errorf("requested preload for non-existent series %v", fp)
 | |
| 		}
 | |
| 		if from.Add(-stalenessDelta).Before(last) && through.Add(stalenessDelta).After(first) {
 | |
| 			metric, err := s.persistence.getArchivedMetric(fp)
 | |
| 			if err != nil {
 | |
| 				return nil, err
 | |
| 			}
 | |
| 			series = s.getOrCreateSeries(fp, metric)
 | |
| 		} else {
 | |
| 			return nil, nil
 | |
| 		}
 | |
| 	}
 | |
| 	return series.preloadChunksForRange(from, through, fp, s.persistence)
 | |
| }
 | |
| 
 | |
| // NewIterator implements storage.
 | |
| func (s *memorySeriesStorage) NewIterator(fp clientmodel.Fingerprint) SeriesIterator {
 | |
| 	s.fpLocker.Lock(fp)
 | |
| 	defer s.fpLocker.Unlock(fp)
 | |
| 
 | |
| 	series, ok := s.fingerprintToSeries.get(fp)
 | |
| 	if !ok {
 | |
| 		// Oops, no series for fp found. That happens if, after
 | |
| 		// preloading is done, the whole series is identified as old
 | |
| 		// enough for purging and hence purged for good. As there is no
 | |
| 		// data left to iterate over, return an iterator that will never
 | |
| 		// return any values.
 | |
| 		return nopSeriesIterator{}
 | |
| 	}
 | |
| 	return series.newIterator(
 | |
| 		func() { s.fpLocker.Lock(fp) },
 | |
| 		func() { s.fpLocker.Unlock(fp) },
 | |
| 	)
 | |
| }
 | |
| 
 | |
| func (s *memorySeriesStorage) evictMemoryChunks(ttl time.Duration) {
 | |
| 	defer func(begin time.Time) {
 | |
| 		evictionDuration.Set(float64(time.Since(begin) / time.Millisecond))
 | |
| 	}(time.Now())
 | |
| 
 | |
| 	for m := range s.fingerprintToSeries.iter() {
 | |
| 		s.fpLocker.Lock(m.fp)
 | |
| 		if m.series.evictOlderThan(
 | |
| 			clientmodel.TimestampFromTime(time.Now()).Add(-1*ttl),
 | |
| 			m.fp, s.persistQueue,
 | |
| 		) {
 | |
| 			s.fingerprintToSeries.del(m.fp)
 | |
| 			if err := s.persistence.archiveMetric(
 | |
| 				m.fp, m.series.metric, m.series.firstTime(), m.series.lastTime(),
 | |
| 			); err != nil {
 | |
| 				glog.Errorf("Error archiving metric %v: %v", m.series.metric, err)
 | |
| 			}
 | |
| 		}
 | |
| 		s.fpLocker.Unlock(m.fp)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func recordPersist(start time.Time, err error) {
 | |
| 	outcome := success
 | |
| 	if err != nil {
 | |
| 		outcome = failure
 | |
| 	}
 | |
| 	persistLatencies.WithLabelValues(outcome).Observe(float64(time.Since(start) / time.Millisecond))
 | |
| }
 | |
| 
 | |
| func (s *memorySeriesStorage) handlePersistQueue() {
 | |
| 	for req := range s.persistQueue {
 | |
| 		persistQueueLength.Set(float64(len(s.persistQueue)))
 | |
| 
 | |
| 		//glog.Info("Persist request: ", *req.fingerprint)
 | |
| 		start := time.Now()
 | |
| 		err := s.persistence.persistChunk(req.fingerprint, req.chunkDesc.chunk)
 | |
| 		recordPersist(start, err)
 | |
| 		if err != nil {
 | |
| 			glog.Error("Error persisting chunk, requeuing: ", err)
 | |
| 			s.persistQueue <- req
 | |
| 			continue
 | |
| 		}
 | |
| 		req.chunkDesc.unpin()
 | |
| 	}
 | |
| 	s.persistDone <- true
 | |
| }
 | |
| 
 | |
| // WaitForIndexing implements Storage.
 | |
| func (s *memorySeriesStorage) WaitForIndexing() {
 | |
| 	s.persistence.waitForIndexing()
 | |
| }
 | |
| 
 | |
| // Close stops serving, flushes all pending operations, and frees all
 | |
| // resources. It implements Storage.
 | |
| func (s *memorySeriesStorage) Close() error {
 | |
| 	stopped := make(chan bool)
 | |
| 	glog.Info("Waiting for storage to stop serving...")
 | |
| 	s.stopServing <- stopped
 | |
| 	<-stopped
 | |
| 	glog.Info("Serving stopped.")
 | |
| 
 | |
| 	glog.Info("Stopping persist loop...")
 | |
| 	close(s.persistQueue)
 | |
| 	<-s.persistDone
 | |
| 	glog.Info("Persist loop stopped.")
 | |
| 
 | |
| 	glog.Info("Persisting head chunks...")
 | |
| 	if err := s.persistence.persistSeriesMapAndHeads(s.fingerprintToSeries); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	glog.Info("Done persisting head chunks.")
 | |
| 
 | |
| 	if err := s.persistence.close(); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (s *memorySeriesStorage) purgePeriodically(stop <-chan bool) {
 | |
| 	purgeTicker := time.NewTicker(s.persistencePurgeInterval)
 | |
| 	defer purgeTicker.Stop()
 | |
| 
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-stop:
 | |
| 			glog.Info("Purging loop stopped.")
 | |
| 			return
 | |
| 		case <-purgeTicker.C:
 | |
| 			glog.Info("Purging old series data...")
 | |
| 			ts := clientmodel.TimestampFromTime(time.Now()).Add(-1 * s.persistenceRetentionPeriod)
 | |
| 			begin := time.Now()
 | |
| 
 | |
| 			for fp := range s.fingerprintToSeries.fpIter() {
 | |
| 				select {
 | |
| 				case <-stop:
 | |
| 					glog.Info("Interrupted running series purge.")
 | |
| 					return
 | |
| 				default:
 | |
| 					s.purgeSeries(fp, ts)
 | |
| 				}
 | |
| 			}
 | |
| 
 | |
| 			persistedFPs, err := s.persistence.getFingerprintsModifiedBefore(ts)
 | |
| 			if err != nil {
 | |
| 				glog.Error("Failed to lookup persisted fingerprint ranges: ", err)
 | |
| 				break
 | |
| 			}
 | |
| 			for _, fp := range persistedFPs {
 | |
| 				select {
 | |
| 				case <-stop:
 | |
| 					glog.Info("Interrupted running series purge.")
 | |
| 					return
 | |
| 				default:
 | |
| 					// TODO: Decide whether we also want to adjust the timerange index
 | |
| 					// entries here. Not updating them shouldn't break anything, but will
 | |
| 					// make some scenarios less efficient.
 | |
| 					s.purgeSeries(fp, ts)
 | |
| 				}
 | |
| 			}
 | |
| 			purgeDuration.Set(float64(time.Since(begin) / time.Millisecond))
 | |
| 			glog.Info("Done purging old series data.")
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // purgeSeries purges chunks older than persistenceRetentionPeriod from a
 | |
| // series. If the series contains no chunks after the purge, it is dropped
 | |
| // entirely.
 | |
| func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) {
 | |
| 	s.fpLocker.Lock(fp)
 | |
| 	defer s.fpLocker.Unlock(fp)
 | |
| 
 | |
| 	// First purge persisted chunks. We need to do that anyway.
 | |
| 	allDropped, err := s.persistence.dropChunks(fp, beforeTime)
 | |
| 	if err != nil {
 | |
| 		glog.Error("Error purging persisted chunks: ", err)
 | |
| 	}
 | |
| 
 | |
| 	// Purge chunks from memory accordingly.
 | |
| 	if series, ok := s.fingerprintToSeries.get(fp); ok {
 | |
| 		if series.purgeOlderThan(beforeTime) && allDropped {
 | |
| 			s.fingerprintToSeries.del(fp)
 | |
| 			s.persistence.unindexMetric(series.metric, fp)
 | |
| 		}
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	// If we arrive here, nothing was in memory, so the metric must have
 | |
| 	// been archived. Drop the archived metric if there are no persisted
 | |
| 	// chunks left.
 | |
| 	if !allDropped {
 | |
| 		return
 | |
| 	}
 | |
| 	if err := s.persistence.dropArchivedMetric(fp); err != nil {
 | |
| 		glog.Errorf("Error dropping archived metric for fingerprint %v: %v", fp, err)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Serve implements Storage.
 | |
| func (s *memorySeriesStorage) Serve(started chan struct{}) {
 | |
| 	evictMemoryTicker := time.NewTicker(s.memoryEvictionInterval)
 | |
| 	defer evictMemoryTicker.Stop()
 | |
| 
 | |
| 	go s.handlePersistQueue()
 | |
| 
 | |
| 	stopPurge := make(chan bool)
 | |
| 	go s.purgePeriodically(stopPurge)
 | |
| 
 | |
| 	close(started)
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-evictMemoryTicker.C:
 | |
| 			s.evictMemoryChunks(s.memoryRetentionPeriod)
 | |
| 		case stopped := <-s.stopServing:
 | |
| 			stopPurge <- true
 | |
| 			stopped <- true
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // NewPreloader implements Storage.
 | |
| func (s *memorySeriesStorage) NewPreloader() Preloader {
 | |
| 	return &memorySeriesPreloader{
 | |
| 		storage: s,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // GetFingerprintsForLabelMatchers implements Storage.
 | |
| func (s *memorySeriesStorage) GetFingerprintsForLabelMatchers(labelMatchers metric.LabelMatchers) clientmodel.Fingerprints {
 | |
| 	var result map[clientmodel.Fingerprint]struct{}
 | |
| 	for _, matcher := range labelMatchers {
 | |
| 		intersection := map[clientmodel.Fingerprint]struct{}{}
 | |
| 		switch matcher.Type {
 | |
| 		case metric.Equal:
 | |
| 			fps, err := s.persistence.getFingerprintsForLabelPair(
 | |
| 				metric.LabelPair{
 | |
| 					Name:  matcher.Name,
 | |
| 					Value: matcher.Value,
 | |
| 				},
 | |
| 			)
 | |
| 			if err != nil {
 | |
| 				glog.Error("Error getting fingerprints for label pair: ", err)
 | |
| 			}
 | |
| 			if len(fps) == 0 {
 | |
| 				return nil
 | |
| 			}
 | |
| 			for _, fp := range fps {
 | |
| 				if _, ok := result[fp]; ok || result == nil {
 | |
| 					intersection[fp] = struct{}{}
 | |
| 				}
 | |
| 			}
 | |
| 		default:
 | |
| 			values, err := s.persistence.getLabelValuesForLabelName(matcher.Name)
 | |
| 			if err != nil {
 | |
| 				glog.Errorf("Error getting label values for label name %q: %v", matcher.Name, err)
 | |
| 			}
 | |
| 			matches := matcher.Filter(values)
 | |
| 			if len(matches) == 0 {
 | |
| 				return nil
 | |
| 			}
 | |
| 			for _, v := range matches {
 | |
| 				fps, err := s.persistence.getFingerprintsForLabelPair(
 | |
| 					metric.LabelPair{
 | |
| 						Name:  matcher.Name,
 | |
| 						Value: v,
 | |
| 					},
 | |
| 				)
 | |
| 				if err != nil {
 | |
| 					glog.Error("Error getting fingerprints for label pair: ", err)
 | |
| 				}
 | |
| 				for _, fp := range fps {
 | |
| 					if _, ok := result[fp]; ok || result == nil {
 | |
| 						intersection[fp] = struct{}{}
 | |
| 					}
 | |
| 				}
 | |
| 			}
 | |
| 		}
 | |
| 		if len(intersection) == 0 {
 | |
| 			return nil
 | |
| 		}
 | |
| 		result = intersection
 | |
| 	}
 | |
| 
 | |
| 	fps := make(clientmodel.Fingerprints, 0, len(result))
 | |
| 	for fp := range result {
 | |
| 		fps = append(fps, fp)
 | |
| 	}
 | |
| 	return fps
 | |
| }
 | |
| 
 | |
| // GetLabelValuesForLabelName implements Storage.
 | |
| func (s *memorySeriesStorage) GetLabelValuesForLabelName(labelName clientmodel.LabelName) clientmodel.LabelValues {
 | |
| 	lvs, err := s.persistence.getLabelValuesForLabelName(labelName)
 | |
| 	if err != nil {
 | |
| 		glog.Errorf("Error getting label values for label name %q: %v", labelName, err)
 | |
| 	}
 | |
| 	return lvs
 | |
| }
 | |
| 
 | |
| // GetMetricForFingerprint implements Storage.
 | |
| func (s *memorySeriesStorage) GetMetricForFingerprint(fp clientmodel.Fingerprint) clientmodel.Metric {
 | |
| 	s.fpLocker.Lock(fp)
 | |
| 	defer s.fpLocker.Unlock(fp)
 | |
| 
 | |
| 	series, ok := s.fingerprintToSeries.get(fp)
 | |
| 	if ok {
 | |
| 		// Copy required here because caller might mutate the returned
 | |
| 		// metric.
 | |
| 		m := make(clientmodel.Metric, len(series.metric))
 | |
| 		for ln, lv := range series.metric {
 | |
| 			m[ln] = lv
 | |
| 		}
 | |
| 		return m
 | |
| 	}
 | |
| 	metric, err := s.persistence.getArchivedMetric(fp)
 | |
| 	if err != nil {
 | |
| 		glog.Errorf("Error retrieving archived metric for fingerprint %v: %v", fp, err)
 | |
| 	}
 | |
| 	return metric
 | |
| }
 | |
| 
 | |
| // Describe implements prometheus.Collector.
 | |
| func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
 | |
| 	s.persistence.Describe(ch)
 | |
| }
 | |
| 
 | |
| // Collect implements prometheus.Collector.
 | |
| func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
 | |
| 	s.persistence.Collect(ch)
 | |
| }
 |