KAFKA-14412: Decouple RocksDB access from CF (#15105)

To support future use-cases that use different strategies for accessing
RocksDB, we need to de-couple the RocksDB access strategy from the
Column Family access strategy.

To do this, we now have two separate accessors:

  * `DBAccessor`: dictates how we access RocksDB. Currently only one
    strategy is supported: `DirectDBAccessor`, which access RocksDB
    directly, via the `RocksDB` class for all operations. In the future, a
    `BatchedDBAccessor` will be added, which enables transactions via
    `WriteBatch`.
  * `ColumnFamilyAccessor`: maps StateStore operations to operations on
    one or more column families. This is a rename of the old
    `RocksDBDBAccessor`.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
Nick Telford 2024-01-04 10:42:30 +00:00 committed by GitHub
parent c078e51c8f
commit 5bc3aa4280
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 178 additions and 91 deletions

View File

@ -131,7 +131,7 @@
<module name="ClassFanOutComplexity"> <module name="ClassFanOutComplexity">
<!-- default is 20 --> <!-- default is 20 -->
<property name="max" value="50"/> <property name="max" value="52"/>
</module> </module>
<module name="CyclomaticComplexity"> <module name="CyclomaticComplexity">
<!-- default is 10--> <!-- default is 10-->

View File

@ -108,7 +108,8 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
// VisibleForTesting // VisibleForTesting
protected File dbDir; protected File dbDir;
RocksDB db; RocksDB db;
RocksDBAccessor dbAccessor; DBAccessor dbAccessor;
ColumnFamilyAccessor cfAccessor;
// the following option objects will be created in openDB and closed in the close() method // the following option objects will be created in openDB and closed in the close() method
private RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter userSpecifiedOptions; private RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter userSpecifiedOptions;
@ -251,6 +252,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
// with the measurements from Rocks DB // with the measurements from Rocks DB
setupStatistics(configs, dbOptions); setupStatistics(configs, dbOptions);
openRocksDB(dbOptions, columnFamilyOptions); openRocksDB(dbOptions, columnFamilyOptions);
dbAccessor = new DirectDBAccessor(db, fOptions);
open = true; open = true;
addValueProvidersToMetricsRecorder(); addValueProvidersToMetricsRecorder();
@ -291,7 +293,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions) new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions)
); );
dbAccessor = new SingleColumnFamilyAccessor(columnFamilies.get(0)); cfAccessor = new SingleColumnFamilyAccessor(columnFamilies.get(0));
} }
/** /**
@ -390,7 +392,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
final byte[] value) { final byte[] value) {
Objects.requireNonNull(key, "key cannot be null"); Objects.requireNonNull(key, "key cannot be null");
validateStoreOpen(); validateStoreOpen();
dbAccessor.put(key.get(), value); cfAccessor.put(dbAccessor, key.get(), value);
StoreQueryUtils.updatePosition(position, context); StoreQueryUtils.updatePosition(position, context);
} }
@ -409,7 +411,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
@Override @Override
public void putAll(final List<KeyValue<Bytes, byte[]>> entries) { public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
try (final WriteBatch batch = new WriteBatch()) { try (final WriteBatch batch = new WriteBatch()) {
dbAccessor.prepareBatch(entries, batch); cfAccessor.prepareBatch(entries, batch);
write(batch); write(batch);
StoreQueryUtils.updatePosition(position, context); StoreQueryUtils.updatePosition(position, context);
} catch (final RocksDBException e) { } catch (final RocksDBException e) {
@ -459,7 +461,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer cannot be null"); Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer cannot be null");
final Bytes prefixBytes = Bytes.wrap(prefixKeySerializer.serialize(null, prefix)); final Bytes prefixBytes = Bytes.wrap(prefixKeySerializer.serialize(null, prefix));
final ManagedKeyValueIterator<Bytes, byte[]> rocksDbPrefixSeekIterator = dbAccessor.prefixScan(prefixBytes); final ManagedKeyValueIterator<Bytes, byte[]> rocksDbPrefixSeekIterator = cfAccessor.prefixScan(dbAccessor, prefixBytes);
openIterators.add(rocksDbPrefixSeekIterator); openIterators.add(rocksDbPrefixSeekIterator);
rocksDbPrefixSeekIterator.onClose(() -> openIterators.remove(rocksDbPrefixSeekIterator)); rocksDbPrefixSeekIterator.onClose(() -> openIterators.remove(rocksDbPrefixSeekIterator));
@ -478,7 +480,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
private synchronized byte[] get(final Bytes key, final Optional<ReadOptions> readOptions) { private synchronized byte[] get(final Bytes key, final Optional<ReadOptions> readOptions) {
validateStoreOpen(); validateStoreOpen();
try { try {
return readOptions.isPresent() ? dbAccessor.get(key.get(), readOptions.get()) : dbAccessor.get(key.get()); return readOptions.isPresent() ? cfAccessor.get(dbAccessor, key.get(), readOptions.get()) : cfAccessor.get(dbAccessor, key.get());
} catch (final RocksDBException e) { } catch (final RocksDBException e) {
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores. // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
throw new ProcessorStateException("Error while getting value for key from store " + name, e); throw new ProcessorStateException("Error while getting value for key from store " + name, e);
@ -490,7 +492,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
Objects.requireNonNull(key, "key cannot be null"); Objects.requireNonNull(key, "key cannot be null");
final byte[] oldValue; final byte[] oldValue;
try { try {
oldValue = dbAccessor.getOnly(key.get()); oldValue = cfAccessor.getOnly(dbAccessor, key.get());
} catch (final RocksDBException e) { } catch (final RocksDBException e) {
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores. // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
throw new ProcessorStateException("Error while getting value for key from store " + name, e); throw new ProcessorStateException("Error while getting value for key from store " + name, e);
@ -509,7 +511,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
// RocksDB's deleteRange() does not support a null upper bound so in the event // RocksDB's deleteRange() does not support a null upper bound so in the event
// of overflow from increment(), the operation cannot be performed and an // of overflow from increment(), the operation cannot be performed and an
// IndexOutOfBoundsException will be thrown. // IndexOutOfBoundsException will be thrown.
dbAccessor.deleteRange(keyFrom.get(), Bytes.increment(keyTo).get()); cfAccessor.deleteRange(dbAccessor, keyFrom.get(), Bytes.increment(keyTo).get());
} }
@Override @Override
@ -562,7 +564,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
validateStoreOpen(); validateStoreOpen();
final ManagedKeyValueIterator<Bytes, byte[]> rocksDBRangeIterator = dbAccessor.range(from, to, forward); final ManagedKeyValueIterator<Bytes, byte[]> rocksDBRangeIterator = cfAccessor.range(dbAccessor, from, to, forward);
openIterators.add(rocksDBRangeIterator); openIterators.add(rocksDBRangeIterator);
rocksDBRangeIterator.onClose(() -> openIterators.remove(rocksDBRangeIterator)); rocksDBRangeIterator.onClose(() -> openIterators.remove(rocksDBRangeIterator));
@ -602,7 +604,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
private KeyValueIterator<Bytes, byte[]> all(final boolean forward, private KeyValueIterator<Bytes, byte[]> all(final boolean forward,
final Set<KeyValueIterator<Bytes, byte[]>> openIterators) { final Set<KeyValueIterator<Bytes, byte[]>> openIterators) {
validateStoreOpen(); validateStoreOpen();
final ManagedKeyValueIterator<Bytes, byte[]> rocksDbIterator = dbAccessor.all(forward); final ManagedKeyValueIterator<Bytes, byte[]> rocksDbIterator = cfAccessor.all(dbAccessor, forward);
openIterators.add(rocksDbIterator); openIterators.add(rocksDbIterator);
rocksDbIterator.onClose(() -> openIterators.remove(rocksDbIterator)); rocksDbIterator.onClose(() -> openIterators.remove(rocksDbIterator));
return rocksDbIterator; return rocksDbIterator;
@ -624,7 +626,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
validateStoreOpen(); validateStoreOpen();
final long numEntries; final long numEntries;
try { try {
numEntries = dbAccessor.approximateNumEntries(); numEntries = cfAccessor.approximateNumEntries(dbAccessor);
} catch (final RocksDBException e) { } catch (final RocksDBException e) {
throw new ProcessorStateException("Error fetching property from store " + name, e); throw new ProcessorStateException("Error fetching property from store " + name, e);
} }
@ -646,7 +648,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
return; return;
} }
try { try {
dbAccessor.flush(); cfAccessor.flush(dbAccessor);
} catch (final RocksDBException e) { } catch (final RocksDBException e) {
throw new ProcessorStateException("Error while executing flush from store " + name, e); throw new ProcessorStateException("Error while executing flush from store " + name, e);
} }
@ -655,7 +657,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
@Override @Override
public void addToBatch(final KeyValue<byte[], byte[]> record, public void addToBatch(final KeyValue<byte[], byte[]> record,
final WriteBatchInterface batch) throws RocksDBException { final WriteBatchInterface batch) throws RocksDBException {
dbAccessor.addToBatch(record.key, record.value, batch); cfAccessor.addToBatch(record.key, record.value, batch);
} }
@Override @Override
@ -690,6 +692,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
// Important: do not rearrange the order in which the below objects are closed! // Important: do not rearrange the order in which the below objects are closed!
// Order of closing must follow: ColumnFamilyHandle > RocksDB > DBOptions > ColumnFamilyOptions // Order of closing must follow: ColumnFamilyHandle > RocksDB > DBOptions > ColumnFamilyOptions
cfAccessor.close();
dbAccessor.close(); dbAccessor.close();
db.close(); db.close();
userSpecifiedOptions.close(); userSpecifiedOptions.close();
@ -701,6 +704,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
statistics.close(); statistics.close();
} }
cfAccessor = null;
dbAccessor = null; dbAccessor = null;
userSpecifiedOptions = null; userSpecifiedOptions = null;
wOptions = null; wOptions = null;
@ -724,42 +728,122 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
} }
} }
interface RocksDBAccessor { interface DBAccessor {
byte[] get(final ColumnFamilyHandle columnFamily, final byte[] key) throws RocksDBException;
byte[] get(final ColumnFamilyHandle columnFamily, final ReadOptions readOptions, final byte[] key) throws RocksDBException;
RocksIterator newIterator(final ColumnFamilyHandle columnFamily);
void put(final ColumnFamilyHandle columnFamily, final byte[] key, final byte[] value) throws RocksDBException;
void delete(final ColumnFamilyHandle columnFamily, final byte[] key) throws RocksDBException;
void deleteRange(final ColumnFamilyHandle columnFamily, final byte[] from, final byte[] to) throws RocksDBException;
long approximateNumEntries(final ColumnFamilyHandle columnFamily) throws RocksDBException;
void flush(final ColumnFamilyHandle... columnFamilies) throws RocksDBException;
void reset();
void close();
}
void put(final byte[] key, static class DirectDBAccessor implements DBAccessor {
final byte[] value);
private final RocksDB db;
private final FlushOptions flushOptions;
DirectDBAccessor(final RocksDB db, final FlushOptions flushOptions) {
this.db = db;
this.flushOptions = flushOptions;
}
@Override
public byte[] get(final ColumnFamilyHandle columnFamily, final byte[] key) throws RocksDBException {
return db.get(columnFamily, key);
}
@Override
public byte[] get(final ColumnFamilyHandle columnFamily, final ReadOptions readOptions, final byte[] key) throws RocksDBException {
return db.get(columnFamily, readOptions, key);
}
@Override
public RocksIterator newIterator(final ColumnFamilyHandle columnFamily) {
return db.newIterator(columnFamily);
}
@Override
public void put(final ColumnFamilyHandle columnFamily, final byte[] key, final byte[] value) throws RocksDBException {
db.put(columnFamily, key, value);
}
@Override
public void delete(final ColumnFamilyHandle columnFamily, final byte[] key) throws RocksDBException {
db.delete(columnFamily, key);
}
@Override
public void deleteRange(final ColumnFamilyHandle columnFamily, final byte[] from, final byte[] to) throws RocksDBException {
db.deleteRange(columnFamily, from, to);
}
@Override
public long approximateNumEntries(final ColumnFamilyHandle columnFamily) throws RocksDBException {
return db.getLongProperty(columnFamily, "rocksdb.estimate-num-keys");
}
@Override
public void flush(final ColumnFamilyHandle... columnFamilies) throws RocksDBException {
if (columnFamilies.length == 0) {
db.flush(flushOptions);
} else if (columnFamilies.length == 1) {
db.flush(flushOptions, columnFamilies[0]);
} else {
db.flush(flushOptions, Arrays.asList(columnFamilies));
}
}
@Override
public void reset() {
// no state to reset
}
@Override
public void close() {
// nothing to close
}
}
interface ColumnFamilyAccessor {
void put(final DBAccessor accessor, final byte[] key, final byte[] value);
void prepareBatch(final List<KeyValue<Bytes, byte[]>> entries, void prepareBatch(final List<KeyValue<Bytes, byte[]>> entries,
final WriteBatchInterface batch) throws RocksDBException; final WriteBatchInterface batch) throws RocksDBException;
byte[] get(final byte[] key) throws RocksDBException; byte[] get(final DBAccessor accessor, final byte[] key) throws RocksDBException;
byte[] get(final byte[] key, ReadOptions readOptions) throws RocksDBException; byte[] get(final DBAccessor accessor, final byte[] key, final ReadOptions readOptions) throws RocksDBException;
/** /**
* In contrast to get(), we don't migrate the key to new CF. * In contrast to get(), we don't migrate the key to new CF.
* <p> * <p>
* Use for get() within delete() -- no need to migrate, as it's deleted anyway * Use for get() within delete() -- no need to migrate, as it's deleted anyway
*/ */
byte[] getOnly(final byte[] key) throws RocksDBException; byte[] getOnly(final DBAccessor accessor, final byte[] key) throws RocksDBException;
ManagedKeyValueIterator<Bytes, byte[]> range(final Bytes from, ManagedKeyValueIterator<Bytes, byte[]> range(final DBAccessor accessor,
final Bytes from,
final Bytes to, final Bytes to,
final boolean forward); final boolean forward);
/** /**
* Deletes keys entries in the range ['from', 'to'], including 'from' and excluding 'to'. * Deletes keys entries in the range ['from', 'to'], including 'from' and excluding 'to'.
*/ */
void deleteRange(final byte[] from, void deleteRange(final DBAccessor accessor, final byte[] from, final byte[] to);
final byte[] to);
ManagedKeyValueIterator<Bytes, byte[]> all(final boolean forward); ManagedKeyValueIterator<Bytes, byte[]> all(final DBAccessor accessor, final boolean forward);
ManagedKeyValueIterator<Bytes, byte[]> prefixScan(final Bytes prefix); ManagedKeyValueIterator<Bytes, byte[]> prefixScan(final DBAccessor accessor, final Bytes prefix);
long approximateNumEntries() throws RocksDBException; long approximateNumEntries(final DBAccessor accessor) throws RocksDBException;
void flush() throws RocksDBException; void flush(final DBAccessor accessor) throws RocksDBException;
void addToBatch(final byte[] key, void addToBatch(final byte[] key,
final byte[] value, final byte[] value,
@ -768,7 +852,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
void close(); void close();
} }
class SingleColumnFamilyAccessor implements RocksDBAccessor { class SingleColumnFamilyAccessor implements ColumnFamilyAccessor {
private final ColumnFamilyHandle columnFamily; private final ColumnFamilyHandle columnFamily;
SingleColumnFamilyAccessor(final ColumnFamilyHandle columnFamily) { SingleColumnFamilyAccessor(final ColumnFamilyHandle columnFamily) {
@ -776,18 +860,19 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
} }
@Override @Override
public void put(final byte[] key, public void put(final DBAccessor accessor,
final byte[] key,
final byte[] value) { final byte[] value) {
if (value == null) { if (value == null) {
try { try {
db.delete(columnFamily, wOptions, key); accessor.delete(columnFamily, key);
} catch (final RocksDBException e) { } catch (final RocksDBException e) {
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores. // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
throw new ProcessorStateException("Error while removing key from store " + name, e); throw new ProcessorStateException("Error while removing key from store " + name, e);
} }
} else { } else {
try { try {
db.put(columnFamily, wOptions, key, value); accessor.put(columnFamily, key, value);
} catch (final RocksDBException e) { } catch (final RocksDBException e) {
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores. // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
throw new ProcessorStateException("Error while putting key/value into store " + name, e); throw new ProcessorStateException("Error while putting key/value into store " + name, e);
@ -805,27 +890,28 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
} }
@Override @Override
public byte[] get(final byte[] key) throws RocksDBException { public byte[] get(final DBAccessor accessor, final byte[] key) throws RocksDBException {
return db.get(columnFamily, key); return accessor.get(columnFamily, key);
} }
@Override @Override
public byte[] get(final byte[] key, final ReadOptions readOptions) throws RocksDBException { public byte[] get(final DBAccessor accessor, final byte[] key, final ReadOptions readOptions) throws RocksDBException {
return db.get(columnFamily, readOptions, key); return accessor.get(columnFamily, readOptions, key);
} }
@Override @Override
public byte[] getOnly(final byte[] key) throws RocksDBException { public byte[] getOnly(final DBAccessor accessor, final byte[] key) throws RocksDBException {
return db.get(columnFamily, key); return get(accessor, key);
} }
@Override @Override
public ManagedKeyValueIterator<Bytes, byte[]> range(final Bytes from, public ManagedKeyValueIterator<Bytes, byte[]> range(final DBAccessor accessor,
final Bytes to, final Bytes from,
final boolean forward) { final Bytes to,
final boolean forward) {
return new RocksDBRangeIterator( return new RocksDBRangeIterator(
name, name,
db.newIterator(columnFamily), accessor.newIterator(columnFamily),
from, from,
to, to,
forward, forward,
@ -834,9 +920,9 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
} }
@Override @Override
public void deleteRange(final byte[] from, final byte[] to) { public void deleteRange(final DBAccessor accessor, final byte[] from, final byte[] to) {
try { try {
db.deleteRange(columnFamily, wOptions, from, to); accessor.deleteRange(columnFamily, from, to);
} catch (final RocksDBException e) { } catch (final RocksDBException e) {
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores. // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
throw new ProcessorStateException("Error while removing key from store " + name, e); throw new ProcessorStateException("Error while removing key from store " + name, e);
@ -844,8 +930,8 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
} }
@Override @Override
public ManagedKeyValueIterator<Bytes, byte[]> all(final boolean forward) { public ManagedKeyValueIterator<Bytes, byte[]> all(final DBAccessor accessor, final boolean forward) {
final RocksIterator innerIterWithTimestamp = db.newIterator(columnFamily); final RocksIterator innerIterWithTimestamp = accessor.newIterator(columnFamily);
if (forward) { if (forward) {
innerIterWithTimestamp.seekToFirst(); innerIterWithTimestamp.seekToFirst();
} else { } else {
@ -855,11 +941,11 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
} }
@Override @Override
public ManagedKeyValueIterator<Bytes, byte[]> prefixScan(final Bytes prefix) { public ManagedKeyValueIterator<Bytes, byte[]> prefixScan(final DBAccessor accessor, final Bytes prefix) {
final Bytes to = incrementWithoutOverflow(prefix); final Bytes to = incrementWithoutOverflow(prefix);
return new RocksDBRangeIterator( return new RocksDBRangeIterator(
name, name,
db.newIterator(columnFamily), accessor.newIterator(columnFamily),
prefix, prefix,
to, to,
true, true,
@ -868,13 +954,13 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
} }
@Override @Override
public long approximateNumEntries() throws RocksDBException { public long approximateNumEntries(final DBAccessor accessor) throws RocksDBException {
return db.getLongProperty(columnFamily, "rocksdb.estimate-num-keys"); return accessor.approximateNumEntries(columnFamily);
} }
@Override @Override
public void flush() throws RocksDBException { public void flush(final DBAccessor accessor) throws RocksDBException {
db.flush(fOptions, columnFamily); accessor.flush(columnFamily);
} }
@Override @Override
@ -903,7 +989,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
position position
); );
// If version headers are not present or version is V0 // If version headers are not present or version is V0
dbAccessor.addToBatch(record.key(), record.value(), batch); cfAccessor.addToBatch(record.key(), record.value(), batch);
} }
write(batch); write(batch);
} catch (final RocksDBException e) { } catch (final RocksDBException e) {

View File

@ -78,16 +78,16 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
noTimestampsIter.seekToFirst(); noTimestampsIter.seekToFirst();
if (noTimestampsIter.isValid()) { if (noTimestampsIter.isValid()) {
log.info("Opening store {} in upgrade mode", name); log.info("Opening store {} in upgrade mode", name);
dbAccessor = new DualColumnFamilyAccessor(noTimestampColumnFamily, withTimestampColumnFamily); cfAccessor = new DualColumnFamilyAccessor(noTimestampColumnFamily, withTimestampColumnFamily);
} else { } else {
log.info("Opening store {} in regular mode", name); log.info("Opening store {} in regular mode", name);
dbAccessor = new SingleColumnFamilyAccessor(withTimestampColumnFamily); cfAccessor = new SingleColumnFamilyAccessor(withTimestampColumnFamily);
noTimestampColumnFamily.close(); noTimestampColumnFamily.close();
} }
noTimestampsIter.close(); noTimestampsIter.close();
} }
private class DualColumnFamilyAccessor implements RocksDBAccessor { private class DualColumnFamilyAccessor implements ColumnFamilyAccessor {
private final ColumnFamilyHandle oldColumnFamily; private final ColumnFamilyHandle oldColumnFamily;
private final ColumnFamilyHandle newColumnFamily; private final ColumnFamilyHandle newColumnFamily;
@ -98,30 +98,31 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
} }
@Override @Override
public void put(final byte[] key, public void put(final DBAccessor accessor,
final byte[] key,
final byte[] valueWithTimestamp) { final byte[] valueWithTimestamp) {
if (valueWithTimestamp == null) { if (valueWithTimestamp == null) {
try { try {
db.delete(oldColumnFamily, wOptions, key); accessor.delete(oldColumnFamily, key);
} catch (final RocksDBException e) { } catch (final RocksDBException e) {
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores. // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
throw new ProcessorStateException("Error while removing key from store " + name, e); throw new ProcessorStateException("Error while removing key from store " + name, e);
} }
try { try {
db.delete(newColumnFamily, wOptions, key); accessor.delete(newColumnFamily, key);
} catch (final RocksDBException e) { } catch (final RocksDBException e) {
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores. // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
throw new ProcessorStateException("Error while removing key from store " + name, e); throw new ProcessorStateException("Error while removing key from store " + name, e);
} }
} else { } else {
try { try {
db.delete(oldColumnFamily, wOptions, key); accessor.delete(oldColumnFamily, key);
} catch (final RocksDBException e) { } catch (final RocksDBException e) {
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores. // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
throw new ProcessorStateException("Error while removing key from store " + name, e); throw new ProcessorStateException("Error while removing key from store " + name, e);
} }
try { try {
db.put(newColumnFamily, wOptions, key, valueWithTimestamp); accessor.put(newColumnFamily, key, valueWithTimestamp);
StoreQueryUtils.updatePosition(position, context); StoreQueryUtils.updatePosition(position, context);
} catch (final RocksDBException e) { } catch (final RocksDBException e) {
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores. // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
@ -140,28 +141,28 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
} }
@Override @Override
public byte[] get(final byte[] key) throws RocksDBException { public byte[] get(final DBAccessor accessor, final byte[] key) throws RocksDBException {
return get(key, Optional.empty()); return get(accessor, key, Optional.empty());
} }
@Override @Override
public byte[] get(final byte[] key, final ReadOptions readOptions) throws RocksDBException { public byte[] get(final DBAccessor accessor, final byte[] key, final ReadOptions readOptions) throws RocksDBException {
return get(key, Optional.of(readOptions)); return get(accessor, key, Optional.of(readOptions));
} }
private byte[] get(final byte[] key, final Optional<ReadOptions> readOptions) throws RocksDBException { private byte[] get(final DBAccessor accessor, final byte[] key, final Optional<ReadOptions> readOptions) throws RocksDBException {
final byte[] valueWithTimestamp = readOptions.isPresent() ? db.get(newColumnFamily, readOptions.get(), key) : db.get(newColumnFamily, key); final byte[] valueWithTimestamp = readOptions.isPresent() ? accessor.get(newColumnFamily, readOptions.get(), key) : accessor.get(newColumnFamily, key);
if (valueWithTimestamp != null) { if (valueWithTimestamp != null) {
return valueWithTimestamp; return valueWithTimestamp;
} }
final byte[] plainValue = readOptions.isPresent() ? db.get(oldColumnFamily, readOptions.get(), key) : db.get(oldColumnFamily, key); final byte[] plainValue = readOptions.isPresent() ? accessor.get(oldColumnFamily, readOptions.get(), key) : accessor.get(oldColumnFamily, key);
if (plainValue != null) { if (plainValue != null) {
final byte[] valueWithUnknownTimestamp = convertToTimestampedFormat(plainValue); final byte[] valueWithUnknownTimestamp = convertToTimestampedFormat(plainValue);
// this does only work, because the changelog topic contains correct data already // this does only work, because the changelog topic contains correct data already
// for other format changes, we cannot take this short cut and can only migrate data // for other format changes, we cannot take this short cut and can only migrate data
// from old to new store on put() // from old to new store on put()
put(key, valueWithUnknownTimestamp); put(accessor, key, valueWithUnknownTimestamp);
return valueWithUnknownTimestamp; return valueWithUnknownTimestamp;
} }
@ -169,13 +170,13 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
} }
@Override @Override
public byte[] getOnly(final byte[] key) throws RocksDBException { public byte[] getOnly(final DBAccessor accessor, final byte[] key) throws RocksDBException {
final byte[] valueWithTimestamp = db.get(newColumnFamily, key); final byte[] valueWithTimestamp = accessor.get(newColumnFamily, key);
if (valueWithTimestamp != null) { if (valueWithTimestamp != null) {
return valueWithTimestamp; return valueWithTimestamp;
} }
final byte[] plainValue = db.get(oldColumnFamily, key); final byte[] plainValue = accessor.get(oldColumnFamily, key);
if (plainValue != null) { if (plainValue != null) {
return convertToTimestampedFormat(plainValue); return convertToTimestampedFormat(plainValue);
} }
@ -184,13 +185,14 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
} }
@Override @Override
public ManagedKeyValueIterator<Bytes, byte[]> range(final Bytes from, public ManagedKeyValueIterator<Bytes, byte[]> range(final DBAccessor accessor,
final Bytes to, final Bytes from,
final boolean forward) { final Bytes to,
final boolean forward) {
return new RocksDBDualCFRangeIterator( return new RocksDBDualCFRangeIterator(
name, name,
db.newIterator(newColumnFamily), accessor.newIterator(newColumnFamily),
db.newIterator(oldColumnFamily), accessor.newIterator(oldColumnFamily),
from, from,
to, to,
forward, forward,
@ -198,15 +200,15 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
} }
@Override @Override
public void deleteRange(final byte[] from, final byte[] to) { public void deleteRange(final DBAccessor accessor, final byte[] from, final byte[] to) {
try { try {
db.deleteRange(oldColumnFamily, wOptions, from, to); accessor.deleteRange(oldColumnFamily, from, to);
} catch (final RocksDBException e) { } catch (final RocksDBException e) {
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores. // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
throw new ProcessorStateException("Error while removing key from store " + name, e); throw new ProcessorStateException("Error while removing key from store " + name, e);
} }
try { try {
db.deleteRange(newColumnFamily, wOptions, from, to); accessor.deleteRange(newColumnFamily, from, to);
} catch (final RocksDBException e) { } catch (final RocksDBException e) {
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores. // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
throw new ProcessorStateException("Error while removing key from store " + name, e); throw new ProcessorStateException("Error while removing key from store " + name, e);
@ -214,9 +216,9 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
} }
@Override @Override
public ManagedKeyValueIterator<Bytes, byte[]> all(final boolean forward) { public ManagedKeyValueIterator<Bytes, byte[]> all(final DBAccessor accessor, final boolean forward) {
final RocksIterator innerIterWithTimestamp = db.newIterator(newColumnFamily); final RocksIterator innerIterWithTimestamp = accessor.newIterator(newColumnFamily);
final RocksIterator innerIterNoTimestamp = db.newIterator(oldColumnFamily); final RocksIterator innerIterNoTimestamp = accessor.newIterator(oldColumnFamily);
if (forward) { if (forward) {
innerIterWithTimestamp.seekToFirst(); innerIterWithTimestamp.seekToFirst();
innerIterNoTimestamp.seekToFirst(); innerIterNoTimestamp.seekToFirst();
@ -228,12 +230,12 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
} }
@Override @Override
public ManagedKeyValueIterator<Bytes, byte[]> prefixScan(final Bytes prefix) { public ManagedKeyValueIterator<Bytes, byte[]> prefixScan(final DBAccessor accessor, final Bytes prefix) {
final Bytes to = incrementWithoutOverflow(prefix); final Bytes to = incrementWithoutOverflow(prefix);
return new RocksDBDualCFRangeIterator( return new RocksDBDualCFRangeIterator(
name, name,
db.newIterator(newColumnFamily), accessor.newIterator(newColumnFamily),
db.newIterator(oldColumnFamily), accessor.newIterator(oldColumnFamily),
prefix, prefix,
to, to,
true, true,
@ -242,15 +244,14 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
} }
@Override @Override
public long approximateNumEntries() throws RocksDBException { public long approximateNumEntries(final DBAccessor accessor) throws RocksDBException {
return db.getLongProperty(oldColumnFamily, "rocksdb.estimate-num-keys") return accessor.approximateNumEntries(oldColumnFamily) +
+ db.getLongProperty(newColumnFamily, "rocksdb.estimate-num-keys"); accessor.approximateNumEntries(newColumnFamily);
} }
@Override @Override
public void flush() throws RocksDBException { public void flush(final DBAccessor accessor) throws RocksDBException {
db.flush(fOptions, oldColumnFamily); accessor.flush(oldColumnFamily, newColumnFamily);
db.flush(fOptions, newColumnFamily);
} }
@Override @Override