mirror of https://github.com/apache/kafka.git
MINOR: Add type check to classic group timeout operations (#15587)
When implementing the group type conversion from a classic group to a consumer group, if the replay of conversion records fails, the group should be reverted back including its timeouts. A possible solution is to keep all the classic group timeouts and add a type check to the timeout operations. If the group is successfully upgraded, it won't be able to pass the type check and its operations will be executed without actually doing anything; if the group upgrade fails, the group map will be reverted and the timeout operations will be executed as is. We've already have group type check in consumer group timeout operations. This patch adds similar type check to those classic group timeout operations. Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
parent
5c855be015
commit
9bc48af1c1
|
@ -2286,7 +2286,7 @@ public class GroupMetadataManager {
|
|||
request.sessionTimeoutMs(),
|
||||
TimeUnit.MILLISECONDS,
|
||||
false,
|
||||
() -> expireClassicGroupMemberHeartbeat(group, newMemberId)
|
||||
() -> expireClassicGroupMemberHeartbeat(group.groupId(), newMemberId)
|
||||
);
|
||||
|
||||
responseFuture.complete(new JoinGroupResponseData()
|
||||
|
@ -2457,6 +2457,22 @@ public class GroupMetadataManager {
|
|||
return EMPTY_RESULT;
|
||||
}
|
||||
|
||||
/**
|
||||
* An overload of {@link GroupMetadataManager#completeClassicGroupJoin(ClassicGroup)} used as
|
||||
* timeout operation. It additionally looks up the group by the id and checks the group type.
|
||||
* completeClassicGroupJoin will only be called if the group is CLASSIC.
|
||||
*/
|
||||
private CoordinatorResult<Void, Record> completeClassicGroupJoin(String groupId) {
|
||||
ClassicGroup group;
|
||||
try {
|
||||
group = getOrMaybeCreateClassicGroup(groupId, false);
|
||||
} catch (UnknownMemberIdException | GroupIdNotFoundException exception) {
|
||||
log.debug("Cannot find the group, skipping rebalance stage.", exception);
|
||||
return EMPTY_RESULT;
|
||||
}
|
||||
return completeClassicGroupJoin(group);
|
||||
}
|
||||
|
||||
/**
|
||||
* Complete the join group phase. Remove all dynamic members that have not rejoined
|
||||
* during this stage and proceed with the next generation for this group. The generation id
|
||||
|
@ -2504,7 +2520,7 @@ public class GroupMetadataManager {
|
|||
group.rebalanceTimeoutMs(),
|
||||
TimeUnit.MILLISECONDS,
|
||||
false,
|
||||
() -> completeClassicGroupJoin(group)
|
||||
() -> completeClassicGroupJoin(group.groupId())
|
||||
);
|
||||
|
||||
return EMPTY_RESULT;
|
||||
|
@ -2575,22 +2591,31 @@ public class GroupMetadataManager {
|
|||
group.rebalanceTimeoutMs(),
|
||||
TimeUnit.MILLISECONDS,
|
||||
false,
|
||||
() -> expirePendingSync(group, group.generationId()));
|
||||
() -> expirePendingSync(group.groupId(), group.generationId()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Invoked when the heartbeat operation is expired from the timer. Possibly remove the member and
|
||||
* try complete the join phase.
|
||||
*
|
||||
* @param group The group.
|
||||
* @param groupId The group id.
|
||||
* @param memberId The member id.
|
||||
*
|
||||
* @return The coordinator result that will be appended to the log.
|
||||
*/
|
||||
private CoordinatorResult<Void, Record> expireClassicGroupMemberHeartbeat(
|
||||
ClassicGroup group,
|
||||
String groupId,
|
||||
String memberId
|
||||
) {
|
||||
ClassicGroup group;
|
||||
try {
|
||||
group = getOrMaybeCreateClassicGroup(groupId, false);
|
||||
} catch (UnknownMemberIdException | GroupIdNotFoundException exception) {
|
||||
log.debug("Received notification of heartbeat expiration for member {} after group {} " +
|
||||
"had already been deleted or upgraded.", memberId, groupId);
|
||||
return EMPTY_RESULT;
|
||||
}
|
||||
|
||||
if (group.isInState(DEAD)) {
|
||||
log.info("Received notification of heartbeat expiration for member {} after group {} " +
|
||||
"had already been unloaded or deleted.",
|
||||
|
@ -2805,7 +2830,7 @@ public class GroupMetadataManager {
|
|||
delayMs,
|
||||
TimeUnit.MILLISECONDS,
|
||||
false,
|
||||
() -> tryCompleteInitialRebalanceElseSchedule(group, delayMs, remainingMs)
|
||||
() -> tryCompleteInitialRebalanceElseSchedule(group.groupId(), delayMs, remainingMs)
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -2837,7 +2862,7 @@ public class GroupMetadataManager {
|
|||
group.rebalanceTimeoutMs(),
|
||||
TimeUnit.MILLISECONDS,
|
||||
false,
|
||||
() -> completeClassicGroupJoin(group)
|
||||
() -> completeClassicGroupJoin(group.groupId())
|
||||
);
|
||||
return EMPTY_RESULT;
|
||||
}
|
||||
|
@ -2847,15 +2872,23 @@ public class GroupMetadataManager {
|
|||
* Try to complete the join phase of the initial rebalance.
|
||||
* Otherwise, extend the rebalance.
|
||||
*
|
||||
* @param group The group under initial rebalance.
|
||||
* @param groupId The group under initial rebalance.
|
||||
*
|
||||
* @return The coordinator result that will be appended to the log.
|
||||
*/
|
||||
private CoordinatorResult<Void, Record> tryCompleteInitialRebalanceElseSchedule(
|
||||
ClassicGroup group,
|
||||
String groupId,
|
||||
int delayMs,
|
||||
int remainingMs
|
||||
) {
|
||||
ClassicGroup group;
|
||||
try {
|
||||
group = getOrMaybeCreateClassicGroup(groupId, false);
|
||||
} catch (UnknownMemberIdException | GroupIdNotFoundException exception) {
|
||||
log.debug("Cannot find the group, skipping the initial rebalance stage.", exception);
|
||||
return EMPTY_RESULT;
|
||||
}
|
||||
|
||||
if (group.newMemberAdded() && remainingMs != 0) {
|
||||
// A new member was added. Extend the delay.
|
||||
group.setNewMemberAdded(false);
|
||||
|
@ -2867,7 +2900,7 @@ public class GroupMetadataManager {
|
|||
newDelayMs,
|
||||
TimeUnit.MILLISECONDS,
|
||||
false,
|
||||
() -> tryCompleteInitialRebalanceElseSchedule(group, newDelayMs, newRemainingMs)
|
||||
() -> tryCompleteInitialRebalanceElseSchedule(group.groupId(), newDelayMs, newRemainingMs)
|
||||
);
|
||||
} else {
|
||||
// No more time remaining. Complete the join phase.
|
||||
|
@ -2979,7 +3012,7 @@ public class GroupMetadataManager {
|
|||
timeoutMs,
|
||||
TimeUnit.MILLISECONDS,
|
||||
false,
|
||||
() -> expireClassicGroupMemberHeartbeat(group, member.memberId()));
|
||||
() -> expireClassicGroupMemberHeartbeat(group.groupId(), member.memberId()));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -2996,15 +3029,23 @@ public class GroupMetadataManager {
|
|||
/**
|
||||
* Expire pending sync.
|
||||
*
|
||||
* @param group The group.
|
||||
* @param groupId The group id.
|
||||
* @param generationId The generation when the pending sync was originally scheduled.
|
||||
*
|
||||
* @return The coordinator result that will be appended to the log.
|
||||
* */
|
||||
private CoordinatorResult<Void, Record> expirePendingSync(
|
||||
ClassicGroup group,
|
||||
String groupId,
|
||||
int generationId
|
||||
) {
|
||||
ClassicGroup group;
|
||||
try {
|
||||
group = getOrMaybeCreateClassicGroup(groupId, false);
|
||||
} catch (UnknownMemberIdException | GroupIdNotFoundException exception) {
|
||||
log.debug("Received notification of sync expiration for an unknown classic group {}.", groupId);
|
||||
return EMPTY_RESULT;
|
||||
}
|
||||
|
||||
if (generationId != group.generationId()) {
|
||||
log.error("Received unexpected notification of sync expiration for {} with an old " +
|
||||
"generation {} while the group has {}.", group.groupId(), generationId, group.generationId());
|
||||
|
@ -3027,7 +3068,6 @@ public class GroupMetadataManager {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
return EMPTY_RESULT;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue