MINOR: Cache topic resolution in TopicIds set (#17285)

Looking up topics in a TopicsImage is relatively slow. Cache the results
in TopicIds to improve assignor performance. In benchmarks, we see a
noticeable improvement in performance in the heterogeneous case.

Before
```
Benchmark                                       (assignmentType)  (assignorType)  (isRackAware)  (memberCount)  (partitionsToMemberRatio)  (subscriptionType)  (topicCount)  Mode  Cnt    Score   Error  Units
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false          10000                         10         HOMOGENEOUS          1000  avgt    5   36.400 ± 3.004  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false          10000                         10       HETEROGENEOUS          1000  avgt    5  158.340 ± 0.825  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false          10000                         10         HOMOGENEOUS          1000  avgt    5    1.329 ± 0.041  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false          10000                         10       HETEROGENEOUS          1000  avgt    5  382.901 ± 6.203  ms/op
```

After
```
Benchmark                                       (assignmentType)  (assignorType)  (isRackAware)  (memberCount)  (partitionsToMemberRatio)  (subscriptionType)  (topicCount)  Mode  Cnt    Score   Error  Units
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false          10000                         10         HOMOGENEOUS          1000  avgt    5   36.465 ± 1.954  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL           RANGE          false          10000                         10       HETEROGENEOUS          1000  avgt    5  114.043 ± 1.424  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false          10000                         10         HOMOGENEOUS          1000  avgt    5    1.454 ± 0.019  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false          10000                         10       HETEROGENEOUS          1000  avgt    5  342.840 ± 2.744  ms/op
```

---

Based heavily on https://github.com/apache/kafka/pull/16527.

Reviewers: David Arthur <mumrah@gmail.com>, David Jacot <djacot@confluent.io>
This commit is contained in:
Sean Quah 2024-10-03 08:40:25 +01:00 committed by GitHub
parent 696e33ee6d
commit 99e1d8fbb3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 182 additions and 30 deletions

View File

@ -327,13 +327,14 @@ public class TargetAssignmentBuilder<T extends ModernGroupMember> {
*/
public TargetAssignmentResult build() throws PartitionAssignorException {
Map<String, MemberSubscriptionAndAssignmentImpl> memberSpecs = new HashMap<>();
TopicIds.TopicResolver topicResolver = new TopicIds.CachedTopicResolver(topicsImage);
// Prepare the member spec for all members.
members.forEach((memberId, member) ->
memberSpecs.put(memberId, createMemberSubscriptionAndAssignment(
member,
targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
topicsImage
topicResolver
))
);
@ -355,7 +356,7 @@ public class TargetAssignmentBuilder<T extends ModernGroupMember> {
memberSpecs.put(memberId, createMemberSubscriptionAndAssignment(
updatedMemberOrNull,
assignment,
topicsImage
topicResolver
));
}
});
@ -420,12 +421,12 @@ public class TargetAssignmentBuilder<T extends ModernGroupMember> {
static <T extends ModernGroupMember> MemberSubscriptionAndAssignmentImpl createMemberSubscriptionAndAssignment(
T member,
Assignment memberAssignment,
TopicsImage topicsImage
TopicIds.TopicResolver topicResolver
) {
return new MemberSubscriptionAndAssignmentImpl(
Optional.ofNullable(member.rackId()),
Optional.ofNullable(member.instanceId()),
new TopicIds(member.subscribedTopicNames(), topicsImage),
new TopicIds(member.subscribedTopicNames(), topicResolver),
memberAssignment
);
}

View File

@ -21,25 +21,161 @@ import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.TopicsImage;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
/**
* TopicIds is initialized with topic names (String) but exposes a Set of topic ids (Uuid) to the
* user and performs the conversion lazily with TopicsImage.
* user and performs the conversion lazily with a TopicResolver backed by a TopicsImage.
*/
public class TopicIds implements Set<Uuid> {
/**
* Converts between topic ids (Uuids) and topic names (Strings).
*/
public interface TopicResolver {
/**
* @return The TopicsImage used by the resolver.
*/
TopicsImage image();
/**
* Converts a topic id to a topic name.
*
* @param id The topic id.
* @return The topic name for the given topic id, or null if the topic does not exist.
*/
String name(Uuid id);
/**
* Converts a topic name to a topic id.
*
* @param name The topic name.
* @return The topic id for the given topic name, or null if the topic does not exist.
*/
Uuid id(String name);
/**
* Clears any cached data.
*
* Used for benchmarking purposes.
*/
void clear();
}
/**
* A TopicResolver without any caching.
*/
public static class DefaultTopicResolver implements TopicResolver {
private final TopicsImage image;
public DefaultTopicResolver(
TopicsImage image
) {
this.image = Objects.requireNonNull(image);
}
@Override
public final TopicsImage image() {
return image;
}
@Override
public String name(Uuid id) {
TopicImage topic = image.getTopic(id);
if (topic == null) return null;
return topic.name();
}
@Override
public Uuid id(String name) {
TopicImage topic = image.getTopic(name);
if (topic == null) return null;
return topic.id();
}
@Override
public void clear() {}
@Override
public String toString() {
return "DefaultTopicResolver(image=" + image + ")";
}
}
/**
* A TopicResolver that caches results.
*
* This cache is expected to be short-lived and only used within a single
* TargetAssignmentBuilder.build() call.
*/
public static class CachedTopicResolver implements TopicResolver {
private final TopicsImage image;
private final Map<String, Uuid> topicIds = new HashMap<>();
private final Map<Uuid, String> topicNames = new HashMap<>();
public CachedTopicResolver(
TopicsImage image
) {
this.image = Objects.requireNonNull(image);
}
@Override
public final TopicsImage image() {
return image;
}
@Override
public String name(Uuid id) {
return topicNames.computeIfAbsent(id, __ -> {
TopicImage topic = image.getTopic(id);
if (topic == null) return null;
return topic.name();
});
}
@Override
public Uuid id(String name) {
return topicIds.computeIfAbsent(name, __ -> {
TopicImage topic = image.getTopic(name);
if (topic == null) return null;
return topic.id();
});
}
@Override
public void clear() {
this.topicNames.clear();
this.topicIds.clear();
}
@Override
public String toString() {
return "CachedTopicResolver(image=" + image + ")";
}
}
private final Set<String> topicNames;
private final TopicsImage image;
private final TopicResolver resolver;
public TopicIds(
Set<String> topicNames,
TopicsImage image
) {
this.topicNames = Objects.requireNonNull(topicNames);
this.image = Objects.requireNonNull(image);
this.resolver = new DefaultTopicResolver(image);
}
public TopicIds(
Set<String> topicNames,
TopicResolver resolver
) {
this.topicNames = Objects.requireNonNull(topicNames);
this.resolver = Objects.requireNonNull(resolver);
}
@Override
@ -56,24 +192,24 @@ public class TopicIds implements Set<Uuid> {
public boolean contains(Object o) {
if (o instanceof Uuid) {
Uuid topicId = (Uuid) o;
TopicImage topicImage = image.getTopic(topicId);
if (topicImage == null) return false;
return topicNames.contains(topicImage.name());
String topicName = resolver.name(topicId);
if (topicName == null) return false;
return topicNames.contains(topicName);
}
return false;
}
private static class TopicIdIterator implements Iterator<Uuid> {
final Iterator<String> iterator;
final TopicsImage image;
final TopicResolver resolver;
private Uuid next = null;
private TopicIdIterator(
Iterator<String> iterator,
TopicsImage image
TopicResolver resolver
) {
this.iterator = Objects.requireNonNull(iterator);
this.image = Objects.requireNonNull(image);
this.resolver = Objects.requireNonNull(resolver);
}
@Override
@ -85,9 +221,9 @@ public class TopicIds implements Set<Uuid> {
return false;
}
String next = iterator.next();
TopicImage topicImage = image.getTopic(next);
if (topicImage != null) {
result = topicImage.id();
Uuid topicId = resolver.id(next);
if (topicId != null) {
result = topicId;
}
} while (result == null);
next = result;
@ -105,7 +241,7 @@ public class TopicIds implements Set<Uuid> {
@Override
public Iterator<Uuid> iterator() {
return new TopicIdIterator(topicNames.iterator(), image);
return new TopicIdIterator(topicNames.iterator(), resolver);
}
@Override
@ -164,20 +300,20 @@ public class TopicIds implements Set<Uuid> {
TopicIds uuids = (TopicIds) o;
if (!Objects.equals(topicNames, uuids.topicNames)) return false;
return Objects.equals(image, uuids.image);
return Objects.equals(resolver.image(), uuids.resolver.image());
}
@Override
public int hashCode() {
int result = topicNames.hashCode();
result = 31 * result + image.hashCode();
result = 31 * result + resolver.image().hashCode();
return result;
}
@Override
public String toString() {
return "TopicIds(topicNames=" + topicNames +
", image=" + image +
", resolver=" + resolver +
')';
}
}

View File

@ -161,6 +161,7 @@ public class TargetAssignmentBuilderTest {
public TargetAssignmentBuilder.TargetAssignmentResult build() {
TopicsImage topicsImage = topicsImageBuilder.build().topics();
TopicIds.TopicResolver topicResolver = new TopicIds.CachedTopicResolver(topicsImage);
// Prepare expected member specs.
Map<String, MemberSubscriptionAndAssignmentImpl> memberSubscriptions = new HashMap<>();
@ -169,7 +170,7 @@ public class TargetAssignmentBuilderTest {
memberSubscriptions.put(memberId, createMemberSubscriptionAndAssignment(
member,
targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
topicsImage
topicResolver
))
);
@ -192,7 +193,7 @@ public class TargetAssignmentBuilderTest {
memberSubscriptions.put(memberId, createMemberSubscriptionAndAssignment(
updatedMemberOrNull,
assignment,
topicsImage
topicResolver
));
}
});
@ -263,6 +264,7 @@ public class TargetAssignmentBuilderTest {
.addTopic(barTopicId, "bar", 5)
.build()
.topics();
TopicIds.TopicResolver topicResolver = new TopicIds.DefaultTopicResolver(topicsImage);
ConsumerGroupMember member = new ConsumerGroupMember.Builder("member-id")
.setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar"))
@ -278,7 +280,7 @@ public class TargetAssignmentBuilderTest {
MemberSubscription subscriptionSpec = createMemberSubscriptionAndAssignment(
member,
assignment,
topicsImage
topicResolver
);
assertEquals(new MemberSubscriptionAndAssignmentImpl(

View File

@ -41,7 +41,12 @@ public class TopicIdsTest {
@Test
public void testTopicsImageCannotBeNull() {
assertThrows(NullPointerException.class, () -> new TopicIds(Collections.emptySet(), null));
assertThrows(NullPointerException.class, () -> new TopicIds(Collections.emptySet(), (TopicsImage) null));
}
@Test
public void testTopicResolverCannotBeNull() {
assertThrows(NullPointerException.class, () -> new TopicIds(Collections.emptySet(), (TopicIds.TopicResolver) null));
}
@Test

View File

@ -159,13 +159,13 @@ public class AssignorBenchmarkUtils {
*
* @param members The ConsumerGroupMembers.
* @param subscriptionType  The group's subscription type.
* @param topicsImage The TopicsImage to use.
* @param topicResolver The TopicResolver to use.
* @return The new GroupSpec.
*/
public static GroupSpec createGroupSpec(
Map<String, ConsumerGroupMember> members,
SubscriptionType subscriptionType,
TopicsImage topicsImage
TopicIds.TopicResolver topicResolver
) {
Map<String, MemberSubscriptionAndAssignmentImpl> memberSpecs = new HashMap<>();
@ -177,7 +177,7 @@ public class AssignorBenchmarkUtils {
memberSpecs.put(memberId, new MemberSubscriptionAndAssignmentImpl(
Optional.ofNullable(member.rackId()),
Optional.ofNullable(member.instanceId()),
new TopicIds(member.subscribedTopicNames(), topicsImage),
new TopicIds(member.subscribedTopicNames(), topicResolver),
new Assignment(member.assignedPartitions())
));
}

View File

@ -129,6 +129,8 @@ public class ServerSideAssignorBenchmark {
private TopicsImage topicsImage = TopicsImage.EMPTY;
private TopicIds.TopicResolver topicResolver;
private SubscribedTopicDescriber subscribedTopicDescriber;
@Setup(Level.Trial)
@ -138,7 +140,7 @@ public class ServerSideAssignorBenchmark {
setupTopics();
Map<String, ConsumerGroupMember> members = createMembers();
this.groupSpec = AssignorBenchmarkUtils.createGroupSpec(members, subscriptionType, topicsImage);
this.groupSpec = AssignorBenchmarkUtils.createGroupSpec(members, subscriptionType, topicResolver);
if (assignmentType == AssignmentType.INCREMENTAL) {
simulateIncrementalRebalance();
@ -155,6 +157,7 @@ public class ServerSideAssignorBenchmark {
);
topicsImage = AssignorBenchmarkUtils.createTopicsImage(subscriptionMetadata);
topicResolver = new TopicIds.CachedTopicResolver(topicsImage);
Map<Uuid, TopicMetadata> topicMetadata = AssignorBenchmarkUtils.createTopicMetadata(subscriptionMetadata);
subscribedTopicDescriber = new SubscribedTopicDescriberImpl(topicMetadata);
@ -229,7 +232,7 @@ public class ServerSideAssignorBenchmark {
if (subscriptionType == HETEROGENEOUS) {
subscribedTopicIdsForNewMember = updatedMemberSpec.get(memberId(memberCount - 2)).subscribedTopicIds();
} else {
subscribedTopicIdsForNewMember = new TopicIds(new HashSet<>(allTopicNames), topicsImage);
subscribedTopicIdsForNewMember = new TopicIds(new HashSet<>(allTopicNames), topicResolver);
}
Optional<String> rackId = rackId(memberCount - 1);
@ -251,6 +254,7 @@ public class ServerSideAssignorBenchmark {
@Threads(1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void doAssignment() {
topicResolver.clear();
partitionAssignor.assign(groupSpec, subscribedTopicDescriber);
}
}

View File

@ -27,6 +27,7 @@ import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.SubscribedTopicDescriberImpl;
import org.apache.kafka.coordinator.group.modern.TargetAssignmentBuilder;
import org.apache.kafka.coordinator.group.modern.TopicIds;
import org.apache.kafka.coordinator.group.modern.TopicMetadata;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
import org.apache.kafka.image.TopicsImage;
@ -96,6 +97,8 @@ public class TargetAssignmentBuilderBenchmark {
private TopicsImage topicsImage;
private TopicIds.TopicResolver topicResolver;
private SubscribedTopicDescriber subscribedTopicDescriber;
@Setup(Level.Trial)
@ -133,6 +136,7 @@ public class TargetAssignmentBuilderBenchmark {
);
topicsImage = AssignorBenchmarkUtils.createTopicsImage(subscriptionMetadata);
topicResolver = new TopicIds.CachedTopicResolver(topicsImage);
Map<Uuid, TopicMetadata> topicMetadata = AssignorBenchmarkUtils.createTopicMetadata(subscriptionMetadata);
subscribedTopicDescriber = new SubscribedTopicDescriberImpl(topicMetadata);
@ -144,7 +148,7 @@ public class TargetAssignmentBuilderBenchmark {
this.groupSpec = AssignorBenchmarkUtils.createGroupSpec(
members,
subscriptionType,
topicsImage
topicResolver
);
GroupAssignment groupAssignment = partitionAssignor.assign(