KAFKA-13828; Ensure reasons sent by the consumer are small (#12043)

This PR reworks the reasons used in the ConsumerCoordinator to ensure that they remain reasonably short.

Reviewers: Bruno Cadonna <bruno@confluent.io>
This commit is contained in:
David Jacot 2022-04-13 13:42:27 +02:00 committed by GitHub
parent b9fc893546
commit 4eeb707107
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 26 additions and 13 deletions

View File

@ -1022,15 +1022,28 @@ public abstract class AbstractCoordinator implements Closeable {
resetStateAndRejoin("consumer pro-actively leaving the group", true); resetStateAndRejoin("consumer pro-actively leaving the group", true);
} }
public synchronized void requestRejoinIfNecessary(final String reason) { public synchronized void requestRejoinIfNecessary(final String shortReason,
final String fullReason) {
if (!this.rejoinNeeded) { if (!this.rejoinNeeded) {
requestRejoin(reason); requestRejoin(shortReason, fullReason);
} }
} }
public synchronized void requestRejoin(final String reason) { public synchronized void requestRejoin(final String shortReason) {
log.info("Request joining group due to: {}", reason); requestRejoin(shortReason, shortReason);
this.rejoinReason = reason; }
/**
* Request to rejoin the group.
*
* @param shortReason This is the reason passed up to the group coordinator. It must be
* reasonably small.
* @param fullReason This is the reason logged locally.
*/
public synchronized void requestRejoin(final String shortReason,
final String fullReason) {
log.info("Request joining group due to: {}", fullReason);
this.rejoinReason = shortReason;
this.rejoinNeeded = true; this.rejoinNeeded = true;
} }

View File

@ -401,10 +401,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
assignedPartitions.addAll(assignment.partitions()); assignedPartitions.addAll(assignment.partitions());
if (!subscriptions.checkAssignmentMatchedSubscription(assignedPartitions)) { if (!subscriptions.checkAssignmentMatchedSubscription(assignedPartitions)) {
final String reason = String.format("received assignment %s does not match the current subscription %s; " + final String fullReason = String.format("received assignment %s does not match the current subscription %s; " +
"it is likely that the subscription has changed since we joined the group, will re-join with current subscription", "it is likely that the subscription has changed since we joined the group, will re-join with current subscription",
assignment.partitions(), subscriptions.prettyString()); assignment.partitions(), subscriptions.prettyString());
requestRejoin(reason); requestRejoin("received assignment does not match the current subscription", fullReason);
return; return;
} }
@ -437,9 +437,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
firstException.compareAndSet(null, invokePartitionsRevoked(revokedPartitions)); firstException.compareAndSet(null, invokePartitionsRevoked(revokedPartitions));
// If revoked any partitions, need to re-join the group afterwards // If revoked any partitions, need to re-join the group afterwards
final String reason = String.format("need to revoke partitions %s as indicated " + final String fullReason = String.format("need to revoke partitions %s as indicated " +
"by the current assignment and re-join", revokedPartitions); "by the current assignment and re-join", revokedPartitions);
requestRejoin(reason); requestRejoin("need to revoke partitions and re-join", fullReason);
} }
} }
@ -851,17 +851,17 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
// we need to rejoin if we performed the assignment and metadata has changed; // we need to rejoin if we performed the assignment and metadata has changed;
// also for those owned-but-no-longer-existed partitions we should drop them as lost // also for those owned-but-no-longer-existed partitions we should drop them as lost
if (assignmentSnapshot != null && !assignmentSnapshot.matches(metadataSnapshot)) { if (assignmentSnapshot != null && !assignmentSnapshot.matches(metadataSnapshot)) {
final String reason = String.format("cached metadata has changed from %s at the beginning of the rebalance to %s", final String fullReason = String.format("cached metadata has changed from %s at the beginning of the rebalance to %s",
assignmentSnapshot, metadataSnapshot); assignmentSnapshot, metadataSnapshot);
requestRejoinIfNecessary(reason); requestRejoinIfNecessary("cached metadata has changed", fullReason);
return true; return true;
} }
// we need to join if our subscription has changed since the last join // we need to join if our subscription has changed since the last join
if (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription())) { if (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription())) {
final String reason = String.format("subscription has changed from %s at the beginning of the rebalance to %s", final String fullReason = String.format("subscription has changed from %s at the beginning of the rebalance to %s",
joinedSubscription, subscriptions.subscription()); joinedSubscription, subscriptions.subscription());
requestRejoinIfNecessary(reason); requestRejoinIfNecessary("subscription has changed", fullReason);
return true; return true;
} }