MINOR: Remove share group code from group coord onElection. (#19730)

* Previously we had added code to `GroupCoordinatorService.onElection`
to reconcile pending share group initializing topics. This was done to
manage state in case of failovers and broker failures.
* However, we later modified share group heartbeat code to do the retry
to clean up the state and the `onElection` code is now redundant.
* In this PR we are cleaning up this code.

Reviewers: David Jacot <djacot@confluent.io>, Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
Sushant Mahajan 2025-05-15 19:57:25 +05:30 committed by GitHub
parent f9064f8bcd
commit a26d803f22
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 0 additions and 168 deletions

View File

@ -762,30 +762,6 @@ public class GroupCoordinatorService implements GroupCoordinator {
}).thenCompose(resp -> resp); }).thenCompose(resp -> resp);
} }
// Visibility for testing
CompletableFuture<Void> reconcileShareGroupStateInitializingState() {
List<CompletableFuture<List<InitializeShareGroupStateParameters>>> requestsStages = runtime.scheduleReadAllOperation(
"reconcile-share-group-initializing-state",
GroupCoordinatorShard::reconcileShareGroupStateInitializingState
);
if (requestsStages.isEmpty()) {
log.debug("Nothing to reconcile for share group initializing state.");
return CompletableFuture.completedFuture(null);
}
CompletableFuture<Void> allRequestsStage = CompletableFuture.allOf(requestsStages.toArray(new CompletableFuture<?>[0]));
final List<CompletableFuture<ShareGroupHeartbeatResponseData>> persisterResponses = new ArrayList<>();
allRequestsStage.thenApply(__ -> {
requestsStages.forEach(requestsStage -> requestsStage.join().forEach(request -> {
log.debug("Reconciling initializing state - {}", request);
persisterResponses.add(persisterInitialize(request, new ShareGroupHeartbeatResponseData()));
}));
return null;
});
return CompletableFuture.allOf(persisterResponses.toArray(new CompletableFuture<?>[0]));
}
/** /**
* See {@link GroupCoordinator#joinGroup(AuthorizableRequestContext, JoinGroupRequestData, BufferSupplier)}. * See {@link GroupCoordinator#joinGroup(AuthorizableRequestContext, JoinGroupRequestData, BufferSupplier)}.
*/ */
@ -2075,13 +2051,6 @@ public class GroupCoordinatorService implements GroupCoordinator {
new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataPartitionIndex), new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataPartitionIndex),
groupMetadataPartitionLeaderEpoch groupMetadataPartitionLeaderEpoch
); );
// Wait for reconciliation to complete.
try {
reconcileShareGroupStateInitializingState().join();
} catch (Exception e) {
log.error("Share group reconciliation failed", e);
}
} }
/** /**

View File

@ -525,15 +525,6 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
return groupMetadataManager.uninitializeShareGroupState(groupId, topicPartitionMap); return groupMetadataManager.uninitializeShareGroupState(groupId, topicPartitionMap);
} }
/**
* Reconcile initializing and initialized tps in share group state metadata records.
*
* @return A Result containing ShareGroupStatePartitionMetadata records and Void response.
*/
public List<InitializeShareGroupStateParameters> reconcileShareGroupStateInitializingState(long offset) {
return groupMetadataManager.reconcileShareGroupStateInitializingState(offset);
}
/** /**
* Returns the set of share-partitions whose share-group state has been initialized in the persister. * Returns the set of share-partitions whose share-group state has been initialized in the persister.
* *

View File

@ -190,7 +190,6 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
@ -4822,31 +4821,6 @@ public class GroupMetadataManager {
); );
} }
/**
* Iterates over all share groups and returns persister initialize requests corresponding to any initializing
* topic partitions found in the group associated {@link ShareGroupStatePartitionMetadataInfo}.
* @param offset The last committed offset for the {@link ShareGroupStatePartitionMetadataInfo} timeline hashmap.
*
* @return A list containing {@link InitializeShareGroupStateParameters} requests, could be empty.
*/
public List<InitializeShareGroupStateParameters> reconcileShareGroupStateInitializingState(long offset) {
List<InitializeShareGroupStateParameters> requests = new LinkedList<>();
for (Group group : groups.values()) {
if (!(group instanceof ShareGroup shareGroup)) {
continue;
}
if (!(shareGroupPartitionMetadata.containsKey(shareGroup.groupId()))) {
continue;
}
Map<Uuid, Set<Integer>> initializing = shareGroupPartitionMetadata.get(shareGroup.groupId(), offset).initializingTopics();
if (initializing == null || initializing.isEmpty()) {
continue;
}
requests.add(buildInitializeShareGroupStateRequest(shareGroup.groupId(), shareGroup.groupEpoch(), initializing));
}
return requests;
}
private Map<Uuid, String> attachTopicName(Set<Uuid> topicIds) { private Map<Uuid, String> attachTopicName(Set<Uuid> topicIds) {
TopicsImage topicsImage = metadataImage.topics(); TopicsImage topicsImage = metadataImage.topics();
Map<Uuid, String> finalMap = new HashMap<>(); Map<Uuid, String> finalMap = new HashMap<>();

View File

@ -103,7 +103,6 @@ import org.apache.kafka.server.share.persister.InitializeShareGroupStateResult;
import org.apache.kafka.server.share.persister.NoOpStatePersister; import org.apache.kafka.server.share.persister.NoOpStatePersister;
import org.apache.kafka.server.share.persister.PartitionFactory; import org.apache.kafka.server.share.persister.PartitionFactory;
import org.apache.kafka.server.share.persister.PartitionIdData; import org.apache.kafka.server.share.persister.PartitionIdData;
import org.apache.kafka.server.share.persister.PartitionStateData;
import org.apache.kafka.server.share.persister.Persister; import org.apache.kafka.server.share.persister.Persister;
import org.apache.kafka.server.share.persister.ReadShareGroupStateSummaryParameters; import org.apache.kafka.server.share.persister.ReadShareGroupStateSummaryParameters;
import org.apache.kafka.server.share.persister.ReadShareGroupStateSummaryResult; import org.apache.kafka.server.share.persister.ReadShareGroupStateSummaryResult;
@ -5231,107 +5230,6 @@ public class GroupCoordinatorServiceTest {
verify(mockPersister, times(1)).initializeState(ArgumentMatchers.any()); verify(mockPersister, times(1)).initializeState(ArgumentMatchers.any());
} }
@Test
public void testReconcileShareGroupInitializingStateNoRequests() {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
Persister mockPersister = mock(Persister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.setPersister(mockPersister)
.build(true);
when(runtime.scheduleReadAllOperation(
ArgumentMatchers.eq("reconcile-share-group-initializing-state"),
ArgumentMatchers.any()
)).thenReturn(List.of());
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("initialize-share-group-state"),
ArgumentMatchers.any(),
ArgumentMatchers.any(),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(null));
service.reconcileShareGroupStateInitializingState().join();
verify(runtime, times(1)).scheduleReadAllOperation(
ArgumentMatchers.eq("reconcile-share-group-initializing-state"),
ArgumentMatchers.any()
);
verify(mockPersister, times(0)).initializeState(ArgumentMatchers.any());
verify(runtime, times(0)).scheduleWriteOperation(
ArgumentMatchers.eq("initialize-share-group-state"),
ArgumentMatchers.any(),
ArgumentMatchers.any(),
ArgumentMatchers.any()
);
}
@Test
public void testReconcileShareGroupInitializingState() {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
Persister mockPersister = mock(Persister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.setPersister(mockPersister)
.build(true);
String groupId1 = "groupId1";
String groupId2 = "groupId2";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
InitializeShareGroupStateParameters req1 = new InitializeShareGroupStateParameters.Builder()
.setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionStateData>()
.setGroupId(groupId1)
.setTopicsData(List.of(new TopicData<>(topicId1, List.of(PartitionFactory.newPartitionStateData(0, 1, 0)))))
.build())
.build();
InitializeShareGroupStateParameters req2 = new InitializeShareGroupStateParameters.Builder()
.setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionStateData>()
.setGroupId(groupId2)
.setTopicsData(List.of(new TopicData<>(topicId2, List.of(PartitionFactory.newPartitionStateData(0, 2, 10)))))
.build())
.build();
when(runtime.scheduleReadAllOperation(
ArgumentMatchers.eq("reconcile-share-group-initializing-state"),
ArgumentMatchers.any()
)).thenReturn(List.of(
CompletableFuture.completedFuture(List.of(req1)),
CompletableFuture.completedFuture(List.of(req2))
));
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("initialize-share-group-state"),
ArgumentMatchers.any(),
ArgumentMatchers.any(),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(null));
when(mockPersister.initializeState(ArgumentMatchers.eq(req1))).thenReturn(CompletableFuture.completedFuture(new InitializeShareGroupStateResult.Builder()
.setTopicsData(List.of(new TopicData<>(topicId1, List.of(PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message())))))
.build())
);
when(mockPersister.initializeState(ArgumentMatchers.eq(req2))).thenReturn(CompletableFuture.completedFuture(new InitializeShareGroupStateResult.Builder()
.setTopicsData(List.of(new TopicData<>(topicId2, List.of(PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message())))))
.build())
);
service.reconcileShareGroupStateInitializingState().join();
verify(mockPersister, times(2)).initializeState(ArgumentMatchers.any());
verify(runtime, times(2)).scheduleWriteOperation(
ArgumentMatchers.eq("initialize-share-group-state"),
ArgumentMatchers.any(),
ArgumentMatchers.any(),
ArgumentMatchers.any()
);
}
@FunctionalInterface @FunctionalInterface
private interface TriFunction<A, B, C, R> { private interface TriFunction<A, B, C, R> {
R apply(A a, B b, C c); R apply(A a, B b, C c);