KAFKA-16991: Flaky PurgeRepartitionTopicIntegrationTest (#16503)

When the PurgeRepartitionTopicIntegrationTest was written, the InitialTaskDelayMs was hard-coded on the broker, which required setting a timeout in the test to wait for that delay to expire. But I believe this creates a race condition where the test times out before the broker deletes the inactive segment. PR #15719 introduced an internal config to control the InitialTaskDelayMs config for speeding up tests, and this PR leverages that internal config to reduce the task delay to 0 and eliminate the race condition.
Bill Bejeck 2024-07-03 10:02:28 -04:00 committed by GitHub
parent 35baa0ac4f
commit 20e101c2e4
1 changed file with 3 additions and 2 deletions
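For context before the diff, here is a minimal sketch of what the fix amounts to on the broker-configuration side. It mirrors the EmbeddedKafkaCluster setup shown in the diff; the wrapper class name and NUM_BROKERS value are placeholders for the sketch, not part of the actual change.

import java.util.Properties;

import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;

// Sketch only: with log.initial.task.delay.ms set to 0, the broker's log manager
// schedules its retention/cleanup tasks immediately instead of after a hard-coded
// startup delay, so the test no longer has to out-wait that delay.
public class InitialTaskDelaySketch {
    private static final int NUM_BROKERS = 1; // placeholder for the sketch

    static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, new Properties() {
        {
            put("log.retention.check.interval.ms", 10); // look for purgeable segments frequently
            put("log.initial.task.delay.ms", 0);        // internal config from PR #15719: no initial task delay
        }
    });
}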


@@ -68,10 +68,12 @@ public class PurgeRepartitionTopicIntegrationTest {
     private static KafkaStreams kafkaStreams;
     private static final Integer PURGE_INTERVAL_MS = 10;
     private static final Integer PURGE_SEGMENT_BYTES = 2000;
+    private static final Integer INITIAL_TASK_DELAY_MS = 0;
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, new Properties() {
         {
             put("log.retention.check.interval.ms", PURGE_INTERVAL_MS);
+            put("log.initial.task.delay.ms", INITIAL_TASK_DELAY_MS);
             put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, 0);
         }
     });
@@ -215,8 +217,7 @@ public class PurgeRepartitionTopicIntegrationTest {
             60000,
             "Repartition topic " + REPARTITION_TOPIC + " not received more than " + PURGE_SEGMENT_BYTES + "B of data after 60000 ms."
         );
-        // we need long enough timeout to by-pass the log manager's InitialTaskDelayMs, which is hard-coded on server side
-        final long waitForPurgeMs = 60000;
         TestUtils.waitForCondition(
             new RepartitionTopicVerified(currentSize -> currentSize <= PURGE_SEGMENT_BYTES),
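The second hunk relies on TestUtils.waitForCondition, which re-evaluates a condition until it holds or the timeout elapses and then fails with the supplied message. Below is a minimal, self-contained sketch of that polling pattern under stated assumptions: currentRepartitionTopicSizeBytes() is a hypothetical stand-in for the test's RepartitionTopicVerified check, and the 2000-byte threshold mirrors PURGE_SEGMENT_BYTES.

import org.apache.kafka.test.TestUtils;

public class PurgeWaitSketch {
    // Hypothetical helper for the sketch: current on-disk size of the repartition topic.
    private static long currentRepartitionTopicSizeBytes() {
        return 0L;
    }

    public static void main(final String[] args) throws InterruptedException {
        final int purgeSegmentBytes = 2000; // mirrors PURGE_SEGMENT_BYTES in the test
        // Polls until the condition returns true or 60s elapse, then fails with the message.
        // With log.initial.task.delay.ms = 0 the broker can purge immediately, so no extra
        // wait for a hard-coded server-side delay is needed.
        TestUtils.waitForCondition(
                () -> currentRepartitionTopicSizeBytes() <= purgeSegmentBytes,
                60_000L,
                "Repartition topic not purged below " + purgeSegmentBytes + " bytes");
    }
}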