diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java index d2ae013b818..d0f6901d444 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java @@ -824,9 +824,17 @@ public class GroupCoordinatorRecordHelpers { */ public static CoordinatorRecord newShareGroupStatePartitionMetadataRecord( String groupId, + Map>> initializingTopics, Map>> initializedTopics, Map deletingTopics ) { + List initializingTopicPartitionInfo = initializingTopics.entrySet().stream() + .map(entry -> new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo() + .setTopicId(entry.getKey()) + .setTopicName(entry.getValue().getKey()) + .setPartitions(entry.getValue().getValue().stream().toList())) + .toList(); + List initializedTopicPartitionInfo = initializedTopics.entrySet().stream() .map(entry -> new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo() .setTopicId(entry.getKey()) @@ -845,6 +853,7 @@ public class GroupCoordinatorRecordHelpers { .setGroupId(groupId), new ApiMessageAndVersion( new ShareGroupStatePartitionMetadataValue() + .setInitializingTopics(initializingTopicPartitionInfo) .setInitializedTopics(initializedTopicPartitionInfo) .setDeletingTopics(deletingTopicsInfo), (short) 0 diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java index e670b7c865b..0096b21e398 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java @@ -447,9 +447,8 @@ public class GroupCoordinatorService implements GroupCoordinator { Duration.ofMillis(config.offsetCommitTimeoutMs()), coordinator -> coordinator.shareGroupHeartbeat(context, request) ).thenCompose(result -> { - // This ensures that the previous group write has completed successfully - // before we start the persister initialize phase. if (result.getValue().isPresent()) { + // Adding to timer makes this call async with respect to the heartbeat. timer.add(new TimerTask(0L) { @Override public void run() { @@ -475,16 +474,19 @@ public class GroupCoordinatorService implements GroupCoordinator { ShareGroupHeartbeatResponseData defaultResponse ) { return persister.initializeState(request) - .thenCompose( - response -> handlePersisterInitializeResponse(request.groupTopicPartitionData().groupId(), response, defaultResponse) - ).exceptionally(exception -> { + .handle((response, exp) -> { + if (exp == null) { + return handlePersisterInitializeResponse(request.groupTopicPartitionData().groupId(), response, defaultResponse); + } GroupTopicPartitionData gtp = request.groupTopicPartitionData(); - log.error("Unable to initialize share group state {}, {}", gtp.groupId(), gtp.topicsData(), exception); - Errors error = Errors.forException(exception); - return new ShareGroupHeartbeatResponseData() - .setErrorCode(error.code()) - .setErrorMessage(error.message()); - }); + log.error("Unable to initialize share group state {}, {}", gtp.groupId(), gtp.topicsData(), exp); + Errors error = Errors.forException(exp); + Map> topicPartitionMap = new HashMap<>(); + gtp.topicsData().forEach(topicData -> topicPartitionMap.computeIfAbsent(topicData.topicId(), k -> new HashSet<>()) + .addAll(topicData.partitions().stream().map(PartitionStateData::partition).collect(Collectors.toSet()))); + return uninitializeShareGroupState(error, gtp.groupId(), topicPartitionMap); + }) + .thenCompose(resp -> resp); } private CompletableFuture handlePersisterInitializeResponse( @@ -501,26 +503,44 @@ public class GroupCoordinatorService implements GroupCoordinator { } } + Map> topicPartitionMap = new HashMap<>(); + for (TopicData topicData : persisterInitializeResult.topicsData()) { + topicPartitionMap.put( + topicData.topicId(), + topicData.partitions().stream().map(PartitionErrorData::partition).collect(Collectors.toSet()) + ); + } + if (persisterError.code() == Errors.NONE.code()) { - Map> topicPartitionMap = new HashMap<>(); - for (TopicData topicData : persisterInitializeResult.topicsData()) { - topicPartitionMap.put( - topicData.topicId(), - topicData.partitions().stream().map(PartitionErrorData::partition).collect(Collectors.toSet()) - ); - } if (topicPartitionMap.isEmpty()) { return CompletableFuture.completedFuture(defaultResponse); } return performShareGroupStateMetadataInitialize(groupId, topicPartitionMap, defaultResponse); - } else { - log.error("Received error while calling initialize state for {} on persister {}.", groupId, persisterError.code()); - return CompletableFuture.completedFuture( - new ShareGroupHeartbeatResponseData() - .setErrorCode(persisterError.code()) - .setErrorMessage(persisterError.message()) - ); } + log.error("Received error while calling initialize state for {} on persister {}.", groupId, persisterError.code()); + return uninitializeShareGroupState(persisterError, groupId, topicPartitionMap); + } + + private CompletableFuture uninitializeShareGroupState( + Errors error, + String groupId, + Map> topicPartitionMap + ) { + return runtime.scheduleWriteOperation( + "uninitialize-share-group-state", + topicPartitionFor(groupId), + Duration.ofMillis(config.offsetCommitTimeoutMs()), + coordinator -> coordinator.uninitializeShareGroupState(groupId, topicPartitionMap) + ).thenApply(__ -> new ShareGroupHeartbeatResponseData() + .setErrorCode(error.code()) + .setErrorMessage(error.message()) + ).exceptionally(exception -> { + log.error("Unable to cleanup topic partitions from share group state metadata", exception); + Errors err = Errors.forException(new IllegalStateException("Unable to cleanup topic partitions from share group state metadata", exception)); + return new ShareGroupHeartbeatResponseData() + .setErrorCode(err.code()) + .setErrorMessage(err.message()); + }); } private CompletableFuture performShareGroupStateMetadataInitialize( @@ -533,15 +553,38 @@ public class GroupCoordinatorService implements GroupCoordinator { topicPartitionFor(groupId), Duration.ofMillis(config.offsetCommitTimeoutMs()), coordinator -> coordinator.initializeShareGroupState(groupId, topicPartitionMap) - ).thenApply( - __ -> defaultResponse - ).exceptionally(exception -> { - log.error("Unable to initialize share group state partition metadata for {}.", groupId, exception); - Errors error = Errors.forException(exception); - return new ShareGroupHeartbeatResponseData() - .setErrorCode(error.code()) - .setErrorMessage(error.message()); + ).handle((__, exp) -> { + if (exp == null) { + return CompletableFuture.completedFuture(defaultResponse); + } + log.error("Unable to initialize share group state partition metadata for {}.", groupId, exp); + Errors error = Errors.forException(exp); + return uninitializeShareGroupState(error, groupId, topicPartitionMap); + }).thenCompose(resp -> resp); + } + + // Visibility for testing + CompletableFuture reconcileShareGroupStateInitializingState() { + List>> requestsStages = runtime.scheduleReadAllOperation( + "reconcile-share-group-initializing-state", + GroupCoordinatorShard::reconcileShareGroupStateInitializingState + ); + + if (requestsStages.isEmpty()) { + log.debug("Nothing to reconcile for share group initializing state."); + return CompletableFuture.completedFuture(null); + } + + CompletableFuture allRequestsStage = CompletableFuture.allOf(requestsStages.toArray(new CompletableFuture[0])); + final List> persisterResponses = new ArrayList<>(); + allRequestsStage.thenApply(__ -> { + requestsStages.forEach(requestsStage -> requestsStage.join().forEach(request -> { + log.debug("Reconciling initializing state - {}", request); + persisterResponses.add(persisterInitialize(request, new ShareGroupHeartbeatResponseData())); + })); + return null; }); + return CompletableFuture.allOf(persisterResponses.toArray(new CompletableFuture[0])); } /** @@ -1586,6 +1629,13 @@ public class GroupCoordinatorService implements GroupCoordinator { new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataPartitionIndex), groupMetadataPartitionLeaderEpoch ); + + // Wait for reconciliation to complete. + try { + reconcileShareGroupStateInitializingState().join(); + } catch (Exception e) { + log.error("Share group reconciliation failed", e); + } } /** diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java index 88f6bb2b189..3224616f9a9 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java @@ -442,6 +442,30 @@ public class GroupCoordinatorShard implements CoordinatorShard uninitializeShareGroupState( + String groupId, + Map> topicPartitionMap + ) { + return groupMetadataManager.uninitializeShareGroupState(groupId, topicPartitionMap); + } + + /** + * Reconcile initializing and initialized tps in share group state metadata records. + * + * @return A Result containing ShareGroupStatePartitionMetadata records and Void response. + */ + public List reconcileShareGroupStateInitializingState(long offset) { + return groupMetadataManager.reconcileShareGroupStateInitializingState(offset); + } + /** * Handles a JoinGroup request. * diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 73aa43eda6f..62f227e50dc 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -187,6 +187,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -514,6 +515,7 @@ public class GroupMetadataManager { * @param deletingTopics Set of topic ids. */ private record ShareGroupStatePartitionMetadataInfo( + Map> initializingTopics, Map> initializedTopics, Set deletingTopics ) { @@ -2830,7 +2832,7 @@ public class GroupMetadataManager { records, Map.entry( response, - maybeCreateInitializeShareGroupStateRequest(groupId, groupEpoch, subscriptionMetadata) + maybeCreateInitializeShareGroupStateRequest(groupId, groupEpoch, subscriptionMetadata, records) ) ); } @@ -2868,15 +2870,14 @@ public class GroupMetadataManager { * @return A map of topic partitions which are subscribed by the share group but not initialized yet. */ // Visibility for testing - Map>> subscribedTopicsChangeMap(String groupId, Map subscriptionMetadata) { - TopicsImage topicsImage = metadataImage.topics(); - if (topicsImage == null || topicsImage.isEmpty() || subscriptionMetadata == null || subscriptionMetadata.isEmpty()) { + Map> subscribedTopicsChangeMap(String groupId, Map subscriptionMetadata) { + if (subscriptionMetadata == null || subscriptionMetadata.isEmpty()) { return Map.of(); } - Map>> topicPartitionChangeMap = new HashMap<>(); + Map> topicPartitionChangeMap = new HashMap<>(); ShareGroupStatePartitionMetadataInfo info = shareGroupPartitionMetadata.get(groupId); - Map> alreadyInitialized = info == null ? Map.of() : info.initializedTopics(); + Map> alreadyInitialized = info == null ? new HashMap<>() : mergeShareGroupInitMaps(info.initializedTopics(), info.initializingTopics()); subscriptionMetadata.forEach((topicName, topicMetadata) -> { Set alreadyInitializedPartSet = alreadyInitialized.getOrDefault(topicMetadata.id(), Set.of()); @@ -2884,10 +2885,7 @@ public class GroupMetadataManager { Set partitionSet = IntStream.range(0, topicMetadata.numPartitions()).boxed().collect(Collectors.toSet()); partitionSet.removeAll(alreadyInitializedPartSet); - topicPartitionChangeMap.computeIfAbsent(topicMetadata.id(), k -> Map.entry( - topicName, - partitionSet - )); + topicPartitionChangeMap.computeIfAbsent(topicMetadata.id(), k -> partitionSet); } }); return topicPartitionChangeMap; @@ -2905,31 +2903,80 @@ public class GroupMetadataManager { private Optional maybeCreateInitializeShareGroupStateRequest( String groupId, int groupEpoch, - Map subscriptionMetadata + Map subscriptionMetadata, + List records ) { if (subscriptionMetadata == null || subscriptionMetadata.isEmpty() || metadataImage.isEmpty()) { return Optional.empty(); } - Map>> topicPartitionChangeMap = subscribedTopicsChangeMap(groupId, subscriptionMetadata); + Map> topicPartitionChangeMap = subscribedTopicsChangeMap(groupId, subscriptionMetadata); // Nothing to initialize. if (topicPartitionChangeMap.isEmpty()) { return Optional.empty(); } - return Optional.of(new InitializeShareGroupStateParameters.Builder().setGroupTopicPartitionData( - new GroupTopicPartitionData<>(groupId, topicPartitionChangeMap.entrySet().stream() + addInitializingTopicsRecords(groupId, records, topicPartitionChangeMap); + return Optional.of(buildInitializeShareGroupStateRequest(groupId, groupEpoch, topicPartitionChangeMap)); + } + + private InitializeShareGroupStateParameters buildInitializeShareGroupStateRequest(String groupId, int groupEpoch, Map> topicPartitions) { + return new InitializeShareGroupStateParameters.Builder().setGroupTopicPartitionData( + new GroupTopicPartitionData<>(groupId, topicPartitions.entrySet().stream() .map(entry -> new TopicData<>( entry.getKey(), - entry.getValue().getValue().stream() + entry.getValue().stream() .map(partitionId -> PartitionFactory.newPartitionStateData(partitionId, groupEpoch, -1)) .toList()) ).toList() - )).build() + )).build(); + } + + private void addInitializingTopicsRecords(String groupId, List records, Map> topicPartitionMap) { + if (topicPartitionMap == null || topicPartitionMap.isEmpty()) { + return; + } + + ShareGroupStatePartitionMetadataInfo currentMap = shareGroupPartitionMetadata.get(groupId); + if (currentMap == null) { + records.add(newShareGroupStatePartitionMetadataRecord(groupId, attachTopicName(topicPartitionMap), Map.of(), Map.of())); + return; + } + + // We must combine the existing information in the record with the topicPartitionMap argument. + Map> finalInitializingMap = mergeShareGroupInitMaps(currentMap.initializingTopics(), topicPartitionMap); + + records.add( + newShareGroupStatePartitionMetadataRecord( + groupId, + attachTopicName(finalInitializingMap), + attachTopicName(currentMap.initializedTopics()), + Map.of() + ) ); } + // Visibility for tests + static Map> mergeShareGroupInitMaps( + Map> existingShareGroupInitMap, + Map> newShareGroupInitMap + ) { + Map> finalInitMap = new HashMap<>(); + Set combinedTopicIdSet = new HashSet<>(existingShareGroupInitMap.keySet()); + combinedTopicIdSet.addAll(newShareGroupInitMap.keySet()); + + for (Uuid topicId : combinedTopicIdSet) { + Set partitions = new HashSet<>(existingShareGroupInitMap.getOrDefault(topicId, new HashSet<>())); + if (newShareGroupInitMap.containsKey(topicId)) { + partitions.addAll(newShareGroupInitMap.get(topicId)); + } + finalInitMap.putIfAbsent(topicId, partitions); + } + + return finalInitMap; + } + /** * Gets or subscribes a new dynamic consumer group member. * @@ -4870,37 +4917,127 @@ public class GroupMetadataManager { } ShareGroup group = (ShareGroup) groups.get(groupId); - // We must combine the existing information in the record with the - // topicPartitionMap argument. - Map>> finalMap = new HashMap<>(); - ShareGroupStatePartitionMetadataInfo currentMap = shareGroupPartitionMetadata.get(groupId); if (currentMap == null) { - topicPartitionMap.forEach((k, v) -> finalMap.put(k, Map.entry(metadataImage.topics().getTopic(k).name(), v))); return new CoordinatorResult<>( - List.of(newShareGroupStatePartitionMetadataRecord(group.groupId(), finalMap, Map.of())), + List.of(newShareGroupStatePartitionMetadataRecord(group.groupId(), Map.of(), attachTopicName(topicPartitionMap), Map.of())), null ); } - Set combinedTopicIdSet = new HashSet<>(topicPartitionMap.keySet()); - combinedTopicIdSet.addAll(currentMap.initializedTopics.keySet()); + // We must combine the existing information in the record with the topicPartitionMap argument so that the final + // record has up-to-date information. + Map> finalInitializedMap = mergeShareGroupInitMaps(currentMap.initializedTopics(), topicPartitionMap); - for (Uuid topicId : combinedTopicIdSet) { - String topicName = metadataImage.topics().getTopic(topicId).name(); - Set partitions = new HashSet<>(currentMap.initializedTopics.getOrDefault(topicId, new HashSet<>())); - if (topicPartitionMap.containsKey(topicId)) { - partitions.addAll(topicPartitionMap.get(topicId)); + // Fetch initializing info from state metadata. + Map> finalInitializingMap = new HashMap<>(currentMap.initializingTopics()); + + // Remove any entries which are already initialized. + for (Map.Entry> entry : topicPartitionMap.entrySet()) { + Uuid topicId = entry.getKey(); + if (finalInitializingMap.containsKey(topicId)) { + Set partitions = finalInitializingMap.get(topicId); + partitions.removeAll(entry.getValue()); + if (partitions.isEmpty()) { + finalInitializingMap.remove(topicId); + } + } + } + + return new CoordinatorResult<>(List.of( + newShareGroupStatePartitionMetadataRecord( + group.groupId(), + attachTopicName(finalInitializingMap), + attachTopicName(finalInitializedMap), + Map.of() + )), + null + ); + } + + /** + * Removes specific topic partitions from the initializing state for a share group. This is usually part of + * shareGroupHeartbeat code flow, specifically, if there is a persister exception. + * @param groupId The group id corresponding to the share group whose share partitions have been initialized. + * @param topicPartitionMap Map representing topic partition data to be cleaned from the share state partition metadata. + * + * @return A Result containing ShareGroupStatePartitionMetadata records and Void response. + */ + public CoordinatorResult uninitializeShareGroupState( + String groupId, + Map> topicPartitionMap + ) { + ShareGroupStatePartitionMetadataInfo info = shareGroupPartitionMetadata.get(groupId); + if (info == null || info.initializingTopics().isEmpty() || topicPartitionMap.isEmpty()) { + return new CoordinatorResult<>(List.of(), null); + } + + Map> initializingTopics = info.initializingTopics(); + Map> finalInitializingTopics = new HashMap<>(); + + for (Map.Entry> entry : initializingTopics.entrySet()) { + Uuid topicId = entry.getKey(); + // If topicId to clean is not present in topicPartitionMap map, retain it. + if (!topicPartitionMap.containsKey(topicId)) { + finalInitializingTopics.put(entry.getKey(), entry.getValue()); + } else { + Set partitions = new HashSet<>(entry.getValue()); + partitions.removeAll(topicPartitionMap.get(topicId)); + if (!partitions.isEmpty()) { + finalInitializingTopics.put(entry.getKey(), partitions); + } } - finalMap.computeIfAbsent(topicId, k -> Map.entry(topicName, partitions)); } return new CoordinatorResult<>( - List.of(newShareGroupStatePartitionMetadataRecord(group.groupId(), finalMap, Map.of())), + List.of( + newShareGroupStatePartitionMetadataRecord( + groupId, + attachTopicName(finalInitializingTopics), + attachTopicName(info.initializedTopics()), + Map.of() + ) + ), null ); } + /** + * Iterates over all share groups and returns persister initialize requests corresponding to any initializing + * topic partitions found in the group associated {@link ShareGroupStatePartitionMetadataInfo}. + * @param offset The last committed offset for the {@link ShareGroupStatePartitionMetadataInfo} timeline hashmap. + * + * @return A list containing {@link InitializeShareGroupStateParameters} requests, could be empty. + */ + public List reconcileShareGroupStateInitializingState(long offset) { + List requests = new LinkedList<>(); + for (Group group : groups.values()) { + if (!(group instanceof ShareGroup shareGroup)) { + continue; + } + if (!(shareGroupPartitionMetadata.containsKey(shareGroup.groupId()))) { + continue; + } + Map> initializing = shareGroupPartitionMetadata.get(shareGroup.groupId(), offset).initializingTopics(); + if (initializing == null || initializing.isEmpty()) { + continue; + } + requests.add(buildInitializeShareGroupStateRequest(shareGroup.groupId(), shareGroup.groupEpoch(), initializing)); + } + return requests; + } + + private Map>> attachTopicName(Map> initMap) { + TopicsImage topicsImage = metadataImage.topics(); + Map>> finalMap = new HashMap<>(); + for (Map.Entry> entry : initMap.entrySet()) { + Uuid topicId = entry.getKey(); + String topicName = topicsImage.getTopic(topicId).name(); + finalMap.put(topicId, Map.entry(topicName, entry.getValue())); + } + return Collections.unmodifiableMap(finalMap); + } + /** * Replays ConsumerGroupMemberMetadataKey/Value to update the hard state of * the consumer group. It updates the subscription part of the member or @@ -5687,8 +5824,10 @@ public class GroupMetadataManager { // Tombstone! shareGroupPartitionMetadata.remove(groupId); } else { - ShareGroupStatePartitionMetadataInfo info = new ShareGroupStatePartitionMetadataInfo( + value.initializingTopics().stream() + .map(topicPartitionInfo -> Map.entry(topicPartitionInfo.topicId(), new HashSet<>(topicPartitionInfo.partitions()))) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)), value.initializedTopics().stream() .map(topicPartitionInfo -> Map.entry(topicPartitionInfo.topicId(), new HashSet<>(topicPartitionInfo.partitions()))) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)), diff --git a/group-coordinator/src/main/resources/common/message/ShareGroupStatePartitionMetadataValue.json b/group-coordinator/src/main/resources/common/message/ShareGroupStatePartitionMetadataValue.json index 18a9df0885f..57ca75823d8 100644 --- a/group-coordinator/src/main/resources/common/message/ShareGroupStatePartitionMetadataValue.json +++ b/group-coordinator/src/main/resources/common/message/ShareGroupStatePartitionMetadataValue.json @@ -21,6 +21,8 @@ "validVersions": "0", "flexibleVersions": "0+", "fields": [ + { "name": "InitializingTopics", "versions": "0+", "type": "[]TopicPartitionsInfo", + "about": "The topic-partitions whose share-group state is being initialized." }, { "name": "InitializedTopics", "versions": "0+", "type": "[]TopicPartitionsInfo", "about": "The topics with initialized share-group state." }, { "name": "DeletingTopics", "versions": "0+", "type": "[]TopicInfo", diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java index 8d811eebfc5..2e67c1fa227 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java @@ -323,6 +323,7 @@ public class GroupCoordinatorRecordHelpersTest { CoordinatorRecord record = GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord( groupId, + Map.of(), Map.of( topicId1, Map.entry(topicName1, partitions) diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java index 68b16491798..2096ca4bf5e 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java @@ -97,6 +97,7 @@ import org.apache.kafka.server.share.persister.InitializeShareGroupStateResult; import org.apache.kafka.server.share.persister.NoOpStatePersister; import org.apache.kafka.server.share.persister.PartitionFactory; import org.apache.kafka.server.share.persister.PartitionIdData; +import org.apache.kafka.server.share.persister.PartitionStateData; import org.apache.kafka.server.share.persister.Persister; import org.apache.kafka.server.share.persister.ReadShareGroupStateSummaryParameters; import org.apache.kafka.server.share.persister.ReadShareGroupStateSummaryResult; @@ -139,7 +140,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -1943,7 +1943,7 @@ public class GroupCoordinatorServiceTest { Map.of("share-group-id-1", Map.entry(createDeleteShareRequest("share-group-id-1", shareGroupTopicId, List.of(0, 1)), Errors.NONE)) )).thenReturn(CompletableFuture.completedFuture(Map.of())); // non-share group - when(persister.deleteState(any())).thenReturn(CompletableFuture.completedFuture( + when(persister.deleteState(ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture( new DeleteShareGroupStateResult.Builder() .setTopicsData(List.of( new TopicData<>( @@ -1978,7 +1978,7 @@ public class GroupCoordinatorServiceTest { future.getNow(null); assertEquals(expectedResultCollection, future.get()); - verify(persister, times(1)).deleteState(any()); + verify(persister, times(1)).deleteState(ArgumentMatchers.any()); } @Test @@ -2027,7 +2027,7 @@ public class GroupCoordinatorServiceTest { Map.of("share-group-id-2", Map.entry(createDeleteShareRequest("share-group-id-2", shareGroupTopicId2, List.of(0, 1)), Errors.NONE)) )); - when(persister.deleteState(any())).thenReturn(CompletableFuture.completedFuture( + when(persister.deleteState(ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture( new DeleteShareGroupStateResult.Builder() .setTopicsData(List.of( new TopicData<>( @@ -2073,7 +2073,7 @@ public class GroupCoordinatorServiceTest { future.getNow(null); assertEquals(expectedResultCollection, future.get()); - verify(persister, times(2)).deleteState(any()); + verify(persister, times(2)).deleteState(ArgumentMatchers.any()); } @Test @@ -2125,7 +2125,7 @@ public class GroupCoordinatorServiceTest { future.getNow(null); assertEquals(expectedResultCollection, future.get()); - verify(persister, times(0)).deleteState(any()); + verify(persister, times(0)).deleteState(ArgumentMatchers.any()); } @Test @@ -2171,7 +2171,7 @@ public class GroupCoordinatorServiceTest { assertEquals(expectedResultCollection, future.get()); // If there is error creating share group delete req // neither persister call nor general delete groups call is made. - verify(persister, times(0)).deleteState(any()); + verify(persister, times(0)).deleteState(ArgumentMatchers.any()); verify(runtime, times(0)).scheduleWriteOperation( ArgumentMatchers.eq("delete-groups"), ArgumentMatchers.any(), @@ -2224,7 +2224,7 @@ public class GroupCoordinatorServiceTest { ArgumentMatchers.any() )).thenReturn(CompletableFuture.failedFuture(Errors.CLUSTER_AUTHORIZATION_FAILED.exception())); - when(persister.deleteState(any())).thenReturn(CompletableFuture.completedFuture(new DeleteShareGroupStateResult.Builder() + when(persister.deleteState(ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(new DeleteShareGroupStateResult.Builder() .setTopicsData(List.of( new TopicData<>( shareGroupTopicId, @@ -2242,7 +2242,7 @@ public class GroupCoordinatorServiceTest { future.getNow(null); assertEquals(expectedResultCollection, future.get()); - verify(persister, times(1)).deleteState(any()); + verify(persister, times(1)).deleteState(ArgumentMatchers.any()); } @ParameterizedTest @@ -3115,7 +3115,7 @@ public class GroupCoordinatorServiceTest { service.onNewMetadataImage(image, null); - when(mockPersister.initializeState(any())).thenReturn(CompletableFuture.completedFuture( + when(mockPersister.initializeState(ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture( new InitializeShareGroupStateResult.Builder() .setTopicsData(List.of( new TopicData<>(topicId, List.of( @@ -3165,7 +3165,7 @@ public class GroupCoordinatorServiceTest { Uuid topicId = Uuid.randomUuid(); Exception exp = new NotCoordinatorException("bad stuff"); - when(mockPersister.initializeState(any())).thenReturn(CompletableFuture.failedFuture(exp)); + when(mockPersister.initializeState(ArgumentMatchers.any())).thenReturn(CompletableFuture.failedFuture(exp)); when(runtime.scheduleWriteOperation( ArgumentMatchers.eq("initialize-share-group-state"), @@ -3174,6 +3174,13 @@ public class GroupCoordinatorServiceTest { ArgumentMatchers.any() )).thenReturn(CompletableFuture.completedFuture(null)); + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("uninitialize-share-group-state"), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(null)); + ShareGroupHeartbeatResponseData defaultResponse = new ShareGroupHeartbeatResponseData(); InitializeShareGroupStateParameters params = new InitializeShareGroupStateParameters.Builder() .setGroupTopicPartitionData( @@ -3190,6 +3197,12 @@ public class GroupCoordinatorServiceTest { ArgumentMatchers.any(), ArgumentMatchers.any() ); + verify(runtime, times(1)).scheduleWriteOperation( + ArgumentMatchers.eq("uninitialize-share-group-state"), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any() + ); verify(mockPersister, times(1)).initializeState(ArgumentMatchers.any()); } @@ -3206,7 +3219,7 @@ public class GroupCoordinatorServiceTest { String groupId = "share-group"; Uuid topicId = Uuid.randomUuid(); - when(mockPersister.initializeState(any())).thenReturn(CompletableFuture.completedFuture( + when(mockPersister.initializeState(ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture( new InitializeShareGroupStateResult.Builder() .setTopicsData(List.of( new TopicData<>(topicId, List.of( @@ -3223,6 +3236,13 @@ public class GroupCoordinatorServiceTest { ArgumentMatchers.any() )).thenReturn(CompletableFuture.completedFuture(null)); + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("uninitialize-share-group-state"), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(null)); + ShareGroupHeartbeatResponseData defaultResponse = new ShareGroupHeartbeatResponseData(); InitializeShareGroupStateParameters params = new InitializeShareGroupStateParameters.Builder() .setGroupTopicPartitionData( @@ -3242,6 +3262,12 @@ public class GroupCoordinatorServiceTest { ArgumentMatchers.any(), ArgumentMatchers.any() ); + verify(runtime, times(1)).scheduleWriteOperation( + ArgumentMatchers.eq("uninitialize-share-group-state"), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any() + ); verify(mockPersister, times(1)).initializeState(ArgumentMatchers.any()); } @@ -3264,7 +3290,7 @@ public class GroupCoordinatorServiceTest { service.onNewMetadataImage(image, null); - when(mockPersister.initializeState(any())).thenReturn(CompletableFuture.completedFuture( + when(mockPersister.initializeState(ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture( new InitializeShareGroupStateResult.Builder() .setTopicsData(List.of( new TopicData<>(topicId, List.of( @@ -3280,6 +3306,13 @@ public class GroupCoordinatorServiceTest { ArgumentMatchers.any() )).thenReturn(CompletableFuture.failedFuture(exp)); + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("uninitialize-share-group-state"), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(null)); + ShareGroupHeartbeatResponseData defaultResponse = new ShareGroupHeartbeatResponseData(); InitializeShareGroupStateParameters params = new InitializeShareGroupStateParameters.Builder() .setGroupTopicPartitionData( @@ -3299,10 +3332,116 @@ public class GroupCoordinatorServiceTest { ArgumentMatchers.any(), ArgumentMatchers.any() ); - + verify(runtime, times(1)).scheduleWriteOperation( + ArgumentMatchers.eq("uninitialize-share-group-state"), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any() + ); verify(mockPersister, times(1)).initializeState(ArgumentMatchers.any()); } + @Test + public void testReconcileShareGroupInitializingStateNoRequests() { + CoordinatorRuntime runtime = mockRuntime(); + Persister mockPersister = mock(Persister.class); + GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() + .setConfig(createConfig()) + .setRuntime(runtime) + .setPersister(mockPersister) + .build(true); + + when(runtime.scheduleReadAllOperation( + ArgumentMatchers.eq("reconcile-share-group-initializing-state"), + ArgumentMatchers.any() + )).thenReturn(List.of()); + + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("initialize-share-group-state"), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(null)); + + service.reconcileShareGroupStateInitializingState().join(); + verify(runtime, times(1)).scheduleReadAllOperation( + ArgumentMatchers.eq("reconcile-share-group-initializing-state"), + ArgumentMatchers.any() + ); + verify(mockPersister, times(0)).initializeState(ArgumentMatchers.any()); + verify(runtime, times(0)).scheduleWriteOperation( + ArgumentMatchers.eq("initialize-share-group-state"), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any() + ); + } + + @Test + public void testReconcileShareGroupInitializingState() { + CoordinatorRuntime runtime = mockRuntime(); + Persister mockPersister = mock(Persister.class); + GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() + .setConfig(createConfig()) + .setRuntime(runtime) + .setPersister(mockPersister) + .build(true); + + String groupId1 = "groupId1"; + String groupId2 = "groupId2"; + + Uuid topicId1 = Uuid.randomUuid(); + Uuid topicId2 = Uuid.randomUuid(); + + InitializeShareGroupStateParameters req1 = new InitializeShareGroupStateParameters.Builder() + .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder() + .setGroupId(groupId1) + .setTopicsData(List.of(new TopicData<>(topicId1, List.of(PartitionFactory.newPartitionStateData(0, 1, 0))))) + .build()) + .build(); + + InitializeShareGroupStateParameters req2 = new InitializeShareGroupStateParameters.Builder() + .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder() + .setGroupId(groupId2) + .setTopicsData(List.of(new TopicData<>(topicId2, List.of(PartitionFactory.newPartitionStateData(0, 2, 10))))) + .build()) + .build(); + + when(runtime.scheduleReadAllOperation( + ArgumentMatchers.eq("reconcile-share-group-initializing-state"), + ArgumentMatchers.any() + )).thenReturn(List.of( + CompletableFuture.completedFuture(List.of(req1)), + CompletableFuture.completedFuture(List.of(req2)) + )); + + when(runtime.scheduleWriteOperation( + ArgumentMatchers.eq("initialize-share-group-state"), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(null)); + + when(mockPersister.initializeState(ArgumentMatchers.eq(req1))).thenReturn(CompletableFuture.completedFuture(new InitializeShareGroupStateResult.Builder() + .setTopicsData(List.of(new TopicData<>(topicId1, List.of(PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message()))))) + .build()) + ); + + when(mockPersister.initializeState(ArgumentMatchers.eq(req2))).thenReturn(CompletableFuture.completedFuture(new InitializeShareGroupStateResult.Builder() + .setTopicsData(List.of(new TopicData<>(topicId2, List.of(PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message()))))) + .build()) + ); + + service.reconcileShareGroupStateInitializingState().join(); + verify(mockPersister, times(2)).initializeState(ArgumentMatchers.any()); + verify(runtime, times(2)).scheduleWriteOperation( + ArgumentMatchers.eq("initialize-share-group-state"), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any() + ); + } + @FunctionalInterface private interface TriFunction { R apply(A a, B b, C c); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 94464eb42fd..2a0df23c1ab 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -148,6 +148,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -15226,12 +15227,34 @@ public class GroupMetadataManagerTest { mkTopicAssignment(barTopicId, 0, 1, 2) )), GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentEpochRecord(groupId, 1), - GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentRecord(groupId, expectedMember) + GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentRecord(groupId, expectedMember), + GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(groupId, mkShareGroupStateMap(List.of( + mkShareGroupStateMetadataEntry(fooTopicId, fooTopicName, List.of(0, 1, 2, 3, 4, 5)), + mkShareGroupStateMetadataEntry(barTopicId, barTopicName, List.of(0, 1, 2)) + )), + Map.of(), + Map.of() + ) ); assertRecordsEquals(expectedRecords, result.records()); } + private Map>> mkShareGroupStateMap(List>>> entries) { + Map>> map = new HashMap<>(); + for (Map.Entry>> entry : entries) { + map.put(entry.getKey(), entry.getValue()); + } + return map; + } + + private Map.Entry>> mkShareGroupStateMetadataEntry(Uuid topicId, String topicName, List partitions) { + return Map.entry( + topicId, + Map.entry(topicName, new LinkedHashSet<>(partitions)) + ); + }; + @Test public void testShareGroupLeavingMemberBumpsGroupEpoch() { String groupId = "fooup"; @@ -20465,6 +20488,16 @@ public class GroupMetadataManagerTest { .setMemberEpoch(0) .setSubscribedTopicNames(List.of(t1Name, t2Name))); + assertTrue(result.records().contains( + newShareGroupStatePartitionMetadataRecord(groupId, mkShareGroupStateMap(List.of( + mkShareGroupStateMetadataEntry(t1Uuid, t1Name, List.of(0, 1)), + mkShareGroupStateMetadataEntry(t2Uuid, t2Name, List.of(0, 1)) + )), + Map.of(), + Map.of() + )) + ); + verifyShareGroupHeartbeatInitializeRequest( result.response().getValue(), Map.of( @@ -20524,6 +20557,18 @@ public class GroupMetadataManagerTest { .setMemberEpoch(1) .setSubscribedTopicNames(null)); + assertTrue(result.records().contains( + newShareGroupStatePartitionMetadataRecord(groupId, mkShareGroupStateMap(List.of( + mkShareGroupStateMetadataEntry(t1Uuid, t1Name, List.of(2, 3)) + )), + mkShareGroupStateMap(List.of( + mkShareGroupStateMetadataEntry(t1Uuid, t1Name, List.of(0, 1)), + mkShareGroupStateMetadataEntry(t2Uuid, t2Name, List.of(0, 1)) + )), + Map.of() + )) + ); + verifyShareGroupHeartbeatInitializeRequest( result.response().getValue(), Map.of( @@ -20536,6 +20581,69 @@ public class GroupMetadataManagerTest { ); } + @Test + public void testShareGroupHeartbeatNoPersisterRequestWithInitializing() { + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment(Map.of())); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withShareGroupAssignor(assignor) + .build(); + + Uuid t1Uuid = Uuid.randomUuid(); + String t1Name = "t1"; + MetadataImage image = new MetadataImageBuilder() + .addTopic(t1Uuid, t1Name, 2) + .build(); + + String groupId = "share-group"; + + context.groupMetadataManager.onNewMetadataImage(image, mock(MetadataDelta.class)); + context.groupMetadataManager.replay( + new ShareGroupMetadataKey() + .setGroupId(groupId), + new ShareGroupMetadataValue() + .setEpoch(0) + ); + context.groupMetadataManager.replay( + new ShareGroupStatePartitionMetadataKey() + .setGroupId(groupId), + new ShareGroupStatePartitionMetadataValue() + .setInitializingTopics(List.of( + new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo() + .setTopicId(t1Uuid) + .setTopicName(t1Name) + .setPartitions(List.of(0, 1)) + )) + .setInitializedTopics(List.of()) + .setDeletingTopics(List.of()) + ); + + Uuid memberId = Uuid.randomUuid(); + CoordinatorResult>, CoordinatorRecord> result = context.shareGroupHeartbeat( + new ShareGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId.toString()) + .setMemberEpoch(0) + .setSubscribedTopicNames(List.of(t1Name))); + + assertFalse(result.records().contains( + newShareGroupStatePartitionMetadataRecord(groupId, mkShareGroupStateMap(List.of( + mkShareGroupStateMetadataEntry(t1Uuid, t1Name, List.of(0, 1)) + )), + Map.of(), + Map.of() + )) + ); + + verifyShareGroupHeartbeatInitializeRequest( + result.response().getValue(), + Map.of(), + groupId, + 0, + false + ); + } + @Test public void testShareGroupInitializeSuccess() { String groupId = "groupId"; @@ -20574,7 +20682,7 @@ public class GroupMetadataManagerTest { CoordinatorResult result = context.groupMetadataManager.initializeShareGroupState(groupId, snapshotMetadataInitializeMap); - CoordinatorRecord record = newShareGroupStatePartitionMetadataRecord(groupId, snapshotMetadataInitializeRecordMap, Map.of()); + CoordinatorRecord record = newShareGroupStatePartitionMetadataRecord(groupId, Map.of(), snapshotMetadataInitializeRecordMap, Map.of()); assertNull(result.response()); assertEquals(List.of(record), result.records()); @@ -20620,69 +20728,30 @@ public class GroupMetadataManagerTest { .withShareGroupAssignor(assignor) .build(); - // Empty on empty metadata image - MetadataImage image = MetadataImage.EMPTY; - MetadataDelta delta = new MetadataDelta.Builder() - .setImage(image) - .build(); - context.groupMetadataManager.onNewMetadataImage(image, delta); - assertEquals( - Map.of(), - context.groupMetadataManager.subscribedTopicsChangeMap(groupId, Map.of( - topicName, new TopicMetadata(topicId, topicName, partitions) - )) - ); - // Empty on empty subscription metadata - image = new MetadataImageBuilder() - .addTopic(topicId, topicName, partitions) - .build(); - - delta = new MetadataDelta.Builder() - .setImage(image) - .build(); - context.groupMetadataManager.onNewMetadataImage(image, delta); assertEquals( Map.of(), context.groupMetadataManager.subscribedTopicsChangeMap(groupId, Map.of()) ); // No error on empty initialized metadata (no replay of initialized topics) - image = new MetadataImageBuilder() - .addTopic(topicId, topicName, partitions) - .build(); - - delta = new MetadataDelta.Builder() - .setImage(image) - .build(); - context.groupMetadataManager.onNewMetadataImage(image, delta); assertEquals( Map.of( - topicId, Map.entry( - topicName, - Set.of(0) - ) + topicId, Set.of(0) ), context.groupMetadataManager.subscribedTopicsChangeMap(groupId, Map.of( topicName, new TopicMetadata(topicId, topicName, partitions) )) ); - // Calculates correct diff + // Calculates correct diff respecting both initialized and initializing maps. String t1Name = "t1"; Uuid t1Id = Uuid.randomUuid(); String t2Name = "t2"; Uuid t2Id = Uuid.randomUuid(); + String t3Name = "t3"; + Uuid t3Id = Uuid.randomUuid(); - image = new MetadataImageBuilder() - .addTopic(t1Id, t1Name, 2) - .addTopic(t2Id, t2Name, 2) - .build(); - - delta = new MetadataDelta.Builder() - .setImage(image) - .build(); - context.groupMetadataManager.onNewMetadataImage(image, delta); context.groupMetadataManager.replay( new ShareGroupMetadataKey() .setGroupId(groupId), @@ -20693,30 +20762,117 @@ public class GroupMetadataManagerTest { new ShareGroupStatePartitionMetadataKey() .setGroupId(groupId), new ShareGroupStatePartitionMetadataValue() - .setInitializedTopics(List.of( + .setInitializingTopics(List.of( new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo() .setTopicId(t1Id) .setTopicName(t1Name) .setPartitions(List.of(0, 1)) )) + .setInitializedTopics(List.of( + new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo() + .setTopicId(t2Id) + .setTopicName(t2Name) + .setPartitions(List.of(0, 1, 2)) + )) .setDeletingTopics(List.of()) ); - // Since t1 is already initialized due to replay above + // Since t1 is initializing and t2 is initialized due to replay above. assertEquals( Map.of( - t2Id, Map.entry( - t2Name, - Set.of(0, 1) - ) + t3Id, Set.of(0, 1, 2) ), context.groupMetadataManager.subscribedTopicsChangeMap(groupId, Map.of( t1Name, new TopicMetadata(t1Id, t1Name, 2), - t2Name, new TopicMetadata(t2Id, t2Name, 2) + t2Name, new TopicMetadata(t2Id, t2Name, 2), + t3Name, new TopicMetadata(t3Id, t3Name, 3) )) ); } + @Test + public void testUninitializeTopics() { + MockPartitionAssignor assignor = new MockPartitionAssignor("simple"); + assignor.prepareGroupAssignment(new GroupAssignment(Map.of())); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withShareGroupAssignor(assignor) + .build(); + + String groupId = "shareGroupId"; + Uuid t1Id = Uuid.randomUuid(); + String t1Name = "t1Name"; + Uuid t2Id = Uuid.randomUuid(); + String t2Name = "t2Name"; + + // No records if topics to be uninitialized are not in metadata info. + CoordinatorResult result = context.groupMetadataManager.uninitializeShareGroupState(groupId, Map.of(t1Id, Set.of(0))); + assertEquals( + List.of(), + result.records() + ); + + MetadataImage image = new MetadataImageBuilder() + .addTopic(t1Id, t1Name, 2) + .addTopic(t2Id, t2Name, 3) + .build(); + + MetadataDelta delta = new MetadataDelta(image); + context.groupMetadataManager.onNewMetadataImage(image, delta); + + // Cleanup happens from initialzing state only. + context.groupMetadataManager.replay( + new ShareGroupMetadataKey() + .setGroupId(groupId), + new ShareGroupMetadataValue() + .setEpoch(0) + ); + context.groupMetadataManager.replay( + new ShareGroupStatePartitionMetadataKey() + .setGroupId(groupId), + new ShareGroupStatePartitionMetadataValue() + .setInitializingTopics(List.of( + new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo() + .setTopicId(t1Id) + .setTopicName(t1Name) + .setPartitions(List.of(0, 1)) + )) + .setInitializedTopics(List.of( + new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo() + .setTopicId(t2Id) + .setTopicName(t2Name) + .setPartitions(List.of(0, 1, 2)) + )) + .setDeletingTopics(List.of()) + ); + result = context.groupMetadataManager.uninitializeShareGroupState(groupId, Map.of(t1Id, Set.of(0, 1))); + Set partitions = new LinkedHashSet<>(List.of(0, 1, 2)); + assertEquals( + List.of(newShareGroupStatePartitionMetadataRecord(groupId, Map.of(), Map.of(t2Id, Map.entry(t2Name, partitions)), Map.of())), + result.records() + ); + } + + @Test + public void testMergeShareGroupInitMaps() { + Map> m1 = new HashMap<>(); + Map> m2 = new HashMap<>(); + + Uuid t1 = Uuid.randomUuid(); + Uuid t2 = Uuid.randomUuid(); + Uuid t3 = Uuid.randomUuid(); + + m1.put(t1, new HashSet<>(List.of(1, 2))); + m1.put(t2, new HashSet<>(List.of(3, 4))); + m2.put(t1, new HashSet<>(List.of(3, 4))); + m2.put(t3, new HashSet<>(List.of(5, 6))); + + Map> m3 = GroupMetadataManager.mergeShareGroupInitMaps(m1, m2); + // The arg maps should not be overridden. + assertEquals(Map.of(t1, Set.of(1, 2), t2, Set.of(3, 4)), m1); + assertEquals(Map.of(t1, Set.of(3, 4), t3, Set.of(5, 6)), m2); + assertEquals(Map.of(t1, Set.of(1, 2, 3, 4), t2, Set.of(3, 4), t3, Set.of(5, 6)), m3); + } + private static void checkJoinGroupResponse( JoinGroupResponseData expectedResponse, JoinGroupResponseData actualResponse, diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java index 846c0a3d475..3818889aa95 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java @@ -84,6 +84,8 @@ import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey; import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue; import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataKey; import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue; import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberKey; import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue; import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey; @@ -1701,6 +1703,13 @@ public class GroupMetadataManagerTestContext { ); break; + case SHARE_GROUP_STATE_PARTITION_METADATA: + groupMetadataManager.replay( + (ShareGroupStatePartitionMetadataKey) key, + (ShareGroupStatePartitionMetadataValue) messageOrNull(value) + ); + break; + case CONSUMER_GROUP_REGULAR_EXPRESSION: groupMetadataManager.replay( (ConsumerGroupRegularExpressionKey) key,