KAFKA-17578: Remove partitionRacks from TopicMetadata (#17233)

The ModernGroup#subscribedTopicMetadata takes too much memory due to partitionRacks. This is not being used at the moment as the consumer protocol does not support rack aware assignments.

A heap dump from a group with 500 members, 2K subscribed topic partitions shows 654,400 bytes used for partitionRacks. The rest of the ConsumerGroup object holds 822,860 bytes.

Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
PoAn Yang 2024-09-25 15:48:48 +08:00 committed by GitHub
parent a577d30d38
commit bb97d63d41
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
21 changed files with 238 additions and 486 deletions

View File

@ -139,21 +139,10 @@ public class GroupCoordinatorRecordHelpers {
) {
ConsumerGroupPartitionMetadataValue value = new ConsumerGroupPartitionMetadataValue();
newSubscriptionMetadata.forEach((topicName, topicMetadata) -> {
List<ConsumerGroupPartitionMetadataValue.PartitionMetadata> partitionMetadata = new ArrayList<>();
// If the partition rack information map is empty, store an empty list in the record.
if (!topicMetadata.partitionRacks().isEmpty()) {
topicMetadata.partitionRacks().forEach((partition, racks) ->
partitionMetadata.add(new ConsumerGroupPartitionMetadataValue.PartitionMetadata()
.setPartition(partition)
.setRacks(new ArrayList<>(racks))
)
);
}
value.topics().add(new ConsumerGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(topicMetadata.id())
.setTopicName(topicMetadata.name())
.setNumPartitions(topicMetadata.numPartitions())
.setPartitionMetadata(partitionMetadata)
);
});
@ -657,21 +646,10 @@ public class GroupCoordinatorRecordHelpers {
) {
ShareGroupPartitionMetadataValue value = new ShareGroupPartitionMetadataValue();
newSubscriptionMetadata.forEach((topicName, topicMetadata) -> {
List<ShareGroupPartitionMetadataValue.PartitionMetadata> partitionMetadata = new ArrayList<>();
// If the partition rack information map is empty, store an empty list in the record.
if (!topicMetadata.partitionRacks().isEmpty()) {
topicMetadata.partitionRacks().forEach((partition, racks) ->
partitionMetadata.add(new ShareGroupPartitionMetadataValue.PartitionMetadata()
.setPartition(partition)
.setRacks(new ArrayList<>(racks))
)
);
}
value.topics().add(new ShareGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(topicMetadata.id())
.setTopicName(topicMetadata.name())
.setNumPartitions(topicMetadata.numPartitions())
.setPartitionMetadata(partitionMetadata)
);
});

View File

@ -35,7 +35,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS;
@ -389,26 +388,11 @@ public abstract class ModernGroup<T extends ModernGroupMember> implements Group
subscribedTopicNames.forEach((topicName, count) -> {
TopicImage topicImage = topicsImage.getTopic(topicName);
if (topicImage != null) {
Map<Integer, Set<String>> partitionRacks = new HashMap<>();
topicImage.partitions().forEach((partition, partitionRegistration) -> {
Set<String> racks = new HashSet<>();
for (int replica : partitionRegistration.replicas) {
Optional<String> rackOptional = clusterImage.broker(replica).rack();
// Only add the rack if it is available for the broker/replica.
rackOptional.ifPresent(racks::add);
}
// If rack information is unavailable for all replicas of this partition,
// no corresponding entry will be stored for it in the map.
if (!racks.isEmpty())
partitionRacks.put(partition, racks);
});
newSubscriptionMetadata.put(topicName, new TopicMetadata(
topicImage.id(),
topicImage.name(),
topicImage.partitions().size(),
partitionRacks)
);
topicImage.partitions().size()
));
}
});

View File

@ -72,8 +72,7 @@ public class SubscribedTopicDescriberImpl implements SubscribedTopicDescriber {
*/
@Override
public Set<String> racksForPartition(Uuid topicId, int partition) {
TopicMetadata topic = this.topicMetadata.get(topicId);
return topic == null ? Collections.emptySet() : topic.partitionRacks().getOrDefault(partition, Collections.emptySet());
return Collections.emptySet();
}
@Override

View File

@ -20,12 +20,7 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
/**
* Immutable topic metadata.
@ -46,17 +41,10 @@ public class TopicMetadata {
*/
private final int numPartitions;
/**
* Map of every partition Id to a set of its rack Ids, if they exist.
* If rack information is unavailable for all partitions, this is an empty map.
*/
private final Map<Integer, Set<String>> partitionRacks;
public TopicMetadata(
Uuid id,
String name,
int numPartitions,
Map<Integer, Set<String>> partitionRacks
int numPartitions
) {
this.id = Objects.requireNonNull(id);
if (Uuid.ZERO_UUID.equals(id)) {
@ -70,7 +58,6 @@ public class TopicMetadata {
if (numPartitions < 0) {
throw new IllegalArgumentException("Number of partitions cannot be negative.");
}
this.partitionRacks = Objects.requireNonNull(partitionRacks);
}
/**
@ -94,14 +81,6 @@ public class TopicMetadata {
return this.numPartitions;
}
/**
* @return Every partition mapped to the set of corresponding available rack Ids of its replicas.
* An empty map is returned if rack information is unavailable for all partitions.
*/
public Map<Integer, Set<String>> partitionRacks() {
return this.partitionRacks;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
@ -111,8 +90,7 @@ public class TopicMetadata {
if (!id.equals(that.id)) return false;
if (!name.equals(that.name)) return false;
if (numPartitions != that.numPartitions) return false;
return partitionRacks.equals(that.partitionRacks);
return numPartitions == that.numPartitions;
}
@Override
@ -120,7 +98,6 @@ public class TopicMetadata {
int result = id.hashCode();
result = 31 * result + name.hashCode();
result = 31 * result + numPartitions;
result = 31 * result + partitionRacks.hashCode();
return result;
}
@ -130,45 +107,26 @@ public class TopicMetadata {
"id=" + id +
", name=" + name +
", numPartitions=" + numPartitions +
", partitionRacks=" + partitionRacks +
')';
}
public static TopicMetadata fromRecord(
ConsumerGroupPartitionMetadataValue.TopicMetadata record
) {
// Converting the data type from a list stored in the record to a map for the topic metadata.
Map<Integer, Set<String>> partitionRacks = new HashMap<>();
for (ConsumerGroupPartitionMetadataValue.PartitionMetadata partitionMetadata : record.partitionMetadata()) {
partitionRacks.put(
partitionMetadata.partition(),
Collections.unmodifiableSet(new HashSet<>(partitionMetadata.racks()))
);
}
return new TopicMetadata(
record.topicId(),
record.topicName(),
record.numPartitions(),
partitionRacks);
record.numPartitions()
);
}
public static TopicMetadata fromRecord(
ShareGroupPartitionMetadataValue.TopicMetadata record
ShareGroupPartitionMetadataValue.TopicMetadata record
) {
// Converting the data type from a list stored in the record to a map for the topic metadata.
Map<Integer, Set<String>> partitionRacks = new HashMap<>();
for (ShareGroupPartitionMetadataValue.PartitionMetadata partitionMetadata : record.partitionMetadata()) {
partitionRacks.put(
partitionMetadata.partition(),
Collections.unmodifiableSet(new HashSet<>(partitionMetadata.racks()))
);
}
return new TopicMetadata(
record.topicId(),
record.topicName(),
record.numPartitions(),
partitionRacks);
record.topicId(),
record.topicName(),
record.numPartitions()
);
}
}

View File

@ -29,7 +29,7 @@
{ "name": "NumPartitions", "versions": "0+", "type": "int32",
"about": "The number of partitions of the topic." },
{ "name": "PartitionMetadata", "versions": "0+", "type": "[]PartitionMetadata",
"about": "Partitions mapped to a set of racks. If the rack information is unavailable for all the partitions, an empty list is stored", "fields": [
"about": "Deprecated: this field is not used after 4.0. Partitions mapped to a set of racks. If the rack information is unavailable for all the partitions, an empty list is stored", "fields": [
{ "name": "Partition", "versions": "0+", "type": "int32",
"about": "The partition number." },
{ "name": "Racks", "versions": "0+", "type": "[]string",

View File

@ -165,14 +165,12 @@ public class GroupCoordinatorRecordHelpersTest {
subscriptionMetadata.put("foo", new TopicMetadata(
fooTopicId,
"foo",
10,
mkMapOfPartitionRacks(10)
10
));
subscriptionMetadata.put("bar", new TopicMetadata(
barTopicId,
"bar",
20,
mkMapOfPartitionRacks(20)
20
));
CoordinatorRecord expectedRecord = new CoordinatorRecord(
@ -187,13 +185,11 @@ public class GroupCoordinatorRecordHelpersTest {
new ConsumerGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(fooTopicId)
.setTopicName("foo")
.setNumPartitions(10)
.setPartitionMetadata(mkListOfPartitionRacks(10)),
.setNumPartitions(10),
new ConsumerGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(barTopicId)
.setTopicName("bar")
.setNumPartitions(20)
.setPartitionMetadata(mkListOfPartitionRacks(20)))),
.setNumPartitions(20))),
(short) 0));
assertRecordEquals(expectedRecord, newConsumerGroupSubscriptionMetadataRecord(
@ -226,14 +222,12 @@ public class GroupCoordinatorRecordHelpersTest {
subscriptionMetadata.put("foo", new TopicMetadata(
fooTopicId,
"foo",
10,
Collections.emptyMap()
10
));
subscriptionMetadata.put("bar", new TopicMetadata(
barTopicId,
"bar",
20,
Collections.emptyMap()
20
));
CoordinatorRecord expectedRecord = new CoordinatorRecord(
@ -248,13 +242,11 @@ public class GroupCoordinatorRecordHelpersTest {
new ConsumerGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(fooTopicId)
.setTopicName("foo")
.setNumPartitions(10)
.setPartitionMetadata(Collections.emptyList()),
.setNumPartitions(10),
new ConsumerGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(barTopicId)
.setTopicName("bar")
.setNumPartitions(20)
.setPartitionMetadata(Collections.emptyList()))),
.setNumPartitions(20))),
(short) 0));
assertRecordEquals(expectedRecord, newConsumerGroupSubscriptionMetadataRecord(
@ -821,28 +813,6 @@ public class GroupCoordinatorRecordHelpersTest {
assertEquals(expectedRecord, record);
}
/**
* Creates a list of values to be added to the record and assigns partitions to racks for testing.
*
* @param numPartitions The number of partitions for the topic.
*
* For testing purposes, the following criteria are used:
* - Number of replicas for each partition: 2
* - Number of racks available to the cluster: 4
*/
public static List<ConsumerGroupPartitionMetadataValue.PartitionMetadata> mkListOfPartitionRacks(int numPartitions) {
List<ConsumerGroupPartitionMetadataValue.PartitionMetadata> partitionRacks = new ArrayList<>(numPartitions);
for (int i = 0; i < numPartitions; i++) {
List<String> racks = new ArrayList<>(Arrays.asList("rack" + i % 4, "rack" + (i + 1) % 4));
partitionRacks.add(
new ConsumerGroupPartitionMetadataValue.PartitionMetadata()
.setPartition(i)
.setRacks(racks)
);
}
return partitionRacks;
}
/**
* Creates a map of partitions to racks for testing.
*

View File

@ -125,7 +125,6 @@ import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
import static org.apache.kafka.coordinator.group.GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG;
import static org.apache.kafka.coordinator.group.GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
import static org.apache.kafka.coordinator.group.GroupMetadataManager.EMPTY_RESULT;
import static org.apache.kafka.coordinator.group.GroupMetadataManager.appendGroupMetadataErrorToResponseError;
import static org.apache.kafka.coordinator.group.GroupMetadataManager.classicGroupHeartbeatKey;
@ -501,8 +500,8 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = Arrays.asList(
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember),
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6)));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3)));
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3));
}}),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment(
@ -600,8 +599,8 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember),
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6)));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3)));
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3));
}
}),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11),
@ -725,12 +724,6 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = Arrays.asList(
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember3),
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6)));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3)));
}
}),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1),
@ -748,9 +741,9 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember3)
);
assertRecordsEquals(expectedRecords.subList(0, 3), result.records().subList(0, 3));
assertUnorderedListEquals(expectedRecords.subList(3, 6), result.records().subList(3, 6));
assertRecordsEquals(expectedRecords.subList(6, 8), result.records().subList(6, 8));
assertRecordsEquals(expectedRecords.subList(0, 2), result.records().subList(0, 2));
assertUnorderedListEquals(expectedRecords.subList(2, 5), result.records().subList(2, 5));
assertRecordsEquals(expectedRecords.subList(5, 7), result.records().subList(5, 7));
}
@Test
@ -837,8 +830,8 @@ public class GroupMetadataManagerTest {
// Subscription metadata is recomputed because zar is no longer there.
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6)));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3)));
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3));
}
}),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)
@ -961,12 +954,6 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = Arrays.asList(
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember3),
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6)));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3)));
}
}),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1),
@ -984,9 +971,9 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember3)
);
assertRecordsEquals(expectedRecords.subList(0, 3), result.records().subList(0, 3));
assertUnorderedListEquals(expectedRecords.subList(3, 6), result.records().subList(3, 6));
assertRecordsEquals(expectedRecords.subList(6, 8), result.records().subList(6, 8));
assertRecordsEquals(expectedRecords.subList(0, 2), result.records().subList(0, 2));
assertUnorderedListEquals(expectedRecords.subList(2, 5), result.records().subList(2, 5));
assertRecordsEquals(expectedRecords.subList(5, 7), result.records().subList(5, 7));
}
@Test
@ -1052,8 +1039,8 @@ public class GroupMetadataManagerTest {
.withAssignmentEpoch(10)
.withSubscriptionMetadata(new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6)));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3)));
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3));
}
}))
.build();
@ -1225,7 +1212,7 @@ public class GroupMetadataManagerTest {
.withAssignmentEpoch(10)
.withSubscriptionMetadata(new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6)));
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6));
}
}))
.build();
@ -1343,8 +1330,8 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedRejoinedMember),
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6)));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3)));
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3));
}
}),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11),
@ -1538,8 +1525,8 @@ public class GroupMetadataManagerTest {
// Subscription metadata is recomputed because zar is no longer there.
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6)));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3)));
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3));
}
}),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)
@ -2597,7 +2584,7 @@ public class GroupMetadataManagerTest {
{
// foo only has 3 partitions stored in the metadata but foo has
// 6 partitions the metadata image.
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3, mkMapOfPartitionRacks(3)));
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3));
}
}))
.build();
@ -2651,7 +2638,7 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = Arrays.asList(
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6)));
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6));
}
}),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11),
@ -2708,7 +2695,7 @@ public class GroupMetadataManagerTest {
{
// foo only has 3 partitions stored in the metadata but foo has
// 6 partitions the metadata image.
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3, mkMapOfPartitionRacks(3)));
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3));
}
}))
.build();
@ -2780,7 +2767,7 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = Arrays.asList(
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6)));
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6));
}
}),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11),
@ -9845,8 +9832,8 @@ public class GroupMetadataManagerTest {
// The subscription metadata hasn't been updated during the conversion, so a new one is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1, mkMapOfPartitionRacks(1)));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1)));
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1));
}
}),
@ -10078,8 +10065,8 @@ public class GroupMetadataManagerTest {
// The subscription metadata hasn't been updated during the conversion, so a new one is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2)));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1)));
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1));
}
}),
@ -10241,7 +10228,7 @@ public class GroupMetadataManagerTest {
// The subscription metadata hasn't been updated during the conversion, so a new one is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1, mkMapOfPartitionRacks(1)));
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1));
}
}),
@ -10345,8 +10332,8 @@ public class GroupMetadataManagerTest {
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6)));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3)));
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3));
}
}));
@ -10632,8 +10619,8 @@ public class GroupMetadataManagerTest {
// The subscription metadata hasn't been updated during the conversion, so a new one is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2)));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1)));
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1));
}
}),
@ -10846,8 +10833,8 @@ public class GroupMetadataManagerTest {
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6)));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3)));
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3));
}
}));
@ -11034,8 +11021,8 @@ public class GroupMetadataManagerTest {
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6)));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3)));
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3));
}
}));
@ -11221,9 +11208,9 @@ public class GroupMetadataManagerTest {
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6)));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3)));
put(zarTopicName, new TopicMetadata(zarTopicId, zarTopicName, 1, mkMapOfPartitionRacks(1)));
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3));
put(zarTopicName, new TopicMetadata(zarTopicId, zarTopicName, 1));
}
}));
@ -11428,7 +11415,7 @@ public class GroupMetadataManagerTest {
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
.withSubscriptionMetadata(new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2)));
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2));
}
})
.withMember(new ConsumerGroupMember.Builder(memberId)
@ -11514,8 +11501,8 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember),
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2)));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1)));
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1));
}
}),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11),
@ -11577,7 +11564,7 @@ public class GroupMetadataManagerTest {
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
.withSubscriptionMetadata(new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2)));
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2));
}
})
.withMember(new ConsumerGroupMember.Builder(memberId)
@ -11633,7 +11620,7 @@ public class GroupMetadataManagerTest {
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
.withSubscriptionMetadata(new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2)));
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2));
}
})
.withMember(new ConsumerGroupMember.Builder(memberId)
@ -11684,8 +11671,8 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember),
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2)));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1)));
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1));
}
}),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11),
@ -11728,7 +11715,7 @@ public class GroupMetadataManagerTest {
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
.withSubscriptionMetadata(new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2)));
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2));
}
})
.withMember(new ConsumerGroupMember.Builder(memberId)
@ -11850,9 +11837,9 @@ public class GroupMetadataManagerTest {
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
.withSubscriptionMetadata(new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2)));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1)));
put(zarTopicName, new TopicMetadata(zarTopicId, zarTopicName, 1, mkMapOfPartitionRacks(1)));
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1));
put(zarTopicName, new TopicMetadata(zarTopicId, zarTopicName, 1));
}
})
.withMember(new ConsumerGroupMember.Builder(memberId1)
@ -12080,8 +12067,8 @@ public class GroupMetadataManagerTest {
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
.withSubscriptionMetadata(new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2)));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1)));
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1));
}
})
.withMember(new ConsumerGroupMember.Builder(memberId1)
@ -12182,9 +12169,9 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1),
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2)));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1)));
put(zarTopicName, new TopicMetadata(zarTopicId, zarTopicName, 1, mkMapOfPartitionRacks(1)));
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1));
put(zarTopicName, new TopicMetadata(zarTopicId, zarTopicName, 1));
}
}),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11),
@ -12322,8 +12309,8 @@ public class GroupMetadataManagerTest {
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
.withSubscriptionMetadata(new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2)));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1)));
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1));
}
})
.withMember(new ConsumerGroupMember.Builder(memberId1)
@ -12424,9 +12411,9 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1),
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2)));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1)));
put(zarTopicName, new TopicMetadata(zarTopicId, zarTopicName, 1, mkMapOfPartitionRacks(1)));
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1));
put(zarTopicName, new TopicMetadata(zarTopicId, zarTopicName, 1));
}
}),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11),
@ -13422,8 +13409,8 @@ public class GroupMetadataManagerTest {
context.groupMetadataManager.consumerGroup(groupId).setMetadataRefreshDeadline(Long.MAX_VALUE, 10);
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2)));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1)));
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1));
}
}));
@ -13594,8 +13581,8 @@ public class GroupMetadataManagerTest {
context.groupMetadataManager.consumerGroup(groupId).setMetadataRefreshDeadline(Long.MAX_VALUE, 10);
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2)));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1)));
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1));
}
}));
@ -13627,7 +13614,7 @@ public class GroupMetadataManagerTest {
// Update the subscription metadata.
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2)));
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2));
}
}),
// Bump the group epoch.
@ -13988,8 +13975,8 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionRecord(groupId, expectedMember),
GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6)));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3)));
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3));
}
}),
GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 1),
@ -14082,8 +14069,8 @@ public class GroupMetadataManagerTest {
// Subscription metadata is recomputed because zar is no longer there.
GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
{
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6)));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3)));
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6));
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3));
}
}),
GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 11)
@ -14348,7 +14335,7 @@ public class GroupMetadataManagerTest {
Map<String, TopicMetadata> metadata = Collections.singletonMap(
"bar",
new TopicMetadata(Uuid.randomUuid(), "bar", 10, Collections.emptyMap())
new TopicMetadata(Uuid.randomUuid(), "bar", 10)
);
// The group is created if it does not exist.

View File

@ -44,7 +44,6 @@ import static org.apache.kafka.coordinator.group.AssignmentTestUtil.invertedTarg
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkOrderedAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@ -70,8 +69,7 @@ public class OptimizedUniformAssignmentBuilderTest {
new TopicMetadata(
topic1Uuid,
topic1Name,
3,
mkMapOfPartitionRacks(3)
3
)
)
);
@ -108,8 +106,7 @@ public class OptimizedUniformAssignmentBuilderTest {
new TopicMetadata(
topic1Uuid,
topic1Name,
3,
mkMapOfPartitionRacks(3)
3
)
)
);
@ -140,14 +137,12 @@ public class OptimizedUniformAssignmentBuilderTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
3,
mkMapOfPartitionRacks(3)
3
));
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
2,
mkMapOfPartitionRacks(2)
2
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -197,8 +192,7 @@ public class OptimizedUniformAssignmentBuilderTest {
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
2,
mkMapOfPartitionRacks(2)
2
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -260,8 +254,7 @@ public class OptimizedUniformAssignmentBuilderTest {
topicMetadata.put(topicId, new TopicMetadata(
topicId,
"topic-" + i,
3,
mkMapOfPartitionRacks(3)
3
));
}
@ -296,14 +289,12 @@ public class OptimizedUniformAssignmentBuilderTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
3,
mkMapOfPartitionRacks(3)
3
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
3,
mkMapOfPartitionRacks(3)
3
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -361,14 +352,12 @@ public class OptimizedUniformAssignmentBuilderTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
6,
mkMapOfPartitionRacks(6)
6
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
5,
mkMapOfPartitionRacks(5)
5
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -425,14 +414,12 @@ public class OptimizedUniformAssignmentBuilderTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
3,
mkMapOfPartitionRacks(3)
3
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
3,
mkMapOfPartitionRacks(3)
3
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -499,14 +486,12 @@ public class OptimizedUniformAssignmentBuilderTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
3,
mkMapOfPartitionRacks(3)
3
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
3,
mkMapOfPartitionRacks(3)
3
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -565,14 +550,12 @@ public class OptimizedUniformAssignmentBuilderTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
2,
mkMapOfPartitionRacks(2)
2
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
2,
mkMapOfPartitionRacks(2)
2
));
// Initial subscriptions were [T1, T2]

View File

@ -68,8 +68,7 @@ public class RangeAssignorTest {
new TopicMetadata(
topic1Uuid,
topic1Name,
3,
Collections.emptyMap()
3
)
)
);
@ -111,8 +110,7 @@ public class RangeAssignorTest {
new TopicMetadata(
topic1Uuid,
topic1Name,
3,
Collections.emptyMap()
3
)
)
);
@ -143,14 +141,12 @@ public class RangeAssignorTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
3,
Collections.emptyMap()
3
));
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
2,
Collections.emptyMap()
2
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -200,20 +196,17 @@ public class RangeAssignorTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
3,
Collections.emptyMap()
3
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
3,
Collections.emptyMap()
3
));
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
2,
Collections.emptyMap()
2
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -273,14 +266,12 @@ public class RangeAssignorTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
3,
Collections.emptyMap()
3
));
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
2,
Collections.emptyMap()
2
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -343,8 +334,7 @@ public class RangeAssignorTest {
new TopicMetadata(
topic1Uuid,
topic1Name,
3,
Collections.emptyMap()
3
)
)
);
@ -414,8 +404,7 @@ public class RangeAssignorTest {
new TopicMetadata(
topic1Uuid,
topic1Name,
5,
Collections.emptyMap()
5
)
)
);
@ -497,14 +486,12 @@ public class RangeAssignorTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
2,
Collections.emptyMap()
2
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
2,
Collections.emptyMap()
2
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -571,14 +558,12 @@ public class RangeAssignorTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
4,
Collections.emptyMap()
4
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
4,
Collections.emptyMap()
4
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -634,14 +619,12 @@ public class RangeAssignorTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
3,
Collections.emptyMap()
3
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
3,
Collections.emptyMap()
3
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -710,14 +693,12 @@ public class RangeAssignorTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
4,
Collections.emptyMap()
4
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
3,
Collections.emptyMap()
3
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -784,14 +765,12 @@ public class RangeAssignorTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
3,
Collections.emptyMap()
3
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
3,
Collections.emptyMap()
3
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -835,20 +814,17 @@ public class RangeAssignorTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
3,
Collections.emptyMap()
3
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
3,
Collections.emptyMap()
3
));
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
2,
Collections.emptyMap()
2
));
// Let initial subscriptions be A -> T1, T2 // B -> T2 // C -> T2, T3

View File

@ -88,8 +88,7 @@ public class SimpleAssignorTest {
new TopicMetadata(
TOPIC_1_UUID,
TOPIC_1_NAME,
3,
Collections.emptyMap()
3
)
)
);
@ -126,8 +125,7 @@ public class SimpleAssignorTest {
new TopicMetadata(
TOPIC_1_UUID,
TOPIC_1_NAME,
3,
Collections.emptyMap()
3
)
)
);
@ -158,14 +156,12 @@ public class SimpleAssignorTest {
topicMetadata.put(TOPIC_1_UUID, new TopicMetadata(
TOPIC_1_UUID,
TOPIC_1_NAME,
3,
Collections.emptyMap()
3
));
topicMetadata.put(TOPIC_3_UUID, new TopicMetadata(
TOPIC_3_UUID,
TOPIC_3_NAME,
2,
Collections.emptyMap()
2
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -215,21 +211,18 @@ public class SimpleAssignorTest {
topicMetadata.put(TOPIC_1_UUID, new TopicMetadata(
TOPIC_1_UUID,
TOPIC_1_NAME,
3,
Collections.emptyMap()
3
));
topicMetadata.put(TOPIC_2_UUID, new TopicMetadata(
TOPIC_2_UUID,
"topic2",
3,
Collections.emptyMap()
3
));
topicMetadata.put(TOPIC_3_UUID, new TopicMetadata(
TOPIC_3_UUID,
TOPIC_3_NAME,
2,
Collections.emptyMap()
2
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -289,15 +282,13 @@ public class SimpleAssignorTest {
topicMetadata.put(TOPIC_1_UUID, new TopicMetadata(
TOPIC_1_UUID,
TOPIC_1_NAME,
3,
Collections.emptyMap()
3
));
topicMetadata.put(TOPIC_2_UUID, new TopicMetadata(
TOPIC_2_UUID,
"topic2",
2,
Collections.emptyMap()
2
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();

View File

@ -40,7 +40,6 @@ import static org.apache.kafka.coordinator.group.AssignmentTestUtil.assertAssign
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.invertedTargetAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@ -67,8 +66,7 @@ public class UniformHeterogeneousAssignmentBuilderTest {
new TopicMetadata(
topic1Uuid,
topic1Name,
3,
mkMapOfPartitionRacks(3)
3
)
)
);
@ -109,8 +107,7 @@ public class UniformHeterogeneousAssignmentBuilderTest {
new TopicMetadata(
topic1Uuid,
topic1Name,
3,
mkMapOfPartitionRacks(3)
3
)
)
);
@ -146,14 +143,12 @@ public class UniformHeterogeneousAssignmentBuilderTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
3,
mkMapOfPartitionRacks(3)
3
));
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
6,
mkMapOfPartitionRacks(6)
6
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -202,14 +197,12 @@ public class UniformHeterogeneousAssignmentBuilderTest {
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
1,
mkMapOfPartitionRacks(1)
1
));
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
2,
mkMapOfPartitionRacks(2)
2
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -268,20 +261,17 @@ public class UniformHeterogeneousAssignmentBuilderTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
6,
mkMapOfPartitionRacks(6)
6
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
4,
mkMapOfPartitionRacks(4)
4
));
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
4,
mkMapOfPartitionRacks(4)
4
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -350,26 +340,22 @@ public class UniformHeterogeneousAssignmentBuilderTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
6,
mkMapOfPartitionRacks(6)
6
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
5,
mkMapOfPartitionRacks(5)
5
));
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
3,
mkMapOfPartitionRacks(3)
3
));
topicMetadata.put(topic4Uuid, new TopicMetadata(
topic4Uuid,
topic4Name,
3,
mkMapOfPartitionRacks(3)
3
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -426,14 +412,12 @@ public class UniformHeterogeneousAssignmentBuilderTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
6,
mkMapOfPartitionRacks(6)
6
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
7,
mkMapOfPartitionRacks(7)
7
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -499,20 +483,17 @@ public class UniformHeterogeneousAssignmentBuilderTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
3,
mkMapOfPartitionRacks(3)
3
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
8,
mkMapOfPartitionRacks(4)
8
));
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
3,
mkMapOfPartitionRacks(3)
3
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
@ -568,14 +549,12 @@ public class UniformHeterogeneousAssignmentBuilderTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
3,
mkMapOfPartitionRacks(3)
3
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
5,
mkMapOfPartitionRacks(5)
5
));
// Initial subscriptions were [T1, T2]
@ -630,8 +609,7 @@ public class UniformHeterogeneousAssignmentBuilderTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
3,
mkMapOfPartitionRacks(3)
3
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();

View File

@ -21,12 +21,9 @@ import org.apache.kafka.common.Uuid;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@ -42,10 +39,9 @@ public class SubscribedTopicMetadataTest {
for (int i = 0; i < 5; i++) {
Uuid topicId = Uuid.randomUuid();
String topicName = "topic" + i;
Map<Integer, Set<String>> partitionRacks = mkMapOfPartitionRacks(5);
topicMetadataMap.put(
topicId,
new TopicMetadata(topicId, topicName, 5, partitionRacks)
new TopicMetadata(topicId, topicName, 5)
);
}
subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadataMap);
@ -68,44 +64,19 @@ public class SubscribedTopicMetadataTest {
// Test -1 is returned when the topic Id doesn't exist.
assertEquals(-1, subscribedTopicMetadata.numPartitions(topicId));
topicMetadataMap.put(topicId, new TopicMetadata(topicId, "topic6", 3, Collections.emptyMap()));
topicMetadataMap.put(topicId, new TopicMetadata(topicId, "topic6", 3));
// Test that the correct number of partitions are returned for a given topic Id.
assertEquals(3, subscribedTopicMetadata.numPartitions(topicId));
}
@Test
public void testRacksForPartition() {
Uuid topicId = Uuid.randomUuid();
// Test that an empty set is returned for a non-existent topic Id.
assertEquals(Collections.emptySet(), subscribedTopicMetadata.racksForPartition(topicId, 0));
// Add topic Id with partition racks included.
Map<Integer, Set<String>> partitionRacks = mkMapOfPartitionRacks(3);
topicMetadataMap.put(topicId, new TopicMetadata(topicId, "topic6", 3, partitionRacks));
// Test that an empty set is returned for a non-existent partition Id.
assertEquals(Collections.emptySet(), subscribedTopicMetadata.racksForPartition(topicId, 4));
// Test that a correct set of racks is returned for the given topic Id and partition Id.
assertEquals(partitionRacks.get(2), subscribedTopicMetadata.racksForPartition(topicId, 2));
// Add another topic Id without partition racks.
topicId = Uuid.randomUuid();
topicMetadataMap.put(topicId, new TopicMetadata(topicId, "topic6", 3, Collections.emptyMap()));
// Test that an empty set is returned when the partition rack info is absent.
assertEquals(Collections.emptySet(), subscribedTopicMetadata.racksForPartition(topicId, 1));
}
@Test
public void testEquals() {
assertEquals(new SubscribedTopicDescriberImpl(topicMetadataMap), subscribedTopicMetadata);
Map<Uuid, TopicMetadata> topicMetadataMap2 = new HashMap<>();
Uuid topicId = Uuid.randomUuid();
topicMetadataMap2.put(topicId, new TopicMetadata(topicId, "newTopic", 5, Collections.emptyMap()));
topicMetadataMap2.put(topicId, new TopicMetadata(topicId, "newTopic", 5));
assertNotEquals(new SubscribedTopicDescriberImpl(topicMetadataMap2), subscribedTopicMetadata);
}
}

View File

@ -43,7 +43,6 @@ import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
import static org.apache.kafka.coordinator.group.modern.TargetAssignmentBuilder.createMemberSubscriptionAndAssignment;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -102,15 +101,13 @@ public class TargetAssignmentBuilderTest {
public Uuid addTopicMetadata(
String topicName,
int numPartitions,
Map<Integer, Set<String>> partitionRacks
int numPartitions
) {
Uuid topicId = Uuid.randomUuid();
subscriptionMetadata.put(topicName, new TopicMetadata(
topicId,
topicName,
numPartitions,
partitionRacks
numPartitions
));
topicsImageBuilder = topicsImageBuilder.addTopic(topicId, topicName, numPartitions);
@ -314,8 +311,8 @@ public class TargetAssignmentBuilderTest {
20
);
Uuid fooTopicId = context.addTopicMetadata("foo", 6, Collections.emptyMap());
Uuid barTopicId = context.addTopicMetadata("bar", 6, Collections.emptyMap());
Uuid fooTopicId = context.addTopicMetadata("foo", 6);
Uuid barTopicId = context.addTopicMetadata("bar", 6);
context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2, 3),
@ -364,8 +361,8 @@ public class TargetAssignmentBuilderTest {
20
);
Uuid fooTopicId = context.addTopicMetadata("foo", 6, Collections.emptyMap());
Uuid barTopicId = context.addTopicMetadata("bar", 6, Collections.emptyMap());
Uuid fooTopicId = context.addTopicMetadata("foo", 6);
Uuid barTopicId = context.addTopicMetadata("bar", 6);
context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2, 3),
@ -427,8 +424,8 @@ public class TargetAssignmentBuilderTest {
20
);
Uuid fooTopicId = context.addTopicMetadata("foo", 6, Collections.emptyMap());
Uuid barTopicId = context.addTopicMetadata("bar", 6, Collections.emptyMap());
Uuid fooTopicId = context.addTopicMetadata("foo", 6);
Uuid barTopicId = context.addTopicMetadata("bar", 6);
context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2, 3),
@ -505,8 +502,8 @@ public class TargetAssignmentBuilderTest {
20
);
Uuid fooTopicId = context.addTopicMetadata("foo", 6, Collections.emptyMap());
Uuid barTopicId = context.addTopicMetadata("bar", 6, Collections.emptyMap());
Uuid fooTopicId = context.addTopicMetadata("foo", 6);
Uuid barTopicId = context.addTopicMetadata("bar", 6);
context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2, 3),
@ -592,8 +589,8 @@ public class TargetAssignmentBuilderTest {
20
);
Uuid fooTopicId = context.addTopicMetadata("foo", 6, mkMapOfPartitionRacks(6));
Uuid barTopicId = context.addTopicMetadata("bar", 6, mkMapOfPartitionRacks(6));
Uuid fooTopicId = context.addTopicMetadata("foo", 6);
Uuid barTopicId = context.addTopicMetadata("bar", 6);
context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2),
@ -670,8 +667,8 @@ public class TargetAssignmentBuilderTest {
20
);
Uuid fooTopicId = context.addTopicMetadata("foo", 6, Collections.emptyMap());
Uuid barTopicId = context.addTopicMetadata("bar", 6, Collections.emptyMap());
Uuid fooTopicId = context.addTopicMetadata("foo", 6);
Uuid barTopicId = context.addTopicMetadata("bar", 6);
context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2),
@ -740,8 +737,8 @@ public class TargetAssignmentBuilderTest {
20
);
Uuid fooTopicId = context.addTopicMetadata("foo", 6, Collections.emptyMap());
Uuid barTopicId = context.addTopicMetadata("bar", 6, Collections.emptyMap());
Uuid fooTopicId = context.addTopicMetadata("foo", 6);
Uuid barTopicId = context.addTopicMetadata("bar", 6);
context.addGroupMember("member-1", "instance-member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2),

View File

@ -21,11 +21,9 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetada
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkListOfPartitionRacks;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@ -35,29 +33,27 @@ public class TopicMetadataTest {
@Test
public void testAttributes() {
Uuid topicId = Uuid.randomUuid();
Map<Integer, Set<String>> partitionRacks = mkMapOfPartitionRacks(15);
TopicMetadata topicMetadata = new TopicMetadata(topicId, "foo", 15, partitionRacks);
TopicMetadata topicMetadata = new TopicMetadata(topicId, "foo", 15);
assertEquals(topicId, topicMetadata.id());
assertEquals("foo", topicMetadata.name());
assertEquals(15, topicMetadata.numPartitions());
assertEquals(partitionRacks, topicMetadata.partitionRacks());
}
@Test
public void testTopicIdAndNameCannotBeNull() {
assertThrows(NullPointerException.class, () -> new TopicMetadata(Uuid.randomUuid(), null, 15, Collections.emptyMap()));
assertThrows(NullPointerException.class, () -> new TopicMetadata(null, "foo", 15, Collections.emptyMap()));
assertThrows(NullPointerException.class, () -> new TopicMetadata(Uuid.randomUuid(), null, 15));
assertThrows(NullPointerException.class, () -> new TopicMetadata(null, "foo", 15));
}
@Test
public void testEquals() {
Uuid topicId = Uuid.randomUuid();
Map<Integer, Set<String>> partitionRacks = mkMapOfPartitionRacks(15);
TopicMetadata topicMetadata = new TopicMetadata(topicId, "foo", 15, partitionRacks);
TopicMetadata topicMetadata = new TopicMetadata(topicId, "foo", 15);
assertEquals(new TopicMetadata(topicId, "foo", 15, partitionRacks), topicMetadata);
assertNotEquals(new TopicMetadata(topicId, "foo", 5, mkMapOfPartitionRacks(5)), topicMetadata);
assertEquals(new TopicMetadata(topicId, "foo", 15), topicMetadata);
assertNotEquals(new TopicMetadata(topicId, "foo", 5), topicMetadata);
}
@Test
@ -68,11 +64,10 @@ public class TopicMetadataTest {
ConsumerGroupPartitionMetadataValue.TopicMetadata record = new ConsumerGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(topicId)
.setTopicName(topicName)
.setNumPartitions(15)
.setPartitionMetadata(mkListOfPartitionRacks(15));
.setNumPartitions(15);
assertEquals(
new TopicMetadata(topicId, topicName, 15, mkMapOfPartitionRacks(15)),
new TopicMetadata(topicId, topicName, 15),
TopicMetadata.fromRecord(record)
);
}

View File

@ -25,7 +25,6 @@ import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.TopicsImage;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -83,8 +82,7 @@ public class ConsumerGroupBuilder {
subscriptionMetadata.put(topicName, new TopicMetadata(
topicImage.id(),
topicImage.name(),
topicImage.partitions().size(),
Collections.emptyMap()
topicImage.partitions().size()
));
}
})

View File

@ -58,7 +58,6 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS;
import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@ -655,7 +654,7 @@ public class ConsumerGroupTest {
// Compute while taking into account member 1.
assertEquals(
mkMap(
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1)))
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1))
),
consumerGroup.computeSubscriptionMetadata(
consumerGroup.computeSubscribedTopicNames(null, member1),
@ -670,7 +669,7 @@ public class ConsumerGroupTest {
// It should return foo now.
assertEquals(
mkMap(
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1)))
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1))
),
consumerGroup.computeSubscriptionMetadata(
consumerGroup.computeSubscribedTopicNames(null, null),
@ -692,8 +691,8 @@ public class ConsumerGroupTest {
// Compute while taking into account member 2.
assertEquals(
mkMap(
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))),
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2)))
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2))
),
consumerGroup.computeSubscriptionMetadata(
consumerGroup.computeSubscribedTopicNames(null, member2),
@ -708,8 +707,8 @@ public class ConsumerGroupTest {
// It should return foo and bar.
assertEquals(
mkMap(
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))),
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2)))
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2))
),
consumerGroup.computeSubscriptionMetadata(
consumerGroup.computeSubscribedTopicNames(null, null),
@ -721,7 +720,7 @@ public class ConsumerGroupTest {
// Compute while taking into account removal of member 2.
assertEquals(
mkMap(
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1)))
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1))
),
consumerGroup.computeSubscriptionMetadata(
consumerGroup.computeSubscribedTopicNames(member2, null),
@ -733,7 +732,7 @@ public class ConsumerGroupTest {
// Removing member1 results in returning bar.
assertEquals(
mkMap(
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2)))
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2))
),
consumerGroup.computeSubscriptionMetadata(
consumerGroup.computeSubscribedTopicNames(member1, null),
@ -745,9 +744,9 @@ public class ConsumerGroupTest {
// Compute while taking into account member 3.
assertEquals(
mkMap(
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))),
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))),
mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3, mkMapOfPartitionRacks(3)))
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)),
mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3))
),
consumerGroup.computeSubscriptionMetadata(
consumerGroup.computeSubscribedTopicNames(null, member3),
@ -762,9 +761,9 @@ public class ConsumerGroupTest {
// It should return foo, bar and zar.
assertEquals(
mkMap(
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))),
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))),
mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3, mkMapOfPartitionRacks(3)))
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)),
mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3))
),
consumerGroup.computeSubscriptionMetadata(
consumerGroup.computeSubscribedTopicNames(null, null),
@ -786,7 +785,7 @@ public class ConsumerGroupTest {
// Compute while taking into account removal of member 2 and member 3.
assertEquals(
mkMap(
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1)))
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1))
),
consumerGroup.computeSubscriptionMetadata(
consumerGroup.computeSubscribedTopicNames(new HashSet<>(Arrays.asList(member2, member3))),
@ -798,8 +797,8 @@ public class ConsumerGroupTest {
// Compute while taking into account removal of member 1.
assertEquals(
mkMap(
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))),
mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3, mkMapOfPartitionRacks(3)))
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)),
mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3))
),
consumerGroup.computeSubscriptionMetadata(
consumerGroup.computeSubscribedTopicNames(Collections.singleton(member1)),
@ -811,9 +810,9 @@ public class ConsumerGroupTest {
// It should return foo, bar and zar.
assertEquals(
mkMap(
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))),
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))),
mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3, mkMapOfPartitionRacks(3)))
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)),
mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3))
),
consumerGroup.computeSubscriptionMetadata(
consumerGroup.computeSubscribedTopicNames(Collections.emptySet()),
@ -1220,8 +1219,8 @@ public class ConsumerGroupTest {
assertEquals(
mkMap(
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))),
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2)))
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2))
),
consumerGroup.computeSubscriptionMetadata(
consumerGroup.computeSubscribedTopicNames(null, null),

View File

@ -25,7 +25,6 @@ import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.TopicsImage;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -83,8 +82,7 @@ public class ShareGroupBuilder {
subscriptionMetadata.put(topicName, new TopicMetadata(
topicImage.id(),
topicImage.name(),
topicImage.partitions().size(),
Collections.emptyMap()
topicImage.partitions().size()
));
}
})

View File

@ -43,7 +43,6 @@ import java.util.HashSet;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS;
import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@ -181,7 +180,7 @@ public class ShareGroupTest {
// Compute while taking into account member 1.
assertEquals(
mkMap(
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1)))
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1))
),
shareGroup.computeSubscriptionMetadata(
shareGroup.computeSubscribedTopicNames(null, member1),
@ -196,7 +195,7 @@ public class ShareGroupTest {
// It should return foo now.
assertEquals(
mkMap(
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1)))
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1))
),
shareGroup.computeSubscriptionMetadata(
shareGroup.computeSubscribedTopicNames(null, null),
@ -218,8 +217,8 @@ public class ShareGroupTest {
// Compute while taking into account member 2.
assertEquals(
mkMap(
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))),
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2)))
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2))
),
shareGroup.computeSubscriptionMetadata(
shareGroup.computeSubscribedTopicNames(null, member2),
@ -234,8 +233,8 @@ public class ShareGroupTest {
// It should return foo and bar.
assertEquals(
mkMap(
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))),
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2)))
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2))
),
shareGroup.computeSubscriptionMetadata(
shareGroup.computeSubscribedTopicNames(null, null),
@ -247,7 +246,7 @@ public class ShareGroupTest {
// Compute while taking into account removal of member 2.
assertEquals(
mkMap(
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1)))
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1))
),
shareGroup.computeSubscriptionMetadata(
shareGroup.computeSubscribedTopicNames(member2, null),
@ -259,7 +258,7 @@ public class ShareGroupTest {
// Removing member1 results in returning bar.
assertEquals(
mkMap(
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2)))
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2))
),
shareGroup.computeSubscriptionMetadata(
shareGroup.computeSubscribedTopicNames(member1, null),
@ -271,9 +270,9 @@ public class ShareGroupTest {
// Compute while taking into account member 3.
assertEquals(
mkMap(
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))),
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))),
mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3, mkMapOfPartitionRacks(3)))
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)),
mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3))
),
shareGroup.computeSubscriptionMetadata(
shareGroup.computeSubscribedTopicNames(null, member3),
@ -288,9 +287,9 @@ public class ShareGroupTest {
// It should return foo, bar and zar.
assertEquals(
mkMap(
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))),
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))),
mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3, mkMapOfPartitionRacks(3)))
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)),
mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3))
),
shareGroup.computeSubscriptionMetadata(
shareGroup.computeSubscribedTopicNames(null, null),
@ -312,7 +311,7 @@ public class ShareGroupTest {
// Compute while taking into account removal of member 2 and member 3.
assertEquals(
mkMap(
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1)))
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1))
),
shareGroup.computeSubscriptionMetadata(
shareGroup.computeSubscribedTopicNames(new HashSet<>(Arrays.asList(member2, member3))),
@ -324,8 +323,8 @@ public class ShareGroupTest {
// Compute while taking into account removal of member 1.
assertEquals(
mkMap(
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))),
mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3, mkMapOfPartitionRacks(3)))
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)),
mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3))
),
shareGroup.computeSubscriptionMetadata(
shareGroup.computeSubscribedTopicNames(Collections.singleton(member1)),
@ -337,9 +336,9 @@ public class ShareGroupTest {
// It should return foo, bar and zar.
assertEquals(
mkMap(
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))),
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))),
mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3, mkMapOfPartitionRacks(3)))
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)),
mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3))
),
shareGroup.computeSubscriptionMetadata(
shareGroup.computeSubscribedTopicNames(Collections.emptySet()),
@ -644,8 +643,8 @@ public class ShareGroupTest {
assertEquals(
mkMap(
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))),
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2)))
mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2))
),
shareGroup.computeSubscriptionMetadata(
shareGroup.computeSubscribedTopicNames(null, null),

View File

@ -93,14 +93,11 @@ public class AssignorBenchmarkUtils {
*
* @param topicNames The names of the topics.
* @param partitionsPerTopic The number of partitions per topic.
* @param getTopicPartitionRacks A function to get the racks map for each topic. May return
* an empty map if no rack info is desired.
* @return The subscription metadata map.
*/
public static Map<String, TopicMetadata> createSubscriptionMetadata(
List<String> topicNames,
int partitionsPerTopic,
Function<String, Map<Integer, Set<String>>> getTopicPartitionRacks
int partitionsPerTopic
) {
Map<String, TopicMetadata> subscriptionMetadata = new HashMap<>();
@ -110,8 +107,7 @@ public class AssignorBenchmarkUtils {
TopicMetadata metadata = new TopicMetadata(
topicId,
topicName,
partitionsPerTopic,
getTopicPartitionRacks.apply(topicName)
partitionsPerTopic
);
subscriptionMetadata.put(topicName, metadata);
}

View File

@ -148,13 +148,9 @@ public class ServerSideAssignorBenchmark {
allTopicNames = AssignorBenchmarkUtils.createTopicNames(topicCount);
int partitionsPerTopic = (memberCount * partitionsToMemberRatio) / topicCount;
Map<Integer, Set<String>> partitionRacks = isRackAware ?
mkMapOfPartitionRacks(partitionsPerTopic) :
Collections.emptyMap();
subscriptionMetadata = AssignorBenchmarkUtils.createSubscriptionMetadata(
allTopicNames,
partitionsPerTopic,
topicName -> partitionRacks
partitionsPerTopic
);
topicsImage = AssignorBenchmarkUtils.createTopicsImage(subscriptionMetadata);

View File

@ -122,8 +122,7 @@ public class TargetAssignmentBuilderBenchmark {
int partitionsPerTopic = (memberCount * partitionsToMemberRatio) / topicCount;
subscriptionMetadata = AssignorBenchmarkUtils.createSubscriptionMetadata(
allTopicNames,
partitionsPerTopic,
topicName -> Collections.emptyMap()
partitionsPerTopic
);
topicsImage = AssignorBenchmarkUtils.createTopicsImage(subscriptionMetadata);