From af634a4a98eaa2457752e3f2841720020e0e9ad0 Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Wed, 16 Jan 2019 00:48:32 +0800 Subject: [PATCH] KAFKA-7391; Introduce close(Duration) to Producer and AdminClient instead of close(long, TimeUnit) (#5667) See KIP-367: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=89070496. Reviewers: Viktor Somogyi , Manikumar Reddy , Jason Gustafson --- .../kafka/clients/admin/AdminClient.java | 19 +++++++++++++++++- .../kafka/clients/admin/KafkaAdminClient.java | 7 +++++-- .../kafka/clients/producer/KafkaProducer.java | 20 +++++++++---------- .../kafka/clients/producer/MockProducer.java | 6 +++--- .../kafka/clients/producer/Producer.java | 13 ++++++++---- .../kafka/clients/admin/MockAdminClient.java | 4 ++-- .../clients/producer/KafkaProducerTest.java | 19 +++++++++--------- .../connect/runtime/WorkerSourceTask.java | 3 ++- .../connect/runtime/WorkerSourceTaskTest.java | 15 +++++++------- .../AbstractJoinIntegrationTest.java | 4 ++-- .../AbstractResetIntegrationTest.java | 3 +-- 11 files changed, 70 insertions(+), 43 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java index fdda622e030..1521ee53e1f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java @@ -26,6 +26,7 @@ import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.config.ConfigResource; +import java.time.Duration; import java.util.Collection; import java.util.Map; import java.util.Properties; @@ -84,8 +85,24 @@ public abstract class AdminClient implements AutoCloseable { * * @param duration The duration to use for the wait time. * @param unit The time unit to use for the wait time. + * @deprecated Since 2.2. Use {@link #close(Duration)} or {@link #close()}. */ - public abstract void close(long duration, TimeUnit unit); + @Deprecated + public void close(long duration, TimeUnit unit) { + close(Duration.ofMillis(unit.toMillis(duration))); + } + + /** + * Close the AdminClient and release all associated resources. + * + * The close operation has a grace period during which current operations will be allowed to + * complete, specified by the given duration. + * New operations will not be accepted during the grace period. Once the grace period is over, + * all operations that have not yet been completed will be aborted with a TimeoutException. + * + * @param timeout The time to use for the wait time. + */ + public abstract void close(Duration timeout); /** * Create a batch of new topics with the default options. diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index ce51eee4e5e..2ba3cf225b4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -126,6 +126,7 @@ import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; import java.net.InetSocketAddress; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -439,8 +440,10 @@ public class KafkaAdminClient extends AdminClient { } @Override - public void close(long duration, TimeUnit unit) { - long waitTimeMs = unit.toMillis(duration); + public void close(Duration timeout) { + long waitTimeMs = timeout.toMillis(); + if (waitTimeMs < 0) + throw new IllegalArgumentException("The timeout cannot be negative."); waitTimeMs = Math.min(TimeUnit.DAYS.toMillis(365), waitTimeMs); // Limit the timeout to a year. long now = time.milliseconds(); long newHardShutdownTimeMs = now + waitTimeMs; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 540493d86bd..bfd6bf3ec9b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -17,6 +17,7 @@ package org.apache.kafka.clients.producer; import java.net.InetSocketAddress; +import java.time.Duration; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -424,7 +425,7 @@ public class KafkaProducer implements Producer { log.debug("Kafka producer started"); } catch (Throwable t) { // call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121 - close(0, TimeUnit.MILLISECONDS, true); + close(Duration.ofMillis(0), true); // now propagate the exception throw new KafkaException("Failed to construct kafka producer", t); } @@ -1107,7 +1108,7 @@ public class KafkaProducer implements Producer { */ @Override public void close() { - close(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + close(Duration.ofMillis(Long.MAX_VALUE)); } /** @@ -1117,25 +1118,24 @@ public class KafkaProducer implements Producer { * any unsent and unacknowledged records immediately. *

* If invoked from within a {@link Callback} this method will not block and will be equivalent to - * close(0, TimeUnit.MILLISECONDS). This is done since no further sending will happen while + * close(Duration.ofMillis(0)). This is done since no further sending will happen while * blocking the I/O thread of the producer. * * @param timeout The maximum time to wait for producer to complete any pending requests. The value should be * non-negative. Specifying a timeout of zero means do not wait for pending send requests to complete. - * @param timeUnit The time unit for the timeout * @throws InterruptException If the thread is interrupted while blocked * @throws IllegalArgumentException If the timeout is negative. + * */ @Override - public void close(long timeout, TimeUnit timeUnit) { - close(timeout, timeUnit, false); + public void close(Duration timeout) { + close(timeout, false); } - private void close(long timeout, TimeUnit timeUnit, boolean swallowException) { - if (timeout < 0) + private void close(Duration timeout, boolean swallowException) { + long timeoutMs = timeout.toMillis(); + if (timeoutMs < 0) throw new IllegalArgumentException("The timeout cannot be negative."); - - long timeoutMs = timeUnit.toMillis(timeout); log.info("Closing the Kafka producer with timeoutMillis = {} ms.", timeoutMs); // this will keep track of the first encountered exception diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java index e448d6ebdc3..c2561cfc79b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java @@ -31,6 +31,7 @@ import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Time; +import java.time.Duration; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Deque; @@ -39,7 +40,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; /** * A mock of the producer interface you can use for testing code that uses Kafka. @@ -312,11 +312,11 @@ public class MockProducer implements Producer { @Override public void close() { - close(0, null); + close(Duration.ofMillis(0)); } @Override - public void close(long timeout, TimeUnit timeUnit) { + public void close(Duration timeout) { if (producerFencedOnClose) { throw new ProducerFencedException("MockProducer is fenced."); } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java index 49820333731..96d487a7923 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ProducerFencedException; import java.io.Closeable; +import java.time.Duration; import java.util.List; import java.util.Map; import java.util.concurrent.Future; @@ -92,9 +93,13 @@ public interface Producer extends Closeable { */ void close(); - /** - * See {@link KafkaProducer#close(long, TimeUnit)} - */ - void close(long timeout, TimeUnit unit); + @Deprecated + default void close(long timeout, TimeUnit unit) { + close(Duration.ofMillis(unit.toMillis(timeout))); + } + /** + * See {@link KafkaProducer#close(Duration)} + */ + void close(Duration timeout); } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index ed7efd5a869..aa2e6835137 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -31,13 +31,13 @@ import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.internals.KafkaFutureImpl; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; public class MockAdminClient extends AdminClient { public static final String DEFAULT_CLUSTER_ID = "I4ZmrWqfT2e-upky_4fdPA"; @@ -385,7 +385,7 @@ public class MockAdminClient extends AdminClient { } @Override - public void close(long duration, TimeUnit unit) {} + public void close(Duration timeout) {} private final static class TopicMetadata { diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index a73ab7c14d5..f3faf26e616 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -50,6 +50,7 @@ import org.apache.kafka.test.TestUtils; import org.junit.Assert; import org.junit.Test; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -344,7 +345,7 @@ public class KafkaProducerTest { verify(metadata, times(4)).awaitUpdate(anyInt(), anyLong()); verify(metadata, times(7)).fetch(); - producer.close(0, TimeUnit.MILLISECONDS); + producer.close(Duration.ofMillis(0)); } @Test @@ -388,7 +389,7 @@ public class KafkaProducerTest { } catch (ExecutionException e) { assertTrue(e.getCause() instanceof TimeoutException); } finally { - producer.close(0, TimeUnit.MILLISECONDS); + producer.close(Duration.ofMillis(0)); } } @@ -420,7 +421,7 @@ public class KafkaProducerTest { verify(metadata, times(2)).awaitUpdate(anyInt(), anyLong()); verify(metadata, times(3)).fetch(); - producer.close(0, TimeUnit.MILLISECONDS); + producer.close(Duration.ofMillis(0)); } @Test @@ -464,7 +465,7 @@ public class KafkaProducerTest { } catch (ExecutionException e) { assertTrue(e.getCause() instanceof TimeoutException); } finally { - producer.close(0, TimeUnit.MILLISECONDS); + producer.close(Duration.ofMillis(0)); } } @@ -561,7 +562,7 @@ public class KafkaProducerTest { verify(valueSerializer).serialize(topic, record.headers(), value); verify(keySerializer).serialize(topic, record.headers(), key); - producer.close(0, TimeUnit.MILLISECONDS); + producer.close(Duration.ofMillis(0)); } @Test @@ -611,7 +612,7 @@ public class KafkaProducerTest { verify(interceptors).onSend(record); verify(interceptors).onSendError(eq(record), notNull(), notNull()); - producer.close(0, TimeUnit.MILLISECONDS); + producer.close(Duration.ofMillis(0)); } @Test @@ -672,7 +673,7 @@ public class KafkaProducerTest { try { producer.beginTransaction(); } finally { - producer.close(0, TimeUnit.MILLISECONDS); + producer.close(Duration.ofMillis(0)); } } @@ -711,7 +712,7 @@ public class KafkaProducerTest { metadata.fetch().invalidTopics()); TestUtils.assertFutureError(future, InvalidTopicException.class); - producer.close(0, TimeUnit.MILLISECONDS); + producer.close(Duration.ofMillis(0)); } @Test @@ -750,7 +751,7 @@ public class KafkaProducerTest { // Wait until metadata update for the topic has been requested TestUtils.waitForCondition(() -> metadata.containsTopic(topicName), "Timeout when waiting for topic to be added to metadata"); - producer.close(0, TimeUnit.MILLISECONDS); + producer.close(Duration.ofMillis(0)); TestUtils.waitForCondition(() -> sendException.get() != null, "No producer exception within timeout"); assertEquals(KafkaException.class, sendException.get().getClass()); } finally { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java index 623a210e0e2..71e026c7176 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java @@ -48,6 +48,7 @@ import org.apache.kafka.connect.util.ConnectorTaskId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Duration; import java.util.IdentityHashMap; import java.util.List; import java.util.Map; @@ -151,7 +152,7 @@ class WorkerSourceTask extends WorkerTask { } if (producer != null) { try { - producer.close(30, TimeUnit.SECONDS); + producer.close(Duration.ofSeconds(30)); } catch (Throwable t) { log.warn("Could not close producer", t); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java index 95762f3215f..24a13c2be26 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java @@ -54,6 +54,7 @@ import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.modules.junit4.PowerMockRunner; import org.powermock.reflect.Whitebox; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -169,7 +170,7 @@ public class WorkerSourceTaskTest extends ThreadedTest { } }); - producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class)); + producer.close(EasyMock.anyObject(Duration.class)); EasyMock.expectLastCall(); transformationChain.close(); @@ -217,7 +218,7 @@ public class WorkerSourceTaskTest extends ThreadedTest { statusListener.onShutdown(taskId); EasyMock.expectLastCall(); - producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class)); + producer.close(EasyMock.anyObject(Duration.class)); EasyMock.expectLastCall(); transformationChain.close(); @@ -266,7 +267,7 @@ public class WorkerSourceTaskTest extends ThreadedTest { statusListener.onShutdown(taskId); EasyMock.expectLastCall(); - producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class)); + producer.close(EasyMock.anyObject(Duration.class)); EasyMock.expectLastCall(); transformationChain.close(); @@ -315,7 +316,7 @@ public class WorkerSourceTaskTest extends ThreadedTest { EasyMock.expectLastCall(); expectOffsetFlush(true); - producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class)); + producer.close(EasyMock.anyObject(Duration.class)); EasyMock.expectLastCall(); transformationChain.close(); @@ -359,7 +360,7 @@ public class WorkerSourceTaskTest extends ThreadedTest { statusListener.onShutdown(taskId); EasyMock.expectLastCall(); - producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class)); + producer.close(EasyMock.anyObject(Duration.class)); EasyMock.expectLastCall(); transformationChain.close(); @@ -404,7 +405,7 @@ public class WorkerSourceTaskTest extends ThreadedTest { statusListener.onShutdown(taskId); EasyMock.expectLastCall(); - producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class)); + producer.close(EasyMock.anyObject(Duration.class)); EasyMock.expectLastCall(); transformationChain.close(); @@ -594,7 +595,7 @@ public class WorkerSourceTaskTest extends ThreadedTest { statusListener.onShutdown(taskId); EasyMock.expectLastCall(); - producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class)); + producer.close(EasyMock.anyObject(Duration.class)); EasyMock.expectLastCall(); transformationChain.close(); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java index a3baebb276c..badaa36cd55 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java @@ -46,13 +46,13 @@ import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Properties; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import static org.hamcrest.MatcherAssert.assertThat; @@ -160,7 +160,7 @@ public abstract class AbstractJoinIntegrationTest { @After public void cleanup() throws InterruptedException { - producer.close(0, TimeUnit.MILLISECONDS); + producer.close(Duration.ofMillis(0)); CLUSTER.deleteAllTopicsAndWait(120000); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java index cc9a2ffa495..aeb581eb185 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java @@ -64,7 +64,6 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import kafka.tools.StreamsResetter; @@ -86,7 +85,7 @@ public abstract class AbstractResetIntegrationTest { @AfterClass public static void afterClassCleanup() { if (adminClient != null) { - adminClient.close(10, TimeUnit.SECONDS); + adminClient.close(Duration.ofSeconds(10)); adminClient = null; } }