MINOR: Add replayRecords to CoordinatorResult (#15818)

The patch adds a boolean attribute `replayRecords` that specifies whether the records should be replayed.

Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
Dongnuo Lyu 2024-04-30 12:14:02 -04:00 committed by GitHub
parent 7c0a302c4d
commit 1e8415160f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 66 additions and 51 deletions

View File

@ -367,7 +367,7 @@ public class GroupMetadataManager {
* Package private for testing.
*/
static final CoordinatorResult<Void, Record> EMPTY_RESULT =
new CoordinatorResult<>(Collections.emptyList(), CompletableFuture.completedFuture(null));
new CoordinatorResult<>(Collections.emptyList(), CompletableFuture.completedFuture(null), false);
/**
* The maximum number of members allowed in a single classic group.
@ -810,10 +810,15 @@ public class GroupMetadataManager {
*
* @param consumerGroup The converted ConsumerGroup.
* @param leavingMemberId The leaving member that triggers the downgrade validation.
* @param records The list of Records.
* @return An appendFuture of the conversion.
* @param response The response of the returned CoordinatorResult.
* @return A CoordinatorResult.
*/
private CompletableFuture<Void> convertToClassicGroup(ConsumerGroup consumerGroup, String leavingMemberId, List<Record> records) {
private <T> CoordinatorResult<T, Record> convertToClassicGroup(
ConsumerGroup consumerGroup,
String leavingMemberId,
T response
) {
List<Record> records = new ArrayList<>();
consumerGroup.createGroupTombstoneRecords(records);
ClassicGroup classicGroup;
@ -851,7 +856,7 @@ public class GroupMetadataManager {
metrics.onClassicGroupStateTransition(classicGroup.currentState(), null);
return null;
});
return appendFuture;
return new CoordinatorResult<>(records, response, appendFuture, false);
}
/**
@ -1498,12 +1503,14 @@ public class GroupMetadataManager {
int memberEpoch
) throws ApiException {
ConsumerGroup group = consumerGroup(groupId);
List<Record> records = new ArrayList<>();
CompletableFuture<Void> appendFuture = null;
ConsumerGroupHeartbeatResponseData response = new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId)
.setMemberEpoch(memberEpoch);
if (instanceId == null) {
ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false);
log.info("[GroupId {}] Member {} left the consumer group.", groupId, memberId);
appendFuture = consumerGroupFenceMember(group, member, records);
return consumerGroupFenceMember(group, member, response);
} else {
ConsumerGroupMember member = group.staticMember(instanceId);
throwIfStaticMemberIsUnknown(member, instanceId);
@ -1511,21 +1518,13 @@ public class GroupMetadataManager {
if (memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
log.info("[GroupId {}] Static Member {} with instance id {} temporarily left the consumer group.",
group.groupId(), memberId, instanceId);
records = consumerGroupStaticMemberGroupLeave(group, member);
return consumerGroupStaticMemberGroupLeave(group, member);
} else {
log.info("[GroupId {}] Static Member {} with instance id {} left the consumer group.",
group.groupId(), memberId, instanceId);
appendFuture = consumerGroupFenceMember(group, member, records);
return consumerGroupFenceMember(group, member, response);
}
}
return new CoordinatorResult<>(
records,
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId)
.setMemberEpoch(memberEpoch),
appendFuture
);
}
/**
@ -1538,9 +1537,9 @@ public class GroupMetadataManager {
* @param group The group.
* @param member The static member in the group for the instance id.
*
* @return A list with a single record signifying that the static member is leaving.
* @return A CoordinatorResult with a single record signifying that the static member is leaving.
*/
private List<Record> consumerGroupStaticMemberGroupLeave(
private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGroupStaticMemberGroupLeave(
ConsumerGroup group,
ConsumerGroupMember member
) {
@ -1549,7 +1548,13 @@ public class GroupMetadataManager {
.setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH)
.setPartitionsPendingRevocation(Collections.emptyMap())
.build();
return Collections.singletonList(newCurrentAssignmentRecord(group.groupId(), leavingStaticMember));
return new CoordinatorResult<>(
Collections.singletonList(newCurrentAssignmentRecord(group.groupId(), leavingStaticMember)),
new ConsumerGroupHeartbeatResponseData()
.setMemberId(member.memberId())
.setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH)
);
}
/**
@ -1557,17 +1562,19 @@ public class GroupMetadataManager {
*
* @param group The group.
* @param member The member.
* @param records The list of records to be applied to the state.
* @return The append future to be applied.
* @param response The response of the CoordinatorResult.
*
* @return The CoordinatorResult to be applied.
*/
private CompletableFuture<Void> consumerGroupFenceMember(
private <T> CoordinatorResult<T, Record> consumerGroupFenceMember(
ConsumerGroup group,
ConsumerGroupMember member,
List<Record> records
T response
) {
if (validateOnlineDowngrade(group, member.memberId())) {
return convertToClassicGroup(group, member.memberId(), records);
return convertToClassicGroup(group, member.memberId(), response);
} else {
List<Record> records = new ArrayList<>();
removeMember(records, group.groupId(), member.memberId());
// We update the subscription metadata without the leaving member.
@ -1590,7 +1597,7 @@ public class GroupMetadataManager {
cancelTimers(group.groupId(), member.memberId());
return null;
return new CoordinatorResult<>(records, response);
}
}
@ -1636,9 +1643,7 @@ public class GroupMetadataManager {
log.info("[GroupId {}] Member {} fenced from the group because its session expired.",
groupId, memberId);
List<Record> records = new ArrayList<>();
CompletableFuture<Void> appendFuture = consumerGroupFenceMember(group, member, records);
return new CoordinatorResult<>(records, appendFuture);
return consumerGroupFenceMember(group, member, null);
} catch (GroupIdNotFoundException ex) {
log.debug("[GroupId {}] Could not fence {} because the group does not exist.",
groupId, memberId);
@ -1689,9 +1694,7 @@ public class GroupMetadataManager {
"it failed to transition from epoch {} within {}ms.",
groupId, memberId, memberEpoch, rebalanceTimeoutMs);
List<Record> records = new ArrayList<>();
CompletableFuture<Void> appendFuture = consumerGroupFenceMember(group, member, records);
return new CoordinatorResult<>(records, appendFuture);
return consumerGroupFenceMember(group, member, null);
} else {
log.debug("[GroupId {}] Ignoring rebalance timeout for {} because the member " +
"left the epoch {}.", groupId, memberId, memberEpoch);
@ -2300,7 +2303,7 @@ public class GroupMetadataManager {
RecordHelpers.newEmptyGroupMetadataRecord(group, metadataImage.features().metadataVersion())
);
return new CoordinatorResult<>(records, appendFuture);
return new CoordinatorResult<>(records, appendFuture, false);
}
}
return result;
@ -2718,7 +2721,7 @@ public class GroupMetadataManager {
List<Record> records = Collections.singletonList(RecordHelpers.newGroupMetadataRecord(
group, Collections.emptyMap(), metadataImage.features().metadataVersion()));
return new CoordinatorResult<>(records, appendFuture);
return new CoordinatorResult<>(records, appendFuture, false);
} else {
log.info("Stabilized group {} generation {} with {} members.",
@ -3381,7 +3384,7 @@ public class GroupMetadataManager {
RecordHelpers.newGroupMetadataRecord(group, groupAssignment, metadataImage.features().metadataVersion())
);
return new CoordinatorResult<>(records, appendFuture);
return new CoordinatorResult<>(records, appendFuture, false);
} else {
return maybePrepareRebalanceOrCompleteJoin(
group,
@ -3497,7 +3500,7 @@ public class GroupMetadataManager {
List<Record> records = Collections.singletonList(
RecordHelpers.newGroupMetadataRecord(group, assignment, metadataImage.features().metadataVersion())
);
return new CoordinatorResult<>(records, appendFuture);
return new CoordinatorResult<>(records, appendFuture, false);
}
} else if (group.isInState(STABLE)) {
removePendingSyncMember(group, memberId);
@ -3772,7 +3775,8 @@ public class GroupMetadataManager {
coordinatorResult.records(),
new LeaveGroupResponseData()
.setMembers(memberResponses),
coordinatorResult.appendFuture()
coordinatorResult.appendFuture(),
coordinatorResult.replayRecords()
);
}

View File

@ -43,6 +43,12 @@ public class CoordinatorResult<T, U> {
*/
private final CompletableFuture<Void> appendFuture;
/**
* The boolean indicating whether to replay the records.
* The default value is {@code true} unless specified otherwise.
*/
private final boolean replayRecords;
/**
* Constructs a Result with records and a response.
*
@ -53,7 +59,7 @@ public class CoordinatorResult<T, U> {
List<U> records,
T response
) {
this(records, response, null);
this(records, response, null, true);
}
/**
@ -61,29 +67,34 @@ public class CoordinatorResult<T, U> {
*
* @param records A non-null list of records.
* @param appendFuture The future to complete once the records are committed.
* @param replayRecords The replayRecords.
*/
public CoordinatorResult(
List<U> records,
CompletableFuture<Void> appendFuture
CompletableFuture<Void> appendFuture,
boolean replayRecords
) {
this(records, null, appendFuture);
this(records, null, appendFuture, replayRecords);
}
/**
* Constructs a Result with records, a response, and an append-future.
* Constructs a Result with records, a response, an append-future, and replayRecords.
*
* @param records A non-null list of records.
* @param response A response.
* @param appendFuture The future to complete once the records are committed.
* @param replayRecords The replayRecords.
*/
public CoordinatorResult(
List<U> records,
T response,
CompletableFuture<Void> appendFuture
CompletableFuture<Void> appendFuture,
boolean replayRecords
) {
this.records = Objects.requireNonNull(records);
this.response = response;
this.appendFuture = appendFuture;
this.replayRecords = replayRecords;
}
/**
@ -94,7 +105,7 @@ public class CoordinatorResult<T, U> {
public CoordinatorResult(
List<U> records
) {
this(records, null, null);
this(records, null, null, true);
}
/**
@ -119,13 +130,10 @@ public class CoordinatorResult<T, U> {
}
/**
* If the append-future exists, this means
* that the in-memory state was already updated.
*
* @return Whether to replay the records.
*/
public boolean replayRecords() {
return appendFuture == null;
return replayRecords;
}
@Override
@ -137,6 +145,7 @@ public class CoordinatorResult<T, U> {
if (!Objects.equals(records, that.records)) return false;
if (!Objects.equals(response, that.response)) return false;
if (!Objects.equals(replayRecords, that.replayRecords)) return false;
return Objects.equals(appendFuture, that.appendFuture);
}
@ -145,6 +154,7 @@ public class CoordinatorResult<T, U> {
int result = records != null ? records.hashCode() : 0;
result = 31 * result + (response != null ? response.hashCode() : 0);
result = 31 * result + (appendFuture != null ? appendFuture.hashCode() : 0);
result = 31 * result + (replayRecords ? 1 : 0);
return result;
}
@Override
@ -152,6 +162,7 @@ public class CoordinatorResult<T, U> {
return "CoordinatorResult(records=" + records +
", response=" + response +
", appendFuture=" + appendFuture +
", replayRecords=" + replayRecords +
")";
}
}

View File

@ -528,7 +528,7 @@ public class GroupMetadataManagerTestContext {
request
);
if (result.appendFuture() == null) {
if (result.replayRecords()) {
result.records().forEach(this::replay);
}
return result;