KAFKA-13714: Fix cache flush position (#11926)

The caching store layers were passing down writes into lower store layers upon eviction, but not setting the context to the evicted records' context. Instead, the context was from whatever unrelated record was being processed at the time.

Reviewers: Matthias J. Sax <mjsax@apache.org>
This commit is contained in:
John Roesler 2022-03-23 22:09:05 -05:00 committed by GitHub
parent 6ce69021fd
commit 322a065b90
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 252 additions and 59 deletions

View File

@ -37,13 +37,16 @@ Follow instructions in https://kafka.apache.org/quickstart
./gradlew integrationTest ./gradlew integrationTest
### Force re-running tests without code change ### ### Force re-running tests without code change ###
./gradlew cleanTest test ./gradlew -Prerun-tests test
./gradlew cleanTest unitTest ./gradlew -Prerun-tests unitTest
./gradlew cleanTest integrationTest ./gradlew -Prerun-tests integrationTest
### Running a particular unit/integration test ### ### Running a particular unit/integration test ###
./gradlew clients:test --tests RequestResponseTest ./gradlew clients:test --tests RequestResponseTest
### Repeatedly running a particular unit/integration test ###
I=0; while ./gradlew clients:test -Prerun-tests --tests RequestResponseTest --fail-fast; do (( I=$I+1 )); echo "Completed run: $I"; sleep 1; done
### Running a particular test method within a unit/integration test ### ### Running a particular test method within a unit/integration test ###
./gradlew core:test --tests kafka.api.ProducerFailureHandlingTest.testCannotSendToInternalTopic ./gradlew core:test --tests kafka.api.ProducerFailureHandlingTest.testCannotSendToInternalTopic
./gradlew clients:test --tests org.apache.kafka.clients.MetadataTest.testTimeToNextUpdate ./gradlew clients:test --tests org.apache.kafka.clients.MetadataTest.testTimeToNextUpdate

View File

@ -207,7 +207,7 @@ if (file('.git').exists()) {
} else { } else {
rat.enabled = false rat.enabled = false
} }
println("Starting build with version $version (commit id ${commitId.take(8)}) using Gradle $gradleVersion, Java ${JavaVersion.current()} and Scala ${versions.scala}") println("Starting build with version $version (commit id ${commitId == null ? "null" : commitId.take(8)}) using Gradle $gradleVersion, Java ${JavaVersion.current()} and Scala ${versions.scala}")
println("Build properties: maxParallelForks=$maxTestForks, maxScalacThreads=$maxScalacThreads, maxTestRetries=$userMaxTestRetries") println("Build properties: maxParallelForks=$maxTestForks, maxScalacThreads=$maxScalacThreads, maxTestRetries=$userMaxTestRetries")
subprojects { subprojects {
@ -435,6 +435,11 @@ subprojects {
maxRetries = userMaxTestRetries maxRetries = userMaxTestRetries
maxFailures = userMaxTestRetryFailures maxFailures = userMaxTestRetryFailures
} }
// Allows devs to run tests in a loop to debug flaky tests. See README.
if (project.hasProperty("rerun-tests")) {
outputs.upToDateWhen { false }
}
} }
task integrationTest(type: Test, dependsOn: compileJava) { task integrationTest(type: Test, dependsOn: compileJava) {
@ -468,6 +473,11 @@ subprojects {
maxRetries = userMaxTestRetries maxRetries = userMaxTestRetries
maxFailures = userMaxTestRetryFailures maxFailures = userMaxTestRetryFailures
} }
// Allows devs to run tests in a loop to debug flaky tests. See README.
if (project.hasProperty("rerun-tests")) {
outputs.upToDateWhen { false }
}
} }
task unitTest(type: Test, dependsOn: compileJava) { task unitTest(type: Test, dependsOn: compileJava) {

View File

@ -226,10 +226,9 @@ public class CachingKeyValueStore
if (rawNewValue != null || rawOldValue != null) { if (rawNewValue != null || rawOldValue != null) {
// we need to get the old values if needed, and then put to store, and then flush // we need to get the old values if needed, and then put to store, and then flush
final ProcessorRecordContext current = context.recordContext(); final ProcessorRecordContext current = context.recordContext();
context.setRecordContext(entry.entry().context());
wrapped().put(entry.key(), entry.newValue());
try { try {
context.setRecordContext(entry.entry().context());
wrapped().put(entry.key(), entry.newValue());
flushListener.apply( flushListener.apply(
new Record<>( new Record<>(
entry.key().get(), entry.key().get(),
@ -241,7 +240,13 @@ public class CachingKeyValueStore
} }
} }
} else { } else {
wrapped().put(entry.key(), entry.newValue()); final ProcessorRecordContext current = context.recordContext();
try {
context.setRecordContext(entry.entry().context());
wrapped().put(entry.key(), entry.newValue());
} finally {
context.setRecordContext(current);
}
} }
} }

View File

@ -105,11 +105,11 @@ class CachingSessionStore
// we can skip flushing to downstream as well as writing to underlying store // we can skip flushing to downstream as well as writing to underlying store
if (newValueBytes != null || oldValueBytes != null) { if (newValueBytes != null || oldValueBytes != null) {
// we need to get the old values if needed, and then put to store, and then flush // we need to get the old values if needed, and then put to store, and then flush
wrapped().put(bytesKey, entry.newValue());
final ProcessorRecordContext current = context.recordContext(); final ProcessorRecordContext current = context.recordContext();
context.setRecordContext(entry.entry().context());
try { try {
context.setRecordContext(entry.entry().context());
wrapped().put(bytesKey, entry.newValue());
flushListener.apply( flushListener.apply(
new Record<>( new Record<>(
binaryKey.get(), binaryKey.get(),
@ -121,7 +121,13 @@ class CachingSessionStore
} }
} }
} else { } else {
wrapped().put(bytesKey, entry.newValue()); final ProcessorRecordContext current = context.recordContext();
try {
context.setRecordContext(entry.entry().context());
wrapped().put(bytesKey, entry.newValue());
} finally {
context.setRecordContext(current);
}
} }
} }

View File

@ -122,11 +122,11 @@ class CachingWindowStore
// we can skip flushing to downstream as well as writing to underlying store // we can skip flushing to downstream as well as writing to underlying store
if (rawNewValue != null || rawOldValue != null) { if (rawNewValue != null || rawOldValue != null) {
// we need to get the old values if needed, and then put to store, and then flush // we need to get the old values if needed, and then put to store, and then flush
wrapped().put(binaryKey, entry.newValue(), windowStartTimestamp);
final ProcessorRecordContext current = context.recordContext(); final ProcessorRecordContext current = context.recordContext();
context.setRecordContext(entry.entry().context());
try { try {
context.setRecordContext(entry.entry().context());
wrapped().put(binaryKey, entry.newValue(), windowStartTimestamp);
flushListener.apply( flushListener.apply(
new Record<>( new Record<>(
binaryWindowKey, binaryWindowKey,
@ -138,7 +138,13 @@ class CachingWindowStore
} }
} }
} else { } else {
wrapped().put(binaryKey, entry.newValue(), windowStartTimestamp); final ProcessorRecordContext current = context.recordContext();
try {
context.setRecordContext(entry.entry().context());
wrapped().put(binaryKey, entry.newValue(), windowStartTimestamp);
} finally {
context.setRecordContext(current);
}
} }
} }

View File

@ -326,8 +326,8 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
public void putAll(final List<KeyValue<Bytes, byte[]>> entries) { public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
try (final WriteBatch batch = new WriteBatch()) { try (final WriteBatch batch = new WriteBatch()) {
dbAccessor.prepareBatch(entries, batch); dbAccessor.prepareBatch(entries, batch);
StoreQueryUtils.updatePosition(position, context);
write(batch); write(batch);
StoreQueryUtils.updatePosition(position, context);
} catch (final RocksDBException e) { } catch (final RocksDBException e) {
throw new ProcessorStateException("Error while batch writing to store " + name, e); throw new ProcessorStateException("Error while batch writing to store " + name, e);
} }

View File

@ -142,7 +142,9 @@ public final class StoreQueryUtils {
if (stateStoreContext != null && stateStoreContext.recordMetadata().isPresent()) { if (stateStoreContext != null && stateStoreContext.recordMetadata().isPresent()) {
final RecordMetadata meta = stateStoreContext.recordMetadata().get(); final RecordMetadata meta = stateStoreContext.recordMetadata().get();
position.withComponent(meta.topic(), meta.partition(), meta.offset()); if (meta.topic() != null) {
position.withComponent(meta.topic(), meta.partition(), meta.offset());
}
} }
} }

View File

@ -760,45 +760,50 @@ public class IQv2StoreIntegrationTest {
@Test @Test
public void verifyStore() { public void verifyStore() {
if (storeToTest.global()) { try {
// See KAFKA-13523 if (storeToTest.global()) {
globalShouldRejectAllQueries(); // See KAFKA-13523
} else { globalShouldRejectAllQueries();
shouldRejectUnknownQuery(); } else {
shouldCollectExecutionInfo(); shouldRejectUnknownQuery();
shouldCollectExecutionInfoUnderFailure(); shouldCollectExecutionInfo();
shouldCollectExecutionInfoUnderFailure();
if (storeToTest.keyValue()) { if (storeToTest.keyValue()) {
if (storeToTest.timestamped()) { if (storeToTest.timestamped()) {
final Function<ValueAndTimestamp<Integer>, Integer> valueExtractor = final Function<ValueAndTimestamp<Integer>, Integer> valueExtractor =
ValueAndTimestamp::value;
shouldHandleKeyQuery(2, valueExtractor, 2);
shouldHandleRangeQueries(valueExtractor);
} else {
final Function<Integer, Integer> valueExtractor = Function.identity();
shouldHandleKeyQuery(2, valueExtractor, 2);
shouldHandleRangeQueries(valueExtractor);
}
}
if (storeToTest.isWindowed()) {
if (storeToTest.timestamped()) {
final Function<ValueAndTimestamp<Integer>, Integer> valueExtractor =
ValueAndTimestamp::value; ValueAndTimestamp::value;
shouldHandleWindowKeyQueries(valueExtractor); shouldHandleKeyQuery(2, valueExtractor, 2);
shouldHandleWindowRangeQueries(valueExtractor); shouldHandleRangeQueries(valueExtractor);
} else { } else {
final Function<Integer, Integer> valueExtractor = Function.identity(); final Function<Integer, Integer> valueExtractor = Function.identity();
shouldHandleWindowKeyQueries(valueExtractor); shouldHandleKeyQuery(2, valueExtractor, 2);
shouldHandleWindowRangeQueries(valueExtractor); shouldHandleRangeQueries(valueExtractor);
}
}
if (storeToTest.isWindowed()) {
if (storeToTest.timestamped()) {
final Function<ValueAndTimestamp<Integer>, Integer> valueExtractor =
ValueAndTimestamp::value;
shouldHandleWindowKeyQueries(valueExtractor);
shouldHandleWindowRangeQueries(valueExtractor);
} else {
final Function<Integer, Integer> valueExtractor = Function.identity();
shouldHandleWindowKeyQueries(valueExtractor);
shouldHandleWindowRangeQueries(valueExtractor);
}
}
if (storeToTest.isSession()) {
// Note there's no "timestamped" differentiation here.
// Idiosyncratically, SessionStores are _never_ timestamped.
shouldHandleSessionKeyQueries();
} }
} }
} catch (final AssertionError e) {
if (storeToTest.isSession()) { LOG.error("Failed assertion", e);
// Note there's no "timestamped" differentiation here. throw e;
// Idiosyncratically, SessionStores are _never_ timestamped.
shouldHandleSessionKeyQueries();
}
} }
} }
@ -1350,7 +1355,7 @@ public class IQv2StoreIntegrationTest {
final String supplier, final String kind) { final String supplier, final String kind) {
final String safeTestName = final String safeTestName =
IQv2StoreIntegrationTest.class.getName() + "-" + cache + "-" + log + "-" + supplier IQv2StoreIntegrationTest.class.getName() + "-" + cache + "-" + log + "-" + supplier
+ "-" + kind; + "-" + kind + "-" + RANDOM.nextInt();
final Properties config = new Properties(); final Properties config = new Properties();
config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);

View File

@ -221,17 +221,44 @@ public class CachingInMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest
} }
@Test @Test
public void shouldMatchPositionAfterPut() { public void shouldMatchPositionAfterPutWithFlushListener() {
store.setFlushListener(record -> { }, false);
shouldMatchPositionAfterPut();
}
@Test
public void shouldMatchPositionAfterPutWithoutFlushListener() {
store.setFlushListener(null, false);
shouldMatchPositionAfterPut();
}
private void shouldMatchPositionAfterPut() {
context.setRecordContext(new ProcessorRecordContext(0, 1, 0, "", new RecordHeaders())); context.setRecordContext(new ProcessorRecordContext(0, 1, 0, "", new RecordHeaders()));
store.put(bytesKey("key1"), bytesValue("value1")); store.put(bytesKey("key1"), bytesValue("value1"));
context.setRecordContext(new ProcessorRecordContext(0, 2, 0, "", new RecordHeaders())); context.setRecordContext(new ProcessorRecordContext(0, 2, 0, "", new RecordHeaders()));
store.put(bytesKey("key2"), bytesValue("value2")); store.put(bytesKey("key2"), bytesValue("value2"));
context.setRecordContext(new ProcessorRecordContext(0, 3, 0, "", new RecordHeaders()));
store.put(bytesKey("key3"), bytesValue("value3"));
final Position expected = Position.fromMap(mkMap(mkEntry("", mkMap(mkEntry(0, 3L))))); // Position should correspond to the last record's context, not the current context.
final Position actual = store.getPosition(); context.setRecordContext(
assertEquals(expected, actual); new ProcessorRecordContext(0, 3, 0, "", new RecordHeaders())
);
assertEquals(
Position.fromMap(mkMap(mkEntry("", mkMap(mkEntry(0, 2L))))),
store.getPosition()
);
assertEquals(Position.emptyPosition(), underlyingStore.getPosition());
store.flush();
assertEquals(
Position.fromMap(mkMap(mkEntry("", mkMap(mkEntry(0, 2L))))),
store.getPosition()
);
assertEquals(
Position.fromMap(mkMap(mkEntry("", mkMap(mkEntry(0, 2L))))),
underlyingStore.getPosition()
);
} }
private byte[] bytesValue(final String value) { private byte[] bytesValue(final String value) {

View File

@ -36,6 +36,7 @@ import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.test.InternalMockProcessorContext; import org.apache.kafka.test.InternalMockProcessorContext;
@ -53,6 +54,8 @@ import java.util.List;
import java.util.Random; import java.util.Random;
import static java.util.Arrays.asList; import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.test.StreamsTestUtils.toList; import static org.apache.kafka.test.StreamsTestUtils.toList;
import static org.apache.kafka.test.StreamsTestUtils.verifyKeyValueList; import static org.apache.kafka.test.StreamsTestUtils.verifyKeyValueList;
import static org.apache.kafka.test.StreamsTestUtils.verifyWindowedKeyValue; import static org.apache.kafka.test.StreamsTestUtils.verifyWindowedKeyValue;
@ -142,6 +145,46 @@ public class CachingInMemorySessionStoreTest {
} }
} }
@Test
public void shouldMatchPositionAfterPutWithFlushListener() {
cachingStore.setFlushListener(record -> { }, false);
shouldMatchPositionAfterPut();
}
@Test
public void shouldMatchPositionAfterPutWithoutFlushListener() {
cachingStore.setFlushListener(null, false);
shouldMatchPositionAfterPut();
}
private void shouldMatchPositionAfterPut() {
context.setRecordContext(new ProcessorRecordContext(0, 1, 0, "", new RecordHeaders()));
cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes());
context.setRecordContext(new ProcessorRecordContext(0, 2, 0, "", new RecordHeaders()));
cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes());
// Position should correspond to the last record's context, not the current context.
context.setRecordContext(
new ProcessorRecordContext(0, 3, 0, "", new RecordHeaders())
);
// the caching session store doesn't maintain a separate
// position because it never serves queries from the cache
assertEquals(Position.emptyPosition(), cachingStore.getPosition());
assertEquals(Position.emptyPosition(), underlyingStore.getPosition());
cachingStore.flush();
assertEquals(
Position.fromMap(mkMap(mkEntry("", mkMap(mkEntry(0, 2L))))),
cachingStore.getPosition()
);
assertEquals(
Position.fromMap(mkMap(mkEntry("", mkMap(mkEntry(0, 2L))))),
underlyingStore.getPosition()
);
}
@Test @Test
public void shouldPutFetchAllKeysFromCache() { public void shouldPutFetchAllKeysFromCache() {
cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes()); cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes());

View File

@ -35,6 +35,7 @@ import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.test.InternalMockProcessorContext; import org.apache.kafka.test.InternalMockProcessorContext;
@ -52,6 +53,8 @@ import java.util.List;
import java.util.Random; import java.util.Random;
import static java.util.Arrays.asList; import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.test.StreamsTestUtils.toList; import static org.apache.kafka.test.StreamsTestUtils.toList;
import static org.apache.kafka.test.StreamsTestUtils.verifyKeyValueList; import static org.apache.kafka.test.StreamsTestUtils.verifyKeyValueList;
import static org.apache.kafka.test.StreamsTestUtils.verifyWindowedKeyValue; import static org.apache.kafka.test.StreamsTestUtils.verifyWindowedKeyValue;
@ -80,6 +83,7 @@ public class CachingPersistentSessionStoreTest {
private SessionStore<Bytes, byte[]> underlyingStore; private SessionStore<Bytes, byte[]> underlyingStore;
private CachingSessionStore cachingStore; private CachingSessionStore cachingStore;
private ThreadCache cache; private ThreadCache cache;
private InternalMockProcessorContext<Object, Object> context;
@Before @Before
public void before() { public void before() {
@ -93,7 +97,7 @@ public class CachingPersistentSessionStoreTest {
underlyingStore = new RocksDBSessionStore(segmented); underlyingStore = new RocksDBSessionStore(segmented);
cachingStore = new CachingSessionStore(underlyingStore, SEGMENT_INTERVAL); cachingStore = new CachingSessionStore(underlyingStore, SEGMENT_INTERVAL);
cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics())); cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
final InternalMockProcessorContext context = this.context =
new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache); new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache);
context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, TOPIC, new RecordHeaders())); context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, TOPIC, new RecordHeaders()));
cachingStore.init((StateStoreContext) context, cachingStore); cachingStore.init((StateStoreContext) context, cachingStore);
@ -123,6 +127,45 @@ public class CachingPersistentSessionStoreTest {
assertFalse(b.hasNext()); assertFalse(b.hasNext());
} }
} }
@Test
public void shouldMatchPositionAfterPutWithFlushListener() {
cachingStore.setFlushListener(record -> { }, false);
shouldMatchPositionAfterPut();
}
@Test
public void shouldMatchPositionAfterPutWithoutFlushListener() {
cachingStore.setFlushListener(null, false);
shouldMatchPositionAfterPut();
}
private void shouldMatchPositionAfterPut() {
context.setRecordContext(new ProcessorRecordContext(0, 1, 0, "", new RecordHeaders()));
cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes());
context.setRecordContext(new ProcessorRecordContext(0, 2, 0, "", new RecordHeaders()));
cachingStore.put(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes());
// Position should correspond to the last record's context, not the current context.
context.setRecordContext(
new ProcessorRecordContext(0, 3, 0, "", new RecordHeaders())
);
// the caching session store doesn't maintain a separate
// position because it never serves queries from the cache
assertEquals(Position.emptyPosition(), cachingStore.getPosition());
assertEquals(Position.emptyPosition(), underlyingStore.getPosition());
cachingStore.flush();
assertEquals(
Position.fromMap(mkMap(mkEntry("", mkMap(mkEntry(0, 2L))))),
cachingStore.getPosition()
);
assertEquals(
Position.fromMap(mkMap(mkEntry("", mkMap(mkEntry(0, 2L))))),
underlyingStore.getPosition()
);
}
@Test @Test
public void shouldPutFetchAllKeysFromCache() { public void shouldPutFetchAllKeysFromCache() {

View File

@ -39,6 +39,7 @@ import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.Stores;
@ -63,6 +64,8 @@ import static java.time.Duration.ofHours;
import static java.time.Duration.ofMinutes; import static java.time.Duration.ofMinutes;
import static java.time.Instant.ofEpochMilli; import static java.time.Instant.ofEpochMilli;
import static java.util.Arrays.asList; import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.streams.state.internals.ThreadCacheTest.memoryCacheEntrySize; import static org.apache.kafka.streams.state.internals.ThreadCacheTest.memoryCacheEntrySize;
import static org.apache.kafka.test.StreamsTestUtils.toList; import static org.apache.kafka.test.StreamsTestUtils.toList;
import static org.apache.kafka.test.StreamsTestUtils.verifyAllWindowedKeyValues; import static org.apache.kafka.test.StreamsTestUtils.verifyAllWindowedKeyValues;
@ -260,6 +263,46 @@ public class CachingPersistentWindowStoreTest {
} }
} }
@Test
public void shouldMatchPositionAfterPutWithFlushListener() {
cachingStore.setFlushListener(record -> { }, false);
shouldMatchPositionAfterPut();
}
@Test
public void shouldMatchPositionAfterPutWithoutFlushListener() {
cachingStore.setFlushListener(null, false);
shouldMatchPositionAfterPut();
}
private void shouldMatchPositionAfterPut() {
context.setRecordContext(new ProcessorRecordContext(0, 1, 0, "", new RecordHeaders()));
cachingStore.put(bytesKey("key1"), bytesValue("value1"), DEFAULT_TIMESTAMP);
context.setRecordContext(new ProcessorRecordContext(0, 2, 0, "", new RecordHeaders()));
cachingStore.put(bytesKey("key2"), bytesValue("value2"), DEFAULT_TIMESTAMP);
// Position should correspond to the last record's context, not the current context.
context.setRecordContext(
new ProcessorRecordContext(0, 3, 0, "", new RecordHeaders())
);
// the caching window store doesn't maintain a separate
// position because it never serves queries from the cache
assertEquals(Position.emptyPosition(), cachingStore.getPosition());
assertEquals(Position.emptyPosition(), underlyingStore.getPosition());
cachingStore.flush();
assertEquals(
Position.fromMap(mkMap(mkEntry("", mkMap(mkEntry(0, 2L))))),
cachingStore.getPosition()
);
assertEquals(
Position.fromMap(mkMap(mkEntry("", mkMap(mkEntry(0, 2L))))),
underlyingStore.getPosition()
);
}
private void verifyKeyValue(final KeyValue<Long, byte[]> next, private void verifyKeyValue(final KeyValue<Long, byte[]> next,
final long expectedKey, final long expectedKey,
final String expectedValue) { final String expectedValue) {