HOTFIX: RocksDBStore must clear dirty flags after flush

guozhangwang
Without clearing the dirty flags, RocksDBStore will perform flush for every new record. This bug made the store performance painfully slower.

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #1163 from ymatsuda/clear_dirty_flag
This commit is contained in:
Yasuhiro Matsuda 2016-03-29 13:30:56 -07:00 committed by Guozhang Wang
parent 43d5078e98
commit 5089f547d5
1 changed files with 16 additions and 11 deletions

View File

@ -165,7 +165,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
public void apply(K key, RocksDBCacheEntry entry) {
// flush all the dirty entries to RocksDB if this evicted entry is dirty
if (entry.isDirty) {
flush();
flushCache();
}
}
});
@ -226,7 +226,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
RocksDBCacheEntry entry = cache.get(key);
if (entry == null) {
byte[] rawKey = serdes.rawKey(key);
V value = serdes.valueFrom(getInternal(serdes.rawKey(key)));
cache.put(key, new RocksDBCacheEntry(value));
@ -251,8 +250,8 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
@Override
public void put(K key, V value) {
if (cache != null) {
cache.put(key, new RocksDBCacheEntry(value, true));
cacheDirtyKeys.add(key);
cache.put(key, new RocksDBCacheEntry(value, true));
} else {
byte[] rawKey = serdes.rawKey(key);
byte[] rawValue = serdes.rawValue(value);
@ -298,7 +297,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
put(entry.key, entry.value);
}
// this function is only called in flush()
// this function is only called in flushCache()
private void putAllInternal(List<KeyValue<byte[], byte[]>> entries) {
WriteBatch batch = new WriteBatch();
@ -324,7 +323,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
public KeyValueIterator<K, V> range(K from, K to) {
// we need to flush the cache if necessary before returning the iterator
if (cache != null)
flush();
flushCache();
return new RocksDBRangeIterator<K, V>(db.newIterator(), serdes, from, to);
}
@ -333,15 +332,14 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
public KeyValueIterator<K, V> all() {
// we need to flush the cache if necessary before returning the iterator
if (cache != null)
flush();
flushCache();
RocksIterator innerIter = db.newIterator();
innerIter.seekToFirst();
return new RocksDbIterator<K, V>(innerIter, serdes);
}
@Override
public void flush() {
private void flushCache() {
// flush of the cache entries if necessary
if (cache != null) {
List<KeyValue<byte[], byte[]>> putBatch = new ArrayList<>(cache.keys.size());
@ -350,7 +348,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
for (K key : cacheDirtyKeys) {
RocksDBCacheEntry entry = cache.get(key);
assert entry.isDirty;
entry.isDirty = false;
byte[] rawKey = serdes.rawKey(key);
@ -386,12 +384,19 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
cacheDirtyKeys.clear();
}
flushInternal();
if (loggingEnabled)
changeLogger.logChange(getter);
}
@Override
public void flush() {
// flush of the cache entries if necessary
flushCache();
// flush RocksDB
flushInternal();
}
public void flushInternal() {
try {
db.flush(fOptions);