mirror of https://github.com/apache/kafka.git
MINOR: improve Streams error message (#5975)
Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>
This commit is contained in:
parent
152292994e
commit
c441528b93
|
|
@ -42,6 +42,8 @@ import org.rocksdb.RocksDBException;
|
||||||
import org.rocksdb.RocksIterator;
|
import org.rocksdb.RocksIterator;
|
||||||
import org.rocksdb.WriteBatch;
|
import org.rocksdb.WriteBatch;
|
||||||
import org.rocksdb.WriteOptions;
|
import org.rocksdb.WriteOptions;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
@ -66,6 +68,8 @@ import java.util.regex.Pattern;
|
||||||
*/
|
*/
|
||||||
public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
|
public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(RocksDBStore.class);
|
||||||
|
|
||||||
private static final Pattern SST_FILE_EXTENSION = Pattern.compile(".*\\.sst");
|
private static final Pattern SST_FILE_EXTENSION = Pattern.compile(".*\\.sst");
|
||||||
|
|
||||||
private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
|
private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
|
||||||
|
|
@ -422,8 +426,11 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
|
||||||
synchronized (openIterators) {
|
synchronized (openIterators) {
|
||||||
iterators = new HashSet<>(openIterators);
|
iterators = new HashSet<>(openIterators);
|
||||||
}
|
}
|
||||||
for (final KeyValueIterator iterator : iterators) {
|
if (iterators.size() != 0) {
|
||||||
iterator.close();
|
log.warn("Closing {} open iterators for store {}", iterators.size(), name);
|
||||||
|
for (final KeyValueIterator iterator : iterators) {
|
||||||
|
iterator.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -444,7 +451,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
|
||||||
@Override
|
@Override
|
||||||
public synchronized boolean hasNext() {
|
public synchronized boolean hasNext() {
|
||||||
if (!open) {
|
if (!open) {
|
||||||
throw new InvalidStateStoreException(String.format("RocksDB store %s has closed", storeName));
|
throw new InvalidStateStoreException(String.format("RocksDB iterator for store %s has closed", storeName));
|
||||||
}
|
}
|
||||||
return super.hasNext();
|
return super.hasNext();
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue