diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 6463d78d9ce..9afb8d7791d 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -794,18 +794,16 @@ public class GroupMetadataManager { * @return A boolean indicating whether it's valid to online downgrade the consumer group. */ private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) { - if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) { - log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.", - consumerGroup.groupId()); - return false; - } else if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) { - log.debug("Cannot downgrade consumer group {} to classic group because not all its members use the classic protocol.", - consumerGroup.groupId()); + if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) { return false; } else if (consumerGroup.numMembers() <= 1) { log.debug("Skip downgrading the consumer group {} to classic group because it's empty.", consumerGroup.groupId()); return false; + } else if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) { + log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.", + consumerGroup.groupId()); + return false; } else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) { log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.", consumerGroup.groupId()); @@ -927,8 +925,8 @@ public class GroupMetadataManager { // Create the session timeouts for the new members. If the conversion fails, the group will remain a // classic group, thus these timers will fail the group type check and do nothing. - consumerGroup.members().forEach((memberId, __) -> - scheduleConsumerGroupSessionTimeout(consumerGroup.groupId(), memberId) + consumerGroup.members().forEach((memberId, member) -> + scheduleConsumerGroupSessionTimeout(consumerGroup.groupId(), memberId, member.classicProtocolSessionTimeout().get()) ); return consumerGroup; diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index ad68c96aaab..248f5507a06 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -9630,7 +9630,7 @@ public class GroupMetadataManagerTest { assertRecordsEquals(expectedRecords, result.records()); - context.assertSessionTimeout(groupId, memberId1, 45000); + context.assertSessionTimeout(groupId, memberId1, expectedMember1.classicProtocolSessionTimeout().get()); context.assertSessionTimeout(groupId, memberId2, 45000); // Simulate a failed replay. The context is rolled back and the group is converted back to the classic group. @@ -9876,8 +9876,8 @@ public class GroupMetadataManagerTest { assertTrue(joinResult.joinFuture.isDone()); assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), joinResult.joinFuture.get().errorCode()); - context.assertSessionTimeout(groupId, memberId1, 45000); - context.assertSessionTimeout(groupId, memberId2, 45000); + context.assertSessionTimeout(groupId, memberId1, expectedMember1.classicProtocolSessionTimeout().get()); + context.assertSessionTimeout(groupId, memberId2, expectedMember2.classicProtocolSessionTimeout().get()); context.assertSessionTimeout(groupId, memberId3, 45000); // Simulate a failed replay. The context is rolled back and the group is converted back to the classic group. @@ -10141,8 +10141,8 @@ public class GroupMetadataManagerTest { assertTrue(syncResult.syncFuture.isDone()); assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), syncResult.syncFuture.get().errorCode()); - context.assertSessionTimeout(groupId, memberId1, 45000); - context.assertSessionTimeout(groupId, memberId2, 45000); + context.assertSessionTimeout(groupId, memberId1, expectedMember1.classicProtocolSessionTimeout().get()); + context.assertSessionTimeout(groupId, memberId2, expectedMember2.classicProtocolSessionTimeout().get()); context.assertSessionTimeout(groupId, memberId3, 45000); // Simulate a failed replay. The context is rolled back and the group is converted back to the classic group.