Merge remote-tracking branch 'origin/main' into krajo/native-histogram-schema-validation

This commit is contained in:
György Krajcsovits 2025-09-19 08:59:00 +02:00
commit b99378f2c4
No known key found for this signature in database
GPG Key ID: 47A8F9CE80FD7C7F
20 changed files with 1023 additions and 436 deletions

View File

@ -226,24 +226,24 @@ jobs:
- name: Install snmp_exporter/generator dependencies
run: sudo apt-get update && sudo apt-get -y install libsnmp-dev
if: github.repository == 'prometheus/snmp_exporter'
- name: Get golangci-lint version
id: golangci-lint-version
run: echo "version=$(make print-golangci-lint-version)" >> $GITHUB_OUTPUT
- name: Lint with stringlabels
uses: golangci/golangci-lint-action@4afd733a84b1f43292c63897423277bb7f4313a9 # v8.0.0
with:
args: --verbose --build-tags=stringlabels
# Make sure to sync this with Makefile.common and scripts/golangci-lint.yml.
version: v2.2.1
version: ${{ steps.golangci-lint-version.outputs.version }}
- name: Lint with slicelabels
uses: golangci/golangci-lint-action@4afd733a84b1f43292c63897423277bb7f4313a9 # v8.0.0
with:
args: --verbose --build-tags=slicelabels
# Make sure to sync this with Makefile.common and scripts/golangci-lint.yml.
version: v2.2.1
version: ${{ steps.golangci-lint-version.outputs.version }}
- name: Lint with dedupelabels
uses: golangci/golangci-lint-action@4afd733a84b1f43292c63897423277bb7f4313a9 # v8.0.0
with:
args: --verbose --build-tags=dedupelabels
# Make sure to sync this with Makefile.common and scripts/golangci-lint.yml.
version: v2.2.1
version: ${{ steps.golangci-lint-version.outputs.version }}
fuzzing:
uses: ./.github/workflows/fuzzing.yml
if: github.event_name == 'pull_request'

View File

@ -61,7 +61,7 @@ PROMU_URL := https://github.com/prometheus/promu/releases/download/v$(PROMU_
SKIP_GOLANGCI_LINT :=
GOLANGCI_LINT :=
GOLANGCI_LINT_OPTS ?=
GOLANGCI_LINT_VERSION ?= v2.2.1
GOLANGCI_LINT_VERSION ?= v2.4.0
GOLANGCI_FMT_OPTS ?=
# golangci-lint only supports linux, darwin and windows platforms on i386/amd64/arm64.
# windows isn't included here because of the path separator being different.
@ -266,6 +266,10 @@ $(GOLANGCI_LINT):
| sh -s -- -b $(FIRST_GOPATH)/bin $(GOLANGCI_LINT_VERSION)
endif
.PHONY: common-print-golangci-lint-version
common-print-golangci-lint-version:
@echo $(GOLANGCI_LINT_VERSION)
.PHONY: precheck
precheck::

View File

@ -42,8 +42,8 @@ var (
configTypesMu sync.Mutex
configTypes = make(map[reflect.Type]reflect.Type)
emptyStructType = reflect.TypeOf(struct{}{})
configsType = reflect.TypeOf(Configs{})
emptyStructType = reflect.TypeFor[struct{}]()
configsType = reflect.TypeFor[Configs]()
)
// RegisterConfig registers the given Config type for YAML marshaling and unmarshaling.
@ -54,7 +54,7 @@ func RegisterConfig(config Config) {
func init() {
// N.B.: static_configs is the only Config type implemented by default.
// All other types are registered at init by their implementing packages.
elemTyp := reflect.TypeOf(&targetgroup.Group{})
elemTyp := reflect.TypeFor[*targetgroup.Group]()
registerConfig(staticConfigsKey, elemTyp, StaticConfig{})
}

View File

@ -79,7 +79,7 @@ navigating to its metrics endpoint:
Let us explore data that Prometheus has collected about itself. To
use Prometheus's built-in expression browser, navigate to
http://localhost:9090/graph and choose the "Table" view within the "Graph" tab.
http://localhost:9090/query and choose the "Graph" tab.
As you can gather from [localhost:9090/metrics](http://localhost:9090/metrics),
one metric that Prometheus exports about itself is named
@ -113,7 +113,7 @@ For more about the expression language, see the
## Using the graphing interface
To graph expressions, navigate to http://localhost:9090/graph and use the "Graph"
To graph expressions, navigate to http://localhost:9090/query and use the "Graph"
tab.
For example, enter the following expression to graph the per-second rate of chunks

5
go.mod
View File

@ -27,7 +27,7 @@ require (
github.com/envoyproxy/go-control-plane/envoy v1.32.4
github.com/envoyproxy/protoc-gen-validate v1.2.1
github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb
github.com/fsnotify/fsnotify v1.8.0
github.com/fsnotify/fsnotify v1.9.0
github.com/go-openapi/strfmt v0.23.0
github.com/go-zookeeper/zk v1.0.4
github.com/gogo/protobuf v1.3.2
@ -247,6 +247,3 @@ exclude (
github.com/grpc-ecosystem/grpc-gateway v1.14.7
google.golang.org/api v0.30.0
)
// Pin until https://github.com/fsnotify/fsnotify/issues/656 is resolved.
replace github.com/fsnotify/fsnotify v1.8.0 => github.com/fsnotify/fsnotify v1.7.0

4
go.sum
View File

@ -141,8 +141,8 @@ github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM=
github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k=
github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E=
github.com/fxamacker/cbor/v2 v2.7.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=

View File

@ -3195,89 +3195,6 @@ func TestEngine_Close(t *testing.T) {
})
}
func TestInstantQueryWithRangeVectorSelector(t *testing.T) {
engine := newTestEngine(t)
baseT := timestamp.Time(0)
storage := promqltest.LoadedStorage(t, `
load 1m
some_metric{env="1"} 0+1x4
some_metric{env="2"} 0+2x4
some_metric{env="3"} {{count:0}}+{{count:1}}x4
some_metric_with_stale_marker 0 1 stale 3
`)
t.Cleanup(func() { require.NoError(t, storage.Close()) })
testCases := map[string]struct {
expr string
expected promql.Matrix
ts time.Time
}{
"matches series with points in range": {
expr: "some_metric[2m]",
ts: baseT.Add(2 * time.Minute),
expected: promql.Matrix{
{
Metric: labels.FromStrings("__name__", "some_metric", "env", "1"),
Floats: []promql.FPoint{
{T: timestamp.FromTime(baseT.Add(time.Minute)), F: 1},
{T: timestamp.FromTime(baseT.Add(2 * time.Minute)), F: 2},
},
},
{
Metric: labels.FromStrings("__name__", "some_metric", "env", "2"),
Floats: []promql.FPoint{
{T: timestamp.FromTime(baseT.Add(time.Minute)), F: 2},
{T: timestamp.FromTime(baseT.Add(2 * time.Minute)), F: 4},
},
},
{
Metric: labels.FromStrings("__name__", "some_metric", "env", "3"),
Histograms: []promql.HPoint{
{T: timestamp.FromTime(baseT.Add(time.Minute)), H: &histogram.FloatHistogram{Count: 1, CounterResetHint: histogram.NotCounterReset}},
{T: timestamp.FromTime(baseT.Add(2 * time.Minute)), H: &histogram.FloatHistogram{Count: 2, CounterResetHint: histogram.NotCounterReset}},
},
},
},
},
"matches no series": {
expr: "some_nonexistent_metric[1m]",
ts: baseT,
expected: promql.Matrix{},
},
"no samples in range": {
expr: "some_metric[1m]",
ts: baseT.Add(20 * time.Minute),
expected: promql.Matrix{},
},
"metric with stale marker": {
expr: "some_metric_with_stale_marker[3m]",
ts: baseT.Add(3 * time.Minute),
expected: promql.Matrix{
{
Metric: labels.FromStrings("__name__", "some_metric_with_stale_marker"),
Floats: []promql.FPoint{
{T: timestamp.FromTime(baseT.Add(time.Minute)), F: 1},
{T: timestamp.FromTime(baseT.Add(3 * time.Minute)), F: 3},
},
},
},
},
}
for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
q, err := engine.NewInstantQuery(context.Background(), storage, nil, testCase.expr, testCase.ts)
require.NoError(t, err)
defer q.Close()
res := q.Exec(context.Background())
require.NoError(t, res.Err)
testutil.RequireEqual(t, testCase.expected, res.Value)
})
}
}
func TestQueryLookbackDelta(t *testing.T) {
var (
load = `load 5m

View File

@ -106,8 +106,44 @@ eval range from <start> to <end> step <step> <query>
* `<start>` and `<end>` specify the time range of the range query, and use the same syntax as `<time>`
* `<step>` is the step of the range query, and uses the same syntax as `<time>` (eg. `30s`)
* `<expect>`(optional) specifies expected annotations, errors, or result ordering.
* `<expect range vector>` (optional) for an instant query you can specify expected range vector timestamps
* `<expect string> "<string>"` (optional) for matching a string literal
* `<series>` and `<points>` specify the expected values, and follow the same syntax as for `load` above
### `expect string`
This can be used to specify that a string literal is the expected result.
Note that this is only supported on instant queries.
For example;
```
eval instant at 50m ("Foo")
expect string "Foo"
```
The expected string value must be within quotes. Double or back quotes are supported.
### `expect range vector`
This can be used to specify the expected timestamps on a range vector resulting from an instant query.
```
expect range vector <start> to <end> step <step>
```
For example;
```
load 10s
some_metric{env="a"} 1+1x5
some_metric{env="b"} 2+2x5
eval instant at 1m some_metric[1m]
expect range vector from 10s to 1m step 10s
some_metric{env="a"} 2 3 4 5 6
some_metric{env="b"} 4 6 8 10 12
```
### `expect` Syntax
```

View File

@ -53,11 +53,14 @@ var (
patEvalRange = regexp.MustCompile(`^eval(?:_(fail|warn|info))?\s+range\s+from\s+(.+)\s+to\s+(.+)\s+step\s+(.+?)\s+(.+)$`)
patExpect = regexp.MustCompile(`^expect\s+(ordered|fail|warn|no_warn|info|no_info)(?:\s+(regex|msg):(.+))?$`)
patMatchAny = regexp.MustCompile(`^.*$`)
patExpectRange = regexp.MustCompile(`^` + rangeVectorPrefix + `\s+from\s+(.+)\s+to\s+(.+)\s+step\s+(.+)$`)
)
const (
defaultEpsilon = 0.000001 // Relative error allowed for sample values.
DefaultMaxSamplesPerQuery = 10000
rangeVectorPrefix = "expect range vector"
expectStringPrefix = "expect string"
)
type TBRun interface {
@ -314,7 +317,58 @@ func validateExpectedCmds(cmd *evalCmd) error {
return nil
}
func (*test) parseEval(lines []string, i int) (int, *evalCmd, error) {
// Given an expected range vector definition, parse the line and return the start & end times and the step duration.
// ie parse a line such as "expect range vector from 10s to 1m step 10s".
// The from and to are parsed as durations and their values added to epoch(0) to form a time.Time.
// The step is parsed as a duration and returned as a time.Duration.
func (t *test) parseExpectRangeVector(line string) (*time.Time, *time.Time, *time.Duration, error) {
parts := patExpectRange.FindStringSubmatch(line)
if len(parts) != 4 {
return nil, nil, nil, fmt.Errorf("invalid range vector definition %q", line)
}
from := parts[1]
to := parts[2]
step := parts[3]
parsedFrom, parsedTo, parsedStep, err := t.parseDurations(from, to, step)
if err != nil {
return nil, nil, nil, err
}
start := testStartTime.Add(time.Duration(*parsedFrom))
end := testStartTime.Add(time.Duration(*parsedTo))
stepDuration := time.Duration(*parsedStep)
return &start, &end, &stepDuration, nil
}
// parseDurations parses the given from, to and step strings to Durations.
// Additionally, a check is performed to ensure to is before from.
func (*test) parseDurations(from, to, step string) (*model.Duration, *model.Duration, *model.Duration, error) {
parsedFrom, err := model.ParseDuration(from)
if err != nil {
return nil, nil, nil, fmt.Errorf("invalid start timestamp definition %q: %w", from, err)
}
parsedTo, err := model.ParseDuration(to)
if err != nil {
return nil, nil, nil, fmt.Errorf("invalid end timestamp definition %q: %w", to, err)
}
if parsedTo < parsedFrom {
return nil, nil, nil, fmt.Errorf("invalid test definition, end timestamp (%s) is before start timestamp (%s)", to, from)
}
parsedStep, err := model.ParseDuration(step)
if err != nil {
return nil, nil, nil, fmt.Errorf("invalid step definition %q: %w", step, err)
}
return &parsedFrom, &parsedTo, &parsedStep, nil
}
func (t *test) parseEval(lines []string, i int) (int, *evalCmd, error) {
instantParts := patEvalInstant.FindStringSubmatch(lines[i])
rangeParts := patEvalRange.FindStringSubmatch(lines[i])
@ -355,10 +409,11 @@ func (*test) parseEval(lines []string, i int) (int, *evalCmd, error) {
}
var cmd *evalCmd
var offset model.Duration
if isInstant {
at := instantParts[2]
offset, err := model.ParseDuration(at)
offset, err = model.ParseDuration(at)
if err != nil {
return i, nil, formatErr("invalid timestamp definition %q: %s", at, err)
}
@ -369,26 +424,12 @@ func (*test) parseEval(lines []string, i int) (int, *evalCmd, error) {
to := rangeParts[3]
step := rangeParts[4]
parsedFrom, err := model.ParseDuration(from)
parsedFrom, parsedTo, parsedStep, err := t.parseDurations(from, to, step)
if err != nil {
return i, nil, formatErr("invalid start timestamp definition %q: %s", from, err)
return i, nil, formatErr(err.Error())
}
parsedTo, err := model.ParseDuration(to)
if err != nil {
return i, nil, formatErr("invalid end timestamp definition %q: %s", to, err)
}
if parsedTo < parsedFrom {
return i, nil, formatErr("invalid test definition, end timestamp (%s) is before start timestamp (%s)", to, from)
}
parsedStep, err := model.ParseDuration(step)
if err != nil {
return i, nil, formatErr("invalid step definition %q: %s", step, err)
}
cmd = newRangeEvalCmd(expr, testStartTime.Add(time.Duration(parsedFrom)), testStartTime.Add(time.Duration(parsedTo)), time.Duration(parsedStep), i+1)
cmd = newRangeEvalCmd(expr, testStartTime.Add(time.Duration(*parsedFrom)), testStartTime.Add(time.Duration(*parsedTo)), time.Duration(*parsedStep), i+1)
}
switch mod {
@ -404,6 +445,8 @@ func (*test) parseEval(lines []string, i int) (int, *evalCmd, error) {
cmd.info = true
}
var expectRangeVector bool
for j := 1; i+1 < len(lines); j++ {
i++
defLine := lines[i]
@ -426,6 +469,32 @@ func (*test) parseEval(lines []string, i int) (int, *evalCmd, error) {
break
}
if strings.HasPrefix(defLine, rangeVectorPrefix) {
start, end, step, err := t.parseExpectRangeVector(defLine)
if err != nil {
return i, nil, formatErr("%w", err)
}
expectRangeVector = true
cmd.start = *start
cmd.end = *end
cmd.step = *step
cmd.eval = *end
cmd.excludeFromRangeQuery = true
continue
}
if strings.HasPrefix(defLine, expectStringPrefix) {
expectString, err := parseAsStringLiteral(defLine)
if err != nil {
return i, nil, formatErr("%w", err)
}
cmd.expectedString = expectString
cmd.excludeFromRangeQuery = true
continue
}
// This would still allow a metric named 'expect' if it is written as 'expect{}'.
if strings.Split(defLine, " ")[0] == "expect" {
annoType, expectedAnno, err := parseExpect(defLine)
@ -450,15 +519,35 @@ func (*test) parseEval(lines []string, i int) (int, *evalCmd, error) {
return i, nil, err
}
// Currently, we are not expecting any matrices.
if len(vals) > 1 && isInstant {
return i, nil, formatErr("expecting multiple values in instant evaluation not allowed")
// Only allow a range vector for an instant query where we have defined the expected range vector timestamps.
if len(vals) > 1 && isInstant && !expectRangeVector {
return i, nil, formatErr("expecting multiple values in instant evaluation not allowed. consider using 'expect range vector' directive to enable a range vector result for an instant query")
}
cmd.expectMetric(j, metric, vals...)
}
return i, cmd, nil
}
// parseAsStringLiteral returns the expected string from an expect string expression.
// It is valid for the line to match the expect string prefix exactly, and an empty string is returned.
func parseAsStringLiteral(line string) (string, error) {
if line == expectStringPrefix {
return "", errors.New("expected string literal not valid - a quoted string literal is required")
}
str := strings.TrimPrefix(line, expectStringPrefix+" ")
if len(str) == 0 {
return "", errors.New("expected string literal not valid - a quoted string literal is required")
}
str, err := strconv.Unquote(str)
if err != nil {
return "", errors.New("expected string literal not valid - check that the string is correctly quoted")
}
return str, nil
}
// getLines returns trimmed lines after removing the comments.
func getLines(input string) []string {
lines := strings.Split(input, "\n")
@ -692,6 +781,7 @@ type evalCmd struct {
end time.Time
step time.Duration
line int
eval time.Time
isRange bool // if false, instant query
fail, warn, ordered, info bool
@ -703,6 +793,12 @@ type evalCmd struct {
metrics map[uint64]labels.Labels
expectScalar bool
expected map[uint64]entry
// we expect a string literal - is set instead of expected
expectedString string
// if true and this is an instant query then we will not test this in a range query scenario
excludeFromRangeQuery bool
}
func (ev *evalCmd) isOrdered() bool {
@ -772,6 +868,7 @@ func newInstantEvalCmd(expr string, start time.Time, line int) *evalCmd {
return &evalCmd{
expr: expr,
start: start,
eval: start,
line: line,
metrics: map[uint64]labels.Labels{},
@ -1016,7 +1113,10 @@ func (ev *evalCmd) compareResult(result parser.Value) error {
if !almost.Equal(exp0.Value, val.V, defaultEpsilon) {
return fmt.Errorf("expected scalar %v but got %v", exp0.Value, val.V)
}
case promql.String:
if ev.expectedString != val.V {
return fmt.Errorf("expected string \"%v\" but got \"%v\"", ev.expectedString, val.V)
}
default:
panic(fmt.Errorf("promql.Test.compareResult: unexpected result type %T", result))
}
@ -1354,11 +1454,12 @@ func (t *test) execRangeEval(cmd *evalCmd, engine promql.QueryEngine) error {
}
func (t *test) execInstantEval(cmd *evalCmd, engine promql.QueryEngine) error {
queries, err := atModifierTestCases(cmd.expr, cmd.start)
queries, err := atModifierTestCases(cmd.expr, cmd.eval)
if err != nil {
return err
}
queries = append([]atModifierTestCase{{expr: cmd.expr, evalTime: cmd.start}}, queries...)
queries = append([]atModifierTestCase{{expr: cmd.expr, evalTime: cmd.eval}}, queries...)
for _, iq := range queries {
if err := t.runInstantQuery(iq, cmd, engine); err != nil {
return err
@ -1395,6 +1496,12 @@ func (t *test) runInstantQuery(iq atModifierTestCase, cmd *evalCmd, engine promq
return fmt.Errorf("error in %s %s (line %d): %w", cmd, iq.expr, cmd.line, err)
}
// this query has have been explicitly excluded from range query testing
// ie it could be that the query result is not an instant vector or scalar
if cmd.excludeFromRangeQuery {
return nil
}
// Check query returns same result in range mode,
// by checking against the middle step.
q, err = engine.NewRangeQuery(t.context, t.storage, nil, iq.expr, iq.evalTime.Add(-time.Minute), iq.evalTime.Add(time.Minute), time.Minute)

View File

@ -948,6 +948,144 @@ eval instant at 0m http_requests
`,
expectedError: `error in eval http_requests (line 12): invalid expect lines, multiple expect fail lines are not allowed`,
},
"instant query with string literal": {
input: `
eval instant at 50m ("Foo")
expect string "Foo"
`,
},
"instant query with string literal with leading space": {
input: `
eval instant at 50m (" Foo")
expect string " Foo"
`,
},
"instant query with string literal with trailing space": {
input: `
eval instant at 50m ("Foo ")
expect string "Foo "
`,
},
"instant query with string literal as space": {
input: `
eval instant at 50m (" ")
expect string " "
`,
},
"instant query with string literal with empty string": {
input: `
eval instant at 50m ("")
expect string
`,
expectedError: `error in eval ("") (line 3): expected string literal not valid - a quoted string literal is required`,
},
"instant query with string literal with correctly quoted empty string": {
input: `
eval instant at 50m ("")
expect string ""
`,
},
"instant query with string literal - not quoted": {
input: `
eval instant at 50m ("Foo")
expect string Foo
`,
expectedError: `error in eval ("Foo") (line 3): expected string literal not valid - check that the string is correctly quoted`,
},
"instant query with empty string literal": {
input: `
eval instant at 50m ("Foo")
expect string ""
`,
expectedError: `error in eval ("Foo") (line 2): expected string "" but got "Foo"`,
},
"instant query with error string literal": {
input: `
eval instant at 50m ("Foo")
expect string "Bar"
`,
expectedError: `error in eval ("Foo") (line 2): expected string "Bar" but got "Foo"`,
},
"instant query with range result - result does not have a series that is expected": {
input: `
load 10s
some_metric{env="a"} 1+1x5
eval instant at 1m some_metric[1m]
expect range vector from 10s to 1m step 10s
some_metric{env="a"} 2 3 4 5 6
some_metric{env="b"} 4 6 8 10 12
`,
expectedError: `error in eval some_metric[1m] (line 5): expected metric {__name__="some_metric", env="b"} not found`,
},
"instant query with range result - result has a series which is not expected": {
input: `
load 10s
some_metric{env="a"} 1+1x5
some_metric{env="b"} 1+1x5
eval instant at 1m some_metric[1m]
expect range vector from 10s to 1m step 10s
some_metric{env="a"} 2 3 4 5 6
`,
expectedError: `error in eval some_metric[1m] (line 6): unexpected metric {__name__="some_metric", env="b"} in result, has 5 float points [2 @[10000] 3 @[20000] 4 @[30000] 5 @[40000] 6 @[50000]] and 0 histogram points []`,
},
"instant query with range result - result has a value that is not expected": {
input: `
load 10s
some_metric{env="a"} 1+1x5
eval instant at 1m some_metric[1m]
expect range vector from 10s to 1m step 10s
some_metric{env="a"} 9 3 4 5 6
`,
expectedError: `error in eval some_metric[1m] (line 5): expected float value at index 0 (t=10000) for {__name__="some_metric", env="a"} to be 9, but got 2 (result has 5 float points [2 @[10000] 3 @[20000] 4 @[30000] 5 @[40000] 6 @[50000]] and 0 histogram points [])`,
},
"instant query with range result - invalid expect range vector directive": {
input: `
load 10s
some_metric{env="a"} 1+1x5
eval instant at 1m some_metric[1m]
expect range vector from 10s
some_metric{env="a"} 2 3 4 5 6
`,
expectedError: `error in eval some_metric[1m] (line 6): invalid range vector definition "expect range vector from 10s"`,
},
"instant query with range result - result matches expected value": {
input: `
load 1m
some_metric{env="1"} 0+1x4
some_metric{env="2"} 0+2x4
eval instant at 2m some_metric[2m]
expect range vector from 1m to 2m step 60s
some_metric{env="1"} 1 2
some_metric{env="2"} 2 4
`,
},
"instant query with range result - result has a is missing a sample": {
input: `
load 1m
some_metric_with_stale_marker 0 1 stale 3
eval instant at 3m some_metric_with_stale_marker[3m]
expect range vector from 1m to 3m step 60s
some_metric_with_stale_marker{} 1 2 3
`,
expectedError: `error in eval some_metric_with_stale_marker[3m] (line 5): expected 3 float points and 0 histogram points for {__name__="some_metric_with_stale_marker"}, but got 2 float points [1 @[60000] 3 @[180000]] and 0 histogram points []`,
},
"instant query with range result - result has a sample where none is expected": {
input: `
load 1m
some_metric_with_stale_marker 0 1 2 3
eval instant at 3m some_metric_with_stale_marker[3m]
expect range vector from 1m to 3m step 60s
some_metric_with_stale_marker{} 1 _ 3
`,
expectedError: `error in eval some_metric_with_stale_marker[3m] (line 5): expected 2 float points and 0 histogram points for {__name__="some_metric_with_stale_marker"}, but got 3 float points [1 @[60000] 2 @[120000] 3 @[180000]] and 0 histogram points []`,
},
}
for name, testCase := range testCases {

View File

@ -57,3 +57,18 @@ eval instant at 50m 0 / 0
eval instant at 50m 1 % 0
NaN
eval instant at 50m ("Foo")
expect string `Foo`
eval instant at 50m "Foo"
expect string "Foo"
eval instant at 50m " Foo "
expect string " Foo "
eval instant at 50m ("")
expect string ""
eval instant at 50m ""
expect string ""

View File

@ -1677,3 +1677,18 @@ eval instant at 1m histogram_count(histogram unless histogram_quantile(0.5, hist
eval instant at 1m histogram_quantile(0.5, histogram unless histogram_count(histogram) == 0)
{} 3.1748021039363987
clear
# Regression test for:
# https://github.com/prometheus/prometheus/issues/14172
# https://github.com/prometheus/prometheus/issues/15177
load 1m
mixed_metric1 1 2 3 {{schema:0 sum:5 count:4 buckets:[1 2 1]}} {{schema:0 sum:8 count:6 buckets:[1 4 1]}} 4 5 {{schema:0 sum:18 count:10 buckets:[3 4 3]}}
mixed_metric2 1 2 3 {{schema:0 sum:5 count:4 buckets:[1 2 1]}} {{schema:0 sum:8 count:6 buckets:[1 4 1]}}
# The order of the float vs native histograms is preserved.
eval range from 0 to 8m step 1m mixed_metric1
mixed_metric1{} 1 2 3 {{count:4 sum:5 buckets:[1 2 1]}} {{count:6 sum:8 buckets:[1 4 1]}} 4 5 {{schema:0 sum:18 count:10 buckets:[3 4 3]}} {{schema:0 sum:18 count:10 buckets:[3 4 3]}}
eval range from 0 to 5m step 1m mixed_metric2
mixed_metric2 1 2 3 {{count:4 sum:5 buckets:[1 2 1]}} {{count:6 sum:8 buckets:[1 4 1]}} {{count:6 sum:8 buckets:[1 4 1]}}

View File

@ -71,3 +71,37 @@ eval range from 0 to 2m step 1m requests * 2
{job="1", __address__="bar"} 200 200 200
clear
load 10s
some_metric{env="a"} 1+1x5
some_metric{env="b"} 2+2x5
# Return a range vector - note the use of the expect range vector directive which defines expected range
eval instant at 1m some_metric[1m]
expect range vector from 10s to 1m step 10s
some_metric{env="a"} 2 3 4 5 6
some_metric{env="b"} 4 6 8 10 12
clear
load 1m
some_metric{env="1"} 0+1x4
some_metric{env="2"} 0+2x4
some_metric{env="3"} {{count:0}}+{{count:1}}x4
some_metric_with_stale_marker 0 1 stale 3
eval instant at 2m some_metric[2m]
expect range vector from 1m to 2m step 60s
some_metric{env="1"} 1 2
some_metric{env="2"} 2 4
some_metric{env="3"} {{count:1 counter_reset_hint:not_reset}} {{count:2 counter_reset_hint:not_reset}}
eval instant at 3m some_metric_with_stale_marker[3m]
expect range vector from 1m to 3m step 60s
some_metric_with_stale_marker{} 1 _ 3
eval instant at 1m some_nonexistent_metric[1m]
expect range vector from 10s to 1m step 10s
eval instant at 10m some_metric[1m]
expect range vector from 9m10s to 10m step 1m

View File

@ -34,8 +34,11 @@ jobs:
- name: Install snmp_exporter/generator dependencies
run: sudo apt-get update && sudo apt-get -y install libsnmp-dev
if: github.repository == 'prometheus/snmp_exporter'
- name: Get golangci-lint version
id: golangci-lint-version
run: echo "version=$(make print-golangci-lint-version)" >> $GITHUB_OUTPUT
- name: Lint
uses: golangci/golangci-lint-action@4afd733a84b1f43292c63897423277bb7f4313a9 # v8.0.0
with:
args: --verbose
version: v2.2.1
version: ${{ steps.golangci-lint-version.outputs.version }}

View File

@ -139,7 +139,7 @@ func (b *combinedAppender) appendFloatOrHistogram(ls labels.Labels, meta metadat
ref = 0
}
updateRefs := !exists || series.ct != ct
if updateRefs && ct != 0 && b.ingestCTZeroSample {
if updateRefs && ct != 0 && ct < t && b.ingestCTZeroSample {
var newRef storage.SeriesRef
if h != nil {
newRef, err = b.app.AppendHistogramCTZeroSample(ref, ls, t, ct, h, nil)
@ -147,10 +147,14 @@ func (b *combinedAppender) appendFloatOrHistogram(ls labels.Labels, meta metadat
newRef, err = b.app.AppendCTZeroSample(ref, ls, t, ct)
}
if err != nil {
if !errors.Is(err, storage.ErrOutOfOrderCT) {
if !errors.Is(err, storage.ErrOutOfOrderCT) && !errors.Is(err, storage.ErrDuplicateSampleForTimestamp) {
// Even for the first sample OOO is a common scenario because
// we can't tell if a CT was already ingested in a previous request.
// We ignore the error.
// ErrDuplicateSampleForTimestamp is also a common scenario because
// unknown start times in Opentelemetry are indicated by setting
// the start time to the same as the first sample time.
// https://opentelemetry.io/docs/specs/otel/metrics/data-model/#cumulative-streams-handling-unknown-start-time
b.logger.Warn("Error when appending CT from OTLP", "err", err, "series", ls.String(), "created_timestamp", ct, "timestamp", t, "sample_type", sampleType(h))
}
} else {

View File

@ -14,6 +14,7 @@
package prometheusremotewrite
import (
"bytes"
"context"
"errors"
"math"
@ -160,8 +161,10 @@ func testCombinedAppenderOnTSDB(t *testing.T, ingestCTZeroSample bool) {
testCases := map[string]struct {
appendFunc func(*testing.T, CombinedAppender)
extraAppendFunc func(*testing.T, CombinedAppender)
expectedSamples []sample
expectedExemplars []exemplar.QueryResult
expectedLogsForCT []string
}{
"single float sample, zero CT": {
appendFunc: func(t *testing.T, app CombinedAppender) {
@ -185,6 +188,10 @@ func testCombinedAppenderOnTSDB(t *testing.T, ingestCTZeroSample bool) {
f: 42.0,
},
},
expectedLogsForCT: []string{
"Error when appending CT from OTLP",
"out of bound",
},
},
"single float sample, normal CT": {
appendFunc: func(t *testing.T, app CombinedAppender) {
@ -212,6 +219,24 @@ func testCombinedAppenderOnTSDB(t *testing.T, ingestCTZeroSample bool) {
},
},
},
"two float samples in different messages, CT same time as first sample": {
appendFunc: func(t *testing.T, app CombinedAppender) {
require.NoError(t, app.AppendSample(seriesLabels.Copy(), floatMetadata, now.UnixMilli(), now.UnixMilli(), 42.0, nil))
},
extraAppendFunc: func(t *testing.T, app CombinedAppender) {
require.NoError(t, app.AppendSample(seriesLabels.Copy(), floatMetadata, now.UnixMilli(), now.Add(time.Second).UnixMilli(), 43.0, nil))
},
expectedSamples: []sample{
{
t: now.UnixMilli(),
f: 42.0,
},
{
t: now.Add(time.Second).UnixMilli(),
f: 43.0,
},
},
},
"single float sample, CT in the future of the sample": {
appendFunc: func(t *testing.T, app CombinedAppender) {
require.NoError(t, app.AppendSample(seriesLabels.Copy(), floatMetadata, now.Add(time.Minute).UnixMilli(), now.UnixMilli(), 42.0, nil))
@ -245,6 +270,10 @@ func testCombinedAppenderOnTSDB(t *testing.T, ingestCTZeroSample bool) {
h: tsdbutil.GenerateTestHistogram(42),
},
},
expectedLogsForCT: []string{
"Error when appending CT from OTLP",
"out of bound",
},
},
"single histogram sample, normal CT": {
appendFunc: func(t *testing.T, app CombinedAppender) {
@ -273,6 +302,24 @@ func testCombinedAppenderOnTSDB(t *testing.T, ingestCTZeroSample bool) {
},
},
},
"two histogram samples in different messages, CT same time as first sample": {
appendFunc: func(t *testing.T, app CombinedAppender) {
require.NoError(t, app.AppendHistogram(seriesLabels.Copy(), floatMetadata, now.UnixMilli(), now.UnixMilli(), tsdbutil.GenerateTestHistogram(42), nil))
},
extraAppendFunc: func(t *testing.T, app CombinedAppender) {
require.NoError(t, app.AppendHistogram(seriesLabels.Copy(), floatMetadata, now.UnixMilli(), now.Add(time.Second).UnixMilli(), tsdbutil.GenerateTestHistogram(43), nil))
},
expectedSamples: []sample{
{
t: now.UnixMilli(),
h: tsdbutil.GenerateTestHistogram(42),
},
{
t: now.Add(time.Second).UnixMilli(),
h: tsdbutil.GenerateTestHistogram(43),
},
},
},
"single histogram sample, CT in the future of the sample": {
appendFunc: func(t *testing.T, app CombinedAppender) {
require.NoError(t, app.AppendHistogram(seriesLabels.Copy(), histogramMetadata, now.Add(time.Minute).UnixMilli(), now.UnixMilli(), tsdbutil.GenerateTestHistogram(42), nil))
@ -344,6 +391,11 @@ func testCombinedAppenderOnTSDB(t *testing.T, ingestCTZeroSample bool) {
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
var expectedLogs []string
if ingestCTZeroSample {
expectedLogs = append(expectedLogs, tc.expectedLogsForCT...)
}
dir := t.TempDir()
opts := tsdb.DefaultOptions()
opts.EnableExemplarStorage = true
@ -354,15 +406,32 @@ func testCombinedAppenderOnTSDB(t *testing.T, ingestCTZeroSample bool) {
t.Cleanup(func() { db.Close() })
var output bytes.Buffer
logger := promslog.New(&promslog.Config{Writer: &output})
ctx := context.Background()
reg := prometheus.NewRegistry()
cappMetrics := NewCombinedAppenderMetrics(reg)
app := db.Appender(ctx)
capp := NewCombinedAppender(app, promslog.NewNopLogger(), ingestCTZeroSample, NewCombinedAppenderMetrics(reg))
capp := NewCombinedAppender(app, logger, ingestCTZeroSample, cappMetrics)
tc.appendFunc(t, capp)
require.NoError(t, app.Commit())
if tc.extraAppendFunc != nil {
app = db.Appender(ctx)
capp = NewCombinedAppender(app, logger, ingestCTZeroSample, cappMetrics)
tc.extraAppendFunc(t, capp)
require.NoError(t, app.Commit())
}
if len(expectedLogs) > 0 {
for _, expectedLog := range expectedLogs {
require.Contains(t, output.String(), expectedLog)
}
} else {
require.Empty(t, output.String(), "unexpected log output")
}
q, err := db.Querier(int64(math.MinInt64), int64(math.MaxInt64))
require.NoError(t, err)

View File

@ -300,19 +300,87 @@ func TestDataNotAvailableAfterRollback(t *testing.T) {
}()
app := db.Appender(context.Background())
_, err := app.Append(0, labels.FromStrings("foo", "bar"), 0, 0)
_, err := app.Append(0, labels.FromStrings("type", "float"), 0, 0)
require.NoError(t, err)
_, err = app.AppendHistogram(
0, labels.FromStrings("type", "histogram"), 0,
&histogram.Histogram{Count: 42, Sum: math.NaN()}, nil,
)
require.NoError(t, err)
_, err = app.AppendHistogram(
0, labels.FromStrings("type", "floathistogram"), 0,
nil, &histogram.FloatHistogram{Count: 42, Sum: math.NaN()},
)
require.NoError(t, err)
err = app.Rollback()
require.NoError(t, err)
for _, typ := range []string{"float", "histogram", "floathistogram"} {
querier, err := db.Querier(0, 1)
require.NoError(t, err)
defer querier.Close()
seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "type", typ))
require.Equal(t, map[string][]chunks.Sample{}, seriesSet)
}
sr, err := wlog.NewSegmentsReader(db.head.wal.Dir())
require.NoError(t, err)
defer func() {
require.NoError(t, sr.Close())
}()
// Read records from WAL and check for expected count of series and samples.
var (
r = wlog.NewReader(sr)
dec = record.NewDecoder(labels.NewSymbolTable())
walSeriesCount, walSamplesCount, walHistogramCount, walFloatHistogramCount, walExemplarsCount int
)
for r.Next() {
rec := r.Record()
switch dec.Type(rec) {
case record.Series:
var series []record.RefSeries
series, err = dec.Series(rec, series)
require.NoError(t, err)
walSeriesCount += len(series)
case record.Samples:
var samples []record.RefSample
samples, err = dec.Samples(rec, samples)
require.NoError(t, err)
walSamplesCount += len(samples)
case record.Exemplars:
var exemplars []record.RefExemplar
exemplars, err = dec.Exemplars(rec, exemplars)
require.NoError(t, err)
walExemplarsCount += len(exemplars)
case record.HistogramSamples, record.CustomBucketsHistogramSamples:
var histograms []record.RefHistogramSample
histograms, err = dec.HistogramSamples(rec, histograms)
require.NoError(t, err)
walHistogramCount += len(histograms)
case record.FloatHistogramSamples, record.CustomBucketsFloatHistogramSamples:
var floatHistograms []record.RefFloatHistogramSample
floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms)
require.NoError(t, err)
walFloatHistogramCount += len(floatHistograms)
default:
}
}
// Check that only series get stored after calling Rollback.
require.Equal(t, 3, walSeriesCount, "series should have been written to WAL")
require.Equal(t, 0, walSamplesCount, "samples should not have been written to WAL")
require.Equal(t, 0, walExemplarsCount, "exemplars should not have been written to WAL")
require.Equal(t, 0, walHistogramCount, "histograms should not have been written to WAL")
require.Equal(t, 0, walFloatHistogramCount, "float histograms should not have been written to WAL")
}
func TestDBAppenderAddRef(t *testing.T) {
@ -4856,10 +4924,7 @@ func TestMetadataAssertInMemoryData(t *testing.T) {
}
// TestMultipleEncodingsCommitOrder mainly serves to demonstrate when happens when committing a batch of samples for the
// same series when there are multiple encodings. Commit() will process all float samples before histogram samples. This
// means that if histograms are appended before floats, the histograms could be marked as OOO when they are committed.
// While possible, this shouldn't happen very often - you need the same series to be ingested as both a float and a
// histogram in a single write request.
// same series when there are multiple encodings. With issue #15177 fixed, this now all works as expected.
func TestMultipleEncodingsCommitOrder(t *testing.T) {
opts := DefaultOptions()
opts.OutOfOrderCapMax = 30
@ -4933,26 +4998,19 @@ func TestMultipleEncodingsCommitOrder(t *testing.T) {
s := addSample(app, int64(i), chunkenc.ValFloat)
expSamples = append(expSamples, s)
}
// These samples will be marked as OOO as their timestamps are less than the max timestamp for float samples in the
// same batch.
for i := 110; i < 120; i++ {
s := addSample(app, int64(i), chunkenc.ValHistogram)
expSamples = append(expSamples, s)
}
// These samples will be marked as OOO as their timestamps are less than the max timestamp for float samples in the
// same batch.
for i := 120; i < 130; i++ {
s := addSample(app, int64(i), chunkenc.ValFloatHistogram)
expSamples = append(expSamples, s)
}
// These samples will be marked as in-order as their timestamps are greater than the max timestamp for float
// samples in the same batch.
for i := 140; i < 150; i++ {
s := addSample(app, int64(i), chunkenc.ValFloatHistogram)
expSamples = append(expSamples, s)
}
// These samples will be marked as in-order, even though they're appended after the float histograms from ts 140-150
// because float samples are processed first and these samples are in-order wrt to the float samples in the batch.
// These samples will be marked as out-of-order.
for i := 130; i < 135; i++ {
s := addSample(app, int64(i), chunkenc.ValFloat)
expSamples = append(expSamples, s)
@ -4964,8 +5022,8 @@ func TestMultipleEncodingsCommitOrder(t *testing.T) {
return expSamples[i].T() < expSamples[j].T()
})
// oooCount = 20 because the histograms from 120 - 130 and float histograms from 120 - 130 are detected as OOO.
verifySamples(100, 150, expSamples, 20)
// oooCount = 5 for the samples 130 to 134.
verifySamples(100, 150, expSamples, 5)
// Append and commit some in-order histograms by themselves.
app = db.Appender(context.Background())
@ -4975,8 +5033,8 @@ func TestMultipleEncodingsCommitOrder(t *testing.T) {
}
require.NoError(t, app.Commit())
// oooCount remains at 20 as no new OOO samples have been added.
verifySamples(100, 160, expSamples, 20)
// oooCount remains at 5.
verifySamples(100, 160, expSamples, 5)
// Append and commit samples for all encoding types. This time all samples will be treated as OOO because samples
// with newer timestamps have already been committed.
@ -5004,8 +5062,8 @@ func TestMultipleEncodingsCommitOrder(t *testing.T) {
return expSamples[i].T() < expSamples[j].T()
})
// oooCount = 50 as we've added 30 more OOO samples.
verifySamples(50, 160, expSamples, 50)
// oooCount = 35 as we've added 30 more OOO samples.
verifySamples(50, 160, expSamples, 35)
}
// TODO(codesome): test more samples incoming once compaction has started. To verify new samples after the start

View File

@ -86,7 +86,8 @@ type Head struct {
exemplarMetrics *ExemplarMetrics
exemplars ExemplarStorage
logger *slog.Logger
appendPool zeropool.Pool[[]record.RefSample]
refSeriesPool zeropool.Pool[[]record.RefSeries]
floatsPool zeropool.Pool[[]record.RefSample]
exemplarsPool zeropool.Pool[[]exemplarWithSeriesRef]
histogramsPool zeropool.Pool[[]record.RefHistogramSample]
floatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample]

View File

@ -164,13 +164,6 @@ func (h *Head) Appender(context.Context) storage.Appender {
func (h *Head) appender() *headAppender {
minValidTime := h.appendableMinValidTime()
appendID, cleanupAppendIDsBelow := h.iso.newAppendID(minValidTime) // Every appender gets an ID that is cleared upon commit/rollback.
// Allocate the exemplars buffer only if exemplars are enabled.
var exemplarsBuf []exemplarWithSeriesRef
if h.opts.EnableExemplarStorage {
exemplarsBuf = h.getExemplarBuffer()
}
return &headAppender{
head: h,
minValidTime: minValidTime,
@ -178,12 +171,9 @@ func (h *Head) appender() *headAppender {
maxt: math.MinInt64,
headMaxt: h.MaxTime(),
oooTimeWindow: h.opts.OutOfOrderTimeWindow.Load(),
samples: h.getAppendBuffer(),
sampleSeries: h.getSeriesBuffer(),
exemplars: exemplarsBuf,
histograms: h.getHistogramBuffer(),
floatHistograms: h.getFloatHistogramBuffer(),
metadata: h.getMetadataBuffer(),
seriesRefs: h.getRefSeriesBuffer(),
series: h.getSeriesBuffer(),
typesInBatch: map[chunks.HeadSeriesRef]sampleType{},
appendID: appendID,
cleanupAppendIDsBelow: cleanupAppendIDsBelow,
}
@ -213,16 +203,28 @@ func (h *Head) AppendableMinValidTime() (int64, bool) {
return h.appendableMinValidTime(), true
}
func (h *Head) getAppendBuffer() []record.RefSample {
b := h.appendPool.Get()
func (h *Head) getRefSeriesBuffer() []record.RefSeries {
b := h.refSeriesPool.Get()
if b == nil {
return make([]record.RefSeries, 0, 512)
}
return b
}
func (h *Head) putRefSeriesBuffer(b []record.RefSeries) {
h.refSeriesPool.Put(b[:0])
}
func (h *Head) getFloatBuffer() []record.RefSample {
b := h.floatsPool.Get()
if b == nil {
return make([]record.RefSample, 0, 512)
}
return b
}
func (h *Head) putAppendBuffer(b []record.RefSample) {
h.appendPool.Put(b[:0])
func (h *Head) putFloatBuffer(b []record.RefSample) {
h.floatsPool.Put(b[:0])
}
func (h *Head) getExemplarBuffer() []exemplarWithSeriesRef {
@ -312,6 +314,61 @@ type exemplarWithSeriesRef struct {
exemplar exemplar.Exemplar
}
// sampleType describes sample types we need to distinguish for append batching.
// We need separate types for everything that goes into a different WAL record
// type or into a different chunk encoding.
type sampleType byte
const (
stNone sampleType = iota // To mark that the sample type does not matter.
stFloat // All simple floats (counters, gauges, untyped). Goes to `floats`.
stHistogram // Native integer histograms with a standard exponential schema. Goes to `histograms`.
stCustomBucketHistogram // Native integer histograms with custom bucket boundaries. Goes to `histograms`.
stFloatHistogram // Native float histograms. Goes to `floatHistograms`.
stCustomBucketFloatHistogram // Native float histograms with custom bucket boundaries. Goes to `floatHistograms`.
)
// appendBatch is used to partition all the appended data into batches that are
// "type clean", i.e. every series receives only samples of one type within the
// batch. Types in this regard are defined by the sampleType enum above.
// TODO(beorn7): The same concept could be extended to make sure every series in
// the batch has at most one metadata record. This is currently not implemented
// because it is unclear if it is needed at all. (Maybe we will remove metadata
// records altogether, see issue #15911.)
type appendBatch struct {
floats []record.RefSample // New float samples held by this appender.
floatSeries []*memSeries // Float series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once).
histograms []record.RefHistogramSample // New histogram samples held by this appender.
histogramSeries []*memSeries // HistogramSamples series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once).
floatHistograms []record.RefFloatHistogramSample // New float histogram samples held by this appender.
floatHistogramSeries []*memSeries // FloatHistogramSamples series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once).
metadata []record.RefMetadata // New metadata held by this appender.
metadataSeries []*memSeries // Series corresponding to the metadata held by this appender.
exemplars []exemplarWithSeriesRef // New exemplars held by this appender.
}
// close returns all the slices to the pools in Head and nil's them.
func (b *appendBatch) close(h *Head) {
h.putFloatBuffer(b.floats)
b.floats = nil
h.putSeriesBuffer(b.floatSeries)
b.floatSeries = nil
h.putHistogramBuffer(b.histograms)
b.histograms = nil
h.putSeriesBuffer(b.histogramSeries)
b.histogramSeries = nil
h.putFloatHistogramBuffer(b.floatHistograms)
b.floatHistograms = nil
h.putSeriesBuffer(b.floatHistogramSeries)
b.floatHistogramSeries = nil
h.putMetadataBuffer(b.metadata)
b.metadata = nil
h.putSeriesBuffer(b.metadataSeries)
b.metadataSeries = nil
h.putExemplarBuffer(b.exemplars)
b.exemplars = nil
}
type headAppender struct {
head *Head
minValidTime int64 // No samples below this timestamp are allowed.
@ -321,15 +378,9 @@ type headAppender struct {
seriesRefs []record.RefSeries // New series records held by this appender.
series []*memSeries // New series held by this appender (using corresponding slices indexes from seriesRefs)
samples []record.RefSample // New float samples held by this appender.
sampleSeries []*memSeries // Float series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once).
histograms []record.RefHistogramSample // New histogram samples held by this appender.
histogramSeries []*memSeries // HistogramSamples series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once).
floatHistograms []record.RefFloatHistogramSample // New float histogram samples held by this appender.
floatHistogramSeries []*memSeries // FloatHistogramSamples series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once).
metadata []record.RefMetadata // New metadata held by this appender.
metadataSeries []*memSeries // Series corresponding to the metadata held by this appender.
exemplars []exemplarWithSeriesRef // New exemplars held by this appender.
batches []*appendBatch // Holds all the other data to append. (In regular cases, there should be only one of these.)
typesInBatch map[chunks.HeadSeriesRef]sampleType // Which (one) sample type each series holds in the most recent batch.
appendID, cleanupAppendIDsBelow uint64
closed bool
@ -357,21 +408,27 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64
}
}
s.Lock()
if value.IsStaleNaN(v) {
// TODO(krajorama): reorganize Commit() to handle samples in append order
// not floats first and then histograms. Then we could do this conversion
// in commit. This code should move into Commit().
switch {
case s.lastHistogramValue != nil:
s.Unlock()
// If we have added a sample before with this same appender, we
// can check the previously used type and turn a stale float
// sample into a stale histogram sample or stale float histogram
// sample as appropriate. This prevents an unnecessary creation
// of a new batch. However, since other appenders might append
// to the same series concurrently, this is not perfect but just
// an optimization for the more likely case.
switch a.typesInBatch[s.ref] {
case stHistogram, stCustomBucketHistogram:
return a.AppendHistogram(ref, lset, t, &histogram.Histogram{Sum: v}, nil)
case s.lastFloatHistogramValue != nil:
s.Unlock()
case stFloatHistogram, stCustomBucketFloatHistogram:
return a.AppendHistogram(ref, lset, t, nil, &histogram.FloatHistogram{Sum: v})
}
// Note that a series reference not yet in the map will come out
// as stNone, but since we do not handle that case separately,
// we do not need to check for the difference between "unknown
// series" and "known series with stNone".
}
s.Lock()
defer s.Unlock()
// TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise
// to skip that sample from the WAL and write only in the WBL.
@ -403,12 +460,13 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64
a.maxt = t
}
a.samples = append(a.samples, record.RefSample{
b := a.getCurrentBatch(stFloat, s.ref)
b.floats = append(b.floats, record.RefSample{
Ref: s.ref,
T: t,
V: v,
})
a.sampleSeries = append(a.sampleSeries, s)
b.floatSeries = append(b.floatSeries, s)
return storage.SeriesRef(s.ref), nil
}
@ -448,8 +506,9 @@ func (a *headAppender) AppendCTZeroSample(ref storage.SeriesRef, lset labels.Lab
if ct > a.maxt {
a.maxt = ct
}
a.samples = append(a.samples, record.RefSample{Ref: s.ref, T: ct, V: 0.0})
a.sampleSeries = append(a.sampleSeries, s)
b := a.getCurrentBatch(stFloat, s.ref)
b.floats = append(b.floats, record.RefSample{Ref: s.ref, T: ct, V: 0.0})
b.floatSeries = append(b.floatSeries, s)
return storage.SeriesRef(s.ref), nil
}
@ -476,6 +535,65 @@ func (a *headAppender) getOrCreate(lset labels.Labels) (s *memSeries, created bo
return s, created, nil
}
// getCurrentBatch returns the current batch if it fits the provided sampleType
// for the provided series. Otherwise, it adds a new batch and returns it.
func (a *headAppender) getCurrentBatch(st sampleType, s chunks.HeadSeriesRef) *appendBatch {
h := a.head
newBatch := func() *appendBatch {
b := appendBatch{
floats: h.getFloatBuffer(),
floatSeries: h.getSeriesBuffer(),
histograms: h.getHistogramBuffer(),
histogramSeries: h.getSeriesBuffer(),
floatHistograms: h.getFloatHistogramBuffer(),
floatHistogramSeries: h.getSeriesBuffer(),
metadata: h.getMetadataBuffer(),
metadataSeries: h.getSeriesBuffer(),
}
// Allocate the exemplars buffer only if exemplars are enabled.
if h.opts.EnableExemplarStorage {
b.exemplars = h.getExemplarBuffer()
}
clear(a.typesInBatch)
if st != stNone {
a.typesInBatch[s] = st
}
a.batches = append(a.batches, &b)
return &b
}
// First batch ever. Create it.
if len(a.batches) == 0 {
return newBatch()
}
// TODO(beorn7): If we ever see that the a.typesInBatch map grows so
// large that it matters for total memory consumption, we could limit
// the batch size here, i.e. cut a new batch even without a type change.
// Something like:
// if len(a.typesInBatch > limit) {
// return newBatch()
// }
lastBatch := a.batches[len(a.batches)-1]
if st == stNone {
// Type doesn't matter, last batch will always do.
return lastBatch
}
prevST, ok := a.typesInBatch[s]
switch {
case !ok: // New series. Add it to map and return current batch.
a.typesInBatch[s] = st
return lastBatch
case prevST == st: // Old series, same type. Just return batch.
return lastBatch
}
// An old series got a new type. Start new batch.
return newBatch()
}
// appendable checks whether the given sample is valid for appending to the series.
// If the sample is valid and in-order, it returns false with no error.
// If the sample belongs to the out-of-order chunk, it returns true with no error.
@ -638,7 +756,8 @@ func (a *headAppender) AppendExemplar(ref storage.SeriesRef, lset labels.Labels,
return 0, err
}
a.exemplars = append(a.exemplars, exemplarWithSeriesRef{ref, e})
b := a.getCurrentBatch(stNone, chunks.HeadSeriesRef(ref))
b.exemplars = append(b.exemplars, exemplarWithSeriesRef{ref, e})
return storage.SeriesRef(s.ref), nil
}
@ -667,11 +786,10 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels
}
}
var created bool
s := a.head.series.getByID(chunks.HeadSeriesRef(ref))
if s == nil {
var err error
s, created, err = a.getOrCreate(lset)
s, _, err = a.getOrCreate(lset)
if err != nil {
return 0, err
}
@ -680,14 +798,6 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels
switch {
case h != nil:
s.Lock()
// TODO(krajorama): reorganize Commit() to handle samples in append order
// not floats first and then histograms. Then we would not need to do this.
// This whole "if" should be removed.
if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil {
s.lastHistogramValue = &histogram.Histogram{}
}
// TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise
// to skip that sample from the WAL and write only in the WBL.
_, delta, err := s.appendableHistogram(t, h, a.headMaxt, a.minValidTime, a.oooTimeWindow)
@ -707,22 +817,19 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels
}
return 0, err
}
a.histograms = append(a.histograms, record.RefHistogramSample{
st := stHistogram
if h.UsesCustomBuckets() {
st = stCustomBucketHistogram
}
b := a.getCurrentBatch(st, s.ref)
b.histograms = append(b.histograms, record.RefHistogramSample{
Ref: s.ref,
T: t,
H: h,
})
a.histogramSeries = append(a.histogramSeries, s)
b.histogramSeries = append(b.histogramSeries, s)
case fh != nil:
s.Lock()
// TODO(krajorama): reorganize Commit() to handle samples in append order
// not floats first and then histograms. Then we would not need to do this.
// This whole "if" should be removed.
if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil {
s.lastFloatHistogramValue = &histogram.FloatHistogram{}
}
// TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise
// to skip that sample from the WAL and write only in the WBL.
_, delta, err := s.appendableFloatHistogram(t, fh, a.headMaxt, a.minValidTime, a.oooTimeWindow)
@ -742,12 +849,17 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels
}
return 0, err
}
a.floatHistograms = append(a.floatHistograms, record.RefFloatHistogramSample{
st := stFloatHistogram
if fh.UsesCustomBuckets() {
st = stCustomBucketFloatHistogram
}
b := a.getCurrentBatch(st, s.ref)
b.floatHistograms = append(b.floatHistograms, record.RefFloatHistogramSample{
Ref: s.ref,
T: t,
FH: fh,
})
a.floatHistogramSeries = append(a.floatHistogramSeries, s)
b.floatHistogramSeries = append(b.floatHistogramSeries, s)
}
if t < a.mint {
@ -769,11 +881,10 @@ func (a *headAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, lset l
return 0, storage.ErrCTNewerThanSample
}
var created bool
s := a.head.series.getByID(chunks.HeadSeriesRef(ref))
if s == nil {
var err error
s, created, err = a.getOrCreate(lset)
s, _, err = a.getOrCreate(lset)
if err != nil {
return 0, err
}
@ -784,16 +895,12 @@ func (a *headAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, lset l
zeroHistogram := &histogram.Histogram{
// The CTZeroSample represents a counter reset by definition.
CounterResetHint: histogram.CounterReset,
// Replicate other fields to avoid needless chunk creation.
Schema: h.Schema,
ZeroThreshold: h.ZeroThreshold,
CustomValues: h.CustomValues,
}
s.Lock()
// TODO(krajorama): reorganize Commit() to handle samples in append order
// not floats first and then histograms. Then we would not need to do this.
// This whole "if" should be removed.
if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil {
s.lastHistogramValue = zeroHistogram
}
// For CTZeroSamples OOO is not allowed.
// We set it to true to make this implementation as close as possible to the float implementation.
isOOO, _, err := s.appendableHistogram(ct, zeroHistogram, a.headMaxt, a.minValidTime, a.oooTimeWindow)
@ -815,26 +922,27 @@ func (a *headAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, lset l
s.pendingCommit = true
s.Unlock()
a.histograms = append(a.histograms, record.RefHistogramSample{
st := stHistogram
if h.UsesCustomBuckets() {
st = stCustomBucketHistogram
}
b := a.getCurrentBatch(st, s.ref)
b.histograms = append(b.histograms, record.RefHistogramSample{
Ref: s.ref,
T: ct,
H: zeroHistogram,
})
a.histogramSeries = append(a.histogramSeries, s)
b.histogramSeries = append(b.histogramSeries, s)
case fh != nil:
zeroFloatHistogram := &histogram.FloatHistogram{
// The CTZeroSample represents a counter reset by definition.
CounterResetHint: histogram.CounterReset,
// Replicate other fields to avoid needless chunk creation.
Schema: fh.Schema,
ZeroThreshold: fh.ZeroThreshold,
CustomValues: fh.CustomValues,
}
s.Lock()
// TODO(krajorama): reorganize Commit() to handle samples in append order
// not floats first and then histograms. Then we would not need to do this.
// This whole "if" should be removed.
if created && s.lastHistogramValue == nil && s.lastFloatHistogramValue == nil {
s.lastFloatHistogramValue = zeroFloatHistogram
}
// We set it to true to make this implementation as close as possible to the float implementation.
isOOO, _, err := s.appendableFloatHistogram(ct, zeroFloatHistogram, a.headMaxt, a.minValidTime, a.oooTimeWindow) // OOO is not allowed for CTZeroSamples.
if err != nil {
@ -855,12 +963,17 @@ func (a *headAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, lset l
s.pendingCommit = true
s.Unlock()
a.floatHistograms = append(a.floatHistograms, record.RefFloatHistogramSample{
st := stFloatHistogram
if fh.UsesCustomBuckets() {
st = stCustomBucketFloatHistogram
}
b := a.getCurrentBatch(st, s.ref)
b.floatHistograms = append(b.floatHistograms, record.RefFloatHistogramSample{
Ref: s.ref,
T: ct,
FH: zeroFloatHistogram,
})
a.floatHistogramSeries = append(a.floatHistogramSeries, s)
b.floatHistogramSeries = append(b.floatHistogramSeries, s)
}
if ct > a.maxt {
@ -889,13 +1002,14 @@ func (a *headAppender) UpdateMetadata(ref storage.SeriesRef, lset labels.Labels,
s.Unlock()
if hasNewMetadata {
a.metadata = append(a.metadata, record.RefMetadata{
b := a.getCurrentBatch(stNone, s.ref)
b.metadata = append(b.metadata, record.RefMetadata{
Ref: s.ref,
Type: record.GetMetricType(meta.Type),
Unit: meta.Unit,
Help: meta.Help,
})
a.metadataSeries = append(a.metadataSeries, s)
b.metadataSeries = append(b.metadataSeries, s)
}
return ref, nil
@ -932,25 +1046,26 @@ func (a *headAppender) log() error {
return fmt.Errorf("log series: %w", err)
}
}
if len(a.metadata) > 0 {
rec = enc.Metadata(a.metadata, buf)
for _, b := range a.batches {
if len(b.metadata) > 0 {
rec = enc.Metadata(b.metadata, buf)
buf = rec[:0]
if err := a.head.wal.Log(rec); err != nil {
return fmt.Errorf("log metadata: %w", err)
}
}
if len(a.samples) > 0 {
rec = enc.Samples(a.samples, buf)
if len(b.floats) > 0 {
rec = enc.Samples(b.floats, buf)
buf = rec[:0]
if err := a.head.wal.Log(rec); err != nil {
return fmt.Errorf("log samples: %w", err)
}
}
if len(a.histograms) > 0 {
if len(b.histograms) > 0 {
var customBucketsHistograms []record.RefHistogramSample
rec, customBucketsHistograms = enc.HistogramSamples(a.histograms, buf)
rec, customBucketsHistograms = enc.HistogramSamples(b.histograms, buf)
buf = rec[:0]
if len(rec) > 0 {
if err := a.head.wal.Log(rec); err != nil {
@ -965,9 +1080,9 @@ func (a *headAppender) log() error {
}
}
}
if len(a.floatHistograms) > 0 {
if len(b.floatHistograms) > 0 {
var customBucketsFloatHistograms []record.RefFloatHistogramSample
rec, customBucketsFloatHistograms = enc.FloatHistogramSamples(a.floatHistograms, buf)
rec, customBucketsFloatHistograms = enc.FloatHistogramSamples(b.floatHistograms, buf)
buf = rec[:0]
if len(rec) > 0 {
if err := a.head.wal.Log(rec); err != nil {
@ -986,14 +1101,15 @@ func (a *headAppender) log() error {
// otherwise it might happen that we send the exemplars in a remote write
// batch before the samples, which in turn means the exemplar is rejected
// for missing series, since series are created due to samples.
if len(a.exemplars) > 0 {
rec = enc.Exemplars(exemplarsForEncoding(a.exemplars), buf)
if len(b.exemplars) > 0 {
rec = enc.Exemplars(exemplarsForEncoding(b.exemplars), buf)
buf = rec[:0]
if err := a.head.wal.Log(rec); err != nil {
return fmt.Errorf("log exemplars: %w", err)
}
}
}
return nil
}
@ -1040,10 +1156,10 @@ type appenderCommitContext struct {
enc record.Encoder
}
// commitExemplars adds all exemplars from headAppender to the head's exemplar storage.
func (a *headAppender) commitExemplars() {
// commitExemplars adds all exemplars from the provided batch to the head's exemplar storage.
func (a *headAppender) commitExemplars(b *appendBatch) {
// No errors logging to WAL, so pass the exemplars along to the in memory storage.
for _, e := range a.exemplars {
for _, e := range b.exemplars {
s := a.head.series.getByID(chunks.HeadSeriesRef(e.ref))
if s == nil {
// This is very unlikely to happen, but we have seen it in the wild.
@ -1147,9 +1263,9 @@ func handleAppendableError(err error, appended, oooRejected, oobRejected, tooOld
}
}
// commitSamples processes and commits the samples in the headAppender to the series.
// It handles both in-order and out-of-order samples, updating the appenderCommitContext
// with the results of the append operations.
// commitFloats processes and commits the samples in the provided batch to the
// series. It handles both in-order and out-of-order samples, updating the
// appenderCommitContext with the results of the append operations.
//
// The function iterates over the samples in the headAppender and attempts to append each sample
// to its corresponding series. It handles various error cases such as out-of-order samples,
@ -1166,14 +1282,68 @@ func handleAppendableError(err error, appended, oooRejected, oobRejected, tooOld
// operations on the series after appending the samples.
//
// There are also specific functions to commit histograms and float histograms.
func (a *headAppender) commitSamples(acc *appenderCommitContext) {
func (a *headAppender) commitFloats(b *appendBatch, acc *appenderCommitContext) {
var ok, chunkCreated bool
var series *memSeries
for i, s := range a.samples {
series = a.sampleSeries[i]
for i, s := range b.floats {
series = b.floatSeries[i]
series.Lock()
if value.IsStaleNaN(s.V) {
// If a float staleness marker had been appended for a
// series that got a histogram or float histogram
// appended before via this same appender, it would not
// show up here because we had already converted it. We
// end up here for two reasons: (1) This is the very
// first sample for this series appended via this
// appender. (2) A float sample was appended to this
// series before via this same appender.
//
// In either case, we need to check the previous sample
// in the memSeries to append the appropriately typed
// staleness marker. This is obviously so in case (1).
// In case (2), we would usually expect a float sample
// as the previous sample, but there might be concurrent
// appends that have added a histogram sample in the
// meantime. (This will probably lead to OOO shenanigans
// anyway, but that's a different story.)
//
// If the last sample in the memSeries is indeed a
// float, we don't have to do anything special here and
// just go on with the normal commit for a float sample.
// However, if the last sample in the memSeries is a
// histogram or float histogram, we have to convert the
// staleness marker to a histogram (or float histogram,
// respectively), and just add it at the end of the
// histograms (or float histograms) in the same batch,
// to be committed later in commitHistograms (or
// commitFloatHistograms). The latter is fine because we
// know there is no other histogram (or float histogram)
// sample for this same series in this same batch
// (because any such sample would have triggered a new
// batch).
switch {
case series.lastHistogramValue != nil:
b.histograms = append(b.histograms, record.RefHistogramSample{
Ref: series.ref,
T: s.T,
H: &histogram.Histogram{Sum: s.V},
})
b.histogramSeries = append(b.histogramSeries, series)
series.Unlock()
continue
case series.lastFloatHistogramValue != nil:
b.floatHistograms = append(b.floatHistograms, record.RefFloatHistogramSample{
Ref: series.ref,
T: s.T,
FH: &histogram.FloatHistogram{Sum: s.V},
})
b.floatHistogramSeries = append(b.floatHistogramSeries, series)
series.Unlock()
continue
}
}
oooSample, _, err := series.appendable(s.T, s.V, a.headMaxt, a.minValidTime, a.oooTimeWindow)
if err != nil {
handleAppendableError(err, &acc.floatsAppended, &acc.floatOOORejected, &acc.floatOOBRejected, &acc.floatTooOldRejected)
@ -1261,15 +1431,24 @@ func (a *headAppender) commitSamples(acc *appenderCommitContext) {
}
}
// For details on the commitHistograms function, see the commitSamples docs.
func (a *headAppender) commitHistograms(acc *appenderCommitContext) {
// For details on the commitHistograms function, see the commitFloats docs.
func (a *headAppender) commitHistograms(b *appendBatch, acc *appenderCommitContext) {
var ok, chunkCreated bool
var series *memSeries
for i, s := range a.histograms {
series = a.histogramSeries[i]
for i, s := range b.histograms {
series = b.histogramSeries[i]
series.Lock()
// At this point, we could encounter a histogram staleness
// marker that should better be a float staleness marker or a
// float histogram staleness marker. This can only happen with
// concurrent appenders appending to the same series _and_ doing
// so in a mixed-type scenario. This case is expected to be very
// rare, so we do not bother here to convert the staleness
// marker. The worst case is that we need to cut a new chunk
// just for the staleness marker.
oooSample, _, err := series.appendableHistogram(s.T, s.H, a.headMaxt, a.minValidTime, a.oooTimeWindow)
if err != nil {
handleAppendableError(err, &acc.histogramsAppended, &acc.histoOOORejected, &acc.histoOOBRejected, &acc.histoTooOldRejected)
@ -1361,15 +1540,24 @@ func (a *headAppender) commitHistograms(acc *appenderCommitContext) {
}
}
// For details on the commitFloatHistograms function, see the commitSamples docs.
func (a *headAppender) commitFloatHistograms(acc *appenderCommitContext) {
// For details on the commitFloatHistograms function, see the commitFloats docs.
func (a *headAppender) commitFloatHistograms(b *appendBatch, acc *appenderCommitContext) {
var ok, chunkCreated bool
var series *memSeries
for i, s := range a.floatHistograms {
series = a.floatHistogramSeries[i]
for i, s := range b.floatHistograms {
series = b.floatHistogramSeries[i]
series.Lock()
// At this point, we could encounter a float histogram staleness
// marker that should better be a float staleness marker or an
// integer histogram staleness marker. This can only happen with
// concurrent appenders appending to the same series _and_ doing
// so in a mixed-type scenario. This case is expected to be very
// rare, so we do not bother here to convert the staleness
// marker. The worst case is that we need to cut a new chunk
// just for the staleness marker.
oooSample, _, err := series.appendableFloatHistogram(s.T, s.FH, a.headMaxt, a.minValidTime, a.oooTimeWindow)
if err != nil {
handleAppendableError(err, &acc.histogramsAppended, &acc.histoOOORejected, &acc.histoOOBRejected, &acc.histoTooOldRejected)
@ -1461,14 +1649,14 @@ func (a *headAppender) commitFloatHistograms(acc *appenderCommitContext) {
}
}
// commitMetadata commits the metadata for each series in the headAppender.
// commitMetadata commits the metadata for each series in the provided batch.
// It iterates over the metadata slice and updates the corresponding series
// with the new metadata information. The series is locked during the update
// to ensure thread safety.
func (a *headAppender) commitMetadata() {
func commitMetadata(b *appendBatch) {
var series *memSeries
for i, m := range a.metadata {
series = a.metadataSeries[i]
for i, m := range b.metadata {
series = b.metadataSeries[i]
series.Lock()
series.meta = &metadata.Metadata{Type: record.ToMetricType(m.Type), Unit: m.Unit, Help: m.Help}
series.Unlock()
@ -1489,75 +1677,82 @@ func (a *headAppender) Commit() (err error) {
if a.closed {
return ErrAppenderClosed
}
defer func() { a.closed = true }()
h := a.head
defer func() {
h.putRefSeriesBuffer(a.seriesRefs)
h.putSeriesBuffer(a.series)
a.closed = true
}()
if err := a.log(); err != nil {
_ = a.Rollback() // Most likely the same error will happen again.
return fmt.Errorf("write to WAL: %w", err)
}
if a.head.writeNotified != nil {
a.head.writeNotified.Notify()
if h.writeNotified != nil {
h.writeNotified.Notify()
}
a.commitExemplars()
defer a.head.metrics.activeAppenders.Dec()
defer a.head.putAppendBuffer(a.samples)
defer a.head.putSeriesBuffer(a.sampleSeries)
defer a.head.putExemplarBuffer(a.exemplars)
defer a.head.putHistogramBuffer(a.histograms)
defer a.head.putFloatHistogramBuffer(a.floatHistograms)
defer a.head.putMetadataBuffer(a.metadata)
defer a.head.iso.closeAppend(a.appendID)
acc := &appenderCommitContext{
floatsAppended: len(a.samples),
histogramsAppended: len(a.histograms) + len(a.floatHistograms),
inOrderMint: math.MaxInt64,
inOrderMaxt: math.MinInt64,
oooMinT: math.MaxInt64,
oooMaxT: math.MinInt64,
oooCapMax: a.head.opts.OutOfOrderCapMax.Load(),
oooCapMax: h.opts.OutOfOrderCapMax.Load(),
appendChunkOpts: chunkOpts{
chunkDiskMapper: a.head.chunkDiskMapper,
chunkRange: a.head.chunkRange.Load(),
samplesPerChunk: a.head.opts.SamplesPerChunk,
chunkDiskMapper: h.chunkDiskMapper,
chunkRange: h.chunkRange.Load(),
samplesPerChunk: h.opts.SamplesPerChunk,
},
}
for _, b := range a.batches {
acc.floatsAppended += len(b.floats)
acc.histogramsAppended += len(b.histograms) + len(b.floatHistograms)
a.commitExemplars(b)
defer b.close(h)
}
defer h.metrics.activeAppenders.Dec()
defer h.iso.closeAppend(a.appendID)
defer func() {
for i := range acc.oooRecords {
a.head.putBytesBuffer(acc.oooRecords[i][:0])
h.putBytesBuffer(acc.oooRecords[i][:0])
}
}()
a.commitSamples(acc)
a.commitHistograms(acc)
a.commitFloatHistograms(acc)
a.commitMetadata()
for _, b := range a.batches {
// Do not change the order of these calls. The staleness marker
// handling depends on it.
a.commitFloats(b, acc)
a.commitHistograms(b, acc)
a.commitFloatHistograms(b, acc)
commitMetadata(b)
}
// Unmark all series as pending commit after all samples have been committed.
a.unmarkCreatedSeriesAsPendingCommit()
a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatOOORejected))
a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.histoOOORejected))
a.head.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatOOBRejected))
a.head.metrics.tooOldSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatTooOldRejected))
a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatsAppended))
a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.histogramsAppended))
a.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.oooFloatsAccepted))
a.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.oooHistogramAccepted))
a.head.updateMinMaxTime(acc.inOrderMint, acc.inOrderMaxt)
a.head.updateMinOOOMaxOOOTime(acc.oooMinT, acc.oooMaxT)
h.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatOOORejected))
h.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.histoOOORejected))
h.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatOOBRejected))
h.metrics.tooOldSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatTooOldRejected))
h.metrics.samplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.floatsAppended))
h.metrics.samplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.histogramsAppended))
h.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(acc.oooFloatsAccepted))
h.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(acc.oooHistogramAccepted))
h.updateMinMaxTime(acc.inOrderMint, acc.inOrderMaxt)
h.updateMinOOOMaxOOOTime(acc.oooMinT, acc.oooMaxT)
acc.collectOOORecords(a)
if a.head.wbl != nil {
if err := a.head.wbl.Log(acc.oooRecords...); err != nil {
if h.wbl != nil {
if err := h.wbl.Log(acc.oooRecords...); err != nil {
// TODO(codesome): Currently WBL logging of ooo samples is best effort here since we cannot try logging
// until we have found what samples become OOO. We can try having a metric for this failure.
// Returning the error here is not correct because we have already put the samples into the memory,
// hence the append/insert was a success.
a.head.logger.Error("Failed to log out of order samples into the WAL", "err", err)
h.logger.Error("Failed to log out of order samples into the WAL", "err", err)
}
}
return nil
@ -2007,37 +2202,43 @@ func (a *headAppender) Rollback() (err error) {
if a.closed {
return ErrAppenderClosed
}
defer func() { a.closed = true }()
defer a.head.metrics.activeAppenders.Dec()
defer a.head.iso.closeAppend(a.appendID)
defer a.head.putSeriesBuffer(a.sampleSeries)
defer a.unmarkCreatedSeriesAsPendingCommit()
h := a.head
defer func() {
a.unmarkCreatedSeriesAsPendingCommit()
h.iso.closeAppend(a.appendID)
h.metrics.activeAppenders.Dec()
a.closed = true
h.putRefSeriesBuffer(a.seriesRefs)
h.putSeriesBuffer(a.series)
}()
var series *memSeries
for i := range a.samples {
series = a.sampleSeries[i]
fmt.Println("ROLLBACK")
for _, b := range a.batches {
for i := range b.floats {
series = b.floatSeries[i]
series.Lock()
series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
series.pendingCommit = false
series.Unlock()
}
for i := range a.histograms {
series = a.histogramSeries[i]
for i := range b.histograms {
series = b.histogramSeries[i]
series.Lock()
series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
series.pendingCommit = false
series.Unlock()
}
a.head.putAppendBuffer(a.samples)
a.head.putExemplarBuffer(a.exemplars)
a.head.putHistogramBuffer(a.histograms)
a.head.putFloatHistogramBuffer(a.floatHistograms)
a.head.putMetadataBuffer(a.metadata)
a.samples = nil
a.exemplars = nil
a.histograms = nil
a.metadata = nil
for i := range b.floatHistograms {
series = b.floatHistogramSeries[i]
series.Lock()
series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
series.pendingCommit = false
series.Unlock()
}
b.close(h)
}
a.batches = a.batches[:0]
// Series are created in the head memory regardless of rollback. Thus we have
// to log them to the WAL in any case.
return a.log()

View File

@ -5336,8 +5336,6 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
samples []chunks.Sample
expChunks int
err error
// If this is empty, samples above will be taken instead of this.
addToExp []chunks.Sample
}{
// Histograms that end up in the expected samples are copied here so that we
// can independently set the CounterResetHint later.
@ -5377,43 +5375,29 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
samples: []chunks.Sample{sample{t: 100, fh: floatHists[4].Copy()}},
err: storage.ErrOutOfOrderSample,
},
// The three next tests all failed before #15177 was fixed.
{
// Combination of histograms and float64 in the same commit. The behaviour is undefined, but we want to also
// verify how TSDB would behave. Here the histogram is appended at the end, hence will be considered as out of order.
samples: []chunks.Sample{
sample{t: 400, f: 4},
sample{t: 500, h: hists[5]}, // This won't be committed.
sample{t: 500, h: hists[5]},
sample{t: 600, f: 6},
},
addToExp: []chunks.Sample{
sample{t: 400, f: 4},
sample{t: 600, f: 6},
},
expChunks: 7, // Only 1 new chunk for float64.
expChunks: 9, // Each of the three samples above creates a new chunk because the type changes.
},
{
// Here the histogram is appended at the end, hence the first histogram is out of order.
samples: []chunks.Sample{
sample{t: 700, h: hists[7]}, // Out of order w.r.t. the next float64 sample that is appended first.
sample{t: 700, h: hists[7]},
sample{t: 800, f: 8},
sample{t: 900, h: hists[9]},
},
addToExp: []chunks.Sample{
sample{t: 800, f: 8},
sample{t: 900, h: hists[9].Copy()},
},
expChunks: 8, // float64 added to old chunk, only 1 new for histograms.
expChunks: 12, // Again each sample creates a new chunk.
},
{
// Float histogram is appended at the end.
samples: []chunks.Sample{
sample{t: 1000, fh: floatHists[7]}, // Out of order w.r.t. the next histogram.
sample{t: 1000, fh: floatHists[7]},
sample{t: 1100, h: hists[9]},
},
addToExp: []chunks.Sample{
sample{t: 1100, h: hists[9].Copy()},
},
expChunks: 8,
expChunks: 14, // Even changes between float and integer histogram create new chunks.
},
}
@ -5431,11 +5415,7 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) {
if a.err == nil {
require.NoError(t, app.Commit())
if len(a.addToExp) > 0 {
expResult = append(expResult, a.addToExp...)
} else {
expResult = append(expResult, a.samples...)
}
checkExpChunks(a.expChunks)
} else {
require.NoError(t, app.Rollback())
@ -6751,7 +6731,27 @@ func TestHeadAppender_AppendFloatWithSameTimestampAsPreviousHistogram(t *testing
func TestHeadAppender_AppendCT(t *testing.T) {
testHistogram := tsdbutil.GenerateTestHistogram(1)
testHistogram.CounterResetHint = histogram.NotCounterReset
testFloatHistogram := tsdbutil.GenerateTestFloatHistogram(1)
testFloatHistogram.CounterResetHint = histogram.NotCounterReset
// TODO(beorn7): Once issue #15346 is fixed, the CounterResetHint of the
// following two zero histograms should be histogram.CounterReset.
testZeroHistogram := &histogram.Histogram{
Schema: testHistogram.Schema,
ZeroThreshold: testHistogram.ZeroThreshold,
PositiveSpans: testHistogram.PositiveSpans,
NegativeSpans: testHistogram.NegativeSpans,
PositiveBuckets: []int64{0, 0, 0, 0},
NegativeBuckets: []int64{0, 0, 0, 0},
}
testZeroFloatHistogram := &histogram.FloatHistogram{
Schema: testFloatHistogram.Schema,
ZeroThreshold: testFloatHistogram.ZeroThreshold,
PositiveSpans: testFloatHistogram.PositiveSpans,
NegativeSpans: testFloatHistogram.NegativeSpans,
PositiveBuckets: []float64{0, 0, 0, 0},
NegativeBuckets: []float64{0, 0, 0, 0},
}
type appendableSamples struct {
ts int64
fSample float64
@ -6783,12 +6783,10 @@ func TestHeadAppender_AppendCT(t *testing.T) {
{ts: 101, h: testHistogram, ct: 1},
},
expectedSamples: func() []chunks.Sample {
hNoCounterReset := *testHistogram
hNoCounterReset.CounterResetHint = histogram.NotCounterReset
return []chunks.Sample{
sample{t: 1, h: &histogram.Histogram{}},
sample{t: 1, h: testZeroHistogram},
sample{t: 100, h: testHistogram},
sample{t: 101, h: &hNoCounterReset},
sample{t: 101, h: testHistogram},
}
}(),
},
@ -6799,12 +6797,10 @@ func TestHeadAppender_AppendCT(t *testing.T) {
{ts: 101, fh: testFloatHistogram, ct: 1},
},
expectedSamples: func() []chunks.Sample {
fhNoCounterReset := *testFloatHistogram
fhNoCounterReset.CounterResetHint = histogram.NotCounterReset
return []chunks.Sample{
sample{t: 1, fh: &histogram.FloatHistogram{}},
sample{t: 1, fh: testZeroFloatHistogram},
sample{t: 100, fh: testFloatHistogram},
sample{t: 101, fh: &fhNoCounterReset},
sample{t: 101, fh: testFloatHistogram},
}
}(),
},
@ -6827,12 +6823,10 @@ func TestHeadAppender_AppendCT(t *testing.T) {
{ts: 101, h: testHistogram, ct: 1},
},
expectedSamples: func() []chunks.Sample {
hNoCounterReset := *testHistogram
hNoCounterReset.CounterResetHint = histogram.NotCounterReset
return []chunks.Sample{
sample{t: 1, h: &histogram.Histogram{}},
sample{t: 1, h: testZeroHistogram},
sample{t: 100, h: testHistogram},
sample{t: 101, h: &hNoCounterReset},
sample{t: 101, h: testHistogram},
}
}(),
},
@ -6843,12 +6837,10 @@ func TestHeadAppender_AppendCT(t *testing.T) {
{ts: 101, fh: testFloatHistogram, ct: 1},
},
expectedSamples: func() []chunks.Sample {
fhNoCounterReset := *testFloatHistogram
fhNoCounterReset.CounterResetHint = histogram.NotCounterReset
return []chunks.Sample{
sample{t: 1, fh: &histogram.FloatHistogram{}},
sample{t: 1, fh: testZeroFloatHistogram},
sample{t: 100, fh: testFloatHistogram},
sample{t: 101, fh: &fhNoCounterReset},
sample{t: 101, fh: testFloatHistogram},
}
}(),
},
@ -6872,9 +6864,9 @@ func TestHeadAppender_AppendCT(t *testing.T) {
{ts: 102, h: testHistogram, ct: 101},
},
expectedSamples: []chunks.Sample{
sample{t: 1, h: &histogram.Histogram{}},
sample{t: 1, h: testZeroHistogram},
sample{t: 100, h: testHistogram},
sample{t: 101, h: &histogram.Histogram{CounterResetHint: histogram.UnknownCounterReset}},
sample{t: 101, h: testZeroHistogram},
sample{t: 102, h: testHistogram},
},
},
@ -6885,9 +6877,9 @@ func TestHeadAppender_AppendCT(t *testing.T) {
{ts: 102, fh: testFloatHistogram, ct: 101},
},
expectedSamples: []chunks.Sample{
sample{t: 1, fh: &histogram.FloatHistogram{}},
sample{t: 1, fh: testZeroFloatHistogram},
sample{t: 100, fh: testFloatHistogram},
sample{t: 101, fh: &histogram.FloatHistogram{CounterResetHint: histogram.UnknownCounterReset}},
sample{t: 101, fh: testZeroFloatHistogram},
sample{t: 102, fh: testFloatHistogram},
},
},
@ -6910,12 +6902,10 @@ func TestHeadAppender_AppendCT(t *testing.T) {
{ts: 101, h: testHistogram, ct: 100},
},
expectedSamples: func() []chunks.Sample {
hNoCounterReset := *testHistogram
hNoCounterReset.CounterResetHint = histogram.NotCounterReset
return []chunks.Sample{
sample{t: 1, h: &histogram.Histogram{}},
sample{t: 1, h: testZeroHistogram},
sample{t: 100, h: testHistogram},
sample{t: 101, h: &hNoCounterReset},
sample{t: 101, h: testHistogram},
}
}(),
},
@ -6926,12 +6916,10 @@ func TestHeadAppender_AppendCT(t *testing.T) {
{ts: 101, fh: testFloatHistogram, ct: 100},
},
expectedSamples: func() []chunks.Sample {
fhNoCounterReset := *testFloatHistogram
fhNoCounterReset.CounterResetHint = histogram.NotCounterReset
return []chunks.Sample{
sample{t: 1, fh: &histogram.FloatHistogram{}},
sample{t: 1, fh: testZeroFloatHistogram},
sample{t: 100, fh: testFloatHistogram},
sample{t: 101, fh: &fhNoCounterReset},
sample{t: 101, fh: testFloatHistogram},
}
}(),
},