KAFKA-19454: Handle topics missing in metadata in share delete. (#20090)

* There are instances where share group delete calls in group
coordinator (`onPartitionsDelete`, `deleteShareGroups`) where we lookup
the metadata image to fetch the topic id/partitions/topic name for a
topic name/id. However, there have been
instances where the looked up info was not found due to cluster being
under load or the underlying topic being deleted and information not
propagated correctly.
* To remedy the same, this PR adds checks to determine that topic is
indeed present in the image before the lookups thus preventing NPEs. The
problematic situations are logged.
* New tests have been added for `GroupMetadataManger` and
`GroupCoordinatorService`.

Reviewers: Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
Sushant Mahajan 2025-07-03 15:49:24 +05:30 committed by GitHub
parent 729f9ccf06
commit 268cf664c3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 303 additions and 30 deletions

View File

@ -96,6 +96,7 @@ import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult; import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage; import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext; import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.Authorizer; import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.record.BrokerCompressionType; import org.apache.kafka.server.record.BrokerCompressionType;
@ -2136,10 +2137,22 @@ public class GroupCoordinatorService implements GroupCoordinator {
).get(); ).get();
// At this point the metadata will not have been updated // At this point the metadata will not have been updated
// with the deleted topics. // with the deleted topics. However, we must guard against it.
Set<Uuid> topicIds = topicPartitions.stream() if (metadataImage == null || metadataImage.equals(MetadataImage.EMPTY)) {
.map(tp -> metadataImage.topics().getTopic(tp.topic()).id()) return;
.collect(Collectors.toSet()); }
Set<Uuid> topicIds = new HashSet<>();
for (TopicPartition tp : topicPartitions) {
TopicImage image = metadataImage.topics().getTopic(tp.topic());
if (image != null) {
topicIds.add(image.id());
}
}
if (topicIds.isEmpty()) {
return;
}
CompletableFuture.allOf( CompletableFuture.allOf(
FutureUtils.mapExceptionally( FutureUtils.mapExceptionally(

View File

@ -8020,24 +8020,52 @@ public class GroupMetadataManager {
// a retry for the same is possible. Since this is part of an admin operation // a retry for the same is possible. Since this is part of an admin operation
// retrying delete should not pose issues related to // retrying delete should not pose issues related to
// performance. Also, the share coordinator is idempotent on delete partitions. // performance. Also, the share coordinator is idempotent on delete partitions.
Map<Uuid, InitMapValue> deletingTopics = shareGroupStatePartitionMetadata.get(shareGroupId).deletingTopics().stream()
.map(tid -> {
TopicImage image = metadataImage.topics().getTopic(tid);
return Map.entry(tid, new InitMapValue(image.name(), image.partitions().keySet(), -1));
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
if (!deletingTopics.isEmpty()) { Set<Uuid> currentDeleting = shareGroupStatePartitionMetadata.get(shareGroupId).deletingTopics();
log.info("Existing deleting entries found in share group {} - {}", shareGroupId, deletingTopics); Map<Uuid, InitMapValue> deleteRetryCandidates = new HashMap<>();
deleteCandidates = combineInitMaps(deleteCandidates, deletingTopics); Set<Uuid> deletingToIgnore = new HashSet<>();
if (!currentDeleting.isEmpty()) {
if (metadataImage == null || metadataImage.equals(MetadataImage.EMPTY)) {
deletingToIgnore.addAll(currentDeleting);
} else {
for (Uuid deletingTopicId : currentDeleting) {
TopicImage topicImage = metadataImage.topics().getTopic(deletingTopicId);
if (topicImage == null) {
deletingToIgnore.add(deletingTopicId);
} else {
deleteRetryCandidates.put(deletingTopicId, new InitMapValue(topicImage.name(), topicImage.partitions().keySet(), -1));
}
}
}
} }
if (!deletingToIgnore.isEmpty()) {
log.warn("Some topics for share group id {} were not found in the metadata image - {}", shareGroupId, deletingToIgnore);
}
if (!deleteRetryCandidates.isEmpty()) {
log.info("Existing deleting entries found in share group {} - {}", shareGroupId, deleteRetryCandidates);
deleteCandidates = combineInitMaps(deleteCandidates, deleteRetryCandidates);
}
// Remove all initializing and initialized topic info from record and add deleting. There
// could be previous deleting topics due to offsets delete, we need to account for them as well.
// If some older deleting topics could not be found in the metadata image, they will be ignored
// and logged.
records.add(GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
shareGroupId,
Map.of(),
Map.of(),
deleteCandidates.entrySet().stream()
.map(entry -> Map.entry(entry.getKey(), entry.getValue().name()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
));
if (deleteCandidates.isEmpty()) { if (deleteCandidates.isEmpty()) {
return Optional.empty(); return Optional.empty();
} }
List<TopicData<PartitionIdData>> topicDataList = new ArrayList<>(deleteCandidates.size()); List<TopicData<PartitionIdData>> topicDataList = new ArrayList<>(deleteCandidates.size());
for (Map.Entry<Uuid, InitMapValue> entry : deleteCandidates.entrySet()) { for (Map.Entry<Uuid, InitMapValue> entry : deleteCandidates.entrySet()) {
topicDataList.add(new TopicData<>( topicDataList.add(new TopicData<>(
entry.getKey(), entry.getKey(),
@ -8047,15 +8075,6 @@ public class GroupMetadataManager {
)); ));
} }
// Remove all initializing and initialized topic info from record and add deleting. There
// could be previous deleting topics due to offsets delete, we need to account for them as well.
records.add(GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
shareGroupId,
Map.of(),
Map.of(),
attachTopicName(deleteCandidates.keySet())
));
return Optional.of(new DeleteShareGroupStateParameters.Builder() return Optional.of(new DeleteShareGroupStateParameters.Builder()
.setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionIdData>() .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionIdData>()
.setGroupId(shareGroupId) .setGroupId(shareGroupId)
@ -8247,13 +8266,15 @@ public class GroupMetadataManager {
shareGroupStatePartitionMetadata.forEach((groupId, metadata) -> { shareGroupStatePartitionMetadata.forEach((groupId, metadata) -> {
Set<Uuid> initializingDeletedCurrent = new HashSet<>(metadata.initializingTopics().keySet()); Set<Uuid> initializingDeletedCurrent = new HashSet<>(metadata.initializingTopics().keySet());
Set<Uuid> initializedDeletedCurrent = new HashSet<>(metadata.initializedTopics().keySet()); Set<Uuid> initializedDeletedCurrent = new HashSet<>(metadata.initializedTopics().keySet());
Set<Uuid> deletingDeletedCurrent = new HashSet<>(metadata.deletingTopics());
initializingDeletedCurrent.retainAll(deletedTopicIds); initializingDeletedCurrent.retainAll(deletedTopicIds);
initializedDeletedCurrent.retainAll(deletedTopicIds); initializedDeletedCurrent.retainAll(deletedTopicIds);
deletingDeletedCurrent.retainAll(deletedTopicIds);
// The deleted topic ids are neither present in initializing // The deleted topic ids are neither present in initializing
// not initialized, so we have nothing to do. // nor in initialized nor in deleting, so we have nothing to do.
if (initializingDeletedCurrent.isEmpty() && initializedDeletedCurrent.isEmpty()) { if (initializingDeletedCurrent.isEmpty() && initializedDeletedCurrent.isEmpty() && deletingDeletedCurrent.isEmpty()) {
return; return;
} }
@ -8268,14 +8289,14 @@ public class GroupMetadataManager {
Map<Uuid, InitMapValue> finalInitialized = new HashMap<>(metadata.initializedTopics()); Map<Uuid, InitMapValue> finalInitialized = new HashMap<>(metadata.initializedTopics());
initializedDeletedCurrent.forEach(finalInitialized::remove); initializedDeletedCurrent.forEach(finalInitialized::remove);
Set<Uuid> deletingTopics = new HashSet<>(metadata.deletingTopics()); Set<Uuid> finalDeleting = new HashSet<>(metadata.deletingTopics());
deletingTopics.removeAll(deletedTopicIds); finalDeleting.removeAll(deletedTopicIds);
records.add(GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord( records.add(GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
groupId, groupId,
finalInitializing, finalInitializing,
finalInitialized, finalInitialized,
attachTopicName(deletingTopics) attachTopicName(finalDeleting)
)); ));
}); });

View File

@ -3240,6 +3240,110 @@ public class GroupCoordinatorServiceTest {
BufferSupplier.NO_CACHING BufferSupplier.NO_CACHING
) )
); );
verify(runtime, times(1)).scheduleWriteAllOperation(
ArgumentMatchers.eq("maybe-cleanup-share-group-state"),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
);
}
@Test
public void testOnPartitionsDeletedCleanupShareGroupStateEmptyMetadata() {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.build();
service.startup(() -> 3);
MetadataImage image = new MetadataImageBuilder()
.addTopic(Uuid.randomUuid(), "bar", 1)
.build();
service.onNewMetadataImage(image, new MetadataDelta(image));
// No error in partition deleted callback
when(runtime.scheduleWriteAllOperation(
ArgumentMatchers.eq("on-partition-deleted"),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
)).thenReturn(List.of(
CompletableFuture.completedFuture(null),
CompletableFuture.completedFuture(null),
CompletableFuture.completedFuture(null)
));
when(runtime.scheduleWriteAllOperation(
ArgumentMatchers.eq("maybe-cleanup-share-group-state"),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
)).thenReturn(List.of(
CompletableFuture.completedFuture(null),
CompletableFuture.completedFuture(null),
CompletableFuture.completedFuture(null)
));
// The exception is logged and swallowed.
assertDoesNotThrow(() ->
service.onPartitionsDeleted(
List.of(new TopicPartition("foo", 0)),
BufferSupplier.NO_CACHING
)
);
verify(runtime, times(0)).scheduleWriteAllOperation(
ArgumentMatchers.eq("maybe-cleanup-share-group-state"),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
);
}
@Test
public void testOnPartitionsDeletedCleanupShareGroupStateTopicsNotInMetadata() {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.build();
service.startup(() -> 3);
MetadataImage image = MetadataImage.EMPTY;
service.onNewMetadataImage(image, new MetadataDelta(image));
// No error in partition deleted callback
when(runtime.scheduleWriteAllOperation(
ArgumentMatchers.eq("on-partition-deleted"),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
)).thenReturn(List.of(
CompletableFuture.completedFuture(null),
CompletableFuture.completedFuture(null),
CompletableFuture.completedFuture(null)
));
when(runtime.scheduleWriteAllOperation(
ArgumentMatchers.eq("maybe-cleanup-share-group-state"),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
)).thenReturn(List.of(
CompletableFuture.completedFuture(null),
CompletableFuture.completedFuture(null),
CompletableFuture.completedFuture(null)
));
// The exception is logged and swallowed.
assertDoesNotThrow(() ->
service.onPartitionsDeleted(
List.of(new TopicPartition("foo", 0)),
BufferSupplier.NO_CACHING
)
);
verify(runtime, times(0)).scheduleWriteAllOperation(
ArgumentMatchers.eq("maybe-cleanup-share-group-state"),
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
);
} }
@Test @Test

View File

@ -21613,6 +21613,135 @@ public class GroupMetadataManagerTest {
assertRecordsEquals(expectedRecords, records); assertRecordsEquals(expectedRecords, records);
} }
@Test
public void testShareGroupDeleteRequestWithAlreadyDeletingTopicsButNotInMetadata() {
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
.build();
Uuid t1Uuid = Uuid.randomUuid();
Uuid t2Uuid = Uuid.randomUuid();
Uuid t3Uuid = Uuid.randomUuid();
String t1Name = "t1";
String t2Name = "t2";
String t3Name = "t3";
String groupId = "share-group";
ShareGroup shareGroup = mock(ShareGroup.class);
when(shareGroup.groupId()).thenReturn(groupId);
when(shareGroup.isEmpty()).thenReturn(false);
MetadataImage image = new MetadataImageBuilder()
.addTopic(t1Uuid, t1Name, 2)
.addTopic(t2Uuid, t2Name, 2)
// .addTopic(t3Uuid, t3Name, 2) // Simulate deleting topic not present in metadata image.
.build();
MetadataDelta delta = new MetadataDelta(image);
context.groupMetadataManager.onNewMetadataImage(image, delta);
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 0, 0));
context.replay(
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
groupId,
Map.of(t1Uuid, new InitMapValue(t1Name, Set.of(0, 1), 1)),
Map.of(t2Uuid, new InitMapValue(t2Name, Set.of(0, 1), 1)),
Map.of(t3Uuid, t3Name)
)
);
context.commit();
Map<Uuid, Set<Integer>> expectedTopicPartitionMap = Map.of(
t1Uuid, Set.of(0, 1),
t2Uuid, Set.of(0, 1)
);
List<CoordinatorRecord> expectedRecords = List.of(
newShareGroupStatePartitionMetadataRecord(
groupId,
Map.of(),
Map.of(),
Map.of(t1Uuid, t1Name, t2Uuid, t2Name) // Existing deleting topics should be ignored.
)
);
List<CoordinatorRecord> records = new ArrayList<>();
Optional<DeleteShareGroupStateParameters> params = context.groupMetadataManager.shareGroupBuildPartitionDeleteRequest(groupId, records);
verifyShareGroupDeleteRequest(
params,
expectedTopicPartitionMap,
groupId,
true
);
assertRecordsEquals(expectedRecords, records);
}
@Test
public void testShareGroupDeleteRequestWithAlreadyDeletingTopicsButMetadataIsEmpty() {
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
.build();
Uuid t1Uuid = Uuid.randomUuid();
Uuid t2Uuid = Uuid.randomUuid();
Uuid t3Uuid = Uuid.randomUuid();
String t1Name = "t1";
String t2Name = "t2";
String t3Name = "t3";
String groupId = "share-group";
ShareGroup shareGroup = mock(ShareGroup.class);
when(shareGroup.groupId()).thenReturn(groupId);
when(shareGroup.isEmpty()).thenReturn(false);
MetadataImage image = MetadataImage.EMPTY;
MetadataDelta delta = new MetadataDelta(image);
context.groupMetadataManager.onNewMetadataImage(image, delta);
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 0, 0));
context.replay(
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
groupId,
Map.of(t1Uuid, new InitMapValue(t1Name, Set.of(0, 1), 1)),
Map.of(t2Uuid, new InitMapValue(t2Name, Set.of(0, 1), 1)),
Map.of(t3Uuid, t3Name)
)
);
context.commit();
Map<Uuid, Set<Integer>> expectedTopicPartitionMap = Map.of(
t1Uuid, Set.of(0, 1),
t2Uuid, Set.of(0, 1)
);
List<CoordinatorRecord> expectedRecords = List.of(
newShareGroupStatePartitionMetadataRecord(
groupId,
Map.of(),
Map.of(),
Map.of(t1Uuid, t1Name, t2Uuid, t2Name) // Existing deleting topics should be ignored.
)
);
List<CoordinatorRecord> records = new ArrayList<>();
Optional<DeleteShareGroupStateParameters> params = context.groupMetadataManager.shareGroupBuildPartitionDeleteRequest(groupId, records);
verifyShareGroupDeleteRequest(
params,
expectedTopicPartitionMap,
groupId,
true
);
assertRecordsEquals(expectedRecords, records);
}
@Test @Test
public void testSharePartitionsEligibleForOffsetDeletionSuccess() { public void testSharePartitionsEligibleForOffsetDeletionSuccess() {
MockPartitionAssignor assignor = new MockPartitionAssignor("range"); MockPartitionAssignor assignor = new MockPartitionAssignor("range");
@ -22783,6 +22912,8 @@ public class GroupMetadataManagerTest {
String t4Name = "t4"; String t4Name = "t4";
Uuid t5Id = Uuid.randomUuid(); Uuid t5Id = Uuid.randomUuid();
String t5Name = "t5"; String t5Name = "t5";
Uuid t6Id = Uuid.randomUuid();
String t6Name = "t6";
MetadataImage image = new MetadataImageBuilder() MetadataImage image = new MetadataImageBuilder()
.addTopic(t1Id, t1Name, 2) .addTopic(t1Id, t1Name, 2)
@ -22790,6 +22921,7 @@ public class GroupMetadataManagerTest {
.addTopic(t3Id, t3Name, 3) .addTopic(t3Id, t3Name, 3)
.addTopic(t4Id, t4Name, 3) .addTopic(t4Id, t4Name, 3)
.addTopic(t5Id, t5Name, 3) .addTopic(t5Id, t5Name, 3)
.addTopic(t6Id, t6Name, 3)
.build(); .build();
MetadataDelta delta = new MetadataDelta(image); MetadataDelta delta = new MetadataDelta(image);
@ -22828,7 +22960,10 @@ public class GroupMetadataManagerTest {
.setDeletingTopics(List.of( .setDeletingTopics(List.of(
new ShareGroupStatePartitionMetadataValue.TopicInfo() new ShareGroupStatePartitionMetadataValue.TopicInfo()
.setTopicId(t5Id) .setTopicId(t5Id)
.setTopicName(t5Name) .setTopicName(t5Name),
new ShareGroupStatePartitionMetadataValue.TopicInfo()
.setTopicId(t6Id)
.setTopicName(t6Name)
)) ))
); );
@ -22861,7 +22996,7 @@ public class GroupMetadataManagerTest {
); );
CoordinatorResult<Void, CoordinatorRecord> expectedResult = new CoordinatorResult<>(expectedRecords); CoordinatorResult<Void, CoordinatorRecord> expectedResult = new CoordinatorResult<>(expectedRecords);
assertEquals(expectedResult, context.groupMetadataManager.maybeCleanupShareGroupState(Set.of(t1Id, t2Id))); assertEquals(expectedResult, context.groupMetadataManager.maybeCleanupShareGroupState(Set.of(t1Id, t2Id, t6Id)));
} }
@Test @Test