diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java index 7037e8d7fd3..02b199c0c74 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java @@ -75,7 +75,7 @@ public class StreamsBuilder { public StreamsBuilder() { topology = new Topology(); internalTopologyBuilder = topology.internalTopologyBuilder; - internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder); + internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder, false); } /** @@ -87,7 +87,14 @@ public class StreamsBuilder { public StreamsBuilder(final TopologyConfig topologyConfigs) { topology = newTopology(topologyConfigs); internalTopologyBuilder = topology.internalTopologyBuilder; - internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder); + internalStreamsBuilder = new InternalStreamsBuilder( + internalTopologyBuilder, + TopologyConfig.InternalConfig.getBoolean( + topologyConfigs.originals(), + TopologyConfig.InternalConfig.ENABLE_PROCESS_PROCESSVALUE_FIX, + false + ) + ); } protected Topology newTopology(final TopologyConfig topologyConfigs) { diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java index e96d5281d09..be43da321bb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java @@ -34,6 +34,7 @@ import org.apache.kafka.streams.state.DslStoreSuppliers; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Map; import java.util.Optional; import java.util.Properties; import java.util.function.Supplier; @@ -84,6 +85,28 @@ import static org.apache.kafka.streams.internals.StreamsConfigUtils.totalCacheSi */ @SuppressWarnings("deprecation") public final class TopologyConfig extends AbstractConfig { + + public static class InternalConfig { + // Cf https://issues.apache.org/jira/browse/KAFKA-19668 + public static final String ENABLE_PROCESS_PROCESSVALUE_FIX = "__enable.process.processValue.fix__"; + + public static boolean getBoolean(final Map configs, final String key, final boolean defaultValue) { + final Object value = configs.getOrDefault(key, defaultValue); + if (value instanceof Boolean) { + return (boolean) value; + } else if (value instanceof String) { + return Boolean.parseBoolean((String) value); + } else { + log.warn( + "Invalid value ({}) on internal configuration '{}'. Please specify a true/false value.", + value, + key + ); + return defaultValue; + } + } + } + private static final ConfigDef CONFIG; static { CONFIG = new ConfigDef() diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java index 954b88bfbea..e0d9ce12a23 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java @@ -72,6 +72,7 @@ public class InternalStreamsBuilder implements InternalNameProvider { private static final String TABLE_SOURCE_SUFFIX = "-source"; final InternalTopologyBuilder internalTopologyBuilder; + private final boolean processProcessValueFixEnabled; private final AtomicInteger index = new AtomicInteger(0); private final AtomicInteger buildPriorityIndex = new AtomicInteger(0); @@ -91,8 +92,10 @@ public class InternalStreamsBuilder implements InternalNameProvider { } }; - public InternalStreamsBuilder(final InternalTopologyBuilder internalTopologyBuilder) { + public InternalStreamsBuilder(final InternalTopologyBuilder internalTopologyBuilder, + final boolean processProcessValueFixEnabled) { this.internalTopologyBuilder = internalTopologyBuilder; + this.processProcessValueFixEnabled = processProcessValueFixEnabled; } public KStream stream(final Collection topics, @@ -706,4 +709,7 @@ public class InternalStreamsBuilder implements InternalNameProvider { return internalTopologyBuilder; } + public boolean processProcessValueFixEnabled() { + return processProcessValueFixEnabled; + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 7beaa1abffb..a5b20d2d20a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -1235,7 +1235,12 @@ public class KStreamImpl extends AbstractStream implements KStream( name, new ProcessorParameters<>(processorSupplier, name), - stateStoreNames); + stateStoreNames + ); + if (builder.processProcessValueFixEnabled()) { + processNode.keyChangingOperation(true); + processNode.setValueChangingOperation(true); + } builder.addGraphNode(graphNode, processNode); @@ -1280,7 +1285,11 @@ public class KStreamImpl extends AbstractStream implements KStream processNode = new ProcessorToStateConnectorNode<>( name, new ProcessorParameters<>(processorSupplier, name), - stateStoreNames); + stateStoreNames + ); + if (builder.processProcessValueFixEnabled()) { + processNode.setValueChangingOperation(true); + } builder.addGraphNode(graphNode, processNode); // cannot inherit value serde diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java index c761d1eda7c..8db9441d89f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java @@ -81,7 +81,7 @@ public class InternalStreamsBuilderTest { private static final String APP_ID = "app-id"; - private final InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder()); + private final InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder(), false); private final ConsumedInternal consumed = new ConsumedInternal<>(Consumed.with(null, null)); private final String storePrefix = "prefix-"; private final MaterializedInternal> materialized = new MaterializedInternal<>(Materialized.as("test-store"), builder, storePrefix); @@ -93,7 +93,7 @@ public class InternalStreamsBuilderTest { assertEquals("Y-0000000001", builder.newProcessorName("Y-")); assertEquals("Z-0000000002", builder.newProcessorName("Z-")); - final InternalStreamsBuilder newBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder()); + final InternalStreamsBuilder newBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder(), false); assertEquals("X-0000000000", newBuilder.newProcessorName("X-")); assertEquals("Y-0000000001", newBuilder.newProcessorName("Y-")); @@ -106,7 +106,7 @@ public class InternalStreamsBuilderTest { assertEquals("Y-STATE-STORE-0000000001", builder.newStoreName("Y-")); assertEquals("Z-STATE-STORE-0000000002", builder.newStoreName("Z-")); - final InternalStreamsBuilder newBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder()); + final InternalStreamsBuilder newBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder(), false); assertEquals("X-STATE-STORE-0000000000", newBuilder.newStoreName("X-")); assertEquals("Y-STATE-STORE-0000000001", newBuilder.newStoreName("Y-")); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java index 24600c57fec..a3eaadd27db 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java @@ -95,7 +95,7 @@ public class MaterializedInternalTest { final InternalTopologyBuilder topologyBuilder = new InternalTopologyBuilder( new TopologyConfig("my-topology", config, topologyOverrides)); - final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder); + final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder, false); final MaterializedInternal> materialized = new MaterializedInternal<>(Materialized.as(supplier), internalStreamsBuilder, prefix); @@ -113,7 +113,7 @@ public class MaterializedInternalTest { final InternalTopologyBuilder topologyBuilder = new InternalTopologyBuilder( new TopologyConfig("my-topology", config, topologyOverrides)); - final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder); + final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder, false); final MaterializedInternal> materialized = new MaterializedInternal<>(Materialized.as(supplier), internalStreamsBuilder, prefix); @@ -129,7 +129,7 @@ public class MaterializedInternalTest { final InternalTopologyBuilder topologyBuilder = new InternalTopologyBuilder( new TopologyConfig("my-topology", config, topologyOverrides)); - final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder); + final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder, false); final MaterializedInternal> materialized = new MaterializedInternal<>(Materialized.as(supplier), internalStreamsBuilder, prefix); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java index e046ba19533..1675619480f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyConfig; import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Branched; import org.apache.kafka.streams.kstream.Consumed; @@ -36,6 +37,8 @@ import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.Suppressed; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.ValueJoiner; +import org.apache.kafka.streams.processor.api.ContextualFixedKeyProcessor; +import org.apache.kafka.streams.processor.api.FixedKeyRecord; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.ProcessorSupplier; @@ -127,7 +130,7 @@ public class StreamsGraphTest { initializer = () -> ""; aggregator = (aggKey, value, aggregate) -> aggregate + value.length(); final ProcessorSupplier processorSupplier = - () -> new Processor() { + () -> new Processor<>() { private ProcessorContext context; @Override @@ -185,14 +188,163 @@ public class StreamsGraphTest { } @Test - public void shouldNotOptimizeWithValueOrKeyChangingOperatorsAfterInitialKeyChange() { + public void shouldPartiallyOptimizeWithValueOrKeyChangingOperatorsAfterInitialKeyChangeWithFixDisabled() { + final Topology attemptedOptimize = getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.OPTIMIZE, false); + final Topology noOptimization = getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.NO_OPTIMIZATION, false); - final Topology attemptedOptimize = getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.OPTIMIZE); - final Topology noOptimization = getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.NO_OPTIMIZATION); + System.out.println(attemptedOptimize.describe().toString()); + System.out.println(noOptimization.describe().toString()); + assertEquals("Topologies:\n" + + " Sub-topology: 0\n" + + " Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n" + + " --> KSTREAM-KEY-SELECT-0000000001\n" + + " Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])\n" + + " --> KSTREAM-FLATMAPVALUES-0000000010, KSTREAM-MAPVALUES-0000000002, KSTREAM-PROCESSVALUES-0000000018\n" + + " <-- KSTREAM-SOURCE-0000000000\n" + + " Processor: KSTREAM-FLATMAPVALUES-0000000010 (stores: [])\n" + + " --> KSTREAM-FILTER-0000000014\n" + + " <-- KSTREAM-KEY-SELECT-0000000001\n" + + " Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])\n" + + " --> KSTREAM-FILTER-0000000006\n" + + " <-- KSTREAM-KEY-SELECT-0000000001\n" + + " Processor: KSTREAM-PROCESSVALUES-0000000018 (stores: [])\n" + + " --> KSTREAM-FILTER-0000000022\n" + + " <-- KSTREAM-KEY-SELECT-0000000001\n" + + " Processor: KSTREAM-FILTER-0000000006 (stores: [])\n" + + " --> KSTREAM-SINK-0000000005\n" + + " <-- KSTREAM-MAPVALUES-0000000002\n" + + " Processor: KSTREAM-FILTER-0000000014 (stores: [])\n" + + " --> KSTREAM-SINK-0000000013\n" + + " <-- KSTREAM-FLATMAPVALUES-0000000010\n" + + " Processor: KSTREAM-FILTER-0000000022 (stores: [])\n" + + " --> KSTREAM-SINK-0000000021\n" + + " <-- KSTREAM-PROCESSVALUES-0000000018\n" + + " Sink: KSTREAM-SINK-0000000005 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition)\n" + + " <-- KSTREAM-FILTER-0000000006\n" + + " Sink: KSTREAM-SINK-0000000013 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000011-repartition)\n" + + " <-- KSTREAM-FILTER-0000000014\n" + + " Sink: KSTREAM-SINK-0000000021 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000019-repartition)\n" + + " <-- KSTREAM-FILTER-0000000022\n" + + "\n" + + " Sub-topology: 1\n" + + " Source: KSTREAM-SOURCE-0000000007 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition])\n" + + " --> KSTREAM-AGGREGATE-0000000004\n" + + " Processor: KSTREAM-AGGREGATE-0000000004 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000003])\n" + + " --> KTABLE-TOSTREAM-0000000008\n" + + " <-- KSTREAM-SOURCE-0000000007\n" + + " Processor: KTABLE-TOSTREAM-0000000008 (stores: [])\n" + + " --> KSTREAM-SINK-0000000009\n" + + " <-- KSTREAM-AGGREGATE-0000000004\n" + + " Sink: KSTREAM-SINK-0000000009 (topic: output)\n" + + " <-- KTABLE-TOSTREAM-0000000008\n" + + "\n" + + " Sub-topology: 2\n" + + " Source: KSTREAM-SOURCE-0000000015 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000011-repartition])\n" + + " --> KSTREAM-AGGREGATE-0000000012\n" + + " Processor: KSTREAM-AGGREGATE-0000000012 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000011])\n" + + " --> KTABLE-TOSTREAM-0000000016\n" + + " <-- KSTREAM-SOURCE-0000000015\n" + + " Processor: KTABLE-TOSTREAM-0000000016 (stores: [])\n" + + " --> KSTREAM-SINK-0000000017\n" + + " <-- KSTREAM-AGGREGATE-0000000012\n" + + " Sink: KSTREAM-SINK-0000000017 (topic: windowed-output)\n" + + " <-- KTABLE-TOSTREAM-0000000016\n" + + "\n" + + " Sub-topology: 3\n" + + " Source: KSTREAM-SOURCE-0000000023 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000019-repartition])\n" + + " --> KSTREAM-AGGREGATE-0000000020\n" + + " Processor: KSTREAM-AGGREGATE-0000000020 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000019])\n" + + " --> KTABLE-TOSTREAM-0000000024\n" + + " <-- KSTREAM-SOURCE-0000000023\n" + + " Processor: KTABLE-TOSTREAM-0000000024 (stores: [])\n" + + " --> KSTREAM-SINK-0000000025\n" + + " <-- KSTREAM-AGGREGATE-0000000020\n" + + " Sink: KSTREAM-SINK-0000000025 (topic: output)\n" + + " <-- KTABLE-TOSTREAM-0000000024\n" + + "\n", + noOptimization.describe().toString() + ); + assertEquals("Topologies:\n" + + " Sub-topology: 0\n" + + " Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n" + + " --> KSTREAM-KEY-SELECT-0000000001\n" + + " Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])\n" + + " --> KSTREAM-FLATMAPVALUES-0000000010, KSTREAM-MAPVALUES-0000000002, KSTREAM-PROCESSVALUES-0000000018\n" + + " <-- KSTREAM-SOURCE-0000000000\n" + + " Processor: KSTREAM-FLATMAPVALUES-0000000010 (stores: [])\n" + + " --> KSTREAM-FILTER-0000000014\n" + + " <-- KSTREAM-KEY-SELECT-0000000001\n" + + " Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])\n" + + " --> KSTREAM-FILTER-0000000006\n" + + " <-- KSTREAM-KEY-SELECT-0000000001\n" + + " Processor: KSTREAM-PROCESSVALUES-0000000018 (stores: [])\n" + + " --> KSTREAM-FILTER-0000000022\n" + + " <-- KSTREAM-KEY-SELECT-0000000001\n" + + " Processor: KSTREAM-FILTER-0000000006 (stores: [])\n" + + " --> KSTREAM-SINK-0000000005\n" + + " <-- KSTREAM-MAPVALUES-0000000002\n" + + " Processor: KSTREAM-FILTER-0000000014 (stores: [])\n" + + " --> KSTREAM-SINK-0000000013\n" + + " <-- KSTREAM-FLATMAPVALUES-0000000010\n" + + " Processor: KSTREAM-FILTER-0000000022 (stores: [])\n" + + " --> KSTREAM-SINK-0000000021\n" + + " <-- KSTREAM-PROCESSVALUES-0000000018\n" + + " Sink: KSTREAM-SINK-0000000005 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition)\n" + + " <-- KSTREAM-FILTER-0000000006\n" + + " Sink: KSTREAM-SINK-0000000013 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000011-repartition)\n" + + " <-- KSTREAM-FILTER-0000000014\n" + + " Sink: KSTREAM-SINK-0000000021 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000019-repartition)\n" + + " <-- KSTREAM-FILTER-0000000022\n" + + "\n" + + " Sub-topology: 1\n" + + " Source: KSTREAM-SOURCE-0000000007 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition])\n" + + " --> KSTREAM-AGGREGATE-0000000004\n" + + " Processor: KSTREAM-AGGREGATE-0000000004 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000003])\n" + + " --> KTABLE-TOSTREAM-0000000008\n" + + " <-- KSTREAM-SOURCE-0000000007\n" + + " Processor: KTABLE-TOSTREAM-0000000008 (stores: [])\n" + + " --> KSTREAM-SINK-0000000009\n" + + " <-- KSTREAM-AGGREGATE-0000000004\n" + + " Sink: KSTREAM-SINK-0000000009 (topic: output)\n" + + " <-- KTABLE-TOSTREAM-0000000008\n" + + "\n" + + " Sub-topology: 2\n" + + " Source: KSTREAM-SOURCE-0000000015 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000011-repartition])\n" + + " --> KSTREAM-AGGREGATE-0000000012\n" + + " Processor: KSTREAM-AGGREGATE-0000000012 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000011])\n" + + " --> KTABLE-TOSTREAM-0000000016\n" + + " <-- KSTREAM-SOURCE-0000000015\n" + + " Processor: KTABLE-TOSTREAM-0000000016 (stores: [])\n" + + " --> KSTREAM-SINK-0000000017\n" + + " <-- KSTREAM-AGGREGATE-0000000012\n" + + " Sink: KSTREAM-SINK-0000000017 (topic: windowed-output)\n" + + " <-- KTABLE-TOSTREAM-0000000016\n" + + "\n" + + " Sub-topology: 3\n" + + " Source: KSTREAM-SOURCE-0000000023 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000019-repartition])\n" + + " --> KSTREAM-AGGREGATE-0000000020\n" + + " Processor: KSTREAM-AGGREGATE-0000000020 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000019])\n" + + " --> KTABLE-TOSTREAM-0000000024\n" + + " <-- KSTREAM-SOURCE-0000000023\n" + + " Processor: KTABLE-TOSTREAM-0000000024 (stores: [])\n" + + " --> KSTREAM-SINK-0000000025\n" + + " <-- KSTREAM-AGGREGATE-0000000020\n" + + " Sink: KSTREAM-SINK-0000000025 (topic: output)\n" + + " <-- KTABLE-TOSTREAM-0000000024\n\n", + noOptimization.describe().toString() + ); + assertEquals(3, getCountOfRepartitionTopicsFound(attemptedOptimize.describe().toString())); + assertEquals(3, getCountOfRepartitionTopicsFound(noOptimization.describe().toString())); + } + + @Test + public void shouldNotOptimizeWithValueOrKeyChangingOperatorsAfterInitialKeyChangeWithFixEnabled() { + final Topology attemptedOptimize = getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.OPTIMIZE, true); + final Topology noOptimization = getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.NO_OPTIMIZATION, true); assertEquals(attemptedOptimize.describe().toString(), noOptimization.describe().toString()); - assertEquals(2, getCountOfRepartitionTopicsFound(attemptedOptimize.describe().toString())); - assertEquals(2, getCountOfRepartitionTopicsFound(noOptimization.describe().toString())); + assertEquals(3, getCountOfRepartitionTopicsFound(attemptedOptimize.describe().toString())); + assertEquals(3, getCountOfRepartitionTopicsFound(noOptimization.describe().toString())); } @Test @@ -227,20 +379,30 @@ public class StreamsGraphTest { assertEquals(2, getCountOfRepartitionTopicsFound(noOptimization.describe().toString())); } - private Topology getTopologyWithChangingValuesAfterChangingKey(final String optimizeConfig) { - - final StreamsBuilder builder = new StreamsBuilder(); + private Topology getTopologyWithChangingValuesAfterChangingKey(final String optimizeConfig, + final boolean enableFix) { final Properties properties = new Properties(); + properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "application-id"); + properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, optimizeConfig); + properties.put(TopologyConfig.InternalConfig.ENABLE_PROCESS_PROCESSVALUE_FIX, enableFix); + + final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(properties))); final KStream inputStream = builder.stream("input"); final KStream mappedKeyStream = inputStream.selectKey((k, v) -> k + v); mappedKeyStream.mapValues(v -> v.toUpperCase(Locale.getDefault())).groupByKey().count().toStream().to("output"); mappedKeyStream.flatMapValues(v -> Arrays.asList(v.split("\\s"))).groupByKey().windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(5000))).count().toStream().to("windowed-output"); + mappedKeyStream.processValues( + () -> new ContextualFixedKeyProcessor<>() { + @Override + public void process(final FixedKeyRecord record) { + context().forward(record.withValue(record.value().toUpperCase(Locale.getDefault()))); + } + }).groupByKey().count().toStream().to("output"); return builder.build(properties); - } private Topology getTopologyWithRepartitionOperation(final String optimizeConfig) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java index 91b95b87a7f..0d453b05271 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java @@ -30,6 +30,7 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.TestInputTopic; import org.apache.kafka.streams.TestOutputTopic; import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyConfig; import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Consumed; @@ -42,6 +43,8 @@ import org.apache.kafka.streams.kstream.Named; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.kstream.StreamJoined; +import org.apache.kafka.streams.processor.api.ContextualFixedKeyProcessor; +import org.apache.kafka.streams.processor.api.FixedKeyRecord; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.state.Stores; @@ -224,6 +227,38 @@ public class RepartitionOptimizingTest { assertThat(joinedOutputTopic.readKeyValuesToMap(), equalTo(keyValueListToMap(expectedJoinKeyValues))); } + @Test + public void shouldNotPushRepartitionAcrossValueChangingOperation() { + streamsConfiguration.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); + streamsConfiguration.setProperty(TopologyConfig.InternalConfig.ENABLE_PROCESS_PROCESSVALUE_FIX, "true"); + + final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(streamsConfiguration))); + + builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()).withName("sourceStream")) + .map((k, v) -> KeyValue.pair(k.toUpperCase(Locale.getDefault()), v)) + .processValues(() -> new ContextualFixedKeyProcessor() { + @Override + public void process(final FixedKeyRecord record) { + context().forward(record.withValue(record.value().length())); + } + }) + .groupByKey(Grouped.valueSerde(new Serdes.IntegerSerde())) + .reduce(Integer::sum) + .toStream() + .to(AGGREGATION_TOPIC); + + final Topology topology = builder.build(streamsConfiguration); + + topologyTestDriver = new TopologyTestDriver(topology, streamsConfiguration); + + final TestInputTopic inputTopic = topologyTestDriver.createInputTopic(INPUT_TOPIC, stringSerializer, stringSerializer); + final TestOutputTopic outputTopic = topologyTestDriver.createOutputTopic(AGGREGATION_TOPIC, stringDeserializer, new IntegerDeserializer()); + + inputTopic.pipeKeyValueList(getKeyValues()); + + assertThat(outputTopic.readKeyValuesToMap(), equalTo(keyValueListToMap(expectedAggKeyValues))); + } + private Map keyValueListToMap(final List> keyValuePairs) { final Map map = new HashMap<>(); for (final KeyValue pair : keyValuePairs) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index d3a05720cd7..e5b3c0b1194 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -192,7 +192,7 @@ public class StreamThreadTest { private final ChangelogReader changelogReader = new MockChangelogReader(); private StateDirectory stateDirectory = null; private final InternalTopologyBuilder internalTopologyBuilder = new InternalTopologyBuilder(); - private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder); + private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder, false); private StreamThread thread = null; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index 29fa204a579..0d65c4fa837 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -2623,7 +2623,7 @@ public class StreamsPartitionAssignorTest { builder = new CorruptedInternalTopologyBuilder(); topologyMetadata = new TopologyMetadata(builder, new StreamsConfig(configProps(parameterizedConfig))); - final InternalStreamsBuilder streamsBuilder = new InternalStreamsBuilder(builder); + final InternalStreamsBuilder streamsBuilder = new InternalStreamsBuilder(builder, false); final KStream inputTopic = streamsBuilder.stream(singleton("topic1"), new ConsumedInternal<>(Consumed.with(null, null))); final KTable inputTable = streamsBuilder.table("topic2", new ConsumedInternal<>(Consumed.with(null, null)), new MaterializedInternal<>(Materialized.as("store")));