mirror of https://github.com/grafana/grafana.git
Unified Storage: Improve observability for indexing latency (#99700)

* adds extra debug logs and a new metric for poller query latency to help us better understand watch and index latency for write events
* adds a trace span to the index for handling index write events
commit 3ba0d8d4b5
parent dddfce2df7
@@ -15,6 +15,7 @@ var (

 type StorageApiMetrics struct {
 	WatchEventLatency *prometheus.HistogramVec
+	PollerLatency     prometheus.Histogram
 }

 func NewStorageMetrics() *StorageApiMetrics {
@@ -29,6 +30,15 @@ func NewStorageMetrics() *StorageApiMetrics {
 				NativeHistogramMaxBucketNumber:  160,
 				NativeHistogramMinResetDuration: time.Hour,
 			}, []string{"resource"}),
+			PollerLatency: prometheus.NewHistogram(prometheus.HistogramOpts{
+				Namespace:                       "storage_server",
+				Name:                            "poller_query_latency_seconds",
+				Help:                            "poller query latency",
+				Buckets:                         instrument.DefBuckets,
+				NativeHistogramBucketFactor:     1.1, // enable native histograms
+				NativeHistogramMaxBucketNumber:  160,
+				NativeHistogramMinResetDuration: time.Hour,
+			}),
 		}
 	})

@@ -37,8 +47,10 @@ func NewStorageMetrics() *StorageApiMetrics {

 func (s *StorageApiMetrics) Collect(ch chan<- prometheus.Metric) {
 	s.WatchEventLatency.Collect(ch)
+	s.PollerLatency.Collect(ch)
 }

 func (s *StorageApiMetrics) Describe(ch chan<- *prometheus.Desc) {
 	s.WatchEventLatency.Describe(ch)
+	s.PollerLatency.Describe(ch)
 }
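For reference, the pattern in the diff above is a metrics struct that owns a native histogram and forwards Collect/Describe so the whole struct can be registered as a single prometheus.Collector. A minimal, self-contained sketch of that pattern follows; the names pollerMetrics/newPollerMetrics, the HTTP wiring on :8080, and the use of prometheus.DefBuckets in place of the commit's instrument.DefBuckets are illustrative assumptions, not part of the change.

// Minimal sketch of the collector pattern above: a struct that owns a native
// histogram and forwards Collect/Describe. pollerMetrics/newPollerMetrics are
// placeholder names, not identifiers from the commit.
package main

import (
	"net/http"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

type pollerMetrics struct {
	PollerLatency prometheus.Histogram
}

func newPollerMetrics() *pollerMetrics {
	return &pollerMetrics{
		PollerLatency: prometheus.NewHistogram(prometheus.HistogramOpts{
			Namespace:                       "storage_server",
			Name:                            "poller_query_latency_seconds",
			Help:                            "poller query latency",
			Buckets:                         prometheus.DefBuckets, // stand-in for instrument.DefBuckets
			NativeHistogramBucketFactor:     1.1,                   // enables native histograms alongside classic buckets
			NativeHistogramMaxBucketNumber:  160,
			NativeHistogramMinResetDuration: time.Hour,
		}),
	}
}

// Implementing prometheus.Collector lets the struct be registered as a unit.
func (m *pollerMetrics) Describe(ch chan<- *prometheus.Desc) { m.PollerLatency.Describe(ch) }
func (m *pollerMetrics) Collect(ch chan<- prometheus.Metric) { m.PollerLatency.Collect(ch) }

func main() {
	metrics := newPollerMetrics()
	reg := prometheus.NewRegistry()
	reg.MustRegister(metrics)

	// Simulate a timed poll and record its duration.
	start := time.Now()
	time.Sleep(25 * time.Millisecond)
	metrics.PollerLatency.Observe(time.Since(start).Seconds())

	// Expose the registry; the port is arbitrary for this sketch.
	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	_ = http.ListenAndServe(":8080", nil)
}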
@@ -394,10 +394,19 @@ func (s *searchSupport) init(ctx context.Context) error {

 // Async event
 func (s *searchSupport) handleEvent(ctx context.Context, evt *WrittenEvent) {
+	ctx, span := s.tracer.Start(ctx, tracingPrexfixSearch+"HandleEvent")
 	if !slices.Contains([]WatchEvent_Type{WatchEvent_ADDED, WatchEvent_MODIFIED, WatchEvent_DELETED}, evt.Type) {
 		s.log.Info("ignoring watch event", "type", evt.Type)
 		return
 	}
+	defer span.End()
+	span.SetAttributes(
+		attribute.String("event_type", evt.Type.String()),
+		attribute.String("namespace", evt.Key.Namespace),
+		attribute.String("group", evt.Key.Group),
+		attribute.String("resource", evt.Key.Resource),
+		attribute.String("name", evt.Key.Name),
+	)

 	nsr := NamespacedResource{
 		Namespace: evt.Key.Namespace,
@@ -447,7 +456,9 @@ func (s *searchSupport) handleEvent(ctx context.Context, evt *WrittenEvent) {

 	// record latency from when event was created to when it was indexed
 	latencySeconds := float64(time.Now().UnixMicro()-evt.ResourceVersion) / 1e6
+	span.AddEvent("index latency", trace.WithAttributes(attribute.Float64("latency_seconds", latencySeconds)))
 	if latencySeconds > 5 {
+		s.log.Debug("high index latency object details", "resource", evt.Key.Resource, "latency_seconds", latencySeconds, "name", evt.Object.GetName(), "namespace", evt.Object.GetNamespace(), "uid", evt.Object.GetUID())
 		s.log.Warn("high index latency", "latency", latencySeconds)
 	}
 	if IndexMetrics != nil {
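The handleEvent change starts a span per write event, tags it with the event key, and records the indexing latency as a span event, deriving the latency from the event's ResourceVersion (which here acts as a microsecond timestamp of the write). A standalone sketch of that tracing pattern with go.opentelemetry.io/otel might look like the following; handleWrite, eventKey, the tracer name, and the log.Printf stand-in for the structured warn log are illustrative assumptions, not the commit's code.

// Standalone sketch of the tracing pattern above: start a span, attach
// identifying attributes, and record a computed latency as a span event.
package main

import (
	"context"
	"log"
	"time"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
)

type eventKey struct {
	Namespace, Group, Resource, Name string
}

// handleWrite mirrors the shape of handleEvent: rvMicros plays the role of
// the event's ResourceVersion, a microsecond timestamp of the write.
func handleWrite(ctx context.Context, key eventKey, rvMicros int64) {
	tracer := otel.Tracer("unified-storage-example")
	_, span := tracer.Start(ctx, "search.HandleEvent")
	defer span.End()

	span.SetAttributes(
		attribute.String("namespace", key.Namespace),
		attribute.String("group", key.Group),
		attribute.String("resource", key.Resource),
		attribute.String("name", key.Name),
	)

	// ... index the document here ...

	// Latency from when the event was written to when it was indexed.
	latencySeconds := float64(time.Now().UnixMicro()-rvMicros) / 1e6
	span.AddEvent("index latency", trace.WithAttributes(attribute.Float64("latency_seconds", latencySeconds)))
	if latencySeconds > 5 {
		log.Printf("high index latency: %.2fs for %s/%s", latencySeconds, key.Namespace, key.Name)
	}
}

func main() {
	// Without a configured TracerProvider this uses a no-op tracer, so the
	// sketch runs as-is.
	handleWrite(context.Background(), eventKey{"default", "dashboard.grafana.app", "dashboards", "example"}, time.Now().UnixMicro())
}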
@@ -805,6 +805,8 @@ func fetchLatestRV(ctx context.Context, x db.ContextExecer, d sqltemplate.Dialec
 func (b *backend) poll(ctx context.Context, grp string, res string, since int64, stream chan<- *resource.WrittenEvent) (int64, error) {
 	ctx, span := b.tracer.Start(ctx, tracePrefix+"poll")
 	defer span.End()
+
+	start := time.Now()
 	var records []*historyPollResponse
 	err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error {
 		var err error
@@ -820,6 +822,8 @@ func (b *backend) poll(ctx context.Context, grp string, res string, since int64,
 	if err != nil {
 		return 0, fmt.Errorf("poll history: %w", err)
 	}
+	end := time.Now()
+	resource.NewStorageMetrics().PollerLatency.Observe(end.Sub(start).Seconds())

 	var nextRV int64
 	for _, rec := range records {
@@ -847,6 +851,7 @@ func (b *backend) poll(ctx context.Context, grp string, res string, since int64,
 			ResourceVersion: rec.ResourceVersion,
 			// Timestamp:  , // TODO: add timestamp
 		}
+		b.log.Debug("poller sent event to stream", "namespace", rec.Key.Namespace, "group", rec.Key.Group, "resource", rec.Key.Resource, "name", rec.Key.Name, "action", rec.Action, "rv", rec.ResourceVersion)
 	}

 	return nextRV, nil
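The poll change times only the history query transaction and, only on success, feeds the duration into the new PollerLatency histogram; from the earlier diff, NewStorageMetrics appears to hand back a once-initialized shared metrics struct, so calling it inside poll reuses the same histogram. A rough sketch of that timing pattern is below, alongside prometheus.NewTimer as a common alternative that records on every return path; pollOnce, pollOnceWithTimer, and runQuery are hypothetical names, not part of the commit.

// Sketch of the query-timing pattern used in poll() above, plus
// prometheus.NewTimer as an alternative to explicit start/end bookkeeping.
package main

import (
	"context"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

var pollerLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
	Namespace: "storage_server",
	Name:      "poller_query_latency_seconds",
	Help:      "poller query latency",
	Buckets:   prometheus.DefBuckets,
})

// pollOnce mirrors the commit: measure only the query, and only on success.
func pollOnce(ctx context.Context) error {
	start := time.Now()
	if err := runQuery(ctx); err != nil {
		return err
	}
	pollerLatency.Observe(time.Since(start).Seconds())
	return nil
}

// pollOnceWithTimer records on every path, including errors, via defer.
func pollOnceWithTimer(ctx context.Context) error {
	timer := prometheus.NewTimer(pollerLatency)
	defer timer.ObserveDuration()
	return runQuery(ctx)
}

// runQuery stands in for the history poll transaction in the real backend.
func runQuery(ctx context.Context) error {
	select {
	case <-time.After(10 * time.Millisecond):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	prometheus.MustRegister(pollerLatency)
	_ = pollOnce(context.Background())
	_ = pollOnceWithTimer(context.Background())
}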