KAFKA-4275: Check of State-Store-assignment to Processor-Nodes is not enabled

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Damian Guy, Guozhang Wang

Closes #1992 from mjsax/kafka-4275-stateStoreCheck
This commit is contained in:
Matthias J. Sax 2016-10-17 21:48:40 -07:00 committed by Guozhang Wang
parent 4e0b0b83a7
commit 925310aac0
20 changed files with 216 additions and 34 deletions

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import java.util.ArrayList;
/**
 * Shared base class for the {@code KTableValueGetterSupplier}s used by the
 * KTable-KTable join variants (inner/left/outer/right). It holds the two parent
 * value-getter suppliers and exposes the union of their state-store names so a
 * join processor can be connected to every store it reads from.
 *
 * @param <K>  key type
 * @param <R>  join result value type
 * @param <V1> value type of the first (left) input table
 * @param <V2> value type of the second (right) input table
 */
public abstract class AbstractKTableKTableJoinValueGetterSupplier<K, R, V1, V2> implements KTableValueGetterSupplier<K, R> {
    // JLS-conventional modifier order: "protected final", not "final protected".
    protected final KTableValueGetterSupplier<K, V1> valueGetterSupplier1;
    protected final KTableValueGetterSupplier<K, V2> valueGetterSupplier2;

    public AbstractKTableKTableJoinValueGetterSupplier(final KTableValueGetterSupplier<K, V1> valueGetterSupplier1,
                                                       final KTableValueGetterSupplier<K, V2> valueGetterSupplier2) {
        this.valueGetterSupplier1 = valueGetterSupplier1;
        this.valueGetterSupplier2 = valueGetterSupplier2;
    }

    /**
     * Returns the concatenation of both parents' store names. Duplicates are not
     * removed, matching the original element-by-element copy semantics.
     *
     * @return store names of the first supplier followed by those of the second
     */
    @Override
    public String[] storeNames() {
        final String[] storeNames1 = valueGetterSupplier1.storeNames();
        final String[] storeNames2 = valueGetterSupplier2.storeNames();
        // Concatenate directly into a pre-sized array; avoids the intermediate
        // ArrayList and per-element add() calls of the previous implementation.
        final String[] stores = new String[storeNames1.length + storeNames2.length];
        System.arraycopy(storeNames1, 0, stores, 0, storeNames1.length);
        System.arraycopy(storeNames2, 0, stores, storeNames1.length, storeNames2.length);
        return stores;
    }
}

View File

@ -94,6 +94,10 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
return new KStreamAggregateValueGetter();
}
@Override
public String[] storeNames() {
return new String[]{storeName};
}
};
}

View File

@ -584,6 +584,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
String name = topology.newName(LEFTJOIN_NAME);
topology.addProcessor(name, new KStreamKTableLeftJoin<>((KTableImpl<K, ?, V1>) other, joiner), this.name);
topology.connectProcessorAndStateStores(name, ((KTableImpl<K, ?, V1>) other).valueGetterSupplier().storeNames());
topology.connectProcessors(this.name, ((KTableImpl<K, ?, V1>) other).name);
return new KStreamImpl<>(topology, name, allSourceNodes, false);
@ -703,8 +704,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
topology.addProcessor(joinThisName, joinThis, thisWindowStreamName);
topology.addProcessor(joinOtherName, joinOther, otherWindowStreamName);
topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
topology.addStateStore(thisWindow, thisWindowStreamName, otherWindowStreamName);
topology.addStateStore(otherWindow, thisWindowStreamName, otherWindowStreamName);
topology.addStateStore(thisWindow, thisWindowStreamName, joinOtherName);
topology.addStateStore(otherWindow, otherWindowStreamName, joinThisName);
Set<String> allSourceNodes = new HashSet<>(((AbstractStream) lhs).sourceNodes);
allSourceNodes.addAll(((KStreamImpl<K1, V2>) other).sourceNodes);

View File

@ -93,6 +93,10 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
return new KStreamReduceValueGetter();
}
@Override
public String[] storeNames() {
return new String[]{storeName};
}
};
}

View File

@ -17,8 +17,8 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
@ -134,6 +134,10 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
return new KStreamWindowAggregateValueGetter();
}
@Override
public String[] storeNames() {
return new String[]{storeName};
}
};
}

View File

@ -130,6 +130,10 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr
return new KStreamWindowReduceValueGetter();
}
@Override
public String[] storeNames() {
return new String[]{storeName};
}
};
}

View File

@ -105,6 +105,10 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T
return new KTableAggregateValueGetter();
}
@Override
public String[] storeNames() {
return new String[]{storeName};
}
};
}

View File

@ -52,6 +52,10 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
return new KTableFilterValueGetter(parentValueGetterSupplier.get());
}
@Override
public String[] storeNames() {
return parentValueGetterSupplier.storeNames();
}
};
}

View File

@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
@ -32,7 +33,6 @@ import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
import java.io.FileNotFoundException;
@ -301,6 +301,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
topology.addProcessor(joinThisName, joinThis, this.name);
topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name);
topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
topology.connectProcessorAndStateStores(joinThisName, other.getStoreName());
topology.connectProcessorAndStateStores(joinOtherName, getStoreName());
return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes, null);
}
@ -327,6 +329,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
topology.addProcessor(joinThisName, joinThis, this.name);
topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name);
topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
topology.connectProcessorAndStateStores(joinThisName, other.getStoreName());
topology.connectProcessorAndStateStores(joinOtherName, getStoreName());
return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes, null);
}
@ -352,6 +356,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
topology.addProcessor(joinThisName, joinThis, this.name);
topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name);
topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
topology.connectProcessorAndStateStores(joinThisName, other.getStoreName());
topology.connectProcessorAndStateStores(joinOtherName, getStoreName());
return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes, null);
}

View File

@ -36,13 +36,18 @@ class KTableKTableJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1,
@Override
public KTableValueGetterSupplier<K, R> view() {
return new KTableValueGetterSupplier<K, R>() {
return new KTableKTableJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2);
}
public KTableValueGetter<K, R> get() {
return new KTableKTableJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get());
}
private class KTableKTableJoinValueGetterSupplier extends AbstractKTableKTableJoinValueGetterSupplier<K, R, V1, V2> {
};
public KTableKTableJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2) {
super(valueGetterSupplier1, valueGetterSupplier2);
}
public KTableValueGetter<K, R> get() {
return new KTableKTableJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get());
}
}
private class KTableKTableJoinProcessor extends AbstractProcessor<K, Change<V1>> {

View File

@ -36,15 +36,21 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
@Override
public KTableValueGetterSupplier<K, R> view() {
return new KTableValueGetterSupplier<K, R>() {
public KTableValueGetter<K, R> get() {
return new KTableKTableLeftJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get());
}
};
return new KTableKTableLeftJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2);
}
private class KTableKTableLeftJoinValueGetterSupplier extends AbstractKTableKTableJoinValueGetterSupplier<K, R, V1, V2> {
public KTableKTableLeftJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2) {
super(valueGetterSupplier1, valueGetterSupplier2);
}
public KTableValueGetter<K, R> get() {
return new KTableKTableLeftJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get());
}
}
private class KTableKTableLeftJoinProcessor extends AbstractProcessor<K, Change<V1>> {
private final KTableValueGetter<K, V2> valueGetter;

View File

@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -36,13 +36,18 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
@Override
public KTableValueGetterSupplier<K, R> view() {
return new KTableValueGetterSupplier<K, R>() {
return new KTableKTableOuterJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2);
}
public KTableValueGetter<K, R> get() {
return new KTableKTableOuterJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get());
}
private class KTableKTableOuterJoinValueGetterSupplier extends AbstractKTableKTableJoinValueGetterSupplier<K, R, V1, V2> {
};
public KTableKTableOuterJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2) {
super(valueGetterSupplier1, valueGetterSupplier2);
}
public KTableValueGetter<K, R> get() {
return new KTableKTableOuterJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get());
}
}
private class KTableKTableOuterJoinProcessor extends AbstractProcessor<K, Change<V1>> {

View File

@ -37,13 +37,18 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
@Override
public KTableValueGetterSupplier<K, R> view() {
return new KTableValueGetterSupplier<K, R>() {
return new KTableKTableRightJoinValueGetterSupplier(valueGetterSupplier1, valueGetterSupplier2);
}
public KTableValueGetter<K, R> get() {
return new KTableKTableRightJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get());
}
private class KTableKTableRightJoinValueGetterSupplier extends AbstractKTableKTableJoinValueGetterSupplier<K, R, V1, V2> {
};
public KTableKTableRightJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1> valueGetterSupplier1, KTableValueGetterSupplier<K, V2> valueGetterSupplier2) {
super(valueGetterSupplier1, valueGetterSupplier2);
}
public KTableValueGetter<K, R> get() {
return new KTableKTableRightJoinValueGetter(valueGetterSupplier1.get(), valueGetterSupplier2.get());
}
}
private class KTableKTableRightJoinProcessor extends AbstractProcessor<K, Change<V1>> {

View File

@ -50,6 +50,10 @@ class KTableMapValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
return new KTableMapValuesValueGetter(parentValueGetterSupplier.get());
}
@Override
public String[] storeNames() {
return parentValueGetterSupplier.storeNames();
}
};
}

View File

@ -103,6 +103,10 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
return new KTableAggregateValueGetter();
}
@Override
public String[] storeNames() {
return new String[]{storeName};
}
};
}

View File

@ -53,6 +53,11 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableProcessorSuppli
public KTableValueGetter<K, KeyValue<K1, V1>> get() {
return new KTableMapValueGetter(parentValueGetterSupplier.get());
}
@Override
public String[] storeNames() {
throw new StreamsException("Underlying state store not accessible due to repartitioning.");
}
};
}

View File

@ -32,6 +32,11 @@ public class KTableSourceValueGetterSupplier<K, V> implements KTableValueGetterS
return new KTableSourceValueGetter();
}
@Override
public String[] storeNames() {
return new String[]{storeName};
}
private class KTableSourceValueGetter implements KTableValueGetter<K, V> {
KeyValueStore<K, V> store = null;

View File

@ -21,4 +21,5 @@ public interface KTableValueGetterSupplier<K, V> {
KTableValueGetter<K, V> get();
String[] storeNames();
}

View File

@ -18,11 +18,11 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.internals.ThreadCache;
@ -133,9 +133,9 @@ public class ProcessorContextImpl implements InternalProcessorContext, RecordCol
if (node == null)
throw new TopologyBuilderException("Accessing from an unknown node");
// TODO: restore this once we fix the ValueGetter initialization issue
//if (!node.stateStores.contains(name))
// throw new TopologyBuilderException("Processor " + node.name() + " has no access to StateStore " + name);
if (!node.stateStores.contains(name)) {
throw new TopologyBuilderException("Processor " + node.name() + " has no access to StateStore " + name);
}
return stateMgr.getStore(name);
}

View File

@ -17,6 +17,8 @@
package org.apache.kafka.streams.processor;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.processor.TopologyBuilder.TopicsInfo;
import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
@ -24,9 +26,11 @@ import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateStoreSupplier;
import org.apache.kafka.test.ProcessorTopologyTestDriver;
import org.junit.Test;
import java.util.Arrays;
@ -517,4 +521,66 @@ public class TopologyBuilderTest {
assertEquals(1, properties.size());
}
/**
 * Verifies that a processor which was never connected to a state store fails with a
 * {@code TopologyBuilderException} when it tries to access that store during init.
 * <p>
 * NOTE(review): "Through" in the method name looks like a typo for "Throw"; kept
 * as-is here to avoid changing the public test identifier.
 */
@Test(expected = TopologyBuilderException.class)
public void shouldThroughOnUnassignedStateStoreAccess() {
    final String sourceNodeName = "source";
    final String goodNodeName = "goodGuy";
    final String badNodeName = "badGuy";

    // Minimal config needed to construct a StreamsConfig for the test driver.
    final Properties config = new Properties();
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1");
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
    final StreamsConfig streamsConfig = new StreamsConfig(config);

    try {
        // "goodGuy" is connected to the store; "badGuy" is not, but its processor
        // still calls context.getStateStore(STORE_NAME) in init() (see
        // LocalMockProcessorSupplier), which must be rejected.
        final TopologyBuilder builder = new TopologyBuilder();
        builder
            .addSource(sourceNodeName, "topic")
            .addProcessor(goodNodeName, new LocalMockProcessorSupplier(), sourceNodeName)
            .addStateStore(
                Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(),
                goodNodeName)
            .addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName);

        final ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(streamsConfig, builder, LocalMockProcessorSupplier.STORE_NAME);
        driver.process("topic", null, null);
    } catch (final StreamsException e) {
        // The driver wraps the failure in a StreamsException; re-throw the cause only
        // when it is exactly the expected TopologyBuilderException with the expected
        // message, so the @Test(expected = ...) annotation can match it.
        final Throwable cause = e.getCause();
        if (cause != null
            && cause instanceof TopologyBuilderException
            && cause.getMessage().equals("Invalid topology building: Processor " + badNodeName + " has no access to StateStore " + LocalMockProcessorSupplier.STORE_NAME)) {
            throw (TopologyBuilderException) cause;
        } else {
            // Any other failure mode means the check did not fire as intended.
            throw new RuntimeException("Did expect different exception. Did catch:", e);
        }
    }
}
/**
 * A {@code ProcessorSupplier} whose processors look up {@code STORE_NAME} during
 * {@code init()}. Used to trigger the store-access check for nodes that are (or
 * are not) connected to the store.
 */
private static class LocalMockProcessorSupplier implements ProcessorSupplier {
    static final String STORE_NAME = "store";

    @Override
    public Processor get() {
        return new Processor() {
            @Override
            public void init(final ProcessorContext context) {
                // This lookup fails if the enclosing node was not connected to the store.
                context.getStateStore(STORE_NAME);
            }

            @Override
            public void process(final Object key, final Object value) {
                // no-op: only init() matters for this test fixture
            }

            @Override
            public void punctuate(final long timestamp) {
                // no-op
            }

            @Override
            public void close() {
                // no-op
            }
        };
    }
}
}