KAFKA-6958: Allow to name operation using parameter classes (#6410)

This is the second PR for KIP-307.
Reviewers: Matthias J. Sax <mjsax@apache.org>, John Roesler <john@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
Florian Hussonnois authored on 2019-04-19 00:45:34 +02:00; committed by Bill Bejeck
parent 7b4b298edd
commit 075b368d47
21 changed files with 522 additions and 66 deletions
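
For orientation, here is a minimal usage sketch of the naming API this change introduces. It is illustrative only: the topic names, serdes, processor names, and the class name below are invented and are not part of the diff.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class Kip307UsageSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        // Name the source processor explicitly instead of relying on a
        // generated name such as KSTREAM-SOURCE-0000000000.
        final KStream<String, String> orders = builder.stream(
            "orders",
            Consumed.with(Serdes.String(), Serdes.String()).withName("orders-source"));

        // Name the sink processor through the new Produced.as(...) factory.
        orders.to("orders-copy", Produced.as("orders-sink"));

        final Topology topology = builder.build();
        System.out.println(topology.describe());
    }
}

Without the explicit names, the same topology would describe its source and sink as KSTREAM-SOURCE-0000000000 and KSTREAM-SINK-0000000001, which is what the tests at the bottom of this commit assert against.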

StreamsBuilder.java

@@ -229,8 +229,10 @@ public class StreamsBuilder {
Objects.requireNonNull(materialized, "materialized can't be null");
final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(consumed);
materialized.withKeySerde(consumedInternal.keySerde()).withValueSerde(consumedInternal.valueSerde());
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-");
return internalStreamsBuilder.table(topic, consumedInternal, materializedInternal);
}
@@ -280,8 +282,12 @@ public class StreamsBuilder {
Objects.requireNonNull(topic, "topic can't be null");
Objects.requireNonNull(consumed, "consumed can't be null");
final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(consumed);
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
new MaterializedInternal<>(Materialized.with(consumedInternal.keySerde(), consumedInternal.valueSerde()), internalStreamsBuilder, topic + "-");
new MaterializedInternal<>(
Materialized.with(consumedInternal.keySerde(), consumedInternal.valueSerde()),
internalStreamsBuilder, topic + "-");
return internalStreamsBuilder.table(topic, consumedInternal, materializedInternal);
}
@@ -307,8 +313,10 @@ public class StreamsBuilder {
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
Objects.requireNonNull(topic, "topic can't be null");
Objects.requireNonNull(materialized, "materialized can't be null");
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-");
final ConsumedInternal<K, V> consumedInternal =
new ConsumedInternal<>(Consumed.with(materializedInternal.keySerde(), materializedInternal.valueSerde()));
@@ -336,8 +344,11 @@ public class StreamsBuilder {
Objects.requireNonNull(topic, "topic can't be null");
Objects.requireNonNull(consumed, "consumed can't be null");
final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(consumed);
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
new MaterializedInternal<>(Materialized.with(consumedInternal.keySerde(), consumedInternal.valueSerde()), internalStreamsBuilder, topic + "-");
new MaterializedInternal<>(
Materialized.with(consumedInternal.keySerde(), consumedInternal.valueSerde()),
internalStreamsBuilder, topic + "-");
return internalStreamsBuilder.globalTable(topic, consumedInternal, materializedInternal);
}
@@ -403,6 +414,7 @@ public class StreamsBuilder {
final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(consumed);
// always use the serdes from consumed
materialized.withKeySerde(consumedInternal.keySerde()).withValueSerde(consumedInternal.valueSerde());
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-");

Consumed.java

@@ -50,21 +50,24 @@ import java.util.Objects;
* @param <K> type of record key
* @param <V> type of record value
*/
public class Consumed<K, V> {
public class Consumed<K, V> implements NamedOperation<Consumed<K, V>> {
protected Serde<K> keySerde;
protected Serde<V> valueSerde;
protected TimestampExtractor timestampExtractor;
protected Topology.AutoOffsetReset resetPolicy;
protected String processorName;
private Consumed(final Serde<K> keySerde,
final Serde<V> valueSerde,
final TimestampExtractor timestampExtractor,
final Topology.AutoOffsetReset resetPolicy) {
final Topology.AutoOffsetReset resetPolicy,
final String processorName) {
this.keySerde = keySerde;
this.valueSerde = valueSerde;
this.timestampExtractor = timestampExtractor;
this.resetPolicy = resetPolicy;
this.processorName = processorName;
}
/**
@@ -72,7 +75,12 @@ public class Consumed<K, V> {
* @param consumed the instance of {@link Consumed} to copy
*/
protected Consumed(final Consumed<K, V> consumed) {
this(consumed.keySerde, consumed.valueSerde, consumed.timestampExtractor, consumed.resetPolicy);
this(consumed.keySerde,
consumed.valueSerde,
consumed.timestampExtractor,
consumed.resetPolicy,
consumed.processorName
);
}
/**
@@ -90,7 +98,7 @@ public class Consumed<K, V> {
final Serde<V> valueSerde,
final TimestampExtractor timestampExtractor,
final Topology.AutoOffsetReset resetPolicy) {
return new Consumed<>(keySerde, valueSerde, timestampExtractor, resetPolicy);
return new Consumed<>(keySerde, valueSerde, timestampExtractor, resetPolicy, null);
}
@@ -105,7 +113,7 @@ public class Consumed<K, V> {
*/
public static <K, V> Consumed<K, V> with(final Serde<K> keySerde,
final Serde<V> valueSerde) {
return new Consumed<>(keySerde, valueSerde, null, null);
return new Consumed<>(keySerde, valueSerde, null, null, null);
}
/**
@@ -117,7 +125,7 @@ public class Consumed<K, V> {
* @return a new instance of {@link Consumed}
*/
public static <K, V> Consumed<K, V> with(final TimestampExtractor timestampExtractor) {
return new Consumed<>(null, null, timestampExtractor, null);
return new Consumed<>(null, null, timestampExtractor, null, null);
}
/**
@@ -129,7 +137,19 @@ public class Consumed<K, V> {
* @return a new instance of {@link Consumed}
*/
public static <K, V> Consumed<K, V> with(final Topology.AutoOffsetReset resetPolicy) {
return new Consumed<>(null, null, null, resetPolicy);
return new Consumed<>(null, null, null, resetPolicy, null);
}
/**
* Create an instance of {@link Consumed} with provided processor name.
*
* @param processorName the processor name to be used. If {@code null} a default processor name will be generated
* @param <K> key type
* @param <V> value type
* @return a new instance of {@link Consumed}
*/
public static <K, V> Consumed<K, V> as(final String processorName) {
return new Consumed<>(null, null, null, null, processorName);
}
/**
@@ -176,6 +196,18 @@ public class Consumed<K, V> {
return this;
}
/**
* Configure the instance of {@link Consumed} with a processor name.
*
* @param processorName the processor name to be used. If {@code null} a default processor name will be generated
* @return this
*/
@Override
public Consumed<K, V> withName(final String processorName) {
this.processorName = processorName;
return this;
}
@Override
public boolean equals(final Object o) {
if (this == o) {

Grouped.java

@@ -29,7 +29,7 @@ import org.apache.kafka.common.serialization.Serde;
* @param <K> the key type
* @param <V> the value type
*/
public class Grouped<K, V> {
public class Grouped<K, V> implements NamedOperation<Grouped<K, V>> {
protected final Serde<K> keySerde;
protected final Serde<V> valueSerde;
@@ -128,9 +128,10 @@ public class Grouped<K, V> {
* Perform the grouping operation with the name for a repartition topic if required. Note
* that Kafka Streams does not always create repartition topics for grouping operations.
*
* @param name the name used as part of the repartition topic name if required
* @param name the name used for the processor name and as part of the repartition topic name if required
* @return a new {@link Grouped} instance configured with the name
* */
@Override
public Grouped<K, V> withName(final String name) {
return new Grouped<>(name, keySerde, valueSerde);
}
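
A hedged illustration of the change above (the topic, lambda, and name are invented): the name given to Grouped now also names the key-selecting processor that groupBy creates, in addition to contributing to the repartition topic name when one is required.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;

public class GroupedNamingSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("orders", Consumed.with(Serdes.String(), Serdes.String()))
               // "by-customer" names the key-select processor; a generated
               // name is used only when no name is supplied.
               .groupBy((orderId, customerId) -> customerId,
                        Grouped.<String, String>as("by-customer"))
               .count();
        System.out.println(builder.build().describe());
    }
}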

Materialized.java

@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.internals.ApiUtils;
@@ -100,7 +99,7 @@ public class Materialized<K, V, S extends StateStore> {
* @return a new {@link Materialized} instance with the given storeName
*/
public static <K, V, S extends StateStore> Materialized<K, V, S> as(final String storeName) {
Topic.validate(storeName);
Named.validate(storeName);
return new Materialized<>(storeName);
}

Named.java

@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream;
import org.apache.kafka.streams.errors.TopologyException;
import java.util.Objects;
public class Named implements NamedOperation<Named> {
private static final int MAX_NAME_LENGTH = 249;
protected String name;
protected Named(final String name) {
this.name = name;
if (name != null) {
validate(name);
}
}
/**
* Create a Named instance with provided name.
*
* @param name the processor name to be used. If {@code null} a default processor name will be generated.
* @return A new {@link Named} instance configured with name
*
* @throws TopologyException if an invalid name is specified; valid characters are ASCII alphanumerics, '.', '_' and '-'.
*/
public static Named as(final String name) {
Objects.requireNonNull(name, "name can't be null");
return new Named(name);
}
@Override
public Named withName(final String name) {
return new Named(name);
}
static void validate(final String name) {
if (name.isEmpty())
throw new TopologyException("Name is illegal, it can't be empty");
if (name.equals(".") || name.equals(".."))
throw new TopologyException("Name cannot be \".\" or \"..\"");
if (name.length() > MAX_NAME_LENGTH)
throw new TopologyException("Name is illegal, it can't be longer than " + MAX_NAME_LENGTH +
" characters, name: " + name);
if (!containsValidPattern(name))
throw new TopologyException("Name \"" + name + "\" is illegal, it contains a character other than " +
"ASCII alphanumerics, '.', '_' and '-'");
}
/**
* Valid characters for Kafka topics are the ASCII alphanumerics, '.', '_', and '-'
*/
private static boolean containsValidPattern(final String topic) {
for (int i = 0; i < topic.length(); ++i) {
final char c = topic.charAt(i);
// We don't use Character.isLetterOrDigit(c) because it's slower
final boolean validLetterOrDigit = (c >= 'a' && c <= 'z') || (c >= '0' && c <= '9') || (c >= 'A' && c <= 'Z');
final boolean validChar = validLetterOrDigit || c == '.' || c == '_' || c == '-';
if (!validChar) {
return false;
}
}
return true;
}
}
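
A small, hedged sketch of the validation implemented above (the names are invented): a processor name may only contain ASCII alphanumerics, '.', '_' and '-', and may be at most 249 characters long.

import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.Named;

public class NamedValidationSketch {
    public static void main(final String[] args) {
        Named.as("orders-filter");       // accepted
        try {
            Named.as("orders filter");   // rejected: a space is not a valid character
        } catch (final TopologyException e) {
            System.out.println(e.getMessage());
        }
    }
}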

Printed.java

@@ -31,9 +31,10 @@ import java.util.Objects;
* @param <V> value type
* @see KStream#print(Printed)
*/
public class Printed<K, V> {
public class Printed<K, V> implements NamedOperation<Printed<K, V>> {
protected final OutputStream outputStream;
protected String label;
protected String processorName;
protected KeyValueMapper<? super K, ? super V, String> mapper = new KeyValueMapper<K, V, String>() {
@Override
public String apply(final K key, final V value) {
@@ -53,6 +54,7 @@ public class Printed<K, V> {
this.outputStream = printed.outputStream;
this.label = printed.label;
this.mapper = printed.mapper;
this.processorName = printed.processorName;
}
/**
@@ -122,4 +124,16 @@ public class Printed<K, V> {
this.mapper = mapper;
return this;
}
/**
* Print the records of a {@link KStream} with provided processor name.
*
* @param processorName the processor name to be used. If {@code null} a default processor name will be generated
* @return this
*/
@Override
public Printed<K, V> withName(final String processorName) {
this.processorName = processorName;
return this;
}
}
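
A hedged usage sketch for the withName method added above (topic and processor name are invented); the explicit name replaces the processor name that would otherwise be generated, as the KStreamImpl change below shows.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Printed;

public class PrintedNamingSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("orders", Consumed.with(Serdes.String(), Serdes.String()))
               // Print to stdout through a processor named "print-orders".
               .print(Printed.<String, String>toSysOut().withName("print-orders"));
        System.out.println(builder.build().describe());
    }
}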

Produced.java

@@ -30,24 +30,28 @@ import java.util.Objects;
* @param <K> key type
* @param <V> value type
*/
public class Produced<K, V> {
public class Produced<K, V> implements NamedOperation<Produced<K, V>> {
protected Serde<K> keySerde;
protected Serde<V> valueSerde;
protected StreamPartitioner<? super K, ? super V> partitioner;
protected String processorName;
private Produced(final Serde<K> keySerde,
final Serde<V> valueSerde,
final StreamPartitioner<? super K, ? super V> partitioner) {
final StreamPartitioner<? super K, ? super V> partitioner,
final String processorName) {
this.keySerde = keySerde;
this.valueSerde = valueSerde;
this.partitioner = partitioner;
this.processorName = processorName;
}
protected Produced(final Produced<K, V> produced) {
this.keySerde = produced.keySerde;
this.valueSerde = produced.valueSerde;
this.partitioner = produced.partitioner;
this.processorName = produced.processorName;
}
/**
@@ -62,7 +66,7 @@ public class Produced<K, V> {
*/
public static <K, V> Produced<K, V> with(final Serde<K> keySerde,
final Serde<V> valueSerde) {
return new Produced<>(keySerde, valueSerde, null);
return new Produced<>(keySerde, valueSerde, null, null);
}
/**
@@ -82,7 +86,19 @@ public class Produced<K, V> {
public static <K, V> Produced<K, V> with(final Serde<K> keySerde,
final Serde<V> valueSerde,
final StreamPartitioner<? super K, ? super V> partitioner) {
return new Produced<>(keySerde, valueSerde, partitioner);
return new Produced<>(keySerde, valueSerde, partitioner, null);
}
/**
* Create an instance of {@link Produced} with provided processor name.
*
* @param processorName the processor name to be used. If {@code null} a default processor name will be generated
* @param <K> key type
* @param <V> value type
* @return a new instance of {@link Produced}
*/
public static <K, V> Produced<K, V> as(final String processorName) {
return new Produced<>(null, null, null, processorName);
}
/**
@@ -95,7 +111,7 @@ public class Produced<K, V> {
* @see KStream#to(String, Produced)
*/
public static <K, V> Produced<K, V> keySerde(final Serde<K> keySerde) {
return new Produced<>(keySerde, null, null);
return new Produced<>(keySerde, null, null, null);
}
/**
@@ -108,7 +124,7 @@ public class Produced<K, V> {
* @see KStream#to(String, Produced)
*/
public static <K, V> Produced<K, V> valueSerde(final Serde<V> valueSerde) {
return new Produced<>(null, valueSerde, null);
return new Produced<>(null, valueSerde, null, null);
}
/**
@@ -123,7 +139,7 @@ public class Produced<K, V> {
* @see KStream#to(String, Produced)
*/
public static <K, V> Produced<K, V> streamPartitioner(final StreamPartitioner<? super K, ? super V> partitioner) {
return new Produced<>(null, null, partitioner);
return new Produced<>(null, null, partitioner, null);
}
/**
@@ -176,4 +192,10 @@ public class Produced<K, V> {
public int hashCode() {
return Objects.hash(keySerde, valueSerde, partitioner);
}
@Override
public Produced<K, V> withName(final String name) {
this.processorName = name;
return this;
}
}
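
Another hedged sketch (topic and processor names are invented): besides the Produced.as(...) factory, an existing Produced can be named with withName while keeping its serdes and partitioner.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class ProducedNamingSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("orders", Consumed.with(Serdes.String(), Serdes.String()))
               // The sink node appears as "orders-sink" in the topology description.
               .to("orders-copy",
                   Produced.with(Serdes.String(), Serdes.String()).withName("orders-sink"));
        System.out.println(builder.build().describe());
    }
}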

ConsumedInternal.java

@@ -23,6 +23,7 @@ import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.TimestampExtractor;
public class ConsumedInternal<K, V> extends Consumed<K, V> {
public ConsumedInternal(final Consumed<K, V> consumed) {
super(consumed);
}
@@ -62,4 +63,8 @@ public class ConsumedInternal<K, V> extends Consumed<K, V> {
public Topology.AutoOffsetReset offsetResetPolicy() {
return resetPolicy;
}
public String name() {
return processorName;
}
}

InternalNameProvider.java

@@ -17,7 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
public interface InternalNameProvider {
String newProcessorName(String prefix);
String newProcessorName(final String prefix);
String newStoreName(String prefix);
String newStoreName(final String prefix);
}

InternalStreamsBuilder.java

@@ -79,7 +79,8 @@ public class InternalStreamsBuilder implements InternalNameProvider {
public <K, V> KStream<K, V> stream(final Collection<String> topics,
final ConsumedInternal<K, V> consumed) {
final String name = newProcessorName(KStreamImpl.SOURCE_NAME);
final String name = new NamedInternal(consumed.name()).orElseGenerateWithPrefix(this, KStreamImpl.SOURCE_NAME);
final StreamSourceNode<K, V> streamSourceNode = new StreamSourceNode<>(name, topics, consumed);
addGraphNode(root, streamSourceNode);
@@ -112,8 +113,10 @@ public class InternalStreamsBuilder implements InternalNameProvider {
public <K, V> KTable<K, V> table(final String topic,
final ConsumedInternal<K, V> consumed,
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
final String sourceName = newProcessorName(KStreamImpl.SOURCE_NAME);
final String tableSourceName = newProcessorName(KTableImpl.SOURCE_NAME);
final String sourceName = new NamedInternal(consumed.name())
.orElseGenerateWithPrefix(this, KStreamImpl.SOURCE_NAME);
final String tableSourceName = new NamedInternal(consumed.name())
.suffixWithOrElseGet("-table-source", () -> newProcessorName(KTableImpl.SOURCE_NAME));
final KTableSource<K, V> tableSource = new KTableSource<>(materialized.storeName(), materialized.queryableStoreName());
final ProcessorParameters<K, V> processorParameters = new ProcessorParameters<>(tableSource, tableSourceName);
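
To make the suffixing above concrete, a hedged example (names are invented, mirroring shouldUseSpecifiedNameForTableSourceProcessor further down): for a named table source, the stream-source processor takes the supplied name and the table-source processor takes the name suffixed with "-table-source".

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;

public class TableSourceNamingSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        // Expected processor names: "orders-table" and "orders-table-table-source".
        builder.table("orders", Consumed.as("orders-table"));
        System.out.println(builder.build().describe());
    }
}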

KStreamImpl.java

@@ -168,7 +168,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
public <KR> KStream<KR, V> selectKey(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper) {
Objects.requireNonNull(mapper, "mapper can't be null");
final ProcessorGraphNode<K, V> selectKeyProcessorNode = internalSelectKey(mapper);
final ProcessorGraphNode<K, V> selectKeyProcessorNode = internalSelectKey(mapper, NamedInternal.empty());
selectKeyProcessorNode.keyChangingOperation(true);
builder.addGraphNode(this.streamsGraphNode, selectKeyProcessorNode);
@@ -178,9 +178,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
}
private <KR> ProcessorGraphNode<K, V> internalSelectKey(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper) {
final String name = builder.newProcessorName(KEY_SELECT_NAME);
private <KR> ProcessorGraphNode<K, V> internalSelectKey(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper,
final NamedInternal named) {
final String name = named.orElseGenerateWithPrefix(builder, KEY_SELECT_NAME);
final KStreamMap<K, V, KR, V> kStreamMap = new KStreamMap<>((key, value) -> new KeyValue<>(mapper.apply(key, value), value));
final ProcessorParameters<K, V> processorParameters = new ProcessorParameters<>(kStreamMap, name);
@@ -241,8 +241,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
public void print(final Printed<K, V> printed) {
Objects.requireNonNull(printed, "printed can't be null");
final PrintedInternal<K, V> printedInternal = new PrintedInternal<>(printed);
final String name = builder.newProcessorName(PRINTING_NAME);
final String name = new NamedInternal(printedInternal.name()).orElseGenerateWithPrefix(builder, PRINTING_NAME);
final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(printedInternal.build(this.name), name);
final ProcessorGraphNode<? super K, ? super V> printNode = new ProcessorGraphNode<>(name, processorParameters);
@@ -428,8 +427,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
}
private void to(final TopicNameExtractor<K, V> topicExtractor, final ProducedInternal<K, V> produced) {
final String name = builder.newProcessorName(SINK_NAME);
final String name = new NamedInternal(produced.name()).orElseGenerateWithPrefix(builder, SINK_NAME);
final StreamSinkNode<K, V> sinkNode = new StreamSinkNode<>(
name,
topicExtractor,
@@ -852,7 +850,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
Objects.requireNonNull(selector, "selector can't be null");
Objects.requireNonNull(grouped, "grouped can't be null");
final GroupedInternal<KR, V> groupedInternal = new GroupedInternal<>(grouped);
final ProcessorGraphNode<K, V> selectKeyMapNode = internalSelectKey(selector);
final ProcessorGraphNode<K, V> selectKeyMapNode = internalSelectKey(selector, new NamedInternal(groupedInternal.name()));
selectKeyMapNode.keyChangingOperation(true);
builder.addGraphNode(this.streamsGraphNode, selectKeyMapNode);

NamedInternal.java

@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.Named;
import java.util.Optional;
import java.util.function.Supplier;
public class NamedInternal extends Named {
public static NamedInternal empty() {
return new NamedInternal(null);
}
public static NamedInternal with(final String name) {
return new NamedInternal(name);
}
/**
* Creates a new {@link NamedInternal} instance.
*
* @param internal the internal name.
*/
NamedInternal(final String internal) {
super(internal);
}
/**
* @return a string name.
*/
public String name() {
return name;
}
@Override
public NamedInternal withName(final String name) {
return new NamedInternal(name);
}
/**
* Check whether an internal name is defined.
* @return {@code false} if no name is set.
*/
public boolean isDefined() {
return name != null;
}
String suffixWithOrElseGet(final String suffix, final Supplier<String> supplier) {
final Optional<String> suffixed = Optional.ofNullable(this.name).map(s -> s + suffix);
// Creating a new Named re-validates the generated name, as the suffixed string could be too long.
return new NamedInternal(suffixed.orElseGet(supplier)).name();
}
String orElseGenerateWithPrefix(final InternalNameProvider provider, final String prefix) {
return orElseGet(() -> provider.newProcessorName(prefix));
}
/**
* Returns the internal name, or the value returned by the supplier.
*
* @param supplier the supplier to be used if the internal name is empty.
* @return an internal string name.
*/
private String orElseGet(final Supplier<String> supplier) {
return Optional.ofNullable(this.name).orElseGet(supplier);
}
}

PrintedInternal.java

@@ -27,4 +27,8 @@ public class PrintedInternal<K, V> extends Printed<K, V> {
public ProcessorSupplier<K, V> build(final String processorName) {
return new KStreamPrint<>(new PrintForeachAction<>(outputStream, mapper, label != null ? label : processorName));
}
public String name() {
return processorName;
}
}

ProducedInternal.java

@@ -21,6 +21,7 @@ import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.StreamPartitioner;
public class ProducedInternal<K, V> extends Produced<K, V> {
public ProducedInternal(final Produced<K, V> produced) {
super(produced);
}
@@ -36,4 +37,9 @@ public class ProducedInternal<K, V> extends Produced<K, V> {
public StreamPartitioner<? super K, ? super V> streamPartitioner() {
return partitioner;
}
public String name() {
return processorName;
}
}

SuppressedInternal.java

@@ -106,7 +106,8 @@ public class SuppressedInternal<K> implements Suppressed<K>, NamedSuppressed<K>
@Override
public String toString() {
return "SuppressedInternal{name='" + name + '\'' +
return "SuppressedInternal{" +
"name='" + name + '\'' +
", bufferConfig=" + bufferConfig +
", timeToWaitForMoreEvents=" + timeToWaitForMoreEvents +
", timeDefinition=" + timeDefinition +

StreamsBuilderTest.java

@@ -22,11 +22,15 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
@@ -35,11 +39,13 @@ import org.apache.kafka.test.MockPredicate;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -51,7 +57,14 @@ import static org.junit.Assert.assertTrue;
public class StreamsBuilderTest {
private static final String STREAM_TOPIC = "stream-topic";
private static final String STREAM_TOPIC_TWO = "stream-topic-two";
private static final String TABLE_TOPIC = "table-topic";
private final StreamsBuilder builder = new StreamsBuilder();
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
@Test
@@ -65,10 +78,10 @@ public class StreamsBuilderTest {
@Test
public void shouldAllowJoinUnmaterializedFilteredKTable() {
final KTable<Bytes, String> filteredKTable = builder
.<Bytes, String>table("table-topic")
.<Bytes, String>table(TABLE_TOPIC)
.filter(MockPredicate.allGoodPredicate());
builder
.<Bytes, String>stream("stream-topic")
.<Bytes, String>stream(STREAM_TOPIC)
.join(filteredKTable, MockValueJoiner.TOSTRING_JOINER);
builder.build();
@@ -88,10 +101,10 @@ public class StreamsBuilderTest {
@Test
public void shouldAllowJoinMaterializedFilteredKTable() {
final KTable<Bytes, String> filteredKTable = builder
.<Bytes, String>table("table-topic")
.<Bytes, String>table(TABLE_TOPIC)
.filter(MockPredicate.allGoodPredicate(), Materialized.as("store"));
builder
.<Bytes, String>stream("stream-topic")
.<Bytes, String>stream(STREAM_TOPIC)
.join(filteredKTable, MockValueJoiner.TOSTRING_JOINER);
builder.build();
@@ -112,10 +125,10 @@ public class StreamsBuilderTest {
@Test
public void shouldAllowJoinUnmaterializedMapValuedKTable() {
final KTable<Bytes, String> mappedKTable = builder
.<Bytes, String>table("table-topic")
.<Bytes, String>table(TABLE_TOPIC)
.mapValues(MockMapper.noOpValueMapper());
builder
.<Bytes, String>stream("stream-topic")
.<Bytes, String>stream(STREAM_TOPIC)
.join(mappedKTable, MockValueJoiner.TOSTRING_JOINER);
builder.build();
@@ -135,10 +148,10 @@ public class StreamsBuilderTest {
@Test
public void shouldAllowJoinMaterializedMapValuedKTable() {
final KTable<Bytes, String> mappedKTable = builder
.<Bytes, String>table("table-topic")
.<Bytes, String>table(TABLE_TOPIC)
.mapValues(MockMapper.noOpValueMapper(), Materialized.as("store"));
builder
.<Bytes, String>stream("stream-topic")
.<Bytes, String>stream(STREAM_TOPIC)
.join(mappedKTable, MockValueJoiner.TOSTRING_JOINER);
builder.build();
@@ -161,7 +174,7 @@ public class StreamsBuilderTest {
final KTable<Bytes, String> table1 = builder.table("table-topic1");
final KTable<Bytes, String> table2 = builder.table("table-topic2");
builder
.<Bytes, String>stream("stream-topic")
.<Bytes, String>stream(STREAM_TOPIC)
.join(table1.join(table2, MockValueJoiner.TOSTRING_JOINER), MockValueJoiner.TOSTRING_JOINER);
builder.build();
@@ -183,7 +196,7 @@ public class StreamsBuilderTest {
final KTable<Bytes, String> table1 = builder.table("table-topic1");
final KTable<Bytes, String> table2 = builder.table("table-topic2");
builder
.<Bytes, String>stream("stream-topic")
.<Bytes, String>stream(STREAM_TOPIC)
.join(
table1.join(table2, MockValueJoiner.TOSTRING_JOINER, Materialized.as("store")),
MockValueJoiner.TOSTRING_JOINER);
@@ -205,8 +218,8 @@ public class StreamsBuilderTest {
@Test
public void shouldAllowJoinMaterializedSourceKTable() {
final KTable<Bytes, String> table = builder.table("table-topic");
builder.<Bytes, String>stream("stream-topic").join(table, MockValueJoiner.TOSTRING_JOINER);
final KTable<Bytes, String> table = builder.table(TABLE_TOPIC);
builder.<Bytes, String>stream(STREAM_TOPIC).join(table, MockValueJoiner.TOSTRING_JOINER);
builder.build();
final ProcessorTopology topology =
@@ -403,4 +416,72 @@ public class StreamsBuilderTest {
builder.stream(Arrays.asList(null, null));
builder.build();
}
@Test
public void shouldUseSpecifiedNameForStreamSourceProcessor() {
final String expected = "source-node";
builder.stream(STREAM_TOPIC, Consumed.as(expected));
builder.stream(STREAM_TOPIC_TWO);
builder.build();
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
assertSpecifiedNameForOperation(topology, expected, "KSTREAM-SOURCE-0000000000");
}
@Test
public void shouldUseSpecifiedNameForTableSourceProcessor() {
final String expected = "source-node";
builder.table(STREAM_TOPIC, Consumed.as(expected));
builder.table(STREAM_TOPIC_TWO);
builder.build();
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
assertSpecifiedNameForOperation(
topology,
expected,
expected + "-table-source",
"KSTREAM-SOURCE-0000000002",
"KTABLE-SOURCE-0000000003");
}
@Test
public void shouldUseSpecifiedNameForGlobalTableSourceProcessor() {
final String expected = "source-processor";
builder.globalTable(STREAM_TOPIC, Consumed.as(expected));
builder.globalTable(STREAM_TOPIC_TWO);
builder.build();
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
assertSpecifiedNameForStateStore(
topology.globalStateStores(),
"stream-topic-STATE-STORE-0000000000",
"stream-topic-two-STATE-STORE-0000000003"
);
}
@Test
public void shouldUseSpecifiedNameForSinkProcessor() {
final String expected = "sink-processor";
final KStream<Object, Object> stream = builder.stream(STREAM_TOPIC);
stream.to(STREAM_TOPIC_TWO, Produced.as(expected));
stream.to(STREAM_TOPIC_TWO);
builder.build();
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build();
assertSpecifiedNameForOperation(topology, "KSTREAM-SOURCE-0000000000", expected, "KSTREAM-SINK-0000000001");
}
private void assertSpecifiedNameForOperation(final ProcessorTopology topology, final String... expected) {
final List<ProcessorNode> processors = topology.processors();
Assert.assertEquals("Invalid number of expected processors", expected.length, processors.size());
for (int i = 0; i < expected.length; i++) {
assertEquals(expected[i], processors.get(i).name());
}
}
private void assertSpecifiedNameForStateStore(final List<StateStore> stores, final String... expected) {
Assert.assertEquals("Invalid number of expected state stores", expected.length, stores.size());
for (int i = 0; i < expected.length; i++) {
assertEquals(expected[i], stores.get(i).name());
}
}
}

MaterializedTest.java

@@ -17,7 +17,7 @@
package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
@@ -32,7 +32,7 @@ public class MaterializedTest {
Materialized.as("valid_name");
}
@Test(expected = InvalidTopicException.class)
@Test(expected = TopologyException.class)
public void shouldNotAllowInvalidTopicNames() {
Materialized.as("not:valid");
}

NamedTest.java

@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream;
import org.apache.kafka.streams.errors.TopologyException;
import org.junit.Test;
import java.util.Arrays;
import static org.junit.Assert.fail;
public class NamedTest {
@Test(expected = NullPointerException.class)
public void shouldThrowExceptionGivenNullName() {
Named.as(null);
}
@Test
public void shouldThrowExceptionOnInvalidTopicNames() {
final char[] longString = new char[250];
Arrays.fill(longString, 'a');
final String[] invalidNames = {"", "foo bar", "..", "foo:bar", "foo=bar", ".", new String(longString)};
for (final String name : invalidNames) {
try {
Named.validate(name);
fail("No exception was thrown for named with invalid name: " + name);
} catch (final TopologyException e) {
// success
}
}
}
}

KGroupedStreamImplTest.java

@@ -18,13 +18,13 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedStream;
@@ -85,7 +85,7 @@ public class KGroupedStreamImplTest {
groupedStream.reduce(null);
}
@Test(expected = InvalidTopicException.class)
@Test(expected = TopologyException.class)
public void shouldNotHaveInvalidStoreNameOnReduce() {
groupedStream.reduce(MockReducer.STRING_ADDER, Materialized.as(INVALID_STORE_NAME));
}
@@ -102,7 +102,7 @@ public class KGroupedStreamImplTest {
groupedStream.windowedBy((Windows) null);
}
@Test(expected = InvalidTopicException.class)
@Test(expected = TopologyException.class)
public void shouldNotHaveInvalidStoreNameWithWindowedReduce() {
groupedStream
.windowedBy(TimeWindows.of(ofMillis(10)))
@@ -119,7 +119,7 @@ public class KGroupedStreamImplTest {
groupedStream.aggregate(MockInitializer.STRING_INIT, null, Materialized.as("store"));
}
@Test(expected = InvalidTopicException.class)
@Test(expected = TopologyException.class)
public void shouldNotHaveInvalidStoreNameOnAggregate() {
groupedStream.aggregate(
MockInitializer.STRING_INIT,
@@ -146,7 +146,7 @@ public class KGroupedStreamImplTest {
groupedStream.windowedBy((Windows) null);
}
@Test(expected = InvalidTopicException.class)
@Test(expected = TopologyException.class)
public void shouldNotHaveInvalidStoreNameOnWindowedAggregate() {
groupedStream
.windowedBy(TimeWindows.of(ofMillis(10)))
@@ -284,7 +284,7 @@ public class KGroupedStreamImplTest {
groupedStream.windowedBy((SessionWindows) null);
}
@Test(expected = InvalidTopicException.class)
@Test(expected = TopologyException.class)
public void shouldNotAcceptInvalidStoreNameWhenReducingSessionWindows() {
groupedStream
.windowedBy(SessionWindows.with(ofMillis(30)))
@@ -349,7 +349,7 @@ public class KGroupedStreamImplTest {
Materialized.with(Serdes.String(), Serdes.String()));
}
@Test(expected = InvalidTopicException.class)
@Test(expected = TopologyException.class)
public void shouldNotAcceptInvalidStoreNameWhenAggregatingSessionWindows() {
groupedStream
.windowedBy(SessionWindows.with(ofMillis(10)))

KGroupedTableImplTest.java

@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.serialization.DoubleSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -24,6 +23,7 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedTable;
@@ -64,7 +64,7 @@ public class KGroupedTableImplTest {
.groupBy(MockMapper.selectValueKeyValueMapper());
}
@Test(expected = InvalidTopicException.class)
@Test(expected = TopologyException.class)
public void shouldNotAllowInvalidStoreNameOnAggregate() {
groupedTable.aggregate(
MockInitializer.STRING_INIT,
@@ -116,7 +116,7 @@ public class KGroupedTableImplTest {
Materialized.as("store"));
}
@Test(expected = InvalidTopicException.class)
@Test(expected = TopologyException.class)
public void shouldNotAllowInvalidStoreNameOnReduce() {
groupedTable.reduce(
MockReducer.STRING_ADDER,

NamedInternalTest.java

@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class NamedInternalTest {
private static final String TEST_VALUE = "default-value";
private static final String TEST_SUFFIX = "-suffix";
@Test
public void shouldSuffixNameOrReturnProviderValue() {
final String name = "foo";
assertEquals(
name + TEST_SUFFIX,
NamedInternal.with(name).suffixWithOrElseGet(TEST_SUFFIX, () -> TEST_VALUE)
);
assertEquals(
TEST_VALUE,
NamedInternal.with(null).suffixWithOrElseGet(TEST_SUFFIX, () -> TEST_VALUE)
);
}
@Test
public void shouldGenerateWithPrefixGivenEmptyName() {
final String prefix = "KSTREAM-MAP-";
assertEquals(prefix + "PROCESSOR-NAME", NamedInternal.with(null).orElseGenerateWithPrefix(
new InternalNameProvider() {
@Override
public String newProcessorName(final String prefix) {
return prefix + "PROCESSOR-NAME";
}
@Override
public String newStoreName(final String prefix) {
return null;
}
},
prefix)
);
}
@Test
public void shouldNotGenerateWithPrefixGivenValidName() {
final String validName = "validName";
assertEquals(validName, NamedInternal.with(validName).orElseGenerateWithPrefix(null, "KSTREAM-MAP-")
);
}
}