diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java index 0e27e1f0383..acb53e11088 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import java.io.Closeable; +import java.time.Duration; import java.util.Collection; import java.util.List; import java.util.Map; @@ -38,156 +39,162 @@ public interface Consumer<K, V> extends Closeable { /** * @see KafkaConsumer#assignment() */ - public Set<TopicPartition> assignment(); + Set<TopicPartition> assignment(); /** * @see KafkaConsumer#subscription() */ - public Set<String> subscription(); + Set<String> subscription(); /** * @see KafkaConsumer#subscribe(Collection) */ - public void subscribe(Collection<String> topics); + void subscribe(Collection<String> topics); /** * @see KafkaConsumer#subscribe(Collection, ConsumerRebalanceListener) */ - public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback); + void subscribe(Collection<String> topics, ConsumerRebalanceListener callback); /** * @see KafkaConsumer#assign(Collection) */ - public void assign(Collection<TopicPartition> partitions); + void assign(Collection<TopicPartition> partitions); /** * @see KafkaConsumer#subscribe(Pattern, ConsumerRebalanceListener) */ - public void subscribe(Pattern pattern, ConsumerRebalanceListener callback); + void subscribe(Pattern pattern, ConsumerRebalanceListener callback); /** * @see KafkaConsumer#subscribe(Pattern) */ - public void subscribe(Pattern pattern); + void subscribe(Pattern pattern); /** * @see KafkaConsumer#unsubscribe() */ - public void unsubscribe(); + void unsubscribe(); /** * @see KafkaConsumer#poll(long) */ - public ConsumerRecords<K, V> poll(long timeout); + @Deprecated + ConsumerRecords<K, V> poll(long timeout); + + /** + * @see KafkaConsumer#poll(Duration) + */ + ConsumerRecords<K, V> poll(Duration timeout); /** * @see KafkaConsumer#commitSync() */ - public void commitSync(); + void commitSync(); /** * @see KafkaConsumer#commitSync(Map) */ - public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets); + void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets); /** * @see KafkaConsumer#commitAsync() */ - public void commitAsync(); + void commitAsync(); /** * @see KafkaConsumer#commitAsync(OffsetCommitCallback) */ - public void commitAsync(OffsetCommitCallback callback); + void commitAsync(OffsetCommitCallback callback); /** * @see KafkaConsumer#commitAsync(Map, OffsetCommitCallback) */ - public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback); + void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback); /** * @see KafkaConsumer#seek(TopicPartition, long) */ - public void seek(TopicPartition partition, long offset); + void seek(TopicPartition partition, long offset); /** * @see KafkaConsumer#seekToBeginning(Collection) */ - public void seekToBeginning(Collection<TopicPartition> partitions); + void seekToBeginning(Collection<TopicPartition> partitions); /** * @see KafkaConsumer#seekToEnd(Collection) */ - public void seekToEnd(Collection<TopicPartition> partitions); + void seekToEnd(Collection<TopicPartition> partitions); /** * @see KafkaConsumer#position(TopicPartition) */ - public long position(TopicPartition partition); + long position(TopicPartition partition); /** * @see KafkaConsumer#committed(TopicPartition) */ - public OffsetAndMetadata committed(TopicPartition partition); + OffsetAndMetadata committed(TopicPartition partition); /** * @see KafkaConsumer#metrics() */ - public Map<MetricName, ? extends Metric> metrics(); + Map<MetricName, ? extends Metric> metrics(); /** * @see 
KafkaConsumer#partitionsFor(String) */ - public List<PartitionInfo> partitionsFor(String topic); + List<PartitionInfo> partitionsFor(String topic); /** * @see KafkaConsumer#listTopics() */ - public Map<String, List<PartitionInfo>> listTopics(); + Map<String, List<PartitionInfo>> listTopics(); /** * @see KafkaConsumer#paused() */ - public Set<TopicPartition> paused(); + Set<TopicPartition> paused(); /** * @see KafkaConsumer#pause(Collection) */ - public void pause(Collection<TopicPartition> partitions); + void pause(Collection<TopicPartition> partitions); /** * @see KafkaConsumer#resume(Collection) */ - public void resume(Collection<TopicPartition> partitions); + void resume(Collection<TopicPartition> partitions); /** * @see KafkaConsumer#offsetsForTimes(java.util.Map) */ - public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch); + Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch); /** * @see KafkaConsumer#beginningOffsets(java.util.Collection) */ - public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions); + Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions); /** * @see KafkaConsumer#endOffsets(java.util.Collection) */ - public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions); + Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions); /** * @see KafkaConsumer#close() */ - public void close(); + void close(); /** * @see KafkaConsumer#close(long, TimeUnit) */ - public void close(long timeout, TimeUnit unit); + void close(long timeout, TimeUnit unit); /** * @see KafkaConsumer#wakeup() */ - public void wakeup(); + void wakeup(); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java index 2f4e310f702..763fe512217 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java @@ -35,14 +35,16 @@ import java.util.Map; * the user configures the interceptor with the wrong key and value type parameters, the consumer will not throw an exception, * just log the errors. *
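The Consumer interface changes above deprecate poll(long) in favour of poll(Duration), which also counts metadata updates against the caller's timeout. A minimal migration sketch (broker address, group id, and topic name are illustrative, not part of this patch):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollMigration {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "example-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic"));
            // Deprecated form: consumer.poll(100) could block indefinitely on metadata.
            // The Duration overload bounds the total time, metadata updates included.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            records.forEach(r -> System.out.println(r.offset() + ": " + r.value()));
        }
    }
}
```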

- * ConsumerInterceptor callbacks are called from the same thread that invokes {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(long)}. + * ConsumerInterceptor callbacks are called from the same thread that invokes + * {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(java.time.Duration)}. *

* Implement {@link org.apache.kafka.common.ClusterResourceListener} to receive cluster metadata once it's available. Please see the class documentation for ClusterResourceListener for more information. */ public interface ConsumerInterceptor<K, V> extends Configurable { /** - * This is called just before the records are returned by {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(long)} + * This is called just before the records are returned by + * {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(java.time.Duration)} *
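A sketch of an interceptor satisfying this contract — it runs on the polling thread and may return replacement records. The tombstone-filtering behaviour is an illustrative choice, not part of this patch:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class TombstoneFilterInterceptor implements ConsumerInterceptor<String, String> {
    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        // Returning new records is explicitly allowed; here we drop null-valued tombstones.
        Map<TopicPartition, List<ConsumerRecord<String, String>>> kept = new HashMap<>();
        for (TopicPartition tp : records.partitions()) {
            List<ConsumerRecord<String, String>> filtered = new ArrayList<>();
            for (ConsumerRecord<String, String> record : records.records(tp))
                if (record.value() != null)
                    filtered.add(record);
            if (!filtered.isEmpty())
                kept.put(tp, filtered);
        }
        return new ConsumerRecords<>(kept);
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) { }

    @Override
    public void close() { }
}
```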

* This method is allowed to modify consumer records, in which case the new records will be * returned. There is no limitation on number of records that could be returned from this diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java index ad2f61d1166..74e8b060c73 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java @@ -44,7 +44,8 @@ import org.apache.kafka.common.TopicPartition; * partition is reassigned it may want to automatically trigger a flush of this cache, before the new owner takes over * consumption. *
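A sketch of the cache-flush pattern just described; flushCache and lookupOffset stand in for application-specific hooks and are not part of this patch:

```java
import java.util.Collection;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

public class FlushingRebalanceListener implements ConsumerRebalanceListener {
    private final Consumer<?, ?> consumer;

    public FlushingRebalanceListener(Consumer<?, ?> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Flush application state and commit current offsets before the new owner takes over.
        flushCache(partitions);                  // hypothetical application hook
        consumer.commitSync();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Runs inside poll(Duration) after reassignment completes and before fetching starts.
        for (TopicPartition tp : partitions)
            consumer.seek(tp, lookupOffset(tp)); // hypothetical offset-store lookup
    }

    private void flushCache(Collection<TopicPartition> partitions) { /* application-specific */ }

    private long lookupOffset(TopicPartition tp) { return 0L; /* application-specific */ }
}
```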

- * This callback will only execute in the user thread as part of the {@link Consumer#poll(long) poll(long)} call whenever partition assignment changes. + * This callback will only execute in the user thread as part of the {@link Consumer#poll(java.time.Duration) poll(Duration)} call + * whenever partition assignment changes. *

* It is guaranteed that all consumer processes will invoke {@link #onPartitionsRevoked(Collection) onPartitionsRevoked} prior to * any process invoking {@link #onPartitionsAssigned(Collection) onPartitionsAssigned}. So if offsets or other state is saved in the @@ -91,7 +92,7 @@ public interface ConsumerRebalanceListener { * It is common for the revocation callback to use the consumer instance in order to commit offsets. It is possible * for a {@link org.apache.kafka.common.errors.WakeupException} or {@link org.apache.kafka.common.errors.InterruptException} * to be raised from one these nested invocations. In this case, the exception will be propagated to the current - * invocation of {@link KafkaConsumer#poll(long)} in which this callback is being executed. This means it is not + * invocation of {@link KafkaConsumer#poll(java.time.Duration)} in which this callback is being executed. This means it is not * necessary to catch these exceptions and re-attempt to wakeup or interrupt the consumer thread. * * @param partitions The list of partitions that were assigned to the consumer on the last rebalance @@ -103,7 +104,7 @@ public interface ConsumerRebalanceListener { /** * A callback method the user can implement to provide handling of customized offsets on completion of a successful * partition re-assignment. This method will be called after the partition re-assignment completes and before the - * consumer starts fetching data, and only as the result of a {@link Consumer#poll(java.time.Duration) poll(Duration)} call. *

* It is guaranteed that all the processes in a consumer group will execute their * {@link #onPartitionsRevoked(Collection)} callback before any instance executes its @@ -112,7 +113,7 @@ public interface ConsumerRebalanceListener { * It is common for the assignment callback to use the consumer instance in order to query offsets. It is possible * for a {@link org.apache.kafka.common.errors.WakeupException} or {@link org.apache.kafka.common.errors.InterruptException} * to be raised from one these nested invocations. In this case, the exception will be propagated to the current - * invocation of {@link KafkaConsumer#poll(long)} in which this callback is being executed. This means it is not + * invocation of {@link KafkaConsumer#poll(java.time.Duration)} in which this callback is being executed. This means it is not * necessary to catch these exceptions and re-attempt to wakeup or interrupt the consumer thread. * * @param partitions The list of partitions that are now assigned to the consumer (may include partitions previously diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java index f2dc9bbc2ba..4d0f62c3a1d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java @@ -29,7 +29,7 @@ import java.util.Set; /** * A container that holds the list {@link ConsumerRecord} per partition for a * particular topic. There is one {@link ConsumerRecord} list for every topic - * partition returned by a {@link Consumer#poll(long)} operation. + * partition returned by a {@link Consumer#poll(java.time.Duration)} operation. */ public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 0b18927c300..e5bc5c1d33c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -24,7 +24,6 @@ import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator; import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors; import org.apache.kafka.clients.consumer.internals.ConsumerMetrics; import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient; -import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.PollCondition; import org.apache.kafka.clients.consumer.internals.Fetcher; import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener; import org.apache.kafka.clients.consumer.internals.PartitionAssignor; @@ -55,6 +54,7 @@ import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import java.net.InetSocketAddress; +import java.time.Duration; import java.util.Collection; import java.util.Collections; import java.util.ConcurrentModificationException; @@ -98,7 +98,7 @@ import java.util.regex.Pattern; *

* The {@link #position(TopicPartition) position} of the consumer gives the offset of the next record that will be given * out. It will be one larger than the highest offset the consumer has seen in that partition. It automatically advances - * every time the consumer receives messages in a call to {@link #poll(long)}. + * every time the consumer receives messages in a call to {@link #poll(Duration)}. *
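The position of the consumer (and the committed position discussed next) in code form — a sketch assuming an already-configured consumer and an illustrative topic:

```java
import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class PositionVsCommitted {
    static void show(Consumer<String, String> consumer) {
        TopicPartition tp = new TopicPartition("example-topic", 0);
        consumer.assign(Collections.singletonList(tp));
        consumer.poll(Duration.ofMillis(100));
        // position(): offset of the next record to hand out; advances on every poll.
        long position = consumer.position(tp);
        // committed(): last securely stored offset; only moves when a commit completes.
        OffsetAndMetadata committed = consumer.committed(tp);
        System.out.println("position=" + position + ", committed=" + committed);
    }
}
```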

* The {@link #commitSync() committed position} is the last offset that has been stored securely. Should the * process fail and restart, this is the offset that the consumer will recover to. The consumer can either automatically commit @@ -149,7 +149,7 @@ import java.util.regex.Pattern; * *

Detecting Consumer Failures

* - * After subscribing to a set of topics, the consumer will automatically join the group when {@link #poll(long)} is + * After subscribing to a set of topics, the consumer will automatically join the group when {@link #poll(Duration)} is * invoked. The poll API is designed to ensure consumer liveness. As long as you continue to call poll, the consumer * will stay in the group and continue to receive messages from the partitions it was assigned. Underneath the covers, * the consumer sends periodic heartbeats to the server. If the consumer crashes or is unable to send heartbeats for @@ -168,10 +168,10 @@ import java.util.regex.Pattern; * The consumer provides two configuration settings to control the behavior of the poll loop: *
    *
  1. max.poll.interval.ms: By increasing the interval between expected polls, you can give - * the consumer more time to handle a batch of records returned from {@link #poll(long)}. The drawback + * the consumer more time to handle a batch of records returned from {@link #poll(Duration)}. The drawback * is that increasing this value may delay a group rebalance since the consumer will only join the rebalance * inside the call to poll. You can use this setting to bound the time to finish a rebalance, but - * you risk slower progress if the consumer cannot actually call {@link #poll(long) poll} often enough.
  2. max.poll.records: Use this setting to limit the total records returned from a single * call to poll. This can make it easier to predict the maximum that must be handled within each poll * interval. By tuning this value, you may be able to reduce the poll interval, which will reduce the @@ -180,12 +180,12 @@ import java.util.regex.Pattern; *
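A configuration sketch for the two poll-loop settings above (the values are illustrative, not recommendations):

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class PollLoopConfig {
    static Properties pollLoopProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        // Allow up to 5 minutes between polls before the consumer is evicted from the group.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
        // Cap each poll at 100 records so per-iteration processing time stays predictable.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
        return props;
    }
}
```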

* For use cases where message processing time varies unpredictably, neither of these options may be sufficient. * The recommended way to handle these cases is to move message processing to another thread, which allows - * the consumer to continue calling {@link #poll(long) poll} while the processor is still working. Some care must be taken - * to ensure that committed offsets do not get ahead of the actual position. Typically, you must disable automatic - * commits and manually commit processed offsets for records only after the thread has finished handling them - * (depending on the delivery semantics you need). Note also that you will need to {@link #pause(Collection) pause} - * the partition so that no new records are received from poll until after thread has finished handling those - * previously returned. + * the consumer to continue calling {@link #poll(Duration) poll} while the processor is still working. + * Some care must be taken to ensure that committed offsets do not get ahead of the actual position. + * Typically, you must disable automatic commits and manually commit processed offsets for records only after the + * thread has finished handling them (depending on the delivery semantics you need). + * Note also that you will need to {@link #pause(Collection) pause} the partition so that no new records are received + * from poll until after the thread has finished handling those previously returned. *
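A sketch of that hand-off pattern — pause the assignment, process on a worker thread, then commit and resume. It assumes enable.auto.commit=false, ignores rebalances and error handling for brevity, and process() is an application-supplied hook:

```java
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class AsyncProcessingLoop {
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    void run(KafkaConsumer<String, String> consumer) throws Exception {
        Future<?> inFlight = null;
        Set<TopicPartition> paused = null;
        while (true) {
            // Polling continues (keeping the consumer alive in the group) even while
            // the worker is busy; paused partitions return no records.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            if (inFlight == null && !records.isEmpty()) {
                paused = consumer.assignment();
                consumer.pause(paused);
                final ConsumerRecords<String, String> batch = records;
                inFlight = worker.submit(() -> process(batch));
            } else if (inFlight != null && inFlight.isDone()) {
                inFlight.get();          // surface worker exceptions before committing
                consumer.commitSync();   // commit only after the batch is fully handled
                consumer.resume(paused);
                inFlight = null;
            }
        }
    }

    private void process(ConsumerRecords<String, String> records) { /* application-specific */ }
}
```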

    Usage Examples

    * The consumer APIs offer flexibility to cover a variety of consumption use cases. Here are some examples to @@ -258,7 +258,8 @@ import java.util.regex.Pattern; * * In this example we will consume a batch of records and batch them up in memory. When we have enough records * batched, we will insert them into a database. If we allowed offsets to auto commit as in the previous example, records - * would be considered consumed after they were returned to the user in {@link #poll(long) poll}. It would then be possible + * would be considered consumed after they were returned to the user in {@link #poll(Duration) poll}. It would then be + * possible * for our process to fail after batching the records, but before they had been inserted into the database. *

    * To avoid this, we will manually commit the offsets only after the corresponding records have been inserted into the @@ -270,7 +271,7 @@ import java.util.regex.Pattern; * time but in failure cases could be duplicated. *
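The batch-insert flow described above as a sketch, assuming enable.auto.commit=false; insertIntoDb and the batch size are application-specific placeholders:

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class BatchInsertLoop {
    static final int MIN_BATCH_SIZE = 200;

    static void run(KafkaConsumer<String, String> consumer) {
        List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records)
                buffer.add(record);
            if (buffer.size() >= MIN_BATCH_SIZE) {
                insertIntoDb(buffer);   // hypothetical sink; must succeed before the commit
                consumer.commitSync();  // offsets advance only after the rows are stored
                buffer.clear();
            }
        }
    }

    static void insertIntoDb(List<ConsumerRecord<String, String>> batch) { /* application-specific */ }
}
```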

    * Note: Using automatic offset commits can also give you "at-least-once" delivery, but the requirement is that - * you must consume all data returned from each call to {@link #poll(long)} before any subsequent calls, or before + * you must consume all data returned from each call to {@link #poll(Duration)} before any subsequent calls, or before * {@link #close() closing} the consumer. If you fail to do either of these, it is possible for the committed offset * to get ahead of the consumed position, which results in missing records. The advantage of using manual offset * control is that you have direct control over when a record is considered "consumed." @@ -325,7 +326,7 @@ import java.util.regex.Pattern; * consumer.assign(Arrays.asList(partition0, partition1)); * * - * Once assigned, you can call {@link #poll(long) poll} in a loop, just as in the preceding examples to consume + * Once assigned, you can call {@link #poll(Duration) poll} in a loop, just as in the preceding examples to consume * records. The group that the consumer specifies is still used for committing offsets, but now the set of partitions * will only change with another call to {@link #assign(Collection) assign}. Manual partition assignment does * not use group coordination, so consumer failures will not cause assigned partitions to be rebalanced. Each consumer @@ -417,7 +418,7 @@ import java.util.regex.Pattern; *

    * Kafka supports dynamic controlling of consumption flows by using {@link #pause(Collection)} and {@link #resume(Collection)} * to pause the consumption on the specified assigned partitions and resume the consumption - * on the specified paused partitions respectively in the future {@link #poll(long)} calls. + * on the specified paused partitions respectively in the future {@link #poll(Duration)} calls. * *
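A flow-control sketch using pause/resume, assuming the application wants to drain one partition before fetching from another (topic names are illustrative):

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class FlowControlLoop {
    static void run(KafkaConsumer<String, String> consumer) {
        TopicPartition urgent = new TopicPartition("alerts", 0);
        TopicPartition bulk = new TopicPartition("audit", 0);
        consumer.assign(Arrays.asList(urgent, bulk));
        while (true) {
            long lag = consumer.endOffsets(Collections.singletonList(urgent)).get(urgent)
                    - consumer.position(urgent);
            if (lag > 0)
                consumer.pause(Collections.singletonList(bulk));   // takes effect on future polls
            else
                consumer.resume(Collections.singletonList(bulk));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            // ... process records, prioritizing the urgent partition ...
        }
    }
}
```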

    Reading Transactional Messages

    * @@ -468,7 +469,7 @@ import java.util.regex.Pattern; * try { * consumer.subscribe(Arrays.asList("topic")); * while (!closed.get()) { - * ConsumerRecords records = consumer.poll(10000); + * ConsumerRecords records = consumer.poll(Duration.ofMillis(10000)); * // Handle new records * } * } catch (WakeupException e) { @@ -574,6 +575,9 @@ public class KafkaConsumer implements Consumer { // refcount is used to allow reentrant access by the thread who has acquired currentThread private final AtomicInteger refcount = new AtomicInteger(0); + // to keep from repeatedly scanning subscriptions in poll(), cache the result during metadata updates + private boolean cachedSubscriptionHashAllFetchPositions; + /** * A consumer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings * are documented here. Values can be @@ -878,7 +882,7 @@ public class KafkaConsumer implements Consumer { *

    * When any of these events are triggered, the provided listener will be invoked first to indicate that * the consumer's assignment has been revoked, and then again when the new assignment has been received. - * Note that rebalances will only occur during an active call to {@link #poll(long)}, so callbacks will + * Note that rebalances will only occur during an active call to {@link #poll(Duration)}, so callbacks will * also only be invoked during that time. * * The provided listener will immediately override any listener set in a previous call to subscribe. @@ -954,7 +958,7 @@ public class KafkaConsumer implements Consumer { * See {@link #subscribe(Collection, ConsumerRebalanceListener)} for details on the * use of the {@link ConsumerRebalanceListener}. Generally rebalances are triggered when there * is a change to the topics matching the provided pattern and when consumer group membership changes. - * Group rebalances only take place during an active call to {@link #poll(long)}. + * Group rebalances only take place during an active call to {@link #poll(Duration)}. * * @param pattern Pattern to subscribe to * @param listener Non-null listener instance to get notifications on partition assignment/revocation for the @@ -1096,22 +1100,82 @@ public class KafkaConsumer implements Consumer { * @throws java.lang.IllegalArgumentException if the timeout value is negative * @throws java.lang.IllegalStateException if the consumer is not subscribed to any topics or manually assigned any * partitions to consume from + * + * + * @deprecated Since 2.0. Use {@link #poll(Duration)} to poll for records. + */ + @Deprecated + @Override + public ConsumerRecords poll(final long timeout) { + return poll(timeout, false); + } + + /** + * Fetch data for the topics or partitions specified using one of the subscribe/assign APIs. It is an error to not have + * subscribed to any topics or partitions before polling for data. + *

+ * On each poll, the consumer will try to use the last consumed offset as the starting offset and fetch sequentially. The last + * consumed offset can be manually set through {@link #seek(TopicPartition, long)} or automatically set as the last committed + * offset for the subscribed list of partitions. + * + * + * @param timeout The maximum time to block (must not be greater than {@link Long#MAX_VALUE} milliseconds) + * + * @return map of topic to records since the last fetch for the subscribed list of topics and partitions + * + * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if the offset for a partition or set of + * partitions is undefined or out of range and no offset reset policy has been configured + * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this + * function is called + * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while + * this function is called + * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details + * @throws org.apache.kafka.common.errors.AuthorizationException if caller lacks Read access to any of the subscribed + * topics or to the configured groupId. See the exception for more details + * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. invalid groupId or + * session timeout, errors deserializing key/value pairs, or any new error cases in future versions) + * @throws java.lang.IllegalArgumentException if the timeout value is negative + * @throws java.lang.IllegalStateException if the consumer is not subscribed to any topics or manually assigned any + * partitions to consume from + * @throws java.lang.ArithmeticException if the timeout is greater than {@link Long#MAX_VALUE} milliseconds. 
*/ @Override - public ConsumerRecords<K, V> poll(long timeout) { + public ConsumerRecords<K, V> poll(final Duration timeout) { + return poll(timeout.toMillis(), true); + } + + private ConsumerRecords<K, V> poll(final long timeoutMs, final boolean includeMetadataInTimeout) { acquireAndEnsureOpen(); try { - if (timeout < 0) - throw new IllegalArgumentException("Timeout must not be negative"); + if (timeoutMs < 0) throw new IllegalArgumentException("Timeout must not be negative"); - if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) + if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) { throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions"); + } // poll for new data until the timeout expires - long start = time.milliseconds(); - long remaining = timeout; + long elapsedTime = 0L; do { - Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining); + + client.maybeTriggerWakeup(); + + final long metadataEnd; + if (includeMetadataInTimeout) { + final long metadataStart = time.milliseconds(); + if (!updateAssignmentMetadataIfNeeded(remainingTimeAtLeastZero(timeoutMs, elapsedTime))) { + return ConsumerRecords.empty(); + } + metadataEnd = time.milliseconds(); + elapsedTime += metadataEnd - metadataStart; + } else { + while (!updateAssignmentMetadataIfNeeded(Long.MAX_VALUE)) { + log.warn("Still waiting for metadata"); + } + metadataEnd = time.milliseconds(); + } + + final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(remainingTimeAtLeastZero(timeoutMs, elapsedTime)); + if (!records.isEmpty()) { // before returning the fetched records, we can send off the next round of fetches // and avoid block waiting for their responses to enable pipelining while the user @@ -1119,15 +1183,16 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { // // NOTE: since the consumed position has already been updated, we must not allow // wakeups or any other errors to be triggered prior to returning the fetched records. - if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) + if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) { client.pollNoWakeup(); + } return this.interceptors.onConsume(new ConsumerRecords<>(records)); } + final long fetchEnd = time.milliseconds(); + elapsedTime += fetchEnd - metadataEnd; - long elapsed = time.milliseconds() - start; - remaining = timeout - elapsed; - } while (remaining > 0); + } while (elapsedTime < timeoutMs); return ConsumerRecords.empty(); } finally {
- * @return The fetched records (may be empty) + * Visible for testing */ - private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) { - client.maybeTriggerWakeup(); + boolean updateAssignmentMetadataIfNeeded(final long timeoutMs) { + final long startMs = time.milliseconds(); + if (!coordinator.poll(timeoutMs)) { + return false; + } - long startMs = time.milliseconds(); - coordinator.poll(startMs, timeout); + return updateFetchPositions(remainingTimeAtLeastZero(timeoutMs, time.milliseconds() - startMs)); + } - // Lookup positions of assigned partitions - boolean hasAllFetchPositions = updateFetchPositions(); + private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(final long timeoutMs) { + final long startMs = time.milliseconds(); + long pollTimeout = Math.min(coordinator.timeToNextPoll(startMs), timeoutMs); // if data is available already, return it immediately - Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords(); - if (!records.isEmpty()) + final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords(); + if (!records.isEmpty()) { return records; + } // send any new fetches (won't resend pending fetches) fetcher.sendFetches(); - long nowMs = time.milliseconds(); - long remainingTimeMs = Math.max(0, timeout - (nowMs - startMs)); - long pollTimeout = Math.min(coordinator.timeToNextPoll(nowMs), remainingTimeMs); - // We do not want to be stuck blocking in poll if we are missing some positions // since the offset lookup may be backing off after a failure - if (!hasAllFetchPositions && pollTimeout > retryBackoffMs) - pollTimeout = retryBackoffMs; - client.poll(pollTimeout, nowMs, new PollCondition() { - @Override - public boolean shouldBlock() { - // since a fetch might be completed by the background thread, we need this poll condition - // to ensure that we do not block unnecessarily in poll() - return !fetcher.hasCompletedFetches(); - } + // NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call + // updateAssignmentMetadataIfNeeded before this method. + if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) { + pollTimeout = retryBackoffMs; + } + + client.poll(pollTimeout, startMs, () -> { + // since a fetch might be completed by the background thread, we need this poll condition + // to ensure that we do not block unnecessarily in poll() + return !fetcher.hasCompletedFetches(); }); // after the long poll, we should check whether the group needs to rebalance // prior to returning data so that the group can stabilize faster - if (coordinator.needRejoin()) + if (coordinator.rejoinNeededOrPending()) { return Collections.emptyMap(); + } return fetcher.fetchedRecords(); } + private long remainingTimeAtLeastZero(final long timeoutMs, final long elapsedTime) { + return Math.max(0, timeoutMs - elapsedTime); + } /** - * Commit offsets returned on the last {@link #poll(long) poll()} for all the subscribed list of topics and partitions. + * Commit offsets returned on the last {@link #poll(Duration) poll()} for all the subscribed list of topics and + * partitions. *

* This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after * every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API @@ -1260,7 +1330,7 @@ } /** - * Commit offsets returned on the last {@link #poll(long) poll()} for all the subscribed list of topics and partition. + * Commit offsets returned on the last {@link #poll(Duration) poll()} for all the subscribed list of topics and partitions. * Same as {@link #commitAsync(OffsetCommitCallback) commitAsync(null)} */ @Override @@ -1269,7 +1339,7 @@ } /** - * Commit offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and partitions. + * Commit offsets returned on the last {@link #poll(Duration) poll()} for the subscribed list of topics and partitions. *
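An async-commit sketch with a completion callback; per the OffsetCommitCallback contract it runs on a thread calling poll (the logging is illustrative):

```java
import org.apache.kafka.clients.consumer.Consumer;

public class AsyncCommitExample {
    static void commitInBackground(Consumer<String, String> consumer) {
        // The callback is invoked from a thread calling poll(), not from a separate timer.
        consumer.commitAsync((offsets, exception) -> {
            if (exception != null)
                System.err.println("Async commit failed for " + offsets + ": " + exception);
        });
    }
}
```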

    * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after * every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API @@ -1327,7 +1397,7 @@ public class KafkaConsumer implements Consumer { } /** - * Overrides the fetch offsets that the consumer will use on the next {@link #poll(long) poll(timeout)}. If this API + * Overrides the fetch offsets that the consumer will use on the next {@link #poll(Duration) poll(timeout)}. If this API * is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that * you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets * @@ -1350,7 +1420,7 @@ public class KafkaConsumer implements Consumer { /** * Seek to the first offset for each of the given partitions. This function evaluates lazily, seeking to the - * first offset in all partitions only when {@link #poll(long)} or {@link #position(TopicPartition)} are called. + * first offset in all partitions only when {@link #poll(Duration)} or {@link #position(TopicPartition)} are called. * If no partitions are provided, seek to the first offset for all of the currently assigned partitions. * * @throws IllegalArgumentException if {@code partitions} is {@code null} or the provided TopicPartition is not assigned to this consumer @@ -1373,7 +1443,7 @@ public class KafkaConsumer implements Consumer { /** * Seek to the last offset for each of the given partitions. This function evaluates lazily, seeking to the - * final offset in all partitions only when {@link #poll(long)} or {@link #position(TopicPartition)} are called. + * final offset in all partitions only when {@link #poll(Duration)} or {@link #position(TopicPartition)} are called. * If no partitions are provided, seek to the final offset for all of the currently assigned partitions. *
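A sketch of the lazy seek semantics described above — none of these calls fetch anything until the next poll(Duration) or position() call (the offset value is illustrative):

```java
import java.util.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

public class SeekExamples {
    static void reposition(Consumer<String, String> consumer, TopicPartition tp) {
        consumer.seekToBeginning(Collections.singletonList(tp)); // lazy: evaluated on next poll/position
        consumer.seek(tp, 42L);                                  // the latest seek for a partition wins
        consumer.seekToEnd(Collections.<TopicPartition>emptyList()); // empty = all assigned partitions
        long next = consumer.position(tp);                       // forces the pending seek to resolve
        System.out.println("next offset: " + next);
    }
}
```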

    * If {@code isolation.level=read_committed}, the end offset will be the Last Stable Offset, i.e., the offset @@ -1426,7 +1496,9 @@ public class KafkaConsumer implements Consumer { Long offset = this.subscriptions.position(partition); while (offset == null) { // batch update fetch positions for any partitions without a valid position - updateFetchPositions(); + while (!updateFetchPositions(Long.MAX_VALUE)) { + log.warn("Still updating fetch positions"); + } client.poll(retryBackoffMs); offset = this.subscriptions.position(partition); } @@ -1457,7 +1529,13 @@ public class KafkaConsumer implements Consumer { public OffsetAndMetadata committed(TopicPartition partition) { acquireAndEnsureOpen(); try { - Map offsets = coordinator.fetchCommittedOffsets(Collections.singleton(partition)); + Map offsets = null; + while (offsets == null) { + offsets = coordinator.fetchCommittedOffsets( + Collections.singleton(partition), + Long.MAX_VALUE + ); + } return offsets.get(partition); } finally { release(); @@ -1477,6 +1555,7 @@ public class KafkaConsumer implements Consumer { * does not already have any metadata about the given topic. * * @param topic The topic to get partition metadata for + * * @return The list of partitions * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this * function is called @@ -1510,6 +1589,7 @@ public class KafkaConsumer implements Consumer { * remote call to the server. * @return The map of topics and its partitions + * * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this * function is called * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while @@ -1529,7 +1609,7 @@ public class KafkaConsumer implements Consumer { } /** - * Suspend fetching from the requested partitions. Future calls to {@link #poll(long)} will not return + * Suspend fetching from the requested partitions. Future calls to {@link #poll(Duration)} will not return * any records from these partitions until they have been resumed using {@link #resume(Collection)}. * Note that this method does not affect partition subscription. In particular, it does not cause a group * rebalance when automatic assignment is used. @@ -1551,7 +1631,7 @@ public class KafkaConsumer implements Consumer { /** * Resume specified partitions which have been paused with {@link #pause(Collection)}. New calls to - * {@link #poll(long)} will return records from these partitions if there are any to be fetched. + * {@link #poll(Duration)} will return records from these partitions if there are any to be fetched. * If the partitions were not previously paused, this method is a no-op. * @param partitions The partitions which should be resumed * @throws IllegalStateException if one of the provided partitions is not assigned to this consumer @@ -1593,6 +1673,7 @@ public class KafkaConsumer implements Consumer { * will be returned for that partition. * * @param timestampsToSearch the mapping from partition to the timestamp to look up. + * * @return a mapping from partition to the timestamp and offset of the first message with timestamp greater * than or equal to the target timestamp. {@code null} will be returned for the partition if there is no * such message. @@ -1772,18 +1853,18 @@ public class KafkaConsumer implements Consumer { * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. 
See the exception for more details * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is * defined - * @return true if all assigned positions have a position, false otherwise + * @return true iff the operation completed without timing out */ - private boolean updateFetchPositions() { - if (subscriptions.hasAllFetchPositions()) - return true; + private boolean updateFetchPositions(final long timeoutMs) { + cachedSubscriptionHashAllFetchPositions = subscriptions.hasAllFetchPositions(); + if (cachedSubscriptionHashAllFetchPositions) return true; // If there are any partitions which do not have a valid position and are not // awaiting reset, then we need to fetch committed offsets. We will only do a // coordinator lookup if there are partitions which have missing positions, so // a consumer with manually assigned partitions can avoid a coordinator dependence // by always ensuring that assigned partitions have an initial position. - coordinator.refreshCommittedOffsetsIfNeeded(); + if (!coordinator.refreshCommittedOffsetsIfNeeded(timeoutMs)) return false; // If there are partitions still needing a position and a reset policy is defined, // request reset using the default policy. If no reset strategy is defined and there @@ -1794,7 +1875,7 @@ public class KafkaConsumer implements Consumer { // partitions which are awaiting reset. fetcher.resetOffsetsIfNeeded(); - return false; + return true; } /** @@ -1835,4 +1916,5 @@ public class KafkaConsumer implements Consumer { throw new IllegalStateException("Must configure at least one partition assigner class name to " + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG + " configuration property"); } + } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index ceb7024b97b..479a9ffaaf0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -43,7 +44,7 @@ import java.util.regex.Pattern; /** * A mock of the {@link Consumer} interface you can use for testing code that uses Kafka. This class is not * threadsafe . However, you can use the {@link #schedulePollTask(Runnable)} method to write multithreaded tests - * where a driver thread waits for {@link #poll(long)} to be called by a background thread and then can safely perform + * where a driver thread waits for {@link #poll(Duration)} to be called by a background thread and then can safely perform * operations during a callback. */ public class MockConsumer implements Consumer { @@ -146,8 +147,14 @@ public class MockConsumer implements Consumer { subscriptions.unsubscribe(); } + @Deprecated @Override public synchronized ConsumerRecords poll(long timeout) { + return poll(Duration.ZERO); + } + + @Override + public synchronized ConsumerRecords poll(final Duration timeout) { ensureNotClosed(); // Synchronize around the entire execution so new tasks to be triggered on subsequent poll calls can be added in @@ -401,7 +408,7 @@ public class MockConsumer implements Consumer { } /** - * Schedule a task to be executed during a poll(). 
One enqueued task will be executed per {@link #poll(long)} + * Schedule a task to be executed during a poll(). One enqueued task will be executed per {@link #poll(Duration)} * invocation. You can use this repeatedly to mock out multiple responses to poll invocations. * @param task the task to be executed */ diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java index b217a632574..383c1c82bce 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java @@ -23,7 +23,7 @@ import java.util.Map; /** * A callback interface that the user can implement to trigger custom actions when a commit request completes. The callback - * may be executed in any thread calling {@link Consumer#poll(long) poll()}. + * may be executed in any thread calling {@link Consumer#poll(java.time.Duration) poll()}. */ public interface OffsetCommitCallback { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index dd4bb7038f3..adbaae776ab 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -58,6 +58,7 @@ import java.nio.ByteBuffer; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -198,46 +199,44 @@ public abstract class AbstractCoordinator implements Closeable { ByteBuffer memberAssignment); /** - * Block until the coordinator for this group is known and is ready to receive requests. - */ - public synchronized void ensureCoordinatorReady() { - // Using zero as current time since timeout is effectively infinite - ensureCoordinatorReady(0, Long.MAX_VALUE); - } - - /** + * Visible for testing. + * * Ensure that the coordinator is ready to receive requests. 
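A test sketch for the MockConsumer schedulePollTask pattern described above (topic, offsets, and record contents are illustrative):

```java
import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

public class MockConsumerSketch {
    public static void main(String[] args) {
        MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        TopicPartition tp = new TopicPartition("example-topic", 0);
        consumer.assign(Collections.singletonList(tp));
        consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));

        // Exactly one scheduled task runs per poll(Duration) invocation.
        consumer.schedulePollTask(() ->
                consumer.addRecord(new ConsumerRecord<>("example-topic", 0, 0L, "key", "value")));

        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1));
        System.out.println("received " + records.count() + " record(s)");
    }
}
```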
- * @param startTimeMs Current time in milliseconds + * * @param timeoutMs Maximum time to wait to discover the coordinator * @return true If coordinator discovery and initial connection succeeded, false otherwise */ - protected synchronized boolean ensureCoordinatorReady(long startTimeMs, long timeoutMs) { - long remainingMs = timeoutMs; + protected synchronized boolean ensureCoordinatorReady(final long timeoutMs) { + final long startTimeMs = time.milliseconds(); + long elapsedTime = 0L; while (coordinatorUnknown()) { - RequestFuture future = lookupCoordinator(); - client.poll(future, remainingMs); + final RequestFuture future = lookupCoordinator(); + client.poll(future, remainingTimeAtLeastZero(timeoutMs, elapsedTime)); + if (!future.isDone()) { + // ran out of time + break; + } if (future.failed()) { if (future.isRetriable()) { - remainingMs = timeoutMs - (time.milliseconds() - startTimeMs); - if (remainingMs <= 0) - break; + elapsedTime = time.milliseconds() - startTimeMs; + + if (elapsedTime >= timeoutMs) break; log.debug("Coordinator discovery failed, refreshing metadata"); - client.awaitMetadataUpdate(remainingMs); + client.awaitMetadataUpdate(remainingTimeAtLeastZero(timeoutMs, elapsedTime)); + elapsedTime = time.milliseconds() - startTimeMs; } else throw future.exception(); } else if (coordinator != null && client.isUnavailable(coordinator)) { // we found the coordinator, but the connection has failed, so mark // it dead and backoff before retrying discovery markCoordinatorUnknown(); - time.sleep(retryBackoffMs); + final long sleepTime = Math.min(retryBackoffMs, remainingTimeAtLeastZero(timeoutMs, elapsedTime)); + time.sleep(sleepTime); + elapsedTime += sleepTime; } - - remainingMs = timeoutMs - (time.milliseconds() - startTimeMs); - if (remainingMs <= 0) - break; } return !coordinatorUnknown(); @@ -261,15 +260,14 @@ public abstract class AbstractCoordinator implements Closeable { } /** - * Check whether the group should be rejoined (e.g. if metadata changes) + * Check whether the group should be rejoined (e.g. if metadata changes) or whether a + * rejoin request is already in flight and needs to be completed. + * * @return true if it should, false otherwise */ - protected synchronized boolean needRejoin() { - return rejoinNeeded; - } - - private synchronized boolean rejoinIncomplete() { - return joinFuture != null; + protected synchronized boolean rejoinNeededOrPending() { + // if there's a pending joinFuture, we should try to complete handling it. + return rejoinNeeded || joinFuture != null; } /** @@ -309,11 +307,28 @@ public abstract class AbstractCoordinator implements Closeable { * Ensure that the group is active (i.e. joined and synced) */ public void ensureActiveGroup() { + while (!ensureActiveGroup(Long.MAX_VALUE)) { + log.warn("still waiting to ensure active group"); + } + } + + /** + * Ensure the group is active (i.e., joined and synced) + * + * @param timeoutMs A time budget for ensuring the group is active + * @return true iff the group is active + */ + boolean ensureActiveGroup(final long timeoutMs) { + final long startTime = time.milliseconds(); // always ensure that the coordinator is ready because we may have been disconnected // when sending heartbeats and does not necessarily require us to rejoin the group. 
- ensureCoordinatorReady(); + if (!ensureCoordinatorReady(timeoutMs)) { + return false; + } + startHeartbeatThreadIfNeeded(); - joinGroupIfNeeded(); + + return joinGroupIfNeeded(remainingTimeAtLeastZero(timeoutMs, time.milliseconds() - startTime)); } private synchronized void startHeartbeatThreadIfNeeded() { @@ -345,10 +360,23 @@ public abstract class AbstractCoordinator implements Closeable { } } - // visible for testing. Joins the group without starting the heartbeat thread. - void joinGroupIfNeeded() { - while (needRejoin() || rejoinIncomplete()) { - ensureCoordinatorReady(); + /** + * Joins the group without starting the heartbeat thread. + * + * Visible for testing. + * + * @param timeoutMs Time to complete this action + * @return true iff the operation succeeded + */ + boolean joinGroupIfNeeded(final long timeoutMs) { + final long startTime = time.milliseconds(); + long elapsedTime = 0L; + + while (rejoinNeededOrPending()) { + if (!ensureCoordinatorReady(remainingTimeAtLeastZero(timeoutMs, elapsedTime))) { + return false; + } + elapsedTime = time.milliseconds() - startTime; // call onJoinPrepare if needed. We set a flag to make sure that we do not call it a second // time if the client is woken up before a pending rebalance completes. This must be called @@ -360,8 +388,12 @@ public abstract class AbstractCoordinator implements Closeable { needsJoinPrepare = false; } - RequestFuture future = initiateJoinGroup(); - client.poll(future); + final RequestFuture future = initiateJoinGroup(); + client.poll(future, remainingTimeAtLeastZero(timeoutMs, elapsedTime)); + if (!future.isDone()) { + // we ran out of time + return false; + } if (future.succeeded()) { onJoinComplete(generation.generationId, generation.memberId, generation.protocol, future.value()); @@ -372,7 +404,7 @@ public abstract class AbstractCoordinator implements Closeable { needsJoinPrepare = true; } else { resetJoinGroupFuture(); - RuntimeException exception = future.exception(); + final RuntimeException exception = future.exception(); if (exception instanceof UnknownMemberIdException || exception instanceof RebalanceInProgressException || exception instanceof IllegalGenerationException) @@ -381,7 +413,16 @@ public abstract class AbstractCoordinator implements Closeable { throw exception; time.sleep(retryBackoffMs); } + + if (rejoinNeededOrPending()) { + elapsedTime = time.milliseconds() - startTime; + } } + return true; + } + + private long remainingTimeAtLeastZero(final long timeout, final long elapsedTime) { + return Math.max(0, timeout - elapsedTime); } private synchronized void resetJoinGroupFuture() { @@ -1035,6 +1076,21 @@ public abstract class AbstractCoordinator implements Closeable { this.memberId = memberId; this.protocol = protocol; } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + final Generation that = (Generation) o; + return generationId == that.generationId && + Objects.equals(memberId, that.memberId) && + Objects.equals(protocol, that.protocol); + } + + @Override + public int hashCode() { + return Objects.hash(generationId, memberId, protocol); + } } private static class UnjoinedGroupException extends RetriableException { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index eec070ec663..e8c5bc6e598 100644 --- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -31,6 +31,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.metrics.Measurable; @@ -86,6 +87,32 @@ public final class ConsumerCoordinator extends AbstractCoordinator { private MetadataSnapshot assignmentSnapshot; private long nextAutoCommitDeadline; + // hold onto request&future for commited offset requests to enable async calls. + private PendingCommittedOffsetRequest pendingCommittedOffsetRequest = null; + + private static class PendingCommittedOffsetRequest { + private final Set request; + private final Generation generation; + private final RequestFuture> response; + + private PendingCommittedOffsetRequest(final Set request, + final Generation generationAtRequestTime, + final RequestFuture> response + ) { + if (request == null) throw new NullPointerException(); + if (response == null) throw new NullPointerException(); + + this.request = request; + this.generation = generationAtRequestTime; + this.response = response; + } + + private boolean sameRequest(final Set currentRequest, final Generation currentGeneration) { + return (generation == null ? currentGeneration == null : generation.equals(currentGeneration)) + && request.equals(currentRequest); + } + } + /** * Initialize the coordination manager. */ @@ -243,7 +270,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { // update the metadata and enforce a refresh to make sure the fetcher can start // fetching data in the next iteration this.metadata.setTopics(subscriptions.groupSubscription()); - client.ensureFreshMetadata(); + if (!client.ensureFreshMetadata(Long.MAX_VALUE)) throw new TimeoutException(); // give the assignor a chance to update internal state based on the received assignment assignor.onAssignment(assignment); @@ -268,30 +295,43 @@ public final class ConsumerCoordinator extends AbstractCoordinator { * Poll for coordinator events. This ensures that the coordinator is known and that the consumer * has joined the group (if it is using group management). This also handles periodic offset commits * if they are enabled. + *

    + * Returns early if the timeout expires * - * @param now current time in milliseconds + * @param timeoutMs The amount of time, in ms, allotted for this operation. + * @return true iff the operation succeeded */ - public void poll(long now, long remainingMs) { + public boolean poll(final long timeoutMs) { + final long startTime = time.milliseconds(); + long elapsed = 0L; + invokeCompletedOffsetCommitCallbacks(); if (subscriptions.partitionsAutoAssigned()) { if (coordinatorUnknown()) { - ensureCoordinatorReady(); - now = time.milliseconds(); + if (!ensureCoordinatorReady(remainingTimeAtLeastZero(timeoutMs, elapsed))) { + return false; + } + elapsed = time.milliseconds() - startTime; } - if (needRejoin()) { + if (rejoinNeededOrPending()) { // due to a race condition between the initial metadata fetch and the initial rebalance, // we need to ensure that the metadata is fresh before joining initially. This ensures // that we have matched the pattern against the cluster's topics at least once before joining. - if (subscriptions.hasPatternSubscription()) - client.ensureFreshMetadata(); + if (subscriptions.hasPatternSubscription()) { + if (!client.ensureFreshMetadata(remainingTimeAtLeastZero(timeoutMs, elapsed))) { + return false; + } + elapsed = time.milliseconds() - startTime; + } - ensureActiveGroup(); - now = time.milliseconds(); + if (!ensureActiveGroup(remainingTimeAtLeastZero(timeoutMs, elapsed))) { + return false; + } } - pollHeartbeat(now); + pollHeartbeat(startTime); } else { // For manually assigned partitions, if there are no ready nodes, await metadata. // If connections to all nodes fail, wakeups triggered while attempting to send fetch @@ -301,18 +341,23 @@ public final class ConsumerCoordinator extends AbstractCoordinator { // When group management is used, metadata wait is already performed for this scenario as // coordinator is unknown, hence this check is not required. if (metadata.updateRequested() && !client.hasReadyNodes()) { - boolean metadataUpdated = client.awaitMetadataUpdate(remainingMs); - if (!metadataUpdated && !client.hasReadyNodes()) - return; - now = time.milliseconds(); + final boolean metadataUpdated = client.awaitMetadataUpdate(remainingTimeAtLeastZero(timeoutMs, elapsed)); + if (!metadataUpdated && !client.hasReadyNodes()) { + return false; + } } } - maybeAutoCommitOffsetsAsync(now); + maybeAutoCommitOffsetsAsync(startTime); + return true; + } + + private long remainingTimeAtLeastZero(final long timeoutMs, final long elapsed) { + return Math.max(0, timeoutMs - elapsed); } /** - * Return the time to the next needed invocation of {@link #poll(long, long)}. + * Return the time to the next needed invocation of {@link #poll(long)}. 
* @param now current time in milliseconds * @return the maximum time in milliseconds the caller should wait before the next invocation of poll() */ @@ -349,7 +394,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { // update metadata (if needed) and keep track of the metadata used for assignment so that // we can check after rebalance completion whether anything has changed - client.ensureFreshMetadata(); + if (!client.ensureFreshMetadata(Long.MAX_VALUE)) throw new TimeoutException(); isLeader = true; @@ -385,7 +430,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { allSubscribedTopics.addAll(assignedTopics); this.subscriptions.groupSubscribe(allSubscribedTopics); metadata.setTopics(this.subscriptions.groupSubscription()); - client.ensureFreshMetadata(); + if (!client.ensureFreshMetadata(Long.MAX_VALUE)) throw new TimeoutException(); } assignmentSnapshot = metadataSnapshot; @@ -423,7 +468,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { } @Override - public boolean needRejoin() { + public boolean rejoinNeededOrPending() { if (!subscriptions.partitionsAutoAssigned()) return false; @@ -435,60 +480,94 @@ public final class ConsumerCoordinator extends AbstractCoordinator { if (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription())) return true; - return super.needRejoin(); + return super.rejoinNeededOrPending(); } /** * Refresh the committed offsets for provided partitions. + * + * @param timeoutMs A time limit for this operation + * @return true iff the operation completed within the timeout */ - public void refreshCommittedOffsetsIfNeeded() { - Set missingFetchPositions = subscriptions.missingFetchPositions(); - Map offsets = fetchCommittedOffsets(missingFetchPositions); - for (Map.Entry entry : offsets.entrySet()) { - TopicPartition tp = entry.getKey(); - long offset = entry.getValue().offset(); + public boolean refreshCommittedOffsetsIfNeeded(final long timeoutMs) { + final Set missingFetchPositions = subscriptions.missingFetchPositions(); + + final Map offsets = fetchCommittedOffsets(missingFetchPositions, timeoutMs); + if (offsets == null) return false; + + for (final Map.Entry entry : offsets.entrySet()) { + final TopicPartition tp = entry.getKey(); + final long offset = entry.getValue().offset(); log.debug("Setting offset for partition {} to the committed offset {}", tp, offset); this.subscriptions.seek(tp, offset); } + return true; } /** * Fetch the current committed offsets from the coordinator for a set of partitions. + * * @param partitions The partitions to fetch offsets for - * @return A map from partition to the committed offset + * @return A map from partition to the committed offset or null if the operation timed out */ - public Map fetchCommittedOffsets(Set partitions) { - if (partitions.isEmpty()) - return Collections.emptyMap(); + public Map fetchCommittedOffsets(final Set partitions, + final long timeoutMs) { + if (partitions.isEmpty()) return Collections.emptyMap(); + + final Generation generation = generation(); + if (pendingCommittedOffsetRequest != null && !pendingCommittedOffsetRequest.sameRequest(partitions, generation)) { + // if we were waiting for a different request, then just clear it. 
+ pendingCommittedOffsetRequest = null; + } + + final long startMs = time.milliseconds(); + long elapsedTime = 0L; while (true) { - ensureCoordinatorReady(); + if (!ensureCoordinatorReady(remainingTimeAtLeastZero(timeoutMs, elapsedTime))) return null; + elapsedTime = time.milliseconds() - startMs; // contact coordinator to fetch committed offsets - RequestFuture> future = sendOffsetFetchRequest(partitions); - client.poll(future); + final RequestFuture> future; + if (pendingCommittedOffsetRequest != null) { + future = pendingCommittedOffsetRequest.response; + } else { + future = sendOffsetFetchRequest(partitions); + pendingCommittedOffsetRequest = new PendingCommittedOffsetRequest(partitions, generation, future); - if (future.succeeded()) - return future.value(); + } + client.poll(future, remainingTimeAtLeastZero(timeoutMs, elapsedTime)); - if (!future.isRetriable()) - throw future.exception(); + if (future.isDone()) { + pendingCommittedOffsetRequest = null; - time.sleep(retryBackoffMs); + if (future.succeeded()) { + return future.value(); + } else if (!future.isRetriable()) { + throw future.exception(); + } else { + elapsedTime = time.milliseconds() - startMs; + final long sleepTime = Math.min(retryBackoffMs, remainingTimeAtLeastZero(timeoutMs, elapsedTime)); + time.sleep(sleepTime); + elapsedTime += sleepTime; + } + } else { + return null; + } } } - public void close(long timeoutMs) { + public void close(final long timeoutMs) { // we do not need to re-enable wakeups since we are closing already client.disableWakeups(); long now = time.milliseconds(); - long endTimeMs = now + timeoutMs; + final long endTimeMs = now + timeoutMs; try { maybeAutoCommitOffsetsSync(timeoutMs); now = time.milliseconds(); if (pendingAsyncCommits.get() > 0 && endTimeMs > now) { - ensureCoordinatorReady(now, endTimeMs - now); + ensureCoordinatorReady(endTimeMs - now); now = time.milliseconds(); } } finally { @@ -587,7 +666,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { long remainingMs = timeoutMs; do { if (coordinatorUnknown()) { - if (!ensureCoordinatorReady(now, remainingMs)) + if (!ensureCoordinatorReady(remainingMs)) return false; remainingMs = timeoutMs - (time.milliseconds() - startMs); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java index e1598fa646d..7a9f717e28b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java @@ -164,9 +164,13 @@ public class ConsumerNetworkClient implements Closeable { * Ensure our metadata is fresh (if an update is expected, this will block * until it has completed).
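+     * Returns immediately if no update is expected.
+     *
+     * @param timeout the maximum time, in ms, to block waiting for the update
+     * @return true if the metadata is fresh, or becomes fresh within the timeout; false otherwise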
*/ - public void ensureFreshMetadata() { - if (this.metadata.updateRequested() || this.metadata.timeToNextUpdate(time.milliseconds()) == 0) - awaitMetadataUpdate(); + boolean ensureFreshMetadata(final long timeout) { + if (this.metadata.updateRequested() || this.metadata.timeToNextUpdate(time.milliseconds()) == 0) { + return awaitMetadataUpdate(timeout); + } else { + // the metadata is already fresh + return true; + } } /** diff --git a/clients/src/main/java/org/apache/kafka/common/ClusterResourceListener.java b/clients/src/main/java/org/apache/kafka/common/ClusterResourceListener.java index f8f99ece183..97245a393b0 100644 --- a/clients/src/main/java/org/apache/kafka/common/ClusterResourceListener.java +++ b/clients/src/main/java/org/apache/kafka/common/ClusterResourceListener.java @@ -36,7 +36,8 @@ package org.apache.kafka.common; * {@link org.apache.kafka.common.serialization.Deserializer} : The {@link ClusterResourceListener#onUpdate(ClusterResource)} method will be invoked before {@link org.apache.kafka.common.serialization.Deserializer#deserialize(String, byte[])} *

    * {@link org.apache.kafka.common.metrics.MetricsReporter} : The {@link ClusterResourceListener#onUpdate(ClusterResource)} method will be invoked after first {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)} invocation for Producer metrics reporter - * and after first {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(long)} invocation for Consumer metrics reporters. The reporter may receive metric events from the network layer before this method is invoked. + * and after first {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(java.time.Duration)} invocation for Consumer metrics + * reporters. The reporter may receive metric events from the network layer before this method is invoked. *

    Broker

    * There is a single invocation {@link ClusterResourceListener#onUpdate(ClusterResource)} on broker start-up and the cluster metadata will never change. *

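The javadoc updates above and the WakeupException change below are fallout from the same API move: KafkaConsumer#poll(long) is deprecated in favor of KafkaConsumer#poll(Duration), which applies the caller's timeout to every blocking stage (coordinator lookup, metadata refresh, offset fetch) rather than only to the fetch wait. A minimal caller-side sketch of the new idiom follows; the bootstrap address, group id, and topic name are illustrative and not part of this patch.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class PollDurationSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative
        props.put("group.id", "example-group");           // illustrative
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singleton("example-topic")); // illustrative topic
            try {
                while (true) {
                    // The whole call, including coordinator and metadata work, is bounded by
                    // this timeout; it may return an empty batch if the group is not ready yet,
                    // where the deprecated poll(long) would have kept blocking.
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    for (ConsumerRecord<String, String> record : records)
                        System.out.printf("%s-%d@%d: %s%n", record.topic(), record.partition(),
                                record.offset(), record.value());
                }
            } catch (WakeupException e) {
                // consumer.wakeup() was called from another thread to shut this loop down,
                // the scenario the WakeupException javadoc below describes.
            }
        }
    }
}

Internally the patch threads this budget through with the remainingTimeAtLeastZero(timeoutMs, elapsed) helper seen earlier: each blocking step is handed only whatever is left of the caller's original timeout.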
    diff --git a/clients/src/main/java/org/apache/kafka/common/errors/WakeupException.java b/clients/src/main/java/org/apache/kafka/common/errors/WakeupException.java index 4726ec13cca..f8ae8403a5d 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/WakeupException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/WakeupException.java @@ -21,7 +21,7 @@ import org.apache.kafka.common.KafkaException; /** * Exception used to indicate preemption of a blocking operation by an external thread. * For example, {@link org.apache.kafka.clients.consumer.KafkaConsumer#wakeup} - * can be used to break out of an active {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(long)}, + * can be used to break out of an active {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(java.time.Duration)}, * which would raise an instance of this exception. */ public class WakeupException extends KafkaException { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 8c147a58f77..ce722cf12e3 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -78,6 +78,7 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -87,6 +88,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Queue; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -353,7 +355,12 @@ public class KafkaConsumerTest { // initial fetch client.prepareResponseFrom(fetchResponse(tp0, 0, 0), node); - consumer.poll(0); + // We need two update calls: + // 1. the first call "sends" the metadata update requests + // 2. 
the second one gets the response we already queued up + consumer.updateAssignmentMetadataIfNeeded(0L); + consumer.updateAssignmentMetadataIfNeeded(0L); + assertEquals(singleton(tp0), consumer.assignment()); AtomicBoolean heartbeatReceived = prepareHeartbeatResponse(client, coordinator); @@ -362,7 +369,8 @@ public class KafkaConsumerTest { time.sleep(heartbeatIntervalMs); Thread.sleep(heartbeatIntervalMs); - consumer.poll(0); + consumer.updateAssignmentMetadataIfNeeded(0L); + consumer.updateAssignmentMetadataIfNeeded(0L); assertTrue(heartbeatReceived.get()); consumer.close(0, TimeUnit.MILLISECONDS); @@ -385,7 +393,9 @@ public class KafkaConsumerTest { consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null); - consumer.poll(0); + consumer.updateAssignmentMetadataIfNeeded(0L); + consumer.updateAssignmentMetadataIfNeeded(0L); + consumer.poll(Duration.ZERO); // respond to the outstanding fetch so that we have data available on the next poll client.respondFrom(fetchResponse(tp0, 0, 5), node); @@ -397,12 +407,63 @@ public class KafkaConsumerTest { time.sleep(heartbeatIntervalMs); Thread.sleep(heartbeatIntervalMs); - consumer.poll(0); + consumer.poll(Duration.ZERO); assertTrue(heartbeatReceived.get()); consumer.close(0, TimeUnit.MILLISECONDS); } + @Test + public void verifyPollTimesOutDuringMetadataUpdate() throws Exception { + final Time time = new MockTime(); + final Cluster cluster = TestUtils.singletonCluster(topic, 1); + final Node node = cluster.nodes().get(0); + + final Metadata metadata = createMetadata(); + metadata.update(cluster, Collections.emptySet(), time.milliseconds()); + + final MockClient client = new MockClient(time, metadata); + client.setNode(node); + final PartitionAssignor assignor = new RoundRobinAssignor(); + + final KafkaConsumer consumer = newConsumer(time, client, metadata, assignor, true); + consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); + prepareRebalance(client, node, assignor, singletonList(tp0), null); + + consumer.poll(Duration.ZERO); + + // The underlying client should NOT get a fetch request + final Queue requests = client.requests(); + Assert.assertEquals(0, requests.size()); + } + + @Test + public void verifyDeprecatedPollDoesNotTimeOutDuringMetadataUpdate() throws Exception { + final Time time = new MockTime(); + final Cluster cluster = TestUtils.singletonCluster(topic, 1); + final Node node = cluster.nodes().get(0); + + final Metadata metadata = createMetadata(); + metadata.update(cluster, Collections.emptySet(), time.milliseconds()); + + final MockClient client = new MockClient(time, metadata); + client.setNode(node); + final PartitionAssignor assignor = new RoundRobinAssignor(); + + final KafkaConsumer consumer = newConsumer(time, client, metadata, assignor, true); + consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); + prepareRebalance(client, node, assignor, singletonList(tp0), null); + + //noinspection deprecation + consumer.poll(0L); + + // The underlying client SHOULD get a fetch request + final Queue requests = client.requests(); + Assert.assertEquals(1, requests.size()); + final Class aClass = requests.peek().requestBuilder().getClass(); + Assert.assertEquals(FetchRequest.Builder.class, aClass); + } + @Test public void verifyNoCoordinatorLookupForManualAssignmentWithSeek() { Time time = new MockTime(); @@ -425,7 +486,7 @@ public class KafkaConsumerTest { 
client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 50L))); client.prepareResponse(fetchResponse(tp0, 50L, 5)); - ConsumerRecords records = consumer.poll(5); + ConsumerRecords records = consumer.poll(Duration.ofMillis(1)); assertEquals(5, records.count()); assertEquals(55L, consumer.position(tp0)); consumer.close(0, TimeUnit.MILLISECONDS); @@ -474,7 +535,7 @@ public class KafkaConsumerTest { } }, fetchResponse(tp0, 50L, 5)); - ConsumerRecords records = consumer.poll(5); + ConsumerRecords records = consumer.poll(Duration.ofMillis(1)); assertEquals(5, records.count()); assertEquals(singleton(tp0), records.partitions()); } @@ -501,7 +562,7 @@ public class KafkaConsumerTest { // lookup committed offset and find nothing client.prepareResponseFrom(offsetResponse(Collections.singletonMap(tp0, -1L), Errors.NONE), coordinator); - consumer.poll(0); + consumer.poll(Duration.ZERO); } @Test @@ -525,7 +586,7 @@ public class KafkaConsumerTest { Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()); client.prepareResponseFrom(offsetResponse(Collections.singletonMap(tp0, 539L), Errors.NONE), coordinator); - consumer.poll(0); + consumer.poll(Duration.ZERO); assertEquals(539L, consumer.position(tp0)); } @@ -553,7 +614,7 @@ public class KafkaConsumerTest { client.prepareResponseFrom(offsetResponse(Collections.singletonMap(tp0, -1L), Errors.NONE), coordinator); client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 50L))); - consumer.poll(0); + consumer.poll(Duration.ZERO); assertEquals(50L, consumer.position(tp0)); } @@ -617,7 +678,9 @@ public class KafkaConsumerTest { consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null); - consumer.poll(0); + consumer.updateAssignmentMetadataIfNeeded(0L); + consumer.updateAssignmentMetadataIfNeeded(0L); + consumer.poll(Duration.ZERO); // respond to the outstanding fetch so that we have data available on the next poll client.respondFrom(fetchResponse(tp0, 0, 5), node); @@ -630,7 +693,7 @@ public class KafkaConsumerTest { // no data has been returned to the user yet, so the committed offset should be 0 AtomicBoolean commitReceived = prepareOffsetCommitResponse(client, coordinator, tp0, 0); - consumer.poll(0); + consumer.poll(Duration.ZERO); assertTrue(commitReceived.get()); consumer.close(0, TimeUnit.MILLISECONDS); @@ -660,7 +723,9 @@ public class KafkaConsumerTest { client.prepareMetadataUpdate(cluster, Collections.emptySet()); - consumer.poll(0); + consumer.updateAssignmentMetadataIfNeeded(0L); + consumer.updateAssignmentMetadataIfNeeded(0L); + assertEquals(singleton(topic), consumer.subscription()); assertEquals(singleton(tp0), consumer.assignment()); consumer.close(0, TimeUnit.MILLISECONDS); @@ -694,13 +759,16 @@ public class KafkaConsumerTest { Node coordinator = prepareRebalance(client, node, singleton(topic), assignor, singletonList(tp0), null); consumer.subscribe(Pattern.compile(topic), getConsumerRebalanceListener(consumer)); - consumer.poll(0); + consumer.updateAssignmentMetadataIfNeeded(0L); + consumer.updateAssignmentMetadataIfNeeded(0L); + consumer.poll(Duration.ZERO); + assertEquals(singleton(topic), consumer.subscription()); consumer.subscribe(Pattern.compile(otherTopic), getConsumerRebalanceListener(consumer)); prepareRebalance(client, node, singleton(otherTopic), assignor, singletonList(otherTopicPartition), coordinator); - consumer.poll(0); + consumer.poll(Duration.ZERO); 
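        // the poll above drove the second rebalance to completion, so the subscription
        // now reflects the new pattern's topic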
assertEquals(singleton(otherTopic), consumer.subscription()); consumer.close(0, TimeUnit.MILLISECONDS); @@ -723,7 +791,9 @@ public class KafkaConsumerTest { consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); prepareRebalance(client, node, assignor, singletonList(tp0), null); - consumer.poll(0); + consumer.updateAssignmentMetadataIfNeeded(0L); + consumer.updateAssignmentMetadataIfNeeded(0L); + consumer.poll(Duration.ZERO); // respond to the outstanding fetch so that we have data available on the next poll client.respondFrom(fetchResponse(tp0, 0, 5), node); @@ -732,7 +802,7 @@ public class KafkaConsumerTest { consumer.wakeup(); try { - consumer.poll(0); + consumer.poll(Duration.ZERO); fail(); } catch (WakeupException e) { } @@ -741,7 +811,7 @@ public class KafkaConsumerTest { assertEquals(0, consumer.position(tp0)); // the next poll should return the completed fetch - ConsumerRecords records = consumer.poll(0); + ConsumerRecords records = consumer.poll(Duration.ZERO); assertEquals(5, records.count()); // Increment time asynchronously to clear timeouts in closing the consumer final ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor(); @@ -773,13 +843,15 @@ public class KafkaConsumerTest { consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); prepareRebalance(client, node, assignor, singletonList(tp0), null); - consumer.poll(0); + consumer.updateAssignmentMetadataIfNeeded(0L); + consumer.updateAssignmentMetadataIfNeeded(0L); + consumer.poll(Duration.ZERO); // interrupt the thread and call poll try { Thread.currentThread().interrupt(); expectedException.expect(InterruptException.class); - consumer.poll(0); + consumer.poll(Duration.ZERO); } finally { // clear interrupted state again since this thread may be reused by JUnit Thread.interrupted(); @@ -810,7 +882,10 @@ public class KafkaConsumerTest { fetches1.put(t2p0, new FetchInfo(0, 10)); // not assigned and not fetched client.prepareResponseFrom(fetchResponse(fetches1), node); - ConsumerRecords records = consumer.poll(0); + consumer.updateAssignmentMetadataIfNeeded(0L); + consumer.updateAssignmentMetadataIfNeeded(0L); + + ConsumerRecords records = consumer.poll(Duration.ZERO); assertEquals(0, records.count()); consumer.close(0, TimeUnit.MILLISECONDS); } @@ -852,7 +927,9 @@ public class KafkaConsumerTest { // mock rebalance responses Node coordinator = prepareRebalance(client, node, assignor, Arrays.asList(tp0, t2p0), null); - consumer.poll(0); + consumer.updateAssignmentMetadataIfNeeded(0L); + consumer.updateAssignmentMetadataIfNeeded(0L); + consumer.poll(Duration.ZERO); // verify that subscription is still the same, and now assignment has caught up assertTrue(consumer.subscription().size() == 2); @@ -867,7 +944,7 @@ public class KafkaConsumerTest { client.respondFrom(fetchResponse(fetches1), node); client.poll(0, time.milliseconds()); - ConsumerRecords records = consumer.poll(0); + ConsumerRecords records = consumer.poll(Duration.ofMillis(1)); // clear out the prefetch so it doesn't interfere with the rest of the test fetches1.put(tp0, new FetchInfo(1, 0)); @@ -904,7 +981,7 @@ public class KafkaConsumerTest { fetches2.put(t3p0, new FetchInfo(0, 100)); client.prepareResponse(fetchResponse(fetches2)); - records = consumer.poll(0); + records = consumer.poll(Duration.ofMillis(1)); // verify that the fetch occurred as expected assertEquals(101, records.count()); @@ -965,13 +1042,15 @@ public class KafkaConsumerTest { // mock rebalance responses prepareRebalance(client, node, 
assignor, singletonList(tp0), null); - consumer.poll(0); + consumer.updateAssignmentMetadataIfNeeded(0L); + consumer.updateAssignmentMetadataIfNeeded(0L); + consumer.poll(Duration.ZERO); // verify that subscription is still the same, and now assignment has caught up assertTrue(consumer.subscription().equals(singleton(topic))); assertTrue(consumer.assignment().equals(singleton(tp0))); - consumer.poll(0); + consumer.poll(Duration.ZERO); // subscription change consumer.subscribe(singleton(topic2), getConsumerRebalanceListener(consumer)); @@ -1037,7 +1116,7 @@ public class KafkaConsumerTest { client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 10L))); client.prepareResponse(fetchResponse(tp0, 10L, 1)); - ConsumerRecords records = consumer.poll(5); + ConsumerRecords records = consumer.poll(Duration.ofMillis(1)); assertEquals(1, records.count()); assertEquals(11L, consumer.position(tp0)); @@ -1096,7 +1175,7 @@ public class KafkaConsumerTest { client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 10L))); client.prepareResponse(fetchResponse(tp0, 10L, 1)); - ConsumerRecords records = consumer.poll(5); + ConsumerRecords records = consumer.poll(Duration.ofMillis(1)); assertEquals(1, records.count()); assertEquals(11L, consumer.position(tp0)); @@ -1171,7 +1250,7 @@ public class KafkaConsumerTest { @Test(expected = IllegalStateException.class) public void testPollWithNoSubscription() { try (KafkaConsumer consumer = newConsumer()) { - consumer.poll(0); + consumer.poll(Duration.ZERO); } } @@ -1179,7 +1258,7 @@ public class KafkaConsumerTest { public void testPollWithEmptySubscription() { try (KafkaConsumer consumer = newConsumer()) { consumer.subscribe(Collections.emptyList()); - consumer.poll(0); + consumer.poll(Duration.ZERO); } } @@ -1187,7 +1266,7 @@ public class KafkaConsumerTest { public void testPollWithEmptyUserAssignment() { try (KafkaConsumer consumer = newConsumer()) { consumer.assign(Collections.emptySet()); - consumer.poll(0); + consumer.poll(Duration.ZERO); } } @@ -1220,7 +1299,7 @@ public class KafkaConsumerTest { @Test public void testCloseInterrupt() throws Exception { - consumerCloseTest(Long.MAX_VALUE, Collections.emptyList(), 0, true); + consumerCloseTest(Long.MAX_VALUE, Collections.emptyList(), 0, true); } @Test @@ -1270,7 +1349,9 @@ public class KafkaConsumerTest { client.prepareResponseFrom(fetchResponse(tp0, 0, 1), node); client.prepareResponseFrom(fetchResponse(tp0, 1, 0), node); - consumer.poll(0); + consumer.updateAssignmentMetadataIfNeeded(0L); + consumer.updateAssignmentMetadataIfNeeded(0L); + consumer.poll(Duration.ZERO); // heartbeat fails due to rebalance in progress client.prepareResponseFrom(new MockClient.RequestMatcher() { @@ -1306,7 +1387,9 @@ public class KafkaConsumerTest { }, fetchResponse(tp0, 1, 1), node); time.sleep(heartbeatIntervalMs); Thread.sleep(heartbeatIntervalMs); - final ConsumerRecords records = consumer.poll(0); + consumer.updateAssignmentMetadataIfNeeded(0L); + consumer.updateAssignmentMetadataIfNeeded(0L); + final ConsumerRecords records = consumer.poll(Duration.ZERO); assertFalse(records.isEmpty()); consumer.close(0, TimeUnit.MILLISECONDS); } @@ -1333,10 +1416,13 @@ public class KafkaConsumerTest { client.prepareMetadataUpdate(cluster, Collections.emptySet()); + consumer.updateAssignmentMetadataIfNeeded(0L); + consumer.updateAssignmentMetadataIfNeeded(0L); + // Poll with responses client.prepareResponseFrom(fetchResponse(tp0, 0, 1), node); client.prepareResponseFrom(fetchResponse(tp0, 1, 0), node); - 
consumer.poll(0); + consumer.poll(Duration.ZERO); // Initiate close() after a commit request on another thread. // Kafka consumer is single-threaded, but the implementation allows calls on a @@ -1457,7 +1543,7 @@ public class KafkaConsumerTest { } try { - consumer.poll(10); + consumer.poll(Duration.ZERO); fail("Expected an authentication error!"); } catch (AuthenticationException e) { // OK diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java index 67f8e8d6837..1d01eb6d0b2 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.record.TimestampType; import org.junit.Test; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -34,6 +35,32 @@ public class MockConsumerTest { @Test public void testSimpleMock() { + consumer.subscribe(Collections.singleton("test")); + assertEquals(0, consumer.poll(Duration.ZERO).count()); + consumer.rebalance(Arrays.asList(new TopicPartition("test", 0), new TopicPartition("test", 1))); + // Mock consumers need to seek manually since they cannot automatically reset offsets + HashMap beginningOffsets = new HashMap<>(); + beginningOffsets.put(new TopicPartition("test", 0), 0L); + beginningOffsets.put(new TopicPartition("test", 1), 0L); + consumer.updateBeginningOffsets(beginningOffsets); + consumer.seek(new TopicPartition("test", 0), 0); + ConsumerRecord rec1 = new ConsumerRecord<>("test", 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, "key1", "value1"); + ConsumerRecord rec2 = new ConsumerRecord<>("test", 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, "key2", "value2"); + consumer.addRecord(rec1); + consumer.addRecord(rec2); + ConsumerRecords recs = consumer.poll(Duration.ofMillis(1)); + Iterator> iter = recs.iterator(); + assertEquals(rec1, iter.next()); + assertEquals(rec2, iter.next()); + assertFalse(iter.hasNext()); + assertEquals(2L, consumer.position(new TopicPartition("test", 0))); + consumer.commitSync(); + assertEquals(2L, consumer.committed(new TopicPartition("test", 0)).offset()); + } + + @SuppressWarnings("deprecation") + @Test + public void testSimpleMockDeprecated() { consumer.subscribe(Collections.singleton("test")); assertEquals(0, consumer.poll(1000).count()); consumer.rebalance(Arrays.asList(new TopicPartition("test", 0), new TopicPartition("test", 1))); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index 1c88803e26c..135763d3f41 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -102,7 +102,7 @@ public class AbstractCoordinatorTest { mockClient.blackout(coordinatorNode, 10L); long initialTime = mockTime.milliseconds(); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); long endTime = mockTime.milliseconds(); assertTrue(endTime - initialTime >= RETRY_BACKOFF_MS); @@ -183,7 +183,7 @@ public class AbstractCoordinatorTest { assertTrue("New request sent while one is in progress", future == 
coordinator.lookupCoordinator()); mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); assertTrue("New request not sent after previous completed", future != coordinator.lookupCoordinator()); } @@ -527,7 +527,7 @@ public class AbstractCoordinatorTest { mockClient.authenticationFailed(node, 300); try { - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); fail("Expected an authentication error."); } catch (AuthenticationException e) { // OK @@ -537,7 +537,7 @@ public class AbstractCoordinatorTest { assertTrue(mockClient.connectionFailed(node)); try { - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); fail("Expected an authentication error."); } catch (AuthenticationException e) { // OK diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 030419075c6..18288735805 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -144,7 +144,7 @@ public class ConsumerCoordinatorTest { @Test public void testNormalHeartbeat() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); // normal heartbeat time.sleep(sessionTimeoutMs); @@ -162,7 +162,7 @@ public class ConsumerCoordinatorTest { @Test(expected = GroupAuthorizationException.class) public void testGroupDescribeUnauthorized() { client.prepareResponse(groupCoordinatorResponse(node, Errors.GROUP_AUTHORIZATION_FAILED)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); } @Test(expected = GroupAuthorizationException.class) @@ -170,17 +170,17 @@ public class ConsumerCoordinatorTest { subscriptions.subscribe(singleton(topic1), rebalanceListener); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); client.prepareResponse(joinGroupLeaderResponse(0, "memberId", Collections.>emptyMap(), Errors.GROUP_AUTHORIZATION_FAILED)); - coordinator.poll(time.milliseconds(), Long.MAX_VALUE); + coordinator.poll(Long.MAX_VALUE); } @Test public void testCoordinatorNotAvailable() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); // COORDINATOR_NOT_AVAILABLE will mark coordinator as unknown time.sleep(sessionTimeoutMs); @@ -201,7 +201,7 @@ public class ConsumerCoordinatorTest { @Test public void testManyInFlightAsyncCommitsWithCoordinatorDisconnect() throws Exception { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); int numRequests = 1000; TopicPartition tp = new TopicPartition("foo", 0); @@ -233,7 +233,7 @@ public class ConsumerCoordinatorTest { // the coordinator as unknown which prevents additional retries to the same coordinator. 
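        // Note the pattern used throughout these tests: passing Long.MAX_VALUE to the methods
        // that now take a timeout (ensureCoordinatorReady, poll, joinGroupIfNeeded) preserves
        // the old block-until-done semantics, so the existing assertions hold unchanged.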
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); final AtomicBoolean asyncCallbackInvoked = new AtomicBoolean(false); Map offsets = singletonMap( @@ -259,7 +259,7 @@ public class ConsumerCoordinatorTest { @Test public void testNotCoordinator() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); // not_coordinator will mark coordinator as unknown time.sleep(sessionTimeoutMs); @@ -280,7 +280,7 @@ public class ConsumerCoordinatorTest { @Test public void testIllegalGeneration() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); // illegal_generation will cause re-partition subscriptions.subscribe(singleton(topic1), rebalanceListener); @@ -298,13 +298,13 @@ public class ConsumerCoordinatorTest { assertTrue(future.isDone()); assertTrue(future.failed()); assertEquals(Errors.ILLEGAL_GENERATION.exception(), future.exception()); - assertTrue(coordinator.needRejoin()); + assertTrue(coordinator.rejoinNeededOrPending()); } @Test public void testUnknownConsumerId() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); // illegal_generation will cause re-partition subscriptions.subscribe(singleton(topic1), rebalanceListener); @@ -322,13 +322,13 @@ public class ConsumerCoordinatorTest { assertTrue(future.isDone()); assertTrue(future.failed()); assertEquals(Errors.UNKNOWN_MEMBER_ID.exception(), future.exception()); - assertTrue(coordinator.needRejoin()); + assertTrue(coordinator.rejoinNeededOrPending()); } @Test public void testCoordinatorDisconnect() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); // coordinator disconnect will mark coordinator as unknown time.sleep(sessionTimeoutMs); @@ -357,11 +357,11 @@ public class ConsumerCoordinatorTest { metadata.update(cluster, Collections.emptySet(), time.milliseconds()); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); client.prepareResponse(joinGroupLeaderResponse(0, consumerId, Collections.>emptyMap(), Errors.INVALID_GROUP_ID)); - coordinator.poll(time.milliseconds(), Long.MAX_VALUE); + coordinator.poll(Long.MAX_VALUE); } @Test @@ -375,7 +375,7 @@ public class ConsumerCoordinatorTest { metadata.update(cluster, Collections.emptySet(), time.milliseconds()); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); // normal join group Map> memberSubscriptions = singletonMap(consumerId, singletonList(topic1)); @@ -391,9 +391,9 @@ public class ConsumerCoordinatorTest { sync.groupAssignment().containsKey(consumerId); } }, syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.poll(time.milliseconds(), Long.MAX_VALUE); + coordinator.poll(Long.MAX_VALUE); - assertFalse(coordinator.needRejoin()); + assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(singleton(t1p), subscriptions.assignedPartitions()); assertEquals(singleton(topic1), subscriptions.groupSubscription()); 
assertEquals(1, rebalanceListener.revokedCount); @@ -414,7 +414,7 @@ public class ConsumerCoordinatorTest { metadata.update(TestUtils.singletonCluster(topic1, 1), Collections.emptySet(), time.milliseconds()); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); // normal join group Map> memberSubscriptions = singletonMap(consumerId, singletonList(topic1)); @@ -433,9 +433,9 @@ public class ConsumerCoordinatorTest { // expect client to force updating the metadata, if yes gives it both topics client.prepareMetadataUpdate(cluster, Collections.emptySet()); - coordinator.poll(time.milliseconds(), Long.MAX_VALUE); + coordinator.poll(Long.MAX_VALUE); - assertFalse(coordinator.needRejoin()); + assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(2, subscriptions.assignedPartitions().size()); assertEquals(2, subscriptions.groupSubscription().size()); assertEquals(2, subscriptions.subscription().size()); @@ -456,7 +456,7 @@ public class ConsumerCoordinatorTest { assertEquals(singleton(topic1), subscriptions.subscription()); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); Map> initialSubscription = singletonMap(consumerId, singletonList(topic1)); partitionAssignor.prepare(singletonMap(consumerId, singletonList(t1p))); @@ -496,9 +496,9 @@ public class ConsumerCoordinatorTest { }, joinGroupLeaderResponse(2, consumerId, updatedSubscriptions, Errors.NONE)); client.prepareResponse(syncGroupResponse(newAssignment, Errors.NONE)); - coordinator.poll(time.milliseconds(), Long.MAX_VALUE); + coordinator.poll(Long.MAX_VALUE); - assertFalse(coordinator.needRejoin()); + assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(updatedSubscriptionSet, subscriptions.subscription()); assertEquals(newAssignmentSet, subscriptions.assignedPartitions()); assertEquals(2, rebalanceListener.revokedCount); @@ -518,7 +518,7 @@ public class ConsumerCoordinatorTest { metadata.update(cluster, Collections.emptySet(), time.milliseconds()); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); Map> memberSubscriptions = singletonMap(consumerId, singletonList(topic1)); partitionAssignor.prepare(singletonMap(consumerId, singletonList(t1p))); @@ -528,16 +528,16 @@ public class ConsumerCoordinatorTest { consumerClient.wakeup(); try { - coordinator.poll(time.milliseconds(), Long.MAX_VALUE); + coordinator.poll(Long.MAX_VALUE); } catch (WakeupException e) { // ignore } // now complete the second half client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.poll(time.milliseconds(), Long.MAX_VALUE); + coordinator.poll(Long.MAX_VALUE); - assertFalse(coordinator.needRejoin()); + assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(singleton(t1p), subscriptions.assignedPartitions()); assertEquals(1, rebalanceListener.revokedCount); assertEquals(Collections.emptySet(), rebalanceListener.revoked); @@ -552,7 +552,7 @@ public class ConsumerCoordinatorTest { subscriptions.subscribe(singleton(topic1), rebalanceListener); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); // normal join group client.prepareResponse(joinGroupFollowerResponse(1, consumerId, 
"leader", Errors.NONE)); @@ -566,9 +566,9 @@ public class ConsumerCoordinatorTest { } }, syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.joinGroupIfNeeded(); + coordinator.joinGroupIfNeeded(Long.MAX_VALUE); - assertFalse(coordinator.needRejoin()); + assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(singleton(t1p), subscriptions.assignedPartitions()); assertEquals(singleton(topic1), subscriptions.groupSubscription()); assertEquals(1, rebalanceListener.revokedCount); @@ -589,7 +589,7 @@ public class ConsumerCoordinatorTest { metadata.update(TestUtils.singletonCluster(topic1, 1), Collections.emptySet(), time.milliseconds()); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); // normal join group client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); @@ -605,9 +605,9 @@ public class ConsumerCoordinatorTest { // expect client to force updating the metadata, if yes gives it both topics client.prepareMetadataUpdate(cluster, Collections.emptySet()); - coordinator.joinGroupIfNeeded(); + coordinator.joinGroupIfNeeded(Long.MAX_VALUE); - assertFalse(coordinator.needRejoin()); + assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(2, subscriptions.assignedPartitions().size()); assertEquals(2, subscriptions.subscription().size()); assertEquals(1, rebalanceListener.revokedCount); @@ -667,12 +667,12 @@ public class ConsumerCoordinatorTest { subscriptions.subscribe(singleton(topic1), rebalanceListener); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); // join initially, but let coordinator rebalance on sync client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(Collections.emptyList(), Errors.UNKNOWN_SERVER_ERROR)); - coordinator.joinGroupIfNeeded(); + coordinator.joinGroupIfNeeded(Long.MAX_VALUE); } @Test @@ -682,7 +682,7 @@ public class ConsumerCoordinatorTest { subscriptions.subscribe(singleton(topic1), rebalanceListener); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); // join initially, but let coordinator returns unknown member id client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); @@ -698,9 +698,9 @@ public class ConsumerCoordinatorTest { }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.joinGroupIfNeeded(); + coordinator.joinGroupIfNeeded(Long.MAX_VALUE); - assertFalse(coordinator.needRejoin()); + assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(singleton(t1p), subscriptions.assignedPartitions()); } @@ -711,7 +711,7 @@ public class ConsumerCoordinatorTest { subscriptions.subscribe(singleton(topic1), rebalanceListener); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); // join initially, but let coordinator rebalance on sync client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); @@ -721,9 +721,9 @@ public class ConsumerCoordinatorTest { client.prepareResponse(joinGroupFollowerResponse(2, consumerId, "leader", 
Errors.NONE)); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.joinGroupIfNeeded(); + coordinator.joinGroupIfNeeded(Long.MAX_VALUE); - assertFalse(coordinator.needRejoin()); + assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(singleton(t1p), subscriptions.assignedPartitions()); } @@ -734,7 +734,7 @@ public class ConsumerCoordinatorTest { subscriptions.subscribe(singleton(topic1), rebalanceListener); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); // join initially, but let coordinator rebalance on sync client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); @@ -750,9 +750,9 @@ public class ConsumerCoordinatorTest { }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.joinGroupIfNeeded(); + coordinator.joinGroupIfNeeded(Long.MAX_VALUE); - assertFalse(coordinator.needRejoin()); + assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(singleton(t1p), subscriptions.assignedPartitions()); } @@ -767,7 +767,7 @@ public class ConsumerCoordinatorTest { subscriptions.subscribe(singleton(topic1), rebalanceListener); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); Map> memberSubscriptions = singletonMap(consumerId, singletonList(topic1)); partitionAssignor.prepare(singletonMap(consumerId, singletonList(t1p))); @@ -776,15 +776,15 @@ public class ConsumerCoordinatorTest { client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE)); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.poll(time.milliseconds(), Long.MAX_VALUE); + coordinator.poll(Long.MAX_VALUE); - assertFalse(coordinator.needRejoin()); + assertFalse(coordinator.rejoinNeededOrPending()); // a new partition is added to the topic metadata.update(TestUtils.singletonCluster(topic1, 2), Collections.emptySet(), time.milliseconds()); // we should detect the change and ask for reassignment - assertTrue(coordinator.needRejoin()); + assertTrue(coordinator.rejoinNeededOrPending()); } @Test @@ -804,7 +804,7 @@ public class ConsumerCoordinatorTest { metadata.update(TestUtils.singletonCluster(topic1, 1), Collections.emptySet(), time.milliseconds()); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); // prepare initial rebalance Map> memberSubscriptions = singletonMap(consumerId, topics); @@ -833,9 +833,9 @@ public class ConsumerCoordinatorTest { client.prepareResponse(joinGroupLeaderResponse(2, consumerId, memberSubscriptions, Errors.NONE)); client.prepareResponse(syncGroupResponse(Arrays.asList(tp1, tp2), Errors.NONE)); - coordinator.poll(time.milliseconds(), Long.MAX_VALUE); + coordinator.poll(Long.MAX_VALUE); - assertFalse(coordinator.needRejoin()); + assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(new HashSet<>(Arrays.asList(tp1, tp2)), subscriptions.assignedPartitions()); } @@ -873,16 +873,16 @@ public class ConsumerCoordinatorTest { subscriptions.subscribe(singleton(topic1), rebalanceListener); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + 
coordinator.ensureCoordinatorReady(Long.MAX_VALUE); Map> memberSubscriptions = singletonMap(consumerId, singletonList(topic1)); partitionAssignor.prepare(Collections.>emptyMap()); client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE)); client.prepareResponse(syncGroupResponse(Collections.emptyList(), Errors.NONE)); - coordinator.poll(time.milliseconds(), Long.MAX_VALUE); + coordinator.poll(Long.MAX_VALUE); if (!assign) { - assertFalse(coordinator.needRejoin()); + assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(Collections.emptySet(), rebalanceListener.assigned); } assertTrue("Metadata refresh not requested for unavailable partitions", metadata.updateRequested()); @@ -891,11 +891,11 @@ public class ConsumerCoordinatorTest { client.poll(0, time.milliseconds()); client.prepareResponse(joinGroupLeaderResponse(2, consumerId, memberSubscriptions, Errors.NONE)); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.poll(time.milliseconds(), Long.MAX_VALUE); + coordinator.poll(Long.MAX_VALUE); assertFalse("Metadata refresh requested unnecessarily", metadata.updateRequested()); if (!assign) { - assertFalse(coordinator.needRejoin()); + assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(singleton(t1p), rebalanceListener.assigned); } } @@ -937,7 +937,7 @@ public class ConsumerCoordinatorTest { subscriptions.subscribe(new HashSet<>(Arrays.asList(topic1, otherTopic)), rebalanceListener); client.prepareResponse(joinGroupFollowerResponse(2, "consumer", "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.joinGroupIfNeeded(); + coordinator.joinGroupIfNeeded(Long.MAX_VALUE); assertEquals(2, rebalanceListener.revokedCount); assertEquals(singleton(t1p), rebalanceListener.revoked); @@ -950,16 +950,16 @@ public class ConsumerCoordinatorTest { subscriptions.subscribe(singleton(topic1), rebalanceListener); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); // disconnected from original coordinator will cause re-discover and join again client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE), true); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.joinGroupIfNeeded(); + coordinator.joinGroupIfNeeded(Long.MAX_VALUE); - assertFalse(coordinator.needRejoin()); + assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(singleton(t1p), subscriptions.assignedPartitions()); assertEquals(1, rebalanceListener.revokedCount); assertEquals(1, rebalanceListener.assignedCount); @@ -971,11 +971,11 @@ public class ConsumerCoordinatorTest { subscriptions.subscribe(singleton(topic1), rebalanceListener); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); // coordinator doesn't like the session timeout client.prepareResponse(joinGroupFollowerResponse(0, "consumer", "", Errors.INVALID_SESSION_TIMEOUT)); - coordinator.joinGroupIfNeeded(); + coordinator.joinGroupIfNeeded(Long.MAX_VALUE); } @Test @@ -983,7 +983,7 @@ public class ConsumerCoordinatorTest { subscriptions.assignFromUser(singleton(t1p)); 
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE); @@ -1005,7 +1005,7 @@ public class ConsumerCoordinatorTest { private void testInFlightRequestsFailedAfterCoordinatorMarkedDead(Errors error) { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); // Send two async commits and fail the first one with an error. // This should cause a coordinator disconnect which will cancel the second request. @@ -1038,7 +1038,7 @@ public class ConsumerCoordinatorTest { prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE); time.sleep(autoCommitIntervalMs); - coordinator.poll(time.milliseconds(), Long.MAX_VALUE); + coordinator.poll(Long.MAX_VALUE); assertFalse(client.hasPendingResponses()); } @@ -1055,23 +1055,23 @@ public class ConsumerCoordinatorTest { // Send an offset commit, but let it fail with a retriable error prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NOT_COORDINATOR); - coordinator.poll(time.milliseconds(), Long.MAX_VALUE); + coordinator.poll(Long.MAX_VALUE); assertTrue(coordinator.coordinatorUnknown()); // After the disconnect, we should rediscover the coordinator client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.poll(time.milliseconds(), Long.MAX_VALUE); + coordinator.poll(Long.MAX_VALUE); subscriptions.seek(t1p, 200); // Until the retry backoff has expired, we should not retry the offset commit time.sleep(retryBackoffMs / 2); - coordinator.poll(time.milliseconds(), Long.MAX_VALUE); + coordinator.poll(Long.MAX_VALUE); assertEquals(0, client.inFlightRequestCount()); // Once the backoff expires, we should retry time.sleep(retryBackoffMs / 2); - coordinator.poll(time.milliseconds(), Long.MAX_VALUE); + coordinator.poll(Long.MAX_VALUE); assertEquals(1, client.inFlightRequestCount()); respondToOffsetCommitRequest(singletonMap(t1p, 200L), Errors.NONE); } @@ -1088,28 +1088,28 @@ public class ConsumerCoordinatorTest { time.sleep(autoCommitIntervalMs); // Send the offset commit request, but do not respond - coordinator.poll(time.milliseconds(), Long.MAX_VALUE); + coordinator.poll(Long.MAX_VALUE); assertEquals(1, client.inFlightRequestCount()); time.sleep(autoCommitIntervalMs / 2); // Ensure that no additional offset commit is sent - coordinator.poll(time.milliseconds(), Long.MAX_VALUE); + coordinator.poll(Long.MAX_VALUE); assertEquals(1, client.inFlightRequestCount()); respondToOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE); - coordinator.poll(time.milliseconds(), Long.MAX_VALUE); + coordinator.poll(Long.MAX_VALUE); assertEquals(0, client.inFlightRequestCount()); subscriptions.seek(t1p, 200); // If we poll again before the auto-commit interval, there should be no new sends - coordinator.poll(time.milliseconds(), Long.MAX_VALUE); + coordinator.poll(Long.MAX_VALUE); assertEquals(0, client.inFlightRequestCount()); // After the remainder of the interval passes, we send a new offset commit time.sleep(autoCommitIntervalMs / 2); - coordinator.poll(time.milliseconds(), Long.MAX_VALUE); + coordinator.poll(Long.MAX_VALUE); assertEquals(1, client.inFlightRequestCount()); respondToOffsetCommitRequest(singletonMap(t1p, 200L), Errors.NONE); } @@ -1124,7 +1124,7 @@ public class ConsumerCoordinatorTest { subscriptions.subscribe(singleton(topic1), rebalanceListener); 
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); // haven't joined, so should not cause a commit time.sleep(autoCommitIntervalMs); @@ -1132,13 +1132,13 @@ public class ConsumerCoordinatorTest { client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.joinGroupIfNeeded(); + coordinator.joinGroupIfNeeded(Long.MAX_VALUE); subscriptions.seek(t1p, 100); prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE); time.sleep(autoCommitIntervalMs); - coordinator.poll(time.milliseconds(), Long.MAX_VALUE); + coordinator.poll(Long.MAX_VALUE); assertFalse(client.hasPendingResponses()); } @@ -1151,11 +1151,11 @@ public class ConsumerCoordinatorTest { subscriptions.seek(t1p, 100); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE); time.sleep(autoCommitIntervalMs); - coordinator.poll(time.milliseconds(), Long.MAX_VALUE); + coordinator.poll(Long.MAX_VALUE); assertFalse(client.hasPendingResponses()); } @@ -1174,12 +1174,12 @@ public class ConsumerCoordinatorTest { // now find the coordinator client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); // sleep only for the retry backoff time.sleep(retryBackoffMs); prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE); - coordinator.poll(time.milliseconds(), Long.MAX_VALUE); + coordinator.poll(Long.MAX_VALUE); assertFalse(client.hasPendingResponses()); } @@ -1188,7 +1188,7 @@ public class ConsumerCoordinatorTest { subscriptions.assignFromUser(singleton(t1p)); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE); @@ -1204,7 +1204,7 @@ public class ConsumerCoordinatorTest { public void testCommitOffsetAsyncWithDefaultCallback() { int invokedBeforeTest = mockOffsetCommitCallback.invoked; client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE); coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), mockOffsetCommitCallback); coordinator.invokeCompletedOffsetCommitCallbacks(); @@ -1245,7 +1245,7 @@ public class ConsumerCoordinatorTest { public void testCommitOffsetAsyncFailedWithDefaultCallback() { int invokedBeforeTest = mockOffsetCommitCallback.invoked; client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.COORDINATOR_NOT_AVAILABLE); coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), mockOffsetCommitCallback); coordinator.invokeCompletedOffsetCommitCallbacks(); @@ -1256,7 +1256,7 @@ public class ConsumerCoordinatorTest { @Test public void testCommitOffsetAsyncCoordinatorNotAvailable() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - 
coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); // async commit with coordinator not available MockCommitCallback cb = new MockCommitCallback(); @@ -1272,7 +1272,7 @@ public class ConsumerCoordinatorTest { @Test public void testCommitOffsetAsyncNotCoordinator() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); // async commit with not coordinator MockCommitCallback cb = new MockCommitCallback(); @@ -1288,7 +1288,7 @@ public class ConsumerCoordinatorTest { @Test public void testCommitOffsetAsyncDisconnected() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); // async commit with coordinator disconnected MockCommitCallback cb = new MockCommitCallback(); @@ -1304,7 +1304,7 @@ public class ConsumerCoordinatorTest { @Test public void testCommitOffsetSyncNotCoordinator() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); // sync commit with coordinator disconnected (should connect, get metadata, and then submit the commit request) prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NOT_COORDINATOR); @@ -1316,7 +1316,7 @@ public class ConsumerCoordinatorTest { @Test public void testCommitOffsetSyncCoordinatorNotAvailable() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); // sync commit with coordinator disconnected (should connect, get metadata, and then submit the commit request) prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.COORDINATOR_NOT_AVAILABLE); @@ -1328,7 +1328,7 @@ public class ConsumerCoordinatorTest { @Test public void testCommitOffsetSyncCoordinatorDisconnected() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); // sync commit with coordinator disconnected (should connect, get metadata, and then submit the commit request) prepareOffsetCommitRequestDisconnect(singletonMap(t1p, 100L)); @@ -1340,7 +1340,7 @@ public class ConsumerCoordinatorTest { @Test public void testAsyncCommitCallbacksInvokedPriorToSyncCommitCompletion() throws Exception { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); final List committedOffsets = Collections.synchronizedList(new ArrayList()); final OffsetAndMetadata firstOffset = new OffsetAndMetadata(0L); @@ -1376,7 +1376,7 @@ public class ConsumerCoordinatorTest { @Test public void testRetryCommitUnknownTopicOrPartition() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); client.prepareResponse(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION))); client.prepareResponse(offsetCommitResponse(singletonMap(t1p, Errors.NONE))); @@ -1388,7 +1388,7 @@ public class ConsumerCoordinatorTest { public void testCommitOffsetMetadataTooLarge() { // since offset metadata is provided by the user, we have to propagate the exception so they can handle it 
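        // (OFFSET_METADATA_TOO_LARGE is not a retriable error, so commitOffsetsSync surfaces it
        // to the caller instead of retrying within the timeout.)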
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.OFFSET_METADATA_TOO_LARGE); coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE); @@ -1398,7 +1398,7 @@ public class ConsumerCoordinatorTest { public void testCommitOffsetIllegalGeneration() { // we cannot retry if a rebalance occurs before the commit completed client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.ILLEGAL_GENERATION); coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE); @@ -1408,7 +1408,7 @@ public class ConsumerCoordinatorTest { public void testCommitOffsetUnknownMemberId() { // we cannot retry if a rebalance occurs before the commit completed client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.UNKNOWN_MEMBER_ID); coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE); @@ -1418,7 +1418,7 @@ public class ConsumerCoordinatorTest { public void testCommitOffsetRebalanceInProgress() { // we cannot retry if a rebalance occurs before the commit completed client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.REBALANCE_IN_PROGRESS); coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE); @@ -1427,7 +1427,7 @@ public class ConsumerCoordinatorTest { @Test(expected = KafkaException.class) public void testCommitOffsetSyncCallbackWithNonRetriableException() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); // sync commit with invalid partitions should throw if we have no callback prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.UNKNOWN_SERVER_ERROR); @@ -1453,18 +1453,18 @@ public class ConsumerCoordinatorTest { @Test public void testCommitOffsetSyncWithoutFutureGetsCompleted() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); assertFalse(coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(100L)), 0)); } @Test public void testRefreshOffset() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); subscriptions.assignFromUser(singleton(t1p)); client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", 100L)); - coordinator.refreshCommittedOffsetsIfNeeded(); + coordinator.refreshCommittedOffsetsIfNeeded(Long.MAX_VALUE); assertEquals(Collections.emptySet(), subscriptions.missingFetchPositions()); assertTrue(subscriptions.hasAllFetchPositions()); @@ -1474,12 +1474,12 @@ public class ConsumerCoordinatorTest { @Test public void testRefreshOffsetLoadInProgress() { 
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); subscriptions.assignFromUser(singleton(t1p)); client.prepareResponse(offsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS)); client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", 100L)); - coordinator.refreshCommittedOffsetsIfNeeded(); + coordinator.refreshCommittedOffsetsIfNeeded(Long.MAX_VALUE); assertEquals(Collections.emptySet(), subscriptions.missingFetchPositions()); assertTrue(subscriptions.hasAllFetchPositions()); @@ -1489,12 +1489,12 @@ public class ConsumerCoordinatorTest { @Test public void testRefreshOffsetsGroupNotAuthorized() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); subscriptions.assignFromUser(singleton(t1p)); client.prepareResponse(offsetFetchResponse(Errors.GROUP_AUTHORIZATION_FAILED)); try { - coordinator.refreshCommittedOffsetsIfNeeded(); + coordinator.refreshCommittedOffsetsIfNeeded(Long.MAX_VALUE); fail("Expected group authorization error"); } catch (GroupAuthorizationException e) { assertEquals(groupId, e.groupId()); @@ -1504,23 +1504,23 @@ public class ConsumerCoordinatorTest { @Test(expected = KafkaException.class) public void testRefreshOffsetUnknownTopicOrPartition() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); subscriptions.assignFromUser(singleton(t1p)); client.prepareResponse(offsetFetchResponse(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION, "", 100L)); - coordinator.refreshCommittedOffsetsIfNeeded(); + coordinator.refreshCommittedOffsetsIfNeeded(Long.MAX_VALUE); } @Test public void testRefreshOffsetNotCoordinatorForConsumer() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); subscriptions.assignFromUser(singleton(t1p)); client.prepareResponse(offsetFetchResponse(Errors.NOT_COORDINATOR)); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", 100L)); - coordinator.refreshCommittedOffsetsIfNeeded(); + coordinator.refreshCommittedOffsetsIfNeeded(Long.MAX_VALUE); assertEquals(Collections.emptySet(), subscriptions.missingFetchPositions()); assertTrue(subscriptions.hasAllFetchPositions()); @@ -1530,11 +1530,11 @@ public class ConsumerCoordinatorTest { @Test public void testRefreshOffsetWithNoFetchableOffsets() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); subscriptions.assignFromUser(singleton(t1p)); client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", -1L)); - coordinator.refreshCommittedOffsetsIfNeeded(); + coordinator.refreshCommittedOffsetsIfNeeded(Long.MAX_VALUE); assertEquals(Collections.singleton(t1p), subscriptions.missingFetchPositions()); assertEquals(Collections.emptySet(), subscriptions.partitionsNeedingReset(time.milliseconds())); @@ -1548,7 +1548,7 @@ public class ConsumerCoordinatorTest { subscriptions.assignFromUser(singleton(t1p)); subscriptions.seek(t1p, 500L); - coordinator.refreshCommittedOffsetsIfNeeded(); + coordinator.refreshCommittedOffsetsIfNeeded(Long.MAX_VALUE); assertEquals(Collections.emptySet(), 
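[Editor's note] testRefreshOffsetWithNoFetchableOffsets and its siblings above pin down the two outcomes of refreshCommittedOffsetsIfNeeded: either a committed offset exists and seeds the fetch position, or the partition falls back to its reset policy. Roughly the same fork, written against the public consumer API rather than the coordinator internals — the method name and the "earliest" fallback are this note's illustration, not Kafka's implementation:

    import java.util.Collections;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public final class OffsetSeedingSketch {
        // Seeds the fetch position for 'tp' from the group's committed offset,
        // falling back to the earliest offset when nothing was ever committed --
        // the same fork the refresh tests above exercise.
        public static void seedPosition(Consumer<?, ?> consumer, TopicPartition tp) {
            OffsetAndMetadata committed = consumer.committed(tp); // null if no commit exists
            if (committed != null)
                consumer.seek(tp, committed.offset());
            else
                consumer.seekToBeginning(Collections.singleton(tp)); // "earliest" reset
        }
    }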
subscriptions.missingFetchPositions()); assertTrue(subscriptions.hasAllFetchPositions()); @@ -1562,7 +1562,7 @@ public class ConsumerCoordinatorTest { subscriptions.assignFromUser(singleton(t1p)); subscriptions.requestOffsetReset(t1p, OffsetResetStrategy.EARLIEST); - coordinator.refreshCommittedOffsetsIfNeeded(); + coordinator.refreshCommittedOffsetsIfNeeded(Long.MAX_VALUE); assertEquals(Collections.emptySet(), subscriptions.missingFetchPositions()); assertFalse(subscriptions.hasAllFetchPositions()); @@ -1741,17 +1741,17 @@ public class ConsumerCoordinatorTest { ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors, ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, autoCommit, leaveGroup); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); if (useGroupManagement) { subscriptions.subscribe(singleton(topic1), rebalanceListener); client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.joinGroupIfNeeded(); + coordinator.joinGroupIfNeeded(Long.MAX_VALUE); } else subscriptions.assignFromUser(singleton(t1p)); subscriptions.seek(t1p, 100); - coordinator.poll(time.milliseconds(), Long.MAX_VALUE); + coordinator.poll(Long.MAX_VALUE); return coordinator; } @@ -1910,10 +1910,10 @@ public class ConsumerCoordinatorTest { ConsumerCoordinator coordinator, List assignment) { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(assignment, Errors.NONE)); - coordinator.joinGroupIfNeeded(); + coordinator.joinGroupIfNeeded(Long.MAX_VALUE); } private void prepareOffsetCommitRequest(Map expectedOffsets, Errors error) { diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java index c64597b7e8b..413997f2931 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java @@ -38,6 +38,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -85,12 +86,12 @@ public class ClientAuthenticationFailureTest { try (KafkaConsumer consumer = new KafkaConsumer<>(props, deserializer, deserializer)) { consumer.subscribe(Arrays.asList(topic)); - consumer.poll(100); + consumer.poll(Duration.ofSeconds(10)); fail("Expected an authentication error!"); } catch (SaslAuthenticationException e) { // OK } catch (Exception e) { - fail("Expected only an authentication error, but another error occurred: " + e.getMessage()); + throw new AssertionError("Expected only an authentication error, but another error occurred.", e); } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java index 60407c1d0dc..796ecdf2dfe 100644 --- 
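[Editor's note] The ClientAuthenticationFailureTest hunk above makes two distinct improvements: the poll call moves to the non-deprecated Duration overload, and fail(message + e.getMessage()) becomes throw new AssertionError(message, e). The second change only shows up in the test report, but it matters: the AssertionError carries the unexpected exception as its cause, so its full stack trace survives instead of being flattened into a message string. A stripped-down version of the pattern — doWork and ExpectedAuthException are stand-ins invented for this note:

    public final class AssertionCauseSketch {
        static class ExpectedAuthException extends RuntimeException { }

        // Stand-in for the operation under test.
        static void doWork() {
            throw new IllegalStateException("something unrelated broke");
        }

        public static void main(String[] args) {
            try {
                doWork();
                throw new AssertionError("Expected an authentication error!");
            } catch (ExpectedAuthException e) {
                // OK: this is the failure the test wants to see.
            } catch (Exception e) {
                // Chaining 'e' preserves its stack trace in the report;
                // fail(..., e.getMessage()) would discard it.
                throw new AssertionError("Expected only an authentication error, but another error occurred.", e);
            }
        }
    }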
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java @@ -103,6 +103,12 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos return "connect"; } + // expose for tests + @Override + protected synchronized boolean ensureCoordinatorReady(final long timeoutMs) { + return super.ensureCoordinatorReady(timeoutMs); + } + public void poll(long timeout) { // poll for io until the timeout expires final long start = time.milliseconds(); @@ -111,11 +117,11 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos do { if (coordinatorUnknown()) { - ensureCoordinatorReady(); + ensureCoordinatorReady(Long.MAX_VALUE); now = time.milliseconds(); } - if (needRejoin()) { + if (rejoinNeededOrPending()) { ensureActiveGroup(); now = time.milliseconds(); } @@ -282,8 +288,8 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos } @Override - protected boolean needRejoin() { - return super.needRejoin() || (assignmentSnapshot == null || assignmentSnapshot.failed()) || rejoinRequested; + protected boolean rejoinNeededOrPending() { + return super.rejoinNeededOrPending() || (assignmentSnapshot == null || assignmentSnapshot.failed()) || rejoinRequested; } public String memberId() { @@ -298,13 +304,13 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos } public String ownerUrl(String connector) { - if (needRejoin() || !isLeader()) + if (rejoinNeededOrPending() || !isLeader()) return null; return leaderState.ownerUrl(connector); } public String ownerUrl(ConnectorTaskId task) { - if (needRejoin() || !isLeader()) + if (rejoinNeededOrPending() || !isLeader()) return null; return leaderState.ownerUrl(task); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java index 154b1df5e73..e1017f2654f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java @@ -208,7 +208,7 @@ public class WorkerCoordinatorTest { final String consumerId = "leader"; client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); // normal join group Map memberConfigOffsets = new HashMap<>(); @@ -227,7 +227,7 @@ public class WorkerCoordinatorTest { Collections.emptyList(), Errors.NONE)); coordinator.ensureActiveGroup(); - assertFalse(coordinator.needRejoin()); + assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(0, rebalanceListener.revokedCount); assertEquals(1, rebalanceListener.assignedCount); assertFalse(rebalanceListener.assignment.failed()); @@ -248,7 +248,7 @@ public class WorkerCoordinatorTest { final String memberId = "member"; client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); // normal join group client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE)); @@ -264,7 +264,7 @@ public class WorkerCoordinatorTest { Collections.singletonList(taskId1x0), Errors.NONE)); coordinator.ensureActiveGroup(); - 
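[Editor's note] Two things happen in the WorkerCoordinator hunks above. First, the delegating override marked "expose for tests" re-declares the protected method in WorkerCoordinator's own package; since protected also grants package access in the declaring class's package, WorkerCoordinatorTest can now call ensureCoordinatorReady directly. Second, needRejoin() is renamed to rejoinNeededOrPending(), which is worth spelling out: the predicate is true not only when a fresh rejoin is required but also while a join round is already in flight. A toy model of that two-flag reading — all names here are illustrative, not Kafka's internals:

    // Toy model: the predicate is an OR over "must rejoin" and "join in flight".
    final class RejoinStateSketch {
        private boolean rejoinNeeded = true;  // membership is stale or never established
        private boolean joinInFlight = false; // a JoinGroup request is outstanding

        boolean rejoinNeededOrPending() {
            return rejoinNeeded || joinInFlight;
        }

        void startJoin() {
            joinInFlight = true;
            rejoinNeeded = false;
        }

        void joinSucceeded() {
            joinInFlight = false;
        }

        void membershipLost() {
            rejoinNeeded = true;
        }
    }

Under the old name, callers could read a false result as "membership is settled" even while a join was pending; the rename makes the weaker guarantee explicit.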
assertFalse(coordinator.needRejoin()); + assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(0, rebalanceListener.revokedCount); assertEquals(1, rebalanceListener.assignedCount); assertFalse(rebalanceListener.assignment.failed()); @@ -289,7 +289,7 @@ public class WorkerCoordinatorTest { final String memberId = "member"; client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); // config mismatch results in assignment error client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE)); @@ -320,7 +320,7 @@ public class WorkerCoordinatorTest { PowerMock.replayAll(); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(); + coordinator.ensureCoordinatorReady(Long.MAX_VALUE); // join the group once client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE)); diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala index a1222977751..034557e8a2a 100755 --- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala +++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala @@ -121,6 +121,10 @@ object ZooKeeperTestHarness { val (threads, noUnexpected) = TestUtils.computeUntilTrue(allThreads) { threads => threads.forall(t => unexpectedThreadNames.forall(s => !t.contains(s))) } - assertTrue(s"Found unexpected threads during $context, allThreads=$threads", noUnexpected) + assertTrue( + s"Found unexpected threads during $context, allThreads=$threads, " + + s"unexpected=${threads.filterNot(t => unexpectedThreadNames.forall(s => !t.contains(s)))}", + noUnexpected + ) } } diff --git a/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java b/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java index ce889170bbf..3070e36482f 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java +++ b/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java @@ -85,6 +85,7 @@ public class MockRestoreConsumer extends MockConsumer { super.assign(partitions); } + @Deprecated @Override public ConsumerRecords poll(long timeout) { // add buffered records to MockConsumer
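[Editor's note] MockRestoreConsumer above keeps its poll(long) override but tags it @Deprecated, mirroring the interface change at the top of this patch. When a deprecated overload must remain for source compatibility, the usual shape is a thin delegate to the replacement, so both signatures share one implementation and the deprecation warning steers callers to the new one. A minimal sketch — PollingClientSketch is a made-up class, not part of this patch:

    import java.time.Duration;

    public class PollingClientSketch {
        /**
         * @deprecated use {@link #poll(Duration)} instead.
         */
        @Deprecated
        public String poll(long timeoutMs) {
            return poll(Duration.ofMillis(timeoutMs)); // one implementation, two signatures
        }

        public String poll(Duration timeout) {
            // Real I/O, bounded by 'timeout', would happen here.
            return "polled for " + timeout.toMillis() + " ms";
        }
    }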