diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 35d8fa11d56..2d5fd1e3288 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -671,7 +671,10 @@ public class GroupMetadataManager { throw new GroupIdNotFoundException(String.format("Consumer group %s not found.", groupId)); } - if (group == null || (createIfNotExists && maybeDeleteEmptyClassicGroup(group, records))) { + if (group == null) { + return new ConsumerGroup(snapshotRegistry, groupId, metrics); + } else if (createIfNotExists && maybeDeleteEmptyClassicGroup(group, records)) { + log.info("[GroupId {}] Converted the empty classic group to a consumer group.", groupId); return new ConsumerGroup(snapshotRegistry, groupId, metrics); } else { if (group.type() == CONSUMER) { @@ -1037,6 +1040,8 @@ public class GroupMetadataManager { if (joiningMember == null) { prepareRebalance(classicGroup, String.format("Downgrade group %s from consumer to classic.", classicGroup.groupId())); } + + log.info("[GroupId {}] Converted the consumer group to a classic group.", consumerGroup.groupId()); } /** @@ -1117,6 +1122,8 @@ public class GroupMetadataManager { scheduleConsumerGroupSessionTimeout(consumerGroup.groupId(), memberId, member.classicProtocolSessionTimeout().get()) ); + log.info("[GroupId {}] Converted the classic group to a consumer group.", classicGroup.groupId()); + return consumerGroup; } @@ -4379,7 +4386,9 @@ public class GroupMetadataManager { // Group is created if it does not exist and the member id is UNKNOWN. if member // is specified but group does not exist, request is rejected with GROUP_ID_NOT_FOUND ClassicGroup group; - maybeDeleteEmptyConsumerGroup(groupId, records); + if (maybeDeleteEmptyConsumerGroup(groupId, records)) { + log.info("[GroupId {}] Converted the empty consumer group to a classic group.", groupId); + } boolean isNewGroup = !groups.containsKey(groupId); try { group = getOrMaybeCreateClassicGroup(groupId, isUnknownMember); @@ -6324,7 +6333,7 @@ public class GroupMetadataManager { * @param group The group to be deleted. * @param records The list of records to delete the group. * - * @return true if the group is empty + * @return true if the group is an empty classic group. */ private boolean maybeDeleteEmptyClassicGroup(Group group, List records) { if (isEmptyClassicGroup(group)) { @@ -6341,15 +6350,19 @@ public class GroupMetadataManager { * * @param groupId The group id to be deleted. * @param records The list of records to delete the group. + * + * @return true if the group is an empty consumer group. */ - private void maybeDeleteEmptyConsumerGroup(String groupId, List records) { + private boolean maybeDeleteEmptyConsumerGroup(String groupId, List records) { Group group = groups.get(groupId, Long.MAX_VALUE); if (isEmptyConsumerGroup(group)) { // Add tombstones for the previous consumer group. The tombstones won't actually be // replayed because its coordinator result has a non-null appendFuture. createGroupTombstoneRecords(group, records); removeGroup(groupId); + return true; } + return false; } /**