KAFKA-14460: Skip removed entries from in-memory KeyValueIterator (#16505)

As described in KAFKA-14460, one of the functional requirements of KeyValueStore is that "The returned iterator must not return null values" for every method that returns an iterator.

InMemoryKeyValueStore does not fully meet this requirement today. To iterate over the store, we copy the keySet so that the iteration does not block access for other threads. However, entries that are removed from the store after the iterator has been initialized are still returned by the iterator, with null values. With this change, the in-memory iterator skips such removed entries instead.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
This commit is contained in:
Ayoub Omari 2024-09-08 02:06:55 +02:00 committed by GitHub
parent 981133d350
commit be3ab8bdd5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 210 additions and 20 deletions

View File

@ -413,8 +413,7 @@ public class CachingKeyValueStore
/**
 * Returns an iterator over all entries, combining the thread cache's view with the
 * wrapped store's view.
 *
 * <p>The wrapped store's iterator is passed to the merged iterator directly, without an
 * intermediate peeking wrapper.
 *
 * @return a merged iterator over the cache iterator and the wrapped store iterator
 */
@Override
public KeyValueIterator<Bytes, byte[]> all() {
    validateStoreOpen();
    final KeyValueIterator<Bytes, byte[]> storeIterator = wrapped().all();
    final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = context.cache().all(cacheName);
    // The trailing flag is true here and false in reverseAll(); presumably it selects forward order.
    return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator, true);
}
@ -432,8 +431,7 @@ public class CachingKeyValueStore
/**
 * Returns a reverse iterator over all entries, combining the thread cache's view with the
 * wrapped store's view.
 *
 * <p>The wrapped store's iterator is passed to the merged iterator directly, without an
 * intermediate peeking wrapper.
 *
 * @return a merged iterator over the reverse cache iterator and the reverse store iterator
 */
@Override
public KeyValueIterator<Bytes, byte[]> reverseAll() {
    validateStoreOpen();
    final KeyValueIterator<Bytes, byte[]> storeIterator = wrapped().reverseAll();
    final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = context.cache().reverseAll(cacheName);
    // The trailing flag is false here and true in all(); presumably it selects reverse order.
    return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator, false);
}

View File

@ -17,7 +17,6 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.KeyValueIterator;
import java.util.NoSuchElementException;
@ -54,7 +53,7 @@ public class DelegatingPeekingKeyValueIterator<K, V> implements KeyValueIterator
@Override
public synchronized boolean hasNext() {
if (!open) {
throw new InvalidStateStoreException(String.format("Store %s has closed", storeName));
throw new IllegalStateException(String.format("Iterator for store %s has already been closed.", storeName));
}
if (next != null) {
return true;

View File

@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableMap;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
@ -172,10 +173,7 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
final Bytes from = Bytes.wrap(prefixKeySerializer.serialize(null, prefix));
final Bytes to = Bytes.increment(from);
return new DelegatingPeekingKeyValueIterator<>(
name,
new InMemoryKeyValueIterator(map.subMap(from, true, to, false).keySet(), true)
);
return new InMemoryKeyValueIterator(map.subMap(from, true, to, false).keySet(), true);
}
@Override
@ -212,7 +210,7 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
}
/**
 * Builds an iterator over the given key set.
 *
 * @param rangeSet the keys to iterate over
 * @param forward  passed through to the iterator; presumably true for ascending key order
 * @return an {@code InMemoryKeyValueIterator} over {@code rangeSet}
 */
private KeyValueIterator<Bytes, byte[]> getKeyValueIterator(final Set<Bytes> rangeSet, final boolean forward) {
    return new InMemoryKeyValueIterator(rangeSet, forward);
}
@Override
@ -222,9 +220,7 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
/**
 * Returns an iterator over every key currently in the store, created with
 * {@code forward == false}.
 *
 * @return an {@code InMemoryKeyValueIterator} over the store's key set
 */
@Override
public synchronized KeyValueIterator<Bytes, byte[]> reverseAll() {
    return new InMemoryKeyValueIterator(map.keySet(), false);
}
@Override
@ -245,6 +241,8 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
private class InMemoryKeyValueIterator implements KeyValueIterator<Bytes, byte[]> {
private final Iterator<Bytes> iter;
// Key buffered by hasNext() and not yet handed out by next(); null when nothing is buffered.
private Bytes currentKey;
// Primitive flag: it is only ever true or false, so the boxed type and its null state are unnecessary.
private boolean iteratorOpen = true;
private InMemoryKeyValueIterator(final Set<Bytes> keySet, final boolean forward) {
if (forward) {
@ -256,23 +254,45 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
/**
 * Reports whether another entry that is still present in the store is available.
 *
 * <p>A key is buffered in {@code currentKey} until {@code next()} consumes it. Keys that
 * are no longer contained in {@code map} are dropped and the search moves on to the next key.
 *
 * @return {@code true} if a key that is still in the store is buffered, {@code false} once
 *         the underlying key iterator is exhausted
 * @throws IllegalStateException if the iterator has already been closed
 */
@Override
public boolean hasNext() {
    if (!iteratorOpen) {
        throw new IllegalStateException(String.format("Iterator for store %s has already been closed.", name));
    }
    // Loop rather than recurse: a long run of removed keys would otherwise add stack frames
    // for every skipped key and could overflow the stack.
    while (true) {
        if (currentKey != null) {
            if (map.containsKey(currentKey)) {
                return true;
            }
            // The buffered key was removed from the store; discard it and keep looking.
            currentKey = null;
        }
        if (!iter.hasNext()) {
            return false;
        }
        currentKey = iter.next();
    }
}
/**
 * Returns the next entry that is still present in the store and advances the iterator.
 *
 * @return the buffered key paired with the value currently stored for it
 * @throws NoSuchElementException if no entry remains
 * @throws IllegalStateException  if the iterator has already been closed (raised by {@code hasNext()})
 */
@Override
public KeyValue<Bytes, byte[]> next() {
    if (!hasNext()) {
        throw new NoSuchElementException();
    }
    // NOTE(review): if another thread removes the key between hasNext() and this lookup,
    // the value could still be null - confirm whether that window matters to callers.
    final KeyValue<Bytes, byte[]> ret = new KeyValue<>(currentKey, map.get(currentKey));
    // Clear the buffer so the following hasNext() fetches a fresh key.
    currentKey = null;
    return ret;
}
@Override
public void close() {
    // Mark the iterator as closed; hasNext() checks this flag and throws IllegalStateException afterwards.
    iteratorOpen = false;
}
/**
 * Returns the key of the next entry without consuming it.
 *
 * @return the key that the following {@code next()} call would return
 * @throws NoSuchElementException if no entry remains
 * @throws IllegalStateException  if the iterator has already been closed (raised by {@code hasNext()})
 */
@Override
public Bytes peekNextKey() {
    if (!hasNext()) {
        throw new NoSuchElementException();
    }
    // hasNext() has buffered the key; it stays buffered because it is not cleared here.
    return currentKey;
}
}
}

View File

@ -38,6 +38,7 @@ import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.UUID;
import static org.apache.kafka.common.utils.Utils.mkEntry;
@ -45,8 +46,11 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest {
@ -266,6 +270,171 @@ public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest {
assertEquals(expected, actual);
}
// An iterator over a store with no entries must report that nothing is available.
@Test
public void iteratorHasNextOnEmptyStoreShouldReturnFalse() {
    inMemoryKeyValueStore.init((StateStoreContext) context, inMemoryKeyValueStore);

    // try-with-resources closes the iterator even when the assertion fails.
    try (final KeyValueIterator<Bytes, byte[]> iter = inMemoryKeyValueStore.all()) {
        assertFalse(iter.hasNext());
    }
}
// hasNext() must flip to false when the only entry is deleted after it was first observed.
@Test
public void iteratorHasNextOnDeletedEntryShouldReturnFalse() {
    inMemoryKeyValueStore.init((StateStoreContext) context, inMemoryKeyValueStore);
    inMemoryKeyValueStore.put(bytesKey("key"), bytesValue("value"));

    // try-with-resources closes the iterator even when an assertion fails.
    try (final KeyValueIterator<Bytes, byte[]> iter = inMemoryKeyValueStore.all()) {
        assertTrue(iter.hasNext());

        inMemoryKeyValueStore.delete(bytesKey("key"));
        assertFalse(iter.hasNext());
    }
}
// Calling hasNext() repeatedly must not consume the single available entry.
@Test
public void iteratorHasNextShouldNotAdvanceIterator() {
    inMemoryKeyValueStore.init((StateStoreContext) context, inMemoryKeyValueStore);
    inMemoryKeyValueStore.put(bytesKey("key"), bytesValue("value"));

    // try-with-resources closes the iterator even when an assertion fails.
    try (final KeyValueIterator<Bytes, byte[]> iter = inMemoryKeyValueStore.all()) {
        assertTrue(iter.hasNext());
        assertTrue(iter.hasNext()); // should still point to the first element
    }
}
// Deleting the first of two entries must still leave the second one visible to hasNext().
@Test
public void iteratorHasNextShouldReturnTrueIfElementsRemaining() {
    inMemoryKeyValueStore.init((StateStoreContext) context, inMemoryKeyValueStore);
    inMemoryKeyValueStore.put(bytesKey("key1"), bytesValue("value1"));
    inMemoryKeyValueStore.put(bytesKey("key2"), bytesValue("value2"));

    // try-with-resources closes the iterator even when the assertion fails.
    try (final KeyValueIterator<Bytes, byte[]> iter = inMemoryKeyValueStore.all()) {
        inMemoryKeyValueStore.delete(bytesKey("key1"));
        assertTrue(iter.hasNext());
    }
}
// next() without a preceding hasNext() must return the stored entry.
@Test
public void iteratorNextShouldReturnNextElement() {
    inMemoryKeyValueStore.init((StateStoreContext) context, inMemoryKeyValueStore);
    inMemoryKeyValueStore.put(bytesKey("key"), bytesValue("value"));

    // try-with-resources closes the iterator even when an assertion fails.
    try (final KeyValueIterator<Bytes, byte[]> iter = inMemoryKeyValueStore.all()) {
        final KeyValue<Bytes, byte[]> next = iter.next();
        assertEquals(bytesKey("key"), next.key);
        assertArrayEquals(bytesValue("value"), next.value);
    }
}
// next() after hasNext() must return the entry that hasNext() announced.
@Test
public void iteratorNextAfterHasNextShouldReturnNextElement() {
    inMemoryKeyValueStore.init((StateStoreContext) context, inMemoryKeyValueStore);
    inMemoryKeyValueStore.put(bytesKey("key"), bytesValue("value"));

    // try-with-resources closes the iterator even when an assertion fails.
    try (final KeyValueIterator<Bytes, byte[]> iter = inMemoryKeyValueStore.all()) {
        assertTrue(iter.hasNext());

        final KeyValue<Bytes, byte[]> next = iter.next();
        assertEquals(bytesKey("key"), next.key);
        assertArrayEquals(bytesValue("value"), next.value);
    }
}
// next() on an iterator over an empty store must throw NoSuchElementException.
@Test
public void iteratorNextOnEmptyStoreShouldThrowException() {
    inMemoryKeyValueStore.init((StateStoreContext) context, inMemoryKeyValueStore);

    // try-with-resources closes the iterator even when the assertion fails.
    try (final KeyValueIterator<Bytes, byte[]> iter = inMemoryKeyValueStore.all()) {
        assertThrows(NoSuchElementException.class, iter::next);
    }
}
// Once every remaining entry has been deleted, next() must throw instead of returning a null value.
@Test
public void iteratorNextShouldThrowExceptionIfRemainingElementsDeleted() {
    inMemoryKeyValueStore.init((StateStoreContext) context, inMemoryKeyValueStore);
    inMemoryKeyValueStore.put(bytesKey("key1"), bytesValue("value1"));
    inMemoryKeyValueStore.put(bytesKey("key2"), bytesValue("value2"));

    // try-with-resources closes the iterator even when an assertion fails.
    try (final KeyValueIterator<Bytes, byte[]> iter = inMemoryKeyValueStore.all()) {
        final KeyValue<Bytes, byte[]> next = iter.next();
        assertEquals(bytesKey("key1"), next.key);
        assertArrayEquals(bytesValue("value1"), next.value);

        inMemoryKeyValueStore.delete(bytesKey("key2"));
        assertThrows(NoSuchElementException.class, iter::next);
    }
}
// next() must skip an entry deleted after the iterator was created and return the following one.
@Test
public void iteratorNextShouldSkipDeletedElements() {
    inMemoryKeyValueStore.init((StateStoreContext) context, inMemoryKeyValueStore);
    inMemoryKeyValueStore.put(bytesKey("key1"), bytesValue("value1"));
    inMemoryKeyValueStore.put(bytesKey("key2"), bytesValue("value2"));

    // try-with-resources closes the iterator even when an assertion fails.
    try (final KeyValueIterator<Bytes, byte[]> iter = inMemoryKeyValueStore.all()) {
        inMemoryKeyValueStore.delete(bytesKey("key1"));

        final KeyValue<Bytes, byte[]> next = iter.next();
        assertEquals(bytesKey("key2"), next.key);
        assertArrayEquals(bytesValue("value2"), next.value);
    }
}
// With no deletions, next() must return every entry in order and then throw.
@Test
public void iteratorNextShouldIterateOverAllElements() {
    inMemoryKeyValueStore.init((StateStoreContext) context, inMemoryKeyValueStore);
    inMemoryKeyValueStore.put(bytesKey("key1"), bytesValue("value1"));
    inMemoryKeyValueStore.put(bytesKey("key2"), bytesValue("value2"));

    // try-with-resources closes the iterator even when an assertion fails.
    try (final KeyValueIterator<Bytes, byte[]> iter = inMemoryKeyValueStore.all()) {
        final KeyValue<Bytes, byte[]> next1 = iter.next();
        assertEquals(bytesKey("key1"), next1.key);
        assertArrayEquals(bytesValue("value1"), next1.value);

        final KeyValue<Bytes, byte[]> next2 = iter.next();
        assertEquals(bytesKey("key2"), next2.key);
        assertArrayEquals(bytesValue("value2"), next2.value);

        assertThrows(NoSuchElementException.class, iter::next);
    }
}
// peekNextKey() on an iterator over an empty store must throw NoSuchElementException.
@Test
public void iteratorPeekNextKeyOnEmptyStoreShouldThrowException() {
    inMemoryKeyValueStore.init((StateStoreContext) context, inMemoryKeyValueStore);

    // try-with-resources closes the iterator even when the assertion fails.
    try (final KeyValueIterator<Bytes, byte[]> iter = inMemoryKeyValueStore.all()) {
        assertThrows(NoSuchElementException.class, iter::peekNextKey);
    }
}
// peekNextKey() must throw once the key it previously returned has been deleted.
@Test
public void iteratorPeekNextKeyOnDeletedEntryShouldThrowException() {
    inMemoryKeyValueStore.init((StateStoreContext) context, inMemoryKeyValueStore);
    inMemoryKeyValueStore.put(bytesKey("key"), bytesValue("value"));

    // try-with-resources closes the iterator even when an assertion fails.
    try (final KeyValueIterator<Bytes, byte[]> iter = inMemoryKeyValueStore.all()) {
        assertEquals(bytesKey("key"), iter.peekNextKey());

        inMemoryKeyValueStore.delete(bytesKey("key"));
        assertThrows(NoSuchElementException.class, iter::peekNextKey);
    }
}
// Calling peekNextKey() repeatedly must keep returning the same key.
@Test
public void iteratorPeekNextKeyShouldNotAdvanceIterator() {
    inMemoryKeyValueStore.init((StateStoreContext) context, inMemoryKeyValueStore);
    inMemoryKeyValueStore.put(bytesKey("key"), bytesValue("value"));

    // try-with-resources closes the iterator even when an assertion fails.
    try (final KeyValueIterator<Bytes, byte[]> iter = inMemoryKeyValueStore.all()) {
        assertEquals(bytesKey("key"), iter.peekNextKey());
        assertEquals(bytesKey("key"), iter.peekNextKey());
    }
}
// peekNextKey() must skip a key deleted after the iterator was created.
@Test
public void iteratorPeekNextKeyShouldSkipDeletedElements() {
    inMemoryKeyValueStore.init((StateStoreContext) context, inMemoryKeyValueStore);
    inMemoryKeyValueStore.put(bytesKey("key1"), bytesValue("value1"));
    inMemoryKeyValueStore.put(bytesKey("key2"), bytesValue("value2"));

    // try-with-resources closes the iterator even when the assertion fails.
    try (final KeyValueIterator<Bytes, byte[]> iter = inMemoryKeyValueStore.all()) {
        inMemoryKeyValueStore.delete(bytesKey("key1"));
        assertEquals(bytesKey("key2"), iter.peekNextKey());
    }
}
// After close(), every accessor of the iterator must fail with IllegalStateException.
@Test
public void iteratorShouldThrowIllegalStateExceptionIfAlreadyClosed() {
    inMemoryKeyValueStore.init((StateStoreContext) context, inMemoryKeyValueStore);

    final KeyValueIterator<Bytes, byte[]> closedIterator = inMemoryKeyValueStore.all();
    closedIterator.close();

    assertThrows(IllegalStateException.class, () -> closedIterator.hasNext());
    assertThrows(IllegalStateException.class, () -> closedIterator.next());
    assertThrows(IllegalStateException.class, () -> closedIterator.peekNextKey());
}
/**
 * Encodes a test value as bytes.
 *
 * @param value the string to encode
 * @return the UTF-8 encoding of {@code value}
 */
private byte[] bytesValue(final String value) {
    // Pin the charset so the encoded bytes do not depend on the platform default.
    return value.getBytes(java.nio.charset.StandardCharsets.UTF_8);
}

View File

@ -201,6 +201,10 @@ public class ListValueStoreTest {
it.close();
// A new all() iterator after a previous all() iterator was closed should not return deleted records.
assertThrows(InvalidStateStoreException.class, it::next);
if (storeType == StoreType.InMemory) {
assertThrows(IllegalStateException.class, it::next);
} else {
assertThrows(InvalidStateStoreException.class, it::next);
}
}
}