diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 93ed987ddc0..5c72c2babbb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -1517,7 +1517,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { } private void autoCommitOnClose(final Timer timer) { - if (groupMetadata.get().isEmpty()) + if (groupMetadata.get().isEmpty() || applicationEventHandler == null) return; if (autoCommitEnabled) @@ -1527,7 +1527,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { } private void runRebalanceCallbacksOnClose() { - if (groupMetadata.get().isEmpty()) + if (groupMetadata.get().isEmpty() || rebalanceListenerInvoker == null) return; int memberEpoch = groupMetadata.get().get().generationId(); @@ -1553,7 +1553,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { } private void leaveGroupOnClose(final Timer timer, final CloseOptions.GroupMembershipOperation membershipOperation) { - if (groupMetadata.get().isEmpty()) + if (groupMetadata.get().isEmpty() || applicationEventHandler == null) return; log.debug("Leaving the consumer group during consumer close"); @@ -1569,7 +1569,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { } private void stopFindCoordinatorOnClose() { - if (groupMetadata.get().isEmpty()) + if (groupMetadata.get().isEmpty() || applicationEventHandler == null) return; log.debug("Stop finding coordinator during consumer close"); applicationEventHandler.add(new StopFindCoordinatorOnCloseEvent()); @@ -1634,7 +1634,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { } private void awaitPendingAsyncCommitsAndExecuteCommitCallbacks(Timer timer, boolean enableWakeup) { - if (lastPendingAsyncCommit == null) { + if (lastPendingAsyncCommit == null || offsetCommitCallbackInvoker == null) { return; } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 6a5cb871fd1..dcf604d6b81 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -83,6 +83,7 @@ import org.apache.kafka.common.requests.ListOffsetsRequest; import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.requests.RequestTestUtils; import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; @@ -2029,6 +2030,28 @@ public class AsyncKafkaConsumerTest { assertEquals(10, (double) metrics.metric(metrics.metricName("background-event-queue-time-max", CONSUMER_METRIC_GROUP)).metricValue()); } + @Test + public void testFailConstructor() { + final Properties props = requiredConsumerConfig(); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id"); + props.put(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, "an.invalid.class"); + final ConsumerConfig config = new ConsumerConfig(props); + + try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) { + KafkaException ce = assertThrows( + KafkaException.class, + () -> newConsumer(config)); + assertTrue(ce.getMessage().contains("Failed to construct kafka consumer"), "Unexpected exception message: " + ce.getMessage()); + assertTrue(ce.getCause().getMessage().contains("Class an.invalid.class cannot be found"), "Unexpected cause: " + ce.getCause()); + + boolean npeLogged = appender.getEvents().stream() + .flatMap(event -> event.getThrowableInfo().stream()) + .anyMatch(str -> str.contains("NullPointerException")); + + assertFalse(npeLogged, "Unexpected NullPointerException during consumer construction"); + } + } + private Map mockTopicPartitionOffset() { final TopicPartition t0 = new TopicPartition("t0", 2); final TopicPartition t1 = new TopicPartition("t0", 3);