| 
									
										
										
										
											2021-02-27 00:43:19 +08:00
										 |  |  | // Copyright 2021 The Prometheus Authors
 | 
					
						
							|  |  |  | // Licensed under the Apache License, Version 2.0 (the "License");
 | 
					
						
							|  |  |  | // you may not use this file except in compliance with the License.
 | 
					
						
							|  |  |  | // You may obtain a copy of the License at
 | 
					
						
							|  |  |  | //
 | 
					
						
							|  |  |  | // http://www.apache.org/licenses/LICENSE-2.0
 | 
					
						
							|  |  |  | //
 | 
					
						
							|  |  |  | // Unless required by applicable law or agreed to in writing, software
 | 
					
						
							|  |  |  | // distributed under the License is distributed on an "AS IS" BASIS,
 | 
					
						
							|  |  |  | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
					
						
							|  |  |  | // See the License for the specific language governing permissions and
 | 
					
						
							|  |  |  | // limitations under the License.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | package remote | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ( | 
					
						
							|  |  |  | 	"context" | 
					
						
							| 
									
										
										
										
											2023-10-31 19:15:30 +08:00
										 |  |  | 	"errors" | 
					
						
							| 
									
										
										
										
											2024-09-10 09:41:53 +08:00
										 |  |  | 	"log/slog" | 
					
						
							| 
									
										
										
										
											2021-02-27 00:43:19 +08:00
										 |  |  | 	"net/http" | 
					
						
							| 
									
										
										
										
											2024-01-16 00:24:46 +08:00
										 |  |  | 	"slices" | 
					
						
							| 
									
										
										
										
											2023-09-22 04:53:51 +08:00
										 |  |  | 	"strings" | 
					
						
							| 
									
										
										
										
											2022-11-15 23:29:16 +08:00
										 |  |  | 	"sync" | 
					
						
							| 
									
										
										
										
											2021-02-27 00:43:19 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 	"github.com/prometheus/client_golang/prometheus" | 
					
						
							| 
									
										
										
										
											2021-10-22 16:19:38 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-02-27 00:43:19 +08:00
										 |  |  | 	"github.com/prometheus/prometheus/config" | 
					
						
							| 
									
										
										
										
											2021-11-08 22:23:17 +08:00
										 |  |  | 	"github.com/prometheus/prometheus/model/labels" | 
					
						
							| 
									
										
										
										
											2021-02-27 00:43:19 +08:00
										 |  |  | 	"github.com/prometheus/prometheus/prompb" | 
					
						
							|  |  |  | 	"github.com/prometheus/prometheus/storage" | 
					
						
							| 
									
										
										
										
											2023-09-15 00:57:31 +08:00
										 |  |  | 	"github.com/prometheus/prometheus/util/annotations" | 
					
						
							| 
									
										
										
										
											2021-11-08 22:23:17 +08:00
										 |  |  | 	"github.com/prometheus/prometheus/util/gate" | 
					
						
							| 
									
										
										
										
											2021-02-27 00:43:19 +08:00
										 |  |  | ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | type readHandler struct { | 
					
						
							| 
									
										
										
										
											2024-09-10 09:41:53 +08:00
										 |  |  | 	logger                    *slog.Logger | 
					
						
							| 
									
										
										
										
											2021-02-27 00:43:19 +08:00
										 |  |  | 	queryable                 storage.SampleAndChunkQueryable | 
					
						
							|  |  |  | 	config                    func() config.Config | 
					
						
							|  |  |  | 	remoteReadSampleLimit     int | 
					
						
							|  |  |  | 	remoteReadMaxBytesInFrame int | 
					
						
							|  |  |  | 	remoteReadGate            *gate.Gate | 
					
						
							|  |  |  | 	queries                   prometheus.Gauge | 
					
						
							| 
									
										
										
										
											2022-11-15 23:29:16 +08:00
										 |  |  | 	marshalPool               *sync.Pool | 
					
						
							| 
									
										
										
										
											2021-02-27 00:43:19 +08:00
										 |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // NewReadHandler creates a http.Handler that accepts remote read requests and
 | 
					
						
							|  |  |  | // writes them to the provided queryable.
 | 
					
						
							| 
									
										
										
										
											2024-09-10 09:41:53 +08:00
										 |  |  | func NewReadHandler(logger *slog.Logger, r prometheus.Registerer, queryable storage.SampleAndChunkQueryable, config func() config.Config, remoteReadSampleLimit, remoteReadConcurrencyLimit, remoteReadMaxBytesInFrame int) http.Handler { | 
					
						
							| 
									
										
										
										
											2021-02-27 00:43:19 +08:00
										 |  |  | 	h := &readHandler{ | 
					
						
							|  |  |  | 		logger:                    logger, | 
					
						
							|  |  |  | 		queryable:                 queryable, | 
					
						
							|  |  |  | 		config:                    config, | 
					
						
							|  |  |  | 		remoteReadSampleLimit:     remoteReadSampleLimit, | 
					
						
							|  |  |  | 		remoteReadGate:            gate.New(remoteReadConcurrencyLimit), | 
					
						
							|  |  |  | 		remoteReadMaxBytesInFrame: remoteReadMaxBytesInFrame, | 
					
						
							| 
									
										
										
										
											2022-11-15 23:29:16 +08:00
										 |  |  | 		marshalPool:               &sync.Pool{}, | 
					
						
							| 
									
										
										
										
											2021-02-27 00:43:19 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 		queries: prometheus.NewGauge(prometheus.GaugeOpts{ | 
					
						
							| 
									
										
										
										
											2024-12-05 20:28:15 +08:00
										 |  |  | 			Namespace: namespace, | 
					
						
							|  |  |  | 			Subsystem: "remote_read_handler", | 
					
						
							|  |  |  | 			Name:      "queries", | 
					
						
							|  |  |  | 			Help:      "The current number of remote read queries that are either in execution or queued on the handler.", | 
					
						
							| 
									
										
										
										
											2021-02-27 00:43:19 +08:00
										 |  |  | 		}), | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	if r != nil { | 
					
						
							|  |  |  | 		r.MustRegister(h.queries) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	return h | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (h *readHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { | 
					
						
							|  |  |  | 	ctx := r.Context() | 
					
						
							|  |  |  | 	if err := h.remoteReadGate.Start(ctx); err != nil { | 
					
						
							|  |  |  | 		http.Error(w, err.Error(), http.StatusInternalServerError) | 
					
						
							|  |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	h.queries.Inc() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	defer h.remoteReadGate.Done() | 
					
						
							|  |  |  | 	defer h.queries.Dec() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	req, err := DecodeReadRequest(r) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		http.Error(w, err.Error(), http.StatusBadRequest) | 
					
						
							|  |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	externalLabels := h.config().GlobalConfig.ExternalLabels.Map() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	sortedExternalLabels := make([]prompb.Label, 0, len(externalLabels)) | 
					
						
							|  |  |  | 	for name, value := range externalLabels { | 
					
						
							|  |  |  | 		sortedExternalLabels = append(sortedExternalLabels, prompb.Label{ | 
					
						
							|  |  |  | 			Name:  name, | 
					
						
							|  |  |  | 			Value: value, | 
					
						
							|  |  |  | 		}) | 
					
						
							|  |  |  | 	} | 
					
						
							| 
									
										
										
										
											2023-09-22 04:53:51 +08:00
										 |  |  | 	slices.SortFunc(sortedExternalLabels, func(a, b prompb.Label) int { | 
					
						
							|  |  |  | 		return strings.Compare(a.Name, b.Name) | 
					
						
							| 
									
										
										
										
											2021-02-27 00:43:19 +08:00
										 |  |  | 	}) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	responseType, err := NegotiateResponseType(req.AcceptedResponseTypes) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		http.Error(w, err.Error(), http.StatusBadRequest) | 
					
						
							|  |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	switch responseType { | 
					
						
							|  |  |  | 	case prompb.ReadRequest_STREAMED_XOR_CHUNKS: | 
					
						
							|  |  |  | 		h.remoteReadStreamedXORChunks(ctx, w, req, externalLabels, sortedExternalLabels) | 
					
						
							|  |  |  | 	default: | 
					
						
							|  |  |  | 		// On empty or unknown types in req.AcceptedResponseTypes we default to non streamed, raw samples response.
 | 
					
						
							|  |  |  | 		h.remoteReadSamples(ctx, w, req, externalLabels, sortedExternalLabels) | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (h *readHandler) remoteReadSamples( | 
					
						
							|  |  |  | 	ctx context.Context, | 
					
						
							|  |  |  | 	w http.ResponseWriter, | 
					
						
							|  |  |  | 	req *prompb.ReadRequest, | 
					
						
							|  |  |  | 	externalLabels map[string]string, | 
					
						
							|  |  |  | 	sortedExternalLabels []prompb.Label, | 
					
						
							|  |  |  | ) { | 
					
						
							|  |  |  | 	w.Header().Set("Content-Type", "application/x-protobuf") | 
					
						
							|  |  |  | 	w.Header().Set("Content-Encoding", "snappy") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	resp := prompb.ReadResponse{ | 
					
						
							|  |  |  | 		Results: make([]*prompb.QueryResult, len(req.Queries)), | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 	for i, query := range req.Queries { | 
					
						
							|  |  |  | 		if err := func() error { | 
					
						
							|  |  |  | 			filteredMatchers, err := filterExtLabelsFromMatchers(query.Matchers, externalLabels) | 
					
						
							|  |  |  | 			if err != nil { | 
					
						
							|  |  |  | 				return err | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-09-12 18:37:38 +08:00
										 |  |  | 			querier, err := h.queryable.Querier(query.StartTimestampMs, query.EndTimestampMs) | 
					
						
							| 
									
										
										
										
											2021-02-27 00:43:19 +08:00
										 |  |  | 			if err != nil { | 
					
						
							|  |  |  | 				return err | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 			defer func() { | 
					
						
							|  |  |  | 				if err := querier.Close(); err != nil { | 
					
						
							| 
									
										
										
										
											2024-09-10 09:41:53 +08:00
										 |  |  | 					h.logger.Warn("Error on querier close", "err", err.Error()) | 
					
						
							| 
									
										
										
										
											2021-02-27 00:43:19 +08:00
										 |  |  | 				} | 
					
						
							|  |  |  | 			}() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 			var hints *storage.SelectHints | 
					
						
							|  |  |  | 			if query.Hints != nil { | 
					
						
							|  |  |  | 				hints = &storage.SelectHints{ | 
					
						
							|  |  |  | 					Start:    query.Hints.StartMs, | 
					
						
							|  |  |  | 					End:      query.Hints.EndMs, | 
					
						
							|  |  |  | 					Step:     query.Hints.StepMs, | 
					
						
							|  |  |  | 					Func:     query.Hints.Func, | 
					
						
							|  |  |  | 					Grouping: query.Hints.Grouping, | 
					
						
							|  |  |  | 					Range:    query.Hints.RangeMs, | 
					
						
							|  |  |  | 					By:       query.Hints.By, | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-09-15 00:57:31 +08:00
										 |  |  | 			var ws annotations.Annotations | 
					
						
							| 
									
										
										
										
											2023-09-12 18:37:38 +08:00
										 |  |  | 			resp.Results[i], ws, err = ToQueryResult(querier.Select(ctx, false, hints, filteredMatchers...), h.remoteReadSampleLimit) | 
					
						
							| 
									
										
										
										
											2021-02-27 00:43:19 +08:00
										 |  |  | 			if err != nil { | 
					
						
							|  |  |  | 				return err | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 			for _, w := range ws { | 
					
						
							| 
									
										
										
										
											2024-09-10 09:41:53 +08:00
										 |  |  | 				h.logger.Warn("Warnings on remote read query", "err", w.Error()) | 
					
						
							| 
									
										
										
										
											2021-02-27 00:43:19 +08:00
										 |  |  | 			} | 
					
						
							|  |  |  | 			for _, ts := range resp.Results[i].Timeseries { | 
					
						
							|  |  |  | 				ts.Labels = MergeLabels(ts.Labels, sortedExternalLabels) | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 			return nil | 
					
						
							|  |  |  | 		}(); err != nil { | 
					
						
							| 
									
										
										
										
											2023-10-31 19:15:30 +08:00
										 |  |  | 			var httpErr HTTPError | 
					
						
							|  |  |  | 			if errors.As(err, &httpErr) { | 
					
						
							| 
									
										
										
										
											2021-02-27 00:43:19 +08:00
										 |  |  | 				http.Error(w, httpErr.Error(), httpErr.Status()) | 
					
						
							|  |  |  | 				return | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 			http.Error(w, err.Error(), http.StatusInternalServerError) | 
					
						
							|  |  |  | 			return | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	if err := EncodeReadResponse(&resp, w); err != nil { | 
					
						
							|  |  |  | 		http.Error(w, err.Error(), http.StatusInternalServerError) | 
					
						
							|  |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | func (h *readHandler) remoteReadStreamedXORChunks(ctx context.Context, w http.ResponseWriter, req *prompb.ReadRequest, externalLabels map[string]string, sortedExternalLabels []prompb.Label) { | 
					
						
							|  |  |  | 	w.Header().Set("Content-Type", "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	f, ok := w.(http.Flusher) | 
					
						
							|  |  |  | 	if !ok { | 
					
						
							|  |  |  | 		http.Error(w, "internal http.ResponseWriter does not implement http.Flusher interface", http.StatusInternalServerError) | 
					
						
							|  |  |  | 		return | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	for i, query := range req.Queries { | 
					
						
							|  |  |  | 		if err := func() error { | 
					
						
							|  |  |  | 			filteredMatchers, err := filterExtLabelsFromMatchers(query.Matchers, externalLabels) | 
					
						
							|  |  |  | 			if err != nil { | 
					
						
							|  |  |  | 				return err | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-07-26 16:49:25 +08:00
										 |  |  | 			querier, err := h.queryable.ChunkQuerier(query.StartTimestampMs, query.EndTimestampMs) | 
					
						
							|  |  |  | 			if err != nil { | 
					
						
							| 
									
										
										
										
											2021-02-27 00:43:19 +08:00
										 |  |  | 				return err | 
					
						
							|  |  |  | 			} | 
					
						
							| 
									
										
										
										
											2024-07-26 16:49:25 +08:00
										 |  |  | 			defer func() { | 
					
						
							|  |  |  | 				if err := querier.Close(); err != nil { | 
					
						
							| 
									
										
										
										
											2024-09-10 09:41:53 +08:00
										 |  |  | 					h.logger.Warn("Error on chunk querier close", "err", err.Error()) | 
					
						
							| 
									
										
										
										
											2024-07-26 16:49:25 +08:00
										 |  |  | 				} | 
					
						
							|  |  |  | 			}() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 			var hints *storage.SelectHints | 
					
						
							|  |  |  | 			if query.Hints != nil { | 
					
						
							|  |  |  | 				hints = &storage.SelectHints{ | 
					
						
							|  |  |  | 					Start:    query.Hints.StartMs, | 
					
						
							|  |  |  | 					End:      query.Hints.EndMs, | 
					
						
							|  |  |  | 					Step:     query.Hints.StepMs, | 
					
						
							|  |  |  | 					Func:     query.Hints.Func, | 
					
						
							|  |  |  | 					Grouping: query.Hints.Grouping, | 
					
						
							|  |  |  | 					Range:    query.Hints.RangeMs, | 
					
						
							|  |  |  | 					By:       query.Hints.By, | 
					
						
							|  |  |  | 				} | 
					
						
							|  |  |  | 			} | 
					
						
							| 
									
										
										
										
											2021-02-27 00:43:19 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | 			ws, err := StreamChunkedReadResponses( | 
					
						
							|  |  |  | 				NewChunkedWriter(w, f), | 
					
						
							|  |  |  | 				int64(i), | 
					
						
							|  |  |  | 				// The streaming API has to provide the series sorted.
 | 
					
						
							| 
									
										
										
										
											2024-07-26 16:49:25 +08:00
										 |  |  | 				querier.Select(ctx, true, hints, filteredMatchers...), | 
					
						
							| 
									
										
										
										
											2021-02-27 00:43:19 +08:00
										 |  |  | 				sortedExternalLabels, | 
					
						
							|  |  |  | 				h.remoteReadMaxBytesInFrame, | 
					
						
							| 
									
										
										
										
											2022-11-15 23:29:16 +08:00
										 |  |  | 				h.marshalPool, | 
					
						
							| 
									
										
										
										
											2021-02-27 00:43:19 +08:00
										 |  |  | 			) | 
					
						
							|  |  |  | 			if err != nil { | 
					
						
							|  |  |  | 				return err | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 			for _, w := range ws { | 
					
						
							| 
									
										
										
										
											2024-09-10 09:41:53 +08:00
										 |  |  | 				h.logger.Warn("Warnings on chunked remote read query", "warnings", w.Error()) | 
					
						
							| 
									
										
										
										
											2021-02-27 00:43:19 +08:00
										 |  |  | 			} | 
					
						
							|  |  |  | 			return nil | 
					
						
							|  |  |  | 		}(); err != nil { | 
					
						
							| 
									
										
										
										
											2023-10-31 19:15:30 +08:00
										 |  |  | 			var httpErr HTTPError | 
					
						
							|  |  |  | 			if errors.As(err, &httpErr) { | 
					
						
							| 
									
										
										
										
											2021-02-27 00:43:19 +08:00
										 |  |  | 				http.Error(w, httpErr.Error(), httpErr.Status()) | 
					
						
							|  |  |  | 				return | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 			http.Error(w, err.Error(), http.StatusInternalServerError) | 
					
						
							|  |  |  | 			return | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | // filterExtLabelsFromMatchers change equality matchers which match external labels
 | 
					
						
							|  |  |  | // to a matcher that looks for an empty label,
 | 
					
						
							|  |  |  | // as that label should not be present in the storage.
 | 
					
						
							|  |  |  | func filterExtLabelsFromMatchers(pbMatchers []*prompb.LabelMatcher, externalLabels map[string]string) ([]*labels.Matcher, error) { | 
					
						
							|  |  |  | 	matchers, err := FromLabelMatchers(pbMatchers) | 
					
						
							|  |  |  | 	if err != nil { | 
					
						
							|  |  |  | 		return nil, err | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	filteredMatchers := make([]*labels.Matcher, 0, len(matchers)) | 
					
						
							|  |  |  | 	for _, m := range matchers { | 
					
						
							|  |  |  | 		value := externalLabels[m.Name] | 
					
						
							|  |  |  | 		if m.Type == labels.MatchEqual && value == m.Value { | 
					
						
							|  |  |  | 			matcher, err := labels.NewMatcher(labels.MatchEqual, m.Name, "") | 
					
						
							|  |  |  | 			if err != nil { | 
					
						
							|  |  |  | 				return nil, err | 
					
						
							|  |  |  | 			} | 
					
						
							|  |  |  | 			filteredMatchers = append(filteredMatchers, matcher) | 
					
						
							|  |  |  | 		} else { | 
					
						
							|  |  |  | 			filteredMatchers = append(filteredMatchers, m) | 
					
						
							|  |  |  | 		} | 
					
						
							|  |  |  | 	} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 	return filteredMatchers, nil | 
					
						
							|  |  |  | } |