feat(storage): add new CombinedAppender interface and compatibility layer
Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
parent
c808a71e18
commit
2fb680a229
|
@ -21,3 +21,7 @@ type Metadata struct {
|
||||||
Unit string `json:"unit"`
|
Unit string `json:"unit"`
|
||||||
Help string `json:"help"`
|
Help string `json:"help"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m Metadata) IsEmpty() bool {
|
||||||
|
return (m.Type == "" || m.Type == model.MetricTypeUnknown) && m.Unit == "" && m.Help == ""
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,147 @@
|
||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/prometheus/prometheus/model/exemplar"
|
||||||
|
"github.com/prometheus/prometheus/model/histogram"
|
||||||
|
"github.com/prometheus/prometheus/model/labels"
|
||||||
|
"github.com/prometheus/prometheus/model/metadata"
|
||||||
|
)
|
||||||
|
|
||||||
|
// AsAppender exposes new combined appender implementation as old appender.
|
||||||
|
//
|
||||||
|
// Because old appender interface was not combined, you need extra appender pieces
|
||||||
|
// to handle old behaviour of disconnected exemplars, metadata and sample cts.
|
||||||
|
func AsAppender(appender CombinedAppender, exemplar ExemplarAppender, metadata MetadataUpdater, ct CreatedTimestampAppender) Appender {
|
||||||
|
return &asAppender{app: appender, exemplar: exemplar, metadata: metadata, ct: ct}
|
||||||
|
}
|
||||||
|
|
||||||
|
type asAppender struct {
|
||||||
|
opts CombinedAppendOptions
|
||||||
|
app CombinedAppender
|
||||||
|
|
||||||
|
exemplar ExemplarAppender
|
||||||
|
metadata MetadataUpdater
|
||||||
|
ct CreatedTimestampAppender
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *asAppender) Append(ref SeriesRef, l labels.Labels, t int64, v float64) (SeriesRef, error) {
|
||||||
|
return a.app.AppendSample(ref, l, metadata.Metadata{}, 0, t, v, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *asAppender) AppendHistogram(ref SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error) {
|
||||||
|
return a.app.AppendHistogram(ref, l, metadata.Metadata{}, 0, t, h, fh, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *asAppender) Commit() error {
|
||||||
|
return a.app.Commit()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *asAppender) Rollback() error {
|
||||||
|
return a.app.Rollback()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *asAppender) SetOptions(opts *AppendOptions) {
|
||||||
|
if opts == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
a.opts.DiscardOutOfOrder = opts.DiscardOutOfOrder
|
||||||
|
a.app.SetOptions(&a.opts)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *asAppender) AppendCTZeroSample(ref SeriesRef, l labels.Labels, t int64, ct int64) (SeriesRef, error) {
|
||||||
|
return a.ct.AppendCTZeroSample(ref, l, t, ct)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *asAppender) AppendHistogramCTZeroSample(ref SeriesRef, l labels.Labels, t int64, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error) {
|
||||||
|
a.opts.AppendCTAsZero = true
|
||||||
|
a.app.SetOptions(&a.opts)
|
||||||
|
return a.app.AppendHistogram(ref, l, metadata.Metadata{}, ct, t, h, fh, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *asAppender) AppendExemplar(ref SeriesRef, l labels.Labels, e exemplar.Exemplar) (SeriesRef, error) {
|
||||||
|
return a.exemplar.AppendExemplar(ref, l, e)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *asAppender) UpdateMetadata(ref SeriesRef, l labels.Labels, m metadata.Metadata) (SeriesRef, error) {
|
||||||
|
return a.metadata.UpdateMetadata(ref, l, m)
|
||||||
|
}
|
||||||
|
|
||||||
|
// AsCombinedAppender exposes old appender implementation as a new combined appender.
|
||||||
|
func AsCombinedAppender(appender Appender) CombinedAppender {
|
||||||
|
return &asCombinedAppender{app: appender}
|
||||||
|
}
|
||||||
|
|
||||||
|
type asCombinedAppender struct {
|
||||||
|
app Appender
|
||||||
|
opts CombinedAppendOptions
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *asCombinedAppender) AppendSample(ref SeriesRef, ls labels.Labels, meta metadata.Metadata, ct, t int64, v float64, es []exemplar.Exemplar) (_ SeriesRef, err error) {
|
||||||
|
if ct != 0 && a.opts.AppendCTAsZero {
|
||||||
|
ref, err = a.app.AppendCTZeroSample(ref, ls, t, ct)
|
||||||
|
if err != nil {
|
||||||
|
return ref, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ref, err = a.app.Append(ref, ls, t, v)
|
||||||
|
if err != nil {
|
||||||
|
return ref, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check feature flag?
|
||||||
|
for _, e := range es {
|
||||||
|
ref, err = a.app.AppendExemplar(ref, ls, e)
|
||||||
|
if err != nil {
|
||||||
|
return ref, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Check feature flag?
|
||||||
|
if !meta.IsEmpty() {
|
||||||
|
ref, err = a.app.UpdateMetadata(ref, ls, meta)
|
||||||
|
if err != nil {
|
||||||
|
return ref, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ref, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *asCombinedAppender) AppendHistogram(ref SeriesRef, ls labels.Labels, meta metadata.Metadata, ct, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram, es []exemplar.Exemplar) (_ SeriesRef, err error) {
|
||||||
|
if ct != 0 && a.opts.AppendCTAsZero {
|
||||||
|
ref, err = a.app.AppendHistogramCTZeroSample(ref, ls, t, ct, h, fh)
|
||||||
|
if err != nil {
|
||||||
|
return ref, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ref, err = a.app.AppendHistogram(ref, ls, t, h, fh)
|
||||||
|
if err != nil {
|
||||||
|
return ref, err
|
||||||
|
}
|
||||||
|
// Check feature flag?
|
||||||
|
for _, e := range es {
|
||||||
|
ref, err = a.app.AppendExemplar(ref, ls, e)
|
||||||
|
if err != nil {
|
||||||
|
return ref, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Check feature flag?
|
||||||
|
if !meta.IsEmpty() {
|
||||||
|
ref, err = a.app.UpdateMetadata(ref, ls, meta)
|
||||||
|
if err != nil {
|
||||||
|
return ref, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ref, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *asCombinedAppender) Commit() error { return a.app.Commit() }
|
||||||
|
|
||||||
|
func (a *asCombinedAppender) Rollback() error { return a.app.Rollback() }
|
||||||
|
|
||||||
|
func (a *asCombinedAppender) SetOptions(opts *CombinedAppendOptions) {
|
||||||
|
if opts == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
a.opts = *opts
|
||||||
|
a.app.SetOptions(&AppendOptions{DiscardOutOfOrder: opts.DiscardOutOfOrder})
|
||||||
|
}
|
|
@ -255,6 +255,42 @@ func (f QueryableFunc) Querier(mint, maxt int64) (Querier, error) {
|
||||||
return f(mint, maxt)
|
return f(mint, maxt)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type CombinedAppendOptions struct {
|
||||||
|
DiscardOutOfOrder bool
|
||||||
|
|
||||||
|
// AppendCTAsZero ensures that CT is appended as fake zero sample on all append methods.
|
||||||
|
AppendCTAsZero bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// CombinedAppender provides batched appends against a storage.
|
||||||
|
// It must be completed with a call to Commit or Rollback and must not be reused afterwards.
|
||||||
|
//
|
||||||
|
// Operations on the Appender interface are not goroutine-safe.
|
||||||
|
//
|
||||||
|
// The type of samples (float64, histogram, etc) appended for a given series must remain same within an Appender.
|
||||||
|
// The behaviour is undefined if samples of different types are appended to the same series in a single Commit().
|
||||||
|
type CombinedAppender interface {
|
||||||
|
// AppendSample appends a sample and related exemplars, metadata, and created timestamp to the storage.
|
||||||
|
AppendSample(ref SeriesRef, ls labels.Labels, meta metadata.Metadata, ct, t int64, v float64, es []exemplar.Exemplar) (SeriesRef, error)
|
||||||
|
|
||||||
|
// AppendHistogram appends a histogram and related exemplars, metadata, and created timestamp to the storage.
|
||||||
|
AppendHistogram(ref SeriesRef, ls labels.Labels, meta metadata.Metadata, ct, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram, es []exemplar.Exemplar) (SeriesRef, error)
|
||||||
|
|
||||||
|
// Commit submits the collected samples and purges the batch. If Commit
|
||||||
|
// returns a non-nil error, it also rolls back all modifications made in
|
||||||
|
// the appender so far, as Rollback would do. In any case, an Appender
|
||||||
|
// must not be used anymore after Commit has been called.
|
||||||
|
Commit() error
|
||||||
|
|
||||||
|
// Rollback rolls back all modifications made in the appender so far.
|
||||||
|
// Appender has to be discarded after rollback.
|
||||||
|
Rollback() error
|
||||||
|
|
||||||
|
// SetOptions configures the appender with specific append options such as
|
||||||
|
// discarding out-of-order samples even if out-of-order is enabled in the TSDB.
|
||||||
|
SetOptions(opts *CombinedAppendOptions)
|
||||||
|
}
|
||||||
|
|
||||||
type AppendOptions struct {
|
type AppendOptions struct {
|
||||||
DiscardOutOfOrder bool
|
DiscardOutOfOrder bool
|
||||||
}
|
}
|
||||||
|
@ -266,6 +302,8 @@ type AppendOptions struct {
|
||||||
//
|
//
|
||||||
// The type of samples (float64, histogram, etc) appended for a given series must remain same within an Appender.
|
// The type of samples (float64, histogram, etc) appended for a given series must remain same within an Appender.
|
||||||
// The behaviour is undefined if samples of different types are appended to the same series in a single Commit().
|
// The behaviour is undefined if samples of different types are appended to the same series in a single Commit().
|
||||||
|
//
|
||||||
|
// DEPRECATED: Use CombinedAppender instead, this interface and TSDB support for it will be removed at one point.
|
||||||
type Appender interface {
|
type Appender interface {
|
||||||
// Append adds a sample pair for the given series.
|
// Append adds a sample pair for the given series.
|
||||||
// An optional series reference can be provided to accelerate calls.
|
// An optional series reference can be provided to accelerate calls.
|
||||||
|
@ -300,7 +338,7 @@ type Appender interface {
|
||||||
// GetRef is an extra interface on Appenders used by downstream projects
|
// GetRef is an extra interface on Appenders used by downstream projects
|
||||||
// (e.g. Cortex) to avoid maintaining a parallel set of references.
|
// (e.g. Cortex) to avoid maintaining a parallel set of references.
|
||||||
type GetRef interface {
|
type GetRef interface {
|
||||||
// Returns reference number that can be used to pass to Appender.Append(),
|
// GetRef returns reference number that can be used to pass to Appender.Append(),
|
||||||
// and a set of labels that will not cause another copy when passed to Appender.Append().
|
// and a set of labels that will not cause another copy when passed to Appender.Append().
|
||||||
// 0 means the appender does not have a reference to this series.
|
// 0 means the appender does not have a reference to this series.
|
||||||
// hash should be a hash of lset.
|
// hash should be a hash of lset.
|
||||||
|
|
Loading…
Reference in New Issue