diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 5b9712f3466..b2f944ad5d2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -1022,15 +1022,28 @@ public abstract class AbstractCoordinator implements Closeable { resetStateAndRejoin("consumer pro-actively leaving the group", true); } - public synchronized void requestRejoinIfNecessary(final String reason) { + public synchronized void requestRejoinIfNecessary(final String shortReason, + final String fullReason) { if (!this.rejoinNeeded) { - requestRejoin(reason); + requestRejoin(shortReason, fullReason); } } - public synchronized void requestRejoin(final String reason) { - log.info("Request joining group due to: {}", reason); - this.rejoinReason = reason; + public synchronized void requestRejoin(final String shortReason) { + requestRejoin(shortReason, shortReason); + } + + /** + * Request to rejoin the group. + * + * @param shortReason This is the reason passed up to the group coordinator. It must be + * reasonably small. + * @param fullReason This is the reason logged locally. + */ + public synchronized void requestRejoin(final String shortReason, + final String fullReason) { + log.info("Request joining group due to: {}", fullReason); + this.rejoinReason = shortReason; this.rejoinNeeded = true; } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 10939b2a0e4..51fa0b62ed1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -401,10 +401,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator { assignedPartitions.addAll(assignment.partitions()); if (!subscriptions.checkAssignmentMatchedSubscription(assignedPartitions)) { - final String reason = String.format("received assignment %s does not match the current subscription %s; " + + final String fullReason = String.format("received assignment %s does not match the current subscription %s; " + "it is likely that the subscription has changed since we joined the group, will re-join with current subscription", assignment.partitions(), subscriptions.prettyString()); - requestRejoin(reason); + requestRejoin("received assignment does not match the current subscription", fullReason); return; } @@ -437,9 +437,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator { firstException.compareAndSet(null, invokePartitionsRevoked(revokedPartitions)); // If revoked any partitions, need to re-join the group afterwards - final String reason = String.format("need to revoke partitions %s as indicated " + + final String fullReason = String.format("need to revoke partitions %s as indicated " + "by the current assignment and re-join", revokedPartitions); - requestRejoin(reason); + requestRejoin("need to revoke partitions and re-join", fullReason); } } @@ -851,17 +851,17 @@ public final class ConsumerCoordinator extends AbstractCoordinator { // we need to rejoin if we performed the assignment and metadata has changed; // also for those owned-but-no-longer-existed partitions we should drop them as lost if (assignmentSnapshot != null && !assignmentSnapshot.matches(metadataSnapshot)) { - final String reason = String.format("cached metadata has changed from %s at the beginning of the rebalance to %s", + final String fullReason = String.format("cached metadata has changed from %s at the beginning of the rebalance to %s", assignmentSnapshot, metadataSnapshot); - requestRejoinIfNecessary(reason); + requestRejoinIfNecessary("cached metadata has changed", fullReason); return true; } // we need to join if our subscription has changed since the last join if (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription())) { - final String reason = String.format("subscription has changed from %s at the beginning of the rebalance to %s", + final String fullReason = String.format("subscription has changed from %s at the beginning of the rebalance to %s", joinedSubscription, subscriptions.subscription()); - requestRejoinIfNecessary(reason); + requestRejoinIfNecessary("subscription has changed", fullReason); return true; }