mirror of https://github.com/grafana/grafana.git
unified-storage: Skip query when ListModifiedSince cannot return anything. (#110338)
* Skip query when ListModifiedSince cannot return anything. * Only save listRV if it's different than sinceRV. This saves a disk access if not needed. * Add test for ListModifiedSince with same RV.
This commit is contained in:
parent
e3f5a65372
commit
3a3ba483b1
|
@ -1273,7 +1273,7 @@ func (b *bleveIndex) updateIndexWithLatestModifications(ctx context.Context, req
|
|||
|
||||
startTime := time.Now()
|
||||
listRV, docs, err := b.updaterFn(ctx, b, sinceRV)
|
||||
if err == nil && listRV > 0 {
|
||||
if err == nil && listRV > 0 && listRV != sinceRV {
|
||||
err = b.updateResourceVersion(listRV) // updates b.resourceVersion
|
||||
}
|
||||
|
||||
|
|
|
@ -11,7 +11,6 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/go-sql-driver/mysql"
|
||||
"github.com/grafana/grafana/pkg/util/sqlite"
|
||||
"github.com/jackc/pgx/v5/pgconn"
|
||||
"github.com/lib/pq"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
@ -20,6 +19,8 @@ import (
|
|||
"google.golang.org/protobuf/proto"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
|
||||
"github.com/grafana/grafana/pkg/util/sqlite"
|
||||
|
||||
"github.com/grafana/grafana-app-sdk/logging"
|
||||
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
||||
|
@ -642,21 +643,34 @@ func (b *backend) ListModifiedSince(ctx context.Context, key resource.Namespaced
|
|||
}
|
||||
}
|
||||
|
||||
rollbackOnDefer := true
|
||||
defer func() {
|
||||
if rollbackOnDefer {
|
||||
if terr := tx.Rollback(); terr != nil {
|
||||
b.log.Warn("Error rolling back transaction in ListModifiedSince", "error", terr)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Fetch latest RV within the transaction
|
||||
latestRv, err := b.fetchLatestRV(ctx, tx, b.dialect, key.Group, key.Resource)
|
||||
if err != nil {
|
||||
terr := tx.Rollback()
|
||||
if terr != nil {
|
||||
b.log.Warn("Error rolling back transaction in ListModifiedSince", "error", terr)
|
||||
}
|
||||
return 0, func(yield func(*resource.ModifiedResource, error) bool) {
|
||||
yield(nil, err)
|
||||
}
|
||||
}
|
||||
|
||||
// If latest RV is the same as request RV, there's nothing to report, and we can avoid running another query.
|
||||
if latestRv == sinceRv {
|
||||
return 0, func(yield func(*resource.ModifiedResource, error) bool) { /* nothing to return */ }
|
||||
}
|
||||
|
||||
// since results are sorted by name ASC and rv DESC, we can get away with tracking the last seen
|
||||
lastSeen := ""
|
||||
|
||||
// We will rollback after iteration has finished.
|
||||
rollbackOnDefer = false
|
||||
|
||||
// rollback transaction if iterator not called within 30 seconds
|
||||
rollbackTimer := time.AfterFunc(30*time.Second, func() {
|
||||
if err := tx.Rollback(); err != nil && !errors.Is(err, sql.ErrTxDone) {
|
||||
|
|
|
@ -3,6 +3,7 @@ package test
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"iter"
|
||||
"net/http"
|
||||
"slices"
|
||||
"strings"
|
||||
|
@ -523,11 +524,22 @@ func runTestIntegrationBackendListModifiedSince(t *testing.T, backend resource.S
|
|||
latestRv, seq := backend.ListModifiedSince(ctx, key, rvDeleted)
|
||||
require.GreaterOrEqual(t, latestRv, rvDeleted)
|
||||
|
||||
counter := 0
|
||||
for range seq {
|
||||
counter++
|
||||
isEmpty(t, seq)
|
||||
})
|
||||
|
||||
t.Run("no events for subsequent listModifiedSince calls", func(t *testing.T) {
|
||||
key := resource.NamespacedResource{
|
||||
Namespace: ns,
|
||||
Group: "group",
|
||||
Resource: "resource",
|
||||
}
|
||||
require.Equal(t, 0, counter) // no events should be returned
|
||||
latestRv1, seq := backend.ListModifiedSince(ctx, key, rvDeleted)
|
||||
require.GreaterOrEqual(t, latestRv1, rvDeleted)
|
||||
isEmpty(t, seq)
|
||||
|
||||
latestRv2, seq := backend.ListModifiedSince(ctx, key, latestRv1)
|
||||
require.GreaterOrEqual(t, latestRv1, latestRv2)
|
||||
isEmpty(t, seq)
|
||||
})
|
||||
|
||||
t.Run("will only return modified events for the given key", func(t *testing.T) {
|
||||
|
@ -582,6 +594,14 @@ func runTestIntegrationBackendListModifiedSince(t *testing.T, backend resource.S
|
|||
})
|
||||
}
|
||||
|
||||
func isEmpty(t *testing.T, seq iter.Seq2[*resource.ModifiedResource, error]) {
|
||||
counter := 0
|
||||
for range seq {
|
||||
counter++
|
||||
}
|
||||
require.Equal(t, 0, counter)
|
||||
}
|
||||
|
||||
func runTestIntegrationBackendListHistory(t *testing.T, backend resource.StorageBackend, nsPrefix string) {
|
||||
ctx := testutil.NewTestContext(t, time.Now().Add(30*time.Second))
|
||||
server := newServer(t, backend)
|
||||
|
|
Loading…
Reference in New Issue