KAFKA-5169; KafkaConsumer.close should be idempotent

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #2968 from mjsax/kafka-5169-consumer-close
This commit is contained in:
Matthias J. Sax 2017-05-05 08:51:02 +01:00 committed by Ismael Juma
parent 05ea454dfb
commit 715eae6da9
3 changed files with 21 additions and 4 deletions

View File

@ -1533,6 +1533,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @throws IllegalArgumentException If the <code>timeout</code> is negative.
*/
public void close(long timeout, TimeUnit timeUnit) {
if (closed)
return;
if (timeout < 0)
throw new IllegalArgumentException("The timeout cannot be negative.");
acquire();

View File

@ -16,7 +16,6 @@
*/
package org.apache.kafka.clients.consumer;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
@ -68,7 +67,9 @@ import org.apache.kafka.test.MockConsumerInterceptor;
import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.nio.ByteBuffer;
import java.util.Arrays;
@ -84,6 +85,7 @@ import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@ -98,9 +100,6 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.junit.Rule;
import org.junit.rules.ExpectedException;
public class KafkaConsumerTest {
private final String topic = "test";
private final TopicPartition tp0 = new TopicPartition(topic, 0);
@ -1228,6 +1227,13 @@ public class KafkaConsumerTest {
consumerCloseTest(Long.MAX_VALUE, Collections.<AbstractResponse>emptyList(), 0, true);
}
@Test
public void closeShouldBeIdempotent() {
KafkaConsumer<byte[], byte[]> consumer = newConsumer();
consumer.close();
consumer.close();
}
private void consumerCloseTest(final long closeTimeoutMs,
List<? extends AbstractResponse> responses,
long waitMs,

View File

@ -409,4 +409,13 @@ public class KafkaProducerTest {
}
@Test
public void closeShouldBeIdempotent() {
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
Producer producer = new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer());
producer.close();
producer.close();
}
}