KAFKA-14412: Decouple RocksDB access from CF (#15105)

To support future use cases that require different strategies for accessing
RocksDB, we need to decouple the RocksDB access strategy from the
column family access strategy.

To do this, we now have two separate accessors (a condensed sketch of how
they compose follows this list):

  * `DBAccessor`: dictates how we access RocksDB. Currently only one
    strategy is supported: `DirectDBAccessor`, which accesses RocksDB
    directly, via the `RocksDB` class, for all operations. In the future, a
    `BatchedDBAccessor` will be added, which enables transactions via
    `WriteBatch`.
  * `ColumnFamilyAccessor`: maps StateStore operations to operations on
    one or more column families. This is a rename of the old
    `RocksDBAccessor`.
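
Below is a minimal sketch of how the two accessors compose after this change, condensed from the diff below. Only a few representative methods are shown; the real interfaces carry more operations (range scans, delete, batch preparation, reset/close).

```java
import java.util.Arrays;

import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.FlushOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

// Dictates *how* RocksDB is reached (directly here; via a WriteBatch in the future).
interface DBAccessor {
    byte[] get(final ColumnFamilyHandle cf, final byte[] key) throws RocksDBException;
    void put(final ColumnFamilyHandle cf, final byte[] key, final byte[] value) throws RocksDBException;
    void flush(final ColumnFamilyHandle... cfs) throws RocksDBException;
}

// Forwards every operation straight to the RocksDB instance.
class DirectDBAccessor implements DBAccessor {
    private final RocksDB db;
    private final FlushOptions flushOptions;

    DirectDBAccessor(final RocksDB db, final FlushOptions flushOptions) {
        this.db = db;
        this.flushOptions = flushOptions;
    }

    @Override
    public byte[] get(final ColumnFamilyHandle cf, final byte[] key) throws RocksDBException {
        return db.get(cf, key);
    }

    @Override
    public void put(final ColumnFamilyHandle cf, final byte[] key, final byte[] value) throws RocksDBException {
        db.put(cf, key, value);
    }

    @Override
    public void flush(final ColumnFamilyHandle... cfs) throws RocksDBException {
        if (cfs.length == 0) {
            db.flush(flushOptions);
        } else {
            db.flush(flushOptions, Arrays.asList(cfs));
        }
    }
}

// Decides *which* column families an operation touches, but performs no I/O itself:
// every call delegates to whatever DBAccessor it is handed.
class SingleColumnFamilyAccessor {
    private final ColumnFamilyHandle columnFamily;

    SingleColumnFamilyAccessor(final ColumnFamilyHandle columnFamily) {
        this.columnFamily = columnFamily;
    }

    byte[] get(final DBAccessor accessor, final byte[] key) throws RocksDBException {
        return accessor.get(columnFamily, key);
    }

    void put(final DBAccessor accessor, final byte[] key, final byte[] value) throws RocksDBException {
        accessor.put(columnFamily, key, value);
    }

    void flush(final DBAccessor accessor) throws RocksDBException {
        accessor.flush(columnFamily);
    }
}
```

The key design point is that `SingleColumnFamilyAccessor` no longer holds a `RocksDB` handle; every operation receives a `DBAccessor`, so the same column-family logic can later run against a batched, transactional accessor.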

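The `BatchedDBAccessor` mentioned above is not part of this commit; the following is a purely illustrative sketch of what such an accessor might look like, buffering writes in a `WriteBatch` so they can be applied atomically. The class shape, constructor, and `commit()` method are assumptions for illustration, not the eventual Kafka implementation.

```java
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

// Hypothetical: buffers writes in a WriteBatch and applies them atomically on commit().
class BatchedDBAccessor {
    private final RocksDB db;
    private final WriteBatch batch = new WriteBatch();
    private final WriteOptions writeOptions;

    BatchedDBAccessor(final RocksDB db, final WriteOptions writeOptions) {
        this.db = db;
        this.writeOptions = writeOptions;
    }

    void put(final ColumnFamilyHandle cf, final byte[] key, final byte[] value) throws RocksDBException {
        batch.put(cf, key, value); // buffered; not yet visible in the database
    }

    byte[] get(final ColumnFamilyHandle cf, final byte[] key) throws RocksDBException {
        // A real implementation would have to consult the uncommitted batch first
        // (e.g. via WriteBatchWithIndex); this sketch only reads committed state.
        return db.get(cf, key);
    }

    void commit() throws RocksDBException {
        db.write(writeOptions, batch); // apply all buffered writes atomically
        batch.clear();
    }
}
```
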
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
Author: Nick Telford, 2024-01-04 10:42:30 +00:00 (committed by GitHub)
Commit: 5bc3aa4280 (parent: c078e51c8f)
3 changed files with 178 additions and 91 deletions

File 1 of 3: Checkstyle configuration (ClassFanOutComplexity limit raised from 50 to 52)

@ -131,7 +131,7 @@
<module name="ClassFanOutComplexity">
<!-- default is 20 -->
<property name="max" value="50"/>
<property name="max" value="52"/>
</module>
<module name="CyclomaticComplexity">
<!-- default is 10-->

File 2 of 3: RocksDBStore.java

@ -108,7 +108,8 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
// VisibleForTesting
protected File dbDir;
RocksDB db;
RocksDBAccessor dbAccessor;
DBAccessor dbAccessor;
ColumnFamilyAccessor cfAccessor;
// the following option objects will be created in openDB and closed in the close() method
private RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter userSpecifiedOptions;
@ -251,6 +252,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
// with the measurements from Rocks DB
setupStatistics(configs, dbOptions);
openRocksDB(dbOptions, columnFamilyOptions);
dbAccessor = new DirectDBAccessor(db, fOptions);
open = true;
addValueProvidersToMetricsRecorder();
@ -291,7 +293,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions)
);
dbAccessor = new SingleColumnFamilyAccessor(columnFamilies.get(0));
cfAccessor = new SingleColumnFamilyAccessor(columnFamilies.get(0));
}
/**
@ -390,7 +392,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
final byte[] value) {
Objects.requireNonNull(key, "key cannot be null");
validateStoreOpen();
dbAccessor.put(key.get(), value);
cfAccessor.put(dbAccessor, key.get(), value);
StoreQueryUtils.updatePosition(position, context);
}
@ -409,7 +411,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
@Override
public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
try (final WriteBatch batch = new WriteBatch()) {
dbAccessor.prepareBatch(entries, batch);
cfAccessor.prepareBatch(entries, batch);
write(batch);
StoreQueryUtils.updatePosition(position, context);
} catch (final RocksDBException e) {
@ -459,7 +461,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer cannot be null");
final Bytes prefixBytes = Bytes.wrap(prefixKeySerializer.serialize(null, prefix));
final ManagedKeyValueIterator<Bytes, byte[]> rocksDbPrefixSeekIterator = dbAccessor.prefixScan(prefixBytes);
final ManagedKeyValueIterator<Bytes, byte[]> rocksDbPrefixSeekIterator = cfAccessor.prefixScan(dbAccessor, prefixBytes);
openIterators.add(rocksDbPrefixSeekIterator);
rocksDbPrefixSeekIterator.onClose(() -> openIterators.remove(rocksDbPrefixSeekIterator));
@ -478,7 +480,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
private synchronized byte[] get(final Bytes key, final Optional<ReadOptions> readOptions) {
validateStoreOpen();
try {
return readOptions.isPresent() ? dbAccessor.get(key.get(), readOptions.get()) : dbAccessor.get(key.get());
return readOptions.isPresent() ? cfAccessor.get(dbAccessor, key.get(), readOptions.get()) : cfAccessor.get(dbAccessor, key.get());
} catch (final RocksDBException e) {
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
throw new ProcessorStateException("Error while getting value for key from store " + name, e);
@ -490,7 +492,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
Objects.requireNonNull(key, "key cannot be null");
final byte[] oldValue;
try {
oldValue = dbAccessor.getOnly(key.get());
oldValue = cfAccessor.getOnly(dbAccessor, key.get());
} catch (final RocksDBException e) {
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
throw new ProcessorStateException("Error while getting value for key from store " + name, e);
@ -509,7 +511,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
// RocksDB's deleteRange() does not support a null upper bound so in the event
// of overflow from increment(), the operation cannot be performed and an
// IndexOutOfBoundsException will be thrown.
dbAccessor.deleteRange(keyFrom.get(), Bytes.increment(keyTo).get());
cfAccessor.deleteRange(dbAccessor, keyFrom.get(), Bytes.increment(keyTo).get());
}
@Override
@ -562,7 +564,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
validateStoreOpen();
final ManagedKeyValueIterator<Bytes, byte[]> rocksDBRangeIterator = dbAccessor.range(from, to, forward);
final ManagedKeyValueIterator<Bytes, byte[]> rocksDBRangeIterator = cfAccessor.range(dbAccessor, from, to, forward);
openIterators.add(rocksDBRangeIterator);
rocksDBRangeIterator.onClose(() -> openIterators.remove(rocksDBRangeIterator));
@ -602,7 +604,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
private KeyValueIterator<Bytes, byte[]> all(final boolean forward,
final Set<KeyValueIterator<Bytes, byte[]>> openIterators) {
validateStoreOpen();
final ManagedKeyValueIterator<Bytes, byte[]> rocksDbIterator = dbAccessor.all(forward);
final ManagedKeyValueIterator<Bytes, byte[]> rocksDbIterator = cfAccessor.all(dbAccessor, forward);
openIterators.add(rocksDbIterator);
rocksDbIterator.onClose(() -> openIterators.remove(rocksDbIterator));
return rocksDbIterator;
@ -624,7 +626,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
validateStoreOpen();
final long numEntries;
try {
numEntries = dbAccessor.approximateNumEntries();
numEntries = cfAccessor.approximateNumEntries(dbAccessor);
} catch (final RocksDBException e) {
throw new ProcessorStateException("Error fetching property from store " + name, e);
}
@ -646,7 +648,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
return;
}
try {
dbAccessor.flush();
cfAccessor.flush(dbAccessor);
} catch (final RocksDBException e) {
throw new ProcessorStateException("Error while executing flush from store " + name, e);
}
@ -655,7 +657,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
@Override
public void addToBatch(final KeyValue<byte[], byte[]> record,
final WriteBatchInterface batch) throws RocksDBException {
dbAccessor.addToBatch(record.key, record.value, batch);
cfAccessor.addToBatch(record.key, record.value, batch);
}
@Override
@ -690,6 +692,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
// Important: do not rearrange the order in which the below objects are closed!
// Order of closing must follow: ColumnFamilyHandle > RocksDB > DBOptions > ColumnFamilyOptions
cfAccessor.close();
dbAccessor.close();
db.close();
userSpecifiedOptions.close();
@ -701,6 +704,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
statistics.close();
}
cfAccessor = null;
dbAccessor = null;
userSpecifiedOptions = null;
wOptions = null;
@ -724,42 +728,122 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
}
}
interface RocksDBAccessor {
interface DBAccessor {
byte[] get(final ColumnFamilyHandle columnFamily, final byte[] key) throws RocksDBException;
byte[] get(final ColumnFamilyHandle columnFamily, final ReadOptions readOptions, final byte[] key) throws RocksDBException;
RocksIterator newIterator(final ColumnFamilyHandle columnFamily);
void put(final ColumnFamilyHandle columnFamily, final byte[] key, final byte[] value) throws RocksDBException;
void delete(final ColumnFamilyHandle columnFamily, final byte[] key) throws RocksDBException;
void deleteRange(final ColumnFamilyHandle columnFamily, final byte[] from, final byte[] to) throws RocksDBException;
long approximateNumEntries(final ColumnFamilyHandle columnFamily) throws RocksDBException;
void flush(final ColumnFamilyHandle... columnFamilies) throws RocksDBException;
void reset();
void close();
}
void put(final byte[] key,
final byte[] value);
static class DirectDBAccessor implements DBAccessor {
private final RocksDB db;
private final FlushOptions flushOptions;
DirectDBAccessor(final RocksDB db, final FlushOptions flushOptions) {
this.db = db;
this.flushOptions = flushOptions;
}
@Override
public byte[] get(final ColumnFamilyHandle columnFamily, final byte[] key) throws RocksDBException {
return db.get(columnFamily, key);
}
@Override
public byte[] get(final ColumnFamilyHandle columnFamily, final ReadOptions readOptions, final byte[] key) throws RocksDBException {
return db.get(columnFamily, readOptions, key);
}
@Override
public RocksIterator newIterator(final ColumnFamilyHandle columnFamily) {
return db.newIterator(columnFamily);
}
@Override
public void put(final ColumnFamilyHandle columnFamily, final byte[] key, final byte[] value) throws RocksDBException {
db.put(columnFamily, key, value);
}
@Override
public void delete(final ColumnFamilyHandle columnFamily, final byte[] key) throws RocksDBException {
db.delete(columnFamily, key);
}
@Override
public void deleteRange(final ColumnFamilyHandle columnFamily, final byte[] from, final byte[] to) throws RocksDBException {
db.deleteRange(columnFamily, from, to);
}
@Override
public long approximateNumEntries(final ColumnFamilyHandle columnFamily) throws RocksDBException {
return db.getLongProperty(columnFamily, "rocksdb.estimate-num-keys");
}
@Override
public void flush(final ColumnFamilyHandle... columnFamilies) throws RocksDBException {
if (columnFamilies.length == 0) {
db.flush(flushOptions);
} else if (columnFamilies.length == 1) {
db.flush(flushOptions, columnFamilies[0]);
} else {
db.flush(flushOptions, Arrays.asList(columnFamilies));
}
}
@Override
public void reset() {
// no state to reset
}
@Override
public void close() {
// nothing to close
}
}
interface ColumnFamilyAccessor {
void put(final DBAccessor accessor, final byte[] key, final byte[] value);
void prepareBatch(final List<KeyValue<Bytes, byte[]>> entries,
final WriteBatchInterface batch) throws RocksDBException;
byte[] get(final byte[] key) throws RocksDBException;
byte[] get(final DBAccessor accessor, final byte[] key) throws RocksDBException;
byte[] get(final byte[] key, ReadOptions readOptions) throws RocksDBException;
byte[] get(final DBAccessor accessor, final byte[] key, final ReadOptions readOptions) throws RocksDBException;
/**
* In contrast to get(), we don't migrate the key to new CF.
* <p>
* Use for get() within delete() -- no need to migrate, as it's deleted anyway
*/
byte[] getOnly(final byte[] key) throws RocksDBException;
byte[] getOnly(final DBAccessor accessor, final byte[] key) throws RocksDBException;
ManagedKeyValueIterator<Bytes, byte[]> range(final Bytes from,
ManagedKeyValueIterator<Bytes, byte[]> range(final DBAccessor accessor,
final Bytes from,
final Bytes to,
final boolean forward);
/**
* Deletes keys entries in the range ['from', 'to'], including 'from' and excluding 'to'.
*/
void deleteRange(final byte[] from,
final byte[] to);
void deleteRange(final DBAccessor accessor, final byte[] from, final byte[] to);
ManagedKeyValueIterator<Bytes, byte[]> all(final boolean forward);
ManagedKeyValueIterator<Bytes, byte[]> all(final DBAccessor accessor, final boolean forward);
ManagedKeyValueIterator<Bytes, byte[]> prefixScan(final Bytes prefix);
ManagedKeyValueIterator<Bytes, byte[]> prefixScan(final DBAccessor accessor, final Bytes prefix);
long approximateNumEntries() throws RocksDBException;
long approximateNumEntries(final DBAccessor accessor) throws RocksDBException;
void flush() throws RocksDBException;
void flush(final DBAccessor accessor) throws RocksDBException;
void addToBatch(final byte[] key,
final byte[] value,
@ -768,7 +852,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
void close();
}
class SingleColumnFamilyAccessor implements RocksDBAccessor {
class SingleColumnFamilyAccessor implements ColumnFamilyAccessor {
private final ColumnFamilyHandle columnFamily;
SingleColumnFamilyAccessor(final ColumnFamilyHandle columnFamily) {
@ -776,18 +860,19 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
}
@Override
public void put(final byte[] key,
public void put(final DBAccessor accessor,
final byte[] key,
final byte[] value) {
if (value == null) {
try {
db.delete(columnFamily, wOptions, key);
accessor.delete(columnFamily, key);
} catch (final RocksDBException e) {
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
throw new ProcessorStateException("Error while removing key from store " + name, e);
}
} else {
try {
db.put(columnFamily, wOptions, key, value);
accessor.put(columnFamily, key, value);
} catch (final RocksDBException e) {
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
throw new ProcessorStateException("Error while putting key/value into store " + name, e);
@ -805,27 +890,28 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
}
@Override
public byte[] get(final byte[] key) throws RocksDBException {
return db.get(columnFamily, key);
public byte[] get(final DBAccessor accessor, final byte[] key) throws RocksDBException {
return accessor.get(columnFamily, key);
}
@Override
public byte[] get(final byte[] key, final ReadOptions readOptions) throws RocksDBException {
return db.get(columnFamily, readOptions, key);
public byte[] get(final DBAccessor accessor, final byte[] key, final ReadOptions readOptions) throws RocksDBException {
return accessor.get(columnFamily, readOptions, key);
}
@Override
public byte[] getOnly(final byte[] key) throws RocksDBException {
return db.get(columnFamily, key);
public byte[] getOnly(final DBAccessor accessor, final byte[] key) throws RocksDBException {
return get(accessor, key);
}
@Override
public ManagedKeyValueIterator<Bytes, byte[]> range(final Bytes from,
public ManagedKeyValueIterator<Bytes, byte[]> range(final DBAccessor accessor,
final Bytes from,
final Bytes to,
final boolean forward) {
return new RocksDBRangeIterator(
name,
db.newIterator(columnFamily),
accessor.newIterator(columnFamily),
from,
to,
forward,
@ -834,9 +920,9 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
}
@Override
public void deleteRange(final byte[] from, final byte[] to) {
public void deleteRange(final DBAccessor accessor, final byte[] from, final byte[] to) {
try {
db.deleteRange(columnFamily, wOptions, from, to);
accessor.deleteRange(columnFamily, from, to);
} catch (final RocksDBException e) {
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
throw new ProcessorStateException("Error while removing key from store " + name, e);
@ -844,8 +930,8 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
}
@Override
public ManagedKeyValueIterator<Bytes, byte[]> all(final boolean forward) {
final RocksIterator innerIterWithTimestamp = db.newIterator(columnFamily);
public ManagedKeyValueIterator<Bytes, byte[]> all(final DBAccessor accessor, final boolean forward) {
final RocksIterator innerIterWithTimestamp = accessor.newIterator(columnFamily);
if (forward) {
innerIterWithTimestamp.seekToFirst();
} else {
@ -855,11 +941,11 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
}
@Override
public ManagedKeyValueIterator<Bytes, byte[]> prefixScan(final Bytes prefix) {
public ManagedKeyValueIterator<Bytes, byte[]> prefixScan(final DBAccessor accessor, final Bytes prefix) {
final Bytes to = incrementWithoutOverflow(prefix);
return new RocksDBRangeIterator(
name,
db.newIterator(columnFamily),
accessor.newIterator(columnFamily),
prefix,
to,
true,
@ -868,13 +954,13 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
}
@Override
public long approximateNumEntries() throws RocksDBException {
return db.getLongProperty(columnFamily, "rocksdb.estimate-num-keys");
public long approximateNumEntries(final DBAccessor accessor) throws RocksDBException {
return accessor.approximateNumEntries(columnFamily);
}
@Override
public void flush() throws RocksDBException {
db.flush(fOptions, columnFamily);
public void flush(final DBAccessor accessor) throws RocksDBException {
accessor.flush(columnFamily);
}
@Override
@ -903,7 +989,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
position
);
// If version headers are not present or version is V0
dbAccessor.addToBatch(record.key(), record.value(), batch);
cfAccessor.addToBatch(record.key(), record.value(), batch);
}
write(batch);
} catch (final RocksDBException e) {

File 3 of 3: RocksDBTimestampedStore.java

@ -78,16 +78,16 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
noTimestampsIter.seekToFirst();
if (noTimestampsIter.isValid()) {
log.info("Opening store {} in upgrade mode", name);
dbAccessor = new DualColumnFamilyAccessor(noTimestampColumnFamily, withTimestampColumnFamily);
cfAccessor = new DualColumnFamilyAccessor(noTimestampColumnFamily, withTimestampColumnFamily);
} else {
log.info("Opening store {} in regular mode", name);
dbAccessor = new SingleColumnFamilyAccessor(withTimestampColumnFamily);
cfAccessor = new SingleColumnFamilyAccessor(withTimestampColumnFamily);
noTimestampColumnFamily.close();
}
noTimestampsIter.close();
}
private class DualColumnFamilyAccessor implements RocksDBAccessor {
private class DualColumnFamilyAccessor implements ColumnFamilyAccessor {
private final ColumnFamilyHandle oldColumnFamily;
private final ColumnFamilyHandle newColumnFamily;
@ -98,30 +98,31 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
}
@Override
public void put(final byte[] key,
public void put(final DBAccessor accessor,
final byte[] key,
final byte[] valueWithTimestamp) {
if (valueWithTimestamp == null) {
try {
db.delete(oldColumnFamily, wOptions, key);
accessor.delete(oldColumnFamily, key);
} catch (final RocksDBException e) {
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
throw new ProcessorStateException("Error while removing key from store " + name, e);
}
try {
db.delete(newColumnFamily, wOptions, key);
accessor.delete(newColumnFamily, key);
} catch (final RocksDBException e) {
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
throw new ProcessorStateException("Error while removing key from store " + name, e);
}
} else {
try {
db.delete(oldColumnFamily, wOptions, key);
accessor.delete(oldColumnFamily, key);
} catch (final RocksDBException e) {
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
throw new ProcessorStateException("Error while removing key from store " + name, e);
}
try {
db.put(newColumnFamily, wOptions, key, valueWithTimestamp);
accessor.put(newColumnFamily, key, valueWithTimestamp);
StoreQueryUtils.updatePosition(position, context);
} catch (final RocksDBException e) {
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
@ -140,28 +141,28 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
}
@Override
public byte[] get(final byte[] key) throws RocksDBException {
return get(key, Optional.empty());
public byte[] get(final DBAccessor accessor, final byte[] key) throws RocksDBException {
return get(accessor, key, Optional.empty());
}
@Override
public byte[] get(final byte[] key, final ReadOptions readOptions) throws RocksDBException {
return get(key, Optional.of(readOptions));
public byte[] get(final DBAccessor accessor, final byte[] key, final ReadOptions readOptions) throws RocksDBException {
return get(accessor, key, Optional.of(readOptions));
}
private byte[] get(final byte[] key, final Optional<ReadOptions> readOptions) throws RocksDBException {
final byte[] valueWithTimestamp = readOptions.isPresent() ? db.get(newColumnFamily, readOptions.get(), key) : db.get(newColumnFamily, key);
private byte[] get(final DBAccessor accessor, final byte[] key, final Optional<ReadOptions> readOptions) throws RocksDBException {
final byte[] valueWithTimestamp = readOptions.isPresent() ? accessor.get(newColumnFamily, readOptions.get(), key) : accessor.get(newColumnFamily, key);
if (valueWithTimestamp != null) {
return valueWithTimestamp;
}
final byte[] plainValue = readOptions.isPresent() ? db.get(oldColumnFamily, readOptions.get(), key) : db.get(oldColumnFamily, key);
final byte[] plainValue = readOptions.isPresent() ? accessor.get(oldColumnFamily, readOptions.get(), key) : accessor.get(oldColumnFamily, key);
if (plainValue != null) {
final byte[] valueWithUnknownTimestamp = convertToTimestampedFormat(plainValue);
// this does only work, because the changelog topic contains correct data already
// for other format changes, we cannot take this short cut and can only migrate data
// from old to new store on put()
put(key, valueWithUnknownTimestamp);
put(accessor, key, valueWithUnknownTimestamp);
return valueWithUnknownTimestamp;
}
@ -169,13 +170,13 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
}
@Override
public byte[] getOnly(final byte[] key) throws RocksDBException {
final byte[] valueWithTimestamp = db.get(newColumnFamily, key);
public byte[] getOnly(final DBAccessor accessor, final byte[] key) throws RocksDBException {
final byte[] valueWithTimestamp = accessor.get(newColumnFamily, key);
if (valueWithTimestamp != null) {
return valueWithTimestamp;
}
final byte[] plainValue = db.get(oldColumnFamily, key);
final byte[] plainValue = accessor.get(oldColumnFamily, key);
if (plainValue != null) {
return convertToTimestampedFormat(plainValue);
}
@ -184,13 +185,14 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
}
@Override
public ManagedKeyValueIterator<Bytes, byte[]> range(final Bytes from,
public ManagedKeyValueIterator<Bytes, byte[]> range(final DBAccessor accessor,
final Bytes from,
final Bytes to,
final boolean forward) {
return new RocksDBDualCFRangeIterator(
name,
db.newIterator(newColumnFamily),
db.newIterator(oldColumnFamily),
accessor.newIterator(newColumnFamily),
accessor.newIterator(oldColumnFamily),
from,
to,
forward,
@ -198,15 +200,15 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
}
@Override
public void deleteRange(final byte[] from, final byte[] to) {
public void deleteRange(final DBAccessor accessor, final byte[] from, final byte[] to) {
try {
db.deleteRange(oldColumnFamily, wOptions, from, to);
accessor.deleteRange(oldColumnFamily, from, to);
} catch (final RocksDBException e) {
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
throw new ProcessorStateException("Error while removing key from store " + name, e);
}
try {
db.deleteRange(newColumnFamily, wOptions, from, to);
accessor.deleteRange(newColumnFamily, from, to);
} catch (final RocksDBException e) {
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
throw new ProcessorStateException("Error while removing key from store " + name, e);
@ -214,9 +216,9 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
}
@Override
public ManagedKeyValueIterator<Bytes, byte[]> all(final boolean forward) {
final RocksIterator innerIterWithTimestamp = db.newIterator(newColumnFamily);
final RocksIterator innerIterNoTimestamp = db.newIterator(oldColumnFamily);
public ManagedKeyValueIterator<Bytes, byte[]> all(final DBAccessor accessor, final boolean forward) {
final RocksIterator innerIterWithTimestamp = accessor.newIterator(newColumnFamily);
final RocksIterator innerIterNoTimestamp = accessor.newIterator(oldColumnFamily);
if (forward) {
innerIterWithTimestamp.seekToFirst();
innerIterNoTimestamp.seekToFirst();
@ -228,12 +230,12 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
}
@Override
public ManagedKeyValueIterator<Bytes, byte[]> prefixScan(final Bytes prefix) {
public ManagedKeyValueIterator<Bytes, byte[]> prefixScan(final DBAccessor accessor, final Bytes prefix) {
final Bytes to = incrementWithoutOverflow(prefix);
return new RocksDBDualCFRangeIterator(
name,
db.newIterator(newColumnFamily),
db.newIterator(oldColumnFamily),
accessor.newIterator(newColumnFamily),
accessor.newIterator(oldColumnFamily),
prefix,
to,
true,
@ -242,15 +244,14 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
}
@Override
public long approximateNumEntries() throws RocksDBException {
return db.getLongProperty(oldColumnFamily, "rocksdb.estimate-num-keys")
+ db.getLongProperty(newColumnFamily, "rocksdb.estimate-num-keys");
public long approximateNumEntries(final DBAccessor accessor) throws RocksDBException {
return accessor.approximateNumEntries(oldColumnFamily) +
accessor.approximateNumEntries(newColumnFamily);
}
@Override
public void flush() throws RocksDBException {
db.flush(fOptions, oldColumnFamily);
db.flush(fOptions, newColumnFamily);
public void flush(final DBAccessor accessor) throws RocksDBException {
accessor.flush(oldColumnFamily, newColumnFamily);
}
@Override