1567 lines
		
	
	
		
			43 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			1567 lines
		
	
	
		
			43 KiB
		
	
	
	
		
			Go
		
	
	
	
| // Copyright 2021 The Prometheus Authors
 | |
| // Licensed under the Apache License, Version 2.0 (the "License");
 | |
| // you may not use this file except in compliance with the License.
 | |
| // You may obtain a copy of the License at
 | |
| //
 | |
| // http://www.apache.org/licenses/LICENSE-2.0
 | |
| //
 | |
| // Unless required by applicable law or agreed to in writing, software
 | |
| // distributed under the License is distributed on an "AS IS" BASIS,
 | |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| // See the License for the specific language governing permissions and
 | |
| // limitations under the License.
 | |
| 
 | |
| package tsdb
 | |
| 
 | |
| import (
 | |
| 	"fmt"
 | |
| 	"math"
 | |
| 	"os"
 | |
| 	"path/filepath"
 | |
| 	"runtime"
 | |
| 	"strconv"
 | |
| 	"strings"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/go-kit/log/level"
 | |
| 	"github.com/pkg/errors"
 | |
| 	"go.uber.org/atomic"
 | |
| 
 | |
| 	"github.com/prometheus/prometheus/model/exemplar"
 | |
| 	"github.com/prometheus/prometheus/model/histogram"
 | |
| 	"github.com/prometheus/prometheus/model/labels"
 | |
| 	"github.com/prometheus/prometheus/model/metadata"
 | |
| 	"github.com/prometheus/prometheus/storage"
 | |
| 	"github.com/prometheus/prometheus/tsdb/chunkenc"
 | |
| 	"github.com/prometheus/prometheus/tsdb/chunks"
 | |
| 	"github.com/prometheus/prometheus/tsdb/encoding"
 | |
| 	tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
 | |
| 	"github.com/prometheus/prometheus/tsdb/fileutil"
 | |
| 	"github.com/prometheus/prometheus/tsdb/record"
 | |
| 	"github.com/prometheus/prometheus/tsdb/tombstones"
 | |
| 	"github.com/prometheus/prometheus/tsdb/wlog"
 | |
| )
 | |
| 
 | |
| // histogramRecord combines both RefHistogramSample and RefFloatHistogramSample
 | |
| // to simplify the WAL replay.
 | |
| type histogramRecord struct {
 | |
| 	ref chunks.HeadSeriesRef
 | |
| 	t   int64
 | |
| 	h   *histogram.Histogram
 | |
| 	fh  *histogram.FloatHistogram
 | |
| }
 | |
| 
 | |
| func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (err error) {
 | |
| 	// Track number of samples that referenced a series we don't know about
 | |
| 	// for error reporting.
 | |
| 	var unknownRefs atomic.Uint64
 | |
| 	var unknownExemplarRefs atomic.Uint64
 | |
| 	var unknownHistogramRefs atomic.Uint64
 | |
| 	var unknownMetadataRefs atomic.Uint64
 | |
| 	// Track number of series records that had overlapping m-map chunks.
 | |
| 	var mmapOverlappingChunks atomic.Uint64
 | |
| 
 | |
| 	// Start workers that each process samples for a partition of the series ID space.
 | |
| 	var (
 | |
| 		wg             sync.WaitGroup
 | |
| 		n              = runtime.GOMAXPROCS(0)
 | |
| 		processors     = make([]walSubsetProcessor, n)
 | |
| 		exemplarsInput chan record.RefExemplar
 | |
| 
 | |
| 		dec             record.Decoder
 | |
| 		shards          = make([][]record.RefSample, n)
 | |
| 		histogramShards = make([][]histogramRecord, n)
 | |
| 
 | |
| 		decoded                      = make(chan interface{}, 10)
 | |
| 		decodeErr, seriesCreationErr error
 | |
| 		seriesPool                   = sync.Pool{
 | |
| 			New: func() interface{} {
 | |
| 				return []record.RefSeries{}
 | |
| 			},
 | |
| 		}
 | |
| 		samplesPool = sync.Pool{
 | |
| 			New: func() interface{} {
 | |
| 				return []record.RefSample{}
 | |
| 			},
 | |
| 		}
 | |
| 		tstonesPool = sync.Pool{
 | |
| 			New: func() interface{} {
 | |
| 				return []tombstones.Stone{}
 | |
| 			},
 | |
| 		}
 | |
| 		exemplarsPool = sync.Pool{
 | |
| 			New: func() interface{} {
 | |
| 				return []record.RefExemplar{}
 | |
| 			},
 | |
| 		}
 | |
| 		histogramsPool = sync.Pool{
 | |
| 			New: func() interface{} {
 | |
| 				return []record.RefHistogramSample{}
 | |
| 			},
 | |
| 		}
 | |
| 		floatHistogramsPool = sync.Pool{
 | |
| 			New: func() interface{} {
 | |
| 				return []record.RefFloatHistogramSample{}
 | |
| 			},
 | |
| 		}
 | |
| 		metadataPool = sync.Pool{
 | |
| 			New: func() interface{} {
 | |
| 				return []record.RefMetadata{}
 | |
| 			},
 | |
| 		}
 | |
| 	)
 | |
| 
 | |
| 	defer func() {
 | |
| 		// For CorruptionErr ensure to terminate all workers before exiting.
 | |
| 		_, ok := err.(*wlog.CorruptionErr)
 | |
| 		if ok || seriesCreationErr != nil {
 | |
| 			for i := 0; i < n; i++ {
 | |
| 				processors[i].closeAndDrain()
 | |
| 			}
 | |
| 			close(exemplarsInput)
 | |
| 			wg.Wait()
 | |
| 		}
 | |
| 	}()
 | |
| 
 | |
| 	wg.Add(n)
 | |
| 	for i := 0; i < n; i++ {
 | |
| 		processors[i].setup()
 | |
| 
 | |
| 		go func(wp *walSubsetProcessor) {
 | |
| 			unknown, unknownHistograms, overlapping := wp.processWALSamples(h, mmappedChunks, oooMmappedChunks)
 | |
| 			unknownRefs.Add(unknown)
 | |
| 			mmapOverlappingChunks.Add(overlapping)
 | |
| 			unknownHistogramRefs.Add(unknownHistograms)
 | |
| 			wg.Done()
 | |
| 		}(&processors[i])
 | |
| 	}
 | |
| 
 | |
| 	wg.Add(1)
 | |
| 	exemplarsInput = make(chan record.RefExemplar, 300)
 | |
| 	go func(input <-chan record.RefExemplar) {
 | |
| 		var err error
 | |
| 		defer wg.Done()
 | |
| 		for e := range input {
 | |
| 			ms := h.series.getByID(e.Ref)
 | |
| 			if ms == nil {
 | |
| 				unknownExemplarRefs.Inc()
 | |
| 				continue
 | |
| 			}
 | |
| 
 | |
| 			if e.T < h.minValidTime.Load() {
 | |
| 				continue
 | |
| 			}
 | |
| 			// At the moment the only possible error here is out of order exemplars, which we shouldn't see when
 | |
| 			// replaying the WAL, so lets just log the error if it's not that type.
 | |
| 			err = h.exemplars.AddExemplar(ms.lset, exemplar.Exemplar{Ts: e.T, Value: e.V, Labels: e.Labels})
 | |
| 			if err != nil && err == storage.ErrOutOfOrderExemplar {
 | |
| 				level.Warn(h.logger).Log("msg", "Unexpected error when replaying WAL on exemplar record", "err", err)
 | |
| 			}
 | |
| 		}
 | |
| 	}(exemplarsInput)
 | |
| 
 | |
| 	go func() {
 | |
| 		defer close(decoded)
 | |
| 		var err error
 | |
| 		for r.Next() {
 | |
| 			rec := r.Record()
 | |
| 			switch dec.Type(rec) {
 | |
| 			case record.Series:
 | |
| 				series := seriesPool.Get().([]record.RefSeries)[:0]
 | |
| 				series, err = dec.Series(rec, series)
 | |
| 				if err != nil {
 | |
| 					decodeErr = &wlog.CorruptionErr{
 | |
| 						Err:     errors.Wrap(err, "decode series"),
 | |
| 						Segment: r.Segment(),
 | |
| 						Offset:  r.Offset(),
 | |
| 					}
 | |
| 					return
 | |
| 				}
 | |
| 				decoded <- series
 | |
| 			case record.Samples:
 | |
| 				samples := samplesPool.Get().([]record.RefSample)[:0]
 | |
| 				samples, err = dec.Samples(rec, samples)
 | |
| 				if err != nil {
 | |
| 					decodeErr = &wlog.CorruptionErr{
 | |
| 						Err:     errors.Wrap(err, "decode samples"),
 | |
| 						Segment: r.Segment(),
 | |
| 						Offset:  r.Offset(),
 | |
| 					}
 | |
| 					return
 | |
| 				}
 | |
| 				decoded <- samples
 | |
| 			case record.Tombstones:
 | |
| 				tstones := tstonesPool.Get().([]tombstones.Stone)[:0]
 | |
| 				tstones, err = dec.Tombstones(rec, tstones)
 | |
| 				if err != nil {
 | |
| 					decodeErr = &wlog.CorruptionErr{
 | |
| 						Err:     errors.Wrap(err, "decode tombstones"),
 | |
| 						Segment: r.Segment(),
 | |
| 						Offset:  r.Offset(),
 | |
| 					}
 | |
| 					return
 | |
| 				}
 | |
| 				decoded <- tstones
 | |
| 			case record.Exemplars:
 | |
| 				exemplars := exemplarsPool.Get().([]record.RefExemplar)[:0]
 | |
| 				exemplars, err = dec.Exemplars(rec, exemplars)
 | |
| 				if err != nil {
 | |
| 					decodeErr = &wlog.CorruptionErr{
 | |
| 						Err:     errors.Wrap(err, "decode exemplars"),
 | |
| 						Segment: r.Segment(),
 | |
| 						Offset:  r.Offset(),
 | |
| 					}
 | |
| 					return
 | |
| 				}
 | |
| 				decoded <- exemplars
 | |
| 			case record.HistogramSamples:
 | |
| 				hists := histogramsPool.Get().([]record.RefHistogramSample)[:0]
 | |
| 				hists, err = dec.HistogramSamples(rec, hists)
 | |
| 				if err != nil {
 | |
| 					decodeErr = &wlog.CorruptionErr{
 | |
| 						Err:     errors.Wrap(err, "decode histograms"),
 | |
| 						Segment: r.Segment(),
 | |
| 						Offset:  r.Offset(),
 | |
| 					}
 | |
| 					return
 | |
| 				}
 | |
| 				decoded <- hists
 | |
| 			case record.FloatHistogramSamples:
 | |
| 				hists := floatHistogramsPool.Get().([]record.RefFloatHistogramSample)[:0]
 | |
| 				hists, err = dec.FloatHistogramSamples(rec, hists)
 | |
| 				if err != nil {
 | |
| 					decodeErr = &wlog.CorruptionErr{
 | |
| 						Err:     errors.Wrap(err, "decode float histograms"),
 | |
| 						Segment: r.Segment(),
 | |
| 						Offset:  r.Offset(),
 | |
| 					}
 | |
| 					return
 | |
| 				}
 | |
| 				decoded <- hists
 | |
| 			case record.Metadata:
 | |
| 				meta := metadataPool.Get().([]record.RefMetadata)[:0]
 | |
| 				meta, err := dec.Metadata(rec, meta)
 | |
| 				if err != nil {
 | |
| 					decodeErr = &wlog.CorruptionErr{
 | |
| 						Err:     errors.Wrap(err, "decode metadata"),
 | |
| 						Segment: r.Segment(),
 | |
| 						Offset:  r.Offset(),
 | |
| 					}
 | |
| 					return
 | |
| 				}
 | |
| 				decoded <- meta
 | |
| 			default:
 | |
| 				// Noop.
 | |
| 			}
 | |
| 		}
 | |
| 	}()
 | |
| 
 | |
| 	// The records are always replayed from the oldest to the newest.
 | |
| Outer:
 | |
| 	for d := range decoded {
 | |
| 		switch v := d.(type) {
 | |
| 		case []record.RefSeries:
 | |
| 			for _, walSeries := range v {
 | |
| 				mSeries, created, err := h.getOrCreateWithID(walSeries.Ref, walSeries.Labels.Hash(), walSeries.Labels)
 | |
| 				if err != nil {
 | |
| 					seriesCreationErr = err
 | |
| 					break Outer
 | |
| 				}
 | |
| 
 | |
| 				if chunks.HeadSeriesRef(h.lastSeriesID.Load()) < walSeries.Ref {
 | |
| 					h.lastSeriesID.Store(uint64(walSeries.Ref))
 | |
| 				}
 | |
| 				if !created {
 | |
| 					multiRef[walSeries.Ref] = mSeries.ref
 | |
| 				}
 | |
| 
 | |
| 				idx := uint64(mSeries.ref) % uint64(n)
 | |
| 				processors[idx].input <- walSubsetProcessorInputItem{walSeriesRef: walSeries.Ref, existingSeries: mSeries}
 | |
| 			}
 | |
| 			//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
 | |
| 			seriesPool.Put(v)
 | |
| 		case []record.RefSample:
 | |
| 			samples := v
 | |
| 			minValidTime := h.minValidTime.Load()
 | |
| 			// We split up the samples into chunks of 5000 samples or less.
 | |
| 			// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
 | |
| 			// cause thousands of very large in flight buffers occupying large amounts
 | |
| 			// of unused memory.
 | |
| 			for len(samples) > 0 {
 | |
| 				m := 5000
 | |
| 				if len(samples) < m {
 | |
| 					m = len(samples)
 | |
| 				}
 | |
| 				for i := 0; i < n; i++ {
 | |
| 					if shards[i] == nil {
 | |
| 						shards[i] = processors[i].reuseBuf()
 | |
| 					}
 | |
| 				}
 | |
| 				for _, sam := range samples[:m] {
 | |
| 					if sam.T < minValidTime {
 | |
| 						continue // Before minValidTime: discard.
 | |
| 					}
 | |
| 					if r, ok := multiRef[sam.Ref]; ok {
 | |
| 						sam.Ref = r
 | |
| 					}
 | |
| 					mod := uint64(sam.Ref) % uint64(n)
 | |
| 					shards[mod] = append(shards[mod], sam)
 | |
| 				}
 | |
| 				for i := 0; i < n; i++ {
 | |
| 					if len(shards[i]) > 0 {
 | |
| 						processors[i].input <- walSubsetProcessorInputItem{samples: shards[i]}
 | |
| 						shards[i] = nil
 | |
| 					}
 | |
| 				}
 | |
| 				samples = samples[m:]
 | |
| 			}
 | |
| 			//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
 | |
| 			samplesPool.Put(v)
 | |
| 		case []tombstones.Stone:
 | |
| 			for _, s := range v {
 | |
| 				for _, itv := range s.Intervals {
 | |
| 					if itv.Maxt < h.minValidTime.Load() {
 | |
| 						continue
 | |
| 					}
 | |
| 					if m := h.series.getByID(chunks.HeadSeriesRef(s.Ref)); m == nil {
 | |
| 						unknownRefs.Inc()
 | |
| 						continue
 | |
| 					}
 | |
| 					h.tombstones.AddInterval(storage.SeriesRef(s.Ref), itv)
 | |
| 				}
 | |
| 			}
 | |
| 			//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
 | |
| 			tstonesPool.Put(v)
 | |
| 		case []record.RefExemplar:
 | |
| 			for _, e := range v {
 | |
| 				exemplarsInput <- e
 | |
| 			}
 | |
| 			//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
 | |
| 			exemplarsPool.Put(v)
 | |
| 		case []record.RefHistogramSample:
 | |
| 			samples := v
 | |
| 			minValidTime := h.minValidTime.Load()
 | |
| 			// We split up the samples into chunks of 5000 samples or less.
 | |
| 			// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
 | |
| 			// cause thousands of very large in flight buffers occupying large amounts
 | |
| 			// of unused memory.
 | |
| 			for len(samples) > 0 {
 | |
| 				m := 5000
 | |
| 				if len(samples) < m {
 | |
| 					m = len(samples)
 | |
| 				}
 | |
| 				for i := 0; i < n; i++ {
 | |
| 					if histogramShards[i] == nil {
 | |
| 						histogramShards[i] = processors[i].reuseHistogramBuf()
 | |
| 					}
 | |
| 				}
 | |
| 				for _, sam := range samples[:m] {
 | |
| 					if sam.T < minValidTime {
 | |
| 						continue // Before minValidTime: discard.
 | |
| 					}
 | |
| 					if r, ok := multiRef[sam.Ref]; ok {
 | |
| 						sam.Ref = r
 | |
| 					}
 | |
| 					mod := uint64(sam.Ref) % uint64(n)
 | |
| 					histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, h: sam.H})
 | |
| 				}
 | |
| 				for i := 0; i < n; i++ {
 | |
| 					if len(histogramShards[i]) > 0 {
 | |
| 						processors[i].input <- walSubsetProcessorInputItem{histogramSamples: histogramShards[i]}
 | |
| 						histogramShards[i] = nil
 | |
| 					}
 | |
| 				}
 | |
| 				samples = samples[m:]
 | |
| 			}
 | |
| 			//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
 | |
| 			histogramsPool.Put(v)
 | |
| 		case []record.RefFloatHistogramSample:
 | |
| 			samples := v
 | |
| 			minValidTime := h.minValidTime.Load()
 | |
| 			// We split up the samples into chunks of 5000 samples or less.
 | |
| 			// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
 | |
| 			// cause thousands of very large in flight buffers occupying large amounts
 | |
| 			// of unused memory.
 | |
| 			for len(samples) > 0 {
 | |
| 				m := 5000
 | |
| 				if len(samples) < m {
 | |
| 					m = len(samples)
 | |
| 				}
 | |
| 				for i := 0; i < n; i++ {
 | |
| 					if histogramShards[i] == nil {
 | |
| 						histogramShards[i] = processors[i].reuseHistogramBuf()
 | |
| 					}
 | |
| 				}
 | |
| 				for _, sam := range samples[:m] {
 | |
| 					if sam.T < minValidTime {
 | |
| 						continue // Before minValidTime: discard.
 | |
| 					}
 | |
| 					if r, ok := multiRef[sam.Ref]; ok {
 | |
| 						sam.Ref = r
 | |
| 					}
 | |
| 					mod := uint64(sam.Ref) % uint64(n)
 | |
| 					histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, fh: sam.FH})
 | |
| 				}
 | |
| 				for i := 0; i < n; i++ {
 | |
| 					if len(histogramShards[i]) > 0 {
 | |
| 						processors[i].input <- walSubsetProcessorInputItem{histogramSamples: histogramShards[i]}
 | |
| 						histogramShards[i] = nil
 | |
| 					}
 | |
| 				}
 | |
| 				samples = samples[m:]
 | |
| 			}
 | |
| 			//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
 | |
| 			floatHistogramsPool.Put(v)
 | |
| 		case []record.RefMetadata:
 | |
| 			for _, m := range v {
 | |
| 				s := h.series.getByID(chunks.HeadSeriesRef(m.Ref))
 | |
| 				if s == nil {
 | |
| 					unknownMetadataRefs.Inc()
 | |
| 					continue
 | |
| 				}
 | |
| 				s.meta = &metadata.Metadata{
 | |
| 					Type: record.ToTextparseMetricType(m.Type),
 | |
| 					Unit: m.Unit,
 | |
| 					Help: m.Help,
 | |
| 				}
 | |
| 			}
 | |
| 			//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
 | |
| 			metadataPool.Put(v)
 | |
| 		default:
 | |
| 			panic(fmt.Errorf("unexpected decoded type: %T", d))
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if decodeErr != nil {
 | |
| 		return decodeErr
 | |
| 	}
 | |
| 	if seriesCreationErr != nil {
 | |
| 		// Drain the channel to unblock the goroutine.
 | |
| 		for range decoded {
 | |
| 		}
 | |
| 		return seriesCreationErr
 | |
| 	}
 | |
| 
 | |
| 	// Signal termination to each worker and wait for it to close its output channel.
 | |
| 	for i := 0; i < n; i++ {
 | |
| 		processors[i].closeAndDrain()
 | |
| 	}
 | |
| 	close(exemplarsInput)
 | |
| 	wg.Wait()
 | |
| 
 | |
| 	if r.Err() != nil {
 | |
| 		return errors.Wrap(r.Err(), "read records")
 | |
| 	}
 | |
| 
 | |
| 	if unknownRefs.Load()+unknownExemplarRefs.Load()+unknownHistogramRefs.Load()+unknownMetadataRefs.Load() > 0 {
 | |
| 		level.Warn(h.logger).Log(
 | |
| 			"msg", "Unknown series references",
 | |
| 			"samples", unknownRefs.Load(),
 | |
| 			"exemplars", unknownExemplarRefs.Load(),
 | |
| 			"histograms", unknownHistogramRefs.Load(),
 | |
| 			"metadata", unknownMetadataRefs.Load(),
 | |
| 		)
 | |
| 	}
 | |
| 	if count := mmapOverlappingChunks.Load(); count > 0 {
 | |
| 		level.Info(h.logger).Log("msg", "Overlapping m-map chunks on duplicate series records", "count", count)
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // resetSeriesWithMMappedChunks is only used during the WAL replay.
 | |
| func (h *Head) resetSeriesWithMMappedChunks(mSeries *memSeries, mmc, oooMmc []*mmappedChunk, walSeriesRef chunks.HeadSeriesRef) (overlapped bool) {
 | |
| 	if mSeries.ref != walSeriesRef {
 | |
| 		// Checking if the new m-mapped chunks overlap with the already existing ones.
 | |
| 		if len(mSeries.mmappedChunks) > 0 && len(mmc) > 0 {
 | |
| 			if overlapsClosedInterval(
 | |
| 				mSeries.mmappedChunks[0].minTime,
 | |
| 				mSeries.mmappedChunks[len(mSeries.mmappedChunks)-1].maxTime,
 | |
| 				mmc[0].minTime,
 | |
| 				mmc[len(mmc)-1].maxTime,
 | |
| 			) {
 | |
| 				level.Debug(h.logger).Log(
 | |
| 					"msg", "M-mapped chunks overlap on a duplicate series record",
 | |
| 					"series", mSeries.lset.String(),
 | |
| 					"oldref", mSeries.ref,
 | |
| 					"oldmint", mSeries.mmappedChunks[0].minTime,
 | |
| 					"oldmaxt", mSeries.mmappedChunks[len(mSeries.mmappedChunks)-1].maxTime,
 | |
| 					"newref", walSeriesRef,
 | |
| 					"newmint", mmc[0].minTime,
 | |
| 					"newmaxt", mmc[len(mmc)-1].maxTime,
 | |
| 				)
 | |
| 				overlapped = true
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	h.metrics.chunksCreated.Add(float64(len(mmc) + len(oooMmc)))
 | |
| 	h.metrics.chunksRemoved.Add(float64(len(mSeries.mmappedChunks)))
 | |
| 	h.metrics.chunks.Add(float64(len(mmc) + len(oooMmc) - len(mSeries.mmappedChunks)))
 | |
| 
 | |
| 	if mSeries.ooo != nil {
 | |
| 		h.metrics.chunksRemoved.Add(float64(len(mSeries.ooo.oooMmappedChunks)))
 | |
| 		h.metrics.chunks.Sub(float64(len(mSeries.ooo.oooMmappedChunks)))
 | |
| 	}
 | |
| 
 | |
| 	mSeries.mmappedChunks = mmc
 | |
| 	if len(oooMmc) == 0 {
 | |
| 		mSeries.ooo = nil
 | |
| 	} else {
 | |
| 		if mSeries.ooo == nil {
 | |
| 			mSeries.ooo = &memSeriesOOOFields{}
 | |
| 		}
 | |
| 		*mSeries.ooo = memSeriesOOOFields{oooMmappedChunks: oooMmc}
 | |
| 	}
 | |
| 	// Cache the last mmapped chunk time, so we can skip calling append() for samples it will reject.
 | |
| 	if len(mmc) == 0 {
 | |
| 		mSeries.mmMaxTime = math.MinInt64
 | |
| 	} else {
 | |
| 		mSeries.mmMaxTime = mmc[len(mmc)-1].maxTime
 | |
| 		h.updateMinMaxTime(mmc[0].minTime, mSeries.mmMaxTime)
 | |
| 	}
 | |
| 	if len(oooMmc) != 0 {
 | |
| 		// Mint and maxt can be in any chunk, they are not sorted.
 | |
| 		mint, maxt := int64(math.MaxInt64), int64(math.MinInt64)
 | |
| 		for _, ch := range oooMmc {
 | |
| 			if ch.minTime < mint {
 | |
| 				mint = ch.minTime
 | |
| 			}
 | |
| 			if ch.maxTime > maxt {
 | |
| 				maxt = ch.maxTime
 | |
| 			}
 | |
| 		}
 | |
| 		h.updateMinOOOMaxOOOTime(mint, maxt)
 | |
| 	}
 | |
| 
 | |
| 	// Any samples replayed till now would already be compacted. Resetting the head chunk.
 | |
| 	mSeries.nextAt = 0
 | |
| 	mSeries.headChunk = nil
 | |
| 	mSeries.app = nil
 | |
| 	return
 | |
| }
 | |
| 
 | |
| type walSubsetProcessor struct {
 | |
| 	input            chan walSubsetProcessorInputItem
 | |
| 	output           chan []record.RefSample
 | |
| 	histogramsOutput chan []histogramRecord
 | |
| }
 | |
| 
 | |
| type walSubsetProcessorInputItem struct {
 | |
| 	samples          []record.RefSample
 | |
| 	histogramSamples []histogramRecord
 | |
| 	existingSeries   *memSeries
 | |
| 	walSeriesRef     chunks.HeadSeriesRef
 | |
| }
 | |
| 
 | |
| func (wp *walSubsetProcessor) setup() {
 | |
| 	wp.input = make(chan walSubsetProcessorInputItem, 300)
 | |
| 	wp.output = make(chan []record.RefSample, 300)
 | |
| 	wp.histogramsOutput = make(chan []histogramRecord, 300)
 | |
| }
 | |
| 
 | |
| func (wp *walSubsetProcessor) closeAndDrain() {
 | |
| 	close(wp.input)
 | |
| 	for range wp.output {
 | |
| 	}
 | |
| 	for range wp.histogramsOutput {
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // If there is a buffer in the output chan, return it for reuse, otherwise return nil.
 | |
| func (wp *walSubsetProcessor) reuseBuf() []record.RefSample {
 | |
| 	select {
 | |
| 	case buf := <-wp.output:
 | |
| 		return buf[:0]
 | |
| 	default:
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // If there is a buffer in the output chan, return it for reuse, otherwise return nil.
 | |
| func (wp *walSubsetProcessor) reuseHistogramBuf() []histogramRecord {
 | |
| 	select {
 | |
| 	case buf := <-wp.histogramsOutput:
 | |
| 		return buf[:0]
 | |
| 	default:
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // processWALSamples adds the samples it receives to the head and passes
 | |
| // the buffer received to an output channel for reuse.
 | |
| // Samples before the minValidTime timestamp are discarded.
 | |
| func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (unknownRefs, unknownHistogramRefs, mmapOverlappingChunks uint64) {
 | |
| 	defer close(wp.output)
 | |
| 	defer close(wp.histogramsOutput)
 | |
| 
 | |
| 	minValidTime := h.minValidTime.Load()
 | |
| 	mint, maxt := int64(math.MaxInt64), int64(math.MinInt64)
 | |
| 	chunkRange := h.chunkRange.Load()
 | |
| 
 | |
| 	for in := range wp.input {
 | |
| 		if in.existingSeries != nil {
 | |
| 			mmc := mmappedChunks[in.walSeriesRef]
 | |
| 			oooMmc := oooMmappedChunks[in.walSeriesRef]
 | |
| 			if h.resetSeriesWithMMappedChunks(in.existingSeries, mmc, oooMmc, in.walSeriesRef) {
 | |
| 				mmapOverlappingChunks++
 | |
| 			}
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		for _, s := range in.samples {
 | |
| 			ms := h.series.getByID(s.Ref)
 | |
| 			if ms == nil {
 | |
| 				unknownRefs++
 | |
| 				continue
 | |
| 			}
 | |
| 			if s.T <= ms.mmMaxTime {
 | |
| 				continue
 | |
| 			}
 | |
| 			if s.T <= ms.mmMaxTime {
 | |
| 				continue
 | |
| 			}
 | |
| 			if _, chunkCreated := ms.append(s.T, s.V, 0, h.chunkDiskMapper, chunkRange); chunkCreated {
 | |
| 				h.metrics.chunksCreated.Inc()
 | |
| 				h.metrics.chunks.Inc()
 | |
| 			}
 | |
| 			if s.T > maxt {
 | |
| 				maxt = s.T
 | |
| 			}
 | |
| 			if s.T < mint {
 | |
| 				mint = s.T
 | |
| 			}
 | |
| 		}
 | |
| 		select {
 | |
| 		case wp.output <- in.samples:
 | |
| 		default:
 | |
| 		}
 | |
| 
 | |
| 		for _, s := range in.histogramSamples {
 | |
| 			if s.t < minValidTime {
 | |
| 				continue
 | |
| 			}
 | |
| 			ms := h.series.getByID(s.ref)
 | |
| 			if ms == nil {
 | |
| 				unknownHistogramRefs++
 | |
| 				continue
 | |
| 			}
 | |
| 			if s.t <= ms.mmMaxTime {
 | |
| 				continue
 | |
| 			}
 | |
| 			var chunkCreated bool
 | |
| 			if s.h != nil {
 | |
| 				_, chunkCreated = ms.appendHistogram(s.t, s.h, 0, h.chunkDiskMapper, chunkRange)
 | |
| 			} else {
 | |
| 				_, chunkCreated = ms.appendFloatHistogram(s.t, s.fh, 0, h.chunkDiskMapper, chunkRange)
 | |
| 			}
 | |
| 			if chunkCreated {
 | |
| 				h.metrics.chunksCreated.Inc()
 | |
| 				h.metrics.chunks.Inc()
 | |
| 			}
 | |
| 			if s.t > maxt {
 | |
| 				maxt = s.t
 | |
| 			}
 | |
| 			if s.t < mint {
 | |
| 				mint = s.t
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		select {
 | |
| 		case wp.histogramsOutput <- in.histogramSamples:
 | |
| 		default:
 | |
| 		}
 | |
| 	}
 | |
| 	h.updateMinMaxTime(mint, maxt)
 | |
| 
 | |
| 	return unknownRefs, unknownHistogramRefs, mmapOverlappingChunks
 | |
| }
 | |
| 
 | |
| func (h *Head) loadWBL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, lastMmapRef chunks.ChunkDiskMapperRef) (err error) {
 | |
| 	// Track number of samples, m-map markers, that referenced a series we don't know about
 | |
| 	// for error reporting.
 | |
| 	var unknownRefs, mmapMarkerUnknownRefs atomic.Uint64
 | |
| 
 | |
| 	lastSeq, lastOff := lastMmapRef.Unpack()
 | |
| 	// Start workers that each process samples for a partition of the series ID space.
 | |
| 	var (
 | |
| 		wg         sync.WaitGroup
 | |
| 		n          = runtime.GOMAXPROCS(0)
 | |
| 		processors = make([]wblSubsetProcessor, n)
 | |
| 
 | |
| 		dec    record.Decoder
 | |
| 		shards = make([][]record.RefSample, n)
 | |
| 
 | |
| 		decodedCh   = make(chan interface{}, 10)
 | |
| 		decodeErr   error
 | |
| 		samplesPool = sync.Pool{
 | |
| 			New: func() interface{} {
 | |
| 				return []record.RefSample{}
 | |
| 			},
 | |
| 		}
 | |
| 		markersPool = sync.Pool{
 | |
| 			New: func() interface{} {
 | |
| 				return []record.RefMmapMarker{}
 | |
| 			},
 | |
| 		}
 | |
| 	)
 | |
| 
 | |
| 	defer func() {
 | |
| 		// For CorruptionErr ensure to terminate all workers before exiting.
 | |
| 		// We also wrap it to identify OOO WBL corruption.
 | |
| 		_, ok := err.(*wlog.CorruptionErr)
 | |
| 		if ok {
 | |
| 			err = &errLoadWbl{err: err}
 | |
| 			for i := 0; i < n; i++ {
 | |
| 				processors[i].closeAndDrain()
 | |
| 			}
 | |
| 			wg.Wait()
 | |
| 		}
 | |
| 	}()
 | |
| 
 | |
| 	wg.Add(n)
 | |
| 	for i := 0; i < n; i++ {
 | |
| 		processors[i].setup()
 | |
| 
 | |
| 		go func(wp *wblSubsetProcessor) {
 | |
| 			unknown := wp.processWBLSamples(h)
 | |
| 			unknownRefs.Add(unknown)
 | |
| 			wg.Done()
 | |
| 		}(&processors[i])
 | |
| 	}
 | |
| 
 | |
| 	go func() {
 | |
| 		defer close(decodedCh)
 | |
| 		for r.Next() {
 | |
| 			rec := r.Record()
 | |
| 			switch dec.Type(rec) {
 | |
| 			case record.Samples:
 | |
| 				samples := samplesPool.Get().([]record.RefSample)[:0]
 | |
| 				samples, err = dec.Samples(rec, samples)
 | |
| 				if err != nil {
 | |
| 					decodeErr = &wlog.CorruptionErr{
 | |
| 						Err:     errors.Wrap(err, "decode samples"),
 | |
| 						Segment: r.Segment(),
 | |
| 						Offset:  r.Offset(),
 | |
| 					}
 | |
| 					return
 | |
| 				}
 | |
| 				decodedCh <- samples
 | |
| 			case record.MmapMarkers:
 | |
| 				markers := markersPool.Get().([]record.RefMmapMarker)[:0]
 | |
| 				markers, err = dec.MmapMarkers(rec, markers)
 | |
| 				if err != nil {
 | |
| 					decodeErr = &wlog.CorruptionErr{
 | |
| 						Err:     errors.Wrap(err, "decode mmap markers"),
 | |
| 						Segment: r.Segment(),
 | |
| 						Offset:  r.Offset(),
 | |
| 					}
 | |
| 					return
 | |
| 				}
 | |
| 				decodedCh <- markers
 | |
| 			default:
 | |
| 				// Noop.
 | |
| 			}
 | |
| 		}
 | |
| 	}()
 | |
| 
 | |
| 	// The records are always replayed from the oldest to the newest.
 | |
| 	for d := range decodedCh {
 | |
| 		switch v := d.(type) {
 | |
| 		case []record.RefSample:
 | |
| 			samples := v
 | |
| 			// We split up the samples into parts of 5000 samples or less.
 | |
| 			// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
 | |
| 			// cause thousands of very large in flight buffers occupying large amounts
 | |
| 			// of unused memory.
 | |
| 			for len(samples) > 0 {
 | |
| 				m := 5000
 | |
| 				if len(samples) < m {
 | |
| 					m = len(samples)
 | |
| 				}
 | |
| 				for i := 0; i < n; i++ {
 | |
| 					shards[i] = processors[i].reuseBuf()
 | |
| 				}
 | |
| 				for _, sam := range samples[:m] {
 | |
| 					if r, ok := multiRef[sam.Ref]; ok {
 | |
| 						sam.Ref = r
 | |
| 					}
 | |
| 					mod := uint64(sam.Ref) % uint64(n)
 | |
| 					shards[mod] = append(shards[mod], sam)
 | |
| 				}
 | |
| 				for i := 0; i < n; i++ {
 | |
| 					processors[i].input <- shards[i]
 | |
| 				}
 | |
| 				samples = samples[m:]
 | |
| 			}
 | |
| 			//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
 | |
| 			samplesPool.Put(d)
 | |
| 		case []record.RefMmapMarker:
 | |
| 			markers := v
 | |
| 			for _, rm := range markers {
 | |
| 				seq, off := rm.MmapRef.Unpack()
 | |
| 				if seq > lastSeq || (seq == lastSeq && off > lastOff) {
 | |
| 					// This m-map chunk from markers was not present during
 | |
| 					// the load of mmapped chunks that happened in the head
 | |
| 					// initialization.
 | |
| 					continue
 | |
| 				}
 | |
| 
 | |
| 				if r, ok := multiRef[rm.Ref]; ok {
 | |
| 					rm.Ref = r
 | |
| 				}
 | |
| 
 | |
| 				ms := h.series.getByID(rm.Ref)
 | |
| 				if ms == nil {
 | |
| 					mmapMarkerUnknownRefs.Inc()
 | |
| 					continue
 | |
| 				}
 | |
| 				idx := uint64(ms.ref) % uint64(n)
 | |
| 				// It is possible that some old sample is being processed in processWALSamples that
 | |
| 				// could cause race below. So we wait for the goroutine to empty input the buffer and finish
 | |
| 				// processing all old samples after emptying the buffer.
 | |
| 				processors[idx].waitUntilIdle()
 | |
| 				// Lock the subset so we can modify the series object
 | |
| 				processors[idx].mx.Lock()
 | |
| 
 | |
| 				// All samples till now have been m-mapped. Hence clear out the headChunk.
 | |
| 				// In case some samples slipped through and went into m-map chunks because of changed
 | |
| 				// chunk size parameters, we are not taking care of that here.
 | |
| 				// TODO(codesome): see if there is a way to avoid duplicate m-map chunks if
 | |
| 				// the size of ooo chunk was reduced between restart.
 | |
| 				if ms.ooo != nil {
 | |
| 					ms.ooo.oooHeadChunk = nil
 | |
| 				}
 | |
| 
 | |
| 				processors[idx].mx.Unlock()
 | |
| 			}
 | |
| 		default:
 | |
| 			panic(fmt.Errorf("unexpected decodedCh type: %T", d))
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if decodeErr != nil {
 | |
| 		return decodeErr
 | |
| 	}
 | |
| 
 | |
| 	// Signal termination to each worker and wait for it to close its output channel.
 | |
| 	for i := 0; i < n; i++ {
 | |
| 		processors[i].closeAndDrain()
 | |
| 	}
 | |
| 	wg.Wait()
 | |
| 
 | |
| 	if r.Err() != nil {
 | |
| 		return errors.Wrap(r.Err(), "read records")
 | |
| 	}
 | |
| 
 | |
| 	if unknownRefs.Load() > 0 || mmapMarkerUnknownRefs.Load() > 0 {
 | |
| 		level.Warn(h.logger).Log("msg", "Unknown series references for ooo WAL replay", "samples", unknownRefs.Load(), "mmap_markers", mmapMarkerUnknownRefs.Load())
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| type errLoadWbl struct {
 | |
| 	err error
 | |
| }
 | |
| 
 | |
| func (e errLoadWbl) Error() string {
 | |
| 	return e.err.Error()
 | |
| }
 | |
| 
 | |
| // To support errors.Cause().
 | |
| func (e errLoadWbl) Cause() error {
 | |
| 	return e.err
 | |
| }
 | |
| 
 | |
| // To support errors.Unwrap().
 | |
| func (e errLoadWbl) Unwrap() error {
 | |
| 	return e.err
 | |
| }
 | |
| 
 | |
| // isErrLoadOOOWal returns a boolean if the error is errLoadWbl.
 | |
| func isErrLoadOOOWal(err error) bool {
 | |
| 	_, ok := err.(*errLoadWbl)
 | |
| 	return ok
 | |
| }
 | |
| 
 | |
| type wblSubsetProcessor struct {
 | |
| 	mx     sync.Mutex // Take this lock while modifying series in the subset.
 | |
| 	input  chan []record.RefSample
 | |
| 	output chan []record.RefSample
 | |
| }
 | |
| 
 | |
| func (wp *wblSubsetProcessor) setup() {
 | |
| 	wp.output = make(chan []record.RefSample, 300)
 | |
| 	wp.input = make(chan []record.RefSample, 300)
 | |
| }
 | |
| 
 | |
| func (wp *wblSubsetProcessor) closeAndDrain() {
 | |
| 	close(wp.input)
 | |
| 	for range wp.output {
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // If there is a buffer in the output chan, return it for reuse, otherwise return nil.
 | |
| func (wp *wblSubsetProcessor) reuseBuf() []record.RefSample {
 | |
| 	select {
 | |
| 	case buf := <-wp.output:
 | |
| 		return buf[:0]
 | |
| 	default:
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // processWBLSamples adds the samples it receives to the head and passes
 | |
| // the buffer received to an output channel for reuse.
 | |
| // Samples before the minValidTime timestamp are discarded.
 | |
| func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs uint64) {
 | |
| 	defer close(wp.output)
 | |
| 
 | |
| 	oooCapMax := h.opts.OutOfOrderCapMax.Load()
 | |
| 	// We don't check for minValidTime for ooo samples.
 | |
| 	mint, maxt := int64(math.MaxInt64), int64(math.MinInt64)
 | |
| 	for samples := range wp.input {
 | |
| 		wp.mx.Lock()
 | |
| 		for _, s := range samples {
 | |
| 			ms := h.series.getByID(s.Ref)
 | |
| 			if ms == nil {
 | |
| 				unknownRefs++
 | |
| 				continue
 | |
| 			}
 | |
| 			ok, chunkCreated, _ := ms.insert(s.T, s.V, h.chunkDiskMapper, oooCapMax)
 | |
| 			if chunkCreated {
 | |
| 				h.metrics.chunksCreated.Inc()
 | |
| 				h.metrics.chunks.Inc()
 | |
| 			}
 | |
| 			if ok {
 | |
| 				if s.T < mint {
 | |
| 					mint = s.T
 | |
| 				}
 | |
| 				if s.T > maxt {
 | |
| 					maxt = s.T
 | |
| 				}
 | |
| 			}
 | |
| 		}
 | |
| 		wp.mx.Unlock()
 | |
| 
 | |
| 	}
 | |
| 
 | |
| 	h.updateMinOOOMaxOOOTime(mint, maxt)
 | |
| 
 | |
| 	return unknownRefs
 | |
| }
 | |
| 
 | |
| func (wp *wblSubsetProcessor) waitUntilIdle() {
 | |
| 	select {
 | |
| 	case <-wp.output: // Allow output side to drain to avoid deadlock.
 | |
| 	default:
 | |
| 	}
 | |
| 	wp.input <- []record.RefSample{}
 | |
| 	for len(wp.input) != 0 {
 | |
| 		time.Sleep(10 * time.Microsecond)
 | |
| 		select {
 | |
| 		case <-wp.output: // Allow output side to drain to avoid deadlock.
 | |
| 		default:
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| const (
 | |
| 	chunkSnapshotRecordTypeSeries     uint8 = 1
 | |
| 	chunkSnapshotRecordTypeTombstones uint8 = 2
 | |
| 	chunkSnapshotRecordTypeExemplars  uint8 = 3
 | |
| )
 | |
| 
 | |
| type chunkSnapshotRecord struct {
 | |
| 	ref       chunks.HeadSeriesRef
 | |
| 	lset      labels.Labels
 | |
| 	mc        *memChunk
 | |
| 	lastValue float64
 | |
| }
 | |
| 
 | |
| func (s *memSeries) encodeToSnapshotRecord(b []byte) []byte {
 | |
| 	buf := encoding.Encbuf{B: b}
 | |
| 
 | |
| 	buf.PutByte(chunkSnapshotRecordTypeSeries)
 | |
| 	buf.PutBE64(uint64(s.ref))
 | |
| 	record.EncodeLabels(&buf, s.lset)
 | |
| 	buf.PutBE64int64(0) // Backwards-compatibility; was chunkRange but now unused.
 | |
| 
 | |
| 	s.Lock()
 | |
| 	if s.headChunk == nil {
 | |
| 		buf.PutUvarint(0)
 | |
| 	} else {
 | |
| 		buf.PutUvarint(1)
 | |
| 		buf.PutBE64int64(s.headChunk.minTime)
 | |
| 		buf.PutBE64int64(s.headChunk.maxTime)
 | |
| 		buf.PutByte(byte(s.headChunk.chunk.Encoding()))
 | |
| 		buf.PutUvarintBytes(s.headChunk.chunk.Bytes())
 | |
| 		// Backwards compatibility for old sampleBuf which had last 4 samples.
 | |
| 		for i := 0; i < 3; i++ {
 | |
| 			buf.PutBE64int64(0)
 | |
| 			buf.PutBEFloat64(0)
 | |
| 		}
 | |
| 		buf.PutBE64int64(0)
 | |
| 		buf.PutBEFloat64(s.lastValue)
 | |
| 	}
 | |
| 	s.Unlock()
 | |
| 
 | |
| 	return buf.Get()
 | |
| }
 | |
| 
 | |
| func decodeSeriesFromChunkSnapshot(d *record.Decoder, b []byte) (csr chunkSnapshotRecord, err error) {
 | |
| 	dec := encoding.Decbuf{B: b}
 | |
| 
 | |
| 	if flag := dec.Byte(); flag != chunkSnapshotRecordTypeSeries {
 | |
| 		return csr, errors.Errorf("invalid record type %x", flag)
 | |
| 	}
 | |
| 
 | |
| 	csr.ref = chunks.HeadSeriesRef(dec.Be64())
 | |
| 	// The label set written to the disk is already sorted.
 | |
| 	// TODO: figure out why DecodeLabels calls Sort(), and perhaps remove it.
 | |
| 	csr.lset = d.DecodeLabels(&dec)
 | |
| 
 | |
| 	_ = dec.Be64int64() // Was chunkRange but now unused.
 | |
| 	if dec.Uvarint() == 0 {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	csr.mc = &memChunk{}
 | |
| 	csr.mc.minTime = dec.Be64int64()
 | |
| 	csr.mc.maxTime = dec.Be64int64()
 | |
| 	enc := chunkenc.Encoding(dec.Byte())
 | |
| 
 | |
| 	// The underlying bytes gets re-used later, so make a copy.
 | |
| 	chunkBytes := dec.UvarintBytes()
 | |
| 	chunkBytesCopy := make([]byte, len(chunkBytes))
 | |
| 	copy(chunkBytesCopy, chunkBytes)
 | |
| 
 | |
| 	chk, err := chunkenc.FromData(enc, chunkBytesCopy)
 | |
| 	if err != nil {
 | |
| 		return csr, errors.Wrap(err, "chunk from data")
 | |
| 	}
 | |
| 	csr.mc.chunk = chk
 | |
| 
 | |
| 	// Backwards-compatibility for old sampleBuf which had last 4 samples.
 | |
| 	for i := 0; i < 3; i++ {
 | |
| 		_ = dec.Be64int64()
 | |
| 		_ = dec.Be64Float64()
 | |
| 	}
 | |
| 	_ = dec.Be64int64()
 | |
| 	csr.lastValue = dec.Be64Float64()
 | |
| 
 | |
| 	err = dec.Err()
 | |
| 	if err != nil && len(dec.B) > 0 {
 | |
| 		err = errors.Errorf("unexpected %d bytes left in entry", len(dec.B))
 | |
| 	}
 | |
| 
 | |
| 	return
 | |
| }
 | |
| 
 | |
| func encodeTombstonesToSnapshotRecord(tr tombstones.Reader) ([]byte, error) {
 | |
| 	buf := encoding.Encbuf{}
 | |
| 
 | |
| 	buf.PutByte(chunkSnapshotRecordTypeTombstones)
 | |
| 	b, err := tombstones.Encode(tr)
 | |
| 	if err != nil {
 | |
| 		return nil, errors.Wrap(err, "encode tombstones")
 | |
| 	}
 | |
| 	buf.PutUvarintBytes(b)
 | |
| 
 | |
| 	return buf.Get(), nil
 | |
| }
 | |
| 
 | |
| func decodeTombstonesSnapshotRecord(b []byte) (tombstones.Reader, error) {
 | |
| 	dec := encoding.Decbuf{B: b}
 | |
| 
 | |
| 	if flag := dec.Byte(); flag != chunkSnapshotRecordTypeTombstones {
 | |
| 		return nil, errors.Errorf("invalid record type %x", flag)
 | |
| 	}
 | |
| 
 | |
| 	tr, err := tombstones.Decode(dec.UvarintBytes())
 | |
| 	return tr, errors.Wrap(err, "decode tombstones")
 | |
| }
 | |
| 
 | |
| const chunkSnapshotPrefix = "chunk_snapshot."
 | |
| 
 | |
| // ChunkSnapshot creates a snapshot of all the series and tombstones in the head.
 | |
| // It deletes the old chunk snapshots if the chunk snapshot creation is successful.
 | |
| //
 | |
| // The chunk snapshot is stored in a directory named chunk_snapshot.N.M and is written
 | |
| // using the WAL package. N is the last WAL segment present during snapshotting and
 | |
| // M is the offset in segment N upto which data was written.
 | |
| //
 | |
| // The snapshot first contains all series (each in individual records and not sorted), followed by
 | |
| // tombstones (a single record), and finally exemplars (>= 1 record). Exemplars are in the order they
 | |
| // were written to the circular buffer.
 | |
| func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error) {
 | |
| 	if h.wal == nil {
 | |
| 		// If we are not storing any WAL, does not make sense to take a snapshot too.
 | |
| 		level.Warn(h.logger).Log("msg", "skipping chunk snapshotting as WAL is disabled")
 | |
| 		return &ChunkSnapshotStats{}, nil
 | |
| 	}
 | |
| 	h.chunkSnapshotMtx.Lock()
 | |
| 	defer h.chunkSnapshotMtx.Unlock()
 | |
| 
 | |
| 	stats := &ChunkSnapshotStats{}
 | |
| 
 | |
| 	wlast, woffset, err := h.wal.LastSegmentAndOffset()
 | |
| 	if err != nil && err != record.ErrNotFound {
 | |
| 		return stats, errors.Wrap(err, "get last wal segment and offset")
 | |
| 	}
 | |
| 
 | |
| 	_, cslast, csoffset, err := LastChunkSnapshot(h.opts.ChunkDirRoot)
 | |
| 	if err != nil && err != record.ErrNotFound {
 | |
| 		return stats, errors.Wrap(err, "find last chunk snapshot")
 | |
| 	}
 | |
| 
 | |
| 	if wlast == cslast && woffset == csoffset {
 | |
| 		// Nothing has been written to the WAL/Head since the last snapshot.
 | |
| 		return stats, nil
 | |
| 	}
 | |
| 
 | |
| 	snapshotName := chunkSnapshotDir(wlast, woffset)
 | |
| 
 | |
| 	cpdir := filepath.Join(h.opts.ChunkDirRoot, snapshotName)
 | |
| 	cpdirtmp := cpdir + ".tmp"
 | |
| 	stats.Dir = cpdir
 | |
| 
 | |
| 	if err := os.MkdirAll(cpdirtmp, 0o777); err != nil {
 | |
| 		return stats, errors.Wrap(err, "create chunk snapshot dir")
 | |
| 	}
 | |
| 	cp, err := wlog.New(nil, nil, cpdirtmp, h.wal.CompressionEnabled())
 | |
| 	if err != nil {
 | |
| 		return stats, errors.Wrap(err, "open chunk snapshot")
 | |
| 	}
 | |
| 
 | |
| 	// Ensures that an early return caused by an error doesn't leave any tmp files.
 | |
| 	defer func() {
 | |
| 		cp.Close()
 | |
| 		os.RemoveAll(cpdirtmp)
 | |
| 	}()
 | |
| 
 | |
| 	var (
 | |
| 		buf  []byte
 | |
| 		recs [][]byte
 | |
| 	)
 | |
| 	// Add all series to the snapshot.
 | |
| 	stripeSize := h.series.size
 | |
| 	for i := 0; i < stripeSize; i++ {
 | |
| 		h.series.locks[i].RLock()
 | |
| 
 | |
| 		for _, s := range h.series.series[i] {
 | |
| 			start := len(buf)
 | |
| 			buf = s.encodeToSnapshotRecord(buf)
 | |
| 			if len(buf[start:]) == 0 {
 | |
| 				continue // All contents discarded.
 | |
| 			}
 | |
| 			recs = append(recs, buf[start:])
 | |
| 			// Flush records in 10 MB increments.
 | |
| 			if len(buf) > 10*1024*1024 {
 | |
| 				if err := cp.Log(recs...); err != nil {
 | |
| 					h.series.locks[i].RUnlock()
 | |
| 					return stats, errors.Wrap(err, "flush records")
 | |
| 				}
 | |
| 				buf, recs = buf[:0], recs[:0]
 | |
| 			}
 | |
| 		}
 | |
| 		stats.TotalSeries += len(h.series.series[i])
 | |
| 
 | |
| 		h.series.locks[i].RUnlock()
 | |
| 	}
 | |
| 
 | |
| 	// Add tombstones to the snapshot.
 | |
| 	tombstonesReader, err := h.Tombstones()
 | |
| 	if err != nil {
 | |
| 		return stats, errors.Wrap(err, "get tombstones")
 | |
| 	}
 | |
| 	rec, err := encodeTombstonesToSnapshotRecord(tombstonesReader)
 | |
| 	if err != nil {
 | |
| 		return stats, errors.Wrap(err, "encode tombstones")
 | |
| 	}
 | |
| 	recs = append(recs, rec)
 | |
| 	// Flush remaining series records and tombstones.
 | |
| 	if err := cp.Log(recs...); err != nil {
 | |
| 		return stats, errors.Wrap(err, "flush records")
 | |
| 	}
 | |
| 	buf = buf[:0]
 | |
| 
 | |
| 	// Add exemplars in the snapshot.
 | |
| 	// We log in batches, with each record having upto 10000 exemplars.
 | |
| 	// Assuming 100 bytes (overestimate) per exemplar, that's ~1MB.
 | |
| 	maxExemplarsPerRecord := 10000
 | |
| 	batch := make([]record.RefExemplar, 0, maxExemplarsPerRecord)
 | |
| 	enc := record.Encoder{}
 | |
| 	flushExemplars := func() error {
 | |
| 		if len(batch) == 0 {
 | |
| 			return nil
 | |
| 		}
 | |
| 		buf = buf[:0]
 | |
| 		encbuf := encoding.Encbuf{B: buf}
 | |
| 		encbuf.PutByte(chunkSnapshotRecordTypeExemplars)
 | |
| 		enc.EncodeExemplarsIntoBuffer(batch, &encbuf)
 | |
| 		if err := cp.Log(encbuf.Get()); err != nil {
 | |
| 			return errors.Wrap(err, "log exemplars")
 | |
| 		}
 | |
| 		buf, batch = buf[:0], batch[:0]
 | |
| 		return nil
 | |
| 	}
 | |
| 	err = h.exemplars.IterateExemplars(func(seriesLabels labels.Labels, e exemplar.Exemplar) error {
 | |
| 		if len(batch) >= maxExemplarsPerRecord {
 | |
| 			if err := flushExemplars(); err != nil {
 | |
| 				return errors.Wrap(err, "flush exemplars")
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		ms := h.series.getByHash(seriesLabels.Hash(), seriesLabels)
 | |
| 		if ms == nil {
 | |
| 			// It is possible that exemplar refers to some old series. We discard such exemplars.
 | |
| 			return nil
 | |
| 		}
 | |
| 		batch = append(batch, record.RefExemplar{
 | |
| 			Ref:    ms.ref,
 | |
| 			T:      e.Ts,
 | |
| 			V:      e.Value,
 | |
| 			Labels: e.Labels,
 | |
| 		})
 | |
| 		return nil
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		return stats, errors.Wrap(err, "iterate exemplars")
 | |
| 	}
 | |
| 
 | |
| 	// Flush remaining exemplars.
 | |
| 	if err := flushExemplars(); err != nil {
 | |
| 		return stats, errors.Wrap(err, "flush exemplars at the end")
 | |
| 	}
 | |
| 
 | |
| 	if err := cp.Close(); err != nil {
 | |
| 		return stats, errors.Wrap(err, "close chunk snapshot")
 | |
| 	}
 | |
| 	if err := fileutil.Replace(cpdirtmp, cpdir); err != nil {
 | |
| 		return stats, errors.Wrap(err, "rename chunk snapshot directory")
 | |
| 	}
 | |
| 
 | |
| 	if err := DeleteChunkSnapshots(h.opts.ChunkDirRoot, wlast, woffset); err != nil {
 | |
| 		// Leftover old chunk snapshots do not cause problems down the line beyond
 | |
| 		// occupying disk space.
 | |
| 		// They will just be ignored since a higher chunk snapshot exists.
 | |
| 		level.Error(h.logger).Log("msg", "delete old chunk snapshots", "err", err)
 | |
| 	}
 | |
| 	return stats, nil
 | |
| }
 | |
| 
 | |
| func chunkSnapshotDir(wlast, woffset int) string {
 | |
| 	return fmt.Sprintf(chunkSnapshotPrefix+"%06d.%010d", wlast, woffset)
 | |
| }
 | |
| 
 | |
| func (h *Head) performChunkSnapshot() error {
 | |
| 	level.Info(h.logger).Log("msg", "creating chunk snapshot")
 | |
| 	startTime := time.Now()
 | |
| 	stats, err := h.ChunkSnapshot()
 | |
| 	elapsed := time.Since(startTime)
 | |
| 	if err == nil {
 | |
| 		level.Info(h.logger).Log("msg", "chunk snapshot complete", "duration", elapsed.String(), "num_series", stats.TotalSeries, "dir", stats.Dir)
 | |
| 	}
 | |
| 	return errors.Wrap(err, "chunk snapshot")
 | |
| }
 | |
| 
 | |
| // ChunkSnapshotStats returns stats about a created chunk snapshot.
 | |
| type ChunkSnapshotStats struct {
 | |
| 	TotalSeries int
 | |
| 	Dir         string
 | |
| }
 | |
| 
 | |
| // LastChunkSnapshot returns the directory name and index of the most recent chunk snapshot.
 | |
| // If dir does not contain any chunk snapshots, ErrNotFound is returned.
 | |
| func LastChunkSnapshot(dir string) (string, int, int, error) {
 | |
| 	files, err := os.ReadDir(dir)
 | |
| 	if err != nil {
 | |
| 		return "", 0, 0, err
 | |
| 	}
 | |
| 	maxIdx, maxOffset := -1, -1
 | |
| 	maxFileName := ""
 | |
| 	for i := 0; i < len(files); i++ {
 | |
| 		fi := files[i]
 | |
| 
 | |
| 		if !strings.HasPrefix(fi.Name(), chunkSnapshotPrefix) {
 | |
| 			continue
 | |
| 		}
 | |
| 		if !fi.IsDir() {
 | |
| 			return "", 0, 0, errors.Errorf("chunk snapshot %s is not a directory", fi.Name())
 | |
| 		}
 | |
| 
 | |
| 		splits := strings.Split(fi.Name()[len(chunkSnapshotPrefix):], ".")
 | |
| 		if len(splits) != 2 {
 | |
| 			// Chunk snapshots is not in the right format, we do not care about it.
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		idx, err := strconv.Atoi(splits[0])
 | |
| 		if err != nil {
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		offset, err := strconv.Atoi(splits[1])
 | |
| 		if err != nil {
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		if idx > maxIdx || (idx == maxIdx && offset > maxOffset) {
 | |
| 			maxIdx, maxOffset = idx, offset
 | |
| 			maxFileName = filepath.Join(dir, fi.Name())
 | |
| 		}
 | |
| 	}
 | |
| 	if maxFileName == "" {
 | |
| 		return "", 0, 0, record.ErrNotFound
 | |
| 	}
 | |
| 	return maxFileName, maxIdx, maxOffset, nil
 | |
| }
 | |
| 
 | |
| // DeleteChunkSnapshots deletes all chunk snapshots in a directory below a given index.
 | |
| func DeleteChunkSnapshots(dir string, maxIndex, maxOffset int) error {
 | |
| 	files, err := os.ReadDir(dir)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	errs := tsdb_errors.NewMulti()
 | |
| 	for _, fi := range files {
 | |
| 		if !strings.HasPrefix(fi.Name(), chunkSnapshotPrefix) {
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		splits := strings.Split(fi.Name()[len(chunkSnapshotPrefix):], ".")
 | |
| 		if len(splits) != 2 {
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		idx, err := strconv.Atoi(splits[0])
 | |
| 		if err != nil {
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		offset, err := strconv.Atoi(splits[1])
 | |
| 		if err != nil {
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		if idx < maxIndex || (idx == maxIndex && offset < maxOffset) {
 | |
| 			if err := os.RemoveAll(filepath.Join(dir, fi.Name())); err != nil {
 | |
| 				errs.Add(err)
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 	}
 | |
| 	return errs.Err()
 | |
| }
 | |
| 
 | |
| // loadChunkSnapshot replays the chunk snapshot and restores the Head state from it. If there was any error returned,
 | |
| // it is the responsibility of the caller to clear the contents of the Head.
 | |
| func (h *Head) loadChunkSnapshot() (int, int, map[chunks.HeadSeriesRef]*memSeries, error) {
 | |
| 	dir, snapIdx, snapOffset, err := LastChunkSnapshot(h.opts.ChunkDirRoot)
 | |
| 	if err != nil {
 | |
| 		if err == record.ErrNotFound {
 | |
| 			return snapIdx, snapOffset, nil, nil
 | |
| 		}
 | |
| 		return snapIdx, snapOffset, nil, errors.Wrap(err, "find last chunk snapshot")
 | |
| 	}
 | |
| 
 | |
| 	start := time.Now()
 | |
| 	sr, err := wlog.NewSegmentsReader(dir)
 | |
| 	if err != nil {
 | |
| 		return snapIdx, snapOffset, nil, errors.Wrap(err, "open chunk snapshot")
 | |
| 	}
 | |
| 	defer func() {
 | |
| 		if err := sr.Close(); err != nil {
 | |
| 			level.Warn(h.logger).Log("msg", "error while closing the wal segments reader", "err", err)
 | |
| 		}
 | |
| 	}()
 | |
| 
 | |
| 	var (
 | |
| 		numSeries        = 0
 | |
| 		unknownRefs      = int64(0)
 | |
| 		n                = runtime.GOMAXPROCS(0)
 | |
| 		wg               sync.WaitGroup
 | |
| 		recordChan       = make(chan chunkSnapshotRecord, 5*n)
 | |
| 		shardedRefSeries = make([]map[chunks.HeadSeriesRef]*memSeries, n)
 | |
| 		errChan          = make(chan error, n)
 | |
| 		refSeries        map[chunks.HeadSeriesRef]*memSeries
 | |
| 		exemplarBuf      []record.RefExemplar
 | |
| 		dec              record.Decoder
 | |
| 	)
 | |
| 
 | |
| 	wg.Add(n)
 | |
| 	for i := 0; i < n; i++ {
 | |
| 		go func(idx int, rc <-chan chunkSnapshotRecord) {
 | |
| 			defer wg.Done()
 | |
| 			defer func() {
 | |
| 				// If there was an error, drain the channel
 | |
| 				// to unblock the main thread.
 | |
| 				for range rc {
 | |
| 				}
 | |
| 			}()
 | |
| 
 | |
| 			shardedRefSeries[idx] = make(map[chunks.HeadSeriesRef]*memSeries)
 | |
| 			localRefSeries := shardedRefSeries[idx]
 | |
| 
 | |
| 			for csr := range rc {
 | |
| 				series, _, err := h.getOrCreateWithID(csr.ref, csr.lset.Hash(), csr.lset)
 | |
| 				if err != nil {
 | |
| 					errChan <- err
 | |
| 					return
 | |
| 				}
 | |
| 				localRefSeries[csr.ref] = series
 | |
| 				for {
 | |
| 					seriesID := uint64(series.ref)
 | |
| 					lastSeriesID := h.lastSeriesID.Load()
 | |
| 					if lastSeriesID >= seriesID || h.lastSeriesID.CompareAndSwap(lastSeriesID, seriesID) {
 | |
| 						break
 | |
| 					}
 | |
| 				}
 | |
| 
 | |
| 				if csr.mc == nil {
 | |
| 					continue
 | |
| 				}
 | |
| 				series.nextAt = csr.mc.maxTime // This will create a new chunk on append.
 | |
| 				series.headChunk = csr.mc
 | |
| 				series.lastValue = csr.lastValue
 | |
| 
 | |
| 				app, err := series.headChunk.chunk.Appender()
 | |
| 				if err != nil {
 | |
| 					errChan <- err
 | |
| 					return
 | |
| 				}
 | |
| 				series.app = app
 | |
| 
 | |
| 				h.updateMinMaxTime(csr.mc.minTime, csr.mc.maxTime)
 | |
| 			}
 | |
| 		}(i, recordChan)
 | |
| 	}
 | |
| 
 | |
| 	r := wlog.NewReader(sr)
 | |
| 	var loopErr error
 | |
| Outer:
 | |
| 	for r.Next() {
 | |
| 		select {
 | |
| 		case err := <-errChan:
 | |
| 			errChan <- err
 | |
| 			break Outer
 | |
| 		default:
 | |
| 		}
 | |
| 
 | |
| 		rec := r.Record()
 | |
| 		switch rec[0] {
 | |
| 		case chunkSnapshotRecordTypeSeries:
 | |
| 			numSeries++
 | |
| 			csr, err := decodeSeriesFromChunkSnapshot(&dec, rec)
 | |
| 			if err != nil {
 | |
| 				loopErr = errors.Wrap(err, "decode series record")
 | |
| 				break Outer
 | |
| 			}
 | |
| 			recordChan <- csr
 | |
| 
 | |
| 		case chunkSnapshotRecordTypeTombstones:
 | |
| 			tr, err := decodeTombstonesSnapshotRecord(rec)
 | |
| 			if err != nil {
 | |
| 				loopErr = errors.Wrap(err, "decode tombstones")
 | |
| 				break Outer
 | |
| 			}
 | |
| 
 | |
| 			if err = tr.Iter(func(ref storage.SeriesRef, ivs tombstones.Intervals) error {
 | |
| 				h.tombstones.AddInterval(ref, ivs...)
 | |
| 				return nil
 | |
| 			}); err != nil {
 | |
| 				loopErr = errors.Wrap(err, "iterate tombstones")
 | |
| 				break Outer
 | |
| 			}
 | |
| 
 | |
| 		case chunkSnapshotRecordTypeExemplars:
 | |
| 			// Exemplars are at the end of snapshot. So all series are loaded at this point.
 | |
| 			if len(refSeries) == 0 {
 | |
| 				close(recordChan)
 | |
| 				wg.Wait()
 | |
| 
 | |
| 				refSeries = make(map[chunks.HeadSeriesRef]*memSeries, numSeries)
 | |
| 				for _, shard := range shardedRefSeries {
 | |
| 					for k, v := range shard {
 | |
| 						refSeries[k] = v
 | |
| 					}
 | |
| 				}
 | |
| 			}
 | |
| 
 | |
| 			if !h.opts.EnableExemplarStorage || h.opts.MaxExemplars.Load() <= 0 {
 | |
| 				// Exemplar storage is disabled.
 | |
| 				continue Outer
 | |
| 			}
 | |
| 
 | |
| 			decbuf := encoding.Decbuf{B: rec[1:]}
 | |
| 
 | |
| 			exemplarBuf = exemplarBuf[:0]
 | |
| 			exemplarBuf, err = dec.ExemplarsFromBuffer(&decbuf, exemplarBuf)
 | |
| 			if err != nil {
 | |
| 				loopErr = errors.Wrap(err, "exemplars from buffer")
 | |
| 				break Outer
 | |
| 			}
 | |
| 
 | |
| 			for _, e := range exemplarBuf {
 | |
| 				ms, ok := refSeries[e.Ref]
 | |
| 				if !ok {
 | |
| 					unknownRefs++
 | |
| 					continue
 | |
| 				}
 | |
| 
 | |
| 				if err := h.exemplars.AddExemplar(ms.lset, exemplar.Exemplar{
 | |
| 					Labels: e.Labels,
 | |
| 					Value:  e.V,
 | |
| 					Ts:     e.T,
 | |
| 				}); err != nil {
 | |
| 					loopErr = errors.Wrap(err, "add exemplar")
 | |
| 					break Outer
 | |
| 				}
 | |
| 			}
 | |
| 
 | |
| 		default:
 | |
| 			// This is a record type we don't understand. It is either and old format from earlier versions,
 | |
| 			// or a new format and the code was rolled back to old version.
 | |
| 			loopErr = errors.Errorf("unsuported snapshot record type 0b%b", rec[0])
 | |
| 			break Outer
 | |
| 		}
 | |
| 	}
 | |
| 	if len(refSeries) == 0 {
 | |
| 		close(recordChan)
 | |
| 		wg.Wait()
 | |
| 	}
 | |
| 
 | |
| 	close(errChan)
 | |
| 	merr := tsdb_errors.NewMulti(errors.Wrap(loopErr, "decode loop"))
 | |
| 	for err := range errChan {
 | |
| 		merr.Add(errors.Wrap(err, "record processing"))
 | |
| 	}
 | |
| 	if err := merr.Err(); err != nil {
 | |
| 		return -1, -1, nil, err
 | |
| 	}
 | |
| 
 | |
| 	if r.Err() != nil {
 | |
| 		return -1, -1, nil, errors.Wrap(r.Err(), "read records")
 | |
| 	}
 | |
| 
 | |
| 	if len(refSeries) == 0 {
 | |
| 		// We had no exemplar record, so we have to build the map here.
 | |
| 		refSeries = make(map[chunks.HeadSeriesRef]*memSeries, numSeries)
 | |
| 		for _, shard := range shardedRefSeries {
 | |
| 			for k, v := range shard {
 | |
| 				refSeries[k] = v
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	elapsed := time.Since(start)
 | |
| 	level.Info(h.logger).Log("msg", "chunk snapshot loaded", "dir", dir, "num_series", numSeries, "duration", elapsed.String())
 | |
| 	if unknownRefs > 0 {
 | |
| 		level.Warn(h.logger).Log("msg", "unknown series references during chunk snapshot replay", "count", unknownRefs)
 | |
| 	}
 | |
| 
 | |
| 	return snapIdx, snapOffset, refSeries, nil
 | |
| }
 |