Add support for temporality label in OTLP endpoint

Signed-off-by: Carrie Edwards <edwrdscarrie@gmail.com>
This commit is contained in:
Carrie Edwards 2025-07-31 08:20:12 -07:00
parent 2c04f2d7b1
commit 768ff54233
8 changed files with 100 additions and 13 deletions

View File

@ -119,7 +119,7 @@ var seps = []byte{'\xff'}
// if logOnOverwrite is true, the overwrite is logged. Resulting label names are sanitized.
// If settings.PromoteResourceAttributes is not empty, it's a set of resource attributes that should be promoted to labels.
func createAttributes(resource pcommon.Resource, attributes pcommon.Map, scope scope, settings Settings,
ignoreAttrs []string, logOnOverwrite bool, metadata prompb.MetricMetadata, extras ...string,
ignoreAttrs []string, logOnOverwrite bool, metadata prompb.MetricMetadata, temporality pmetric.AggregationTemporality, hasTemporality bool, isMonotonic bool, hasMonotonicity bool, extras ...string,
) []prompb.Label {
resourceAttrs := resource.Attributes()
serviceName, haveServiceName := resourceAttrs.Get(conventions.AttributeServiceName)
@ -145,6 +145,10 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, scope s
}
if settings.EnableTypeAndUnitLabels {
maxLabelCount += 2
// Add one more for temporality label if needed
if settings.AllowDeltaTemporality && hasTemporality {
maxLabelCount++
}
}
// Ensure attributes are sorted by key for consistent merging of keys which
@ -192,12 +196,46 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, scope s
if settings.EnableTypeAndUnitLabels {
unitNamer := otlptranslator.UnitNamer{UTF8Allowed: settings.AllowUTF8}
// Handle type label with delta temporality special case
if metadata.Type != prompb.MetricMetadata_UNKNOWN {
l["__type__"] = strings.ToLower(metadata.Type.String())
typeValue := strings.ToLower(metadata.Type.String())
// If delta temporality and NativeDelta is enabled, set type to gauge/gaugehistogram
if settings.AllowDeltaTemporality && hasTemporality && temporality == pmetric.AggregationTemporalityDelta {
switch metadata.Type {
case prompb.MetricMetadata_COUNTER:
typeValue = "gauge"
case prompb.MetricMetadata_HISTOGRAM:
typeValue = "gaugehistogram"
}
}
l["__type__"] = typeValue
}
if metadata.Unit != "" {
l["__unit__"] = unitNamer.Build(metadata.Unit)
}
// Add temporality label if conditions are met
if settings.AllowDeltaTemporality && hasTemporality {
switch temporality {
case pmetric.AggregationTemporalityCumulative:
l["__temporality__"] = "cumulative"
case pmetric.AggregationTemporalityDelta:
l["__temporality__"] = "delta"
}
}
// Add monotonicity label if conditions are met
if hasMonotonicity {
if isMonotonic {
l["__monotonicity__"] = "true"
} else {
l["__monotonicity__"] = "false"
}
}
}
// Map service.name + service.namespace to job.
@ -269,7 +307,7 @@ func aggregationTemporality(metric pmetric.Metric) (pmetric.AggregationTemporali
// However, work is under way to resolve this shortcoming through a feature called native histograms custom buckets:
// https://github.com/prometheus/prometheus/issues/13485.
func (c *PrometheusConverter) addHistogramDataPoints(ctx context.Context, dataPoints pmetric.HistogramDataPointSlice,
resource pcommon.Resource, settings Settings, metadata prompb.MetricMetadata, scope scope,
resource pcommon.Resource, settings Settings, metadata prompb.MetricMetadata, scope scope, temporality pmetric.AggregationTemporality, hasTemporality bool,
) error {
for x := 0; x < dataPoints.Len(); x++ {
if err := c.everyN.checkContext(ctx); err != nil {
@ -278,7 +316,7 @@ func (c *PrometheusConverter) addHistogramDataPoints(ctx context.Context, dataPo
pt := dataPoints.At(x)
timestamp := convertTimeStamp(pt.Timestamp())
baseLabels := createAttributes(resource, pt.Attributes(), scope, settings, nil, false, metadata)
baseLabels := createAttributes(resource, pt.Attributes(), scope, settings, nil, false, metadata, temporality, hasTemporality, false, false)
// If the sum is unset, it indicates the _sum metric point should be
// omitted
@ -488,7 +526,7 @@ func (c *PrometheusConverter) addSummaryDataPoints(ctx context.Context, dataPoin
pt := dataPoints.At(x)
timestamp := convertTimeStamp(pt.Timestamp())
baseLabels := createAttributes(resource, pt.Attributes(), scope, settings, nil, false, metadata)
baseLabels := createAttributes(resource, pt.Attributes(), scope, settings, nil, false, metadata, 0, false, false, false)
// treat sum as a sample in an individual TimeSeries
sum := &prompb.Sample{
@ -655,7 +693,7 @@ func addResourceTargetInfo(resource pcommon.Resource, settings Settings, earlies
// Do not pass identifying attributes as ignoreAttrs below.
identifyingAttrs = nil
}
labels := createAttributes(resource, attributes, scope{}, settings, identifyingAttrs, false, prompb.MetricMetadata{}, model.MetricNameLabel, name)
labels := createAttributes(resource, attributes, scope{}, settings, identifyingAttrs, false, prompb.MetricMetadata{}, 0, false, false, false, model.MetricNameLabel, name)
haveIdentifier := false
for _, l := range labels {
if l.Name == model.JobLabel || l.Name == model.InstanceLabel {

View File

@ -531,7 +531,7 @@ func TestCreateAttributes(t *testing.T) {
}),
PromoteScopeMetadata: tc.promoteScope,
}
lbls := createAttributes(resource, attrs, tc.scope, settings, tc.ignoreAttrs, false, prompb.MetricMetadata{}, model.MetricNameLabel, "test_metric")
lbls := createAttributes(resource, attrs, tc.scope, settings, tc.ignoreAttrs, false, prompb.MetricMetadata{}, 0, false, false, false, model.MetricNameLabel, "test_metric")
require.ElementsMatch(t, lbls, tc.expectedLabels)
})
@ -946,6 +946,8 @@ func TestPrometheusConverter_AddHistogramDataPoints(t *testing.T) {
},
prompb.MetricMetadata{MetricFamilyName: metric.Name()},
tt.scope,
pmetric.AggregationTemporalityCumulative,
true,
)
require.Equal(t, tt.want(), converter.unique)

View File

@ -60,6 +60,10 @@ func (c *PrometheusConverter) addExponentialHistogramDataPoints(ctx context.Cont
nil,
true,
metadata,
temporality,
true, // exponential histograms always have temporality
false, // exponential histograms don't have monotonicity
false, // exponential histograms don't have monotonicity
model.MetricNameLabel,
metadata.MetricFamilyName,
)
@ -279,6 +283,10 @@ func (c *PrometheusConverter) addCustomBucketsHistogramDataPoints(ctx context.Co
nil,
true,
metadata,
temporality,
true, // histograms always have temporality
false, // histograms don't have monotonicity
false, // histograms don't have monotonicity
model.MetricNameLabel,
metadata.MetricFamilyName,
)

View File

@ -171,9 +171,23 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric
continue
}
// For delta temporality, don't add any suffixes (unit or type)
var metricFamilyName string
if hasTemporality && temporality == pmetric.AggregationTemporalityDelta && settings.AllowDeltaTemporality {
// Create a namer without suffixes for delta temporality
deltaNamer := otlptranslator.MetricNamer{
Namespace: settings.Namespace,
WithMetricSuffixes: false, // Disable all suffixes for delta metrics
UTF8Allowed: settings.AllowUTF8,
}
metricFamilyName = deltaNamer.Build(TranslatorMetricFromOtelMetric(metric))
} else {
metricFamilyName = namer.Build(TranslatorMetricFromOtelMetric(metric))
}
metadata := prompb.MetricMetadata{
Type: otelMetricTypeToPromMetricType(metric),
MetricFamilyName: namer.Build(TranslatorMetricFromOtelMetric(metric)),
MetricFamilyName: metricFamilyName,
Help: metric.Description(),
Unit: metric.Unit(),
}
@ -200,7 +214,7 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric
errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
break
}
if err := c.addSumNumberDataPoints(ctx, dataPoints, resource, metric, settings, metadata, scope); err != nil {
if err := c.addSumNumberDataPoints(ctx, dataPoints, resource, metric, settings, metadata, scope, temporality, hasTemporality); err != nil {
errs = multierr.Append(errs, err)
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
@ -224,7 +238,7 @@ func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metric
}
}
} else {
if err := c.addHistogramDataPoints(ctx, dataPoints, resource, settings, metadata, scope); err != nil {
if err := c.addHistogramDataPoints(ctx, dataPoints, resource, settings, metadata, scope, temporality, hasTemporality); err != nil {
errs = multierr.Append(errs, err)
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return

View File

@ -1037,7 +1037,12 @@ func createExportRequest(resourceAttributeCount, histogramCount, nonHistogramCou
var suffix string
if settings.AddMetricSuffixes {
suffix = "_unit"
// For delta temporality, don't add unit suffixes
if temporality == pmetric.AggregationTemporalityDelta && settings.AllowDeltaTemporality {
suffix = "" // No unit suffix for delta metrics
} else {
suffix = "_unit"
}
}
var wantPromMetrics []wantPrometheusMetric
for i := 1; i <= histogramCount; i++ {
@ -1132,7 +1137,12 @@ func createExportRequest(resourceAttributeCount, histogramCount, nonHistogramCou
var counterSuffix string
if settings.AddMetricSuffixes {
counterSuffix = suffix + "_total"
// For delta temporality, don't add any suffixes
if temporality == pmetric.AggregationTemporalityDelta && settings.AllowDeltaTemporality {
counterSuffix = "" // No suffixes at all for delta metrics
} else {
counterSuffix = suffix + "_total"
}
}
metricType := prompb.MetricMetadata_COUNTER

View File

@ -45,6 +45,10 @@ func (c *PrometheusConverter) addGaugeNumberDataPoints(ctx context.Context, data
nil,
true,
metadata,
0, // gauge has no temporality
false, // gauge has no temporality
false, // gauge has no monotonicity
false, // gauge has no monotonicity
model.MetricNameLabel,
metadata.MetricFamilyName,
)
@ -69,7 +73,7 @@ func (c *PrometheusConverter) addGaugeNumberDataPoints(ctx context.Context, data
}
func (c *PrometheusConverter) addSumNumberDataPoints(ctx context.Context, dataPoints pmetric.NumberDataPointSlice,
resource pcommon.Resource, metric pmetric.Metric, settings Settings, metadata prompb.MetricMetadata, scope scope,
resource pcommon.Resource, metric pmetric.Metric, settings Settings, metadata prompb.MetricMetadata, scope scope, temporality pmetric.AggregationTemporality, hasTemporality bool,
) error {
for x := 0; x < dataPoints.Len(); x++ {
if err := c.everyN.checkContext(ctx); err != nil {
@ -85,6 +89,10 @@ func (c *PrometheusConverter) addSumNumberDataPoints(ctx context.Context, dataPo
nil,
true,
metadata,
temporality,
hasTemporality,
metric.Sum().IsMonotonic(),
true, // Sum metrics always have monotonicity info
model.MetricNameLabel,
metadata.MetricFamilyName,
)

View File

@ -364,6 +364,8 @@ func TestPrometheusConverter_addSumNumberDataPoints(t *testing.T) {
},
prompb.MetricMetadata{MetricFamilyName: metric.Name()},
tt.scope,
pmetric.AggregationTemporalityCumulative,
true,
)
require.Equal(t, tt.want(), converter.unique)

View File

@ -545,6 +545,11 @@ func NewOTLPWriteHandler(logger *slog.Logger, _ prometheus.Registerer, appendabl
panic("cannot enable native delta ingestion and delta2cumulative conversion at the same time")
}
if opts.NativeDelta && !opts.EnableTypeAndUnitLabels {
// This should be validated when iterating through feature flags, so not expected to fail here.
panic("cannot enable native delta ingestion without enabling type and unit labels")
}
ex := &rwExporter{
writeHandler: &writeHandler{
logger: logger,