mirror of https://github.com/apache/kafka.git
KAFKA-14412: Decouple RocksDB access from CF (#15105)
To support future use-cases that use different strategies for accessing RocksDB, we need to decouple the RocksDB access strategy from the Column Family access strategy. To do this, we now have two separate accessors: * `DBAccessor`: dictates how we access RocksDB. Currently, only one strategy is supported: `DirectDBAccessor`, which accesses RocksDB directly, via the `RocksDB` class, for all operations. In the future, a `BatchedDBAccessor` will be added, which enables transactions via `WriteBatch`. * `ColumnFamilyAccessor`: maps StateStore operations to operations on one or more column families. This is a rename of the old `RocksDBAccessor`. Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
parent
c078e51c8f
commit
5bc3aa4280
|
@ -131,7 +131,7 @@
|
||||||
|
|
||||||
<module name="ClassFanOutComplexity">
|
<module name="ClassFanOutComplexity">
|
||||||
<!-- default is 20 -->
|
<!-- default is 20 -->
|
||||||
<property name="max" value="50"/>
|
<property name="max" value="52"/>
|
||||||
</module>
|
</module>
|
||||||
<module name="CyclomaticComplexity">
|
<module name="CyclomaticComplexity">
|
||||||
<!-- default is 10-->
|
<!-- default is 10-->
|
||||||
|
|
|
@ -108,7 +108,8 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
||||||
// VisibleForTesting
|
// VisibleForTesting
|
||||||
protected File dbDir;
|
protected File dbDir;
|
||||||
RocksDB db;
|
RocksDB db;
|
||||||
RocksDBAccessor dbAccessor;
|
DBAccessor dbAccessor;
|
||||||
|
ColumnFamilyAccessor cfAccessor;
|
||||||
|
|
||||||
// the following option objects will be created in openDB and closed in the close() method
|
// the following option objects will be created in openDB and closed in the close() method
|
||||||
private RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter userSpecifiedOptions;
|
private RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter userSpecifiedOptions;
|
||||||
|
@ -251,6 +252,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
||||||
// with the measurements from Rocks DB
|
// with the measurements from Rocks DB
|
||||||
setupStatistics(configs, dbOptions);
|
setupStatistics(configs, dbOptions);
|
||||||
openRocksDB(dbOptions, columnFamilyOptions);
|
openRocksDB(dbOptions, columnFamilyOptions);
|
||||||
|
dbAccessor = new DirectDBAccessor(db, fOptions);
|
||||||
open = true;
|
open = true;
|
||||||
|
|
||||||
addValueProvidersToMetricsRecorder();
|
addValueProvidersToMetricsRecorder();
|
||||||
|
@ -291,7 +293,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
||||||
new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions)
|
new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions)
|
||||||
);
|
);
|
||||||
|
|
||||||
dbAccessor = new SingleColumnFamilyAccessor(columnFamilies.get(0));
|
cfAccessor = new SingleColumnFamilyAccessor(columnFamilies.get(0));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -390,7 +392,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
||||||
final byte[] value) {
|
final byte[] value) {
|
||||||
Objects.requireNonNull(key, "key cannot be null");
|
Objects.requireNonNull(key, "key cannot be null");
|
||||||
validateStoreOpen();
|
validateStoreOpen();
|
||||||
dbAccessor.put(key.get(), value);
|
cfAccessor.put(dbAccessor, key.get(), value);
|
||||||
|
|
||||||
StoreQueryUtils.updatePosition(position, context);
|
StoreQueryUtils.updatePosition(position, context);
|
||||||
}
|
}
|
||||||
|
@ -409,7 +411,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
||||||
@Override
|
@Override
|
||||||
public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
|
public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
|
||||||
try (final WriteBatch batch = new WriteBatch()) {
|
try (final WriteBatch batch = new WriteBatch()) {
|
||||||
dbAccessor.prepareBatch(entries, batch);
|
cfAccessor.prepareBatch(entries, batch);
|
||||||
write(batch);
|
write(batch);
|
||||||
StoreQueryUtils.updatePosition(position, context);
|
StoreQueryUtils.updatePosition(position, context);
|
||||||
} catch (final RocksDBException e) {
|
} catch (final RocksDBException e) {
|
||||||
|
@ -459,7 +461,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
||||||
Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer cannot be null");
|
Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer cannot be null");
|
||||||
final Bytes prefixBytes = Bytes.wrap(prefixKeySerializer.serialize(null, prefix));
|
final Bytes prefixBytes = Bytes.wrap(prefixKeySerializer.serialize(null, prefix));
|
||||||
|
|
||||||
final ManagedKeyValueIterator<Bytes, byte[]> rocksDbPrefixSeekIterator = dbAccessor.prefixScan(prefixBytes);
|
final ManagedKeyValueIterator<Bytes, byte[]> rocksDbPrefixSeekIterator = cfAccessor.prefixScan(dbAccessor, prefixBytes);
|
||||||
openIterators.add(rocksDbPrefixSeekIterator);
|
openIterators.add(rocksDbPrefixSeekIterator);
|
||||||
rocksDbPrefixSeekIterator.onClose(() -> openIterators.remove(rocksDbPrefixSeekIterator));
|
rocksDbPrefixSeekIterator.onClose(() -> openIterators.remove(rocksDbPrefixSeekIterator));
|
||||||
|
|
||||||
|
@ -478,7 +480,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
||||||
private synchronized byte[] get(final Bytes key, final Optional<ReadOptions> readOptions) {
|
private synchronized byte[] get(final Bytes key, final Optional<ReadOptions> readOptions) {
|
||||||
validateStoreOpen();
|
validateStoreOpen();
|
||||||
try {
|
try {
|
||||||
return readOptions.isPresent() ? dbAccessor.get(key.get(), readOptions.get()) : dbAccessor.get(key.get());
|
return readOptions.isPresent() ? cfAccessor.get(dbAccessor, key.get(), readOptions.get()) : cfAccessor.get(dbAccessor, key.get());
|
||||||
} catch (final RocksDBException e) {
|
} catch (final RocksDBException e) {
|
||||||
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
|
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
|
||||||
throw new ProcessorStateException("Error while getting value for key from store " + name, e);
|
throw new ProcessorStateException("Error while getting value for key from store " + name, e);
|
||||||
|
@ -490,7 +492,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
||||||
Objects.requireNonNull(key, "key cannot be null");
|
Objects.requireNonNull(key, "key cannot be null");
|
||||||
final byte[] oldValue;
|
final byte[] oldValue;
|
||||||
try {
|
try {
|
||||||
oldValue = dbAccessor.getOnly(key.get());
|
oldValue = cfAccessor.getOnly(dbAccessor, key.get());
|
||||||
} catch (final RocksDBException e) {
|
} catch (final RocksDBException e) {
|
||||||
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
|
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
|
||||||
throw new ProcessorStateException("Error while getting value for key from store " + name, e);
|
throw new ProcessorStateException("Error while getting value for key from store " + name, e);
|
||||||
|
@ -509,7 +511,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
||||||
// RocksDB's deleteRange() does not support a null upper bound so in the event
|
// RocksDB's deleteRange() does not support a null upper bound so in the event
|
||||||
// of overflow from increment(), the operation cannot be performed and an
|
// of overflow from increment(), the operation cannot be performed and an
|
||||||
// IndexOutOfBoundsException will be thrown.
|
// IndexOutOfBoundsException will be thrown.
|
||||||
dbAccessor.deleteRange(keyFrom.get(), Bytes.increment(keyTo).get());
|
cfAccessor.deleteRange(dbAccessor, keyFrom.get(), Bytes.increment(keyTo).get());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -562,7 +564,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
||||||
|
|
||||||
validateStoreOpen();
|
validateStoreOpen();
|
||||||
|
|
||||||
final ManagedKeyValueIterator<Bytes, byte[]> rocksDBRangeIterator = dbAccessor.range(from, to, forward);
|
final ManagedKeyValueIterator<Bytes, byte[]> rocksDBRangeIterator = cfAccessor.range(dbAccessor, from, to, forward);
|
||||||
openIterators.add(rocksDBRangeIterator);
|
openIterators.add(rocksDBRangeIterator);
|
||||||
rocksDBRangeIterator.onClose(() -> openIterators.remove(rocksDBRangeIterator));
|
rocksDBRangeIterator.onClose(() -> openIterators.remove(rocksDBRangeIterator));
|
||||||
|
|
||||||
|
@ -602,7 +604,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
||||||
private KeyValueIterator<Bytes, byte[]> all(final boolean forward,
|
private KeyValueIterator<Bytes, byte[]> all(final boolean forward,
|
||||||
final Set<KeyValueIterator<Bytes, byte[]>> openIterators) {
|
final Set<KeyValueIterator<Bytes, byte[]>> openIterators) {
|
||||||
validateStoreOpen();
|
validateStoreOpen();
|
||||||
final ManagedKeyValueIterator<Bytes, byte[]> rocksDbIterator = dbAccessor.all(forward);
|
final ManagedKeyValueIterator<Bytes, byte[]> rocksDbIterator = cfAccessor.all(dbAccessor, forward);
|
||||||
openIterators.add(rocksDbIterator);
|
openIterators.add(rocksDbIterator);
|
||||||
rocksDbIterator.onClose(() -> openIterators.remove(rocksDbIterator));
|
rocksDbIterator.onClose(() -> openIterators.remove(rocksDbIterator));
|
||||||
return rocksDbIterator;
|
return rocksDbIterator;
|
||||||
|
@ -624,7 +626,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
||||||
validateStoreOpen();
|
validateStoreOpen();
|
||||||
final long numEntries;
|
final long numEntries;
|
||||||
try {
|
try {
|
||||||
numEntries = dbAccessor.approximateNumEntries();
|
numEntries = cfAccessor.approximateNumEntries(dbAccessor);
|
||||||
} catch (final RocksDBException e) {
|
} catch (final RocksDBException e) {
|
||||||
throw new ProcessorStateException("Error fetching property from store " + name, e);
|
throw new ProcessorStateException("Error fetching property from store " + name, e);
|
||||||
}
|
}
|
||||||
|
@ -646,7 +648,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
dbAccessor.flush();
|
cfAccessor.flush(dbAccessor);
|
||||||
} catch (final RocksDBException e) {
|
} catch (final RocksDBException e) {
|
||||||
throw new ProcessorStateException("Error while executing flush from store " + name, e);
|
throw new ProcessorStateException("Error while executing flush from store " + name, e);
|
||||||
}
|
}
|
||||||
|
@ -655,7 +657,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
||||||
@Override
|
@Override
|
||||||
public void addToBatch(final KeyValue<byte[], byte[]> record,
|
public void addToBatch(final KeyValue<byte[], byte[]> record,
|
||||||
final WriteBatchInterface batch) throws RocksDBException {
|
final WriteBatchInterface batch) throws RocksDBException {
|
||||||
dbAccessor.addToBatch(record.key, record.value, batch);
|
cfAccessor.addToBatch(record.key, record.value, batch);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -690,6 +692,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
||||||
|
|
||||||
// Important: do not rearrange the order in which the below objects are closed!
|
// Important: do not rearrange the order in which the below objects are closed!
|
||||||
// Order of closing must follow: ColumnFamilyHandle > RocksDB > DBOptions > ColumnFamilyOptions
|
// Order of closing must follow: ColumnFamilyHandle > RocksDB > DBOptions > ColumnFamilyOptions
|
||||||
|
cfAccessor.close();
|
||||||
dbAccessor.close();
|
dbAccessor.close();
|
||||||
db.close();
|
db.close();
|
||||||
userSpecifiedOptions.close();
|
userSpecifiedOptions.close();
|
||||||
|
@ -701,6 +704,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
||||||
statistics.close();
|
statistics.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cfAccessor = null;
|
||||||
dbAccessor = null;
|
dbAccessor = null;
|
||||||
userSpecifiedOptions = null;
|
userSpecifiedOptions = null;
|
||||||
wOptions = null;
|
wOptions = null;
|
||||||
|
@ -724,42 +728,122 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
interface RocksDBAccessor {
|
interface DBAccessor {
|
||||||
|
byte[] get(final ColumnFamilyHandle columnFamily, final byte[] key) throws RocksDBException;
|
||||||
|
byte[] get(final ColumnFamilyHandle columnFamily, final ReadOptions readOptions, final byte[] key) throws RocksDBException;
|
||||||
|
RocksIterator newIterator(final ColumnFamilyHandle columnFamily);
|
||||||
|
void put(final ColumnFamilyHandle columnFamily, final byte[] key, final byte[] value) throws RocksDBException;
|
||||||
|
void delete(final ColumnFamilyHandle columnFamily, final byte[] key) throws RocksDBException;
|
||||||
|
void deleteRange(final ColumnFamilyHandle columnFamily, final byte[] from, final byte[] to) throws RocksDBException;
|
||||||
|
long approximateNumEntries(final ColumnFamilyHandle columnFamily) throws RocksDBException;
|
||||||
|
void flush(final ColumnFamilyHandle... columnFamilies) throws RocksDBException;
|
||||||
|
void reset();
|
||||||
|
void close();
|
||||||
|
}
|
||||||
|
|
||||||
void put(final byte[] key,
|
static class DirectDBAccessor implements DBAccessor {
|
||||||
final byte[] value);
|
|
||||||
|
private final RocksDB db;
|
||||||
|
private final FlushOptions flushOptions;
|
||||||
|
|
||||||
|
DirectDBAccessor(final RocksDB db, final FlushOptions flushOptions) {
|
||||||
|
this.db = db;
|
||||||
|
this.flushOptions = flushOptions;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public byte[] get(final ColumnFamilyHandle columnFamily, final byte[] key) throws RocksDBException {
|
||||||
|
return db.get(columnFamily, key);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public byte[] get(final ColumnFamilyHandle columnFamily, final ReadOptions readOptions, final byte[] key) throws RocksDBException {
|
||||||
|
return db.get(columnFamily, readOptions, key);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RocksIterator newIterator(final ColumnFamilyHandle columnFamily) {
|
||||||
|
return db.newIterator(columnFamily);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void put(final ColumnFamilyHandle columnFamily, final byte[] key, final byte[] value) throws RocksDBException {
|
||||||
|
db.put(columnFamily, key, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void delete(final ColumnFamilyHandle columnFamily, final byte[] key) throws RocksDBException {
|
||||||
|
db.delete(columnFamily, key);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deleteRange(final ColumnFamilyHandle columnFamily, final byte[] from, final byte[] to) throws RocksDBException {
|
||||||
|
db.deleteRange(columnFamily, from, to);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long approximateNumEntries(final ColumnFamilyHandle columnFamily) throws RocksDBException {
|
||||||
|
return db.getLongProperty(columnFamily, "rocksdb.estimate-num-keys");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void flush(final ColumnFamilyHandle... columnFamilies) throws RocksDBException {
|
||||||
|
if (columnFamilies.length == 0) {
|
||||||
|
db.flush(flushOptions);
|
||||||
|
} else if (columnFamilies.length == 1) {
|
||||||
|
db.flush(flushOptions, columnFamilies[0]);
|
||||||
|
} else {
|
||||||
|
db.flush(flushOptions, Arrays.asList(columnFamilies));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reset() {
|
||||||
|
// no state to reset
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
// nothing to close
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
interface ColumnFamilyAccessor {
|
||||||
|
|
||||||
|
void put(final DBAccessor accessor, final byte[] key, final byte[] value);
|
||||||
|
|
||||||
void prepareBatch(final List<KeyValue<Bytes, byte[]>> entries,
|
void prepareBatch(final List<KeyValue<Bytes, byte[]>> entries,
|
||||||
final WriteBatchInterface batch) throws RocksDBException;
|
final WriteBatchInterface batch) throws RocksDBException;
|
||||||
|
|
||||||
byte[] get(final byte[] key) throws RocksDBException;
|
byte[] get(final DBAccessor accessor, final byte[] key) throws RocksDBException;
|
||||||
|
|
||||||
byte[] get(final byte[] key, ReadOptions readOptions) throws RocksDBException;
|
byte[] get(final DBAccessor accessor, final byte[] key, final ReadOptions readOptions) throws RocksDBException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* In contrast to get(), we don't migrate the key to new CF.
|
* In contrast to get(), we don't migrate the key to new CF.
|
||||||
* <p>
|
* <p>
|
||||||
* Use for get() within delete() -- no need to migrate, as it's deleted anyway
|
* Use for get() within delete() -- no need to migrate, as it's deleted anyway
|
||||||
*/
|
*/
|
||||||
byte[] getOnly(final byte[] key) throws RocksDBException;
|
byte[] getOnly(final DBAccessor accessor, final byte[] key) throws RocksDBException;
|
||||||
|
|
||||||
ManagedKeyValueIterator<Bytes, byte[]> range(final Bytes from,
|
ManagedKeyValueIterator<Bytes, byte[]> range(final DBAccessor accessor,
|
||||||
|
final Bytes from,
|
||||||
final Bytes to,
|
final Bytes to,
|
||||||
final boolean forward);
|
final boolean forward);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Deletes keys entries in the range ['from', 'to'], including 'from' and excluding 'to'.
|
* Deletes keys entries in the range ['from', 'to'], including 'from' and excluding 'to'.
|
||||||
*/
|
*/
|
||||||
void deleteRange(final byte[] from,
|
void deleteRange(final DBAccessor accessor, final byte[] from, final byte[] to);
|
||||||
final byte[] to);
|
|
||||||
|
|
||||||
ManagedKeyValueIterator<Bytes, byte[]> all(final boolean forward);
|
ManagedKeyValueIterator<Bytes, byte[]> all(final DBAccessor accessor, final boolean forward);
|
||||||
|
|
||||||
ManagedKeyValueIterator<Bytes, byte[]> prefixScan(final Bytes prefix);
|
ManagedKeyValueIterator<Bytes, byte[]> prefixScan(final DBAccessor accessor, final Bytes prefix);
|
||||||
|
|
||||||
long approximateNumEntries() throws RocksDBException;
|
long approximateNumEntries(final DBAccessor accessor) throws RocksDBException;
|
||||||
|
|
||||||
void flush() throws RocksDBException;
|
void flush(final DBAccessor accessor) throws RocksDBException;
|
||||||
|
|
||||||
void addToBatch(final byte[] key,
|
void addToBatch(final byte[] key,
|
||||||
final byte[] value,
|
final byte[] value,
|
||||||
|
@ -768,7 +852,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
||||||
void close();
|
void close();
|
||||||
}
|
}
|
||||||
|
|
||||||
class SingleColumnFamilyAccessor implements RocksDBAccessor {
|
class SingleColumnFamilyAccessor implements ColumnFamilyAccessor {
|
||||||
private final ColumnFamilyHandle columnFamily;
|
private final ColumnFamilyHandle columnFamily;
|
||||||
|
|
||||||
SingleColumnFamilyAccessor(final ColumnFamilyHandle columnFamily) {
|
SingleColumnFamilyAccessor(final ColumnFamilyHandle columnFamily) {
|
||||||
|
@ -776,18 +860,19 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void put(final byte[] key,
|
public void put(final DBAccessor accessor,
|
||||||
|
final byte[] key,
|
||||||
final byte[] value) {
|
final byte[] value) {
|
||||||
if (value == null) {
|
if (value == null) {
|
||||||
try {
|
try {
|
||||||
db.delete(columnFamily, wOptions, key);
|
accessor.delete(columnFamily, key);
|
||||||
} catch (final RocksDBException e) {
|
} catch (final RocksDBException e) {
|
||||||
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
|
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
|
||||||
throw new ProcessorStateException("Error while removing key from store " + name, e);
|
throw new ProcessorStateException("Error while removing key from store " + name, e);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
try {
|
try {
|
||||||
db.put(columnFamily, wOptions, key, value);
|
accessor.put(columnFamily, key, value);
|
||||||
} catch (final RocksDBException e) {
|
} catch (final RocksDBException e) {
|
||||||
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
|
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
|
||||||
throw new ProcessorStateException("Error while putting key/value into store " + name, e);
|
throw new ProcessorStateException("Error while putting key/value into store " + name, e);
|
||||||
|
@ -805,27 +890,28 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public byte[] get(final byte[] key) throws RocksDBException {
|
public byte[] get(final DBAccessor accessor, final byte[] key) throws RocksDBException {
|
||||||
return db.get(columnFamily, key);
|
return accessor.get(columnFamily, key);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public byte[] get(final byte[] key, final ReadOptions readOptions) throws RocksDBException {
|
public byte[] get(final DBAccessor accessor, final byte[] key, final ReadOptions readOptions) throws RocksDBException {
|
||||||
return db.get(columnFamily, readOptions, key);
|
return accessor.get(columnFamily, readOptions, key);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public byte[] getOnly(final byte[] key) throws RocksDBException {
|
public byte[] getOnly(final DBAccessor accessor, final byte[] key) throws RocksDBException {
|
||||||
return db.get(columnFamily, key);
|
return get(accessor, key);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ManagedKeyValueIterator<Bytes, byte[]> range(final Bytes from,
|
public ManagedKeyValueIterator<Bytes, byte[]> range(final DBAccessor accessor,
|
||||||
|
final Bytes from,
|
||||||
final Bytes to,
|
final Bytes to,
|
||||||
final boolean forward) {
|
final boolean forward) {
|
||||||
return new RocksDBRangeIterator(
|
return new RocksDBRangeIterator(
|
||||||
name,
|
name,
|
||||||
db.newIterator(columnFamily),
|
accessor.newIterator(columnFamily),
|
||||||
from,
|
from,
|
||||||
to,
|
to,
|
||||||
forward,
|
forward,
|
||||||
|
@ -834,9 +920,9 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void deleteRange(final byte[] from, final byte[] to) {
|
public void deleteRange(final DBAccessor accessor, final byte[] from, final byte[] to) {
|
||||||
try {
|
try {
|
||||||
db.deleteRange(columnFamily, wOptions, from, to);
|
accessor.deleteRange(columnFamily, from, to);
|
||||||
} catch (final RocksDBException e) {
|
} catch (final RocksDBException e) {
|
||||||
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
|
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
|
||||||
throw new ProcessorStateException("Error while removing key from store " + name, e);
|
throw new ProcessorStateException("Error while removing key from store " + name, e);
|
||||||
|
@ -844,8 +930,8 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ManagedKeyValueIterator<Bytes, byte[]> all(final boolean forward) {
|
public ManagedKeyValueIterator<Bytes, byte[]> all(final DBAccessor accessor, final boolean forward) {
|
||||||
final RocksIterator innerIterWithTimestamp = db.newIterator(columnFamily);
|
final RocksIterator innerIterWithTimestamp = accessor.newIterator(columnFamily);
|
||||||
if (forward) {
|
if (forward) {
|
||||||
innerIterWithTimestamp.seekToFirst();
|
innerIterWithTimestamp.seekToFirst();
|
||||||
} else {
|
} else {
|
||||||
|
@ -855,11 +941,11 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ManagedKeyValueIterator<Bytes, byte[]> prefixScan(final Bytes prefix) {
|
public ManagedKeyValueIterator<Bytes, byte[]> prefixScan(final DBAccessor accessor, final Bytes prefix) {
|
||||||
final Bytes to = incrementWithoutOverflow(prefix);
|
final Bytes to = incrementWithoutOverflow(prefix);
|
||||||
return new RocksDBRangeIterator(
|
return new RocksDBRangeIterator(
|
||||||
name,
|
name,
|
||||||
db.newIterator(columnFamily),
|
accessor.newIterator(columnFamily),
|
||||||
prefix,
|
prefix,
|
||||||
to,
|
to,
|
||||||
true,
|
true,
|
||||||
|
@ -868,13 +954,13 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long approximateNumEntries() throws RocksDBException {
|
public long approximateNumEntries(final DBAccessor accessor) throws RocksDBException {
|
||||||
return db.getLongProperty(columnFamily, "rocksdb.estimate-num-keys");
|
return accessor.approximateNumEntries(columnFamily);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void flush() throws RocksDBException {
|
public void flush(final DBAccessor accessor) throws RocksDBException {
|
||||||
db.flush(fOptions, columnFamily);
|
accessor.flush(columnFamily);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -903,7 +989,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
||||||
position
|
position
|
||||||
);
|
);
|
||||||
// If version headers are not present or version is V0
|
// If version headers are not present or version is V0
|
||||||
dbAccessor.addToBatch(record.key(), record.value(), batch);
|
cfAccessor.addToBatch(record.key(), record.value(), batch);
|
||||||
}
|
}
|
||||||
write(batch);
|
write(batch);
|
||||||
} catch (final RocksDBException e) {
|
} catch (final RocksDBException e) {
|
||||||
|
|
|
@ -78,16 +78,16 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
|
||||||
noTimestampsIter.seekToFirst();
|
noTimestampsIter.seekToFirst();
|
||||||
if (noTimestampsIter.isValid()) {
|
if (noTimestampsIter.isValid()) {
|
||||||
log.info("Opening store {} in upgrade mode", name);
|
log.info("Opening store {} in upgrade mode", name);
|
||||||
dbAccessor = new DualColumnFamilyAccessor(noTimestampColumnFamily, withTimestampColumnFamily);
|
cfAccessor = new DualColumnFamilyAccessor(noTimestampColumnFamily, withTimestampColumnFamily);
|
||||||
} else {
|
} else {
|
||||||
log.info("Opening store {} in regular mode", name);
|
log.info("Opening store {} in regular mode", name);
|
||||||
dbAccessor = new SingleColumnFamilyAccessor(withTimestampColumnFamily);
|
cfAccessor = new SingleColumnFamilyAccessor(withTimestampColumnFamily);
|
||||||
noTimestampColumnFamily.close();
|
noTimestampColumnFamily.close();
|
||||||
}
|
}
|
||||||
noTimestampsIter.close();
|
noTimestampsIter.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
private class DualColumnFamilyAccessor implements RocksDBAccessor {
|
private class DualColumnFamilyAccessor implements ColumnFamilyAccessor {
|
||||||
private final ColumnFamilyHandle oldColumnFamily;
|
private final ColumnFamilyHandle oldColumnFamily;
|
||||||
private final ColumnFamilyHandle newColumnFamily;
|
private final ColumnFamilyHandle newColumnFamily;
|
||||||
|
|
||||||
|
@ -98,30 +98,31 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void put(final byte[] key,
|
public void put(final DBAccessor accessor,
|
||||||
|
final byte[] key,
|
||||||
final byte[] valueWithTimestamp) {
|
final byte[] valueWithTimestamp) {
|
||||||
if (valueWithTimestamp == null) {
|
if (valueWithTimestamp == null) {
|
||||||
try {
|
try {
|
||||||
db.delete(oldColumnFamily, wOptions, key);
|
accessor.delete(oldColumnFamily, key);
|
||||||
} catch (final RocksDBException e) {
|
} catch (final RocksDBException e) {
|
||||||
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
|
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
|
||||||
throw new ProcessorStateException("Error while removing key from store " + name, e);
|
throw new ProcessorStateException("Error while removing key from store " + name, e);
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
db.delete(newColumnFamily, wOptions, key);
|
accessor.delete(newColumnFamily, key);
|
||||||
} catch (final RocksDBException e) {
|
} catch (final RocksDBException e) {
|
||||||
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
|
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
|
||||||
throw new ProcessorStateException("Error while removing key from store " + name, e);
|
throw new ProcessorStateException("Error while removing key from store " + name, e);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
try {
|
try {
|
||||||
db.delete(oldColumnFamily, wOptions, key);
|
accessor.delete(oldColumnFamily, key);
|
||||||
} catch (final RocksDBException e) {
|
} catch (final RocksDBException e) {
|
||||||
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
|
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
|
||||||
throw new ProcessorStateException("Error while removing key from store " + name, e);
|
throw new ProcessorStateException("Error while removing key from store " + name, e);
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
db.put(newColumnFamily, wOptions, key, valueWithTimestamp);
|
accessor.put(newColumnFamily, key, valueWithTimestamp);
|
||||||
StoreQueryUtils.updatePosition(position, context);
|
StoreQueryUtils.updatePosition(position, context);
|
||||||
} catch (final RocksDBException e) {
|
} catch (final RocksDBException e) {
|
||||||
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
|
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
|
||||||
|
@ -140,28 +141,28 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public byte[] get(final byte[] key) throws RocksDBException {
|
public byte[] get(final DBAccessor accessor, final byte[] key) throws RocksDBException {
|
||||||
return get(key, Optional.empty());
|
return get(accessor, key, Optional.empty());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public byte[] get(final byte[] key, final ReadOptions readOptions) throws RocksDBException {
|
public byte[] get(final DBAccessor accessor, final byte[] key, final ReadOptions readOptions) throws RocksDBException {
|
||||||
return get(key, Optional.of(readOptions));
|
return get(accessor, key, Optional.of(readOptions));
|
||||||
}
|
}
|
||||||
|
|
||||||
private byte[] get(final byte[] key, final Optional<ReadOptions> readOptions) throws RocksDBException {
|
private byte[] get(final DBAccessor accessor, final byte[] key, final Optional<ReadOptions> readOptions) throws RocksDBException {
|
||||||
final byte[] valueWithTimestamp = readOptions.isPresent() ? db.get(newColumnFamily, readOptions.get(), key) : db.get(newColumnFamily, key);
|
final byte[] valueWithTimestamp = readOptions.isPresent() ? accessor.get(newColumnFamily, readOptions.get(), key) : accessor.get(newColumnFamily, key);
|
||||||
if (valueWithTimestamp != null) {
|
if (valueWithTimestamp != null) {
|
||||||
return valueWithTimestamp;
|
return valueWithTimestamp;
|
||||||
}
|
}
|
||||||
|
|
||||||
final byte[] plainValue = readOptions.isPresent() ? db.get(oldColumnFamily, readOptions.get(), key) : db.get(oldColumnFamily, key);
|
final byte[] plainValue = readOptions.isPresent() ? accessor.get(oldColumnFamily, readOptions.get(), key) : accessor.get(oldColumnFamily, key);
|
||||||
if (plainValue != null) {
|
if (plainValue != null) {
|
||||||
final byte[] valueWithUnknownTimestamp = convertToTimestampedFormat(plainValue);
|
final byte[] valueWithUnknownTimestamp = convertToTimestampedFormat(plainValue);
|
||||||
// this does only work, because the changelog topic contains correct data already
|
// this does only work, because the changelog topic contains correct data already
|
||||||
// for other format changes, we cannot take this short cut and can only migrate data
|
// for other format changes, we cannot take this short cut and can only migrate data
|
||||||
// from old to new store on put()
|
// from old to new store on put()
|
||||||
put(key, valueWithUnknownTimestamp);
|
put(accessor, key, valueWithUnknownTimestamp);
|
||||||
return valueWithUnknownTimestamp;
|
return valueWithUnknownTimestamp;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -169,13 +170,13 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public byte[] getOnly(final byte[] key) throws RocksDBException {
|
public byte[] getOnly(final DBAccessor accessor, final byte[] key) throws RocksDBException {
|
||||||
final byte[] valueWithTimestamp = db.get(newColumnFamily, key);
|
final byte[] valueWithTimestamp = accessor.get(newColumnFamily, key);
|
||||||
if (valueWithTimestamp != null) {
|
if (valueWithTimestamp != null) {
|
||||||
return valueWithTimestamp;
|
return valueWithTimestamp;
|
||||||
}
|
}
|
||||||
|
|
||||||
final byte[] plainValue = db.get(oldColumnFamily, key);
|
final byte[] plainValue = accessor.get(oldColumnFamily, key);
|
||||||
if (plainValue != null) {
|
if (plainValue != null) {
|
||||||
return convertToTimestampedFormat(plainValue);
|
return convertToTimestampedFormat(plainValue);
|
||||||
}
|
}
|
||||||
|
@ -184,13 +185,14 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ManagedKeyValueIterator<Bytes, byte[]> range(final Bytes from,
|
public ManagedKeyValueIterator<Bytes, byte[]> range(final DBAccessor accessor,
|
||||||
|
final Bytes from,
|
||||||
final Bytes to,
|
final Bytes to,
|
||||||
final boolean forward) {
|
final boolean forward) {
|
||||||
return new RocksDBDualCFRangeIterator(
|
return new RocksDBDualCFRangeIterator(
|
||||||
name,
|
name,
|
||||||
db.newIterator(newColumnFamily),
|
accessor.newIterator(newColumnFamily),
|
||||||
db.newIterator(oldColumnFamily),
|
accessor.newIterator(oldColumnFamily),
|
||||||
from,
|
from,
|
||||||
to,
|
to,
|
||||||
forward,
|
forward,
|
||||||
|
@ -198,15 +200,15 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void deleteRange(final byte[] from, final byte[] to) {
|
public void deleteRange(final DBAccessor accessor, final byte[] from, final byte[] to) {
|
||||||
try {
|
try {
|
||||||
db.deleteRange(oldColumnFamily, wOptions, from, to);
|
accessor.deleteRange(oldColumnFamily, from, to);
|
||||||
} catch (final RocksDBException e) {
|
} catch (final RocksDBException e) {
|
||||||
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
|
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
|
||||||
throw new ProcessorStateException("Error while removing key from store " + name, e);
|
throw new ProcessorStateException("Error while removing key from store " + name, e);
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
db.deleteRange(newColumnFamily, wOptions, from, to);
|
accessor.deleteRange(newColumnFamily, from, to);
|
||||||
} catch (final RocksDBException e) {
|
} catch (final RocksDBException e) {
|
||||||
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
|
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
|
||||||
throw new ProcessorStateException("Error while removing key from store " + name, e);
|
throw new ProcessorStateException("Error while removing key from store " + name, e);
|
||||||
|
@ -214,9 +216,9 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ManagedKeyValueIterator<Bytes, byte[]> all(final boolean forward) {
|
public ManagedKeyValueIterator<Bytes, byte[]> all(final DBAccessor accessor, final boolean forward) {
|
||||||
final RocksIterator innerIterWithTimestamp = db.newIterator(newColumnFamily);
|
final RocksIterator innerIterWithTimestamp = accessor.newIterator(newColumnFamily);
|
||||||
final RocksIterator innerIterNoTimestamp = db.newIterator(oldColumnFamily);
|
final RocksIterator innerIterNoTimestamp = accessor.newIterator(oldColumnFamily);
|
||||||
if (forward) {
|
if (forward) {
|
||||||
innerIterWithTimestamp.seekToFirst();
|
innerIterWithTimestamp.seekToFirst();
|
||||||
innerIterNoTimestamp.seekToFirst();
|
innerIterNoTimestamp.seekToFirst();
|
||||||
|
@ -228,12 +230,12 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ManagedKeyValueIterator<Bytes, byte[]> prefixScan(final Bytes prefix) {
|
public ManagedKeyValueIterator<Bytes, byte[]> prefixScan(final DBAccessor accessor, final Bytes prefix) {
|
||||||
final Bytes to = incrementWithoutOverflow(prefix);
|
final Bytes to = incrementWithoutOverflow(prefix);
|
||||||
return new RocksDBDualCFRangeIterator(
|
return new RocksDBDualCFRangeIterator(
|
||||||
name,
|
name,
|
||||||
db.newIterator(newColumnFamily),
|
accessor.newIterator(newColumnFamily),
|
||||||
db.newIterator(oldColumnFamily),
|
accessor.newIterator(oldColumnFamily),
|
||||||
prefix,
|
prefix,
|
||||||
to,
|
to,
|
||||||
true,
|
true,
|
||||||
|
@ -242,15 +244,14 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long approximateNumEntries() throws RocksDBException {
|
public long approximateNumEntries(final DBAccessor accessor) throws RocksDBException {
|
||||||
return db.getLongProperty(oldColumnFamily, "rocksdb.estimate-num-keys")
|
return accessor.approximateNumEntries(oldColumnFamily) +
|
||||||
+ db.getLongProperty(newColumnFamily, "rocksdb.estimate-num-keys");
|
accessor.approximateNumEntries(newColumnFamily);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void flush() throws RocksDBException {
|
public void flush(final DBAccessor accessor) throws RocksDBException {
|
||||||
db.flush(fOptions, oldColumnFamily);
|
accessor.flush(oldColumnFamily, newColumnFamily);
|
||||||
db.flush(fOptions, newColumnFamily);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue