From 2b68e44a35a7788d5f64564eb4a413a85774cd9c Mon Sep 17 00:00:00 2001 From: ChangYu Huang Date: Tue, 19 Aug 2025 12:30:24 -0400 Subject: [PATCH] Remove busy waiting in testTopicNotExistingInMetadata --- .../kafka/clients/producer/KafkaProducerTest.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index 9f6d7898881..5d700bba711 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -1005,8 +1005,9 @@ public class KafkaProducerTest { @Test public void testTopicNotExistingInMetadata() throws InterruptedException { Map configs = new HashMap<>(); + final String maxBlockMs = "300000"; configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); - configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "30000"); + configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs); long refreshBackoffMs = 500L; long refreshBackoffMaxMs = 5000L; long metadataExpireMs = 60000L; @@ -1015,8 +1016,10 @@ public class KafkaProducerTest { final ProducerMetadata metadata = new ProducerMetadata(refreshBackoffMs, refreshBackoffMaxMs, metadataExpireMs, metadataIdleMs, new LogContext(), new ClusterResourceListeners(), time); final String topic = "topic"; + MockClient client = new MockClient(time, metadata); + client.setShouldUpdateWithCurrentMetadata(false); try (KafkaProducer producer = kafkaProducer(configs, new StringSerializer(), - new StringSerializer(), metadata, new MockClient(time, metadata), null, time)) { + new StringSerializer(), metadata, client, null, time)) { Exchanger exchanger = new Exchanger<>(); @@ -1025,11 +1028,10 @@ public class KafkaProducerTest { // Update the metadata with non-existing topic. MetadataResponse updateResponse = RequestTestUtils.metadataUpdateWith("kafka-cluster", 1, singletonMap(topic, Errors.UNKNOWN_TOPIC_OR_PARTITION), emptyMap()); - metadata.updateWithCurrentRequestVersion(updateResponse, false, time.milliseconds()); + client.updateMetadata(updateResponse); exchanger.exchange(null); - while (!metadata.updateRequested()) - Thread.sleep(100); - time.sleep(30 * 1000L); + assertTrue(client.awaitMetadataUpdateRequest(Long.parseLong(maxBlockMs))); + time.sleep(Long.parseLong(maxBlockMs)); } catch (Exception e) { throw new RuntimeException(e); }