From bb97d63d418fe047cf5a59b16c7004e5011402da Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Wed, 25 Sep 2024 15:48:48 +0800 Subject: [PATCH] KAFKA-17578: Remove partitionRacks from TopicMetadata (#17233) The ModernGroup#subscribedTopicMetadata takes too much memory due to partitionRacks. This is not being used at the moment as the consumer protocol does not support rack aware assignments. A heap dump from a group with 500 members, 2K subscribed topic partitions shows 654,400 bytes used for partitionRacks. The rest of the ConsumerGroup object holds 822,860 bytes. Reviewers: David Jacot --- .../group/GroupCoordinatorRecordHelpers.java | 22 --- .../coordinator/group/modern/ModernGroup.java | 20 +-- .../modern/SubscribedTopicDescriberImpl.java | 3 +- .../group/modern/TopicMetadata.java | 60 ++----- .../ConsumerGroupPartitionMetadataValue.json | 2 +- .../GroupCoordinatorRecordHelpersTest.java | 46 +----- .../group/GroupMetadataManagerTest.java | 153 ++++++++---------- ...OptimizedUniformAssignmentBuilderTest.java | 49 ++---- .../group/assignor/RangeAssignorTest.java | 72 +++------ .../group/assignor/SimpleAssignorTest.java | 27 ++-- ...ormHeterogeneousAssignmentBuilderTest.java | 64 +++----- .../modern/SubscribedTopicMetadataTest.java | 35 +--- .../modern/TargetAssignmentBuilderTest.java | 35 ++-- .../group/modern/TopicMetadataTest.java | 21 +-- .../modern/consumer/ConsumerGroupBuilder.java | 4 +- .../modern/consumer/ConsumerGroupTest.java | 45 +++--- .../group/modern/share/ShareGroupBuilder.java | 4 +- .../group/modern/share/ShareGroupTest.java | 45 +++--- .../jmh/assignor/AssignorBenchmarkUtils.java | 8 +- .../assignor/ServerSideAssignorBenchmark.java | 6 +- .../TargetAssignmentBuilderBenchmark.java | 3 +- 21 files changed, 238 insertions(+), 486 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java index 3665fb05224..67364470b0f 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java @@ -139,21 +139,10 @@ public class GroupCoordinatorRecordHelpers { ) { ConsumerGroupPartitionMetadataValue value = new ConsumerGroupPartitionMetadataValue(); newSubscriptionMetadata.forEach((topicName, topicMetadata) -> { - List partitionMetadata = new ArrayList<>(); - // If the partition rack information map is empty, store an empty list in the record. - if (!topicMetadata.partitionRacks().isEmpty()) { - topicMetadata.partitionRacks().forEach((partition, racks) -> - partitionMetadata.add(new ConsumerGroupPartitionMetadataValue.PartitionMetadata() - .setPartition(partition) - .setRacks(new ArrayList<>(racks)) - ) - ); - } value.topics().add(new ConsumerGroupPartitionMetadataValue.TopicMetadata() .setTopicId(topicMetadata.id()) .setTopicName(topicMetadata.name()) .setNumPartitions(topicMetadata.numPartitions()) - .setPartitionMetadata(partitionMetadata) ); }); @@ -657,21 +646,10 @@ public class GroupCoordinatorRecordHelpers { ) { ShareGroupPartitionMetadataValue value = new ShareGroupPartitionMetadataValue(); newSubscriptionMetadata.forEach((topicName, topicMetadata) -> { - List partitionMetadata = new ArrayList<>(); - // If the partition rack information map is empty, store an empty list in the record. - if (!topicMetadata.partitionRacks().isEmpty()) { - topicMetadata.partitionRacks().forEach((partition, racks) -> - partitionMetadata.add(new ShareGroupPartitionMetadataValue.PartitionMetadata() - .setPartition(partition) - .setRacks(new ArrayList<>(racks)) - ) - ); - } value.topics().add(new ShareGroupPartitionMetadataValue.TopicMetadata() .setTopicId(topicMetadata.id()) .setTopicName(topicMetadata.name()) .setNumPartitions(topicMetadata.numPartitions()) - .setPartitionMetadata(partitionMetadata) ); }); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java index 508b1c7d131..823792c198e 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java @@ -35,7 +35,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.Set; import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS; @@ -389,26 +388,11 @@ public abstract class ModernGroup implements Group subscribedTopicNames.forEach((topicName, count) -> { TopicImage topicImage = topicsImage.getTopic(topicName); if (topicImage != null) { - Map> partitionRacks = new HashMap<>(); - topicImage.partitions().forEach((partition, partitionRegistration) -> { - Set racks = new HashSet<>(); - for (int replica : partitionRegistration.replicas) { - Optional rackOptional = clusterImage.broker(replica).rack(); - // Only add the rack if it is available for the broker/replica. - rackOptional.ifPresent(racks::add); - } - // If rack information is unavailable for all replicas of this partition, - // no corresponding entry will be stored for it in the map. - if (!racks.isEmpty()) - partitionRacks.put(partition, racks); - }); - newSubscriptionMetadata.put(topicName, new TopicMetadata( topicImage.id(), topicImage.name(), - topicImage.partitions().size(), - partitionRacks) - ); + topicImage.partitions().size() + )); } }); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicDescriberImpl.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicDescriberImpl.java index 81a03f97a82..7871b04d722 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicDescriberImpl.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicDescriberImpl.java @@ -72,8 +72,7 @@ public class SubscribedTopicDescriberImpl implements SubscribedTopicDescriber { */ @Override public Set racksForPartition(Uuid topicId, int partition) { - TopicMetadata topic = this.topicMetadata.get(topicId); - return topic == null ? Collections.emptySet() : topic.partitionRacks().getOrDefault(partition, Collections.emptySet()); + return Collections.emptySet(); } @Override diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TopicMetadata.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TopicMetadata.java index 2eba447203b..ace670cc0e4 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TopicMetadata.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TopicMetadata.java @@ -20,12 +20,7 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; import java.util.Objects; -import java.util.Set; /** * Immutable topic metadata. @@ -46,17 +41,10 @@ public class TopicMetadata { */ private final int numPartitions; - /** - * Map of every partition Id to a set of its rack Ids, if they exist. - * If rack information is unavailable for all partitions, this is an empty map. - */ - private final Map> partitionRacks; - public TopicMetadata( Uuid id, String name, - int numPartitions, - Map> partitionRacks + int numPartitions ) { this.id = Objects.requireNonNull(id); if (Uuid.ZERO_UUID.equals(id)) { @@ -70,7 +58,6 @@ public class TopicMetadata { if (numPartitions < 0) { throw new IllegalArgumentException("Number of partitions cannot be negative."); } - this.partitionRacks = Objects.requireNonNull(partitionRacks); } /** @@ -94,14 +81,6 @@ public class TopicMetadata { return this.numPartitions; } - /** - * @return Every partition mapped to the set of corresponding available rack Ids of its replicas. - * An empty map is returned if rack information is unavailable for all partitions. - */ - public Map> partitionRacks() { - return this.partitionRacks; - } - @Override public boolean equals(Object o) { if (this == o) return true; @@ -111,8 +90,7 @@ public class TopicMetadata { if (!id.equals(that.id)) return false; if (!name.equals(that.name)) return false; - if (numPartitions != that.numPartitions) return false; - return partitionRacks.equals(that.partitionRacks); + return numPartitions == that.numPartitions; } @Override @@ -120,7 +98,6 @@ public class TopicMetadata { int result = id.hashCode(); result = 31 * result + name.hashCode(); result = 31 * result + numPartitions; - result = 31 * result + partitionRacks.hashCode(); return result; } @@ -130,45 +107,26 @@ public class TopicMetadata { "id=" + id + ", name=" + name + ", numPartitions=" + numPartitions + - ", partitionRacks=" + partitionRacks + ')'; } public static TopicMetadata fromRecord( ConsumerGroupPartitionMetadataValue.TopicMetadata record ) { - // Converting the data type from a list stored in the record to a map for the topic metadata. - Map> partitionRacks = new HashMap<>(); - for (ConsumerGroupPartitionMetadataValue.PartitionMetadata partitionMetadata : record.partitionMetadata()) { - partitionRacks.put( - partitionMetadata.partition(), - Collections.unmodifiableSet(new HashSet<>(partitionMetadata.racks())) - ); - } - return new TopicMetadata( record.topicId(), record.topicName(), - record.numPartitions(), - partitionRacks); + record.numPartitions() + ); } public static TopicMetadata fromRecord( - ShareGroupPartitionMetadataValue.TopicMetadata record + ShareGroupPartitionMetadataValue.TopicMetadata record ) { - // Converting the data type from a list stored in the record to a map for the topic metadata. - Map> partitionRacks = new HashMap<>(); - for (ShareGroupPartitionMetadataValue.PartitionMetadata partitionMetadata : record.partitionMetadata()) { - partitionRacks.put( - partitionMetadata.partition(), - Collections.unmodifiableSet(new HashSet<>(partitionMetadata.racks())) - ); - } - return new TopicMetadata( - record.topicId(), - record.topicName(), - record.numPartitions(), - partitionRacks); + record.topicId(), + record.topicName(), + record.numPartitions() + ); } } diff --git a/group-coordinator/src/main/resources/common/message/ConsumerGroupPartitionMetadataValue.json b/group-coordinator/src/main/resources/common/message/ConsumerGroupPartitionMetadataValue.json index 4a17d6f685d..89be8cfa056 100644 --- a/group-coordinator/src/main/resources/common/message/ConsumerGroupPartitionMetadataValue.json +++ b/group-coordinator/src/main/resources/common/message/ConsumerGroupPartitionMetadataValue.json @@ -29,7 +29,7 @@ { "name": "NumPartitions", "versions": "0+", "type": "int32", "about": "The number of partitions of the topic." }, { "name": "PartitionMetadata", "versions": "0+", "type": "[]PartitionMetadata", - "about": "Partitions mapped to a set of racks. If the rack information is unavailable for all the partitions, an empty list is stored", "fields": [ + "about": "Deprecated: this field is not used after 4.0. Partitions mapped to a set of racks. If the rack information is unavailable for all the partitions, an empty list is stored", "fields": [ { "name": "Partition", "versions": "0+", "type": "int32", "about": "The partition number." }, { "name": "Racks", "versions": "0+", "type": "[]string", diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java index f71798a0759..38e2b9d68cf 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java @@ -165,14 +165,12 @@ public class GroupCoordinatorRecordHelpersTest { subscriptionMetadata.put("foo", new TopicMetadata( fooTopicId, "foo", - 10, - mkMapOfPartitionRacks(10) + 10 )); subscriptionMetadata.put("bar", new TopicMetadata( barTopicId, "bar", - 20, - mkMapOfPartitionRacks(20) + 20 )); CoordinatorRecord expectedRecord = new CoordinatorRecord( @@ -187,13 +185,11 @@ public class GroupCoordinatorRecordHelpersTest { new ConsumerGroupPartitionMetadataValue.TopicMetadata() .setTopicId(fooTopicId) .setTopicName("foo") - .setNumPartitions(10) - .setPartitionMetadata(mkListOfPartitionRacks(10)), + .setNumPartitions(10), new ConsumerGroupPartitionMetadataValue.TopicMetadata() .setTopicId(barTopicId) .setTopicName("bar") - .setNumPartitions(20) - .setPartitionMetadata(mkListOfPartitionRacks(20)))), + .setNumPartitions(20))), (short) 0)); assertRecordEquals(expectedRecord, newConsumerGroupSubscriptionMetadataRecord( @@ -226,14 +222,12 @@ public class GroupCoordinatorRecordHelpersTest { subscriptionMetadata.put("foo", new TopicMetadata( fooTopicId, "foo", - 10, - Collections.emptyMap() + 10 )); subscriptionMetadata.put("bar", new TopicMetadata( barTopicId, "bar", - 20, - Collections.emptyMap() + 20 )); CoordinatorRecord expectedRecord = new CoordinatorRecord( @@ -248,13 +242,11 @@ public class GroupCoordinatorRecordHelpersTest { new ConsumerGroupPartitionMetadataValue.TopicMetadata() .setTopicId(fooTopicId) .setTopicName("foo") - .setNumPartitions(10) - .setPartitionMetadata(Collections.emptyList()), + .setNumPartitions(10), new ConsumerGroupPartitionMetadataValue.TopicMetadata() .setTopicId(barTopicId) .setTopicName("bar") - .setNumPartitions(20) - .setPartitionMetadata(Collections.emptyList()))), + .setNumPartitions(20))), (short) 0)); assertRecordEquals(expectedRecord, newConsumerGroupSubscriptionMetadataRecord( @@ -821,28 +813,6 @@ public class GroupCoordinatorRecordHelpersTest { assertEquals(expectedRecord, record); } - /** - * Creates a list of values to be added to the record and assigns partitions to racks for testing. - * - * @param numPartitions The number of partitions for the topic. - * - * For testing purposes, the following criteria are used: - * - Number of replicas for each partition: 2 - * - Number of racks available to the cluster: 4 - */ - public static List mkListOfPartitionRacks(int numPartitions) { - List partitionRacks = new ArrayList<>(numPartitions); - for (int i = 0; i < numPartitions; i++) { - List racks = new ArrayList<>(Arrays.asList("rack" + i % 4, "rack" + (i + 1) % 4)); - partitionRacks.add( - new ConsumerGroupPartitionMetadataValue.PartitionMetadata() - .setPartition(i) - .setRacks(racks) - ); - } - return partitionRacks; - } - /** * Creates a map of partitions to racks for testing. * diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 4fbe8967ad7..2a56cd912cd 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -125,7 +125,6 @@ import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; import static org.apache.kafka.coordinator.group.GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG; import static org.apache.kafka.coordinator.group.GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG; -import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks; import static org.apache.kafka.coordinator.group.GroupMetadataManager.EMPTY_RESULT; import static org.apache.kafka.coordinator.group.GroupMetadataManager.appendGroupMetadataErrorToResponseError; import static org.apache.kafka.coordinator.group.GroupMetadataManager.classicGroupHeartbeatKey; @@ -501,8 +500,8 @@ public class GroupMetadataManagerTest { List expectedRecords = Arrays.asList( GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember), GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap() {{ - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6))); - put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3))); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6)); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)); }}), GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment( @@ -600,8 +599,8 @@ public class GroupMetadataManagerTest { GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember), GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap() { { - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6))); - put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3))); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6)); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)); } }), GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11), @@ -725,12 +724,6 @@ public class GroupMetadataManagerTest { List expectedRecords = Arrays.asList( GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember3), - GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap() { - { - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6))); - put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3))); - } - }), GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, mkAssignment( mkTopicAssignment(fooTopicId, 0, 1), @@ -748,9 +741,9 @@ public class GroupMetadataManagerTest { GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember3) ); - assertRecordsEquals(expectedRecords.subList(0, 3), result.records().subList(0, 3)); - assertUnorderedListEquals(expectedRecords.subList(3, 6), result.records().subList(3, 6)); - assertRecordsEquals(expectedRecords.subList(6, 8), result.records().subList(6, 8)); + assertRecordsEquals(expectedRecords.subList(0, 2), result.records().subList(0, 2)); + assertUnorderedListEquals(expectedRecords.subList(2, 5), result.records().subList(2, 5)); + assertRecordsEquals(expectedRecords.subList(5, 7), result.records().subList(5, 7)); } @Test @@ -837,8 +830,8 @@ public class GroupMetadataManagerTest { // Subscription metadata is recomputed because zar is no longer there. GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap() { { - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6))); - put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3))); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6)); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)); } }), GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11) @@ -961,12 +954,6 @@ public class GroupMetadataManagerTest { List expectedRecords = Arrays.asList( GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember3), - GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap() { - { - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6))); - put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3))); - } - }), GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, mkAssignment( mkTopicAssignment(fooTopicId, 0, 1), @@ -984,9 +971,9 @@ public class GroupMetadataManagerTest { GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember3) ); - assertRecordsEquals(expectedRecords.subList(0, 3), result.records().subList(0, 3)); - assertUnorderedListEquals(expectedRecords.subList(3, 6), result.records().subList(3, 6)); - assertRecordsEquals(expectedRecords.subList(6, 8), result.records().subList(6, 8)); + assertRecordsEquals(expectedRecords.subList(0, 2), result.records().subList(0, 2)); + assertUnorderedListEquals(expectedRecords.subList(2, 5), result.records().subList(2, 5)); + assertRecordsEquals(expectedRecords.subList(5, 7), result.records().subList(5, 7)); } @Test @@ -1052,8 +1039,8 @@ public class GroupMetadataManagerTest { .withAssignmentEpoch(10) .withSubscriptionMetadata(new HashMap() { { - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6))); - put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3))); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6)); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)); } })) .build(); @@ -1225,7 +1212,7 @@ public class GroupMetadataManagerTest { .withAssignmentEpoch(10) .withSubscriptionMetadata(new HashMap() { { - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6))); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6)); } })) .build(); @@ -1343,8 +1330,8 @@ public class GroupMetadataManagerTest { GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedRejoinedMember), GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap() { { - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6))); - put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3))); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6)); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)); } }), GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11), @@ -1538,8 +1525,8 @@ public class GroupMetadataManagerTest { // Subscription metadata is recomputed because zar is no longer there. GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap() { { - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6))); - put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3))); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6)); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)); } }), GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11) @@ -2597,7 +2584,7 @@ public class GroupMetadataManagerTest { { // foo only has 3 partitions stored in the metadata but foo has // 6 partitions the metadata image. - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3, mkMapOfPartitionRacks(3))); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3)); } })) .build(); @@ -2651,7 +2638,7 @@ public class GroupMetadataManagerTest { List expectedRecords = Arrays.asList( GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap() { { - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6))); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6)); } }), GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11), @@ -2708,7 +2695,7 @@ public class GroupMetadataManagerTest { { // foo only has 3 partitions stored in the metadata but foo has // 6 partitions the metadata image. - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3, mkMapOfPartitionRacks(3))); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3)); } })) .build(); @@ -2780,7 +2767,7 @@ public class GroupMetadataManagerTest { List expectedRecords = Arrays.asList( GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap() { { - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6))); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6)); } }), GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11), @@ -9845,8 +9832,8 @@ public class GroupMetadataManagerTest { // The subscription metadata hasn't been updated during the conversion, so a new one is computed. GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap() { { - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1, mkMapOfPartitionRacks(1))); - put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1))); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1)); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)); } }), @@ -10078,8 +10065,8 @@ public class GroupMetadataManagerTest { // The subscription metadata hasn't been updated during the conversion, so a new one is computed. GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap() { { - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); - put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1))); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2)); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)); } }), @@ -10241,7 +10228,7 @@ public class GroupMetadataManagerTest { // The subscription metadata hasn't been updated during the conversion, so a new one is computed. GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap() { { - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1, mkMapOfPartitionRacks(1))); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1)); } }), @@ -10345,8 +10332,8 @@ public class GroupMetadataManagerTest { context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap() { { - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6))); - put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3))); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6)); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)); } })); @@ -10632,8 +10619,8 @@ public class GroupMetadataManagerTest { // The subscription metadata hasn't been updated during the conversion, so a new one is computed. GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap() { { - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); - put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1))); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2)); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)); } }), @@ -10846,8 +10833,8 @@ public class GroupMetadataManagerTest { context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap() { { - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6))); - put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3))); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6)); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)); } })); @@ -11034,8 +11021,8 @@ public class GroupMetadataManagerTest { context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap() { { - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6))); - put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3))); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6)); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)); } })); @@ -11221,9 +11208,9 @@ public class GroupMetadataManagerTest { context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap() { { - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6))); - put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3))); - put(zarTopicName, new TopicMetadata(zarTopicId, zarTopicName, 1, mkMapOfPartitionRacks(1))); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6)); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)); + put(zarTopicName, new TopicMetadata(zarTopicId, zarTopicName, 1)); } })); @@ -11428,7 +11415,7 @@ public class GroupMetadataManagerTest { .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withSubscriptionMetadata(new HashMap() { { - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2)); } }) .withMember(new ConsumerGroupMember.Builder(memberId) @@ -11514,8 +11501,8 @@ public class GroupMetadataManagerTest { GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember), GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap() { { - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); - put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1))); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2)); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)); } }), GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11), @@ -11577,7 +11564,7 @@ public class GroupMetadataManagerTest { .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withSubscriptionMetadata(new HashMap() { { - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2)); } }) .withMember(new ConsumerGroupMember.Builder(memberId) @@ -11633,7 +11620,7 @@ public class GroupMetadataManagerTest { .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withSubscriptionMetadata(new HashMap() { { - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2)); } }) .withMember(new ConsumerGroupMember.Builder(memberId) @@ -11684,8 +11671,8 @@ public class GroupMetadataManagerTest { GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember), GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap() { { - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); - put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1))); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2)); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)); } }), GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11), @@ -11728,7 +11715,7 @@ public class GroupMetadataManagerTest { .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withSubscriptionMetadata(new HashMap() { { - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2)); } }) .withMember(new ConsumerGroupMember.Builder(memberId) @@ -11850,9 +11837,9 @@ public class GroupMetadataManagerTest { .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withSubscriptionMetadata(new HashMap() { { - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); - put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1))); - put(zarTopicName, new TopicMetadata(zarTopicId, zarTopicName, 1, mkMapOfPartitionRacks(1))); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2)); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)); + put(zarTopicName, new TopicMetadata(zarTopicId, zarTopicName, 1)); } }) .withMember(new ConsumerGroupMember.Builder(memberId1) @@ -12080,8 +12067,8 @@ public class GroupMetadataManagerTest { .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withSubscriptionMetadata(new HashMap() { { - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); - put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1))); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2)); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)); } }) .withMember(new ConsumerGroupMember.Builder(memberId1) @@ -12182,9 +12169,9 @@ public class GroupMetadataManagerTest { GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1), GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap() { { - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); - put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1))); - put(zarTopicName, new TopicMetadata(zarTopicId, zarTopicName, 1, mkMapOfPartitionRacks(1))); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2)); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)); + put(zarTopicName, new TopicMetadata(zarTopicId, zarTopicName, 1)); } }), GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11), @@ -12322,8 +12309,8 @@ public class GroupMetadataManagerTest { .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withSubscriptionMetadata(new HashMap() { { - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); - put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1))); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2)); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)); } }) .withMember(new ConsumerGroupMember.Builder(memberId1) @@ -12424,9 +12411,9 @@ public class GroupMetadataManagerTest { GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1), GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap() { { - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); - put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1))); - put(zarTopicName, new TopicMetadata(zarTopicId, zarTopicName, 1, mkMapOfPartitionRacks(1))); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2)); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)); + put(zarTopicName, new TopicMetadata(zarTopicId, zarTopicName, 1)); } }), GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11), @@ -13422,8 +13409,8 @@ public class GroupMetadataManagerTest { context.groupMetadataManager.consumerGroup(groupId).setMetadataRefreshDeadline(Long.MAX_VALUE, 10); context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap() { { - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); - put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1))); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2)); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)); } })); @@ -13594,8 +13581,8 @@ public class GroupMetadataManagerTest { context.groupMetadataManager.consumerGroup(groupId).setMetadataRefreshDeadline(Long.MAX_VALUE, 10); context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap() { { - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); - put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1, mkMapOfPartitionRacks(1))); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2)); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)); } })); @@ -13627,7 +13614,7 @@ public class GroupMetadataManagerTest { // Update the subscription metadata. GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap() { { - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2, mkMapOfPartitionRacks(2))); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2)); } }), // Bump the group epoch. @@ -13988,8 +13975,8 @@ public class GroupMetadataManagerTest { GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionRecord(groupId, expectedMember), GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataRecord(groupId, new HashMap() { { - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6))); - put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3))); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6)); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)); } }), GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 1), @@ -14082,8 +14069,8 @@ public class GroupMetadataManagerTest { // Subscription metadata is recomputed because zar is no longer there. GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataRecord(groupId, new HashMap() { { - put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6))); - put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3))); + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6)); + put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)); } }), GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 11) @@ -14348,7 +14335,7 @@ public class GroupMetadataManagerTest { Map metadata = Collections.singletonMap( "bar", - new TopicMetadata(Uuid.randomUuid(), "bar", 10, Collections.emptyMap()) + new TopicMetadata(Uuid.randomUuid(), "bar", 10) ); // The group is created if it does not exist. diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java index 295794c36ad..a0d524ab839 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java @@ -44,7 +44,6 @@ import static org.apache.kafka.coordinator.group.AssignmentTestUtil.invertedTarg import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkOrderedAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; -import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks; import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -70,8 +69,7 @@ public class OptimizedUniformAssignmentBuilderTest { new TopicMetadata( topic1Uuid, topic1Name, - 3, - mkMapOfPartitionRacks(3) + 3 ) ) ); @@ -108,8 +106,7 @@ public class OptimizedUniformAssignmentBuilderTest { new TopicMetadata( topic1Uuid, topic1Name, - 3, - mkMapOfPartitionRacks(3) + 3 ) ) ); @@ -140,14 +137,12 @@ public class OptimizedUniformAssignmentBuilderTest { topicMetadata.put(topic1Uuid, new TopicMetadata( topic1Uuid, topic1Name, - 3, - mkMapOfPartitionRacks(3) + 3 )); topicMetadata.put(topic3Uuid, new TopicMetadata( topic3Uuid, topic3Name, - 2, - mkMapOfPartitionRacks(2) + 2 )); Map members = new TreeMap<>(); @@ -197,8 +192,7 @@ public class OptimizedUniformAssignmentBuilderTest { topicMetadata.put(topic3Uuid, new TopicMetadata( topic3Uuid, topic3Name, - 2, - mkMapOfPartitionRacks(2) + 2 )); Map members = new TreeMap<>(); @@ -260,8 +254,7 @@ public class OptimizedUniformAssignmentBuilderTest { topicMetadata.put(topicId, new TopicMetadata( topicId, "topic-" + i, - 3, - mkMapOfPartitionRacks(3) + 3 )); } @@ -296,14 +289,12 @@ public class OptimizedUniformAssignmentBuilderTest { topicMetadata.put(topic1Uuid, new TopicMetadata( topic1Uuid, topic1Name, - 3, - mkMapOfPartitionRacks(3) + 3 )); topicMetadata.put(topic2Uuid, new TopicMetadata( topic2Uuid, topic2Name, - 3, - mkMapOfPartitionRacks(3) + 3 )); Map members = new TreeMap<>(); @@ -361,14 +352,12 @@ public class OptimizedUniformAssignmentBuilderTest { topicMetadata.put(topic1Uuid, new TopicMetadata( topic1Uuid, topic1Name, - 6, - mkMapOfPartitionRacks(6) + 6 )); topicMetadata.put(topic2Uuid, new TopicMetadata( topic2Uuid, topic2Name, - 5, - mkMapOfPartitionRacks(5) + 5 )); Map members = new TreeMap<>(); @@ -425,14 +414,12 @@ public class OptimizedUniformAssignmentBuilderTest { topicMetadata.put(topic1Uuid, new TopicMetadata( topic1Uuid, topic1Name, - 3, - mkMapOfPartitionRacks(3) + 3 )); topicMetadata.put(topic2Uuid, new TopicMetadata( topic2Uuid, topic2Name, - 3, - mkMapOfPartitionRacks(3) + 3 )); Map members = new TreeMap<>(); @@ -499,14 +486,12 @@ public class OptimizedUniformAssignmentBuilderTest { topicMetadata.put(topic1Uuid, new TopicMetadata( topic1Uuid, topic1Name, - 3, - mkMapOfPartitionRacks(3) + 3 )); topicMetadata.put(topic2Uuid, new TopicMetadata( topic2Uuid, topic2Name, - 3, - mkMapOfPartitionRacks(3) + 3 )); Map members = new TreeMap<>(); @@ -565,14 +550,12 @@ public class OptimizedUniformAssignmentBuilderTest { topicMetadata.put(topic1Uuid, new TopicMetadata( topic1Uuid, topic1Name, - 2, - mkMapOfPartitionRacks(2) + 2 )); topicMetadata.put(topic2Uuid, new TopicMetadata( topic2Uuid, topic2Name, - 2, - mkMapOfPartitionRacks(2) + 2 )); // Initial subscriptions were [T1, T2] diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java index 64b946f906e..4c4321cfb2e 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java @@ -68,8 +68,7 @@ public class RangeAssignorTest { new TopicMetadata( topic1Uuid, topic1Name, - 3, - Collections.emptyMap() + 3 ) ) ); @@ -111,8 +110,7 @@ public class RangeAssignorTest { new TopicMetadata( topic1Uuid, topic1Name, - 3, - Collections.emptyMap() + 3 ) ) ); @@ -143,14 +141,12 @@ public class RangeAssignorTest { topicMetadata.put(topic1Uuid, new TopicMetadata( topic1Uuid, topic1Name, - 3, - Collections.emptyMap() + 3 )); topicMetadata.put(topic3Uuid, new TopicMetadata( topic3Uuid, topic3Name, - 2, - Collections.emptyMap() + 2 )); Map members = new TreeMap<>(); @@ -200,20 +196,17 @@ public class RangeAssignorTest { topicMetadata.put(topic1Uuid, new TopicMetadata( topic1Uuid, topic1Name, - 3, - Collections.emptyMap() + 3 )); topicMetadata.put(topic2Uuid, new TopicMetadata( topic2Uuid, topic2Name, - 3, - Collections.emptyMap() + 3 )); topicMetadata.put(topic3Uuid, new TopicMetadata( topic3Uuid, topic3Name, - 2, - Collections.emptyMap() + 2 )); Map members = new TreeMap<>(); @@ -273,14 +266,12 @@ public class RangeAssignorTest { topicMetadata.put(topic1Uuid, new TopicMetadata( topic1Uuid, topic1Name, - 3, - Collections.emptyMap() + 3 )); topicMetadata.put(topic3Uuid, new TopicMetadata( topic3Uuid, topic3Name, - 2, - Collections.emptyMap() + 2 )); Map members = new TreeMap<>(); @@ -343,8 +334,7 @@ public class RangeAssignorTest { new TopicMetadata( topic1Uuid, topic1Name, - 3, - Collections.emptyMap() + 3 ) ) ); @@ -414,8 +404,7 @@ public class RangeAssignorTest { new TopicMetadata( topic1Uuid, topic1Name, - 5, - Collections.emptyMap() + 5 ) ) ); @@ -497,14 +486,12 @@ public class RangeAssignorTest { topicMetadata.put(topic1Uuid, new TopicMetadata( topic1Uuid, topic1Name, - 2, - Collections.emptyMap() + 2 )); topicMetadata.put(topic2Uuid, new TopicMetadata( topic2Uuid, topic2Name, - 2, - Collections.emptyMap() + 2 )); Map members = new TreeMap<>(); @@ -571,14 +558,12 @@ public class RangeAssignorTest { topicMetadata.put(topic1Uuid, new TopicMetadata( topic1Uuid, topic1Name, - 4, - Collections.emptyMap() + 4 )); topicMetadata.put(topic2Uuid, new TopicMetadata( topic2Uuid, topic2Name, - 4, - Collections.emptyMap() + 4 )); Map members = new TreeMap<>(); @@ -634,14 +619,12 @@ public class RangeAssignorTest { topicMetadata.put(topic1Uuid, new TopicMetadata( topic1Uuid, topic1Name, - 3, - Collections.emptyMap() + 3 )); topicMetadata.put(topic2Uuid, new TopicMetadata( topic2Uuid, topic2Name, - 3, - Collections.emptyMap() + 3 )); Map members = new TreeMap<>(); @@ -710,14 +693,12 @@ public class RangeAssignorTest { topicMetadata.put(topic1Uuid, new TopicMetadata( topic1Uuid, topic1Name, - 4, - Collections.emptyMap() + 4 )); topicMetadata.put(topic2Uuid, new TopicMetadata( topic2Uuid, topic2Name, - 3, - Collections.emptyMap() + 3 )); Map members = new TreeMap<>(); @@ -784,14 +765,12 @@ public class RangeAssignorTest { topicMetadata.put(topic1Uuid, new TopicMetadata( topic1Uuid, topic1Name, - 3, - Collections.emptyMap() + 3 )); topicMetadata.put(topic2Uuid, new TopicMetadata( topic2Uuid, topic2Name, - 3, - Collections.emptyMap() + 3 )); Map members = new TreeMap<>(); @@ -835,20 +814,17 @@ public class RangeAssignorTest { topicMetadata.put(topic1Uuid, new TopicMetadata( topic1Uuid, topic1Name, - 3, - Collections.emptyMap() + 3 )); topicMetadata.put(topic2Uuid, new TopicMetadata( topic2Uuid, topic2Name, - 3, - Collections.emptyMap() + 3 )); topicMetadata.put(topic3Uuid, new TopicMetadata( topic3Uuid, topic3Name, - 2, - Collections.emptyMap() + 2 )); // Let initial subscriptions be A -> T1, T2 // B -> T2 // C -> T2, T3 diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java index 0553db6a487..4d0fe8cdf7b 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java @@ -88,8 +88,7 @@ public class SimpleAssignorTest { new TopicMetadata( TOPIC_1_UUID, TOPIC_1_NAME, - 3, - Collections.emptyMap() + 3 ) ) ); @@ -126,8 +125,7 @@ public class SimpleAssignorTest { new TopicMetadata( TOPIC_1_UUID, TOPIC_1_NAME, - 3, - Collections.emptyMap() + 3 ) ) ); @@ -158,14 +156,12 @@ public class SimpleAssignorTest { topicMetadata.put(TOPIC_1_UUID, new TopicMetadata( TOPIC_1_UUID, TOPIC_1_NAME, - 3, - Collections.emptyMap() + 3 )); topicMetadata.put(TOPIC_3_UUID, new TopicMetadata( TOPIC_3_UUID, TOPIC_3_NAME, - 2, - Collections.emptyMap() + 2 )); Map members = new TreeMap<>(); @@ -215,21 +211,18 @@ public class SimpleAssignorTest { topicMetadata.put(TOPIC_1_UUID, new TopicMetadata( TOPIC_1_UUID, TOPIC_1_NAME, - 3, - Collections.emptyMap() + 3 )); topicMetadata.put(TOPIC_2_UUID, new TopicMetadata( TOPIC_2_UUID, "topic2", - 3, - Collections.emptyMap() + 3 )); topicMetadata.put(TOPIC_3_UUID, new TopicMetadata( TOPIC_3_UUID, TOPIC_3_NAME, - 2, - Collections.emptyMap() + 2 )); Map members = new TreeMap<>(); @@ -289,15 +282,13 @@ public class SimpleAssignorTest { topicMetadata.put(TOPIC_1_UUID, new TopicMetadata( TOPIC_1_UUID, TOPIC_1_NAME, - 3, - Collections.emptyMap() + 3 )); topicMetadata.put(TOPIC_2_UUID, new TopicMetadata( TOPIC_2_UUID, "topic2", - 2, - Collections.emptyMap() + 2 )); Map members = new TreeMap<>(); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilderTest.java index 0ced6b2f63d..93be24076f1 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilderTest.java @@ -40,7 +40,6 @@ import static org.apache.kafka.coordinator.group.AssignmentTestUtil.assertAssign import static org.apache.kafka.coordinator.group.AssignmentTestUtil.invertedTargetAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; -import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks; import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -67,8 +66,7 @@ public class UniformHeterogeneousAssignmentBuilderTest { new TopicMetadata( topic1Uuid, topic1Name, - 3, - mkMapOfPartitionRacks(3) + 3 ) ) ); @@ -109,8 +107,7 @@ public class UniformHeterogeneousAssignmentBuilderTest { new TopicMetadata( topic1Uuid, topic1Name, - 3, - mkMapOfPartitionRacks(3) + 3 ) ) ); @@ -146,14 +143,12 @@ public class UniformHeterogeneousAssignmentBuilderTest { topicMetadata.put(topic1Uuid, new TopicMetadata( topic1Uuid, topic1Name, - 3, - mkMapOfPartitionRacks(3) + 3 )); topicMetadata.put(topic3Uuid, new TopicMetadata( topic3Uuid, topic3Name, - 6, - mkMapOfPartitionRacks(6) + 6 )); Map members = new TreeMap<>(); @@ -202,14 +197,12 @@ public class UniformHeterogeneousAssignmentBuilderTest { topicMetadata.put(topic3Uuid, new TopicMetadata( topic3Uuid, topic3Name, - 1, - mkMapOfPartitionRacks(1) + 1 )); topicMetadata.put(topic1Uuid, new TopicMetadata( topic1Uuid, topic1Name, - 2, - mkMapOfPartitionRacks(2) + 2 )); Map members = new TreeMap<>(); @@ -268,20 +261,17 @@ public class UniformHeterogeneousAssignmentBuilderTest { topicMetadata.put(topic1Uuid, new TopicMetadata( topic1Uuid, topic1Name, - 6, - mkMapOfPartitionRacks(6) + 6 )); topicMetadata.put(topic2Uuid, new TopicMetadata( topic2Uuid, topic2Name, - 4, - mkMapOfPartitionRacks(4) + 4 )); topicMetadata.put(topic3Uuid, new TopicMetadata( topic3Uuid, topic3Name, - 4, - mkMapOfPartitionRacks(4) + 4 )); Map members = new TreeMap<>(); @@ -350,26 +340,22 @@ public class UniformHeterogeneousAssignmentBuilderTest { topicMetadata.put(topic1Uuid, new TopicMetadata( topic1Uuid, topic1Name, - 6, - mkMapOfPartitionRacks(6) + 6 )); topicMetadata.put(topic2Uuid, new TopicMetadata( topic2Uuid, topic2Name, - 5, - mkMapOfPartitionRacks(5) + 5 )); topicMetadata.put(topic3Uuid, new TopicMetadata( topic3Uuid, topic3Name, - 3, - mkMapOfPartitionRacks(3) + 3 )); topicMetadata.put(topic4Uuid, new TopicMetadata( topic4Uuid, topic4Name, - 3, - mkMapOfPartitionRacks(3) + 3 )); Map members = new TreeMap<>(); @@ -426,14 +412,12 @@ public class UniformHeterogeneousAssignmentBuilderTest { topicMetadata.put(topic1Uuid, new TopicMetadata( topic1Uuid, topic1Name, - 6, - mkMapOfPartitionRacks(6) + 6 )); topicMetadata.put(topic2Uuid, new TopicMetadata( topic2Uuid, topic2Name, - 7, - mkMapOfPartitionRacks(7) + 7 )); Map members = new TreeMap<>(); @@ -499,20 +483,17 @@ public class UniformHeterogeneousAssignmentBuilderTest { topicMetadata.put(topic1Uuid, new TopicMetadata( topic1Uuid, topic1Name, - 3, - mkMapOfPartitionRacks(3) + 3 )); topicMetadata.put(topic2Uuid, new TopicMetadata( topic2Uuid, topic2Name, - 8, - mkMapOfPartitionRacks(4) + 8 )); topicMetadata.put(topic3Uuid, new TopicMetadata( topic3Uuid, topic3Name, - 3, - mkMapOfPartitionRacks(3) + 3 )); Map members = new TreeMap<>(); @@ -568,14 +549,12 @@ public class UniformHeterogeneousAssignmentBuilderTest { topicMetadata.put(topic1Uuid, new TopicMetadata( topic1Uuid, topic1Name, - 3, - mkMapOfPartitionRacks(3) + 3 )); topicMetadata.put(topic2Uuid, new TopicMetadata( topic2Uuid, topic2Name, - 5, - mkMapOfPartitionRacks(5) + 5 )); // Initial subscriptions were [T1, T2] @@ -630,8 +609,7 @@ public class UniformHeterogeneousAssignmentBuilderTest { topicMetadata.put(topic1Uuid, new TopicMetadata( topic1Uuid, topic1Name, - 3, - mkMapOfPartitionRacks(3) + 3 )); Map members = new TreeMap<>(); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicMetadataTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicMetadataTest.java index c90aaf02bcb..0842cee241d 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicMetadataTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicMetadataTest.java @@ -21,12 +21,9 @@ import org.apache.kafka.common.Uuid; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.Set; -import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -42,10 +39,9 @@ public class SubscribedTopicMetadataTest { for (int i = 0; i < 5; i++) { Uuid topicId = Uuid.randomUuid(); String topicName = "topic" + i; - Map> partitionRacks = mkMapOfPartitionRacks(5); topicMetadataMap.put( topicId, - new TopicMetadata(topicId, topicName, 5, partitionRacks) + new TopicMetadata(topicId, topicName, 5) ); } subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadataMap); @@ -68,44 +64,19 @@ public class SubscribedTopicMetadataTest { // Test -1 is returned when the topic Id doesn't exist. assertEquals(-1, subscribedTopicMetadata.numPartitions(topicId)); - topicMetadataMap.put(topicId, new TopicMetadata(topicId, "topic6", 3, Collections.emptyMap())); + topicMetadataMap.put(topicId, new TopicMetadata(topicId, "topic6", 3)); // Test that the correct number of partitions are returned for a given topic Id. assertEquals(3, subscribedTopicMetadata.numPartitions(topicId)); } - @Test - public void testRacksForPartition() { - Uuid topicId = Uuid.randomUuid(); - - // Test that an empty set is returned for a non-existent topic Id. - assertEquals(Collections.emptySet(), subscribedTopicMetadata.racksForPartition(topicId, 0)); - - // Add topic Id with partition racks included. - Map> partitionRacks = mkMapOfPartitionRacks(3); - topicMetadataMap.put(topicId, new TopicMetadata(topicId, "topic6", 3, partitionRacks)); - - // Test that an empty set is returned for a non-existent partition Id. - assertEquals(Collections.emptySet(), subscribedTopicMetadata.racksForPartition(topicId, 4)); - - // Test that a correct set of racks is returned for the given topic Id and partition Id. - assertEquals(partitionRacks.get(2), subscribedTopicMetadata.racksForPartition(topicId, 2)); - - // Add another topic Id without partition racks. - topicId = Uuid.randomUuid(); - topicMetadataMap.put(topicId, new TopicMetadata(topicId, "topic6", 3, Collections.emptyMap())); - - // Test that an empty set is returned when the partition rack info is absent. - assertEquals(Collections.emptySet(), subscribedTopicMetadata.racksForPartition(topicId, 1)); - } - @Test public void testEquals() { assertEquals(new SubscribedTopicDescriberImpl(topicMetadataMap), subscribedTopicMetadata); Map topicMetadataMap2 = new HashMap<>(); Uuid topicId = Uuid.randomUuid(); - topicMetadataMap2.put(topicId, new TopicMetadata(topicId, "newTopic", 5, Collections.emptyMap())); + topicMetadataMap2.put(topicId, new TopicMetadata(topicId, "newTopic", 5)); assertNotEquals(new SubscribedTopicDescriberImpl(topicMetadataMap2), subscribedTopicMetadata); } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java index b4f25e00f2e..093145650b9 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java @@ -43,7 +43,6 @@ import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord; import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord; -import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks; import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS; import static org.apache.kafka.coordinator.group.modern.TargetAssignmentBuilder.createMemberSubscriptionAndAssignment; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -102,15 +101,13 @@ public class TargetAssignmentBuilderTest { public Uuid addTopicMetadata( String topicName, - int numPartitions, - Map> partitionRacks + int numPartitions ) { Uuid topicId = Uuid.randomUuid(); subscriptionMetadata.put(topicName, new TopicMetadata( topicId, topicName, - numPartitions, - partitionRacks + numPartitions )); topicsImageBuilder = topicsImageBuilder.addTopic(topicId, topicName, numPartitions); @@ -314,8 +311,8 @@ public class TargetAssignmentBuilderTest { 20 ); - Uuid fooTopicId = context.addTopicMetadata("foo", 6, Collections.emptyMap()); - Uuid barTopicId = context.addTopicMetadata("bar", 6, Collections.emptyMap()); + Uuid fooTopicId = context.addTopicMetadata("foo", 6); + Uuid barTopicId = context.addTopicMetadata("bar", 6); context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment( mkTopicAssignment(fooTopicId, 1, 2, 3), @@ -364,8 +361,8 @@ public class TargetAssignmentBuilderTest { 20 ); - Uuid fooTopicId = context.addTopicMetadata("foo", 6, Collections.emptyMap()); - Uuid barTopicId = context.addTopicMetadata("bar", 6, Collections.emptyMap()); + Uuid fooTopicId = context.addTopicMetadata("foo", 6); + Uuid barTopicId = context.addTopicMetadata("bar", 6); context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment( mkTopicAssignment(fooTopicId, 1, 2, 3), @@ -427,8 +424,8 @@ public class TargetAssignmentBuilderTest { 20 ); - Uuid fooTopicId = context.addTopicMetadata("foo", 6, Collections.emptyMap()); - Uuid barTopicId = context.addTopicMetadata("bar", 6, Collections.emptyMap()); + Uuid fooTopicId = context.addTopicMetadata("foo", 6); + Uuid barTopicId = context.addTopicMetadata("bar", 6); context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment( mkTopicAssignment(fooTopicId, 1, 2, 3), @@ -505,8 +502,8 @@ public class TargetAssignmentBuilderTest { 20 ); - Uuid fooTopicId = context.addTopicMetadata("foo", 6, Collections.emptyMap()); - Uuid barTopicId = context.addTopicMetadata("bar", 6, Collections.emptyMap()); + Uuid fooTopicId = context.addTopicMetadata("foo", 6); + Uuid barTopicId = context.addTopicMetadata("bar", 6); context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment( mkTopicAssignment(fooTopicId, 1, 2, 3), @@ -592,8 +589,8 @@ public class TargetAssignmentBuilderTest { 20 ); - Uuid fooTopicId = context.addTopicMetadata("foo", 6, mkMapOfPartitionRacks(6)); - Uuid barTopicId = context.addTopicMetadata("bar", 6, mkMapOfPartitionRacks(6)); + Uuid fooTopicId = context.addTopicMetadata("foo", 6); + Uuid barTopicId = context.addTopicMetadata("bar", 6); context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment( mkTopicAssignment(fooTopicId, 1, 2), @@ -670,8 +667,8 @@ public class TargetAssignmentBuilderTest { 20 ); - Uuid fooTopicId = context.addTopicMetadata("foo", 6, Collections.emptyMap()); - Uuid barTopicId = context.addTopicMetadata("bar", 6, Collections.emptyMap()); + Uuid fooTopicId = context.addTopicMetadata("foo", 6); + Uuid barTopicId = context.addTopicMetadata("bar", 6); context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment( mkTopicAssignment(fooTopicId, 1, 2), @@ -740,8 +737,8 @@ public class TargetAssignmentBuilderTest { 20 ); - Uuid fooTopicId = context.addTopicMetadata("foo", 6, Collections.emptyMap()); - Uuid barTopicId = context.addTopicMetadata("bar", 6, Collections.emptyMap()); + Uuid fooTopicId = context.addTopicMetadata("foo", 6); + Uuid barTopicId = context.addTopicMetadata("bar", 6); context.addGroupMember("member-1", "instance-member-1", Arrays.asList("foo", "bar", "zar"), mkAssignment( mkTopicAssignment(fooTopicId, 1, 2), diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TopicMetadataTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TopicMetadataTest.java index fbec17e1d5c..00894138957 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TopicMetadataTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TopicMetadataTest.java @@ -21,11 +21,9 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetada import org.junit.jupiter.api.Test; -import java.util.Collections; import java.util.Map; import java.util.Set; -import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkListOfPartitionRacks; import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; @@ -35,29 +33,27 @@ public class TopicMetadataTest { @Test public void testAttributes() { Uuid topicId = Uuid.randomUuid(); - Map> partitionRacks = mkMapOfPartitionRacks(15); - TopicMetadata topicMetadata = new TopicMetadata(topicId, "foo", 15, partitionRacks); + TopicMetadata topicMetadata = new TopicMetadata(topicId, "foo", 15); assertEquals(topicId, topicMetadata.id()); assertEquals("foo", topicMetadata.name()); assertEquals(15, topicMetadata.numPartitions()); - assertEquals(partitionRacks, topicMetadata.partitionRacks()); } @Test public void testTopicIdAndNameCannotBeNull() { - assertThrows(NullPointerException.class, () -> new TopicMetadata(Uuid.randomUuid(), null, 15, Collections.emptyMap())); - assertThrows(NullPointerException.class, () -> new TopicMetadata(null, "foo", 15, Collections.emptyMap())); + assertThrows(NullPointerException.class, () -> new TopicMetadata(Uuid.randomUuid(), null, 15)); + assertThrows(NullPointerException.class, () -> new TopicMetadata(null, "foo", 15)); } @Test public void testEquals() { Uuid topicId = Uuid.randomUuid(); Map> partitionRacks = mkMapOfPartitionRacks(15); - TopicMetadata topicMetadata = new TopicMetadata(topicId, "foo", 15, partitionRacks); + TopicMetadata topicMetadata = new TopicMetadata(topicId, "foo", 15); - assertEquals(new TopicMetadata(topicId, "foo", 15, partitionRacks), topicMetadata); - assertNotEquals(new TopicMetadata(topicId, "foo", 5, mkMapOfPartitionRacks(5)), topicMetadata); + assertEquals(new TopicMetadata(topicId, "foo", 15), topicMetadata); + assertNotEquals(new TopicMetadata(topicId, "foo", 5), topicMetadata); } @Test @@ -68,11 +64,10 @@ public class TopicMetadataTest { ConsumerGroupPartitionMetadataValue.TopicMetadata record = new ConsumerGroupPartitionMetadataValue.TopicMetadata() .setTopicId(topicId) .setTopicName(topicName) - .setNumPartitions(15) - .setPartitionMetadata(mkListOfPartitionRacks(15)); + .setNumPartitions(15); assertEquals( - new TopicMetadata(topicId, topicName, 15, mkMapOfPartitionRacks(15)), + new TopicMetadata(topicId, topicName, 15), TopicMetadata.fromRecord(record) ); } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupBuilder.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupBuilder.java index fa1e67f28c0..4c044323d06 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupBuilder.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupBuilder.java @@ -25,7 +25,6 @@ import org.apache.kafka.image.TopicImage; import org.apache.kafka.image.TopicsImage; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -83,8 +82,7 @@ public class ConsumerGroupBuilder { subscriptionMetadata.put(topicName, new TopicMetadata( topicImage.id(), topicImage.name(), - topicImage.partitions().size(), - Collections.emptyMap() + topicImage.partitions().size() )); } }) diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java index 15d5e71bc37..d6eee7bbd9b 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java @@ -58,7 +58,6 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; -import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks; import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS; import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; @@ -655,7 +654,7 @@ public class ConsumerGroupTest { // Compute while taking into account member 1. assertEquals( mkMap( - mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))) + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)) ), consumerGroup.computeSubscriptionMetadata( consumerGroup.computeSubscribedTopicNames(null, member1), @@ -670,7 +669,7 @@ public class ConsumerGroupTest { // It should return foo now. assertEquals( mkMap( - mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))) + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)) ), consumerGroup.computeSubscriptionMetadata( consumerGroup.computeSubscribedTopicNames(null, null), @@ -692,8 +691,8 @@ public class ConsumerGroupTest { // Compute while taking into account member 2. assertEquals( mkMap( - mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))), - mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))) + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)), + mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)) ), consumerGroup.computeSubscriptionMetadata( consumerGroup.computeSubscribedTopicNames(null, member2), @@ -708,8 +707,8 @@ public class ConsumerGroupTest { // It should return foo and bar. assertEquals( mkMap( - mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))), - mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))) + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)), + mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)) ), consumerGroup.computeSubscriptionMetadata( consumerGroup.computeSubscribedTopicNames(null, null), @@ -721,7 +720,7 @@ public class ConsumerGroupTest { // Compute while taking into account removal of member 2. assertEquals( mkMap( - mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))) + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)) ), consumerGroup.computeSubscriptionMetadata( consumerGroup.computeSubscribedTopicNames(member2, null), @@ -733,7 +732,7 @@ public class ConsumerGroupTest { // Removing member1 results in returning bar. assertEquals( mkMap( - mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))) + mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)) ), consumerGroup.computeSubscriptionMetadata( consumerGroup.computeSubscribedTopicNames(member1, null), @@ -745,9 +744,9 @@ public class ConsumerGroupTest { // Compute while taking into account member 3. assertEquals( mkMap( - mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))), - mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))), - mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3, mkMapOfPartitionRacks(3))) + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)), + mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)), + mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3)) ), consumerGroup.computeSubscriptionMetadata( consumerGroup.computeSubscribedTopicNames(null, member3), @@ -762,9 +761,9 @@ public class ConsumerGroupTest { // It should return foo, bar and zar. assertEquals( mkMap( - mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))), - mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))), - mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3, mkMapOfPartitionRacks(3))) + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)), + mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)), + mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3)) ), consumerGroup.computeSubscriptionMetadata( consumerGroup.computeSubscribedTopicNames(null, null), @@ -786,7 +785,7 @@ public class ConsumerGroupTest { // Compute while taking into account removal of member 2 and member 3. assertEquals( mkMap( - mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))) + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)) ), consumerGroup.computeSubscriptionMetadata( consumerGroup.computeSubscribedTopicNames(new HashSet<>(Arrays.asList(member2, member3))), @@ -798,8 +797,8 @@ public class ConsumerGroupTest { // Compute while taking into account removal of member 1. assertEquals( mkMap( - mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))), - mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3, mkMapOfPartitionRacks(3))) + mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)), + mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3)) ), consumerGroup.computeSubscriptionMetadata( consumerGroup.computeSubscribedTopicNames(Collections.singleton(member1)), @@ -811,9 +810,9 @@ public class ConsumerGroupTest { // It should return foo, bar and zar. assertEquals( mkMap( - mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))), - mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))), - mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3, mkMapOfPartitionRacks(3))) + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)), + mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)), + mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3)) ), consumerGroup.computeSubscriptionMetadata( consumerGroup.computeSubscribedTopicNames(Collections.emptySet()), @@ -1220,8 +1219,8 @@ public class ConsumerGroupTest { assertEquals( mkMap( - mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))), - mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))) + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)), + mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)) ), consumerGroup.computeSubscriptionMetadata( consumerGroup.computeSubscribedTopicNames(null, null), diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupBuilder.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupBuilder.java index c8fe2fcf538..2ae29961434 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupBuilder.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupBuilder.java @@ -25,7 +25,6 @@ import org.apache.kafka.image.TopicImage; import org.apache.kafka.image.TopicsImage; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -83,8 +82,7 @@ public class ShareGroupBuilder { subscriptionMetadata.put(topicName, new TopicMetadata( topicImage.id(), topicImage.name(), - topicImage.partitions().size(), - Collections.emptyMap() + topicImage.partitions().size() )); } }) diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java index e53a9dac910..8dc7167d898 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java @@ -43,7 +43,6 @@ import java.util.HashSet; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; -import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks; import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS; import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; @@ -181,7 +180,7 @@ public class ShareGroupTest { // Compute while taking into account member 1. assertEquals( mkMap( - mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))) + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)) ), shareGroup.computeSubscriptionMetadata( shareGroup.computeSubscribedTopicNames(null, member1), @@ -196,7 +195,7 @@ public class ShareGroupTest { // It should return foo now. assertEquals( mkMap( - mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))) + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)) ), shareGroup.computeSubscriptionMetadata( shareGroup.computeSubscribedTopicNames(null, null), @@ -218,8 +217,8 @@ public class ShareGroupTest { // Compute while taking into account member 2. assertEquals( mkMap( - mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))), - mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))) + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)), + mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)) ), shareGroup.computeSubscriptionMetadata( shareGroup.computeSubscribedTopicNames(null, member2), @@ -234,8 +233,8 @@ public class ShareGroupTest { // It should return foo and bar. assertEquals( mkMap( - mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))), - mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))) + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)), + mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)) ), shareGroup.computeSubscriptionMetadata( shareGroup.computeSubscribedTopicNames(null, null), @@ -247,7 +246,7 @@ public class ShareGroupTest { // Compute while taking into account removal of member 2. assertEquals( mkMap( - mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))) + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)) ), shareGroup.computeSubscriptionMetadata( shareGroup.computeSubscribedTopicNames(member2, null), @@ -259,7 +258,7 @@ public class ShareGroupTest { // Removing member1 results in returning bar. assertEquals( mkMap( - mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))) + mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)) ), shareGroup.computeSubscriptionMetadata( shareGroup.computeSubscribedTopicNames(member1, null), @@ -271,9 +270,9 @@ public class ShareGroupTest { // Compute while taking into account member 3. assertEquals( mkMap( - mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))), - mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))), - mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3, mkMapOfPartitionRacks(3))) + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)), + mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)), + mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3)) ), shareGroup.computeSubscriptionMetadata( shareGroup.computeSubscribedTopicNames(null, member3), @@ -288,9 +287,9 @@ public class ShareGroupTest { // It should return foo, bar and zar. assertEquals( mkMap( - mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))), - mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))), - mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3, mkMapOfPartitionRacks(3))) + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)), + mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)), + mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3)) ), shareGroup.computeSubscriptionMetadata( shareGroup.computeSubscribedTopicNames(null, null), @@ -312,7 +311,7 @@ public class ShareGroupTest { // Compute while taking into account removal of member 2 and member 3. assertEquals( mkMap( - mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))) + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)) ), shareGroup.computeSubscriptionMetadata( shareGroup.computeSubscribedTopicNames(new HashSet<>(Arrays.asList(member2, member3))), @@ -324,8 +323,8 @@ public class ShareGroupTest { // Compute while taking into account removal of member 1. assertEquals( mkMap( - mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))), - mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3, mkMapOfPartitionRacks(3))) + mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)), + mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3)) ), shareGroup.computeSubscriptionMetadata( shareGroup.computeSubscribedTopicNames(Collections.singleton(member1)), @@ -337,9 +336,9 @@ public class ShareGroupTest { // It should return foo, bar and zar. assertEquals( mkMap( - mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))), - mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))), - mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3, mkMapOfPartitionRacks(3))) + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)), + mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)), + mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3)) ), shareGroup.computeSubscriptionMetadata( shareGroup.computeSubscribedTopicNames(Collections.emptySet()), @@ -644,8 +643,8 @@ public class ShareGroupTest { assertEquals( mkMap( - mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))), - mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))) + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)), + mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)) ), shareGroup.computeSubscriptionMetadata( shareGroup.computeSubscribedTopicNames(null, null), diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java index b9ce9bab9af..e3dceb19c2a 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java @@ -93,14 +93,11 @@ public class AssignorBenchmarkUtils { * * @param topicNames The names of the topics. * @param partitionsPerTopic The number of partitions per topic. - * @param getTopicPartitionRacks A function to get the racks map for each topic. May return - * an empty map if no rack info is desired. * @return The subscription metadata map. */ public static Map createSubscriptionMetadata( List topicNames, - int partitionsPerTopic, - Function>> getTopicPartitionRacks + int partitionsPerTopic ) { Map subscriptionMetadata = new HashMap<>(); @@ -110,8 +107,7 @@ public class AssignorBenchmarkUtils { TopicMetadata metadata = new TopicMetadata( topicId, topicName, - partitionsPerTopic, - getTopicPartitionRacks.apply(topicName) + partitionsPerTopic ); subscriptionMetadata.put(topicName, metadata); } diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java index 82f11350381..a22673c3f59 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java @@ -148,13 +148,9 @@ public class ServerSideAssignorBenchmark { allTopicNames = AssignorBenchmarkUtils.createTopicNames(topicCount); int partitionsPerTopic = (memberCount * partitionsToMemberRatio) / topicCount; - Map> partitionRacks = isRackAware ? - mkMapOfPartitionRacks(partitionsPerTopic) : - Collections.emptyMap(); subscriptionMetadata = AssignorBenchmarkUtils.createSubscriptionMetadata( allTopicNames, - partitionsPerTopic, - topicName -> partitionRacks + partitionsPerTopic ); topicsImage = AssignorBenchmarkUtils.createTopicsImage(subscriptionMetadata); diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java index adf40671ebc..c9ddb54363b 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java @@ -122,8 +122,7 @@ public class TargetAssignmentBuilderBenchmark { int partitionsPerTopic = (memberCount * partitionsToMemberRatio) / topicCount; subscriptionMetadata = AssignorBenchmarkUtils.createSubscriptionMetadata( allTopicNames, - partitionsPerTopic, - topicName -> Collections.emptyMap() + partitionsPerTopic ); topicsImage = AssignorBenchmarkUtils.createTopicsImage(subscriptionMetadata);