KAFKA-10767: Adding test cases for all, reverseAll and reverseRange for ThreadCache (#9779)

The test cases for ThreaCache didn't have the corresponding unit tests for all, reverseAll and reverseRange methods. This PR aims to add the same.

Reviewers: Bruno Cadonna <cadonna@apache.org>
This commit is contained in:
vamossagar12 2021-05-05 15:56:51 +05:30 committed by GitHub
parent 45f24c4195
commit 9a71468cb0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 169 additions and 58 deletions

View File

@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import static org.hamcrest.MatcherAssert.assertThat;
import org.junit.Test;
import java.util.ArrayList;
@ -29,7 +30,9 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Supplier;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -42,19 +45,20 @@ public class ThreadCacheTest {
final String namespace1 = "0.1-namespace";
final String namespace2 = "0.2-namespace";
private final LogContext logContext = new LogContext("testCache ");
private final byte[][] bytes = new byte[][]{{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}};
@Test
public void basicPutGet() {
final List<KeyValue<String, String>> toInsert = Arrays.asList(
new KeyValue<>("K1", "V1"),
new KeyValue<>("K2", "V2"),
new KeyValue<>("K3", "V3"),
new KeyValue<>("K4", "V4"),
new KeyValue<>("K5", "V5"));
new KeyValue<>("K1", "V1"),
new KeyValue<>("K2", "V2"),
new KeyValue<>("K3", "V3"),
new KeyValue<>("K4", "V4"),
new KeyValue<>("K5", "V5"));
final KeyValue<String, String> kv = toInsert.get(0);
final ThreadCache cache = new ThreadCache(logContext,
toInsert.size() * memoryCacheEntrySize(kv.key.getBytes(), kv.value.getBytes(), ""),
new MockStreamsMetrics(new Metrics()));
toInsert.size() * memoryCacheEntrySize(kv.key.getBytes(), kv.value.getBytes(), ""),
new MockStreamsMetrics(new Metrics()));
for (final KeyValue<String, String> kvToInsert : toInsert) {
final Bytes key = Bytes.wrap(kvToInsert.key.getBytes());
@ -132,35 +136,35 @@ public class ThreadCacheTest {
static int memoryCacheEntrySize(final byte[] key, final byte[] value, final String topic) {
return key.length +
value.length +
1 + // isDirty
8 + // timestamp
8 + // offset
4 +
topic.length() +
// LRU Node entries
key.length +
8 + // entry
8 + // previous
8; // next
value.length +
1 + // isDirty
8 + // timestamp
8 + // offset
4 +
topic.length() +
// LRU Node entries
key.length +
8 + // entry
8 + // previous
8; // next
}
@Test
public void evict() {
final List<KeyValue<String, String>> received = new ArrayList<>();
final List<KeyValue<String, String>> expected = Collections.singletonList(
new KeyValue<>("K1", "V1"));
new KeyValue<>("K1", "V1"));
final List<KeyValue<String, String>> toInsert = Arrays.asList(
new KeyValue<>("K1", "V1"),
new KeyValue<>("K2", "V2"),
new KeyValue<>("K3", "V3"),
new KeyValue<>("K4", "V4"),
new KeyValue<>("K5", "V5"));
new KeyValue<>("K1", "V1"),
new KeyValue<>("K2", "V2"),
new KeyValue<>("K3", "V3"),
new KeyValue<>("K4", "V4"),
new KeyValue<>("K5", "V5"));
final KeyValue<String, String> kv = toInsert.get(0);
final ThreadCache cache = new ThreadCache(logContext,
memoryCacheEntrySize(kv.key.getBytes(), kv.value.getBytes(), ""),
new MockStreamsMetrics(new Metrics()));
memoryCacheEntrySize(kv.key.getBytes(), kv.value.getBytes(), ""),
new MockStreamsMetrics(new Metrics()));
cache.addDirtyEntryFlushListener(namespace, dirty -> {
for (final ThreadCache.DirtyEntry dirtyEntry : dirty) {
received.add(new KeyValue<>(dirtyEntry.key().toString(), new String(dirtyEntry.newValue())));
@ -233,46 +237,88 @@ public class ThreadCacheTest {
assertArrayEquals(name1Byte.get(), cache.get(namespace2, nameByte).value());
}
private ThreadCache setupThreadCache(final int first, final int last, final long entrySize, final boolean reverse) {
final ThreadCache cache = new ThreadCache(logContext, entrySize, new MockStreamsMetrics(new Metrics()));
cache.addDirtyEntryFlushListener(namespace, dirty -> { });
int index = first;
while ((!reverse && index < last) || (reverse && index >= last)) {
cache.put(namespace, Bytes.wrap(bytes[index]), dirtyEntry(bytes[index]));
if (!reverse)
index++;
else
index--;
}
return cache;
}
@Test
public void shouldPeekNextKey() {
final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new Metrics()));
final ThreadCache cache = setupThreadCache(0, 1, 10000L, false);
final Bytes theByte = Bytes.wrap(new byte[]{0});
cache.put(namespace, theByte, dirtyEntry(theByte.get()));
final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, theByte, Bytes.wrap(new byte[]{1}));
assertEquals(theByte, iterator.peekNextKey());
assertEquals(theByte, iterator.peekNextKey());
}
@Test
public void shouldPeekNextKeyReverseRange() {
final ThreadCache cache = setupThreadCache(1, 1, 10000L, true);
final Bytes theByte = Bytes.wrap(new byte[]{1});
final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.reverseRange(namespace, Bytes.wrap(new byte[]{0}), theByte);
assertThat(iterator.peekNextKey(), is(theByte));
assertThat(iterator.peekNextKey(), is(theByte));
}
@Test
public void shouldGetSameKeyAsPeekNext() {
final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new Metrics()));
final ThreadCache cache = setupThreadCache(0, 1, 10000L, false);
final Bytes theByte = Bytes.wrap(new byte[]{0});
cache.put(namespace, theByte, dirtyEntry(theByte.get()));
final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, theByte, Bytes.wrap(new byte[]{1}));
assertEquals(iterator.peekNextKey(), iterator.next().key);
assertThat(iterator.peekNextKey(), is(iterator.next().key));
}
@Test
public void shouldThrowIfNoPeekNextKey() {
final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new Metrics()));
final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{1}));
public void shouldGetSameKeyAsPeekNextReverseRange() {
final ThreadCache cache = setupThreadCache(1, 1, 10000L, true);
final Bytes theByte = Bytes.wrap(new byte[]{1});
final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.reverseRange(namespace, Bytes.wrap(new byte[]{0}), theByte);
assertThat(iterator.peekNextKey(), is(iterator.next().key));
}
private void shouldThrowIfNoPeekNextKey(final Supplier<ThreadCache.MemoryLRUCacheBytesIterator> methodUnderTest) {
final ThreadCache.MemoryLRUCacheBytesIterator iterator = methodUnderTest.get();
assertThrows(NoSuchElementException.class, iterator::peekNextKey);
}
@Test
public void shouldThrowIfNoPeekNextKeyRange() {
final ThreadCache cache = setupThreadCache(0, 0, 10000L, false);
shouldThrowIfNoPeekNextKey(() -> cache.range(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{1})));
}
@Test
public void shouldThrowIfNoPeekNextKeyReverseRange() {
final ThreadCache cache = setupThreadCache(-1, 0, 10000L, true);
shouldThrowIfNoPeekNextKey(() -> cache.reverseRange(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{1})));
}
@Test
public void shouldReturnFalseIfNoNextKey() {
final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new Metrics()));
final ThreadCache cache = setupThreadCache(0, 0, 10000L, false);
final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{1}));
assertFalse(iterator.hasNext());
}
@Test
public void shouldReturnFalseIfNoNextKeyReverseRange() {
final ThreadCache cache = setupThreadCache(-1, 0, 10000L, true);
final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.reverseRange(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{1}));
assertFalse(iterator.hasNext());
}
@Test
public void shouldPeekAndIterateOverRange() {
final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new Metrics()));
final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}};
for (final byte[] aByte : bytes) {
cache.put(namespace, Bytes.wrap(aByte), dirtyEntry(aByte));
}
final ThreadCache cache = setupThreadCache(0, 10, 10000L, false);
final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, Bytes.wrap(new byte[]{1}), Bytes.wrap(new byte[]{4}));
int bytesIndex = 1;
while (iterator.hasNext()) {
@ -286,12 +332,8 @@ public class ThreadCacheTest {
}
@Test
public void shouldSkipToEntryWhentoInclusiveIsFalseInRange() {
final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new Metrics()));
final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}};
for (final byte[] aByte : bytes) {
cache.put(namespace, Bytes.wrap(aByte), dirtyEntry(aByte));
}
public void shouldSkipToEntryWhenToInclusiveIsFalseInRange() {
final ThreadCache cache = setupThreadCache(0, 10, 10000L, false);
final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, Bytes.wrap(new byte[]{1}), Bytes.wrap(new byte[]{4}), false);
int bytesIndex = 1;
while (iterator.hasNext()) {
@ -304,26 +346,95 @@ public class ThreadCacheTest {
assertEquals(4, bytesIndex);
}
@Test
public void shouldPeekAndIterateOverReverseRange() {
final ThreadCache cache = setupThreadCache(10, 0, 10000L, true);
final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.reverseRange(namespace, Bytes.wrap(new byte[]{1}), Bytes.wrap(new byte[]{4}));
int bytesIndex = 4;
while (iterator.hasNext()) {
final Bytes peekedKey = iterator.peekNextKey();
final KeyValue<Bytes, LRUCacheEntry> next = iterator.next();
assertArrayEquals(bytes[bytesIndex], peekedKey.get());
assertArrayEquals(bytes[bytesIndex], next.key.get());
bytesIndex--;
}
assertEquals(0, bytesIndex);
}
@Test
public void shouldSkipEntriesWhereValueHasBeenEvictedFromCache() {
final int entrySize = memoryCacheEntrySize(new byte[1], new byte[1], "");
final ThreadCache cache = new ThreadCache(logContext, entrySize * 5, new MockStreamsMetrics(new Metrics()));
cache.addDirtyEntryFlushListener(namespace, dirty -> { });
final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}};
for (int i = 0; i < 5; i++) {
cache.put(namespace, Bytes.wrap(bytes[i]), dirtyEntry(bytes[i]));
}
final ThreadCache cache = setupThreadCache(0, 5, entrySize * 5, false);
assertEquals(5, cache.size());
// should evict byte[] {0}
cache.put(namespace, Bytes.wrap(new byte[]{6}), dirtyEntry(new byte[]{6}));
final ThreadCache.MemoryLRUCacheBytesIterator range = cache.range(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{5}));
assertEquals(Bytes.wrap(new byte[]{1}), range.peekNextKey());
}
@Test
public void shouldSkipEntriesWhereValueHasBeenEvictedFromCacheReverseRange() {
final int entrySize = memoryCacheEntrySize(new byte[1], new byte[1], "");
final ThreadCache cache = setupThreadCache(4, 0, entrySize * 5, true);
assertEquals(5, cache.size());
// should evict byte[] {4}
cache.put(namespace, Bytes.wrap(new byte[]{6}), dirtyEntry(new byte[]{6}));
final ThreadCache.MemoryLRUCacheBytesIterator range = cache.reverseRange(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{5}));
assertEquals(Bytes.wrap(new byte[]{3}), range.peekNextKey());
}
@Test
public void shouldFetchAllEntriesInCache() {
final ThreadCache cache = setupThreadCache(0, 11, 10000L, false);
final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.all(namespace);
int bytesIndex = 0;
while (iterator.hasNext()) {
final Bytes peekedKey = iterator.peekNextKey();
final KeyValue<Bytes, LRUCacheEntry> next = iterator.next();
assertArrayEquals(bytes[bytesIndex], peekedKey.get());
assertArrayEquals(bytes[bytesIndex], next.key.get());
bytesIndex++;
}
assertEquals(11, bytesIndex);
}
@Test
public void shouldFetchAllEntriesInCacheInReverseOrder() {
final ThreadCache cache = setupThreadCache(10, 0, 10000L, true);
final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.reverseAll(namespace);
int bytesIndex = 10;
while (iterator.hasNext()) {
final Bytes peekedKey = iterator.peekNextKey();
final KeyValue<Bytes, LRUCacheEntry> next = iterator.next();
assertArrayEquals(bytes[bytesIndex], peekedKey.get());
assertArrayEquals(bytes[bytesIndex], next.key.get());
bytesIndex--;
}
assertEquals(-1, bytesIndex);
}
@Test
public void shouldReturnAllUnevictedValuesFromCache() {
final int entrySize = memoryCacheEntrySize(new byte[1], new byte[1], "");
final ThreadCache cache = setupThreadCache(0, 5, entrySize * 5, false);
assertEquals(5, cache.size());
// should evict byte[] {0}
cache.put(namespace, Bytes.wrap(new byte[]{6}), dirtyEntry(new byte[]{6}));
final ThreadCache.MemoryLRUCacheBytesIterator range = cache.all(namespace);
assertEquals(Bytes.wrap(new byte[]{1}), range.peekNextKey());
}
@Test
public void shouldReturnAllUnevictedValuesFromCacheInReverseOrder() {
final int entrySize = memoryCacheEntrySize(new byte[1], new byte[1], "");
final ThreadCache cache = setupThreadCache(4, 0, entrySize * 5, true);
assertEquals(5, cache.size());
// should evict byte[] {4}
cache.put(namespace, Bytes.wrap(new byte[]{6}), dirtyEntry(new byte[]{6}));
final ThreadCache.MemoryLRUCacheBytesIterator range = cache.reverseAll(namespace);
assertEquals(Bytes.wrap(new byte[]{6}), range.peekNextKey());
}
@Test
public void shouldFlushDirtyEntriesForNamespace() {
final ThreadCache cache = new ThreadCache(logContext, 100000, new MockStreamsMetrics(new Metrics()));
@ -394,7 +505,7 @@ public class ThreadCacheTest {
cache.addDirtyEntryFlushListener(namespace, received::addAll);
cache.putAll(namespace, Arrays.asList(KeyValue.pair(Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[]{5})),
KeyValue.pair(Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[]{6}))));
KeyValue.pair(Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[]{6}))));
assertEquals(cache.evicts(), 2);
assertEquals(received.size(), 2);
@ -405,7 +516,7 @@ public class ThreadCacheTest {
final ThreadCache cache = new ThreadCache(logContext, 100000, new MockStreamsMetrics(new Metrics()));
cache.putAll(namespace, Arrays.asList(KeyValue.pair(Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[]{5})),
KeyValue.pair(Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[]{6}))));
KeyValue.pair(Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[]{6}))));
assertArrayEquals(new byte[]{5}, cache.get(namespace, Bytes.wrap(new byte[]{0})).value());
assertArrayEquals(new byte[]{6}, cache.get(namespace, Bytes.wrap(new byte[]{1})).value());