mirror of https://github.com/apache/kafka.git
Revert "MINOR: make flush no-op as we don't need to call flush on commit."
This reverts commit 90b2a2bf66
.
This commit is contained in:
parent
254add9534
commit
f848e2cd68
|
@ -95,7 +95,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
|
|||
private Options options;
|
||||
private WriteOptions wOptions;
|
||||
private FlushOptions fOptions;
|
||||
private boolean eosEnabled;
|
||||
|
||||
protected volatile boolean open = false;
|
||||
|
||||
|
@ -179,7 +178,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
|
|||
putInternal(key, value);
|
||||
}
|
||||
});
|
||||
eosEnabled = checkForEos(context.appConfigs());
|
||||
|
||||
open = true;
|
||||
}
|
||||
|
||||
|
@ -356,12 +355,12 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
|
|||
|
||||
@Override
|
||||
public synchronized void flush() {
|
||||
if (db == null || eosEnabled) {
|
||||
if (db == null) {
|
||||
return;
|
||||
}
|
||||
// flush RocksDB
|
||||
flushInternal();
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws ProcessorStateException if flushing failed because of any internal store exceptions
|
||||
*/
|
||||
|
@ -373,10 +372,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
|
|||
}
|
||||
}
|
||||
|
||||
private boolean checkForEos(Map<String, Object> configs) {
|
||||
return StreamsConfig.EXACTLY_ONCE.equals(configs.get(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void close() {
|
||||
if (!open) {
|
||||
|
|
Loading…
Reference in New Issue