897 lines
		
	
	
		
			22 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			897 lines
		
	
	
		
			22 KiB
		
	
	
	
		
			Go
		
	
	
	
| // Copyright 2017 The Prometheus Authors
 | |
| // Licensed under the Apache License, Version 2.0 (the "License");
 | |
| // you may not use this file except in compliance with the License.
 | |
| // You may obtain a copy of the License at
 | |
| //
 | |
| // http://www.apache.org/licenses/LICENSE-2.0
 | |
| //
 | |
| // Unless required by applicable law or agreed to in writing, software
 | |
| // distributed under the License is distributed on an "AS IS" BASIS,
 | |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| // See the License for the specific language governing permissions and
 | |
| // limitations under the License.
 | |
| 
 | |
| package main
 | |
| 
 | |
| import (
 | |
| 	"bufio"
 | |
| 	"bytes"
 | |
| 	"context"
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"io"
 | |
| 	"log/slog"
 | |
| 	"os"
 | |
| 	"path/filepath"
 | |
| 	"runtime"
 | |
| 	"runtime/pprof"
 | |
| 	"slices"
 | |
| 	"strconv"
 | |
| 	"strings"
 | |
| 	"sync"
 | |
| 	"text/tabwriter"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/alecthomas/units"
 | |
| 	"github.com/prometheus/common/promslog"
 | |
| 	"go.uber.org/atomic"
 | |
| 
 | |
| 	"github.com/prometheus/prometheus/model/labels"
 | |
| 	"github.com/prometheus/prometheus/promql/parser"
 | |
| 	"github.com/prometheus/prometheus/storage"
 | |
| 	"github.com/prometheus/prometheus/tsdb"
 | |
| 	"github.com/prometheus/prometheus/tsdb/chunkenc"
 | |
| 	"github.com/prometheus/prometheus/tsdb/chunks"
 | |
| 	tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
 | |
| 	"github.com/prometheus/prometheus/tsdb/fileutil"
 | |
| 	"github.com/prometheus/prometheus/tsdb/index"
 | |
| )
 | |
| 
 | |
// timeDelta is the timestamp step between two consecutive synthetic scrapes.
// ingestScrapesShard advances the sample timestamp by it once per scrape and
// ingestScrapes uses it to offset each round of scrapes.
// NOTE(review): presumably milliseconds (i.e. a 30s scrape interval) — confirm.
const timeDelta = 30_000
 | |
| 
 | |
| type writeBenchmark struct {
 | |
| 	outPath     string
 | |
| 	samplesFile string
 | |
| 	cleanup     bool
 | |
| 	numMetrics  int
 | |
| 
 | |
| 	storage *tsdb.DB
 | |
| 
 | |
| 	cpuprof   *os.File
 | |
| 	memprof   *os.File
 | |
| 	blockprof *os.File
 | |
| 	mtxprof   *os.File
 | |
| 	logger    *slog.Logger
 | |
| }
 | |
| 
 | |
| func benchmarkWrite(outPath, samplesFile string, numMetrics, numScrapes int) error {
 | |
| 	b := &writeBenchmark{
 | |
| 		outPath:     outPath,
 | |
| 		samplesFile: samplesFile,
 | |
| 		numMetrics:  numMetrics,
 | |
| 		logger:      promslog.New(&promslog.Config{}),
 | |
| 	}
 | |
| 	if b.outPath == "" {
 | |
| 		dir, err := os.MkdirTemp("", "tsdb_bench")
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		b.outPath = dir
 | |
| 		b.cleanup = true
 | |
| 	}
 | |
| 	if err := os.RemoveAll(b.outPath); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	if err := os.MkdirAll(b.outPath, 0o777); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	dir := filepath.Join(b.outPath, "storage")
 | |
| 
 | |
| 	st, err := tsdb.Open(dir, b.logger, nil, &tsdb.Options{
 | |
| 		RetentionDuration: int64(15 * 24 * time.Hour / time.Millisecond),
 | |
| 		MinBlockDuration:  int64(2 * time.Hour / time.Millisecond),
 | |
| 	}, tsdb.NewDBStats())
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	st.DisableCompactions()
 | |
| 	b.storage = st
 | |
| 
 | |
| 	var lbs []labels.Labels
 | |
| 
 | |
| 	if _, err = measureTime("readData", func() error {
 | |
| 		f, err := os.Open(b.samplesFile)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		defer f.Close()
 | |
| 
 | |
| 		lbs, err = readPrometheusLabels(f, b.numMetrics)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		return nil
 | |
| 	}); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	var total uint64
 | |
| 
 | |
| 	dur, err := measureTime("ingestScrapes", func() error {
 | |
| 		if err := b.startProfiling(); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		total, err = b.ingestScrapes(lbs, numScrapes)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		return nil
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	fmt.Println(" > total samples:", total)
 | |
| 	fmt.Println(" > samples/sec:", float64(total)/dur.Seconds())
 | |
| 
 | |
| 	if _, err = measureTime("stopStorage", func() error {
 | |
| 		if err := b.storage.Close(); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		return b.stopProfiling()
 | |
| 	}); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (b *writeBenchmark) ingestScrapes(lbls []labels.Labels, scrapeCount int) (uint64, error) {
 | |
| 	var total atomic.Uint64
 | |
| 
 | |
| 	for i := 0; i < scrapeCount; i += 100 {
 | |
| 		var wg sync.WaitGroup
 | |
| 		lbls := lbls
 | |
| 		for len(lbls) > 0 {
 | |
| 			l := 1000
 | |
| 			if len(lbls) < 1000 {
 | |
| 				l = len(lbls)
 | |
| 			}
 | |
| 			batch := lbls[:l]
 | |
| 			lbls = lbls[l:]
 | |
| 
 | |
| 			wg.Add(1)
 | |
| 			go func() {
 | |
| 				defer wg.Done()
 | |
| 
 | |
| 				n, err := b.ingestScrapesShard(batch, 100, int64(timeDelta*i))
 | |
| 				if err != nil {
 | |
| 					// exitWithError(err)
 | |
| 					fmt.Println(" err", err)
 | |
| 				}
 | |
| 				total.Add(n)
 | |
| 			}()
 | |
| 		}
 | |
| 		wg.Wait()
 | |
| 	}
 | |
| 	fmt.Println("ingestion completed")
 | |
| 
 | |
| 	return total.Load(), nil
 | |
| }
 | |
| 
 | |
| func (b *writeBenchmark) ingestScrapesShard(lbls []labels.Labels, scrapeCount int, baset int64) (uint64, error) {
 | |
| 	ts := baset
 | |
| 
 | |
| 	type sample struct {
 | |
| 		labels labels.Labels
 | |
| 		value  int64
 | |
| 		ref    *storage.SeriesRef
 | |
| 	}
 | |
| 
 | |
| 	scrape := make([]*sample, 0, len(lbls))
 | |
| 
 | |
| 	for _, m := range lbls {
 | |
| 		scrape = append(scrape, &sample{
 | |
| 			labels: m,
 | |
| 			value:  123456789,
 | |
| 		})
 | |
| 	}
 | |
| 	total := uint64(0)
 | |
| 
 | |
| 	for i := 0; i < scrapeCount; i++ {
 | |
| 		app := b.storage.Appender(context.TODO())
 | |
| 		ts += timeDelta
 | |
| 
 | |
| 		for _, s := range scrape {
 | |
| 			s.value += 1000
 | |
| 
 | |
| 			var ref storage.SeriesRef
 | |
| 			if s.ref != nil {
 | |
| 				ref = *s.ref
 | |
| 			}
 | |
| 
 | |
| 			ref, err := app.Append(ref, s.labels, ts, float64(s.value))
 | |
| 			if err != nil {
 | |
| 				panic(err)
 | |
| 			}
 | |
| 
 | |
| 			if s.ref == nil {
 | |
| 				s.ref = &ref
 | |
| 			}
 | |
| 			total++
 | |
| 		}
 | |
| 		if err := app.Commit(); err != nil {
 | |
| 			return total, err
 | |
| 		}
 | |
| 	}
 | |
| 	return total, nil
 | |
| }
 | |
| 
 | |
| func (b *writeBenchmark) startProfiling() error {
 | |
| 	var err error
 | |
| 
 | |
| 	// Start CPU profiling.
 | |
| 	b.cpuprof, err = os.Create(filepath.Join(b.outPath, "cpu.prof"))
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("bench: could not create cpu profile: %w", err)
 | |
| 	}
 | |
| 	if err := pprof.StartCPUProfile(b.cpuprof); err != nil {
 | |
| 		return fmt.Errorf("bench: could not start CPU profile: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	// Start memory profiling.
 | |
| 	b.memprof, err = os.Create(filepath.Join(b.outPath, "mem.prof"))
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("bench: could not create memory profile: %w", err)
 | |
| 	}
 | |
| 	runtime.MemProfileRate = 64 * 1024
 | |
| 
 | |
| 	// Start fatal profiling.
 | |
| 	b.blockprof, err = os.Create(filepath.Join(b.outPath, "block.prof"))
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("bench: could not create block profile: %w", err)
 | |
| 	}
 | |
| 	runtime.SetBlockProfileRate(20)
 | |
| 
 | |
| 	b.mtxprof, err = os.Create(filepath.Join(b.outPath, "mutex.prof"))
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("bench: could not create mutex profile: %w", err)
 | |
| 	}
 | |
| 	runtime.SetMutexProfileFraction(20)
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (b *writeBenchmark) stopProfiling() error {
 | |
| 	if b.cpuprof != nil {
 | |
| 		pprof.StopCPUProfile()
 | |
| 		b.cpuprof.Close()
 | |
| 		b.cpuprof = nil
 | |
| 	}
 | |
| 	if b.memprof != nil {
 | |
| 		if err := pprof.Lookup("heap").WriteTo(b.memprof, 0); err != nil {
 | |
| 			return fmt.Errorf("error writing mem profile: %w", err)
 | |
| 		}
 | |
| 		b.memprof.Close()
 | |
| 		b.memprof = nil
 | |
| 	}
 | |
| 	if b.blockprof != nil {
 | |
| 		if err := pprof.Lookup("block").WriteTo(b.blockprof, 0); err != nil {
 | |
| 			return fmt.Errorf("error writing block profile: %w", err)
 | |
| 		}
 | |
| 		b.blockprof.Close()
 | |
| 		b.blockprof = nil
 | |
| 		runtime.SetBlockProfileRate(0)
 | |
| 	}
 | |
| 	if b.mtxprof != nil {
 | |
| 		if err := pprof.Lookup("mutex").WriteTo(b.mtxprof, 0); err != nil {
 | |
| 			return fmt.Errorf("error writing mutex profile: %w", err)
 | |
| 		}
 | |
| 		b.mtxprof.Close()
 | |
| 		b.mtxprof = nil
 | |
| 		runtime.SetMutexProfileFraction(0)
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
// measureTime runs f, printing a start line before and a completion line with
// the elapsed time after it, and returns that elapsed time.
//
// The duration is measured once, so the printed and the returned value are
// identical. If f fails, no completion line is printed and the error is
// returned with a zero duration.
func measureTime(stage string, f func() error) (time.Duration, error) {
	fmt.Printf(">> start stage=%s\n", stage)
	start := time.Now()
	if err := f(); err != nil {
		return 0, err
	}
	elapsed := time.Since(start)

	fmt.Printf(">> completed stage=%s duration=%s\n", stage, elapsed)
	return elapsed, nil
}
 | |
| 
 | |
| func readPrometheusLabels(r io.Reader, n int) ([]labels.Labels, error) {
 | |
| 	scanner := bufio.NewScanner(r)
 | |
| 
 | |
| 	var mets []labels.Labels
 | |
| 	hashes := map[uint64]struct{}{}
 | |
| 	i := 0
 | |
| 
 | |
| 	for scanner.Scan() && i < n {
 | |
| 		r := strings.NewReplacer("\"", "", "{", "", "}", "")
 | |
| 		s := r.Replace(scanner.Text())
 | |
| 
 | |
| 		labelChunks := strings.Split(s, ",")
 | |
| 		m := make([]labels.Label, 0, len(labelChunks))
 | |
| 		for _, labelChunk := range labelChunks {
 | |
| 			split := strings.Split(labelChunk, ":")
 | |
| 			m = append(m, labels.Label{Name: split[0], Value: split[1]})
 | |
| 		}
 | |
| 		ml := labels.New(m...) // This sorts by name - order of the k/v labels matters, don't assume we'll always receive them already sorted.
 | |
| 		h := ml.Hash()
 | |
| 		if _, ok := hashes[h]; ok {
 | |
| 			continue
 | |
| 		}
 | |
| 		mets = append(mets, ml)
 | |
| 		hashes[h] = struct{}{}
 | |
| 		i++
 | |
| 	}
 | |
| 	return mets, nil
 | |
| }
 | |
| 
 | |
| func listBlocks(path string, humanReadable bool) error {
 | |
| 	db, err := tsdb.OpenDBReadOnly(path, "", nil)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	defer func() {
 | |
| 		err = tsdb_errors.NewMulti(err, db.Close()).Err()
 | |
| 	}()
 | |
| 	blocks, err := db.Blocks()
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	printBlocks(blocks, true, humanReadable)
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func printBlocks(blocks []tsdb.BlockReader, writeHeader, humanReadable bool) {
 | |
| 	tw := tabwriter.NewWriter(os.Stdout, 13, 0, 2, ' ', 0)
 | |
| 	defer tw.Flush()
 | |
| 
 | |
| 	if writeHeader {
 | |
| 		fmt.Fprintln(tw, "BLOCK ULID\tMIN TIME\tMAX TIME\tDURATION\tNUM SAMPLES\tNUM CHUNKS\tNUM SERIES\tSIZE")
 | |
| 	}
 | |
| 
 | |
| 	for _, b := range blocks {
 | |
| 		meta := b.Meta()
 | |
| 
 | |
| 		fmt.Fprintf(tw,
 | |
| 			"%v\t%v\t%v\t%v\t%v\t%v\t%v\t%v\n",
 | |
| 			meta.ULID,
 | |
| 			getFormattedTime(meta.MinTime, humanReadable),
 | |
| 			getFormattedTime(meta.MaxTime, humanReadable),
 | |
| 			time.Duration(meta.MaxTime-meta.MinTime)*time.Millisecond,
 | |
| 			meta.Stats.NumSamples,
 | |
| 			meta.Stats.NumChunks,
 | |
| 			meta.Stats.NumSeries,
 | |
| 			getFormattedBytes(b.Size(), humanReadable),
 | |
| 		)
 | |
| 	}
 | |
| }
 | |
| 
 | |
// getFormattedTime renders a millisecond timestamp. With humanReadable it is
// truncated to whole seconds and formatted as a UTC time; otherwise the raw
// integer is returned in base 10.
func getFormattedTime(timestamp int64, humanReadable bool) string {
	if !humanReadable {
		return strconv.FormatInt(timestamp, 10)
	}
	seconds := timestamp / 1000
	return time.Unix(seconds, 0).UTC().String()
}
 | |
| 
 | |
| func getFormattedBytes(bytes int64, humanReadable bool) string {
 | |
| 	if humanReadable {
 | |
| 		return units.Base2Bytes(bytes).String()
 | |
| 	}
 | |
| 	return strconv.FormatInt(bytes, 10)
 | |
| }
 | |
| 
 | |
| func openBlock(path, blockID string) (*tsdb.DBReadOnly, tsdb.BlockReader, error) {
 | |
| 	db, err := tsdb.OpenDBReadOnly(path, "", nil)
 | |
| 	if err != nil {
 | |
| 		return nil, nil, err
 | |
| 	}
 | |
| 
 | |
| 	if blockID == "" {
 | |
| 		blockID, err = db.LastBlockID()
 | |
| 		if err != nil {
 | |
| 			return nil, nil, err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	b, err := db.Block(blockID, tsdb.DefaultPostingsDecoderFactory)
 | |
| 	if err != nil {
 | |
| 		return nil, nil, err
 | |
| 	}
 | |
| 
 | |
| 	return db, b, nil
 | |
| }
 | |
| 
 | |
| func analyzeBlock(ctx context.Context, path, blockID string, limit int, runExtended bool, matchers string) error {
 | |
| 	var (
 | |
| 		selectors []*labels.Matcher
 | |
| 		err       error
 | |
| 	)
 | |
| 	if len(matchers) > 0 {
 | |
| 		selectors, err = parser.ParseMetricSelector(matchers)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 	db, block, err := openBlock(path, blockID)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	defer func() {
 | |
| 		err = tsdb_errors.NewMulti(err, db.Close()).Err()
 | |
| 	}()
 | |
| 
 | |
| 	meta := block.Meta()
 | |
| 	fmt.Printf("Block ID: %s\n", meta.ULID)
 | |
| 	// Presume 1ms resolution that Prometheus uses.
 | |
| 	fmt.Printf("Duration: %s\n", (time.Duration(meta.MaxTime-meta.MinTime) * 1e6).String())
 | |
| 	fmt.Printf("Total Series: %d\n", meta.Stats.NumSeries)
 | |
| 	if len(matchers) > 0 {
 | |
| 		fmt.Printf("Matcher: %s\n", matchers)
 | |
| 	}
 | |
| 	ir, err := block.Index()
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	defer ir.Close()
 | |
| 
 | |
| 	allLabelNames, err := ir.LabelNames(ctx, selectors...)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	fmt.Printf("Label names: %d\n", len(allLabelNames))
 | |
| 
 | |
| 	type postingInfo struct {
 | |
| 		key    string
 | |
| 		metric uint64
 | |
| 	}
 | |
| 	postingInfos := []postingInfo{}
 | |
| 
 | |
| 	printInfo := func(postingInfos []postingInfo) {
 | |
| 		slices.SortFunc(postingInfos, func(a, b postingInfo) int {
 | |
| 			switch {
 | |
| 			case b.metric < a.metric:
 | |
| 				return -1
 | |
| 			case b.metric > a.metric:
 | |
| 				return 1
 | |
| 			default:
 | |
| 				return 0
 | |
| 			}
 | |
| 		})
 | |
| 
 | |
| 		for i, pc := range postingInfos {
 | |
| 			if i >= limit {
 | |
| 				break
 | |
| 			}
 | |
| 			fmt.Printf("%d %s\n", pc.metric, pc.key)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	labelsUncovered := map[string]uint64{}
 | |
| 	labelpairsUncovered := map[string]uint64{}
 | |
| 	labelpairsCount := map[string]uint64{}
 | |
| 	entries := 0
 | |
| 	var (
 | |
| 		p    index.Postings
 | |
| 		refs []storage.SeriesRef
 | |
| 	)
 | |
| 	if len(matchers) > 0 {
 | |
| 		p, err = tsdb.PostingsForMatchers(ctx, ir, selectors...)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		// Expand refs first and cache in memory.
 | |
| 		// So later we don't have to expand again.
 | |
| 		refs, err = index.ExpandPostings(p)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		fmt.Printf("Matched series: %d\n", len(refs))
 | |
| 		p = index.NewListPostings(refs)
 | |
| 	} else {
 | |
| 		p, err = ir.Postings(ctx, "", "") // The special all key.
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	chks := []chunks.Meta{}
 | |
| 	builder := labels.ScratchBuilder{}
 | |
| 	for p.Next() {
 | |
| 		if err = ir.Series(p.At(), &builder, &chks); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		// Amount of the block time range not covered by this series.
 | |
| 		uncovered := uint64(meta.MaxTime-meta.MinTime) - uint64(chks[len(chks)-1].MaxTime-chks[0].MinTime)
 | |
| 		builder.Labels().Range(func(lbl labels.Label) {
 | |
| 			key := lbl.Name + "=" + lbl.Value
 | |
| 			labelsUncovered[lbl.Name] += uncovered
 | |
| 			labelpairsUncovered[key] += uncovered
 | |
| 			labelpairsCount[key]++
 | |
| 			entries++
 | |
| 		})
 | |
| 	}
 | |
| 	if p.Err() != nil {
 | |
| 		return p.Err()
 | |
| 	}
 | |
| 	fmt.Printf("Postings (unique label pairs): %d\n", len(labelpairsUncovered))
 | |
| 	fmt.Printf("Postings entries (total label pairs): %d\n", entries)
 | |
| 
 | |
| 	postingInfos = postingInfos[:0]
 | |
| 	for k, m := range labelpairsUncovered {
 | |
| 		postingInfos = append(postingInfos, postingInfo{k, uint64(float64(m) / float64(meta.MaxTime-meta.MinTime))})
 | |
| 	}
 | |
| 
 | |
| 	fmt.Printf("\nLabel pairs most involved in churning:\n")
 | |
| 	printInfo(postingInfos)
 | |
| 
 | |
| 	postingInfos = postingInfos[:0]
 | |
| 	for k, m := range labelsUncovered {
 | |
| 		postingInfos = append(postingInfos, postingInfo{k, uint64(float64(m) / float64(meta.MaxTime-meta.MinTime))})
 | |
| 	}
 | |
| 
 | |
| 	fmt.Printf("\nLabel names most involved in churning:\n")
 | |
| 	printInfo(postingInfos)
 | |
| 
 | |
| 	postingInfos = postingInfos[:0]
 | |
| 	for k, m := range labelpairsCount {
 | |
| 		postingInfos = append(postingInfos, postingInfo{k, m})
 | |
| 	}
 | |
| 
 | |
| 	fmt.Printf("\nMost common label pairs:\n")
 | |
| 	printInfo(postingInfos)
 | |
| 
 | |
| 	postingInfos = postingInfos[:0]
 | |
| 	for _, n := range allLabelNames {
 | |
| 		values, err := ir.SortedLabelValues(ctx, n, selectors...)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		var cumulativeLength uint64
 | |
| 		for _, str := range values {
 | |
| 			cumulativeLength += uint64(len(str))
 | |
| 		}
 | |
| 		postingInfos = append(postingInfos, postingInfo{n, cumulativeLength})
 | |
| 	}
 | |
| 
 | |
| 	fmt.Printf("\nLabel names with highest cumulative label value length:\n")
 | |
| 	printInfo(postingInfos)
 | |
| 
 | |
| 	postingInfos = postingInfos[:0]
 | |
| 	for _, n := range allLabelNames {
 | |
| 		lv, err := ir.SortedLabelValues(ctx, n, selectors...)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		postingInfos = append(postingInfos, postingInfo{n, uint64(len(lv))})
 | |
| 	}
 | |
| 	fmt.Printf("\nHighest cardinality labels:\n")
 | |
| 	printInfo(postingInfos)
 | |
| 
 | |
| 	postingInfos = postingInfos[:0]
 | |
| 	lv, err := ir.SortedLabelValues(ctx, "__name__", selectors...)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	for _, n := range lv {
 | |
| 		postings, err := ir.Postings(ctx, "__name__", n)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		// Only intersect postings if matchers are specified.
 | |
| 		if len(matchers) > 0 {
 | |
| 			postings = index.Intersect(postings, index.NewListPostings(refs))
 | |
| 		}
 | |
| 		count := 0
 | |
| 		for postings.Next() {
 | |
| 			count++
 | |
| 		}
 | |
| 		if postings.Err() != nil {
 | |
| 			return postings.Err()
 | |
| 		}
 | |
| 		postingInfos = append(postingInfos, postingInfo{n, uint64(count)})
 | |
| 	}
 | |
| 	fmt.Printf("\nHighest cardinality metric names:\n")
 | |
| 	printInfo(postingInfos)
 | |
| 
 | |
| 	if runExtended {
 | |
| 		return analyzeCompaction(ctx, block, ir, selectors)
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func analyzeCompaction(ctx context.Context, block tsdb.BlockReader, indexr tsdb.IndexReader, matchers []*labels.Matcher) (err error) {
 | |
| 	var postingsr index.Postings
 | |
| 	if len(matchers) > 0 {
 | |
| 		postingsr, err = tsdb.PostingsForMatchers(ctx, indexr, matchers...)
 | |
| 	} else {
 | |
| 		n, v := index.AllPostingsKey()
 | |
| 		postingsr, err = indexr.Postings(ctx, n, v)
 | |
| 	}
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	chunkr, err := block.Chunks()
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	defer func() {
 | |
| 		err = tsdb_errors.NewMulti(err, chunkr.Close()).Err()
 | |
| 	}()
 | |
| 
 | |
| 	totalChunks := 0
 | |
| 	floatChunkSamplesCount := make([]int, 0)
 | |
| 	floatChunkSize := make([]int, 0)
 | |
| 	histogramChunkSamplesCount := make([]int, 0)
 | |
| 	histogramChunkSize := make([]int, 0)
 | |
| 	histogramChunkBucketsCount := make([]int, 0)
 | |
| 	var builder labels.ScratchBuilder
 | |
| 	for postingsr.Next() {
 | |
| 		var chks []chunks.Meta
 | |
| 		if err := indexr.Series(postingsr.At(), &builder, &chks); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		for _, chk := range chks {
 | |
| 			// Load the actual data of the chunk.
 | |
| 			chk, iterable, err := chunkr.ChunkOrIterable(chk)
 | |
| 			if err != nil {
 | |
| 				return err
 | |
| 			}
 | |
| 			// Chunks within blocks should not need to be re-written, so an
 | |
| 			// iterable is not expected to be returned from the chunk reader.
 | |
| 			if iterable != nil {
 | |
| 				return errors.New("ChunkOrIterable should not return an iterable when reading a block")
 | |
| 			}
 | |
| 			switch chk.Encoding() {
 | |
| 			case chunkenc.EncXOR:
 | |
| 				floatChunkSamplesCount = append(floatChunkSamplesCount, chk.NumSamples())
 | |
| 				floatChunkSize = append(floatChunkSize, len(chk.Bytes()))
 | |
| 			case chunkenc.EncFloatHistogram:
 | |
| 				histogramChunkSamplesCount = append(histogramChunkSamplesCount, chk.NumSamples())
 | |
| 				histogramChunkSize = append(histogramChunkSize, len(chk.Bytes()))
 | |
| 				fhchk, ok := chk.(*chunkenc.FloatHistogramChunk)
 | |
| 				if !ok {
 | |
| 					return errors.New("chunk is not FloatHistogramChunk")
 | |
| 				}
 | |
| 				it := fhchk.Iterator(nil)
 | |
| 				bucketCount := 0
 | |
| 				for it.Next() == chunkenc.ValFloatHistogram {
 | |
| 					_, f := it.AtFloatHistogram(nil)
 | |
| 					bucketCount += len(f.PositiveBuckets)
 | |
| 					bucketCount += len(f.NegativeBuckets)
 | |
| 				}
 | |
| 				histogramChunkBucketsCount = append(histogramChunkBucketsCount, bucketCount)
 | |
| 			case chunkenc.EncHistogram:
 | |
| 				histogramChunkSamplesCount = append(histogramChunkSamplesCount, chk.NumSamples())
 | |
| 				histogramChunkSize = append(histogramChunkSize, len(chk.Bytes()))
 | |
| 				hchk, ok := chk.(*chunkenc.HistogramChunk)
 | |
| 				if !ok {
 | |
| 					return errors.New("chunk is not HistogramChunk")
 | |
| 				}
 | |
| 				it := hchk.Iterator(nil)
 | |
| 				bucketCount := 0
 | |
| 				for it.Next() == chunkenc.ValHistogram {
 | |
| 					_, f := it.AtHistogram(nil)
 | |
| 					bucketCount += len(f.PositiveBuckets)
 | |
| 					bucketCount += len(f.NegativeBuckets)
 | |
| 				}
 | |
| 				histogramChunkBucketsCount = append(histogramChunkBucketsCount, bucketCount)
 | |
| 			}
 | |
| 			totalChunks++
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	fmt.Printf("\nCompaction analysis:\n")
 | |
| 	fmt.Println()
 | |
| 	displayHistogram("samples per float chunk", floatChunkSamplesCount, totalChunks)
 | |
| 
 | |
| 	displayHistogram("bytes per float chunk", floatChunkSize, totalChunks)
 | |
| 
 | |
| 	displayHistogram("samples per histogram chunk", histogramChunkSamplesCount, totalChunks)
 | |
| 
 | |
| 	displayHistogram("bytes per histogram chunk", histogramChunkSize, totalChunks)
 | |
| 
 | |
| 	displayHistogram("buckets per histogram chunk", histogramChunkBucketsCount, totalChunks)
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| type SeriesSetFormatter func(series storage.SeriesSet) error
 | |
| 
 | |
| func dumpSamples(ctx context.Context, dbDir, sandboxDirRoot string, mint, maxt int64, match []string, formatter SeriesSetFormatter) (err error) {
 | |
| 	db, err := tsdb.OpenDBReadOnly(dbDir, sandboxDirRoot, nil)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	defer func() {
 | |
| 		err = tsdb_errors.NewMulti(err, db.Close()).Err()
 | |
| 	}()
 | |
| 	q, err := db.Querier(mint, maxt)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	defer q.Close()
 | |
| 
 | |
| 	matcherSets, err := parser.ParseMetricSelectors(match)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	var ss storage.SeriesSet
 | |
| 	if len(matcherSets) > 1 {
 | |
| 		var sets []storage.SeriesSet
 | |
| 		for _, mset := range matcherSets {
 | |
| 			sets = append(sets, q.Select(ctx, true, nil, mset...))
 | |
| 		}
 | |
| 		ss = storage.NewMergeSeriesSet(sets, 0, storage.ChainedSeriesMerge)
 | |
| 	} else {
 | |
| 		ss = q.Select(ctx, false, nil, matcherSets[0]...)
 | |
| 	}
 | |
| 
 | |
| 	err = formatter(ss)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	if ws := ss.Warnings(); len(ws) > 0 {
 | |
| 		return tsdb_errors.NewMulti(ws.AsErrors()...).Err()
 | |
| 	}
 | |
| 
 | |
| 	if ss.Err() != nil {
 | |
| 		return ss.Err()
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func formatSeriesSet(ss storage.SeriesSet) error {
 | |
| 	for ss.Next() {
 | |
| 		series := ss.At()
 | |
| 		lbs := series.Labels()
 | |
| 		it := series.Iterator(nil)
 | |
| 		for it.Next() == chunkenc.ValFloat {
 | |
| 			ts, val := it.At()
 | |
| 			fmt.Printf("%s %g %d\n", lbs, val, ts)
 | |
| 		}
 | |
| 		for it.Next() == chunkenc.ValFloatHistogram {
 | |
| 			ts, fh := it.AtFloatHistogram(nil)
 | |
| 			fmt.Printf("%s %s %d\n", lbs, fh.String(), ts)
 | |
| 		}
 | |
| 		for it.Next() == chunkenc.ValHistogram {
 | |
| 			ts, h := it.AtHistogram(nil)
 | |
| 			fmt.Printf("%s %s %d\n", lbs, h.String(), ts)
 | |
| 		}
 | |
| 		if it.Err() != nil {
 | |
| 			return ss.Err()
 | |
| 		}
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // CondensedString is labels.Labels.String() without spaces after the commas.
 | |
| func CondensedString(ls labels.Labels) string {
 | |
| 	var b bytes.Buffer
 | |
| 
 | |
| 	b.WriteByte('{')
 | |
| 	i := 0
 | |
| 	ls.Range(func(l labels.Label) {
 | |
| 		if i > 0 {
 | |
| 			b.WriteByte(',')
 | |
| 		}
 | |
| 		b.WriteString(l.Name)
 | |
| 		b.WriteByte('=')
 | |
| 		b.WriteString(strconv.Quote(l.Value))
 | |
| 		i++
 | |
| 	})
 | |
| 	b.WriteByte('}')
 | |
| 	return b.String()
 | |
| }
 | |
| 
 | |
| func formatSeriesSetOpenMetrics(ss storage.SeriesSet) error {
 | |
| 	for ss.Next() {
 | |
| 		series := ss.At()
 | |
| 		lbs := series.Labels()
 | |
| 		metricName := lbs.Get(labels.MetricName)
 | |
| 		lbs = lbs.DropMetricName()
 | |
| 		it := series.Iterator(nil)
 | |
| 		for it.Next() == chunkenc.ValFloat {
 | |
| 			ts, val := it.At()
 | |
| 			fmt.Printf("%s%s %g %.3f\n", metricName, CondensedString(lbs), val, float64(ts)/1000)
 | |
| 		}
 | |
| 		if it.Err() != nil {
 | |
| 			return ss.Err()
 | |
| 		}
 | |
| 	}
 | |
| 	fmt.Println("# EOF")
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
// checkErr converts an error into a process exit code: 0 for a nil error,
// otherwise the error is printed to stderr and 1 is returned.
func checkErr(err error) int {
	if err == nil {
		return 0
	}
	fmt.Fprintln(os.Stderr, err)
	return 1
}
 | |
| 
 | |
| func backfillOpenMetrics(path, outputDir string, humanReadable, quiet bool, maxBlockDuration time.Duration, customLabels map[string]string) int {
 | |
| 	var buf []byte
 | |
| 	info, err := os.Stat(path)
 | |
| 	if err != nil {
 | |
| 		return checkErr(err)
 | |
| 	}
 | |
| 	if info.Mode()&(os.ModeNamedPipe|os.ModeCharDevice) != 0 {
 | |
| 		// Read the pipe chunks by chunks as it cannot be mmap-ed
 | |
| 		buf, err = os.ReadFile(path)
 | |
| 		if err != nil {
 | |
| 			return checkErr(err)
 | |
| 		}
 | |
| 	} else {
 | |
| 		inputFile, err := fileutil.OpenMmapFile(path)
 | |
| 		if err != nil {
 | |
| 			return checkErr(err)
 | |
| 		}
 | |
| 		defer inputFile.Close()
 | |
| 		buf = inputFile.Bytes()
 | |
| 	}
 | |
| 
 | |
| 	if err := os.MkdirAll(outputDir, 0o777); err != nil {
 | |
| 		return checkErr(fmt.Errorf("create output dir: %w", err))
 | |
| 	}
 | |
| 
 | |
| 	return checkErr(backfill(5000, buf, outputDir, humanReadable, quiet, maxBlockDuration, customLabels))
 | |
| }
 | |
| 
 | |
| func displayHistogram(dataType string, datas []int, total int) {
 | |
| 	if len(datas) == 0 {
 | |
| 		fmt.Printf("%s: N/A\n\n", dataType)
 | |
| 		return
 | |
| 	}
 | |
| 	slices.Sort(datas)
 | |
| 	start, end, step := generateBucket(datas[0], datas[len(datas)-1])
 | |
| 	sum := 0
 | |
| 	buckets := make([]int, (end-start)/step+1)
 | |
| 	maxCount := 0
 | |
| 	for _, c := range datas {
 | |
| 		sum += c
 | |
| 		buckets[(c-start)/step]++
 | |
| 		if buckets[(c-start)/step] > maxCount {
 | |
| 			maxCount = buckets[(c-start)/step]
 | |
| 		}
 | |
| 	}
 | |
| 	avg := sum / len(datas)
 | |
| 	fmt.Printf("%s (min/avg/max): %d/%d/%d\n", dataType, datas[0], avg, datas[len(datas)-1])
 | |
| 	maxLeftLen := strconv.Itoa(len(strconv.Itoa(end)))
 | |
| 	maxRightLen := strconv.Itoa(len(strconv.Itoa(end + step)))
 | |
| 	maxCountLen := strconv.Itoa(len(strconv.Itoa(maxCount)))
 | |
| 	for bucket, count := range buckets {
 | |
| 		percentage := 100.0 * count / total
 | |
| 		fmt.Printf("[%"+maxLeftLen+"d, %"+maxRightLen+"d]: %"+maxCountLen+"d %s\n", bucket*step+start+1, (bucket+1)*step+start, count, strings.Repeat("#", percentage))
 | |
| 	}
 | |
| 	fmt.Println()
 | |
| }
 | |
| 
 | |
// generateBucket derives histogram bucket boundaries for values between minVal
// and maxVal. step is a power of ten, at least 10, grown while it is smaller
// than a tenth of the value range and has not yet passed 10000. start is
// minVal rounded down to a multiple of step; end is maxVal rounded down to a
// multiple of step plus one further step.
func generateBucket(minVal, maxVal int) (start, end, step int) {
	tenth := (maxVal - minVal) / 10

	for step = 10; step < tenth && step <= 10000; step *= 10 {
	}

	start = minVal - minVal%step
	end = maxVal - maxVal%step + step
	return start, end, step
}
 |