diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java index 5b3feb6b97f..c8a8bd3be32 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java @@ -22,12 +22,15 @@ import org.apache.kafka.streams.kstream.internals.KStreamImpl; import org.apache.kafka.streams.processor.TopologyBuilder; import java.util.Collections; +import java.util.concurrent.atomic.AtomicInteger; /** * KStreamBuilder is the class to create KStream instances. */ public class KStreamBuilder extends TopologyBuilder { + private final AtomicInteger index = new AtomicInteger(0); + public KStreamBuilder() { super(); } @@ -40,7 +43,7 @@ public class KStreamBuilder extends TopologyBuilder { * @return KStream */ public KStream from(String... topics) { - String name = KStreamImpl.SOURCE_NAME + KStreamImpl.INDEX.getAndIncrement(); + String name = newName(KStreamImpl.SOURCE_NAME); addSource(name, topics); @@ -58,10 +61,14 @@ public class KStreamBuilder extends TopologyBuilder { * @return KStream */ public KStream from(Deserializer keyDeserializer, Deserializer valDeserializer, String... topics) { - String name = KStreamImpl.SOURCE_NAME + KStreamImpl.INDEX.getAndIncrement(); + String name = newName(KStreamImpl.SOURCE_NAME); addSource(name, keyDeserializer, valDeserializer, topics); return new KStreamImpl<>(this, name, Collections.singleton(name)); } + + public String newName(String prefix) { + return prefix + String.format("%010d", index.getAndIncrement()); + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 1ea9b1e64fe..09864056f7f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KeyValue; import org.apache.kafka.streams.kstream.TransformerSupplier; import org.apache.kafka.streams.kstream.ValueTransformerSupplier; @@ -29,12 +30,10 @@ import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.kstream.WindowSupplier; import org.apache.kafka.streams.processor.ProcessorSupplier; -import org.apache.kafka.streams.processor.TopologyBuilder; import java.lang.reflect.Array; import java.util.Collections; import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; public class KStreamImpl implements KStream { @@ -70,13 +69,11 @@ public class KStreamImpl implements KStream { public static final String SOURCE_NAME = "KAFKA-SOURCE-"; - public static final AtomicInteger INDEX = new AtomicInteger(1); - - protected final TopologyBuilder topology; + protected final KStreamBuilder topology; protected final String name; protected final Set sourceNodes; - public KStreamImpl(TopologyBuilder topology, String name, Set sourceNodes) { + public KStreamImpl(KStreamBuilder topology, String name, Set sourceNodes) { this.topology = topology; this.name = name; this.sourceNodes = sourceNodes; @@ -84,7 +81,7 @@ public class KStreamImpl implements KStream { @Override public KStream filter(Predicate predicate) { - String name = FILTER_NAME + INDEX.getAndIncrement(); + String name = topology.newName(FILTER_NAME); topology.addProcessor(name, new KStreamFilter<>(predicate, false), this.name); @@ -93,7 +90,7 @@ public class KStreamImpl implements KStream { @Override public KStream filterOut(final Predicate predicate) { - String name = FILTER_NAME + INDEX.getAndIncrement(); + String name = topology.newName(FILTER_NAME); topology.addProcessor(name, new KStreamFilter<>(predicate, true), this.name); @@ -102,7 +99,7 @@ public class KStreamImpl implements KStream { @Override public KStream map(KeyValueMapper> mapper) { - String name = MAP_NAME + INDEX.getAndIncrement(); + String name = topology.newName(MAP_NAME); topology.addProcessor(name, new KStreamMap<>(mapper), this.name); @@ -111,7 +108,7 @@ public class KStreamImpl implements KStream { @Override public KStream mapValues(ValueMapper mapper) { - String name = MAPVALUES_NAME + INDEX.getAndIncrement(); + String name = topology.newName(MAPVALUES_NAME); topology.addProcessor(name, new KStreamMapValues<>(mapper), this.name); @@ -120,7 +117,7 @@ public class KStreamImpl implements KStream { @Override public KStream flatMap(KeyValueMapper>> mapper) { - String name = FLATMAP_NAME + INDEX.getAndIncrement(); + String name = topology.newName(FLATMAP_NAME); topology.addProcessor(name, new KStreamFlatMap<>(mapper), this.name); @@ -129,7 +126,7 @@ public class KStreamImpl implements KStream { @Override public KStream flatMapValues(ValueMapper> mapper) { - String name = FLATMAPVALUES_NAME + INDEX.getAndIncrement(); + String name = topology.newName(FLATMAPVALUES_NAME); topology.addProcessor(name, new KStreamFlatMapValues<>(mapper), this.name); @@ -138,7 +135,7 @@ public class KStreamImpl implements KStream { @Override public KStreamWindowed with(WindowSupplier windowSupplier) { - String name = WINDOWED_NAME + INDEX.getAndIncrement(); + String name = topology.newName(WINDOWED_NAME); topology.addProcessor(name, new KStreamWindow<>(windowSupplier), this.name); @@ -148,13 +145,13 @@ public class KStreamImpl implements KStream { @Override @SuppressWarnings("unchecked") public KStream[] branch(Predicate... predicates) { - String branchName = BRANCH_NAME + INDEX.getAndIncrement(); + String branchName = topology.newName(BRANCH_NAME); topology.addProcessor(branchName, new KStreamBranch(predicates.clone()), this.name); KStream[] branchChildren = (KStream[]) Array.newInstance(KStream.class, predicates.length); for (int i = 0; i < predicates.length; i++) { - String childName = BRANCHCHILD_NAME + INDEX.getAndIncrement(); + String childName = topology.newName(BRANCHCHILD_NAME); topology.addProcessor(childName, new KStreamPassThrough(), branchName); @@ -170,11 +167,11 @@ public class KStreamImpl implements KStream { Serializer valSerializer, Deserializer keyDeserializer, Deserializer valDeserializer) { - String sendName = SINK_NAME + INDEX.getAndIncrement(); + String sendName = topology.newName(SINK_NAME); topology.addSink(sendName, topic, keySerializer, valSerializer, this.name); - String sourceName = SOURCE_NAME + INDEX.getAndIncrement(); + String sourceName = topology.newName(SOURCE_NAME); topology.addSource(sourceName, keyDeserializer, valDeserializer, topic); @@ -188,21 +185,21 @@ public class KStreamImpl implements KStream { @Override public void to(String topic) { - String name = SINK_NAME + INDEX.getAndIncrement(); + String name = topology.newName(SINK_NAME); topology.addSink(name, topic, this.name); } @Override public void to(String topic, Serializer keySerializer, Serializer valSerializer) { - String name = SINK_NAME + INDEX.getAndIncrement(); + String name = topology.newName(SINK_NAME); topology.addSink(name, topic, keySerializer, valSerializer, this.name); } @Override public KStream transform(TransformerSupplier> transformerSupplier, String... stateStoreNames) { - String name = TRANSFORM_NAME + INDEX.getAndIncrement(); + String name = topology.newName(TRANSFORM_NAME); topology.addProcessor(name, new KStreamTransform<>(transformerSupplier), this.name); topology.connectProcessorAndStateStores(name, stateStoreNames); @@ -212,7 +209,7 @@ public class KStreamImpl implements KStream { @Override public KStream transformValues(ValueTransformerSupplier valueTransformerSupplier, String... stateStoreNames) { - String name = TRANSFORMVALUES_NAME + INDEX.getAndIncrement(); + String name = topology.newName(TRANSFORMVALUES_NAME); topology.addProcessor(name, new KStreamTransformValues<>(valueTransformerSupplier), this.name); topology.connectProcessorAndStateStores(name, stateStoreNames); @@ -222,7 +219,7 @@ public class KStreamImpl implements KStream { @Override public void process(final ProcessorSupplier processorSupplier, String... stateStoreNames) { - String name = PROCESSOR_NAME + INDEX.getAndIncrement(); + String name = topology.newName(PROCESSOR_NAME); topology.addProcessor(name, processorSupplier, this.name); topology.connectProcessorAndStateStores(name, stateStoreNames); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java index 4e9f4c69da2..cb49873a8ff 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java @@ -19,10 +19,10 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.KafkaException; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KStreamWindowed; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.WindowSupplier; -import org.apache.kafka.streams.processor.TopologyBuilder; import java.util.HashSet; import java.util.Set; @@ -31,7 +31,7 @@ public final class KStreamWindowedImpl extends KStreamImpl implement private final WindowSupplier windowSupplier; - public KStreamWindowedImpl(TopologyBuilder topology, String name, Set sourceNodes, WindowSupplier windowSupplier) { + public KStreamWindowedImpl(KStreamBuilder topology, String name, Set sourceNodes, WindowSupplier windowSupplier) { super(topology, name, sourceNodes); this.windowSupplier = windowSupplier; } @@ -53,9 +53,9 @@ public final class KStreamWindowedImpl extends KStreamImpl implement KStreamJoin joinOther = new KStreamJoin<>(thisWindowName, KStreamJoin.reverseJoiner(valueJoiner)); KStreamPassThrough joinMerge = new KStreamPassThrough<>(); - String joinThisName = JOINTHIS_NAME + INDEX.getAndIncrement(); - String joinOtherName = JOINOTHER_NAME + INDEX.getAndIncrement(); - String joinMergeName = JOINMERGE_NAME + INDEX.getAndIncrement(); + String joinThisName = topology.newName(JOINTHIS_NAME); + String joinOtherName = topology.newName(JOINOTHER_NAME); + String joinMergeName = topology.newName(JOINMERGE_NAME); topology.addProcessor(joinThisName, joinThis, this.name); topology.addProcessor(joinOtherName, joinOther, ((KStreamImpl) other).name); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java index 49171e340b3..cf1cfaa0f98 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java @@ -21,6 +21,8 @@ import org.apache.kafka.streams.kstream.internals.KStreamImpl; import org.apache.kafka.streams.processor.TopologyException; import org.junit.Test; +import static org.junit.Assert.assertEquals; + public class KStreamBuilderTest { @Test(expected = TopologyException.class) @@ -29,6 +31,21 @@ public class KStreamBuilderTest { builder.from("topic-1", "topic-2"); - builder.addSource(KStreamImpl.SOURCE_NAME + KStreamImpl.INDEX.decrementAndGet(), "topic-3"); + builder.addSource(KStreamImpl.SOURCE_NAME + "0000000000", "topic-3"); + } + + @Test + public void testNewName() { + KStreamBuilder builder = new KStreamBuilder(); + + assertEquals("X-0000000000", builder.newName("X-")); + assertEquals("Y-0000000001", builder.newName("Y-")); + assertEquals("Z-0000000002", builder.newName("Z-")); + + builder = new KStreamBuilder(); + + assertEquals("X-0000000000", builder.newName("X-")); + assertEquals("Y-0000000001", builder.newName("Y-")); + assertEquals("Z-0000000002", builder.newName("Z-")); } }