commit 6969ca94d2
Author: Chang-Yu Huang
Date:   2025-10-07 15:53:47 -04:00 (committed by GitHub)
2 changed files with 74 additions and 72 deletions

clients/src/test/java/org/apache/kafka/clients/MockClient.java

@@ -39,6 +39,8 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
@@ -72,6 +74,8 @@ public class MockClient implements KafkaClient {
     private int correlation;
     private Runnable wakeupHook;
     private boolean advanceTimeDuringPoll;
+    private boolean shouldUpdateWithCurrentMetadata = true;
+    private CountDownLatch isMetadataUpdateNeeded = new CountDownLatch(1);
     private final Time time;
     private final MockMetadataUpdater metadataUpdater;
     private final Map<String, ConnectionState> connections = new HashMap<>();
@@ -192,6 +196,10 @@ public class MockClient implements KafkaClient {
         this.disconnectFuture = disconnectFuture;
     }
 
+    public void setShouldUpdateWithCurrentMetadata(boolean shouldUpdateWithCurrentMetadata) {
+        this.shouldUpdateWithCurrentMetadata = shouldUpdateWithCurrentMetadata;
+    }
+
     @Override
     public void disconnect(String node) {
         disconnect(node, false);
@@ -329,8 +337,10 @@ public class MockClient implements KafkaClient {
         MetadataUpdate metadataUpdate = metadataUpdates.poll();
         if (metadataUpdate != null) {
             metadataUpdater.update(time, metadataUpdate);
-        } else {
+        } else if (shouldUpdateWithCurrentMetadata) {
             metadataUpdater.updateWithCurrentMetadata(time);
+        } else {
+            isMetadataUpdateNeeded.countDown();
         }
     }
@@ -350,6 +360,14 @@ public class MockClient implements KafkaClient {
         return copy;
     }
 
+    public boolean awaitMetadataUpdateRequest(long timeoutMs) throws InterruptedException {
+        if (isMetadataUpdateNeeded.await(timeoutMs, TimeUnit.MILLISECONDS)) {
+            isMetadataUpdateNeeded = new CountDownLatch(1);
+            return true;
+        }
+        return false;
+    }
+
     private long elapsedTimeMs(long currentTimeMs, long startTimeMs) {
         return Math.max(0, currentTimeMs - startTimeMs);
     }
@@ -523,6 +541,10 @@ public class MockClient implements KafkaClient {
         metadataUpdates.add(new MetadataUpdate(updateResponse, expectMatchMetadataTopics));
     }
 
+    public int preparedMetadataUpdatesCount() {
+        return metadataUpdates.size();
+    }
+
     public void updateMetadata(MetadataResponse updateResponse) {
         metadataUpdater.update(time, new MetadataUpdate(updateResponse, false));
     }
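Taken together, the new hooks give a test deterministic control over metadata flow: setShouldUpdateWithCurrentMetadata(false) stops the mock from silently re-serving its current metadata when the prepared-update queue is empty, preparedMetadataUpdatesCount() exposes how much of that queue has been consumed, and awaitMetadataUpdateRequest(timeoutMs) observes (and, on success, re-arms) the latch that trips when an update is needed but nothing is queued. A minimal sketch of how the pieces compose, reusing only calls that appear in this diff; the topic name and timing values are illustrative:

    // Sketch only: wiring taken from this commit, with illustrative values.
    Time time = new MockTime();
    ProducerMetadata metadata = new ProducerMetadata(500L, 5000L, 60000L, 60000L,
            new LogContext(), new ClusterResourceListeners(), time);
    MockClient client = new MockClient(time, metadata);
    client.setShouldUpdateWithCurrentMetadata(false); // empty queue no longer falls back to current metadata

    // Seed an initial cluster view, then queue one canned response for the next metadata request.
    client.updateMetadata(RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1)));
    client.prepareMetadataUpdate(RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1)));
    assertEquals(1, client.preparedMetadataUpdatesCount());

    // No metadata request has gone unanswered yet, so a zero-timeout probe of the latch fails fast.
    assertFalse(client.awaitMetadataUpdateRequest(0));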

clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java

@@ -128,13 +128,11 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Exchanger;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -961,10 +959,9 @@ public class KafkaProducerTest {
     @Test
     public void testTopicRefreshInMetadata() throws InterruptedException {
         Map<String, Object> configs = new HashMap<>();
+        final String maxBlockMs = "600000";
         configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
-        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "600000");
-        // test under normal producer for simplicity
-        configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);
         long refreshBackoffMs = 500L;
         long refreshBackoffMaxMs = 5000L;
         long metadataExpireMs = 60000L;
@@ -972,35 +969,35 @@ public class KafkaProducerTest {
         final Time time = new MockTime();
         final ProducerMetadata metadata = new ProducerMetadata(refreshBackoffMs, refreshBackoffMaxMs, metadataExpireMs, metadataIdleMs,
                 new LogContext(), new ClusterResourceListeners(), time);
+        final String warmupTopic = "warmup-topic";
         final String topic = "topic";
+        MockClient client = new MockClient(time, metadata);
+        client.setShouldUpdateWithCurrentMetadata(false);
+        client.advanceTimeDuringPoll(true);
         try (KafkaProducer<String, String> producer = kafkaProducer(configs,
-                new StringSerializer(), new StringSerializer(), metadata, new MockClient(time, metadata), null, time)) {
-            AtomicBoolean running = new AtomicBoolean(true);
-            Thread t = new Thread(() -> {
-                long startTimeMs = System.currentTimeMillis();
-                while (running.get()) {
-                    while (!metadata.updateRequested() && System.currentTimeMillis() - startTimeMs < 100)
-                        Thread.yield();
-                    MetadataResponse updateResponse = RequestTestUtils.metadataUpdateWith("kafka-cluster", 1,
-                            singletonMap(topic, Errors.UNKNOWN_TOPIC_OR_PARTITION), emptyMap());
-                    metadata.updateWithCurrentRequestVersion(updateResponse, false, time.milliseconds());
-                    time.sleep(60 * 1000L);
-                }
-            });
-            t.start();
+                new StringSerializer(), new StringSerializer(), metadata, client, null, time)) {
+            MetadataResponse updateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap(warmupTopic, 1));
+            client.updateMetadata(updateResponse);
+
+            final int preparedUpdatesCount = 100;
+            updateResponse = RequestTestUtils.metadataUpdateWith("kafka-cluster", 1,
+                    singletonMap(topic, Errors.UNKNOWN_TOPIC_OR_PARTITION), emptyMap());
+            for (int i = 0; i < preparedUpdatesCount; i++) {
+                client.prepareMetadataUpdate(updateResponse);
+            }
+
             Throwable throwable = assertThrows(TimeoutException.class, () -> producer.partitionsFor(topic));
             assertInstanceOf(UnknownTopicOrPartitionException.class, throwable.getCause());
-            running.set(false);
-            t.join();
+            assertTrue(preparedUpdatesCount > client.preparedMetadataUpdatesCount());
         }
     }
 
     @Test
-    public void testTopicNotExistingInMetadata() throws InterruptedException {
+    public void testTopicNotExistingInMetadata() {
         Map<String, Object> configs = new HashMap<>();
+        final String maxBlockMs = "30000";
         configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
-        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "30000");
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);
         long refreshBackoffMs = 500L;
         long refreshBackoffMaxMs = 5000L;
         long metadataExpireMs = 60000L;
@@ -1008,39 +1005,33 @@ public class KafkaProducerTest {
         final Time time = new MockTime();
         final ProducerMetadata metadata = new ProducerMetadata(refreshBackoffMs, refreshBackoffMaxMs, metadataExpireMs, metadataIdleMs,
                 new LogContext(), new ClusterResourceListeners(), time);
+        final String warmupTopic = "warmup-topic";
         final String topic = "topic";
+        MockClient client = new MockClient(time, metadata);
+        client.setShouldUpdateWithCurrentMetadata(false);
+        client.advanceTimeDuringPoll(true);
         try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
-                new StringSerializer(), metadata, new MockClient(time, metadata), null, time)) {
-            Exchanger<Void> exchanger = new Exchanger<>();
-            Thread t = new Thread(() -> {
-                try {
-                    // Update the metadata with non-existing topic.
-                    MetadataResponse updateResponse = RequestTestUtils.metadataUpdateWith("kafka-cluster", 1,
-                            singletonMap(topic, Errors.UNKNOWN_TOPIC_OR_PARTITION), emptyMap());
-                    metadata.updateWithCurrentRequestVersion(updateResponse, false, time.milliseconds());
-                    exchanger.exchange(null);
-                    while (!metadata.updateRequested())
-                        Thread.sleep(100);
-                    time.sleep(30 * 1000L);
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
-            });
-            t.start();
-            exchanger.exchange(null);
+                new StringSerializer(), metadata, client, null, time)) {
+            MetadataResponse updateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap(warmupTopic, 1));
+            client.updateMetadata(updateResponse);
+
+            updateResponse = RequestTestUtils.metadataUpdateWith("kafka-cluster", 1,
+                    singletonMap(topic, Errors.UNKNOWN_TOPIC_OR_PARTITION), emptyMap());
+            client.prepareMetadataUpdate(updateResponse);
+
             Throwable throwable = assertThrows(TimeoutException.class, () -> producer.partitionsFor(topic));
             assertInstanceOf(UnknownTopicOrPartitionException.class, throwable.getCause());
-            t.join();
+            assertEquals(0, client.preparedMetadataUpdatesCount());
         }
     }
 
     @Test
     public void testTopicExpiryInMetadata() throws InterruptedException {
         Map<String, Object> configs = new HashMap<>();
+        final String maxBlockMs = "300000";
         configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
-        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "30000");
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);
         long refreshBackoffMs = 500L;
         long refreshBackoffMaxMs = 5000L;
        long metadataExpireMs = 60000L;
@@ -1049,39 +1040,28 @@ public class KafkaProducerTest {
         final ProducerMetadata metadata = new ProducerMetadata(refreshBackoffMs, refreshBackoffMaxMs, metadataExpireMs, metadataIdleMs,
                 new LogContext(), new ClusterResourceListeners(), time);
         final String topic = "topic";
+        final String warmupTopic = "warmup-topic";
+        MockClient client = new MockClient(time, metadata);
+        client.setShouldUpdateWithCurrentMetadata(false);
+        client.advanceTimeDuringPoll(true);
         try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
-                new StringSerializer(), metadata, new MockClient(time, metadata), null, time)) {
-            Exchanger<Void> exchanger = new Exchanger<>();
-            Thread t = new Thread(() -> {
-                try {
-                    exchanger.exchange(null); // 1
-                    while (!metadata.updateRequested())
-                        Thread.sleep(100);
-                    MetadataResponse updateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap(topic, 1));
-                    metadata.updateWithCurrentRequestVersion(updateResponse, false, time.milliseconds());
-                    exchanger.exchange(null); // 2
-                    time.sleep(120 * 1000L);
-                    // Update the metadata again, but it should be expired at this point.
-                    updateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap(topic, 1));
-                    metadata.updateWithCurrentRequestVersion(updateResponse, false, time.milliseconds());
-                    exchanger.exchange(null); // 3
-                    while (!metadata.updateRequested())
-                        Thread.sleep(100);
-                    time.sleep(30 * 1000L);
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
-            });
-            t.start();
-            exchanger.exchange(null); // 1
-            assertNotNull(producer.partitionsFor(topic));
-            exchanger.exchange(null); // 2
-            exchanger.exchange(null); // 3
+                new StringSerializer(), metadata, client, null, time)) {
+            MetadataResponse updateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap(warmupTopic, 1));
+            client.updateMetadata(updateResponse);
+
+            updateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap(topic, 1));
+            client.prepareMetadataUpdate(updateResponse);
+            assertNotNull(producer.partitionsFor(topic));
+
+            // Update the metadata again, but it should be expired at this point.
+            time.sleep(120 * 1000L);
+            updateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap(warmupTopic, 1));
+            client.updateMetadata(updateResponse);
+
+            assertFalse(client.awaitMetadataUpdateRequest(0));
+
             assertThrows(TimeoutException.class, () -> producer.partitionsFor(topic));
-            t.join();
+            assertTrue(client.awaitMetadataUpdateRequest(0));
         }
     }
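A note on the awaitMetadataUpdateRequest semantics that the expiry test relies on: the latch is replaced only after a successful wait, so each call consumes at most one "update needed" signal, and a zero timeout makes the call a non-blocking probe. The test brackets the expiry with exactly that probe: false before the blocking partitionsFor call, true once the producer has drained the empty prepared-update queue. Condensed, with the surrounding scaffolding elided:

    assertFalse(client.awaitMetadataUpdateRequest(0)); // nothing has requested an update yet
    assertThrows(TimeoutException.class, () -> producer.partitionsFor(topic));
    assertTrue(client.awaitMetadataUpdateRequest(0));  // the blocked call tripped the latch; a fresh latch is armed
    assertFalse(client.awaitMetadataUpdateRequest(0)); // the signal was consumed by the previous call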