Add more verbose error handling for closing, reduce locking
This commit introduces error returns in various places and is explicit
about closing persisted blocks.
{Index,Chunk,Tombstone}Readers are more consistent about their Close()
method. Whenever a reader is retrieved, the corresponding close method
must eventually be called. We use this to track pending readers against
persisted blocks.
Querier's against the DB no longer hold a read lock for their entire
lifecycle. This avoids long running queriers to starve new ones when we
have to acquire a write lock when reloading blocks.
			
			
This commit is contained in:
		
							parent
							
								
									963a270885
								
							
						
					
					
						commit
						fb9da52b11
					
				
							
								
								
									
										143
									
								
								block.go
								
								
								
								
							
							
						
						
									
										143
									
								
								block.go
								
								
								
								
							|  | @ -19,6 +19,7 @@ import ( | |||
| 	"io/ioutil" | ||||
| 	"os" | ||||
| 	"path/filepath" | ||||
| 	"sync" | ||||
| 
 | ||||
| 	"github.com/oklog/ulid" | ||||
| 	"github.com/pkg/errors" | ||||
|  | @ -26,33 +27,16 @@ import ( | |||
| 	"github.com/prometheus/tsdb/labels" | ||||
| ) | ||||
| 
 | ||||
| // DiskBlock represents a data block backed by on-disk data.
 | ||||
| type DiskBlock interface { | ||||
| 	BlockReader | ||||
| 
 | ||||
| 	// Directory where block data is stored.
 | ||||
| 	Dir() string | ||||
| 
 | ||||
| 	// Stats returns statistics about the block.
 | ||||
| 	Meta() BlockMeta | ||||
| 
 | ||||
| 	Delete(mint, maxt int64, m ...labels.Matcher) error | ||||
| 
 | ||||
| 	Snapshot(dir string) error | ||||
| 
 | ||||
| 	Close() error | ||||
| } | ||||
| 
 | ||||
| // BlockReader provides reading access to a data block.
 | ||||
| type BlockReader interface { | ||||
| 	// Index returns an IndexReader over the block's data.
 | ||||
| 	Index() IndexReader | ||||
| 	Index() (IndexReader, error) | ||||
| 
 | ||||
| 	// Chunks returns a ChunkReader over the block's data.
 | ||||
| 	Chunks() ChunkReader | ||||
| 	Chunks() (ChunkReader, error) | ||||
| 
 | ||||
| 	// Tombstones returns a TombstoneReader over the block's deleted data.
 | ||||
| 	Tombstones() TombstoneReader | ||||
| 	Tombstones() (TombstoneReader, error) | ||||
| } | ||||
| 
 | ||||
| // Appendable defines an entity to which data can be appended.
 | ||||
|  | @ -149,7 +133,12 @@ func writeMetaFile(dir string, meta *BlockMeta) error { | |||
| 	return renameFile(tmp, path) | ||||
| } | ||||
| 
 | ||||
| type persistedBlock struct { | ||||
| // Block represents a directory of time series data covering a continous time range.
 | ||||
| type Block struct { | ||||
| 	mtx            sync.RWMutex | ||||
| 	closing        bool | ||||
| 	pendingReaders sync.WaitGroup | ||||
| 
 | ||||
| 	dir  string | ||||
| 	meta BlockMeta | ||||
| 
 | ||||
|  | @ -159,7 +148,9 @@ type persistedBlock struct { | |||
| 	tombstones tombstoneReader | ||||
| } | ||||
| 
 | ||||
| func newPersistedBlock(dir string, pool chunks.Pool) (*persistedBlock, error) { | ||||
| // OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
 | ||||
| // to instantiate chunk structs.
 | ||||
| func OpenBlock(dir string, pool chunks.Pool) (*Block, error) { | ||||
| 	meta, err := readMetaFile(dir) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
|  | @ -179,7 +170,7 @@ func newPersistedBlock(dir string, pool chunks.Pool) (*persistedBlock, error) { | |||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	pb := &persistedBlock{ | ||||
| 	pb := &Block{ | ||||
| 		dir:        dir, | ||||
| 		meta:       *meta, | ||||
| 		chunkr:     cr, | ||||
|  | @ -189,28 +180,110 @@ func newPersistedBlock(dir string, pool chunks.Pool) (*persistedBlock, error) { | |||
| 	return pb, nil | ||||
| } | ||||
| 
 | ||||
| func (pb *persistedBlock) Close() error { | ||||
| // Close closes the on-disk block. It blocks as long as there are readers reading from the block.
 | ||||
| func (pb *Block) Close() error { | ||||
| 	pb.mtx.Lock() | ||||
| 	pb.closing = true | ||||
| 	pb.mtx.Unlock() | ||||
| 
 | ||||
| 	pb.pendingReaders.Wait() | ||||
| 
 | ||||
| 	var merr MultiError | ||||
| 
 | ||||
| 	merr.Add(pb.chunkr.Close()) | ||||
| 	merr.Add(pb.indexr.Close()) | ||||
| 	merr.Add(pb.tombstones.Close()) | ||||
| 
 | ||||
| 	return merr.Err() | ||||
| } | ||||
| 
 | ||||
| func (pb *persistedBlock) String() string { | ||||
| func (pb *Block) String() string { | ||||
| 	return pb.meta.ULID.String() | ||||
| } | ||||
| 
 | ||||
| func (pb *persistedBlock) Dir() string         { return pb.dir } | ||||
| func (pb *persistedBlock) Index() IndexReader  { return pb.indexr } | ||||
| func (pb *persistedBlock) Chunks() ChunkReader { return pb.chunkr } | ||||
| func (pb *persistedBlock) Tombstones() TombstoneReader { | ||||
| 	return pb.tombstones | ||||
| } | ||||
| func (pb *persistedBlock) Meta() BlockMeta { return pb.meta } | ||||
| // Dir returns the directory of the block.
 | ||||
| func (pb *Block) Dir() string { return pb.dir } | ||||
| 
 | ||||
| // Meta returns meta information about the block.
 | ||||
| func (pb *Block) Meta() BlockMeta { return pb.meta } | ||||
| 
 | ||||
| // ErrClosing is returned when a block is in the process of being closed.
 | ||||
| var ErrClosing = errors.New("block is closing") | ||||
| 
 | ||||
| func (pb *Block) startRead() error { | ||||
| 	pb.mtx.RLock() | ||||
| 	defer pb.mtx.RUnlock() | ||||
| 
 | ||||
| 	if pb.closing { | ||||
| 		return ErrClosing | ||||
| 	} | ||||
| 	pb.pendingReaders.Add(1) | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| // Index returns a new IndexReader against the block data.
 | ||||
| func (pb *Block) Index() (IndexReader, error) { | ||||
| 	if err := pb.startRead(); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return blockIndexReader{IndexReader: pb.indexr, b: pb}, nil | ||||
| } | ||||
| 
 | ||||
| // Chunks returns a new ChunkReader against the block data.
 | ||||
| func (pb *Block) Chunks() (ChunkReader, error) { | ||||
| 	if err := pb.startRead(); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return blockChunkReader{ChunkReader: pb.chunkr, b: pb}, nil | ||||
| } | ||||
| 
 | ||||
| // Tombstones returns a new TombstoneReader against the block data.
 | ||||
| func (pb *Block) Tombstones() (TombstoneReader, error) { | ||||
| 	if err := pb.startRead(); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return blockTombstoneReader{TombstoneReader: pb.tombstones, b: pb}, nil | ||||
| } | ||||
| 
 | ||||
| type blockIndexReader struct { | ||||
| 	IndexReader | ||||
| 	b *Block | ||||
| } | ||||
| 
 | ||||
| func (r blockIndexReader) Close() error { | ||||
| 	r.b.pendingReaders.Done() | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| type blockTombstoneReader struct { | ||||
| 	TombstoneReader | ||||
| 	b *Block | ||||
| } | ||||
| 
 | ||||
| func (r blockTombstoneReader) Close() error { | ||||
| 	r.b.pendingReaders.Done() | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| type blockChunkReader struct { | ||||
| 	ChunkReader | ||||
| 	b *Block | ||||
| } | ||||
| 
 | ||||
| func (r blockChunkReader) Close() error { | ||||
| 	r.b.pendingReaders.Done() | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| // Delete matching series between mint and maxt in the block.
 | ||||
| func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error { | ||||
| 	pb.mtx.Lock() | ||||
| 	defer pb.mtx.Unlock() | ||||
| 
 | ||||
| 	if pb.closing { | ||||
| 		return ErrClosing | ||||
| 	} | ||||
| 
 | ||||
| func (pb *persistedBlock) Delete(mint, maxt int64, ms ...labels.Matcher) error { | ||||
| 	pr := newPostingsReader(pb.indexr) | ||||
| 	p, absent := pr.Select(ms...) | ||||
| 
 | ||||
|  | @ -262,7 +335,8 @@ Outer: | |||
| 	return writeMetaFile(pb.dir, &pb.meta) | ||||
| } | ||||
| 
 | ||||
| func (pb *persistedBlock) Snapshot(dir string) error { | ||||
| // Snapshot creates snapshot of the block into dir.
 | ||||
| func (pb *Block) Snapshot(dir string) error { | ||||
| 	blockDir := filepath.Join(dir, pb.meta.ULID.String()) | ||||
| 	if err := os.MkdirAll(blockDir, 0777); err != nil { | ||||
| 		return errors.Wrap(err, "create snapshot block dir") | ||||
|  | @ -311,7 +385,6 @@ func clampInterval(a, b, mint, maxt int64) (int64, int64) { | |||
| 	if b > maxt { | ||||
| 		b = maxt | ||||
| 	} | ||||
| 
 | ||||
| 	return a, b | ||||
| } | ||||
| 
 | ||||
|  |  | |||
|  | @ -21,9 +21,9 @@ import ( | |||
| 	"io" | ||||
| 	"os" | ||||
| 
 | ||||
| 	"github.com/prometheus/tsdb/fileutil" | ||||
| 	"github.com/pkg/errors" | ||||
| 	"github.com/prometheus/tsdb/chunks" | ||||
| 	"github.com/prometheus/tsdb/fileutil" | ||||
| ) | ||||
| 
 | ||||
| const ( | ||||
|  |  | |||
							
								
								
									
										32
									
								
								compact.go
								
								
								
								
							
							
						
						
									
										32
									
								
								compact.go
								
								
								
								
							|  | @ -14,6 +14,7 @@ | |||
| package tsdb | ||||
| 
 | ||||
| import ( | ||||
| 	"io" | ||||
| 	"math/rand" | ||||
| 	"os" | ||||
| 	"path/filepath" | ||||
|  | @ -299,7 +300,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs ...string) (err error) { | |||
| 	var metas []*BlockMeta | ||||
| 
 | ||||
| 	for _, d := range dirs { | ||||
| 		b, err := newPersistedBlock(d, c.chunkPool) | ||||
| 		b, err := OpenBlock(d, c.chunkPool) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
|  | @ -444,10 +445,30 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, | |||
| 	var ( | ||||
| 		set        compactionSet | ||||
| 		allSymbols = make(map[string]struct{}, 1<<16) | ||||
| 		closers    = []io.Closer{} | ||||
| 	) | ||||
| 	for i, b := range blocks { | ||||
| 	defer func() { closeAll(closers...) }() | ||||
| 
 | ||||
| 		symbols, err := b.Index().Symbols() | ||||
| 	for i, b := range blocks { | ||||
| 		indexr, err := b.Index() | ||||
| 		if err != nil { | ||||
| 			return errors.Wrapf(err, "open index reader for block %s", b) | ||||
| 		} | ||||
| 		closers = append(closers, indexr) | ||||
| 
 | ||||
| 		chunkr, err := b.Chunks() | ||||
| 		if err != nil { | ||||
| 			return errors.Wrapf(err, "open chunk reader for block %s", b) | ||||
| 		} | ||||
| 		closers = append(closers, chunkr) | ||||
| 
 | ||||
| 		tombsr, err := b.Tombstones() | ||||
| 		if err != nil { | ||||
| 			return errors.Wrapf(err, "open tombstone reader for block %s", b) | ||||
| 		} | ||||
| 		closers = append(closers, tombsr) | ||||
| 
 | ||||
| 		symbols, err := indexr.Symbols() | ||||
| 		if err != nil { | ||||
| 			return errors.Wrap(err, "read symbols") | ||||
| 		} | ||||
|  | @ -455,15 +476,13 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, | |||
| 			allSymbols[s] = struct{}{} | ||||
| 		} | ||||
| 
 | ||||
| 		indexr := b.Index() | ||||
| 
 | ||||
| 		all, err := indexr.Postings(allPostingsKey.Name, allPostingsKey.Value) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		all = indexr.SortedPostings(all) | ||||
| 
 | ||||
| 		s := newCompactionSeriesSet(indexr, b.Chunks(), b.Tombstones(), all) | ||||
| 		s := newCompactionSeriesSet(indexr, chunkr, tombsr, all) | ||||
| 
 | ||||
| 		if i == 0 { | ||||
| 			set = s | ||||
|  | @ -565,7 +584,6 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, | |||
| 			return errors.Wrap(err, "write postings") | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
|  |  | |||
							
								
								
									
										83
									
								
								db.go
								
								
								
								
							
							
						
						
									
										83
									
								
								db.go
								
								
								
								
							|  | @ -30,7 +30,6 @@ import ( | |||
| 
 | ||||
| 	"golang.org/x/sync/errgroup" | ||||
| 
 | ||||
| 	"github.com/prometheus/tsdb/fileutil" | ||||
| 	"github.com/go-kit/kit/log" | ||||
| 	"github.com/go-kit/kit/log/level" | ||||
| 	"github.com/nightlyone/lockfile" | ||||
|  | @ -38,6 +37,7 @@ import ( | |||
| 	"github.com/pkg/errors" | ||||
| 	"github.com/prometheus/client_golang/prometheus" | ||||
| 	"github.com/prometheus/tsdb/chunks" | ||||
| 	"github.com/prometheus/tsdb/fileutil" | ||||
| 	"github.com/prometheus/tsdb/labels" | ||||
| ) | ||||
| 
 | ||||
|  | @ -105,7 +105,7 @@ type DB struct { | |||
| 
 | ||||
| 	// Mutex for that must be held when modifying the general block layout.
 | ||||
| 	mtx    sync.RWMutex | ||||
| 	blocks []DiskBlock | ||||
| 	blocks []*Block | ||||
| 
 | ||||
| 	head *Head | ||||
| 
 | ||||
|  | @ -431,7 +431,7 @@ func retentionCutoff(dir string, mint int64) (bool, error) { | |||
| 	return changes, fileutil.Fsync(df) | ||||
| } | ||||
| 
 | ||||
| func (db *DB) getBlock(id ulid.ULID) (DiskBlock, bool) { | ||||
| func (db *DB) getBlock(id ulid.ULID) (*Block, bool) { | ||||
| 	for _, b := range db.blocks { | ||||
| 		if b.Meta().ULID == id { | ||||
| 			return b, true | ||||
|  | @ -456,7 +456,7 @@ func (db *DB) reload() (err error) { | |||
| 		return errors.Wrap(err, "find blocks") | ||||
| 	} | ||||
| 	var ( | ||||
| 		blocks []DiskBlock | ||||
| 		blocks []*Block | ||||
| 		exist  = map[ulid.ULID]struct{}{} | ||||
| 	) | ||||
| 
 | ||||
|  | @ -468,7 +468,7 @@ func (db *DB) reload() (err error) { | |||
| 
 | ||||
| 		b, ok := db.getBlock(meta.ULID) | ||||
| 		if !ok { | ||||
| 			b, err = newPersistedBlock(dir, db.chunkPool) | ||||
| 			b, err = OpenBlock(dir, db.chunkPool) | ||||
| 			if err != nil { | ||||
| 				return errors.Wrapf(err, "open block %s", dir) | ||||
| 			} | ||||
|  | @ -505,7 +505,7 @@ func (db *DB) reload() (err error) { | |||
| 	return errors.Wrap(db.head.Truncate(maxt), "head truncate failed") | ||||
| } | ||||
| 
 | ||||
| func validateBlockSequence(bs []DiskBlock) error { | ||||
| func validateBlockSequence(bs []*Block) error { | ||||
| 	if len(bs) == 0 { | ||||
| 		return nil | ||||
| 	} | ||||
|  | @ -521,13 +521,19 @@ func validateBlockSequence(bs []DiskBlock) error { | |||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (db *DB) Blocks() []DiskBlock { | ||||
| func (db *DB) String() string { | ||||
| 	return "HEAD" | ||||
| } | ||||
| 
 | ||||
| // Blocks returns the databases persisted blocks.
 | ||||
| func (db *DB) Blocks() []*Block { | ||||
| 	db.mtx.RLock() | ||||
| 	defer db.mtx.RUnlock() | ||||
| 
 | ||||
| 	return db.blocks | ||||
| } | ||||
| 
 | ||||
| // Head returns the databases's head.
 | ||||
| func (db *DB) Head() *Head { | ||||
| 	return db.head | ||||
| } | ||||
|  | @ -587,41 +593,42 @@ func (db *DB) Snapshot(dir string) error { | |||
| 	db.cmtx.Lock() | ||||
| 	defer db.cmtx.Unlock() | ||||
| 
 | ||||
| 	db.mtx.RLock() | ||||
| 	defer db.mtx.RUnlock() | ||||
| 
 | ||||
| 	for _, b := range db.blocks { | ||||
| 	for _, b := range db.Blocks() { | ||||
| 		level.Info(db.logger).Log("msg", "snapshotting block", "block", b) | ||||
| 
 | ||||
| 		if err := b.Snapshot(dir); err != nil { | ||||
| 			return errors.Wrap(err, "error snapshotting headblock") | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime()) | ||||
| } | ||||
| 
 | ||||
| // Querier returns a new querier over the data partition for the given time range.
 | ||||
| // A goroutine must not handle more than one open Querier.
 | ||||
| func (db *DB) Querier(mint, maxt int64) Querier { | ||||
| 	db.mtx.RLock() | ||||
| func (db *DB) Querier(mint, maxt int64) (Querier, error) { | ||||
| 	var blocks []BlockReader | ||||
| 
 | ||||
| 	blocks := db.blocksForInterval(mint, maxt) | ||||
| 	for _, b := range db.Blocks() { | ||||
| 		m := b.Meta() | ||||
| 		if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) { | ||||
| 			blocks = append(blocks, b) | ||||
| 		} | ||||
| 	} | ||||
| 	if maxt >= db.head.MinTime() { | ||||
| 		blocks = append(blocks, db.head) | ||||
| 	} | ||||
| 
 | ||||
| 	sq := &querier{ | ||||
| 		blocks: make([]Querier, 0, len(blocks)), | ||||
| 		db:     db, | ||||
| 	} | ||||
| 	for _, b := range blocks { | ||||
| 		sq.blocks = append(sq.blocks, &blockQuerier{ | ||||
| 			mint:       mint, | ||||
| 			maxt:       maxt, | ||||
| 			index:      b.Index(), | ||||
| 			chunks:     b.Chunks(), | ||||
| 			tombstones: b.Tombstones(), | ||||
| 		}) | ||||
| 		q, err := NewBlockQuerier(b, mint, maxt) | ||||
| 		if err != nil { | ||||
| 			return nil, errors.Wrapf(err, "open querier for block %s", b) | ||||
| 		} | ||||
| 	return sq | ||||
| 		sq.blocks = append(sq.blocks, q) | ||||
| 	} | ||||
| 	return sq, nil | ||||
| } | ||||
| 
 | ||||
| func rangeForTimestamp(t int64, width int64) (mint, maxt int64) { | ||||
|  | @ -634,28 +641,22 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error { | |||
| 	db.cmtx.Lock() | ||||
| 	defer db.cmtx.Unlock() | ||||
| 
 | ||||
| 	db.mtx.Lock() | ||||
| 	defer db.mtx.Unlock() | ||||
| 
 | ||||
| 	var g errgroup.Group | ||||
| 
 | ||||
| 	for _, b := range db.blocks { | ||||
| 	for _, b := range db.Blocks() { | ||||
| 		m := b.Meta() | ||||
| 		if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) { | ||||
| 			g.Go(func(b DiskBlock) func() error { | ||||
| 			g.Go(func(b *Block) func() error { | ||||
| 				return func() error { return b.Delete(mint, maxt, ms...) } | ||||
| 			}(b)) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	g.Go(func() error { | ||||
| 		return db.head.Delete(mint, maxt, ms...) | ||||
| 	}) | ||||
| 
 | ||||
| 	if err := g.Wait(); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
|  | @ -668,24 +669,6 @@ func intervalContains(min, max, t int64) bool { | |||
| 	return t >= min && t <= max | ||||
| } | ||||
| 
 | ||||
| // blocksForInterval returns all blocks within the partition that may contain
 | ||||
| // data for the given time range.
 | ||||
| func (db *DB) blocksForInterval(mint, maxt int64) []BlockReader { | ||||
| 	var bs []BlockReader | ||||
| 
 | ||||
| 	for _, b := range db.blocks { | ||||
| 		m := b.Meta() | ||||
| 		if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) { | ||||
| 			bs = append(bs, b) | ||||
| 		} | ||||
| 	} | ||||
| 	if maxt >= db.head.MinTime() { | ||||
| 		bs = append(bs, db.head) | ||||
| 	} | ||||
| 
 | ||||
| 	return bs | ||||
| } | ||||
| 
 | ||||
| func isBlockDir(fi os.FileInfo) bool { | ||||
| 	if !fi.IsDir() { | ||||
| 		return false | ||||
|  |  | |||
							
								
								
									
										32
									
								
								db_test.go
								
								
								
								
							
							
						
						
									
										32
									
								
								db_test.go
								
								
								
								
							|  | @ -68,7 +68,8 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) { | |||
| 	_, err := app.Add(labels.FromStrings("foo", "bar"), 0, 0) | ||||
| 	require.NoError(t, err) | ||||
| 
 | ||||
| 	querier := db.Querier(0, 1) | ||||
| 	querier, err := db.Querier(0, 1) | ||||
| 	require.NoError(t, err) | ||||
| 	seriesSet := readSeriesSet(t, querier.Select(labels.NewEqualMatcher("foo", "bar"))) | ||||
| 
 | ||||
| 	require.Equal(t, seriesSet, map[string][]sample{}) | ||||
|  | @ -77,7 +78,8 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) { | |||
| 	err = app.Commit() | ||||
| 	require.NoError(t, err) | ||||
| 
 | ||||
| 	querier = db.Querier(0, 1) | ||||
| 	querier, err = db.Querier(0, 1) | ||||
| 	require.NoError(t, err) | ||||
| 	defer querier.Close() | ||||
| 
 | ||||
| 	seriesSet = readSeriesSet(t, querier.Select(labels.NewEqualMatcher("foo", "bar"))) | ||||
|  | @ -96,7 +98,8 @@ func TestDataNotAvailableAfterRollback(t *testing.T) { | |||
| 	err = app.Rollback() | ||||
| 	require.NoError(t, err) | ||||
| 
 | ||||
| 	querier := db.Querier(0, 1) | ||||
| 	querier, err := db.Querier(0, 1) | ||||
| 	require.NoError(t, err) | ||||
| 	defer querier.Close() | ||||
| 
 | ||||
| 	seriesSet := readSeriesSet(t, querier.Select(labels.NewEqualMatcher("foo", "bar"))) | ||||
|  | @ -140,7 +143,9 @@ func TestDBAppenderAddRef(t *testing.T) { | |||
| 
 | ||||
| 	require.NoError(t, app2.Commit()) | ||||
| 
 | ||||
| 	q := db.Querier(0, 200) | ||||
| 	q, err := db.Querier(0, 200) | ||||
| 	require.NoError(t, err) | ||||
| 
 | ||||
| 	res := readSeriesSet(t, q.Select(labels.NewEqualMatcher("a", "b"))) | ||||
| 
 | ||||
| 	require.Equal(t, map[string][]sample{ | ||||
|  | @ -190,7 +195,9 @@ Outer: | |||
| 		} | ||||
| 
 | ||||
| 		// Compare the result.
 | ||||
| 		q := db.Querier(0, numSamples) | ||||
| 		q, err := db.Querier(0, numSamples) | ||||
| 		require.NoError(t, err) | ||||
| 
 | ||||
| 		res := q.Select(labels.NewEqualMatcher("a", "b")) | ||||
| 
 | ||||
| 		expSamples := make([]sample, 0, len(c.remaint)) | ||||
|  | @ -284,7 +291,9 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) { | |||
| 	require.NoError(t, app.Commit()) | ||||
| 
 | ||||
| 	// Make sure the right value is stored.
 | ||||
| 	q := db.Querier(0, 10) | ||||
| 	q, err := db.Querier(0, 10) | ||||
| 	require.NoError(t, err) | ||||
| 
 | ||||
| 	ss := q.Select(labels.NewEqualMatcher("a", "b")) | ||||
| 	ssMap := readSeriesSet(t, ss) | ||||
| 
 | ||||
|  | @ -302,7 +311,9 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) { | |||
| 	require.NoError(t, err) | ||||
| 	require.NoError(t, app.Commit()) | ||||
| 
 | ||||
| 	q = db.Querier(0, 10) | ||||
| 	q, err = db.Querier(0, 10) | ||||
| 	require.NoError(t, err) | ||||
| 
 | ||||
| 	ss = q.Select(labels.NewEqualMatcher("a", "b")) | ||||
| 	ssMap = readSeriesSet(t, ss) | ||||
| 
 | ||||
|  | @ -336,7 +347,8 @@ func TestDB_Snapshot(t *testing.T) { | |||
| 	db, err = Open(snap, nil, nil, nil) | ||||
| 	require.NoError(t, err) | ||||
| 
 | ||||
| 	querier := db.Querier(mint, mint+1000) | ||||
| 	querier, err := db.Querier(mint, mint+1000) | ||||
| 	require.NoError(t, err) | ||||
| 	defer querier.Close() | ||||
| 
 | ||||
| 	// sum values
 | ||||
|  | @ -485,7 +497,9 @@ func TestDB_e2e(t *testing.T) { | |||
| 				} | ||||
| 			} | ||||
| 
 | ||||
| 			q := db.Querier(mint, maxt) | ||||
| 			q, err := db.Querier(mint, maxt) | ||||
| 			require.NoError(t, err) | ||||
| 
 | ||||
| 			ss := q.Select(qry.ms...) | ||||
| 
 | ||||
| 			result := map[string][]sample{} | ||||
|  |  | |||
							
								
								
									
										47
									
								
								head.go
								
								
								
								
							
							
						
						
									
										47
									
								
								head.go
								
								
								
								
							|  | @ -305,6 +305,23 @@ func (h *Head) initTime(t int64) (initialized bool) { | |||
| 	return true | ||||
| } | ||||
| 
 | ||||
| type rangeHead struct { | ||||
| 	head       *Head | ||||
| 	mint, maxt int64 | ||||
| } | ||||
| 
 | ||||
| func (h *rangeHead) Index() (IndexReader, error) { | ||||
| 	return h.head.indexRange(h.mint, h.maxt), nil | ||||
| } | ||||
| 
 | ||||
| func (h *rangeHead) Chunks() (ChunkReader, error) { | ||||
| 	return h.head.chunksRange(h.mint, h.maxt), nil | ||||
| } | ||||
| 
 | ||||
| func (h *rangeHead) Tombstones() (TombstoneReader, error) { | ||||
| 	return h.head.tombstones, nil | ||||
| } | ||||
| 
 | ||||
| // initAppender is a helper to initialize the time bounds of a the head
 | ||||
| // upon the first sample it receives.
 | ||||
| type initAppender struct { | ||||
|  | @ -611,13 +628,14 @@ func (h *Head) gc() { | |||
| 	h.symMtx.Unlock() | ||||
| } | ||||
| 
 | ||||
| func (h *Head) Tombstones() TombstoneReader { | ||||
| 	return h.tombstones | ||||
| // Tombstones returns a new reader over the head's tombstones
 | ||||
| func (h *Head) Tombstones() (TombstoneReader, error) { | ||||
| 	return h.tombstones, nil | ||||
| } | ||||
| 
 | ||||
| // Index returns an IndexReader against the block.
 | ||||
| func (h *Head) Index() IndexReader { | ||||
| 	return h.indexRange(math.MinInt64, math.MaxInt64) | ||||
| func (h *Head) Index() (IndexReader, error) { | ||||
| 	return h.indexRange(math.MinInt64, math.MaxInt64), nil | ||||
| } | ||||
| 
 | ||||
| func (h *Head) indexRange(mint, maxt int64) *headIndexReader { | ||||
|  | @ -628,8 +646,8 @@ func (h *Head) indexRange(mint, maxt int64) *headIndexReader { | |||
| } | ||||
| 
 | ||||
| // Chunks returns a ChunkReader against the block.
 | ||||
| func (h *Head) Chunks() ChunkReader { | ||||
| 	return h.chunksRange(math.MinInt64, math.MaxInt64) | ||||
| func (h *Head) Chunks() (ChunkReader, error) { | ||||
| 	return h.chunksRange(math.MinInt64, math.MaxInt64), nil | ||||
| } | ||||
| 
 | ||||
| func (h *Head) chunksRange(mint, maxt int64) *headChunkReader { | ||||
|  | @ -712,23 +730,6 @@ func (c *safeChunk) Iterator() chunks.Iterator { | |||
| // func (c *safeChunk) Bytes() []byte                      { panic("illegal") }
 | ||||
| // func (c *safeChunk) Encoding() chunks.Encoding          { panic("illegal") }
 | ||||
| 
 | ||||
| type rangeHead struct { | ||||
| 	head       *Head | ||||
| 	mint, maxt int64 | ||||
| } | ||||
| 
 | ||||
| func (h *rangeHead) Index() IndexReader { | ||||
| 	return h.head.indexRange(h.mint, h.maxt) | ||||
| } | ||||
| 
 | ||||
| func (h *rangeHead) Chunks() ChunkReader { | ||||
| 	return h.head.chunksRange(h.mint, h.maxt) | ||||
| } | ||||
| 
 | ||||
| func (h *rangeHead) Tombstones() TombstoneReader { | ||||
| 	return newEmptyTombstoneReader() | ||||
| } | ||||
| 
 | ||||
| type headIndexReader struct { | ||||
| 	head       *Head | ||||
| 	mint, maxt int64 | ||||
|  |  | |||
|  | @ -322,7 +322,8 @@ Outer: | |||
| 		} | ||||
| 
 | ||||
| 		// Compare the result.
 | ||||
| 		q := NewBlockQuerier(head.Index(), head.Chunks(), head.Tombstones(), head.MinTime(), head.MaxTime()) | ||||
| 		q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime()) | ||||
| 		require.NoError(t, err) | ||||
| 		res := q.Select(labels.NewEqualMatcher("a", "b")) | ||||
| 
 | ||||
| 		expSamples := make([]sample, 0, len(c.remaint)) | ||||
|  |  | |||
							
								
								
									
										4
									
								
								index.go
								
								
								
								
							
							
						
						
									
										4
									
								
								index.go
								
								
								
								
							|  | @ -19,14 +19,14 @@ import ( | |||
| 	"fmt" | ||||
| 	"hash" | ||||
| 	"io" | ||||
| 	"math" | ||||
| 	"os" | ||||
| 	"path/filepath" | ||||
| 	"sort" | ||||
| 	"strings" | ||||
| 	"math" | ||||
| 
 | ||||
| 	"github.com/prometheus/tsdb/fileutil" | ||||
| 	"github.com/pkg/errors" | ||||
| 	"github.com/prometheus/tsdb/fileutil" | ||||
| 	"github.com/prometheus/tsdb/labels" | ||||
| ) | ||||
| 
 | ||||
|  |  | |||
|  | @ -40,12 +40,15 @@ type mockIndex struct { | |||
| } | ||||
| 
 | ||||
| func newMockIndex() mockIndex { | ||||
| 	return mockIndex{ | ||||
| 	ix := mockIndex{ | ||||
| 		series:     make(map[uint64]series), | ||||
| 		labelIndex: make(map[string][]string), | ||||
| 		postings:   newMemPostings(), | ||||
| 		symbols:    make(map[string]struct{}), | ||||
| 	} | ||||
| 	ix.postings.ensureOrder() | ||||
| 
 | ||||
| 	return ix | ||||
| } | ||||
| 
 | ||||
| func (m mockIndex) Symbols() (map[string]struct{}, error) { | ||||
|  | @ -277,6 +280,7 @@ func TestPersistence_index_e2e(t *testing.T) { | |||
| 		postings = newMemPostings() | ||||
| 		values   = map[string]stringset{} | ||||
| 	) | ||||
| 	postings.ensureOrder() | ||||
| 
 | ||||
| 	mi := newMockIndex() | ||||
| 
 | ||||
|  |  | |||
							
								
								
									
										35
									
								
								querier.go
								
								
								
								
							
							
						
						
									
										35
									
								
								querier.go
								
								
								
								
							|  | @ -18,6 +18,7 @@ import ( | |||
| 	"sort" | ||||
| 	"strings" | ||||
| 
 | ||||
| 	"github.com/pkg/errors" | ||||
| 	"github.com/prometheus/tsdb/chunks" | ||||
| 	"github.com/prometheus/tsdb/labels" | ||||
| ) | ||||
|  | @ -50,7 +51,6 @@ type Series interface { | |||
| // querier aggregates querying results from time blocks within
 | ||||
| // a single partition.
 | ||||
| type querier struct { | ||||
| 	db     *DB | ||||
| 	blocks []Querier | ||||
| } | ||||
| 
 | ||||
|  | @ -103,21 +103,30 @@ func (q *querier) Close() error { | |||
| 	for _, bq := range q.blocks { | ||||
| 		merr.Add(bq.Close()) | ||||
| 	} | ||||
| 	q.db.mtx.RUnlock() | ||||
| 
 | ||||
| 	return merr.Err() | ||||
| } | ||||
| 
 | ||||
| // NewBlockQuerier returns a queries against the readers.
 | ||||
| func NewBlockQuerier(ir IndexReader, cr ChunkReader, tr TombstoneReader, mint, maxt int64) Querier { | ||||
| func NewBlockQuerier(b BlockReader, mint, maxt int64) (Querier, error) { | ||||
| 	indexr, err := b.Index() | ||||
| 	if err != nil { | ||||
| 		return nil, errors.Wrapf(err, "open index reader") | ||||
| 	} | ||||
| 	chunkr, err := b.Chunks() | ||||
| 	if err != nil { | ||||
| 		return nil, errors.Wrapf(err, "open chunk reader") | ||||
| 	} | ||||
| 	tombsr, err := b.Tombstones() | ||||
| 	if err != nil { | ||||
| 		return nil, errors.Wrapf(err, "open tombstone reader") | ||||
| 	} | ||||
| 	return &blockQuerier{ | ||||
| 		index:      ir, | ||||
| 		chunks:     cr, | ||||
| 		tombstones: tr, | ||||
| 
 | ||||
| 		mint:       mint, | ||||
| 		maxt:       maxt, | ||||
| 	} | ||||
| 		index:      indexr, | ||||
| 		chunks:     chunkr, | ||||
| 		tombstones: tombsr, | ||||
| 	}, nil | ||||
| } | ||||
| 
 | ||||
| // blockQuerier provides querying access to a single block database.
 | ||||
|  | @ -175,7 +184,13 @@ func (q *blockQuerier) LabelValuesFor(string, labels.Label) ([]string, error) { | |||
| } | ||||
| 
 | ||||
| func (q *blockQuerier) Close() error { | ||||
| 	return nil | ||||
| 	var merr MultiError | ||||
| 
 | ||||
| 	merr.Add(q.index.Close()) | ||||
| 	merr.Add(q.chunks.Close()) | ||||
| 	merr.Add(q.tombstones.Close()) | ||||
| 
 | ||||
| 	return merr.Err() | ||||
| } | ||||
| 
 | ||||
| // postingsReader is used to select matching postings from an IndexReader.
 | ||||
|  |  | |||
|  | @ -33,9 +33,11 @@ const ( | |||
| 	tombstoneFormatV1 = 1 | ||||
| ) | ||||
| 
 | ||||
| // TombstoneReader is the iterator over tombstones.
 | ||||
| // TombstoneReader gives access to tombstone intervals by series reference.
 | ||||
| type TombstoneReader interface { | ||||
| 	Get(ref uint64) Intervals | ||||
| 
 | ||||
| 	Close() error | ||||
| } | ||||
| 
 | ||||
| func writeTombstoneFile(dir string, tr tombstoneReader) error { | ||||
|  | @ -154,6 +156,10 @@ func (t tombstoneReader) add(ref uint64, itv Interval) { | |||
| 	t[ref] = t[ref].add(itv) | ||||
| } | ||||
| 
 | ||||
| func (tombstoneReader) Close() error { | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| // Interval represents a single time-interval.
 | ||||
| type Interval struct { | ||||
| 	Mint, Maxt int64 | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue