mirror of https://github.com/apache/kafka.git
KAFKA-14260: add `synchronized` to `prefixScan` method (#12893)
As a result of "14260: InMemoryKeyValueStore iterator still throws ConcurrentModificationException", I'm adding synchronized to prefixScan as an alternative to going back to the ConcurrentSkipList. Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
This commit is contained in:
parent
96b1db510a
commit
923fea583b
|
|
@ -169,7 +169,7 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) {
|
public synchronized <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) {
|
||||||
|
|
||||||
final Bytes from = Bytes.wrap(prefixKeySerializer.serialize(null, prefix));
|
final Bytes from = Bytes.wrap(prefixKeySerializer.serialize(null, prefix));
|
||||||
final Bytes to = Bytes.increment(from);
|
final Bytes to = Bytes.increment(from);
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.kafka.streams.state.internals;
|
package org.apache.kafka.streams.state.internals;
|
||||||
|
|
||||||
|
import org.apache.kafka.common.serialization.IntegerSerializer;
|
||||||
import org.apache.kafka.common.serialization.Serdes;
|
import org.apache.kafka.common.serialization.Serdes;
|
||||||
import org.apache.kafka.common.serialization.Serializer;
|
import org.apache.kafka.common.serialization.Serializer;
|
||||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||||
|
|
@ -55,7 +56,6 @@ import static org.junit.Assert.fail;
|
||||||
public abstract class AbstractKeyValueStoreTest {
|
public abstract class AbstractKeyValueStoreTest {
|
||||||
|
|
||||||
protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(final StateStoreContext context);
|
protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(final StateStoreContext context);
|
||||||
|
|
||||||
protected InternalMockProcessorContext context;
|
protected InternalMockProcessorContext context;
|
||||||
protected KeyValueStore<Integer, String> store;
|
protected KeyValueStore<Integer, String> store;
|
||||||
protected KeyValueStoreTestDriver<Integer, String> driver;
|
protected KeyValueStoreTestDriver<Integer, String> driver;
|
||||||
|
|
@ -648,4 +648,25 @@ public abstract class AbstractKeyValueStoreTest {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void prefixScanShouldNotThrowConcurrentModificationException() {
|
||||||
|
|
||||||
|
store.put(0, "zero");
|
||||||
|
store.put(1, "one");
|
||||||
|
store.put(222, "two-hundred-twenty-two");
|
||||||
|
store.put(2, "two");
|
||||||
|
store.put(22, "twenty-two");
|
||||||
|
store.put(3, "three");
|
||||||
|
|
||||||
|
try (final KeyValueIterator<Integer, String> iter = store.prefixScan(2, new IntegerSerializer())) {
|
||||||
|
|
||||||
|
store.delete(22);
|
||||||
|
|
||||||
|
while (iter.hasNext()) {
|
||||||
|
iter.next();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue