From f91dc5a2393b54f0b82a247168bcc9172a61759b Mon Sep 17 00:00:00 2001 From: ChangYu Huang Date: Tue, 19 Aug 2025 16:24:09 -0400 Subject: [PATCH] Remove busy waiting in testTopicExpiryInMetadata --- .../clients/producer/KafkaProducerTest.java | 28 +++++++++++-------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index 5d700bba711..02d52fdb3a4 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -1047,8 +1047,9 @@ public class KafkaProducerTest { @Test public void testTopicExpiryInMetadata() throws InterruptedException { Map configs = new HashMap<>(); + final String maxBlockMs = "300000"; configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); - configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "30000"); + configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs); long refreshBackoffMs = 500L; long refreshBackoffMaxMs = 5000L; long metadataExpireMs = 60000L; @@ -1057,28 +1058,33 @@ public class KafkaProducerTest { final ProducerMetadata metadata = new ProducerMetadata(refreshBackoffMs, refreshBackoffMaxMs, metadataExpireMs, metadataIdleMs, new LogContext(), new ClusterResourceListeners(), time); final String topic = "topic"; + final String warmupTopic = "warmup-topic"; + MockClient client = new MockClient(time, metadata); + client.setShouldUpdateWithCurrentMetadata(false); + client.advanceTimeDuringPoll(true); try (KafkaProducer producer = kafkaProducer(configs, new StringSerializer(), - new StringSerializer(), metadata, new MockClient(time, metadata), null, time)) { + new StringSerializer(), metadata, client, null, time)) { Exchanger exchanger = new Exchanger<>(); Thread t = new Thread(() -> { try { + MetadataResponse updateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap(warmupTopic, 1)); + client.updateMetadata(updateResponse); exchanger.exchange(null); // 1 - while (!metadata.updateRequested()) - Thread.sleep(100); - MetadataResponse updateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap(topic, 1)); - metadata.updateWithCurrentRequestVersion(updateResponse, false, time.milliseconds()); + client.awaitMetadataUpdateRequest(Long.parseLong(maxBlockMs)); + updateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap(topic, 1)); + client.updateMetadata(updateResponse); exchanger.exchange(null); // 2 time.sleep(120 * 1000L); // Update the metadata again, but it should be expired at this point. - updateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap(topic, 1)); - metadata.updateWithCurrentRequestVersion(updateResponse, false, time.milliseconds()); + updateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap(warmupTopic, 1)); + client.updateMetadata(updateResponse); exchanger.exchange(null); // 3 - while (!metadata.updateRequested()) - Thread.sleep(100); - time.sleep(30 * 1000L); + assertTrue(client.awaitMetadataUpdateRequest(Long.parseLong(maxBlockMs))); + System.out.println("Received metadata update request"); + time.sleep(Long.parseLong(maxBlockMs)); } catch (Exception e) { throw new RuntimeException(e); }