Remove busy waiting in testTopicExpiryInMetadata

This commit is contained in:
ChangYu Huang 2025-08-19 16:24:09 -04:00
parent 2b68e44a35
commit f91dc5a239
1 changed files with 17 additions and 11 deletions

View File

@ -1047,8 +1047,9 @@ public class KafkaProducerTest {
@Test
public void testTopicExpiryInMetadata() throws InterruptedException {
Map<String, Object> configs = new HashMap<>();
final String maxBlockMs = "300000";
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "30000");
configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);
long refreshBackoffMs = 500L;
long refreshBackoffMaxMs = 5000L;
long metadataExpireMs = 60000L;
@ -1057,28 +1058,33 @@ public class KafkaProducerTest {
final ProducerMetadata metadata = new ProducerMetadata(refreshBackoffMs, refreshBackoffMaxMs, metadataExpireMs, metadataIdleMs,
new LogContext(), new ClusterResourceListeners(), time);
final String topic = "topic";
final String warmupTopic = "warmup-topic";
MockClient client = new MockClient(time, metadata);
client.setShouldUpdateWithCurrentMetadata(false);
client.advanceTimeDuringPoll(true);
try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
new StringSerializer(), metadata, new MockClient(time, metadata), null, time)) {
new StringSerializer(), metadata, client, null, time)) {
Exchanger<Void> exchanger = new Exchanger<>();
Thread t = new Thread(() -> {
try {
MetadataResponse updateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap(warmupTopic, 1));
client.updateMetadata(updateResponse);
exchanger.exchange(null); // 1
while (!metadata.updateRequested())
Thread.sleep(100);
MetadataResponse updateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap(topic, 1));
metadata.updateWithCurrentRequestVersion(updateResponse, false, time.milliseconds());
client.awaitMetadataUpdateRequest(Long.parseLong(maxBlockMs));
updateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap(topic, 1));
client.updateMetadata(updateResponse);
exchanger.exchange(null); // 2
time.sleep(120 * 1000L);
// Update the metadata again, but it should be expired at this point.
updateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap(topic, 1));
metadata.updateWithCurrentRequestVersion(updateResponse, false, time.milliseconds());
updateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap(warmupTopic, 1));
client.updateMetadata(updateResponse);
exchanger.exchange(null); // 3
while (!metadata.updateRequested())
Thread.sleep(100);
time.sleep(30 * 1000L);
assertTrue(client.awaitMetadataUpdateRequest(Long.parseLong(maxBlockMs)));
System.out.println("Received metadata update request");
time.sleep(Long.parseLong(maxBlockMs));
} catch (Exception e) {
throw new RuntimeException(e);
}