Add temporality label for metrics ingested in OTLP endpoint

Signed-off-by: Carrie Edwards <edwrdscarrie@gmail.com>
This commit is contained in:
Carrie Edwards 2025-07-31 10:20:57 -07:00
parent ff3882fd35
commit 76ff1ac360
9 changed files with 62 additions and 15 deletions

View File

@ -285,8 +285,9 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error {
logger.Info("Converting delta OTLP metrics to cumulative")
case "otlp-native-delta-ingestion":
// Experimental OTLP native delta ingestion.
// This currently just stores the raw delta value as-is with unknown metric type. Better typing and
// type-aware functions may come later.
// This currently stores the raw delta value as-is with a __temporality__
// label set to "delta" and a __type__ label set to "gauge"/"gaugehistogram".
// The type-and-unit-labels flag must be enabled as well.
// See proposal: https://github.com/prometheus/proposals/pull/48
c.web.NativeOTLPDeltaIngestion = true
logger.Info("Enabling native ingestion of delta OTLP metrics, storing the raw sample values without conversion. WARNING: Delta support is in an early stage of development. The ingestion and querying process is likely to change over time.")
@ -307,6 +308,10 @@ func (c *flagConfig) setFeatureListOptions(logger *slog.Logger) error {
return errors.New("cannot enable otlp-deltatocumulative and otlp-native-delta-ingestion features at the same time")
}
if c.web.NativeOTLPDeltaIngestion && !c.web.EnableTypeAndUnitLabels {
return errors.New("cannot enable otlp-native-delta-ingestion feature without enabling type and unit labels feature")
}
return nil
}

View File

@ -239,9 +239,9 @@ Examples of equivalent durations:
`--enable-feature=otlp-native-delta-ingestion`
When enabled, allows for the native ingestion of delta OTLP metrics, storing the raw sample values without conversion. This cannot be enabled in conjunction with `otlp-deltatocumulative`.
When enabled, allows for the native ingestion of delta OTLP metrics, storing the raw sample values without conversion. This cannot be enabled in conjunction with `otlp-deltatocumulative`, and `type-and-unit-labels` must be enabled.
Currently, the StartTimeUnixNano field is ignored, and deltas are given the unknown metric metadata type.
Currently, the StartTimeUnixNano field is ignored. Delta metrics are given a `__temporality__` label with a value of "delta" and a `__type__` label with a value of "gauge"/"gaugehistogram".
Delta support is in a very early stage of development and the ingestion and querying process my change over time. For the open proposal see [prometheus/proposals#48](https://github.com/prometheus/proposals/pull/48).
@ -260,7 +260,7 @@ These may not work well if the `<range>` is not a multiple of the collection int
* If delta metrics are exposed via [federation](https://prometheus.io/docs/prometheus/latest/federation/), data can be incorrectly collected if the ingestion interval is not the same as the scrape interval for the federated endpoint.
* It is difficult to figure out whether a metric has delta or cumulative temporality, since there's no indication of temporality in metric names or labels. For now, if you are ingesting a mix of delta and cumulative metrics we advise you to explicitly add your own labels to distinguish them. In the future, we plan to introduce type labels to consistently distinguish metric types and potentially make PromQL functions type-aware (e.g. providing warnings when cumulative-only functions are used with delta metrics).
* In the future, we plan to introduce type labels to consistently distinguish metric types and potentially make PromQL functions type-aware (e.g. providing warnings when cumulative-only functions are used with delta metrics).
* If there are multiple samples being ingested at the same timestamp, only one of the points is kept - the samples are **not** summed together (this is how Prometheus works in general - duplicate timestamp samples are rejected). Any aggregation will have to be done before sending samples to Prometheus.

View File

@ -119,7 +119,7 @@ var seps = []byte{'\xff'}
// if logOnOverwrite is true, the overwrite is logged. Resulting label names are sanitized.
// If settings.PromoteResourceAttributes is not empty, it's a set of resource attributes that should be promoted to labels.
func createAttributes(resource pcommon.Resource, attributes pcommon.Map, scope scope, settings Settings,
ignoreAttrs []string, logOnOverwrite bool, metadata prompb.MetricMetadata, extras ...string,
ignoreAttrs []string, logOnOverwrite bool, metadata prompb.MetricMetadata, temporality pmetric.AggregationTemporality, hasTemporality bool, extras ...string,
) []prompb.Label {
resourceAttrs := resource.Attributes()
serviceName, haveServiceName := resourceAttrs.Get(conventions.AttributeServiceName)
@ -145,6 +145,9 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, scope s
}
if settings.EnableTypeAndUnitLabels {
maxLabelCount += 2
if settings.AllowDeltaTemporality && hasTemporality {
maxLabelCount++
}
}
// Ensure attributes are sorted by key for consistent merging of keys which
@ -192,12 +195,34 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, scope s
if settings.EnableTypeAndUnitLabels {
unitNamer := otlptranslator.UnitNamer{UTF8Allowed: settings.AllowUTF8}
if metadata.Type != prompb.MetricMetadata_UNKNOWN {
l["__type__"] = strings.ToLower(metadata.Type.String())
typeValue := strings.ToLower(metadata.Type.String())
if settings.AllowDeltaTemporality && hasTemporality && temporality == pmetric.AggregationTemporalityDelta {
switch metadata.Type {
case prompb.MetricMetadata_COUNTER:
typeValue = "gauge"
case prompb.MetricMetadata_HISTOGRAM:
typeValue = "gaugehistogram"
}
}
l["__type__"] = typeValue
}
if metadata.Unit != "" {
l["__unit__"] = unitNamer.Build(metadata.Unit)
}
if settings.AllowDeltaTemporality && hasTemporality {
switch temporality {
case pmetric.AggregationTemporalityCumulative:
l["__temporality__"] = "cumulative"
case pmetric.AggregationTemporalityDelta:
l["__temporality__"] = "delta"
}
}
}
// Map service.name + service.namespace to job.
@ -269,7 +294,7 @@ func aggregationTemporality(metric pmetric.Metric) (pmetric.AggregationTemporali
// However, work is under way to resolve this shortcoming through a feature called native histograms custom buckets:
// https://github.com/prometheus/prometheus/issues/13485.
func (c *PrometheusConverter) addHistogramDataPoints(ctx context.Context, dataPoints pmetric.HistogramDataPointSlice,
resource pcommon.Resource, settings Settings, metadata prompb.MetricMetadata, scope scope,
resource pcommon.Resource, settings Settings, metadata prompb.MetricMetadata, scope scope, temporality pmetric.AggregationTemporality, hasTemporality bool,
) error {
for x := 0; x < dataPoints.Len(); x++ {
if err := c.everyN.checkContext(ctx); err != nil {
@ -278,7 +303,7 @@ func (c *PrometheusConverter) addHistogramDataPoints(ctx context.Context, dataPo
pt := dataPoints.At(x)
timestamp := convertTimeStamp(pt.Timestamp())
baseLabels := createAttributes(resource, pt.Attributes(), scope, settings, nil, false, metadata)
baseLabels := createAttributes(resource, pt.Attributes(), scope, settings, nil, false, metadata, temporality, hasTemporality)
// If the sum is unset, it indicates the _sum metric point should be
// omitted
@ -488,7 +513,7 @@ func (c *PrometheusConverter) addSummaryDataPoints(ctx context.Context, dataPoin
pt := dataPoints.At(x)
timestamp := convertTimeStamp(pt.Timestamp())
baseLabels := createAttributes(resource, pt.Attributes(), scope, settings, nil, false, metadata)
baseLabels := createAttributes(resource, pt.Attributes(), scope, settings, nil, false, metadata, 0, false)
// treat sum as a sample in an individual TimeSeries
sum := &prompb.Sample{
@ -655,7 +680,7 @@ func addResourceTargetInfo(resource pcommon.Resource, settings Settings, earlies
// Do not pass identifying attributes as ignoreAttrs below.
identifyingAttrs = nil
}
labels := createAttributes(resource, attributes, scope{}, settings, identifyingAttrs, false, prompb.MetricMetadata{}, model.MetricNameLabel, name)
labels := createAttributes(resource, attributes, scope{}, settings, identifyingAttrs, false, prompb.MetricMetadata{}, 0, false, model.MetricNameLabel, name)
haveIdentifier := false
for _, l := range labels {
if l.Name == model.JobLabel || l.Name == model.InstanceLabel {

View File

@ -531,7 +531,7 @@ func TestCreateAttributes(t *testing.T) {
}),
PromoteScopeMetadata: tc.promoteScope,
}
lbls := createAttributes(resource, attrs, tc.scope, settings, tc.ignoreAttrs, false, prompb.MetricMetadata{}, model.MetricNameLabel, "test_metric")
lbls := createAttributes(resource, attrs, tc.scope, settings, tc.ignoreAttrs, false, prompb.MetricMetadata{}, 0, false, model.MetricNameLabel, "test_metric")
require.ElementsMatch(t, lbls, tc.expectedLabels)
})
@ -946,6 +946,8 @@ func TestPrometheusConverter_AddHistogramDataPoints(t *testing.T) {
},
prompb.MetricMetadata{MetricFamilyName: metric.Name()},
tt.scope,
pmetric.AggregationTemporalityCumulative,
true,
)
require.Equal(t, tt.want(), converter.unique)

View File

@ -60,6 +60,8 @@ func (c *PrometheusConverter) addExponentialHistogramDataPoints(ctx context.Cont
nil,
true,
metadata,
temporality,
true,
model.MetricNameLabel,
metadata.MetricFamilyName,
)
@ -279,6 +281,8 @@ func (c *PrometheusConverter) addCustomBucketsHistogramDataPoints(ctx context.Co
nil,
true,
metadata,
temporality,
true,
model.MetricNameLabel,
metadata.MetricFamilyName,
)

View File

@ -200,7 +200,7 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric
errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
break
}
if err := c.addSumNumberDataPoints(ctx, dataPoints, resource, metric, settings, metadata, scope); err != nil {
if err := c.addSumNumberDataPoints(ctx, dataPoints, resource, metric, settings, metadata, scope, temporality, hasTemporality); err != nil {
errs = multierr.Append(errs, err)
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
@ -224,7 +224,7 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric
}
}
} else {
if err := c.addHistogramDataPoints(ctx, dataPoints, resource, settings, metadata, scope); err != nil {
if err := c.addHistogramDataPoints(ctx, dataPoints, resource, settings, metadata, scope, temporality, hasTemporality); err != nil {
errs = multierr.Append(errs, err)
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return

View File

@ -45,6 +45,8 @@ func (c *PrometheusConverter) addGaugeNumberDataPoints(ctx context.Context, data
nil,
true,
metadata,
0,
false,
model.MetricNameLabel,
metadata.MetricFamilyName,
)
@ -69,7 +71,7 @@ func (c *PrometheusConverter) addGaugeNumberDataPoints(ctx context.Context, data
}
func (c *PrometheusConverter) addSumNumberDataPoints(ctx context.Context, dataPoints pmetric.NumberDataPointSlice,
resource pcommon.Resource, metric pmetric.Metric, settings Settings, metadata prompb.MetricMetadata, scope scope,
resource pcommon.Resource, metric pmetric.Metric, settings Settings, metadata prompb.MetricMetadata, scope scope, temporality pmetric.AggregationTemporality, hasTemporality bool,
) error {
for x := 0; x < dataPoints.Len(); x++ {
if err := c.everyN.checkContext(ctx); err != nil {
@ -85,6 +87,8 @@ func (c *PrometheusConverter) addSumNumberDataPoints(ctx context.Context, dataPo
nil,
true,
metadata,
temporality,
hasTemporality,
model.MetricNameLabel,
metadata.MetricFamilyName,
)

View File

@ -364,6 +364,8 @@ func TestPrometheusConverter_addSumNumberDataPoints(t *testing.T) {
},
prompb.MetricMetadata{MetricFamilyName: metric.Name()},
tt.scope,
pmetric.AggregationTemporalityCumulative,
true,
)
require.Equal(t, tt.want(), converter.unique)

View File

@ -545,6 +545,11 @@ func NewOTLPWriteHandler(logger *slog.Logger, _ prometheus.Registerer, appendabl
panic("cannot enable native delta ingestion and delta2cumulative conversion at the same time")
}
if opts.NativeDelta && !opts.EnableTypeAndUnitLabels {
// This should be validated when iterating through feature flags, so not expected to fail here.
panic("cannot enable native delta ingestion without enabling type and unit labels")
}
ex := &rwExporter{
writeHandler: &writeHandler{
logger: logger,