promql: add test for race conditions in query engine (#11743)
* promql: refactor BenchmarkRangeQuery so we can re-use test cases

  Signed-off-by: Bryan Boreham <bjboreham@gmail.com>

* promql: add test for race conditions in query engine

  Note we skip large count_values queries: `count_values` allocates a slice per
  unique value in the output, and this test has unique values on every step of
  every series, so it adds up to a lot of slices. Add the Go runtime overhead of
  `-race` checking, and it chews up many gigabytes.

  Signed-off-by: Bryan Boreham <bjboreham@gmail.com>

* TestConcurrentRangeQueries: wait before starting each goroutine

  Instead of starting 100 goroutines that just wait for the semaphore.

  Signed-off-by: Bryan Boreham <bjboreham@gmail.com>

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
parent 6fd89a6fd2
commit dbd7021cc2
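The last bullet in the commit message refers to a bounded-concurrency pattern: acquire a semaphore slot before spawning each goroutine, so at most numConcurrent goroutines exist at any time, instead of launching one goroutine per case up front and letting most of them block. Below is a minimal, self-contained sketch of that pattern under the same errgroup/channel approach the diff uses; it is not part of the commit, and runCase is a placeholder for the real per-case work.

package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

// runCase is a placeholder for the real work, e.g. executing one range query.
func runCase(i int) error {
	fmt.Println("running case", i)
	return nil
}

func main() {
	const numConcurrent = 4
	sem := make(chan struct{}, numConcurrent)
	for i := 0; i < numConcurrent; i++ {
		sem <- struct{}{} // pre-fill the channel with available slots
	}

	var g errgroup.Group
	for i := 0; i < 20; i++ {
		i := i
		<-sem // take a slot *before* starting the goroutine
		g.Go(func() error {
			defer func() { sem <- struct{}{} }() // return the slot when done
			return runCase(i)
		})
	}
	if err := g.Wait(); err != nil {
		fmt.Println("error:", err)
	}
}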
--- a/promql/bench_test.go
+++ b/promql/bench_test.go
@@ -27,17 +27,7 @@ import (
 	"github.com/prometheus/prometheus/util/teststorage"
 )
 
-func BenchmarkRangeQuery(b *testing.B) {
-	stor := teststorage.New(b)
-	defer stor.Close()
-	opts := EngineOpts{
-		Logger:     nil,
-		Reg:        nil,
-		MaxSamples: 50000000,
-		Timeout:    100 * time.Second,
-	}
-	engine := NewEngine(opts)
-
+func setupRangeQueryTestData(stor *teststorage.TestStorage, engine *Engine, interval, numIntervals int) error {
 	metrics := []labels.Labels{}
 	metrics = append(metrics, labels.FromStrings("__name__", "a_one"))
 	metrics = append(metrics, labels.FromStrings("__name__", "b_one"))
@@ -65,25 +55,26 @@ func BenchmarkRangeQuery(b *testing.B) {
 	}
 	refs := make([]storage.SeriesRef, len(metrics))
 
-	// A day of data plus 10k steps.
-	numIntervals := 8640 + 10000
-
 	for s := 0; s < numIntervals; s++ {
 		a := stor.Appender(context.Background())
-		ts := int64(s * 10000) // 10s interval.
+		ts := int64(s * interval)
 		for i, metric := range metrics {
 			ref, _ := a.Append(refs[i], metric, ts, float64(s)+float64(i)/float64(len(metrics)))
 			refs[i] = ref
 		}
 		if err := a.Commit(); err != nil {
-			b.Fatal(err)
+			return err
 		}
 	}
+	return nil
+}
 
-	type benchCase struct {
-		expr  string
-		steps int
-	}
+type benchCase struct {
+	expr  string
+	steps int
+}
+
+func rangeQueryCases() []benchCase {
 	cases := []benchCase{
 		// Plain retrieval.
 		{
@@ -210,7 +201,30 @@ func BenchmarkRangeQuery(b *testing.B) {
 			tmp = append(tmp, benchCase{expr: c.expr, steps: 1000})
 		}
 	}
-	cases = tmp
+	return tmp
+}
+
+func BenchmarkRangeQuery(b *testing.B) {
+	stor := teststorage.New(b)
+	defer stor.Close()
+	opts := EngineOpts{
+		Logger:     nil,
+		Reg:        nil,
+		MaxSamples: 50000000,
+		Timeout:    100 * time.Second,
+	}
+	engine := NewEngine(opts)
+
+	const interval = 10000 // 10s interval.
+	// A day of data plus 10k steps.
+	numIntervals := 8640 + 10000
+
+	err := setupRangeQueryTestData(stor, engine, interval, numIntervals)
+	if err != nil {
+		b.Fatal(err)
+	}
+	cases := rangeQueryCases()
+
 	for _, c := range cases {
 		name := fmt.Sprintf("expr=%s,steps=%d", c.expr, c.steps)
 		b.Run(name, func(b *testing.B) {
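With setupRangeQueryTestData and rangeQueryCases factored out of BenchmarkRangeQuery, other benchmarks or tests in the package can reuse the same data and query cases with sizes of their own choosing. A hypothetical caller, not part of the commit, might look like the sketch below; it reuses the helpers and the NewRangeQuery call pattern exactly as shown in the diff, with a much smaller data set.

// Hypothetical example: a sequential smoke test over the shared cases.
func TestRangeQueriesSequential(t *testing.T) {
	stor := teststorage.New(t)
	defer stor.Close()
	engine := NewEngine(EngineOpts{MaxSamples: 50000000, Timeout: 100 * time.Second})

	// One hour of data at a 10s interval instead of a day plus 10k steps.
	const interval = 10000
	numIntervals := 360
	require.NoError(t, setupRangeQueryTestData(stor, engine, interval, numIntervals))

	for _, c := range rangeQueryCases() {
		if c.steps > numIntervals {
			continue // skip cases that need more data than was loaded
		}
		qry, err := engine.NewRangeQuery(
			stor, nil, c.expr,
			time.Unix(int64((numIntervals-c.steps)*10), 0),
			time.Unix(int64(numIntervals*10), 0), time.Second*10)
		require.NoError(t, err)
		res := qry.Exec(context.Background())
		require.NoError(t, res.Err)
		qry.Close()
	}
}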
--- a/promql/promql_test.go
+++ b/promql/promql_test.go
@@ -14,10 +14,16 @@
 package promql
 
 import (
+	"context"
 	"path/filepath"
+	"strings"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/require"
+	"golang.org/x/sync/errgroup"
+
+	"github.com/prometheus/prometheus/util/teststorage"
 )
 
 func TestEvaluations(t *testing.T) {
@@ -34,3 +40,60 @@ func TestEvaluations(t *testing.T) {
 		})
 	}
 }
+
+// Run a lot of queries at the same time, to check for race conditions.
+func TestConcurrentRangeQueries(t *testing.T) {
+	stor := teststorage.New(t)
+	defer stor.Close()
+	opts := EngineOpts{
+		Logger:     nil,
+		Reg:        nil,
+		MaxSamples: 50000000,
+		Timeout:    100 * time.Second,
+	}
+	engine := NewEngine(opts)
+
+	const interval = 10000 // 10s interval.
+	// A day of data plus 10k steps.
+	numIntervals := 8640 + 10000
+	err := setupRangeQueryTestData(stor, engine, interval, numIntervals)
+	require.NoError(t, err)
+
+	cases := rangeQueryCases()
+
+	// Limit the number of queries running at the same time.
+	const numConcurrent = 4
+	sem := make(chan struct{}, numConcurrent)
+	for i := 0; i < numConcurrent; i++ {
+		sem <- struct{}{}
+	}
+	var g errgroup.Group
+	for _, c := range cases {
+		c := c
+		if strings.Contains(c.expr, "count_values") && c.steps > 10 {
+			continue // This test is too big to run with -race.
+		}
+		<-sem
+		g.Go(func() error {
+			defer func() {
+				sem <- struct{}{}
+			}()
+			qry, err := engine.NewRangeQuery(
+				stor, nil, c.expr,
+				time.Unix(int64((numIntervals-c.steps)*10), 0),
+				time.Unix(int64(numIntervals*10), 0), time.Second*10)
+			if err != nil {
+				return err
+			}
+			res := qry.Exec(context.Background())
+			if res.Err != nil {
+				return res.Err
+			}
+			qry.Close()
+			return nil
+		})
+	}
+
+	err = g.Wait()
+	require.NoError(t, err)
+}
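The new test is only meaningful with the race detector enabled. Assuming the standard Prometheus repository layout, it can be run on its own with:

	go test -race -run TestConcurrentRangeQueries ./promql/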