mirror of https://github.com/apache/kafka.git
MINOR: Follow-up improvements for KIP-266 (#5084)
This patch contains a few follow-up improvements/cleanup for KIP-266: - Add upgrade notes - Add missing `commitSync(Duration)` API - Improve timeout messages and fix some naming inconsistencies - Various small cleanups Reviewers: John Roesler <john@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
parent
08e8facdc9
commit
3683d475ed
|
@ -92,6 +92,11 @@ public interface Consumer<K, V> extends Closeable {
|
|||
*/
|
||||
void commitSync();
|
||||
|
||||
/**
|
||||
* @see KafkaConsumer#commitSync(Duration)
|
||||
*/
|
||||
void commitSync(Duration timeout);
|
||||
|
||||
/**
|
||||
* @see KafkaConsumer#commitSync(Map)
|
||||
*/
|
||||
|
@ -219,7 +224,7 @@ public interface Consumer<K, V> extends Closeable {
|
|||
/**
|
||||
* @see KafkaConsumer#endOffsets(Collection, Duration)
|
||||
*/
|
||||
Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeoutMs);
|
||||
Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout);
|
||||
|
||||
/**
|
||||
* @see KafkaConsumer#close()
|
||||
|
|
|
@ -1103,7 +1103,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
|
|||
* partitions to consume from
|
||||
*
|
||||
*
|
||||
* @deprecated Since 2.0. Use {@link #poll(Duration)} to poll for records.
|
||||
* @deprecated Since 2.0. Use {@link #poll(Duration)}, which does not block beyond the timeout awaiting partition
|
||||
* assignment. See <a href="https://cwiki.apache.org/confluence/x/5kiHB">KIP-266</a> for more information.
|
||||
*/
|
||||
@Deprecated
|
||||
@Override
|
||||
|
@ -1119,6 +1120,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
|
|||
* consumed offset can be manually set through {@link #seek(TopicPartition, long)} or automatically set as the last committed
|
||||
* offset for the subscribed list of partitions
|
||||
*
|
||||
* <p>
|
||||
* This method returns immediately if there are records available. Otherwise, it will await the passed timeout.
|
||||
* If the timeout expires, an empty record set will be returned. Note that this method may block beyond the
|
||||
* timeout in order to execute custom {@link ConsumerRebalanceListener} callbacks.
|
||||
*
|
||||
*
|
||||
* @param timeout The maximum time to block (must not be greater than {@link Long#MAX_VALUE} milliseconds)
|
||||
*
|
||||
|
@ -1283,9 +1289,46 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
|
|||
*/
|
||||
@Override
|
||||
public void commitSync() {
|
||||
commitSync(Duration.ofMillis(Long.MAX_VALUE));
|
||||
}
|
||||
|
||||
/**
|
||||
* Commit offsets returned on the last {@link #poll(Duration) poll()} for all the subscribed list of topics and
|
||||
* partitions.
|
||||
* <p>
|
||||
* This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after
|
||||
* every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
|
||||
* should not be used.
|
||||
* <p>
|
||||
* This is a synchronous commits and will block until either the commit succeeds, an unrecoverable error is
|
||||
* encountered (in which case it is thrown to the caller), or the passed timeout expires.
|
||||
* <p>
|
||||
* Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)}
|
||||
* (or similar) are guaranteed to have their callbacks invoked prior to completion of this method.
|
||||
*
|
||||
* @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried.
|
||||
* This can only occur if you are using automatic group management with {@link #subscribe(Collection)},
|
||||
* or if there is an active group with the same groupId which is using group management.
|
||||
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
|
||||
* function is called
|
||||
* @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
|
||||
* this function is called
|
||||
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
|
||||
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
|
||||
* configured groupId. See the exception for more details
|
||||
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata
|
||||
* is too large or if the topic does not exist).
|
||||
* @throws org.apache.kafka.common.errors.TimeoutException if the timeout expires before successful completion
|
||||
* of the offset commit
|
||||
*/
|
||||
@Override
|
||||
public void commitSync(Duration timeout) {
|
||||
acquireAndEnsureOpen();
|
||||
try {
|
||||
coordinator.commitOffsetsSync(subscriptions.allConsumed(), Long.MAX_VALUE);
|
||||
if (!coordinator.commitOffsetsSync(subscriptions.allConsumed(), timeout.toMillis())) {
|
||||
throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before successfully " +
|
||||
"committing the current consumed offsets");
|
||||
}
|
||||
} finally {
|
||||
release();
|
||||
}
|
||||
|
@ -1362,7 +1405,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
|
|||
acquireAndEnsureOpen();
|
||||
try {
|
||||
if (!coordinator.commitOffsetsSync(new HashMap<>(offsets), timeout.toMillis())) {
|
||||
throw new TimeoutException("Committing offsets synchronously took too long.");
|
||||
throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before successfully " +
|
||||
"committing offsets " + offsets);
|
||||
}
|
||||
} finally {
|
||||
release();
|
||||
|
@ -1584,7 +1628,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
|
|||
offset = this.subscriptions.position(partition);
|
||||
finishMs = time.milliseconds();
|
||||
}
|
||||
if (offset == null) throw new TimeoutException("request timed out, position is unable to be acquired.");
|
||||
if (offset == null)
|
||||
throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the position " +
|
||||
"for partition " + partition + " could be determined");
|
||||
return offset;
|
||||
} finally {
|
||||
release();
|
||||
|
@ -1640,7 +1686,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
|
|||
Map<TopicPartition, OffsetAndMetadata> offsets = coordinator.fetchCommittedOffsets(
|
||||
Collections.singleton(partition), timeout.toMillis());
|
||||
if (offsets == null) {
|
||||
throw new TimeoutException("Unable to find committed offsets for partition within set duration.");
|
||||
throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the last " +
|
||||
"committed offset for partition " + partition + " could be determined");
|
||||
}
|
||||
return offsets.get(partition);
|
||||
} finally {
|
||||
|
|
|
@ -251,6 +251,11 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
|
|||
commitSync(this.subscriptions.allConsumed());
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void commitSync(Duration timeout) {
|
||||
commitSync(this.subscriptions.allConsumed());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout) {
|
||||
commitSync(offsets);
|
||||
|
@ -508,7 +513,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration duration) {
|
||||
public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout) {
|
||||
return endOffsets(partitions);
|
||||
}
|
||||
|
||||
|
|
|
@ -57,6 +57,7 @@ import java.util.HashMap;
|
|||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
@ -87,29 +88,25 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
|
|||
private MetadataSnapshot assignmentSnapshot;
|
||||
private long nextAutoCommitDeadline;
|
||||
|
||||
// hold onto request&future for commited offset requests to enable async calls.
|
||||
// hold onto request&future for committed offset requests to enable async calls.
|
||||
private PendingCommittedOffsetRequest pendingCommittedOffsetRequest = null;
|
||||
|
||||
private static class PendingCommittedOffsetRequest {
|
||||
private final Set<TopicPartition> request;
|
||||
private final Generation generation;
|
||||
private final Set<TopicPartition> requestedPartitions;
|
||||
private final Generation requestedGeneration;
|
||||
private final RequestFuture<Map<TopicPartition, OffsetAndMetadata>> response;
|
||||
|
||||
private PendingCommittedOffsetRequest(final Set<TopicPartition> request,
|
||||
private PendingCommittedOffsetRequest(final Set<TopicPartition> requestedPartitions,
|
||||
final Generation generationAtRequestTime,
|
||||
final RequestFuture<Map<TopicPartition, OffsetAndMetadata>> response
|
||||
) {
|
||||
if (request == null) throw new NullPointerException();
|
||||
if (response == null) throw new NullPointerException();
|
||||
|
||||
this.request = request;
|
||||
this.generation = generationAtRequestTime;
|
||||
this.response = response;
|
||||
final RequestFuture<Map<TopicPartition, OffsetAndMetadata>> response) {
|
||||
this.requestedPartitions = Objects.requireNonNull(requestedPartitions);
|
||||
this.response = Objects.requireNonNull(response);
|
||||
this.requestedGeneration = generationAtRequestTime;
|
||||
}
|
||||
|
||||
private boolean sameRequest(final Set<TopicPartition> currentRequest, final Generation currentGeneration) {
|
||||
return (generation == null ? currentGeneration == null : generation.equals(currentGeneration))
|
||||
&& request.equals(currentRequest);
|
||||
return (requestedGeneration == null ? currentGeneration == null : requestedGeneration.equals(currentGeneration))
|
||||
&& requestedPartitions.equals(currentRequest);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -303,6 +300,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
|
|||
*/
|
||||
public boolean poll(final long timeoutMs) {
|
||||
final long startTime = time.milliseconds();
|
||||
long currentTime = startTime;
|
||||
long elapsed = 0L;
|
||||
|
||||
invokeCompletedOffsetCommitCallbacks();
|
||||
|
@ -312,7 +310,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
|
|||
if (!ensureCoordinatorReady(remainingTimeAtLeastZero(timeoutMs, elapsed))) {
|
||||
return false;
|
||||
}
|
||||
elapsed = time.milliseconds() - startTime;
|
||||
currentTime = time.milliseconds();
|
||||
elapsed = currentTime - startTime;
|
||||
|
||||
}
|
||||
|
||||
if (rejoinNeededOrPending()) {
|
||||
|
@ -323,15 +323,18 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
|
|||
if (!client.ensureFreshMetadata(remainingTimeAtLeastZero(timeoutMs, elapsed))) {
|
||||
return false;
|
||||
}
|
||||
elapsed = time.milliseconds() - startTime;
|
||||
currentTime = time.milliseconds();
|
||||
elapsed = currentTime - startTime;
|
||||
}
|
||||
|
||||
if (!ensureActiveGroup(remainingTimeAtLeastZero(timeoutMs, elapsed))) {
|
||||
return false;
|
||||
}
|
||||
|
||||
currentTime = time.milliseconds();
|
||||
}
|
||||
|
||||
pollHeartbeat(startTime);
|
||||
pollHeartbeat(currentTime);
|
||||
} else {
|
||||
// For manually assigned partitions, if there are no ready nodes, await metadata.
|
||||
// If connections to all nodes fail, wakeups triggered while attempting to send fetch
|
||||
|
@ -345,10 +348,12 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
|
|||
if (!metadataUpdated && !client.hasReadyNodes(time.milliseconds())) {
|
||||
return false;
|
||||
}
|
||||
|
||||
currentTime = time.milliseconds();
|
||||
}
|
||||
}
|
||||
|
||||
maybeAutoCommitOffsetsAsync(startTime);
|
||||
maybeAutoCommitOffsetsAsync(currentTime);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
|
@ -60,7 +60,7 @@
|
|||
<li>Bumping the protocol version and restarting can be done any time after the brokers are upgraded. It does not have to be immediately after.
|
||||
Similarly for the message format version.</li>
|
||||
<li>If you are using Java8 method references in your Kafka Streams code you might need to update your code to resolve method ambiguties.
|
||||
Hot-swaping the jar-file only might not work.</li>
|
||||
Hot-swapping the jar-file only might not work.</li>
|
||||
</ol>
|
||||
|
||||
<h5><a id="upgrade_200_notable" href="#upgrade_200_notable">Notable changes in 2.0.0</a></h5>
|
||||
|
@ -91,6 +91,10 @@
|
|||
<code>internal.value.converter=org.apache.kafka.connect.json.JsonConverter</code>
|
||||
<code>internal.value.converter.schemas.enable=false</code>
|
||||
</li>
|
||||
<li><a href="https://cwiki.apache.org/confluence/x/5kiHB">KIP-266</a> adds overloads to the consumer to support
|
||||
timeout behavior for blocking APIs. In particular, a new <code>poll(Duration)</code> API has been added which
|
||||
does not block for dynamic partition assignment. The old <code>poll(long)</code> API has been deprecated and
|
||||
will be removed in a future version.</li>
|
||||
</ul>
|
||||
|
||||
<h5><a id="upgrade_200_new_protocols" href="#upgrade_200_new_protocols">New Protocol Versions</a></h5>
|
||||
|
|
Loading…
Reference in New Issue