, DeleteShareGroupsOptions)} call.
+ * Options for the {@link Admin#deleteShareGroups(Collection, DeleteShareGroupsOptions)} call.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupsResult.java
index c2791e681f7..ff53da08df8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupsResult.java
@@ -25,7 +25,7 @@ import java.util.HashMap;
import java.util.Map;
/**
- * The result of the {@link Admin#deleteShareGroups(Collection , DeleteShareGroupsOptions)} call.
+ * The result of the {@link Admin#deleteShareGroups(Collection, DeleteShareGroupsOptions)} call.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteStreamsGroupsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteStreamsGroupsOptions.java
index 6cd14797122..6ca2ec66a27 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteStreamsGroupsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteStreamsGroupsOptions.java
@@ -21,7 +21,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Collection;
/**
- * Options for the {@link Admin#deleteStreamsGroups(Collection, DeleteStreamsGroupsOptions)} call.
+ * Options for the {@link Admin#deleteStreamsGroups(Collection, DeleteStreamsGroupsOptions)} call.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListStreamsGroupOffsetsSpec.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListStreamsGroupOffsetsSpec.java
index ce2d1552a53..4f5380f7491 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListStreamsGroupOffsetsSpec.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListStreamsGroupOffsetsSpec.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Collection;
+import java.util.Map;
/**
* Specification of streams group offsets to list using {@link Admin#listStreamsGroupOffsets(Map, ListStreamsGroupOffsetsOptions)}.
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/RecordsToDelete.java b/clients/src/main/java/org/apache/kafka/clients/admin/RecordsToDelete.java
index d3da26b03bb..57421e3568b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/RecordsToDelete.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/RecordsToDelete.java
@@ -33,14 +33,16 @@ public class RecordsToDelete {
/**
* Delete all the records before the given {@code offset}
*
- * @param offset the offset before which all records will be deleted
+ * @param offset The offset before which all records will be deleted.
+ * Use {@code -1} to truncate to the high watermark.
*/
public static RecordsToDelete beforeOffset(long offset) {
return new RecordsToDelete(offset);
}
/**
- * The offset before which all records will be deleted
+ * The offset before which all records will be deleted.
+ * Use {@code -1} to truncate to the high watermark.
*/
public long beforeOffset() {
return offset;
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteShareGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteShareGroupOffsetsHandler.java
index 9f5742937c8..9f0ab59e32c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteShareGroupOffsetsHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteShareGroupOffsetsHandler.java
@@ -122,8 +122,8 @@ public class DeleteShareGroupOffsetsHandler extends AdminApiHandler.Batched handleUnsupportedVersionException(
int brokerId, UnsupportedVersionException exception, Set keys
) {
- log.warn("Broker " + brokerId + " does not support MAX_TIMESTAMP offset specs");
+ log.warn("Broker {} does not support MAX_TIMESTAMP offset specs", brokerId);
Map maxTimestampPartitions = new HashMap<>();
for (TopicPartition topicPartition : keys) {
Long offsetTimestamp = offsetTimestampsByPartition.get(topicPartition);
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
index c49b2c8045a..23e045b7600 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
@@ -50,7 +50,7 @@ import java.util.Collection;
* Under normal conditions, if a partition is reassigned from one consumer to another, then the old consumer will
* always invoke {@link #onPartitionsRevoked(Collection) onPartitionsRevoked} for that partition prior to the new consumer
* invoking {@link #onPartitionsAssigned(Collection) onPartitionsAssigned} for the same partition. So if offsets or other state is saved in the
- * {@link #onPartitionsRevoked(Collection) onPartitionsRevoked} call by one consumer member, it will be always accessible by the time the
+ * {@link #onPartitionsRevoked(Collection) onPartitionsRevoked} call by one consumer member, it will always be accessible by the time the
* other consumer member taking over that partition and triggering its {@link #onPartitionsAssigned(Collection) onPartitionsAssigned} callback to load the state.
*
* You can think of revocation as a graceful way to give up ownership of a partition. In some cases, the consumer may not have an opportunity to do so.
@@ -120,13 +120,31 @@ public interface ConsumerRebalanceListener {
/**
* A callback method the user can implement to provide handling of offset commits to a customized store.
* This method will be called during a rebalance operation when the consumer has to give up some partitions.
- * It can also be called when consumer is being closed ({@link KafkaConsumer#close(CloseOptions option)})
- * or is unsubscribing ({@link KafkaConsumer#unsubscribe()}).
+ * The consumer may need to give up some partitions (thus this callback executed) under the following scenarios:
+ *
+ * - If the consumer assignment changes
+ * - If the consumer is being closed ({@link KafkaConsumer#close(CloseOptions option)})
+ * - If the consumer is unsubscribing ({@link KafkaConsumer#unsubscribe()})
+ *
* It is recommended that offsets should be committed in this callback to either Kafka or a
* custom offset store to prevent duplicate data.
*
- * In eager rebalancing, it will always be called at the start of a rebalance and after the consumer stops fetching data.
- * In cooperative rebalancing, it will be called at the end of a rebalance on the set of partitions being revoked iff the set is non-empty.
+ * This callback is always called before re-assigning the partitions.
+ * If the consumer is using the {@link GroupProtocol#CLASSIC} rebalance protocol:
+ *
+ * -
+ * In eager rebalancing, onPartitionsRevoked will be called with the full set of assigned partitions as a parameter (all partitions are revoked).
+ * It will be called even if there are no partitions to revoke.
+ *
+ * -
+ * In cooperative rebalancing, onPartitionsRevoked will be called with the set of partitions to revoke,
+ * iff the set is non-empty.
+ *
+ *
+ * If the consumer is using the {@link GroupProtocol#CONSUMER} rebalance protocol, this callback will be called
+ * with the set of partitions to revoke iff the set is non-empty
+ * (same behavior as the {@link GroupProtocol#CLASSIC} rebalance protocol with Cooperative mode).
+ *
* For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer}.
*
* It is common for the revocation callback to use the consumer instance in order to commit offsets. It is possible
@@ -135,8 +153,9 @@ public interface ConsumerRebalanceListener {
* invocation of {@link KafkaConsumer#poll(java.time.Duration)} in which this callback is being executed. This means it is not
* necessary to catch these exceptions and re-attempt to wakeup or interrupt the consumer thread.
*
- * @param partitions The list of partitions that were assigned to the consumer and now need to be revoked (may not
- * include all currently assigned partitions, i.e. there may still be some partitions left)
+ * @param partitions The list of partitions that were assigned to the consumer and now need to be revoked. This will
+ * include the full assignment under the Classic/Eager protocol, given that it revokes all partitions.
+ * It will only include the subset to revoke under the Classic/Cooperative and Consumer protocols.
* @throws org.apache.kafka.common.errors.WakeupException If raised from a nested call to {@link KafkaConsumer}
* @throws org.apache.kafka.common.errors.InterruptException If raised from a nested call to {@link KafkaConsumer}
*/
@@ -144,12 +163,13 @@ public interface ConsumerRebalanceListener {
/**
* A callback method the user can implement to provide handling of customized offsets on completion of a successful
- * partition re-assignment. This method will be called after the partition re-assignment completes and before the
- * consumer starts fetching data, and only as the result of a {@link Consumer#poll(java.time.Duration) poll(long)} call.
+ * partition re-assignment. This method will be called after the partition re-assignment completes (even if no new
+ * partitions were assigned to the consumer), and before the consumer starts fetching data,
+ * and only as the result of a {@link Consumer#poll(java.time.Duration) poll(long)} call.
*
* It is guaranteed that under normal conditions all the processes in a consumer group will execute their
- * {@link #onPartitionsRevoked(Collection)} callback before any instance executes its
- * {@link #onPartitionsAssigned(Collection)} callback. During exceptional scenarios, partitions may be migrated
+ * {@link #onPartitionsRevoked(Collection)} callback before any instance executes this onPartitionsAssigned callback.
+ * During exceptional scenarios, partitions may be migrated
* without the old owner being notified (i.e. their {@link #onPartitionsRevoked(Collection)} callback not triggered),
* and later when the old owner consumer realized this event, the {@link #onPartitionsLost(Collection)} callback
* will be triggered by the consumer then.
@@ -160,9 +180,11 @@ public interface ConsumerRebalanceListener {
* invocation of {@link KafkaConsumer#poll(java.time.Duration)} in which this callback is being executed. This means it is not
* necessary to catch these exceptions and re-attempt to wakeup or interrupt the consumer thread.
*
- * @param partitions The list of partitions that are now assigned to the consumer (previously owned partitions will
- * NOT be included, i.e. this list will only include newly added partitions)
- * @throws org.apache.kafka.common.errors.WakeupException If raised from a nested call to {@link KafkaConsumer}
+ * @param partitions Partitions that have been added to the assignment as a result of the rebalance.
+ * Note that partitions that were already owned by this consumer and remain assigned are not
+ * included in this list under the Classic/Cooperative or Consumer protocols. THe full assignment
+ * will be received under the Classic/Eager protocol.
+ * @throws org.apache.kafka.common.errors.WakeupException If raised from a nested call to {@link KafkaConsumer}
* @throws org.apache.kafka.common.errors.InterruptException If raised from a nested call to {@link KafkaConsumer}
*/
void onPartitionsAssigned(Collection partitions);
@@ -187,10 +209,9 @@ public interface ConsumerRebalanceListener {
* necessary to catch these exceptions and re-attempt to wakeup or interrupt the consumer thread.
*
* @param partitions The list of partitions that were assigned to the consumer and now have been reassigned
- * to other consumers. With the current protocol this will always include all of the consumer's
- * previously assigned partitions, but this may change in future protocols (ie there would still
- * be some partitions left)
- * @throws org.apache.kafka.common.errors.WakeupException If raised from a nested call to {@link KafkaConsumer}
+ * to other consumers. With both, the Classic and Consumer protocols, this will always include
+ * all partitions that were previously assigned to the consumer.
+ * @throws org.apache.kafka.common.errors.WakeupException If raised from a nested call to {@link KafkaConsumer}
* @throws org.apache.kafka.common.errors.InterruptException If raised from a nested call to {@link KafkaConsumer}
*/
default void onPartitionsLost(Collection partitions) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index e74cf0414a8..9f1992d6568 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -661,7 +661,7 @@ public class KafkaConsumer implements Consumer {
* If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}.
*
*
- * As part of group management, the consumer will keep track of the list of consumers that belong to a particular
+ * As part of group management, the group coordinator will keep track of the list of consumers that belong to a particular
* group and will trigger a rebalance operation if any one of the following events are triggered:
*
* - Number of partitions change for any of the subscribed topics
@@ -670,8 +670,11 @@ public class KafkaConsumer implements Consumer {
*
- A new member is added to the consumer group
*
*
- * When any of these events are triggered, the provided listener will be invoked first to indicate that
- * the consumer's assignment has been revoked, and then again when the new assignment has been received.
+ * When any of these events are triggered, the provided listener will be invoked in this way:
+ *
+ * - {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection)} will be invoked with the partitions to revoke, before re-assigning those partitions to another consumer.
+ * - {@link ConsumerRebalanceListener#onPartitionsAssigned(Collection)} will be invoked when the rebalance completes (even if no new partitions are assigned to the consumer)
+ *
* Note that rebalances will only occur during an active call to {@link #poll(Duration)}, so callbacks will
* also only be invoked during that time.
*
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
index 74ecf7f9bb8..ffe01c089e7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
@@ -988,7 +988,7 @@ public abstract class AbstractMembershipManager impl
String reason = rejoinedWhileReconciliationInProgress ?
"the member has re-joined the group" :
"the member already transitioned out of the reconciling state into " + state;
- log.info("Interrupting reconciliation that is not relevant anymore because " + reason);
+ log.info("Interrupting reconciliation that is not relevant anymore because {}", reason);
markReconciliationCompleted();
}
return shouldAbort;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
index 4ac1513ede5..c38b5859f5f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
@@ -965,8 +965,8 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor {
super(partitionsPerTopic, rackInfo, currentAssignment);
this.subscriptions = subscriptions;
- topic2AllPotentialConsumers = new HashMap<>(partitionsPerTopic.keySet().size());
- consumer2AllPotentialTopics = new HashMap<>(subscriptions.keySet().size());
+ topic2AllPotentialConsumers = new HashMap<>(partitionsPerTopic.size());
+ consumer2AllPotentialTopics = new HashMap<>(subscriptions.size());
// initialize topic2AllPotentialConsumers and consumer2AllPotentialTopics
partitionsPerTopic.keySet().forEach(
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index c6bdffed9a6..9f8ee0e0550 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -187,25 +187,6 @@ public class AsyncKafkaConsumer implements ConsumerDelegate {
*/
private class BackgroundEventProcessor implements EventProcessor {
- private Optional streamsRebalanceListener = Optional.empty();
- private final Optional streamsRebalanceData;
-
- public BackgroundEventProcessor() {
- this.streamsRebalanceData = Optional.empty();
- }
-
- public BackgroundEventProcessor(final Optional streamsRebalanceData) {
- this.streamsRebalanceData = streamsRebalanceData;
- }
-
- private void setStreamsRebalanceListener(final StreamsRebalanceListener streamsRebalanceListener) {
- if (streamsRebalanceData.isEmpty()) {
- throw new IllegalStateException("Background event processor was not created to be used with Streams " +
- "rebalance protocol events");
- }
- this.streamsRebalanceListener = Optional.of(streamsRebalanceListener);
- }
-
@Override
public void process(final BackgroundEvent event) {
switch (event.type()) {
@@ -278,44 +259,26 @@ public class AsyncKafkaConsumer implements ConsumerDelegate {
private StreamsOnTasksRevokedCallbackCompletedEvent invokeOnTasksRevokedCallback(final Set activeTasksToRevoke,
final CompletableFuture future) {
- final Optional exceptionFromCallback = streamsRebalanceListener().onTasksRevoked(activeTasksToRevoke);
+ final Optional exceptionFromCallback = Optional.ofNullable(streamsRebalanceListenerInvoker().invokeTasksRevoked(activeTasksToRevoke));
final Optional error = exceptionFromCallback.map(e -> ConsumerUtils.maybeWrapAsKafkaException(e, "Task revocation callback throws an error"));
return new StreamsOnTasksRevokedCallbackCompletedEvent(future, error);
}
private StreamsOnTasksAssignedCallbackCompletedEvent invokeOnTasksAssignedCallback(final StreamsRebalanceData.Assignment assignment,
final CompletableFuture future) {
- final Optional error;
- final Optional exceptionFromCallback = streamsRebalanceListener().onTasksAssigned(assignment);
- if (exceptionFromCallback.isPresent()) {
- error = Optional.of(ConsumerUtils.maybeWrapAsKafkaException(exceptionFromCallback.get(), "Task assignment callback throws an error"));
- } else {
- error = Optional.empty();
- streamsRebalanceData().setReconciledAssignment(assignment);
- }
+ final Optional exceptionFromCallback = Optional.ofNullable(streamsRebalanceListenerInvoker().invokeTasksAssigned(assignment));
+ final Optional error = exceptionFromCallback.map(e -> ConsumerUtils.maybeWrapAsKafkaException(e, "Task assignment callback throws an error"));
return new StreamsOnTasksAssignedCallbackCompletedEvent(future, error);
}
private StreamsOnAllTasksLostCallbackCompletedEvent invokeOnAllTasksLostCallback(final CompletableFuture future) {
- final Optional error;
- final Optional exceptionFromCallback = streamsRebalanceListener().onAllTasksLost();
- if (exceptionFromCallback.isPresent()) {
- error = Optional.of(ConsumerUtils.maybeWrapAsKafkaException(exceptionFromCallback.get(), "All tasks lost callback throws an error"));
- } else {
- error = Optional.empty();
- streamsRebalanceData().setReconciledAssignment(StreamsRebalanceData.Assignment.EMPTY);
- }
+ final Optional exceptionFromCallback = Optional.ofNullable(streamsRebalanceListenerInvoker().invokeAllTasksLost());
+ final Optional error = exceptionFromCallback.map(e -> ConsumerUtils.maybeWrapAsKafkaException(e, "All tasks lost callback throws an error"));
return new StreamsOnAllTasksLostCallbackCompletedEvent(future, error);
}
- private StreamsRebalanceData streamsRebalanceData() {
- return streamsRebalanceData.orElseThrow(
- () -> new IllegalStateException("Background event processor was not created to be used with Streams " +
- "rebalance protocol events"));
- }
-
- private StreamsRebalanceListener streamsRebalanceListener() {
- return streamsRebalanceListener.orElseThrow(
+ private StreamsRebalanceListenerInvoker streamsRebalanceListenerInvoker() {
+ return streamsRebalanceListenerInvoker.orElseThrow(
() -> new IllegalStateException("Background event processor was not created to be used with Streams " +
"rebalance protocol events"));
}
@@ -365,6 +328,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate {
private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;
private final ConsumerRebalanceListenerInvoker rebalanceListenerInvoker;
+ private final Optional streamsRebalanceListenerInvoker;
// Last triggered async commit future. Used to wait until all previous async commits are completed.
// We only need to keep track of the last one, since they are guaranteed to complete in order.
private CompletableFuture