// Copyright 2018 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package wlog

import (
	"errors"
	"fmt"
	"io"
	"log/slog"
	"math"
	"os"
	"path/filepath"
	"slices"
	"strconv"
	"strings"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/tsdb/chunks"
	tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
	"github.com/prometheus/prometheus/tsdb/fileutil"
	"github.com/prometheus/prometheus/tsdb/record"
	"github.com/prometheus/prometheus/tsdb/tombstones"
)

// CheckpointStats returns stats about a created checkpoint.
type CheckpointStats struct {
	DroppedSeries     int
	DroppedSamples    int // Includes histograms.
	DroppedTombstones int
	DroppedExemplars  int
	DroppedMetadata   int
	TotalSeries       int // Processed series including dropped ones.
	TotalSamples      int // Processed float and histogram samples including dropped ones.
	TotalTombstones   int // Processed tombstones including dropped ones.
	TotalExemplars    int // Processed exemplars including dropped ones.
	TotalMetadata     int // Processed metadata including dropped ones.
}

// LastCheckpoint returns the directory name and index of the most recent checkpoint.
// If dir does not contain any checkpoints, ErrNotFound is returned.
func LastCheckpoint(dir string) (string, int, error) {
	checkpoints, err := listCheckpoints(dir)
	if err != nil {
		return "", 0, err
	}

	if len(checkpoints) == 0 {
		return "", 0, record.ErrNotFound
	}

	checkpoint := checkpoints[len(checkpoints)-1]
	return filepath.Join(dir, checkpoint.name), checkpoint.index, nil
}

// DeleteCheckpoints deletes all checkpoints in a directory below a given index.
func DeleteCheckpoints(dir string, maxIndex int) error {
	checkpoints, err := listCheckpoints(dir)
	if err != nil {
		return err
	}

	errs := tsdb_errors.NewMulti()
	for _, checkpoint := range checkpoints {
		if checkpoint.index >= maxIndex {
			break
		}
		errs.Add(os.RemoveAll(filepath.Join(dir, checkpoint.name)))
	}
	return errs.Err()
}

// CheckpointPrefix is the prefix used for checkpoint files.
const CheckpointPrefix = "checkpoint."

// Checkpoint creates a compacted checkpoint of segments in range [from, to] in the given WAL.
// It includes the most recent checkpoint if it exists.
// All series not satisfying keep, samples/tombstones/exemplars below mint and
// metadata that are not the latest are dropped.
//
// The checkpoint is stored in a directory named checkpoint.N in the same
// segmented format as the original WAL itself.
// This makes it easy to read it through the WAL package and concatenate
// it with the original WAL.
func Checkpoint(logger *slog.Logger, w *WL, from, to int, keep func(id chunks.HeadSeriesRef) bool, mint int64) (*CheckpointStats, error) {
	stats := &CheckpointStats{}
	var sgmReader io.ReadCloser

	logger.Info("Creating checkpoint", "from_segment", from, "to_segment", to, "mint", mint)

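	// Build a reader over the most recent checkpoint (if any) followed by the WAL segments in [from, to].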
	{
		var sgmRange []SegmentRange
		dir, idx, err := LastCheckpoint(w.Dir())
		if err != nil && !errors.Is(err, record.ErrNotFound) {
			return nil, fmt.Errorf("find last checkpoint: %w", err)
		}
		last := idx + 1
		if err == nil {
			if from > last {
				return nil, fmt.Errorf("unexpected gap to last checkpoint. expected:%v, requested:%v", last, from)
			}
			// Ignore WAL files below the checkpoint. They shouldn't exist to begin with.
			from = last

			sgmRange = append(sgmRange, SegmentRange{Dir: dir, Last: math.MaxInt32})
		}

		sgmRange = append(sgmRange, SegmentRange{Dir: w.Dir(), First: from, Last: to})
		sgmReader, err = NewSegmentsRangeReader(sgmRange...)
		if err != nil {
			return nil, fmt.Errorf("create segment reader: %w", err)
		}
		defer sgmReader.Close()
	}

	cpdir := checkpointDir(w.Dir(), to)
	cpdirtmp := cpdir + ".tmp"

	if err := os.RemoveAll(cpdirtmp); err != nil {
		return nil, fmt.Errorf("remove previous temporary checkpoint dir: %w", err)
	}

	if err := os.MkdirAll(cpdirtmp, 0o777); err != nil {
		return nil, fmt.Errorf("create checkpoint dir: %w", err)
	}
	cp, err := New(nil, nil, cpdirtmp, w.CompressionType())
	if err != nil {
		return nil, fmt.Errorf("open checkpoint: %w", err)
	}

	// Ensures that an early return caused by an error doesn't leave any tmp files.
	defer func() {
		cp.Close()
		os.RemoveAll(cpdirtmp)
	}()

	r := NewReader(sgmReader)

	var (
		series                []record.RefSeries
		samples               []record.RefSample
		histogramSamples      []record.RefHistogramSample
		floatHistogramSamples []record.RefFloatHistogramSample
		tstones               []tombstones.Stone
		exemplars             []record.RefExemplar
		metadata              []record.RefMetadata
		st                    = labels.NewSymbolTable() // Needed for decoding; labels do not outlive this function.
		dec                   = record.NewDecoder(st, logger)
		enc                   record.Encoder
		buf                   []byte
		recs                  [][]byte

		latestMetadataMap = make(map[chunks.HeadSeriesRef]record.RefMetadata)
	)
	for r.Next() {
		series, samples, histogramSamples, floatHistogramSamples, tstones, exemplars, metadata = series[:0], samples[:0], histogramSamples[:0], floatHistogramSamples[:0], tstones[:0], exemplars[:0], metadata[:0]

		// We don't reset the buffer since we batch up multiple records
		// before writing them to the checkpoint.
		// Remember where the record for this iteration starts.
		start := len(buf)
		rec := r.Record()

		switch dec.Type(rec) {
		case record.Series:
			series, err = dec.Series(rec, series)
			if err != nil {
				return nil, fmt.Errorf("decode series: %w", err)
			}
			// Drop irrelevant series in place.
			repl := series[:0]
			for _, s := range series {
				if keep(s.Ref) {
					repl = append(repl, s)
				}
			}
			if len(repl) > 0 {
				buf = enc.Series(repl, buf)
			}
			stats.TotalSeries += len(series)
			stats.DroppedSeries += len(series) - len(repl)

		case record.Samples:
			samples, err = dec.Samples(rec, samples)
			if err != nil {
				return nil, fmt.Errorf("decode samples: %w", err)
			}
			// Drop irrelevant samples in place.
			repl := samples[:0]
			for _, s := range samples {
				if s.T >= mint {
					repl = append(repl, s)
				}
			}
			if len(repl) > 0 {
				buf = enc.Samples(repl, buf)
			}
			stats.TotalSamples += len(samples)
			stats.DroppedSamples += len(samples) - len(repl)

		case record.HistogramSamples:
			histogramSamples, err = dec.HistogramSamples(rec, histogramSamples)
			if err != nil {
				return nil, fmt.Errorf("decode histogram samples: %w", err)
			}
			// Drop irrelevant histogramSamples in place.
			repl := histogramSamples[:0]
			for _, h := range histogramSamples {
				if h.T >= mint {
					repl = append(repl, h)
				}
			}
			if len(repl) > 0 {
				buf, _ = enc.HistogramSamples(repl, buf)
			}
			stats.TotalSamples += len(histogramSamples)
			stats.DroppedSamples += len(histogramSamples) - len(repl)
		case record.CustomBucketsHistogramSamples:
			histogramSamples, err = dec.HistogramSamples(rec, histogramSamples)
			if err != nil {
				return nil, fmt.Errorf("decode histogram samples: %w", err)
			}
			// Drop irrelevant histogramSamples in place.
			repl := histogramSamples[:0]
			for _, h := range histogramSamples {
				if h.T >= mint {
					repl = append(repl, h)
				}
			}
			if len(repl) > 0 {
				buf = enc.CustomBucketsHistogramSamples(repl, buf)
			}
			stats.TotalSamples += len(histogramSamples)
			stats.DroppedSamples += len(histogramSamples) - len(repl)
		case record.FloatHistogramSamples:
			floatHistogramSamples, err = dec.FloatHistogramSamples(rec, floatHistogramSamples)
			if err != nil {
				return nil, fmt.Errorf("decode float histogram samples: %w", err)
			}
			// Drop irrelevant floatHistogramSamples in place.
			repl := floatHistogramSamples[:0]
			for _, fh := range floatHistogramSamples {
				if fh.T >= mint {
					repl = append(repl, fh)
				}
			}
			if len(repl) > 0 {
				buf, _ = enc.FloatHistogramSamples(repl, buf)
			}
			stats.TotalSamples += len(floatHistogramSamples)
			stats.DroppedSamples += len(floatHistogramSamples) - len(repl)
		case record.CustomBucketsFloatHistogramSamples:
			floatHistogramSamples, err = dec.FloatHistogramSamples(rec, floatHistogramSamples)
			if err != nil {
				return nil, fmt.Errorf("decode float histogram samples: %w", err)
			}
			// Drop irrelevant floatHistogramSamples in place.
			repl := floatHistogramSamples[:0]
			for _, fh := range floatHistogramSamples {
				if fh.T >= mint {
					repl = append(repl, fh)
				}
			}
			if len(repl) > 0 {
				buf = enc.CustomBucketsFloatHistogramSamples(repl, buf)
			}
			stats.TotalSamples += len(floatHistogramSamples)
			stats.DroppedSamples += len(floatHistogramSamples) - len(repl)
		case record.Tombstones:
			tstones, err = dec.Tombstones(rec, tstones)
			if err != nil {
				return nil, fmt.Errorf("decode deletes: %w", err)
			}
			// Drop irrelevant tombstones in place.
			repl := tstones[:0]
			for _, s := range tstones {
				for _, iv := range s.Intervals {
					if iv.Maxt >= mint {
						repl = append(repl, s)
						break
					}
				}
			}
			if len(repl) > 0 {
				buf = enc.Tombstones(repl, buf)
			}
			stats.TotalTombstones += len(tstones)
			stats.DroppedTombstones += len(tstones) - len(repl)

		case record.Exemplars:
			exemplars, err = dec.Exemplars(rec, exemplars)
			if err != nil {
				return nil, fmt.Errorf("decode exemplars: %w", err)
			}
			// Drop irrelevant exemplars in place.
			repl := exemplars[:0]
			for _, e := range exemplars {
				if e.T >= mint {
					repl = append(repl, e)
				}
			}
			if len(repl) > 0 {
				buf = enc.Exemplars(repl, buf)
			}
			stats.TotalExemplars += len(exemplars)
			stats.DroppedExemplars += len(exemplars) - len(repl)
		case record.Metadata:
			metadata, err := dec.Metadata(rec, metadata)
			if err != nil {
				return nil, fmt.Errorf("decode metadata: %w", err)
			}
			// Only keep reference to the latest found metadata for each refID.
			repl := 0
			for _, m := range metadata {
				if keep(m.Ref) {
					if _, ok := latestMetadataMap[m.Ref]; !ok {
						repl++
					}
					latestMetadataMap[m.Ref] = m
				}
			}
			stats.TotalMetadata += len(metadata)
			stats.DroppedMetadata += len(metadata) - repl
		default:
			// Unknown record type, probably from a future Prometheus version.
			continue
		}
		if len(buf[start:]) == 0 {
			continue // All contents discarded.
		}
		recs = append(recs, buf[start:])

		// Flush records in 1 MB increments.
		if len(buf) > 1*1024*1024 {
			if err := cp.Log(recs...); err != nil {
				return nil, fmt.Errorf("flush records: %w", err)
			}
			buf, recs = buf[:0], recs[:0]
		}
	}
	// If we hit any corruption during checkpointing, repairing is not an option.
	// The head won't know which series records are lost.
	if r.Err() != nil {
		return nil, fmt.Errorf("read segments: %w", r.Err())
	}

	// Flush remaining records.
	if err := cp.Log(recs...); err != nil {
		return nil, fmt.Errorf("flush records: %w", err)
	}

	// Flush latest metadata records for each series.
	if len(latestMetadataMap) > 0 {
		latestMetadata := make([]record.RefMetadata, 0, len(latestMetadataMap))
		for _, m := range latestMetadataMap {
			latestMetadata = append(latestMetadata, m)
		}
		if err := cp.Log(enc.Metadata(latestMetadata, buf[:0])); err != nil {
			return nil, fmt.Errorf("flush metadata records: %w", err)
		}
	}

	if err := cp.Close(); err != nil {
		return nil, fmt.Errorf("close checkpoint: %w", err)
	}

	// Sync temporary directory before rename.
	df, err := fileutil.OpenDir(cpdirtmp)
	if err != nil {
		return nil, fmt.Errorf("open temporary checkpoint directory: %w", err)
	}
	if err := df.Sync(); err != nil {
		df.Close()
		return nil, fmt.Errorf("sync temporary checkpoint directory: %w", err)
	}
	if err = df.Close(); err != nil {
		return nil, fmt.Errorf("close temporary checkpoint directory: %w", err)
	}

	if err := fileutil.Replace(cpdirtmp, cpdir); err != nil {
		return nil, fmt.Errorf("rename checkpoint directory: %w", err)
	}

	return stats, nil
}

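// checkpointDir returns the checkpoint directory path for index i inside dir,
// e.g. "checkpoint.00000123".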
func checkpointDir(dir string, i int) string {
	return filepath.Join(dir, fmt.Sprintf(CheckpointPrefix+"%08d", i))
}

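// checkpointRef pairs a checkpoint directory name with its parsed index.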
type checkpointRef struct {
	name  string
	index int
}

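// listCheckpoints returns all checkpoint directories in dir, sorted by index in ascending order.
// An entry with the checkpoint prefix that is not a directory results in an error; entries whose
// suffix is not a valid integer are skipped.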
func listCheckpoints(dir string) (refs []checkpointRef, err error) {
	files, err := os.ReadDir(dir)
	if err != nil {
		return nil, err
	}

	for i := range files {
		fi := files[i]
		if !strings.HasPrefix(fi.Name(), CheckpointPrefix) {
			continue
		}
		if !fi.IsDir() {
			return nil, fmt.Errorf("checkpoint %s is not a directory", fi.Name())
		}
		idx, err := strconv.Atoi(fi.Name()[len(CheckpointPrefix):])
		if err != nil {
			continue
		}

		refs = append(refs, checkpointRef{name: fi.Name(), index: idx})
	}

	slices.SortFunc(refs, func(a, b checkpointRef) int {
		return a.index - b.index
	})

	return refs, nil
}