mirror of https://github.com/apache/kafka.git
KAFKA-10168: fix StreamsConfig parameter name variable (#8865)
Implements KIP-626. Reviewers: Boyang Chen <boyang@confluent.io>, John Roesler <john@confluent.io>
This commit is contained in:
parent
965877c11c
commit
712cc5d073
|
@ -86,6 +86,14 @@
|
||||||
More details about the new config <code>StreamsConfig#TOPOLOGY_OPTIMIZATION</code> can be found in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-295%3A+Add+Streams+Configuration+Allowing+for+Optional+Topology+Optimization">KIP-295</a>.
|
More details about the new config <code>StreamsConfig#TOPOLOGY_OPTIMIZATION</code> can be found in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-295%3A+Add+Streams+Configuration+Allowing+for+Optional+Topology+Optimization">KIP-295</a>.
|
||||||
</p>
|
</p>
|
||||||
|
|
||||||
|
<h3><a id="streams_api_changes_270" href="#streams_api_changes_270">Streams API changes in 2.7.0</a></h3>
|
||||||
|
<p>
|
||||||
|
The <code>StreamsConfig</code> variable for configuration parameter <code>"topology.optimization"</code>
|
||||||
|
is renamed from <code>TOPOLOGY_OPTIMIZATION</code> to <code>TOPOLOGY_OPTIMIZATION_CONFIG</code>.
|
||||||
|
The old variable is deprecated. Note, that the parameter name itself is not affected.
|
||||||
|
(Cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-626%3A+Rename+StreamsConfig+config+variable+name">KIP-629</a>.)
|
||||||
|
</p>
|
||||||
|
|
||||||
<h3><a id="streams_api_changes_260" href="#streams_api_changes_260">Streams API changes in 2.6.0</a></h3>
|
<h3><a id="streams_api_changes_260" href="#streams_api_changes_260">Streams API changes in 2.6.0</a></h3>
|
||||||
<p>
|
<p>
|
||||||
We added a new processing mode that improves application scalability using exactly-once guarantees
|
We added a new processing mode that improves application scalability using exactly-once guarantees
|
||||||
|
|
|
@ -207,12 +207,12 @@ public class StreamsConfig extends AbstractConfig {
|
||||||
public static final String ADMIN_CLIENT_PREFIX = "admin.";
|
public static final String ADMIN_CLIENT_PREFIX = "admin.";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Config value for parameter {@link #TOPOLOGY_OPTIMIZATION "topology.optimization"} for disabling topology optimization
|
* Config value for parameter {@link #TOPOLOGY_OPTIMIZATION_CONFIG "topology.optimization"} for disabling topology optimization
|
||||||
*/
|
*/
|
||||||
public static final String NO_OPTIMIZATION = "none";
|
public static final String NO_OPTIMIZATION = "none";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Config value for parameter {@link #TOPOLOGY_OPTIMIZATION "topology.optimization"} for enabling topology optimization
|
* Config value for parameter {@link #TOPOLOGY_OPTIMIZATION_CONFIG "topology.optimization"} for enabling topology optimization
|
||||||
*/
|
*/
|
||||||
public static final String OPTIMIZE = "all";
|
public static final String OPTIMIZE = "all";
|
||||||
|
|
||||||
|
@ -524,7 +524,7 @@ public class StreamsConfig extends AbstractConfig {
|
||||||
private static final String STATE_DIR_DOC = "Directory location for state store. This path must be unique for each streams instance sharing the same underlying filesystem.";
|
private static final String STATE_DIR_DOC = "Directory location for state store. This path must be unique for each streams instance sharing the same underlying filesystem.";
|
||||||
|
|
||||||
/** {@code topology.optimization} */
|
/** {@code topology.optimization} */
|
||||||
public static final String TOPOLOGY_OPTIMIZATION = "topology.optimization";
|
public static final String TOPOLOGY_OPTIMIZATION_CONFIG = "topology.optimization";
|
||||||
private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration telling Kafka Streams if it should optimize the topology, disabled by default";
|
private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration telling Kafka Streams if it should optimize the topology, disabled by default";
|
||||||
|
|
||||||
/** {@code upgrade.from} */
|
/** {@code upgrade.from} */
|
||||||
|
@ -552,6 +552,13 @@ public class StreamsConfig extends AbstractConfig {
|
||||||
private static final String PARTITION_GROUPER_CLASS_DOC = "Partition grouper class that implements the <code>org.apache.kafka.streams.processor.PartitionGrouper</code> interface." +
|
private static final String PARTITION_GROUPER_CLASS_DOC = "Partition grouper class that implements the <code>org.apache.kafka.streams.processor.PartitionGrouper</code> interface." +
|
||||||
" WARNING: This config is deprecated and will be removed in 3.0.0 release.";
|
" WARNING: This config is deprecated and will be removed in 3.0.0 release.";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@code topology.optimization}
|
||||||
|
* @deprecated since 2.7; use {@link #TOPOLOGY_OPTIMIZATION_CONFIG} instead
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
|
public static final String TOPOLOGY_OPTIMIZATION = TOPOLOGY_OPTIMIZATION_CONFIG;
|
||||||
|
|
||||||
|
|
||||||
private static final String[] NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS =
|
private static final String[] NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS =
|
||||||
new String[] {ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG};
|
new String[] {ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG};
|
||||||
|
@ -664,7 +671,7 @@ public class StreamsConfig extends AbstractConfig {
|
||||||
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
|
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
|
||||||
Importance.MEDIUM,
|
Importance.MEDIUM,
|
||||||
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
|
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
|
||||||
.define(TOPOLOGY_OPTIMIZATION,
|
.define(TOPOLOGY_OPTIMIZATION_CONFIG,
|
||||||
Type.STRING,
|
Type.STRING,
|
||||||
NO_OPTIMIZATION,
|
NO_OPTIMIZATION,
|
||||||
in(NO_OPTIMIZATION, OPTIMIZE),
|
in(NO_OPTIMIZATION, OPTIMIZE),
|
||||||
|
|
|
@ -933,7 +933,7 @@ public interface KStream<K, V> {
|
||||||
* For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
|
* For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
|
||||||
* records to it, and rereading all records from it, such that the resulting {@link KTable} is partitioned
|
* records to it, and rereading all records from it, such that the resulting {@link KTable} is partitioned
|
||||||
* correctly on its key.
|
* correctly on its key.
|
||||||
* Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION} config for this case, because
|
* Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG} config for this case, because
|
||||||
* repartition topics are considered transient and don't allow to recover the result {@link KTable} in cause of
|
* repartition topics are considered transient and don't allow to recover the result {@link KTable} in cause of
|
||||||
* a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance.
|
* a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance.
|
||||||
* <p>
|
* <p>
|
||||||
|
@ -959,7 +959,7 @@ public interface KStream<K, V> {
|
||||||
* For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
|
* For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
|
||||||
* records to it, and rereading all records from it, such that the resulting {@link KTable} is partitioned
|
* records to it, and rereading all records from it, such that the resulting {@link KTable} is partitioned
|
||||||
* correctly on its key.
|
* correctly on its key.
|
||||||
* Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION} config for this case, because
|
* Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG} config for this case, because
|
||||||
* repartition topics are considered transient and don't allow to recover the result {@link KTable} in cause of
|
* repartition topics are considered transient and don't allow to recover the result {@link KTable} in cause of
|
||||||
* a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance.
|
* a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance.
|
||||||
* <p>
|
* <p>
|
||||||
|
@ -986,7 +986,7 @@ public interface KStream<K, V> {
|
||||||
* For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
|
* For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
|
||||||
* records to it, and rereading all records from it, such that the resulting {@link KTable} is partitioned
|
* records to it, and rereading all records from it, such that the resulting {@link KTable} is partitioned
|
||||||
* correctly on its key.
|
* correctly on its key.
|
||||||
* Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION} config for this case, because
|
* Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG} config for this case, because
|
||||||
* repartition topics are considered transient and don't allow to recover the result {@link KTable} in cause of
|
* repartition topics are considered transient and don't allow to recover the result {@link KTable} in cause of
|
||||||
* a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance.
|
* a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance.
|
||||||
* <p>
|
* <p>
|
||||||
|
@ -1014,7 +1014,7 @@ public interface KStream<K, V> {
|
||||||
* For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
|
* For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
|
||||||
* records to it, and rereading all records from it, such that the resulting {@link KTable} is partitioned
|
* records to it, and rereading all records from it, such that the resulting {@link KTable} is partitioned
|
||||||
* correctly on its key.
|
* correctly on its key.
|
||||||
* Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION} config for this case, because
|
* Note that you cannot enable {@link StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG} config for this case, because
|
||||||
* repartition topics are considered transient and don't allow to recover the result {@link KTable} in cause of
|
* repartition topics are considered transient and don't allow to recover the result {@link KTable} in cause of
|
||||||
* a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance.
|
* a failure; hence, a dedicated changelog topic is required to guarantee fault-tolerance.
|
||||||
* <p>
|
* <p>
|
||||||
|
|
|
@ -317,7 +317,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
|
||||||
|
|
||||||
private void maybePerformOptimizations(final Properties props) {
|
private void maybePerformOptimizations(final Properties props) {
|
||||||
|
|
||||||
if (props != null && StreamsConfig.OPTIMIZE.equals(props.getProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION))) {
|
if (props != null && StreamsConfig.OPTIMIZE.equals(props.getProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG))) {
|
||||||
LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
|
LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
|
||||||
optimizeKTableSourceTopics();
|
optimizeKTableSourceTopics();
|
||||||
maybeOptimizeRepartitionOperations();
|
maybeOptimizeRepartitionOperations();
|
||||||
|
|
|
@ -397,7 +397,7 @@ public class StreamsBuilderTest {
|
||||||
final String topic = "topic";
|
final String topic = "topic";
|
||||||
builder.table(topic, Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as("store"));
|
builder.table(topic, Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as("store"));
|
||||||
final Properties props = StreamsTestUtils.getStreamsConfig();
|
final Properties props = StreamsTestUtils.getStreamsConfig();
|
||||||
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
|
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
|
||||||
final Topology topology = builder.build(props);
|
final Topology topology = builder.build(props);
|
||||||
|
|
||||||
final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
|
final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
|
||||||
|
|
|
@ -47,7 +47,7 @@ import static org.apache.kafka.common.IsolationLevel.READ_COMMITTED;
|
||||||
import static org.apache.kafka.common.IsolationLevel.READ_UNCOMMITTED;
|
import static org.apache.kafka.common.IsolationLevel.READ_UNCOMMITTED;
|
||||||
import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
|
import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
|
||||||
import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_BETA;
|
import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_BETA;
|
||||||
import static org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION;
|
import static org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG;
|
||||||
import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix;
|
import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix;
|
||||||
import static org.apache.kafka.streams.StreamsConfig.consumerPrefix;
|
import static org.apache.kafka.streams.StreamsConfig.consumerPrefix;
|
||||||
import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
|
import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
|
||||||
|
@ -895,22 +895,22 @@ public class StreamsConfigTest {
|
||||||
@Test
|
@Test
|
||||||
public void shouldSpecifyNoOptimizationWhenNotExplicitlyAddedToConfigs() {
|
public void shouldSpecifyNoOptimizationWhenNotExplicitlyAddedToConfigs() {
|
||||||
final String expectedOptimizeConfig = "none";
|
final String expectedOptimizeConfig = "none";
|
||||||
final String actualOptimizedConifig = streamsConfig.getString(TOPOLOGY_OPTIMIZATION);
|
final String actualOptimizedConifig = streamsConfig.getString(TOPOLOGY_OPTIMIZATION_CONFIG);
|
||||||
assertEquals("Optimization should be \"none\"", expectedOptimizeConfig, actualOptimizedConifig);
|
assertEquals("Optimization should be \"none\"", expectedOptimizeConfig, actualOptimizedConifig);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldSpecifyOptimizationWhenNotExplicitlyAddedToConfigs() {
|
public void shouldSpecifyOptimizationWhenNotExplicitlyAddedToConfigs() {
|
||||||
final String expectedOptimizeConfig = "all";
|
final String expectedOptimizeConfig = "all";
|
||||||
props.put(TOPOLOGY_OPTIMIZATION, "all");
|
props.put(TOPOLOGY_OPTIMIZATION_CONFIG, "all");
|
||||||
final StreamsConfig config = new StreamsConfig(props);
|
final StreamsConfig config = new StreamsConfig(props);
|
||||||
final String actualOptimizedConifig = config.getString(TOPOLOGY_OPTIMIZATION);
|
final String actualOptimizedConifig = config.getString(TOPOLOGY_OPTIMIZATION_CONFIG);
|
||||||
assertEquals("Optimization should be \"all\"", expectedOptimizeConfig, actualOptimizedConifig);
|
assertEquals("Optimization should be \"all\"", expectedOptimizeConfig, actualOptimizedConifig);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(expected = ConfigException.class)
|
@Test(expected = ConfigException.class)
|
||||||
public void shouldThrowConfigExceptionWhenOptimizationConfigNotValueInRange() {
|
public void shouldThrowConfigExceptionWhenOptimizationConfigNotValueInRange() {
|
||||||
props.put(TOPOLOGY_OPTIMIZATION, "maybe");
|
props.put(TOPOLOGY_OPTIMIZATION_CONFIG, "maybe");
|
||||||
new StreamsConfig(props);
|
new StreamsConfig(props);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -131,7 +131,7 @@ public class KStreamRepartitionIntegrationTest {
|
||||||
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
|
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
|
||||||
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
|
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
|
||||||
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
||||||
streamsConfiguration.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, topologyOptimization);
|
streamsConfiguration.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, topologyOptimization);
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
|
|
|
@ -95,7 +95,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
|
||||||
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName),
|
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName),
|
||||||
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "asdf:0000"),
|
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "asdf:0000"),
|
||||||
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
|
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
|
||||||
mkEntry(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimization)
|
mkEntry(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, optimization)
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -156,7 +156,7 @@ public class LagFetchIntegrationTest {
|
||||||
props.put(StreamsConfig.InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, FallbackPriorTaskAssignor.class.getName());
|
props.put(StreamsConfig.InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, FallbackPriorTaskAssignor.class.getName());
|
||||||
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + i);
|
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + i);
|
||||||
props.put(StreamsConfig.CLIENT_ID_CONFIG, "instance-" + i);
|
props.put(StreamsConfig.CLIENT_ID_CONFIG, "instance-" + i);
|
||||||
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimization);
|
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, optimization);
|
||||||
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
|
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
|
||||||
props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(stateStoreName + i).getAbsolutePath());
|
props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(stateStoreName + i).getAbsolutePath());
|
||||||
|
|
||||||
|
|
|
@ -180,7 +180,7 @@ public class OptimizedKTableIntegrationTest {
|
||||||
private Properties streamsConfiguration() {
|
private Properties streamsConfiguration() {
|
||||||
final String safeTestName = safeUniqueTestName(getClass(), testName);
|
final String safeTestName = safeUniqueTestName(getClass(), testName);
|
||||||
final Properties config = new Properties();
|
final Properties config = new Properties();
|
||||||
config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
|
config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
|
||||||
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
|
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
|
||||||
config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + (++port));
|
config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + (++port));
|
||||||
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
|
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
|
||||||
|
|
|
@ -135,7 +135,7 @@ public class RestoreIntegrationTest {
|
||||||
final StreamsBuilder builder = new StreamsBuilder();
|
final StreamsBuilder builder = new StreamsBuilder();
|
||||||
|
|
||||||
final Properties props = props();
|
final Properties props = props();
|
||||||
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
|
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
|
||||||
|
|
||||||
// restoring from 1000 to 4000 (committed), and then process from 4000 to 5000 on each of the two partitions
|
// restoring from 1000 to 4000 (committed), and then process from 4000 to 5000 on each of the two partitions
|
||||||
final int offsetLimitDelta = 1000;
|
final int offsetLimitDelta = 1000;
|
||||||
|
|
|
@ -127,9 +127,9 @@ public class StandbyTaskCreationIntegrationTest {
|
||||||
@Test
|
@Test
|
||||||
public void shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables() throws Exception {
|
public void shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables() throws Exception {
|
||||||
final Properties streamsConfiguration1 = streamsConfiguration();
|
final Properties streamsConfiguration1 = streamsConfiguration();
|
||||||
streamsConfiguration1.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
|
streamsConfiguration1.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
|
||||||
final Properties streamsConfiguration2 = streamsConfiguration();
|
final Properties streamsConfiguration2 = streamsConfiguration();
|
||||||
streamsConfiguration2.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
|
streamsConfiguration2.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
|
||||||
|
|
||||||
final StreamsBuilder builder = new StreamsBuilder();
|
final StreamsBuilder builder = new StreamsBuilder();
|
||||||
builder.table(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.Integer()), Materialized.as("source-table"));
|
builder.table(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.Integer()), Materialized.as("source-table"));
|
||||||
|
|
|
@ -389,7 +389,7 @@ public class StoreQueryIntegrationTest {
|
||||||
private Properties streamsConfiguration() {
|
private Properties streamsConfiguration() {
|
||||||
final String safeTestName = safeUniqueTestName(getClass(), testName);
|
final String safeTestName = safeUniqueTestName(getClass(), testName);
|
||||||
final Properties config = new Properties();
|
final Properties config = new Properties();
|
||||||
config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
|
config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
|
||||||
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
|
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
|
||||||
config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + (++port));
|
config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + (++port));
|
||||||
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
|
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
|
||||||
|
|
|
@ -192,7 +192,7 @@ public class RepartitionTopicNamingTest {
|
||||||
kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count();
|
kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count();
|
||||||
kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count();
|
kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count();
|
||||||
final Properties properties = new Properties();
|
final Properties properties = new Properties();
|
||||||
properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
|
properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
|
||||||
final Topology topology = builder.build(properties);
|
final Topology topology = builder.build(properties);
|
||||||
assertThat(getCountOfRepartitionTopicsFound(topology.describe().toString(), repartitionTopicPattern), is(1));
|
assertThat(getCountOfRepartitionTopicsFound(topology.describe().toString(), repartitionTopicPattern), is(1));
|
||||||
}
|
}
|
||||||
|
@ -224,7 +224,7 @@ public class RepartitionTopicNamingTest {
|
||||||
public void shouldPassWithSameRepartitionTopicNameUsingSameKGroupedStreamOptimized() {
|
public void shouldPassWithSameRepartitionTopicNameUsingSameKGroupedStreamOptimized() {
|
||||||
final StreamsBuilder builder = new StreamsBuilder();
|
final StreamsBuilder builder = new StreamsBuilder();
|
||||||
final Properties properties = new Properties();
|
final Properties properties = new Properties();
|
||||||
properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
|
properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
|
||||||
final KGroupedStream<String, String> kGroupedStream = builder.<String, String>stream("topic")
|
final KGroupedStream<String, String> kGroupedStream = builder.<String, String>stream("topic")
|
||||||
.selectKey((k, v) -> k)
|
.selectKey((k, v) -> k)
|
||||||
.groupByKey(Grouped.as("grouping"));
|
.groupByKey(Grouped.as("grouping"));
|
||||||
|
@ -500,7 +500,7 @@ public class RepartitionTopicNamingTest {
|
||||||
|
|
||||||
final Properties properties = new Properties();
|
final Properties properties = new Properties();
|
||||||
|
|
||||||
properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimizationConfig);
|
properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, optimizationConfig);
|
||||||
return builder.build(properties);
|
return builder.build(properties);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -379,7 +379,7 @@ public class CogroupedKStreamImplTest {
|
||||||
@Test
|
@Test
|
||||||
public void shouldInsertRepartitionsTopicForUpstreamKeyModificationWithGroupedReusedInSameCogroupsWithOptimization() {
|
public void shouldInsertRepartitionsTopicForUpstreamKeyModificationWithGroupedReusedInSameCogroupsWithOptimization() {
|
||||||
final Properties properties = new Properties();
|
final Properties properties = new Properties();
|
||||||
properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
|
properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
|
||||||
final StreamsBuilder builder = new StreamsBuilder();
|
final StreamsBuilder builder = new StreamsBuilder();
|
||||||
|
|
||||||
final KStream<String, String> stream1 = builder.stream("one", stringConsumed);
|
final KStream<String, String> stream1 = builder.stream("one", stringConsumed);
|
||||||
|
@ -528,7 +528,7 @@ public class CogroupedKStreamImplTest {
|
||||||
final StreamsBuilder builder = new StreamsBuilder();
|
final StreamsBuilder builder = new StreamsBuilder();
|
||||||
|
|
||||||
final Properties properties = new Properties();
|
final Properties properties = new Properties();
|
||||||
properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
|
properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
|
||||||
|
|
||||||
final KStream<String, String> stream1 = builder.stream("one", stringConsumed);
|
final KStream<String, String> stream1 = builder.stream("one", stringConsumed);
|
||||||
final KStream<String, String> stream2 = builder.stream("two", stringConsumed);
|
final KStream<String, String> stream2 = builder.stream("two", stringConsumed);
|
||||||
|
@ -653,7 +653,7 @@ public class CogroupedKStreamImplTest {
|
||||||
final StreamsBuilder builder = new StreamsBuilder();
|
final StreamsBuilder builder = new StreamsBuilder();
|
||||||
|
|
||||||
final Properties properties = new Properties();
|
final Properties properties = new Properties();
|
||||||
properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
|
properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
|
||||||
|
|
||||||
final KStream<String, String> stream1 = builder.stream("one", stringConsumed);
|
final KStream<String, String> stream1 = builder.stream("one", stringConsumed);
|
||||||
final KStream<String, String> stream2 = builder.stream("two", stringConsumed);
|
final KStream<String, String> stream2 = builder.stream("two", stringConsumed);
|
||||||
|
@ -707,7 +707,7 @@ public class CogroupedKStreamImplTest {
|
||||||
final StreamsBuilder builder = new StreamsBuilder();
|
final StreamsBuilder builder = new StreamsBuilder();
|
||||||
|
|
||||||
final Properties properties = new Properties();
|
final Properties properties = new Properties();
|
||||||
properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
|
properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
|
||||||
|
|
||||||
final KStream<String, String> stream1 = builder.stream("one", stringConsumed);
|
final KStream<String, String> stream1 = builder.stream("one", stringConsumed);
|
||||||
final KStream<String, String> stream2 = builder.stream("two", stringConsumed);
|
final KStream<String, String> stream2 = builder.stream("two", stringConsumed);
|
||||||
|
|
|
@ -84,7 +84,7 @@ public class KStreamKStreamJoinTest {
|
||||||
public void shouldReuseRepartitionTopicWithGeneratedName() {
|
public void shouldReuseRepartitionTopicWithGeneratedName() {
|
||||||
final StreamsBuilder builder = new StreamsBuilder();
|
final StreamsBuilder builder = new StreamsBuilder();
|
||||||
final Properties props = new Properties();
|
final Properties props = new Properties();
|
||||||
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION);
|
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION);
|
||||||
final KStream<String, String> stream1 = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
|
final KStream<String, String> stream1 = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
|
||||||
final KStream<String, String> stream2 = builder.stream("topic2", Consumed.with(Serdes.String(), Serdes.String()));
|
final KStream<String, String> stream2 = builder.stream("topic2", Consumed.with(Serdes.String(), Serdes.String()));
|
||||||
final KStream<String, String> stream3 = builder.stream("topic3", Consumed.with(Serdes.String(), Serdes.String()));
|
final KStream<String, String> stream3 = builder.stream("topic3", Consumed.with(Serdes.String(), Serdes.String()));
|
||||||
|
@ -98,7 +98,7 @@ public class KStreamKStreamJoinTest {
|
||||||
public void shouldCreateRepartitionTopicsWithUserProvidedName() {
|
public void shouldCreateRepartitionTopicsWithUserProvidedName() {
|
||||||
final StreamsBuilder builder = new StreamsBuilder();
|
final StreamsBuilder builder = new StreamsBuilder();
|
||||||
final Properties props = new Properties();
|
final Properties props = new Properties();
|
||||||
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION);
|
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION);
|
||||||
final KStream<String, String> stream1 = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
|
final KStream<String, String> stream1 = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
|
||||||
final KStream<String, String> stream2 = builder.stream("topic2", Consumed.with(Serdes.String(), Serdes.String()));
|
final KStream<String, String> stream2 = builder.stream("topic2", Consumed.with(Serdes.String(), Serdes.String()));
|
||||||
final KStream<String, String> stream3 = builder.stream("topic3", Consumed.with(Serdes.String(), Serdes.String()));
|
final KStream<String, String> stream3 = builder.stream("topic3", Consumed.with(Serdes.String(), Serdes.String()));
|
||||||
|
|
|
@ -118,7 +118,7 @@ public class KStreamKTableJoinTest {
|
||||||
public void shouldReuseRepartitionTopicWithGeneratedName() {
|
public void shouldReuseRepartitionTopicWithGeneratedName() {
|
||||||
final StreamsBuilder builder = new StreamsBuilder();
|
final StreamsBuilder builder = new StreamsBuilder();
|
||||||
final Properties props = new Properties();
|
final Properties props = new Properties();
|
||||||
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION);
|
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION);
|
||||||
final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
|
final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
|
||||||
final KTable<String, String> tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()));
|
final KTable<String, String> tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()));
|
||||||
final KTable<String, String> tableC = builder.table("topic3", Consumed.with(Serdes.String(), Serdes.String()));
|
final KTable<String, String> tableC = builder.table("topic3", Consumed.with(Serdes.String(), Serdes.String()));
|
||||||
|
@ -133,7 +133,7 @@ public class KStreamKTableJoinTest {
|
||||||
public void shouldCreateRepartitionTopicsWithUserProvidedName() {
|
public void shouldCreateRepartitionTopicsWithUserProvidedName() {
|
||||||
final StreamsBuilder builder = new StreamsBuilder();
|
final StreamsBuilder builder = new StreamsBuilder();
|
||||||
final Properties props = new Properties();
|
final Properties props = new Properties();
|
||||||
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION);
|
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION);
|
||||||
final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
|
final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
|
||||||
final KTable<String, String> tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()));
|
final KTable<String, String> tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()));
|
||||||
final KTable<String, String> tableC = builder.table("topic3", Consumed.with(Serdes.String(), Serdes.String()));
|
final KTable<String, String> tableC = builder.table("topic3", Consumed.with(Serdes.String(), Serdes.String()));
|
||||||
|
|
|
@ -89,7 +89,7 @@ public class StreamsGraphTest {
|
||||||
final Properties properties = new Properties();
|
final Properties properties = new Properties();
|
||||||
properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test-application");
|
properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test-application");
|
||||||
properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
||||||
properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
|
properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
|
||||||
|
|
||||||
final StreamsBuilder builder = new StreamsBuilder();
|
final StreamsBuilder builder = new StreamsBuilder();
|
||||||
final KStream<String, String> inputStream = builder.stream("inputTopic");
|
final KStream<String, String> inputStream = builder.stream("inputTopic");
|
||||||
|
@ -118,7 +118,7 @@ public class StreamsGraphTest {
|
||||||
final Properties properties = new Properties();
|
final Properties properties = new Properties();
|
||||||
properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test-application");
|
properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test-application");
|
||||||
properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
||||||
properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
|
properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
|
||||||
|
|
||||||
final StreamsBuilder builder = new StreamsBuilder();
|
final StreamsBuilder builder = new StreamsBuilder();
|
||||||
initializer = () -> "";
|
initializer = () -> "";
|
||||||
|
@ -222,7 +222,7 @@ public class StreamsGraphTest {
|
||||||
.to("output_topic");
|
.to("output_topic");
|
||||||
|
|
||||||
final Properties properties = new Properties();
|
final Properties properties = new Properties();
|
||||||
properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
|
properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
|
||||||
final Topology topology = streamsBuilder.build(properties);
|
final Topology topology = streamsBuilder.build(properties);
|
||||||
|
|
||||||
assertEquals(expectedMergeOptimizedTopology, topology.describe().toString());
|
assertEquals(expectedMergeOptimizedTopology, topology.describe().toString());
|
||||||
|
@ -242,7 +242,7 @@ public class StreamsGraphTest {
|
||||||
|
|
||||||
final StreamsBuilder builder = new StreamsBuilder();
|
final StreamsBuilder builder = new StreamsBuilder();
|
||||||
final Properties properties = new Properties();
|
final Properties properties = new Properties();
|
||||||
properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimizeConfig);
|
properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, optimizeConfig);
|
||||||
|
|
||||||
final KStream<String, String> inputStream = builder.stream("input");
|
final KStream<String, String> inputStream = builder.stream("input");
|
||||||
final KStream<String, String> mappedKeyStream = inputStream.selectKey((k, v) -> k + v);
|
final KStream<String, String> mappedKeyStream = inputStream.selectKey((k, v) -> k + v);
|
||||||
|
@ -259,7 +259,7 @@ public class StreamsGraphTest {
|
||||||
|
|
||||||
final StreamsBuilder builder = new StreamsBuilder();
|
final StreamsBuilder builder = new StreamsBuilder();
|
||||||
final Properties properties = new Properties();
|
final Properties properties = new Properties();
|
||||||
properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimizeConfig);
|
properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, optimizeConfig);
|
||||||
|
|
||||||
final KStream<String, String> inputStream = builder.stream("input");
|
final KStream<String, String> inputStream = builder.stream("input");
|
||||||
final KStream<String, String> mappedKeyStream = inputStream.selectKey((k, v) -> k + v).through("through-topic");
|
final KStream<String, String> mappedKeyStream = inputStream.selectKey((k, v) -> k + v).through("through-topic");
|
||||||
|
@ -274,7 +274,7 @@ public class StreamsGraphTest {
|
||||||
private Topology getTopologyWithRepartitionOperation(final String optimizeConfig) {
|
private Topology getTopologyWithRepartitionOperation(final String optimizeConfig) {
|
||||||
final StreamsBuilder builder = new StreamsBuilder();
|
final StreamsBuilder builder = new StreamsBuilder();
|
||||||
final Properties properties = new Properties();
|
final Properties properties = new Properties();
|
||||||
properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimizeConfig);
|
properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, optimizeConfig);
|
||||||
|
|
||||||
final KStream<String, String> inputStream = builder.<String, String>stream("input").selectKey((k, v) -> k + v);
|
final KStream<String, String> inputStream = builder.<String, String>stream("input").selectKey((k, v) -> k + v);
|
||||||
|
|
||||||
|
|
|
@ -200,7 +200,7 @@ public class RepartitionOptimizingTest {
|
||||||
.withOtherValueSerde(Serdes.Long()))
|
.withOtherValueSerde(Serdes.Long()))
|
||||||
.to(JOINED_TOPIC, Produced.as("join-to"));
|
.to(JOINED_TOPIC, Produced.as("join-to"));
|
||||||
|
|
||||||
streamsConfiguration.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimizationConfig);
|
streamsConfiguration.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, optimizationConfig);
|
||||||
final Topology topology = builder.build(streamsConfiguration);
|
final Topology topology = builder.build(streamsConfiguration);
|
||||||
|
|
||||||
topologyTestDriver = new TopologyTestDriver(topology, streamsConfiguration);
|
topologyTestDriver = new TopologyTestDriver(topology, streamsConfiguration);
|
||||||
|
|
|
@ -120,7 +120,7 @@ public class RepartitionWithMergeOptimizingTest {
|
||||||
|
|
||||||
private void runTest(final String optimizationConfig, final int expectedNumberRepartitionTopics) {
|
private void runTest(final String optimizationConfig, final int expectedNumberRepartitionTopics) {
|
||||||
|
|
||||||
streamsConfiguration.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimizationConfig);
|
streamsConfiguration.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, optimizationConfig);
|
||||||
|
|
||||||
final StreamsBuilder builder = new StreamsBuilder();
|
final StreamsBuilder builder = new StreamsBuilder();
|
||||||
|
|
||||||
|
|
|
@ -1941,7 +1941,7 @@ public class StreamsPartitionAssignorTest {
|
||||||
|
|
||||||
createDefaultMockTaskManager();
|
createDefaultMockTaskManager();
|
||||||
EasyMock.expect(taskManager.mainConsumer()).andStubReturn(consumerClient);
|
EasyMock.expect(taskManager.mainConsumer()).andStubReturn(consumerClient);
|
||||||
configurePartitionAssignorWith(singletonMap(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE));
|
configurePartitionAssignorWith(singletonMap(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE));
|
||||||
overwriteInternalTopicManagerWithMock(false);
|
overwriteInternalTopicManagerWithMock(false);
|
||||||
|
|
||||||
EasyMock.expect(consumerClient.committed(changelogs))
|
EasyMock.expect(consumerClient.committed(changelogs))
|
||||||
|
|
|
@ -335,10 +335,10 @@ class TopologyTest {
|
||||||
def shouldBuildIdenticalTopologyInJavaNScalaProperties(): Unit = {
|
def shouldBuildIdenticalTopologyInJavaNScalaProperties(): Unit = {
|
||||||
|
|
||||||
val props = new Properties()
|
val props = new Properties()
|
||||||
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE)
|
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE)
|
||||||
|
|
||||||
val propsNoOptimization = new Properties()
|
val propsNoOptimization = new Properties()
|
||||||
propsNoOptimization.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION)
|
propsNoOptimization.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION)
|
||||||
|
|
||||||
val AGGREGATION_TOPIC = "aggregationTopic"
|
val AGGREGATION_TOPIC = "aggregationTopic"
|
||||||
val REDUCE_TOPIC = "reduceTopic"
|
val REDUCE_TOPIC = "reduceTopic"
|
||||||
|
|
Loading…
Reference in New Issue