KAFKA-17515 Fix flaky RestoreIntegrationTest.shouldInvokeUserDefinedGlobalStateRestoreListener (#17187)

Found two issues in the flaky tests: (Put the log analysis under Jira comments.)

1) The error "java.nio.file.DirectoryNotEmptyException" occurs if the flush() of kafkaStreams.close() and purgeLocalStreamsState() are triggered in the same time. (The current timeout is 5 sec, which is too short since the CI is unstable and slow).
2) Racing issue: Task to-be restored in ks-1 are rebalanced to ks-2 before entering active restoring state. So no onRestoreSuspend() was triggered.

To solve the issues:
1) Remove the timeout in kafkaStreams.close()
2) Ensure all tasks in ks-1 are active restoring before start second KafkaStreams(ks-2)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
陳昱霖(Yu-Lin Chen) 2024-09-19 14:58:04 +08:00 committed by GitHub
parent 09e3c12057
commit 8f5cf9968f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 18 additions and 1 deletions

View File

@ -102,6 +102,7 @@ import static org.apache.kafka.streams.Topology.AutoOffsetReset.EARLIEST;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForActiveRestoringTask;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForCompletion;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForStandbyCompletion;
import static org.apache.kafka.test.TestUtils.waitForCondition;
@ -579,12 +580,16 @@ public class RestoreIntegrationTest {
validateReceivedMessages(sampleData, outputTopic);
// Close kafkaStreams1 (with cleanup) and start it again to force the restoration of the state.
kafkaStreams.close(Duration.ofMillis(5000L));
kafkaStreams.close();
IntegrationTestUtils.purgeLocalStreamsState(streamsConfigurations);
final TestStateRestoreListener kafkaStreams1StateRestoreListener = new TestStateRestoreListener("ks1", RESTORATION_DELAY);
kafkaStreams = startKafkaStreams(builder, kafkaStreams1StateRestoreListener, kafkaStreams1Configuration);
// Ensure all the restoring tasks are in active state before starting the new instance.
// Otherwise, the tasks which assigned to first kafka streams won't encounter "restoring suspend" after being reassigned to the second instance.
waitForActiveRestoringTask(kafkaStreams, 5, IntegrationTestUtils.DEFAULT_TIMEOUT);
assertTrue(kafkaStreams1StateRestoreListener.awaitUntilRestorationStarts());
assertTrue(kafkaStreams1StateRestoreListener.awaitUntilBatchRestoredIsCalled());

View File

@ -634,6 +634,18 @@ public class IntegrationTestUtils {
}
}
/**
* Wait until enough restoring tasks have been started
*/
public static void waitForActiveRestoringTask(final KafkaStreams streams,
final int expectedTasks,
final long timeoutMilliseconds) throws Exception {
TestUtils.waitForCondition(() -> streams.metrics().entrySet().stream()
.filter(metric -> metric.getKey().name().equals("active-restoring-tasks"))
.anyMatch(metric -> ((Number) metric.getValue().metricValue()).intValue() == expectedTasks),
timeoutMilliseconds, "Timed out waiting for active restoring task");
}
/**
* Wait until enough data (consumer records) has been consumed.
*