KAFKA-4532: StateStores can be connected to the wrong source topic resulting in incorrect metadata returned from Interactive Queries

When building a topology with tables and StateStores, the StateStores are mapped to the source topic names. This map is retrieved via TopologyBuilder.stateStoreNameToSourceTopics() and is used in Interactive Queries to find the source topics and partitions when resolving the partitions that particular keys will be in.
There is an issue whereby this mapping, for a table that is originally created with builder.table("topic", "table") and is subsequently used in a join, is changed to the internal repartition topic. This is because the mapping is updated during the call to topology.connectProcessorAndStateStores(..).
If the stateStoreNameToSourceTopics Map already has a value for the state store name, the Map should not be updated.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Matthias J. Sax, Guozhang Wang

Closes #2250 from dguy/kafka-4532
This commit is contained in:
Damian Guy 2016-12-13 12:34:46 -08:00 committed by Guozhang Wang
parent 8591137869
commit 448f194c70
2 changed files with 26 additions and 0 deletions

View File

@ -692,6 +692,13 @@ public class TopologyBuilder {
private void connectStateStoreNameToSourceTopics(final String stateStoreName,
final ProcessorNodeFactory processorNodeFactory) {
// we should never update the mapping from state store names to source topics if the store name already exists
// in the map; this can happen, for example, when the state store underlying a source KTable is connected to a
// join operator whose source topic is not the original KTable's source topic but an internal repartition topic.
if (stateStoreNameToSourceTopics.containsKey(stateStoreName)) {
return;
}
final Set<String> sourceTopics = findSourceTopicsForProcessorParents(processorNodeFactory.parents);
if (sourceTopics.isEmpty()) {
throw new TopologyBuilderException("can't find source topic for state store " +

View File

@ -22,10 +22,13 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.internals.KStreamImpl;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockKeyValueMapper;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.junit.After;
import org.junit.Test;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
@ -150,4 +153,20 @@ public class KStreamBuilderTest {
new KStreamBuilder().stream(Serdes.String(), Serdes.String(), null, null);
}
@Test
public void shouldMapStateStoresToCorrectSourceTopics() throws Exception {
    // Regression test for KAFKA-4532: a store backing a source KTable must stay mapped to the
    // topic the table was created from, even after the table takes part in a join.
    final Set<String> expectedTableTopics = Collections.singleton("table-topic");
    final Set<String> expectedCountTopics =
        Collections.singleton("app-id-KSTREAM-MAP-0000000003-repartition");

    final KStreamBuilder builder = new KStreamBuilder();
    builder.setApplicationId("app-id");

    // NOTE: keep the order of the builder calls below unchanged; the expected repartition topic
    // name embeds a node index that presumably depends on the order in which nodes are added.
    final KStream<String, String> events = builder.stream("events");
    final KTable<String, String> sourceTable = builder.table("table-topic", "table-store");

    // Before any join is added, the store maps to the table's own source topic.
    assertEquals(expectedTableTopics, builder.stateStoreNameToSourceTopics().get("table-store"));

    // Map the stream, join it against the table, and count per key into the "count" store.
    final KStream<String, String> rekeyed =
        events.map(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper());
    rekeyed
        .leftJoin(sourceTable, MockValueJoiner.STRING_JOINER)
        .groupByKey()
        .count("count");

    // The table store's mapping must be unchanged after the join ...
    assertEquals(expectedTableTopics, builder.stateStoreNameToSourceTopics().get("table-store"));
    // ... while the "count" store is expected to map to the repartition topic.
    assertEquals(expectedCountTopics, builder.stateStoreNameToSourceTopics().get("count"));
}
}