mirror of https://github.com/apache/kafka.git
KAFKA-18318: Add logs for online/offline migration indication (#18406)
Add some logs when offline/online happens. Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
parent
65b93fa8d5
commit
9026b6c6c0
|
@ -671,7 +671,10 @@ public class GroupMetadataManager {
|
|||
throw new GroupIdNotFoundException(String.format("Consumer group %s not found.", groupId));
|
||||
}
|
||||
|
||||
if (group == null || (createIfNotExists && maybeDeleteEmptyClassicGroup(group, records))) {
|
||||
if (group == null) {
|
||||
return new ConsumerGroup(snapshotRegistry, groupId, metrics);
|
||||
} else if (createIfNotExists && maybeDeleteEmptyClassicGroup(group, records)) {
|
||||
log.info("[GroupId {}] Converted the empty classic group to a consumer group.", groupId);
|
||||
return new ConsumerGroup(snapshotRegistry, groupId, metrics);
|
||||
} else {
|
||||
if (group.type() == CONSUMER) {
|
||||
|
@ -1037,6 +1040,8 @@ public class GroupMetadataManager {
|
|||
if (joiningMember == null) {
|
||||
prepareRebalance(classicGroup, String.format("Downgrade group %s from consumer to classic.", classicGroup.groupId()));
|
||||
}
|
||||
|
||||
log.info("[GroupId {}] Converted the consumer group to a classic group.", consumerGroup.groupId());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1117,6 +1122,8 @@ public class GroupMetadataManager {
|
|||
scheduleConsumerGroupSessionTimeout(consumerGroup.groupId(), memberId, member.classicProtocolSessionTimeout().get())
|
||||
);
|
||||
|
||||
log.info("[GroupId {}] Converted the classic group to a consumer group.", classicGroup.groupId());
|
||||
|
||||
return consumerGroup;
|
||||
}
|
||||
|
||||
|
@ -4379,7 +4386,9 @@ public class GroupMetadataManager {
|
|||
// Group is created if it does not exist and the member id is UNKNOWN. if member
|
||||
// is specified but group does not exist, request is rejected with GROUP_ID_NOT_FOUND
|
||||
ClassicGroup group;
|
||||
maybeDeleteEmptyConsumerGroup(groupId, records);
|
||||
if (maybeDeleteEmptyConsumerGroup(groupId, records)) {
|
||||
log.info("[GroupId {}] Converted the empty consumer group to a classic group.", groupId);
|
||||
}
|
||||
boolean isNewGroup = !groups.containsKey(groupId);
|
||||
try {
|
||||
group = getOrMaybeCreateClassicGroup(groupId, isUnknownMember);
|
||||
|
@ -6324,7 +6333,7 @@ public class GroupMetadataManager {
|
|||
* @param group The group to be deleted.
|
||||
* @param records The list of records to delete the group.
|
||||
*
|
||||
* @return true if the group is empty
|
||||
* @return true if the group is an empty classic group.
|
||||
*/
|
||||
private boolean maybeDeleteEmptyClassicGroup(Group group, List<CoordinatorRecord> records) {
|
||||
if (isEmptyClassicGroup(group)) {
|
||||
|
@ -6341,15 +6350,19 @@ public class GroupMetadataManager {
|
|||
*
|
||||
* @param groupId The group id to be deleted.
|
||||
* @param records The list of records to delete the group.
|
||||
*
|
||||
* @return true if the group is an empty consumer group.
|
||||
*/
|
||||
private void maybeDeleteEmptyConsumerGroup(String groupId, List<CoordinatorRecord> records) {
|
||||
private boolean maybeDeleteEmptyConsumerGroup(String groupId, List<CoordinatorRecord> records) {
|
||||
Group group = groups.get(groupId, Long.MAX_VALUE);
|
||||
if (isEmptyConsumerGroup(group)) {
|
||||
// Add tombstones for the previous consumer group. The tombstones won't actually be
|
||||
// replayed because its coordinator result has a non-null appendFuture.
|
||||
createGroupTombstoneRecords(group, records);
|
||||
removeGroup(groupId);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
Loading…
Reference in New Issue