KAFKA-19570: Implement offline migration for streams groups (#20288)

Offline migration essentially preserves offsets and nothing else. So
effectively write tombstones for classic group type when a streams
heartbeat is sent to with the group ID of an empty classic group, and
write tombstones for the streams group type when a classic consumer
attempts to join with a group ID of an empty streams group.

Reviewers: Bill Bejeck <bbejeck@apache.org>, Sean Quah
 <squah@confluent.io>, Dongnuo Lyu <dlyu@confluent.io>
This commit is contained in:
Lucas Brutschy 2025-08-26 10:05:30 +02:00 committed by GitHub
parent 30ffd42b26
commit f621a635c1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 195 additions and 3 deletions

View File

@ -833,19 +833,28 @@ public class GroupMetadataManager {
* Gets or creates a streams group without updating the groups map.
* The group will be materialized during the replay.
*
* If there is an empty classic consumer group of the same name, it will be deleted and a new streams
* group will be created.
*
* @param groupId The group ID.
* @param records The record list to which the group tombstones are written
* if the group is empty and is a classic group.
*
* @return A StreamsGroup.
*
* Package private for testing.
*/
StreamsGroup getOrCreateStreamsGroup(
String groupId
String groupId,
List<CoordinatorRecord> records
) {
Group group = groups.get(groupId);
if (group == null) {
return new StreamsGroup(logContext, snapshotRegistry, groupId, metrics);
} else if (maybeDeleteEmptyClassicGroup(group, records)) {
log.info("[GroupId {}] Converted the empty classic group to a streams group.", groupId);
return new StreamsGroup(logContext, snapshotRegistry, groupId, metrics);
} else {
return castToStreamsGroup(group);
}
@ -1871,7 +1880,7 @@ public class GroupMetadataManager {
boolean isJoining = memberEpoch == 0;
StreamsGroup group;
if (isJoining) {
group = getOrCreateStreamsGroup(groupId);
group = getOrCreateStreamsGroup(groupId, records);
throwIfStreamsGroupIsFull(group);
} else {
group = getStreamsGroupOrThrow(groupId);
@ -6066,7 +6075,11 @@ public class GroupMetadataManager {
// classicGroupJoinToConsumerGroup takes the join requests to non-empty consumer groups.
// The empty consumer groups should be converted to classic groups in classicGroupJoinToClassicGroup.
return classicGroupJoinToConsumerGroup((ConsumerGroup) group, context, request, responseFuture);
} else if (group.type() == CONSUMER || group.type() == CLASSIC) {
} else if (group.type() == CONSUMER || group.type() == CLASSIC || group.type() == STREAMS && group.isEmpty()) {
// classicGroupJoinToClassicGroup accepts:
// - classic groups
// - empty streams groups
// - empty consumer groups
return classicGroupJoinToClassicGroup(context, request, responseFuture);
} else {
// Group exists but it's not a consumer group
@ -6107,6 +6120,8 @@ public class GroupMetadataManager {
ClassicGroup group;
if (maybeDeleteEmptyConsumerGroup(groupId, records)) {
log.info("[GroupId {}] Converted the empty consumer group to a classic group.", groupId);
} else if (maybeDeleteEmptyStreamsGroup(groupId, records)) {
log.info("[GroupId {}] Converted the empty streams group to a classic group.", groupId);
}
boolean isNewGroup = !groups.containsKey(groupId);
try {
@ -8398,6 +8413,13 @@ public class GroupMetadataManager {
return group != null && group.type() == CONSUMER && group.isEmpty();
}
/**
* @return true if the group is an empty streams group.
*/
private static boolean isEmptyStreamsGroup(Group group) {
return group != null && group.type() == STREAMS && group.isEmpty();
}
/**
* Write tombstones for the group if it's empty and is a classic group.
*
@ -8435,6 +8457,26 @@ public class GroupMetadataManager {
}
return false;
}
/**
* Delete and write tombstones for the group if it's empty and is a streams group.
*
* @param groupId The group id to be deleted.
* @param records The list of records to delete the group.
*
* @return true if the group is an empty streams group.
*/
private boolean maybeDeleteEmptyStreamsGroup(String groupId, List<CoordinatorRecord> records) {
Group group = groups.get(groupId, Long.MAX_VALUE);
if (isEmptyStreamsGroup(group)) {
// Add tombstones for the previous streams group. The tombstones won't actually be
// replayed because its coordinator result has a non-null appendFuture.
createGroupTombstoneRecords(group, records);
removeGroup(groupId);
return true;
}
return false;
}
/**
* Checks whether the given protocol type or name in the request is inconsistent with the group's.

View File

@ -18633,6 +18633,156 @@ public class GroupMetadataManagerTest {
assertNull(result.response().data().partitionsByUserEndpoint());
}
@Test
public void testStreamsGroupHeartbeatWithNonEmptyClassicGroup() {
String classicGroupId = "classic-group-id";
String memberId = Uuid.randomUuid().toString();
String subtopology1 = "subtopology1";
String fooTopicName = "foo";
Topology topology = new Topology().setSubtopologies(List.of(
new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
));
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder().build();
ClassicGroup classicGroup = new ClassicGroup(
new LogContext(),
classicGroupId,
EMPTY,
context.time
);
context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(classicGroup, classicGroup.groupAssignment()));
context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, false).transitionTo(PREPARING_REBALANCE);
assertThrows(GroupIdNotFoundException.class, () ->
context.streamsGroupHeartbeat(
new StreamsGroupHeartbeatRequestData()
.setGroupId(classicGroupId)
.setMemberId(memberId)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(12000)
.setTopology(topology)
.setActiveTasks(List.of())
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())));
}
@Test
public void testStreamsGroupHeartbeatWithEmptyClassicGroup() {
String classicGroupId = "classic-group-id";
String memberId = Uuid.randomUuid().toString();
String fooTopicName = "foo";
String subtopology1 = "subtopology1";
Topology topology = new Topology().setSubtopologies(List.of(
new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
));
MockTaskAssignor assignor = new MockTaskAssignor("sticky");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withStreamsGroupTaskAssignors(List.of(assignor))
.build();
ClassicGroup classicGroup = new ClassicGroup(
new LogContext(),
classicGroupId,
EMPTY,
context.time
);
context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(classicGroup, classicGroup.groupAssignment()));
CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result = context.streamsGroupHeartbeat(
new StreamsGroupHeartbeatRequestData()
.setGroupId(classicGroupId)
.setMemberId(memberId)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(12000)
.setTopology(topology)
.setActiveTasks(List.of())
.setStandbyTasks(List.of())
.setWarmupTasks(List.of()));
StreamsGroupMember expectedMember = StreamsGroupMember.Builder.withDefaults(memberId)
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
.setMemberEpoch(1)
.setPreviousMemberEpoch(0)
.setRebalanceTimeoutMs(5000)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setAssignedTasks(TasksTuple.EMPTY)
.setTasksPendingRevocation(TasksTuple.EMPTY)
.setRebalanceTimeoutMs(12000)
.setTopologyEpoch(0)
.build();
assertEquals(Errors.NONE.code(), result.response().data().errorCode());
assertEquals(
List.of(
GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId),
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(classicGroupId, expectedMember),
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(classicGroupId, topology),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(classicGroupId, 1, 0),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(classicGroupId, memberId, TasksTuple.EMPTY),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(classicGroupId, 1),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(classicGroupId, expectedMember)
),
result.records()
);
assertEquals(
Group.GroupType.STREAMS,
context.groupMetadataManager.streamsGroup(classicGroupId).type()
);
}
@Test
public void testClassicGroupJoinWithEmptyStreamsGroup() throws Exception {
String streamsGroupId = "streams-group-id";
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withStreamsGroup(new StreamsGroupBuilder(streamsGroupId, 10))
.build();
JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
.withGroupId(streamsGroupId)
.withMemberId(UNKNOWN_MEMBER_ID)
.withDefaultProtocolTypeAndProtocols()
.build();
GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin(request, true);
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochTombstoneRecord(streamsGroupId),
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochTombstoneRecord(streamsGroupId),
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecordTombstone(streamsGroupId)
);
assertEquals(Errors.MEMBER_ID_REQUIRED.code(), joinResult.joinFuture.get().errorCode());
assertEquals(expectedRecords, joinResult.records.subList(0, expectedRecords.size()));
assertEquals(
Group.GroupType.CLASSIC,
context.groupMetadataManager.getOrMaybeCreateClassicGroup(streamsGroupId, false).type()
);
}
@Test
public void testClassicGroupJoinWithNonEmptyStreamsGroup() throws Exception {
String streamsGroupId = "streams-group-id";
String memberId = Uuid.randomUuid().toString();
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withStreamsGroup(new StreamsGroupBuilder(streamsGroupId, 10)
.withMember(StreamsGroupMember.Builder.withDefaults(memberId)
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
.build()))
.build();
JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
.withGroupId(streamsGroupId)
.withMemberId(UNKNOWN_MEMBER_ID)
.withDefaultProtocolTypeAndProtocols()
.build();
GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin(request);
assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL.code(), joinResult.joinFuture.get().errorCode());
}
@Test
public void testConsumerGroupDynamicConfigs() {
String groupId = "fooup";