KAFKA-2121; Close internal modules upon client shutdown; reviewed by Ewen Cheslack-Postava and Guozhang Wang

Authored by Steven Wu on 2015-04-22 10:12:45 -07:00; committed by Guozhang Wang
Parent: 6b964461af
Commit: 01e94e2b4a
11 changed files with 393 additions and 167 deletions
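The leak this fixes: if either client constructor threw partway through (for example on a bad config or an unresolvable bootstrap address), internals that were already built, such as metrics reporters with their threads, the network client, and the (de)serializers, were never closed. The pattern applied throughout this commit (construct inside try, close whatever exists on failure, remember only the first close error) can be sketched in isolation. This is a minimal illustration; the class and helper names are made up, not Kafka API:

    import java.io.Closeable;
    import java.util.concurrent.atomic.AtomicReference;

    // Illustrative stand-in for a client with closeable internals.
    class LeakFreeClient implements Closeable {
        private Closeable metrics;  // e.g. a metrics registry with reporter threads
        private Closeable network;  // e.g. a network client holding sockets

        LeakFreeClient() {
            try {
                metrics = open();  // earlier steps may succeed...
                network = open();  // ...before a later step throws
            } catch (Throwable t) {
                close(true);       // release whatever was already built
                throw new RuntimeException("Failed to construct client", t);
            }
        }

        @Override
        public void close() {
            close(false);
        }

        private void close(boolean swallowException) {
            AtomicReference<Throwable> firstException = new AtomicReference<Throwable>();
            closeQuietly(metrics, firstException);
            closeQuietly(network, firstException);
            if (firstException.get() != null && !swallowException)
                throw new RuntimeException("Failed to close client", firstException.get());
        }

        private static void closeQuietly(Closeable c, AtomicReference<Throwable> first) {
            if (c != null) {
                try {
                    c.close();
                } catch (Throwable t) {
                    first.compareAndSet(null, t);  // keep only the first failure
                }
            }
        }

        private static Closeable open() {
            return new Closeable() {
                @Override
                public void close() { }
            };
        }
    }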

clients/src/main/java/org/apache/kafka/clients/ClientUtils.java

@@ -12,16 +12,21 @@
  */
 package org.apache.kafka.clients;
 
+import java.io.Closeable;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.kafka.common.config.ConfigException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.kafka.common.utils.Utils.getHost;
 import static org.apache.kafka.common.utils.Utils.getPort;
 
 public class ClientUtils {
+    private static final Logger log = LoggerFactory.getLogger(ClientUtils.class);
 
     public static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls) {
         List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
@@ -45,4 +50,15 @@ public class ClientUtils {
             throw new ConfigException("No bootstrap urls given in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
         return addresses;
     }
+
+    public static void closeQuietly(Closeable c, String name, AtomicReference<Throwable> firstException) {
+        if (c != null) {
+            try {
+                c.close();
+            } catch (Throwable t) {
+                firstException.compareAndSet(null, t);
+                log.error("Failed to close " + name, t);
+            }
+        }
+    }
 }
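The new helper is meant to be called repeatedly with one shared AtomicReference, so a multi-step shutdown always runs to completion and only the first failure is kept for rethrowing. A minimal caller sketch, mirroring how KafkaConsumer.close uses it below (the fields here are hypothetical stand-ins for a client's Closeable internals):

    // Hypothetical shutdown method; metrics, networkClient and valueSerializer
    // stand for a client's Closeable internals.
    private void shutdown() {
        AtomicReference<Throwable> firstException = new AtomicReference<Throwable>();
        ClientUtils.closeQuietly(metrics, "metrics", firstException);
        ClientUtils.closeQuietly(networkClient, "network client", firstException);
        ClientUtils.closeQuietly(valueSerializer, "value serializer", firstException);
        if (firstException.get() != null)
            throw new KafkaException("Failed to close client", firstException.get());
    }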

clients/src/main/java/org/apache/kafka/clients/KafkaClient.java

@@ -12,6 +12,7 @@
  */
 package org.apache.kafka.clients;
 
+import java.io.Closeable;
 import java.util.List;
 
 import org.apache.kafka.common.Node;
@@ -21,7 +22,7 @@ import org.apache.kafka.common.requests.RequestHeader;
 /**
  * The interface for {@link NetworkClient}
  */
-public interface KafkaClient {
+public interface KafkaClient extends Closeable {
 
     /**
      * Check if we are currently ready to send another request to the given node but don't attempt to connect if we
@@ -130,9 +131,4 @@ public interface KafkaClient {
      */
     public void wakeup();
 
-    /**
-     * Close the client and disconnect from all nodes
-     */
-    public void close();
-
 }

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java

@@ -24,6 +24,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.Metadata;
@@ -32,6 +33,7 @@ import org.apache.kafka.clients.consumer.internals.Coordinator;
 import org.apache.kafka.clients.consumer.internals.Fetcher;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
@@ -346,6 +348,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
 
     private final Coordinator coordinator;
+    private final Deserializer<K> keyDeserializer;
+    private final Deserializer<V> valueDeserializer;
     private final Fetcher<K, V> fetcher;
 
     private final Time time;
@@ -437,6 +441,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                   ConsumerRebalanceCallback callback,
                   Deserializer<K> keyDeserializer,
                   Deserializer<V> valueDeserializer) {
+        try {
             log.debug("Starting the Kafka consumer");
             if (callback == null)
                 this.rebalanceCallback = config.getConfiguredInstance(ConsumerConfig.CONSUMER_REBALANCE_CALLBACK_CLASS_CONFIG,
@@ -486,6 +491,21 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     metricGrpPrefix,
                     metricsTags,
                     this.time);
+            if (keyDeserializer == null) {
+                this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                        Deserializer.class);
+                this.keyDeserializer.configure(config.originals(), false);
+            } else {
+                this.keyDeserializer = keyDeserializer;
+            }
+            if (valueDeserializer == null) {
+                this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                        Deserializer.class);
+                this.valueDeserializer.configure(config.originals(), false);
+            } else {
+                this.valueDeserializer = valueDeserializer;
+            }
+
             this.fetcher = new Fetcher<K, V>(this.client,
                     this.retryBackoffMs,
                     config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
@@ -493,8 +513,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
                     config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),
                     config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(),
-                    keyDeserializer == null ? config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class) : keyDeserializer,
-                    valueDeserializer == null ? config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class) : valueDeserializer,
+                    this.keyDeserializer,
+                    this.valueDeserializer,
                     this.metadata,
                     this.subscriptions,
                     metrics,
@@ -505,6 +525,13 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
             config.logUnused();
             log.debug("Kafka consumer created");
+        } catch (Throwable t) {
+            // call close methods if internal objects are already constructed
+            // this is to prevent resource leak. see KAFKA-2121
+            close(true);
+            // now propagate the exception
+            throw new KafkaException("Failed to construct kafka consumer", t);
+        }
     }
 
     /**
@@ -806,13 +833,24 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     @Override
     public synchronized void close() {
-        log.trace("Closing the Kafka consumer.");
-        this.closed = true;
-        this.metrics.close();
-        this.client.close();
-        log.debug("The Kafka consumer has closed.");
+        close(false);
     }
 
+    private void close(boolean swallowException) {
+        log.trace("Closing the Kafka consumer.");
+        AtomicReference<Throwable> firstException = new AtomicReference<Throwable>();
+        this.closed = true;
+        ClientUtils.closeQuietly(metrics, "consumer metrics", firstException);
+        ClientUtils.closeQuietly(client, "consumer network client", firstException);
+        ClientUtils.closeQuietly(keyDeserializer, "consumer key deserializer", firstException);
+        ClientUtils.closeQuietly(valueDeserializer, "consumer value deserializer", firstException);
+        log.debug("The Kafka consumer has closed.");
+        if (firstException.get() != null && !swallowException) {
+            throw new KafkaException("Failed to close kafka consumer", firstException.get());
+        }
+    }
+
     private boolean shouldAutoCommit(long now) {
         return this.autoCommit && this.lastCommitAttemptMs <= now - this.autoCommitIntervalMs;
     }
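From the caller's side the visible change is that a failed construction now throws KafkaException and releases everything it had already built, so catch-and-retry no longer accumulates orphaned reporter threads or sockets. A hedged usage sketch (the constructor arguments mirror the new test below; the broker address is hypothetical):

    Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");  // hypothetical broker
    KafkaConsumer<byte[], byte[]> consumer = null;
    try {
        consumer = new KafkaConsumer<byte[], byte[]>(
                props, null, new ByteArrayDeserializer(), new ByteArrayDeserializer());
        // poll, commit offsets, etc.
    } catch (KafkaException e) {
        // the constructor already closed any partially built internals (KAFKA-2121)
    } finally {
        if (consumer != null)
            consumer.close();
    }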

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java

@@ -18,6 +18,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.Metadata;
@@ -191,6 +192,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     @SuppressWarnings("unchecked")
     private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
+        try {
             log.trace("Starting the Kafka producer");
             this.producerConfig = config;
             this.time = new SystemTime();
@@ -226,7 +228,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
             this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
-            NetworkClient client = new NetworkClient(new Selector(this.metrics, time , "producer", metricTags),
+            NetworkClient client = new NetworkClient(new Selector(this.metrics, time, "producer", metricTags),
                     this.metadata,
                     clientId,
                     config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
@@ -266,6 +268,13 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             config.logUnused();
             log.debug("Kafka producer started");
+        } catch (Throwable t) {
+            // call close methods if internal objects are already constructed
+            // this is to prevent resource leak. see KAFKA-2121
+            close(true);
+            // now propagate the exception
+            throw new KafkaException("Failed to construct kafka producer", t);
+        }
     }
 
     private static int parseAcks(String acksString) {
@@ -513,17 +522,36 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      */
     @Override
     public void close() {
+        close(false);
+    }
+
+    private void close(boolean swallowException) {
         log.trace("Closing the Kafka producer.");
+        // this will keep track of the first encountered exception
+        AtomicReference<Throwable> firstException = new AtomicReference<Throwable>();
+        if (this.sender != null) {
+            try {
                 this.sender.initiateClose();
+            } catch (Throwable t) {
+                firstException.compareAndSet(null, t);
+                log.error("Failed to close sender", t);
+            }
+        }
+        if (this.ioThread != null) {
             try {
                 this.ioThread.join();
-            } catch (InterruptedException e) {
-                throw new InterruptException(e);
+            } catch (InterruptedException t) {
+                firstException.compareAndSet(null, t);
+                log.error("Interrupted while joining ioThread", t);
             }
-        this.metrics.close();
-        this.keySerializer.close();
-        this.valueSerializer.close();
+        }
+        ClientUtils.closeQuietly(metrics, "producer metrics", firstException);
+        ClientUtils.closeQuietly(keySerializer, "producer keySerializer", firstException);
+        ClientUtils.closeQuietly(valueSerializer, "producer valueSerializer", firstException);
         log.debug("The Kafka producer has closed.");
+        if (firstException.get() != null && !swallowException) {
+            throw new KafkaException("Failed to close kafka producer", firstException.get());
+        }
     }
 
     private static class FutureFailure implements Future<RecordMetadata> {
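One behavioral note on the producer: close() used to throw InterruptException as soon as joining the I/O thread was interrupted, skipping the metrics and serializer cleanup entirely; now the interrupt is recorded like any other failure, cleanup continues, and the first error is rethrown at the end. Defensive shutdown on the caller's side might look like this sketch (props, payload and log are assumed variables, not from this diff):

    KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(
            props, new ByteArraySerializer(), new ByteArraySerializer());
    try {
        producer.send(new ProducerRecord<byte[], byte[]>("my-topic", payload));
    } finally {
        try {
            producer.close();
        } catch (KafkaException e) {
            // wraps the first failure among sender, ioThread join, metrics, serializers
            log.warn("Producer shutdown reported an error", e);
        }
    }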

clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java

@@ -139,8 +139,11 @@ public class Sender implements Runnable {
                 log.error("Uncaught error in kafka producer I/O thread: ", e);
             }
         }
+        try {
             this.client.close();
+        } catch (Exception e) {
+            log.error("Failed to close network client", e);
+        }
         log.debug("Shutdown of Kafka producer I/O thread has completed.");
     }
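Wrapping the final close in its own try/catch keeps a failing NetworkClient.close() from propagating out of run() and killing the thread before the shutdown log line. The same guard in isolation, as a sketch with illustrative names:

    import java.io.Closeable;

    // Illustrative worker mirroring the Sender change: cleanup in run() is guarded
    // so a failing close() cannot abort the thread's shutdown sequence.
    class GuardedWorker implements Runnable {
        private final Closeable resource;
        private volatile boolean running = true;

        GuardedWorker(Closeable resource) {
            this.resource = resource;
        }

        public void shutdown() {
            running = false;
        }

        @Override
        public void run() {
            while (running) {
                // ... main event loop would go here ...
            }
            try {
                resource.close();  // may throw
            } catch (Exception e) {
                System.err.println("Failed to close resource: " + e);
            }
            // shutdown bookkeeping below still runs even if close() failed
            System.out.println("Worker shutdown complete.");
        }
    }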

clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java

@@ -12,6 +12,7 @@
  */
 package org.apache.kafka.common.metrics;
 
+import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -46,7 +47,7 @@ import org.apache.kafka.common.utils.Utils;
  *     sensor.record(messageSize);
  * </pre>
  */
-public class Metrics {
+public class Metrics implements Closeable {
 
     private final MetricConfig config;
     private final ConcurrentMap<MetricName, KafkaMetric> metrics;
@@ -192,6 +193,7 @@ public class Metrics {
     /**
      * Close this metrics repository.
     */
+    @Override
     public void close() {
         for (MetricsReporter reporter : this.reporters)
             reporter.close();
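Implementing Closeable is what lets ClientUtils.closeQuietly accept a Metrics instance, and as a side effect try-with-resources now works too. A minimal sketch, assuming the no-arg Metrics constructor with default config:

    import org.apache.kafka.common.metrics.Metrics;

    public class MetricsCloseExample {
        public static void main(String[] args) {
            // Metrics implements Closeable, so try-with-resources closes the reporters
            try (Metrics metrics = new Metrics()) {
                metrics.sensor("request-latency");  // registered as usual
            }  // metrics.close() invokes close() on every configured reporter
        }
    }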

clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java

@@ -13,6 +13,7 @@
 package org.apache.kafka.common.serialization;
 
+import java.io.Closeable;
 import java.util.Map;
 
 /**
@@ -21,7 +22,7 @@ import java.util.Map;
  *
  * A class that implements this interface is expected to have a constructor with no parameter.
  */
-public interface Deserializer<T> {
+public interface Deserializer<T> extends Closeable {
 
     /**
      * Configure this class.
@@ -38,8 +39,4 @@ public interface Deserializer<T> {
      */
     public T deserialize(String topic, byte[] data);
 
-    /**
-     * Close this deserializer
-     */
-    public void close();
 }

clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java

@@ -13,6 +13,7 @@
 package org.apache.kafka.common.serialization;
 
+import java.io.Closeable;
 import java.util.Map;
 
 /**
@@ -21,7 +22,7 @@ import java.util.Map;
  *
  * A class that implements this interface is expected to have a constructor with no parameter.
  */
-public interface Serializer<T> {
+public interface Serializer<T> extends Closeable {
 
     /**
      * Configure this class.
@@ -37,8 +38,4 @@ public interface Serializer<T> {
      */
     public byte[] serialize(String topic, T data);
 
-    /**
-     * Close this serializer
-     */
-    public void close();
 }
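With Serializer and Deserializer now extending java.io.Closeable, an implementation that holds real resources gets released through the same closeQuietly path as the other internals. A hypothetical example; this class, its resource, and the file path are illustrative, not part of Kafka:

    import java.io.FileWriter;
    import java.io.IOException;
    import java.io.Writer;
    import java.util.Map;

    import org.apache.kafka.common.serialization.Serializer;

    public class AuditingStringSerializer implements Serializer<String> {
        private Writer auditLog;  // hypothetical resource worth closing

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            try {
                auditLog = new FileWriter("/tmp/serializer-audit.log", true);
            } catch (IOException e) {
                throw new RuntimeException("Cannot open audit log", e);
            }
        }

        @Override
        public byte[] serialize(String topic, String data) {
            return data == null ? null : data.getBytes();
        }

        @Override
        public void close() {
            try {
                if (auditLog != null)
                    auditLog.close();  // now reached reliably via KafkaProducer.close()
            } catch (IOException e) {
                throw new RuntimeException("Cannot close audit log", e);
            }
        }
    }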

clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java

@@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.test.MockMetricsReporter;
import org.junit.Assert;
import org.junit.Test;

import java.util.Properties;

public class KafkaConsumerTest {

    @Test
    public void testConstructorClose() throws Exception {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testConstructorClose");
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "some.invalid.hostname.foo.bar:9999");
        props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());

        MockMetricsReporter.CLOSE_COUNT.set(0);
        try {
            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(
                    props, null, new ByteArrayDeserializer(), new ByteArrayDeserializer());
        } catch (KafkaException e) {
            Assert.assertEquals(1, MockMetricsReporter.CLOSE_COUNT.get());
            MockMetricsReporter.CLOSE_COUNT.set(0);
            Assert.assertEquals("Failed to construct kafka consumer", e.getMessage());
            return;
        }
        Assert.fail("should have caught an exception and returned");
    }
}

clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java

@@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.producer;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.test.MockMetricsReporter;
import org.junit.Assert;
import org.junit.Test;

import java.util.Properties;

public class KafkaProducerTest {

    @Test
    public void testConstructorClose() throws Exception {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "testConstructorClose");
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "some.invalid.hostname.foo.bar:9999");
        props.setProperty(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());

        MockMetricsReporter.CLOSE_COUNT.set(0);
        try {
            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(
                    props, new ByteArraySerializer(), new ByteArraySerializer());
        } catch (KafkaException e) {
            Assert.assertEquals(1, MockMetricsReporter.CLOSE_COUNT.get());
            MockMetricsReporter.CLOSE_COUNT.set(0);
            Assert.assertEquals("Failed to construct kafka producer", e.getMessage());
            return;
        }
        Assert.fail("should have caught an exception and returned");
    }
}

clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java

@@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.test;

import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class MockMetricsReporter implements MetricsReporter {
    public static final AtomicInteger CLOSE_COUNT = new AtomicInteger(0);

    public MockMetricsReporter() {
    }

    @Override
    public void init(List<KafkaMetric> metrics) {
    }

    @Override
    public void metricChange(KafkaMetric metric) {
    }

    @Override
    public void close() {
        CLOSE_COUNT.incrementAndGet();
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}