mirror of https://github.com/apache/kafka.git
MINOR: consumer log fixes (#16345)
Reviewers: Kirk True <kirk@kirktrue.pro>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
2fd00ce536
commit
8199290500
|
@ -87,6 +87,12 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
|
|||
final PendingRequests pendingRequests;
|
||||
private boolean closing = false;
|
||||
|
||||
/**
|
||||
* Last member epoch sent in a commit request. Empty if no epoch was included in the last
|
||||
* request. Used for logging.
|
||||
*/
|
||||
private Optional<Integer> lastEpochSentOnCommit;
|
||||
|
||||
/**
|
||||
* Latest member ID and epoch received via the {@link #onMemberEpochUpdated(Optional, Optional)},
|
||||
* to be included in the OffsetFetch and OffsetCommit requests if present. This will have
|
||||
|
@ -156,6 +162,7 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
|
|||
this.memberInfo = new MemberInfo();
|
||||
this.metricsManager = new OffsetCommitMetricsManager(metrics);
|
||||
this.offsetCommitCallbackInvoker = offsetCommitCallbackInvoker;
|
||||
this.lastEpochSentOnCommit = Optional.empty();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -330,7 +337,10 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
|
|||
log.debug("Auto-commit sync before revocation failed because topic or partition were deleted");
|
||||
result.completeExceptionally(error);
|
||||
} else {
|
||||
// Make sure the auto-commit is retries with the latest offsets
|
||||
// Make sure the auto-commit is retried with the latest offsets
|
||||
log.debug("Member {} will retry auto-commit of latest offsets after receiving retriable error {}",
|
||||
memberInfo.memberId.orElse("undefined"),
|
||||
error.getMessage());
|
||||
requestAttempt.offsets = subscriptions.allConsumed();
|
||||
requestAttempt.resetFuture();
|
||||
autoCommitSyncBeforeRevocationWithRetries(requestAttempt, result);
|
||||
|
@ -568,6 +578,10 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
|
|||
*/
|
||||
@Override
|
||||
public void onMemberEpochUpdated(Optional<Integer> memberEpoch, Optional<String> memberId) {
|
||||
if (!memberEpoch.isPresent() && memberInfo.memberEpoch.isPresent()) {
|
||||
log.info("Member {} won't include member id and epoch in following offset " +
|
||||
"commit/fetch requests because it has left the group.", memberInfo.memberId.orElse("unknown"));
|
||||
}
|
||||
memberInfo.memberId = memberId;
|
||||
memberInfo.memberEpoch = memberEpoch;
|
||||
}
|
||||
|
@ -680,6 +694,9 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
|
|||
}
|
||||
if (memberInfo.memberEpoch.isPresent()) {
|
||||
data = data.setGenerationIdOrMemberEpoch(memberInfo.memberEpoch.get());
|
||||
lastEpochSentOnCommit = memberInfo.memberEpoch;
|
||||
} else {
|
||||
lastEpochSentOnCommit = Optional.empty();
|
||||
}
|
||||
|
||||
OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(data);
|
||||
|
@ -741,6 +758,9 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
|
|||
"failed with unknown member ID. " + error.message()));
|
||||
return;
|
||||
} else if (error == Errors.STALE_MEMBER_EPOCH) {
|
||||
log.error("OffsetCommit failed for member {} with stale member epoch error. Last epoch sent: {}",
|
||||
memberInfo.memberId.orElse("undefined"),
|
||||
lastEpochSentOnCommit.isPresent() ? lastEpochSentOnCommit.get() : "undefined");
|
||||
future.completeExceptionally(error.exception());
|
||||
return;
|
||||
} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
|
||||
|
@ -785,6 +805,11 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
|
|||
}
|
||||
}
|
||||
|
||||
// Visible for testing
|
||||
Optional<Integer> lastEpochSentOnCommit() {
|
||||
return lastEpochSentOnCommit;
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents a request that can be retried or aborted, based on member ID and epoch
|
||||
* information.
|
||||
|
|
|
@ -362,7 +362,7 @@ public class MembershipManagerImpl implements MembershipManager {
|
|||
metricsManager.recordRebalanceStarted(time.milliseconds());
|
||||
}
|
||||
|
||||
log.trace("Member {} with epoch {} transitioned from {} to {}.", memberId, memberEpoch, state, nextState);
|
||||
log.info("Member {} with epoch {} transitioned from {} to {}.", memberId, memberEpoch, state, nextState);
|
||||
this.state = nextState;
|
||||
}
|
||||
|
||||
|
@ -679,11 +679,11 @@ public class MembershipManagerImpl implements MembershipManager {
|
|||
CompletableFuture<Void> callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment();
|
||||
callbackResult.whenComplete((result, error) -> {
|
||||
if (error != null) {
|
||||
log.error("Member {} callback to release assignment failed. Member will proceed " +
|
||||
"to send leave group heartbeat", memberId, error);
|
||||
log.error("Member {} callback to release assignment failed. It will proceed " +
|
||||
"to clear its assignment and send a leave group heartbeat", memberId, error);
|
||||
} else {
|
||||
log.debug("Member {} completed callback to release assignment and will send leave " +
|
||||
"group heartbeat", memberId);
|
||||
log.info("Member {} completed callback to release assignment. It will proceed " +
|
||||
"to clear its assignment and send a leave group heartbeat", memberId);
|
||||
}
|
||||
// Clear the subscription, no matter if the callback execution failed or succeeded.
|
||||
subscriptions.unsubscribe();
|
||||
|
@ -718,7 +718,7 @@ public class MembershipManagerImpl implements MembershipManager {
|
|||
SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
|
||||
droppedPartitions.addAll(subscriptions.assignedPartitions());
|
||||
|
||||
log.debug("Member {} is triggering callbacks to release assignment {} and leave group",
|
||||
log.info("Member {} is triggering callbacks to release assignment {} and leave group",
|
||||
memberId, droppedPartitions);
|
||||
|
||||
CompletableFuture<Void> callbackResult;
|
||||
|
@ -965,8 +965,8 @@ public class MembershipManagerImpl implements MembershipManager {
|
|||
"\tCurrent owned partitions: {}\n" +
|
||||
"\tAdded partitions (assigned - owned): {}\n" +
|
||||
"\tRevoked partitions (owned - assigned): {}\n",
|
||||
memberId,
|
||||
resolvedAssignment.localEpoch,
|
||||
memberId,
|
||||
assignedTopicPartitions,
|
||||
ownedPartitions,
|
||||
addedPartitions,
|
||||
|
|
|
@ -1107,6 +1107,44 @@ public class CommitRequestManagerTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLastEpochSentOnCommit() {
|
||||
// Enable auto-commit but with very long interval to avoid triggering auto-commits on the
|
||||
// interval and just test the auto-commits triggered before revocation
|
||||
CommitRequestManager commitRequestManager = create(true, Integer.MAX_VALUE);
|
||||
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
|
||||
|
||||
TopicPartition tp = new TopicPartition("topic", 1);
|
||||
subscriptionState.assignFromUser(singleton(tp));
|
||||
subscriptionState.seek(tp, 100);
|
||||
|
||||
// Send auto commit to revoke partitions, expected to be retried on STALE_MEMBER_EPOCH
|
||||
// with the latest epochs received (using long deadline to avoid expiring the request
|
||||
// while retrying with the new epochs)
|
||||
commitRequestManager.maybeAutoCommitSyncBeforeRevocation(Long.MAX_VALUE);
|
||||
|
||||
int initialEpoch = 1;
|
||||
String memberId = "member1";
|
||||
commitRequestManager.onMemberEpochUpdated(Optional.of(initialEpoch), Optional.of(memberId));
|
||||
|
||||
// Send request with epoch 1
|
||||
completeOffsetCommitRequestWithError(commitRequestManager, Errors.STALE_MEMBER_EPOCH);
|
||||
assertEquals(initialEpoch, commitRequestManager.lastEpochSentOnCommit().orElse(null));
|
||||
|
||||
// Receive new epoch. Last epoch sent should change only when sending out the next request
|
||||
commitRequestManager.onMemberEpochUpdated(Optional.of(initialEpoch + 1), Optional.of(memberId));
|
||||
assertEquals(initialEpoch, commitRequestManager.lastEpochSentOnCommit().get());
|
||||
time.sleep(retryBackoffMs);
|
||||
completeOffsetCommitRequestWithError(commitRequestManager, Errors.STALE_MEMBER_EPOCH);
|
||||
assertEquals(initialEpoch + 1, commitRequestManager.lastEpochSentOnCommit().orElse(null));
|
||||
|
||||
// Receive empty epochs
|
||||
commitRequestManager.onMemberEpochUpdated(Optional.empty(), Optional.empty());
|
||||
time.sleep(retryBackoffMs * 2);
|
||||
completeOffsetCommitRequestWithError(commitRequestManager, Errors.STALE_MEMBER_EPOCH);
|
||||
assertFalse(commitRequestManager.lastEpochSentOnCommit().isPresent());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEnsureCommitSensorRecordsMetric() {
|
||||
CommitRequestManager commitRequestManager = create(true, 100);
|
||||
|
|
Loading…
Reference in New Issue