MINOR: update Kafka Streams `Topology` JavaDocs (#18778)

Reviewers: Bill Bejeck <bill@confluent.io>
This commit is contained in:
Matthias J. Sax 2025-02-05 20:24:14 -08:00 committed by GitHub
parent 8be2a8ed4e
commit 9774635bfd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 552 additions and 841 deletions

File diff suppressed because it is too large Load Diff

View File

@ -18,6 +18,7 @@ package org.apache.kafka.streams.internals;
import java.time.Duration; import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.util.Objects;
import java.util.function.Supplier; import java.util.function.Supplier;
import static java.lang.String.format; import static java.lang.String.format;
@ -80,9 +81,11 @@ public final class ApiUtils {
/** /**
* @throws IllegalArgumentException if the same instance is obtained each time * @throws IllegalArgumentException if the same instance is obtained each time
*/ */
public static void checkSupplier(final Supplier<?> supplier) { public static void checkSupplier(final Supplier<?> processorSupplier) {
if (supplier.get() == supplier.get()) { Objects.requireNonNull(processorSupplier, "processorSupplier cannot be null");
final String supplierClass = supplier.getClass().getName();
if (processorSupplier.get() == processorSupplier.get()) {
final String supplierClass = processorSupplier.getClass().getName();
throw new IllegalArgumentException(String.format("%s generates single reference." + throw new IllegalArgumentException(String.format("%s generates single reference." +
" %s#get() must return a new object each time it is called.", supplierClass, supplierClass)); " %s#get() must return a new object each time it is called.", supplierClass, supplierClass));
} }

View File

@ -1237,10 +1237,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
final Named named, final Named named,
final String... stateStoreNames final String... stateStoreNames
) { ) {
Objects.requireNonNull(processorSupplier, "processorSupplier can't be null"); ApiUtils.checkSupplier(processorSupplier);
Objects.requireNonNull(named, "named can't be null"); Objects.requireNonNull(named, "named can't be null");
Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be a null array"); Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be a null array");
ApiUtils.checkSupplier(processorSupplier);
for (final String stateStoreName : stateStoreNames) { for (final String stateStoreName : stateStoreNames) {
Objects.requireNonNull(stateStoreName, "stateStoreNames can't be null"); Objects.requireNonNull(stateStoreName, "stateStoreNames can't be null");
} }
@ -1282,10 +1281,9 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
final Named named, final Named named,
final String... stateStoreNames final String... stateStoreNames
) { ) {
Objects.requireNonNull(processorSupplier, "processorSupplier can't be null"); ApiUtils.checkSupplier(processorSupplier);
Objects.requireNonNull(named, "named can't be null"); Objects.requireNonNull(named, "named can't be null");
Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be a null array"); Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be a null array");
ApiUtils.checkSupplier(processorSupplier);
for (final String stateStoreName : stateStoreNames) { for (final String stateStoreName : stateStoreNames) {
Objects.requireNonNull(stateStoreName, "stateStoreNames can't be null"); Objects.requireNonNull(stateStoreName, "stateStoreNames can't be null");
} }

View File

@ -103,13 +103,16 @@ public class TableSourceNode<K, V> extends SourceGraphNode<K, V> {
consumedInternal().valueDeserializer(), consumedInternal().valueDeserializer(),
topicName); topicName);
processorParameters.addProcessorTo(topologyBuilder, new String[] {sourceName}); processorParameters.addProcessorTo(topologyBuilder, sourceName);
// if the KTableSource should not be materialized, stores will be null or empty // if the KTableSource should not be materialized, stores will be null or empty
final KTableSource<K, V> tableSource = (KTableSource<K, V>) processorParameters.processorSupplier(); final KTableSource<K, V> tableSource = (KTableSource<K, V>) processorParameters.processorSupplier();
if (tableSource.stores() != null) { if (tableSource.stores() != null) {
if (shouldReuseSourceTopicForChangelog) { if (shouldReuseSourceTopicForChangelog) {
// TODO: rewrite this part to use Topology.addReadOnlyStateStore() instead
// should allow to move off using `InternalTopologyBuilder` in favor of the public `Topology` API
tableSource.stores().forEach(store -> { tableSource.stores().forEach(store -> {
// connect the source topic as (read-only) changelog topic for fault-tolerance
store.withLoggingDisabled(); store.withLoggingDisabled();
topologyBuilder.connectSourceStoreAndTopic(store.name(), topicName); topologyBuilder.connectSourceStoreAndTopic(store.name(), topicName);
}); });

View File

@ -74,6 +74,8 @@ public class InternalTopologyBuilder {
} }
public InternalTopologyBuilder(final TopologyConfig topologyConfigs) { public InternalTopologyBuilder(final TopologyConfig topologyConfigs) {
Objects.requireNonNull(topologyConfigs, "topologyConfigs cannot be null");
this.topologyConfigs = topologyConfigs; this.topologyConfigs = topologyConfigs;
this.topologyName = topologyConfigs.topologyName; this.topologyName = topologyConfigs.topologyName;
@ -350,11 +352,11 @@ public class InternalTopologyBuilder {
private final Serializer<KIn> keySerializer; private final Serializer<KIn> keySerializer;
private final Serializer<VIn> valSerializer; private final Serializer<VIn> valSerializer;
private final StreamPartitioner<? super KIn, ? super VIn> partitioner; private final StreamPartitioner<? super KIn, ? super VIn> partitioner;
private final TopicNameExtractor<KIn, VIn> topicExtractor; private final TopicNameExtractor<? super KIn, ? super VIn> topicExtractor;
private SinkNodeFactory(final String name, private SinkNodeFactory(final String name,
final String[] predecessors, final String[] predecessors,
final TopicNameExtractor<KIn, VIn> topicExtractor, final TopicNameExtractor<? super KIn, ? super VIn> topicExtractor,
final Serializer<KIn> keySerializer, final Serializer<KIn> keySerializer,
final Serializer<VIn> valSerializer, final Serializer<VIn> valSerializer,
final StreamPartitioner<? super KIn, ? super VIn> partitioner) { final StreamPartitioner<? super KIn, ? super VIn> partitioner) {
@ -368,7 +370,7 @@ public class InternalTopologyBuilder {
@Override @Override
public ProcessorNode<KIn, VIn, Void, Void> build() { public ProcessorNode<KIn, VIn, Void, Void> build() {
if (topicExtractor instanceof StaticTopicNameExtractor) { if (topicExtractor instanceof StaticTopicNameExtractor) {
final String topic = ((StaticTopicNameExtractor<KIn, VIn>) topicExtractor).topicName; final String topic = ((StaticTopicNameExtractor<?, ?>) topicExtractor).topicName;
if (internalTopicNamesWithProperties.containsKey(topic)) { if (internalTopicNamesWithProperties.containsKey(topic)) {
// prefix the internal topic name with the application id // prefix the internal topic name with the application id
return new SinkNode<>(name, new StaticTopicNameExtractor<>(decorateTopic(topic)), keySerializer, valSerializer, partitioner); return new SinkNode<>(name, new StaticTopicNameExtractor<>(decorateTopic(topic)), keySerializer, valSerializer, partitioner);
@ -447,18 +449,23 @@ public class InternalTopologyBuilder {
return this; return this;
} }
private void verifyName(final String name) {
Objects.requireNonNull(name, "name cannot be null");
if (nodeFactories.containsKey(name)) {
throw new TopologyException("Processor " + name + " is already added.");
}
}
public final void addSource(final AutoOffsetResetInternal offsetReset, public final void addSource(final AutoOffsetResetInternal offsetReset,
final String name, final String name,
final TimestampExtractor timestampExtractor, final TimestampExtractor timestampExtractor,
final Deserializer<?> keyDeserializer, final Deserializer<?> keyDeserializer,
final Deserializer<?> valDeserializer, final Deserializer<?> valDeserializer,
final String... topics) { final String... topics) {
verifyName(name);
Objects.requireNonNull(topics, "topics cannot be null");
if (topics.length == 0) { if (topics.length == 0) {
throw new TopologyException("You must provide at least one topic"); throw new TopologyException("topics cannot be empty");
}
Objects.requireNonNull(name, "name must not be null");
if (nodeFactories.containsKey(name)) {
throw new TopologyException("Processor " + name + " is already added.");
} }
for (final String topic : topics) { for (final String topic : topics) {
@ -480,12 +487,8 @@ public class InternalTopologyBuilder {
final Deserializer<?> keyDeserializer, final Deserializer<?> keyDeserializer,
final Deserializer<?> valDeserializer, final Deserializer<?> valDeserializer,
final Pattern topicPattern) { final Pattern topicPattern) {
Objects.requireNonNull(topicPattern, "topicPattern can't be null"); verifyName(name);
Objects.requireNonNull(name, "name can't be null"); Objects.requireNonNull(topicPattern, "topicPattern cannot be null");
if (nodeFactories.containsKey(name)) {
throw new TopologyException("Processor " + name + " is already added.");
}
for (final String sourceTopicName : rawSourceTopicNames) { for (final String sourceTopicName : rawSourceTopicNames) {
if (topicPattern.matcher(sourceTopicName).matches()) { if (topicPattern.matcher(sourceTopicName).matches()) {
@ -507,46 +510,23 @@ public class InternalTopologyBuilder {
final Serializer<V> valSerializer, final Serializer<V> valSerializer,
final StreamPartitioner<? super K, ? super V> partitioner, final StreamPartitioner<? super K, ? super V> partitioner,
final String... predecessorNames) { final String... predecessorNames) {
Objects.requireNonNull(name, "name must not be null"); verifyName(name);
Objects.requireNonNull(topic, "topic must not be null"); Objects.requireNonNull(topic, "topic cannot be null");
Objects.requireNonNull(predecessorNames, "predecessor names must not be null"); verifyParents(name, predecessorNames);
if (predecessorNames.length == 0) {
throw new TopologyException("Sink " + name + " must have at least one parent");
}
addSink(name, new StaticTopicNameExtractor<>(topic), keySerializer, valSerializer, partitioner, predecessorNames); addSink(name, new StaticTopicNameExtractor<>(topic), keySerializer, valSerializer, partitioner, predecessorNames);
nodeToSinkTopic.put(name, topic); nodeToSinkTopic.put(name, topic);
nodeGroups = null;
} }
public final <K, V> void addSink(final String name, public final <K, V> void addSink(final String name,
final TopicNameExtractor<K, V> topicExtractor, final TopicNameExtractor<? super K, ? super V> topicExtractor,
final Serializer<K> keySerializer, final Serializer<K> keySerializer,
final Serializer<V> valSerializer, final Serializer<V> valSerializer,
final StreamPartitioner<? super K, ? super V> partitioner, final StreamPartitioner<? super K, ? super V> partitioner,
final String... predecessorNames) { final String... predecessorNames) {
Objects.requireNonNull(name, "name must not be null"); verifyName(name);
Objects.requireNonNull(topicExtractor, "topic extractor must not be null"); Objects.requireNonNull(topicExtractor, "topicExtractor cannot be null");
Objects.requireNonNull(predecessorNames, "predecessor names must not be null"); verifyParents(name, predecessorNames);
if (nodeFactories.containsKey(name)) {
throw new TopologyException("Processor " + name + " is already added.");
}
if (predecessorNames.length == 0) {
throw new TopologyException("Sink " + name + " must have at least one parent");
}
for (final String predecessor : predecessorNames) {
Objects.requireNonNull(predecessor, "predecessor name can't be null");
if (predecessor.equals(name)) {
throw new TopologyException("Processor " + name + " cannot be a predecessor of itself.");
}
if (!nodeFactories.containsKey(predecessor)) {
throw new TopologyException("Predecessor processor " + predecessor + " is not added yet.");
}
if (nodeToSinkTopic.containsKey(predecessor)) {
throw new TopologyException("Sink " + predecessor + " cannot be used a parent.");
}
}
nodeFactories.put(name, new SinkNodeFactory<>(name, predecessorNames, topicExtractor, keySerializer, valSerializer, partitioner)); nodeFactories.put(name, new SinkNodeFactory<>(name, predecessorNames, topicExtractor, keySerializer, valSerializer, partitioner));
nodeGrouper.add(name); nodeGrouper.add(name);
@ -554,66 +534,52 @@ public class InternalTopologyBuilder {
nodeGroups = null; nodeGroups = null;
} }
public final <KIn, VIn, KOut, VOut> void addProcessor(final String name, public final void addProcessor(final String name,
final ProcessorSupplier<KIn, VIn, KOut, VOut> supplier, final ProcessorSupplier<?, ?, ?, ?> processorSupplier,
final String... predecessorNames) { final String... predecessorNames) {
Objects.requireNonNull(name, "name must not be null"); verifyName(name);
Objects.requireNonNull(supplier, "supplier must not be null"); ApiUtils.checkSupplier(processorSupplier);
Objects.requireNonNull(predecessorNames, "predecessor names must not be null"); verifyParents(name, predecessorNames);
ApiUtils.checkSupplier(supplier);
if (nodeFactories.containsKey(name)) {
throw new TopologyException("Processor " + name + " is already added.");
}
if (predecessorNames.length == 0) {
throw new TopologyException("Processor " + name + " must have at least one parent");
}
for (final String predecessor : predecessorNames) { nodeFactories.put(name, new ProcessorNodeFactory<>(name, predecessorNames, processorSupplier));
Objects.requireNonNull(predecessor, "predecessor name must not be null");
if (predecessor.equals(name)) {
throw new TopologyException("Processor " + name + " cannot be a predecessor of itself.");
}
if (!nodeFactories.containsKey(predecessor)) {
throw new TopologyException("Predecessor processor " + predecessor + " is not added yet for " + name);
}
}
nodeFactories.put(name, new ProcessorNodeFactory<>(name, predecessorNames, supplier));
nodeGrouper.add(name); nodeGrouper.add(name);
nodeGrouper.unite(name, predecessorNames); nodeGrouper.unite(name, predecessorNames);
nodeGroups = null; nodeGroups = null;
} }
public final <KIn, VIn, VOut> void addProcessor(final String name, public final <KIn, VIn, VOut> void addProcessor(final String name,
final FixedKeyProcessorSupplier<KIn, VIn, VOut> supplier, final FixedKeyProcessorSupplier<KIn, VIn, VOut> processorSupplier,
final String... predecessorNames) { final String... predecessorNames) {
Objects.requireNonNull(name, "name must not be null"); verifyName(name);
Objects.requireNonNull(supplier, "supplier must not be null"); ApiUtils.checkSupplier(processorSupplier);
Objects.requireNonNull(predecessorNames, "predecessor names must not be null"); verifyParents(name, predecessorNames);
ApiUtils.checkSupplier(supplier);
if (nodeFactories.containsKey(name)) {
throw new TopologyException("Processor " + name + " is already added.");
}
if (predecessorNames.length == 0) {
throw new TopologyException("Processor " + name + " must have at least one parent");
}
for (final String predecessor : predecessorNames) { nodeFactories.put(name, new FixedKeyProcessorNodeFactory<>(name, predecessorNames, processorSupplier));
Objects.requireNonNull(predecessor, "predecessor name must not be null");
if (predecessor.equals(name)) {
throw new TopologyException("Processor " + name + " cannot be a predecessor of itself.");
}
if (!nodeFactories.containsKey(predecessor)) {
throw new TopologyException("Predecessor processor " + predecessor + " is not added yet for " + name);
}
}
nodeFactories.put(name, new FixedKeyProcessorNodeFactory<>(name, predecessorNames, supplier));
nodeGrouper.add(name); nodeGrouper.add(name);
nodeGrouper.unite(name, predecessorNames); nodeGrouper.unite(name, predecessorNames);
nodeGroups = null; nodeGroups = null;
} }
private void verifyParents(final String processorName, final String... predecessorNames) {
Objects.requireNonNull(predecessorNames, "predecessorNames must not be null");
if (predecessorNames.length == 0) {
throw new TopologyException("predecessorNames cannot be empty");
}
for (final String predecessor : predecessorNames) {
Objects.requireNonNull(predecessor, "predecessor name cannot be null");
if (!nodeFactories.containsKey(predecessor)) {
if (predecessor.equals(processorName)) {
throw new TopologyException("Predecessor " + predecessor + " is unknown (self-reference).");
}
throw new TopologyException("Predecessor " + predecessor + " is unknown.");
}
if (nodeToSinkTopic.containsKey(predecessor)) {
throw new TopologyException("Sink " + predecessor + " cannot be used a parent.");
}
}
}
public final void addStateStore(final StoreBuilder<?> storeBuilder, public final void addStateStore(final StoreBuilder<?> storeBuilder,
final String... processorNames) { final String... processorNames) {
addStateStore(StoreBuilderWrapper.wrapStoreBuilder(storeBuilder), false, processorNames); addStateStore(StoreBuilderWrapper.wrapStoreBuilder(storeBuilder), false, processorNames);
@ -640,10 +606,11 @@ public class InternalTopologyBuilder {
if (processorNames != null) { if (processorNames != null) {
for (final String processorName : processorNames) { for (final String processorName : processorNames) {
Objects.requireNonNull(processorName, "processor name must not be null"); Objects.requireNonNull(processorName, "processor cannot not be null");
connectProcessorAndStateStore(processorName, storeFactory.storeName()); connectProcessorAndStateStore(processorName, storeFactory.storeName());
} }
} }
nodeGroups = null; nodeGroups = null;
} }
@ -655,22 +622,33 @@ public class InternalTopologyBuilder {
final String processorName, final String processorName,
final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier, final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier,
final boolean reprocessOnRestore) { final boolean reprocessOnRestore) {
verifyName(sourceName);
Objects.requireNonNull(topic, "topic cannot be null");
validateTopicNotAlreadyRegistered(topic);
verifyName(processorName);
if (sourceName.equals(processorName)) {
throw new TopologyException("sourceName and processorName must be different.");
}
ApiUtils.checkSupplier(stateUpdateSupplier); ApiUtils.checkSupplier(stateUpdateSupplier);
final Set<StoreBuilder<?>> stores = stateUpdateSupplier.stores(); final Set<StoreBuilder<?>> stores = stateUpdateSupplier.stores();
if (stores == null || stores.size() != 1) { if (stores == null || stores.size() != 1) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"Global stores must pass in suppliers with exactly one store but got " + "Global stores must pass in suppliers with exactly one store but got " +
(stores != null ? stores.size() : 0)); (stores != null ? stores.size() : 0));
} }
final StoreFactory storeFactory = final StoreFactory storeFactory =
StoreBuilderWrapper.wrapStoreBuilder(stores.iterator().next()); StoreBuilderWrapper.wrapStoreBuilder(stores.iterator().next());
validateGlobalStoreArguments(sourceName,
topic, final String storeName = storeFactory.storeName();
processorName, if (stateFactories.containsKey(storeName)) {
stateUpdateSupplier, throw new TopologyException("A different StateStore has already been added with the name " + storeName);
storeFactory.storeName(), }
storeFactory.loggingEnabled()); if (globalStateBuilders.containsKey(storeName)) {
validateTopicNotAlreadyRegistered(topic); throw new TopologyException("A different GlobalStateStore has already been added with the name " + storeName);
}
final String[] topics = {topic}; final String[] topics = {topic};
final String[] predecessors = {sourceName}; final String[] predecessors = {sourceName};
@ -701,6 +679,8 @@ public class InternalTopologyBuilder {
nodeGrouper.add(processorName); nodeGrouper.add(processorName);
nodeGrouper.unite(processorName, predecessors); nodeGrouper.unite(processorName, predecessors);
globalStateBuilders.put(storeFactory.storeName(), storeFactory); globalStateBuilders.put(storeFactory.storeName(), storeFactory);
// connect the source topic as (read-only) changelog topic for fault-tolerance
storeFactory.withLoggingDisabled();
connectSourceStoreAndTopic(storeFactory.storeName(), topic); connectSourceStoreAndTopic(storeFactory.storeName(), topic);
nodeGroups = null; nodeGroups = null;
} }
@ -728,13 +708,21 @@ public class InternalTopologyBuilder {
public final void connectProcessorAndStateStores(final String processorName, public final void connectProcessorAndStateStores(final String processorName,
final String... stateStoreNames) { final String... stateStoreNames) {
Objects.requireNonNull(processorName, "processorName can't be null"); Objects.requireNonNull(processorName, "processorName cannot be null");
Objects.requireNonNull(stateStoreNames, "state store list must not be null"); Objects.requireNonNull(stateStoreNames, "stateStoreNames cannot null");
if (stateStoreNames.length == 0) { if (stateStoreNames.length == 0) {
throw new TopologyException("Must provide at least one state store name."); throw new TopologyException("stateStoreNames cannot be empty");
} }
if (nodeToSourceTopics.containsKey(processorName)
|| nodeToSourcePatterns.containsKey(processorName)
|| nodeToSinkTopic.containsKey(processorName)) {
throw new TopologyException("State stores cannot be connect to sources or sinks.");
}
for (final String stateStoreName : stateStoreNames) { for (final String stateStoreName : stateStoreNames) {
Objects.requireNonNull(stateStoreName, "state store name must not be null"); Objects.requireNonNull(stateStoreName, "state store name cannot be null");
connectProcessorAndStateStore(processorName, stateStoreName); connectProcessorAndStateStore(processorName, stateStoreName);
} }
nodeGroups = null; nodeGroups = null;
@ -810,36 +798,6 @@ public class InternalTopologyBuilder {
} }
} }
private void validateGlobalStoreArguments(final String sourceName,
final String topic,
final String processorName,
final ProcessorSupplier<?, ?, Void, Void> stateUpdateSupplier,
final String storeName,
final boolean loggingEnabled) {
Objects.requireNonNull(sourceName, "sourceName must not be null");
Objects.requireNonNull(topic, "topic must not be null");
Objects.requireNonNull(stateUpdateSupplier, "supplier must not be null");
Objects.requireNonNull(processorName, "processorName must not be null");
if (nodeFactories.containsKey(sourceName)) {
throw new TopologyException("Processor " + sourceName + " is already added.");
}
if (nodeFactories.containsKey(processorName)) {
throw new TopologyException("Processor " + processorName + " is already added.");
}
if (stateFactories.containsKey(storeName)) {
throw new TopologyException("A different StateStore has already been added with the name " + storeName);
}
if (globalStateBuilders.containsKey(storeName)) {
throw new TopologyException("A different GlobalStateStore has already been added with the name " + storeName);
}
if (loggingEnabled) {
throw new TopologyException("StateStore " + storeName + " for global table must not have logging enabled.");
}
if (sourceName.equals(processorName)) {
throw new TopologyException("sourceName and processorName must be different.");
}
}
private void connectProcessorAndStateStore(final String processorName, private void connectProcessorAndStateStore(final String processorName,
final String stateStoreName) { final String stateStoreName) {
if (globalStateBuilders.containsKey(stateStoreName)) { if (globalStateBuilders.containsKey(stateStoreName)) {
@ -878,7 +836,7 @@ public class InternalTopologyBuilder {
if (nodeFactory instanceof SourceNodeFactory) { if (nodeFactory instanceof SourceNodeFactory) {
sourceNodes.add((SourceNodeFactory<?, ?>) nodeFactory); sourceNodes.add((SourceNodeFactory<?, ?>) nodeFactory);
} else if (nodeFactory instanceof ProcessorNodeFactory) { } else if (nodeFactory instanceof ProcessorNodeFactory) {
sourceNodes.addAll(findSourcesForProcessorPredecessors(((ProcessorNodeFactory<?, ?, ?, ?>) nodeFactory).predecessors)); sourceNodes.addAll(findSourcesForProcessorPredecessors(nodeFactory.predecessors));
} }
} }
return sourceNodes; return sourceNodes;
@ -1346,14 +1304,12 @@ public class InternalTopologyBuilder {
} }
} }
private <S extends StateStore> InternalTopicConfig createChangelogTopicConfig(final StoreFactory factory, private InternalTopicConfig createChangelogTopicConfig(final StoreFactory factory,
final String name) { final String name) {
if (factory.isVersionedStore()) { if (factory.isVersionedStore()) {
final VersionedChangelogTopicConfig config = new VersionedChangelogTopicConfig(name, factory.logConfig(), factory.historyRetention()); return new VersionedChangelogTopicConfig(name, factory.logConfig(), factory.historyRetention());
return config;
} else if (factory.isWindowStore()) { } else if (factory.isWindowStore()) {
final WindowedChangelogTopicConfig config = new WindowedChangelogTopicConfig(name, factory.logConfig(), factory.retentionPeriod()); return new WindowedChangelogTopicConfig(name, factory.logConfig(), factory.retentionPeriod());
return config;
} else { } else {
return new UnwindowedUnversionedChangelogTopicConfig(name, factory.logConfig()); return new UnwindowedUnversionedChangelogTopicConfig(name, factory.logConfig());
} }
@ -1923,9 +1879,10 @@ public class InternalTopologyBuilder {
} }
public static final class Sink<K, V> extends AbstractNode implements TopologyDescription.Sink { public static final class Sink<K, V> extends AbstractNode implements TopologyDescription.Sink {
private final TopicNameExtractor<K, V> topicNameExtractor; private final TopicNameExtractor<? super K, ? super V> topicNameExtractor;
public Sink(final String name, public Sink(final String name,
final TopicNameExtractor<K, V> topicNameExtractor) { final TopicNameExtractor<? super K, ? super V> topicNameExtractor) {
super(name); super(name);
this.topicNameExtractor = topicNameExtractor; this.topicNameExtractor = topicNameExtractor;
} }
@ -1939,14 +1896,14 @@ public class InternalTopologyBuilder {
@Override @Override
public String topic() { public String topic() {
if (topicNameExtractor instanceof StaticTopicNameExtractor) { if (topicNameExtractor instanceof StaticTopicNameExtractor) {
return ((StaticTopicNameExtractor<K, V>) topicNameExtractor).topicName; return ((StaticTopicNameExtractor<?, ?>) topicNameExtractor).topicName;
} else { } else {
return null; return null;
} }
} }
@Override @Override
public TopicNameExtractor<K, V> topicNameExtractor() { public TopicNameExtractor<? super K, ? super V> topicNameExtractor() {
if (topicNameExtractor instanceof StaticTopicNameExtractor) { if (topicNameExtractor instanceof StaticTopicNameExtractor) {
return null; return null;
} else { } else {
@ -1968,7 +1925,6 @@ public class InternalTopologyBuilder {
+ nodeNames(predecessors); + nodeNames(predecessors);
} }
@SuppressWarnings("unchecked")
@Override @Override
public boolean equals(final Object o) { public boolean equals(final Object o) {
if (this == o) { if (this == o) {
@ -1978,7 +1934,7 @@ public class InternalTopologyBuilder {
return false; return false;
} }
final Sink<K, V> sink = (Sink<K, V>) o; final Sink<?, ?> sink = (Sink<?, ?>) o;
return name.equals(sink.name) return name.equals(sink.name)
&& topicNameExtractor.equals(sink.topicNameExtractor) && topicNameExtractor.equals(sink.topicNameExtractor)
&& predecessors.equals(sink.predecessors); && predecessors.equals(sink.predecessors);

View File

@ -30,13 +30,13 @@ public class SinkNode<KIn, VIn> extends ProcessorNode<KIn, VIn, Void, Void> {
private Serializer<KIn> keySerializer; private Serializer<KIn> keySerializer;
private Serializer<VIn> valSerializer; private Serializer<VIn> valSerializer;
private final TopicNameExtractor<KIn, VIn> topicExtractor; private final TopicNameExtractor<? super KIn, ? super VIn> topicExtractor;
private final StreamPartitioner<? super KIn, ? super VIn> partitioner; private final StreamPartitioner<? super KIn, ? super VIn> partitioner;
private InternalProcessorContext<Void, Void> context; private InternalProcessorContext<Void, Void> context;
SinkNode(final String name, SinkNode(final String name,
final TopicNameExtractor<KIn, VIn> topicExtractor, final TopicNameExtractor<? super KIn, ? super VIn> topicExtractor,
final Serializer<KIn> keySerializer, final Serializer<KIn> keySerializer,
final Serializer<VIn> valSerializer, final Serializer<VIn> valSerializer,
final StreamPartitioner<? super KIn, ? super VIn> partitioner) { final StreamPartitioner<? super KIn, ? super VIn> partitioner) {

View File

@ -423,10 +423,8 @@ public class TopologyTest {
} }
} }
@Deprecated // testing old PAPI
@Test @Test
public void shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() { public void shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() {
when(globalStoreBuilder.name()).thenReturn("anyName");
assertThrows(TopologyException.class, () -> topology.addGlobalStore( assertThrows(TopologyException.class, () -> topology.addGlobalStore(
globalStoreBuilder, globalStoreBuilder,
"sameName", "sameName",

View File

@ -507,7 +507,7 @@ public class KStreamImplTest {
public void shouldNotAllowNullGroupedOnGroupBy() { public void shouldNotAllowNullGroupedOnGroupBy() {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.groupBy((k, v) -> k, (Grouped<String, String>) null)); () -> testStream.groupBy((k, v) -> k, null));
assertThat(exception.getMessage(), equalTo("grouped can't be null")); assertThat(exception.getMessage(), equalTo("grouped can't be null"));
} }
@ -515,7 +515,7 @@ public class KStreamImplTest {
public void shouldNotAllowNullGroupedOnGroupByKey() { public void shouldNotAllowNullGroupedOnGroupByKey() {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.groupByKey((Grouped<String, String>) null)); () -> testStream.groupByKey(null));
assertThat(exception.getMessage(), equalTo("grouped can't be null")); assertThat(exception.getMessage(), equalTo("grouped can't be null"));
} }
@ -646,7 +646,7 @@ public class KStreamImplTest {
testStream, testStream,
MockValueJoiner.TOSTRING_JOINER, MockValueJoiner.TOSTRING_JOINER,
JoinWindows.of(ofMillis(10)), JoinWindows.of(ofMillis(10)),
(StreamJoined<String, String, String>) null)); null));
assertThat(exception.getMessage(), equalTo("streamJoined can't be null")); assertThat(exception.getMessage(), equalTo("streamJoined can't be null"));
} }
@ -746,7 +746,7 @@ public class KStreamImplTest {
testStream, testStream,
MockValueJoiner.TOSTRING_JOINER, MockValueJoiner.TOSTRING_JOINER,
JoinWindows.of(ofMillis(10)), JoinWindows.of(ofMillis(10)),
(StreamJoined<String, String, String>) null)); null));
assertThat(exception.getMessage(), equalTo("streamJoined can't be null")); assertThat(exception.getMessage(), equalTo("streamJoined can't be null"));
} }
@ -845,7 +845,7 @@ public class KStreamImplTest {
testStream, testStream,
MockValueJoiner.TOSTRING_JOINER, MockValueJoiner.TOSTRING_JOINER,
JoinWindows.of(ofMillis(10)), JoinWindows.of(ofMillis(10)),
(StreamJoined<String, String, String>) null)); null));
assertThat(exception.getMessage(), equalTo("streamJoined can't be null")); assertThat(exception.getMessage(), equalTo("streamJoined can't be null"));
} }
@ -1595,7 +1595,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.process((ProcessorSupplier<? super String, ? super String, Void, Void>) null)); () -> testStream.process((ProcessorSupplier<? super String, ? super String, Void, Void>) null));
assertThat(exception.getMessage(), equalTo("processorSupplier can't be null")); assertThat(exception.getMessage(), equalTo("processorSupplier cannot be null"));
} }
@Test @Test
@ -1604,7 +1604,7 @@ public class KStreamImplTest {
NullPointerException.class, NullPointerException.class,
() -> testStream.process((ProcessorSupplier<? super String, ? super String, Void, Void>) null, () -> testStream.process((ProcessorSupplier<? super String, ? super String, Void, Void>) null,
"storeName")); "storeName"));
assertThat(exception.getMessage(), equalTo("processorSupplier can't be null")); assertThat(exception.getMessage(), equalTo("processorSupplier cannot be null"));
} }
@Test @Test
@ -1613,7 +1613,7 @@ public class KStreamImplTest {
NullPointerException.class, NullPointerException.class,
() -> testStream.process((ProcessorSupplier<? super String, ? super String, Void, Void>) null, () -> testStream.process((ProcessorSupplier<? super String, ? super String, Void, Void>) null,
Named.as("processor"))); Named.as("processor")));
assertThat(exception.getMessage(), equalTo("processorSupplier can't be null")); assertThat(exception.getMessage(), equalTo("processorSupplier cannot be null"));
} }
@Test @Test
@ -1622,7 +1622,7 @@ public class KStreamImplTest {
NullPointerException.class, NullPointerException.class,
() -> testStream.process((ProcessorSupplier<? super String, ? super String, Void, Void>) null, () -> testStream.process((ProcessorSupplier<? super String, ? super String, Void, Void>) null,
Named.as("processor"), "stateStore")); Named.as("processor"), "stateStore"));
assertThat(exception.getMessage(), equalTo("processorSupplier can't be null")); assertThat(exception.getMessage(), equalTo("processorSupplier cannot be null"));
} }
@Test @Test
@ -1678,7 +1678,7 @@ public class KStreamImplTest {
final NullPointerException exception = assertThrows( final NullPointerException exception = assertThrows(
NullPointerException.class, NullPointerException.class,
() -> testStream.processValues((FixedKeyProcessorSupplier<? super String, ? super String, Void>) null)); () -> testStream.processValues((FixedKeyProcessorSupplier<? super String, ? super String, Void>) null));
assertThat(exception.getMessage(), equalTo("processorSupplier can't be null")); assertThat(exception.getMessage(), equalTo("processorSupplier cannot be null"));
} }
@Test @Test
@ -1687,7 +1687,7 @@ public class KStreamImplTest {
NullPointerException.class, NullPointerException.class,
() -> testStream.processValues((FixedKeyProcessorSupplier<? super String, ? super String, Void>) null, () -> testStream.processValues((FixedKeyProcessorSupplier<? super String, ? super String, Void>) null,
"storeName")); "storeName"));
assertThat(exception.getMessage(), equalTo("processorSupplier can't be null")); assertThat(exception.getMessage(), equalTo("processorSupplier cannot be null"));
} }
@Test @Test
@ -1696,7 +1696,7 @@ public class KStreamImplTest {
NullPointerException.class, NullPointerException.class,
() -> testStream.process((ProcessorSupplier<? super String, ? super String, Void, Void>) null, () -> testStream.process((ProcessorSupplier<? super String, ? super String, Void, Void>) null,
Named.as("processor"))); Named.as("processor")));
assertThat(exception.getMessage(), equalTo("processorSupplier can't be null")); assertThat(exception.getMessage(), equalTo("processorSupplier cannot be null"));
} }
@Test @Test
@ -1705,7 +1705,7 @@ public class KStreamImplTest {
NullPointerException.class, NullPointerException.class,
() -> testStream.process((ProcessorSupplier<? super String, ? super String, Void, Void>) null, () -> testStream.process((ProcessorSupplier<? super String, ? super String, Void, Void>) null,
Named.as("processor"), "stateStore")); Named.as("processor"), "stateStore"));
assertThat(exception.getMessage(), equalTo("processorSupplier can't be null")); assertThat(exception.getMessage(), equalTo("processorSupplier cannot be null"));
} }
@Test @Test

View File

@ -549,7 +549,7 @@ public class InternalTopologyBuilderTest {
new MockKeyValueStoreBuilder("testStore", false).withLoggingDisabled(); new MockKeyValueStoreBuilder("testStore", false).withLoggingDisabled();
builder.addGlobalStore( builder.addGlobalStore(
"global-store", "global-source",
null, null,
null, null,
null, null,
@ -562,11 +562,11 @@ public class InternalTopologyBuilderTest {
final TopologyException exception = assertThrows( final TopologyException exception = assertThrows(
TopologyException.class, TopologyException.class,
() -> builder.addGlobalStore( () -> builder.addGlobalStore(
"global-store-2", "global-source-2",
null, null,
null, null,
null, null,
"global-topic", "global-topic-2",
"global-processor-2", "global-processor-2",
new StoreDelegatingProcessorSupplier<>(new MockApiProcessorSupplier<>(), Set.of(secondGlobalBuilder)), new StoreDelegatingProcessorSupplier<>(new MockApiProcessorSupplier<>(), Set.of(secondGlobalBuilder)),
false false