| 
									
										
										
										
											2017-05-10 17:44:13 +08:00
										 |  |  | // Copyright 2017 The Prometheus Authors
 | 
					
						
							|  |  |  | // Licensed under the Apache License, Version 2.0 (the "License");
 | 
					
						
							|  |  |  | // you may not use this file except in compliance with the License.
 | 
					
						
							|  |  |  | // You may obtain a copy of the License at
 | 
					
						
							|  |  |  | //
 | 
					
						
							|  |  |  | // http://www.apache.org/licenses/LICENSE-2.0
 | 
					
						
							|  |  |  | //
 | 
					
						
							|  |  |  | // Unless required by applicable law or agreed to in writing, software
 | 
					
						
							|  |  |  | // distributed under the License is distributed on an "AS IS" BASIS,
 | 
					
						
							|  |  |  | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
					
						
							|  |  |  | // See the License for the specific language governing permissions and
 | 
					
						
							|  |  |  | // limitations under the License.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | package remote | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							| 
									
										
										
										
											2019-06-01 09:39:40 +08:00
										 |  |  | 	"crypto/md5" | 
					
						
							|  |  |  | 	"encoding/json" | 
					
						
							|  |  |  | 	"sync" | 
					
						
							|  |  |  | 	"time" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	"github.com/go-kit/kit/log" | 
					
						
							|  |  |  | 	"github.com/go-kit/kit/log/level" | 
					
						
							| 
									
										
										
										
											2019-03-02 03:04:26 +08:00
										 |  |  | 	"github.com/prometheus/client_golang/prometheus" | 
					
						
							|  |  |  | 	"github.com/prometheus/client_golang/prometheus/promauto" | 
					
						
							| 
									
										
										
										
											2019-06-01 09:39:40 +08:00
										 |  |  | 	"github.com/prometheus/prometheus/config" | 
					
						
							| 
									
										
										
										
											2017-05-10 17:44:13 +08:00
										 |  |  | 	"github.com/prometheus/prometheus/pkg/labels" | 
					
						
							|  |  |  | 	"github.com/prometheus/prometheus/storage" | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-03-02 03:04:26 +08:00
										 |  |  | var ( | 
					
						
							|  |  |  | 	samplesIn = promauto.NewCounter(prometheus.CounterOpts{ | 
					
						
							|  |  |  | 		Namespace: namespace, | 
					
						
							|  |  |  | 		Subsystem: subsystem, | 
					
						
							|  |  |  | 		Name:      "samples_in_total", | 
					
						
							|  |  |  | 		Help:      "Samples in to remote storage, compare to samples out for queue managers.", | 
					
						
							|  |  |  | 	}) | 
					
						
							|  |  |  | 	highestTimestamp = maxGauge{ | 
					
						
							|  |  |  | 		Gauge: promauto.NewGauge(prometheus.GaugeOpts{ | 
					
						
							|  |  |  | 			Namespace: namespace, | 
					
						
							|  |  |  | 			Subsystem: subsystem, | 
					
						
							|  |  |  | 			Name:      "highest_timestamp_in_seconds", | 
					
						
							|  |  |  | 			Help:      "Highest timestamp that has come into the remote storage via the Appender interface, in seconds since epoch.", | 
					
						
							|  |  |  | 		}), | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-06-01 09:39:40 +08:00
										 |  |  | // WriteStorage represents all the remote write storage.
 | 
					
						
							|  |  |  | type WriteStorage struct { | 
					
						
							|  |  |  | 	logger log.Logger | 
					
						
							|  |  |  | 	mtx    sync.Mutex | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-09-05 01:21:53 +08:00
										 |  |  | 	configHash        [16]byte | 
					
						
							|  |  |  | 	externalLabelHash [16]byte | 
					
						
							|  |  |  | 	walDir            string | 
					
						
							|  |  |  | 	queues            []*QueueManager | 
					
						
							|  |  |  | 	hashes            [][16]byte | 
					
						
							|  |  |  | 	samplesIn         *ewmaRate | 
					
						
							|  |  |  | 	flushDeadline     time.Duration | 
					
						
							| 
									
										
										
										
											2019-06-01 09:39:40 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // NewWriteStorage creates and runs a WriteStorage.
 | 
					
						
							|  |  |  | func NewWriteStorage(logger log.Logger, walDir string, flushDeadline time.Duration) *WriteStorage { | 
					
						
							|  |  |  | 	if logger == nil { | 
					
						
							|  |  |  | 		logger = log.NewNopLogger() | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	rws := &WriteStorage{ | 
					
						
							|  |  |  | 		logger:        logger, | 
					
						
							|  |  |  | 		flushDeadline: flushDeadline, | 
					
						
							|  |  |  | 		samplesIn:     newEWMARate(ewmaWeight, shardUpdateDuration), | 
					
						
							|  |  |  | 		walDir:        walDir, | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	go rws.run() | 
					
						
							|  |  |  | 	return rws | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (rws *WriteStorage) run() { | 
					
						
							|  |  |  | 	ticker := time.NewTicker(shardUpdateDuration) | 
					
						
							|  |  |  | 	defer ticker.Stop() | 
					
						
							|  |  |  | 	for range ticker.C { | 
					
						
							|  |  |  | 		rws.samplesIn.tick() | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // ApplyConfig updates the state as the new config requires.
 | 
					
						
							| 
									
										
										
										
											2019-09-05 01:21:53 +08:00
										 |  |  | // Only stop & create queues which have changes.
 | 
					
						
							| 
									
										
										
										
											2019-06-01 09:39:40 +08:00
										 |  |  | func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { | 
					
						
							|  |  |  | 	rws.mtx.Lock() | 
					
						
							|  |  |  | 	defer rws.mtx.Unlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Remote write queues only need to change if the remote write config or
 | 
					
						
							|  |  |  | 	// external labels change. Hash these together and only reload if the hash
 | 
					
						
							|  |  |  | 	// changes.
 | 
					
						
							|  |  |  | 	cfgBytes, err := json.Marshal(conf.RemoteWriteConfigs) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	externalLabelBytes, err := json.Marshal(conf.GlobalConfig.ExternalLabels) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-09-05 01:21:53 +08:00
										 |  |  | 	configHash := md5.Sum(cfgBytes) | 
					
						
							|  |  |  | 	externalLabelHash := md5.Sum(externalLabelBytes) | 
					
						
							|  |  |  | 	externalLabelUnchanged := externalLabelHash == rws.externalLabelHash | 
					
						
							|  |  |  | 	if configHash == rws.configHash && externalLabelUnchanged { | 
					
						
							| 
									
										
										
										
											2019-06-01 09:39:40 +08:00
										 |  |  | 		level.Debug(rws.logger).Log("msg", "remote write config has not changed, no need to restart QueueManagers") | 
					
						
							|  |  |  | 		return nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-09-05 01:21:53 +08:00
										 |  |  | 	rws.configHash = configHash | 
					
						
							|  |  |  | 	rws.externalLabelHash = externalLabelHash | 
					
						
							| 
									
										
										
										
											2019-06-01 09:39:40 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	// Update write queues
 | 
					
						
							|  |  |  | 	newQueues := []*QueueManager{} | 
					
						
							| 
									
										
										
										
											2019-09-05 01:21:53 +08:00
										 |  |  | 	newHashes := [][16]byte{} | 
					
						
							|  |  |  | 	newClientIndexes := []int{} | 
					
						
							| 
									
										
										
										
											2019-06-01 09:39:40 +08:00
										 |  |  | 	for i, rwConf := range conf.RemoteWriteConfigs { | 
					
						
							| 
									
										
										
										
											2019-09-05 01:21:53 +08:00
										 |  |  | 		b, err := json.Marshal(rwConf) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			return err | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		// Use RemoteWriteConfigs and its index to get hash. So if its index changed,
 | 
					
						
							| 
									
										
										
										
											2019-09-30 23:54:55 +08:00
										 |  |  | 		// the corresponding queue should also be restarted.
 | 
					
						
							| 
									
										
										
										
											2019-09-05 01:21:53 +08:00
										 |  |  | 		hash := md5.Sum(b) | 
					
						
							|  |  |  | 		if i < len(rws.queues) && rws.hashes[i] == hash && externalLabelUnchanged { | 
					
						
							|  |  |  | 			// The RemoteWriteConfig and index both not changed, keep the queue.
 | 
					
						
							|  |  |  | 			newQueues = append(newQueues, rws.queues[i]) | 
					
						
							|  |  |  | 			newHashes = append(newHashes, hash) | 
					
						
							|  |  |  | 			rws.queues[i] = nil | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		// Otherwise create a new queue.
 | 
					
						
							| 
									
										
										
										
											2019-06-01 09:39:40 +08:00
										 |  |  | 		c, err := NewClient(i, &ClientConfig{ | 
					
						
							|  |  |  | 			URL:              rwConf.URL, | 
					
						
							|  |  |  | 			Timeout:          rwConf.RemoteTimeout, | 
					
						
							|  |  |  | 			HTTPClientConfig: rwConf.HTTPClientConfig, | 
					
						
							|  |  |  | 		}) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			return err | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		newQueues = append(newQueues, NewQueueManager( | 
					
						
							| 
									
										
										
										
											2019-09-19 17:15:41 +08:00
										 |  |  | 			prometheus.DefaultRegisterer, | 
					
						
							| 
									
										
										
										
											2019-06-01 09:39:40 +08:00
										 |  |  | 			rws.logger, | 
					
						
							|  |  |  | 			rws.walDir, | 
					
						
							|  |  |  | 			rws.samplesIn, | 
					
						
							|  |  |  | 			rwConf.QueueConfig, | 
					
						
							|  |  |  | 			conf.GlobalConfig.ExternalLabels, | 
					
						
							|  |  |  | 			rwConf.WriteRelabelConfigs, | 
					
						
							|  |  |  | 			c, | 
					
						
							|  |  |  | 			rws.flushDeadline, | 
					
						
							|  |  |  | 		)) | 
					
						
							| 
									
										
										
										
											2019-09-05 01:21:53 +08:00
										 |  |  | 		newHashes = append(newHashes, hash) | 
					
						
							|  |  |  | 		newClientIndexes = append(newClientIndexes, i) | 
					
						
							| 
									
										
										
										
											2019-06-01 09:39:40 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	for _, q := range rws.queues { | 
					
						
							| 
									
										
										
										
											2019-09-05 01:21:53 +08:00
										 |  |  | 		// A nil queue means that queue has been reused.
 | 
					
						
							|  |  |  | 		if q != nil { | 
					
						
							|  |  |  | 			q.Stop() | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2019-06-01 09:39:40 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-09-05 01:21:53 +08:00
										 |  |  | 	for _, index := range newClientIndexes { | 
					
						
							|  |  |  | 		newQueues[index].Start() | 
					
						
							| 
									
										
										
										
											2019-06-01 09:39:40 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2019-09-05 01:21:53 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	rws.queues = newQueues | 
					
						
							|  |  |  | 	rws.hashes = newHashes | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-06-01 09:39:40 +08:00
										 |  |  | 	return nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Appender implements storage.Storage.
 | 
					
						
							|  |  |  | func (rws *WriteStorage) Appender() (storage.Appender, error) { | 
					
						
							| 
									
										
										
										
											2019-01-18 20:48:16 +08:00
										 |  |  | 	return ×tampTracker{ | 
					
						
							| 
									
										
										
										
											2019-06-01 09:39:40 +08:00
										 |  |  | 		writeStorage: rws, | 
					
						
							| 
									
										
										
										
											2019-01-18 20:48:16 +08:00
										 |  |  | 	}, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-06-01 09:39:40 +08:00
										 |  |  | // Close closes the WriteStorage.
 | 
					
						
							|  |  |  | func (rws *WriteStorage) Close() error { | 
					
						
							|  |  |  | 	rws.mtx.Lock() | 
					
						
							|  |  |  | 	defer rws.mtx.Unlock() | 
					
						
							|  |  |  | 	for _, q := range rws.queues { | 
					
						
							|  |  |  | 		q.Stop() | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-01-18 20:48:16 +08:00
										 |  |  | type timestampTracker struct { | 
					
						
							| 
									
										
										
										
											2019-06-01 09:39:40 +08:00
										 |  |  | 	writeStorage     *WriteStorage | 
					
						
							| 
									
										
										
										
											2019-01-18 20:48:16 +08:00
										 |  |  | 	samples          int64 | 
					
						
							|  |  |  | 	highestTimestamp int64 | 
					
						
							| 
									
										
										
										
											2017-05-10 17:44:13 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-10-23 21:57:30 +08:00
										 |  |  | // Add implements storage.Appender.
 | 
					
						
							| 
									
										
										
										
											2019-01-18 20:48:16 +08:00
										 |  |  | func (t *timestampTracker) Add(_ labels.Labels, ts int64, v float64) (uint64, error) { | 
					
						
							|  |  |  | 	t.samples++ | 
					
						
							|  |  |  | 	if ts > t.highestTimestamp { | 
					
						
							|  |  |  | 		t.highestTimestamp = ts | 
					
						
							| 
									
										
										
										
											2017-05-10 17:44:13 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2017-09-07 20:14:41 +08:00
										 |  |  | 	return 0, nil | 
					
						
							| 
									
										
										
										
											2017-05-10 17:44:13 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-10-24 04:28:57 +08:00
										 |  |  | // AddFast implements storage.Appender.
 | 
					
						
							| 
									
										
										
										
											2019-01-18 20:48:16 +08:00
										 |  |  | func (t *timestampTracker) AddFast(l labels.Labels, _ uint64, ts int64, v float64) error { | 
					
						
							|  |  |  | 	_, err := t.Add(l, ts, v) | 
					
						
							| 
									
										
										
										
											2017-07-12 19:41:27 +08:00
										 |  |  | 	return err | 
					
						
							| 
									
										
										
										
											2017-05-10 17:44:13 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-10-23 21:57:30 +08:00
										 |  |  | // Commit implements storage.Appender.
 | 
					
						
							| 
									
										
										
										
											2019-01-18 20:48:16 +08:00
										 |  |  | func (t *timestampTracker) Commit() error { | 
					
						
							| 
									
										
										
										
											2019-06-01 09:39:40 +08:00
										 |  |  | 	t.writeStorage.samplesIn.incr(t.samples) | 
					
						
							| 
									
										
										
										
											2019-01-18 20:48:16 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-03-02 03:04:26 +08:00
										 |  |  | 	samplesIn.Add(float64(t.samples)) | 
					
						
							|  |  |  | 	highestTimestamp.Set(float64(t.highestTimestamp / 1000)) | 
					
						
							| 
									
										
										
										
											2017-05-10 17:44:13 +08:00
										 |  |  | 	return nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-10-23 21:57:30 +08:00
										 |  |  | // Rollback implements storage.Appender.
 | 
					
						
							| 
									
										
										
										
											2019-01-18 20:48:16 +08:00
										 |  |  | func (*timestampTracker) Rollback() error { | 
					
						
							| 
									
										
										
										
											2017-05-10 17:44:13 +08:00
										 |  |  | 	return nil | 
					
						
							|  |  |  | } |