From 6ca899e56d451eef04e81b0f4d88bdb10f3cf4b3 Mon Sep 17 00:00:00 2001 From: "Zhanxiang (Patrick) Huang" Date: Wed, 1 May 2019 12:40:49 -0700 Subject: [PATCH] KAFKA-8066; Always close the sensors in Selector.close() (#6402) When shutting down the ReplicaFetcher thread, we may fail to unregister sensors in selector.close(). When that happens, we will fail to start up the ReplicaFetcherThread with the same fetch id again because of the IllegalArgumentException in sensor registration. This issue will cause constant URPs in the cluster because the ReplicaFetcherThread is gone. This patch addresses this issue by introducing a try-finally block in selector.close() so that we will always unregister the sensors in shutting down ReplicaFetcherThreads. Reviewers: Rajini Sivaram , Jason Gustafson --- .../org/apache/kafka/clients/ClientUtils.java | 13 ---------- .../kafka/clients/consumer/KafkaConsumer.java | 12 ++++----- .../kafka/clients/producer/KafkaProducer.java | 11 ++++---- .../apache/kafka/common/network/Selector.java | 25 +++++++++++++------ .../org/apache/kafka/common/utils/Utils.java | 13 ++++++++++ .../kafka/common/network/SelectorTest.java | 21 ++++++++++++++++ .../distributed/WorkerGroupMember.java | 7 +++--- 7 files changed, 68 insertions(+), 34 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java index 4d933243655..cdd751338de 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java @@ -27,14 +27,12 @@ import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Closeable; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.concurrent.atomic.AtomicReference; import 
static org.apache.kafka.common.utils.Utils.getHost; import static org.apache.kafka.common.utils.Utils.getPort; @@ -91,17 +89,6 @@ public final class ClientUtils { return addresses; } - public static void closeQuietly(Closeable c, String name, AtomicReference firstException) { - if (c != null) { - try { - c.close(); - } catch (Throwable t) { - firstException.compareAndSet(null, t); - log.error("Failed to close " + name, t); - } - } - } - /** * @param config client configs * @return configured ChannelBuilder based on the configs. diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index a59e8572eaa..9030308d6d1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -2201,12 +2201,12 @@ public class KafkaConsumer implements Consumer { firstException.compareAndSet(null, t); log.error("Failed to close coordinator", t); } - ClientUtils.closeQuietly(fetcher, "fetcher", firstException); - ClientUtils.closeQuietly(interceptors, "consumer interceptors", firstException); - ClientUtils.closeQuietly(metrics, "consumer metrics", firstException); - ClientUtils.closeQuietly(client, "consumer network client", firstException); - ClientUtils.closeQuietly(keyDeserializer, "consumer key deserializer", firstException); - ClientUtils.closeQuietly(valueDeserializer, "consumer value deserializer", firstException); + Utils.closeQuietly(fetcher, "fetcher", firstException); + Utils.closeQuietly(interceptors, "consumer interceptors", firstException); + Utils.closeQuietly(metrics, "consumer metrics", firstException); + Utils.closeQuietly(client, "consumer network client", firstException); + Utils.closeQuietly(keyDeserializer, "consumer key deserializer", firstException); + Utils.closeQuietly(valueDeserializer, "consumer value deserializer", firstException); 
AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId, metrics); log.debug("Kafka consumer has been closed"); Throwable exception = firstException.get(); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 06a0fc16dff..445bed1a995 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -67,6 +67,7 @@ import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.common.utils.KafkaThread; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import java.net.InetSocketAddress; @@ -1188,11 +1189,11 @@ public class KafkaProducer implements Producer { } } - ClientUtils.closeQuietly(interceptors, "producer interceptors", firstException); - ClientUtils.closeQuietly(metrics, "producer metrics", firstException); - ClientUtils.closeQuietly(keySerializer, "producer keySerializer", firstException); - ClientUtils.closeQuietly(valueSerializer, "producer valueSerializer", firstException); - ClientUtils.closeQuietly(partitioner, "producer partitioner", firstException); + Utils.closeQuietly(interceptors, "producer interceptors", firstException); + Utils.closeQuietly(metrics, "producer metrics", firstException); + Utils.closeQuietly(keySerializer, "producer keySerializer", firstException); + Utils.closeQuietly(valueSerializer, "producer valueSerializer", firstException); + Utils.closeQuietly(partitioner, "producer partitioner", firstException); AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId, metrics); Throwable exception = firstException.get(); if (exception != null && !swallowException) { diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java 
b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index b3497973afd..20e24b7be4a 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -30,6 +30,7 @@ import org.apache.kafka.common.metrics.stats.SampledStat; import org.apache.kafka.common.metrics.stats.Total; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import java.io.IOException; @@ -52,6 +53,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; /** * A nioSelector interface for doing non-blocking multi-connection network I/O. @@ -358,15 +360,24 @@ public class Selector implements Selectable, AutoCloseable { @Override public void close() { List connections = new ArrayList<>(channels.keySet()); - for (String id : connections) - close(id); try { - this.nioSelector.close(); - } catch (IOException | SecurityException e) { - log.error("Exception closing nioSelector:", e); + for (String id : connections) + close(id); + } finally { + // If there is any exception thrown in close(id), we should still be able + // to close the remaining objects, especially the sensors because keeping + // the sensors may lead to failure to start up the ReplicaFetcherThread if + // the old sensors with the same names have not yet been cleaned up. 
+ AtomicReference firstException = new AtomicReference<>(); + Utils.closeQuietly(nioSelector, "nioSelector", firstException); + Utils.closeQuietly(sensors, "sensors", firstException); + Utils.closeQuietly(channelBuilder, "channelBuilder", firstException); + Throwable exception = firstException.get(); + if (exception instanceof RuntimeException && !(exception instanceof SecurityException)) { + throw (RuntimeException) exception; + } + } - sensors.close(); - channelBuilder.close(); } /** diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index b5f7ab2b9c8..b5e334da95e 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -57,6 +57,7 @@ import java.util.Map; import java.util.Objects; import java.util.Properties; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -857,6 +858,18 @@ public final class Utils { } } + public static void closeQuietly(AutoCloseable closeable, String name, AtomicReference firstException) { + if (closeable != null) { + try { + closeable.close(); + } catch (Throwable t) { + firstException.compareAndSet(null, t); + log.error("Failed to close {} with type {}", name, closeable.getClass().getName(), t); + } + } + } + + /** * A cheap way to deterministically convert a number to a positive value. When the input is * positive, the original value is returned. 
When the input number is negative, the returned diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java index 8cbada750c2..0f2d295f87b 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java @@ -59,6 +59,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.junit.Assert.assertThrows; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -717,6 +718,26 @@ public class SelectorTest { assertNull(selector.lowestPriorityChannel()); } + @Test + public void testMetricsCleanupOnSelectorClose() throws Exception { + Metrics metrics = new Metrics(); + Selector selector = new ImmediatelyConnectingSelector(5000, metrics, time, "MetricGroup", channelBuilder, new LogContext()) { + @Override + public void close(String id) { + throw new RuntimeException(); + } + }; + assertTrue(metrics.metrics().size() > 1); + String id = "0"; + selector.connect(id, new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE); + + // Close the selector and ensure a RuntimeException has been thrown + assertThrows(RuntimeException.class, selector::close); + + // We should only have one remaining metric for kafka-metrics-count, which is a global metric + assertEquals(1, metrics.metrics().size()); + } + private String blockingRequest(String node, String s) throws IOException { selector.send(createSend(node, s)); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java index 71ce91eb792..5d18915d4c8 100--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java @@ -34,6 +34,7 @@ import org.apache.kafka.common.network.Selector; import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.storage.ConfigBackingStore; import org.apache.kafka.connect.util.ConnectorTaskId; import org.slf4j.Logger; @@ -205,9 +206,9 @@ public class WorkerGroupMember { log.trace("Stopping the Connect group member."); AtomicReference firstException = new AtomicReference<>(); this.stopped = true; - ClientUtils.closeQuietly(coordinator, "coordinator", firstException); - ClientUtils.closeQuietly(metrics, "consumer metrics", firstException); - ClientUtils.closeQuietly(client, "consumer network client", firstException); + Utils.closeQuietly(coordinator, "coordinator", firstException); + Utils.closeQuietly(metrics, "consumer metrics", firstException); + Utils.closeQuietly(client, "consumer network client", firstException); AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId, metrics); if (firstException.get() != null && !swallowException) throw new KafkaException("Failed to stop the Connect group member", firstException.get());