KAFKA-16741: Add share group classes for Heartbeat API (1/N) (KIP-932) (#16516)

Defined share group, member and simple assignor classes, along with the API definitions for the Share Group Heartbeat and Describe APIs.

The ShareGroup and ShareGroupMember classes extend the common ModernGroup and ModernGroupMember, respectively.
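For orientation, a minimal sketch of how the new classes fit together (illustrative only, not part of this change; the sketch class name, member id and topic names are made up):

import java.util.Arrays;

import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember;
import org.apache.kafka.timeline.SnapshotRegistry;

public class ShareGroupOverviewSketch {
    public static void main(String[] args) {
        // A new share group starts in the Empty state.
        ShareGroup group = new ShareGroup(new SnapshotRegistry(new LogContext()), "share-group");

        // Members are immutable and built through a builder, like consumer group members.
        ShareGroupMember member = new ShareGroupMember.Builder("member-1")
            .setMemberEpoch(1)
            .setClientId("client-1")
            .setClientHost("/127.0.0.1")
            .setSubscribedTopicNames(Arrays.asList("topic-a", "topic-b"))
            .build();

        // Adding the first member moves the group to Stable.
        group.updateMember(member);
        System.out.println(group.protocolType());  // "share"
        System.out.println(group.stateAsString()); // "Stable"
    }
}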

Reviewers: Andrew Schofield <aschofield@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>
Apoorv Mittal 2024-07-04 16:01:47 +01:00 committed by GitHub
parent efe7ccaf77
commit e0dcfa7b51
25 changed files with 2512 additions and 93 deletions

View File

@@ -340,7 +340,7 @@
<suppress checks="CyclomaticComplexity"
files="(ConsumerGroupMember|GroupMetadataManager|GeneralUniformAssignmentBuilder).java"/>
<suppress checks="(NPathComplexity|MethodLength)"
files="(GroupMetadataManager|ConsumerGroupTest|GroupMetadataManagerTest|GroupMetadataManagerTestContext|GeneralUniformAssignmentBuilder).java"/>
files="(GroupMetadataManager|ConsumerGroupTest|ShareGroupTest|GroupMetadataManagerTest|GroupMetadataManagerTestContext|GeneralUniformAssignmentBuilder).java"/>
<suppress checks="NPathComplexity"
files="CoordinatorRuntime.java"/>
<suppress checks="ClassFanOutComplexity"

View File

@@ -20,7 +20,7 @@ import kafka.common.OffsetAndMetadata
import kafka.server.{KafkaConfig, ReplicaManager, RequestLocal}
import kafka.utils.Implicits.MapExtensionMethods
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData}
import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupDescribeResponseData, ShareGroupHeartbeatRequestData, ShareGroupHeartbeatResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.RecordBatch
@@ -74,6 +74,15 @@ private[group] class GroupCoordinatorAdapter(
))
}
override def shareGroupHeartbeat(
context: RequestContext,
request: ShareGroupHeartbeatRequestData
): CompletableFuture[ShareGroupHeartbeatResponseData] = {
FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
s"The old group coordinator does not support ${ApiKeys.SHARE_GROUP_HEARTBEAT.name} API."
))
}
override def joinGroup(
context: RequestContext,
request: JoinGroupRequestData,
@@ -627,4 +636,13 @@ private[group] class GroupCoordinatorAdapter(
s"The old group coordinator does not support ${ApiKeys.CONSUMER_GROUP_DESCRIBE.name} API."
))
}
override def shareGroupDescribe(
context: RequestContext,
groupIds: util.List[String]
): CompletableFuture[util.List[ShareGroupDescribeResponseData.DescribedGroup]] = {
FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
s"The old group coordinator does not support ${ApiKeys.SHARE_GROUP_DESCRIBE.name} API."
))
}
}

View File

@@ -21,7 +21,7 @@ import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.{JoinGroupCallbac
import kafka.server.RequestLocal
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.errors.{InvalidGroupIdException, UnsupportedVersionException}
import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData}
import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupHeartbeatRequestData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData}
import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
import org.apache.kafka.common.message.OffsetDeleteRequestData.{OffsetDeleteRequestPartition, OffsetDeleteRequestTopic, OffsetDeleteRequestTopicCollection}
@@ -78,6 +78,22 @@ class GroupCoordinatorAdapterTest {
assertFutureThrows(future, classOf[UnsupportedVersionException])
}
@Test
def testJoinShareGroup(): Unit = {
val groupCoordinator = mock(classOf[GroupCoordinator])
val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
val ctx = makeContext(ApiKeys.SHARE_GROUP_HEARTBEAT, ApiKeys.SHARE_GROUP_HEARTBEAT.latestVersion)
val request = new ShareGroupHeartbeatRequestData()
.setGroupId("group")
val future = adapter.shareGroupHeartbeat(ctx, request)
assertTrue(future.isDone)
assertTrue(future.isCompletedExceptionally)
assertFutureThrows(future, classOf[UnsupportedVersionException])
}
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP)
def testJoinGroup(version: Short): Unit = {
@@ -901,4 +917,17 @@ class GroupCoordinatorAdapterTest {
assertTrue(future.isCompletedExceptionally)
assertFutureThrows(future, classOf[UnsupportedVersionException])
}
@Test
def testShareGroupDescribe(): Unit = {
val groupCoordinator = mock(classOf[GroupCoordinator])
val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
val context = makeContext(ApiKeys.SHARE_GROUP_DESCRIBE, ApiKeys.SHARE_GROUP_DESCRIBE.latestVersion)
val groupIds = List("group-id-1", "group-id-2").asJava
val future = adapter.shareGroupDescribe(context, groupIds)
assertTrue(future.isDone)
assertTrue(future.isCompletedExceptionally)
assertFutureThrows(future, classOf[UnsupportedVersionException])
}
}

View File

@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.api.assignor;
import org.apache.kafka.common.annotation.InterfaceStability;
/**
* Server-side partition assignor for share groups used by the GroupCoordinator.
*/
@InterfaceStability.Unstable
public interface ShareGroupPartitionAssignor extends PartitionAssignor {
}

View File

@@ -19,6 +19,7 @@ package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.coordinator.group.Group.GroupType;
import org.apache.kafka.coordinator.group.classic.ClassicGroup;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
@@ -36,6 +37,7 @@ import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey;
import org.apache.kafka.coordinator.group.modern.TopicMetadata;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
import org.apache.kafka.server.common.ApiMessageAndVersion;
@@ -209,14 +211,40 @@ public class CoordinatorRecordHelpers {
public static CoordinatorRecord newGroupEpochTombstoneRecord(
String groupId
) {
return new CoordinatorRecord(
new ApiMessageAndVersion(
new ConsumerGroupMetadataKey()
.setGroupId(groupId),
(short) 3
),
null // Tombstone.
);
return newGroupEpochTombstoneRecord(groupId, GroupType.CONSUMER);
}
/**
* Creates a ConsumerGroupMetadata tombstone.
*
* @param groupId The consumer group id.
* @param groupType The group type.
* @return The record.
*/
public static CoordinatorRecord newGroupEpochTombstoneRecord(
String groupId,
GroupType groupType
) {
if (groupType == GroupType.CONSUMER) {
return new CoordinatorRecord(
new ApiMessageAndVersion(
new ConsumerGroupMetadataKey()
.setGroupId(groupId),
(short) 3
),
null // Tombstone.
);
} else if (groupType == GroupType.SHARE) {
return new CoordinatorRecord(
new ApiMessageAndVersion(
new ShareGroupMetadataKey()
.setGroupId(groupId),
(short) 11
),
null // Tombstone.
);
}
throw new IllegalArgumentException("Unsupported group type: " + groupType);
}
/**

View File

@@ -35,6 +35,7 @@ public interface Group {
enum GroupType {
CONSUMER("consumer"),
CLASSIC("classic"),
SHARE("share"),
UNKNOWN("unknown");
private final String name;

View File

@@ -36,6 +36,9 @@ import org.apache.kafka.common.message.OffsetDeleteRequestData;
import org.apache.kafka.common.message.OffsetDeleteResponseData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
@@ -73,6 +76,20 @@ public interface GroupCoordinator {
ConsumerGroupHeartbeatRequestData request
);
/**
* Heartbeat to a Share Group.
*
* @param context The request context.
* @param request The ShareGroupHeartbeatRequest data.
*
* @return A future yielding the response.
* The error code(s) of the response are set to indicate the error(s) that occurred during execution.
*/
CompletableFuture<ShareGroupHeartbeatResponseData> shareGroupHeartbeat(
RequestContext context,
ShareGroupHeartbeatRequestData request
);
/**
* Join a Classic Group.
*
@@ -174,6 +191,19 @@
List<String> groupIds
);
/**
* Describe share groups.
*
* @param context The coordinator request context.
* @param groupIds The group ids.
*
* @return A future yielding the results or an exception.
*/
CompletableFuture<List<ShareGroupDescribeResponseData.DescribedGroup>> shareGroupDescribe(
RequestContext context,
List<String> groupIds
);
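A hypothetical caller-side sketch for the two methods added above (the broker request-handler wiring is not part of this commit; the sketch class name is made up). Any coordinator failure, such as the UNSUPPORTED_VERSION error raised by the old-coordinator adapter, is mapped onto the response's error code:

import java.util.concurrent.CompletableFuture;

import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.coordinator.group.GroupCoordinator;

public class ShareGroupHeartbeatCallerSketch {
    // Delegate to the coordinator and translate failures into an error response.
    public static CompletableFuture<ShareGroupHeartbeatResponseData> heartbeat(
        GroupCoordinator coordinator,
        RequestContext context,
        ShareGroupHeartbeatRequestData request
    ) {
        return coordinator.shareGroupHeartbeat(context, request)
            .exceptionally(exception -> new ShareGroupHeartbeatResponseData()
                .setErrorCode(Errors.forException(exception).code()));
    }
}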
/**
* Delete Groups.
*

View File

@@ -40,6 +40,9 @@ import org.apache.kafka.common.message.OffsetDeleteRequestData;
import org.apache.kafka.common.message.OffsetDeleteResponseData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.message.ShareGroupDescribeResponseData.DescribedGroup;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
@@ -308,6 +311,18 @@ public class GroupCoordinatorService implements GroupCoordinator {
));
}
/**
* See {@link GroupCoordinator#shareGroupHeartbeat(RequestContext, ShareGroupHeartbeatRequestData)}.
*/
@Override
public CompletableFuture<ShareGroupHeartbeatResponseData> shareGroupHeartbeat(
RequestContext context,
ShareGroupHeartbeatRequestData request
) {
// TODO: Implement this method as part of KIP-932.
throw new UnsupportedOperationException();
}
/**
* See {@link GroupCoordinator#joinGroup(RequestContext, JoinGroupRequestData, BufferSupplier)}.
*/
@@ -589,6 +604,17 @@ public class GroupCoordinatorService implements GroupCoordinator {
return FutureUtils.combineFutures(futures, ArrayList::new, List::addAll);
}
/**
* See {@link GroupCoordinator#shareGroupDescribe(RequestContext, List)}.
*/
@Override
public CompletableFuture<List<DescribedGroup>> shareGroupDescribe(
RequestContext context,
List<String> groupIds) {
// TODO: Implement this method as part of KIP-932.
throw new UnsupportedOperationException();
}
/**
* See {@link GroupCoordinator#describeGroups(RequestContext, List)}.
*/

View File

@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.api.assignor.MemberSubscription;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.api.assignor.ShareGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
/**
* A simple partition assignor that assigns each member all partitions of the subscribed topics.
*/
public class SimpleAssignor implements ShareGroupPartitionAssignor {
private static final String SIMPLE_ASSIGNOR_NAME = "simple";
@Override
public String name() {
return SIMPLE_ASSIGNOR_NAME;
}
@Override
public GroupAssignment assign(
GroupSpec groupSpec,
SubscribedTopicDescriber subscribedTopicDescriber
) throws PartitionAssignorException {
if (groupSpec.memberIds().isEmpty())
return new GroupAssignment(Collections.emptyMap());
if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
return assignHomogenous(groupSpec, subscribedTopicDescriber);
} else {
return assignHeterogeneous(groupSpec, subscribedTopicDescriber);
}
}
private GroupAssignment assignHomogenous(
GroupSpec groupSpec,
SubscribedTopicDescriber subscribedTopicDescriber
) {
Set<Uuid> subscribeTopicIds = groupSpec.memberSubscription(groupSpec.memberIds().iterator().next())
.subscribedTopicIds();
if (subscribeTopicIds.isEmpty())
return new GroupAssignment(Collections.emptyMap());
Map<Uuid, Set<Integer>> targetPartitions = computeTargetPartitions(
subscribeTopicIds, subscribedTopicDescriber);
return new GroupAssignment(groupSpec.memberIds().stream().collect(Collectors.toMap(
Function.identity(), memberId -> new MemberAssignmentImpl(targetPartitions))));
}
private GroupAssignment assignHeterogeneous(
GroupSpec groupSpec,
SubscribedTopicDescriber subscribedTopicDescriber
) {
Map<String, MemberAssignment> members = new HashMap<>();
for (String memberId : groupSpec.memberIds()) {
MemberSubscription spec = groupSpec.memberSubscription(memberId);
if (spec.subscribedTopicIds().isEmpty())
continue;
Map<Uuid, Set<Integer>> targetPartitions = computeTargetPartitions(
spec.subscribedTopicIds(), subscribedTopicDescriber);
members.put(memberId, new MemberAssignmentImpl(targetPartitions));
}
return new GroupAssignment(members);
}
private Map<Uuid, Set<Integer>> computeTargetPartitions(
Set<Uuid> subscribeTopicIds,
SubscribedTopicDescriber subscribedTopicDescriber
) {
Map<Uuid, Set<Integer>> targetPartitions = new HashMap<>();
subscribeTopicIds.forEach(topicId -> {
int numPartitions = subscribedTopicDescriber.numPartitions(topicId);
if (numPartitions == -1) {
throw new PartitionAssignorException(
"Members are subscribed to topic " + topicId
+ " which doesn't exist in the topic metadata."
);
}
Set<Integer> partitions = new HashSet<>();
for (int i = 0; i < numPartitions; i++) {
partitions.add(i);
}
targetPartitions.put(topicId, partitions);
});
return targetPartitions;
}
}
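A condensed, illustrative usage of the assignor (the new SimpleAssignorTest further down in this commit exercises the same behaviour more thoroughly); the sketch class, member id and topic name are made up:

import java.util.Collections;
import java.util.Optional;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.assignor.SimpleAssignor;
import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.GroupSpecImpl;
import org.apache.kafka.coordinator.group.modern.MemberSubscriptionAndAssignmentImpl;
import org.apache.kafka.coordinator.group.modern.SubscribedTopicDescriberImpl;
import org.apache.kafka.coordinator.group.modern.TopicMetadata;

import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;

public class SimpleAssignorSketch {
    public static void main(String[] args) {
        Uuid topicId = Uuid.randomUuid();

        // One subscribed topic with three partitions.
        SubscribedTopicDescriberImpl describer = new SubscribedTopicDescriberImpl(
            Collections.singletonMap(topicId,
                new TopicMetadata(topicId, "topic-a", 3, Collections.emptyMap())));

        // A single member subscribed to that topic, with no existing assignment.
        GroupSpecImpl groupSpec = new GroupSpecImpl(
            Collections.singletonMap("member-1",
                new MemberSubscriptionAndAssignmentImpl(
                    Optional.empty(), Collections.singleton(topicId), Assignment.EMPTY)),
            HOMOGENEOUS,
            Collections.emptyMap());

        GroupAssignment assignment = new SimpleAssignor().assign(groupSpec, describer);

        // Every partition of the subscribed topic is assigned to the member: {topicId=[0, 1, 2]}.
        System.out.println(assignment.members().get("member-1").partitions());
    }
}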

View File

@@ -591,6 +591,73 @@ public abstract class ModernGroup<T extends ModernGroupMember> implements Group
return HOMOGENEOUS;
}
/**
* Removes the partition epochs based on the provided assignment.
*
* @param assignment The assignment.
* @param expectedEpoch The expected epoch.
* @throws IllegalStateException if the epoch does not match the expected one.
* package-private for testing.
*/
public void removePartitionEpochs(
Map<Uuid, Set<Integer>> assignment,
int expectedEpoch
) {
assignment.forEach((topicId, assignedPartitions) -> {
currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> {
if (partitionsOrNull != null) {
assignedPartitions.forEach(partitionId -> {
Integer prevValue = partitionsOrNull.remove(partitionId);
if (prevValue != expectedEpoch) {
throw new IllegalStateException(
String.format("Cannot remove the epoch %d from %s-%s because the partition is " +
"still owned at a different epoch %d", expectedEpoch, topicId, partitionId, prevValue));
}
});
if (partitionsOrNull.isEmpty()) {
return null;
} else {
return partitionsOrNull;
}
} else {
throw new IllegalStateException(
String.format("Cannot remove the epoch %d from %s because it does not have any epoch",
expectedEpoch, topicId));
}
});
});
}
/**
* Adds the partitions epoch based on the provided assignment.
*
* @param assignment The assignment.
* @param epoch The new epoch.
* @throws IllegalStateException if the partition already has an epoch assigned.
* package-private for testing.
*/
public void addPartitionEpochs(
Map<Uuid, Set<Integer>> assignment,
int epoch
) {
assignment.forEach((topicId, assignedPartitions) -> {
currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> {
if (partitionsOrNull == null) {
partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, assignedPartitions.size());
}
for (Integer partitionId : assignedPartitions) {
Integer prevValue = partitionsOrNull.put(partitionId, epoch);
if (prevValue != null) {
throw new IllegalStateException(
String.format("Cannot set the epoch of %s-%s to %d because the partition is " +
"still owned at epoch %d", topicId, partitionId, epoch, prevValue));
}
}
return partitionsOrNull;
});
});
}
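The partition-epoch helpers lifted into ModernGroup above enforce strict ownership bookkeeping. A hedged sketch of their semantics (illustrative only, not part of this commit), using the new ShareGroup because its constructor needs no metrics shard:

import java.util.Collections;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
import org.apache.kafka.timeline.SnapshotRegistry;

public class PartitionEpochSketch {
    public static void main(String[] args) {
        ShareGroup group = new ShareGroup(new SnapshotRegistry(new LogContext()), "share-group");
        Uuid topicId = Uuid.randomUuid();
        Map<Uuid, Set<Integer>> assignment =
            Collections.singletonMap(topicId, Collections.singleton(0));

        // Record that partition 0 is owned at member epoch 5.
        group.addPartitionEpochs(assignment, 5);

        // Re-adding the same partition throws IllegalStateException because it is
        // already owned at epoch 5:
        // group.addPartitionEpochs(assignment, 6);

        // Removing with a mismatched epoch also throws IllegalStateException:
        // group.removePartitionEpochs(assignment, 4);

        // Removing with the matching epoch succeeds and clears the entry.
        group.removePartitionEpochs(assignment, 5);
    }
}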
/**
* Gets the protocol type for the group.
*

View File

@@ -56,11 +56,6 @@ public abstract class ModernGroupMember {
*/
protected String rackId;
/**
* The rebalance timeout provided by the member.
*/
protected int rebalanceTimeoutMs;
/**
* The client id reported by the member.
*/
@@ -87,7 +82,6 @@
int previousMemberEpoch,
String instanceId,
String rackId,
int rebalanceTimeoutMs,
String clientId,
String clientHost,
Set<String> subscribedTopicNames,
@@ -100,7 +94,6 @@
this.state = state;
this.instanceId = instanceId;
this.rackId = rackId;
this.rebalanceTimeoutMs = rebalanceTimeoutMs;
this.clientId = clientId;
this.clientHost = clientHost;
this.subscribedTopicNames = subscribedTopicNames;
@@ -142,13 +135,6 @@
return rackId;
}
/**
* @return The rebalance timeout in millis.
*/
public int rebalanceTimeoutMs() {
return rebalanceTimeoutMs;
}
/**
* @return The client id.
*/

View File

@@ -677,73 +677,6 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
}
}
/**
* Removes the partition epochs based on the provided assignment.
*
* @param assignment The assignment.
* @param expectedEpoch The expected epoch.
* @throws IllegalStateException if the epoch does not match the expected one.
* package-private for testing.
*/
void removePartitionEpochs(
Map<Uuid, Set<Integer>> assignment,
int expectedEpoch
) {
assignment.forEach((topicId, assignedPartitions) -> {
currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> {
if (partitionsOrNull != null) {
assignedPartitions.forEach(partitionId -> {
Integer prevValue = partitionsOrNull.remove(partitionId);
if (prevValue != expectedEpoch) {
throw new IllegalStateException(
String.format("Cannot remove the epoch %d from %s-%s because the partition is " +
"still owned at a different epoch %d", expectedEpoch, topicId, partitionId, prevValue));
}
});
if (partitionsOrNull.isEmpty()) {
return null;
} else {
return partitionsOrNull;
}
} else {
throw new IllegalStateException(
String.format("Cannot remove the epoch %d from %s because it does not have any epoch",
expectedEpoch, topicId));
}
});
});
}
/**
* Adds the partitions epoch based on the provided assignment.
*
* @param assignment The assignment.
* @param epoch The new epoch.
* @throws IllegalStateException if the partition already has an epoch assigned.
* package-private for testing.
*/
void addPartitionEpochs(
Map<Uuid, Set<Integer>> assignment,
int epoch
) {
assignment.forEach((topicId, assignedPartitions) -> {
currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> {
if (partitionsOrNull == null) {
partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, assignedPartitions.size());
}
for (Integer partitionId : assignedPartitions) {
Integer prevValue = partitionsOrNull.put(partitionId, epoch);
if (prevValue != null) {
throw new IllegalStateException(
String.format("Cannot set the epoch of %s-%s to %d because the partition is " +
"still owned at epoch %d", topicId, partitionId, epoch, prevValue));
}
}
return partitionsOrNull;
});
});
}
public ConsumerGroupDescribeResponseData.DescribedGroup asDescribedGroup(
long committedOffset,
String defaultAssignor,

View File

@@ -250,6 +250,10 @@ public class ConsumerGroupMember extends ModernGroupMember {
}
}
/**
* The rebalance timeout provided by the member.
*/
private int rebalanceTimeoutMs;
/**
* The subscription pattern configured by the member.
@@ -294,19 +298,26 @@
previousMemberEpoch,
instanceId,
rackId,
rebalanceTimeoutMs,
clientId,
clientHost,
subscribedTopicNames,
state,
assignedPartitions
);
this.rebalanceTimeoutMs = rebalanceTimeoutMs;
this.subscribedTopicRegex = subscribedTopicRegex;
this.serverAssignorName = serverAssignorName;
this.partitionsPendingRevocation = partitionsPendingRevocation;
this.classicMemberMetadata = classicMemberMetadata;
}
/**
* @return The rebalance timeout in millis.
*/
public int rebalanceTimeoutMs() {
return rebalanceTimeoutMs;
}
/**
* @return The regular expression based subscription.
*/

View File

@@ -0,0 +1,299 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.modern.share;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.coordinator.group.CoordinatorRecord;
import org.apache.kafka.coordinator.group.CoordinatorRecordHelpers;
import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
import org.apache.kafka.coordinator.group.modern.ModernGroup;
import org.apache.kafka.image.TopicsImage;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineObject;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;
/**
* A Share Group.
*/
public class ShareGroup extends ModernGroup<ShareGroupMember> {
private static final String PROTOCOL_TYPE = "share";
public enum ShareGroupState {
EMPTY("Empty"),
STABLE("Stable"),
DEAD("Dead"),
UNKNOWN("Unknown");
private final String name;
private final String lowerCaseName;
ShareGroupState(String name) {
this.name = name;
this.lowerCaseName = name.toLowerCase(Locale.ROOT);
}
@Override
public String toString() {
return name;
}
public String toLowerCaseString() {
return lowerCaseName;
}
}
/**
* The group state.
*/
private final TimelineObject<ShareGroupState> state;
public ShareGroup(
SnapshotRegistry snapshotRegistry,
String groupId
) {
super(snapshotRegistry, groupId);
this.state = new TimelineObject<>(snapshotRegistry, ShareGroupState.EMPTY);
}
/**
* @return The group type (Share).
*/
@Override
public GroupType type() {
return GroupType.SHARE;
}
/**
* @return The group protocol type (share).
*/
@Override
public String protocolType() {
return PROTOCOL_TYPE;
}
/**
* @return The current state as a String.
*/
@Override
public String stateAsString() {
return state.get().toString();
}
/**
* @return The current state as a String with given committedOffset.
*/
public String stateAsString(long committedOffset) {
return state.get(committedOffset).toString();
}
/**
* @return The current state.
*/
public ShareGroupState state() {
return state.get();
}
/**
* @return The current state based on committed offset.
*/
public ShareGroupState state(long committedOffset) {
return state.get(committedOffset);
}
/**
* Gets or creates a member.
*
* @param memberId The member id.
* @param createIfNotExists A boolean indicating whether the member must be
* created if it does not exist.
*
* @return A ShareGroupMember.
*/
public ShareGroupMember getOrMaybeCreateMember(
String memberId,
boolean createIfNotExists
) {
ShareGroupMember member = members.get(memberId);
if (member != null) return member;
if (!createIfNotExists) {
throw new UnknownMemberIdException(
String.format("Member %s is not a member of group %s.", memberId, groupId));
}
return new ShareGroupMember.Builder(memberId).build();
}
/**
* Updates the member.
*
* @param newMember The new share group member.
*/
@Override
public void updateMember(ShareGroupMember newMember) {
if (newMember == null) {
throw new IllegalArgumentException("newMember cannot be null.");
}
ShareGroupMember oldMember = members.put(newMember.memberId(), newMember);
maybeUpdateSubscribedTopicNamesAndGroupSubscriptionType(oldMember, newMember);
maybeUpdatePartitionEpoch(oldMember, newMember);
maybeUpdateGroupState();
}
/**
* Remove the member from the group.
*
* @param memberId The member id to remove.
*/
public void removeMember(String memberId) {
ShareGroupMember oldMember = members.remove(memberId);
maybeUpdateSubscribedTopicNamesAndGroupSubscriptionType(oldMember, null);
maybeRemovePartitionEpoch(oldMember);
maybeUpdateGroupState();
}
@Override
public void validateOffsetCommit(
String memberId,
String groupInstanceId,
int memberEpoch,
boolean isTransactional,
short apiVersion
) {
throw new UnsupportedOperationException("validateOffsetCommit is not supported for Share Groups.");
}
@Override
public void validateOffsetFetch(
String memberId,
int memberEpoch,
long lastCommittedOffset
) {
throw new UnsupportedOperationException("validateOffsetFetch is not supported for Share Groups.");
}
@Override
public void validateOffsetDelete() {
throw new UnsupportedOperationException("validateOffsetDelete is not supported for Share Groups.");
}
/**
* Validates the DeleteGroups request.
*/
@Override
public void validateDeleteGroup() throws ApiException {
if (state() != ShareGroupState.EMPTY) {
throw Errors.NON_EMPTY_GROUP.exception();
}
}
/**
* Populates the list of records with tombstone(s) for deleting the group.
*
* @param records The list of records.
*/
@Override
public void createGroupTombstoneRecords(List<CoordinatorRecord> records) {
records.add(CoordinatorRecordHelpers.newGroupEpochTombstoneRecord(groupId(), GroupType.SHARE));
}
@Override
public boolean isEmpty() {
return state() == ShareGroupState.EMPTY;
}
@Override
public Optional<OffsetExpirationCondition> offsetExpirationCondition() {
throw new UnsupportedOperationException("offsetExpirationCondition is not supported for Share Groups.");
}
@Override
public boolean isInStates(final Set<String> statesFilter, final long committedOffset) {
return statesFilter.contains(state.get(committedOffset).toLowerCaseString());
}
/**
* Updates the current state of the group.
*/
@Override
protected void maybeUpdateGroupState() {
ShareGroupState newState = ShareGroupState.STABLE;
if (members.isEmpty()) {
newState = ShareGroupState.EMPTY;
}
state.set(newState);
}
/**
* Updates the partition epochs based on the old and the new member.
*
* @param oldMember The old member.
* @param newMember The new member.
*/
private void maybeUpdatePartitionEpoch(
ShareGroupMember oldMember,
ShareGroupMember newMember
) {
maybeRemovePartitionEpoch(oldMember);
addPartitionEpochs(newMember.assignedPartitions(), newMember.memberEpoch());
}
/**
* Removes the partition epochs for the provided member.
*
* @param oldMember The old member.
*/
private void maybeRemovePartitionEpoch(
ShareGroupMember oldMember
) {
if (oldMember != null) {
removePartitionEpochs(oldMember.assignedPartitions(), oldMember.memberEpoch());
}
}
public ShareGroupDescribeResponseData.DescribedGroup asDescribedGroup(
long committedOffset,
String defaultAssignor,
TopicsImage topicsImage
) {
ShareGroupDescribeResponseData.DescribedGroup describedGroup = new ShareGroupDescribeResponseData.DescribedGroup()
.setGroupId(groupId)
.setAssignorName(defaultAssignor)
.setGroupEpoch(groupEpoch.get(committedOffset))
.setGroupState(state.get(committedOffset).toString())
.setAssignmentEpoch(targetAssignmentEpoch.get(committedOffset));
members.entrySet(committedOffset).forEach(
entry -> describedGroup.members().add(
entry.getValue().asShareGroupDescribeMember(
topicsImage
)
)
);
return describedGroup;
}
}
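An illustrative sketch (not part of this commit) of the delete-group validation and the tombstone written for a share group; the sketch class, group id and member id are made up:

import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.coordinator.group.CoordinatorRecord;
import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember;
import org.apache.kafka.timeline.SnapshotRegistry;

public class ShareGroupDeleteSketch {
    public static void main(String[] args) {
        ShareGroup group = new ShareGroup(new SnapshotRegistry(new LogContext()), "share-group");

        // An empty share group passes the delete validation.
        group.validateDeleteGroup();

        // Once a member joins, the group is Stable and deletion is rejected
        // with NON_EMPTY_GROUP (GroupNotEmptyException):
        group.updateMember(new ShareGroupMember.Builder("member-1").build());
        // group.validateDeleteGroup();

        // Removing the last member takes the group back to Empty; deleting it then
        // emits a ShareGroupMetadataKey tombstone via the new record helper.
        group.removeMember("member-1");
        List<CoordinatorRecord> records = new ArrayList<>();
        group.createGroupTombstoneRecords(records);
        System.out.println(records.size()); // 1
    }
}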

View File

@@ -0,0 +1,264 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.modern.share;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.modern.MemberState;
import org.apache.kafka.coordinator.group.modern.ModernGroupMember;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.TopicsImage;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
/**
* ShareGroupMember contains all the information related to a member
* within a share group. This class is immutable.
*/
public class ShareGroupMember extends ModernGroupMember {
/**
* A builder that facilitates the creation of a new member or the update of
* an existing one.
* <p>
* Please refer to the javadoc of {{@link ShareGroupMember}} for the
* definition of the fields.
*/
public static class Builder {
private final String memberId;
private int memberEpoch = 0;
private int previousMemberEpoch = -1;
private MemberState state = MemberState.STABLE;
private String rackId = null;
private String clientId = "";
private String clientHost = "";
private Set<String> subscribedTopicNames = Collections.emptySet();
private Map<Uuid, Set<Integer>> assignedPartitions = Collections.emptyMap();
public Builder(String memberId) {
this.memberId = Objects.requireNonNull(memberId);
}
public Builder(ShareGroupMember member) {
this(
Objects.requireNonNull(member),
member.memberId
);
}
public Builder(ShareGroupMember member, String newMemberId) {
Objects.requireNonNull(member);
this.memberId = Objects.requireNonNull(newMemberId);
this.memberEpoch = member.memberEpoch;
this.previousMemberEpoch = member.previousMemberEpoch;
this.rackId = member.rackId;
this.clientId = member.clientId;
this.clientHost = member.clientHost;
this.subscribedTopicNames = member.subscribedTopicNames;
this.assignedPartitions = member.assignedPartitions;
}
public Builder setMemberEpoch(int memberEpoch) {
this.memberEpoch = memberEpoch;
return this;
}
public Builder setPreviousMemberEpoch(int previousMemberEpoch) {
this.previousMemberEpoch = previousMemberEpoch;
return this;
}
public Builder setRackId(String rackId) {
this.rackId = rackId;
return this;
}
public Builder maybeUpdateRackId(Optional<String> rackId) {
this.rackId = rackId.orElse(this.rackId);
return this;
}
public Builder setClientId(String clientId) {
this.clientId = clientId;
return this;
}
public Builder setClientHost(String clientHost) {
this.clientHost = clientHost;
return this;
}
public Builder setSubscribedTopicNames(List<String> subscribedTopicList) {
if (subscribedTopicNames != null) this.subscribedTopicNames = new HashSet<>(subscribedTopicList);
return this;
}
public Builder maybeUpdateSubscribedTopicNames(Optional<List<String>> subscribedTopicList) {
subscribedTopicList.ifPresent(list -> this.subscribedTopicNames = new HashSet<>(list));
return this;
}
public Builder setState(MemberState state) {
this.state = state;
return this;
}
public Builder setAssignedPartitions(Map<Uuid, Set<Integer>> assignedPartitions) {
this.assignedPartitions = assignedPartitions;
return this;
}
public Builder updateWith(ShareGroupMemberMetadataValue record) {
setRackId(record.rackId());
setClientId(record.clientId());
setClientHost(record.clientHost());
setSubscribedTopicNames(record.subscribedTopicNames());
return this;
}
public ShareGroupMember build() {
return new ShareGroupMember(
memberId,
memberEpoch,
previousMemberEpoch,
rackId,
clientId,
clientHost,
subscribedTopicNames,
state,
assignedPartitions
);
}
}
private ShareGroupMember(
String memberId,
int memberEpoch,
int previousMemberEpoch,
String rackId,
String clientId,
String clientHost,
Set<String> subscribedTopicNames,
MemberState state,
Map<Uuid, Set<Integer>> assignedPartitions
) {
super(
memberId,
memberEpoch,
previousMemberEpoch,
null,
rackId,
clientId,
clientHost,
subscribedTopicNames,
state,
assignedPartitions
);
}
/**
* Converts this ShareGroupMember to a ShareGroupDescribeResponseData.Member.
*
* @param topicsImage: Topics image object to search for a specific topic id
*
* @return The ShareGroupMember mapped as ShareGroupDescribeResponseData.Member.
*/
public ShareGroupDescribeResponseData.Member asShareGroupDescribeMember(
TopicsImage topicsImage
) {
return new ShareGroupDescribeResponseData.Member()
.setMemberEpoch(memberEpoch)
.setMemberId(memberId)
.setAssignment(new ShareGroupDescribeResponseData.Assignment()
.setTopicPartitions(topicPartitionsFromMap(assignedPartitions, topicsImage)))
.setClientHost(clientHost)
.setClientId(clientId)
.setRackId(rackId)
.setSubscribedTopicNames(subscribedTopicNames == null ? null : new ArrayList<>(subscribedTopicNames));
}
private static List<ShareGroupDescribeResponseData.TopicPartitions> topicPartitionsFromMap(
Map<Uuid, Set<Integer>> partitions,
TopicsImage topicsImage
) {
List<ShareGroupDescribeResponseData.TopicPartitions> topicPartitions = new ArrayList<>();
partitions.forEach((topicId, partitionSet) -> {
TopicImage topicImage = topicsImage.getTopic(topicId);
if (topicImage != null) {
topicPartitions.add(new ShareGroupDescribeResponseData.TopicPartitions()
.setTopicId(topicId)
.setTopicName(topicImage.name())
.setPartitions(new ArrayList<>(partitionSet)));
}
});
return topicPartitions;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ShareGroupMember that = (ShareGroupMember) o;
return memberEpoch == that.memberEpoch
&& previousMemberEpoch == that.previousMemberEpoch
&& state == that.state
&& Objects.equals(memberId, that.memberId)
&& Objects.equals(rackId, that.rackId)
&& Objects.equals(clientId, that.clientId)
&& Objects.equals(clientHost, that.clientHost)
&& Objects.equals(subscribedTopicNames, that.subscribedTopicNames)
&& Objects.equals(assignedPartitions, that.assignedPartitions);
}
@Override
public int hashCode() {
int result = memberId != null ? memberId.hashCode() : 0;
result = 31 * result + memberEpoch;
result = 31 * result + previousMemberEpoch;
result = 31 * result + Objects.hashCode(state);
result = 31 * result + Objects.hashCode(rackId);
result = 31 * result + Objects.hashCode(clientId);
result = 31 * result + Objects.hashCode(clientHost);
result = 31 * result + Objects.hashCode(subscribedTopicNames);
result = 31 * result + Objects.hashCode(assignedPartitions);
return result;
}
@Override
public String toString() {
return "ShareGroupMember(" +
"memberId='" + memberId + '\'' +
", memberEpoch=" + memberEpoch + '\'' +
", previousMemberEpoch=" + previousMemberEpoch + '\'' +
", state='" + state + '\'' +
", rackId='" + rackId + '\'' +
", clientId='" + clientId + '\'' +
", clientHost='" + clientHost + '\'' +
", subscribedTopicNames=" + subscribedTopicNames + '\'' +
", assignedPartitions=" + assignedPartitions +
')';
}
}
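A short illustrative sketch (not part of this commit) of the copy-and-update pattern the Builder supports; the sketch class, member id and topic name are made up:

import java.util.Collections;

import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember;

public class ShareGroupMemberSketch {
    public static void main(String[] args) {
        ShareGroupMember member = new ShareGroupMember.Builder("member-1")
            .setMemberEpoch(1)
            .setClientId("client-1")
            .setClientHost("/127.0.0.1")
            .setSubscribedTopicNames(Collections.singletonList("topic-a"))
            .build();

        // The class is immutable: bumping the epoch builds a new instance and keeps
        // the previous epoch for reconciliation.
        ShareGroupMember updated = new ShareGroupMember.Builder(member)
            .setPreviousMemberEpoch(member.memberEpoch())
            .setMemberEpoch(member.memberEpoch() + 1)
            .build();

        System.out.println(member.memberEpoch());  // 1
        System.out.println(updated.memberEpoch()); // 2
    }
}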

View File

@@ -0,0 +1,28 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// KIP-932 is in development. This schema is subject to non-backwards-compatible changes.
{
"type": "data",
"name": "ShareGroupMemberMetadataKey",
"validVersions": "10",
"flexibleVersions": "none",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "10",
"about": "The group id." },
{ "name": "MemberId", "type": "string", "versions": "10",
"about": "The member id." }
]
}

View File

@@ -0,0 +1,32 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// KIP-932 is in development. This schema is subject to non-backwards-compatible changes.
{
"type": "data",
"name": "ShareGroupMemberMetadataValue",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "RackId", "versions": "0+", "nullableVersions": "0+", "type": "string",
"about": "The (optional) rack id." },
{ "name": "ClientId", "versions": "0+", "type": "string",
"about": "The client id." },
{ "name": "ClientHost", "versions": "0+", "type": "string",
"about": "The client host." },
{ "name": "SubscribedTopicNames", "versions": "0+", "type": "[]string",
"about": "The list of subscribed topic names." }
]
}
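An illustrative sketch (not part of this commit) of how a replayed ShareGroupMemberMetadataValue record feeds ShareGroupMember.Builder#updateWith shown earlier; the field values and sketch class name are made up:

import java.util.Arrays;

import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember;

public class ShareGroupMemberReplaySketch {
    public static void main(String[] args) {
        // A value record as it would be read back from the group coordinator's records.
        ShareGroupMemberMetadataValue value = new ShareGroupMemberMetadataValue()
            .setRackId("rack-1")
            .setClientId("client-1")
            .setClientHost("/127.0.0.1")
            .setSubscribedTopicNames(Arrays.asList("topic-a", "topic-b"));

        // Rebuild the member from the record.
        ShareGroupMember member = new ShareGroupMember.Builder("member-1")
            .updateWith(value)
            .build();

        System.out.println(member.subscribedTopicNames().size()); // 2
    }
}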

View File

@@ -0,0 +1,26 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// KIP-932 is in development. This schema is subject to non-backwards-compatible changes.
{
"type": "data",
"name": "ShareGroupMetadataKey",
"validVersions": "11",
"flexibleVersions": "none",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "11",
"about": "The group id." }
]
}

View File

@@ -0,0 +1,26 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// KIP-932 is in development. This schema is subject to non-backwards-compatible changes.
{
"type": "data",
"name": "ShareGroupMetadataValue",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "Epoch", "type": "int32", "versions": "0+",
"about": "The group epoch." }
]
}

View File

@@ -0,0 +1,27 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// KIP-932 is in development. This schema is subject to non-backwards-compatible changes.
{
"type": "data",
"name": "ShareGroupPartitionMetadataKey",
"validVersions": "9",
"flexibleVersions": "none",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "9",
"about": "The group id." }
]
}

View File

@@ -0,0 +1,44 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// KIP-932 is in development. This schema is subject to non-backwards-compatible changes.
{
"type": "data",
"name": "ShareGroupPartitionMetadataValue",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "InitializedTopics", "versions": "0+", "type": "[]TopicPartitionsInfo",
"about": "The topics with initialized share-group state." },
{ "name": "DeletingTopics", "versions": "0+", "type": "[]TopicInfo",
"about": "The topics whose share-group state is being deleted." }
],
"commonStructs": [
{ "name": "TopicPartitionsInfo", "versions": "0+", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The topic identifier." },
{ "name": "TopicName", "type": "string", "versions": "0+",
"about": "The topic name." },
{ "name": "Partitions", "type": "[]int32", "versions": "0+",
"about": "The partitions." }
]},
{ "name": "TopicInfo", "versions": "0+", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The topic identifier." },
{ "name": "TopicName", "type": "string", "versions": "0+",
"about": "The topic name." }
]}
]
}

View File

@@ -22,6 +22,7 @@ import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProt
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.Group.GroupType;
import org.apache.kafka.coordinator.group.classic.ClassicGroup;
import org.apache.kafka.coordinator.group.classic.ClassicGroupMember;
import org.apache.kafka.coordinator.group.classic.ClassicGroupState;
@@ -41,6 +42,7 @@ import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
import org.apache.kafka.coordinator.group.modern.MemberState;
import org.apache.kafka.coordinator.group.modern.TopicMetadata;
@@ -292,6 +294,27 @@ public class CoordinatorRecordHelpersTest {
));
}
@Test
public void testNewGroupEpochTombstoneRecordShareGroup() {
CoordinatorRecord expectedRecord = new CoordinatorRecord(
new ApiMessageAndVersion(
new ShareGroupMetadataKey()
.setGroupId("group-id"),
(short) 11),
null);
assertEquals(expectedRecord, newGroupEpochTombstoneRecord(
"group-id", GroupType.SHARE
));
}
@Test
public void testNewGroupEpochTombstoneRecordUnknownGroup() {
assertThrows(IllegalArgumentException.class, () -> newGroupEpochTombstoneRecord(
"group-id", GroupType.UNKNOWN
));
}
@Test
public void testNewTargetAssignmentRecord() {
Uuid topicId1 = Uuid.randomUuid();

View File

@@ -0,0 +1,340 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.GroupSpecImpl;
import org.apache.kafka.coordinator.group.modern.MemberSubscriptionAndAssignmentImpl;
import org.apache.kafka.coordinator.group.modern.SubscribedTopicDescriberImpl;
import org.apache.kafka.coordinator.group.modern.TopicMetadata;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS;
import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class SimpleAssignorTest {
private static final Uuid TOPIC_1_UUID = Uuid.randomUuid();
private static final Uuid TOPIC_2_UUID = Uuid.randomUuid();
private static final Uuid TOPIC_3_UUID = Uuid.randomUuid();
private static final String TOPIC_1_NAME = "topic1";
private static final String TOPIC_3_NAME = "topic3";
private static final String MEMBER_A = "A";
private static final String MEMBER_B = "B";
private final SimpleAssignor assignor = new SimpleAssignor();
@Test
public void testName() {
assertEquals("simple", assignor.name());
}
@Test
public void testAssignWithEmptyMembers() {
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
Collections.emptyMap()
);
GroupSpec groupSpec = new GroupSpecImpl(
Collections.emptyMap(),
HOMOGENEOUS,
Collections.emptyMap()
);
GroupAssignment groupAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
assertEquals(Collections.emptyMap(), groupAssignment.members());
}
@Test
public void testAssignWithNoSubscribedTopic() {
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
Collections.singletonMap(
TOPIC_1_UUID,
new TopicMetadata(
TOPIC_1_UUID,
TOPIC_1_NAME,
3,
Collections.emptyMap()
)
)
);
Map<String, MemberSubscriptionAndAssignmentImpl> members = Collections.singletonMap(
MEMBER_A,
new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Collections.emptySet(),
Assignment.EMPTY
)
);
GroupSpec groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
Collections.emptyMap()
);
GroupAssignment groupAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
assertEquals(Collections.emptyMap(), groupAssignment.members());
}
@Test
public void testAssignWithSubscribedToNonExistentTopic() {
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
Collections.singletonMap(
TOPIC_1_UUID,
new TopicMetadata(
TOPIC_1_UUID,
TOPIC_1_NAME,
3,
Collections.emptyMap()
)
)
);
Map<String, MemberSubscriptionAndAssignmentImpl> members = Collections.singletonMap(
MEMBER_A,
new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(TOPIC_2_UUID),
Assignment.EMPTY
)
);
GroupSpec groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
Collections.emptyMap()
);
assertThrows(PartitionAssignorException.class,
() -> assignor.assign(groupSpec, subscribedTopicMetadata));
}
@Test
public void testAssignWithTwoMembersAndTwoTopicsHomogeneous() {
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(TOPIC_1_UUID, new TopicMetadata(
TOPIC_1_UUID,
TOPIC_1_NAME,
3,
Collections.emptyMap()
));
topicMetadata.put(TOPIC_3_UUID, new TopicMetadata(
TOPIC_3_UUID,
TOPIC_3_NAME,
2,
Collections.emptyMap()
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
members.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(TOPIC_1_UUID, TOPIC_3_UUID),
Assignment.EMPTY
));
members.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(TOPIC_1_UUID, TOPIC_3_UUID),
Assignment.EMPTY
));
GroupSpec groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
Collections.emptyMap()
);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(MEMBER_A, mkAssignment(
mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2),
mkTopicAssignment(TOPIC_3_UUID, 0, 1)
));
expectedAssignment.put(MEMBER_B, mkAssignment(
mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2),
mkTopicAssignment(TOPIC_3_UUID, 0, 1)
));
assertAssignment(expectedAssignment, computedAssignment);
}
@Test
public void testAssignWithThreeMembersThreeTopicsHeterogeneous() {
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(TOPIC_1_UUID, new TopicMetadata(
TOPIC_1_UUID,
TOPIC_1_NAME,
3,
Collections.emptyMap()
));
topicMetadata.put(TOPIC_2_UUID, new TopicMetadata(
TOPIC_2_UUID,
"topic2",
3,
Collections.emptyMap()
));
topicMetadata.put(TOPIC_3_UUID, new TopicMetadata(
TOPIC_3_UUID,
TOPIC_3_NAME,
2,
Collections.emptyMap()
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
members.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(TOPIC_1_UUID, TOPIC_2_UUID),
Assignment.EMPTY
));
members.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(TOPIC_3_UUID),
Assignment.EMPTY
));
String memberC = "C";
members.put(memberC, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(TOPIC_2_UUID, TOPIC_3_UUID),
Assignment.EMPTY
));
GroupSpec groupSpec = new GroupSpecImpl(
members,
HETEROGENEOUS,
Collections.emptyMap()
);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
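        // Expected per this test: each member receives all partitions of exactly the topics it subscribes to.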
expectedAssignment.put(MEMBER_A, mkAssignment(
mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2),
mkTopicAssignment(TOPIC_2_UUID, 0, 1, 2)
));
expectedAssignment.put(MEMBER_B, mkAssignment(
mkTopicAssignment(TOPIC_3_UUID, 0, 1)
));
expectedAssignment.put(memberC, mkAssignment(
mkTopicAssignment(TOPIC_2_UUID, 0, 1, 2),
mkTopicAssignment(TOPIC_3_UUID, 0, 1)
));
assertAssignment(expectedAssignment, computedAssignment);
}
@Test
public void testAssignWithOneMemberNoAssignedTopicHeterogeneous() {
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(TOPIC_1_UUID, new TopicMetadata(
TOPIC_1_UUID,
TOPIC_1_NAME,
3,
Collections.emptyMap()
));
topicMetadata.put(TOPIC_2_UUID, new TopicMetadata(
TOPIC_2_UUID,
"topic2",
2,
Collections.emptyMap()
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
members.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
mkSet(TOPIC_1_UUID, TOPIC_2_UUID),
Assignment.EMPTY
));
members.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Collections.emptySet(),
Assignment.EMPTY
));
GroupSpec groupSpec = new GroupSpecImpl(
members,
HETEROGENEOUS,
Collections.emptyMap()
);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
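        // Member B has no subscriptions, so only member A appears in the expected assignment.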
expectedAssignment.put(MEMBER_A, mkAssignment(
mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2),
mkTopicAssignment(TOPIC_2_UUID, 0, 1)
));
assertAssignment(expectedAssignment, computedAssignment);
}
private void assertAssignment(
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment,
GroupAssignment computedGroupAssignment
) {
assertEquals(expectedAssignment.size(), computedGroupAssignment.members().size());
for (String memberId : computedGroupAssignment.members().keySet()) {
Map<Uuid, Set<Integer>> computedAssignmentForMember = computedGroupAssignment.members().get(memberId).partitions();
assertEquals(expectedAssignment.get(memberId), computedAssignmentForMember);
}
}
}

View File

@ -0,0 +1,192 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.modern.share;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
import org.apache.kafka.coordinator.group.MetadataImageBuilder;
import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue;
import org.apache.kafka.image.MetadataImage;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
public class ShareGroupMemberTest {
@Test
public void testNewMember() {
Uuid topicId1 = Uuid.randomUuid();
ShareGroupMember member = new ShareGroupMember.Builder("member-id")
.setMemberEpoch(10)
.setPreviousMemberEpoch(9)
.setRackId("rack-id")
.setClientId("client-id")
.setClientHost("hostname")
.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3)))
.build();
assertEquals("member-id", member.memberId());
assertEquals(10, member.memberEpoch());
assertEquals(9, member.previousMemberEpoch());
assertNull(member.instanceId());
assertEquals("rack-id", member.rackId());
assertEquals("client-id", member.clientId());
assertEquals("hostname", member.clientHost());
assertEquals(mkSet("bar", "foo"), member.subscribedTopicNames());
assertEquals(mkAssignment(mkTopicAssignment(topicId1, 1, 2, 3)), member.assignedPartitions());
}
@Test
public void testEquals() {
Uuid topicId1 = Uuid.randomUuid();
ShareGroupMember member1 = new ShareGroupMember.Builder("member-id")
.setMemberEpoch(10)
.setPreviousMemberEpoch(9)
.setRackId("rack-id")
.setClientId("client-id")
.setClientHost("hostname")
.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3)))
.build();
ShareGroupMember member2 = new ShareGroupMember.Builder("member-id")
.setMemberEpoch(10)
.setPreviousMemberEpoch(9)
.setRackId("rack-id")
.setClientId("client-id")
.setClientHost("hostname")
.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3)))
.build();
assertEquals(member1, member2);
}
@Test
public void testUpdateMember() {
Uuid topicId1 = Uuid.randomUuid();
ShareGroupMember member = new ShareGroupMember.Builder("member-id")
.setMemberEpoch(10)
.setPreviousMemberEpoch(9)
.setRackId("rack-id")
.setClientId("client-id")
.setClientHost("hostname")
.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 1, 2, 3)))
.build();
// This is a no-op.
ShareGroupMember updatedMember = new ShareGroupMember.Builder(member)
.maybeUpdateRackId(Optional.empty())
.maybeUpdateSubscribedTopicNames(Optional.empty())
.build();
assertEquals(member, updatedMember);
updatedMember = new ShareGroupMember.Builder(member)
.maybeUpdateRackId(Optional.of("new-rack-id"))
.maybeUpdateSubscribedTopicNames(Optional.of(Collections.singletonList("zar")))
.build();
assertNull(member.instanceId());
assertEquals("new-rack-id", updatedMember.rackId());
// Names are sorted.
assertEquals(mkSet("zar"), updatedMember.subscribedTopicNames());
}
@Test
public void testUpdateWithShareGroupMemberMetadataValue() {
ShareGroupMemberMetadataValue record = new ShareGroupMemberMetadataValue()
.setClientId("client-id")
.setClientHost("host-id")
.setRackId("rack-id")
.setSubscribedTopicNames(Arrays.asList("foo", "bar"));
ShareGroupMember member = new ShareGroupMember.Builder("member-id")
.updateWith(record)
.build();
assertNull(member.instanceId());
assertEquals("rack-id", member.rackId());
assertEquals("client-id", member.clientId());
assertEquals("host-id", member.clientHost());
assertEquals(mkSet("bar", "foo"), member.subscribedTopicNames());
}
@Test
public void testAsShareGroupDescribeMember() {
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(topicId1, "topic1", 3)
.addTopic(topicId2, "topic2", 3)
.build();
List<String> subscribedTopicNames = Arrays.asList("topic1", "topic2");
List<Integer> assignedPartitions = Arrays.asList(0, 1, 2);
int epoch = 10;
ShareGroupMemberMetadataValue record = new ShareGroupMemberMetadataValue()
.setClientId("client-id")
.setClientHost("host-id")
.setRackId("rack-id")
.setSubscribedTopicNames(subscribedTopicNames);
String memberId = Uuid.randomUuid().toString();
ShareGroupMember member = new ShareGroupMember.Builder(memberId)
.updateWith(record)
.setMemberEpoch(epoch)
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(topicId1, 0, 1, 2)))
.build();
ShareGroupDescribeResponseData.Member actual = member.asShareGroupDescribeMember(metadataImage.topics());
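        // Only topicId1 has assigned partitions, so the expected assignment contains a single topic entry
        // with its name resolved from the metadata image.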
ShareGroupDescribeResponseData.Member expected = new ShareGroupDescribeResponseData.Member()
.setMemberId(memberId)
.setMemberEpoch(epoch)
.setClientId("client-id")
.setRackId("rack-id")
.setClientHost("host-id")
.setSubscribedTopicNames(subscribedTopicNames)
.setAssignment(
new ShareGroupDescribeResponseData.Assignment()
.setTopicPartitions(Collections.singletonList(new ShareGroupDescribeResponseData.TopicPartitions()
.setTopicId(topicId1)
.setTopicName("topic1")
.setPartitions(assignedPartitions)
))
);
assertEquals(expected, actual);
}
}

View File

@ -0,0 +1,841 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.modern.share;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
import org.apache.kafka.coordinator.group.Group;
import org.apache.kafka.coordinator.group.MetadataImageBuilder;
import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.MemberState;
import org.apache.kafka.coordinator.group.modern.TopicMetadata;
import org.apache.kafka.coordinator.group.modern.share.ShareGroup.ShareGroupState;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS;
import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ShareGroupTest {
@Test
public void testType() {
ShareGroup shareGroup = createShareGroup("foo");
assertEquals(Group.GroupType.SHARE, shareGroup.type());
}
@Test
public void testProtocolType() {
ShareGroup shareGroup = createShareGroup("foo");
assertEquals("share", shareGroup.protocolType());
}
@Test
public void testGetOrCreateMember() {
ShareGroup shareGroup = createShareGroup("foo");
ShareGroupMember member;
// Create a member.
member = shareGroup.getOrMaybeCreateMember("member-id", true);
assertEquals("member-id", member.memberId());
// Add member to the group.
shareGroup.updateMember(member);
// Get that member back.
member = shareGroup.getOrMaybeCreateMember("member-id", false);
assertEquals("member-id", member.memberId());
assertThrows(UnknownMemberIdException.class, () ->
shareGroup.getOrMaybeCreateMember("does-not-exist", false));
}
@Test
public void testUpdateMember() {
ShareGroup shareGroup = createShareGroup("foo");
ShareGroupMember member;
member = shareGroup.getOrMaybeCreateMember("member", true);
member = new ShareGroupMember.Builder(member)
.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
.build();
shareGroup.updateMember(member);
assertEquals(member, shareGroup.getOrMaybeCreateMember("member", false));
}
@Test
public void testRemoveMember() {
ShareGroup shareGroup = createShareGroup("foo");
ShareGroupMember member = shareGroup.getOrMaybeCreateMember("member", true);
shareGroup.updateMember(member);
assertTrue(shareGroup.hasMember("member"));
shareGroup.removeMember("member");
assertFalse(shareGroup.hasMember("member"));
}
@Test
public void testUpdatingMemberUpdatesPartitionEpoch() {
Uuid fooTopicId = Uuid.randomUuid();
Uuid barTopicId = Uuid.randomUuid();
ShareGroup shareGroup = createShareGroup("foo");
ShareGroupMember member;
member = new ShareGroupMember.Builder("member")
.setMemberEpoch(10)
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2, 3)))
.build();
shareGroup.updateMember(member);
assertEquals(10, shareGroup.currentPartitionEpoch(fooTopicId, 1));
assertEquals(10, shareGroup.currentPartitionEpoch(fooTopicId, 2));
assertEquals(10, shareGroup.currentPartitionEpoch(fooTopicId, 3));
assertEquals(-1, shareGroup.currentPartitionEpoch(barTopicId, 4));
assertEquals(-1, shareGroup.currentPartitionEpoch(barTopicId, 5));
assertEquals(-1, shareGroup.currentPartitionEpoch(barTopicId, 6));
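        // Reassign the member from foo to bar; the previous foo partition epochs should be released.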
member = new ShareGroupMember.Builder(member)
.setMemberEpoch(11)
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(barTopicId, 1, 2, 3)))
.build();
shareGroup.updateMember(member);
assertEquals(11, shareGroup.currentPartitionEpoch(barTopicId, 1));
assertEquals(11, shareGroup.currentPartitionEpoch(barTopicId, 2));
assertEquals(11, shareGroup.currentPartitionEpoch(barTopicId, 3));
assertEquals(-1, shareGroup.currentPartitionEpoch(fooTopicId, 1));
assertEquals(-1, shareGroup.currentPartitionEpoch(fooTopicId, 2));
assertEquals(-1, shareGroup.currentPartitionEpoch(fooTopicId, 3));
}
@Test
public void testRemovePartitionEpochs() {
Uuid fooTopicId = Uuid.randomUuid();
ShareGroup shareGroup = createShareGroup("foo");
// Removing should fail because there is no epoch set.
assertThrows(IllegalStateException.class, () -> shareGroup.removePartitionEpochs(
mkAssignment(
mkTopicAssignment(fooTopicId, 1)
),
10
));
ShareGroupMember m1 = new ShareGroupMember.Builder("m1")
.setMemberEpoch(10)
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 1)))
.build();
shareGroup.updateMember(m1);
// Removing should fail because the expected epoch is incorrect.
assertThrows(IllegalStateException.class, () -> shareGroup.removePartitionEpochs(
mkAssignment(
mkTopicAssignment(fooTopicId, 1)
),
11
));
}
@Test
public void testAddPartitionEpochs() {
Uuid fooTopicId = Uuid.randomUuid();
ShareGroup shareGroup = createShareGroup("foo");
shareGroup.addPartitionEpochs(
mkAssignment(
mkTopicAssignment(fooTopicId, 1)
),
10
);
// Changing the epoch should fail because the owner of the partition
// should remove it first.
assertThrows(IllegalStateException.class, () -> shareGroup.addPartitionEpochs(
mkAssignment(
mkTopicAssignment(fooTopicId, 1)
),
11
));
}
@Test
public void testDeletingMemberRemovesPartitionEpoch() {
Uuid fooTopicId = Uuid.randomUuid();
ShareGroup shareGroup = createShareGroup("foo");
ShareGroupMember member;
member = new ShareGroupMember.Builder("member")
.setMemberEpoch(10)
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2, 3)))
.build();
shareGroup.updateMember(member);
assertEquals(10, shareGroup.currentPartitionEpoch(fooTopicId, 1));
assertEquals(10, shareGroup.currentPartitionEpoch(fooTopicId, 2));
assertEquals(10, shareGroup.currentPartitionEpoch(fooTopicId, 3));
shareGroup.removeMember(member.memberId());
assertEquals(-1, shareGroup.currentPartitionEpoch(fooTopicId, 1));
assertEquals(-1, shareGroup.currentPartitionEpoch(fooTopicId, 2));
assertEquals(-1, shareGroup.currentPartitionEpoch(fooTopicId, 3));
}
@Test
public void testGroupState() {
ShareGroup shareGroup = createShareGroup("foo");
assertEquals(ShareGroup.ShareGroupState.EMPTY, shareGroup.state());
assertEquals("Empty", shareGroup.stateAsString());
ShareGroupMember member1 = new ShareGroupMember.Builder("member1")
.setState(MemberState.STABLE)
.setMemberEpoch(1)
.setPreviousMemberEpoch(0)
.build();
shareGroup.updateMember(member1);
shareGroup.setGroupEpoch(1);
assertEquals(MemberState.STABLE, member1.state());
assertEquals(ShareGroupState.STABLE, shareGroup.state());
assertEquals("Stable", shareGroup.stateAsString());
}
@Test
public void testGroupTypeFromString() {
assertEquals(Group.GroupType.parse("share"), Group.GroupType.SHARE);
// Test case insensitivity.
assertEquals(Group.GroupType.parse("Share"), Group.GroupType.SHARE);
assertEquals(Group.GroupType.parse("SHare"), Group.GroupType.SHARE);
}
@Test
public void testUpdateSubscriptionMetadata() {
Uuid fooTopicId = Uuid.randomUuid();
Uuid barTopicId = Uuid.randomUuid();
Uuid zarTopicId = Uuid.randomUuid();
MetadataImage image = new MetadataImageBuilder()
.addTopic(fooTopicId, "foo", 1)
.addTopic(barTopicId, "bar", 2)
.addTopic(zarTopicId, "zar", 3)
.addRacks()
.build();
ShareGroupMember member1 = new ShareGroupMember.Builder("member1")
.setSubscribedTopicNames(Collections.singletonList("foo"))
.build();
ShareGroupMember member2 = new ShareGroupMember.Builder("member2")
.setSubscribedTopicNames(Collections.singletonList("bar"))
.build();
ShareGroupMember member3 = new ShareGroupMember.Builder("member3")
.setSubscribedTopicNames(Collections.singletonList("zar"))
.build();
ShareGroup shareGroup = createShareGroup("group-foo");
// It should be empty by default.
assertEquals(
Collections.emptyMap(),
shareGroup.computeSubscriptionMetadata(
shareGroup.computeSubscribedTopicNames(null, null),
image.topics(),
image.cluster()
)
);
// Compute while taking into account member 1.
assertEquals(
mkMap(
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1)))
),
shareGroup.computeSubscriptionMetadata(
shareGroup.computeSubscribedTopicNames(null, member1),
image.topics(),
image.cluster()
)
);
// Updating the group with member1.
shareGroup.updateMember(member1);
// It should return foo now.
assertEquals(
mkMap(
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1)))
),
shareGroup.computeSubscriptionMetadata(
shareGroup.computeSubscribedTopicNames(null, null),
image.topics(),
image.cluster()
)
);
// Compute while taking into account removal of member 1.
assertEquals(
Collections.emptyMap(),
shareGroup.computeSubscriptionMetadata(
shareGroup.computeSubscribedTopicNames(member1, null),
image.topics(),
image.cluster()
)
);
// Compute while taking into account member 2.
assertEquals(
mkMap(
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))),
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2)))
),
shareGroup.computeSubscriptionMetadata(
shareGroup.computeSubscribedTopicNames(null, member2),
image.topics(),
image.cluster()
)
);
// Updating the group with member2.
shareGroup.updateMember(member2);
// It should return foo and bar.
assertEquals(
mkMap(
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))),
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2)))
),
shareGroup.computeSubscriptionMetadata(
shareGroup.computeSubscribedTopicNames(null, null),
image.topics(),
image.cluster()
)
);
// Compute while taking into account removal of member 2.
assertEquals(
mkMap(
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1)))
),
shareGroup.computeSubscriptionMetadata(
shareGroup.computeSubscribedTopicNames(member2, null),
image.topics(),
image.cluster()
)
);
// Removing member1 results in returning bar.
assertEquals(
mkMap(
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2)))
),
shareGroup.computeSubscriptionMetadata(
shareGroup.computeSubscribedTopicNames(member1, null),
image.topics(),
image.cluster()
)
);
// Compute while taking into account member 3.
assertEquals(
mkMap(
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))),
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))),
mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3, mkMapOfPartitionRacks(3)))
),
shareGroup.computeSubscriptionMetadata(
shareGroup.computeSubscribedTopicNames(null, member3),
image.topics(),
image.cluster()
)
);
// Updating group with member3.
shareGroup.updateMember(member3);
// It should return foo, bar and zar.
assertEquals(
mkMap(
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))),
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))),
mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3, mkMapOfPartitionRacks(3)))
),
shareGroup.computeSubscriptionMetadata(
shareGroup.computeSubscribedTopicNames(null, null),
image.topics(),
image.cluster()
)
);
        // Compute while taking into account removal of member 1, member 2 and member 3.
assertEquals(
Collections.emptyMap(),
shareGroup.computeSubscriptionMetadata(
shareGroup.computeSubscribedTopicNames(new HashSet<>(Arrays.asList(member1, member2, member3))),
image.topics(),
image.cluster()
)
);
// Compute while taking into account removal of member 2 and member 3.
assertEquals(
mkMap(
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1)))
),
shareGroup.computeSubscriptionMetadata(
shareGroup.computeSubscribedTopicNames(new HashSet<>(Arrays.asList(member2, member3))),
image.topics(),
image.cluster()
)
);
// Compute while taking into account removal of member 1.
assertEquals(
mkMap(
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))),
mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3, mkMapOfPartitionRacks(3)))
),
shareGroup.computeSubscriptionMetadata(
shareGroup.computeSubscribedTopicNames(Collections.singleton(member1)),
image.topics(),
image.cluster()
)
);
// It should return foo, bar and zar.
assertEquals(
mkMap(
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))),
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))),
mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3, mkMapOfPartitionRacks(3)))
),
shareGroup.computeSubscriptionMetadata(
shareGroup.computeSubscribedTopicNames(Collections.emptySet()),
image.topics(),
image.cluster()
)
);
}
@Test
public void testUpdateSubscribedTopicNamesAndSubscriptionType() {
ShareGroupMember member1 = new ShareGroupMember.Builder("member1")
.setSubscribedTopicNames(Collections.singletonList("foo"))
.build();
ShareGroupMember member2 = new ShareGroupMember.Builder("member2")
.setSubscribedTopicNames(Arrays.asList("bar", "foo"))
.build();
ShareGroupMember member3 = new ShareGroupMember.Builder("member3")
.setSubscribedTopicNames(Arrays.asList("bar", "foo"))
.build();
ShareGroup shareGroup = createShareGroup("group-foo");
// It should be empty by default.
assertEquals(
Collections.emptyMap(),
shareGroup.subscribedTopicNames()
);
// It should be Homogeneous by default.
assertEquals(
HOMOGENEOUS,
shareGroup.subscriptionType()
);
shareGroup.updateMember(member1);
        // It should be Homogeneous since there is just one member.
assertEquals(
HOMOGENEOUS,
shareGroup.subscriptionType()
);
shareGroup.updateMember(member2);
assertEquals(
HETEROGENEOUS,
shareGroup.subscriptionType()
);
shareGroup.updateMember(member3);
assertEquals(
HETEROGENEOUS,
shareGroup.subscriptionType()
);
shareGroup.removeMember(member1.memberId());
assertEquals(
HOMOGENEOUS,
shareGroup.subscriptionType()
);
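        // Replace member2 with a broader subscription; the group becomes heterogeneous again.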
ShareGroupMember member4 = new ShareGroupMember.Builder("member2")
.setSubscribedTopicNames(Arrays.asList("bar", "foo", "zar"))
.build();
shareGroup.updateMember(member4);
assertEquals(
HETEROGENEOUS,
shareGroup.subscriptionType()
);
}
@Test
public void testUpdateInvertedAssignment() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
ShareGroup shareGroup = new ShareGroup(snapshotRegistry, "test-group");
Uuid topicId = Uuid.randomUuid();
String memberId1 = "member1";
String memberId2 = "member2";
// Initial assignment for member1
Assignment initialAssignment = new Assignment(Collections.singletonMap(
topicId,
new HashSet<>(Collections.singletonList(0))
));
shareGroup.updateTargetAssignment(memberId1, initialAssignment);
// Verify that partition 0 is assigned to member1.
assertEquals(
mkMap(
mkEntry(topicId, mkMap(mkEntry(0, memberId1)))
),
shareGroup.invertedTargetAssignment()
);
// New assignment for member1
Assignment newAssignment = new Assignment(Collections.singletonMap(
topicId,
new HashSet<>(Collections.singletonList(1))
));
shareGroup.updateTargetAssignment(memberId1, newAssignment);
// Verify that partition 0 is no longer assigned and partition 1 is assigned to member1
assertEquals(
mkMap(
mkEntry(topicId, mkMap(mkEntry(1, memberId1)))
),
shareGroup.invertedTargetAssignment()
);
// New assignment for member2 to add partition 1
Assignment newAssignment2 = new Assignment(Collections.singletonMap(
topicId,
new HashSet<>(Collections.singletonList(1))
));
shareGroup.updateTargetAssignment(memberId2, newAssignment2);
// Verify that partition 1 is assigned to member2
assertEquals(
mkMap(
mkEntry(topicId, mkMap(mkEntry(1, memberId2)))
),
shareGroup.invertedTargetAssignment()
);
// New assignment for member1 to revoke partition 1 and assign partition 0
Assignment newAssignment1 = new Assignment(Collections.singletonMap(
topicId,
new HashSet<>(Collections.singletonList(0))
));
shareGroup.updateTargetAssignment(memberId1, newAssignment1);
// Verify that partition 1 is still assigned to member2 and partition 0 is assigned to member1
assertEquals(
mkMap(
mkEntry(topicId, mkMap(
mkEntry(0, memberId1),
mkEntry(1, memberId2)
))
),
shareGroup.invertedTargetAssignment()
);
// Test remove target assignment for member1
shareGroup.removeTargetAssignment(memberId1);
// Verify that partition 0 is no longer assigned and partition 1 is still assigned to member2
assertEquals(
mkMap(
mkEntry(topicId, mkMap(mkEntry(1, memberId2)))
),
shareGroup.invertedTargetAssignment()
);
}
@Test
public void testMetadataRefreshDeadline() {
MockTime time = new MockTime();
ShareGroup shareGroup = createShareGroup("group-foo");
// Group epoch starts at 0.
assertEquals(0, shareGroup.groupEpoch());
// The refresh time deadline should be empty when the group is created or loaded.
assertTrue(shareGroup.hasMetadataExpired(time.milliseconds()));
assertEquals(0L, shareGroup.metadataRefreshDeadline().deadlineMs);
assertEquals(0, shareGroup.metadataRefreshDeadline().epoch);
        // Set the refresh deadline. The metadata remains valid because the deadline
        // has not passed and the group epoch is correct.
shareGroup.setMetadataRefreshDeadline(time.milliseconds() + 1000, shareGroup.groupEpoch());
assertFalse(shareGroup.hasMetadataExpired(time.milliseconds()));
assertEquals(time.milliseconds() + 1000, shareGroup.metadataRefreshDeadline().deadlineMs);
assertEquals(shareGroup.groupEpoch(), shareGroup.metadataRefreshDeadline().epoch);
// Advance past the deadline. The metadata should have expired.
time.sleep(1001L);
assertTrue(shareGroup.hasMetadataExpired(time.milliseconds()));
        // Set the refresh time deadline with a higher group epoch. The metadata is considered
        // expired because the group epoch attached to the deadline is higher than the
        // current group epoch.
shareGroup.setMetadataRefreshDeadline(time.milliseconds() + 1000, shareGroup.groupEpoch() + 1);
assertTrue(shareGroup.hasMetadataExpired(time.milliseconds()));
assertEquals(time.milliseconds() + 1000, shareGroup.metadataRefreshDeadline().deadlineMs);
assertEquals(shareGroup.groupEpoch() + 1, shareGroup.metadataRefreshDeadline().epoch);
// Advance the group epoch.
shareGroup.setGroupEpoch(shareGroup.groupEpoch() + 1);
        // Set the refresh deadline. The metadata remains valid because the deadline
        // has not passed and the group epoch is correct.
shareGroup.setMetadataRefreshDeadline(time.milliseconds() + 1000, shareGroup.groupEpoch());
assertFalse(shareGroup.hasMetadataExpired(time.milliseconds()));
assertEquals(time.milliseconds() + 1000, shareGroup.metadataRefreshDeadline().deadlineMs);
assertEquals(shareGroup.groupEpoch(), shareGroup.metadataRefreshDeadline().epoch);
// Request metadata refresh. The metadata expires immediately.
shareGroup.requestMetadataRefresh();
assertTrue(shareGroup.hasMetadataExpired(time.milliseconds()));
assertEquals(0L, shareGroup.metadataRefreshDeadline().deadlineMs);
assertEquals(0, shareGroup.metadataRefreshDeadline().epoch);
}
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
public void testValidateOffsetCommit(short version) {
ShareGroup shareGroup = createShareGroup("group-foo");
assertThrows(UnsupportedOperationException.class, () ->
shareGroup.validateOffsetCommit(null, null, -1, false, version));
}
@Test
public void testAsListedGroup() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
ShareGroup shareGroup = new ShareGroup(snapshotRegistry, "group-foo");
snapshotRegistry.getOrCreateSnapshot(0);
assertEquals(ShareGroupState.EMPTY, shareGroup.state(0));
assertEquals("Empty", shareGroup.stateAsString(0));
shareGroup.updateMember(new ShareGroupMember.Builder("member1")
.setSubscribedTopicNames(Collections.singletonList("foo"))
.build());
snapshotRegistry.getOrCreateSnapshot(1);
assertEquals(ShareGroupState.EMPTY, shareGroup.state(0));
assertEquals("Empty", shareGroup.stateAsString(0));
assertEquals(ShareGroupState.STABLE, shareGroup.state(1));
assertEquals("Stable", shareGroup.stateAsString(1));
}
@Test
public void testOffsetExpirationCondition() {
ShareGroup shareGroup = createShareGroup("group-foo");
assertThrows(UnsupportedOperationException.class, shareGroup::offsetExpirationCondition);
}
@Test
public void testValidateOffsetFetch() {
ShareGroup shareGroup = createShareGroup("group-foo");
assertThrows(UnsupportedOperationException.class, () ->
shareGroup.validateOffsetFetch(null, -1, -1));
}
@Test
public void testValidateOffsetDelete() {
ShareGroup shareGroup = createShareGroup("group-foo");
assertThrows(UnsupportedOperationException.class, shareGroup::validateOffsetDelete);
}
@Test
public void testValidateDeleteGroup() {
ShareGroup shareGroup = createShareGroup("foo");
assertEquals(ShareGroupState.EMPTY, shareGroup.state());
assertDoesNotThrow(shareGroup::validateDeleteGroup);
ShareGroupMember member1 = new ShareGroupMember.Builder("member1")
.setMemberEpoch(1)
.setPreviousMemberEpoch(0)
.build();
shareGroup.updateMember(member1);
assertEquals(ShareGroupState.STABLE, shareGroup.state());
assertThrows(GroupNotEmptyException.class, shareGroup::validateDeleteGroup);
shareGroup.setGroupEpoch(1);
assertEquals(ShareGroupState.STABLE, shareGroup.state());
assertThrows(GroupNotEmptyException.class, shareGroup::validateDeleteGroup);
shareGroup.setTargetAssignmentEpoch(1);
assertEquals(ShareGroupState.STABLE, shareGroup.state());
assertThrows(GroupNotEmptyException.class, shareGroup::validateDeleteGroup);
}
@Test
public void testIsSubscribedToTopic() {
Uuid fooTopicId = Uuid.randomUuid();
Uuid barTopicId = Uuid.randomUuid();
MetadataImage image = new MetadataImageBuilder()
.addTopic(fooTopicId, "foo", 1)
.addTopic(barTopicId, "bar", 2)
.addRacks()
.build();
ShareGroupMember member1 = new ShareGroupMember.Builder("member1")
.setSubscribedTopicNames(Collections.singletonList("foo"))
.build();
ShareGroupMember member2 = new ShareGroupMember.Builder("member2")
.setSubscribedTopicNames(Collections.singletonList("bar"))
.build();
ShareGroup shareGroup = createShareGroup("group-foo");
shareGroup.updateMember(member1);
shareGroup.updateMember(member2);
assertEquals(
mkMap(
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))),
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2)))
),
shareGroup.computeSubscriptionMetadata(
shareGroup.computeSubscribedTopicNames(null, null),
image.topics(),
image.cluster()
)
);
assertTrue(shareGroup.isSubscribedToTopic("foo"));
assertTrue(shareGroup.isSubscribedToTopic("bar"));
shareGroup.removeMember("member1");
assertFalse(shareGroup.isSubscribedToTopic("foo"));
shareGroup.removeMember("member2");
assertFalse(shareGroup.isSubscribedToTopic("bar"));
}
@Test
public void testAsDescribedGroup() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
ShareGroup shareGroup = new ShareGroup(snapshotRegistry, "group-id-1");
snapshotRegistry.getOrCreateSnapshot(0);
assertEquals(ShareGroupState.EMPTY.toString(), shareGroup.stateAsString(0));
shareGroup.updateMember(new ShareGroupMember.Builder("member1")
.setSubscribedTopicNames(Collections.singletonList("foo"))
.build());
shareGroup.updateMember(new ShareGroupMember.Builder("member2")
.build());
snapshotRegistry.getOrCreateSnapshot(1);
ShareGroupDescribeResponseData.DescribedGroup expected = new ShareGroupDescribeResponseData.DescribedGroup()
.setGroupId("group-id-1")
.setGroupState(ShareGroupState.STABLE.toString())
.setGroupEpoch(0)
.setAssignmentEpoch(0)
.setAssignorName("assignorName")
.setMembers(Arrays.asList(
new ShareGroupDescribeResponseData.Member()
.setMemberId("member1")
.setSubscribedTopicNames(Collections.singletonList("foo")),
new ShareGroupDescribeResponseData.Member().setMemberId("member2")
));
ShareGroupDescribeResponseData.DescribedGroup actual = shareGroup.asDescribedGroup(1, "assignorName",
new MetadataImageBuilder().build().topics());
assertEquals(expected, actual);
}
@Test
public void testIsInStatesCaseInsensitive() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
ShareGroup shareGroup = new ShareGroup(snapshotRegistry, "group-foo");
snapshotRegistry.getOrCreateSnapshot(0);
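        // States are matched against their lower-case string form, so only lower-case inputs match.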
assertTrue(shareGroup.isInStates(Collections.singleton("empty"), 0));
assertFalse(shareGroup.isInStates(Collections.singleton("Empty"), 0));
shareGroup.updateMember(new ShareGroupMember.Builder("member1")
.setSubscribedTopicNames(Collections.singletonList("foo"))
.build());
snapshotRegistry.getOrCreateSnapshot(1);
assertTrue(shareGroup.isInStates(Collections.singleton("empty"), 0));
assertTrue(shareGroup.isInStates(Collections.singleton("stable"), 1));
assertFalse(shareGroup.isInStates(Collections.singleton("empty"), 1));
}
private ShareGroup createShareGroup(String groupId) {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
return new ShareGroup(
snapshotRegistry,
groupId
);
}
}