KAFKA-6979; Add `default.api.timeout.ms` to KafkaConsumer (KIP-266) (#5122)

Adds a configuration that specifies the default timeout for KafkaConsumer APIs that could block. This was introduced in KIP-266.

Reviewers: Satish Duggana <satish.duggana@gmail.com>, Jason Gustafson <jason@confluent.io>
This commit is contained in:
Dhruvil Shah 2018-06-12 16:29:50 -07:00 committed by Jason Gustafson
parent a592402512
commit 53ca52f855
4 changed files with 54 additions and 21 deletions

View File

@ -218,6 +218,10 @@ public class ConsumerConfig extends AbstractConfig {
public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC;
/** <code>default.api.timeout.ms</code> */
public static final String DEFAULT_API_TIMEOUT_MS_CONFIG = "default.api.timeout.ms";
public static final String DEFAULT_API_TIMEOUT_MS_DOC = "Specifies the timeout (in milliseconds) for consumer APIs that could block. This configuration is used as the default timeout for all consumer operations that do not explicitly accept a <code>timeout</code> parameter.";
/** <code>interceptor.classes</code> */
public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
public static final String INTERCEPTOR_CLASSES_DOC = "A list of classes to use as interceptors. "
@ -403,6 +407,12 @@ public class ConsumerConfig extends AbstractConfig {
atLeast(0),
Importance.MEDIUM,
REQUEST_TIMEOUT_MS_DOC)
.define(DEFAULT_API_TIMEOUT_MS_CONFIG,
Type.INT,
60 * 1000,
atLeast(0),
Importance.MEDIUM,
DEFAULT_API_TIMEOUT_MS_DOC)
/* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
.define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
Type.LONG,

View File

@ -567,6 +567,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
private final Metadata metadata;
private final long retryBackoffMs;
private final long requestTimeoutMs;
private final int defaultApiTimeoutMs;
private volatile boolean closed = false;
private List<PartitionAssignor> assignors;
@ -666,6 +667,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
log.debug("Initializing the Kafka consumer");
this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
int sessionTimeOutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
int fetchMaxWaitMs = config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
if (this.requestTimeoutMs <= sessionTimeOutMs || this.requestTimeoutMs <= fetchMaxWaitMs)
@ -814,6 +816,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
Metadata metadata,
long retryBackoffMs,
long requestTimeoutMs,
int defaultApiTimeoutMs,
List<PartitionAssignor> assignors) {
this.log = logContext.logger(getClass());
this.clientId = clientId;
@ -829,6 +832,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
this.metadata = metadata;
this.retryBackoffMs = retryBackoffMs;
this.requestTimeoutMs = requestTimeoutMs;
this.defaultApiTimeoutMs = defaultApiTimeoutMs;
this.assignors = assignors;
}
@ -1268,8 +1272,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
* should not be used.
* <p>
* This is a synchronous commits and will block until either the commit succeeds or an unrecoverable error is
* encountered (in which case it is thrown to the caller).
* This is a synchronous commit and will block until either the commit succeeds, an unrecoverable error is
* encountered (in which case it is thrown to the caller), or the timeout specified by {@code default.api.timeout.ms} expires
* (in which case a {@link org.apache.kafka.common.errors.TimeoutException} is thrown to the caller).
* <p>
* Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)}
* (or similar) are guaranteed to have their callbacks invoked prior to completion of this method.
@ -1286,10 +1291,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* configured groupId. See the exception for more details
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata
* is too large or if the topic does not exist).
* @throws org.apache.kafka.common.errors.TimeoutException if the timeout specified by {@code default.api.timeout.ms} expires
* before successful completion of the offset commit
*/
@Override
public void commitSync() {
commitSync(Duration.ofMillis(Long.MAX_VALUE));
commitSync(Duration.ofMillis(defaultApiTimeoutMs));
}
/**
@ -1343,7 +1350,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* i.e. lastProcessedMessageOffset + 1.
* <p>
* This is a synchronous commits and will block until either the commit succeeds or an unrecoverable error is
* encountered (in which case it is thrown to the caller).
* encountered (in which case it is thrown to the caller), or the timeout specified by {@code default.api.timeout.ms} expires
* (in which case a {@link org.apache.kafka.common.errors.TimeoutException} is thrown to the caller).
* <p>
* Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)}
* (or similar) are guaranteed to have their callbacks invoked prior to completion of this method.
@ -1362,10 +1370,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @throws java.lang.IllegalArgumentException if the committed offset is negative
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata
* is too large or if the topic does not exist).
* @throws org.apache.kafka.common.errors.TimeoutException if the timeout expires before successful completion
* of the offset commit
*/
@Override
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
commitSync(offsets, Duration.ofMillis(Long.MAX_VALUE));
commitSync(offsets, Duration.ofMillis(defaultApiTimeoutMs));
}
/**
@ -1560,7 +1570,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* This method may issue a remote call to the server if there is no current position for the given partition.
* <p>
* This call will block until either the position could be determined or an unrecoverable error is
* encountered (in which case it is thrown to the caller).
* encountered (in which case it is thrown to the caller), or the timeout specified by {@code default.api.timeout.ms} expires
* (in which case a {@link org.apache.kafka.common.errors.TimeoutException} is thrown to the caller).
*
* @param partition The partition to get the position for
* @return The current position of the consumer (that is, the offset of the next record to be fetched)
@ -1575,10 +1586,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
* configured groupId. See the exception for more details
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
* @throws org.apache.kafka.common.errors.TimeoutException if the position cannot be determined before the
* timeout specified by {@code default.api.timeout.ms} expires
*/
@Override
public long position(TopicPartition partition) {
return position(partition, Duration.ofMillis(Long.MAX_VALUE));
return position(partition, Duration.ofMillis(defaultApiTimeoutMs));
}
/**
@ -1641,7 +1654,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* Get the last committed offset for the given partition (whether the commit happened by this process or
* another). This offset will be used as the position for the consumer in the event of a failure.
* <p>
* This call will block to do a remote call to get the latest committed offsets from the server.
* This call will do a remote call to get the latest committed offset from the server, and will block until the
* committed offset is gotten successfully, an unrecoverable error is encountered (in which case it is thrown to
* the caller), or the timeout specified by {@code default.api.timeout.ms} expires (in which case a
* {@link org.apache.kafka.common.errors.TimeoutException} is thrown to the caller).
*
* @param partition The partition to check
* @return The last committed offset and metadata or null if there was no prior commit
@ -1653,10 +1669,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
* configured groupId. See the exception for more details
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
* @throws org.apache.kafka.common.errors.TimeoutException if the committed offset cannot be found before
* the timeout specified by {@code default.api.timeout.ms} expires.
*/
@Override
public OffsetAndMetadata committed(TopicPartition partition) {
return committed(partition, Duration.ofMillis(Long.MAX_VALUE));
return committed(partition, Duration.ofMillis(defaultApiTimeoutMs));
}
/**
@ -1718,11 +1736,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the specified topic. See the exception for more details
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
* @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
* the amount of time allocated by {@code request.timeout.ms} expires.
* the amount of time allocated by {@code default.api.timeout.ms} expires.
*/
@Override
public List<PartitionInfo> partitionsFor(String topic) {
return partitionsFor(topic, Duration.ofMillis(requestTimeoutMs));
return partitionsFor(topic, Duration.ofMillis(defaultApiTimeoutMs));
}
/**
@ -1774,11 +1792,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* this function is called
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
* @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
* the amount of time allocated by {@code request.timeout.ms} expires.
* the amount of time allocated by {@code default.api.timeout.ms} expires.
*/
@Override
public Map<String, List<PartitionInfo>> listTopics() {
return listTopics(Duration.ofMillis(requestTimeoutMs));
return listTopics(Duration.ofMillis(defaultApiTimeoutMs));
}
/**
@ -1879,13 +1897,13 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
* @throws IllegalArgumentException if the target timestamp is negative
* @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
* the amount of time allocated by {@code request.timeout.ms} expires.
* the amount of time allocated by {@code default.api.timeout.ms} expires.
* @throws org.apache.kafka.common.errors.UnsupportedVersionException if the broker does not support looking up
* the offsets by timestamp
*/
@Override
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
return offsetsForTimes(timestampsToSearch, Duration.ofMillis(requestTimeoutMs));
return offsetsForTimes(timestampsToSearch, Duration.ofMillis(defaultApiTimeoutMs));
}
/**
@ -1939,11 +1957,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
* @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
* expiration of the configured {@code request.timeout.ms}
* expiration of the configured {@code default.api.timeout.ms}
*/
@Override
public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
return beginningOffsets(partitions, Duration.ofMillis(requestTimeoutMs));
return beginningOffsets(partitions, Duration.ofMillis(defaultApiTimeoutMs));
}
/**

View File

@ -1748,6 +1748,7 @@ public class KafkaConsumerTest {
String metricGroupPrefix = "consumer";
long retryBackoffMs = 100;
long requestTimeoutMs = 30000;
int defaultApiTimeoutMs = 30000;
boolean excludeInternalTopics = true;
int minBytes = 1;
int maxBytes = Integer.MAX_VALUE;
@ -1825,6 +1826,7 @@ public class KafkaConsumerTest {
metadata,
retryBackoffMs,
requestTimeoutMs,
defaultApiTimeoutMs,
assignors);
}

View File

@ -98,10 +98,13 @@
<code>internal.value.converter=org.apache.kafka.connect.json.JsonConverter</code>
<code>internal.value.converter.schemas.enable=false</code>
</li>
<li><a href="https://cwiki.apache.org/confluence/x/5kiHB">KIP-266</a> adds overloads to the consumer to support
timeout behavior for blocking APIs. In particular, a new <code>poll(Duration)</code> API has been added which
does not block for dynamic partition assignment. The old <code>poll(long)</code> API has been deprecated and
will be removed in a future version.</li>
<li><a href="https://cwiki.apache.org/confluence/x/5kiHB">KIP-266</a> adds a new consumer configuration <code>default.api.timeout.ms</code>
to specify the default timeout to use for <code>KafkaConsumer</code> APIs that could block. The KIP also adds overloads for such blocking
APIs to support specifying a specific timeout to use for each of them instead of using the default timeout set by <code>default.api.timeout.ms</code>.
In particular, a new <code>poll(Duration)</code> API has been added which does not block for dynamic partition assignment.
The old <code>poll(long)</code> API has been deprecated and will be removed in a future version. Overloads have also been added
for other <code>KafkaConsumer</code> methods like <code>partitionsFor</code>, <code>listTopics</code>, <code>offsetsForTimes</code>,
<code>beginningOffsets</code>, <code>endOffsets</code> and <code>close</code> that take in a <code>Duration</code>.</li>
<li>The internal method <code>kafka.admin.AdminClient.deleteRecordsBefore</code> has been removed. Users are encouraged to migrate to <code>org.apache.kafka.clients.admin.AdminClient.deleteRecords</code>.</li>
<li>The tool kafka.tools.ReplayLogProducer has been removed.</li>
<li>The AclCommand tool <code>--producer</code> convenience option uses the <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-277+-+Fine+Grained+ACL+for+CreateTopics+API">KIP-277</a> finer grained ACL on the given topic. </li>