diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index da5b7fbb840..9df674da0c1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1533,6 +1533,8 @@ public class KafkaConsumer implements Consumer { * @throws IllegalArgumentException If the timeout is negative. */ public void close(long timeout, TimeUnit timeUnit) { + if (closed) + return; if (timeout < 0) throw new IllegalArgumentException("The timeout cannot be negative."); acquire(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 8e4935016ea..a598b5da873 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.clients.consumer; -import java.util.concurrent.ScheduledExecutorService; import org.apache.kafka.clients.ClientRequest; import org.apache.kafka.clients.KafkaClient; import org.apache.kafka.clients.Metadata; @@ -68,7 +67,9 @@ import org.apache.kafka.test.MockConsumerInterceptor; import org.apache.kafka.test.MockMetricsReporter; import org.apache.kafka.test.TestUtils; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import java.nio.ByteBuffer; import java.util.Arrays; @@ -84,6 +85,7 @@ import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -98,9 +100,6 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import org.junit.Rule; -import org.junit.rules.ExpectedException; - public class KafkaConsumerTest { private final String topic = "test"; private final TopicPartition tp0 = new TopicPartition(topic, 0); @@ -1228,6 +1227,13 @@ public class KafkaConsumerTest { consumerCloseTest(Long.MAX_VALUE, Collections.emptyList(), 0, true); } + @Test + public void closeShouldBeIdempotent() { + KafkaConsumer consumer = newConsumer(); + consumer.close(); + consumer.close(); + } + private void consumerCloseTest(final long closeTimeoutMs, List responses, long waitMs, diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index 514426d40c0..ea493d22538 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -409,4 +409,13 @@ public class KafkaProducerTest { } + @Test + public void closeShouldBeIdempotent() { + Properties producerProps = new Properties(); + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); + Producer producer = new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer()); + producer.close(); + producer.close(); + } + }