281 lines
		
	
	
		
			8.3 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			281 lines
		
	
	
		
			8.3 KiB
		
	
	
	
		
			Go
		
	
	
	
| // Copyright 2021 The Prometheus Authors
 | |
| // Licensed under the Apache License, Version 2.0 (the "License");
 | |
| // you may not use this file except in compliance with the License.
 | |
| // You may obtain a copy of the License at
 | |
| //
 | |
| // http://www.apache.org/licenses/LICENSE-2.0
 | |
| //
 | |
| // Unless required by applicable law or agreed to in writing, software
 | |
| // distributed under the License is distributed on an "AS IS" BASIS,
 | |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| // See the License for the specific language governing permissions and
 | |
| // limitations under the License.
 | |
| 
 | |
| package remote
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"errors"
 | |
| 	"log/slog"
 | |
| 	"net/http"
 | |
| 	"slices"
 | |
| 	"strings"
 | |
| 	"sync"
 | |
| 
 | |
| 	"github.com/prometheus/client_golang/prometheus"
 | |
| 
 | |
| 	"github.com/prometheus/prometheus/config"
 | |
| 	"github.com/prometheus/prometheus/model/labels"
 | |
| 	"github.com/prometheus/prometheus/prompb"
 | |
| 	"github.com/prometheus/prometheus/storage"
 | |
| 	"github.com/prometheus/prometheus/util/annotations"
 | |
| 	"github.com/prometheus/prometheus/util/gate"
 | |
| )
 | |
| 
 | |
| type readHandler struct {
 | |
| 	logger                    *slog.Logger
 | |
| 	queryable                 storage.SampleAndChunkQueryable
 | |
| 	config                    func() config.Config
 | |
| 	remoteReadSampleLimit     int
 | |
| 	remoteReadMaxBytesInFrame int
 | |
| 	remoteReadGate            *gate.Gate
 | |
| 	queries                   prometheus.Gauge
 | |
| 	marshalPool               *sync.Pool
 | |
| }
 | |
| 
 | |
| // NewReadHandler creates a http.Handler that accepts remote read requests and
 | |
| // writes them to the provided queryable.
 | |
| func NewReadHandler(logger *slog.Logger, r prometheus.Registerer, queryable storage.SampleAndChunkQueryable, config func() config.Config, remoteReadSampleLimit, remoteReadConcurrencyLimit, remoteReadMaxBytesInFrame int) http.Handler {
 | |
| 	h := &readHandler{
 | |
| 		logger:                    logger,
 | |
| 		queryable:                 queryable,
 | |
| 		config:                    config,
 | |
| 		remoteReadSampleLimit:     remoteReadSampleLimit,
 | |
| 		remoteReadGate:            gate.New(remoteReadConcurrencyLimit),
 | |
| 		remoteReadMaxBytesInFrame: remoteReadMaxBytesInFrame,
 | |
| 		marshalPool:               &sync.Pool{},
 | |
| 
 | |
| 		queries: prometheus.NewGauge(prometheus.GaugeOpts{
 | |
| 			Namespace: namespace,
 | |
| 			Subsystem: "remote_read_handler",
 | |
| 			Name:      "queries",
 | |
| 			Help:      "The current number of remote read queries that are either in execution or queued on the handler.",
 | |
| 		}),
 | |
| 	}
 | |
| 	if r != nil {
 | |
| 		r.MustRegister(h.queries)
 | |
| 	}
 | |
| 	return h
 | |
| }
 | |
| 
 | |
| func (h *readHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 | |
| 	ctx := r.Context()
 | |
| 	if err := h.remoteReadGate.Start(ctx); err != nil {
 | |
| 		http.Error(w, err.Error(), http.StatusInternalServerError)
 | |
| 		return
 | |
| 	}
 | |
| 	h.queries.Inc()
 | |
| 
 | |
| 	defer h.remoteReadGate.Done()
 | |
| 	defer h.queries.Dec()
 | |
| 
 | |
| 	req, err := DecodeReadRequest(r)
 | |
| 	if err != nil {
 | |
| 		http.Error(w, err.Error(), http.StatusBadRequest)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	externalLabels := h.config().GlobalConfig.ExternalLabels.Map()
 | |
| 
 | |
| 	sortedExternalLabels := make([]prompb.Label, 0, len(externalLabels))
 | |
| 	for name, value := range externalLabels {
 | |
| 		sortedExternalLabels = append(sortedExternalLabels, prompb.Label{
 | |
| 			Name:  name,
 | |
| 			Value: value,
 | |
| 		})
 | |
| 	}
 | |
| 	slices.SortFunc(sortedExternalLabels, func(a, b prompb.Label) int {
 | |
| 		return strings.Compare(a.Name, b.Name)
 | |
| 	})
 | |
| 
 | |
| 	responseType, err := NegotiateResponseType(req.AcceptedResponseTypes)
 | |
| 	if err != nil {
 | |
| 		http.Error(w, err.Error(), http.StatusBadRequest)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	switch responseType {
 | |
| 	case prompb.ReadRequest_STREAMED_XOR_CHUNKS:
 | |
| 		h.remoteReadStreamedXORChunks(ctx, w, req, externalLabels, sortedExternalLabels)
 | |
| 	default:
 | |
| 		// On empty or unknown types in req.AcceptedResponseTypes we default to non streamed, raw samples response.
 | |
| 		h.remoteReadSamples(ctx, w, req, externalLabels, sortedExternalLabels)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (h *readHandler) remoteReadSamples(
 | |
| 	ctx context.Context,
 | |
| 	w http.ResponseWriter,
 | |
| 	req *prompb.ReadRequest,
 | |
| 	externalLabels map[string]string,
 | |
| 	sortedExternalLabels []prompb.Label,
 | |
| ) {
 | |
| 	w.Header().Set("Content-Type", "application/x-protobuf")
 | |
| 	w.Header().Set("Content-Encoding", "snappy")
 | |
| 
 | |
| 	resp := prompb.ReadResponse{
 | |
| 		Results: make([]*prompb.QueryResult, len(req.Queries)),
 | |
| 	}
 | |
| 	for i, query := range req.Queries {
 | |
| 		if err := func() error {
 | |
| 			filteredMatchers, err := filterExtLabelsFromMatchers(query.Matchers, externalLabels)
 | |
| 			if err != nil {
 | |
| 				return err
 | |
| 			}
 | |
| 
 | |
| 			querier, err := h.queryable.Querier(query.StartTimestampMs, query.EndTimestampMs)
 | |
| 			if err != nil {
 | |
| 				return err
 | |
| 			}
 | |
| 			defer func() {
 | |
| 				if err := querier.Close(); err != nil {
 | |
| 					h.logger.Warn("Error on querier close", "err", err.Error())
 | |
| 				}
 | |
| 			}()
 | |
| 
 | |
| 			var hints *storage.SelectHints
 | |
| 			if query.Hints != nil {
 | |
| 				hints = &storage.SelectHints{
 | |
| 					Start:    query.Hints.StartMs,
 | |
| 					End:      query.Hints.EndMs,
 | |
| 					Step:     query.Hints.StepMs,
 | |
| 					Func:     query.Hints.Func,
 | |
| 					Grouping: query.Hints.Grouping,
 | |
| 					Range:    query.Hints.RangeMs,
 | |
| 					By:       query.Hints.By,
 | |
| 				}
 | |
| 			}
 | |
| 
 | |
| 			var ws annotations.Annotations
 | |
| 			resp.Results[i], ws, err = ToQueryResult(querier.Select(ctx, false, hints, filteredMatchers...), h.remoteReadSampleLimit)
 | |
| 			if err != nil {
 | |
| 				return err
 | |
| 			}
 | |
| 			for _, w := range ws {
 | |
| 				h.logger.Warn("Warnings on remote read query", "err", w.Error())
 | |
| 			}
 | |
| 			for _, ts := range resp.Results[i].Timeseries {
 | |
| 				ts.Labels = MergeLabels(ts.Labels, sortedExternalLabels)
 | |
| 			}
 | |
| 			return nil
 | |
| 		}(); err != nil {
 | |
| 			var httpErr HTTPError
 | |
| 			if errors.As(err, &httpErr) {
 | |
| 				http.Error(w, httpErr.Error(), httpErr.Status())
 | |
| 				return
 | |
| 			}
 | |
| 			http.Error(w, err.Error(), http.StatusInternalServerError)
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if err := EncodeReadResponse(&resp, w); err != nil {
 | |
| 		http.Error(w, err.Error(), http.StatusInternalServerError)
 | |
| 		return
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (h *readHandler) remoteReadStreamedXORChunks(ctx context.Context, w http.ResponseWriter, req *prompb.ReadRequest, externalLabels map[string]string, sortedExternalLabels []prompb.Label) {
 | |
| 	w.Header().Set("Content-Type", "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse")
 | |
| 
 | |
| 	f, ok := w.(http.Flusher)
 | |
| 	if !ok {
 | |
| 		http.Error(w, "internal http.ResponseWriter does not implement http.Flusher interface", http.StatusInternalServerError)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	for i, query := range req.Queries {
 | |
| 		if err := func() error {
 | |
| 			filteredMatchers, err := filterExtLabelsFromMatchers(query.Matchers, externalLabels)
 | |
| 			if err != nil {
 | |
| 				return err
 | |
| 			}
 | |
| 
 | |
| 			querier, err := h.queryable.ChunkQuerier(query.StartTimestampMs, query.EndTimestampMs)
 | |
| 			if err != nil {
 | |
| 				return err
 | |
| 			}
 | |
| 			defer func() {
 | |
| 				if err := querier.Close(); err != nil {
 | |
| 					h.logger.Warn("Error on chunk querier close", "err", err.Error())
 | |
| 				}
 | |
| 			}()
 | |
| 
 | |
| 			var hints *storage.SelectHints
 | |
| 			if query.Hints != nil {
 | |
| 				hints = &storage.SelectHints{
 | |
| 					Start:    query.Hints.StartMs,
 | |
| 					End:      query.Hints.EndMs,
 | |
| 					Step:     query.Hints.StepMs,
 | |
| 					Func:     query.Hints.Func,
 | |
| 					Grouping: query.Hints.Grouping,
 | |
| 					Range:    query.Hints.RangeMs,
 | |
| 					By:       query.Hints.By,
 | |
| 				}
 | |
| 			}
 | |
| 
 | |
| 			ws, err := StreamChunkedReadResponses(
 | |
| 				NewChunkedWriter(w, f),
 | |
| 				int64(i),
 | |
| 				// The streaming API has to provide the series sorted.
 | |
| 				querier.Select(ctx, true, hints, filteredMatchers...),
 | |
| 				sortedExternalLabels,
 | |
| 				h.remoteReadMaxBytesInFrame,
 | |
| 				h.marshalPool,
 | |
| 			)
 | |
| 			if err != nil {
 | |
| 				return err
 | |
| 			}
 | |
| 
 | |
| 			for _, w := range ws {
 | |
| 				h.logger.Warn("Warnings on chunked remote read query", "warnings", w.Error())
 | |
| 			}
 | |
| 			return nil
 | |
| 		}(); err != nil {
 | |
| 			var httpErr HTTPError
 | |
| 			if errors.As(err, &httpErr) {
 | |
| 				http.Error(w, httpErr.Error(), httpErr.Status())
 | |
| 				return
 | |
| 			}
 | |
| 			http.Error(w, err.Error(), http.StatusInternalServerError)
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // filterExtLabelsFromMatchers change equality matchers which match external labels
 | |
| // to a matcher that looks for an empty label,
 | |
| // as that label should not be present in the storage.
 | |
| func filterExtLabelsFromMatchers(pbMatchers []*prompb.LabelMatcher, externalLabels map[string]string) ([]*labels.Matcher, error) {
 | |
| 	matchers, err := FromLabelMatchers(pbMatchers)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	filteredMatchers := make([]*labels.Matcher, 0, len(matchers))
 | |
| 	for _, m := range matchers {
 | |
| 		value := externalLabels[m.Name]
 | |
| 		if m.Type == labels.MatchEqual && value == m.Value {
 | |
| 			matcher, err := labels.NewMatcher(labels.MatchEqual, m.Name, "")
 | |
| 			if err != nil {
 | |
| 				return nil, err
 | |
| 			}
 | |
| 			filteredMatchers = append(filteredMatchers, matcher)
 | |
| 		} else {
 | |
| 			filteredMatchers = append(filteredMatchers, m)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return filteredMatchers, nil
 | |
| }
 |