mirror of https://github.com/grafana/grafana.git
241 lines
7.7 KiB
Go
241 lines
7.7 KiB
Go
package resource
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/prometheus/client_golang/prometheus/promauto"
|
|
"google.golang.org/grpc"
|
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
|
|
|
"github.com/grafana/grafana/pkg/infra/log"
|
|
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
|
"github.com/grafana/grafana/pkg/storage/legacysql/dualwrite"
|
|
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
|
)
|
|
|
|
// backgroundRequestTimeout caps how long a background (shadow traffic) request
// to unified storage may run; it is applied as a context timeout inside the
// goroutines started by GetStats and Search.
const backgroundRequestTimeout = 500 * time.Millisecond
|
|
|
|
var (
|
|
// searchResultsMatchHistogram tracks the percentage match between legacy and unified search results
|
|
searchResultsMatchHistogram = promauto.NewHistogramVec(
|
|
prometheus.HistogramOpts{
|
|
Namespace: "grafana",
|
|
Subsystem: "unified_storage",
|
|
Name: "search_results_match_percentage",
|
|
Help: "Native histogram of percentage match between legacy and unified search results",
|
|
NativeHistogramBucketFactor: 1.1,
|
|
NativeHistogramMaxBucketNumber: 100,
|
|
},
|
|
[]string{"resource_type"},
|
|
)
|
|
)
|
|
|
|
type DualWriter interface {
|
|
IsEnabled(schema.GroupResource) bool
|
|
ReadFromUnified(context.Context, schema.GroupResource) (bool, error)
|
|
Status(ctx context.Context, gr schema.GroupResource) (dualwrite.StorageStatus, error)
|
|
}
|
|
|
|
func NewSearchClient(dual DualWriter, gr schema.GroupResource, unifiedClient resourcepb.ResourceIndexClient,
|
|
legacyClient resourcepb.ResourceIndexClient, features featuremgmt.FeatureToggles) resourcepb.ResourceIndexClient {
|
|
return &searchWrapper{
|
|
dual: dual,
|
|
groupResource: gr,
|
|
unifiedClient: unifiedClient,
|
|
legacyClient: legacyClient,
|
|
features: features,
|
|
logger: log.New("unified-storage.search-client"),
|
|
}
|
|
}
|
|
|
|
type searchWrapper struct {
|
|
dual DualWriter
|
|
groupResource schema.GroupResource
|
|
|
|
unifiedClient resourcepb.ResourceIndexClient
|
|
legacyClient resourcepb.ResourceIndexClient
|
|
features featuremgmt.FeatureToggles
|
|
logger log.Logger
|
|
}
|
|
|
|
// extractUIDs extracts unique UIDs from search response results
|
|
func extractUIDs(response *resourcepb.ResourceSearchResponse) map[string]struct{} {
|
|
uids := make(map[string]struct{})
|
|
if response == nil || response.Results == nil || response.Results.Rows == nil {
|
|
return uids
|
|
}
|
|
|
|
for _, row := range response.Results.Rows {
|
|
if row.Key != nil && row.Key.Name != "" {
|
|
uids[row.Key.Name] = struct{}{}
|
|
}
|
|
}
|
|
return uids
|
|
}
|
|
|
|
// calculateMatchPercentage returns the recall of unified search relative to
// legacy search, as a percentage in [0, 100]: the share of legacy UIDs that
// are also present in unifiedUIDs.
//
// Special cases:
//   - both sets empty: 100 (nothing to find, nothing missed)
//   - exactly one set empty: 0
//
// Extra UIDs that only unified returned do not lower the score, because
// legacy is treated as the source of truth and is the denominator.
func calculateMatchPercentage(legacyUIDs, unifiedUIDs map[string]struct{}) float64 {
	legacyCount, unifiedCount := len(legacyUIDs), len(unifiedUIDs)

	switch {
	case legacyCount == 0 && unifiedCount == 0:
		return 100.0
	case legacyCount == 0 || unifiedCount == 0:
		return 0.0
	}

	// Count the legacy UIDs that unified returned as well.
	found := 0
	for uid := range legacyUIDs {
		if _, ok := unifiedUIDs[uid]; ok {
			found++
		}
	}

	return float64(found) / float64(legacyCount) * 100.0
}
|
|
|
|
// If dual reader feature flag is enabled, and legacy is the main storage,
|
|
// and we are writing to unified (which means we are effectively dual writing),
|
|
// then make a background call to unified
|
|
func shouldMakeBackgroundCall(ctx context.Context, features featuremgmt.FeatureToggles, dual DualWriter, gr schema.GroupResource) (bool, error) {
|
|
unifiedIsMainStorage, err := dual.ReadFromUnified(ctx, gr)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
status, err := dual.Status(ctx, gr)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
res := features != nil &&
|
|
features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageSearchDualReaderEnabled) &&
|
|
!unifiedIsMainStorage &&
|
|
status.WriteUnified
|
|
|
|
return res, nil
|
|
}
|
|
|
|
func (s *searchWrapper) GetStats(ctx context.Context, in *resourcepb.ResourceStatsRequest,
|
|
opts ...grpc.CallOption) (*resourcepb.ResourceStatsResponse, error) {
|
|
client := s.legacyClient
|
|
unified, err := s.dual.ReadFromUnified(ctx, s.groupResource)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if unified {
|
|
client = s.unifiedClient
|
|
}
|
|
|
|
makeBackgroundCall, err := shouldMakeBackgroundCall(ctx, s.features, s.dual, s.groupResource)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if makeBackgroundCall {
|
|
// Create background context with timeout but ignore parent cancelation
|
|
ctxBg := context.WithoutCancel(ctx)
|
|
|
|
// Make background call without blocking the main request
|
|
go func() {
|
|
ctxBgWithTimeout, cancel := context.WithTimeout(ctxBg, backgroundRequestTimeout)
|
|
defer cancel() // Ensure we clean up the context
|
|
_, bgErr := s.unifiedClient.GetStats(ctxBgWithTimeout, in, opts...)
|
|
if bgErr != nil {
|
|
s.logger.Error("Background GetStats call to unified failed", "error", bgErr, "timeout", backgroundRequestTimeout)
|
|
} else {
|
|
s.logger.Debug("Background GetStats call to unified succeeded")
|
|
}
|
|
}()
|
|
}
|
|
|
|
return client.GetStats(ctx, in, opts...)
|
|
}
|
|
|
|
func (s *searchWrapper) Search(ctx context.Context, in *resourcepb.ResourceSearchRequest,
|
|
opts ...grpc.CallOption) (*resourcepb.ResourceSearchResponse, error) {
|
|
client := s.legacyClient
|
|
unified, err := s.dual.ReadFromUnified(ctx, s.groupResource)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if unified {
|
|
client = s.unifiedClient
|
|
}
|
|
|
|
makeBackgroundCall, err := shouldMakeBackgroundCall(ctx, s.features, s.dual, s.groupResource)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if makeBackgroundCall {
|
|
// Get the legacy result first
|
|
legacyResponse, legacyErr := s.legacyClient.Search(ctx, in, opts...)
|
|
if legacyErr != nil {
|
|
return nil, legacyErr
|
|
}
|
|
|
|
// Create background context with timeout but ignore parent cancelation
|
|
ctxBg := context.WithoutCancel(ctx)
|
|
|
|
// Make background call and compare results
|
|
go func() {
|
|
ctxBgWithTimeout, cancel := context.WithTimeout(ctxBg, backgroundRequestTimeout)
|
|
defer cancel() // Ensure we clean up the context
|
|
unifiedResponse, bgErr := s.unifiedClient.Search(ctxBgWithTimeout, in, opts...)
|
|
if bgErr != nil {
|
|
s.logger.Error("Background Search call to unified failed", "error", bgErr, "timeout", backgroundRequestTimeout)
|
|
} else {
|
|
s.logger.Debug("Background Search call to unified succeeded")
|
|
|
|
// Compare results when both are successful
|
|
var requestKey *resourcepb.ResourceKey
|
|
if in.Options != nil {
|
|
requestKey = in.Options.Key
|
|
}
|
|
s.compareSearchResults(legacyResponse, unifiedResponse, requestKey)
|
|
}
|
|
}()
|
|
|
|
return legacyResponse, nil
|
|
}
|
|
|
|
return client.Search(ctx, in, opts...)
|
|
}
|
|
|
|
// compareSearchResults compares legacy and unified search results and logs/metrics the outcome
|
|
func (s *searchWrapper) compareSearchResults(legacyResponse, unifiedResponse *resourcepb.ResourceSearchResponse, requestKey *resourcepb.ResourceKey) {
|
|
if legacyResponse == nil || unifiedResponse == nil {
|
|
return
|
|
}
|
|
|
|
legacyUIDs := extractUIDs(legacyResponse)
|
|
unifiedUIDs := extractUIDs(unifiedResponse)
|
|
|
|
matchPercentage := calculateMatchPercentage(legacyUIDs, unifiedUIDs)
|
|
|
|
// Determine resource type for labeling - handle nil safely
|
|
resourceType := "unknown"
|
|
if requestKey != nil && requestKey.Resource != "" {
|
|
resourceType = requestKey.Resource
|
|
}
|
|
|
|
s.logger.Debug("Search results comparison completed",
|
|
"resource_type", resourceType,
|
|
"legacy_count", len(legacyUIDs),
|
|
"unified_count", len(unifiedUIDs),
|
|
"match_percentage", fmt.Sprintf("%.1f%%", matchPercentage),
|
|
"legacy_total_hits", legacyResponse.TotalHits,
|
|
"unified_total_hits", unifiedResponse.TotalHits,
|
|
)
|
|
|
|
searchResultsMatchHistogram.WithLabelValues(resourceType).Observe(matchPercentage)
|
|
}
|