Remove busy waiting testTopicRefreshInMetadata

This commit is contained in:
ChangYu Huang 2025-08-19 00:36:37 -04:00
parent 39fb8a0f95
commit 9922d9dacd
1 changed files with 19 additions and 10 deletions

View File

@ -958,8 +958,9 @@ public class KafkaProducerTest {
@Test
public void testTopicRefreshInMetadata() throws InterruptedException {
Map<String, Object> configs = new HashMap<>();
final String maxBlockMs = "600000";
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "600000");
configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);
// test under normal producer for simplicity
configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
long refreshBackoffMs = 500L;
@ -969,20 +970,28 @@ public class KafkaProducerTest {
final Time time = new MockTime();
final ProducerMetadata metadata = new ProducerMetadata(refreshBackoffMs, refreshBackoffMaxMs, metadataExpireMs, metadataIdleMs,
new LogContext(), new ClusterResourceListeners(), time);
final String warmupTopic = "warmup-topic";
final String topic = "topic";
MockClient client = new MockClient(time, metadata);
client.setShouldUpdateWithCurrentMetadata(false);
try (KafkaProducer<String, String> producer = kafkaProducer(configs,
new StringSerializer(), new StringSerializer(), metadata, new MockClient(time, metadata), null, time)) {
new StringSerializer(), new StringSerializer(), metadata, client, null, time)) {
AtomicBoolean running = new AtomicBoolean(true);
Thread t = new Thread(() -> {
long startTimeMs = System.currentTimeMillis();
while (running.get()) {
while (!metadata.updateRequested() && System.currentTimeMillis() - startTimeMs < 100)
Thread.yield();
MetadataResponse updateResponse = RequestTestUtils.metadataUpdateWith("kafka-cluster", 1,
singletonMap(topic, Errors.UNKNOWN_TOPIC_OR_PARTITION), emptyMap());
metadata.updateWithCurrentRequestVersion(updateResponse, false, time.milliseconds());
time.sleep(60 * 1000L);
try {
MetadataResponse updateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap(warmupTopic, 1));
client.updateMetadata(updateResponse);
while (running.get()) {
if (client.awaitMetadataUpdateRequest(100)) {
updateResponse = RequestTestUtils.metadataUpdateWith("kafka-cluster", 1,
singletonMap(topic, Errors.UNKNOWN_TOPIC_OR_PARTITION), emptyMap());
client.updateMetadata(updateResponse);
time.sleep(Long.parseLong(maxBlockMs));
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
});
t.start();