| 
									
										
										
										
											2021-03-16 17:47:45 +08:00
										 |  |  | // Copyright 2020 The Prometheus Authors
 | 
					
						
							|  |  |  | // Licensed under the Apache License, Version 2.0 (the "License");
 | 
					
						
							|  |  |  | // you may not use this file except in compliance with the License.
 | 
					
						
							|  |  |  | // You may obtain a copy of the License at
 | 
					
						
							|  |  |  | //
 | 
					
						
							|  |  |  | // http://www.apache.org/licenses/LICENSE-2.0
 | 
					
						
							|  |  |  | //
 | 
					
						
							|  |  |  | // Unless required by applicable law or agreed to in writing, software
 | 
					
						
							|  |  |  | // distributed under the License is distributed on an "AS IS" BASIS,
 | 
					
						
							|  |  |  | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
					
						
							|  |  |  | // See the License for the specific language governing permissions and
 | 
					
						
							|  |  |  | // limitations under the License.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | package tsdb | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							|  |  |  | 	"context" | 
					
						
							| 
									
										
										
										
											2023-11-17 02:54:41 +08:00
										 |  |  | 	"errors" | 
					
						
							| 
									
										
										
										
											2024-01-16 00:24:46 +08:00
										 |  |  | 	"slices" | 
					
						
							| 
									
										
										
										
											2021-03-16 17:47:45 +08:00
										 |  |  | 	"sync" | 
					
						
							| 
									
										
										
										
											2021-05-07 04:53:52 +08:00
										 |  |  | 	"unicode/utf8" | 
					
						
							| 
									
										
										
										
											2021-03-16 17:47:45 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	"github.com/prometheus/client_golang/prometheus" | 
					
						
							| 
									
										
										
										
											2021-10-22 16:19:38 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-07-20 12:52:57 +08:00
										 |  |  | 	"github.com/prometheus/prometheus/config" | 
					
						
							| 
									
										
										
										
											2021-11-08 22:23:17 +08:00
										 |  |  | 	"github.com/prometheus/prometheus/model/exemplar" | 
					
						
							|  |  |  | 	"github.com/prometheus/prometheus/model/labels" | 
					
						
							| 
									
										
										
										
											2021-03-16 17:47:45 +08:00
										 |  |  | 	"github.com/prometheus/prometheus/storage" | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-01-06 18:28:58 +08:00
										 |  |  | const ( | 
					
						
							| 
									
										
										
										
											2024-09-11 04:32:03 +08:00
										 |  |  | 	// Indicates that there is no index entry for an exemplar.
 | 
					
						
							| 
									
										
										
										
											2022-01-06 18:28:58 +08:00
										 |  |  | 	noExemplar = -1 | 
					
						
							|  |  |  | 	// Estimated number of exemplars per series, for sizing the index.
 | 
					
						
							|  |  |  | 	estimatedExemplarsPerSeries = 16 | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2021-03-16 17:47:45 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-07-20 12:52:57 +08:00
										 |  |  | type CircularExemplarStorage struct { | 
					
						
							| 
									
										
										
										
											2021-03-16 17:47:45 +08:00
										 |  |  | 	lock      sync.RWMutex | 
					
						
							| 
									
										
										
										
											2024-05-11 23:37:52 +08:00
										 |  |  | 	exemplars []circularBufferEntry | 
					
						
							| 
									
										
										
										
											2021-03-16 17:47:45 +08:00
										 |  |  | 	nextIndex int | 
					
						
							| 
									
										
										
										
											2021-07-20 12:52:57 +08:00
										 |  |  | 	metrics   *ExemplarMetrics | 
					
						
							| 
									
										
										
										
											2021-03-16 17:47:45 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	// Map of series labels as a string to index entry, which points to the first
 | 
					
						
							|  |  |  | 	// and last exemplar for the series in the exemplars circular buffer.
 | 
					
						
							|  |  |  | 	index map[string]*indexEntry | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | type indexEntry struct { | 
					
						
							| 
									
										
										
										
											2021-05-06 01:51:16 +08:00
										 |  |  | 	oldest       int | 
					
						
							|  |  |  | 	newest       int | 
					
						
							|  |  |  | 	seriesLabels labels.Labels | 
					
						
							| 
									
										
										
										
											2021-03-16 17:47:45 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | type circularBufferEntry struct { | 
					
						
							| 
									
										
										
										
											2021-05-06 01:51:16 +08:00
										 |  |  | 	exemplar exemplar.Exemplar | 
					
						
							|  |  |  | 	next     int | 
					
						
							|  |  |  | 	ref      *indexEntry | 
					
						
							| 
									
										
										
										
											2021-03-16 17:47:45 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-07-20 12:52:57 +08:00
										 |  |  | type ExemplarMetrics struct { | 
					
						
							|  |  |  | 	exemplarsAppended            prometheus.Counter | 
					
						
							|  |  |  | 	exemplarsInStorage           prometheus.Gauge | 
					
						
							|  |  |  | 	seriesWithExemplarsInStorage prometheus.Gauge | 
					
						
							|  |  |  | 	lastExemplarsTs              prometheus.Gauge | 
					
						
							|  |  |  | 	maxExemplars                 prometheus.Gauge | 
					
						
							|  |  |  | 	outOfOrderExemplars          prometheus.Counter | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func NewExemplarMetrics(reg prometheus.Registerer) *ExemplarMetrics { | 
					
						
							|  |  |  | 	m := ExemplarMetrics{ | 
					
						
							| 
									
										
										
										
											2021-04-16 20:44:53 +08:00
										 |  |  | 		exemplarsAppended: prometheus.NewCounter(prometheus.CounterOpts{ | 
					
						
							|  |  |  | 			Name: "prometheus_tsdb_exemplar_exemplars_appended_total", | 
					
						
							|  |  |  | 			Help: "Total number of appended exemplars.", | 
					
						
							|  |  |  | 		}), | 
					
						
							|  |  |  | 		exemplarsInStorage: prometheus.NewGauge(prometheus.GaugeOpts{ | 
					
						
							|  |  |  | 			Name: "prometheus_tsdb_exemplar_exemplars_in_storage", | 
					
						
							|  |  |  | 			Help: "Number of exemplars currently in circular storage.", | 
					
						
							|  |  |  | 		}), | 
					
						
							|  |  |  | 		seriesWithExemplarsInStorage: prometheus.NewGauge(prometheus.GaugeOpts{ | 
					
						
							|  |  |  | 			Name: "prometheus_tsdb_exemplar_series_with_exemplars_in_storage", | 
					
						
							|  |  |  | 			Help: "Number of series with exemplars currently in circular storage.", | 
					
						
							|  |  |  | 		}), | 
					
						
							|  |  |  | 		lastExemplarsTs: prometheus.NewGauge(prometheus.GaugeOpts{ | 
					
						
							|  |  |  | 			Name: "prometheus_tsdb_exemplar_last_exemplars_timestamp_seconds", | 
					
						
							|  |  |  | 			Help: "The timestamp of the oldest exemplar stored in circular storage. Useful to check for what time" + | 
					
						
							|  |  |  | 				"range the current exemplar buffer limit allows. This usually means the last timestamp" + | 
					
						
							|  |  |  | 				"for all exemplars for a typical setup. This is not true though if one of the series timestamp is in future compared to rest series.", | 
					
						
							|  |  |  | 		}), | 
					
						
							| 
									
										
										
										
											2021-03-16 17:47:45 +08:00
										 |  |  | 		outOfOrderExemplars: prometheus.NewCounter(prometheus.CounterOpts{ | 
					
						
							|  |  |  | 			Name: "prometheus_tsdb_exemplar_out_of_order_exemplars_total", | 
					
						
							| 
									
										
										
										
											2021-05-07 04:53:52 +08:00
										 |  |  | 			Help: "Total number of out of order exemplar ingestion failed attempts.", | 
					
						
							| 
									
										
										
										
											2021-03-16 17:47:45 +08:00
										 |  |  | 		}), | 
					
						
							| 
									
										
										
										
											2021-07-20 12:52:57 +08:00
										 |  |  | 		maxExemplars: prometheus.NewGauge(prometheus.GaugeOpts{ | 
					
						
							|  |  |  | 			Name: "prometheus_tsdb_exemplar_max_exemplars", | 
					
						
							|  |  |  | 			Help: "Total number of exemplars the exemplar storage can store, resizeable.", | 
					
						
							|  |  |  | 		}), | 
					
						
							| 
									
										
										
										
											2021-03-16 17:47:45 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2021-07-20 12:52:57 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-03-16 17:47:45 +08:00
										 |  |  | 	if reg != nil { | 
					
						
							| 
									
										
										
										
											2021-04-16 20:44:53 +08:00
										 |  |  | 		reg.MustRegister( | 
					
						
							| 
									
										
										
										
											2021-07-20 12:52:57 +08:00
										 |  |  | 			m.exemplarsAppended, | 
					
						
							|  |  |  | 			m.exemplarsInStorage, | 
					
						
							|  |  |  | 			m.seriesWithExemplarsInStorage, | 
					
						
							|  |  |  | 			m.lastExemplarsTs, | 
					
						
							|  |  |  | 			m.outOfOrderExemplars, | 
					
						
							|  |  |  | 			m.maxExemplars, | 
					
						
							| 
									
										
										
										
											2021-04-16 20:44:53 +08:00
										 |  |  | 		) | 
					
						
							| 
									
										
										
										
											2021-03-16 17:47:45 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-07-20 12:52:57 +08:00
										 |  |  | 	return &m | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-05-08 23:57:09 +08:00
										 |  |  | // NewCircularExemplarStorage creates a circular in memory exemplar storage.
 | 
					
						
							| 
									
										
										
										
											2021-07-20 12:52:57 +08:00
										 |  |  | // If we assume the average case 95 bytes per exemplar we can fit 5651272 exemplars in
 | 
					
						
							|  |  |  | // 1GB of extra memory, accounting for the fact that this is heap allocated space.
 | 
					
						
							|  |  |  | // If len <= 0, then the exemplar storage is essentially a noop storage but can later be
 | 
					
						
							|  |  |  | // resized to store exemplars.
 | 
					
						
							| 
									
										
										
										
											2023-04-09 15:08:40 +08:00
										 |  |  | func NewCircularExemplarStorage(length int64, m *ExemplarMetrics) (ExemplarStorage, error) { | 
					
						
							|  |  |  | 	if length < 0 { | 
					
						
							|  |  |  | 		length = 0 | 
					
						
							| 
									
										
										
										
											2021-07-20 12:52:57 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 	c := &CircularExemplarStorage{ | 
					
						
							| 
									
										
										
										
											2024-05-11 23:37:52 +08:00
										 |  |  | 		exemplars: make([]circularBufferEntry, length), | 
					
						
							| 
									
										
										
										
											2023-04-09 15:08:40 +08:00
										 |  |  | 		index:     make(map[string]*indexEntry, length/estimatedExemplarsPerSeries), | 
					
						
							| 
									
										
										
										
											2021-07-20 12:52:57 +08:00
										 |  |  | 		metrics:   m, | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-04-09 15:08:40 +08:00
										 |  |  | 	c.metrics.maxExemplars.Set(float64(length)) | 
					
						
							| 
									
										
										
										
											2021-07-20 12:52:57 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-03-16 17:47:45 +08:00
										 |  |  | 	return c, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-07-20 12:52:57 +08:00
										 |  |  | func (ce *CircularExemplarStorage) ApplyConfig(cfg *config.Config) error { | 
					
						
							|  |  |  | 	ce.Resize(cfg.StorageConfig.ExemplarsConfig.MaxExemplars) | 
					
						
							|  |  |  | 	return nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-03-16 17:47:45 +08:00
										 |  |  | func (ce *CircularExemplarStorage) Appender() *CircularExemplarStorage { | 
					
						
							|  |  |  | 	return ce | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (ce *CircularExemplarStorage) ExemplarQuerier(_ context.Context) (storage.ExemplarQuerier, error) { | 
					
						
							|  |  |  | 	return ce, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-04-16 20:44:53 +08:00
										 |  |  | func (ce *CircularExemplarStorage) Querier(_ context.Context) (storage.ExemplarQuerier, error) { | 
					
						
							| 
									
										
										
										
											2021-03-16 17:47:45 +08:00
										 |  |  | 	return ce, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // Select returns exemplars for a given set of label matchers.
 | 
					
						
							|  |  |  | func (ce *CircularExemplarStorage) Select(start, end int64, matchers ...[]*labels.Matcher) ([]exemplar.QueryResult, error) { | 
					
						
							|  |  |  | 	ret := make([]exemplar.QueryResult, 0) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-04-09 15:08:40 +08:00
										 |  |  | 	if len(ce.exemplars) == 0 { | 
					
						
							| 
									
										
										
										
											2021-07-20 12:52:57 +08:00
										 |  |  | 		return ret, nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-03-16 17:47:45 +08:00
										 |  |  | 	ce.lock.RLock() | 
					
						
							|  |  |  | 	defer ce.lock.RUnlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Loop through each index entry, which will point us to first/last exemplar for each series.
 | 
					
						
							|  |  |  | 	for _, idx := range ce.index { | 
					
						
							|  |  |  | 		var se exemplar.QueryResult | 
					
						
							| 
									
										
										
										
											2021-04-16 20:44:53 +08:00
										 |  |  | 		e := ce.exemplars[idx.oldest] | 
					
						
							| 
									
										
										
										
											2021-05-05 23:28:48 +08:00
										 |  |  | 		if e.exemplar.Ts > end || ce.exemplars[idx.newest].exemplar.Ts < start { | 
					
						
							|  |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2021-05-06 01:51:16 +08:00
										 |  |  | 		if !matchesSomeMatcherSet(idx.seriesLabels, matchers) { | 
					
						
							| 
									
										
										
										
											2021-03-16 17:47:45 +08:00
										 |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							| 
									
										
										
										
											2021-05-06 01:51:16 +08:00
										 |  |  | 		se.SeriesLabels = idx.seriesLabels | 
					
						
							| 
									
										
										
										
											2021-03-16 17:47:45 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 		// Loop through all exemplars in the circular buffer for the current series.
 | 
					
						
							|  |  |  | 		for e.exemplar.Ts <= end { | 
					
						
							|  |  |  | 			if e.exemplar.Ts >= start { | 
					
						
							|  |  |  | 				se.Exemplars = append(se.Exemplars, e.exemplar) | 
					
						
							|  |  |  | 			} | 
					
						
							| 
									
										
										
										
											2021-07-20 12:52:57 +08:00
										 |  |  | 			if e.next == noExemplar { | 
					
						
							| 
									
										
										
										
											2021-03-16 17:47:45 +08:00
										 |  |  | 				break | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 			e = ce.exemplars[e.next] | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		if len(se.Exemplars) > 0 { | 
					
						
							|  |  |  | 			ret = append(ret, se) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-09-22 04:53:51 +08:00
										 |  |  | 	slices.SortFunc(ret, func(a, b exemplar.QueryResult) int { | 
					
						
							|  |  |  | 		return labels.Compare(a.SeriesLabels, b.SeriesLabels) | 
					
						
							| 
									
										
										
										
											2021-03-16 17:47:45 +08:00
										 |  |  | 	}) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return ret, nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func matchesSomeMatcherSet(lbls labels.Labels, matchers [][]*labels.Matcher) bool { | 
					
						
							|  |  |  | Outer: | 
					
						
							|  |  |  | 	for _, ms := range matchers { | 
					
						
							|  |  |  | 		for _, m := range ms { | 
					
						
							|  |  |  | 			if !m.Matches(lbls.Get(m.Name)) { | 
					
						
							|  |  |  | 				continue Outer | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		return true | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return false | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-05-07 04:53:52 +08:00
										 |  |  | func (ce *CircularExemplarStorage) ValidateExemplar(l labels.Labels, e exemplar.Exemplar) error { | 
					
						
							| 
									
										
										
										
											2022-01-06 18:28:58 +08:00
										 |  |  | 	var buf [1024]byte | 
					
						
							|  |  |  | 	seriesLabels := l.Bytes(buf[:]) | 
					
						
							| 
									
										
										
										
											2021-04-16 20:44:53 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	// TODO(bwplotka): This lock can lock all scrapers, there might high contention on this on scale.
 | 
					
						
							|  |  |  | 	// Optimize by moving the lock to be per series (& benchmark it).
 | 
					
						
							| 
									
										
										
										
											2021-05-07 04:53:52 +08:00
										 |  |  | 	ce.lock.RLock() | 
					
						
							|  |  |  | 	defer ce.lock.RUnlock() | 
					
						
							| 
									
										
										
										
											2024-05-12 00:32:17 +08:00
										 |  |  | 	return ce.validateExemplar(ce.index[string(seriesLabels)], e, false) | 
					
						
							| 
									
										
										
										
											2021-05-07 04:53:52 +08:00
										 |  |  | } | 
					
						
							| 
									
										
										
										
											2021-03-16 17:47:45 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-05-21 15:20:07 +08:00
										 |  |  | // Not thread safe. The appended parameters tells us whether this is an external validation, or internal
 | 
					
						
							| 
									
										
										
										
											2021-05-12 22:47:05 +08:00
										 |  |  | // as a result of an AddExemplar call, in which case we should update any relevant metrics.
 | 
					
						
							| 
									
										
										
										
											2024-05-12 00:32:17 +08:00
										 |  |  | func (ce *CircularExemplarStorage) validateExemplar(idx *indexEntry, e exemplar.Exemplar, appended bool) error { | 
					
						
							| 
									
										
										
										
											2023-04-09 15:08:40 +08:00
										 |  |  | 	if len(ce.exemplars) == 0 { | 
					
						
							| 
									
										
										
										
											2021-07-20 12:52:57 +08:00
										 |  |  | 		return storage.ErrExemplarsDisabled | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-05-07 04:53:52 +08:00
										 |  |  | 	// Exemplar label length does not include chars involved in text rendering such as quotes
 | 
					
						
							| 
									
										
										
										
											2021-05-12 22:47:05 +08:00
										 |  |  | 	// equals sign, or commas. See definition of const ExemplarMaxLabelLength.
 | 
					
						
							| 
									
										
										
										
											2021-05-07 04:53:52 +08:00
										 |  |  | 	labelSetLen := 0 | 
					
						
							| 
									
										
										
										
											2022-03-10 06:17:40 +08:00
										 |  |  | 	if err := e.Labels.Validate(func(l labels.Label) error { | 
					
						
							| 
									
										
										
										
											2021-05-07 04:53:52 +08:00
										 |  |  | 		labelSetLen += utf8.RuneCountInString(l.Name) | 
					
						
							|  |  |  | 		labelSetLen += utf8.RuneCountInString(l.Value) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 		if labelSetLen > exemplar.ExemplarMaxLabelSetLength { | 
					
						
							|  |  |  | 			return storage.ErrExemplarLabelLength | 
					
						
							| 
									
										
										
										
											2021-04-16 20:44:53 +08:00
										 |  |  | 		} | 
					
						
							| 
									
										
										
										
											2022-03-10 06:17:40 +08:00
										 |  |  | 		return nil | 
					
						
							|  |  |  | 	}); err != nil { | 
					
						
							|  |  |  | 		return err | 
					
						
							| 
									
										
										
										
											2021-05-07 04:53:52 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2021-03-16 17:47:45 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-05-12 00:32:17 +08:00
										 |  |  | 	if idx == nil { | 
					
						
							| 
									
										
										
										
											2021-05-12 22:47:05 +08:00
										 |  |  | 		return nil | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-05-07 04:53:52 +08:00
										 |  |  | 	// Check for duplicate vs last stored exemplar for this series.
 | 
					
						
							|  |  |  | 	// NB these are expected, and appending them is a no-op.
 | 
					
						
							| 
									
										
										
										
											2023-11-16 22:07:37 +08:00
										 |  |  | 	// For floats and classic histograms, there is only 1 exemplar per series,
 | 
					
						
							|  |  |  | 	// so this is sufficient. For native histograms with multiple exemplars per series,
 | 
					
						
							|  |  |  | 	// we have another check below.
 | 
					
						
							|  |  |  | 	newestExemplar := ce.exemplars[idx.newest].exemplar | 
					
						
							|  |  |  | 	if newestExemplar.Equals(e) { | 
					
						
							| 
									
										
										
										
											2021-05-07 04:53:52 +08:00
										 |  |  | 		return storage.ErrDuplicateExemplar | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-11-16 22:07:37 +08:00
										 |  |  | 	// Since during the scrape the exemplars are sorted first by timestamp, then value, then labels,
 | 
					
						
							|  |  |  | 	// if any of these conditions are true, we know that the exemplar is either a duplicate
 | 
					
						
							|  |  |  | 	// of a previous one (but not the most recent one as that is checked above) or out of order.
 | 
					
						
							|  |  |  | 	// We now allow exemplars with duplicate timestamps as long as they have different values and/or labels
 | 
					
						
							|  |  |  | 	// since that can happen for different buckets of a native histogram.
 | 
					
						
							|  |  |  | 	// We do not distinguish between duplicates and out of order as iterating through the exemplars
 | 
					
						
							|  |  |  | 	// to check for that would be expensive (versus just comparing with the most recent one) especially
 | 
					
						
							|  |  |  | 	// since this is run under a lock, and not worth it as we just need to return an error so we do not
 | 
					
						
							|  |  |  | 	// append the exemplar.
 | 
					
						
							|  |  |  | 	if e.Ts < newestExemplar.Ts || | 
					
						
							|  |  |  | 		(e.Ts == newestExemplar.Ts && e.Value < newestExemplar.Value) || | 
					
						
							|  |  |  | 		(e.Ts == newestExemplar.Ts && e.Value == newestExemplar.Value && e.Labels.Hash() < newestExemplar.Labels.Hash()) { | 
					
						
							| 
									
										
										
										
											2023-05-21 15:20:07 +08:00
										 |  |  | 		if appended { | 
					
						
							| 
									
										
										
										
											2021-07-20 12:52:57 +08:00
										 |  |  | 			ce.metrics.outOfOrderExemplars.Inc() | 
					
						
							| 
									
										
										
										
											2021-04-16 20:44:53 +08:00
										 |  |  | 		} | 
					
						
							| 
									
										
										
										
											2021-05-07 04:53:52 +08:00
										 |  |  | 		return storage.ErrOutOfOrderExemplar | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return nil | 
					
						
							|  |  |  | } | 
					
						
							| 
									
										
										
										
											2021-03-16 17:47:45 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-07-20 12:52:57 +08:00
										 |  |  | // Resize changes the size of exemplar buffer by allocating a new buffer and migrating data to it.
 | 
					
						
							|  |  |  | // Exemplars are kept when possible. Shrinking will discard oldest data (in order of ingest) as needed.
 | 
					
						
							|  |  |  | func (ce *CircularExemplarStorage) Resize(l int64) int { | 
					
						
							|  |  |  | 	// Accept negative values as just 0 size.
 | 
					
						
							|  |  |  | 	if l <= 0 { | 
					
						
							|  |  |  | 		l = 0 | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if l == int64(len(ce.exemplars)) { | 
					
						
							|  |  |  | 		return 0 | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	ce.lock.Lock() | 
					
						
							|  |  |  | 	defer ce.lock.Unlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	oldBuffer := ce.exemplars | 
					
						
							|  |  |  | 	oldNextIndex := int64(ce.nextIndex) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-05-11 23:37:52 +08:00
										 |  |  | 	ce.exemplars = make([]circularBufferEntry, l) | 
					
						
							| 
									
										
										
										
											2022-01-06 18:28:58 +08:00
										 |  |  | 	ce.index = make(map[string]*indexEntry, l/estimatedExemplarsPerSeries) | 
					
						
							| 
									
										
										
										
											2021-07-20 12:52:57 +08:00
										 |  |  | 	ce.nextIndex = 0 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	// Replay as many entries as needed, starting with oldest first.
 | 
					
						
							|  |  |  | 	count := int64(len(oldBuffer)) | 
					
						
							|  |  |  | 	if l < count { | 
					
						
							|  |  |  | 		count = l | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	migrated := 0 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-09-03 02:08:05 +08:00
										 |  |  | 	if l > 0 && len(oldBuffer) > 0 { | 
					
						
							| 
									
										
										
										
											2021-07-20 12:52:57 +08:00
										 |  |  | 		// Rewind previous next index by count with wrap-around.
 | 
					
						
							|  |  |  | 		// This math is essentially looking at nextIndex, where we would write the next exemplar to,
 | 
					
						
							|  |  |  | 		// and find the index in the old exemplar buffer that we should start migrating exemplars from.
 | 
					
						
							|  |  |  | 		// This way we don't migrate exemplars that would just be overwritten when migrating later exemplars.
 | 
					
						
							| 
									
										
										
										
											2021-10-22 16:06:44 +08:00
										 |  |  | 		startIndex := (oldNextIndex - count + int64(len(oldBuffer))) % int64(len(oldBuffer)) | 
					
						
							| 
									
										
										
										
											2021-07-20 12:52:57 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-05-12 00:22:56 +08:00
										 |  |  | 		var buf [1024]byte | 
					
						
							| 
									
										
										
										
											2021-07-20 12:52:57 +08:00
										 |  |  | 		for i := int64(0); i < count; i++ { | 
					
						
							|  |  |  | 			idx := (startIndex + i) % int64(len(oldBuffer)) | 
					
						
							| 
									
										
										
										
											2024-05-11 23:37:52 +08:00
										 |  |  | 			if oldBuffer[idx].ref != nil { | 
					
						
							| 
									
										
										
										
											2024-05-12 00:22:56 +08:00
										 |  |  | 				ce.migrate(&oldBuffer[idx], buf[:]) | 
					
						
							| 
									
										
										
										
											2021-07-20 12:52:57 +08:00
										 |  |  | 				migrated++ | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	ce.computeMetrics() | 
					
						
							|  |  |  | 	ce.metrics.maxExemplars.Set(float64(l)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return migrated | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // migrate is like AddExemplar but reuses existing structs. Expected to be called in batch and requires
 | 
					
						
							|  |  |  | // external lock and does not compute metrics.
 | 
					
						
							| 
									
										
										
										
											2024-05-12 00:22:56 +08:00
										 |  |  | func (ce *CircularExemplarStorage) migrate(entry *circularBufferEntry, buf []byte) { | 
					
						
							|  |  |  | 	seriesLabels := entry.ref.seriesLabels.Bytes(buf[:0]) | 
					
						
							| 
									
										
										
										
											2021-07-20 12:52:57 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-01-06 18:28:58 +08:00
										 |  |  | 	idx, ok := ce.index[string(seriesLabels)] | 
					
						
							| 
									
										
										
										
											2021-07-20 12:52:57 +08:00
										 |  |  | 	if !ok { | 
					
						
							|  |  |  | 		idx = entry.ref | 
					
						
							|  |  |  | 		idx.oldest = ce.nextIndex | 
					
						
							| 
									
										
										
										
											2022-01-06 18:28:58 +08:00
										 |  |  | 		ce.index[string(seriesLabels)] = idx | 
					
						
							| 
									
										
										
										
											2021-07-20 12:52:57 +08:00
										 |  |  | 	} else { | 
					
						
							|  |  |  | 		entry.ref = idx | 
					
						
							|  |  |  | 		ce.exemplars[idx.newest].next = ce.nextIndex | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	idx.newest = ce.nextIndex | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	entry.next = noExemplar | 
					
						
							| 
									
										
										
										
											2024-05-11 23:37:52 +08:00
										 |  |  | 	ce.exemplars[ce.nextIndex] = *entry | 
					
						
							| 
									
										
										
										
											2021-07-20 12:52:57 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	ce.nextIndex = (ce.nextIndex + 1) % len(ce.exemplars) | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-05-07 04:53:52 +08:00
										 |  |  | func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemplar) error { | 
					
						
							| 
									
										
										
										
											2023-04-09 15:08:40 +08:00
										 |  |  | 	if len(ce.exemplars) == 0 { | 
					
						
							| 
									
										
										
										
											2021-07-20 12:52:57 +08:00
										 |  |  | 		return storage.ErrExemplarsDisabled | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-01-06 18:28:58 +08:00
										 |  |  | 	var buf [1024]byte | 
					
						
							|  |  |  | 	seriesLabels := l.Bytes(buf[:]) | 
					
						
							| 
									
										
										
										
											2021-05-07 04:53:52 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	// TODO(bwplotka): This lock can lock all scrapers, there might high contention on this on scale.
 | 
					
						
							|  |  |  | 	// Optimize by moving the lock to be per series (& benchmark it).
 | 
					
						
							|  |  |  | 	ce.lock.Lock() | 
					
						
							|  |  |  | 	defer ce.lock.Unlock() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-05-12 00:32:17 +08:00
										 |  |  | 	idx, ok := ce.index[string(seriesLabels)] | 
					
						
							|  |  |  | 	err := ce.validateExemplar(idx, e, true) | 
					
						
							| 
									
										
										
										
											2021-05-07 04:53:52 +08:00
										 |  |  | 	if err != nil { | 
					
						
							| 
									
										
										
										
											2023-11-17 02:54:41 +08:00
										 |  |  | 		if errors.Is(err, storage.ErrDuplicateExemplar) { | 
					
						
							| 
									
										
										
										
											2021-05-07 04:53:52 +08:00
										 |  |  | 			// Duplicate exemplar, noop.
 | 
					
						
							|  |  |  | 			return nil | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		return err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if !ok { | 
					
						
							| 
									
										
										
										
											2024-05-11 23:25:00 +08:00
										 |  |  | 		idx = &indexEntry{oldest: ce.nextIndex, seriesLabels: l} | 
					
						
							|  |  |  | 		ce.index[string(seriesLabels)] = idx | 
					
						
							| 
									
										
										
										
											2021-05-07 04:53:52 +08:00
										 |  |  | 	} else { | 
					
						
							| 
									
										
										
										
											2024-05-11 23:25:00 +08:00
										 |  |  | 		ce.exemplars[idx.newest].next = ce.nextIndex | 
					
						
							| 
									
										
										
										
											2021-03-16 17:47:45 +08:00
										 |  |  | 	} | 
					
						
							| 
									
										
										
										
											2021-04-16 20:44:53 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-05-11 23:37:52 +08:00
										 |  |  | 	if prev := &ce.exemplars[ce.nextIndex]; prev.ref != nil { | 
					
						
							| 
									
										
										
										
											2023-07-13 20:16:10 +08:00
										 |  |  | 		// There exists an exemplar already on this ce.nextIndex entry,
 | 
					
						
							|  |  |  | 		// drop it, to make place for others.
 | 
					
						
							| 
									
										
										
										
											2021-07-20 12:52:57 +08:00
										 |  |  | 		if prev.next == noExemplar { | 
					
						
							| 
									
										
										
										
											2021-04-16 20:44:53 +08:00
										 |  |  | 			// Last item for this series, remove index entry.
 | 
					
						
							| 
									
										
										
										
											2024-05-11 23:25:00 +08:00
										 |  |  | 			var buf [1024]byte | 
					
						
							|  |  |  | 			prevLabels := prev.ref.seriesLabels.Bytes(buf[:]) | 
					
						
							| 
									
										
										
										
											2022-01-06 18:28:58 +08:00
										 |  |  | 			delete(ce.index, string(prevLabels)) | 
					
						
							| 
									
										
										
										
											2021-04-16 20:44:53 +08:00
										 |  |  | 		} else { | 
					
						
							| 
									
										
										
										
											2024-05-11 23:25:00 +08:00
										 |  |  | 			prev.ref.oldest = prev.next | 
					
						
							| 
									
										
										
										
											2021-04-16 20:44:53 +08:00
										 |  |  | 		} | 
					
						
							| 
									
										
										
										
											2021-03-16 17:47:45 +08:00
										 |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-04-16 20:44:53 +08:00
										 |  |  | 	// Default the next value to -1 (which we use to detect that we've iterated through all exemplars for a series in Select)
 | 
					
						
							|  |  |  | 	// since this is the first exemplar stored for this series.
 | 
					
						
							| 
									
										
										
										
											2021-07-20 12:52:57 +08:00
										 |  |  | 	ce.exemplars[ce.nextIndex].next = noExemplar | 
					
						
							| 
									
										
										
										
											2021-04-16 20:44:53 +08:00
										 |  |  | 	ce.exemplars[ce.nextIndex].exemplar = e | 
					
						
							| 
									
										
										
										
											2024-05-11 23:25:00 +08:00
										 |  |  | 	ce.exemplars[ce.nextIndex].ref = idx | 
					
						
							|  |  |  | 	idx.newest = ce.nextIndex | 
					
						
							| 
									
										
										
										
											2021-04-16 20:44:53 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-03-16 17:47:45 +08:00
										 |  |  | 	ce.nextIndex = (ce.nextIndex + 1) % len(ce.exemplars) | 
					
						
							| 
									
										
										
										
											2021-04-16 20:44:53 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-07-20 12:52:57 +08:00
										 |  |  | 	ce.metrics.exemplarsAppended.Inc() | 
					
						
							|  |  |  | 	ce.computeMetrics() | 
					
						
							| 
									
										
										
										
											2021-03-16 17:47:45 +08:00
										 |  |  | 	return nil | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-07-20 12:52:57 +08:00
										 |  |  | func (ce *CircularExemplarStorage) computeMetrics() { | 
					
						
							|  |  |  | 	ce.metrics.seriesWithExemplarsInStorage.Set(float64(len(ce.index))) | 
					
						
							| 
									
										
										
										
											2021-05-07 04:53:52 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-07-20 12:52:57 +08:00
										 |  |  | 	if len(ce.exemplars) == 0 { | 
					
						
							|  |  |  | 		ce.metrics.exemplarsInStorage.Set(float64(0)) | 
					
						
							|  |  |  | 		ce.metrics.lastExemplarsTs.Set(float64(0)) | 
					
						
							|  |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2021-03-16 17:47:45 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-05-11 23:37:52 +08:00
										 |  |  | 	if ce.exemplars[ce.nextIndex].ref != nil { | 
					
						
							| 
									
										
										
										
											2021-07-20 12:52:57 +08:00
										 |  |  | 		ce.metrics.exemplarsInStorage.Set(float64(len(ce.exemplars))) | 
					
						
							| 
									
										
										
										
											2024-05-11 23:37:52 +08:00
										 |  |  | 		ce.metrics.lastExemplarsTs.Set(float64(ce.exemplars[ce.nextIndex].exemplar.Ts) / 1000) | 
					
						
							| 
									
										
										
										
											2021-07-20 12:52:57 +08:00
										 |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2021-03-16 17:47:45 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-07-20 12:52:57 +08:00
										 |  |  | 	// We did not yet fill the buffer.
 | 
					
						
							|  |  |  | 	ce.metrics.exemplarsInStorage.Set(float64(ce.nextIndex)) | 
					
						
							| 
									
										
										
										
											2024-05-11 23:37:52 +08:00
										 |  |  | 	if ce.exemplars[0].ref != nil { | 
					
						
							| 
									
										
										
										
											2021-07-20 12:52:57 +08:00
										 |  |  | 		ce.metrics.lastExemplarsTs.Set(float64(ce.exemplars[0].exemplar.Ts) / 1000) | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2021-03-16 17:47:45 +08:00
										 |  |  | } | 
					
						
							| 
									
										
										
										
											2021-08-30 22:04:38 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | // IterateExemplars iterates through all the exemplars from oldest to newest appended and calls
 | 
					
						
							|  |  |  | // the given function on all of them till the end (or) till the first function call that returns an error.
 | 
					
						
							|  |  |  | func (ce *CircularExemplarStorage) IterateExemplars(f func(seriesLabels labels.Labels, e exemplar.Exemplar) error) error { | 
					
						
							|  |  |  | 	ce.lock.RLock() | 
					
						
							|  |  |  | 	defer ce.lock.RUnlock() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	idx := ce.nextIndex | 
					
						
							|  |  |  | 	l := len(ce.exemplars) | 
					
						
							|  |  |  | 	for i := 0; i < l; i, idx = i+1, (idx+1)%l { | 
					
						
							| 
									
										
										
										
											2024-05-11 23:37:52 +08:00
										 |  |  | 		if ce.exemplars[idx].ref == nil { | 
					
						
							| 
									
										
										
										
											2021-08-30 22:04:38 +08:00
										 |  |  | 			continue | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 		err := f(ce.exemplars[idx].ref.seriesLabels, ce.exemplars[idx].exemplar) | 
					
						
							|  |  |  | 		if err != nil { | 
					
						
							|  |  |  | 			return err | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return nil | 
					
						
							|  |  |  | } |