KAFKA-19163: Avoid deleting groups with pending transactional offsets (#19496)

When a group has pending transactional offsets but no committed offsets,
we can accidentally delete it while cleaning up expired offsets. Add a
check to avoid this case.

Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
Sean Quah 2025-05-13 13:10:26 +01:00 committed by David Jacot
parent 9aad1849e2
commit 48f75616d7
2 changed files with 39 additions and 2 deletions

View File

@ -1001,13 +1001,14 @@ public class OffsetMetadataManager {
* @param groupId The group id.
* @param records The list of records to populate with offset commit tombstone records.
*
* @return True if no offsets exist or if all offsets expired, false otherwise.
* @return True if no offsets exist after expiry and no pending transactional offsets exist,
* false otherwise.
*/
public boolean cleanupExpiredOffsets(String groupId, List<CoordinatorRecord> records) {
TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic =
offsets.offsetsByGroup.get(groupId);
if (offsetsByTopic == null) {
return true;
return !openTransactions.contains(groupId);
}
// We expect the group to exist.

View File

@ -2591,6 +2591,42 @@ public class OffsetMetadataManagerTest {
assertEquals(Collections.emptyList(), records);
}
/**
 * Checks that {@code cleanupExpiredOffsets} returns false for "group-id" twice in a row:
 * first while the expired foo-0 offset is being tombstoned, and again when nothing further
 * expires, because the group still has a pending transactional offset commit.
 */
@Test
public void testCleanupExpiredOffsetsWithPendingTransactionalOffsetsOnly() {
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
Group group = mock(Group.class);
// Offsets retention is set to one minute, the same amount the clock is advanced by below.
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder()
.withGroupMetadataManager(groupMetadataManager)
.withOffsetsRetentionMinutes(1)
.build();
long commitTimestamp = context.time.milliseconds();
// Commit for foo-0 at offset 100, stamped with the current time.
context.commitOffset("group-id", "foo", 0, 100L, 0, commitTimestamp);
// Commit for foo-1 at offset 101, stamped 500 ms later. NOTE(review): the extra leading
// argument (10L) is presumably the producer id that makes this a pending transactional
// commit; confirm against the OffsetMetadataManagerTestContext overload.
context.commitOffset(10L, "group-id", "foo", 1, 101L, 0, commitTimestamp + 500);
// Advance the context's clock by one minute so that foo-0's commit timestamp is a full
// retention period in the past.
context.time.sleep(Duration.ofMinutes(1).toMillis());
// Stub the group lookup and build the expiration condition from each offset's commit timestamp.
when(groupMetadataManager.group("group-id")).thenReturn(group);
when(group.offsetExpirationCondition()).thenReturn(Optional.of(
new OffsetExpirationConditionImpl(offsetAndMetadata -> offsetAndMetadata.commitTimestampMs)));
// The group reports no subscription to "foo". Presumably a subscription would otherwise
// prevent the offsets from expiring; TODO confirm against cleanupExpiredOffsets.
when(group.isSubscribedToTopic("foo")).thenReturn(false);
// foo-0 is expired, but the group is not deleted because it has pending transactional offset commits.
List<CoordinatorRecord> expectedRecords = List.of(
GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo", 0)
);
List<CoordinatorRecord> records = new ArrayList<>();
assertFalse(context.cleanupExpiredOffsets("group-id", records));
assertEquals(expectedRecords, records);
// No offsets are expired, and the group is still not deleted because it has pending transactional offset commits.
// A fresh list is used so the second call's output is checked on its own.
records = new ArrayList<>();
assertFalse(context.cleanupExpiredOffsets("group-id", records));
assertEquals(List.of(), records);
}
private static OffsetFetchResponseData.OffsetFetchResponsePartitions mkOffsetPartitionResponse(
int partition,
long offset,