KAFKA-19206 ConsumerNetworkThread.cleanup() throws NullPointerException if initializeResources() previously failed (#19569)

Guard against possible `NullPointerExceptions` in
`ConsumerNetworkThread.cleanup()` if
`ConsumerNetworkThread.initializeResources()` previously failed.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Kirk True 2025-05-13 23:26:51 -07:00 committed by GitHub
parent cafe83f928
commit 692c7f14d6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 68 additions and 2 deletions

View File

@ -336,11 +336,20 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
log.trace("Closing the consumer network thread"); log.trace("Closing the consumer network thread");
Timer timer = time.timer(closeTimeout); Timer timer = time.timer(closeTimeout);
try { try {
// If an error was thrown from initializeResources(), it's possible that the list of request managers
// is null, so check before using. If the request manager list is null, there wasn't any real work
// performed, so not being able to close the request managers isn't so bad.
if (requestManagers != null && networkClientDelegate != null)
runAtClose(requestManagers.entries(), networkClientDelegate, time.milliseconds()); runAtClose(requestManagers.entries(), networkClientDelegate, time.milliseconds());
} catch (Exception e) { } catch (Exception e) {
log.error("Unexpected error during shutdown. Proceed with closing.", e); log.error("Unexpected error during shutdown. Proceed with closing.", e);
} finally { } finally {
// Likewise, if an error was thrown from initializeResources(), it's possible for the network client
// to be null, so check before using. If the network client is null, things have failed catastrophically
// enough that there aren't any outstanding requests to be sent anyway.
if (networkClientDelegate != null)
sendUnsentRequests(timer); sendUnsentRequests(timer);
asyncConsumerMetrics.recordApplicationEventExpiredSize(applicationEventReaper.reap(applicationEventQueue)); asyncConsumerMetrics.recordApplicationEventExpiredSize(applicationEventReaper.reap(applicationEventQueue));
closeQuietly(requestManagers, "request managers"); closeQuietly(requestManagers, "request managers");

View File

@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProces
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper; import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.clients.consumer.internals.events.PollEvent; import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics; import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.MockTime;
@ -39,11 +40,14 @@ import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP; import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP;
import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyLong;
@ -279,4 +283,57 @@ public class ConsumerNetworkThreadTest {
); );
} }
} }
@Test
public void testNetworkClientDelegateInitializeResourcesError() {
Supplier<NetworkClientDelegate> networkClientDelegateSupplier = () -> {
throw new KafkaException("Injecting NetworkClientDelegate initialization failure");
};
Supplier<RequestManagers> requestManagersSupplier = () -> requestManagers;
testInitializeResourcesError(networkClientDelegateSupplier, requestManagersSupplier);
}
@Test
public void testRequestManagersInitializeResourcesError() {
Supplier<NetworkClientDelegate> networkClientDelegateSupplier = () -> networkClientDelegate;
Supplier<RequestManagers> requestManagersSupplier = () -> {
throw new KafkaException("Injecting RequestManagers initialization failure");
};
testInitializeResourcesError(networkClientDelegateSupplier, requestManagersSupplier);
}
@Test
public void testNetworkClientDelegateAndRequestManagersInitializeResourcesError() {
Supplier<NetworkClientDelegate> networkClientDelegateSupplier = () -> {
throw new KafkaException("Injecting NetworkClientDelegate initialization failure");
};
Supplier<RequestManagers> requestManagersSupplier = () -> {
throw new KafkaException("Injecting RequestManagers initialization failure");
};
testInitializeResourcesError(networkClientDelegateSupplier, requestManagersSupplier);
}
/**
* Tests that when an error occurs during {@link ConsumerNetworkThread#initializeResources()} that the
* logic in {@link ConsumerNetworkThread#cleanup()} will not throw errors when closing.
*/
private void testInitializeResourcesError(Supplier<NetworkClientDelegate> networkClientDelegateSupplier,
Supplier<RequestManagers> requestManagersSupplier) {
// A new ConsumerNetworkThread is created because the shared one doesn't have any issues initializing its
// resources. However, most of the mocks can be reused, so this is mostly boilerplate except for the error
// when a supplier is invoked.
try (ConsumerNetworkThread thread = new ConsumerNetworkThread(
new LogContext(),
time,
applicationEventQueue,
applicationEventReaper,
() -> applicationEventProcessor,
networkClientDelegateSupplier,
requestManagersSupplier,
asyncConsumerMetrics
)) {
assertThrows(KafkaException.class, thread::initializeResources, "initializeResources should fail because one or more Supplier throws an error on get()");
assertDoesNotThrow(thread::cleanup, "cleanup() should not cause an error because all references are checked before use");
}
}
} }