KAFKA-2707: make KStream processor names deterministic

guozhangwang

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #408 from ymatsuda/kstream_processor_name
This commit is contained in:
Yasuhiro Matsuda 2015-11-02 14:40:52 -08:00 committed by Guozhang Wang
parent 1f5d05fe71
commit e466ccd711
4 changed files with 51 additions and 30 deletions

View File

@@ -22,12 +22,15 @@ import org.apache.kafka.streams.kstream.internals.KStreamImpl;
import org.apache.kafka.streams.processor.TopologyBuilder;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
/**
* KStreamBuilder is the class to create KStream instances.
*/
public class KStreamBuilder extends TopologyBuilder {
private final AtomicInteger index = new AtomicInteger(0);
public KStreamBuilder() {
super();
}
@@ -40,7 +43,7 @@ public class KStreamBuilder extends TopologyBuilder {
* @return KStream
*/
public <K, V> KStream<K, V> from(String... topics) {
String name = KStreamImpl.SOURCE_NAME + KStreamImpl.INDEX.getAndIncrement();
String name = newName(KStreamImpl.SOURCE_NAME);
addSource(name, topics);
@@ -58,10 +61,14 @@ public class KStreamBuilder extends TopologyBuilder {
* @return KStream
*/
public <K, V> KStream<K, V> from(Deserializer<? extends K> keyDeserializer, Deserializer<? extends V> valDeserializer, String... topics) {
String name = KStreamImpl.SOURCE_NAME + KStreamImpl.INDEX.getAndIncrement();
String name = newName(KStreamImpl.SOURCE_NAME);
addSource(name, keyDeserializer, valDeserializer, topics);
return new KStreamImpl<>(this, name, Collections.singleton(name));
}
public String newName(String prefix) {
return prefix + String.format("%010d", index.getAndIncrement());
}
}

View File

@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KeyValue;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
@@ -29,12 +30,10 @@ import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.WindowSupplier;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;
import java.lang.reflect.Array;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
public class KStreamImpl<K, V> implements KStream<K, V> {
@@ -70,13 +69,11 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
public static final String SOURCE_NAME = "KAFKA-SOURCE-";
public static final AtomicInteger INDEX = new AtomicInteger(1);
protected final TopologyBuilder topology;
protected final KStreamBuilder topology;
protected final String name;
protected final Set<String> sourceNodes;
public KStreamImpl(TopologyBuilder topology, String name, Set<String> sourceNodes) {
public KStreamImpl(KStreamBuilder topology, String name, Set<String> sourceNodes) {
this.topology = topology;
this.name = name;
this.sourceNodes = sourceNodes;
@@ -84,7 +81,7 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
@Override
public KStream<K, V> filter(Predicate<K, V> predicate) {
String name = FILTER_NAME + INDEX.getAndIncrement();
String name = topology.newName(FILTER_NAME);
topology.addProcessor(name, new KStreamFilter<>(predicate, false), this.name);
@@ -93,7 +90,7 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
@Override
public KStream<K, V> filterOut(final Predicate<K, V> predicate) {
String name = FILTER_NAME + INDEX.getAndIncrement();
String name = topology.newName(FILTER_NAME);
topology.addProcessor(name, new KStreamFilter<>(predicate, true), this.name);
@@ -102,7 +99,7 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
@Override
public <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper) {
String name = MAP_NAME + INDEX.getAndIncrement();
String name = topology.newName(MAP_NAME);
topology.addProcessor(name, new KStreamMap<>(mapper), this.name);
@@ -111,7 +108,7 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
@Override
public <V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper) {
String name = MAPVALUES_NAME + INDEX.getAndIncrement();
String name = topology.newName(MAPVALUES_NAME);
topology.addProcessor(name, new KStreamMapValues<>(mapper), this.name);
@@ -120,7 +117,7 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
@Override
public <K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> mapper) {
String name = FLATMAP_NAME + INDEX.getAndIncrement();
String name = topology.newName(FLATMAP_NAME);
topology.addProcessor(name, new KStreamFlatMap<>(mapper), this.name);
@@ -129,7 +126,7 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
@Override
public <V1> KStream<K, V1> flatMapValues(ValueMapper<V, Iterable<V1>> mapper) {
String name = FLATMAPVALUES_NAME + INDEX.getAndIncrement();
String name = topology.newName(FLATMAPVALUES_NAME);
topology.addProcessor(name, new KStreamFlatMapValues<>(mapper), this.name);
@@ -138,7 +135,7 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
@Override
public KStreamWindowed<K, V> with(WindowSupplier<K, V> windowSupplier) {
String name = WINDOWED_NAME + INDEX.getAndIncrement();
String name = topology.newName(WINDOWED_NAME);
topology.addProcessor(name, new KStreamWindow<>(windowSupplier), this.name);
@@ -148,13 +145,13 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
@Override
@SuppressWarnings("unchecked")
public KStream<K, V>[] branch(Predicate<K, V>... predicates) {
String branchName = BRANCH_NAME + INDEX.getAndIncrement();
String branchName = topology.newName(BRANCH_NAME);
topology.addProcessor(branchName, new KStreamBranch(predicates.clone()), this.name);
KStream<K, V>[] branchChildren = (KStream<K, V>[]) Array.newInstance(KStream.class, predicates.length);
for (int i = 0; i < predicates.length; i++) {
String childName = BRANCHCHILD_NAME + INDEX.getAndIncrement();
String childName = topology.newName(BRANCHCHILD_NAME);
topology.addProcessor(childName, new KStreamPassThrough<K, V>(), branchName);
@@ -170,11 +167,11 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
Serializer<V> valSerializer,
Deserializer<K1> keyDeserializer,
Deserializer<V1> valDeserializer) {
String sendName = SINK_NAME + INDEX.getAndIncrement();
String sendName = topology.newName(SINK_NAME);
topology.addSink(sendName, topic, keySerializer, valSerializer, this.name);
String sourceName = SOURCE_NAME + INDEX.getAndIncrement();
String sourceName = topology.newName(SOURCE_NAME);
topology.addSource(sourceName, keyDeserializer, valDeserializer, topic);
@@ -188,21 +185,21 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
@Override
public void to(String topic) {
String name = SINK_NAME + INDEX.getAndIncrement();
String name = topology.newName(SINK_NAME);
topology.addSink(name, topic, this.name);
}
@Override
public void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer) {
String name = SINK_NAME + INDEX.getAndIncrement();
String name = topology.newName(SINK_NAME);
topology.addSink(name, topic, keySerializer, valSerializer, this.name);
}
@Override
public <K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier, String... stateStoreNames) {
String name = TRANSFORM_NAME + INDEX.getAndIncrement();
String name = topology.newName(TRANSFORM_NAME);
topology.addProcessor(name, new KStreamTransform<>(transformerSupplier), this.name);
topology.connectProcessorAndStateStores(name, stateStoreNames);
@@ -212,7 +209,7 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
@Override
public <V1> KStream<K, V1> transformValues(ValueTransformerSupplier<V, V1> valueTransformerSupplier, String... stateStoreNames) {
String name = TRANSFORMVALUES_NAME + INDEX.getAndIncrement();
String name = topology.newName(TRANSFORMVALUES_NAME);
topology.addProcessor(name, new KStreamTransformValues<>(valueTransformerSupplier), this.name);
topology.connectProcessorAndStateStores(name, stateStoreNames);
@@ -222,7 +219,7 @@ public class KStreamImpl<K, V> implements KStream<K, V> {
@Override
public void process(final ProcessorSupplier<K, V> processorSupplier, String... stateStoreNames) {
String name = PROCESSOR_NAME + INDEX.getAndIncrement();
String name = topology.newName(PROCESSOR_NAME);
topology.addProcessor(name, processorSupplier, this.name);
topology.connectProcessorAndStateStores(name, stateStoreNames);

View File

@@ -19,10 +19,10 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KStreamWindowed;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.WindowSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;
import java.util.HashSet;
import java.util.Set;
@@ -31,7 +31,7 @@ public final class KStreamWindowedImpl<K, V> extends KStreamImpl<K, V> implement
private final WindowSupplier<K, V> windowSupplier;
public KStreamWindowedImpl(TopologyBuilder topology, String name, Set<String> sourceNodes, WindowSupplier<K, V> windowSupplier) {
public KStreamWindowedImpl(KStreamBuilder topology, String name, Set<String> sourceNodes, WindowSupplier<K, V> windowSupplier) {
super(topology, name, sourceNodes);
this.windowSupplier = windowSupplier;
}
@@ -53,9 +53,9 @@ public final class KStreamWindowedImpl<K, V> extends KStreamImpl<K, V> implement
KStreamJoin<K, V2, V1, V> joinOther = new KStreamJoin<>(thisWindowName, KStreamJoin.reverseJoiner(valueJoiner));
KStreamPassThrough<K, V2> joinMerge = new KStreamPassThrough<>();
String joinThisName = JOINTHIS_NAME + INDEX.getAndIncrement();
String joinOtherName = JOINOTHER_NAME + INDEX.getAndIncrement();
String joinMergeName = JOINMERGE_NAME + INDEX.getAndIncrement();
String joinThisName = topology.newName(JOINTHIS_NAME);
String joinOtherName = topology.newName(JOINOTHER_NAME);
String joinMergeName = topology.newName(JOINMERGE_NAME);
topology.addProcessor(joinThisName, joinThis, this.name);
topology.addProcessor(joinOtherName, joinOther, ((KStreamImpl) other).name);

View File

@@ -21,6 +21,8 @@ import org.apache.kafka.streams.kstream.internals.KStreamImpl;
import org.apache.kafka.streams.processor.TopologyException;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class KStreamBuilderTest {
@Test(expected = TopologyException.class)
@@ -29,6 +31,21 @@ public class KStreamBuilderTest {
builder.from("topic-1", "topic-2");
builder.addSource(KStreamImpl.SOURCE_NAME + KStreamImpl.INDEX.decrementAndGet(), "topic-3");
builder.addSource(KStreamImpl.SOURCE_NAME + "0000000000", "topic-3");
}
@Test
public void testNewName() {
KStreamBuilder builder = new KStreamBuilder();
assertEquals("X-0000000000", builder.newName("X-"));
assertEquals("Y-0000000001", builder.newName("Y-"));
assertEquals("Z-0000000002", builder.newName("Z-"));
builder = new KStreamBuilder();
assertEquals("X-0000000000", builder.newName("X-"));
assertEquals("Y-0000000001", builder.newName("Y-"));
assertEquals("Z-0000000002", builder.newName("Z-"));
}
}