From a26d803f22e0f3f36ea96dc7afc29ae292e5a5f3 Mon Sep 17 00:00:00 2001 From: Sushant Mahajan Date: Thu, 15 May 2025 19:57:25 +0530 Subject: [PATCH] MINOR: Remove share group code from group coord onElection. (#19730) * Previously we had added code to `GroupCoordinatorService.onElection` to reconcile pending share group initializing topics. This was done to manage state in case of failovers and broker failures. * However, we later modified share group heartbeat code to do the retry to clean up the state and the `onElection` code is now redundant. * In this PR we are cleaning up this code. Reviewers: David Jacot , Andrew Schofield --- .../group/GroupCoordinatorService.java | 31 ------ .../group/GroupCoordinatorShard.java | 9 -- .../group/GroupMetadataManager.java | 26 ----- .../group/GroupCoordinatorServiceTest.java | 102 ------------------ 4 files changed, 168 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java index 5f10756f388..7072a8a8fb3 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java @@ -762,30 +762,6 @@ public class GroupCoordinatorService implements GroupCoordinator { }).thenCompose(resp -> resp); } - // Visibility for testing - CompletableFuture reconcileShareGroupStateInitializingState() { - List>> requestsStages = runtime.scheduleReadAllOperation( - "reconcile-share-group-initializing-state", - GroupCoordinatorShard::reconcileShareGroupStateInitializingState - ); - - if (requestsStages.isEmpty()) { - log.debug("Nothing to reconcile for share group initializing state."); - return CompletableFuture.completedFuture(null); - } - - CompletableFuture allRequestsStage = CompletableFuture.allOf(requestsStages.toArray(new CompletableFuture[0])); - final List> persisterResponses = new ArrayList<>(); - allRequestsStage.thenApply(__ -> { - requestsStages.forEach(requestsStage -> requestsStage.join().forEach(request -> { - log.debug("Reconciling initializing state - {}", request); - persisterResponses.add(persisterInitialize(request, new ShareGroupHeartbeatResponseData())); - })); - return null; - }); - return CompletableFuture.allOf(persisterResponses.toArray(new CompletableFuture[0])); - } - /** * See {@link GroupCoordinator#joinGroup(AuthorizableRequestContext, JoinGroupRequestData, BufferSupplier)}. */ @@ -2075,13 +2051,6 @@ public class GroupCoordinatorService implements GroupCoordinator { new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataPartitionIndex), groupMetadataPartitionLeaderEpoch ); - - // Wait for reconciliation to complete. - try { - reconcileShareGroupStateInitializingState().join(); - } catch (Exception e) { - log.error("Share group reconciliation failed", e); - } } /** diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java index 6d723c8e99e..2d32b3136b2 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java @@ -525,15 +525,6 @@ public class GroupCoordinatorShard implements CoordinatorShard reconcileShareGroupStateInitializingState(long offset) { - return groupMetadataManager.reconcileShareGroupStateInitializingState(offset); - } - /** * Returns the set of share-partitions whose share-group state has been initialized in the persister. * diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index c3b1de6fb40..afeb5f463c3 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -190,7 +190,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -4822,31 +4821,6 @@ public class GroupMetadataManager { ); } - /** - * Iterates over all share groups and returns persister initialize requests corresponding to any initializing - * topic partitions found in the group associated {@link ShareGroupStatePartitionMetadataInfo}. - * @param offset The last committed offset for the {@link ShareGroupStatePartitionMetadataInfo} timeline hashmap. - * - * @return A list containing {@link InitializeShareGroupStateParameters} requests, could be empty. - */ - public List reconcileShareGroupStateInitializingState(long offset) { - List requests = new LinkedList<>(); - for (Group group : groups.values()) { - if (!(group instanceof ShareGroup shareGroup)) { - continue; - } - if (!(shareGroupPartitionMetadata.containsKey(shareGroup.groupId()))) { - continue; - } - Map> initializing = shareGroupPartitionMetadata.get(shareGroup.groupId(), offset).initializingTopics(); - if (initializing == null || initializing.isEmpty()) { - continue; - } - requests.add(buildInitializeShareGroupStateRequest(shareGroup.groupId(), shareGroup.groupEpoch(), initializing)); - } - return requests; - } - private Map attachTopicName(Set topicIds) { TopicsImage topicsImage = metadataImage.topics(); Map finalMap = new HashMap<>(); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java index 83b4748c754..a6f32d1fedf 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java @@ -103,7 +103,6 @@ import org.apache.kafka.server.share.persister.InitializeShareGroupStateResult; import org.apache.kafka.server.share.persister.NoOpStatePersister; import org.apache.kafka.server.share.persister.PartitionFactory; import org.apache.kafka.server.share.persister.PartitionIdData; -import org.apache.kafka.server.share.persister.PartitionStateData; import org.apache.kafka.server.share.persister.Persister; import org.apache.kafka.server.share.persister.ReadShareGroupStateSummaryParameters; import org.apache.kafka.server.share.persister.ReadShareGroupStateSummaryResult; @@ -5231,107 +5230,6 @@ public class GroupCoordinatorServiceTest { verify(mockPersister, times(1)).initializeState(ArgumentMatchers.any()); } - @Test - public void testReconcileShareGroupInitializingStateNoRequests() { - CoordinatorRuntime runtime = mockRuntime(); - Persister mockPersister = mock(Persister.class); - GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() - .setConfig(createConfig()) - .setRuntime(runtime) - .setPersister(mockPersister) - .build(true); - - when(runtime.scheduleReadAllOperation( - ArgumentMatchers.eq("reconcile-share-group-initializing-state"), - ArgumentMatchers.any() - )).thenReturn(List.of()); - - when(runtime.scheduleWriteOperation( - ArgumentMatchers.eq("initialize-share-group-state"), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any() - )).thenReturn(CompletableFuture.completedFuture(null)); - - service.reconcileShareGroupStateInitializingState().join(); - verify(runtime, times(1)).scheduleReadAllOperation( - ArgumentMatchers.eq("reconcile-share-group-initializing-state"), - ArgumentMatchers.any() - ); - verify(mockPersister, times(0)).initializeState(ArgumentMatchers.any()); - verify(runtime, times(0)).scheduleWriteOperation( - ArgumentMatchers.eq("initialize-share-group-state"), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any() - ); - } - - @Test - public void testReconcileShareGroupInitializingState() { - CoordinatorRuntime runtime = mockRuntime(); - Persister mockPersister = mock(Persister.class); - GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() - .setConfig(createConfig()) - .setRuntime(runtime) - .setPersister(mockPersister) - .build(true); - - String groupId1 = "groupId1"; - String groupId2 = "groupId2"; - - Uuid topicId1 = Uuid.randomUuid(); - Uuid topicId2 = Uuid.randomUuid(); - - InitializeShareGroupStateParameters req1 = new InitializeShareGroupStateParameters.Builder() - .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder() - .setGroupId(groupId1) - .setTopicsData(List.of(new TopicData<>(topicId1, List.of(PartitionFactory.newPartitionStateData(0, 1, 0))))) - .build()) - .build(); - - InitializeShareGroupStateParameters req2 = new InitializeShareGroupStateParameters.Builder() - .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder() - .setGroupId(groupId2) - .setTopicsData(List.of(new TopicData<>(topicId2, List.of(PartitionFactory.newPartitionStateData(0, 2, 10))))) - .build()) - .build(); - - when(runtime.scheduleReadAllOperation( - ArgumentMatchers.eq("reconcile-share-group-initializing-state"), - ArgumentMatchers.any() - )).thenReturn(List.of( - CompletableFuture.completedFuture(List.of(req1)), - CompletableFuture.completedFuture(List.of(req2)) - )); - - when(runtime.scheduleWriteOperation( - ArgumentMatchers.eq("initialize-share-group-state"), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any() - )).thenReturn(CompletableFuture.completedFuture(null)); - - when(mockPersister.initializeState(ArgumentMatchers.eq(req1))).thenReturn(CompletableFuture.completedFuture(new InitializeShareGroupStateResult.Builder() - .setTopicsData(List.of(new TopicData<>(topicId1, List.of(PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message()))))) - .build()) - ); - - when(mockPersister.initializeState(ArgumentMatchers.eq(req2))).thenReturn(CompletableFuture.completedFuture(new InitializeShareGroupStateResult.Builder() - .setTopicsData(List.of(new TopicData<>(topicId2, List.of(PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message()))))) - .build()) - ); - - service.reconcileShareGroupStateInitializingState().join(); - verify(mockPersister, times(2)).initializeState(ArgumentMatchers.any()); - verify(runtime, times(2)).scheduleWriteOperation( - ArgumentMatchers.eq("initialize-share-group-state"), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any() - ); - } - @FunctionalInterface private interface TriFunction { R apply(A a, B b, C c);