MINOR: Improve error message when Connect's EmbeddedKafkaCluster::verifyClusterReadiness method fails (#16918)

Reviewers: Greg Harris <greg.harris@aiven.io>
This commit is contained in:
Chris Egerton 2024-09-07 10:23:59 -04:00 committed by GitHub
parent e4d108d4df
commit 50e7022a1b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 17 additions and 6 deletions

View File

@ -88,6 +88,7 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
import static org.junit.jupiter.api.Assertions.fail;
/** /**
* Setup an embedded Kafka KRaft cluster (using {@link kafka.testkit.KafkaClusterTestKit} internally) with the * Setup an embedded Kafka KRaft cluster (using {@link kafka.testkit.KafkaClusterTestKit} internally) with the
@ -182,14 +183,24 @@ public class EmbeddedKafkaCluster {
Map<String, Object> consumerConfig = Collections.singletonMap(GROUP_ID_CONFIG, consumerGroupId); Map<String, Object> consumerConfig = Collections.singletonMap(GROUP_ID_CONFIG, consumerGroupId);
String topic = "consumer-warmup-" + consumerGroupId; String topic = "consumer-warmup-" + consumerGroupId;
createTopic(topic); try {
produce(topic, "warmup message key", "warmup message value"); createTopic(topic);
produce(topic, "warmup message key", "warmup message value");
try (Consumer<?, ?> consumer = createConsumerAndSubscribeTo(consumerConfig, topic)) { try (Consumer<?, ?> consumer = createConsumerAndSubscribeTo(consumerConfig, topic)) {
ConsumerRecords<?, ?> records = consumer.poll(Duration.ofMillis(GROUP_COORDINATOR_AVAILABILITY_DURATION_MS)); ConsumerRecords<?, ?> records = consumer.poll(Duration.ofMillis(GROUP_COORDINATOR_AVAILABILITY_DURATION_MS));
if (records.isEmpty()) { if (records.isEmpty()) {
throw new AssertionError("Failed to verify availability of group coordinator and produce/consume APIs on Kafka cluster in time"); throw new AssertionError("Failed to verify availability of group coordinator and/or consume APIs on Kafka cluster in time");
}
} }
} catch (Throwable e) {
fail(
"The Kafka cluster used in this test was not able to start successfully in time. "
+ "If no recent changes have altered the behavior of Kafka brokers or clients, and this error "
+ "is not occurring frequently, it is probably the result of the testing machine being temporarily "
+ "overloaded and can be safely ignored.",
e
);
} }
try (Admin admin = createAdminClient()) { try (Admin admin = createAdminClient()) {