KAFKA-7391; Introduce close(Duration) to Producer and AdminClient instead of close(long, TimeUnit) (#5667)

See KIP-367: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=89070496.

Reviewers: Viktor Somogyi <viktorsomogyi@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>, Jason Gustafson <jason@confluent.io>
This commit is contained in:
Chia-Ping Tsai 2019-01-16 00:48:32 +08:00 committed by Jason Gustafson
parent 13f679013a
commit af634a4a98
11 changed files with 70 additions and 43 deletions

View File

@ -26,6 +26,7 @@ import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.config.ConfigResource;
import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;
@ -84,8 +85,24 @@ public abstract class AdminClient implements AutoCloseable {
*
* @param duration The duration to use for the wait time.
* @param unit The time unit to use for the wait time.
* @deprecated Since 2.2. Use {@link #close(Duration)} or {@link #close()}.
*/
public abstract void close(long duration, TimeUnit unit);
@Deprecated
public void close(long duration, TimeUnit unit) {
close(Duration.ofMillis(unit.toMillis(duration)));
}
/**
* Close the AdminClient and release all associated resources.
*
* The close operation has a grace period during which current operations will be allowed to
* complete, specified by the given duration.
* New operations will not be accepted during the grace period. Once the grace period is over,
* all operations that have not yet been completed will be aborted with a TimeoutException.
*
* @param timeout The time to use for the wait time.
*/
public abstract void close(Duration timeout);
/**
* Create a batch of new topics with the default options.

View File

@ -126,6 +126,7 @@ import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -439,8 +440,10 @@ public class KafkaAdminClient extends AdminClient {
}
@Override
public void close(long duration, TimeUnit unit) {
long waitTimeMs = unit.toMillis(duration);
public void close(Duration timeout) {
long waitTimeMs = timeout.toMillis();
if (waitTimeMs < 0)
throw new IllegalArgumentException("The timeout cannot be negative.");
waitTimeMs = Math.min(TimeUnit.DAYS.toMillis(365), waitTimeMs); // Limit the timeout to a year.
long now = time.milliseconds();
long newHardShutdownTimeMs = now + waitTimeMs;

View File

@ -17,6 +17,7 @@
package org.apache.kafka.clients.producer;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -424,7 +425,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
log.debug("Kafka producer started");
} catch (Throwable t) {
// call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
close(0, TimeUnit.MILLISECONDS, true);
close(Duration.ofMillis(0), true);
// now propagate the exception
throw new KafkaException("Failed to construct kafka producer", t);
}
@ -1107,7 +1108,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
*/
@Override
public void close() {
close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
close(Duration.ofMillis(Long.MAX_VALUE));
}
/**
@ -1117,25 +1118,24 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* any unsent and unacknowledged records immediately.
* <p>
* If invoked from within a {@link Callback} this method will not block and will be equivalent to
* <code>close(0, TimeUnit.MILLISECONDS)</code>. This is done since no further sending will happen while
* <code>close(Duration.ofMillis(0))</code>. This is done since no further sending will happen while
* blocking the I/O thread of the producer.
*
* @param timeout The maximum time to wait for producer to complete any pending requests. The value should be
* non-negative. Specifying a timeout of zero means do not wait for pending send requests to complete.
* @param timeUnit The time unit for the <code>timeout</code>
* @throws InterruptException If the thread is interrupted while blocked
* @throws IllegalArgumentException If the <code>timeout</code> is negative.
*
*/
@Override
public void close(long timeout, TimeUnit timeUnit) {
close(timeout, timeUnit, false);
public void close(Duration timeout) {
close(timeout, false);
}
private void close(long timeout, TimeUnit timeUnit, boolean swallowException) {
if (timeout < 0)
private void close(Duration timeout, boolean swallowException) {
long timeoutMs = timeout.toMillis();
if (timeoutMs < 0)
throw new IllegalArgumentException("The timeout cannot be negative.");
long timeoutMs = timeUnit.toMillis(timeout);
log.info("Closing the Kafka producer with timeoutMillis = {} ms.", timeoutMs);
// this will keep track of the first encountered exception

View File

@ -31,6 +31,7 @@ import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Time;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
@ -39,7 +40,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
* A mock of the producer interface you can use for testing code that uses Kafka.
@ -312,11 +312,11 @@ public class MockProducer<K, V> implements Producer<K, V> {
@Override
public void close() {
close(0, null);
close(Duration.ofMillis(0));
}
@Override
public void close(long timeout, TimeUnit timeUnit) {
public void close(Duration timeout) {
if (producerFencedOnClose) {
throw new ProducerFencedException("MockProducer is fenced.");
}

View File

@ -24,6 +24,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import java.io.Closeable;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
@ -92,9 +93,13 @@ public interface Producer<K, V> extends Closeable {
*/
void close();
/**
* See {@link KafkaProducer#close(long, TimeUnit)}
*/
void close(long timeout, TimeUnit unit);
@Deprecated
default void close(long timeout, TimeUnit unit) {
close(Duration.ofMillis(unit.toMillis(timeout)));
}
/**
* See {@link KafkaProducer#close(Duration)}
*/
void close(Duration timeout);
}

View File

@ -31,13 +31,13 @@ import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class MockAdminClient extends AdminClient {
public static final String DEFAULT_CLUSTER_ID = "I4ZmrWqfT2e-upky_4fdPA";
@ -385,7 +385,7 @@ public class MockAdminClient extends AdminClient {
}
@Override
public void close(long duration, TimeUnit unit) {}
public void close(Duration timeout) {}
private final static class TopicMetadata {

View File

@ -50,6 +50,7 @@ import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Test;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -344,7 +345,7 @@ public class KafkaProducerTest {
verify(metadata, times(4)).awaitUpdate(anyInt(), anyLong());
verify(metadata, times(7)).fetch();
producer.close(0, TimeUnit.MILLISECONDS);
producer.close(Duration.ofMillis(0));
}
@Test
@ -388,7 +389,7 @@ public class KafkaProducerTest {
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof TimeoutException);
} finally {
producer.close(0, TimeUnit.MILLISECONDS);
producer.close(Duration.ofMillis(0));
}
}
@ -420,7 +421,7 @@ public class KafkaProducerTest {
verify(metadata, times(2)).awaitUpdate(anyInt(), anyLong());
verify(metadata, times(3)).fetch();
producer.close(0, TimeUnit.MILLISECONDS);
producer.close(Duration.ofMillis(0));
}
@Test
@ -464,7 +465,7 @@ public class KafkaProducerTest {
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof TimeoutException);
} finally {
producer.close(0, TimeUnit.MILLISECONDS);
producer.close(Duration.ofMillis(0));
}
}
@ -561,7 +562,7 @@ public class KafkaProducerTest {
verify(valueSerializer).serialize(topic, record.headers(), value);
verify(keySerializer).serialize(topic, record.headers(), key);
producer.close(0, TimeUnit.MILLISECONDS);
producer.close(Duration.ofMillis(0));
}
@Test
@ -611,7 +612,7 @@ public class KafkaProducerTest {
verify(interceptors).onSend(record);
verify(interceptors).onSendError(eq(record), notNull(), notNull());
producer.close(0, TimeUnit.MILLISECONDS);
producer.close(Duration.ofMillis(0));
}
@Test
@ -672,7 +673,7 @@ public class KafkaProducerTest {
try {
producer.beginTransaction();
} finally {
producer.close(0, TimeUnit.MILLISECONDS);
producer.close(Duration.ofMillis(0));
}
}
@ -711,7 +712,7 @@ public class KafkaProducerTest {
metadata.fetch().invalidTopics());
TestUtils.assertFutureError(future, InvalidTopicException.class);
producer.close(0, TimeUnit.MILLISECONDS);
producer.close(Duration.ofMillis(0));
}
@Test
@ -750,7 +751,7 @@ public class KafkaProducerTest {
// Wait until metadata update for the topic has been requested
TestUtils.waitForCondition(() -> metadata.containsTopic(topicName), "Timeout when waiting for topic to be added to metadata");
producer.close(0, TimeUnit.MILLISECONDS);
producer.close(Duration.ofMillis(0));
TestUtils.waitForCondition(() -> sendException.get() != null, "No producer exception within timeout");
assertEquals(KafkaException.class, sendException.get().getClass());
} finally {

View File

@ -48,6 +48,7 @@ import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
@ -151,7 +152,7 @@ class WorkerSourceTask extends WorkerTask {
}
if (producer != null) {
try {
producer.close(30, TimeUnit.SECONDS);
producer.close(Duration.ofSeconds(30));
} catch (Throwable t) {
log.warn("Could not close producer", t);
}

View File

@ -54,6 +54,7 @@ import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -169,7 +170,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
}
});
producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
producer.close(EasyMock.anyObject(Duration.class));
EasyMock.expectLastCall();
transformationChain.close();
@ -217,7 +218,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
statusListener.onShutdown(taskId);
EasyMock.expectLastCall();
producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
producer.close(EasyMock.anyObject(Duration.class));
EasyMock.expectLastCall();
transformationChain.close();
@ -266,7 +267,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
statusListener.onShutdown(taskId);
EasyMock.expectLastCall();
producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
producer.close(EasyMock.anyObject(Duration.class));
EasyMock.expectLastCall();
transformationChain.close();
@ -315,7 +316,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
EasyMock.expectLastCall();
expectOffsetFlush(true);
producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
producer.close(EasyMock.anyObject(Duration.class));
EasyMock.expectLastCall();
transformationChain.close();
@ -359,7 +360,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
statusListener.onShutdown(taskId);
EasyMock.expectLastCall();
producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
producer.close(EasyMock.anyObject(Duration.class));
EasyMock.expectLastCall();
transformationChain.close();
@ -404,7 +405,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
statusListener.onShutdown(taskId);
EasyMock.expectLastCall();
producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
producer.close(EasyMock.anyObject(Duration.class));
EasyMock.expectLastCall();
transformationChain.close();
@ -594,7 +595,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
statusListener.onShutdown(taskId);
EasyMock.expectLastCall();
producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
producer.close(EasyMock.anyObject(Duration.class));
EasyMock.expectLastCall();
transformationChain.close();

View File

@ -46,13 +46,13 @@ import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.hamcrest.MatcherAssert.assertThat;
@ -160,7 +160,7 @@ public abstract class AbstractJoinIntegrationTest {
@After
public void cleanup() throws InterruptedException {
producer.close(0, TimeUnit.MILLISECONDS);
producer.close(Duration.ofMillis(0));
CLUSTER.deleteAllTopicsAndWait(120000);
}

View File

@ -64,7 +64,6 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import kafka.tools.StreamsResetter;
@ -86,7 +85,7 @@ public abstract class AbstractResetIntegrationTest {
@AfterClass
public static void afterClassCleanup() {
if (adminClient != null) {
adminClient.close(10, TimeUnit.SECONDS);
adminClient.close(Duration.ofSeconds(10));
adminClient = null;
}
}