diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java index ecac8c98432..74fea9cc73a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java @@ -692,6 +692,13 @@ public class TopologyBuilder { private void connectStateStoreNameToSourceTopics(final String stateStoreName, final ProcessorNodeFactory processorNodeFactory) { + // we should never update the mapping from state store names to source topics if the store name already exists + // in the map; this scenario is possible, for example, that a state store underlying a source KTable is + // connecting to a join operator whose source topic is not the original KTable's source topic but an internal repartition topic. + if (stateStoreNameToSourceTopics.containsKey(stateStoreName)) { + return; + } + final Set sourceTopics = findSourceTopicsForProcessorParents(processorNodeFactory.parents); if (sourceTopics.isEmpty()) { throw new TopologyBuilderException("can't find source topic for state store " + diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java index b9517436932..52decf48cc1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java @@ -22,10 +22,13 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.kstream.internals.KStreamImpl; import org.apache.kafka.streams.errors.TopologyBuilderException; import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.test.MockKeyValueMapper; import org.apache.kafka.test.MockProcessorSupplier; +import org.apache.kafka.test.MockValueJoiner; import org.junit.After; import org.junit.Test; +import java.util.Collections; import java.util.Map; import java.util.Set; @@ -150,4 +153,20 @@ public class KStreamBuilderTest { new KStreamBuilder().stream(Serdes.String(), Serdes.String(), null, null); } + @Test + public void shouldMapStateStoresToCorrectSourceTopics() throws Exception { + final KStreamBuilder builder = new KStreamBuilder(); + builder.setApplicationId("app-id"); + + final KStream playEvents = builder.stream("events"); + + final KTable table = builder.table("table-topic", "table-store"); + assertEquals(Collections.singleton("table-topic"), builder.stateStoreNameToSourceTopics().get("table-store")); + + final KStream mapped = playEvents.map(MockKeyValueMapper.SelectValueKeyValueMapper()); + mapped.leftJoin(table, MockValueJoiner.STRING_JOINER).groupByKey().count("count"); + assertEquals(Collections.singleton("table-topic"), builder.stateStoreNameToSourceTopics().get("table-store")); + assertEquals(Collections.singleton("app-id-KSTREAM-MAP-0000000003-repartition"), builder.stateStoreNameToSourceTopics().get("count")); + } + }