diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilder.java index 930cdfb2e7c..ba08a236ba6 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilder.java @@ -327,13 +327,14 @@ public class TargetAssignmentBuilder { */ public TargetAssignmentResult build() throws PartitionAssignorException { Map memberSpecs = new HashMap<>(); + TopicIds.TopicResolver topicResolver = new TopicIds.CachedTopicResolver(topicsImage); // Prepare the member spec for all members. members.forEach((memberId, member) -> memberSpecs.put(memberId, createMemberSubscriptionAndAssignment( member, targetAssignment.getOrDefault(memberId, Assignment.EMPTY), - topicsImage + topicResolver )) ); @@ -355,7 +356,7 @@ public class TargetAssignmentBuilder { memberSpecs.put(memberId, createMemberSubscriptionAndAssignment( updatedMemberOrNull, assignment, - topicsImage + topicResolver )); } }); @@ -420,12 +421,12 @@ public class TargetAssignmentBuilder { static MemberSubscriptionAndAssignmentImpl createMemberSubscriptionAndAssignment( T member, Assignment memberAssignment, - TopicsImage topicsImage + TopicIds.TopicResolver topicResolver ) { return new MemberSubscriptionAndAssignmentImpl( Optional.ofNullable(member.rackId()), Optional.ofNullable(member.instanceId()), - new TopicIds(member.subscribedTopicNames(), topicsImage), + new TopicIds(member.subscribedTopicNames(), topicResolver), memberAssignment ); } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TopicIds.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TopicIds.java index baf4e8ce778..f45735a527c 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TopicIds.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TopicIds.java @@ -21,25 +21,161 @@ import org.apache.kafka.image.TopicImage; import org.apache.kafka.image.TopicsImage; import java.util.Collection; +import java.util.HashMap; import java.util.Iterator; +import java.util.Map; import java.util.NoSuchElementException; import java.util.Objects; import java.util.Set; /** * TopicIds is initialized with topic names (String) but exposes a Set of topic ids (Uuid) to the - * user and performs the conversion lazily with TopicsImage. + * user and performs the conversion lazily with a TopicResolver backed by a TopicsImage. */ public class TopicIds implements Set { + /** + * Converts between topic ids (Uuids) and topic names (Strings). + */ + public interface TopicResolver { + /** + * @return The TopicsImage used by the resolver. + */ + TopicsImage image(); + + /** + * Converts a topic id to a topic name. + * + * @param id The topic id. + * @return The topic name for the given topic id, or null if the topic does not exist. + */ + String name(Uuid id); + + /** + * Converts a topic name to a topic id. + * + * @param name The topic name. + * @return The topic id for the given topic name, or null if the topic does not exist. + */ + Uuid id(String name); + + /** + * Clears any cached data. + * + * Used for benchmarking purposes. + */ + void clear(); + } + + /** + * A TopicResolver without any caching. + */ + public static class DefaultTopicResolver implements TopicResolver { + private final TopicsImage image; + + public DefaultTopicResolver( + TopicsImage image + ) { + this.image = Objects.requireNonNull(image); + } + + @Override + public final TopicsImage image() { + return image; + } + + @Override + public String name(Uuid id) { + TopicImage topic = image.getTopic(id); + if (topic == null) return null; + return topic.name(); + } + + @Override + public Uuid id(String name) { + TopicImage topic = image.getTopic(name); + if (topic == null) return null; + return topic.id(); + } + + @Override + public void clear() {} + + @Override + public String toString() { + return "DefaultTopicResolver(image=" + image + ")"; + } + } + + /** + * A TopicResolver that caches results. + * + * This cache is expected to be short-lived and only used within a single + * TargetAssignmentBuilder.build() call. + */ + public static class CachedTopicResolver implements TopicResolver { + private final TopicsImage image; + + private final Map topicIds = new HashMap<>(); + private final Map topicNames = new HashMap<>(); + + public CachedTopicResolver( + TopicsImage image + ) { + this.image = Objects.requireNonNull(image); + } + + @Override + public final TopicsImage image() { + return image; + } + + @Override + public String name(Uuid id) { + return topicNames.computeIfAbsent(id, __ -> { + TopicImage topic = image.getTopic(id); + if (topic == null) return null; + return topic.name(); + }); + } + + @Override + public Uuid id(String name) { + return topicIds.computeIfAbsent(name, __ -> { + TopicImage topic = image.getTopic(name); + if (topic == null) return null; + return topic.id(); + }); + } + + @Override + public void clear() { + this.topicNames.clear(); + this.topicIds.clear(); + } + + @Override + public String toString() { + return "CachedTopicResolver(image=" + image + ")"; + } + } + private final Set topicNames; - private final TopicsImage image; + private final TopicResolver resolver; public TopicIds( Set topicNames, TopicsImage image ) { this.topicNames = Objects.requireNonNull(topicNames); - this.image = Objects.requireNonNull(image); + this.resolver = new DefaultTopicResolver(image); + } + + public TopicIds( + Set topicNames, + TopicResolver resolver + ) { + this.topicNames = Objects.requireNonNull(topicNames); + this.resolver = Objects.requireNonNull(resolver); } @Override @@ -56,24 +192,24 @@ public class TopicIds implements Set { public boolean contains(Object o) { if (o instanceof Uuid) { Uuid topicId = (Uuid) o; - TopicImage topicImage = image.getTopic(topicId); - if (topicImage == null) return false; - return topicNames.contains(topicImage.name()); + String topicName = resolver.name(topicId); + if (topicName == null) return false; + return topicNames.contains(topicName); } return false; } private static class TopicIdIterator implements Iterator { final Iterator iterator; - final TopicsImage image; + final TopicResolver resolver; private Uuid next = null; private TopicIdIterator( Iterator iterator, - TopicsImage image + TopicResolver resolver ) { this.iterator = Objects.requireNonNull(iterator); - this.image = Objects.requireNonNull(image); + this.resolver = Objects.requireNonNull(resolver); } @Override @@ -85,9 +221,9 @@ public class TopicIds implements Set { return false; } String next = iterator.next(); - TopicImage topicImage = image.getTopic(next); - if (topicImage != null) { - result = topicImage.id(); + Uuid topicId = resolver.id(next); + if (topicId != null) { + result = topicId; } } while (result == null); next = result; @@ -105,7 +241,7 @@ public class TopicIds implements Set { @Override public Iterator iterator() { - return new TopicIdIterator(topicNames.iterator(), image); + return new TopicIdIterator(topicNames.iterator(), resolver); } @Override @@ -164,20 +300,20 @@ public class TopicIds implements Set { TopicIds uuids = (TopicIds) o; if (!Objects.equals(topicNames, uuids.topicNames)) return false; - return Objects.equals(image, uuids.image); + return Objects.equals(resolver.image(), uuids.resolver.image()); } @Override public int hashCode() { int result = topicNames.hashCode(); - result = 31 * result + image.hashCode(); + result = 31 * result + resolver.image().hashCode(); return result; } @Override public String toString() { return "TopicIds(topicNames=" + topicNames + - ", image=" + image + + ", resolver=" + resolver + ')'; } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java index 093145650b9..e3445663df8 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java @@ -161,6 +161,7 @@ public class TargetAssignmentBuilderTest { public TargetAssignmentBuilder.TargetAssignmentResult build() { TopicsImage topicsImage = topicsImageBuilder.build().topics(); + TopicIds.TopicResolver topicResolver = new TopicIds.CachedTopicResolver(topicsImage); // Prepare expected member specs. Map memberSubscriptions = new HashMap<>(); @@ -169,7 +170,7 @@ public class TargetAssignmentBuilderTest { memberSubscriptions.put(memberId, createMemberSubscriptionAndAssignment( member, targetAssignment.getOrDefault(memberId, Assignment.EMPTY), - topicsImage + topicResolver )) ); @@ -192,7 +193,7 @@ public class TargetAssignmentBuilderTest { memberSubscriptions.put(memberId, createMemberSubscriptionAndAssignment( updatedMemberOrNull, assignment, - topicsImage + topicResolver )); } }); @@ -263,6 +264,7 @@ public class TargetAssignmentBuilderTest { .addTopic(barTopicId, "bar", 5) .build() .topics(); + TopicIds.TopicResolver topicResolver = new TopicIds.DefaultTopicResolver(topicsImage); ConsumerGroupMember member = new ConsumerGroupMember.Builder("member-id") .setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar")) @@ -278,7 +280,7 @@ public class TargetAssignmentBuilderTest { MemberSubscription subscriptionSpec = createMemberSubscriptionAndAssignment( member, assignment, - topicsImage + topicResolver ); assertEquals(new MemberSubscriptionAndAssignmentImpl( diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TopicIdsTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TopicIdsTest.java index 876131fa51c..0706624a97a 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TopicIdsTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TopicIdsTest.java @@ -41,7 +41,12 @@ public class TopicIdsTest { @Test public void testTopicsImageCannotBeNull() { - assertThrows(NullPointerException.class, () -> new TopicIds(Collections.emptySet(), null)); + assertThrows(NullPointerException.class, () -> new TopicIds(Collections.emptySet(), (TopicsImage) null)); + } + + @Test + public void testTopicResolverCannotBeNull() { + assertThrows(NullPointerException.class, () -> new TopicIds(Collections.emptySet(), (TopicIds.TopicResolver) null)); } @Test diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java index e3dceb19c2a..9402728e1eb 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java @@ -159,13 +159,13 @@ public class AssignorBenchmarkUtils { * * @param members The ConsumerGroupMembers. * @param subscriptionType  The group's subscription type. - * @param topicsImage The TopicsImage to use. + * @param topicResolver The TopicResolver to use. * @return The new GroupSpec. */ public static GroupSpec createGroupSpec( Map members, SubscriptionType subscriptionType, - TopicsImage topicsImage + TopicIds.TopicResolver topicResolver ) { Map memberSpecs = new HashMap<>(); @@ -177,7 +177,7 @@ public class AssignorBenchmarkUtils { memberSpecs.put(memberId, new MemberSubscriptionAndAssignmentImpl( Optional.ofNullable(member.rackId()), Optional.ofNullable(member.instanceId()), - new TopicIds(member.subscribedTopicNames(), topicsImage), + new TopicIds(member.subscribedTopicNames(), topicResolver), new Assignment(member.assignedPartitions()) )); } diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java index 1ad30e320b5..971828c1c47 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java @@ -129,6 +129,8 @@ public class ServerSideAssignorBenchmark { private TopicsImage topicsImage = TopicsImage.EMPTY; + private TopicIds.TopicResolver topicResolver; + private SubscribedTopicDescriber subscribedTopicDescriber; @Setup(Level.Trial) @@ -138,7 +140,7 @@ public class ServerSideAssignorBenchmark { setupTopics(); Map members = createMembers(); - this.groupSpec = AssignorBenchmarkUtils.createGroupSpec(members, subscriptionType, topicsImage); + this.groupSpec = AssignorBenchmarkUtils.createGroupSpec(members, subscriptionType, topicResolver); if (assignmentType == AssignmentType.INCREMENTAL) { simulateIncrementalRebalance(); @@ -155,6 +157,7 @@ public class ServerSideAssignorBenchmark { ); topicsImage = AssignorBenchmarkUtils.createTopicsImage(subscriptionMetadata); + topicResolver = new TopicIds.CachedTopicResolver(topicsImage); Map topicMetadata = AssignorBenchmarkUtils.createTopicMetadata(subscriptionMetadata); subscribedTopicDescriber = new SubscribedTopicDescriberImpl(topicMetadata); @@ -229,7 +232,7 @@ public class ServerSideAssignorBenchmark { if (subscriptionType == HETEROGENEOUS) { subscribedTopicIdsForNewMember = updatedMemberSpec.get(memberId(memberCount - 2)).subscribedTopicIds(); } else { - subscribedTopicIdsForNewMember = new TopicIds(new HashSet<>(allTopicNames), topicsImage); + subscribedTopicIdsForNewMember = new TopicIds(new HashSet<>(allTopicNames), topicResolver); } Optional rackId = rackId(memberCount - 1); @@ -251,6 +254,7 @@ public class ServerSideAssignorBenchmark { @Threads(1) @OutputTimeUnit(TimeUnit.MILLISECONDS) public void doAssignment() { + topicResolver.clear(); partitionAssignor.assign(groupSpec, subscribedTopicDescriber); } } diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java index a06e9b0e527..2a23d22b655 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java @@ -27,6 +27,7 @@ import org.apache.kafka.coordinator.group.assignor.UniformAssignor; import org.apache.kafka.coordinator.group.modern.Assignment; import org.apache.kafka.coordinator.group.modern.SubscribedTopicDescriberImpl; import org.apache.kafka.coordinator.group.modern.TargetAssignmentBuilder; +import org.apache.kafka.coordinator.group.modern.TopicIds; import org.apache.kafka.coordinator.group.modern.TopicMetadata; import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember; import org.apache.kafka.image.TopicsImage; @@ -96,6 +97,8 @@ public class TargetAssignmentBuilderBenchmark { private TopicsImage topicsImage; + private TopicIds.TopicResolver topicResolver; + private SubscribedTopicDescriber subscribedTopicDescriber; @Setup(Level.Trial) @@ -133,6 +136,7 @@ public class TargetAssignmentBuilderBenchmark { ); topicsImage = AssignorBenchmarkUtils.createTopicsImage(subscriptionMetadata); + topicResolver = new TopicIds.CachedTopicResolver(topicsImage); Map topicMetadata = AssignorBenchmarkUtils.createTopicMetadata(subscriptionMetadata); subscribedTopicDescriber = new SubscribedTopicDescriberImpl(topicMetadata); @@ -144,7 +148,7 @@ public class TargetAssignmentBuilderBenchmark { this.groupSpec = AssignorBenchmarkUtils.createGroupSpec( members, subscriptionType, - topicsImage + topicResolver ); GroupAssignment groupAssignment = partitionAssignor.assign(