diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index b1af752754a..be7a95b69a8 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -86,6 +86,14 @@
 More details about the new config StreamsConfig#TOPOLOGY_OPTIMIZATION can be found in KIP-295.

+
+Streams API changes in 2.7.0
+
+    The StreamsConfig variable for configuration parameter "topology.optimization"
+    is renamed from TOPOLOGY_OPTIMIZATION to TOPOLOGY_OPTIMIZATION_CONFIG.
+    The old variable is deprecated. Note that the parameter name itself is not affected.
+    (Cf. KIP-629.)
+
 
 Streams API changes in 2.6.0
 
 We added a new processing mode that improves application scalability using exactly-once guarantees
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index f9e75fe7fb9..af65fd05206 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -207,12 +207,12 @@ public class StreamsConfig extends AbstractConfig {
     public static final String ADMIN_CLIENT_PREFIX = "admin.";
 
     /**
-     * Config value for parameter {@link #TOPOLOGY_OPTIMIZATION "topology.optimization"} for disabling topology optimization
+     * Config value for parameter {@link #TOPOLOGY_OPTIMIZATION_CONFIG "topology.optimization"} for disabling topology optimization
      */
     public static final String NO_OPTIMIZATION = "none";
 
     /**
-     * Config value for parameter {@link #TOPOLOGY_OPTIMIZATION "topology.optimization"} for enabling topology optimization
+     * Config value for parameter {@link #TOPOLOGY_OPTIMIZATION_CONFIG "topology.optimization"} for enabling topology optimization
      */
     public static final String OPTIMIZE = "all";
 
@@ -524,7 +524,7 @@ public class StreamsConfig extends AbstractConfig {
     private static final String STATE_DIR_DOC = "Directory location for state store. This path must be unique for each streams instance sharing the same underlying filesystem.";
 
     /** {@code topology.optimization} */
-    public static final String TOPOLOGY_OPTIMIZATION = "topology.optimization";
+    public static final String TOPOLOGY_OPTIMIZATION_CONFIG = "topology.optimization";
     private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration telling Kafka Streams if it should optimize the topology, disabled by default";
 
     /** {@code upgrade.from} */
@@ -552,6 +552,13 @@ public class StreamsConfig extends AbstractConfig {
     private static final String PARTITION_GROUPER_CLASS_DOC = "Partition grouper class that implements the org.apache.kafka.streams.processor.PartitionGrouper interface."
         + " WARNING: This config is deprecated and will be removed in 3.0.0 release.";
 
+    /**
+     * {@code topology.optimization}
+     * @deprecated since 2.7; use {@link #TOPOLOGY_OPTIMIZATION_CONFIG} instead
+     */
+    @Deprecated
+    public static final String TOPOLOGY_OPTIMIZATION = TOPOLOGY_OPTIMIZATION_CONFIG;
+
     private static final String[] NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS = new String[] {ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG};
 
@@ -664,7 +671,7 @@ public class StreamsConfig extends AbstractConfig {
                     CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
                     Importance.MEDIUM,
                     CommonClientConfigs.SECURITY_PROTOCOL_DOC)
-            .define(TOPOLOGY_OPTIMIZATION,
+            .define(TOPOLOGY_OPTIMIZATION_CONFIG,
                     Type.STRING,
                     NO_OPTIMIZATION,
                     in(NO_OPTIMIZATION, OPTIMIZE),
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index bcc911c3af6..f5f02dc2e41 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -933,7 +933,7 @@ public interface KStream {
     * For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
     * records to it, and rereading all records from it, such that the resulting {@link KTable} is partitioned
     * correctly on its key.
-    * Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION} config for this case, because
+    * Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG} config for this case, because
     * repartition topics are considered transient and don't allow to recover the result {@link KTable} in cause of
     * a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance.
     *

@@ -959,7 +959,7 @@ public interface KStream {
     * For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
     * records to it, and rereading all records from it, such that the resulting {@link KTable} is partitioned
     * correctly on its key.
-    * Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION} config for this case, because
+    * Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG} config for this case, because
     * repartition topics are considered transient and don't allow to recover the result {@link KTable} in cause of
     * a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance.
     *

@@ -986,7 +986,7 @@ public interface KStream {
     * For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
     * records to it, and rereading all records from it, such that the resulting {@link KTable} is partitioned
     * correctly on its key.
-    * Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION} config for this case, because
+    * Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG} config for this case, because
     * repartition topics are considered transient and don't allow to recover the result {@link KTable} in cause of
     * a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance.
     *

@@ -1014,7 +1014,7 @@ public interface KStream {
     * For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
     * records to it, and rereading all records from it, such that the resulting {@link KTable} is partitioned
     * correctly on its key.
-    * Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION} config for this case, because
+    * Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG} config for this case, because
     * repartition topics are considered transient and don't allow to recover the result {@link KTable} in cause of
     * a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance.
     *

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java index e6aa2afc0b4..f859db8edd0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java @@ -317,7 +317,7 @@ public class InternalStreamsBuilder implements InternalNameProvider { private void maybePerformOptimizations(final Properties props) { - if (props != null && StreamsConfig.OPTIMIZE.equals(props.getProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION))) { + if (props != null && StreamsConfig.OPTIMIZE.equals(props.getProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG))) { LOG.debug("Optimizing the Kafka Streams graph for repartition nodes"); optimizeKTableSourceTopics(); maybeOptimizeRepartitionOperations(); diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java index f77b3e586e8..bdc60a78330 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java @@ -397,7 +397,7 @@ public class StreamsBuilderTest { final String topic = "topic"; builder.table(topic, Materialized.>as("store")); final Properties props = StreamsTestUtils.getStreamsConfig(); - props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); + props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); final Topology topology = builder.build(props); final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology); diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index 92d2881320d..1745cc7a453 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -47,7 +47,7 @@ import static org.apache.kafka.common.IsolationLevel.READ_COMMITTED; import static org.apache.kafka.common.IsolationLevel.READ_UNCOMMITTED; import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE; import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_BETA; -import static org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION; +import static org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG; import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix; import static org.apache.kafka.streams.StreamsConfig.consumerPrefix; import static org.apache.kafka.streams.StreamsConfig.producerPrefix; @@ -895,22 +895,22 @@ public class StreamsConfigTest { @Test public void shouldSpecifyNoOptimizationWhenNotExplicitlyAddedToConfigs() { final String expectedOptimizeConfig = "none"; - final String actualOptimizedConifig = streamsConfig.getString(TOPOLOGY_OPTIMIZATION); + final String actualOptimizedConifig = streamsConfig.getString(TOPOLOGY_OPTIMIZATION_CONFIG); assertEquals("Optimization should be \"none\"", expectedOptimizeConfig, actualOptimizedConifig); } @Test public void shouldSpecifyOptimizationWhenNotExplicitlyAddedToConfigs() { final String expectedOptimizeConfig = "all"; - props.put(TOPOLOGY_OPTIMIZATION, "all"); + props.put(TOPOLOGY_OPTIMIZATION_CONFIG, "all"); final StreamsConfig config = new 
StreamsConfig(props); - final String actualOptimizedConifig = config.getString(TOPOLOGY_OPTIMIZATION); + final String actualOptimizedConifig = config.getString(TOPOLOGY_OPTIMIZATION_CONFIG); assertEquals("Optimization should be \"all\"", expectedOptimizeConfig, actualOptimizedConifig); } @Test(expected = ConfigException.class) public void shouldThrowConfigExceptionWhenOptimizationConfigNotValueInRange() { - props.put(TOPOLOGY_OPTIMIZATION, "maybe"); + props.put(TOPOLOGY_OPTIMIZATION_CONFIG, "maybe"); new StreamsConfig(props); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java index 03410e057b2..a12bb854cbf 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java @@ -131,7 +131,7 @@ public class KStreamRepartitionIntegrationTest { streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - streamsConfiguration.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, topologyOptimization); + streamsConfiguration.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, topologyOptimization); } @After diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java index 5ebd9ef8100..d774fa627fa 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java @@ -95,7 +95,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest { mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName), mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "asdf:0000"), mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()), - mkEntry(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimization) + mkEntry(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, optimization) )); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java index 2e6d6c06f66..d288c3f9ee4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java @@ -156,7 +156,7 @@ public class LagFetchIntegrationTest { props.put(StreamsConfig.InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, FallbackPriorTaskAssignor.class.getName()); props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + i); props.put(StreamsConfig.CLIENT_ID_CONFIG, "instance-" + i); - props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimization); + props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, optimization); props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(stateStoreName + i).getAbsolutePath()); diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java index fbf40e2835e..2507436f65a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java @@ -180,7 +180,7 @@ public class OptimizedKTableIntegrationTest { private Properties streamsConfiguration() { final String safeTestName = safeUniqueTestName(getClass(), testName); final Properties config = new Properties(); - config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); + config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + (++port)); config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java index faac172b639..0dad8c31233 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java @@ -135,7 +135,7 @@ public class RestoreIntegrationTest { final StreamsBuilder builder = new StreamsBuilder(); final Properties props = props(); - props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); + props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); // restoring from 1000 to 4000 (committed), and then process from 4000 to 5000 on each of the two partitions final int offsetLimitDelta = 1000; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java index 3bf2d8ffaa4..79f14b5d2ec 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java @@ -127,9 +127,9 @@ public class StandbyTaskCreationIntegrationTest { @Test public void shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables() throws Exception { final Properties streamsConfiguration1 = streamsConfiguration(); - streamsConfiguration1.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); + streamsConfiguration1.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); final Properties streamsConfiguration2 = streamsConfiguration(); - streamsConfiguration2.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); + streamsConfiguration2.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); final StreamsBuilder builder = new StreamsBuilder(); builder.table(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.Integer()), Materialized.as("source-table")); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java index fb7310178ff..ef34def3221 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java +++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java @@ -389,7 +389,7 @@ public class StoreQueryIntegrationTest { private Properties streamsConfiguration() { final String safeTestName = safeUniqueTestName(getClass(), testName); final Properties config = new Properties(); - config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); + config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + (++port)); config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java index 5638dbdc5cd..c286b142459 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java @@ -192,7 +192,7 @@ public class RepartitionTopicNamingTest { kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count(); kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count(); final Properties properties = new Properties(); - properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); + properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); final Topology topology = builder.build(properties); assertThat(getCountOfRepartitionTopicsFound(topology.describe().toString(), repartitionTopicPattern), is(1)); } @@ -224,7 +224,7 @@ public class RepartitionTopicNamingTest { public void shouldPassWithSameRepartitionTopicNameUsingSameKGroupedStreamOptimized() { final StreamsBuilder builder = new StreamsBuilder(); final Properties properties = new Properties(); - properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); + properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); final KGroupedStream kGroupedStream = builder.stream("topic") .selectKey((k, v) -> k) .groupByKey(Grouped.as("grouping")); @@ -500,7 +500,7 @@ public class RepartitionTopicNamingTest { final Properties properties = new Properties(); - properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimizationConfig); + properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, optimizationConfig); return builder.build(properties); } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImplTest.java index d6195bea405..d780fa7d5a1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImplTest.java @@ -379,7 +379,7 @@ public class CogroupedKStreamImplTest { @Test public void shouldInsertRepartitionsTopicForUpstreamKeyModificationWithGroupedReusedInSameCogroupsWithOptimization() { final Properties properties = new Properties(); - properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); + properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); final StreamsBuilder builder = new StreamsBuilder(); final KStream stream1 = builder.stream("one", stringConsumed); @@ -528,7 +528,7 @@ public class 
CogroupedKStreamImplTest { final StreamsBuilder builder = new StreamsBuilder(); final Properties properties = new Properties(); - properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); + properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); final KStream stream1 = builder.stream("one", stringConsumed); final KStream stream2 = builder.stream("two", stringConsumed); @@ -653,7 +653,7 @@ public class CogroupedKStreamImplTest { final StreamsBuilder builder = new StreamsBuilder(); final Properties properties = new Properties(); - properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); + properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); final KStream stream1 = builder.stream("one", stringConsumed); final KStream stream2 = builder.stream("two", stringConsumed); @@ -707,7 +707,7 @@ public class CogroupedKStreamImplTest { final StreamsBuilder builder = new StreamsBuilder(); final Properties properties = new Properties(); - properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); + properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); final KStream stream1 = builder.stream("one", stringConsumed); final KStream stream2 = builder.stream("two", stringConsumed); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java index 56190eed3aa..4dd7f70d6bd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java @@ -84,7 +84,7 @@ public class KStreamKStreamJoinTest { public void shouldReuseRepartitionTopicWithGeneratedName() { final StreamsBuilder builder = new StreamsBuilder(); final Properties props = new Properties(); - props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION); + props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION); final KStream stream1 = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String())); final KStream stream2 = builder.stream("topic2", Consumed.with(Serdes.String(), Serdes.String())); final KStream stream3 = builder.stream("topic3", Consumed.with(Serdes.String(), Serdes.String())); @@ -98,7 +98,7 @@ public class KStreamKStreamJoinTest { public void shouldCreateRepartitionTopicsWithUserProvidedName() { final StreamsBuilder builder = new StreamsBuilder(); final Properties props = new Properties(); - props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION); + props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION); final KStream stream1 = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String())); final KStream stream2 = builder.stream("topic2", Consumed.with(Serdes.String(), Serdes.String())); final KStream stream3 = builder.stream("topic3", Consumed.with(Serdes.String(), Serdes.String())); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java index a5368cde9c3..c5067797595 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java +++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java @@ -118,7 +118,7 @@ public class KStreamKTableJoinTest { public void shouldReuseRepartitionTopicWithGeneratedName() { final StreamsBuilder builder = new StreamsBuilder(); final Properties props = new Properties(); - props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION); + props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION); final KStream streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String())); final KTable tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String())); final KTable tableC = builder.table("topic3", Consumed.with(Serdes.String(), Serdes.String())); @@ -133,7 +133,7 @@ public class KStreamKTableJoinTest { public void shouldCreateRepartitionTopicsWithUserProvidedName() { final StreamsBuilder builder = new StreamsBuilder(); final Properties props = new Properties(); - props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION); + props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION); final KStream streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String())); final KTable tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String())); final KTable tableC = builder.table("topic3", Consumed.with(Serdes.String(), Serdes.String())); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java index d8e8303f633..96016027434 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java @@ -89,7 +89,7 @@ public class StreamsGraphTest { final Properties properties = new Properties(); properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test-application"); properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); + properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); final StreamsBuilder builder = new StreamsBuilder(); final KStream inputStream = builder.stream("inputTopic"); @@ -118,7 +118,7 @@ public class StreamsGraphTest { final Properties properties = new Properties(); properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test-application"); properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); + properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); final StreamsBuilder builder = new StreamsBuilder(); initializer = () -> ""; @@ -222,7 +222,7 @@ public class StreamsGraphTest { .to("output_topic"); final Properties properties = new Properties(); - properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); + properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); final Topology topology = streamsBuilder.build(properties); assertEquals(expectedMergeOptimizedTopology, topology.describe().toString()); @@ -242,7 +242,7 @@ public class StreamsGraphTest { final StreamsBuilder builder = new StreamsBuilder(); final Properties properties = new Properties(); - 
properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimizeConfig); + properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, optimizeConfig); final KStream inputStream = builder.stream("input"); final KStream mappedKeyStream = inputStream.selectKey((k, v) -> k + v); @@ -259,7 +259,7 @@ public class StreamsGraphTest { final StreamsBuilder builder = new StreamsBuilder(); final Properties properties = new Properties(); - properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimizeConfig); + properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, optimizeConfig); final KStream inputStream = builder.stream("input"); final KStream mappedKeyStream = inputStream.selectKey((k, v) -> k + v).through("through-topic"); @@ -274,7 +274,7 @@ public class StreamsGraphTest { private Topology getTopologyWithRepartitionOperation(final String optimizeConfig) { final StreamsBuilder builder = new StreamsBuilder(); final Properties properties = new Properties(); - properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimizeConfig); + properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, optimizeConfig); final KStream inputStream = builder.stream("input").selectKey((k, v) -> k + v); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java index 3dcb2a3f4e1..681119829f4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java @@ -200,7 +200,7 @@ public class RepartitionOptimizingTest { .withOtherValueSerde(Serdes.Long())) .to(JOINED_TOPIC, Produced.as("join-to")); - streamsConfiguration.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimizationConfig); + streamsConfiguration.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, optimizationConfig); final Topology topology = builder.build(streamsConfiguration); topologyTestDriver = new TopologyTestDriver(topology, streamsConfiguration); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java index bd2656cb4b0..b50ef8ecee0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionWithMergeOptimizingTest.java @@ -120,7 +120,7 @@ public class RepartitionWithMergeOptimizingTest { private void runTest(final String optimizationConfig, final int expectedNumberRepartitionTopics) { - streamsConfiguration.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimizationConfig); + streamsConfiguration.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, optimizationConfig); final StreamsBuilder builder = new StreamsBuilder(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index b3d4169cd9f..a2c2b4602e9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -1941,7 +1941,7 @@ public class 
StreamsPartitionAssignorTest { createDefaultMockTaskManager(); EasyMock.expect(taskManager.mainConsumer()).andStubReturn(consumerClient); - configurePartitionAssignorWith(singletonMap(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE)); + configurePartitionAssignorWith(singletonMap(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE)); overwriteInternalTopicManagerWithMock(false); EasyMock.expect(consumerClient.committed(changelogs)) diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala index 3107db6f1f2..3cf0059b409 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala @@ -335,10 +335,10 @@ class TopologyTest { def shouldBuildIdenticalTopologyInJavaNScalaProperties(): Unit = { val props = new Properties() - props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE) + props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE) val propsNoOptimization = new Properties() - propsNoOptimization.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION) + propsNoOptimization.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION) val AGGREGATION_TOPIC = "aggregationTopic" val REDUCE_TOPIC = "reduceTopic"
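For reference, a minimal sketch of how an application enables topology optimization after this rename. This is an illustrative example, not part of the patch: the class name, application id, and topic names are placeholders; only the StreamsConfig constants and Kafka Streams calls shown in the diff are assumed to exist. Note that the properties must be passed to StreamsBuilder#build(Properties), since that is where the optimizer is applied (see InternalStreamsBuilder#maybePerformOptimizations above).

    import java.util.Properties;

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;

    public class OptimizedTopologyExample {

        public static void main(final String[] args) {
            final Properties props = new Properties();
            // Placeholder application id and bootstrap servers.
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "optimization-example");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // Use the new constant; the old StreamsConfig.TOPOLOGY_OPTIMIZATION is deprecated as of 2.7,
            // but both resolve to the same parameter name, "topology.optimization".
            props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);

            final StreamsBuilder builder = new StreamsBuilder();
            builder.stream("input-topic").to("output-topic");

            // Passing the properties to build() lets the optimizer rewrite the topology.
            final Topology topology = builder.build(props);
            final KafkaStreams streams = new KafkaStreams(topology, props);
            streams.start();
        }
    }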