mirror of https://github.com/apache/kafka.git
KAFKA-19585: Avoid noisy NPE logs when closing consumer after constructor failures (#20491)
If there's a failure in the kafka consumer constructor, we attempt to
close it
2329def2ff/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java (L540)
In that case, some components may not have been created yet, so we should
add null checks to avoid noisy logs about NullPointerExceptions.
These noisy logs have been reported with the console share consumer in a
similar scenario, so this task is to review and apply a similar fix for the
async consumer if needed.
The fix is to check whether handlers/invokers are null before trying to close
them, similar to what was done in
https://github.com/apache/kafka/pull/20290
Reviewers: TengYao Chi <frankvicky@apache.org>, Lianet Magrans
<lmagrans@confluent.io>
This commit is contained in:
parent
f7593db287
commit
872647fe06
|
@ -1517,7 +1517,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
|
|||
}
|
||||
|
||||
private void autoCommitOnClose(final Timer timer) {
|
||||
if (groupMetadata.get().isEmpty())
|
||||
if (groupMetadata.get().isEmpty() || applicationEventHandler == null)
|
||||
return;
|
||||
|
||||
if (autoCommitEnabled)
|
||||
|
@ -1527,7 +1527,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
|
|||
}
|
||||
|
||||
private void runRebalanceCallbacksOnClose() {
|
||||
if (groupMetadata.get().isEmpty())
|
||||
if (groupMetadata.get().isEmpty() || rebalanceListenerInvoker == null)
|
||||
return;
|
||||
|
||||
int memberEpoch = groupMetadata.get().get().generationId();
|
||||
|
@ -1553,7 +1553,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
|
|||
}
|
||||
|
||||
private void leaveGroupOnClose(final Timer timer, final CloseOptions.GroupMembershipOperation membershipOperation) {
|
||||
if (groupMetadata.get().isEmpty())
|
||||
if (groupMetadata.get().isEmpty() || applicationEventHandler == null)
|
||||
return;
|
||||
|
||||
log.debug("Leaving the consumer group during consumer close");
|
||||
|
@ -1569,7 +1569,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
|
|||
}
|
||||
|
||||
private void stopFindCoordinatorOnClose() {
|
||||
if (groupMetadata.get().isEmpty())
|
||||
if (groupMetadata.get().isEmpty() || applicationEventHandler == null)
|
||||
return;
|
||||
log.debug("Stop finding coordinator during consumer close");
|
||||
applicationEventHandler.add(new StopFindCoordinatorOnCloseEvent());
|
||||
|
@ -1634,7 +1634,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
|
|||
}
|
||||
|
||||
private void awaitPendingAsyncCommitsAndExecuteCommitCallbacks(Timer timer, boolean enableWakeup) {
|
||||
if (lastPendingAsyncCommit == null) {
|
||||
if (lastPendingAsyncCommit == null || offsetCommitCallbackInvoker == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
@ -83,6 +83,7 @@ import org.apache.kafka.common.requests.ListOffsetsRequest;
|
|||
import org.apache.kafka.common.requests.MetadataResponse;
|
||||
import org.apache.kafka.common.requests.RequestTestUtils;
|
||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||
import org.apache.kafka.common.utils.LogCaptureAppender;
|
||||
import org.apache.kafka.common.utils.LogContext;
|
||||
import org.apache.kafka.common.utils.MockTime;
|
||||
import org.apache.kafka.common.utils.Time;
|
||||
|
@ -2029,6 +2030,28 @@ public class AsyncKafkaConsumerTest {
|
|||
assertEquals(10, (double) metrics.metric(metrics.metricName("background-event-queue-time-max", CONSUMER_METRIC_GROUP)).metricValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailConstructor() {
|
||||
final Properties props = requiredConsumerConfig();
|
||||
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
|
||||
props.put(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, "an.invalid.class");
|
||||
final ConsumerConfig config = new ConsumerConfig(props);
|
||||
|
||||
try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
|
||||
KafkaException ce = assertThrows(
|
||||
KafkaException.class,
|
||||
() -> newConsumer(config));
|
||||
assertTrue(ce.getMessage().contains("Failed to construct kafka consumer"), "Unexpected exception message: " + ce.getMessage());
|
||||
assertTrue(ce.getCause().getMessage().contains("Class an.invalid.class cannot be found"), "Unexpected cause: " + ce.getCause());
|
||||
|
||||
boolean npeLogged = appender.getEvents().stream()
|
||||
.flatMap(event -> event.getThrowableInfo().stream())
|
||||
.anyMatch(str -> str.contains("NullPointerException"));
|
||||
|
||||
assertFalse(npeLogged, "Unexpected NullPointerException during consumer construction");
|
||||
}
|
||||
}
|
||||
|
||||
private Map<TopicPartition, OffsetAndMetadata> mockTopicPartitionOffset() {
|
||||
final TopicPartition t0 = new TopicPartition("t0", 2);
|
||||
final TopicPartition t1 = new TopicPartition("t0", 3);
|
||||
|
|
Loading…
Reference in New Issue