mirror of https://github.com/apache/kafka.git
MINOR: Make online downgrade failure logs less noisy and update the timeouts scheduled in `convertToConsumerGroup` (#16290)
This patch: - changes the order of the checks in `validateOnlineDowngrade`, so that only when the last member using the consumer protocol leave and the group still has classic member(s), `online downgrade is disabled` is logged if the policy doesn't allow downgrade. - changes the session timeout in `convertToConsumerGroup` from `consumerGroupSessionTimeoutMs` to `member.classicProtocolSessionTimeout().get()`. Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
parent
ea60666513
commit
11c85a93c3
|
|
@ -794,18 +794,16 @@ public class GroupMetadataManager {
|
|||
* @return A boolean indicating whether it's valid to online downgrade the consumer group.
|
||||
*/
|
||||
private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) {
|
||||
if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
|
||||
log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.",
|
||||
consumerGroup.groupId());
|
||||
return false;
|
||||
} else if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
|
||||
log.debug("Cannot downgrade consumer group {} to classic group because not all its members use the classic protocol.",
|
||||
consumerGroup.groupId());
|
||||
if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
|
||||
return false;
|
||||
} else if (consumerGroup.numMembers() <= 1) {
|
||||
log.debug("Skip downgrading the consumer group {} to classic group because it's empty.",
|
||||
consumerGroup.groupId());
|
||||
return false;
|
||||
} else if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
|
||||
log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.",
|
||||
consumerGroup.groupId());
|
||||
return false;
|
||||
} else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
|
||||
log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.",
|
||||
consumerGroup.groupId());
|
||||
|
|
@ -927,8 +925,8 @@ public class GroupMetadataManager {
|
|||
|
||||
// Create the session timeouts for the new members. If the conversion fails, the group will remain a
|
||||
// classic group, thus these timers will fail the group type check and do nothing.
|
||||
consumerGroup.members().forEach((memberId, __) ->
|
||||
scheduleConsumerGroupSessionTimeout(consumerGroup.groupId(), memberId)
|
||||
consumerGroup.members().forEach((memberId, member) ->
|
||||
scheduleConsumerGroupSessionTimeout(consumerGroup.groupId(), memberId, member.classicProtocolSessionTimeout().get())
|
||||
);
|
||||
|
||||
return consumerGroup;
|
||||
|
|
|
|||
|
|
@ -9630,7 +9630,7 @@ public class GroupMetadataManagerTest {
|
|||
|
||||
assertRecordsEquals(expectedRecords, result.records());
|
||||
|
||||
context.assertSessionTimeout(groupId, memberId1, 45000);
|
||||
context.assertSessionTimeout(groupId, memberId1, expectedMember1.classicProtocolSessionTimeout().get());
|
||||
context.assertSessionTimeout(groupId, memberId2, 45000);
|
||||
|
||||
// Simulate a failed replay. The context is rolled back and the group is converted back to the classic group.
|
||||
|
|
@ -9876,8 +9876,8 @@ public class GroupMetadataManagerTest {
|
|||
assertTrue(joinResult.joinFuture.isDone());
|
||||
assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), joinResult.joinFuture.get().errorCode());
|
||||
|
||||
context.assertSessionTimeout(groupId, memberId1, 45000);
|
||||
context.assertSessionTimeout(groupId, memberId2, 45000);
|
||||
context.assertSessionTimeout(groupId, memberId1, expectedMember1.classicProtocolSessionTimeout().get());
|
||||
context.assertSessionTimeout(groupId, memberId2, expectedMember2.classicProtocolSessionTimeout().get());
|
||||
context.assertSessionTimeout(groupId, memberId3, 45000);
|
||||
|
||||
// Simulate a failed replay. The context is rolled back and the group is converted back to the classic group.
|
||||
|
|
@ -10141,8 +10141,8 @@ public class GroupMetadataManagerTest {
|
|||
assertTrue(syncResult.syncFuture.isDone());
|
||||
assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), syncResult.syncFuture.get().errorCode());
|
||||
|
||||
context.assertSessionTimeout(groupId, memberId1, 45000);
|
||||
context.assertSessionTimeout(groupId, memberId2, 45000);
|
||||
context.assertSessionTimeout(groupId, memberId1, expectedMember1.classicProtocolSessionTimeout().get());
|
||||
context.assertSessionTimeout(groupId, memberId2, expectedMember2.classicProtocolSessionTimeout().get());
|
||||
context.assertSessionTimeout(groupId, memberId3, 45000);
|
||||
|
||||
// Simulate a failed replay. The context is rolled back and the group is converted back to the classic group.
|
||||
|
|
|
|||
Loading…
Reference in New Issue