KAFKA-14209: Change Topology optimization to accept list of rules 1/3 (#12641)

This PR is part of a series implementing the self-join rewriting. As part of it, we decided to clean up the TOPOLOGY_OPTIMIZATION_CONFIG and make it a list of optimization rules. Acceptable values are: NO_OPTIMIZATION, OPTIMIZE which applies all optimization rules or a comma separated list of specific optimizations.

Reviewers: Guozhang Wang <guozhang@apache.org>, John Roesler <vvcephei@apache.org>
This commit is contained in:
Vicky Papavasileiou 2022-09-22 17:20:37 +01:00 committed by GitHub
parent 3549a5524e
commit cda5da9b65
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 157 additions and 23 deletions

View File

@ -424,8 +424,8 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG), 1);</code></pre>
</tr>
<tr class="row-even"><td>topology.optimization</td>
<td>Medium</td>
<td colspan="2">A configuration telling Kafka Streams if it should optimize the topology</td>
<td>none</td>
<td colspan="2">A configuration telling Kafka Streams if it should optimize the topology and what optimizations to apply. Acceptable values are: <code>StreamsConfig.NO_OPTIMIZATION</code> (<code>none</code>), <code>StreamsConfig.OPTIMIZE</code> (<code>all</code>) or a comma separated list of specific optimizations: (<code>StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS</code> (<code>reuse.ktable.source.topics</code>), <code>StreamsConfig.MERGE_REPARTITION_TOPICS</code> (<code>merge.repartition.topics</code>)). </td>
<td><code> NO_OPTIMIZATION</code></td>
</tr>
<tr class="row-odd"><td>upgrade.from</td>
<td>Medium</td>
@ -942,8 +942,13 @@ streamsConfig.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksD
<blockquote>
<div>
<p>
You can tell Streams to apply topology optimizations by setting this config. The optimizations are currently all or none and disabled by default.
These optimizations include moving/reducing repartition topics and reusing the source topic as the changelog for source KTables. It is recommended to enable this.
A configuration telling Kafka Streams if it should optimize the topology and what optimizations to apply. Acceptable values are: <code>StreamsConfig.NO_OPTIMIZATION</code> (<code>none</code>), <code>StreamsConfig.OPTIMIZE</code> (<code>all</code>) or a comma separated list of specific optimizations: (<code>StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS</code> (<code>reuse.ktable.source.topics</code>), <code>StreamsConfig.MERGE_REPARTITION_TOPICS</code> (<code>merge.repartition.topics</code>)).
</p>
<p>
We recommend listing specific optimizations in the config for production code so that the structure of your topology will not change unexpectedly during upgrades of the Streams library.
</p>
<p>
These optimizations include moving/reducing repartition topics and reusing the source topic as the changelog for source KTables. These optimizations will save on network traffic and storage in Kafka without changing the semantics of your applications. Enabling them is recommended.
</p>
<p>
Note that as of 2.3, you need to do two things to enable optimizations. In addition to setting this config to <code>StreamsConfig.OPTIMIZE</code>, you'll need to pass in your

View File

@ -621,11 +621,7 @@ public class StreamsBuilder {
* @return the {@link Topology} that represents the specified processing logic
*/
public synchronized Topology build(final Properties props) {
final boolean optimizeTopology =
props != null &&
StreamsConfig.OPTIMIZE.equals(props.getProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG));
internalStreamsBuilder.buildAndOptimizeTopology(optimizeTopology);
internalStreamsBuilder.buildAndOptimizeTopology(props);
return topology;
}
}

View File

@ -16,6 +16,8 @@
*/
package org.apache.kafka.streams;
import java.util.Arrays;
import java.util.HashSet;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
@ -233,6 +235,18 @@ public class StreamsConfig extends AbstractConfig {
@SuppressWarnings("WeakerAccess")
public static final String CLIENT_TAG_PREFIX = "client.tag.";
/** {@code topology.optimization} */
private static final String CONFIG_ERROR_MSG = "Acceptable values are:"
+ " \"+NO_OPTIMIZATION+\", \"+OPTIMIZE+\", "
+ "or a comma separated list of specific optimizations: "
+ "(\"+REUSE_KTABLE_SOURCE_TOPICS+\", \"+MERGE_REPARTITION_TOPICS+\").";
public static final String TOPOLOGY_OPTIMIZATION_CONFIG = "topology.optimization";
private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration telling Kafka "
+ "Streams if it should optimize the topology and what optimizations to apply. "
+ CONFIG_ERROR_MSG
+ "\"NO_OPTIMIZATION\" by default.";
/**
* Config value for parameter {@link #TOPOLOGY_OPTIMIZATION_CONFIG "topology.optimization"} for disabling topology optimization
*/
@ -243,6 +257,22 @@ public class StreamsConfig extends AbstractConfig {
*/
public static final String OPTIMIZE = "all";
/**
* Config value for parameter {@link #TOPOLOGY_OPTIMIZATION_CONFIG "topology.optimization"}
* for enabling the specific optimization that reuses source topic as changelog topic
* for KTables.
*/
public static final String REUSE_KTABLE_SOURCE_TOPICS = "reuse.ktable.source.topics";
/**
* Config value for parameter {@link #TOPOLOGY_OPTIMIZATION_CONFIG "topology.optimization"}
* for enabling the specific optimization that merges duplicated repartition topics.
*/
public static final String MERGE_REPARTITION_TOPICS = "merge.repartition.topics";
private static final List<String> TOPOLOGY_OPTIMIZATION_CONFIGS = Arrays.asList(
OPTIMIZE, NO_OPTIMIZATION, REUSE_KTABLE_SOURCE_TOPICS, MERGE_REPARTITION_TOPICS);
/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.0.x}.
*/
@ -663,9 +693,6 @@ public class StreamsConfig extends AbstractConfig {
"For a timeout of 0ms, a task would raise an error for the first internal error. " +
"For any timeout larger than 0ms, a task will retry at least once before an error is raised.";
/** {@code topology.optimization} */
public static final String TOPOLOGY_OPTIMIZATION_CONFIG = "topology.optimization";
private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration telling Kafka Streams if it should optimize the topology, disabled by default";
/** {@code window.size.ms} */
public static final String WINDOW_SIZE_MS_CONFIG = "window.size.ms";
@ -845,7 +872,7 @@ public class StreamsConfig extends AbstractConfig {
.define(TOPOLOGY_OPTIMIZATION_CONFIG,
Type.STRING,
NO_OPTIMIZATION,
in(NO_OPTIMIZATION, OPTIMIZE),
(name, value) -> verifyTopologyOptimizationConfigs((String) value),
Importance.MEDIUM,
TOPOLOGY_OPTIMIZATION_DOC)
@ -1265,6 +1292,7 @@ public class StreamsConfig extends AbstractConfig {
if (eosEnabled) {
verifyEOSTransactionTimeoutCompatibility();
}
verifyTopologyOptimizationConfigs(getString(TOPOLOGY_OPTIMIZATION_CONFIG));
}
private void verifyEOSTransactionTimeoutCompatibility() {
@ -1653,6 +1681,29 @@ public class StreamsConfig extends AbstractConfig {
return props;
}
public static Set<String> verifyTopologyOptimizationConfigs(final String config) {
final List<String> configs = Arrays.asList(config.split("\\s*,\\s*"));
final Set<String> verifiedConfigs = new HashSet<>();
// Verify it doesn't contain none or all plus a list of optimizations
if (configs.contains(NO_OPTIMIZATION) || configs.contains(OPTIMIZE)) {
if (configs.size() > 1) {
throw new ConfigException("\"" + config + "\" is not a valid optimization config. " + CONFIG_ERROR_MSG);
}
}
for (final String conf: configs) {
if (!TOPOLOGY_OPTIMIZATION_CONFIGS.contains(conf)) {
throw new ConfigException("Unrecognized config. " + CONFIG_ERROR_MSG);
}
}
if (configs.contains(OPTIMIZE)) {
verifiedConfigs.add(REUSE_KTABLE_SOURCE_TOPICS);
verifiedConfigs.add(MERGE_REPARTITION_TOPICS);
} else if (!configs.contains(NO_OPTIMIZATION)) {
verifiedConfigs.addAll(configs);
}
return verifiedConfigs;
}
/**
* Return an {@link Serde#configure(Map, boolean) configured} instance of {@link #DEFAULT_KEY_SERDE_CLASS_CONFIG key Serde
* class}.

View File

@ -16,9 +16,11 @@
*/
package org.apache.kafka.streams.kstream.internals;
import java.util.Properties;
import java.util.TreeMap;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.GlobalKTable;
@ -270,17 +272,12 @@ public class InternalStreamsBuilder implements InternalNameProvider {
// use this method for testing only
public void buildAndOptimizeTopology() {
buildAndOptimizeTopology(false);
buildAndOptimizeTopology(null);
}
public void buildAndOptimizeTopology(final boolean optimizeTopology) {
public void buildAndOptimizeTopology(final Properties props) {
mergeDuplicateSourceNodes();
if (optimizeTopology) {
LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
optimizeKTableSourceTopics();
maybeOptimizeRepartitionOperations();
}
optimizeTopology(props);
final PriorityQueue<GraphNode> graphNodePriorityQueue = new PriorityQueue<>(5, Comparator.comparing(GraphNode::buildPriority));
@ -305,6 +302,28 @@ public class InternalStreamsBuilder implements InternalNameProvider {
internalTopologyBuilder.validateCopartition();
}
/**
* A user can provide either the config OPTIMIZE which means all optimizations rules will be
* applied or they can provide a list of optimization rules.
*/
private void optimizeTopology(final Properties props) {
final Set<String> optimizationConfigs;
if (props == null || !props.containsKey(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG)) {
optimizationConfigs = Collections.emptySet();
} else {
optimizationConfigs = StreamsConfig.verifyTopologyOptimizationConfigs(
(String) props.get(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG));
}
if (optimizationConfigs.contains(StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS)) {
LOG.debug("Optimizing the Kafka Streams graph for ktable source nodes");
reuseKTableSourceTopics();
}
if (optimizationConfigs.contains(StreamsConfig.MERGE_REPARTITION_TOPICS)) {
LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
mergeRepartitionTopics();
}
}
private void mergeDuplicateSourceNodes() {
final Map<String, StreamSourceNode<?, ?>> topicsToSourceNodes = new HashMap<>();
@ -356,12 +375,12 @@ public class InternalStreamsBuilder implements InternalNameProvider {
}
}
private void optimizeKTableSourceTopics() {
private void reuseKTableSourceTopics() {
LOG.debug("Marking KTable source nodes to optimize using source topic for changelogs ");
tableSourceNodes.forEach(node -> ((TableSourceNode<?, ?>) node).reuseSourceTopicForChangeLog(true));
}
private void maybeOptimizeRepartitionOperations() {
private void mergeRepartitionTopics() {
maybeUpdateKeyChangingRepartitionNodeMap();
final Iterator<Entry<GraphNode, LinkedHashSet<OptimizableRepartitionNode<?, ?>>>> entryIterator =
keyChangingOperationsToOptimizableRepartitionNodes.entrySet().iterator();

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams;
import java.util.Set;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@ -1263,6 +1264,68 @@ public class StreamsConfigTest {
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}
@Test
public void shouldThrowExceptionWhenTopologyOptimizationOnAndOff() {
final String value = String.join(",", StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION);
props.put(TOPOLOGY_OPTIMIZATION_CONFIG, value);
final ConfigException exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));
assertTrue(exception.getMessage().contains("is not a valid optimization config"));
}
@Test
public void shouldThrowExceptionWhenTopologyOptimizationOffAndSet() {
final String value = String.join(",", StreamsConfig.NO_OPTIMIZATION, StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS);
props.put(TOPOLOGY_OPTIMIZATION_CONFIG, value);
final ConfigException exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));
assertTrue(exception.getMessage().contains("is not a valid optimization config"));
}
@Test
public void shouldThrowExceptionWhenOptimizationDoesNotExistInList() {
final String value = String.join(",",
StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS,
"topology.optimization.does.not.exist",
StreamsConfig.MERGE_REPARTITION_TOPICS);
props.put(TOPOLOGY_OPTIMIZATION_CONFIG, value);
final ConfigException exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));
assertTrue(exception.getMessage().contains("Unrecognized config."));
}
@Test
public void shouldThrowExceptionWhenTopologyOptimizationDoesNotExist() {
final String value = String.join(",", "topology.optimization.does.not.exist");
props.put(TOPOLOGY_OPTIMIZATION_CONFIG, value);
final ConfigException exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));
assertTrue(exception.getMessage().contains("Unrecognized config."));
}
@Test
public void shouldAllowMultipleOptimizations() {
final String value = String.join(",",
StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS,
StreamsConfig.MERGE_REPARTITION_TOPICS);
props.put(TOPOLOGY_OPTIMIZATION_CONFIG, value);
final StreamsConfig config = new StreamsConfig(props);
final List<String> configs = Arrays.asList(config.getString(TOPOLOGY_OPTIMIZATION_CONFIG).split(","));
assertEquals(2, configs.size());
assertTrue(configs.contains(StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS));
assertTrue(configs.contains(StreamsConfig.MERGE_REPARTITION_TOPICS));
}
@Test
public void shouldEnableAllOptimizationsWithOptimizeConfig() {
final Set<String> configs = StreamsConfig.verifyTopologyOptimizationConfigs(StreamsConfig.OPTIMIZE);
assertEquals(2, configs.size());
assertTrue(configs.contains(StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS));
assertTrue(configs.contains(StreamsConfig.MERGE_REPARTITION_TOPICS));
}
@Test
public void shouldNotEnableAnyOptimizationsWithNoOptimizationConfig() {
final Set<String> configs = StreamsConfig.verifyTopologyOptimizationConfigs(StreamsConfig.NO_OPTIMIZATION);
assertEquals(0, configs.size());
}
static class MisconfiguredSerde implements Serde<Object> {
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {