362 lines
		
	
	
		
			7.9 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			362 lines
		
	
	
		
			7.9 KiB
		
	
	
	
		
			Go
		
	
	
	
| // Copyright 2017 The Prometheus Authors
 | |
| // Licensed under the Apache License, Version 2.0 (the "License");
 | |
| // you may not use this file except in compliance with the License.
 | |
| // You may obtain a copy of the License at
 | |
| //
 | |
| // http://www.apache.org/licenses/LICENSE-2.0
 | |
| //
 | |
| // Unless required by applicable law or agreed to in writing, software
 | |
| // distributed under the License is distributed on an "AS IS" BASIS,
 | |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| // See the License for the specific language governing permissions and
 | |
| // limitations under the License.
 | |
| 
 | |
| package tsdb
 | |
| 
 | |
| import (
 | |
| 	"bufio"
 | |
| 	"encoding/binary"
 | |
| 	"fmt"
 | |
| 	"hash"
 | |
| 	"io"
 | |
| 	"os"
 | |
| 
 | |
| 	"github.com/prometheus/tsdb/fileutil"
 | |
| 	"github.com/pkg/errors"
 | |
| 	"github.com/prometheus/tsdb/chunks"
 | |
| )
 | |
| 
 | |
| const (
 | |
| 	// MagicChunks is 4 bytes at the head of a series file.
 | |
| 	MagicChunks = 0x85BD40DD
 | |
| )
 | |
| 
 | |
| // ChunkMeta holds information about a chunk of data.
 | |
| type ChunkMeta struct {
 | |
| 	// Ref and Chunk hold either a reference that can be used to retrieve
 | |
| 	// chunk data or the data itself.
 | |
| 	// Generally, only one of them is set.
 | |
| 	Ref   uint64
 | |
| 	Chunk chunks.Chunk
 | |
| 
 | |
| 	MinTime, MaxTime int64 // time range the data covers
 | |
| }
 | |
| 
 | |
| // writeHash writes the chunk encoding and raw data into the provided hash.
 | |
| func (cm *ChunkMeta) writeHash(h hash.Hash) error {
 | |
| 	if _, err := h.Write([]byte{byte(cm.Chunk.Encoding())}); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	if _, err := h.Write(cm.Chunk.Bytes()); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // deletedIterator wraps an Iterator and makes sure any deleted metrics are not
 | |
| // returned.
 | |
| type deletedIterator struct {
 | |
| 	it chunks.Iterator
 | |
| 
 | |
| 	intervals Intervals
 | |
| }
 | |
| 
 | |
| func (it *deletedIterator) At() (int64, float64) {
 | |
| 	return it.it.At()
 | |
| }
 | |
| 
 | |
| func (it *deletedIterator) Next() bool {
 | |
| Outer:
 | |
| 	for it.it.Next() {
 | |
| 		ts, _ := it.it.At()
 | |
| 
 | |
| 		for _, tr := range it.intervals {
 | |
| 			if tr.inBounds(ts) {
 | |
| 				continue Outer
 | |
| 			}
 | |
| 
 | |
| 			if ts > tr.Maxt {
 | |
| 				it.intervals = it.intervals[1:]
 | |
| 				continue
 | |
| 			}
 | |
| 
 | |
| 			return true
 | |
| 		}
 | |
| 
 | |
| 		return true
 | |
| 	}
 | |
| 
 | |
| 	return false
 | |
| }
 | |
| 
 | |
| func (it *deletedIterator) Err() error {
 | |
| 	return it.it.Err()
 | |
| }
 | |
| 
 | |
| // ChunkWriter serializes a time block of chunked series data.
 | |
| type ChunkWriter interface {
 | |
| 	// WriteChunks writes several chunks. The Chunk field of the ChunkMetas
 | |
| 	// must be populated.
 | |
| 	// After returning successfully, the Ref fields in the ChunkMetas
 | |
| 	// are set and can be used to retrieve the chunks from the written data.
 | |
| 	WriteChunks(chunks ...ChunkMeta) error
 | |
| 
 | |
| 	// Close writes any required finalization and closes the resources
 | |
| 	// associated with the underlying writer.
 | |
| 	Close() error
 | |
| }
 | |
| 
 | |
| // chunkWriter implements the ChunkWriter interface for the standard
 | |
| // serialization format.
 | |
| type chunkWriter struct {
 | |
| 	dirFile *os.File
 | |
| 	files   []*os.File
 | |
| 	wbuf    *bufio.Writer
 | |
| 	n       int64
 | |
| 	crc32   hash.Hash
 | |
| 
 | |
| 	segmentSize int64
 | |
| }
 | |
| 
 | |
| const (
 | |
| 	defaultChunkSegmentSize = 512 * 1024 * 1024
 | |
| 
 | |
| 	chunksFormatV1 = 1
 | |
| )
 | |
| 
 | |
| func newChunkWriter(dir string) (*chunkWriter, error) {
 | |
| 	if err := os.MkdirAll(dir, 0777); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	dirFile, err := fileutil.OpenDir(dir)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	cw := &chunkWriter{
 | |
| 		dirFile:     dirFile,
 | |
| 		n:           0,
 | |
| 		crc32:       newCRC32(),
 | |
| 		segmentSize: defaultChunkSegmentSize,
 | |
| 	}
 | |
| 	return cw, nil
 | |
| }
 | |
| 
 | |
| func (w *chunkWriter) tail() *os.File {
 | |
| 	if len(w.files) == 0 {
 | |
| 		return nil
 | |
| 	}
 | |
| 	return w.files[len(w.files)-1]
 | |
| }
 | |
| 
 | |
| // finalizeTail writes all pending data to the current tail file,
 | |
| // truncates its size, and closes it.
 | |
| func (w *chunkWriter) finalizeTail() error {
 | |
| 	tf := w.tail()
 | |
| 	if tf == nil {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	if err := w.wbuf.Flush(); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	if err := fileutil.Fsync(tf); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	// As the file was pre-allocated, we truncate any superfluous zero bytes.
 | |
| 	off, err := tf.Seek(0, os.SEEK_CUR)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	if err := tf.Truncate(off); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	return tf.Close()
 | |
| }
 | |
| 
 | |
| func (w *chunkWriter) cut() error {
 | |
| 	// Sync current tail to disk and close.
 | |
| 	if err := w.finalizeTail(); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	p, _, err := nextSequenceFile(w.dirFile.Name())
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE, 0666)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	if err = fileutil.Preallocate(f, w.segmentSize, true); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	if err = w.dirFile.Sync(); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	// Write header metadata for new file.
 | |
| 
 | |
| 	metab := make([]byte, 8)
 | |
| 	binary.BigEndian.PutUint32(metab[:4], MagicChunks)
 | |
| 	metab[4] = chunksFormatV1
 | |
| 
 | |
| 	if _, err := f.Write(metab); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	w.files = append(w.files, f)
 | |
| 	if w.wbuf != nil {
 | |
| 		w.wbuf.Reset(f)
 | |
| 	} else {
 | |
| 		w.wbuf = bufio.NewWriterSize(f, 8*1024*1024)
 | |
| 	}
 | |
| 	w.n = 8
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (w *chunkWriter) write(b []byte) error {
 | |
| 	n, err := w.wbuf.Write(b)
 | |
| 	w.n += int64(n)
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| func (w *chunkWriter) WriteChunks(chks ...ChunkMeta) error {
 | |
| 	// Calculate maximum space we need and cut a new segment in case
 | |
| 	// we don't fit into the current one.
 | |
| 	maxLen := int64(binary.MaxVarintLen32) // The number of chunks.
 | |
| 	for _, c := range chks {
 | |
| 		maxLen += binary.MaxVarintLen32 + 1 // The number of bytes in the chunk and its encoding.
 | |
| 		maxLen += int64(len(c.Chunk.Bytes()))
 | |
| 	}
 | |
| 	newsz := w.n + maxLen
 | |
| 
 | |
| 	if w.wbuf == nil || w.n > w.segmentSize || newsz > w.segmentSize && maxLen <= w.segmentSize {
 | |
| 		if err := w.cut(); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	var (
 | |
| 		b   = [binary.MaxVarintLen32]byte{}
 | |
| 		seq = uint64(w.seq()) << 32
 | |
| 	)
 | |
| 	for i := range chks {
 | |
| 		chk := &chks[i]
 | |
| 
 | |
| 		chk.Ref = seq | uint64(w.n)
 | |
| 
 | |
| 		n := binary.PutUvarint(b[:], uint64(len(chk.Chunk.Bytes())))
 | |
| 
 | |
| 		if err := w.write(b[:n]); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		b[0] = byte(chk.Chunk.Encoding())
 | |
| 		if err := w.write(b[:1]); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		if err := w.write(chk.Chunk.Bytes()); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		w.crc32.Reset()
 | |
| 		if err := chk.writeHash(w.crc32); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		if err := w.write(w.crc32.Sum(b[:0])); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (w *chunkWriter) seq() int {
 | |
| 	return len(w.files) - 1
 | |
| }
 | |
| 
 | |
| func (w *chunkWriter) Close() error {
 | |
| 	return w.finalizeTail()
 | |
| }
 | |
| 
 | |
| // ChunkReader provides reading access of serialized time series data.
 | |
| type ChunkReader interface {
 | |
| 	// Chunk returns the series data chunk with the given reference.
 | |
| 	Chunk(ref uint64) (chunks.Chunk, error)
 | |
| 
 | |
| 	// Close releases all underlying resources of the reader.
 | |
| 	Close() error
 | |
| }
 | |
| 
 | |
| // chunkReader implements a SeriesReader for a serialized byte stream
 | |
| // of series data.
 | |
| type chunkReader struct {
 | |
| 	// The underlying bytes holding the encoded series data.
 | |
| 	bs [][]byte
 | |
| 
 | |
| 	// Closers for resources behind the byte slices.
 | |
| 	cs []io.Closer
 | |
| 
 | |
| 	pool chunks.Pool
 | |
| }
 | |
| 
 | |
| // newChunkReader returns a new chunkReader based on mmaped files found in dir.
 | |
| func newChunkReader(dir string, pool chunks.Pool) (*chunkReader, error) {
 | |
| 	files, err := sequenceFiles(dir)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	if pool == nil {
 | |
| 		pool = chunks.NewPool()
 | |
| 	}
 | |
| 	cr := chunkReader{pool: pool}
 | |
| 
 | |
| 	for _, fn := range files {
 | |
| 		f, err := openMmapFile(fn)
 | |
| 		if err != nil {
 | |
| 			return nil, errors.Wrapf(err, "mmap files")
 | |
| 		}
 | |
| 		cr.cs = append(cr.cs, f)
 | |
| 		cr.bs = append(cr.bs, f.b)
 | |
| 	}
 | |
| 
 | |
| 	for i, b := range cr.bs {
 | |
| 		if len(b) < 4 {
 | |
| 			return nil, errors.Wrapf(errInvalidSize, "validate magic in segment %d", i)
 | |
| 		}
 | |
| 		// Verify magic number.
 | |
| 		if m := binary.BigEndian.Uint32(b[:4]); m != MagicChunks {
 | |
| 			return nil, fmt.Errorf("invalid magic number %x", m)
 | |
| 		}
 | |
| 	}
 | |
| 	return &cr, nil
 | |
| }
 | |
| 
 | |
| func (s *chunkReader) Close() error {
 | |
| 	return closeAll(s.cs...)
 | |
| }
 | |
| 
 | |
| func (s *chunkReader) Chunk(ref uint64) (chunks.Chunk, error) {
 | |
| 	var (
 | |
| 		seq = int(ref >> 32)
 | |
| 		off = int((ref << 32) >> 32)
 | |
| 	)
 | |
| 	if seq >= len(s.bs) {
 | |
| 		return nil, errors.Errorf("reference sequence %d out of range", seq)
 | |
| 	}
 | |
| 	b := s.bs[seq]
 | |
| 
 | |
| 	if int(off) >= len(b) {
 | |
| 		return nil, errors.Errorf("offset %d beyond data size %d", off, len(b))
 | |
| 	}
 | |
| 	b = b[off:]
 | |
| 
 | |
| 	l, n := binary.Uvarint(b)
 | |
| 	if n < 0 {
 | |
| 		return nil, fmt.Errorf("reading chunk length failed")
 | |
| 	}
 | |
| 	b = b[n:]
 | |
| 
 | |
| 	return s.pool.Get(chunks.Encoding(b[0]), b[1:1+l])
 | |
| }
 |