| 
									
										
										
										
											2017-05-10 17:44:13 +08:00
										 |  |  | // Copyright 2017 The Prometheus Authors
 | 
					
						
							|  |  |  | // Licensed under the Apache License, Version 2.0 (the "License");
 | 
					
						
							|  |  |  | // you may not use this file except in compliance with the License.
 | 
					
						
							|  |  |  | // You may obtain a copy of the License at
 | 
					
						
							|  |  |  | //
 | 
					
						
							|  |  |  | // http://www.apache.org/licenses/LICENSE-2.0
 | 
					
						
							|  |  |  | //
 | 
					
						
							|  |  |  | // Unless required by applicable law or agreed to in writing, software
 | 
					
						
							|  |  |  | // distributed under the License is distributed on an "AS IS" BASIS,
 | 
					
						
							|  |  |  | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
					
						
							|  |  |  | // See the License for the specific language governing permissions and
 | 
					
						
							|  |  |  | // limitations under the License.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | package remote | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							| 
									
										
										
										
											2020-07-30 19:11:13 +08:00
										 |  |  | 	"context" | 
					
						
							| 
									
										
										
										
											2019-12-13 04:47:23 +08:00
										 |  |  | 	"fmt" | 
					
						
							| 
									
										
										
										
											2019-06-01 09:39:40 +08:00
										 |  |  | 	"sync" | 
					
						
							|  |  |  | 	"time" | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	"github.com/go-kit/kit/log" | 
					
						
							| 
									
										
										
										
											2019-03-02 03:04:26 +08:00
										 |  |  | 	"github.com/prometheus/client_golang/prometheus" | 
					
						
							|  |  |  | 	"github.com/prometheus/client_golang/prometheus/promauto" | 
					
						
							| 
									
										
										
										
											2020-10-22 17:00:08 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-06-01 09:39:40 +08:00
										 |  |  | 	"github.com/prometheus/prometheus/config" | 
					
						
							| 
									
										
										
										
											2017-05-10 17:44:13 +08:00
										 |  |  | 	"github.com/prometheus/prometheus/pkg/labels" | 
					
						
							|  |  |  | 	"github.com/prometheus/prometheus/storage" | 
					
						
							| 
									
										
										
										
											2020-03-21 00:34:15 +08:00
										 |  |  | 	"github.com/prometheus/prometheus/tsdb/wal" | 
					
						
							| 
									
										
										
										
											2017-05-10 17:44:13 +08:00
										 |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-03-02 03:04:26 +08:00
										 |  |  | var ( | 
					
						
							|  |  |  | 	samplesIn = promauto.NewCounter(prometheus.CounterOpts{ | 
					
						
							|  |  |  | 		Namespace: namespace, | 
					
						
							|  |  |  | 		Subsystem: subsystem, | 
					
						
							|  |  |  | 		Name:      "samples_in_total", | 
					
						
							|  |  |  | 		Help:      "Samples in to remote storage, compare to samples out for queue managers.", | 
					
						
							|  |  |  | 	}) | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-06-01 09:39:40 +08:00
										 |  |  | // WriteStorage represents all the remote write storage.
 | 
					
						
							|  |  |  | type WriteStorage struct { | 
					
						
							|  |  |  | 	logger log.Logger | 
					
						
							| 
									
										
										
										
											2020-04-25 11:39:46 +08:00
										 |  |  | 	reg    prometheus.Registerer | 
					
						
							| 
									
										
										
										
											2019-06-01 09:39:40 +08:00
										 |  |  | 	mtx    sync.Mutex | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-03-21 00:34:15 +08:00
										 |  |  | 	watcherMetrics    *wal.WatcherMetrics | 
					
						
							|  |  |  | 	liveReaderMetrics *wal.LiveReaderMetrics | 
					
						
							| 
									
										
										
										
											2020-03-31 11:39:29 +08:00
										 |  |  | 	externalLabels    labels.Labels | 
					
						
							| 
									
										
										
										
											2019-09-05 01:21:53 +08:00
										 |  |  | 	walDir            string | 
					
						
							| 
									
										
										
										
											2019-12-13 04:47:23 +08:00
										 |  |  | 	queues            map[string]*QueueManager | 
					
						
							| 
									
										
										
										
											2019-09-05 01:21:53 +08:00
										 |  |  | 	samplesIn         *ewmaRate | 
					
						
							|  |  |  | 	flushDeadline     time.Duration | 
					
						
							| 
									
										
										
										
											2020-09-25 02:44:18 +08:00
										 |  |  | 	interner          *pool | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// For timestampTracker.
 | 
					
						
							| 
									
										
										
										
											2020-10-16 05:53:59 +08:00
										 |  |  | 	highestTimestamp *maxTimestamp | 
					
						
							| 
									
										
										
										
											2019-06-01 09:39:40 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // NewWriteStorage creates and runs a WriteStorage.
 | 
					
						
							| 
									
										
										
										
											2020-02-04 05:47:03 +08:00
										 |  |  | func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, walDir string, flushDeadline time.Duration) *WriteStorage { | 
					
						
							| 
									
										
										
										
											2019-06-01 09:39:40 +08:00
										 |  |  | 	if logger == nil { | 
					
						
							|  |  |  | 		logger = log.NewNopLogger() | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	rws := &WriteStorage{ | 
					
						
							| 
									
										
										
										
											2020-03-21 00:34:15 +08:00
										 |  |  | 		queues:            make(map[string]*QueueManager), | 
					
						
							|  |  |  | 		watcherMetrics:    wal.NewWatcherMetrics(reg), | 
					
						
							|  |  |  | 		liveReaderMetrics: wal.NewLiveReaderMetrics(reg), | 
					
						
							|  |  |  | 		logger:            logger, | 
					
						
							| 
									
										
										
										
											2020-04-25 11:39:46 +08:00
										 |  |  | 		reg:               reg, | 
					
						
							| 
									
										
										
										
											2020-03-21 00:34:15 +08:00
										 |  |  | 		flushDeadline:     flushDeadline, | 
					
						
							|  |  |  | 		samplesIn:         newEWMARate(ewmaWeight, shardUpdateDuration), | 
					
						
							|  |  |  | 		walDir:            walDir, | 
					
						
							| 
									
										
										
										
											2020-09-25 02:44:18 +08:00
										 |  |  | 		interner:          newPool(), | 
					
						
							| 
									
										
										
										
											2020-10-16 05:53:59 +08:00
										 |  |  | 		highestTimestamp: &maxTimestamp{ | 
					
						
							| 
									
										
										
										
											2020-09-25 02:44:18 +08:00
										 |  |  | 			Gauge: prometheus.NewGauge(prometheus.GaugeOpts{ | 
					
						
							|  |  |  | 				Namespace: namespace, | 
					
						
							|  |  |  | 				Subsystem: subsystem, | 
					
						
							|  |  |  | 				Name:      "highest_timestamp_in_seconds", | 
					
						
							|  |  |  | 				Help:      "Highest timestamp that has come into the remote storage via the Appender interface, in seconds since epoch.", | 
					
						
							|  |  |  | 			}), | 
					
						
							|  |  |  | 		}, | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if reg != nil { | 
					
						
							|  |  |  | 		reg.MustRegister(rws.highestTimestamp) | 
					
						
							| 
									
										
										
										
											2019-06-01 09:39:40 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 	go rws.run() | 
					
						
							|  |  |  | 	return rws | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (rws *WriteStorage) run() { | 
					
						
							|  |  |  | 	ticker := time.NewTicker(shardUpdateDuration) | 
					
						
							|  |  |  | 	defer ticker.Stop() | 
					
						
							|  |  |  | 	for range ticker.C { | 
					
						
							|  |  |  | 		rws.samplesIn.tick() | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // ApplyConfig updates the state as the new config requires.
 | 
					
						
							| 
									
										
										
										
											2019-09-05 01:21:53 +08:00
										 |  |  | // Only stop & create queues which have changes.
 | 
					
						
							| 
									
										
										
										
											2019-06-01 09:39:40 +08:00
										 |  |  | func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { | 
					
						
							|  |  |  | 	rws.mtx.Lock() | 
					
						
							|  |  |  | 	defer rws.mtx.Unlock() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-12-13 04:47:23 +08:00
										 |  |  | 	// Remote write queues only need to change if the remote write config or
 | 
					
						
							|  |  |  | 	// external labels change.
 | 
					
						
							| 
									
										
										
										
											2020-03-31 11:39:29 +08:00
										 |  |  | 	externalLabelUnchanged := labels.Equal(conf.GlobalConfig.ExternalLabels, rws.externalLabels) | 
					
						
							|  |  |  | 	rws.externalLabels = conf.GlobalConfig.ExternalLabels | 
					
						
							| 
									
										
										
										
											2019-06-01 09:39:40 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-12-13 04:47:23 +08:00
										 |  |  | 	newQueues := make(map[string]*QueueManager) | 
					
						
							|  |  |  | 	newHashes := []string{} | 
					
						
							|  |  |  | 	for _, rwConf := range conf.RemoteWriteConfigs { | 
					
						
							|  |  |  | 		hash, err := toHash(rwConf) | 
					
						
							| 
									
										
										
										
											2019-09-05 01:21:53 +08:00
										 |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			return err | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-03-31 11:39:29 +08:00
										 |  |  | 		// Don't allow duplicate remote write configs.
 | 
					
						
							|  |  |  | 		if _, ok := newQueues[hash]; ok { | 
					
						
							|  |  |  | 			return fmt.Errorf("duplicate remote write configs are not allowed, found duplicate for URL: %s", rwConf.URL) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-12-13 04:47:23 +08:00
										 |  |  | 		// Set the queue name to the config hash if the user has not set
 | 
					
						
							|  |  |  | 		// a name in their remote write config so we can still differentiate
 | 
					
						
							|  |  |  | 		// between queues that have the same remote write endpoint.
 | 
					
						
							| 
									
										
										
										
											2020-06-24 21:41:52 +08:00
										 |  |  | 		name := hash[:6] | 
					
						
							| 
									
										
										
										
											2019-12-13 04:47:23 +08:00
										 |  |  | 		if rwConf.Name != "" { | 
					
						
							|  |  |  | 			name = rwConf.Name | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-06-24 21:41:52 +08:00
										 |  |  | 		c, err := NewWriteClient(name, &ClientConfig{ | 
					
						
							| 
									
										
										
										
											2019-06-01 09:39:40 +08:00
										 |  |  | 			URL:              rwConf.URL, | 
					
						
							|  |  |  | 			Timeout:          rwConf.RemoteTimeout, | 
					
						
							|  |  |  | 			HTTPClientConfig: rwConf.HTTPClientConfig, | 
					
						
							|  |  |  | 		}) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			return err | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2020-03-31 11:39:29 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 		queue, ok := rws.queues[hash] | 
					
						
							|  |  |  | 		if externalLabelUnchanged && ok { | 
					
						
							|  |  |  | 			// Update the client in case any secret configuration has changed.
 | 
					
						
							|  |  |  | 			queue.SetClient(c) | 
					
						
							|  |  |  | 			newQueues[hash] = queue | 
					
						
							|  |  |  | 			delete(rws.queues, hash) | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-04-25 11:39:46 +08:00
										 |  |  | 		endpoint := rwConf.URL.String() | 
					
						
							| 
									
										
										
										
											2019-12-13 04:47:23 +08:00
										 |  |  | 		newQueues[hash] = NewQueueManager( | 
					
						
							| 
									
										
										
										
											2020-04-25 11:39:46 +08:00
										 |  |  | 			newQueueManagerMetrics(rws.reg, name, endpoint), | 
					
						
							| 
									
										
										
										
											2020-03-21 00:34:15 +08:00
										 |  |  | 			rws.watcherMetrics, | 
					
						
							|  |  |  | 			rws.liveReaderMetrics, | 
					
						
							| 
									
										
										
										
											2019-06-01 09:39:40 +08:00
										 |  |  | 			rws.logger, | 
					
						
							|  |  |  | 			rws.walDir, | 
					
						
							|  |  |  | 			rws.samplesIn, | 
					
						
							|  |  |  | 			rwConf.QueueConfig, | 
					
						
							|  |  |  | 			conf.GlobalConfig.ExternalLabels, | 
					
						
							|  |  |  | 			rwConf.WriteRelabelConfigs, | 
					
						
							|  |  |  | 			c, | 
					
						
							|  |  |  | 			rws.flushDeadline, | 
					
						
							| 
									
										
										
										
											2020-09-25 02:44:18 +08:00
										 |  |  | 			rws.interner, | 
					
						
							|  |  |  | 			rws.highestTimestamp, | 
					
						
							| 
									
										
										
										
											2019-12-13 04:47:23 +08:00
										 |  |  | 		) | 
					
						
							|  |  |  | 		// Keep track of which queues are new so we know which to start.
 | 
					
						
							| 
									
										
										
										
											2019-09-05 01:21:53 +08:00
										 |  |  | 		newHashes = append(newHashes, hash) | 
					
						
							| 
									
										
										
										
											2019-06-01 09:39:40 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-12-13 04:47:23 +08:00
										 |  |  | 	// Anything remaining in rws.queues is a queue who's config has
 | 
					
						
							|  |  |  | 	// changed or was removed from the overall remote write config.
 | 
					
						
							| 
									
										
										
										
											2019-06-01 09:39:40 +08:00
										 |  |  | 	for _, q := range rws.queues { | 
					
						
							| 
									
										
										
										
											2019-12-13 04:47:23 +08:00
										 |  |  | 		q.Stop() | 
					
						
							| 
									
										
										
										
											2019-06-01 09:39:40 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-12-13 04:47:23 +08:00
										 |  |  | 	for _, hash := range newHashes { | 
					
						
							|  |  |  | 		newQueues[hash].Start() | 
					
						
							| 
									
										
										
										
											2019-06-01 09:39:40 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2019-09-05 01:21:53 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	rws.queues = newQueues | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-06-01 09:39:40 +08:00
										 |  |  | 	return nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Appender implements storage.Storage.
 | 
					
						
							| 
									
										
										
										
											2020-07-30 19:11:13 +08:00
										 |  |  | func (rws *WriteStorage) Appender(_ context.Context) storage.Appender { | 
					
						
							| 
									
										
										
										
											2019-01-18 20:48:16 +08:00
										 |  |  | 	return ×tampTracker{ | 
					
						
							| 
									
										
										
										
											2020-09-25 02:44:18 +08:00
										 |  |  | 		writeStorage:         rws, | 
					
						
							|  |  |  | 		highestRecvTimestamp: rws.highestTimestamp, | 
					
						
							| 
									
										
										
										
											2020-02-06 23:58:38 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2019-01-18 20:48:16 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-06-01 09:39:40 +08:00
										 |  |  | // Close closes the WriteStorage.
 | 
					
						
							|  |  |  | func (rws *WriteStorage) Close() error { | 
					
						
							|  |  |  | 	rws.mtx.Lock() | 
					
						
							|  |  |  | 	defer rws.mtx.Unlock() | 
					
						
							|  |  |  | 	for _, q := range rws.queues { | 
					
						
							|  |  |  | 		q.Stop() | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-01-18 20:48:16 +08:00
										 |  |  | type timestampTracker struct { | 
					
						
							| 
									
										
										
										
											2020-09-25 02:44:18 +08:00
										 |  |  | 	writeStorage         *WriteStorage | 
					
						
							|  |  |  | 	samples              int64 | 
					
						
							|  |  |  | 	highestTimestamp     int64 | 
					
						
							| 
									
										
										
										
											2020-10-16 05:53:59 +08:00
										 |  |  | 	highestRecvTimestamp *maxTimestamp | 
					
						
							| 
									
										
										
										
											2017-05-10 17:44:13 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-10-23 21:57:30 +08:00
										 |  |  | // Add implements storage.Appender.
 | 
					
						
							| 
									
										
										
										
											2020-02-06 23:58:38 +08:00
										 |  |  | func (t *timestampTracker) Add(_ labels.Labels, ts int64, _ float64) (uint64, error) { | 
					
						
							| 
									
										
										
										
											2019-01-18 20:48:16 +08:00
										 |  |  | 	t.samples++ | 
					
						
							|  |  |  | 	if ts > t.highestTimestamp { | 
					
						
							|  |  |  | 		t.highestTimestamp = ts | 
					
						
							| 
									
										
										
										
											2017-05-10 17:44:13 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2017-09-07 20:14:41 +08:00
										 |  |  | 	return 0, nil | 
					
						
							| 
									
										
										
										
											2017-05-10 17:44:13 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-10-24 04:28:57 +08:00
										 |  |  | // AddFast implements storage.Appender.
 | 
					
						
							| 
									
										
										
										
											2020-02-06 23:58:38 +08:00
										 |  |  | func (t *timestampTracker) AddFast(_ uint64, ts int64, v float64) error { | 
					
						
							|  |  |  | 	_, err := t.Add(nil, ts, v) | 
					
						
							| 
									
										
										
										
											2017-07-12 19:41:27 +08:00
										 |  |  | 	return err | 
					
						
							| 
									
										
										
										
											2017-05-10 17:44:13 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-10-23 21:57:30 +08:00
										 |  |  | // Commit implements storage.Appender.
 | 
					
						
							| 
									
										
										
										
											2019-01-18 20:48:16 +08:00
										 |  |  | func (t *timestampTracker) Commit() error { | 
					
						
							| 
									
										
										
										
											2019-06-01 09:39:40 +08:00
										 |  |  | 	t.writeStorage.samplesIn.incr(t.samples) | 
					
						
							| 
									
										
										
										
											2019-01-18 20:48:16 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-03-02 03:04:26 +08:00
										 |  |  | 	samplesIn.Add(float64(t.samples)) | 
					
						
							| 
									
										
										
										
											2020-09-25 02:44:18 +08:00
										 |  |  | 	t.highestRecvTimestamp.Set(float64(t.highestTimestamp / 1000)) | 
					
						
							| 
									
										
										
										
											2017-05-10 17:44:13 +08:00
										 |  |  | 	return nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-10-23 21:57:30 +08:00
										 |  |  | // Rollback implements storage.Appender.
 | 
					
						
							| 
									
										
										
										
											2019-01-18 20:48:16 +08:00
										 |  |  | func (*timestampTracker) Rollback() error { | 
					
						
							| 
									
										
										
										
											2017-05-10 17:44:13 +08:00
										 |  |  | 	return nil | 
					
						
							|  |  |  | } |