mirror of https://github.com/apache/kafka.git
KAFKA-18311: Configuring repartition topics (3/N) (#18395)
A simplified port of "RepartitionTopics" from the client-side to the group coordinator. Compared to the client-side version, the implementation uses immutable data structures, and returns the computed number of partitions instead of modifying mutable data structures and calling the admin client. Reviewers: Bruno Cadonna <cadonna@apache.org>
This commit is contained in:
parent
11459ae7e9
commit
3bda9f817d
|
@ -0,0 +1,178 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.coordinator.group.streams.topics;
|
||||
|
||||
import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
|
||||
import org.apache.kafka.common.utils.LogContext;
|
||||
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
|
||||
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.OptionalInt;
|
||||
import java.util.Set;
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* Responsible for configuring the number of partitions in repartitioning topics. It computes a fix-point iteration, deriving the number of
|
||||
* partitions for each repartition topic based on the number of partitions of the source topics of the topology, if the number of
|
||||
* partitions is not explicitly set in the topology.
|
||||
*/
|
||||
public class RepartitionTopics {
|
||||
|
||||
private final Logger log;
|
||||
private final Collection<Subtopology> subtopologies;
|
||||
private final Function<String, OptionalInt> topicPartitionCountProvider;
|
||||
|
||||
/**
|
||||
* The constructor for the class.
|
||||
*
|
||||
* @param logContext The context for emitting log messages.
|
||||
* @param subtopologies The subtopologies for the requested topology.
|
||||
* @param topicPartitionCountProvider Returns the number of partitions for a given topic, representing the current state of the
|
||||
* broker.
|
||||
*/
|
||||
public RepartitionTopics(final LogContext logContext,
|
||||
final Collection<Subtopology> subtopologies,
|
||||
final Function<String, OptionalInt> topicPartitionCountProvider) {
|
||||
this.log = logContext.logger(getClass());
|
||||
this.subtopologies = subtopologies;
|
||||
this.topicPartitionCountProvider = topicPartitionCountProvider;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the set of the number of partitions for each repartition topic.
|
||||
*
|
||||
* @return the map of repartition topics for the requested topology to their required number of partitions.
|
||||
*
|
||||
* @throws TopicConfigurationException if no valid configuration can be found given the broker state, for example, if a source topic
|
||||
* is missing.
|
||||
* @throws StreamsInvalidTopologyException if the number of partitions for all repartition topics cannot be determined, e.g.
|
||||
* because of loops, or if a repartition source topic is not a sink topic of any subtopology.
|
||||
*/
|
||||
public Map<String, Integer> setup() {
|
||||
final Set<String> missingSourceTopicsForTopology = new HashSet<>();
|
||||
|
||||
for (final Subtopology subtopology : subtopologies) {
|
||||
final Set<String> missingSourceTopicsForSubtopology = computeMissingExternalSourceTopics(subtopology);
|
||||
missingSourceTopicsForTopology.addAll(missingSourceTopicsForSubtopology);
|
||||
}
|
||||
|
||||
if (!missingSourceTopicsForTopology.isEmpty()) {
|
||||
throw TopicConfigurationException.missingSourceTopics(String.format("Missing source topics: %s",
|
||||
String.join(", ", missingSourceTopicsForTopology)));
|
||||
}
|
||||
|
||||
final Map<String, Integer> repartitionTopicPartitionCount = computeRepartitionTopicPartitionCount();
|
||||
|
||||
for (final Subtopology subtopology : subtopologies) {
|
||||
if (subtopology.repartitionSourceTopics().stream().anyMatch(repartitionTopic -> !repartitionTopicPartitionCount.containsKey(repartitionTopic.name()))) {
|
||||
throw new StreamsInvalidTopologyException("Failed to compute number of partitions for all repartition topics, because "
|
||||
+ "a repartition source topic is never used as a sink topic.");
|
||||
}
|
||||
}
|
||||
|
||||
return repartitionTopicPartitionCount;
|
||||
}
|
||||
|
||||
private Set<String> computeMissingExternalSourceTopics(final Subtopology subtopology) {
|
||||
final Set<String> missingExternalSourceTopics = new HashSet<>(subtopology.sourceTopics());
|
||||
for (final TopicInfo topicInfo : subtopology.repartitionSourceTopics()) {
|
||||
missingExternalSourceTopics.remove(topicInfo.name());
|
||||
}
|
||||
missingExternalSourceTopics.removeIf(x -> topicPartitionCountProvider.apply(x).isPresent());
|
||||
return missingExternalSourceTopics;
|
||||
}
|
||||
|
||||
/**
|
||||
* Computes the number of partitions and returns it for each repartition topic.
|
||||
*/
|
||||
private Map<String, Integer> computeRepartitionTopicPartitionCount() {
|
||||
boolean partitionCountNeeded;
|
||||
Map<String, Integer> repartitionTopicPartitionCounts = new HashMap<>();
|
||||
|
||||
for (final Subtopology subtopology : subtopologies) {
|
||||
for (final TopicInfo repartitionSourceTopic : subtopology.repartitionSourceTopics()) {
|
||||
if (repartitionSourceTopic.partitions() != 0) {
|
||||
repartitionTopicPartitionCounts.put(repartitionSourceTopic.name(), repartitionSourceTopic.partitions());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
do {
|
||||
partitionCountNeeded = false;
|
||||
// avoid infinitely looping without making any progress on unknown repartitions
|
||||
boolean progressMadeThisIteration = false;
|
||||
|
||||
for (final Subtopology subtopology : subtopologies) {
|
||||
for (final String repartitionSinkTopic : subtopology.repartitionSinkTopics()) {
|
||||
if (!repartitionTopicPartitionCounts.containsKey(repartitionSinkTopic)) {
|
||||
final Integer numPartitions = computePartitionCount(
|
||||
repartitionTopicPartitionCounts,
|
||||
subtopology
|
||||
);
|
||||
|
||||
if (numPartitions == null) {
|
||||
partitionCountNeeded = true;
|
||||
log.trace("Unable to determine number of partitions for {}, another iteration is needed",
|
||||
repartitionSinkTopic);
|
||||
} else {
|
||||
log.trace("Determined number of partitions for {} to be {}",
|
||||
repartitionSinkTopic,
|
||||
numPartitions);
|
||||
repartitionTopicPartitionCounts.put(repartitionSinkTopic, numPartitions);
|
||||
progressMadeThisIteration = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!progressMadeThisIteration && partitionCountNeeded) {
|
||||
throw new StreamsInvalidTopologyException("Failed to compute number of partitions for all " +
|
||||
"repartition topics. There may be loops in the topology that cannot be resolved.");
|
||||
}
|
||||
} while (partitionCountNeeded);
|
||||
|
||||
return repartitionTopicPartitionCounts;
|
||||
}
|
||||
|
||||
private Integer computePartitionCount(final Map<String, Integer> repartitionTopicPartitionCounts,
|
||||
final Subtopology subtopology) {
|
||||
Integer partitionCount = null;
|
||||
// try set the number of partitions for this repartition topic if it is not set yet
|
||||
// use the maximum of all its source topic partitions as the number of partitions
|
||||
|
||||
// It is possible that there is another internal topic, i.e,
|
||||
// map().join().join(map())
|
||||
for (final TopicInfo repartitionSourceTopic : subtopology.repartitionSourceTopics()) {
|
||||
Integer numPartitionsCandidate = repartitionTopicPartitionCounts.get(repartitionSourceTopic.name());
|
||||
if (numPartitionsCandidate != null && (partitionCount == null || numPartitionsCandidate > partitionCount)) {
|
||||
partitionCount = numPartitionsCandidate;
|
||||
}
|
||||
}
|
||||
for (final String externalSourceTopic : subtopology.sourceTopics()) {
|
||||
final OptionalInt actualPartitionCount = topicPartitionCountProvider.apply(externalSourceTopic);
|
||||
if (actualPartitionCount.isPresent() && (partitionCount == null || actualPartitionCount.getAsInt() > partitionCount)) {
|
||||
partitionCount = actualPartitionCount.getAsInt();
|
||||
}
|
||||
}
|
||||
return partitionCount;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,206 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.coordinator.group.streams.topics;
|
||||
|
||||
import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
|
||||
import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
|
||||
import org.apache.kafka.common.utils.LogContext;
|
||||
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
|
||||
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.OptionalInt;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
public class RepartitionTopicsTest {
|
||||
|
||||
private static final LogContext LOG_CONTEXT = new LogContext();
|
||||
private static final String SOURCE_TOPIC_NAME1 = "source1";
|
||||
private static final String SOURCE_TOPIC_NAME2 = "source2";
|
||||
private static final TopicInfo REPARTITION_TOPIC1 = new TopicInfo().setName("repartition1").setPartitions(4);
|
||||
private static final TopicInfo REPARTITION_TOPIC2 = new TopicInfo().setName("repartition2").setPartitions(2);
|
||||
private static final TopicInfo REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT = new TopicInfo().setName("repartitionWithoutPartitionCount");
|
||||
|
||||
private static OptionalInt sourceTopicPartitionCounts(final String topicName) {
|
||||
return SOURCE_TOPIC_NAME1.equals(topicName) || SOURCE_TOPIC_NAME2.equals(topicName) ? OptionalInt.of(3) : OptionalInt.empty();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldSetupRepartitionTopics() {
|
||||
final Subtopology subtopology1 = new Subtopology()
|
||||
.setSubtopologyId("subtopology1")
|
||||
.setSourceTopics(List.of(SOURCE_TOPIC_NAME1, SOURCE_TOPIC_NAME2))
|
||||
.setRepartitionSinkTopics(List.of(REPARTITION_TOPIC1.name()));
|
||||
final Subtopology subtopology2 = new Subtopology()
|
||||
.setSubtopologyId("subtopology2")
|
||||
.setRepartitionSourceTopics(List.of(REPARTITION_TOPIC1));
|
||||
final List<Subtopology> subtopologies = List.of(subtopology1, subtopology2);
|
||||
final RepartitionTopics repartitionTopics = new RepartitionTopics(
|
||||
LOG_CONTEXT,
|
||||
subtopologies,
|
||||
RepartitionTopicsTest::sourceTopicPartitionCounts
|
||||
);
|
||||
|
||||
final Map<String, Integer> setup = repartitionTopics.setup();
|
||||
|
||||
assertEquals(
|
||||
Map.of(REPARTITION_TOPIC1.name(), REPARTITION_TOPIC1.partitions()),
|
||||
setup
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldThrowStreamsMissingSourceTopicsExceptionIfMissingSourceTopics() {
|
||||
final Subtopology subtopology1 = new Subtopology()
|
||||
.setSubtopologyId("subtopology1")
|
||||
.setSourceTopics(List.of(SOURCE_TOPIC_NAME1, SOURCE_TOPIC_NAME2))
|
||||
.setRepartitionSinkTopics(List.of(REPARTITION_TOPIC1.name()));
|
||||
final Subtopology subtopology2 = new Subtopology()
|
||||
.setSubtopologyId("subtopology2")
|
||||
.setRepartitionSourceTopics(List.of(REPARTITION_TOPIC1));
|
||||
final Function<String, OptionalInt> topicPartitionCountProvider =
|
||||
s -> Objects.equals(s, SOURCE_TOPIC_NAME1) ? OptionalInt.empty() : sourceTopicPartitionCounts(s);
|
||||
final RepartitionTopics repartitionTopics = new RepartitionTopics(
|
||||
LOG_CONTEXT,
|
||||
List.of(subtopology1, subtopology2),
|
||||
topicPartitionCountProvider
|
||||
);
|
||||
|
||||
final TopicConfigurationException exception = assertThrows(TopicConfigurationException.class,
|
||||
repartitionTopics::setup);
|
||||
|
||||
assertEquals(Status.MISSING_SOURCE_TOPICS, exception.status());
|
||||
assertEquals("Missing source topics: source1", exception.getMessage());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldThrowStreamsInvalidTopologyExceptionIfPartitionCountCannotBeComputedForAllRepartitionTopicsDueToLoops() {
|
||||
final Subtopology subtopology1 = new Subtopology()
|
||||
.setSubtopologyId("subtopology1")
|
||||
.setRepartitionSourceTopics(List.of(REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT))
|
||||
.setRepartitionSinkTopics(List.of(REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT.name()));
|
||||
final RepartitionTopics repartitionTopics = new RepartitionTopics(
|
||||
LOG_CONTEXT,
|
||||
List.of(subtopology1),
|
||||
RepartitionTopicsTest::sourceTopicPartitionCounts
|
||||
);
|
||||
|
||||
final StreamsInvalidTopologyException exception = assertThrows(StreamsInvalidTopologyException.class, repartitionTopics::setup);
|
||||
|
||||
assertEquals(
|
||||
"Failed to compute number of partitions for all repartition topics. There may be loops in the topology that cannot be resolved.",
|
||||
exception.getMessage()
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldThrowStreamsInvalidTopologyExceptionIfPartitionCountCannotBeComputedForAllRepartitionTopicsDueToMissingSinks() {
|
||||
final Subtopology subtopology1 = new Subtopology()
|
||||
.setSubtopologyId("subtopology1")
|
||||
.setRepartitionSourceTopics(List.of(REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT));
|
||||
final RepartitionTopics repartitionTopics = new RepartitionTopics(
|
||||
LOG_CONTEXT,
|
||||
List.of(subtopology1),
|
||||
RepartitionTopicsTest::sourceTopicPartitionCounts
|
||||
);
|
||||
|
||||
final StreamsInvalidTopologyException exception = assertThrows(StreamsInvalidTopologyException.class, repartitionTopics::setup);
|
||||
|
||||
assertEquals(
|
||||
"Failed to compute number of partitions for all repartition topics, because a repartition source topic is never used as a sink topic.",
|
||||
exception.getMessage()
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldSetRepartitionTopicPartitionCountFromUpstreamExternalSourceTopic() {
|
||||
final Subtopology subtopology = new Subtopology()
|
||||
.setSubtopologyId("subtopology0")
|
||||
.setSourceTopics(List.of(SOURCE_TOPIC_NAME1))
|
||||
.setRepartitionSinkTopics(List.of(REPARTITION_TOPIC1.name(), REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT.name()))
|
||||
.setRepartitionSourceTopics(List.of(REPARTITION_TOPIC2));
|
||||
final Subtopology subtopologyWithoutPartitionCount = new Subtopology()
|
||||
.setSubtopologyId("subtopologyWithoutPartitionCount")
|
||||
.setRepartitionSourceTopics(List.of(REPARTITION_TOPIC1, REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT));
|
||||
final RepartitionTopics repartitionTopics = new RepartitionTopics(
|
||||
LOG_CONTEXT,
|
||||
List.of(subtopology, subtopologyWithoutPartitionCount),
|
||||
RepartitionTopicsTest::sourceTopicPartitionCounts
|
||||
);
|
||||
|
||||
final Map<String, Integer> setup = repartitionTopics.setup();
|
||||
|
||||
assertEquals(Map.of(
|
||||
REPARTITION_TOPIC1.name(), REPARTITION_TOPIC1.partitions(),
|
||||
REPARTITION_TOPIC2.name(), REPARTITION_TOPIC2.partitions(),
|
||||
REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT.name(), sourceTopicPartitionCounts(SOURCE_TOPIC_NAME1).getAsInt()
|
||||
), setup);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldSetRepartitionTopicPartitionCountFromUpstreamInternalRepartitionSourceTopic() {
|
||||
final Subtopology subtopology = new Subtopology()
|
||||
.setSubtopologyId("subtopology0")
|
||||
.setSourceTopics(List.of(SOURCE_TOPIC_NAME1))
|
||||
.setRepartitionSourceTopics(List.of(REPARTITION_TOPIC1))
|
||||
.setRepartitionSinkTopics(List.of(REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT.name()));
|
||||
final Subtopology subtopologyWithoutPartitionCount = new Subtopology()
|
||||
.setSubtopologyId("subtopologyWithoutPartitionCount")
|
||||
.setRepartitionSourceTopics(List.of(REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT))
|
||||
.setRepartitionSinkTopics(List.of(REPARTITION_TOPIC1.name()));
|
||||
final RepartitionTopics repartitionTopics = new RepartitionTopics(
|
||||
LOG_CONTEXT,
|
||||
List.of(subtopology, subtopologyWithoutPartitionCount),
|
||||
RepartitionTopicsTest::sourceTopicPartitionCounts
|
||||
);
|
||||
|
||||
final Map<String, Integer> setup = repartitionTopics.setup();
|
||||
|
||||
assertEquals(
|
||||
Map.of(
|
||||
REPARTITION_TOPIC1.name(), REPARTITION_TOPIC1.partitions(),
|
||||
REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT.name(), REPARTITION_TOPIC1.partitions()
|
||||
),
|
||||
setup
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldNotSetupRepartitionTopicsWhenTopologyDoesNotContainAnyRepartitionTopics() {
|
||||
final Subtopology subtopology = new Subtopology()
|
||||
.setSubtopologyId("subtopology0")
|
||||
.setSourceTopics(List.of(SOURCE_TOPIC_NAME1));
|
||||
final RepartitionTopics repartitionTopics = new RepartitionTopics(
|
||||
LOG_CONTEXT,
|
||||
List.of(subtopology),
|
||||
RepartitionTopicsTest::sourceTopicPartitionCounts
|
||||
);
|
||||
|
||||
final Map<String, Integer> setup = repartitionTopics.setup();
|
||||
|
||||
assertEquals(Collections.emptyMap(), setup);
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue