KAFKA-17593; [5/N] Include resolved regular expressions into target assignment computation (#17750)

This patch does a few things:
* Refactors the `TargetAssignmentBuilder` to use inheritance to differentiate Consumer and Share groups.
* Introduces `UnionSet` to lazily aggregate the subscriptions for a given member. 
* Wires the resolved regular expressions in the `GroupMetadataManager`. At the moment, they are only used when the target assignment is computed.

Reviewers: Sean Quah <squah@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Lianet Magrans <lmagrans@confluent.io>
This commit is contained in:
David Jacot 2024-11-13 15:59:52 +01:00 committed by GitHub
parent 05bca43c61
commit a802865aad
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 803 additions and 131 deletions

View File

@ -2606,8 +2606,8 @@ public class GroupMetadataManager {
updatedMember
).orElse(defaultConsumerGroupAssignor.name());
try {
TargetAssignmentBuilder<ConsumerGroupMember> assignmentResultBuilder =
new TargetAssignmentBuilder<ConsumerGroupMember>(group.groupId(), groupEpoch, consumerGroupAssignors.get(preferredServerAssignor))
TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder assignmentResultBuilder =
new TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder(group.groupId(), groupEpoch, consumerGroupAssignors.get(preferredServerAssignor))
.withMembers(group.members())
.withStaticMembers(group.staticMembers())
.withSubscriptionMetadata(subscriptionMetadata)
@ -2615,6 +2615,7 @@ public class GroupMetadataManager {
.withTargetAssignment(group.targetAssignment())
.withInvertedTargetAssignment(group.invertedTargetAssignment())
.withTopicsImage(metadataImage.topics())
.withResolvedRegularExpressions(group.resolvedRegularExpressions())
.addOrUpdateMember(updatedMember.memberId(), updatedMember);
// If the instance id was associated to a different member, it means that the
@ -2673,16 +2674,14 @@ public class GroupMetadataManager {
List<CoordinatorRecord> records
) {
try {
TargetAssignmentBuilder<ShareGroupMember> assignmentResultBuilder =
new TargetAssignmentBuilder<ShareGroupMember>(group.groupId(), groupEpoch, shareGroupAssignor)
TargetAssignmentBuilder.ShareTargetAssignmentBuilder assignmentResultBuilder =
new TargetAssignmentBuilder.ShareTargetAssignmentBuilder(group.groupId(), groupEpoch, shareGroupAssignor)
.withMembers(group.members())
.withSubscriptionMetadata(subscriptionMetadata)
.withSubscriptionType(subscriptionType)
.withTargetAssignment(group.targetAssignment())
.withInvertedTargetAssignment(group.invertedTargetAssignment())
.withTopicsImage(metadataImage.topics())
.withTargetAssignmentRecordBuilder(GroupCoordinatorRecordHelpers::newShareGroupTargetAssignmentRecord)
.withTargetAssignmentEpochRecordBuilder(GroupCoordinatorRecordHelpers::newShareGroupTargetAssignmentEpochRecord)
.addOrUpdateMember(updatedMember.memberId(), updatedMember);
long startTimeMs = time.milliseconds();

View File

@ -24,6 +24,9 @@ import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
import org.apache.kafka.coordinator.group.modern.consumer.ResolvedRegularExpression;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember;
import org.apache.kafka.image.TopicsImage;
import java.util.ArrayList;
@ -47,7 +50,7 @@ import java.util.Set;
* is deleted as part of the member deletion process. In other words, this class
* does not yield a tombstone for removed members.
*/
public class TargetAssignmentBuilder<T extends ModernGroupMember> {
public abstract class TargetAssignmentBuilder<T extends ModernGroupMember, U extends TargetAssignmentBuilder<T, U>> {
/**
* The assignment result returned by {{@link TargetAssignmentBuilder#build()}}.
@ -89,6 +92,144 @@ public class TargetAssignmentBuilder<T extends ModernGroupMember> {
}
}
public static class ConsumerTargetAssignmentBuilder extends TargetAssignmentBuilder<ConsumerGroupMember, ConsumerTargetAssignmentBuilder> {
/**
* The resolved regular expressions.
*/
private Map<String, ResolvedRegularExpression> resolvedRegularExpressions = Collections.emptyMap();
public ConsumerTargetAssignmentBuilder(
String groupId,
int groupEpoch,
PartitionAssignor assignor
) {
super(groupId, groupEpoch, assignor);
}
/**
* Adds all the existing resolved regular expressions.
*
* @param resolvedRegularExpressions The resolved regular expressions.
* @return This object.
*/
public ConsumerTargetAssignmentBuilder withResolvedRegularExpressions(
Map<String, ResolvedRegularExpression> resolvedRegularExpressions
) {
this.resolvedRegularExpressions = resolvedRegularExpressions;
return self();
}
@Override
protected ConsumerTargetAssignmentBuilder self() {
return this;
}
@Override
protected CoordinatorRecord newTargetAssignmentRecord(
String groupId,
String memberId,
Map<Uuid, Set<Integer>> partitions
) {
return GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(
groupId,
memberId,
partitions
);
}
@Override
protected CoordinatorRecord newTargetAssignmentEpochRecord(String groupId, int assignmentEpoch) {
return GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(
groupId,
assignmentEpoch
);
}
@Override
protected MemberSubscriptionAndAssignmentImpl newMemberSubscriptionAndAssignment(
ConsumerGroupMember member,
Assignment memberAssignment,
TopicIds.TopicResolver topicResolver
) {
Set<String> subscriptions = member.subscribedTopicNames();
// Check whether the member is also subscribed to a regular expression. If it is,
// create the union of the two subscriptions.
String subscribedTopicRegex = member.subscribedTopicRegex();
if (subscribedTopicRegex != null && !subscribedTopicRegex.isEmpty()) {
ResolvedRegularExpression resolvedRegularExpression = resolvedRegularExpressions.get(subscribedTopicRegex);
if (resolvedRegularExpression != null) {
if (subscriptions.isEmpty()) {
subscriptions = resolvedRegularExpression.topics;
} else if (!resolvedRegularExpression.topics.isEmpty()) {
// We only use a UnionSet when the member uses both type of subscriptions. The
// protocol allows it. However, the Apache Kafka Consumer does not support it.
// Other clients such as librdkafka may support it.
subscriptions = new UnionSet<>(subscriptions, resolvedRegularExpression.topics);
}
}
}
return new MemberSubscriptionAndAssignmentImpl(
Optional.ofNullable(member.rackId()),
Optional.ofNullable(member.instanceId()),
new TopicIds(subscriptions, topicResolver),
memberAssignment
);
}
}
public static class ShareTargetAssignmentBuilder extends TargetAssignmentBuilder<ShareGroupMember, ShareTargetAssignmentBuilder> {
public ShareTargetAssignmentBuilder(
String groupId,
int groupEpoch,
PartitionAssignor assignor
) {
super(groupId, groupEpoch, assignor);
}
@Override
protected ShareTargetAssignmentBuilder self() {
return this;
}
@Override
protected CoordinatorRecord newTargetAssignmentRecord(
String groupId,
String memberId,
Map<Uuid, Set<Integer>> partitions
) {
return GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentRecord(
groupId,
memberId,
partitions
);
}
@Override
protected CoordinatorRecord newTargetAssignmentEpochRecord(String groupId, int assignmentEpoch) {
return GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentEpochRecord(
groupId,
assignmentEpoch
);
}
@Override
protected MemberSubscriptionAndAssignmentImpl newMemberSubscriptionAndAssignment(
ShareGroupMember member,
Assignment memberAssignment,
TopicIds.TopicResolver topicResolver
) {
return new MemberSubscriptionAndAssignmentImpl(
Optional.ofNullable(member.rackId()),
Optional.ofNullable(member.instanceId()),
new TopicIds(member.subscribedTopicNames(), topicResolver),
memberAssignment
);
}
}
/**
* The group id.
*/
@ -146,27 +287,6 @@ public class TargetAssignmentBuilder<T extends ModernGroupMember> {
*/
private Map<String, String> staticMembers = new HashMap<>();
public interface TargetAssignmentRecordBuilder {
CoordinatorRecord build(
final String groupId,
final String memberId,
final Map<Uuid, Set<Integer>> partitions
);
}
public interface TargetAssignmentEpochRecordBuilder {
CoordinatorRecord build(
final String groupId,
final int assignmentEpoch
);
}
private TargetAssignmentRecordBuilder targetAssignmentRecordBuilder =
GroupCoordinatorRecordHelpers::newConsumerGroupTargetAssignmentRecord;
private TargetAssignmentEpochRecordBuilder targetAssignmentEpochRecordBuilder =
GroupCoordinatorRecordHelpers::newConsumerGroupTargetAssignmentEpochRecord;
/**
* Constructs the object.
*
@ -190,11 +310,11 @@ public class TargetAssignmentBuilder<T extends ModernGroupMember> {
* @param members The existing members in the consumer group.
* @return This object.
*/
public TargetAssignmentBuilder<T> withMembers(
public U withMembers(
Map<String, T> members
) {
this.members = members;
return this;
return self();
}
/**
@ -203,11 +323,11 @@ public class TargetAssignmentBuilder<T extends ModernGroupMember> {
* @param staticMembers The existing static members in the consumer group.
* @return This object.
*/
public TargetAssignmentBuilder<T> withStaticMembers(
public U withStaticMembers(
Map<String, String> staticMembers
) {
this.staticMembers = staticMembers;
return this;
return self();
}
/**
@ -216,11 +336,11 @@ public class TargetAssignmentBuilder<T extends ModernGroupMember> {
* @param subscriptionMetadata The subscription metadata.
* @return This object.
*/
public TargetAssignmentBuilder<T> withSubscriptionMetadata(
public U withSubscriptionMetadata(
Map<String, TopicMetadata> subscriptionMetadata
) {
this.subscriptionMetadata = subscriptionMetadata;
return this;
return self();
}
/**
@ -229,11 +349,11 @@ public class TargetAssignmentBuilder<T extends ModernGroupMember> {
* @param subscriptionType Subscription type of the group.
* @return This object.
*/
public TargetAssignmentBuilder<T> withSubscriptionType(
public U withSubscriptionType(
SubscriptionType subscriptionType
) {
this.subscriptionType = subscriptionType;
return this;
return self();
}
/**
@ -242,11 +362,11 @@ public class TargetAssignmentBuilder<T extends ModernGroupMember> {
* @param targetAssignment The existing target assignment.
* @return This object.
*/
public TargetAssignmentBuilder<T> withTargetAssignment(
public U withTargetAssignment(
Map<String, Assignment> targetAssignment
) {
this.targetAssignment = targetAssignment;
return this;
return self();
}
/**
@ -255,11 +375,11 @@ public class TargetAssignmentBuilder<T extends ModernGroupMember> {
* @param invertedTargetAssignment The reverse lookup map of the current target assignment.
* @return This object.
*/
public TargetAssignmentBuilder<T> withInvertedTargetAssignment(
public U withInvertedTargetAssignment(
Map<Uuid, Map<Integer, String>> invertedTargetAssignment
) {
this.invertedTargetAssignment = invertedTargetAssignment;
return this;
return self();
}
/**
@ -268,25 +388,11 @@ public class TargetAssignmentBuilder<T extends ModernGroupMember> {
* @param topicsImage The topics image.
* @return This object.
*/
public TargetAssignmentBuilder<T> withTopicsImage(
public U withTopicsImage(
TopicsImage topicsImage
) {
this.topicsImage = topicsImage;
return this;
}
public TargetAssignmentBuilder<T> withTargetAssignmentRecordBuilder(
TargetAssignmentRecordBuilder targetAssignmentRecordBuilder
) {
this.targetAssignmentRecordBuilder = targetAssignmentRecordBuilder;
return this;
}
public TargetAssignmentBuilder<T> withTargetAssignmentEpochRecordBuilder(
TargetAssignmentEpochRecordBuilder targetAssignmentEpochRecordBuilder
) {
this.targetAssignmentEpochRecordBuilder = targetAssignmentEpochRecordBuilder;
return this;
return self();
}
/**
@ -297,12 +403,12 @@ public class TargetAssignmentBuilder<T extends ModernGroupMember> {
* @param member The member to add or update.
* @return This object.
*/
public TargetAssignmentBuilder<T> addOrUpdateMember(
public U addOrUpdateMember(
String memberId,
T member
) {
this.updatedMembers.put(memberId, member);
return this;
return self();
}
/**
@ -312,7 +418,7 @@ public class TargetAssignmentBuilder<T extends ModernGroupMember> {
* @param memberId The member id.
* @return This object.
*/
public TargetAssignmentBuilder<T> removeMember(
public U removeMember(
String memberId
) {
return addOrUpdateMember(memberId, null);
@ -331,7 +437,7 @@ public class TargetAssignmentBuilder<T extends ModernGroupMember> {
// Prepare the member spec for all members.
members.forEach((memberId, member) ->
memberSpecs.put(memberId, createMemberSubscriptionAndAssignment(
memberSpecs.put(memberId, newMemberSubscriptionAndAssignment(
member,
targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
topicResolver
@ -353,7 +459,7 @@ public class TargetAssignmentBuilder<T extends ModernGroupMember> {
}
}
memberSpecs.put(memberId, createMemberSubscriptionAndAssignment(
memberSpecs.put(memberId, newMemberSubscriptionAndAssignment(
updatedMemberOrNull,
assignment,
topicResolver
@ -391,7 +497,7 @@ public class TargetAssignmentBuilder<T extends ModernGroupMember> {
if (!newMemberAssignment.equals(oldMemberAssignment)) {
// If the member had no assignment or had a different assignment, we
// create a record for the new assignment.
records.add(targetAssignmentRecordBuilder.build(
records.add(newTargetAssignmentRecord(
groupId,
memberId,
newMemberAssignment.partitions()
@ -400,11 +506,30 @@ public class TargetAssignmentBuilder<T extends ModernGroupMember> {
}
// Bump the target assignment epoch.
records.add(targetAssignmentEpochRecordBuilder.build(groupId, groupEpoch));
records.add(newTargetAssignmentEpochRecord(groupId, groupEpoch));
return new TargetAssignmentResult(records, newGroupAssignment.members());
}
protected abstract U self();
protected abstract CoordinatorRecord newTargetAssignmentRecord(
String groupId,
String memberId,
Map<Uuid, Set<Integer>> partitions
);
protected abstract CoordinatorRecord newTargetAssignmentEpochRecord(
String groupId,
int assignmentEpoch
);
protected abstract MemberSubscriptionAndAssignmentImpl newMemberSubscriptionAndAssignment(
T member,
Assignment memberAssignment,
TopicIds.TopicResolver topicResolver
);
private Assignment newMemberAssignment(
GroupAssignment newGroupAssignment,
String memberId
@ -416,18 +541,4 @@ public class TargetAssignmentBuilder<T extends ModernGroupMember> {
return Assignment.EMPTY;
}
}
// private for testing
static <T extends ModernGroupMember> MemberSubscriptionAndAssignmentImpl createMemberSubscriptionAndAssignment(
T member,
Assignment memberAssignment,
TopicIds.TopicResolver topicResolver
) {
return new MemberSubscriptionAndAssignmentImpl(
Optional.ofNullable(member.rackId()),
Optional.ofNullable(member.instanceId()),
new TopicIds(member.subscribedTopicNames(), topicResolver),
memberAssignment
);
}
}

View File

@ -0,0 +1,219 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.modern;
import java.lang.reflect.Array;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
/**
* A set which presents the union of two underlying sets without
* materializing it. This class expects the underlying sets to
* be immutable.
*
* @param <T> The set type.
*/
public class UnionSet<T> implements Set<T> {
private final Set<T> largeSet;
private final Set<T> smallSet;
private int size = -1;
public UnionSet(Set<T> s1, Set<T> s2) {
Objects.requireNonNull(s1);
Objects.requireNonNull(s2);
if (s1.size() > s2.size()) {
largeSet = s1;
smallSet = s2;
} else {
largeSet = s2;
smallSet = s1;
}
}
@Override
public int size() {
if (size == -1) {
size = largeSet.size();
for (T item : smallSet) {
if (!largeSet.contains(item)) {
size++;
}
}
}
return size;
}
@Override
public boolean isEmpty() {
return largeSet.isEmpty() && smallSet.isEmpty();
}
@Override
public boolean contains(Object o) {
return largeSet.contains(o) || smallSet.contains(o);
}
@Override
public Iterator<T> iterator() {
return new Iterator<T>() {
private final Iterator<T> largeSetIterator = largeSet.iterator();
private final Iterator<T> smallSetIterator = smallSet.iterator();
private T next = null;
@Override
public boolean hasNext() {
if (next != null) return true;
if (largeSetIterator.hasNext()) {
next = largeSetIterator.next();
return true;
}
while (smallSetIterator.hasNext()) {
next = smallSetIterator.next();
if (!largeSet.contains(next)) {
return true;
}
}
next = null;
return false;
}
@Override
public T next() {
if (!hasNext()) throw new NoSuchElementException();
T result = next;
next = null;
return result;
}
};
}
@Override
public Object[] toArray() {
Object[] array = new Object[size()];
int index = 0;
for (T item : largeSet) {
array[index] = item;
index++;
}
for (T item : smallSet) {
if (!largeSet.contains(item)) {
array[index] = item;
index++;
}
}
return array;
}
@Override
@SuppressWarnings("unchecked")
public <U> U[] toArray(U[] array) {
int size = size();
if (array.length < size) {
// Create a new array of the same type with the correct size
array = (U[]) Array.newInstance(array.getClass().getComponentType(), size);
}
int index = 0;
for (T item : largeSet) {
array[index] = (U) item;
index++;
}
for (T item : smallSet) {
if (!largeSet.contains(item)) {
array[index] = (U) item;
index++;
}
}
if (array.length > size) {
array[size] = null;
}
return array;
}
@Override
public boolean add(T t) {
throw new UnsupportedOperationException();
}
@Override
public boolean remove(Object o) {
throw new UnsupportedOperationException();
}
@Override
public boolean containsAll(Collection<?> c) {
for (Object o : c) {
if (!contains(o)) return false;
}
return true;
}
@Override
public boolean addAll(Collection<? extends T> c) {
throw new UnsupportedOperationException();
}
@Override
public boolean retainAll(Collection<?> c) {
throw new UnsupportedOperationException();
}
@Override
public boolean removeAll(Collection<?> c) {
throw new UnsupportedOperationException();
}
@Override
public void clear() {
throw new UnsupportedOperationException();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof Set)) return false;
Set<?> set = (Set<?>) o;
if (set.size() != size()) return false;
return containsAll(set);
}
@Override
public int hashCode() {
int h = 0;
for (T item : largeSet) {
h += item.hashCode();
}
for (T item : smallSet) {
if (!largeSet.contains(item)) {
h += item.hashCode();
}
}
return h;
}
@Override
public String toString() {
return "UnionSet(" +
"largeSet=" + largeSet +
", smallSet=" + smallSet +
')';
}
}

View File

@ -416,6 +416,13 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
return Collections.unmodifiableMap(staticMembers);
}
/**
* @return An immutable Map containing all the resolved regular expressions.
*/
public Map<String, ResolvedRegularExpression> resolvedRegularExpressions() {
return Collections.unmodifiableMap(resolvedRegularExpressions);
}
/**
* Returns the current epoch of a partition or -1 if the partition
* does not have one.

View File

@ -70,6 +70,7 @@ import org.apache.kafka.coordinator.common.runtime.MockCoordinatorTimer.ExpiredT
import org.apache.kafka.coordinator.common.runtime.MockCoordinatorTimer.ScheduledTimeout;
import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.classic.ClassicGroup;
import org.apache.kafka.coordinator.group.classic.ClassicGroupMember;
@ -14979,6 +14980,83 @@ public class GroupMetadataManagerTest {
);
}
@Test
public void testConsumerGroupMemberPicksUpExistingResolvedRegularExpression() {
String groupId = "fooup";
String memberId1 = Uuid.randomUuid().toString();
String memberId2 = Uuid.randomUuid().toString();
Uuid fooTopicId = Uuid.randomUuid();
String fooTopicName = "foo";
ConsumerGroupPartitionAssignor assignor = mock(ConsumerGroupPartitionAssignor.class);
when(assignor.name()).thenReturn("range");
when(assignor.assign(any(), any())).thenAnswer(answer -> {
GroupSpec spec = answer.getArgument(0);
List.of(memberId1, memberId2).forEach(memberId ->
assertEquals(
Collections.singleton(fooTopicId),
spec.memberSubscription(memberId).subscribedTopicIds(),
String.format("Member %s has unexpected subscribed topic ids", memberId)
)
);
return new GroupAssignment(Map.of(
memberId1, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 0)
)),
memberId2, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 1)
))
));
});
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withConsumerGroupAssignors(Collections.singletonList(assignor))
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 2)
.build())
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
.withMember(new ConsumerGroupMember.Builder(memberId1)
.setState(MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setSubscribedTopicRegex("foo*")
.setServerAssignorName("range")
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1)))
.build())
.withResolvedRegularExpression("foo*", new ResolvedRegularExpression(
Collections.singleton(fooTopicName),
100L,
12345L))
.withAssignment(memberId1, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1)))
.withAssignmentEpoch(10))
.build();
CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId2)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(10000)
.setSubscribedTopicRegex("foo*")
.setTopicPartitions(Collections.emptyList()));
assertEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId2)
.setMemberEpoch(11)
.setHeartbeatIntervalMs(5000)
.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()),
result.response()
);
}
private static void checkJoinGroupResponse(
JoinGroupResponseData expectedResponse,
JoinGroupResponseData actualResponse,

View File

@ -21,10 +21,10 @@ import org.apache.kafka.coordinator.group.AssignmentTestUtil;
import org.apache.kafka.coordinator.group.MetadataImageBuilder;
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.api.assignor.MemberSubscription;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
import org.apache.kafka.coordinator.group.modern.consumer.ResolvedRegularExpression;
import org.apache.kafka.image.TopicsImage;
import org.junit.jupiter.api.Test;
@ -43,7 +43,6 @@ import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssig
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord;
import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
import static org.apache.kafka.coordinator.group.modern.TargetAssignmentBuilder.createMemberSubscriptionAndAssignment;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
@ -63,6 +62,7 @@ public class TargetAssignmentBuilderTest {
private final Map<String, Assignment> targetAssignment = new HashMap<>();
private final Map<String, MemberAssignment> memberAssignments = new HashMap<>();
private final Map<String, String> staticMembers = new HashMap<>();
private final Map<String, ResolvedRegularExpression> resolvedRegularExpressions = new HashMap<>();
private MetadataImageBuilder topicsImageBuilder = new MetadataImageBuilder();
public TargetAssignmentBuilderTestContext(
@ -78,17 +78,37 @@ public class TargetAssignmentBuilderTest {
List<String> subscriptions,
Map<Uuid, Set<Integer>> targetPartitions
) {
addGroupMember(memberId, null, subscriptions, targetPartitions);
addGroupMember(memberId, null, subscriptions, "", targetPartitions);
}
private void addGroupMember(
public void addGroupMember(
String memberId,
List<String> subscriptions,
String subscribedRegex,
Map<Uuid, Set<Integer>> targetPartitions
) {
addGroupMember(memberId, null, subscriptions, subscribedRegex, targetPartitions);
}
public void addGroupMember(
String memberId,
String instanceId,
List<String> subscriptions,
Map<Uuid, Set<Integer>> targetPartitions
) {
addGroupMember(memberId, instanceId, subscriptions, "", targetPartitions);
}
public void addGroupMember(
String memberId,
String instanceId,
List<String> subscriptions,
String subscribedRegex,
Map<Uuid, Set<Integer>> targetPartitions
) {
ConsumerGroupMember.Builder memberBuilder = new ConsumerGroupMember.Builder(memberId)
.setSubscribedTopicNames(subscriptions);
.setSubscribedTopicNames(subscriptions)
.setSubscribedTopicRegex(subscribedRegex);
if (instanceId != null) {
memberBuilder.setInstanceId(instanceId);
@ -158,6 +178,45 @@ public class TargetAssignmentBuilderTest {
memberAssignments.put(memberId, new MemberAssignmentImpl(assignment));
}
public void addResolvedRegularExpression(
String regex,
ResolvedRegularExpression resolvedRegularExpression
) {
resolvedRegularExpressions.put(regex, resolvedRegularExpression);
}
private MemberSubscriptionAndAssignmentImpl newMemberSubscriptionAndAssignment(
ConsumerGroupMember member,
Assignment memberAssignment,
TopicIds.TopicResolver topicResolver
) {
Set<String> subscriptions = member.subscribedTopicNames();
// Check whether the member is also subscribed to a regular expression. If it is,
// create the union of the two subscriptions.
String subscribedTopicRegex = member.subscribedTopicRegex();
if (subscribedTopicRegex != null && !subscribedTopicRegex.isEmpty()) {
ResolvedRegularExpression resolvedRegularExpression = resolvedRegularExpressions.get(subscribedTopicRegex);
if (resolvedRegularExpression != null) {
if (subscriptions.isEmpty()) {
subscriptions = resolvedRegularExpression.topics;
} else if (!resolvedRegularExpression.topics.isEmpty()) {
// We only use a UnionSet when the member uses both type of subscriptions. The
// protocol allows it. However, the Apache Kafka Consumer does not support it.
// Other clients such as librdkafka may support it.
subscriptions = new UnionSet<>(subscriptions, resolvedRegularExpression.topics);
}
}
}
return new MemberSubscriptionAndAssignmentImpl(
Optional.ofNullable(member.rackId()),
Optional.ofNullable(member.instanceId()),
new TopicIds(subscriptions, topicResolver),
memberAssignment
);
}
public TargetAssignmentBuilder.TargetAssignmentResult build() {
TopicsImage topicsImage = topicsImageBuilder.build().topics();
TopicIds.TopicResolver topicResolver = new TopicIds.CachedTopicResolver(topicsImage);
@ -166,7 +225,7 @@ public class TargetAssignmentBuilderTest {
// All the existing members are prepared.
members.forEach((memberId, member) ->
memberSubscriptions.put(memberId, createMemberSubscriptionAndAssignment(
memberSubscriptions.put(memberId, newMemberSubscriptionAndAssignment(
member,
targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
topicResolver
@ -189,7 +248,7 @@ public class TargetAssignmentBuilderTest {
}
}
memberSubscriptions.put(memberId, createMemberSubscriptionAndAssignment(
memberSubscriptions.put(memberId, newMemberSubscriptionAndAssignment(
updatedMemberOrNull,
assignment,
topicResolver
@ -223,15 +282,16 @@ public class TargetAssignmentBuilderTest {
.thenReturn(new GroupAssignment(memberAssignments));
// Create and populate the assignment builder.
TargetAssignmentBuilder<ConsumerGroupMember> builder =
new TargetAssignmentBuilder<ConsumerGroupMember>(groupId, groupEpoch, assignor)
.withMembers(members)
.withStaticMembers(staticMembers)
.withSubscriptionMetadata(subscriptionMetadata)
.withSubscriptionType(subscriptionType)
.withTargetAssignment(targetAssignment)
.withInvertedTargetAssignment(invertedTargetAssignment)
.withTopicsImage(topicsImage);
TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder builder =
new TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder(groupId, groupEpoch, assignor)
.withMembers(members)
.withStaticMembers(staticMembers)
.withSubscriptionMetadata(subscriptionMetadata)
.withSubscriptionType(subscriptionType)
.withTargetAssignment(targetAssignment)
.withInvertedTargetAssignment(invertedTargetAssignment)
.withTopicsImage(topicsImage)
.withResolvedRegularExpressions(resolvedRegularExpressions);
// Add the updated members or delete the deleted members.
updatedMembers.forEach((memberId, updatedMemberOrNull) -> {
@ -254,42 +314,6 @@ public class TargetAssignmentBuilderTest {
}
}
@Test
public void testCreateMemberSubscriptionSpecImpl() {
Uuid fooTopicId = Uuid.randomUuid();
Uuid barTopicId = Uuid.randomUuid();
TopicsImage topicsImage = new MetadataImageBuilder()
.addTopic(fooTopicId, "foo", 5)
.addTopic(barTopicId, "bar", 5)
.build()
.topics();
TopicIds.TopicResolver topicResolver = new TopicIds.DefaultTopicResolver(topicsImage);
ConsumerGroupMember member = new ConsumerGroupMember.Builder("member-id")
.setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar"))
.setRackId("rackId")
.setInstanceId("instanceId")
.build();
Assignment assignment = new Assignment(mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2, 3),
mkTopicAssignment(barTopicId, 1, 2, 3)
));
MemberSubscription subscriptionSpec = createMemberSubscriptionAndAssignment(
member,
assignment,
topicResolver
);
assertEquals(new MemberSubscriptionAndAssignmentImpl(
Optional.of("rackId"),
Optional.of("instanceId"),
new TopicIds(Set.of("bar", "foo", "zar"), topicsImage),
assignment
), subscriptionSpec);
}
@Test
public void testEmpty() {
TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext(
@ -810,4 +834,80 @@ public class TargetAssignmentBuilderTest {
assertEquals(expectedAssignment, result.targetAssignment());
}
@Test
public void testRegularExpressions() {
TargetAssignmentBuilderTestContext context = new TargetAssignmentBuilderTestContext(
"my-group",
20
);
Uuid fooTopicId = context.addTopicMetadata("foo", 6);
Uuid barTopicId = context.addTopicMetadata("bar", 6);
context.addGroupMember("member-1", Arrays.asList("bar", "zar"), "foo*", mkAssignment());
context.addGroupMember("member-2", Arrays.asList("foo", "bar", "zar"), mkAssignment());
context.addGroupMember("member-3", Collections.emptyList(), "foo*", mkAssignment());
context.addResolvedRegularExpression("foo*", new ResolvedRegularExpression(
Collections.singleton("foo"),
10L,
12345L
));
context.prepareMemberAssignment("member-1", mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2),
mkTopicAssignment(barTopicId, 1, 2, 3)
));
context.prepareMemberAssignment("member-2", mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4),
mkTopicAssignment(barTopicId, 4, 5, 6)
));
context.prepareMemberAssignment("member-3", mkAssignment(
mkTopicAssignment(fooTopicId, 5, 6)
));
TargetAssignmentBuilder.TargetAssignmentResult result = context.build();
assertEquals(4, result.records().size());
assertUnorderedListEquals(Arrays.asList(
newConsumerGroupTargetAssignmentRecord("my-group", "member-1", mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2),
mkTopicAssignment(barTopicId, 1, 2, 3)
)),
newConsumerGroupTargetAssignmentRecord("my-group", "member-2", mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4),
mkTopicAssignment(barTopicId, 4, 5, 6)
)),
newConsumerGroupTargetAssignmentRecord("my-group", "member-3", mkAssignment(
mkTopicAssignment(fooTopicId, 5, 6)
))
), result.records().subList(0, 3));
assertEquals(newConsumerGroupTargetAssignmentEpochRecord(
"my-group",
20
), result.records().get(3));
Map<String, MemberAssignment> expectedAssignment = new HashMap<>();
expectedAssignment.put("member-1", new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2),
mkTopicAssignment(barTopicId, 1, 2, 3)
)));
expectedAssignment.put("member-2", new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4),
mkTopicAssignment(barTopicId, 4, 5, 6)
)));
expectedAssignment.put("member-3", new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 5, 6)
)));
assertEquals(expectedAssignment, result.targetAssignment());
}
}

View File

@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.modern;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.IntStream;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class UnionSetTest {
@Test
public void testSetsCannotBeNull() {
assertThrows(NullPointerException.class, () -> new UnionSet<String>(Collections.emptySet(), null));
assertThrows(NullPointerException.class, () -> new UnionSet<String>(null, Collections.emptySet()));
}
@Test
public void testUnion() {
UnionSet<Integer> union = new UnionSet<>(
Set.of(1, 2, 3),
Set.of(2, 3, 4, 5)
);
List<Integer> result = new ArrayList<>();
result.addAll(union);
result.sort(Integer::compareTo);
assertEquals(List.of(1, 2, 3, 4, 5), result);
}
@Test
public void testSize() {
UnionSet<Integer> union = new UnionSet<>(
Set.of(1, 2, 3),
Set.of(2, 3, 4, 5)
);
assertEquals(5, union.size());
}
@Test
public void testIsEmpty() {
UnionSet<Integer> union = new UnionSet<>(
Set.of(1, 2, 3),
Set.of(2, 3, 4, 5)
);
assertFalse(union.isEmpty());
union = new UnionSet<>(
Set.of(1, 2, 3),
Collections.emptySet()
);
assertFalse(union.isEmpty());
union = new UnionSet<>(
Collections.emptySet(),
Set.of(2, 3, 4, 5)
);
assertFalse(union.isEmpty());
union = new UnionSet<>(
Collections.emptySet(),
Collections.emptySet()
);
assertTrue(union.isEmpty());
}
@Test
public void testContains() {
UnionSet<Integer> union = new UnionSet<>(
Set.of(1, 2, 3),
Set.of(2, 3, 4, 5)
);
IntStream.range(1, 6).forEach(item -> assertTrue(union.contains(item)));
assertFalse(union.contains(0));
assertFalse(union.contains(6));
}
@Test
public void testToArray() {
UnionSet<Integer> union = new UnionSet<>(
Set.of(1, 2, 3),
Set.of(2, 3, 4, 5)
);
Object[] expected = {1, 2, 3, 4, 5};
Object[] actual = union.toArray();
Arrays.sort(actual);
assertArrayEquals(expected, actual);
}
@Test
public void testToArrayWithArrayParameter() {
UnionSet<Integer> union = new UnionSet<>(
Set.of(1, 2, 3),
Set.of(2, 3, 4, 5)
);
Integer[] input = new Integer[5];
Integer[] expected = {1, 2, 3, 4, 5};
union.toArray(input);
Arrays.sort(input);
assertArrayEquals(expected, input);
}
@Test
public void testEquals() {
UnionSet<Integer> union = new UnionSet<>(
Set.of(1, 2, 3),
Set.of(2, 3, 4, 5)
);
assertEquals(Set.of(1, 2, 3, 4, 5), union);
}
}

View File

@ -37,6 +37,7 @@ public class ConsumerGroupBuilder {
private final Map<String, ConsumerGroupMember> members = new HashMap<>();
private final Map<String, Assignment> assignments = new HashMap<>();
private Map<String, TopicMetadata> subscriptionMetadata;
private final Map<String, ResolvedRegularExpression> resolvedRegularExpressions = new HashMap<>();
public ConsumerGroupBuilder(String groupId, int groupEpoch) {
this.groupId = groupId;
@ -49,6 +50,14 @@ public class ConsumerGroupBuilder {
return this;
}
public ConsumerGroupBuilder withResolvedRegularExpression(
String regex,
ResolvedRegularExpression resolvedRegularExpression
) {
this.resolvedRegularExpressions.put(regex, resolvedRegularExpression);
return this;
}
public ConsumerGroupBuilder withSubscriptionMetadata(Map<String, TopicMetadata> subscriptionMetadata) {
this.subscriptionMetadata = subscriptionMetadata;
return this;
@ -72,6 +81,11 @@ public class ConsumerGroupBuilder {
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, member))
);
// Add resolved regular expressions.
resolvedRegularExpressions.forEach((regex, resolvedRegularExpression) ->
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionRecord(groupId, regex, resolvedRegularExpression))
);
// Add subscription metadata.
if (subscriptionMetadata == null) {
subscriptionMetadata = new HashMap<>();

View File

@ -82,7 +82,7 @@ public class TargetAssignmentBuilderBenchmark {
private PartitionAssignor partitionAssignor;
private TargetAssignmentBuilder<ConsumerGroupMember> targetAssignmentBuilder;
private TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder targetAssignmentBuilder;
/** The number of homogeneous subgroups to create for the heterogeneous subscription case. */
private static final int MAX_BUCKET_COUNT = 5;
@ -116,7 +116,7 @@ public class TargetAssignmentBuilderBenchmark {
.setSubscribedTopicNames(allTopicNames)
.build();
targetAssignmentBuilder = new TargetAssignmentBuilder<ConsumerGroupMember>(GROUP_ID, GROUP_EPOCH, partitionAssignor)
targetAssignmentBuilder = new TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder(GROUP_ID, GROUP_EPOCH, partitionAssignor)
.withMembers(members)
.withSubscriptionMetadata(subscriptionMetadata)
.withSubscriptionType(subscriptionType)