Minor: Replace InternalTopicMetadata with InternalTopicConfig (#6886)

Quick tech debt cleanup. For some reason StreamsPartitionAssignor uses an InternalTopicMetadata class which wraps an InternalTopicConfig object along with the number of partitions. But InternalTopicConfig already has a numPartitions field, so we should just use it directly instead.

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Bruno Cadonna <bruno@confluent.io>,  Bill Bejeck <bbejeck@gmail.com>
This commit is contained in:
A. Sophie Blee-Goldman 2019-06-06 06:58:58 -07:00 committed by Bill Bejeck
parent 0e95c9f3a8
commit 59d3a56740
3 changed files with 44 additions and 71 deletions

View File

@ -26,10 +26,11 @@ import java.util.Objects;
* the internal topics we create for change-logs and repartitioning etc. * the internal topics we create for change-logs and repartitioning etc.
*/ */
public abstract class InternalTopicConfig { public abstract class InternalTopicConfig {
final String name; final String name;
final Map<String, String> topicConfigs; final Map<String, String> topicConfigs;
private int numberOfPartitions = -1; private int numberOfPartitions = StreamsPartitionAssignor.UNKNOWN;
InternalTopicConfig(final String name, final Map<String, String> topicConfigs) { InternalTopicConfig(final String name, final Map<String, String> topicConfigs) {
Objects.requireNonNull(name, "name can't be null"); Objects.requireNonNull(name, "name can't be null");
@ -53,9 +54,6 @@ public abstract class InternalTopicConfig {
} }
public int numberOfPartitions() { public int numberOfPartitions() {
if (numberOfPartitions == -1) {
throw new IllegalStateException("Number of partitions not specified.");
}
return numberOfPartitions; return numberOfPartitions;
} }

View File

@ -58,7 +58,7 @@ import static org.apache.kafka.common.utils.Utils.getPort;
public class StreamsPartitionAssignor implements PartitionAssignor, Configurable { public class StreamsPartitionAssignor implements PartitionAssignor, Configurable {
private final static int UNKNOWN = -1; final static int UNKNOWN = -1;
private final static int VERSION_ONE = 1; private final static int VERSION_ONE = 1;
private final static int VERSION_TWO = 2; private final static int VERSION_TWO = 2;
private final static int VERSION_THREE = 3; private final static int VERSION_THREE = 3;
@ -174,25 +174,6 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
} }
} }
static class InternalTopicMetadata {
public final InternalTopicConfig config;
public int numPartitions;
InternalTopicMetadata(final InternalTopicConfig config) {
this.config = config;
this.numPartitions = UNKNOWN;
}
@Override
public String toString() {
return "InternalTopicMetadata(" +
"config=" + config +
", numPartitions=" + numPartitions +
")";
}
}
private static final class InternalStreamsConfig extends StreamsConfig { private static final class InternalStreamsConfig extends StreamsConfig {
private InternalStreamsConfig(final Map<?, ?> props) { private InternalStreamsConfig(final Map<?, ?> props) {
super(props, false); super(props, false);
@ -458,7 +439,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
// the maximum of the depending sub-topologies source topics' number of partitions // the maximum of the depending sub-topologies source topics' number of partitions
final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = taskManager.builder().topicGroups(); final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = taskManager.builder().topicGroups();
final Map<String, InternalTopicMetadata> repartitionTopicMetadata = new HashMap<>(); final Map<String, InternalTopicConfig> repartitionTopicMetadata = new HashMap<>();
for (final InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) { for (final InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
for (final String topic : topicsInfo.sourceTopics) { for (final String topic : topicsInfo.sourceTopics) {
if (!topicsInfo.repartitionSourceTopics.keySet().contains(topic) && if (!topicsInfo.repartitionSourceTopics.keySet().contains(topic) &&
@ -469,7 +450,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
} }
} }
for (final InternalTopicConfig topic: topicsInfo.repartitionSourceTopics.values()) { for (final InternalTopicConfig topic: topicsInfo.repartitionSourceTopics.values()) {
repartitionTopicMetadata.put(topic.name(), new InternalTopicMetadata(topic)); repartitionTopicMetadata.put(topic.name(), topic);
} }
} }
@ -479,10 +460,10 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
for (final InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) { for (final InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) { for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
int numPartitions = repartitionTopicMetadata.get(topicName).numPartitions; int numPartitions = repartitionTopicMetadata.get(topicName).numberOfPartitions();
// try set the number of partitions for this repartition topic if it is not set yet
if (numPartitions == UNKNOWN) { if (numPartitions == UNKNOWN) {
// try set the number of partitions for this repartition topic if it is not set yet
for (final InternalTopologyBuilder.TopicsInfo otherTopicsInfo : topicGroups.values()) { for (final InternalTopologyBuilder.TopicsInfo otherTopicsInfo : topicGroups.values()) {
final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics; final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
@ -494,7 +475,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
// It is possible the sourceTopic is another internal topic, i.e, // It is possible the sourceTopic is another internal topic, i.e,
// map().join().join(map()) // map().join().join(map())
if (repartitionTopicMetadata.containsKey(sourceTopicName)) { if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
numPartitionsCandidate = repartitionTopicMetadata.get(sourceTopicName).numPartitions; numPartitionsCandidate = repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions();
} else { } else {
numPartitionsCandidate = metadata.partitionCountForTopic(sourceTopicName); numPartitionsCandidate = metadata.partitionCountForTopic(sourceTopicName);
} }
@ -510,7 +491,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
if (numPartitions == UNKNOWN) { if (numPartitions == UNKNOWN) {
numPartitionsNeeded = true; numPartitionsNeeded = true;
} else { } else {
repartitionTopicMetadata.get(topicName).numPartitions = numPartitions; repartitionTopicMetadata.get(topicName).setNumberOfPartitions(numPartitions);
} }
} }
} }
@ -530,9 +511,9 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
// augment the metadata with the newly computed number of partitions for all the // augment the metadata with the newly computed number of partitions for all the
// repartition source topics // repartition source topics
final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = new HashMap<>(); final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = new HashMap<>();
for (final Map.Entry<String, InternalTopicMetadata> entry : repartitionTopicMetadata.entrySet()) { for (final Map.Entry<String, InternalTopicConfig> entry : repartitionTopicMetadata.entrySet()) {
final String topic = entry.getKey(); final String topic = entry.getKey();
final int numPartitions = entry.getValue().numPartitions; final int numPartitions = entry.getValue().numberOfPartitions();
for (int partition = 0; partition < numPartitions; partition++) { for (int partition = 0; partition < numPartitions; partition++) {
allRepartitionTopicPartitions.put(new TopicPartition(topic, partition), allRepartitionTopicPartitions.put(new TopicPartition(topic, partition),
@ -591,7 +572,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
} }
// add tasks to state change log topic subscribers // add tasks to state change log topic subscribers
final Map<String, InternalTopicMetadata> changelogTopicMetadata = new HashMap<>(); final Map<String, InternalTopicConfig> changelogTopicMetadata = new HashMap<>();
for (final Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) { for (final Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
final int topicGroupId = entry.getKey(); final int topicGroupId = entry.getKey();
final Map<String, InternalTopicConfig> stateChangelogTopics = entry.getValue().stateChangelogTopics; final Map<String, InternalTopicConfig> stateChangelogTopics = entry.getValue().stateChangelogTopics;
@ -605,10 +586,9 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
numPartitions = task.partition + 1; numPartitions = task.partition + 1;
} }
} }
final InternalTopicMetadata topicMetadata = new InternalTopicMetadata(topicConfig); topicConfig.setNumberOfPartitions(numPartitions);
topicMetadata.numPartitions = numPartitions;
changelogTopicMetadata.put(topicConfig.name(), topicMetadata); changelogTopicMetadata.put(topicConfig.name(), topicConfig);
} else { } else {
log.debug("No tasks found for topic group {}", topicGroupId); log.debug("No tasks found for topic group {}", topicGroupId);
} }
@ -949,17 +929,15 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
* *
* @param topicPartitions Map that contains the topic names to be created with the number of partitions * @param topicPartitions Map that contains the topic names to be created with the number of partitions
*/ */
private void prepareTopic(final Map<String, InternalTopicMetadata> topicPartitions) { private void prepareTopic(final Map<String, InternalTopicConfig> topicPartitions) {
log.debug("Starting to validate internal topics {} in partition assignor.", topicPartitions); log.debug("Starting to validate internal topics {} in partition assignor.", topicPartitions);
// first construct the topics to make ready // first construct the topics to make ready
final Map<String, InternalTopicConfig> topicsToMakeReady = new HashMap<>(); final Map<String, InternalTopicConfig> topicsToMakeReady = new HashMap<>();
for (final InternalTopicMetadata metadata : topicPartitions.values()) { for (final InternalTopicConfig topic : topicPartitions.values()) {
final InternalTopicConfig topic = metadata.config; final int numPartitions = topic.numberOfPartitions();
final int numPartitions = metadata.numPartitions; if (numPartitions == UNKNOWN) {
if (numPartitions < 0) {
throw new StreamsException(String.format("%sTopic [%s] number of partitions not defined", logPrefix, topic.name())); throw new StreamsException(String.format("%sTopic [%s] number of partitions not defined", logPrefix, topic.name()));
} }
@ -975,7 +953,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
} }
private void ensureCopartitioning(final Collection<Set<String>> copartitionGroups, private void ensureCopartitioning(final Collection<Set<String>> copartitionGroups,
final Map<String, InternalTopicMetadata> allRepartitionTopicsNumPartitions, final Map<String, InternalTopicConfig> allRepartitionTopicsNumPartitions,
final Cluster metadata) { final Cluster metadata) {
for (final Set<String> copartitionGroup : copartitionGroups) { for (final Set<String> copartitionGroup : copartitionGroups) {
copartitionedTopicsValidator.validate(copartitionGroup, allRepartitionTopicsNumPartitions, metadata); copartitionedTopicsValidator.validate(copartitionGroup, allRepartitionTopicsNumPartitions, metadata);
@ -993,7 +971,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
} }
void validate(final Set<String> copartitionGroup, void validate(final Set<String> copartitionGroup,
final Map<String, InternalTopicMetadata> allRepartitionTopicsNumPartitions, final Map<String, InternalTopicConfig> allRepartitionTopicsNumPartitions,
final Cluster metadata) { final Cluster metadata) {
int numPartitions = UNKNOWN; int numPartitions = UNKNOWN;
@ -1019,9 +997,9 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
// if all topics for this co-partition group is repartition topics, // if all topics for this co-partition group is repartition topics,
// then set the number of partitions to be the maximum of the number of partitions. // then set the number of partitions to be the maximum of the number of partitions.
if (numPartitions == UNKNOWN) { if (numPartitions == UNKNOWN) {
for (final Map.Entry<String, InternalTopicMetadata> entry: allRepartitionTopicsNumPartitions.entrySet()) { for (final Map.Entry<String, InternalTopicConfig> entry: allRepartitionTopicsNumPartitions.entrySet()) {
if (copartitionGroup.contains(entry.getKey())) { if (copartitionGroup.contains(entry.getKey())) {
final int partitions = entry.getValue().numPartitions; final int partitions = entry.getValue().numberOfPartitions();
if (partitions > numPartitions) { if (partitions > numPartitions) {
numPartitions = partitions; numPartitions = partitions;
} }
@ -1029,9 +1007,9 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
} }
} }
// enforce co-partitioning restrictions to repartition topics by updating their number of partitions // enforce co-partitioning restrictions to repartition topics by updating their number of partitions
for (final Map.Entry<String, InternalTopicMetadata> entry : allRepartitionTopicsNumPartitions.entrySet()) { for (final Map.Entry<String, InternalTopicConfig> entry : allRepartitionTopicsNumPartitions.entrySet()) {
if (copartitionGroup.contains(entry.getKey())) { if (copartitionGroup.contains(entry.getKey())) {
entry.getValue().numPartitions = numPartitions; entry.getValue().setNumberOfPartitions(numPartitions);
} }
} }

View File

@ -72,49 +72,46 @@ public class CopartitionedTopicsValidatorTest {
@Test @Test
public void shouldEnforceCopartitioningOnRepartitionTopics() { public void shouldEnforceCopartitioningOnRepartitionTopics() {
final StreamsPartitionAssignor.InternalTopicMetadata metadata = createTopicMetadata("repartitioned", 10); final InternalTopicConfig config = createTopicConfig("repartitioned", 10);
validator.validate(Utils.mkSet("first", "second", metadata.config.name()), validator.validate(Utils.mkSet("first", "second", config.name()),
Collections.singletonMap(metadata.config.name(), Collections.singletonMap(config.name(), config),
metadata),
cluster.withPartitions(partitions)); cluster.withPartitions(partitions));
assertThat(metadata.numPartitions, equalTo(2)); assertThat(config.numberOfPartitions(), equalTo(2));
} }
@Test @Test
public void shouldSetNumPartitionsToMaximumPartitionsWhenAllTopicsAreRepartitionTopics() { public void shouldSetNumPartitionsToMaximumPartitionsWhenAllTopicsAreRepartitionTopics() {
final StreamsPartitionAssignor.InternalTopicMetadata one = createTopicMetadata("one", 1); final InternalTopicConfig one = createTopicConfig("one", 1);
final StreamsPartitionAssignor.InternalTopicMetadata two = createTopicMetadata("two", 15); final InternalTopicConfig two = createTopicConfig("two", 15);
final StreamsPartitionAssignor.InternalTopicMetadata three = createTopicMetadata("three", 5); final InternalTopicConfig three = createTopicConfig("three", 5);
final Map<String, StreamsPartitionAssignor.InternalTopicMetadata> repartitionTopicConfig = new HashMap<>(); final Map<String, InternalTopicConfig> repartitionTopicConfig = new HashMap<>();
repartitionTopicConfig.put(one.config.name(), one); repartitionTopicConfig.put(one.name(), one);
repartitionTopicConfig.put(two.config.name(), two); repartitionTopicConfig.put(two.name(), two);
repartitionTopicConfig.put(three.config.name(), three); repartitionTopicConfig.put(three.name(), three);
validator.validate(Utils.mkSet(one.config.name(), validator.validate(Utils.mkSet(one.name(),
two.config.name(), two.name(),
three.config.name()), three.name()),
repartitionTopicConfig, repartitionTopicConfig,
cluster cluster
); );
assertThat(one.numPartitions, equalTo(15)); assertThat(one.numberOfPartitions(), equalTo(15));
assertThat(two.numPartitions, equalTo(15)); assertThat(two.numberOfPartitions(), equalTo(15));
assertThat(three.numPartitions, equalTo(15)); assertThat(three.numberOfPartitions(), equalTo(15));
} }
private StreamsPartitionAssignor.InternalTopicMetadata createTopicMetadata(final String repartitionTopic, private InternalTopicConfig createTopicConfig(final String repartitionTopic,
final int partitions) { final int partitions) {
final InternalTopicConfig repartitionTopicConfig = final InternalTopicConfig repartitionTopicConfig =
new RepartitionTopicConfig(repartitionTopic, Collections.emptyMap()); new RepartitionTopicConfig(repartitionTopic, Collections.emptyMap());
final StreamsPartitionAssignor.InternalTopicMetadata metadata = repartitionTopicConfig.setNumberOfPartitions(partitions);
new StreamsPartitionAssignor.InternalTopicMetadata(repartitionTopicConfig); return repartitionTopicConfig;
metadata.numPartitions = partitions;
return metadata;
} }
} }