Rework sample limit to work for 2.0
Correctly update reported series.
Increment prometheus_target_scrapes_exceeded_sample_limit_total.
Add back unit tests.
Ignore stale markers when calculating the sample limit.

Fixes #2770
commit 37bc607e96 (parent 72a276e7ed)
@@ -515,10 +515,10 @@ mainLoop:
 		// A failed scrape is the same as an empty scrape,
 		// we still call sl.append to trigger stale markers.
 		if total, added, err = sl.append(b, start); err != nil {
-			sl.l.With("err", err).Error("append failed")
-			// The append failed, probably due to a parse error.
+			sl.l.With("err", err).Warn("append failed")
+			// The append failed, probably due to a parse error or sample limit.
 			// Call sl.append again with an empty scrape to trigger stale markers.
-			if _, _, err = sl.append([]byte{}, start); err != nil {
+			if _, _, err := sl.append([]byte{}, start); err != nil {
 				sl.l.With("err", err).Error("append failed")
 			}
 		}
@@ -625,6 +625,7 @@ func (sl *scrapeLoop) append(b []byte, ts time.Time) (total, added int, err erro
 		numOutOfOrder = 0
 		numDuplicates = 0
 	)
+	var sampleLimitErr error

 loop:
 	for p.Next() {
@@ -658,6 +659,12 @@ loop:
 				numDuplicates += 1
 				sl.l.With("timeseries", string(met)).Debug("Duplicate sample for timestamp")
 				continue
+			case errSampleLimit:
+				// Keep on parsing output if we hit the limit, so we report the correct
+				// total number of samples scraped.
+				sampleLimitErr = err
+				added++
+				continue
 			default:
 				break loop
 			}
@@ -683,6 +690,10 @@ loop:
 				numDuplicates += 1
 				sl.l.With("timeseries", string(met)).Debug("Duplicate sample for timestamp")
 				continue
+			case errSampleLimit:
+				sampleLimitErr = err
+				added++
+				continue
 			default:
 				break loop
 			}
@@ -701,6 +712,10 @@ loop:
 	if err == nil {
 		err = p.Err()
 	}
+	if err == nil && sampleLimitErr != nil {
+		targetScrapeSampleLimit.Inc()
+		err = sampleLimitErr
+	}
 	if numOutOfOrder > 0 {
 		sl.l.With("numDropped", numOutOfOrder).Warn("Error on ingesting out-of-order samples")
 	}
@@ -730,10 +745,10 @@ loop:
 	}
 	if err != nil {
 		app.Rollback()
-		return total, 0, err
+		return total, added, err
 	}
 	if err := app.Commit(); err != nil {
-		return total, 0, err
+		return total, added, err
 	}

 	// Swap current and previous series.
@@ -718,6 +718,116 @@ func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) {

 }

+func TestScrapeLoopRunAppliesScrapeLimit(t *testing.T) {
+
+	cases := []struct {
+		appender                                  func() storage.Appender
+		up                                        float64
+		scrapeSamplesScraped                      float64
+		scrapeSamplesScrapedPostMetricRelabelling float64
+	}{
+		{
+			appender:                                  func() storage.Appender { return nopAppender{} },
+			up:                                        1,
+			scrapeSamplesScraped:                      3,
+			scrapeSamplesScrapedPostMetricRelabelling: 3,
+		},
+		{
+			appender: func() storage.Appender {
+				return &limitAppender{Appender: nopAppender{}, limit: 3}
+			},
+			up:                                        1,
+			scrapeSamplesScraped:                      3,
+			scrapeSamplesScrapedPostMetricRelabelling: 3,
+		},
+		{
+			appender: func() storage.Appender {
+				return &limitAppender{Appender: nopAppender{}, limit: 2}
+			},
+			up:                                        0,
+			scrapeSamplesScraped:                      3,
+			scrapeSamplesScrapedPostMetricRelabelling: 3,
+		},
+		{
+			appender: func() storage.Appender {
+				return &relabelAppender{
+					Appender: &limitAppender{Appender: nopAppender{}, limit: 2},
+					relabelings: []*config.RelabelConfig{
+						&config.RelabelConfig{
+							SourceLabels: model.LabelNames{"__name__"},
+							Regex:        config.MustNewRegexp("a"),
+							Action:       config.RelabelDrop,
+						},
+					},
+				}
+			},
+			up:                                        1,
+			scrapeSamplesScraped:                      3,
+			scrapeSamplesScrapedPostMetricRelabelling: 2,
+		},
+	}
+
+	for i, c := range cases {
+		reportAppender := &collectResultAppender{}
+		var (
+			signal     = make(chan struct{})
+			scraper    = &testScraper{}
+			numScrapes = 0
+			reportApp  = func() storage.Appender {
+				// Get result of the 2nd scrape.
+				if numScrapes == 2 {
+					return reportAppender
+				} else {
+					return nopAppender{}
+				}
+			}
+		)
+		defer close(signal)
+
+		ctx, cancel := context.WithCancel(context.Background())
+		sl := newScrapeLoop(ctx, scraper, c.appender, reportApp, nil)
+
+		// Setup a series to be stale, then 3 samples, then stop.
+		scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
+			numScrapes += 1
+			if numScrapes == 1 {
+				w.Write([]byte("stale 0\n"))
+				return nil
+			} else if numScrapes == 2 {
+				w.Write([]byte("a 0\nb 0\nc 0 \n"))
+				return nil
+			} else if numScrapes == 3 {
+				cancel()
+			}
+			return fmt.Errorf("Scrape failed.")
+		}
+
+		go func() {
+			sl.run(10*time.Millisecond, time.Hour, nil)
+			signal <- struct{}{}
+		}()
+
+		select {
+		case <-signal:
+		case <-time.After(5 * time.Second):
+			t.Fatalf("Scrape wasn't stopped.")
+		}
+
+		if len(reportAppender.result) != 4 {
+			t.Fatalf("Case %d appended report samples not as expected. Wanted: %d samples Got: %d", i, 4, len(reportAppender.result))
+		}
+		if reportAppender.result[0].v != c.up {
+			t.Fatalf("Case %d appended up sample not as expected. Wanted: %f Got: %+v", i, c.up, reportAppender.result[0])
+		}
+		if reportAppender.result[2].v != c.scrapeSamplesScraped {
+			t.Fatalf("Case %d appended scrape_samples_scraped sample not as expected. Wanted: %f Got: %+v", i, c.scrapeSamplesScraped, reportAppender.result[2])
+		}
+		if reportAppender.result[3].v != c.scrapeSamplesScrapedPostMetricRelabelling {
+			t.Fatalf("Case %d appended scrape_samples_scraped_post_metric_relabeling sample not as expected. Wanted: %f Got: %+v", i, c.scrapeSamplesScrapedPostMetricRelabelling, reportAppender.result[3])
+		}
+	}
+}
+
 type errorAppender struct {
 	collectResultAppender
 }
@@ -30,6 +30,7 @@ import (
 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/prometheus/prometheus/pkg/relabel"
+	"github.com/prometheus/prometheus/pkg/value"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/util/httputil"
 )
@@ -228,6 +229,8 @@ func (ts Targets) Len() int           { return len(ts) }
 func (ts Targets) Less(i, j int) bool { return ts[i].URL().String() < ts[j].URL().String() }
 func (ts Targets) Swap(i, j int)      { ts[i], ts[j] = ts[j], ts[i] }

+var errSampleLimit = errors.New("sample limit exceeded")
+
 // limitAppender limits the number of total appended samples in a batch.
 type limitAppender struct {
 	storage.Appender
@@ -237,26 +240,29 @@ type limitAppender struct {
 }

 func (app *limitAppender) Add(lset labels.Labels, t int64, v float64) (string, error) {
-	if app.i+1 > app.limit {
-		return "", fmt.Errorf("sample limit of %d exceeded", app.limit)
+	if !value.IsStaleNaN(v) {
+		app.i++
+		if app.i > app.limit {
+			return "", errSampleLimit
+		}
 	}
 	ref, err := app.Appender.Add(lset, t, v)
 	if err != nil {
 		return "", err
 	}
-	app.i++
 	return ref, nil
 }

 func (app *limitAppender) AddFast(ref string, t int64, v float64) error {
-	if app.i+1 > app.limit {
-		return fmt.Errorf("sample limit of %d exceeded", app.limit)
+	if !value.IsStaleNaN(v) {
+		app.i++
+		if app.i > app.limit {
+			return errSampleLimit
+		}
 	}
-
 	if err := app.Appender.AddFast(ref, t, v); err != nil {
 		return err
 	}
-	app.i++
 	return nil
 }
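For illustration, here is a minimal, self-contained Go sketch of the idea behind this change (a simplified stand-in, not the limitAppender code in the diff above): stale markers are skipped when counting samples against the limit, and hitting the limit does not abort the scrape, so the total/added counts stay accurate and the error is only surfaced once parsing is done. The staleNaN bit pattern and the limiter helper below are assumptions standing in for pkg/value's IsStaleNaN and the scrape loop's bookkeeping.

package main

import (
	"errors"
	"fmt"
	"math"
)

// staleNaN mirrors the stale-marker bit pattern used by pkg/value
// (assumption for this sketch; the real code calls value.IsStaleNaN).
const staleNaN uint64 = 0x7ff0000000000002

func isStaleNaN(v float64) bool { return math.Float64bits(v) == staleNaN }

var errSampleLimit = errors.New("sample limit exceeded")

// limiter counts only non-stale samples against a configured limit,
// analogous to the reworked limitAppender.
type limiter struct{ limit, seen int }

func (l *limiter) add(v float64) error {
	if isStaleNaN(v) {
		return nil // stale markers never count toward the limit
	}
	l.seen++
	if l.seen > l.limit {
		return errSampleLimit
	}
	return nil
}

func main() {
	samples := []float64{1, math.Float64frombits(staleNaN), 2, 3}
	lim := &limiter{limit: 2}

	var sampleLimitErr error
	total, added := 0, 0
	for _, v := range samples {
		total++
		if err := lim.add(v); err != nil {
			// Keep counting so total/added stay correct; report the error afterwards.
			sampleLimitErr = err
			added++
			continue
		}
		added++
	}
	fmt.Println("total:", total, "added:", added, "err:", sampleLimitErr)
}

Running the sketch prints "total: 4 added: 4 err: sample limit exceeded": the stale marker is not counted against the limit of 2, while the over-limit sample still contributes to the reported totals.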