Correctly close querier on error, revendor tsdb
This commit is contained in:
		
							parent
							
								
									3abf54c660
								
							
						
					
					
						commit
						b09b90a940
					
				|  | @ -345,10 +345,16 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( | ||||||
| 	prepareTimer.Stop() | 	prepareTimer.Stop() | ||||||
| 	queryPrepareTime.Observe(prepareTimer.ElapsedTime().Seconds()) | 	queryPrepareTime.Observe(prepareTimer.ElapsedTime().Seconds()) | ||||||
| 
 | 
 | ||||||
|  | 	// XXX(fabxc): the querier returned by populateIterators might be instantiated
 | ||||||
|  | 	// we must not return without closing irrespective of the error.
 | ||||||
|  | 	// TODO: make this semantically saner.
 | ||||||
|  | 	if querier != nil { | ||||||
|  | 		defer querier.Close() | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
| 	defer querier.Close() |  | ||||||
| 
 | 
 | ||||||
| 	evalTimer := query.stats.GetTimer(stats.InnerEvalTime).Start() | 	evalTimer := query.stats.GetTimer(stats.InnerEvalTime).Start() | ||||||
| 	// Instant evaluation.
 | 	// Instant evaluation.
 | ||||||
|  |  | ||||||
|  | @ -369,10 +369,7 @@ func (db *DB) reloadBlocks() error { | ||||||
| 			} | 			} | ||||||
| 			heads = append(heads, b.(*headBlock)) | 			heads = append(heads, b.(*headBlock)) | ||||||
| 		} else { | 		} else { | ||||||
| 			if ok && meta.ULID != b.Meta().ULID { | 			if !ok || meta.ULID != b.Meta().ULID { | ||||||
| 				if err := b.Close(); err != nil { |  | ||||||
| 					return err |  | ||||||
| 				} |  | ||||||
| 				b, err = newPersistedBlock(dirs[i]) | 				b, err = newPersistedBlock(dirs[i]) | ||||||
| 				if err != nil { | 				if err != nil { | ||||||
| 					return errors.Wrapf(err, "open persisted block %s", dirs[i]) | 					return errors.Wrapf(err, "open persisted block %s", dirs[i]) | ||||||
|  | @ -385,7 +382,7 @@ func (db *DB) reloadBlocks() error { | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	for seq, b := range db.seqBlocks { | 	for seq, b := range db.seqBlocks { | ||||||
| 		if _, ok := seqBlocks[seq]; !ok { | 		if nb, ok := seqBlocks[seq]; !ok || nb != b { | ||||||
| 			if err := b.Close(); err != nil { | 			if err := b.Close(); err != nil { | ||||||
| 				return errors.Wrapf(err, "closing removed block %d", b.Meta().Sequence) | 				return errors.Wrapf(err, "closing removed block %d", b.Meta().Sequence) | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
|  | @ -299,6 +299,8 @@ func (w *WAL) entry(et WALEntryType, flag byte, buf []byte) error { | ||||||
| 		sz    = int64(6 + 4 + len(buf)) | 		sz    = int64(6 + 4 + len(buf)) | ||||||
| 		newsz = w.curN + sz | 		newsz = w.curN + sz | ||||||
| 	) | 	) | ||||||
|  | 	// XXX(fabxc): this currently cuts a new file whenever the WAL was newly opened.
 | ||||||
|  | 	// Probably fine in general but may yield a lot of short files in some cases.
 | ||||||
| 	if w.cur == nil || w.curN > w.segmentSize || newsz > w.segmentSize && sz <= w.segmentSize { | 	if w.cur == nil || w.curN > w.segmentSize || newsz > w.segmentSize && sz <= w.segmentSize { | ||||||
| 		if err := w.cut(); err != nil { | 		if err := w.cut(); err != nil { | ||||||
| 			return err | 			return err | ||||||
|  | @ -431,6 +433,8 @@ func NewWALReader(rs ...io.ReadCloser) *WALReader { | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // At returns the last decoded entry of labels or samples.
 | // At returns the last decoded entry of labels or samples.
 | ||||||
|  | // The returned slices are only valid until the next call to Next(). Their elements
 | ||||||
|  | // have to be copied to preserve them.
 | ||||||
| func (r *WALReader) At() ([]labels.Labels, []refdSample) { | func (r *WALReader) At() ([]labels.Labels, []refdSample) { | ||||||
| 	return r.labels, r.samples | 	return r.labels, r.samples | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -368,10 +368,10 @@ | ||||||
| 			"revisionTime": "2016-09-30T00:14:02Z" | 			"revisionTime": "2016-09-30T00:14:02Z" | ||||||
| 		}, | 		}, | ||||||
| 		{ | 		{ | ||||||
| 			"checksumSHA1": "Aj4Cn1RClamxluIri/LQMnK/yB4=", | 			"checksumSHA1": "hnxY08GfzanNSvD8vjz/wSWnwmk=", | ||||||
| 			"path": "github.com/fabxc/tsdb", | 			"path": "github.com/fabxc/tsdb", | ||||||
| 			"revision": "ca1bc920b795cfc670002e7643471b0277e79a9b", | 			"revision": "32c32013a6d2a8ee5fb231d3f3cb5538128650d2", | ||||||
| 			"revisionTime": "2017-03-08T15:54:13Z" | 			"revisionTime": "2017-03-09T14:40:13Z" | ||||||
| 		}, | 		}, | ||||||
| 		{ | 		{ | ||||||
| 			"checksumSHA1": "uVzWuLvF646YjiKomsc2CR1ua58=", | 			"checksumSHA1": "uVzWuLvF646YjiKomsc2CR1ua58=", | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue