unistore: refactor get to return a reader (#107951)

This commit is contained in:
Georges Chaudy 2025-07-11 11:10:19 +02:00 committed by GitHub
parent a314b99589
commit ea0ddb3fc9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 48 additions and 65 deletions

View File

@ -239,11 +239,7 @@ func (d *dataStore) Get(ctx context.Context, key DataKey) (io.ReadCloser, error)
return nil, fmt.Errorf("invalid data key: %w", err) return nil, fmt.Errorf("invalid data key: %w", err)
} }
obj, err := d.kv.Get(ctx, dataSection, key.String()) return d.kv.Get(ctx, dataSection, key.String())
if err != nil {
return nil, err
}
return obj.Value, nil
} }
func (d *dataStore) Save(ctx context.Context, key DataKey, value io.Reader) error { func (d *dataStore) Save(ctx context.Context, key DataKey, value io.Reader) error {

View File

@ -147,16 +147,15 @@ func (n *eventStore) Get(ctx context.Context, key EventKey) (Event, error) {
return Event{}, fmt.Errorf("invalid event key: %w", err) return Event{}, fmt.Errorf("invalid event key: %w", err)
} }
obj, err := n.kv.Get(ctx, eventsSection, key.String()) reader, err := n.kv.Get(ctx, eventsSection, key.String())
if err != nil { if err != nil {
return Event{}, err return Event{}, err
} }
defer func() { _ = reader.Close() }()
var event Event var event Event
if err = json.NewDecoder(obj.Value).Decode(&event); err != nil { if err = json.NewDecoder(reader).Decode(&event); err != nil {
_ = obj.Value.Close()
return Event{}, err return Event{}, err
} }
defer func() { _ = obj.Value.Close() }()
return event, nil return event, nil
} }
@ -173,14 +172,16 @@ func (n *eventStore) ListSince(ctx context.Context, sinceRV int64) iter.Seq2[Eve
if err != nil { if err != nil {
return return
} }
obj, err := n.kv.Get(ctx, eventsSection, key) reader, err := n.kv.Get(ctx, eventsSection, key)
if err != nil { if err != nil {
return return
} }
var event Event var event Event
if err := json.NewDecoder(obj.Value).Decode(&event); err != nil { if err := json.NewDecoder(reader).Decode(&event); err != nil {
_ = reader.Close()
return return
} }
_ = reader.Close()
if !yield(event, nil) { if !yield(event, nil) {
return return
} }

View File

@ -29,18 +29,12 @@ type ListOptions struct {
Limit int64 // maximum number of results to return. 0 means no limit. Limit int64 // maximum number of results to return. 0 means no limit.
} }
// KVObject represents a key-value object
type KVObject struct {
Key string // the key of the object within the section
Value io.ReadCloser // the value of the object
}
type KV interface { type KV interface {
// Keys returns all the keys in the store // Keys returns all the keys in the store
Keys(ctx context.Context, section string, opt ListOptions) iter.Seq2[string, error] Keys(ctx context.Context, section string, opt ListOptions) iter.Seq2[string, error]
// Get retrieves a key-value pair from the store // Get retrieves the value for a key from the store
Get(ctx context.Context, section string, key string) (KVObject, error) Get(ctx context.Context, section string, key string) (io.ReadCloser, error)
// Save a new value // Save a new value
Save(ctx context.Context, section string, key string, value io.Reader) error Save(ctx context.Context, section string, key string, value io.Reader) error
@ -67,16 +61,16 @@ func NewBadgerKV(db *badger.DB) *badgerKV {
} }
} }
func (k *badgerKV) Get(ctx context.Context, section string, key string) (KVObject, error) { func (k *badgerKV) Get(ctx context.Context, section string, key string) (io.ReadCloser, error) {
if k.db.IsClosed() { if k.db.IsClosed() {
return KVObject{}, fmt.Errorf("database is closed") return nil, fmt.Errorf("database is closed")
} }
txn := k.db.NewTransaction(false) txn := k.db.NewTransaction(false)
defer txn.Discard() defer txn.Discard()
if section == "" { if section == "" {
return KVObject{}, fmt.Errorf("section is required") return nil, fmt.Errorf("section is required")
} }
key = section + "/" + key key = section + "/" + key
@ -84,24 +78,18 @@ func (k *badgerKV) Get(ctx context.Context, section string, key string) (KVObjec
item, err := txn.Get([]byte(key)) item, err := txn.Get([]byte(key))
if err != nil { if err != nil {
if errors.Is(err, badger.ErrKeyNotFound) { if errors.Is(err, badger.ErrKeyNotFound) {
return KVObject{}, ErrNotFound return nil, ErrNotFound
} }
return KVObject{}, err return nil, err
}
out := KVObject{
Key: string(item.Key())[len(section)+1:],
} }
// Get the value and create a reader from it // Get the value and create a reader from it
value, err := item.ValueCopy(nil) value, err := item.ValueCopy(nil)
if err != nil { if err != nil {
return KVObject{}, err return nil, err
} }
out.Value = io.NopCloser(bytes.NewReader(value)) return io.NopCloser(bytes.NewReader(value)), nil
return out, nil
} }
func (k *badgerKV) Save(ctx context.Context, section string, key string, value io.Reader) error { func (k *badgerKV) Save(ctx context.Context, section string, key string, value io.Reader) error {

View File

@ -116,20 +116,20 @@ func TestBadgerKV_UnderlyingStorage(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
// Verify KV interface returns correct values for each section // Verify KV interface returns correct values for each section
obj1, err := kv.Get(ctx, section1, key) reader1, err := kv.Get(ctx, section1, key)
require.NoError(t, err) require.NoError(t, err)
val1, err := io.ReadAll(obj1.Value) val1, err := io.ReadAll(reader1)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, value1, string(val1)) require.Equal(t, value1, string(val1))
err = obj1.Value.Close() err = reader1.Close()
require.NoError(t, err) require.NoError(t, err)
obj2, err := kv.Get(ctx, section2, key) reader2, err := kv.Get(ctx, section2, key)
require.NoError(t, err) require.NoError(t, err)
val2, err := io.ReadAll(obj2.Value) val2, err := io.ReadAll(reader2)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, value2, string(val2)) require.Equal(t, value2, string(val2))
err = obj2.Value.Close() err = reader2.Close()
require.NoError(t, err) require.NoError(t, err)
}) })

View File

@ -206,15 +206,15 @@ func (d *metadataStore) Get(ctx context.Context, key MetaDataKey) (MetaData, err
return MetaData{}, fmt.Errorf("invalid metadata key: %w", err) return MetaData{}, fmt.Errorf("invalid metadata key: %w", err)
} }
obj, err := d.kv.Get(ctx, metaSection, key.String()) reader, err := d.kv.Get(ctx, metaSection, key.String())
if err != nil { if err != nil {
return MetaData{}, err return MetaData{}, err
} }
defer func() { defer func() {
_ = obj.Value.Close() _ = reader.Close()
}() }()
var meta MetaData var meta MetaData
err = json.NewDecoder(obj.Value).Decode(&meta) err = json.NewDecoder(reader).Decode(&meta)
return meta, err return meta, err
} }

View File

@ -110,11 +110,12 @@ func TestMetadataStore_Save(t *testing.T) {
}) })
require.NoError(t, err) require.NoError(t, err)
// Verify in the kv store that the metadata is saved // Verify in the kv store that the metadata is saved
retrievedObj, err := store.kv.Get(ctx, metaSection, key.String()) reader, err := store.kv.Get(ctx, metaSection, key.String())
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, key.String(), retrievedObj.Key)
var retrivedMeta MetaData var retrivedMeta MetaData
actualData, err := io.ReadAll(retrievedObj.Value) actualData, err := io.ReadAll(reader)
require.NoError(t, err)
err = reader.Close()
require.NoError(t, err) require.NoError(t, err)
err = json.Unmarshal(actualData, &retrivedMeta) err = json.Unmarshal(actualData, &retrivedMeta)
require.NoError(t, err) require.NoError(t, err)

View File

@ -85,17 +85,16 @@ func runTestKVGet(t *testing.T, kv resource.KV, nsPrefix string) {
require.NoError(t, err) require.NoError(t, err)
// Now get it // Now get it
obj, err := kv.Get(ctx, section, "existing-key") reader, err := kv.Get(ctx, section, "existing-key")
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, "existing-key", obj.Key)
// Read the value // Read the value
value, err := io.ReadAll(obj.Value) value, err := io.ReadAll(reader)
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, testValue, string(value)) assert.Equal(t, testValue, string(value))
// Close the value reader // Close the value reader
err = obj.Value.Close() err = reader.Close()
require.NoError(t, err) require.NoError(t, err)
}) })
@ -122,14 +121,13 @@ func runTestKVSave(t *testing.T, kv resource.KV, nsPrefix string) {
require.NoError(t, err) require.NoError(t, err)
// Verify it was saved // Verify it was saved
obj, err := kv.Get(ctx, section, "new-key") reader, err := kv.Get(ctx, section, "new-key")
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, "new-key", obj.Key)
value, err := io.ReadAll(obj.Value) value, err := io.ReadAll(reader)
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, testValue, string(value)) assert.Equal(t, testValue, string(value))
err = obj.Value.Close() err = reader.Close()
require.NoError(t, err) require.NoError(t, err)
}) })
@ -144,13 +142,13 @@ func runTestKVSave(t *testing.T, kv resource.KV, nsPrefix string) {
require.NoError(t, err) require.NoError(t, err)
// Verify it was updated // Verify it was updated
obj, err := kv.Get(ctx, section, "overwrite-key") reader, err := kv.Get(ctx, section, "overwrite-key")
require.NoError(t, err) require.NoError(t, err)
value, err := io.ReadAll(obj.Value) value, err := io.ReadAll(reader)
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, newValue, string(value)) assert.Equal(t, newValue, string(value))
err = obj.Value.Close() err = reader.Close()
require.NoError(t, err) require.NoError(t, err)
}) })
@ -166,13 +164,13 @@ func runTestKVSave(t *testing.T, kv resource.KV, nsPrefix string) {
require.NoError(t, err) require.NoError(t, err)
// Verify binary data // Verify binary data
obj, err := kv.Get(ctx, section, "binary-key") reader, err := kv.Get(ctx, section, "binary-key")
require.NoError(t, err) require.NoError(t, err)
value, err := io.ReadAll(obj.Value) value, err := io.ReadAll(reader)
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, binaryData, value) assert.Equal(t, binaryData, value)
err = obj.Value.Close() err = reader.Close()
require.NoError(t, err) require.NoError(t, err)
}) })
@ -182,15 +180,14 @@ func runTestKVSave(t *testing.T, kv resource.KV, nsPrefix string) {
require.NoError(t, err) require.NoError(t, err)
// Verify it was saved with empty data // Verify it was saved with empty data
obj, err := kv.Get(ctx, section, "empty-key") reader, err := kv.Get(ctx, section, "empty-key")
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, "empty-key", obj.Key)
value, err := io.ReadAll(obj.Value) value, err := io.ReadAll(reader)
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, "", string(value)) assert.Equal(t, "", string(value))
assert.Len(t, value, 0) assert.Len(t, value, 0)
err = obj.Value.Close() err = reader.Close()
require.NoError(t, err) require.NoError(t, err)
}) })
} }
@ -416,14 +413,14 @@ func runTestKVConcurrent(t *testing.T, kv resource.KV, nsPrefix string) {
} }
// Get immediately // Get immediately
obj, err := kv.Get(ctx, section, key) reader, err := kv.Get(ctx, section, key)
if err != nil { if err != nil {
return return
} }
readValue, err := io.ReadAll(obj.Value) readValue, err := io.ReadAll(reader)
require.NoError(t, err) require.NoError(t, err)
err = obj.Value.Close() err = reader.Close()
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, value, string(readValue)) assert.Equal(t, value, string(readValue))
} }