mirror of https://github.com/apache/kafka.git
KAFKA-17486: Flaky test RestoreIntegrationTest.shouldInvokeUserDefinedGlobalStateRestoreListener (#17104)
This test has a tricky race condition. We want the restoration to go slowly enough that when a second Kafka Streams instance starts, the restoration of a given TopicPartition is paused due to task re-assignment. After that point, however, the test should proceed faster to avoid hitting timeout assertions. To that end, this PR makes the following changes:
- Increase the restore pause to 2 seconds; this slows the restoration enough that it is still in progress once the second instance starts. Once tasks are re-assigned and onRestoreSuspended is called, the restore pause is reset to zero, allowing the test to proceed faster.
- Increase the restore batch size back to its original value of 5; otherwise, the test moved too slowly.
- Decrease the number of test records produced to the original value of 100. With each batch delayed until Kafka Streams calls onRestoreSuspended, and the intentional restoration slowness removed afterwards, 100 records proved sufficient in local testing.
Reviewers: Matthias J. Sax <mjsax@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>, Yu-LinChen <kh87313@gmail.com>
parent 040ae26472
commit 981133d350
@@ -114,7 +114,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class RestoreIntegrationTest {
     private static final Logger log = LoggerFactory.getLogger(RestoreIntegrationTest.class);
 
-    private static final Duration RESTORATION_DELAY = Duration.ofMillis(500);
+    private static final Duration RESTORATION_DELAY = Duration.ofMillis(2000);
 
     private static final int NUM_BROKERS = 1;
 
@@ -558,8 +558,7 @@ public class RestoreIntegrationTest {
         );
         final Map<String, Object> kafkaStreams2Configuration = mkMap(
             mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath() + "-ks2"),
-            mkEntry(StreamsConfig.CLIENT_ID_CONFIG, appId + "-ks2"),
-            mkEntry(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1)
+            mkEntry(StreamsConfig.CLIENT_ID_CONFIG, appId + "-ks2")
         );
 
         final StreamsBuilder builder = new StreamsBuilder();
@@ -569,7 +568,7 @@ public class RestoreIntegrationTest {
             .toStream()
             .to(outputTopic);
 
-        final List<KeyValue<Integer, Integer>> sampleData = IntStream.range(0, 500)
+        final List<KeyValue<Integer, Integer>> sampleData = IntStream.range(0, 100)
             .mapToObj(i -> new KeyValue<>(i, i))
             .collect(Collectors.toList());
 
@@ -592,7 +591,7 @@ public class RestoreIntegrationTest {
         // Simulate a new instance joining in the middle of the restoration.
         // When this happens, some of the partitions that kafkaStreams1 was restoring will be migrated to kafkaStreams2,
         // and kafkaStreams1 must call StateRestoreListener#onRestoreSuspended.
-        final TestStateRestoreListener kafkaStreams2StateRestoreListener = new TestStateRestoreListener("ks2", RESTORATION_DELAY);
+        final TestStateRestoreListener kafkaStreams2StateRestoreListener = new TestStateRestoreListener("ks2", Duration.ZERO);
 
         try (final KafkaStreams kafkaStreams2 = startKafkaStreams(builder,
                                                                   kafkaStreams2StateRestoreListener,
@@ -647,7 +646,7 @@ public class RestoreIntegrationTest {
 
     private static final class TestStateRestoreListener implements StateRestoreListener {
         private final String instanceName;
-        private final Duration onBatchRestoredSleepDuration;
+        private Duration onBatchRestoredSleepDuration;
 
         private final CountDownLatch onRestoreStartLatch = new CountDownLatch(1);
         private final CountDownLatch onRestoreEndLatch = new CountDownLatch(1);
@@ -711,6 +710,7 @@ public class RestoreIntegrationTest {
                                        final long totalRestored) {
             log.info("[{}] called onRestoreSuspended. topicPartition={}, storeName={}, totalRestored={}",
                      instanceName, topicPartition, storeName, totalRestored);
+            onBatchRestoredSleepDuration = Duration.ZERO;
             onRestoreSuspendedLatch.countDown();
         }
 
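For readers skimming the diff: the core of the fix is the throttling pattern inside TestStateRestoreListener, whose per-batch sleep is dropped to zero the moment Kafka Streams suspends the restoration because of task re-assignment. Below is a minimal, self-contained sketch of that pattern, not the test's actual listener: the class name ThrottlingRestoreListener is made up for illustration, the instance name, latches, and assertions of the real TestStateRestoreListener are omitted, and the volatile modifier is a conservative choice in case the callbacks fire on different threads.

import java.time.Duration;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

final class ThrottlingRestoreListener implements StateRestoreListener {

    // Mutable on purpose: onRestoreSuspended() resets it so the rest of the test runs quickly.
    private volatile Duration onBatchRestoredSleepDuration;

    ThrottlingRestoreListener(final Duration initialSleep) {
        this.onBatchRestoredSleepDuration = initialSleep;
    }

    @Override
    public void onRestoreStart(final TopicPartition topicPartition,
                               final String storeName,
                               final long startingOffset,
                               final long endingOffset) {
        // No-op in this sketch; the real test counts down a latch here.
    }

    @Override
    public void onBatchRestored(final TopicPartition topicPartition,
                                final String storeName,
                                final long batchEndOffset,
                                final long numRestored) {
        // Slow restoration down so a second instance can join while it is still in progress.
        try {
            Thread.sleep(onBatchRestoredSleepDuration.toMillis());
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void onRestoreSuspended(final TopicPartition topicPartition,
                                   final String storeName,
                                   final long totalRestored) {
        // Tasks were re-assigned; stop throttling so the remaining restoration finishes fast.
        onBatchRestoredSleepDuration = Duration.ZERO;
    }

    @Override
    public void onRestoreEnd(final TopicPartition topicPartition,
                             final String storeName,
                             final long totalRestored) {
        // No-op in this sketch; the real test counts down a latch here.
    }
}

Wired into the test as described above, the first instance's listener would presumably be constructed with RESTORATION_DELAY (2 seconds per the diff) and the second instance's with Duration.ZERO, so only the instance whose restoration has to be interrupted is throttled.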