grafana/pkg/storage/unified/resource/search_client.go

241 lines
7.7 KiB
Go

package resource
import (
"context"
"fmt"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/grpc"
"k8s.io/apimachinery/pkg/runtime/schema"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/storage/legacysql/dualwrite"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
)
// backgroundRequestTimeout bounds every shadow-traffic request sent to unified
// search. It is applied with context.WithTimeout on a context that has been
// detached from the caller's cancellation, so it is the only deadline those
// background requests inherit from this package.
const backgroundRequestTimeout = 500 * time.Millisecond
// searchResultsMatchHistogram records, per resource type, how closely unified
// search results match legacy search results. Each observation is the
// percentage (0-100) produced by calculateMatchPercentage for one shadowed
// search request. The native histogram settings are documented on
// prometheus.HistogramOpts.
var searchResultsMatchHistogram = promauto.NewHistogramVec(
	prometheus.HistogramOpts{
		Namespace:                      "grafana",
		Subsystem:                      "unified_storage",
		Name:                           "search_results_match_percentage",
		Help:                           "Native histogram of percentage match between legacy and unified search results",
		NativeHistogramBucketFactor:    1.1,
		NativeHistogramMaxBucketNumber: 100,
	},
	[]string{"resource_type"},
)
// DualWriter is the subset of dual-write state that the search client needs in
// order to decide which backend serves a request and whether to mirror it.
type DualWriter interface {
	// IsEnabled is not called in the visible part of this file.
	// NOTE(review): presumably reports whether dual writing is configured for
	// gr - confirm against the implementation.
	IsEnabled(gr schema.GroupResource) bool

	// ReadFromUnified reports whether reads for gr are served by unified
	// storage. The search wrapper routes to the unified client when it returns
	// true and to the legacy client otherwise.
	ReadFromUnified(ctx context.Context, gr schema.GroupResource) (bool, error)

	// Status returns the storage status for gr. The search wrapper consults its
	// WriteUnified field when deciding whether to send shadow traffic.
	Status(ctx context.Context, gr schema.GroupResource) (dualwrite.StorageStatus, error)
}
// NewSearchClient returns a ResourceIndexClient that routes each call to either
// unifiedClient or legacyClient according to dual's state for gr. Depending on
// the feature toggles in features, it may also mirror requests to unifiedClient
// in the background.
func NewSearchClient(dual DualWriter, gr schema.GroupResource, unifiedClient resourcepb.ResourceIndexClient,
	legacyClient resourcepb.ResourceIndexClient, features featuremgmt.FeatureToggles) resourcepb.ResourceIndexClient {
	wrapper := new(searchWrapper)
	wrapper.dual = dual
	wrapper.groupResource = gr
	wrapper.unifiedClient = unifiedClient
	wrapper.legacyClient = legacyClient
	wrapper.features = features
	wrapper.logger = log.New("unified-storage.search-client")
	return wrapper
}
// searchWrapper implements the search client returned by NewSearchClient. It
// holds both backends and picks one per request.
type searchWrapper struct {
	// dual supplies the routing and shadow-traffic decisions.
	dual DualWriter
	// groupResource is the group/resource passed to every dual lookup.
	groupResource schema.GroupResource
	// unifiedClient serves requests when unified is the main storage and
	// receives the background shadow requests.
	unifiedClient resourcepb.ResourceIndexClient
	// legacyClient serves requests when unified is not the main storage.
	legacyClient resourcepb.ResourceIndexClient
	// features is consulted for the dual reader feature flag; may be nil.
	features featuremgmt.FeatureToggles
	// logger reports the outcome of background requests and comparisons.
	logger log.Logger
}
// extractUIDs collects the distinct, non-empty key names found in the rows of a
// search response. It always returns a non-nil set; a nil response, or one
// without results or rows, yields an empty set.
func extractUIDs(response *resourcepb.ResourceSearchResponse) map[string]struct{} {
	seen := map[string]struct{}{}
	if response == nil || response.Results == nil {
		return seen
	}
	// Ranging over nil Rows is a no-op, so no separate nil check is needed.
	for _, row := range response.Results.Rows {
		if row.Key == nil || row.Key.Name == "" {
			continue
		}
		seen[row.Key.Name] = struct{}{}
	}
	return seen
}
// calculateMatchPercentage returns recall as a percentage in [0, 100]: the
// share of legacy UIDs that unified search returned as well. Legacy is treated
// as the source of truth, so its size is the denominator and extra UIDs that
// only unified returned do not lower the score.
//
// Two empty sets count as a perfect match (100); exactly one empty set counts
// as no match (0).
func calculateMatchPercentage(legacyUIDs, unifiedUIDs map[string]struct{}) float64 {
	legacyCount, unifiedCount := len(legacyUIDs), len(unifiedUIDs)
	switch {
	case legacyCount == 0 && unifiedCount == 0:
		return 100.0
	case legacyCount == 0 || unifiedCount == 0:
		return 0.0
	}

	// Count the legacy UIDs that are also present in the unified set.
	shared := 0
	for uid := range legacyUIDs {
		if _, ok := unifiedUIDs[uid]; ok {
			shared++
		}
	}
	return float64(shared) / float64(legacyCount) * 100.0
}
// shouldMakeBackgroundCall reports whether a request for gr should also be sent
// to unified search in the background. It returns true only when all of the
// following hold:
//   - the dual reader feature flag is enabled globally,
//   - legacy is the main storage (dual.ReadFromUnified returns false), and
//   - unified is being written to (status.WriteUnified), which means we are
//     effectively dual writing.
//
// The flag is checked first because that check takes no context and cannot
// fail. The DualWriter lookups are made only when their result can still change
// the answer, so an error from them is returned only in that case; a Status
// failure no longer fails the caller's request when the feature is disabled or
// unified is already the main storage.
func shouldMakeBackgroundCall(ctx context.Context, features featuremgmt.FeatureToggles, dual DualWriter, gr schema.GroupResource) (bool, error) {
	if features == nil || !features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageSearchDualReaderEnabled) {
		return false, nil
	}

	unifiedIsMainStorage, err := dual.ReadFromUnified(ctx, gr)
	if err != nil {
		return false, err
	}
	if unifiedIsMainStorage {
		// Requests already go to unified; there is nothing to shadow.
		return false, nil
	}

	status, err := dual.Status(ctx, gr)
	if err != nil {
		return false, err
	}
	return status.WriteUnified, nil
}
// GetStats serves the stats request from the unified client when unified is the
// main storage for the wrapper's group/resource, and from the legacy client
// otherwise. When shouldMakeBackgroundCall says so, the same request is also
// sent to the unified client in a goroutine; that result is only logged and
// never affects the returned response.
//
// An error from either routing lookup is returned without calling any backend.
func (s *searchWrapper) GetStats(ctx context.Context, in *resourcepb.ResourceStatsRequest,
	opts ...grpc.CallOption) (*resourcepb.ResourceStatsResponse, error) {
	readUnified, err := s.dual.ReadFromUnified(ctx, s.groupResource)
	if err != nil {
		return nil, err
	}

	shadow, err := shouldMakeBackgroundCall(ctx, s.features, s.dual, s.groupResource)
	if err != nil {
		return nil, err
	}

	if shadow {
		// Keep the caller's context values but drop its cancellation, so the
		// shadow request can outlive the primary one; the timeout below is its
		// only deadline.
		detached := context.WithoutCancel(ctx)
		go func() {
			timeoutCtx, cancel := context.WithTimeout(detached, backgroundRequestTimeout)
			defer cancel()

			if _, shadowErr := s.unifiedClient.GetStats(timeoutCtx, in, opts...); shadowErr != nil {
				s.logger.Error("Background GetStats call to unified failed", "error", shadowErr, "timeout", backgroundRequestTimeout)
				return
			}
			s.logger.Debug("Background GetStats call to unified succeeded")
		}()
	}

	if readUnified {
		return s.unifiedClient.GetStats(ctx, in, opts...)
	}
	return s.legacyClient.GetStats(ctx, in, opts...)
}
// Search serves the search request from the unified client when unified is the
// main storage for the wrapper's group/resource, and from the legacy client
// otherwise.
//
// When shouldMakeBackgroundCall says so, the legacy client is queried first and
// its response is what the caller receives. The same request is then sent to
// the unified client in a goroutine, and if that succeeds the two result sets
// are compared and the outcome is logged and recorded as a metric. A failure of
// the background request is only logged.
//
// An error from either routing lookup, or from the legacy search on the shadow
// path, is returned as-is.
func (s *searchWrapper) Search(ctx context.Context, in *resourcepb.ResourceSearchRequest,
	opts ...grpc.CallOption) (*resourcepb.ResourceSearchResponse, error) {
	readUnified, err := s.dual.ReadFromUnified(ctx, s.groupResource)
	if err != nil {
		return nil, err
	}

	shadow, err := shouldMakeBackgroundCall(ctx, s.features, s.dual, s.groupResource)
	if err != nil {
		return nil, err
	}

	if !shadow {
		if readUnified {
			return s.unifiedClient.Search(ctx, in, opts...)
		}
		return s.legacyClient.Search(ctx, in, opts...)
	}

	// Shadow path: the legacy response is needed both for the caller and as the
	// baseline of the comparison, so fetch it before starting the goroutine.
	legacyResponse, err := s.legacyClient.Search(ctx, in, opts...)
	if err != nil {
		return nil, err
	}

	// Keep the caller's context values but drop its cancellation, so the shadow
	// request can outlive the primary one; the timeout below is its only
	// deadline.
	detached := context.WithoutCancel(ctx)
	go func() {
		timeoutCtx, cancel := context.WithTimeout(detached, backgroundRequestTimeout)
		defer cancel()

		unifiedResponse, shadowErr := s.unifiedClient.Search(timeoutCtx, in, opts...)
		if shadowErr != nil {
			s.logger.Error("Background Search call to unified failed", "error", shadowErr, "timeout", backgroundRequestTimeout)
			return
		}
		s.logger.Debug("Background Search call to unified succeeded")

		// The request key, when present, supplies the metric's resource label.
		var requestKey *resourcepb.ResourceKey
		if in.Options != nil {
			requestKey = in.Options.Key
		}
		s.compareSearchResults(legacyResponse, unifiedResponse, requestKey)
	}()

	return legacyResponse, nil
}
// compareSearchResults measures how many of the legacy search results unified
// search also returned, writes a debug log line with the counts, and records
// the match percentage in searchResultsMatchHistogram. It does nothing when
// either response is nil.
//
// The metric's resource_type label is requestKey.Resource, or "unknown" when
// requestKey is nil or its Resource is empty.
func (s *searchWrapper) compareSearchResults(legacyResponse, unifiedResponse *resourcepb.ResourceSearchResponse, requestKey *resourcepb.ResourceKey) {
	if legacyResponse == nil || unifiedResponse == nil {
		return
	}

	label := "unknown"
	if requestKey != nil && requestKey.Resource != "" {
		label = requestKey.Resource
	}

	legacySet := extractUIDs(legacyResponse)
	unifiedSet := extractUIDs(unifiedResponse)
	pct := calculateMatchPercentage(legacySet, unifiedSet)

	s.logger.Debug("Search results comparison completed",
		"resource_type", label,
		"legacy_count", len(legacySet),
		"unified_count", len(unifiedSet),
		"match_percentage", fmt.Sprintf("%.1f%%", pct),
		"legacy_total_hits", legacyResponse.TotalHits,
		"unified_total_hits", unifiedResponse.TotalHits,
	)

	searchResultsMatchHistogram.WithLabelValues(label).Observe(pct)
}