mirror of https://github.com/apache/kafka.git
MINOR: Clean up ThreadCacheTest (#6485)
Minor clean up ofThreadCacheTest Reviewers: Guozhang Wang <wangguoz@gmail.com>, Matthias J. Sax <mjsax@apache.org>
This commit is contained in:
parent
1deb072f56
commit
1acae2a67c
|
|
@ -146,7 +146,7 @@ public class ThreadCacheTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void evict() throws IOException {
|
||||
public void evict() {
|
||||
final List<KeyValue<String, String>> received = new ArrayList<>();
|
||||
final List<KeyValue<String, String>> expected = Collections.singletonList(
|
||||
new KeyValue<>("K1", "V1"));
|
||||
|
|
@ -161,14 +161,10 @@ public class ThreadCacheTest {
|
|||
final ThreadCache cache = new ThreadCache(logContext,
|
||||
memoryCacheEntrySize(kv.key.getBytes(), kv.value.getBytes(), ""),
|
||||
new MockStreamsMetrics(new Metrics()));
|
||||
cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
|
||||
@Override
|
||||
public void apply(final List<ThreadCache.DirtyEntry> dirty) {
|
||||
for (final ThreadCache.DirtyEntry dirtyEntry : dirty) {
|
||||
received.add(new KeyValue<>(dirtyEntry.key().toString(), new String(dirtyEntry.newValue())));
|
||||
}
|
||||
cache.addDirtyEntryFlushListener(namespace, dirty -> {
|
||||
for (final ThreadCache.DirtyEntry dirtyEntry : dirty) {
|
||||
received.add(new KeyValue<>(dirtyEntry.key().toString(), new String(dirtyEntry.newValue())));
|
||||
}
|
||||
|
||||
});
|
||||
|
||||
for (final KeyValue<String, String> kvToInsert : toInsert) {
|
||||
|
|
@ -200,12 +196,7 @@ public class ThreadCacheTest {
|
|||
final Bytes key = Bytes.wrap(new byte[]{0});
|
||||
final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new Metrics()));
|
||||
final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
|
||||
cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
|
||||
@Override
|
||||
public void apply(final List<ThreadCache.DirtyEntry> dirty) {
|
||||
received.addAll(dirty);
|
||||
}
|
||||
});
|
||||
cache.addDirtyEntryFlushListener(namespace, received::addAll);
|
||||
cache.put(namespace, key, dirtyEntry(key.get()));
|
||||
assertEquals(key.get(), cache.delete(namespace, key).value());
|
||||
|
||||
|
|
@ -298,12 +289,8 @@ public class ThreadCacheTest {
|
|||
public void shouldSkipEntriesWhereValueHasBeenEvictedFromCache() {
|
||||
final int entrySize = memoryCacheEntrySize(new byte[1], new byte[1], "");
|
||||
final ThreadCache cache = new ThreadCache(logContext, entrySize * 5, new MockStreamsMetrics(new Metrics()));
|
||||
cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
|
||||
@Override
|
||||
public void apply(final List<ThreadCache.DirtyEntry> dirty) {
|
||||
cache.addDirtyEntryFlushListener(namespace, dirty -> { });
|
||||
|
||||
}
|
||||
});
|
||||
final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}};
|
||||
for (int i = 0; i < 5; i++) {
|
||||
cache.put(namespace, Bytes.wrap(bytes[i]), dirtyEntry(bytes[i]));
|
||||
|
|
@ -322,12 +309,9 @@ public class ThreadCacheTest {
|
|||
public void shouldFlushDirtyEntriesForNamespace() {
|
||||
final ThreadCache cache = new ThreadCache(logContext, 100000, new MockStreamsMetrics(new Metrics()));
|
||||
final List<byte[]> received = new ArrayList<>();
|
||||
cache.addDirtyEntryFlushListener(namespace1, new ThreadCache.DirtyEntryFlushListener() {
|
||||
@Override
|
||||
public void apply(final List<ThreadCache.DirtyEntry> dirty) {
|
||||
for (final ThreadCache.DirtyEntry dirtyEntry : dirty) {
|
||||
received.add(dirtyEntry.key().get());
|
||||
}
|
||||
cache.addDirtyEntryFlushListener(namespace1, dirty -> {
|
||||
for (final ThreadCache.DirtyEntry dirtyEntry : dirty) {
|
||||
received.add(dirtyEntry.key().get());
|
||||
}
|
||||
});
|
||||
final List<byte[]> expected = Arrays.asList(new byte[]{0}, new byte[]{1}, new byte[]{2});
|
||||
|
|
@ -344,12 +328,9 @@ public class ThreadCacheTest {
|
|||
public void shouldNotFlushCleanEntriesForNamespace() {
|
||||
final ThreadCache cache = new ThreadCache(logContext, 100000, new MockStreamsMetrics(new Metrics()));
|
||||
final List<byte[]> received = new ArrayList<>();
|
||||
cache.addDirtyEntryFlushListener(namespace1, new ThreadCache.DirtyEntryFlushListener() {
|
||||
@Override
|
||||
public void apply(final List<ThreadCache.DirtyEntry> dirty) {
|
||||
for (final ThreadCache.DirtyEntry dirtyEntry : dirty) {
|
||||
received.add(dirtyEntry.key().get());
|
||||
}
|
||||
cache.addDirtyEntryFlushListener(namespace1, dirty -> {
|
||||
for (final ThreadCache.DirtyEntry dirtyEntry : dirty) {
|
||||
received.add(dirtyEntry.key().get());
|
||||
}
|
||||
});
|
||||
final List<byte[]> toInsert = Arrays.asList(new byte[]{0}, new byte[]{1}, new byte[]{2});
|
||||
|
|
@ -366,12 +347,7 @@ public class ThreadCacheTest {
|
|||
private void shouldEvictImmediatelyIfCacheSizeIsZeroOrVerySmall(final ThreadCache cache) {
|
||||
final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
|
||||
|
||||
cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
|
||||
@Override
|
||||
public void apply(final List<ThreadCache.DirtyEntry> dirty) {
|
||||
received.addAll(dirty);
|
||||
}
|
||||
});
|
||||
cache.addDirtyEntryFlushListener(namespace, received::addAll);
|
||||
cache.put(namespace, Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[]{0}));
|
||||
assertEquals(1, received.size());
|
||||
|
||||
|
|
@ -396,12 +372,7 @@ public class ThreadCacheTest {
|
|||
public void shouldEvictAfterPutAll() {
|
||||
final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
|
||||
final ThreadCache cache = new ThreadCache(logContext, 1, new MockStreamsMetrics(new Metrics()));
|
||||
cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
|
||||
@Override
|
||||
public void apply(final List<ThreadCache.DirtyEntry> dirty) {
|
||||
received.addAll(dirty);
|
||||
}
|
||||
});
|
||||
cache.addDirtyEntryFlushListener(namespace, received::addAll);
|
||||
|
||||
cache.putAll(namespace, Arrays.asList(KeyValue.pair(Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[]{5})),
|
||||
KeyValue.pair(Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[]{6}))));
|
||||
|
|
@ -425,12 +396,7 @@ public class ThreadCacheTest {
|
|||
public void shouldNotForwardCleanEntryOnEviction() {
|
||||
final ThreadCache cache = new ThreadCache(logContext, 0, new MockStreamsMetrics(new Metrics()));
|
||||
final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
|
||||
cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
|
||||
@Override
|
||||
public void apply(final List<ThreadCache.DirtyEntry> dirty) {
|
||||
received.addAll(dirty);
|
||||
}
|
||||
});
|
||||
cache.addDirtyEntryFlushListener(namespace, received::addAll);
|
||||
cache.put(namespace, Bytes.wrap(new byte[]{1}), cleanEntry(new byte[]{0}));
|
||||
assertEquals(0, received.size());
|
||||
}
|
||||
|
|
@ -448,12 +414,7 @@ public class ThreadCacheTest {
|
|||
public void shouldEvictAfterPutIfAbsent() {
|
||||
final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
|
||||
final ThreadCache cache = new ThreadCache(logContext, 1, new MockStreamsMetrics(new Metrics()));
|
||||
cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
|
||||
@Override
|
||||
public void apply(final List<ThreadCache.DirtyEntry> dirty) {
|
||||
received.addAll(dirty);
|
||||
}
|
||||
});
|
||||
cache.addDirtyEntryFlushListener(namespace, received::addAll);
|
||||
|
||||
cache.putIfAbsent(namespace, Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[]{5}));
|
||||
cache.putIfAbsent(namespace, Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[]{6}));
|
||||
|
|
@ -468,26 +429,13 @@ public class ThreadCacheTest {
|
|||
final int maxCacheSizeInBytes = 100;
|
||||
final ThreadCache threadCache = new ThreadCache(logContext, maxCacheSizeInBytes, new MockStreamsMetrics(new Metrics()));
|
||||
// trigger a put into another cache on eviction from "name"
|
||||
threadCache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
|
||||
@Override
|
||||
public void apply(final List<ThreadCache.DirtyEntry> dirty) {
|
||||
// put an item into an empty cache when the total cache size
|
||||
// is already > than maxCacheSizeBytes
|
||||
threadCache.put(namespace1, Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[2]));
|
||||
}
|
||||
});
|
||||
threadCache.addDirtyEntryFlushListener(namespace1, new ThreadCache.DirtyEntryFlushListener() {
|
||||
@Override
|
||||
public void apply(final List<ThreadCache.DirtyEntry> dirty) {
|
||||
//
|
||||
}
|
||||
});
|
||||
threadCache.addDirtyEntryFlushListener(namespace2, new ThreadCache.DirtyEntryFlushListener() {
|
||||
@Override
|
||||
public void apply(final List<ThreadCache.DirtyEntry> dirty) {
|
||||
|
||||
}
|
||||
threadCache.addDirtyEntryFlushListener(namespace, dirty -> {
|
||||
// put an item into an empty cache when the total cache size
|
||||
// is already > than maxCacheSizeBytes
|
||||
threadCache.put(namespace1, Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[2]));
|
||||
});
|
||||
threadCache.addDirtyEntryFlushListener(namespace1, dirty -> { });
|
||||
threadCache.addDirtyEntryFlushListener(namespace2, dirty -> { });
|
||||
|
||||
threadCache.put(namespace2, Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[1]));
|
||||
threadCache.put(namespace, Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[1]));
|
||||
|
|
|
|||
Loading…
Reference in New Issue