prometheus/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go

351 lines
12 KiB
Go
Raw Normal View History

// Copyright 2024 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Provenance-includes-location: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/95e8f8fdc2a9dc87230406c9a3cf02be4fd68bea/pkg/translator/prometheusremotewrite/metrics_to_prw.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: Copyright The OpenTelemetry Authors.
package prometheusremotewrite
import (
"context"
"errors"
"fmt"
"sort"
"github.com/prometheus/otlptranslator"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/multierr"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/util/annotations"
)
type PromoteResourceAttributes struct {
promoteAll bool
attrs map[string]struct{}
}
type Settings struct {
Namespace string
ExternalLabels map[string]string
DisableTargetInfo bool
ExportCreatedMetric bool
AddMetricSuffixes bool
AllowUTF8 bool
PromoteResourceAttributes *PromoteResourceAttributes
KeepIdentifyingResourceAttributes bool
ConvertHistogramsToNHCB bool
Add primitive support for ingesting OTLP delta metrics as-is (#16360) * Add simple delta support Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Rename delta2cumulative part Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Whoops bad refactor Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Add example yml Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Feature flag instead and histogram hint handling Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Delete otel_delta.yml - outdated Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Renaming to native delta support Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Add more explanatory comments Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Add more explanation to histograms Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Correct comment on d2c consumer Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Add tests for counters and fix bug Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Add histogram tests Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Add docs Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Sort series to make test deterministic Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * More formatting Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Change flag name to ingestion Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Explain where rate calculation can go wrong Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Add warning about duplicate timestamps Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Update docs/feature_flags.md Co-authored-by: Arthur Silva Sens <arthursens2005@gmail.com> Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Fix tests Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Remove unnecessary if Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Add warning to d2c section Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Make unknown type error when getting temporality Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Correct type comment - not planning to add delta metric metadata type Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Remove unused param for empty type Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Rewrite temporality logic to be clearer Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Change spurious to unnecessary - better description Signed-off-by: Fiona Liao <fiona.liao@grafana.com> --------- Signed-off-by: Fiona Liao <fiona.liao@grafana.com> Co-authored-by: Arthur Silva Sens <arthursens2005@gmail.com>
2025-04-23 20:58:02 +08:00
AllowDeltaTemporality bool
}
// PrometheusConverter converts from OTel write format to Prometheus remote write format.
type PrometheusConverter struct {
unique map[uint64]*prompb.TimeSeries
conflicts map[uint64][]*prompb.TimeSeries
everyN everyNTimes
metadata []prompb.MetricMetadata
}
func NewPrometheusConverter() *PrometheusConverter {
return &PrometheusConverter{
unique: map[uint64]*prompb.TimeSeries{},
conflicts: map[uint64][]*prompb.TimeSeries{},
}
}
func TranslatorMetricFromOtelMetric(metric pmetric.Metric) otlptranslator.Metric {
m := otlptranslator.Metric{
Name: metric.Name(),
Unit: metric.Unit(),
Type: otlptranslator.MetricTypeUnknown,
}
switch metric.Type() {
case pmetric.MetricTypeGauge:
m.Type = otlptranslator.MetricTypeGauge
case pmetric.MetricTypeSum:
if metric.Sum().IsMonotonic() {
m.Type = otlptranslator.MetricTypeMonotonicCounter
} else {
m.Type = otlptranslator.MetricTypeNonMonotonicCounter
}
case pmetric.MetricTypeSummary:
m.Type = otlptranslator.MetricTypeSummary
case pmetric.MetricTypeHistogram:
m.Type = otlptranslator.MetricTypeHistogram
case pmetric.MetricTypeExponentialHistogram:
m.Type = otlptranslator.MetricTypeExponentialHistogram
}
return m
}
// FromMetrics converts pmetric.Metrics to Prometheus remote write format.
func (c *PrometheusConverter) FromMetrics(ctx context.Context, md pmetric.Metrics, settings Settings) (annots annotations.Annotations, errs error) {
namer := otlptranslator.MetricNamer{
Namespace: settings.Namespace,
WithMetricSuffixes: settings.AddMetricSuffixes,
UTF8Allowed: settings.AllowUTF8,
}
c.everyN = everyNTimes{n: 128}
resourceMetricsSlice := md.ResourceMetrics()
numMetrics := 0
for i := 0; i < resourceMetricsSlice.Len(); i++ {
scopeMetricsSlice := resourceMetricsSlice.At(i).ScopeMetrics()
for j := 0; j < scopeMetricsSlice.Len(); j++ {
numMetrics += scopeMetricsSlice.At(j).Metrics().Len()
}
}
c.metadata = make([]prompb.MetricMetadata, 0, numMetrics)
for i := 0; i < resourceMetricsSlice.Len(); i++ {
resourceMetrics := resourceMetricsSlice.At(i)
resource := resourceMetrics.Resource()
scopeMetricsSlice := resourceMetrics.ScopeMetrics()
// keep track of the most recent timestamp in the ResourceMetrics for
// use with the "target" info metric
var mostRecentTimestamp pcommon.Timestamp
for j := 0; j < scopeMetricsSlice.Len(); j++ {
metricSlice := scopeMetricsSlice.At(j).Metrics()
// TODO: decide if instrumentation library information should be exported as labels
for k := 0; k < metricSlice.Len(); k++ {
if err := c.everyN.checkContext(ctx); err != nil {
errs = multierr.Append(errs, err)
return
}
metric := metricSlice.At(k)
mostRecentTimestamp = max(mostRecentTimestamp, mostRecentTimestampInMetric(metric))
Add primitive support for ingesting OTLP delta metrics as-is (#16360) * Add simple delta support Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Rename delta2cumulative part Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Whoops bad refactor Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Add example yml Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Feature flag instead and histogram hint handling Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Delete otel_delta.yml - outdated Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Renaming to native delta support Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Add more explanatory comments Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Add more explanation to histograms Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Correct comment on d2c consumer Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Add tests for counters and fix bug Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Add histogram tests Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Add docs Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Sort series to make test deterministic Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * More formatting Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Change flag name to ingestion Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Explain where rate calculation can go wrong Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Add warning about duplicate timestamps Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Update docs/feature_flags.md Co-authored-by: Arthur Silva Sens <arthursens2005@gmail.com> Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Fix tests Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Remove unnecessary if Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Add warning to d2c section Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Make unknown type error when getting temporality Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Correct type comment - not planning to add delta metric metadata type Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Remove unused param for empty type Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Rewrite temporality logic to be clearer Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Change spurious to unnecessary - better description Signed-off-by: Fiona Liao <fiona.liao@grafana.com> --------- Signed-off-by: Fiona Liao <fiona.liao@grafana.com> Co-authored-by: Arthur Silva Sens <arthursens2005@gmail.com>
2025-04-23 20:58:02 +08:00
temporality, hasTemporality, err := aggregationTemporality(metric)
if err != nil {
errs = multierr.Append(errs, err)
continue
}
Add primitive support for ingesting OTLP delta metrics as-is (#16360) * Add simple delta support Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Rename delta2cumulative part Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Whoops bad refactor Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Add example yml Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Feature flag instead and histogram hint handling Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Delete otel_delta.yml - outdated Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Renaming to native delta support Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Add more explanatory comments Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Add more explanation to histograms Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Correct comment on d2c consumer Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Add tests for counters and fix bug Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Add histogram tests Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Add docs Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Sort series to make test deterministic Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * More formatting Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Change flag name to ingestion Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Explain where rate calculation can go wrong Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Add warning about duplicate timestamps Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Update docs/feature_flags.md Co-authored-by: Arthur Silva Sens <arthursens2005@gmail.com> Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Fix tests Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Remove unnecessary if Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Add warning to d2c section Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Make unknown type error when getting temporality Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Correct type comment - not planning to add delta metric metadata type Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Remove unused param for empty type Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Rewrite temporality logic to be clearer Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Change spurious to unnecessary - better description Signed-off-by: Fiona Liao <fiona.liao@grafana.com> --------- Signed-off-by: Fiona Liao <fiona.liao@grafana.com> Co-authored-by: Arthur Silva Sens <arthursens2005@gmail.com>
2025-04-23 20:58:02 +08:00
if hasTemporality &&
// Cumulative temporality is always valid.
// Delta temporality is also valid if AllowDeltaTemporality is true.
// All other temporality values are invalid.
(temporality != pmetric.AggregationTemporalityCumulative &&
(!settings.AllowDeltaTemporality || temporality != pmetric.AggregationTemporalityDelta)) {
errs = multierr.Append(errs, fmt.Errorf("invalid temporality and type combination for metric %q", metric.Name()))
continue
}
promName := namer.Build(TranslatorMetricFromOtelMetric(metric))
c.metadata = append(c.metadata, prompb.MetricMetadata{
Type: otelMetricTypeToPromMetricType(metric),
MetricFamilyName: promName,
Help: metric.Description(),
Unit: metric.Unit(),
})
// handle individual metrics based on type
//exhaustive:enforce
switch metric.Type() {
case pmetric.MetricTypeGauge:
dataPoints := metric.Gauge().DataPoints()
if dataPoints.Len() == 0 {
errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
break
}
if err := c.addGaugeNumberDataPoints(ctx, dataPoints, resource, settings, promName); err != nil {
errs = multierr.Append(errs, err)
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
}
}
case pmetric.MetricTypeSum:
dataPoints := metric.Sum().DataPoints()
if dataPoints.Len() == 0 {
errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
break
}
if err := c.addSumNumberDataPoints(ctx, dataPoints, resource, metric, settings, promName); err != nil {
errs = multierr.Append(errs, err)
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
}
}
case pmetric.MetricTypeHistogram:
dataPoints := metric.Histogram().DataPoints()
if dataPoints.Len() == 0 {
errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
break
}
if settings.ConvertHistogramsToNHCB {
ws, err := c.addCustomBucketsHistogramDataPoints(
ctx, dataPoints, resource, settings, promName, temporality,
)
annots.Merge(ws)
if err != nil {
errs = multierr.Append(errs, err)
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
}
}
} else {
if err := c.addHistogramDataPoints(ctx, dataPoints, resource, settings, promName); err != nil {
errs = multierr.Append(errs, err)
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
}
}
}
case pmetric.MetricTypeExponentialHistogram:
dataPoints := metric.ExponentialHistogram().DataPoints()
if dataPoints.Len() == 0 {
errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
break
}
ws, err := c.addExponentialHistogramDataPoints(
ctx,
dataPoints,
resource,
settings,
promName,
Add primitive support for ingesting OTLP delta metrics as-is (#16360) * Add simple delta support Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Rename delta2cumulative part Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Whoops bad refactor Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Add example yml Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Feature flag instead and histogram hint handling Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Delete otel_delta.yml - outdated Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Renaming to native delta support Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Add more explanatory comments Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Add more explanation to histograms Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Correct comment on d2c consumer Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Add tests for counters and fix bug Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Add histogram tests Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Add docs Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Sort series to make test deterministic Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * More formatting Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Change flag name to ingestion Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Explain where rate calculation can go wrong Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Add warning about duplicate timestamps Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Update docs/feature_flags.md Co-authored-by: Arthur Silva Sens <arthursens2005@gmail.com> Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Fix tests Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Remove unnecessary if Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Add warning to d2c section Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Make unknown type error when getting temporality Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Correct type comment - not planning to add delta metric metadata type Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Remove unused param for empty type Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Rewrite temporality logic to be clearer Signed-off-by: Fiona Liao <fiona.liao@grafana.com> * Change spurious to unnecessary - better description Signed-off-by: Fiona Liao <fiona.liao@grafana.com> --------- Signed-off-by: Fiona Liao <fiona.liao@grafana.com> Co-authored-by: Arthur Silva Sens <arthursens2005@gmail.com>
2025-04-23 20:58:02 +08:00
temporality,
)
annots.Merge(ws)
if err != nil {
errs = multierr.Append(errs, err)
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
}
}
case pmetric.MetricTypeSummary:
dataPoints := metric.Summary().DataPoints()
if dataPoints.Len() == 0 {
errs = multierr.Append(errs, fmt.Errorf("empty data points. %s is dropped", metric.Name()))
break
}
if err := c.addSummaryDataPoints(ctx, dataPoints, resource, settings, promName); err != nil {
errs = multierr.Append(errs, err)
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
}
}
default:
errs = multierr.Append(errs, errors.New("unsupported metric type"))
}
}
}
addResourceTargetInfo(resource, settings, mostRecentTimestamp, c)
}
return annots, errs
}
func isSameMetric(ts *prompb.TimeSeries, lbls []prompb.Label) bool {
if len(ts.Labels) != len(lbls) {
return false
}
for i, l := range ts.Labels {
if l.Name != ts.Labels[i].Name || l.Value != ts.Labels[i].Value {
return false
}
}
return true
}
// addExemplars adds exemplars for the dataPoint. For each exemplar, if it can find a bucket bound corresponding to its value,
// the exemplar is added to the bucket bound's time series, provided that the time series' has samples.
func (c *PrometheusConverter) addExemplars(ctx context.Context, dataPoint pmetric.HistogramDataPoint, bucketBounds []bucketBoundsData) error {
if len(bucketBounds) == 0 {
return nil
}
exemplars, err := getPromExemplars(ctx, &c.everyN, dataPoint)
if err != nil {
return err
}
if len(exemplars) == 0 {
return nil
}
sort.Sort(byBucketBoundsData(bucketBounds))
for _, exemplar := range exemplars {
for _, bound := range bucketBounds {
if err := c.everyN.checkContext(ctx); err != nil {
return err
}
if len(bound.ts.Samples) > 0 && exemplar.Value <= bound.bound {
bound.ts.Exemplars = append(bound.ts.Exemplars, exemplar)
break
}
}
}
return nil
}
// addSample finds a TimeSeries that corresponds to lbls, and adds sample to it.
// If there is no corresponding TimeSeries already, it's created.
// The corresponding TimeSeries is returned.
// If either lbls is nil/empty or sample is nil, nothing is done.
func (c *PrometheusConverter) addSample(sample *prompb.Sample, lbls []prompb.Label) *prompb.TimeSeries {
if sample == nil || len(lbls) == 0 {
// This shouldn't happen
return nil
}
ts, _ := c.getOrCreateTimeSeries(lbls)
ts.Samples = append(ts.Samples, *sample)
return ts
}
func NewPromoteResourceAttributes(otlpCfg config.OTLPConfig) *PromoteResourceAttributes {
attrs := otlpCfg.PromoteResourceAttributes
if otlpCfg.PromoteAllResourceAttributes {
attrs = otlpCfg.IgnoreResourceAttributes
}
attrsMap := make(map[string]struct{}, len(attrs))
for _, s := range attrs {
attrsMap[s] = struct{}{}
}
return &PromoteResourceAttributes{
promoteAll: otlpCfg.PromoteAllResourceAttributes,
attrs: attrsMap,
}
}
// promotedAttributes returns labels for promoted resourceAttributes.
func (s *PromoteResourceAttributes) promotedAttributes(resourceAttributes pcommon.Map) []prompb.Label {
if s == nil {
return nil
}
var promotedAttrs []prompb.Label
if s.promoteAll {
promotedAttrs = make([]prompb.Label, 0, resourceAttributes.Len())
resourceAttributes.Range(func(name string, value pcommon.Value) bool {
if _, exists := s.attrs[name]; !exists {
promotedAttrs = append(promotedAttrs, prompb.Label{Name: name, Value: value.AsString()})
}
return true
})
} else {
promotedAttrs = make([]prompb.Label, 0, len(s.attrs))
resourceAttributes.Range(func(name string, value pcommon.Value) bool {
if _, exists := s.attrs[name]; exists {
promotedAttrs = append(promotedAttrs, prompb.Label{Name: name, Value: value.AsString()})
}
return true
})
}
sort.Stable(ByLabelName(promotedAttrs))
return promotedAttrs
}