mirror of https://github.com/apache/kafka.git
KAFKA-14412: Decouple RocksDB access from CF (#15105)
To support future use-cases that use different strategies for accessing RocksDB, we need to de-couple the RocksDB access strategy from the Column Family access strategy. To do this, we now have two separate accessors: * `DBAccessor`: dictates how we access RocksDB. Currently only one strategy is supported: `DirectDBAccessor`, which access RocksDB directly, via the `RocksDB` class for all operations. In the future, a `BatchedDBAccessor` will be added, which enables transactions via `WriteBatch`. * `ColumnFamilyAccessor`: maps StateStore operations to operations on one or more column families. This is a rename of the old `RocksDBDBAccessor`. Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
parent
c078e51c8f
commit
5bc3aa4280
|
@ -131,7 +131,7 @@
|
|||
|
||||
<module name="ClassFanOutComplexity">
|
||||
<!-- default is 20 -->
|
||||
<property name="max" value="50"/>
|
||||
<property name="max" value="52"/>
|
||||
</module>
|
||||
<module name="CyclomaticComplexity">
|
||||
<!-- default is 10-->
|
||||
|
|
|
@ -108,7 +108,8 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
|||
// VisibleForTesting
|
||||
protected File dbDir;
|
||||
RocksDB db;
|
||||
RocksDBAccessor dbAccessor;
|
||||
DBAccessor dbAccessor;
|
||||
ColumnFamilyAccessor cfAccessor;
|
||||
|
||||
// the following option objects will be created in openDB and closed in the close() method
|
||||
private RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter userSpecifiedOptions;
|
||||
|
@ -251,6 +252,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
|||
// with the measurements from Rocks DB
|
||||
setupStatistics(configs, dbOptions);
|
||||
openRocksDB(dbOptions, columnFamilyOptions);
|
||||
dbAccessor = new DirectDBAccessor(db, fOptions);
|
||||
open = true;
|
||||
|
||||
addValueProvidersToMetricsRecorder();
|
||||
|
@ -291,7 +293,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
|||
new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions)
|
||||
);
|
||||
|
||||
dbAccessor = new SingleColumnFamilyAccessor(columnFamilies.get(0));
|
||||
cfAccessor = new SingleColumnFamilyAccessor(columnFamilies.get(0));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -390,7 +392,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
|||
final byte[] value) {
|
||||
Objects.requireNonNull(key, "key cannot be null");
|
||||
validateStoreOpen();
|
||||
dbAccessor.put(key.get(), value);
|
||||
cfAccessor.put(dbAccessor, key.get(), value);
|
||||
|
||||
StoreQueryUtils.updatePosition(position, context);
|
||||
}
|
||||
|
@ -409,7 +411,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
|||
@Override
|
||||
public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
|
||||
try (final WriteBatch batch = new WriteBatch()) {
|
||||
dbAccessor.prepareBatch(entries, batch);
|
||||
cfAccessor.prepareBatch(entries, batch);
|
||||
write(batch);
|
||||
StoreQueryUtils.updatePosition(position, context);
|
||||
} catch (final RocksDBException e) {
|
||||
|
@ -459,7 +461,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
|||
Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer cannot be null");
|
||||
final Bytes prefixBytes = Bytes.wrap(prefixKeySerializer.serialize(null, prefix));
|
||||
|
||||
final ManagedKeyValueIterator<Bytes, byte[]> rocksDbPrefixSeekIterator = dbAccessor.prefixScan(prefixBytes);
|
||||
final ManagedKeyValueIterator<Bytes, byte[]> rocksDbPrefixSeekIterator = cfAccessor.prefixScan(dbAccessor, prefixBytes);
|
||||
openIterators.add(rocksDbPrefixSeekIterator);
|
||||
rocksDbPrefixSeekIterator.onClose(() -> openIterators.remove(rocksDbPrefixSeekIterator));
|
||||
|
||||
|
@ -478,7 +480,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
|||
private synchronized byte[] get(final Bytes key, final Optional<ReadOptions> readOptions) {
|
||||
validateStoreOpen();
|
||||
try {
|
||||
return readOptions.isPresent() ? dbAccessor.get(key.get(), readOptions.get()) : dbAccessor.get(key.get());
|
||||
return readOptions.isPresent() ? cfAccessor.get(dbAccessor, key.get(), readOptions.get()) : cfAccessor.get(dbAccessor, key.get());
|
||||
} catch (final RocksDBException e) {
|
||||
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
|
||||
throw new ProcessorStateException("Error while getting value for key from store " + name, e);
|
||||
|
@ -490,7 +492,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
|||
Objects.requireNonNull(key, "key cannot be null");
|
||||
final byte[] oldValue;
|
||||
try {
|
||||
oldValue = dbAccessor.getOnly(key.get());
|
||||
oldValue = cfAccessor.getOnly(dbAccessor, key.get());
|
||||
} catch (final RocksDBException e) {
|
||||
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
|
||||
throw new ProcessorStateException("Error while getting value for key from store " + name, e);
|
||||
|
@ -509,7 +511,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
|||
// RocksDB's deleteRange() does not support a null upper bound so in the event
|
||||
// of overflow from increment(), the operation cannot be performed and an
|
||||
// IndexOutOfBoundsException will be thrown.
|
||||
dbAccessor.deleteRange(keyFrom.get(), Bytes.increment(keyTo).get());
|
||||
cfAccessor.deleteRange(dbAccessor, keyFrom.get(), Bytes.increment(keyTo).get());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -562,7 +564,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
|||
|
||||
validateStoreOpen();
|
||||
|
||||
final ManagedKeyValueIterator<Bytes, byte[]> rocksDBRangeIterator = dbAccessor.range(from, to, forward);
|
||||
final ManagedKeyValueIterator<Bytes, byte[]> rocksDBRangeIterator = cfAccessor.range(dbAccessor, from, to, forward);
|
||||
openIterators.add(rocksDBRangeIterator);
|
||||
rocksDBRangeIterator.onClose(() -> openIterators.remove(rocksDBRangeIterator));
|
||||
|
||||
|
@ -602,7 +604,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
|||
private KeyValueIterator<Bytes, byte[]> all(final boolean forward,
|
||||
final Set<KeyValueIterator<Bytes, byte[]>> openIterators) {
|
||||
validateStoreOpen();
|
||||
final ManagedKeyValueIterator<Bytes, byte[]> rocksDbIterator = dbAccessor.all(forward);
|
||||
final ManagedKeyValueIterator<Bytes, byte[]> rocksDbIterator = cfAccessor.all(dbAccessor, forward);
|
||||
openIterators.add(rocksDbIterator);
|
||||
rocksDbIterator.onClose(() -> openIterators.remove(rocksDbIterator));
|
||||
return rocksDbIterator;
|
||||
|
@ -624,7 +626,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
|||
validateStoreOpen();
|
||||
final long numEntries;
|
||||
try {
|
||||
numEntries = dbAccessor.approximateNumEntries();
|
||||
numEntries = cfAccessor.approximateNumEntries(dbAccessor);
|
||||
} catch (final RocksDBException e) {
|
||||
throw new ProcessorStateException("Error fetching property from store " + name, e);
|
||||
}
|
||||
|
@ -646,7 +648,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
|||
return;
|
||||
}
|
||||
try {
|
||||
dbAccessor.flush();
|
||||
cfAccessor.flush(dbAccessor);
|
||||
} catch (final RocksDBException e) {
|
||||
throw new ProcessorStateException("Error while executing flush from store " + name, e);
|
||||
}
|
||||
|
@ -655,7 +657,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
|||
@Override
|
||||
public void addToBatch(final KeyValue<byte[], byte[]> record,
|
||||
final WriteBatchInterface batch) throws RocksDBException {
|
||||
dbAccessor.addToBatch(record.key, record.value, batch);
|
||||
cfAccessor.addToBatch(record.key, record.value, batch);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -690,6 +692,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
|||
|
||||
// Important: do not rearrange the order in which the below objects are closed!
|
||||
// Order of closing must follow: ColumnFamilyHandle > RocksDB > DBOptions > ColumnFamilyOptions
|
||||
cfAccessor.close();
|
||||
dbAccessor.close();
|
||||
db.close();
|
||||
userSpecifiedOptions.close();
|
||||
|
@ -701,6 +704,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
|||
statistics.close();
|
||||
}
|
||||
|
||||
cfAccessor = null;
|
||||
dbAccessor = null;
|
||||
userSpecifiedOptions = null;
|
||||
wOptions = null;
|
||||
|
@ -724,42 +728,122 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
|||
}
|
||||
}
|
||||
|
||||
interface RocksDBAccessor {
|
||||
interface DBAccessor {
|
||||
byte[] get(final ColumnFamilyHandle columnFamily, final byte[] key) throws RocksDBException;
|
||||
byte[] get(final ColumnFamilyHandle columnFamily, final ReadOptions readOptions, final byte[] key) throws RocksDBException;
|
||||
RocksIterator newIterator(final ColumnFamilyHandle columnFamily);
|
||||
void put(final ColumnFamilyHandle columnFamily, final byte[] key, final byte[] value) throws RocksDBException;
|
||||
void delete(final ColumnFamilyHandle columnFamily, final byte[] key) throws RocksDBException;
|
||||
void deleteRange(final ColumnFamilyHandle columnFamily, final byte[] from, final byte[] to) throws RocksDBException;
|
||||
long approximateNumEntries(final ColumnFamilyHandle columnFamily) throws RocksDBException;
|
||||
void flush(final ColumnFamilyHandle... columnFamilies) throws RocksDBException;
|
||||
void reset();
|
||||
void close();
|
||||
}
|
||||
|
||||
void put(final byte[] key,
|
||||
final byte[] value);
|
||||
static class DirectDBAccessor implements DBAccessor {
|
||||
|
||||
private final RocksDB db;
|
||||
private final FlushOptions flushOptions;
|
||||
|
||||
DirectDBAccessor(final RocksDB db, final FlushOptions flushOptions) {
|
||||
this.db = db;
|
||||
this.flushOptions = flushOptions;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] get(final ColumnFamilyHandle columnFamily, final byte[] key) throws RocksDBException {
|
||||
return db.get(columnFamily, key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] get(final ColumnFamilyHandle columnFamily, final ReadOptions readOptions, final byte[] key) throws RocksDBException {
|
||||
return db.get(columnFamily, readOptions, key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RocksIterator newIterator(final ColumnFamilyHandle columnFamily) {
|
||||
return db.newIterator(columnFamily);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void put(final ColumnFamilyHandle columnFamily, final byte[] key, final byte[] value) throws RocksDBException {
|
||||
db.put(columnFamily, key, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void delete(final ColumnFamilyHandle columnFamily, final byte[] key) throws RocksDBException {
|
||||
db.delete(columnFamily, key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteRange(final ColumnFamilyHandle columnFamily, final byte[] from, final byte[] to) throws RocksDBException {
|
||||
db.deleteRange(columnFamily, from, to);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long approximateNumEntries(final ColumnFamilyHandle columnFamily) throws RocksDBException {
|
||||
return db.getLongProperty(columnFamily, "rocksdb.estimate-num-keys");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush(final ColumnFamilyHandle... columnFamilies) throws RocksDBException {
|
||||
if (columnFamilies.length == 0) {
|
||||
db.flush(flushOptions);
|
||||
} else if (columnFamilies.length == 1) {
|
||||
db.flush(flushOptions, columnFamilies[0]);
|
||||
} else {
|
||||
db.flush(flushOptions, Arrays.asList(columnFamilies));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() {
|
||||
// no state to reset
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
// nothing to close
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
interface ColumnFamilyAccessor {
|
||||
|
||||
void put(final DBAccessor accessor, final byte[] key, final byte[] value);
|
||||
|
||||
void prepareBatch(final List<KeyValue<Bytes, byte[]>> entries,
|
||||
final WriteBatchInterface batch) throws RocksDBException;
|
||||
|
||||
byte[] get(final byte[] key) throws RocksDBException;
|
||||
byte[] get(final DBAccessor accessor, final byte[] key) throws RocksDBException;
|
||||
|
||||
byte[] get(final byte[] key, ReadOptions readOptions) throws RocksDBException;
|
||||
byte[] get(final DBAccessor accessor, final byte[] key, final ReadOptions readOptions) throws RocksDBException;
|
||||
|
||||
/**
|
||||
* In contrast to get(), we don't migrate the key to new CF.
|
||||
* <p>
|
||||
* Use for get() within delete() -- no need to migrate, as it's deleted anyway
|
||||
*/
|
||||
byte[] getOnly(final byte[] key) throws RocksDBException;
|
||||
byte[] getOnly(final DBAccessor accessor, final byte[] key) throws RocksDBException;
|
||||
|
||||
ManagedKeyValueIterator<Bytes, byte[]> range(final Bytes from,
|
||||
ManagedKeyValueIterator<Bytes, byte[]> range(final DBAccessor accessor,
|
||||
final Bytes from,
|
||||
final Bytes to,
|
||||
final boolean forward);
|
||||
|
||||
/**
|
||||
* Deletes keys entries in the range ['from', 'to'], including 'from' and excluding 'to'.
|
||||
*/
|
||||
void deleteRange(final byte[] from,
|
||||
final byte[] to);
|
||||
void deleteRange(final DBAccessor accessor, final byte[] from, final byte[] to);
|
||||
|
||||
ManagedKeyValueIterator<Bytes, byte[]> all(final boolean forward);
|
||||
ManagedKeyValueIterator<Bytes, byte[]> all(final DBAccessor accessor, final boolean forward);
|
||||
|
||||
ManagedKeyValueIterator<Bytes, byte[]> prefixScan(final Bytes prefix);
|
||||
ManagedKeyValueIterator<Bytes, byte[]> prefixScan(final DBAccessor accessor, final Bytes prefix);
|
||||
|
||||
long approximateNumEntries() throws RocksDBException;
|
||||
long approximateNumEntries(final DBAccessor accessor) throws RocksDBException;
|
||||
|
||||
void flush() throws RocksDBException;
|
||||
void flush(final DBAccessor accessor) throws RocksDBException;
|
||||
|
||||
void addToBatch(final byte[] key,
|
||||
final byte[] value,
|
||||
|
@ -768,7 +852,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
|||
void close();
|
||||
}
|
||||
|
||||
class SingleColumnFamilyAccessor implements RocksDBAccessor {
|
||||
class SingleColumnFamilyAccessor implements ColumnFamilyAccessor {
|
||||
private final ColumnFamilyHandle columnFamily;
|
||||
|
||||
SingleColumnFamilyAccessor(final ColumnFamilyHandle columnFamily) {
|
||||
|
@ -776,18 +860,19 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
|||
}
|
||||
|
||||
@Override
|
||||
public void put(final byte[] key,
|
||||
public void put(final DBAccessor accessor,
|
||||
final byte[] key,
|
||||
final byte[] value) {
|
||||
if (value == null) {
|
||||
try {
|
||||
db.delete(columnFamily, wOptions, key);
|
||||
accessor.delete(columnFamily, key);
|
||||
} catch (final RocksDBException e) {
|
||||
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
|
||||
throw new ProcessorStateException("Error while removing key from store " + name, e);
|
||||
}
|
||||
} else {
|
||||
try {
|
||||
db.put(columnFamily, wOptions, key, value);
|
||||
accessor.put(columnFamily, key, value);
|
||||
} catch (final RocksDBException e) {
|
||||
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
|
||||
throw new ProcessorStateException("Error while putting key/value into store " + name, e);
|
||||
|
@ -805,27 +890,28 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
|||
}
|
||||
|
||||
@Override
|
||||
public byte[] get(final byte[] key) throws RocksDBException {
|
||||
return db.get(columnFamily, key);
|
||||
public byte[] get(final DBAccessor accessor, final byte[] key) throws RocksDBException {
|
||||
return accessor.get(columnFamily, key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] get(final byte[] key, final ReadOptions readOptions) throws RocksDBException {
|
||||
return db.get(columnFamily, readOptions, key);
|
||||
public byte[] get(final DBAccessor accessor, final byte[] key, final ReadOptions readOptions) throws RocksDBException {
|
||||
return accessor.get(columnFamily, readOptions, key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getOnly(final byte[] key) throws RocksDBException {
|
||||
return db.get(columnFamily, key);
|
||||
public byte[] getOnly(final DBAccessor accessor, final byte[] key) throws RocksDBException {
|
||||
return get(accessor, key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ManagedKeyValueIterator<Bytes, byte[]> range(final Bytes from,
|
||||
public ManagedKeyValueIterator<Bytes, byte[]> range(final DBAccessor accessor,
|
||||
final Bytes from,
|
||||
final Bytes to,
|
||||
final boolean forward) {
|
||||
return new RocksDBRangeIterator(
|
||||
name,
|
||||
db.newIterator(columnFamily),
|
||||
accessor.newIterator(columnFamily),
|
||||
from,
|
||||
to,
|
||||
forward,
|
||||
|
@ -834,9 +920,9 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
|||
}
|
||||
|
||||
@Override
|
||||
public void deleteRange(final byte[] from, final byte[] to) {
|
||||
public void deleteRange(final DBAccessor accessor, final byte[] from, final byte[] to) {
|
||||
try {
|
||||
db.deleteRange(columnFamily, wOptions, from, to);
|
||||
accessor.deleteRange(columnFamily, from, to);
|
||||
} catch (final RocksDBException e) {
|
||||
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
|
||||
throw new ProcessorStateException("Error while removing key from store " + name, e);
|
||||
|
@ -844,8 +930,8 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
|||
}
|
||||
|
||||
@Override
|
||||
public ManagedKeyValueIterator<Bytes, byte[]> all(final boolean forward) {
|
||||
final RocksIterator innerIterWithTimestamp = db.newIterator(columnFamily);
|
||||
public ManagedKeyValueIterator<Bytes, byte[]> all(final DBAccessor accessor, final boolean forward) {
|
||||
final RocksIterator innerIterWithTimestamp = accessor.newIterator(columnFamily);
|
||||
if (forward) {
|
||||
innerIterWithTimestamp.seekToFirst();
|
||||
} else {
|
||||
|
@ -855,11 +941,11 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
|||
}
|
||||
|
||||
@Override
|
||||
public ManagedKeyValueIterator<Bytes, byte[]> prefixScan(final Bytes prefix) {
|
||||
public ManagedKeyValueIterator<Bytes, byte[]> prefixScan(final DBAccessor accessor, final Bytes prefix) {
|
||||
final Bytes to = incrementWithoutOverflow(prefix);
|
||||
return new RocksDBRangeIterator(
|
||||
name,
|
||||
db.newIterator(columnFamily),
|
||||
accessor.newIterator(columnFamily),
|
||||
prefix,
|
||||
to,
|
||||
true,
|
||||
|
@ -868,13 +954,13 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
|||
}
|
||||
|
||||
@Override
|
||||
public long approximateNumEntries() throws RocksDBException {
|
||||
return db.getLongProperty(columnFamily, "rocksdb.estimate-num-keys");
|
||||
public long approximateNumEntries(final DBAccessor accessor) throws RocksDBException {
|
||||
return accessor.approximateNumEntries(columnFamily);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush() throws RocksDBException {
|
||||
db.flush(fOptions, columnFamily);
|
||||
public void flush(final DBAccessor accessor) throws RocksDBException {
|
||||
accessor.flush(columnFamily);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -903,7 +989,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
|||
position
|
||||
);
|
||||
// If version headers are not present or version is V0
|
||||
dbAccessor.addToBatch(record.key(), record.value(), batch);
|
||||
cfAccessor.addToBatch(record.key(), record.value(), batch);
|
||||
}
|
||||
write(batch);
|
||||
} catch (final RocksDBException e) {
|
||||
|
|
|
@ -78,16 +78,16 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
|
|||
noTimestampsIter.seekToFirst();
|
||||
if (noTimestampsIter.isValid()) {
|
||||
log.info("Opening store {} in upgrade mode", name);
|
||||
dbAccessor = new DualColumnFamilyAccessor(noTimestampColumnFamily, withTimestampColumnFamily);
|
||||
cfAccessor = new DualColumnFamilyAccessor(noTimestampColumnFamily, withTimestampColumnFamily);
|
||||
} else {
|
||||
log.info("Opening store {} in regular mode", name);
|
||||
dbAccessor = new SingleColumnFamilyAccessor(withTimestampColumnFamily);
|
||||
cfAccessor = new SingleColumnFamilyAccessor(withTimestampColumnFamily);
|
||||
noTimestampColumnFamily.close();
|
||||
}
|
||||
noTimestampsIter.close();
|
||||
}
|
||||
|
||||
private class DualColumnFamilyAccessor implements RocksDBAccessor {
|
||||
private class DualColumnFamilyAccessor implements ColumnFamilyAccessor {
|
||||
private final ColumnFamilyHandle oldColumnFamily;
|
||||
private final ColumnFamilyHandle newColumnFamily;
|
||||
|
||||
|
@ -98,30 +98,31 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
|
|||
}
|
||||
|
||||
@Override
|
||||
public void put(final byte[] key,
|
||||
public void put(final DBAccessor accessor,
|
||||
final byte[] key,
|
||||
final byte[] valueWithTimestamp) {
|
||||
if (valueWithTimestamp == null) {
|
||||
try {
|
||||
db.delete(oldColumnFamily, wOptions, key);
|
||||
accessor.delete(oldColumnFamily, key);
|
||||
} catch (final RocksDBException e) {
|
||||
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
|
||||
throw new ProcessorStateException("Error while removing key from store " + name, e);
|
||||
}
|
||||
try {
|
||||
db.delete(newColumnFamily, wOptions, key);
|
||||
accessor.delete(newColumnFamily, key);
|
||||
} catch (final RocksDBException e) {
|
||||
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
|
||||
throw new ProcessorStateException("Error while removing key from store " + name, e);
|
||||
}
|
||||
} else {
|
||||
try {
|
||||
db.delete(oldColumnFamily, wOptions, key);
|
||||
accessor.delete(oldColumnFamily, key);
|
||||
} catch (final RocksDBException e) {
|
||||
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
|
||||
throw new ProcessorStateException("Error while removing key from store " + name, e);
|
||||
}
|
||||
try {
|
||||
db.put(newColumnFamily, wOptions, key, valueWithTimestamp);
|
||||
accessor.put(newColumnFamily, key, valueWithTimestamp);
|
||||
StoreQueryUtils.updatePosition(position, context);
|
||||
} catch (final RocksDBException e) {
|
||||
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
|
||||
|
@ -140,28 +141,28 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
|
|||
}
|
||||
|
||||
@Override
|
||||
public byte[] get(final byte[] key) throws RocksDBException {
|
||||
return get(key, Optional.empty());
|
||||
public byte[] get(final DBAccessor accessor, final byte[] key) throws RocksDBException {
|
||||
return get(accessor, key, Optional.empty());
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] get(final byte[] key, final ReadOptions readOptions) throws RocksDBException {
|
||||
return get(key, Optional.of(readOptions));
|
||||
public byte[] get(final DBAccessor accessor, final byte[] key, final ReadOptions readOptions) throws RocksDBException {
|
||||
return get(accessor, key, Optional.of(readOptions));
|
||||
}
|
||||
|
||||
private byte[] get(final byte[] key, final Optional<ReadOptions> readOptions) throws RocksDBException {
|
||||
final byte[] valueWithTimestamp = readOptions.isPresent() ? db.get(newColumnFamily, readOptions.get(), key) : db.get(newColumnFamily, key);
|
||||
private byte[] get(final DBAccessor accessor, final byte[] key, final Optional<ReadOptions> readOptions) throws RocksDBException {
|
||||
final byte[] valueWithTimestamp = readOptions.isPresent() ? accessor.get(newColumnFamily, readOptions.get(), key) : accessor.get(newColumnFamily, key);
|
||||
if (valueWithTimestamp != null) {
|
||||
return valueWithTimestamp;
|
||||
}
|
||||
|
||||
final byte[] plainValue = readOptions.isPresent() ? db.get(oldColumnFamily, readOptions.get(), key) : db.get(oldColumnFamily, key);
|
||||
final byte[] plainValue = readOptions.isPresent() ? accessor.get(oldColumnFamily, readOptions.get(), key) : accessor.get(oldColumnFamily, key);
|
||||
if (plainValue != null) {
|
||||
final byte[] valueWithUnknownTimestamp = convertToTimestampedFormat(plainValue);
|
||||
// this does only work, because the changelog topic contains correct data already
|
||||
// for other format changes, we cannot take this short cut and can only migrate data
|
||||
// from old to new store on put()
|
||||
put(key, valueWithUnknownTimestamp);
|
||||
put(accessor, key, valueWithUnknownTimestamp);
|
||||
return valueWithUnknownTimestamp;
|
||||
}
|
||||
|
||||
|
@ -169,13 +170,13 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
|
|||
}
|
||||
|
||||
@Override
|
||||
public byte[] getOnly(final byte[] key) throws RocksDBException {
|
||||
final byte[] valueWithTimestamp = db.get(newColumnFamily, key);
|
||||
public byte[] getOnly(final DBAccessor accessor, final byte[] key) throws RocksDBException {
|
||||
final byte[] valueWithTimestamp = accessor.get(newColumnFamily, key);
|
||||
if (valueWithTimestamp != null) {
|
||||
return valueWithTimestamp;
|
||||
}
|
||||
|
||||
final byte[] plainValue = db.get(oldColumnFamily, key);
|
||||
final byte[] plainValue = accessor.get(oldColumnFamily, key);
|
||||
if (plainValue != null) {
|
||||
return convertToTimestampedFormat(plainValue);
|
||||
}
|
||||
|
@ -184,13 +185,14 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
|
|||
}
|
||||
|
||||
@Override
|
||||
public ManagedKeyValueIterator<Bytes, byte[]> range(final Bytes from,
|
||||
public ManagedKeyValueIterator<Bytes, byte[]> range(final DBAccessor accessor,
|
||||
final Bytes from,
|
||||
final Bytes to,
|
||||
final boolean forward) {
|
||||
return new RocksDBDualCFRangeIterator(
|
||||
name,
|
||||
db.newIterator(newColumnFamily),
|
||||
db.newIterator(oldColumnFamily),
|
||||
accessor.newIterator(newColumnFamily),
|
||||
accessor.newIterator(oldColumnFamily),
|
||||
from,
|
||||
to,
|
||||
forward,
|
||||
|
@ -198,15 +200,15 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
|
|||
}
|
||||
|
||||
@Override
|
||||
public void deleteRange(final byte[] from, final byte[] to) {
|
||||
public void deleteRange(final DBAccessor accessor, final byte[] from, final byte[] to) {
|
||||
try {
|
||||
db.deleteRange(oldColumnFamily, wOptions, from, to);
|
||||
accessor.deleteRange(oldColumnFamily, from, to);
|
||||
} catch (final RocksDBException e) {
|
||||
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
|
||||
throw new ProcessorStateException("Error while removing key from store " + name, e);
|
||||
}
|
||||
try {
|
||||
db.deleteRange(newColumnFamily, wOptions, from, to);
|
||||
accessor.deleteRange(newColumnFamily, from, to);
|
||||
} catch (final RocksDBException e) {
|
||||
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
|
||||
throw new ProcessorStateException("Error while removing key from store " + name, e);
|
||||
|
@ -214,9 +216,9 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
|
|||
}
|
||||
|
||||
@Override
|
||||
public ManagedKeyValueIterator<Bytes, byte[]> all(final boolean forward) {
|
||||
final RocksIterator innerIterWithTimestamp = db.newIterator(newColumnFamily);
|
||||
final RocksIterator innerIterNoTimestamp = db.newIterator(oldColumnFamily);
|
||||
public ManagedKeyValueIterator<Bytes, byte[]> all(final DBAccessor accessor, final boolean forward) {
|
||||
final RocksIterator innerIterWithTimestamp = accessor.newIterator(newColumnFamily);
|
||||
final RocksIterator innerIterNoTimestamp = accessor.newIterator(oldColumnFamily);
|
||||
if (forward) {
|
||||
innerIterWithTimestamp.seekToFirst();
|
||||
innerIterNoTimestamp.seekToFirst();
|
||||
|
@ -228,12 +230,12 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
|
|||
}
|
||||
|
||||
@Override
|
||||
public ManagedKeyValueIterator<Bytes, byte[]> prefixScan(final Bytes prefix) {
|
||||
public ManagedKeyValueIterator<Bytes, byte[]> prefixScan(final DBAccessor accessor, final Bytes prefix) {
|
||||
final Bytes to = incrementWithoutOverflow(prefix);
|
||||
return new RocksDBDualCFRangeIterator(
|
||||
name,
|
||||
db.newIterator(newColumnFamily),
|
||||
db.newIterator(oldColumnFamily),
|
||||
accessor.newIterator(newColumnFamily),
|
||||
accessor.newIterator(oldColumnFamily),
|
||||
prefix,
|
||||
to,
|
||||
true,
|
||||
|
@ -242,15 +244,14 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
|
|||
}
|
||||
|
||||
@Override
|
||||
public long approximateNumEntries() throws RocksDBException {
|
||||
return db.getLongProperty(oldColumnFamily, "rocksdb.estimate-num-keys")
|
||||
+ db.getLongProperty(newColumnFamily, "rocksdb.estimate-num-keys");
|
||||
public long approximateNumEntries(final DBAccessor accessor) throws RocksDBException {
|
||||
return accessor.approximateNumEntries(oldColumnFamily) +
|
||||
accessor.approximateNumEntries(newColumnFamily);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush() throws RocksDBException {
|
||||
db.flush(fOptions, oldColumnFamily);
|
||||
db.flush(fOptions, newColumnFamily);
|
||||
public void flush(final DBAccessor accessor) throws RocksDBException {
|
||||
accessor.flush(oldColumnFamily, newColumnFamily);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue