KAFKA-18827: Incorporate initializing topics in share group heartbeat [4/N] (#19339)

* Currently, when we get a heartbeat with new share partition
subscriptions, we return an initialize request to the caller which the
caller executes asynchronously as a timer task.
* Meanwhile, if a new heartbeat request comes with same or null
subscription - the same initialize request will be returned since the
`GroupMetadataManager` has no idea about the older in flight request.
* In this PR, we have added a new field to the
`ShareGroupStatePartitionMetadata` record `initializingTopics` where
this information can be recorded in the GMM. Consequently, the
subsequent heartbeats can check this field and not return duplicate
initialize requests.
* If any errors are encountered while initializing by the
`GroupCoordinatorService` an additional method
`uninitializeShareGroupState` has been added which will remove the
requisite info from the `initializingFields`.
* New tests have been added wherever applicable and older ones updated.

Reviewers: Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
Sushant Mahajan 2025-04-03 14:08:20 +05:30 committed by GitHub
parent be80e3cb8a
commit 37f7434eac
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 660 additions and 131 deletions

View File

@ -824,9 +824,17 @@ public class GroupCoordinatorRecordHelpers {
*/ */
public static CoordinatorRecord newShareGroupStatePartitionMetadataRecord( public static CoordinatorRecord newShareGroupStatePartitionMetadataRecord(
String groupId, String groupId,
Map<Uuid, Map.Entry<String, Set<Integer>>> initializingTopics,
Map<Uuid, Map.Entry<String, Set<Integer>>> initializedTopics, Map<Uuid, Map.Entry<String, Set<Integer>>> initializedTopics,
Map<Uuid, String> deletingTopics Map<Uuid, String> deletingTopics
) { ) {
List<ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo> initializingTopicPartitionInfo = initializingTopics.entrySet().stream()
.map(entry -> new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
.setTopicId(entry.getKey())
.setTopicName(entry.getValue().getKey())
.setPartitions(entry.getValue().getValue().stream().toList()))
.toList();
List<ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo> initializedTopicPartitionInfo = initializedTopics.entrySet().stream() List<ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo> initializedTopicPartitionInfo = initializedTopics.entrySet().stream()
.map(entry -> new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo() .map(entry -> new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
.setTopicId(entry.getKey()) .setTopicId(entry.getKey())
@ -845,6 +853,7 @@ public class GroupCoordinatorRecordHelpers {
.setGroupId(groupId), .setGroupId(groupId),
new ApiMessageAndVersion( new ApiMessageAndVersion(
new ShareGroupStatePartitionMetadataValue() new ShareGroupStatePartitionMetadataValue()
.setInitializingTopics(initializingTopicPartitionInfo)
.setInitializedTopics(initializedTopicPartitionInfo) .setInitializedTopics(initializedTopicPartitionInfo)
.setDeletingTopics(deletingTopicsInfo), .setDeletingTopics(deletingTopicsInfo),
(short) 0 (short) 0

View File

@ -447,9 +447,8 @@ public class GroupCoordinatorService implements GroupCoordinator {
Duration.ofMillis(config.offsetCommitTimeoutMs()), Duration.ofMillis(config.offsetCommitTimeoutMs()),
coordinator -> coordinator.shareGroupHeartbeat(context, request) coordinator -> coordinator.shareGroupHeartbeat(context, request)
).thenCompose(result -> { ).thenCompose(result -> {
// This ensures that the previous group write has completed successfully
// before we start the persister initialize phase.
if (result.getValue().isPresent()) { if (result.getValue().isPresent()) {
// Adding to timer makes this call async with respect to the heartbeat.
timer.add(new TimerTask(0L) { timer.add(new TimerTask(0L) {
@Override @Override
public void run() { public void run() {
@ -475,16 +474,19 @@ public class GroupCoordinatorService implements GroupCoordinator {
ShareGroupHeartbeatResponseData defaultResponse ShareGroupHeartbeatResponseData defaultResponse
) { ) {
return persister.initializeState(request) return persister.initializeState(request)
.thenCompose( .handle((response, exp) -> {
response -> handlePersisterInitializeResponse(request.groupTopicPartitionData().groupId(), response, defaultResponse) if (exp == null) {
).exceptionally(exception -> { return handlePersisterInitializeResponse(request.groupTopicPartitionData().groupId(), response, defaultResponse);
}
GroupTopicPartitionData<PartitionStateData> gtp = request.groupTopicPartitionData(); GroupTopicPartitionData<PartitionStateData> gtp = request.groupTopicPartitionData();
log.error("Unable to initialize share group state {}, {}", gtp.groupId(), gtp.topicsData(), exception); log.error("Unable to initialize share group state {}, {}", gtp.groupId(), gtp.topicsData(), exp);
Errors error = Errors.forException(exception); Errors error = Errors.forException(exp);
return new ShareGroupHeartbeatResponseData() Map<Uuid, Set<Integer>> topicPartitionMap = new HashMap<>();
.setErrorCode(error.code()) gtp.topicsData().forEach(topicData -> topicPartitionMap.computeIfAbsent(topicData.topicId(), k -> new HashSet<>())
.setErrorMessage(error.message()); .addAll(topicData.partitions().stream().map(PartitionStateData::partition).collect(Collectors.toSet())));
}); return uninitializeShareGroupState(error, gtp.groupId(), topicPartitionMap);
})
.thenCompose(resp -> resp);
} }
private CompletableFuture<ShareGroupHeartbeatResponseData> handlePersisterInitializeResponse( private CompletableFuture<ShareGroupHeartbeatResponseData> handlePersisterInitializeResponse(
@ -501,7 +503,6 @@ public class GroupCoordinatorService implements GroupCoordinator {
} }
} }
if (persisterError.code() == Errors.NONE.code()) {
Map<Uuid, Set<Integer>> topicPartitionMap = new HashMap<>(); Map<Uuid, Set<Integer>> topicPartitionMap = new HashMap<>();
for (TopicData<PartitionErrorData> topicData : persisterInitializeResult.topicsData()) { for (TopicData<PartitionErrorData> topicData : persisterInitializeResult.topicsData()) {
topicPartitionMap.put( topicPartitionMap.put(
@ -509,18 +510,37 @@ public class GroupCoordinatorService implements GroupCoordinator {
topicData.partitions().stream().map(PartitionErrorData::partition).collect(Collectors.toSet()) topicData.partitions().stream().map(PartitionErrorData::partition).collect(Collectors.toSet())
); );
} }
if (persisterError.code() == Errors.NONE.code()) {
if (topicPartitionMap.isEmpty()) { if (topicPartitionMap.isEmpty()) {
return CompletableFuture.completedFuture(defaultResponse); return CompletableFuture.completedFuture(defaultResponse);
} }
return performShareGroupStateMetadataInitialize(groupId, topicPartitionMap, defaultResponse); return performShareGroupStateMetadataInitialize(groupId, topicPartitionMap, defaultResponse);
} else {
log.error("Received error while calling initialize state for {} on persister {}.", groupId, persisterError.code());
return CompletableFuture.completedFuture(
new ShareGroupHeartbeatResponseData()
.setErrorCode(persisterError.code())
.setErrorMessage(persisterError.message())
);
} }
log.error("Received error while calling initialize state for {} on persister {}.", groupId, persisterError.code());
return uninitializeShareGroupState(persisterError, groupId, topicPartitionMap);
}
private CompletableFuture<ShareGroupHeartbeatResponseData> uninitializeShareGroupState(
Errors error,
String groupId,
Map<Uuid, Set<Integer>> topicPartitionMap
) {
return runtime.scheduleWriteOperation(
"uninitialize-share-group-state",
topicPartitionFor(groupId),
Duration.ofMillis(config.offsetCommitTimeoutMs()),
coordinator -> coordinator.uninitializeShareGroupState(groupId, topicPartitionMap)
).thenApply(__ -> new ShareGroupHeartbeatResponseData()
.setErrorCode(error.code())
.setErrorMessage(error.message())
).exceptionally(exception -> {
log.error("Unable to cleanup topic partitions from share group state metadata", exception);
Errors err = Errors.forException(new IllegalStateException("Unable to cleanup topic partitions from share group state metadata", exception));
return new ShareGroupHeartbeatResponseData()
.setErrorCode(err.code())
.setErrorMessage(err.message());
});
} }
private CompletableFuture<ShareGroupHeartbeatResponseData> performShareGroupStateMetadataInitialize( private CompletableFuture<ShareGroupHeartbeatResponseData> performShareGroupStateMetadataInitialize(
@ -533,15 +553,38 @@ public class GroupCoordinatorService implements GroupCoordinator {
topicPartitionFor(groupId), topicPartitionFor(groupId),
Duration.ofMillis(config.offsetCommitTimeoutMs()), Duration.ofMillis(config.offsetCommitTimeoutMs()),
coordinator -> coordinator.initializeShareGroupState(groupId, topicPartitionMap) coordinator -> coordinator.initializeShareGroupState(groupId, topicPartitionMap)
).thenApply( ).handle((__, exp) -> {
__ -> defaultResponse if (exp == null) {
).exceptionally(exception -> { return CompletableFuture.completedFuture(defaultResponse);
log.error("Unable to initialize share group state partition metadata for {}.", groupId, exception); }
Errors error = Errors.forException(exception); log.error("Unable to initialize share group state partition metadata for {}.", groupId, exp);
return new ShareGroupHeartbeatResponseData() Errors error = Errors.forException(exp);
.setErrorCode(error.code()) return uninitializeShareGroupState(error, groupId, topicPartitionMap);
.setErrorMessage(error.message()); }).thenCompose(resp -> resp);
}
// Visibility for testing
CompletableFuture<Void> reconcileShareGroupStateInitializingState() {
List<CompletableFuture<List<InitializeShareGroupStateParameters>>> requestsStages = runtime.scheduleReadAllOperation(
"reconcile-share-group-initializing-state",
GroupCoordinatorShard::reconcileShareGroupStateInitializingState
);
if (requestsStages.isEmpty()) {
log.debug("Nothing to reconcile for share group initializing state.");
return CompletableFuture.completedFuture(null);
}
CompletableFuture<Void> allRequestsStage = CompletableFuture.allOf(requestsStages.toArray(new CompletableFuture<?>[0]));
final List<CompletableFuture<ShareGroupHeartbeatResponseData>> persisterResponses = new ArrayList<>();
allRequestsStage.thenApply(__ -> {
requestsStages.forEach(requestsStage -> requestsStage.join().forEach(request -> {
log.debug("Reconciling initializing state - {}", request);
persisterResponses.add(persisterInitialize(request, new ShareGroupHeartbeatResponseData()));
}));
return null;
}); });
return CompletableFuture.allOf(persisterResponses.toArray(new CompletableFuture<?>[0]));
} }
/** /**
@ -1586,6 +1629,13 @@ public class GroupCoordinatorService implements GroupCoordinator {
new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataPartitionIndex), new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataPartitionIndex),
groupMetadataPartitionLeaderEpoch groupMetadataPartitionLeaderEpoch
); );
// Wait for reconciliation to complete.
try {
reconcileShareGroupStateInitializingState().join();
} catch (Exception e) {
log.error("Share group reconciliation failed", e);
}
} }
/** /**

View File

@ -442,6 +442,30 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
return groupMetadataManager.initializeShareGroupState(groupId, topicPartitionMap); return groupMetadataManager.initializeShareGroupState(groupId, topicPartitionMap);
} }
/**
* Removes specific topic partitions from the initializing state for a share group. This is usually part of
* shareGroupHeartbeat code flow, specifically, if there is a persister exception.
* @param groupId The group id corresponding to the share group whose share partitions have been initialized.
* @param topicPartitionMap Map representing topic partition data to be cleaned from the share state partition metadata.
*
* @return A Result containing ShareGroupStatePartitionMetadata records and Void response.
*/
public CoordinatorResult<Void, CoordinatorRecord> uninitializeShareGroupState(
String groupId,
Map<Uuid, Set<Integer>> topicPartitionMap
) {
return groupMetadataManager.uninitializeShareGroupState(groupId, topicPartitionMap);
}
/**
* Reconcile initializing and initialized tps in share group state metadata records.
*
* @return A Result containing ShareGroupStatePartitionMetadata records and Void response.
*/
public List<InitializeShareGroupStateParameters> reconcileShareGroupStateInitializingState(long offset) {
return groupMetadataManager.reconcileShareGroupStateInitializingState(offset);
}
/** /**
* Handles a JoinGroup request. * Handles a JoinGroup request.
* *

View File

@ -187,6 +187,7 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
@ -514,6 +515,7 @@ public class GroupMetadataManager {
* @param deletingTopics Set of topic ids. * @param deletingTopics Set of topic ids.
*/ */
private record ShareGroupStatePartitionMetadataInfo( private record ShareGroupStatePartitionMetadataInfo(
Map<Uuid, Set<Integer>> initializingTopics,
Map<Uuid, Set<Integer>> initializedTopics, Map<Uuid, Set<Integer>> initializedTopics,
Set<Uuid> deletingTopics Set<Uuid> deletingTopics
) { ) {
@ -2830,7 +2832,7 @@ public class GroupMetadataManager {
records, records,
Map.entry( Map.entry(
response, response,
maybeCreateInitializeShareGroupStateRequest(groupId, groupEpoch, subscriptionMetadata) maybeCreateInitializeShareGroupStateRequest(groupId, groupEpoch, subscriptionMetadata, records)
) )
); );
} }
@ -2868,15 +2870,14 @@ public class GroupMetadataManager {
* @return A map of topic partitions which are subscribed by the share group but not initialized yet. * @return A map of topic partitions which are subscribed by the share group but not initialized yet.
*/ */
// Visibility for testing // Visibility for testing
Map<Uuid, Map.Entry<String, Set<Integer>>> subscribedTopicsChangeMap(String groupId, Map<String, TopicMetadata> subscriptionMetadata) { Map<Uuid, Set<Integer>> subscribedTopicsChangeMap(String groupId, Map<String, TopicMetadata> subscriptionMetadata) {
TopicsImage topicsImage = metadataImage.topics(); if (subscriptionMetadata == null || subscriptionMetadata.isEmpty()) {
if (topicsImage == null || topicsImage.isEmpty() || subscriptionMetadata == null || subscriptionMetadata.isEmpty()) {
return Map.of(); return Map.of();
} }
Map<Uuid, Map.Entry<String, Set<Integer>>> topicPartitionChangeMap = new HashMap<>(); Map<Uuid, Set<Integer>> topicPartitionChangeMap = new HashMap<>();
ShareGroupStatePartitionMetadataInfo info = shareGroupPartitionMetadata.get(groupId); ShareGroupStatePartitionMetadataInfo info = shareGroupPartitionMetadata.get(groupId);
Map<Uuid, Set<Integer>> alreadyInitialized = info == null ? Map.of() : info.initializedTopics(); Map<Uuid, Set<Integer>> alreadyInitialized = info == null ? new HashMap<>() : mergeShareGroupInitMaps(info.initializedTopics(), info.initializingTopics());
subscriptionMetadata.forEach((topicName, topicMetadata) -> { subscriptionMetadata.forEach((topicName, topicMetadata) -> {
Set<Integer> alreadyInitializedPartSet = alreadyInitialized.getOrDefault(topicMetadata.id(), Set.of()); Set<Integer> alreadyInitializedPartSet = alreadyInitialized.getOrDefault(topicMetadata.id(), Set.of());
@ -2884,10 +2885,7 @@ public class GroupMetadataManager {
Set<Integer> partitionSet = IntStream.range(0, topicMetadata.numPartitions()).boxed().collect(Collectors.toSet()); Set<Integer> partitionSet = IntStream.range(0, topicMetadata.numPartitions()).boxed().collect(Collectors.toSet());
partitionSet.removeAll(alreadyInitializedPartSet); partitionSet.removeAll(alreadyInitializedPartSet);
topicPartitionChangeMap.computeIfAbsent(topicMetadata.id(), k -> Map.entry( topicPartitionChangeMap.computeIfAbsent(topicMetadata.id(), k -> partitionSet);
topicName,
partitionSet
));
} }
}); });
return topicPartitionChangeMap; return topicPartitionChangeMap;
@ -2905,31 +2903,80 @@ public class GroupMetadataManager {
private Optional<InitializeShareGroupStateParameters> maybeCreateInitializeShareGroupStateRequest( private Optional<InitializeShareGroupStateParameters> maybeCreateInitializeShareGroupStateRequest(
String groupId, String groupId,
int groupEpoch, int groupEpoch,
Map<String, TopicMetadata> subscriptionMetadata Map<String, TopicMetadata> subscriptionMetadata,
List<CoordinatorRecord> records
) { ) {
if (subscriptionMetadata == null || subscriptionMetadata.isEmpty() || metadataImage.isEmpty()) { if (subscriptionMetadata == null || subscriptionMetadata.isEmpty() || metadataImage.isEmpty()) {
return Optional.empty(); return Optional.empty();
} }
Map<Uuid, Map.Entry<String, Set<Integer>>> topicPartitionChangeMap = subscribedTopicsChangeMap(groupId, subscriptionMetadata); Map<Uuid, Set<Integer>> topicPartitionChangeMap = subscribedTopicsChangeMap(groupId, subscriptionMetadata);
// Nothing to initialize. // Nothing to initialize.
if (topicPartitionChangeMap.isEmpty()) { if (topicPartitionChangeMap.isEmpty()) {
return Optional.empty(); return Optional.empty();
} }
return Optional.of(new InitializeShareGroupStateParameters.Builder().setGroupTopicPartitionData( addInitializingTopicsRecords(groupId, records, topicPartitionChangeMap);
new GroupTopicPartitionData<>(groupId, topicPartitionChangeMap.entrySet().stream() return Optional.of(buildInitializeShareGroupStateRequest(groupId, groupEpoch, topicPartitionChangeMap));
}
private InitializeShareGroupStateParameters buildInitializeShareGroupStateRequest(String groupId, int groupEpoch, Map<Uuid, Set<Integer>> topicPartitions) {
return new InitializeShareGroupStateParameters.Builder().setGroupTopicPartitionData(
new GroupTopicPartitionData<>(groupId, topicPartitions.entrySet().stream()
.map(entry -> new TopicData<>( .map(entry -> new TopicData<>(
entry.getKey(), entry.getKey(),
entry.getValue().getValue().stream() entry.getValue().stream()
.map(partitionId -> PartitionFactory.newPartitionStateData(partitionId, groupEpoch, -1)) .map(partitionId -> PartitionFactory.newPartitionStateData(partitionId, groupEpoch, -1))
.toList()) .toList())
).toList() ).toList()
)).build() )).build();
}
private void addInitializingTopicsRecords(String groupId, List<CoordinatorRecord> records, Map<Uuid, Set<Integer>> topicPartitionMap) {
if (topicPartitionMap == null || topicPartitionMap.isEmpty()) {
return;
}
ShareGroupStatePartitionMetadataInfo currentMap = shareGroupPartitionMetadata.get(groupId);
if (currentMap == null) {
records.add(newShareGroupStatePartitionMetadataRecord(groupId, attachTopicName(topicPartitionMap), Map.of(), Map.of()));
return;
}
// We must combine the existing information in the record with the topicPartitionMap argument.
Map<Uuid, Set<Integer>> finalInitializingMap = mergeShareGroupInitMaps(currentMap.initializingTopics(), topicPartitionMap);
records.add(
newShareGroupStatePartitionMetadataRecord(
groupId,
attachTopicName(finalInitializingMap),
attachTopicName(currentMap.initializedTopics()),
Map.of()
)
); );
} }
// Visibility for tests
static Map<Uuid, Set<Integer>> mergeShareGroupInitMaps(
Map<Uuid, Set<Integer>> existingShareGroupInitMap,
Map<Uuid, Set<Integer>> newShareGroupInitMap
) {
Map<Uuid, Set<Integer>> finalInitMap = new HashMap<>();
Set<Uuid> combinedTopicIdSet = new HashSet<>(existingShareGroupInitMap.keySet());
combinedTopicIdSet.addAll(newShareGroupInitMap.keySet());
for (Uuid topicId : combinedTopicIdSet) {
Set<Integer> partitions = new HashSet<>(existingShareGroupInitMap.getOrDefault(topicId, new HashSet<>()));
if (newShareGroupInitMap.containsKey(topicId)) {
partitions.addAll(newShareGroupInitMap.get(topicId));
}
finalInitMap.putIfAbsent(topicId, partitions);
}
return finalInitMap;
}
/** /**
* Gets or subscribes a new dynamic consumer group member. * Gets or subscribes a new dynamic consumer group member.
* *
@ -4870,37 +4917,127 @@ public class GroupMetadataManager {
} }
ShareGroup group = (ShareGroup) groups.get(groupId); ShareGroup group = (ShareGroup) groups.get(groupId);
// We must combine the existing information in the record with the
// topicPartitionMap argument.
Map<Uuid, Map.Entry<String, Set<Integer>>> finalMap = new HashMap<>();
ShareGroupStatePartitionMetadataInfo currentMap = shareGroupPartitionMetadata.get(groupId); ShareGroupStatePartitionMetadataInfo currentMap = shareGroupPartitionMetadata.get(groupId);
if (currentMap == null) { if (currentMap == null) {
topicPartitionMap.forEach((k, v) -> finalMap.put(k, Map.entry(metadataImage.topics().getTopic(k).name(), v)));
return new CoordinatorResult<>( return new CoordinatorResult<>(
List.of(newShareGroupStatePartitionMetadataRecord(group.groupId(), finalMap, Map.of())), List.of(newShareGroupStatePartitionMetadataRecord(group.groupId(), Map.of(), attachTopicName(topicPartitionMap), Map.of())),
null null
); );
} }
Set<Uuid> combinedTopicIdSet = new HashSet<>(topicPartitionMap.keySet()); // We must combine the existing information in the record with the topicPartitionMap argument so that the final
combinedTopicIdSet.addAll(currentMap.initializedTopics.keySet()); // record has up-to-date information.
Map<Uuid, Set<Integer>> finalInitializedMap = mergeShareGroupInitMaps(currentMap.initializedTopics(), topicPartitionMap);
for (Uuid topicId : combinedTopicIdSet) { // Fetch initializing info from state metadata.
String topicName = metadataImage.topics().getTopic(topicId).name(); Map<Uuid, Set<Integer>> finalInitializingMap = new HashMap<>(currentMap.initializingTopics());
Set<Integer> partitions = new HashSet<>(currentMap.initializedTopics.getOrDefault(topicId, new HashSet<>()));
if (topicPartitionMap.containsKey(topicId)) { // Remove any entries which are already initialized.
partitions.addAll(topicPartitionMap.get(topicId)); for (Map.Entry<Uuid, Set<Integer>> entry : topicPartitionMap.entrySet()) {
Uuid topicId = entry.getKey();
if (finalInitializingMap.containsKey(topicId)) {
Set<Integer> partitions = finalInitializingMap.get(topicId);
partitions.removeAll(entry.getValue());
if (partitions.isEmpty()) {
finalInitializingMap.remove(topicId);
}
}
}
return new CoordinatorResult<>(List.of(
newShareGroupStatePartitionMetadataRecord(
group.groupId(),
attachTopicName(finalInitializingMap),
attachTopicName(finalInitializedMap),
Map.of()
)),
null
);
}
/**
* Removes specific topic partitions from the initializing state for a share group. This is usually part of
* shareGroupHeartbeat code flow, specifically, if there is a persister exception.
* @param groupId The group id corresponding to the share group whose share partitions have been initialized.
* @param topicPartitionMap Map representing topic partition data to be cleaned from the share state partition metadata.
*
* @return A Result containing ShareGroupStatePartitionMetadata records and Void response.
*/
public CoordinatorResult<Void, CoordinatorRecord> uninitializeShareGroupState(
String groupId,
Map<Uuid, Set<Integer>> topicPartitionMap
) {
ShareGroupStatePartitionMetadataInfo info = shareGroupPartitionMetadata.get(groupId);
if (info == null || info.initializingTopics().isEmpty() || topicPartitionMap.isEmpty()) {
return new CoordinatorResult<>(List.of(), null);
}
Map<Uuid, Set<Integer>> initializingTopics = info.initializingTopics();
Map<Uuid, Set<Integer>> finalInitializingTopics = new HashMap<>();
for (Map.Entry<Uuid, Set<Integer>> entry : initializingTopics.entrySet()) {
Uuid topicId = entry.getKey();
// If topicId to clean is not present in topicPartitionMap map, retain it.
if (!topicPartitionMap.containsKey(topicId)) {
finalInitializingTopics.put(entry.getKey(), entry.getValue());
} else {
Set<Integer> partitions = new HashSet<>(entry.getValue());
partitions.removeAll(topicPartitionMap.get(topicId));
if (!partitions.isEmpty()) {
finalInitializingTopics.put(entry.getKey(), partitions);
}
} }
finalMap.computeIfAbsent(topicId, k -> Map.entry(topicName, partitions));
} }
return new CoordinatorResult<>( return new CoordinatorResult<>(
List.of(newShareGroupStatePartitionMetadataRecord(group.groupId(), finalMap, Map.of())), List.of(
newShareGroupStatePartitionMetadataRecord(
groupId,
attachTopicName(finalInitializingTopics),
attachTopicName(info.initializedTopics()),
Map.of()
)
),
null null
); );
} }
/**
* Iterates over all share groups and returns persister initialize requests corresponding to any initializing
* topic partitions found in the group associated {@link ShareGroupStatePartitionMetadataInfo}.
* @param offset The last committed offset for the {@link ShareGroupStatePartitionMetadataInfo} timeline hashmap.
*
* @return A list containing {@link InitializeShareGroupStateParameters} requests, could be empty.
*/
public List<InitializeShareGroupStateParameters> reconcileShareGroupStateInitializingState(long offset) {
List<InitializeShareGroupStateParameters> requests = new LinkedList<>();
for (Group group : groups.values()) {
if (!(group instanceof ShareGroup shareGroup)) {
continue;
}
if (!(shareGroupPartitionMetadata.containsKey(shareGroup.groupId()))) {
continue;
}
Map<Uuid, Set<Integer>> initializing = shareGroupPartitionMetadata.get(shareGroup.groupId(), offset).initializingTopics();
if (initializing == null || initializing.isEmpty()) {
continue;
}
requests.add(buildInitializeShareGroupStateRequest(shareGroup.groupId(), shareGroup.groupEpoch(), initializing));
}
return requests;
}
private Map<Uuid, Map.Entry<String, Set<Integer>>> attachTopicName(Map<Uuid, Set<Integer>> initMap) {
TopicsImage topicsImage = metadataImage.topics();
Map<Uuid, Map.Entry<String, Set<Integer>>> finalMap = new HashMap<>();
for (Map.Entry<Uuid, Set<Integer>> entry : initMap.entrySet()) {
Uuid topicId = entry.getKey();
String topicName = topicsImage.getTopic(topicId).name();
finalMap.put(topicId, Map.entry(topicName, entry.getValue()));
}
return Collections.unmodifiableMap(finalMap);
}
/** /**
* Replays ConsumerGroupMemberMetadataKey/Value to update the hard state of * Replays ConsumerGroupMemberMetadataKey/Value to update the hard state of
* the consumer group. It updates the subscription part of the member or * the consumer group. It updates the subscription part of the member or
@ -5687,8 +5824,10 @@ public class GroupMetadataManager {
// Tombstone! // Tombstone!
shareGroupPartitionMetadata.remove(groupId); shareGroupPartitionMetadata.remove(groupId);
} else { } else {
ShareGroupStatePartitionMetadataInfo info = new ShareGroupStatePartitionMetadataInfo( ShareGroupStatePartitionMetadataInfo info = new ShareGroupStatePartitionMetadataInfo(
value.initializingTopics().stream()
.map(topicPartitionInfo -> Map.entry(topicPartitionInfo.topicId(), new HashSet<>(topicPartitionInfo.partitions())))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)),
value.initializedTopics().stream() value.initializedTopics().stream()
.map(topicPartitionInfo -> Map.entry(topicPartitionInfo.topicId(), new HashSet<>(topicPartitionInfo.partitions()))) .map(topicPartitionInfo -> Map.entry(topicPartitionInfo.topicId(), new HashSet<>(topicPartitionInfo.partitions())))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)), .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)),

View File

@ -21,6 +21,8 @@
"validVersions": "0", "validVersions": "0",
"flexibleVersions": "0+", "flexibleVersions": "0+",
"fields": [ "fields": [
{ "name": "InitializingTopics", "versions": "0+", "type": "[]TopicPartitionsInfo",
"about": "The topic-partitions whose share-group state is being initialized." },
{ "name": "InitializedTopics", "versions": "0+", "type": "[]TopicPartitionsInfo", { "name": "InitializedTopics", "versions": "0+", "type": "[]TopicPartitionsInfo",
"about": "The topics with initialized share-group state." }, "about": "The topics with initialized share-group state." },
{ "name": "DeletingTopics", "versions": "0+", "type": "[]TopicInfo", { "name": "DeletingTopics", "versions": "0+", "type": "[]TopicInfo",

View File

@ -323,6 +323,7 @@ public class GroupCoordinatorRecordHelpersTest {
CoordinatorRecord record = GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord( CoordinatorRecord record = GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
groupId, groupId,
Map.of(),
Map.of( Map.of(
topicId1, topicId1,
Map.entry(topicName1, partitions) Map.entry(topicName1, partitions)

View File

@ -97,6 +97,7 @@ import org.apache.kafka.server.share.persister.InitializeShareGroupStateResult;
import org.apache.kafka.server.share.persister.NoOpStatePersister; import org.apache.kafka.server.share.persister.NoOpStatePersister;
import org.apache.kafka.server.share.persister.PartitionFactory; import org.apache.kafka.server.share.persister.PartitionFactory;
import org.apache.kafka.server.share.persister.PartitionIdData; import org.apache.kafka.server.share.persister.PartitionIdData;
import org.apache.kafka.server.share.persister.PartitionStateData;
import org.apache.kafka.server.share.persister.Persister; import org.apache.kafka.server.share.persister.Persister;
import org.apache.kafka.server.share.persister.ReadShareGroupStateSummaryParameters; import org.apache.kafka.server.share.persister.ReadShareGroupStateSummaryParameters;
import org.apache.kafka.server.share.persister.ReadShareGroupStateSummaryResult; import org.apache.kafka.server.share.persister.ReadShareGroupStateSummaryResult;
@ -139,7 +140,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
@ -1943,7 +1943,7 @@ public class GroupCoordinatorServiceTest {
Map.of("share-group-id-1", Map.entry(createDeleteShareRequest("share-group-id-1", shareGroupTopicId, List.of(0, 1)), Errors.NONE)) Map.of("share-group-id-1", Map.entry(createDeleteShareRequest("share-group-id-1", shareGroupTopicId, List.of(0, 1)), Errors.NONE))
)).thenReturn(CompletableFuture.completedFuture(Map.of())); // non-share group )).thenReturn(CompletableFuture.completedFuture(Map.of())); // non-share group
when(persister.deleteState(any())).thenReturn(CompletableFuture.completedFuture( when(persister.deleteState(ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(
new DeleteShareGroupStateResult.Builder() new DeleteShareGroupStateResult.Builder()
.setTopicsData(List.of( .setTopicsData(List.of(
new TopicData<>( new TopicData<>(
@ -1978,7 +1978,7 @@ public class GroupCoordinatorServiceTest {
future.getNow(null); future.getNow(null);
assertEquals(expectedResultCollection, future.get()); assertEquals(expectedResultCollection, future.get());
verify(persister, times(1)).deleteState(any()); verify(persister, times(1)).deleteState(ArgumentMatchers.any());
} }
@Test @Test
@ -2027,7 +2027,7 @@ public class GroupCoordinatorServiceTest {
Map.of("share-group-id-2", Map.entry(createDeleteShareRequest("share-group-id-2", shareGroupTopicId2, List.of(0, 1)), Errors.NONE)) Map.of("share-group-id-2", Map.entry(createDeleteShareRequest("share-group-id-2", shareGroupTopicId2, List.of(0, 1)), Errors.NONE))
)); ));
when(persister.deleteState(any())).thenReturn(CompletableFuture.completedFuture( when(persister.deleteState(ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(
new DeleteShareGroupStateResult.Builder() new DeleteShareGroupStateResult.Builder()
.setTopicsData(List.of( .setTopicsData(List.of(
new TopicData<>( new TopicData<>(
@ -2073,7 +2073,7 @@ public class GroupCoordinatorServiceTest {
future.getNow(null); future.getNow(null);
assertEquals(expectedResultCollection, future.get()); assertEquals(expectedResultCollection, future.get());
verify(persister, times(2)).deleteState(any()); verify(persister, times(2)).deleteState(ArgumentMatchers.any());
} }
@Test @Test
@ -2125,7 +2125,7 @@ public class GroupCoordinatorServiceTest {
future.getNow(null); future.getNow(null);
assertEquals(expectedResultCollection, future.get()); assertEquals(expectedResultCollection, future.get());
verify(persister, times(0)).deleteState(any()); verify(persister, times(0)).deleteState(ArgumentMatchers.any());
} }
@Test @Test
@ -2171,7 +2171,7 @@ public class GroupCoordinatorServiceTest {
assertEquals(expectedResultCollection, future.get()); assertEquals(expectedResultCollection, future.get());
// If there is error creating share group delete req // If there is error creating share group delete req
// neither persister call nor general delete groups call is made. // neither persister call nor general delete groups call is made.
verify(persister, times(0)).deleteState(any()); verify(persister, times(0)).deleteState(ArgumentMatchers.any());
verify(runtime, times(0)).scheduleWriteOperation( verify(runtime, times(0)).scheduleWriteOperation(
ArgumentMatchers.eq("delete-groups"), ArgumentMatchers.eq("delete-groups"),
ArgumentMatchers.any(), ArgumentMatchers.any(),
@ -2224,7 +2224,7 @@ public class GroupCoordinatorServiceTest {
ArgumentMatchers.any() ArgumentMatchers.any()
)).thenReturn(CompletableFuture.failedFuture(Errors.CLUSTER_AUTHORIZATION_FAILED.exception())); )).thenReturn(CompletableFuture.failedFuture(Errors.CLUSTER_AUTHORIZATION_FAILED.exception()));
when(persister.deleteState(any())).thenReturn(CompletableFuture.completedFuture(new DeleteShareGroupStateResult.Builder() when(persister.deleteState(ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(new DeleteShareGroupStateResult.Builder()
.setTopicsData(List.of( .setTopicsData(List.of(
new TopicData<>( new TopicData<>(
shareGroupTopicId, shareGroupTopicId,
@ -2242,7 +2242,7 @@ public class GroupCoordinatorServiceTest {
future.getNow(null); future.getNow(null);
assertEquals(expectedResultCollection, future.get()); assertEquals(expectedResultCollection, future.get());
verify(persister, times(1)).deleteState(any()); verify(persister, times(1)).deleteState(ArgumentMatchers.any());
} }
@ParameterizedTest @ParameterizedTest
@ -3115,7 +3115,7 @@ public class GroupCoordinatorServiceTest {
service.onNewMetadataImage(image, null); service.onNewMetadataImage(image, null);
when(mockPersister.initializeState(any())).thenReturn(CompletableFuture.completedFuture( when(mockPersister.initializeState(ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(
new InitializeShareGroupStateResult.Builder() new InitializeShareGroupStateResult.Builder()
.setTopicsData(List.of( .setTopicsData(List.of(
new TopicData<>(topicId, List.of( new TopicData<>(topicId, List.of(
@ -3165,7 +3165,7 @@ public class GroupCoordinatorServiceTest {
Uuid topicId = Uuid.randomUuid(); Uuid topicId = Uuid.randomUuid();
Exception exp = new NotCoordinatorException("bad stuff"); Exception exp = new NotCoordinatorException("bad stuff");
when(mockPersister.initializeState(any())).thenReturn(CompletableFuture.failedFuture(exp)); when(mockPersister.initializeState(ArgumentMatchers.any())).thenReturn(CompletableFuture.failedFuture(exp));
when(runtime.scheduleWriteOperation( when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("initialize-share-group-state"), ArgumentMatchers.eq("initialize-share-group-state"),
@ -3174,6 +3174,13 @@ public class GroupCoordinatorServiceTest {
ArgumentMatchers.any() ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(null)); )).thenReturn(CompletableFuture.completedFuture(null));
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("uninitialize-share-group-state"),
ArgumentMatchers.any(),
ArgumentMatchers.any(),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(null));
ShareGroupHeartbeatResponseData defaultResponse = new ShareGroupHeartbeatResponseData(); ShareGroupHeartbeatResponseData defaultResponse = new ShareGroupHeartbeatResponseData();
InitializeShareGroupStateParameters params = new InitializeShareGroupStateParameters.Builder() InitializeShareGroupStateParameters params = new InitializeShareGroupStateParameters.Builder()
.setGroupTopicPartitionData( .setGroupTopicPartitionData(
@ -3190,6 +3197,12 @@ public class GroupCoordinatorServiceTest {
ArgumentMatchers.any(), ArgumentMatchers.any(),
ArgumentMatchers.any() ArgumentMatchers.any()
); );
verify(runtime, times(1)).scheduleWriteOperation(
ArgumentMatchers.eq("uninitialize-share-group-state"),
ArgumentMatchers.any(),
ArgumentMatchers.any(),
ArgumentMatchers.any()
);
verify(mockPersister, times(1)).initializeState(ArgumentMatchers.any()); verify(mockPersister, times(1)).initializeState(ArgumentMatchers.any());
} }
@ -3206,7 +3219,7 @@ public class GroupCoordinatorServiceTest {
String groupId = "share-group"; String groupId = "share-group";
Uuid topicId = Uuid.randomUuid(); Uuid topicId = Uuid.randomUuid();
when(mockPersister.initializeState(any())).thenReturn(CompletableFuture.completedFuture( when(mockPersister.initializeState(ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(
new InitializeShareGroupStateResult.Builder() new InitializeShareGroupStateResult.Builder()
.setTopicsData(List.of( .setTopicsData(List.of(
new TopicData<>(topicId, List.of( new TopicData<>(topicId, List.of(
@ -3223,6 +3236,13 @@ public class GroupCoordinatorServiceTest {
ArgumentMatchers.any() ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(null)); )).thenReturn(CompletableFuture.completedFuture(null));
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("uninitialize-share-group-state"),
ArgumentMatchers.any(),
ArgumentMatchers.any(),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(null));
ShareGroupHeartbeatResponseData defaultResponse = new ShareGroupHeartbeatResponseData(); ShareGroupHeartbeatResponseData defaultResponse = new ShareGroupHeartbeatResponseData();
InitializeShareGroupStateParameters params = new InitializeShareGroupStateParameters.Builder() InitializeShareGroupStateParameters params = new InitializeShareGroupStateParameters.Builder()
.setGroupTopicPartitionData( .setGroupTopicPartitionData(
@ -3242,6 +3262,12 @@ public class GroupCoordinatorServiceTest {
ArgumentMatchers.any(), ArgumentMatchers.any(),
ArgumentMatchers.any() ArgumentMatchers.any()
); );
verify(runtime, times(1)).scheduleWriteOperation(
ArgumentMatchers.eq("uninitialize-share-group-state"),
ArgumentMatchers.any(),
ArgumentMatchers.any(),
ArgumentMatchers.any()
);
verify(mockPersister, times(1)).initializeState(ArgumentMatchers.any()); verify(mockPersister, times(1)).initializeState(ArgumentMatchers.any());
} }
@ -3264,7 +3290,7 @@ public class GroupCoordinatorServiceTest {
service.onNewMetadataImage(image, null); service.onNewMetadataImage(image, null);
when(mockPersister.initializeState(any())).thenReturn(CompletableFuture.completedFuture( when(mockPersister.initializeState(ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(
new InitializeShareGroupStateResult.Builder() new InitializeShareGroupStateResult.Builder()
.setTopicsData(List.of( .setTopicsData(List.of(
new TopicData<>(topicId, List.of( new TopicData<>(topicId, List.of(
@ -3280,6 +3306,13 @@ public class GroupCoordinatorServiceTest {
ArgumentMatchers.any() ArgumentMatchers.any()
)).thenReturn(CompletableFuture.failedFuture(exp)); )).thenReturn(CompletableFuture.failedFuture(exp));
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("uninitialize-share-group-state"),
ArgumentMatchers.any(),
ArgumentMatchers.any(),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(null));
ShareGroupHeartbeatResponseData defaultResponse = new ShareGroupHeartbeatResponseData(); ShareGroupHeartbeatResponseData defaultResponse = new ShareGroupHeartbeatResponseData();
InitializeShareGroupStateParameters params = new InitializeShareGroupStateParameters.Builder() InitializeShareGroupStateParameters params = new InitializeShareGroupStateParameters.Builder()
.setGroupTopicPartitionData( .setGroupTopicPartitionData(
@ -3299,10 +3332,116 @@ public class GroupCoordinatorServiceTest {
ArgumentMatchers.any(), ArgumentMatchers.any(),
ArgumentMatchers.any() ArgumentMatchers.any()
); );
verify(runtime, times(1)).scheduleWriteOperation(
ArgumentMatchers.eq("uninitialize-share-group-state"),
ArgumentMatchers.any(),
ArgumentMatchers.any(),
ArgumentMatchers.any()
);
verify(mockPersister, times(1)).initializeState(ArgumentMatchers.any()); verify(mockPersister, times(1)).initializeState(ArgumentMatchers.any());
} }
@Test
public void testReconcileShareGroupInitializingStateNoRequests() {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
Persister mockPersister = mock(Persister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.setPersister(mockPersister)
.build(true);
when(runtime.scheduleReadAllOperation(
ArgumentMatchers.eq("reconcile-share-group-initializing-state"),
ArgumentMatchers.any()
)).thenReturn(List.of());
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("initialize-share-group-state"),
ArgumentMatchers.any(),
ArgumentMatchers.any(),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(null));
service.reconcileShareGroupStateInitializingState().join();
verify(runtime, times(1)).scheduleReadAllOperation(
ArgumentMatchers.eq("reconcile-share-group-initializing-state"),
ArgumentMatchers.any()
);
verify(mockPersister, times(0)).initializeState(ArgumentMatchers.any());
verify(runtime, times(0)).scheduleWriteOperation(
ArgumentMatchers.eq("initialize-share-group-state"),
ArgumentMatchers.any(),
ArgumentMatchers.any(),
ArgumentMatchers.any()
);
}
@Test
public void testReconcileShareGroupInitializingState() {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
Persister mockPersister = mock(Persister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.setPersister(mockPersister)
.build(true);
String groupId1 = "groupId1";
String groupId2 = "groupId2";
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
InitializeShareGroupStateParameters req1 = new InitializeShareGroupStateParameters.Builder()
.setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionStateData>()
.setGroupId(groupId1)
.setTopicsData(List.of(new TopicData<>(topicId1, List.of(PartitionFactory.newPartitionStateData(0, 1, 0)))))
.build())
.build();
InitializeShareGroupStateParameters req2 = new InitializeShareGroupStateParameters.Builder()
.setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionStateData>()
.setGroupId(groupId2)
.setTopicsData(List.of(new TopicData<>(topicId2, List.of(PartitionFactory.newPartitionStateData(0, 2, 10)))))
.build())
.build();
when(runtime.scheduleReadAllOperation(
ArgumentMatchers.eq("reconcile-share-group-initializing-state"),
ArgumentMatchers.any()
)).thenReturn(List.of(
CompletableFuture.completedFuture(List.of(req1)),
CompletableFuture.completedFuture(List.of(req2))
));
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("initialize-share-group-state"),
ArgumentMatchers.any(),
ArgumentMatchers.any(),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(null));
when(mockPersister.initializeState(ArgumentMatchers.eq(req1))).thenReturn(CompletableFuture.completedFuture(new InitializeShareGroupStateResult.Builder()
.setTopicsData(List.of(new TopicData<>(topicId1, List.of(PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message())))))
.build())
);
when(mockPersister.initializeState(ArgumentMatchers.eq(req2))).thenReturn(CompletableFuture.completedFuture(new InitializeShareGroupStateResult.Builder()
.setTopicsData(List.of(new TopicData<>(topicId2, List.of(PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message())))))
.build())
);
service.reconcileShareGroupStateInitializingState().join();
verify(mockPersister, times(2)).initializeState(ArgumentMatchers.any());
verify(runtime, times(2)).scheduleWriteOperation(
ArgumentMatchers.eq("initialize-share-group-state"),
ArgumentMatchers.any(),
ArgumentMatchers.any(),
ArgumentMatchers.any()
);
}
@FunctionalInterface @FunctionalInterface
private interface TriFunction<A, B, C, R> { private interface TriFunction<A, B, C, R> {
R apply(A a, B b, C c); R apply(A a, B b, C c);

View File

@ -148,6 +148,7 @@ import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
@ -15226,12 +15227,34 @@ public class GroupMetadataManagerTest {
mkTopicAssignment(barTopicId, 0, 1, 2) mkTopicAssignment(barTopicId, 0, 1, 2)
)), )),
GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentEpochRecord(groupId, 1), GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentEpochRecord(groupId, 1),
GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentRecord(groupId, expectedMember) GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentRecord(groupId, expectedMember),
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(groupId, mkShareGroupStateMap(List.of(
mkShareGroupStateMetadataEntry(fooTopicId, fooTopicName, List.of(0, 1, 2, 3, 4, 5)),
mkShareGroupStateMetadataEntry(barTopicId, barTopicName, List.of(0, 1, 2))
)),
Map.of(),
Map.of()
)
); );
assertRecordsEquals(expectedRecords, result.records()); assertRecordsEquals(expectedRecords, result.records());
} }
private Map<Uuid, Map.Entry<String, Set<Integer>>> mkShareGroupStateMap(List<Map.Entry<Uuid, Map.Entry<String, Set<Integer>>>> entries) {
Map<Uuid, Map.Entry<String, Set<Integer>>> map = new HashMap<>();
for (Map.Entry<Uuid, Map.Entry<String, Set<Integer>>> entry : entries) {
map.put(entry.getKey(), entry.getValue());
}
return map;
}
private Map.Entry<Uuid, Map.Entry<String, Set<Integer>>> mkShareGroupStateMetadataEntry(Uuid topicId, String topicName, List<Integer> partitions) {
return Map.entry(
topicId,
Map.entry(topicName, new LinkedHashSet<>(partitions))
);
};
@Test @Test
public void testShareGroupLeavingMemberBumpsGroupEpoch() { public void testShareGroupLeavingMemberBumpsGroupEpoch() {
String groupId = "fooup"; String groupId = "fooup";
@ -20465,6 +20488,16 @@ public class GroupMetadataManagerTest {
.setMemberEpoch(0) .setMemberEpoch(0)
.setSubscribedTopicNames(List.of(t1Name, t2Name))); .setSubscribedTopicNames(List.of(t1Name, t2Name)));
assertTrue(result.records().contains(
newShareGroupStatePartitionMetadataRecord(groupId, mkShareGroupStateMap(List.of(
mkShareGroupStateMetadataEntry(t1Uuid, t1Name, List.of(0, 1)),
mkShareGroupStateMetadataEntry(t2Uuid, t2Name, List.of(0, 1))
)),
Map.of(),
Map.of()
))
);
verifyShareGroupHeartbeatInitializeRequest( verifyShareGroupHeartbeatInitializeRequest(
result.response().getValue(), result.response().getValue(),
Map.of( Map.of(
@ -20524,6 +20557,18 @@ public class GroupMetadataManagerTest {
.setMemberEpoch(1) .setMemberEpoch(1)
.setSubscribedTopicNames(null)); .setSubscribedTopicNames(null));
assertTrue(result.records().contains(
newShareGroupStatePartitionMetadataRecord(groupId, mkShareGroupStateMap(List.of(
mkShareGroupStateMetadataEntry(t1Uuid, t1Name, List.of(2, 3))
)),
mkShareGroupStateMap(List.of(
mkShareGroupStateMetadataEntry(t1Uuid, t1Name, List.of(0, 1)),
mkShareGroupStateMetadataEntry(t2Uuid, t2Name, List.of(0, 1))
)),
Map.of()
))
);
verifyShareGroupHeartbeatInitializeRequest( verifyShareGroupHeartbeatInitializeRequest(
result.response().getValue(), result.response().getValue(),
Map.of( Map.of(
@ -20536,6 +20581,69 @@ public class GroupMetadataManagerTest {
); );
} }
@Test
public void testShareGroupHeartbeatNoPersisterRequestWithInitializing() {
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withShareGroupAssignor(assignor)
.build();
Uuid t1Uuid = Uuid.randomUuid();
String t1Name = "t1";
MetadataImage image = new MetadataImageBuilder()
.addTopic(t1Uuid, t1Name, 2)
.build();
String groupId = "share-group";
context.groupMetadataManager.onNewMetadataImage(image, mock(MetadataDelta.class));
context.groupMetadataManager.replay(
new ShareGroupMetadataKey()
.setGroupId(groupId),
new ShareGroupMetadataValue()
.setEpoch(0)
);
context.groupMetadataManager.replay(
new ShareGroupStatePartitionMetadataKey()
.setGroupId(groupId),
new ShareGroupStatePartitionMetadataValue()
.setInitializingTopics(List.of(
new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
.setTopicId(t1Uuid)
.setTopicName(t1Name)
.setPartitions(List.of(0, 1))
))
.setInitializedTopics(List.of())
.setDeletingTopics(List.of())
);
Uuid memberId = Uuid.randomUuid();
CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData, Optional<InitializeShareGroupStateParameters>>, CoordinatorRecord> result = context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId.toString())
.setMemberEpoch(0)
.setSubscribedTopicNames(List.of(t1Name)));
assertFalse(result.records().contains(
newShareGroupStatePartitionMetadataRecord(groupId, mkShareGroupStateMap(List.of(
mkShareGroupStateMetadataEntry(t1Uuid, t1Name, List.of(0, 1))
)),
Map.of(),
Map.of()
))
);
verifyShareGroupHeartbeatInitializeRequest(
result.response().getValue(),
Map.of(),
groupId,
0,
false
);
}
@Test @Test
public void testShareGroupInitializeSuccess() { public void testShareGroupInitializeSuccess() {
String groupId = "groupId"; String groupId = "groupId";
@ -20574,7 +20682,7 @@ public class GroupMetadataManagerTest {
CoordinatorResult<Void, CoordinatorRecord> result = context.groupMetadataManager.initializeShareGroupState(groupId, snapshotMetadataInitializeMap); CoordinatorResult<Void, CoordinatorRecord> result = context.groupMetadataManager.initializeShareGroupState(groupId, snapshotMetadataInitializeMap);
CoordinatorRecord record = newShareGroupStatePartitionMetadataRecord(groupId, snapshotMetadataInitializeRecordMap, Map.of()); CoordinatorRecord record = newShareGroupStatePartitionMetadataRecord(groupId, Map.of(), snapshotMetadataInitializeRecordMap, Map.of());
assertNull(result.response()); assertNull(result.response());
assertEquals(List.of(record), result.records()); assertEquals(List.of(record), result.records());
@ -20620,69 +20728,30 @@ public class GroupMetadataManagerTest {
.withShareGroupAssignor(assignor) .withShareGroupAssignor(assignor)
.build(); .build();
// Empty on empty metadata image
MetadataImage image = MetadataImage.EMPTY;
MetadataDelta delta = new MetadataDelta.Builder()
.setImage(image)
.build();
context.groupMetadataManager.onNewMetadataImage(image, delta);
assertEquals(
Map.of(),
context.groupMetadataManager.subscribedTopicsChangeMap(groupId, Map.of(
topicName, new TopicMetadata(topicId, topicName, partitions)
))
);
// Empty on empty subscription metadata // Empty on empty subscription metadata
image = new MetadataImageBuilder()
.addTopic(topicId, topicName, partitions)
.build();
delta = new MetadataDelta.Builder()
.setImage(image)
.build();
context.groupMetadataManager.onNewMetadataImage(image, delta);
assertEquals( assertEquals(
Map.of(), Map.of(),
context.groupMetadataManager.subscribedTopicsChangeMap(groupId, Map.of()) context.groupMetadataManager.subscribedTopicsChangeMap(groupId, Map.of())
); );
// No error on empty initialized metadata (no replay of initialized topics) // No error on empty initialized metadata (no replay of initialized topics)
image = new MetadataImageBuilder()
.addTopic(topicId, topicName, partitions)
.build();
delta = new MetadataDelta.Builder()
.setImage(image)
.build();
context.groupMetadataManager.onNewMetadataImage(image, delta);
assertEquals( assertEquals(
Map.of( Map.of(
topicId, Map.entry( topicId, Set.of(0)
topicName,
Set.of(0)
)
), ),
context.groupMetadataManager.subscribedTopicsChangeMap(groupId, Map.of( context.groupMetadataManager.subscribedTopicsChangeMap(groupId, Map.of(
topicName, new TopicMetadata(topicId, topicName, partitions) topicName, new TopicMetadata(topicId, topicName, partitions)
)) ))
); );
// Calculates correct diff // Calculates correct diff respecting both initialized and initializing maps.
String t1Name = "t1"; String t1Name = "t1";
Uuid t1Id = Uuid.randomUuid(); Uuid t1Id = Uuid.randomUuid();
String t2Name = "t2"; String t2Name = "t2";
Uuid t2Id = Uuid.randomUuid(); Uuid t2Id = Uuid.randomUuid();
String t3Name = "t3";
Uuid t3Id = Uuid.randomUuid();
image = new MetadataImageBuilder()
.addTopic(t1Id, t1Name, 2)
.addTopic(t2Id, t2Name, 2)
.build();
delta = new MetadataDelta.Builder()
.setImage(image)
.build();
context.groupMetadataManager.onNewMetadataImage(image, delta);
context.groupMetadataManager.replay( context.groupMetadataManager.replay(
new ShareGroupMetadataKey() new ShareGroupMetadataKey()
.setGroupId(groupId), .setGroupId(groupId),
@ -20693,30 +20762,117 @@ public class GroupMetadataManagerTest {
new ShareGroupStatePartitionMetadataKey() new ShareGroupStatePartitionMetadataKey()
.setGroupId(groupId), .setGroupId(groupId),
new ShareGroupStatePartitionMetadataValue() new ShareGroupStatePartitionMetadataValue()
.setInitializedTopics(List.of( .setInitializingTopics(List.of(
new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo() new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
.setTopicId(t1Id) .setTopicId(t1Id)
.setTopicName(t1Name) .setTopicName(t1Name)
.setPartitions(List.of(0, 1)) .setPartitions(List.of(0, 1))
)) ))
.setInitializedTopics(List.of(
new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
.setTopicId(t2Id)
.setTopicName(t2Name)
.setPartitions(List.of(0, 1, 2))
))
.setDeletingTopics(List.of()) .setDeletingTopics(List.of())
); );
// Since t1 is already initialized due to replay above // Since t1 is initializing and t2 is initialized due to replay above.
assertEquals( assertEquals(
Map.of( Map.of(
t2Id, Map.entry( t3Id, Set.of(0, 1, 2)
t2Name,
Set.of(0, 1)
)
), ),
context.groupMetadataManager.subscribedTopicsChangeMap(groupId, Map.of( context.groupMetadataManager.subscribedTopicsChangeMap(groupId, Map.of(
t1Name, new TopicMetadata(t1Id, t1Name, 2), t1Name, new TopicMetadata(t1Id, t1Name, 2),
t2Name, new TopicMetadata(t2Id, t2Name, 2) t2Name, new TopicMetadata(t2Id, t2Name, 2),
t3Name, new TopicMetadata(t3Id, t3Name, 3)
)) ))
); );
} }
@Test
public void testUninitializeTopics() {
MockPartitionAssignor assignor = new MockPartitionAssignor("simple");
assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withShareGroupAssignor(assignor)
.build();
String groupId = "shareGroupId";
Uuid t1Id = Uuid.randomUuid();
String t1Name = "t1Name";
Uuid t2Id = Uuid.randomUuid();
String t2Name = "t2Name";
// No records if topics to be uninitialized are not in metadata info.
CoordinatorResult<Void, CoordinatorRecord> result = context.groupMetadataManager.uninitializeShareGroupState(groupId, Map.of(t1Id, Set.of(0)));
assertEquals(
List.of(),
result.records()
);
MetadataImage image = new MetadataImageBuilder()
.addTopic(t1Id, t1Name, 2)
.addTopic(t2Id, t2Name, 3)
.build();
MetadataDelta delta = new MetadataDelta(image);
context.groupMetadataManager.onNewMetadataImage(image, delta);
// Cleanup happens from initialzing state only.
context.groupMetadataManager.replay(
new ShareGroupMetadataKey()
.setGroupId(groupId),
new ShareGroupMetadataValue()
.setEpoch(0)
);
context.groupMetadataManager.replay(
new ShareGroupStatePartitionMetadataKey()
.setGroupId(groupId),
new ShareGroupStatePartitionMetadataValue()
.setInitializingTopics(List.of(
new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
.setTopicId(t1Id)
.setTopicName(t1Name)
.setPartitions(List.of(0, 1))
))
.setInitializedTopics(List.of(
new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
.setTopicId(t2Id)
.setTopicName(t2Name)
.setPartitions(List.of(0, 1, 2))
))
.setDeletingTopics(List.of())
);
result = context.groupMetadataManager.uninitializeShareGroupState(groupId, Map.of(t1Id, Set.of(0, 1)));
Set<Integer> partitions = new LinkedHashSet<>(List.of(0, 1, 2));
assertEquals(
List.of(newShareGroupStatePartitionMetadataRecord(groupId, Map.of(), Map.of(t2Id, Map.entry(t2Name, partitions)), Map.of())),
result.records()
);
}
@Test
public void testMergeShareGroupInitMaps() {
Map<Uuid, Set<Integer>> m1 = new HashMap<>();
Map<Uuid, Set<Integer>> m2 = new HashMap<>();
Uuid t1 = Uuid.randomUuid();
Uuid t2 = Uuid.randomUuid();
Uuid t3 = Uuid.randomUuid();
m1.put(t1, new HashSet<>(List.of(1, 2)));
m1.put(t2, new HashSet<>(List.of(3, 4)));
m2.put(t1, new HashSet<>(List.of(3, 4)));
m2.put(t3, new HashSet<>(List.of(5, 6)));
Map<Uuid, Set<Integer>> m3 = GroupMetadataManager.mergeShareGroupInitMaps(m1, m2);
// The arg maps should not be overridden.
assertEquals(Map.of(t1, Set.of(1, 2), t2, Set.of(3, 4)), m1);
assertEquals(Map.of(t1, Set.of(3, 4), t3, Set.of(5, 6)), m2);
assertEquals(Map.of(t1, Set.of(1, 2, 3, 4), t2, Set.of(3, 4), t3, Set.of(5, 6)), m3);
}
private static void checkJoinGroupResponse( private static void checkJoinGroupResponse(
JoinGroupResponseData expectedResponse, JoinGroupResponseData expectedResponse,
JoinGroupResponseData actualResponse, JoinGroupResponseData actualResponse,

View File

@ -84,6 +84,8 @@ import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue; import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataKey; import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue; import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberKey; import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue; import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey; import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey;
@ -1701,6 +1703,13 @@ public class GroupMetadataManagerTestContext {
); );
break; break;
case SHARE_GROUP_STATE_PARTITION_METADATA:
groupMetadataManager.replay(
(ShareGroupStatePartitionMetadataKey) key,
(ShareGroupStatePartitionMetadataValue) messageOrNull(value)
);
break;
case CONSUMER_GROUP_REGULAR_EXPRESSION: case CONSUMER_GROUP_REGULAR_EXPRESSION:
groupMetadataManager.replay( groupMetadataManager.replay(
(ConsumerGroupRegularExpressionKey) key, (ConsumerGroupRegularExpressionKey) key,