mirror of https://github.com/grafana/grafana.git
364 lines · 9.9 KiB · Go
package lokihttp

import (
	"bufio"
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"net/http"
	"strconv"
	"sync"
	"time"

	"github.com/grafana/dskit/backoff"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/config"

	"github.com/grafana/grafana/pkg/infra/log"
	"github.com/grafana/grafana/pkg/setting"
)

const (
	contentType  = "application/x-protobuf"
	maxErrMsgLen = 1024

	// ReservedLabelTenantID is the label reserved to override the tenant ID
	// while processing pipeline stages.
	ReservedLabelTenantID = "__tenant_id__"

	// LatencyLabel and HostLabel are the label names used on the metrics below.
	LatencyLabel = "filename"
	HostLabel    = "host"
)

// UserAgent is sent with every push request.
var UserAgent = fmt.Sprintf("grafana/%s", setting.BuildVersion)

type metrics struct {
	encodedBytes     *prometheus.CounterVec
	sentBytes        *prometheus.CounterVec
	droppedBytes     *prometheus.CounterVec
	sentEntries      *prometheus.CounterVec
	droppedEntries   *prometheus.CounterVec
	requestDuration  *prometheus.HistogramVec
	batchRetries     *prometheus.CounterVec
	countersWithHost []*prometheus.CounterVec
}

func newMetrics(reg prometheus.Registerer) *metrics {
	var m metrics

	m.encodedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "promtail",
		Name:      "encoded_bytes_total",
		Help:      "Number of bytes encoded and ready to send.",
	}, []string{HostLabel})
	m.sentBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "promtail",
		Name:      "sent_bytes_total",
		Help:      "Number of bytes sent.",
	}, []string{HostLabel})
	m.droppedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "promtail",
		Name:      "dropped_bytes_total",
		Help:      "Number of bytes dropped because they failed to be sent to the ingester after all retries.",
	}, []string{HostLabel})
	m.sentEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "promtail",
		Name:      "sent_entries_total",
		Help:      "Number of log entries sent to the ingester.",
	}, []string{HostLabel})
	m.droppedEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "promtail",
		Name:      "dropped_entries_total",
		Help:      "Number of log entries dropped because they failed to be sent to the ingester after all retries.",
	}, []string{HostLabel})
	m.requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Namespace: "promtail",
		Name:      "request_duration_seconds",
		Help:      "Duration of send requests.",
	}, []string{"status_code", HostLabel})
	m.batchRetries = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "promtail",
		Name:      "batch_retries_total",
		Help:      "Number of times batches have had to be retried.",
	}, []string{HostLabel})

	m.countersWithHost = []*prometheus.CounterVec{
		m.encodedBytes, m.sentBytes, m.droppedBytes, m.sentEntries, m.droppedEntries,
	}

	if reg != nil {
		m.encodedBytes = mustRegisterOrGet(reg, m.encodedBytes).(*prometheus.CounterVec)
		m.sentBytes = mustRegisterOrGet(reg, m.sentBytes).(*prometheus.CounterVec)
		m.droppedBytes = mustRegisterOrGet(reg, m.droppedBytes).(*prometheus.CounterVec)
		m.sentEntries = mustRegisterOrGet(reg, m.sentEntries).(*prometheus.CounterVec)
		m.droppedEntries = mustRegisterOrGet(reg, m.droppedEntries).(*prometheus.CounterVec)
		m.requestDuration = mustRegisterOrGet(reg, m.requestDuration).(*prometheus.HistogramVec)
		m.batchRetries = mustRegisterOrGet(reg, m.batchRetries).(*prometheus.CounterVec)
	}

	return &m
}
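
// The metrics above deliberately keep promtail's metric names, so existing
// dashboards keep working. For example, the rate of dropped log entries per
// host could be queried with a PromQL expression like this (a sketch, not
// taken from this repo):
//
//	rate(promtail_dropped_entries_total[5m])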

// mustRegisterOrGet registers the collector with the registry; if the
// collector was already registered, the existing one is returned instead.
// Any other registration error panics.
func mustRegisterOrGet(reg prometheus.Registerer, c prometheus.Collector) prometheus.Collector {
	if err := reg.Register(c); err != nil {
		promError := prometheus.AlreadyRegisteredError{}
		if errors.As(err, &promError) {
			return promError.ExistingCollector
		}
		panic(err)
	}
	return c
}

// Client pushes entries to Loki and can be stopped.
type Client interface {
	// Chan returns the channel on which entries are enqueued for sending.
	Chan() chan<- Entry

	// Stop flushes pending batches and shuts the client down gracefully.
	Stop()
	// StopNow shuts the client down without waiting for in-flight retries.
	StopNow()
}
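
// A minimal usage sketch (Entry and Config are defined elsewhere in this
// package; the registerer and logger are whatever the caller already has):
//
//	client, err := New(reg, cfg, logger)
//	if err != nil {
//		return err
//	}
//	client.Chan() <- e // e is an Entry carrying stream labels and a log line
//	client.Stop()      // flush pending batches and wait for the run loop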

// client pushes logs to Loki as snappy-compressed protobufs over HTTP.
type client struct {
	metrics *metrics
	logger  log.Logger
	cfg     Config
	client  *http.Client
	entries chan Entry

	once sync.Once
	wg   sync.WaitGroup

	// ctx is used in any upstream calls from the `client`.
	ctx    context.Context
	cancel context.CancelFunc
}

// Tripperware can wrap a roundtripper.
type Tripperware func(http.RoundTripper) http.RoundTripper

// New makes a new Client.
func New(reg prometheus.Registerer, cfg Config, logger log.Logger) (Client, error) {
	return newClient(reg, cfg, logger)
}
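
// A configuration sketch based on the fields this file reads (Config is
// defined elsewhere in the package; the values here are illustrative only):
//
//	cfg := Config{
//		URL:       pushURL,          // wrapper around the push endpoint; cfg.URL.URL must be non-nil
//		BatchWait: time.Second,      // max time an entry may sit in a batch
//		BatchSize: 1 << 20,          // byte threshold that triggers an eager send
//		Timeout:   10 * time.Second, // per-request timeout, also set on the HTTP client
//	}
//
// Config also carries Client (an HTTP client config validated below) and
// BackoffConfig (the retry policy used by sendBatch).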

func newClient(reg prometheus.Registerer, cfg Config, logger log.Logger) (*client, error) {
	if cfg.URL.URL == nil {
		return nil, errors.New("client needs target URL")
	}

	ctx, cancel := context.WithCancel(context.Background())

	c := &client{
		logger:  logger.New("host", cfg.URL.Host),
		cfg:     cfg,
		entries: make(chan Entry),
		metrics: newMetrics(reg),

		ctx:    ctx,
		cancel: cancel,
	}

	err := cfg.Client.Validate()
	if err != nil {
		return nil, err
	}

	c.client, err = config.NewClientFromConfig(cfg.Client, "promtail", config.WithHTTP2Disabled())
	if err != nil {
		return nil, err
	}

	c.client.Timeout = cfg.Timeout

	// Initialize counters to 0 so the metrics are exported before the first
	// occurrence of incrementing to avoid missing metrics.
	for _, counter := range c.metrics.countersWithHost {
		counter.WithLabelValues(c.cfg.URL.Host).Add(0)
	}

	c.wg.Add(1)
	go c.run()
	return c, nil
}

// NewWithTripperware creates a new Loki client with a custom tripperware.
func NewWithTripperware(reg prometheus.Registerer, cfg Config, logger log.Logger, tp Tripperware) (Client, error) {
	c, err := newClient(reg, cfg, logger)
	if err != nil {
		return nil, err
	}

	if tp != nil {
		c.client.Transport = tp(c.client.Transport)
	}

	return c, nil
}
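
// For example, a tripperware that stamps an extra header on every push
// request might look like this (roundTripperFunc is a hypothetical helper,
// not part of this package):
//
//	type roundTripperFunc func(*http.Request) (*http.Response, error)
//
//	func (f roundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) { return f(r) }
//
//	tp := func(next http.RoundTripper) http.RoundTripper {
//		return roundTripperFunc(func(req *http.Request) (*http.Response, error) {
//			req.Header.Set("X-Example-Header", "1")
//			return next.RoundTrip(req)
//		})
//	}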

func (c *client) run() {
	batches := map[string]*batch{}

	// The client handles multiple batches (one per tenant), and each batch
	// can be created at a different point in time, so we check for batches
	// whose max wait time has been reached ten times per BatchWait period;
	// the maximum extra delay before sending a batch is therefore 10% of the
	// max waiting time. The check interval is floored at 10ms so that a very
	// low BatchWait doesn't cause overly frequent checks (e.g. BatchWait =
	// 1s ticks every 100ms, while BatchWait = 50ms is floored to 10ms).
	minWaitCheckFrequency := 10 * time.Millisecond
	maxWaitCheckFrequency := c.cfg.BatchWait / 10
	if maxWaitCheckFrequency < minWaitCheckFrequency {
		maxWaitCheckFrequency = minWaitCheckFrequency
	}

	maxWaitCheck := time.NewTicker(maxWaitCheckFrequency)

	defer func() {
		maxWaitCheck.Stop()
		// Send all pending batches
		for tenantID, batch := range batches {
			c.sendBatch(tenantID, batch)
		}

		c.wg.Done()
	}()

	for {
		select {
		case e, ok := <-c.entries:
			if !ok {
				return
			}
			// Batches are keyed by tenant; this client currently sends
			// everything as the single default (empty) tenant.
			tenantID := ""
			batch, ok := batches[tenantID]

			// If the batch doesn't exist yet, we create a new one with the entry
			if !ok {
				batches[tenantID] = newBatch(e)
				break
			}

			// If adding the entry to the batch will increase the size over the max
			// size allowed, we send the current batch and then create a new one
			if batch.sizeBytesAfter(e) > c.cfg.BatchSize {
				c.sendBatch(tenantID, batch)

				batches[tenantID] = newBatch(e)
				break
			}

			// The max size of the batch isn't reached, so we can add the entry
			batch.add(e)

		case <-maxWaitCheck.C:
			// Send all batches whose max wait time has been reached
			for tenantID, batch := range batches {
				if batch.age() < c.cfg.BatchWait {
					continue
				}

				c.sendBatch(tenantID, batch)
				delete(batches, tenantID)
			}
		}
	}
}

// Chan returns the channel on which entries are enqueued for sending.
func (c *client) Chan() chan<- Entry {
	return c.entries
}

func (c *client) sendBatch(tenantID string, batch *batch) {
	buf, entriesCount, err := batch.encode()
	if err != nil {
		c.logger.Error("error encoding batch", "error", err)
		return
	}
	bufBytes := float64(len(buf))
	c.metrics.encodedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)

	backoff := backoff.New(c.ctx, c.cfg.BackoffConfig)
	var status int
	for {
		start := time.Now()
		// send uses `timeout` internally, so `context.Background` is good enough.
		status, err = c.send(context.Background(), tenantID, buf)

		c.metrics.requestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host).Observe(time.Since(start).Seconds())

		// Only retry 429s, 500s and connection-level errors.
		if status > 0 && status != 429 && status/100 != 5 {
			break
		}

		c.logger.Warn("error sending batch, will retry", "status", status, "error", err)
		c.metrics.batchRetries.WithLabelValues(c.cfg.URL.Host).Inc()
		backoff.Wait()

		// Make sure it sends at least once before checking for retry.
		if !backoff.Ongoing() {
			break
		}
	}

	if err != nil {
		c.logger.Error("final error sending batch", "status", status, "error", err)
		c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
		c.metrics.droppedEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount))
	}
}
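
// Retry pacing comes from dskit's backoff package; a typical policy might
// look like this (a sketch, field values illustrative only):
//
//	cfg.BackoffConfig = backoff.Config{
//		MinBackoff: 500 * time.Millisecond,
//		MaxBackoff: 5 * time.Minute,
//		MaxRetries: 10, // 0 retries forever while the context is live
//	}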

func (c *client) send(ctx context.Context, tenantID string, buf []byte) (int, error) {
	ctx, cancel := context.WithTimeout(ctx, c.cfg.Timeout)
	defer cancel()
	req, err := http.NewRequest("POST", c.cfg.URL.String(), bytes.NewReader(buf))
	if err != nil {
		return -1, err
	}
	req = req.WithContext(ctx)
	req.Header.Set("Content-Type", contentType)
	req.Header.Set("User-Agent", UserAgent)

	// If the tenant ID is not empty, promtail is running in multi-tenant
	// mode, so we should send it to Loki.
	if tenantID != "" {
		req.Header.Set("X-Scope-OrgID", tenantID)
	}

	resp, err := c.client.Do(req)
	if err != nil {
		return -1, err
	}
	defer func() {
		err := resp.Body.Close()
		if err != nil {
			c.logger.Error("closing response body", "error", err)
		}
	}()

	if resp.StatusCode/100 != 2 {
		scanner := bufio.NewScanner(io.LimitReader(resp.Body, maxErrMsgLen))
		line := ""
		if scanner.Scan() {
			line = scanner.Text()
		}
		err = fmt.Errorf("server returned HTTP status %s (%d): %s", resp.Status, resp.StatusCode, line)
	}
	return resp.StatusCode, err
}
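
// On the wire, a push request therefore looks roughly like this (a sketch;
// the path is whatever cfg.URL carries):
//
//	POST <cfg.URL> HTTP/1.1
//	Content-Type: application/x-protobuf
//	User-Agent: grafana/<version>
//	X-Scope-OrgID: <tenantID>   (only when a tenant ID is set)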

// Stop the client.
func (c *client) Stop() {
	c.once.Do(func() { close(c.entries) })
	c.wg.Wait()
}

// StopNow stops the client without retries.
func (c *client) StopNow() {
	// cancel will stop retrying http requests.
	c.cancel()
	c.Stop()
}