KAFKA-10379: Implement the KIP-478 StreamBuilder#addGlobalStore() (#9148)

From KIP-478, implement the new StreamBuilder#addGlobalStore() overload
that takes a stateUpdateSupplier fully typed Processor<KIn, VIn, Void, Void>.

Where necessary, use the adapters to make the old APIs defer to the new ones,
as well as limiting the scope of this change set.

Reviewers: Boyang Chen <boyang@apache.org>
This commit is contained in:
John Roesler 2020-08-20 14:06:16 -05:00 committed by GitHub
parent 9cabc8c2b3
commit 88d4bc4641
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
44 changed files with 682 additions and 357 deletions

View File

@ -30,11 +30,12 @@ import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorAdapter;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.state.KeyValueStore;
@ -468,7 +469,7 @@ public class StreamsBuilder {
/**
* Adds a state store to the underlying {@link Topology}.
* <p>
* It is required to connect state stores to {@link Processor Processors}, {@link Transformer Transformers},
* It is required to connect state stores to {@link org.apache.kafka.streams.processor.Processor Processors}, {@link Transformer Transformers},
* or {@link ValueTransformer ValueTransformers} before they can be used.
*
* @param builder the builder used to obtain this state store {@link StateStore} instance
@ -482,7 +483,7 @@ public class StreamsBuilder {
}
/**
* @deprecated use {@link #addGlobalStore(StoreBuilder, String, Consumed, ProcessorSupplier)} instead
* @deprecated Use {@link #addGlobalStore(StoreBuilder, String, Consumed, ProcessorSupplier)} instead.
*/
@Deprecated
public synchronized <K, V> StreamsBuilder addGlobalStore(final StoreBuilder<?> storeBuilder,
@ -490,7 +491,7 @@ public class StreamsBuilder {
final String sourceName,
final Consumed<K, V> consumed,
final String processorName,
final ProcessorSupplier<K, V> stateUpdateSupplier) {
final org.apache.kafka.streams.processor.ProcessorSupplier<K, V> stateUpdateSupplier) {
Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
Objects.requireNonNull(consumed, "consumed can't be null");
internalStreamsBuilder.addGlobalStore(
@ -499,7 +500,7 @@ public class StreamsBuilder {
topic,
new ConsumedInternal<>(consumed),
processorName,
stateUpdateSupplier
() -> ProcessorAdapter.adapt(stateUpdateSupplier.get())
);
return this;
}
@ -512,27 +513,70 @@ public class StreamsBuilder {
* A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
* of the input topic.
* <p>
* The provided {@link ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all
* The provided {@link org.apache.kafka.streams.processor.ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all
* records forwarded from the {@link SourceNode}. NOTE: you should not use the {@code Processor} to insert transformed records into
* the global state store. This store uses the source topic as changelog and during restore will insert records directly
* from the source.
* This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
* The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
* <p>
* It is not required to connect a global store to {@link Processor Processors}, {@link Transformer Transformers},
* It is not required to connect a global store to {@link org.apache.kafka.streams.processor.Processor Processors}, {@link Transformer Transformers},
* or {@link ValueTransformer ValueTransformer}; those have read-only access to all global stores by default.
*
* @param storeBuilder user defined {@link StoreBuilder}; can't be {@code null}
* @param topic the topic to source the data from
* @param consumed the instance of {@link Consumed} used to define optional parameters; can't be {@code null}
* @param stateUpdateSupplier the instance of {@link org.apache.kafka.streams.processor.ProcessorSupplier}
* @return itself
* @throws TopologyException if the processor of state is already registered
* @deprecated Since 2.7.0; use {@link #addGlobalStore(StoreBuilder, String, Consumed, ProcessorSupplier)} instead.
*/
@Deprecated
public synchronized <K, V> StreamsBuilder addGlobalStore(final StoreBuilder<?> storeBuilder,
final String topic,
final Consumed<K, V> consumed,
final org.apache.kafka.streams.processor.ProcessorSupplier<K, V> stateUpdateSupplier) {
Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
Objects.requireNonNull(consumed, "consumed can't be null");
internalStreamsBuilder.addGlobalStore(
storeBuilder,
topic,
new ConsumedInternal<>(consumed),
() -> ProcessorAdapter.adapt(stateUpdateSupplier.get())
);
return this;
}
/**
* Adds a global {@link StateStore} to the topology.
* The {@link StateStore} sources its data from all partitions of the provided input topic.
* There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
* <p>
* A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
* of the input topic.
* <p>
* The provided {@link ProcessorSupplier}} will be used to create an
* {@link Processor} that will receive all records forwarded from the {@link SourceNode}.
* NOTE: you should not use the {@link Processor} to insert transformed records into
* the global state store. This store uses the source topic as changelog and during restore will insert records directly
* from the source.
* This {@link Processor} should be used to keep the {@link StateStore} up-to-date.
* The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
* <p>
* It is not required to connect a global store to the {@link Processor Processors},
* {@link Transformer Transformers}, or {@link ValueTransformer ValueTransformer}; those have read-only access to all global stores by default.
*
* @param storeBuilder user defined {@link StoreBuilder}; can't be {@code null}
* @param topic the topic to source the data from
* @param consumed the instance of {@link Consumed} used to define optional parameters; can't be {@code null}
* @param stateUpdateSupplier the instance of {@link ProcessorSupplier}
* @return itself
* @throws TopologyException if the processor of state is already registered
*/
public synchronized <K, V> StreamsBuilder addGlobalStore(final StoreBuilder<?> storeBuilder,
final String topic,
final Consumed<K, V> consumed,
final ProcessorSupplier<K, V> stateUpdateSupplier) {
public synchronized <KIn, VIn> StreamsBuilder addGlobalStore(final StoreBuilder<?> storeBuilder,
final String topic,
final Consumed<KIn, VIn> consumed,
final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {
Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
Objects.requireNonNull(consumed, "consumed can't be null");
internalStreamsBuilder.addGlobalStore(

View File

@ -29,6 +29,7 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorAdapter;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.SinkNode;
@ -721,7 +722,7 @@ public class Topology {
valueDeserializer,
topic,
processorName,
stateUpdateSupplier
() -> ProcessorAdapter.adapt(stateUpdateSupplier.get())
);
return this;
}
@ -766,7 +767,7 @@ public class Topology {
valueDeserializer,
topic,
processorName,
stateUpdateSupplier
() -> ProcessorAdapter.adapt(stateUpdateSupplier.get())
);
return this;
}

View File

@ -31,7 +31,6 @@ import org.apache.kafka.streams.kstream.internals.graph.StateStoreNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamSourceNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.TableSourceNode;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
@ -201,12 +200,12 @@ public class InternalStreamsBuilder implements InternalNameProvider {
addGraphNode(root, new StateStoreNode<>(builder));
}
public synchronized <K, V> void addGlobalStore(final StoreBuilder<?> storeBuilder,
final String sourceName,
final String topic,
final ConsumedInternal<K, V> consumed,
final String processorName,
final ProcessorSupplier<K, V> stateUpdateSupplier) {
public synchronized <KIn, VIn> void addGlobalStore(final StoreBuilder<?> storeBuilder,
final String sourceName,
final String topic,
final ConsumedInternal<KIn, VIn> consumed,
final String processorName,
final org.apache.kafka.streams.processor.api.ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {
final StreamsGraphNode globalStoreNode = new GlobalStoreNode<>(
storeBuilder,
@ -220,10 +219,10 @@ public class InternalStreamsBuilder implements InternalNameProvider {
addGraphNode(root, globalStoreNode);
}
public synchronized <K, V> void addGlobalStore(final StoreBuilder<?> storeBuilder,
final String topic,
final ConsumedInternal<K, V> consumed,
final ProcessorSupplier<K, V> stateUpdateSupplier) {
public synchronized <KIn, VIn> void addGlobalStore(final StoreBuilder<?> storeBuilder,
final String topic,
final ConsumedInternal<KIn, VIn> consumed,
final org.apache.kafka.streams.processor.api.ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {
// explicitly disable logging for global stores
storeBuilder.withLoggingDisabled();
final String sourceName = newProcessorName(KStreamImpl.SOURCE_NAME);

View File

@ -17,26 +17,26 @@
package org.apache.kafka.streams.kstream.internals.graph;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.StoreBuilder;
public class GlobalStoreNode<K, V, S extends StateStore> extends StateStoreNode<S> {
public class GlobalStoreNode<KIn, VIn, S extends StateStore> extends StateStoreNode<S> {
private final String sourceName;
private final String topic;
private final ConsumedInternal<K, V> consumed;
private final ConsumedInternal<KIn, VIn> consumed;
private final String processorName;
private final ProcessorSupplier<K, V> stateUpdateSupplier;
private final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier;
public GlobalStoreNode(final StoreBuilder<S> storeBuilder,
final String sourceName,
final String topic,
final ConsumedInternal<K, V> consumed,
final ConsumedInternal<KIn, VIn> consumed,
final String processorName,
final ProcessorSupplier<K, V> stateUpdateSupplier) {
final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {
super(storeBuilder);
this.sourceName = sourceName;

View File

@ -23,6 +23,7 @@ import org.apache.kafka.streams.kstream.internals.KTableSource;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.kstream.internals.TimestampedKeyValueStoreMaterializer;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorAdapter;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
@ -89,14 +90,16 @@ public class TableSourceNode<K, V> extends StreamSourceNode<K, V> {
new TimestampedKeyValueStoreMaterializer<>((MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>>) materializedInternal).materialize();
if (isGlobalKTable) {
topologyBuilder.addGlobalStore(storeBuilder,
sourceName,
consumedInternal().timestampExtractor(),
consumedInternal().keyDeserializer(),
consumedInternal().valueDeserializer(),
topicName,
processorParameters.processorName(),
processorParameters.processorSupplier());
topologyBuilder.addGlobalStore(
storeBuilder,
sourceName,
consumedInternal().timestampExtractor(),
consumedInternal().keyDeserializer(),
consumedInternal().valueDeserializer(),
topicName,
processorParameters.processorName(),
() -> ProcessorAdapter.adapt(processorParameters.processorSupplier().get())
);
} else {
topologyBuilder.addSource(consumedInternal().offsetResetPolicy(),
sourceName,

View File

@ -22,11 +22,11 @@ import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
import org.apache.kafka.streams.state.internals.TimestampedWindowStoreBuilder;
@ -192,16 +192,23 @@ public class InternalTopologyBuilder {
}
private static class ProcessorNodeFactory<KIn, VIn, KOut, VOut> extends NodeFactory<KIn, VIn, KOut, VOut> {
private final ProcessorSupplier<KIn, VIn> supplier;
private final ProcessorSupplier<KIn, VIn, KOut, VOut> supplier;
private final Set<String> stateStoreNames = new HashSet<>();
ProcessorNodeFactory(final String name,
final String[] predecessors,
final ProcessorSupplier<KIn, VIn> supplier) {
final ProcessorSupplier<KIn, VIn, KOut, VOut> supplier) {
super(name, predecessors.clone());
this.supplier = supplier;
}
ProcessorNodeFactory(final String name,
final String[] predecessors,
final org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> supplier) {
super(name, predecessors.clone());
this.supplier = () -> ProcessorAdapter.adapt(supplier.get());
}
public void addStateStore(final String stateStoreName) {
stateStoreNames.add(stateStoreName);
}
@ -476,7 +483,7 @@ public class InternalTopologyBuilder {
}
public final void addProcessor(final String name,
final ProcessorSupplier<?, ?> supplier,
final org.apache.kafka.streams.processor.ProcessorSupplier<?, ?> supplier,
final String... predecessorNames) {
Objects.requireNonNull(name, "name must not be null");
Objects.requireNonNull(supplier, "supplier must not be null");
@ -532,14 +539,14 @@ public class InternalTopologyBuilder {
nodeGroups = null;
}
public final <KIn, VIn, KOut, VOut> void addGlobalStore(final StoreBuilder<?> storeBuilder,
final String sourceName,
final TimestampExtractor timestampExtractor,
final Deserializer<KIn> keyDeserializer,
final Deserializer<VIn> valueDeserializer,
final String topic,
final String processorName,
final ProcessorSupplier<KIn, VIn> stateUpdateSupplier) {
public final <KIn, VIn> void addGlobalStore(final StoreBuilder<?> storeBuilder,
final String sourceName,
final TimestampExtractor timestampExtractor,
final Deserializer<KIn> keyDeserializer,
final Deserializer<VIn> valueDeserializer,
final String topic,
final String processorName,
final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {
Objects.requireNonNull(storeBuilder, "store builder must not be null");
validateGlobalStoreArguments(sourceName,
topic,
@ -552,7 +559,7 @@ public class InternalTopologyBuilder {
final String[] topics = {topic};
final String[] predecessors = {sourceName};
final ProcessorNodeFactory<KIn, VIn, KOut, VOut> nodeFactory = new ProcessorNodeFactory<>(
final ProcessorNodeFactory<KIn, VIn, Void, Void> nodeFactory = new ProcessorNodeFactory<>(
processorName,
predecessors,
stateUpdateSupplier
@ -667,7 +674,7 @@ public class InternalTopologyBuilder {
private void validateGlobalStoreArguments(final String sourceName,
final String topic,
final String processorName,
final ProcessorSupplier<?, ?> stateUpdateSupplier,
final ProcessorSupplier<?, ?, Void, Void> stateUpdateSupplier,
final String storeName,
final boolean loggingEnabled) {
Objects.requireNonNull(sourceName, "sourceName must not be null");

View File

@ -23,7 +23,7 @@ import org.apache.kafka.streams.processor.api.ProcessorContext;
public final class ProcessorAdapter<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> {
private final org.apache.kafka.streams.processor.Processor<KIn, VIn> delegate;
static <KIn, VIn, KOut, VOut> Processor<KIn, VIn, KOut, VOut> adapt(final org.apache.kafka.streams.processor.Processor<KIn, VIn> delegate) {
public static <KIn, VIn, KOut, VOut> Processor<KIn, VIn, KOut, VOut> adapt(final org.apache.kafka.streams.processor.Processor<KIn, VIn> delegate) {
if (delegate == null) {
return null;
} else {

View File

@ -41,7 +41,7 @@ public final class ProcessorContextAdapter<KForward, VForward>
private final InternalProcessorContext delegate;
@SuppressWarnings("unchecked")
static <KForward, VForward> InternalApiProcessorContext<KForward, VForward> shim(final InternalProcessorContext delegate) {
public static <KForward, VForward> InternalApiProcessorContext<KForward, VForward> adapt(final InternalProcessorContext delegate) {
if (delegate instanceof ProcessorContextReverseAdapter) {
return (InternalApiProcessorContext<KForward, VForward>) ((ProcessorContextReverseAdapter) delegate).delegate();
} else {

View File

@ -54,7 +54,19 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
private Sensor createSensor;
public ProcessorNode(final String name) {
this(name, null, null);
this(name, (Processor<KIn, VIn, KOut, VOut>) null, null);
}
public ProcessorNode(final String name,
final Processor<KIn, VIn, KOut, VOut> processor,
final Set<String> stateStores) {
this.name = name;
this.processor = processor;
this.children = new ArrayList<>();
this.childByName = new HashMap<>();
this.stateStores = stateStores;
this.time = new SystemTime();
}
public ProcessorNode(final String name,
@ -97,7 +109,7 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
maybeMeasureLatency(
() -> {
if (processor != null) {
processor.init(ProcessorContextAdapter.shim(context));
processor.init(ProcessorContextAdapter.adapt(context));
}
},
time,

View File

@ -37,10 +37,13 @@ import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockPredicate;
import org.apache.kafka.test.MockProcessorSupplier;
@ -60,23 +63,64 @@ import java.util.Properties;
import static java.util.Arrays.asList;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class StreamsBuilderTest {
private static final String STREAM_TOPIC = "stream-topic";
private static final String STREAM_TOPIC = "stream-topic";
private static final String STREAM_OPERATION_NAME = "stream-operation";
private static final String STREAM_TOPIC_TWO = "stream-topic-two";
private static final String TABLE_TOPIC = "table-topic";
private static final String TABLE_TOPIC = "table-topic";
private final StreamsBuilder builder = new StreamsBuilder();
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
@Test
public void shouldAddGlobalStore() {
final Properties properties = new Properties();
properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy");
properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy");
final StreamsBuilder builder = new StreamsBuilder();
builder.addGlobalStore(
Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("store"),
Serdes.String(),
Serdes.String()
),
"topic",
Consumed.with(Serdes.String(), Serdes.String()),
() -> new Processor<String, String, Void, Void>() {
private KeyValueStore<String, String> store;
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext<Void, Void> context) {
store = (KeyValueStore<String, String>) context.getStateStore("store");
}
@Override
public void process(final String key, final String value) {
store.put(key, value);
}
}
);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), properties)) {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic("topic", new StringSerializer(), new StringSerializer());
inputTopic.pipeInput("hey", "there");
final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
final String hey = store.get("hey");
assertThat(hey, is("there"));
}
}
@Test
public void shouldNotThrowNullPointerIfOptimizationsNotSpecified() {
final Properties properties = new Properties();
@ -256,13 +300,13 @@ public class StreamsBuilderTest {
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic("topic-source", new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
driver.createInputTopic("topic-source", new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
inputTopic.pipeInput("A", "aa");
}
// no exception was thrown
assertEquals(Collections.singletonList(new KeyValueTimestamp<>("A", "aa", 0)),
processorSupplier.theCapturedProcessor().processed);
processorSupplier.theCapturedProcessor().processed());
}
@Deprecated
@ -279,12 +323,12 @@ public class StreamsBuilderTest {
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic("topic-source", new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
driver.createInputTopic("topic-source", new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
inputTopic.pipeInput("A", "aa");
}
assertEquals(Collections.singletonList(new KeyValueTimestamp<>("A", "aa", 0)), sourceProcessorSupplier.theCapturedProcessor().processed);
assertEquals(Collections.singletonList(new KeyValueTimestamp<>("A", "aa", 0)), throughProcessorSupplier.theCapturedProcessor().processed);
assertEquals(Collections.singletonList(new KeyValueTimestamp<>("A", "aa", 0)), sourceProcessorSupplier.theCapturedProcessor().processed());
assertEquals(Collections.singletonList(new KeyValueTimestamp<>("A", "aa", 0)), throughProcessorSupplier.theCapturedProcessor().processed());
}
@Test
@ -304,8 +348,8 @@ public class StreamsBuilderTest {
inputTopic.pipeInput("A", "aa");
}
assertEquals(Collections.singletonList(new KeyValueTimestamp<>("A", "aa", 0)), sourceProcessorSupplier.theCapturedProcessor().processed);
assertEquals(Collections.singletonList(new KeyValueTimestamp<>("A", "aa", 0)), throughProcessorSupplier.theCapturedProcessor().processed);
assertEquals(Collections.singletonList(new KeyValueTimestamp<>("A", "aa", 0)), sourceProcessorSupplier.theCapturedProcessor().processed());
assertEquals(Collections.singletonList(new KeyValueTimestamp<>("A", "aa", 0)), throughProcessorSupplier.theCapturedProcessor().processed());
}
@Test
@ -322,9 +366,9 @@ public class StreamsBuilderTest {
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
final TestInputTopic<String, String> inputTopic1 =
driver.createInputTopic(topic1, new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
driver.createInputTopic(topic1, new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
final TestInputTopic<String, String> inputTopic2 =
driver.createInputTopic(topic2, new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
driver.createInputTopic(topic2, new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
inputTopic1.pipeInput("A", "aa");
inputTopic2.pipeInput("B", "bb");
@ -333,9 +377,9 @@ public class StreamsBuilderTest {
}
assertEquals(asList(new KeyValueTimestamp<>("A", "aa", 0),
new KeyValueTimestamp<>("B", "bb", 0),
new KeyValueTimestamp<>("C", "cc", 0),
new KeyValueTimestamp<>("D", "dd", 0)), processorSupplier.theCapturedProcessor().processed);
new KeyValueTimestamp<>("B", "bb", 0),
new KeyValueTimestamp<>("C", "cc", 0),
new KeyValueTimestamp<>("D", "dd", 0)), processorSupplier.theCapturedProcessor().processed());
}
@Test
@ -344,13 +388,13 @@ public class StreamsBuilderTest {
final String topic = "topic";
final ForeachAction<Long, String> action = results::put;
builder.table(topic, Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as("store")
.withKeySerde(Serdes.Long())
.withValueSerde(Serdes.String()))
.toStream().foreach(action);
.withKeySerde(Serdes.Long())
.withValueSerde(Serdes.String()))
.toStream().foreach(action);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
final TestInputTopic<Long, String> inputTopic =
driver.createInputTopic(topic, new LongSerializer(), new StringSerializer());
driver.createInputTopic(topic, new LongSerializer(), new StringSerializer());
inputTopic.pipeInput(1L, "value1");
inputTopic.pipeInput(2L, "value2");
@ -366,12 +410,12 @@ public class StreamsBuilderTest {
public void shouldUseSerdesDefinedInMaterializedToConsumeGlobalTable() {
final String topic = "topic";
builder.globalTable(topic, Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as("store")
.withKeySerde(Serdes.Long())
.withValueSerde(Serdes.String()));
.withKeySerde(Serdes.Long())
.withValueSerde(Serdes.String()));
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
final TestInputTopic<Long, String> inputTopic =
driver.createInputTopic(topic, new LongSerializer(), new StringSerializer());
driver.createInputTopic(topic, new LongSerializer(), new StringSerializer());
inputTopic.pipeInput(1L, "value1");
inputTopic.pipeInput(2L, "value2");
final KeyValueStore<Long, String> store = driver.getKeyValueStore("store");
@ -438,7 +482,7 @@ public class StreamsBuilderTest {
internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.keySet(),
equalTo(Collections.singleton("appId-store-changelog")));
}
@Test(expected = TopologyException.class)
public void shouldThrowExceptionWhenNoTopicPresent() {
builder.stream(Collections.emptyList());
@ -471,11 +515,11 @@ public class StreamsBuilderTest {
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
assertNamesForOperation(
topology,
expected + "-source",
expected,
"KSTREAM-SOURCE-0000000004",
"KTABLE-SOURCE-0000000005");
topology,
expected + "-source",
expected,
"KSTREAM-SOURCE-0000000004",
"KTABLE-SOURCE-0000000005");
}
@Test
@ -487,9 +531,9 @@ public class StreamsBuilderTest {
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
assertNamesForStateStore(
topology.globalStateStores(),
"stream-topic-STATE-STORE-0000000000",
"stream-topic-two-STATE-STORE-0000000003"
topology.globalStateStores(),
"stream-topic-STATE-STORE-0000000000",
"stream-topic-two-STATE-STORE-0000000003"
);
}
@ -573,15 +617,15 @@ public class StreamsBuilderTest {
@SuppressWarnings("unchecked")
public void shouldUseSpecifiedNameForBranchOperation() {
builder.stream(STREAM_TOPIC)
.branch(Named.as("branch-processor"), (k, v) -> true, (k, v) -> false);
.branch(Named.as("branch-processor"), (k, v) -> true, (k, v) -> false);
builder.build();
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
assertNamesForOperation(topology,
"KSTREAM-SOURCE-0000000000",
"branch-processor",
"branch-processor-predicate-0",
"branch-processor-predicate-1");
"KSTREAM-SOURCE-0000000000",
"branch-processor",
"branch-processor-predicate-0",
"branch-processor-predicate-1");
}
@Test
@ -593,10 +637,10 @@ public class StreamsBuilderTest {
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
assertNamesForOperation(topology,
"KSTREAM-SOURCE-0000000000",
"KSTREAM-SOURCE-0000000002",
"KTABLE-SOURCE-0000000003",
STREAM_OPERATION_NAME);
"KSTREAM-SOURCE-0000000000",
"KSTREAM-SOURCE-0000000002",
"KTABLE-SOURCE-0000000003",
STREAM_OPERATION_NAME);
}
@Test
@ -608,10 +652,10 @@ public class StreamsBuilderTest {
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
assertNamesForOperation(topology,
"KSTREAM-SOURCE-0000000000",
"KSTREAM-SOURCE-0000000002",
"KTABLE-SOURCE-0000000003",
STREAM_OPERATION_NAME);
"KSTREAM-SOURCE-0000000000",
"KSTREAM-SOURCE-0000000002",
"KTABLE-SOURCE-0000000003",
STREAM_OPERATION_NAME);
}
@Test
@ -624,16 +668,16 @@ public class StreamsBuilderTest {
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
assertNamesForStateStore(topology.stateStores(),
STREAM_OPERATION_NAME + "-this-join-store", STREAM_OPERATION_NAME + "-outer-other-join-store"
STREAM_OPERATION_NAME + "-this-join-store", STREAM_OPERATION_NAME + "-outer-other-join-store"
);
assertNamesForOperation(topology,
"KSTREAM-SOURCE-0000000000",
"KSTREAM-SOURCE-0000000001",
STREAM_OPERATION_NAME + "-this-windowed",
STREAM_OPERATION_NAME + "-other-windowed",
STREAM_OPERATION_NAME + "-this-join",
STREAM_OPERATION_NAME + "-outer-other-join",
STREAM_OPERATION_NAME + "-merge");
"KSTREAM-SOURCE-0000000000",
"KSTREAM-SOURCE-0000000001",
STREAM_OPERATION_NAME + "-this-windowed",
STREAM_OPERATION_NAME + "-other-windowed",
STREAM_OPERATION_NAME + "-this-join",
STREAM_OPERATION_NAME + "-outer-other-join",
STREAM_OPERATION_NAME + "-merge");
}
@Test
@ -647,17 +691,17 @@ public class StreamsBuilderTest {
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
assertNamesForStateStore(topology.stateStores(),
"KSTREAM-JOINTHIS-0000000004-store",
"KSTREAM-OUTEROTHER-0000000005-store"
"KSTREAM-JOINTHIS-0000000004-store",
"KSTREAM-OUTEROTHER-0000000005-store"
);
assertNamesForOperation(topology,
"KSTREAM-SOURCE-0000000000",
"KSTREAM-SOURCE-0000000001",
STREAM_OPERATION_NAME + "-this-windowed",
STREAM_OPERATION_NAME + "-other-windowed",
STREAM_OPERATION_NAME + "-this-join",
STREAM_OPERATION_NAME + "-outer-other-join",
STREAM_OPERATION_NAME + "-merge");
"KSTREAM-SOURCE-0000000000",
"KSTREAM-SOURCE-0000000001",
STREAM_OPERATION_NAME + "-this-windowed",
STREAM_OPERATION_NAME + "-other-windowed",
STREAM_OPERATION_NAME + "-this-join",
STREAM_OPERATION_NAME + "-outer-other-join",
STREAM_OPERATION_NAME + "-merge");
}
@Test
@ -670,17 +714,17 @@ public class StreamsBuilderTest {
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
assertNamesForStateStore(topology.stateStores(),
STREAM_OPERATION_NAME + "-this-join-store",
STREAM_OPERATION_NAME + "-other-join-store"
STREAM_OPERATION_NAME + "-this-join-store",
STREAM_OPERATION_NAME + "-other-join-store"
);
assertNamesForOperation(topology,
"KSTREAM-SOURCE-0000000000",
"KSTREAM-SOURCE-0000000001",
STREAM_OPERATION_NAME + "-this-windowed",
STREAM_OPERATION_NAME + "-other-windowed",
STREAM_OPERATION_NAME + "-this-join",
STREAM_OPERATION_NAME + "-other-join",
STREAM_OPERATION_NAME + "-merge");
"KSTREAM-SOURCE-0000000000",
"KSTREAM-SOURCE-0000000001",
STREAM_OPERATION_NAME + "-this-windowed",
STREAM_OPERATION_NAME + "-other-windowed",
STREAM_OPERATION_NAME + "-this-join",
STREAM_OPERATION_NAME + "-other-join",
STREAM_OPERATION_NAME + "-merge");
}
@Test
@ -694,17 +738,17 @@ public class StreamsBuilderTest {
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
assertNamesForStateStore(topology.stateStores(),
"KSTREAM-JOINTHIS-0000000004-store",
"KSTREAM-JOINOTHER-0000000005-store"
"KSTREAM-JOINTHIS-0000000004-store",
"KSTREAM-JOINOTHER-0000000005-store"
);
assertNamesForOperation(topology,
"KSTREAM-SOURCE-0000000000",
"KSTREAM-SOURCE-0000000001",
STREAM_OPERATION_NAME + "-this-windowed",
STREAM_OPERATION_NAME + "-other-windowed",
STREAM_OPERATION_NAME + "-this-join",
STREAM_OPERATION_NAME + "-other-join",
STREAM_OPERATION_NAME + "-merge");
"KSTREAM-SOURCE-0000000000",
"KSTREAM-SOURCE-0000000001",
STREAM_OPERATION_NAME + "-this-windowed",
STREAM_OPERATION_NAME + "-other-windowed",
STREAM_OPERATION_NAME + "-this-join",
STREAM_OPERATION_NAME + "-other-join",
STREAM_OPERATION_NAME + "-merge");
}
@Test
@ -716,16 +760,16 @@ public class StreamsBuilderTest {
builder.build();
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
assertNamesForStateStore(topology.stateStores(),
STREAM_OPERATION_NAME + "-outer-this-join-store",
STREAM_OPERATION_NAME + "-outer-other-join-store");
STREAM_OPERATION_NAME + "-outer-this-join-store",
STREAM_OPERATION_NAME + "-outer-other-join-store");
assertNamesForOperation(topology,
"KSTREAM-SOURCE-0000000000",
"KSTREAM-SOURCE-0000000001",
STREAM_OPERATION_NAME + "-this-windowed",
STREAM_OPERATION_NAME + "-other-windowed",
STREAM_OPERATION_NAME + "-outer-this-join",
STREAM_OPERATION_NAME + "-outer-other-join",
STREAM_OPERATION_NAME + "-merge");
"KSTREAM-SOURCE-0000000000",
"KSTREAM-SOURCE-0000000001",
STREAM_OPERATION_NAME + "-this-windowed",
STREAM_OPERATION_NAME + "-other-windowed",
STREAM_OPERATION_NAME + "-outer-this-join",
STREAM_OPERATION_NAME + "-outer-other-join",
STREAM_OPERATION_NAME + "-merge");
}
@ -740,17 +784,17 @@ public class StreamsBuilderTest {
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
assertNamesForStateStore(topology.stateStores(),
"KSTREAM-OUTERTHIS-0000000004-store",
"KSTREAM-OUTEROTHER-0000000005-store"
"KSTREAM-OUTERTHIS-0000000004-store",
"KSTREAM-OUTEROTHER-0000000005-store"
);
assertNamesForOperation(topology,
"KSTREAM-SOURCE-0000000000",
"KSTREAM-SOURCE-0000000001",
STREAM_OPERATION_NAME + "-this-windowed",
STREAM_OPERATION_NAME + "-other-windowed",
STREAM_OPERATION_NAME + "-outer-this-join",
STREAM_OPERATION_NAME + "-outer-other-join",
STREAM_OPERATION_NAME + "-merge");
"KSTREAM-SOURCE-0000000000",
"KSTREAM-SOURCE-0000000001",
STREAM_OPERATION_NAME + "-this-windowed",
STREAM_OPERATION_NAME + "-other-windowed",
STREAM_OPERATION_NAME + "-outer-this-join",
STREAM_OPERATION_NAME + "-outer-other-join",
STREAM_OPERATION_NAME + "-merge");
}
@ -770,7 +814,7 @@ public class StreamsBuilderTest {
@Test
public void shouldUseSpecifiedNameForProcessOperation() {
builder.stream(STREAM_TOPIC)
.process(() -> null, Named.as("test-processor"));
.process(() -> null, Named.as("test-processor"));
builder.build();
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
@ -806,28 +850,28 @@ public class StreamsBuilderTest {
@Test
public void shouldUseSpecifiedNameForToStream() {
builder.table(STREAM_TOPIC)
.toStream(Named.as("to-stream"));
.toStream(Named.as("to-stream"));
builder.build();
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
assertNamesForOperation(topology,
"KSTREAM-SOURCE-0000000001",
"KTABLE-SOURCE-0000000002",
"to-stream");
"KSTREAM-SOURCE-0000000001",
"KTABLE-SOURCE-0000000002",
"to-stream");
}
@Test
public void shouldUseSpecifiedNameForToStreamWithMapper() {
builder.table(STREAM_TOPIC)
.toStream(KeyValue::pair, Named.as("to-stream"));
.toStream(KeyValue::pair, Named.as("to-stream"));
builder.build();
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
assertNamesForOperation(topology,
"KSTREAM-SOURCE-0000000001",
"KTABLE-SOURCE-0000000002",
"to-stream",
"KSTREAM-KEY-SELECT-0000000004");
"KSTREAM-SOURCE-0000000001",
"KTABLE-SOURCE-0000000002",
"to-stream",
"KSTREAM-KEY-SELECT-0000000004");
}
@Test
@ -838,7 +882,7 @@ public class StreamsBuilderTest {
assertNamesForStateStore(
topology.stateStores(),
STREAM_TOPIC + "-STATE-STORE-0000000000",
"KTABLE-AGGREGATE-STATE-STORE-0000000004");
"KTABLE-AGGREGATE-STATE-STORE-0000000004");
assertNamesForOperation(
topology,

View File

@ -138,8 +138,8 @@ public class GlobalKTableIntegrationTest {
return false;
}
final Map<String, ValueAndTimestamp<String>> result = new HashMap<>();
result.putAll(supplier.capturedProcessors(2).get(0).lastValueAndTimestampPerKey);
result.putAll(supplier.capturedProcessors(2).get(1).lastValueAndTimestampPerKey);
result.putAll(supplier.capturedProcessors(2).get(0).lastValueAndTimestampPerKey());
result.putAll(supplier.capturedProcessors(2).get(1).lastValueAndTimestampPerKey());
return result.equals(expected);
},
30000L,
@ -191,8 +191,8 @@ public class GlobalKTableIntegrationTest {
return false;
}
final Map<String, ValueAndTimestamp<String>> result = new HashMap<>();
result.putAll(supplier.capturedProcessors(2).get(0).lastValueAndTimestampPerKey);
result.putAll(supplier.capturedProcessors(2).get(1).lastValueAndTimestampPerKey);
result.putAll(supplier.capturedProcessors(2).get(0).lastValueAndTimestampPerKey());
result.putAll(supplier.capturedProcessors(2).get(1).lastValueAndTimestampPerKey());
return result.equals(expected);
},
30000L,
@ -220,8 +220,8 @@ public class GlobalKTableIntegrationTest {
return false;
}
final Map<String, ValueAndTimestamp<String>> result = new HashMap<>();
result.putAll(supplier.capturedProcessors(2).get(0).lastValueAndTimestampPerKey);
result.putAll(supplier.capturedProcessors(2).get(1).lastValueAndTimestampPerKey);
result.putAll(supplier.capturedProcessors(2).get(0).lastValueAndTimestampPerKey());
result.putAll(supplier.capturedProcessors(2).get(1).lastValueAndTimestampPerKey());
return result.equals(expected);
},
30000L,
@ -274,8 +274,8 @@ public class GlobalKTableIntegrationTest {
return false;
}
final Map<String, ValueAndTimestamp<String>> result = new HashMap<>();
result.putAll(supplier.capturedProcessors(2).get(0).lastValueAndTimestampPerKey);
result.putAll(supplier.capturedProcessors(2).get(1).lastValueAndTimestampPerKey);
result.putAll(supplier.capturedProcessors(2).get(0).lastValueAndTimestampPerKey());
result.putAll(supplier.capturedProcessors(2).get(1).lastValueAndTimestampPerKey());
return result.equals(expected);
},
30000L,

View File

@ -36,9 +36,8 @@ import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
@ -122,7 +121,8 @@ public class GlobalThreadShutDownOrderTest {
storeBuilder,
globalStoreTopic,
Consumed.with(Serdes.String(), Serdes.Long()),
new MockProcessorSupplier<>());
new MockApiProcessorSupplier<>()
);
builder
.stream(streamTopic, stringLongConsumed)

View File

@ -94,7 +94,7 @@ public class AbstractStreamTest {
inputTopic.pipeInput(expectedKey, "V" + expectedKey);
}
assertTrue(supplier.theCapturedProcessor().processed.size() <= expectedKeys.length);
assertTrue(supplier.theCapturedProcessor().processed().size() <= expectedKeys.length);
}
private static class ExtendedKStream<K, V> extends AbstractStream<K, V> {

View File

@ -101,6 +101,6 @@ public class GlobalKTableJoinsTest {
streamInputTopic.pipeInput("3", "c", 3L);
}
assertEquals(expected, supplier.theCapturedProcessor().lastValueAndTimestampPerKey);
assertEquals(expected, supplier.theCapturedProcessor().lastValueAndTimestampPerKey());
}
}

View File

@ -167,7 +167,7 @@ public class KGroupedStreamImplTest {
inputTopic.pipeInput("1", "1", 90);
}
final Map<Windowed<String>, ValueAndTimestamp<Integer>> result
= supplier.theCapturedProcessor().lastValueAndTimestampPerKey;
= supplier.theCapturedProcessor().lastValueAndTimestampPerKey();
assertEquals(
ValueAndTimestamp.make(2, 30L),
result.get(new Windowed<>("1", new SessionWindow(10L, 30L))));
@ -225,7 +225,7 @@ public class KGroupedStreamImplTest {
inputTopic.pipeInput("1", "1", 90);
}
final Map<Windowed<String>, ValueAndTimestamp<Long>> result =
supplier.theCapturedProcessor().lastValueAndTimestampPerKey;
supplier.theCapturedProcessor().lastValueAndTimestampPerKey();
assertEquals(
ValueAndTimestamp.make(2L, 30L),
result.get(new Windowed<>("1", new SessionWindow(10L, 30L))));
@ -271,7 +271,7 @@ public class KGroupedStreamImplTest {
inputTopic.pipeInput("1", "C", 90);
}
final Map<Windowed<String>, ValueAndTimestamp<String>> result =
supplier.theCapturedProcessor().lastValueAndTimestampPerKey;
supplier.theCapturedProcessor().lastValueAndTimestampPerKey();
assertEquals(
ValueAndTimestamp.make("A:B", 30L),
result.get(new Windowed<>("1", new SessionWindow(10L, 30L))));
@ -582,13 +582,13 @@ public class KGroupedStreamImplTest {
processData(driver);
assertThat(
supplier.theCapturedProcessor().lastValueAndTimestampPerKey.get("1"),
supplier.theCapturedProcessor().lastValueAndTimestampPerKey().get("1"),
equalTo(ValueAndTimestamp.make("0+A+C+D", 10L)));
assertThat(
supplier.theCapturedProcessor().lastValueAndTimestampPerKey.get("2"),
supplier.theCapturedProcessor().lastValueAndTimestampPerKey().get("2"),
equalTo(ValueAndTimestamp.make("0+B", 1L)));
assertThat(
supplier.theCapturedProcessor().lastValueAndTimestampPerKey.get("3"),
supplier.theCapturedProcessor().lastValueAndTimestampPerKey().get("3"),
equalTo(ValueAndTimestamp.make("0+E+F", 9L)));
}
}
@ -622,7 +622,7 @@ public class KGroupedStreamImplTest {
inputTopic.pipeInput("2", "B", 500L);
inputTopic.pipeInput("3", "B", 100L);
}
assertThat(supplier.theCapturedProcessor().processed, equalTo(Arrays.asList(
assertThat(supplier.theCapturedProcessor().processed(), equalTo(Arrays.asList(
new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 1L, 0L),
new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 2L, 499L),
new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 3L, 499L),

View File

@ -173,7 +173,7 @@ public class KGroupedTableImplTest {
final MockProcessorSupplier<String, Integer> supplier = getReducedResults(reduced);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
assertReduced(supplier.theCapturedProcessor().lastValueAndTimestampPerKey, topic, driver);
assertReduced(supplier.theCapturedProcessor().lastValueAndTimestampPerKey(), topic, driver);
assertEquals(reduced.queryableStoreName(), "reduced");
}
}
@ -195,7 +195,7 @@ public class KGroupedTableImplTest {
final MockProcessorSupplier<String, Integer> supplier = getReducedResults(reduced);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
assertReduced(supplier.theCapturedProcessor().lastValueAndTimestampPerKey, topic, driver);
assertReduced(supplier.theCapturedProcessor().lastValueAndTimestampPerKey(), topic, driver);
assertNull(reduced.queryableStoreName());
}
}
@ -220,7 +220,7 @@ public class KGroupedTableImplTest {
final MockProcessorSupplier<String, Integer> supplier = getReducedResults(reduced);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
assertReduced(supplier.theCapturedProcessor().lastValueAndTimestampPerKey, topic, driver);
assertReduced(supplier.theCapturedProcessor().lastValueAndTimestampPerKey(), topic, driver);
{
final KeyValueStore<String, Integer> reduce = driver.getKeyValueStore("reduce");
assertThat(reduce.get("A"), equalTo(5));

View File

@ -72,9 +72,9 @@ public class KStreamBranchTest {
}
final List<MockProcessor<Integer, String>> processors = supplier.capturedProcessors(3);
assertEquals(3, processors.get(0).processed.size());
assertEquals(1, processors.get(1).processed.size());
assertEquals(2, processors.get(2).processed.size());
assertEquals(3, processors.get(0).processed().size());
assertEquals(1, processors.get(1).processed().size());
assertEquals(2, processors.get(2).processed().size());
}
@SuppressWarnings("unchecked")

View File

@ -58,7 +58,7 @@ public class KStreamFilterTest {
}
}
assertEquals(2, supplier.theCapturedProcessor().processed.size());
assertEquals(2, supplier.theCapturedProcessor().processed().size());
}
@Test
@ -79,7 +79,7 @@ public class KStreamFilterTest {
}
}
assertEquals(5, supplier.theCapturedProcessor().processed.size());
assertEquals(5, supplier.theCapturedProcessor().processed().size());
}
@Test

View File

@ -72,7 +72,7 @@ public class KStreamFlatMapTest {
}
}
assertEquals(6, supplier.theCapturedProcessor().processed.size());
assertEquals(6, supplier.theCapturedProcessor().processed().size());
final KeyValueTimestamp[] expected = {new KeyValueTimestamp<>("10", "V1", 0),
new KeyValueTimestamp<>("20", "V2", 0),
@ -82,7 +82,7 @@ public class KStreamFlatMapTest {
new KeyValueTimestamp<>("32", "V3", 0)};
for (int i = 0; i < expected.length; i++) {
assertEquals(expected[i], supplier.theCapturedProcessor().processed.get(i));
assertEquals(expected[i], supplier.theCapturedProcessor().processed().get(i));
}
}
}

View File

@ -73,7 +73,7 @@ public class KStreamFlatMapValuesTest {
new KeyValueTimestamp<>(2, "v2", 0), new KeyValueTimestamp<>(2, "V2", 0),
new KeyValueTimestamp<>(3, "v3", 0), new KeyValueTimestamp<>(3, "V3", 0)};
assertArrayEquals(expected, supplier.theCapturedProcessor().processed.toArray());
assertArrayEquals(expected, supplier.theCapturedProcessor().processed().toArray());
}
@ -114,6 +114,6 @@ public class KStreamFlatMapValuesTest {
new KeyValueTimestamp<>(3, "v3", 0),
new KeyValueTimestamp<>(3, "k3", 0)};
assertArrayEquals(expected, supplier.theCapturedProcessor().processed.toArray());
assertArrayEquals(expected, supplier.theCapturedProcessor().processed().toArray());
}
}

View File

@ -1504,7 +1504,7 @@ public class KStreamImplTest {
driver.createInputTopic(input, new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
inputTopic.pipeInput("a", "b");
}
assertThat(processorSupplier.theCapturedProcessor().processed, equalTo(Collections.singletonList(new KeyValueTimestamp<>("a", "b", 0))));
assertThat(processorSupplier.theCapturedProcessor().processed(), equalTo(Collections.singletonList(new KeyValueTimestamp<>("a", "b", 0))));
}
@Test
@ -1519,7 +1519,7 @@ public class KStreamImplTest {
driver.createInputTopic(input, new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
inputTopic.pipeInput("a", "b");
}
assertThat(processorSupplier.theCapturedProcessor().processed, equalTo(Collections.singletonList(new KeyValueTimestamp<>("a", "b", 0))));
assertThat(processorSupplier.theCapturedProcessor().processed(), equalTo(Collections.singletonList(new KeyValueTimestamp<>("a", "b", 0))));
}
@Test
@ -1535,7 +1535,7 @@ public class KStreamImplTest {
driver.createInputTopic(input, new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
inputTopic.pipeInput("e", "f");
}
assertThat(processorSupplier.theCapturedProcessor().processed, equalTo(Collections.singletonList(new KeyValueTimestamp<>("e", "f", 0))));
assertThat(processorSupplier.theCapturedProcessor().processed(), equalTo(Collections.singletonList(new KeyValueTimestamp<>("e", "f", 0))));
}
@Test
@ -1556,9 +1556,9 @@ public class KStreamImplTest {
inputTopic.pipeInput("b", "v1");
}
final List<MockProcessor<String, String>> mockProcessors = processorSupplier.capturedProcessors(2);
assertThat(mockProcessors.get(0).processed, equalTo(asList(new KeyValueTimestamp<>("a", "v1", 0),
assertThat(mockProcessors.get(0).processed(), equalTo(asList(new KeyValueTimestamp<>("a", "v1", 0),
new KeyValueTimestamp<>("a", "v2", 0))));
assertThat(mockProcessors.get(1).processed, equalTo(Collections.singletonList(new KeyValueTimestamp<>("b", "v1", 0))));
assertThat(mockProcessors.get(1).processed(), equalTo(Collections.singletonList(new KeyValueTimestamp<>("b", "v1", 0))));
}
@SuppressWarnings("deprecation") // specifically testing the deprecated variant
@ -1665,7 +1665,7 @@ public class KStreamImplTest {
assertEquals(asList(new KeyValueTimestamp<>("A", "aa", 0),
new KeyValueTimestamp<>("B", "bb", 0),
new KeyValueTimestamp<>("C", "cc", 0),
new KeyValueTimestamp<>("D", "dd", 0)), processorSupplier.theCapturedProcessor().processed);
new KeyValueTimestamp<>("D", "dd", 0)), processorSupplier.theCapturedProcessor().processed());
}
@Test
@ -1711,7 +1711,7 @@ public class KStreamImplTest {
new KeyValueTimestamp<>("F", "ff", 7),
new KeyValueTimestamp<>("G", "gg", 4),
new KeyValueTimestamp<>("H", "hh", 6)),
processorSupplier.theCapturedProcessor().processed);
processorSupplier.theCapturedProcessor().processed());
}
@Test
@ -1744,7 +1744,7 @@ public class KStreamImplTest {
new KeyValueTimestamp<>("C", "cc", 10),
new KeyValueTimestamp<>("D", "dd", 8),
new KeyValueTimestamp<>("E", "ee", 3)),
processorSupplier.theCapturedProcessor().processed);
processorSupplier.theCapturedProcessor().processed());
}
@Test
@ -1782,7 +1782,7 @@ public class KStreamImplTest {
new KeyValueTimestamp<>("C", "cc", 10),
new KeyValueTimestamp<>("D", "dd", 8),
new KeyValueTimestamp<>("E", "ee", 3)),
processorSupplier.theCapturedProcessor().processed);
processorSupplier.theCapturedProcessor().processed());
}
@Test

View File

@ -61,9 +61,9 @@ public class KStreamMapTest {
new KeyValueTimestamp<>("V1", 1, 9),
new KeyValueTimestamp<>("V2", 2, 8),
new KeyValueTimestamp<>("V3", 3, 7)};
assertEquals(4, supplier.theCapturedProcessor().processed.size());
assertEquals(4, supplier.theCapturedProcessor().processed().size());
for (int i = 0; i < expected.length; i++) {
assertEquals(expected[i], supplier.theCapturedProcessor().processed.get(i));
assertEquals(expected[i], supplier.theCapturedProcessor().processed().get(i));
}
}

View File

@ -60,7 +60,7 @@ public class KStreamMapValuesTest {
new KeyValueTimestamp<>(100, 3, 50),
new KeyValueTimestamp<>(1000, 4, 500)};
assertArrayEquals(expected, supplier.theCapturedProcessor().processed.toArray());
assertArrayEquals(expected, supplier.theCapturedProcessor().processed().toArray());
}
@Test
@ -86,7 +86,7 @@ public class KStreamMapValuesTest {
new KeyValueTimestamp<>(100, 103, 50),
new KeyValueTimestamp<>(1000, 1004, 500)};
assertArrayEquals(expected, supplier.theCapturedProcessor().processed.toArray());
assertArrayEquals(expected, supplier.theCapturedProcessor().processed().toArray());
}
}

View File

@ -68,9 +68,9 @@ public class KStreamSelectKeyTest {
}
}
assertEquals(3, supplier.theCapturedProcessor().processed.size());
assertEquals(3, supplier.theCapturedProcessor().processed().size());
for (int i = 0; i < expected.length; i++) {
assertEquals(expected[i], supplier.theCapturedProcessor().processed.get(i));
assertEquals(expected[i], supplier.theCapturedProcessor().processed().get(i));
}
}

View File

@ -106,9 +106,9 @@ public class KStreamTransformTest {
new KeyValueTimestamp<>(-1, 3, 3)
};
assertEquals(expected.length, processor.theCapturedProcessor().processed.size());
assertEquals(expected.length, processor.theCapturedProcessor().processed().size());
for (int i = 0; i < expected.length; i++) {
assertEquals(expected[i], processor.theCapturedProcessor().processed.get(i));
assertEquals(expected[i], processor.theCapturedProcessor().processed().get(i));
}
}
}
@ -158,7 +158,7 @@ public class KStreamTransformTest {
driver.advanceWallClockTime(Duration.ofMillis(1));
}
assertEquals(6, processor.theCapturedProcessor().processed.size());
assertEquals(6, processor.theCapturedProcessor().processed().size());
final KeyValueTimestamp[] expected = {new KeyValueTimestamp<>(2, 10, 0),
new KeyValueTimestamp<>(20, 110, 0),
@ -168,7 +168,7 @@ public class KStreamTransformTest {
new KeyValueTimestamp<>(-1, 3, 3)};
for (int i = 0; i < expected.length; i++) {
assertEquals(expected[i], processor.theCapturedProcessor().processed.get(i));
assertEquals(expected[i], processor.theCapturedProcessor().processed().get(i));
}
}

View File

@ -93,7 +93,7 @@ public class KStreamTransformValuesTest {
new KeyValueTimestamp<>(100, 1110, 50),
new KeyValueTimestamp<>(1000, 11110, 500)};
assertArrayEquals(expected, supplier.theCapturedProcessor().processed.toArray());
assertArrayEquals(expected, supplier.theCapturedProcessor().processed().toArray());
}
@Test
@ -135,7 +135,7 @@ public class KStreamTransformValuesTest {
new KeyValueTimestamp<>(100, 1221, 50),
new KeyValueTimestamp<>(1000, 12221, 500)};
assertArrayEquals(expected, supplier.theCapturedProcessor().processed.toArray());
assertArrayEquals(expected, supplier.theCapturedProcessor().processed().toArray());
}
@SuppressWarnings("unchecked")

View File

@ -140,7 +140,7 @@ public class KStreamWindowAggregateTest {
new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5, 15)), "0+2+2+2+2+3", 13)
),
supplier.theCapturedProcessor().processed
supplier.theCapturedProcessor().processed()
);
}

View File

@ -108,7 +108,7 @@ public class KTableAggregateTest {
new KeyValueTimestamp<>("B", "0+2-2+4-4+7", 18L),
new KeyValueTimestamp<>("C", "0+5-5", 10L),
new KeyValueTimestamp<>("C", "0+5-5+8", 10L)),
supplier.theCapturedProcessor().processed);
supplier.theCapturedProcessor().processed());
}
}
@ -171,7 +171,7 @@ public class KTableAggregateTest {
new KeyValueTimestamp<>("4", "0+4", 23),
new KeyValueTimestamp<>("4", "0+4-4", 23),
new KeyValueTimestamp<>("7", "0+7", 22)),
supplier.theCapturedProcessor().processed);
supplier.theCapturedProcessor().processed());
}
}
@ -204,7 +204,7 @@ public class KTableAggregateTest {
new KeyValueTimestamp<>("blue", 1L, 12),
new KeyValueTimestamp<>("yellow", 1L, 15),
new KeyValueTimestamp<>("green", 2L, 12)),
supplier.theCapturedProcessor().processed);
supplier.theCapturedProcessor().processed());
}
}
@ -288,7 +288,7 @@ public class KTableAggregateTest {
new KeyValueTimestamp<>("1", "", 12),
new KeyValueTimestamp<>("1", "2", 12L)
),
proc.processed
proc.processed()
);
}
}

View File

@ -118,25 +118,25 @@ public class KTableImplTest {
new KeyValueTimestamp<>("C", "03", 0),
new KeyValueTimestamp<>("D", "04", 0),
new KeyValueTimestamp<>("A", "05", 10),
new KeyValueTimestamp<>("A", "06", 8)), processors.get(0).processed);
new KeyValueTimestamp<>("A", "06", 8)), processors.get(0).processed());
assertEquals(asList(new KeyValueTimestamp<>("A", 1, 5),
new KeyValueTimestamp<>("B", 2, 100),
new KeyValueTimestamp<>("C", 3, 0),
new KeyValueTimestamp<>("D", 4, 0),
new KeyValueTimestamp<>("A", 5, 10),
new KeyValueTimestamp<>("A", 6, 8)), processors.get(1).processed);
new KeyValueTimestamp<>("A", 6, 8)), processors.get(1).processed());
assertEquals(asList(new KeyValueTimestamp<>("A", null, 5),
new KeyValueTimestamp<>("B", 2, 100),
new KeyValueTimestamp<>("C", null, 0),
new KeyValueTimestamp<>("D", 4, 0),
new KeyValueTimestamp<>("A", null, 10),
new KeyValueTimestamp<>("A", 6, 8)), processors.get(2).processed);
new KeyValueTimestamp<>("A", 6, 8)), processors.get(2).processed());
assertEquals(asList(new KeyValueTimestamp<>("A", "01", 5),
new KeyValueTimestamp<>("B", "02", 100),
new KeyValueTimestamp<>("C", "03", 0),
new KeyValueTimestamp<>("D", "04", 0),
new KeyValueTimestamp<>("A", "05", 10),
new KeyValueTimestamp<>("A", "06", 8)), processors.get(3).processed);
new KeyValueTimestamp<>("A", "06", 8)), processors.get(3).processed());
}
@Test

View File

@ -70,9 +70,9 @@ public class KTableMapKeysTest {
}
}
assertEquals(3, supplier.theCapturedProcessor().processed.size());
assertEquals(3, supplier.theCapturedProcessor().processed().size());
for (int i = 0; i < expected.length; i++) {
assertEquals(expected[i], supplier.theCapturedProcessor().processed.get(i));
assertEquals(expected[i], supplier.theCapturedProcessor().processed().get(i));
}
}
}

View File

@ -65,7 +65,7 @@ public class KTableMapValuesTest {
assertEquals(asList(new KeyValueTimestamp<>("A", 1, 5),
new KeyValueTimestamp<>("B", 2, 25),
new KeyValueTimestamp<>("C", 3, 20),
new KeyValueTimestamp<>("D", 4, 10)), supplier.theCapturedProcessor().processed);
new KeyValueTimestamp<>("D", 4, 10)), supplier.theCapturedProcessor().processed());
}
}

View File

@ -88,7 +88,7 @@ public class KTableSourceTest {
new KeyValueTimestamp<>("D", 4, 13L),
new KeyValueTimestamp<>("A", null, 14L),
new KeyValueTimestamp<>("B", null, 15L)),
supplier.theCapturedProcessor().processed);
supplier.theCapturedProcessor().processed());
}
@Test

View File

@ -437,8 +437,8 @@ public class KTableTransformValuesTest {
new KeyValueTimestamp<>("A", "3", 15)));
}
private ArrayList<KeyValueTimestamp<Object, Object>> output() {
return capture.capturedProcessors(1).get(0).processed;
private ArrayList<KeyValueTimestamp<String, String>> output() {
return capture.capturedProcessors(1).get(0).processed();
}
private static KeyValueMapper<String, Integer, KeyValue<String, Integer>> toForceSendingOfOldValues() {

View File

@ -89,7 +89,7 @@ public class SessionWindowedKStreamImplTest {
}
final Map<Windowed<String>, ValueAndTimestamp<Long>> result =
supplier.theCapturedProcessor().lastValueAndTimestampPerKey;
supplier.theCapturedProcessor().lastValueAndTimestampPerKey();
assertThat(result.size(), equalTo(3));
assertThat(
@ -115,7 +115,7 @@ public class SessionWindowedKStreamImplTest {
}
final Map<Windowed<String>, ValueAndTimestamp<String>> result =
supplier.theCapturedProcessor().lastValueAndTimestampPerKey;
supplier.theCapturedProcessor().lastValueAndTimestampPerKey();
assertThat(result.size(), equalTo(3));
assertThat(
@ -143,7 +143,7 @@ public class SessionWindowedKStreamImplTest {
}
final Map<Windowed<String>, ValueAndTimestamp<String>> result =
supplier.theCapturedProcessor().lastValueAndTimestampPerKey;
supplier.theCapturedProcessor().lastValueAndTimestampPerKey();
assertThat(result.size(), equalTo(3));
assertThat(

View File

@ -77,15 +77,15 @@ public class TimeWindowedKStreamImplTest {
processData(driver);
}
assertThat(
supplier.theCapturedProcessor().lastValueAndTimestampPerKey
supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
.get(new Windowed<>("1", new TimeWindow(0L, 500L))),
equalTo(ValueAndTimestamp.make(2L, 15L)));
assertThat(
supplier.theCapturedProcessor().lastValueAndTimestampPerKey
supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
.get(new Windowed<>("2", new TimeWindow(500L, 1000L))),
equalTo(ValueAndTimestamp.make(2L, 550L)));
assertThat(
supplier.theCapturedProcessor().lastValueAndTimestampPerKey
supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
.get(new Windowed<>("1", new TimeWindow(500L, 1000L))),
equalTo(ValueAndTimestamp.make(1L, 500L)));
}
@ -102,15 +102,15 @@ public class TimeWindowedKStreamImplTest {
processData(driver);
}
assertThat(
supplier.theCapturedProcessor().lastValueAndTimestampPerKey
supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
.get(new Windowed<>("1", new TimeWindow(0L, 500L))),
equalTo(ValueAndTimestamp.make("1+2", 15L)));
assertThat(
supplier.theCapturedProcessor().lastValueAndTimestampPerKey
supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
.get(new Windowed<>("2", new TimeWindow(500L, 1000L))),
equalTo(ValueAndTimestamp.make("10+20", 550L)));
assertThat(
supplier.theCapturedProcessor().lastValueAndTimestampPerKey
supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
.get(new Windowed<>("1", new TimeWindow(500L, 1000L))),
equalTo(ValueAndTimestamp.make("3", 500L)));
}
@ -130,15 +130,15 @@ public class TimeWindowedKStreamImplTest {
processData(driver);
}
assertThat(
supplier.theCapturedProcessor().lastValueAndTimestampPerKey
supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
.get(new Windowed<>("1", new TimeWindow(0L, 500L))),
equalTo(ValueAndTimestamp.make("0+1+2", 15L)));
assertThat(
supplier.theCapturedProcessor().lastValueAndTimestampPerKey
supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
.get(new Windowed<>("2", new TimeWindow(500L, 1000L))),
equalTo(ValueAndTimestamp.make("0+10+20", 550L)));
assertThat(
supplier.theCapturedProcessor().lastValueAndTimestampPerKey
supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
.get(new Windowed<>("1", new TimeWindow(500L, 1000L))),
equalTo(ValueAndTimestamp.make("0+3", 500L)));
}

View File

@ -96,7 +96,7 @@ public class GlobalStreamThreadTest {
null,
GLOBAL_STORE_TOPIC_NAME,
"processorName",
new KTableSource<>(GLOBAL_STORE_NAME, GLOBAL_STORE_NAME));
() -> ProcessorAdapter.adapt(new KTableSource<>(GLOBAL_STORE_NAME, GLOBAL_STORE_NAME).get()));
baseDirectoryName = TestUtils.tempDirectory().getAbsolutePath();
final HashMap<String, Object> properties = new HashMap<>();

View File

@ -28,6 +28,7 @@ import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockKeyValueStoreBuilder;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockTimestampExtractor;
@ -270,14 +271,15 @@ public class InternalTopologyBuilderTest {
builder.addSource(null, "source-1", null, null, null, Pattern.compile("topic-1"));
builder.addSource(null, "source-2", null, null, null, Pattern.compile("topic-2"));
builder.addGlobalStore(
new MockKeyValueStoreBuilder("global-store", false).withLoggingDisabled(),
"globalSource",
null,
null,
null,
"globalTopic",
"global-processor",
new MockProcessorSupplier<>());
new MockKeyValueStoreBuilder("global-store", false).withLoggingDisabled(),
"globalSource",
null,
null,
null,
"globalTopic",
"global-processor",
new MockApiProcessorSupplier<>()
);
builder.initializeSubscription();
final Pattern expectedPattern = Pattern.compile("topic-1|topic-2");
@ -298,7 +300,8 @@ public class InternalTopologyBuilderTest {
null,
"globalTopic",
"global-processor",
new MockProcessorSupplier<>());
new MockApiProcessorSupplier<>()
);
builder.initializeSubscription();
assertThat(builder.sourceTopicCollection(), equalTo(asList("topic-1", "topic-2")));
@ -403,7 +406,7 @@ public class InternalTopologyBuilderTest {
null,
"global-topic",
"global-processor",
new MockProcessorSupplier<>()
new MockApiProcessorSupplier<>()
);
final TopologyException exception = assertThrows(
@ -434,7 +437,7 @@ public class InternalTopologyBuilderTest {
null,
"global-topic",
"global-processor",
new MockProcessorSupplier<>()
new MockApiProcessorSupplier<>()
)
);
@ -459,7 +462,7 @@ public class InternalTopologyBuilderTest {
null,
"global-topic",
"global-processor",
new MockProcessorSupplier<>()
new MockApiProcessorSupplier<>()
);
final TopologyException exception = assertThrows(
@ -472,7 +475,7 @@ public class InternalTopologyBuilderTest {
null,
"global-topic",
"global-processor-2",
new MockProcessorSupplier<>()
new MockApiProcessorSupplier<>()
)
);
@ -667,7 +670,7 @@ public class InternalTopologyBuilderTest {
null,
"globalTopic",
"global-processor",
new MockProcessorSupplier<>()
new MockApiProcessorSupplier<>()
);
newNodeGroups = builder.nodeGroups();
assertNotEquals(oldNodeGroups, newNodeGroups);
@ -961,7 +964,8 @@ public class InternalTopologyBuilderTest {
null,
"anyTopicName",
sameNameForSourceAndProcessor,
new MockProcessorSupplier<>());
new MockApiProcessorSupplier<>()
);
}
@Test
@ -1104,7 +1108,8 @@ public class InternalTopologyBuilderTest {
null,
globalTopic,
"global-processor",
new MockProcessorSupplier<>());
new MockApiProcessorSupplier<>()
);
builder.initializeSubscription();
assertThat(builder.buildGlobalStateTopology().storeToChangelogTopic().get(globalStoreName), is(globalTopic));

View File

@ -16,8 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals;
import java.time.Duration;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
@ -44,6 +42,7 @@ import org.easymock.EasyMock;
import org.junit.Before;
import org.junit.Test;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@ -51,8 +50,8 @@ import java.util.List;
import java.util.function.Consumer;
import static java.util.Arrays.asList;
import static org.apache.kafka.streams.processor.internals.ProcessorContextImpl.BYTES_KEY_SERIALIZER;
import static org.apache.kafka.streams.processor.internals.ProcessorContextImpl.BYTEARRAY_VALUE_SERIALIZER;
import static org.apache.kafka.streams.processor.internals.ProcessorContextImpl.BYTES_KEY_SERIALIZER;
import static org.easymock.EasyMock.anyLong;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.anyString;
@ -150,13 +149,21 @@ public class ProcessorContextImplTest {
((InternalProcessorContext) context).transitionToActive(task, null, null);
EasyMock.expect(task.recordCollector()).andStubReturn(recordCollector);
context.setCurrentNode(new ProcessorNode<String, Long, Object, Object>("fake", null,
new HashSet<>(asList(
"LocalKeyValueStore",
"LocalTimestampedKeyValueStore",
"LocalWindowStore",
"LocalTimestampedWindowStore",
"LocalSessionStore"))));
context.setCurrentNode(
new ProcessorNode<>(
"fake",
(org.apache.kafka.streams.processor.api.Processor<String, Long, Object, Object>) null,
new HashSet<>(
asList(
"LocalKeyValueStore",
"LocalTimestampedKeyValueStore",
"LocalWindowStore",
"LocalTimestampedWindowStore",
"LocalSessionStore"
)
)
)
);
}
private ProcessorContextImpl getStandbyContext() {

View File

@ -33,7 +33,7 @@ public class PunctuationQueueTest {
private final Punctuator punctuator = new Punctuator() {
@Override
public void punctuate(final long timestamp) {
node.mockProcessor.punctuatedStreamTime.add(timestamp);
node.mockProcessor.punctuatedStreamTime().add(timestamp);
}
};
@ -52,28 +52,28 @@ public class PunctuationQueueTest {
};
queue.mayPunctuate(now, PunctuationType.STREAM_TIME, processorNodePunctuator);
assertEquals(0, node.mockProcessor.punctuatedStreamTime.size());
assertEquals(0, node.mockProcessor.punctuatedStreamTime().size());
queue.mayPunctuate(now + 99L, PunctuationType.STREAM_TIME, processorNodePunctuator);
assertEquals(0, node.mockProcessor.punctuatedStreamTime.size());
assertEquals(0, node.mockProcessor.punctuatedStreamTime().size());
queue.mayPunctuate(now + 100L, PunctuationType.STREAM_TIME, processorNodePunctuator);
assertEquals(1, node.mockProcessor.punctuatedStreamTime.size());
assertEquals(1, node.mockProcessor.punctuatedStreamTime().size());
queue.mayPunctuate(now + 199L, PunctuationType.STREAM_TIME, processorNodePunctuator);
assertEquals(1, node.mockProcessor.punctuatedStreamTime.size());
assertEquals(1, node.mockProcessor.punctuatedStreamTime().size());
queue.mayPunctuate(now + 200L, PunctuationType.STREAM_TIME, processorNodePunctuator);
assertEquals(2, node.mockProcessor.punctuatedStreamTime.size());
assertEquals(2, node.mockProcessor.punctuatedStreamTime().size());
queue.mayPunctuate(now + 1001L, PunctuationType.STREAM_TIME, processorNodePunctuator);
assertEquals(3, node.mockProcessor.punctuatedStreamTime.size());
assertEquals(3, node.mockProcessor.punctuatedStreamTime().size());
queue.mayPunctuate(now + 1002L, PunctuationType.STREAM_TIME, processorNodePunctuator);
assertEquals(3, node.mockProcessor.punctuatedStreamTime.size());
assertEquals(3, node.mockProcessor.punctuatedStreamTime().size());
queue.mayPunctuate(now + 1100L, PunctuationType.STREAM_TIME, processorNodePunctuator);
assertEquals(4, node.mockProcessor.punctuatedStreamTime.size());
assertEquals(4, node.mockProcessor.punctuatedStreamTime().size());
}
@Test
@ -91,28 +91,28 @@ public class PunctuationQueueTest {
};
queue.mayPunctuate(now, PunctuationType.STREAM_TIME, processorNodePunctuator);
assertEquals(0, node.mockProcessor.punctuatedStreamTime.size());
assertEquals(0, node.mockProcessor.punctuatedStreamTime().size());
queue.mayPunctuate(now + 49L, PunctuationType.STREAM_TIME, processorNodePunctuator);
assertEquals(0, node.mockProcessor.punctuatedStreamTime.size());
assertEquals(0, node.mockProcessor.punctuatedStreamTime().size());
queue.mayPunctuate(now + 50L, PunctuationType.STREAM_TIME, processorNodePunctuator);
assertEquals(1, node.mockProcessor.punctuatedStreamTime.size());
assertEquals(1, node.mockProcessor.punctuatedStreamTime().size());
queue.mayPunctuate(now + 149L, PunctuationType.STREAM_TIME, processorNodePunctuator);
assertEquals(1, node.mockProcessor.punctuatedStreamTime.size());
assertEquals(1, node.mockProcessor.punctuatedStreamTime().size());
queue.mayPunctuate(now + 150L, PunctuationType.STREAM_TIME, processorNodePunctuator);
assertEquals(2, node.mockProcessor.punctuatedStreamTime.size());
assertEquals(2, node.mockProcessor.punctuatedStreamTime().size());
queue.mayPunctuate(now + 1051L, PunctuationType.STREAM_TIME, processorNodePunctuator);
assertEquals(3, node.mockProcessor.punctuatedStreamTime.size());
assertEquals(3, node.mockProcessor.punctuatedStreamTime().size());
queue.mayPunctuate(now + 1052L, PunctuationType.STREAM_TIME, processorNodePunctuator);
assertEquals(3, node.mockProcessor.punctuatedStreamTime.size());
assertEquals(3, node.mockProcessor.punctuatedStreamTime().size());
queue.mayPunctuate(now + 1150L, PunctuationType.STREAM_TIME, processorNodePunctuator);
assertEquals(4, node.mockProcessor.punctuatedStreamTime.size());
assertEquals(4, node.mockProcessor.punctuatedStreamTime().size());
}
@Test
@ -132,13 +132,13 @@ public class PunctuationQueueTest {
};
queue.mayPunctuate(now, PunctuationType.STREAM_TIME, processorNodePunctuator);
assertEquals(0, node.mockProcessor.punctuatedStreamTime.size());
assertEquals(0, node.mockProcessor.punctuatedStreamTime().size());
queue.mayPunctuate(now + 100L, PunctuationType.STREAM_TIME, processorNodePunctuator);
assertEquals(1, node.mockProcessor.punctuatedStreamTime.size());
assertEquals(1, node.mockProcessor.punctuatedStreamTime().size());
queue.mayPunctuate(now + 200L, PunctuationType.STREAM_TIME, processorNodePunctuator);
assertEquals(1, node.mockProcessor.punctuatedStreamTime.size());
assertEquals(1, node.mockProcessor.punctuatedStreamTime().size());
}
private static class TestProcessor extends AbstractProcessor<String, String> {

View File

@ -827,7 +827,7 @@ public class StreamTaskTest {
// st is now 30
assertTrue(task.process(0L));
processorStreamTime.mockProcessor.scheduleCancellable.cancel();
processorStreamTime.mockProcessor.scheduleCancellable().cancel();
assertFalse(task.maybePunctuateStreamTime());
@ -842,7 +842,7 @@ public class StreamTaskTest {
final long now = time.milliseconds();
time.sleep(10);
assertTrue(task.maybePunctuateSystemTime());
processorSystemTime.mockProcessor.scheduleCancellable.cancel();
processorSystemTime.mockProcessor.scheduleCancellable().cancel();
time.sleep(10);
assertFalse(task.maybePunctuateSystemTime());
processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 10);

View File

@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.test;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
public class MockApiProcessor<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> {
private final ArrayList<KeyValueTimestamp<KIn, VIn>> processed = new ArrayList<>();
private final Map<KIn, ValueAndTimestamp<VIn>> lastValueAndTimestampPerKey = new HashMap<>();
private final ArrayList<Long> punctuatedStreamTime = new ArrayList<>();
private final ArrayList<Long> punctuatedSystemTime = new ArrayList<>();
private Cancellable scheduleCancellable;
private final PunctuationType punctuationType;
private final long scheduleInterval;
private boolean commitRequested = false;
private ProcessorContext<KOut, VOut> context;
public MockApiProcessor(final PunctuationType punctuationType,
final long scheduleInterval) {
this.punctuationType = punctuationType;
this.scheduleInterval = scheduleInterval;
}
public MockApiProcessor() {
this(PunctuationType.STREAM_TIME, -1);
}
@Override
public void init(final ProcessorContext<KOut, VOut> context) {
this.context = context;
if (scheduleInterval > 0L) {
scheduleCancellable = context.schedule(
Duration.ofMillis(scheduleInterval),
punctuationType,
timestamp -> {
if (punctuationType == PunctuationType.STREAM_TIME) {
assertThat(context.timestamp(), is(timestamp));
}
assertThat(context.partition(), is(-1));
assertThat(context.offset(), is(-1L));
(punctuationType == PunctuationType.STREAM_TIME ? punctuatedStreamTime : punctuatedSystemTime)
.add(timestamp);
});
}
}
@Override
public void process(final KIn key, final VIn value) {
final KeyValueTimestamp<KIn, VIn> keyValueTimestamp = new KeyValueTimestamp<>(key, value, context.timestamp());
if (value != null) {
lastValueAndTimestampPerKey.put(key, ValueAndTimestamp.make(value, context.timestamp()));
} else {
lastValueAndTimestampPerKey.remove(key);
}
processed.add(keyValueTimestamp);
if (commitRequested) {
context.commit();
commitRequested = false;
}
}
public void checkAndClearProcessResult(final KeyValueTimestamp<?, ?>... expected) {
assertThat("the number of outputs:" + processed, processed.size(), is(expected.length));
for (int i = 0; i < expected.length; i++) {
assertThat("output[" + i + "]:", processed.get(i), is(expected[i]));
}
processed.clear();
}
public void requestCommit() {
commitRequested = true;
}
public void checkEmptyAndClearProcessResult() {
assertThat("the number of outputs:", processed.size(), is(0));
processed.clear();
}
public void checkAndClearPunctuateResult(final PunctuationType type, final long... expected) {
final ArrayList<Long> punctuated = type == PunctuationType.STREAM_TIME ? punctuatedStreamTime : punctuatedSystemTime;
assertThat("the number of outputs:", punctuated.size(), is(expected.length));
for (int i = 0; i < expected.length; i++) {
assertThat("output[" + i + "]:", punctuated.get(i), is(expected[i]));
}
processed.clear();
}
public ArrayList<KeyValueTimestamp<KIn, VIn>> processed() {
return processed;
}
public Map<KIn, ValueAndTimestamp<VIn>> lastValueAndTimestampPerKey() {
return lastValueAndTimestampPerKey;
}
public List<Long> punctuatedStreamTime() {
return punctuatedStreamTime;
}
public Cancellable scheduleCancellable() {
return scheduleCancellable;
}
public ProcessorContext<KOut, VOut> context() {
return context;
}
public void context(final ProcessorContext<KOut, VOut> context) {
this.context = context;
}
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.test;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.PunctuationType;
import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.assertEquals;
public class MockApiProcessorSupplier<KIn, VIn, KOut, VOut> implements ProcessorSupplier<KIn, VIn, KOut, VOut> {
private final long scheduleInterval;
private final PunctuationType punctuationType;
private final List<MockApiProcessor<KIn, VIn, KOut, VOut>> processors = new ArrayList<>();
public MockApiProcessorSupplier() {
this(-1L);
}
public MockApiProcessorSupplier(final long scheduleInterval) {
this(scheduleInterval, PunctuationType.STREAM_TIME);
}
public MockApiProcessorSupplier(final long scheduleInterval, final PunctuationType punctuationType) {
this.scheduleInterval = scheduleInterval;
this.punctuationType = punctuationType;
}
@Override
public Processor<KIn, VIn, KOut, VOut> get() {
final MockApiProcessor<KIn, VIn, KOut, VOut> processor = new MockApiProcessor<>(punctuationType, scheduleInterval);
processors.add(processor);
return processor;
}
// get the captured processor assuming that only one processor gets returned from this supplier
public MockApiProcessor<KIn, VIn, KOut, VOut> theCapturedProcessor() {
return capturedProcessors(1).get(0);
}
public int capturedProcessorsCount() {
return processors.size();
}
// get the captured processors with the expected number
public List<MockApiProcessor<KIn, VIn, KOut, VOut>> capturedProcessors(final int expectedNumberOfProcessors) {
assertEquals(expectedNumberOfProcessors, processors.size());
return processors;
}
}

View File

@ -21,104 +21,66 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextAdapter;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
public class MockProcessor<K, V> extends AbstractProcessor<K, V> {
public final ArrayList<KeyValueTimestamp<Object, Object>> processed = new ArrayList<>();
public final Map<K, ValueAndTimestamp<V>> lastValueAndTimestampPerKey = new HashMap<>();
public final ArrayList<Long> punctuatedStreamTime = new ArrayList<>();
public final ArrayList<Long> punctuatedSystemTime = new ArrayList<>();
public Cancellable scheduleCancellable;
private final PunctuationType punctuationType;
private final long scheduleInterval;
private boolean commitRequested = false;
private final MockApiProcessor<K, V, Object, Object> delegate;
public MockProcessor(final PunctuationType punctuationType,
final long scheduleInterval) {
this.punctuationType = punctuationType;
this.scheduleInterval = scheduleInterval;
delegate = new MockApiProcessor<>(punctuationType, scheduleInterval);
}
public MockProcessor() {
this(PunctuationType.STREAM_TIME, -1);
delegate = new MockApiProcessor<>();
}
@Override
public void init(final ProcessorContext context) {
super.init(context);
if (scheduleInterval > 0L) {
scheduleCancellable = context.schedule(
Duration.ofMillis(scheduleInterval),
punctuationType,
timestamp -> {
if (punctuationType == PunctuationType.STREAM_TIME) {
assertEquals(timestamp, context().timestamp());
}
assertEquals(-1, context().partition());
assertEquals(-1L, context().offset());
(punctuationType == PunctuationType.STREAM_TIME ? punctuatedStreamTime : punctuatedSystemTime)
.add(timestamp);
});
}
delegate.init(ProcessorContextAdapter.adapt((InternalProcessorContext) context));
}
@Override
public void process(final K key, final V value) {
KeyValueTimestamp<Object, Object> keyValueTimestamp = new KeyValueTimestamp<>(key, value, context().timestamp());
if (value != null) {
lastValueAndTimestampPerKey.put(key, ValueAndTimestamp.make(value, context().timestamp()));
} else {
lastValueAndTimestampPerKey.remove(key);
}
processed.add(keyValueTimestamp);
if (commitRequested) {
context().commit();
commitRequested = false;
}
delegate.process(key, value);
}
public void checkAndClearProcessResult(final KeyValueTimestamp<?, ?>... expected) {
assertEquals("the number of outputs:" + processed, expected.length, processed.size());
for (int i = 0; i < expected.length; i++) {
assertEquals("output[" + i + "]:", expected[i], processed.get(i));
}
processed.clear();
delegate.checkAndClearProcessResult(expected);
}
public void requestCommit() {
commitRequested = true;
delegate.requestCommit();
}
public void checkEmptyAndClearProcessResult() {
assertEquals("the number of outputs:", 0, processed.size());
processed.clear();
delegate.checkEmptyAndClearProcessResult();
}
public void checkAndClearPunctuateResult(final PunctuationType type, final long... expected) {
final ArrayList<Long> punctuated = type == PunctuationType.STREAM_TIME ? punctuatedStreamTime : punctuatedSystemTime;
assertEquals("the number of outputs:", expected.length, punctuated.size());
delegate.checkAndClearPunctuateResult(type, expected);
}
for (int i = 0; i < expected.length; i++) {
assertEquals("output[" + i + "]:", expected[i], (long) punctuated.get(i));
}
public Map<K, ValueAndTimestamp<V>> lastValueAndTimestampPerKey() {
return delegate.lastValueAndTimestampPerKey();
}
processed.clear();
public List<Long> punctuatedStreamTime() {
return delegate.punctuatedStreamTime();
}
public Cancellable scheduleCancellable() {
return delegate.scheduleCancellable();
}
public ArrayList<KeyValueTimestamp<K, V>> processed() {
return delegate.processed();
}
}

View File

@ -176,12 +176,33 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
*
* @see `org.apache.kafka.streams.StreamsBuilder#addGlobalStore`
*/
@deprecated(
"Use #addGlobalStore(StoreBuilder, String, Consumed, org.apache.kafka.streams.processor.api.ProcessorSupplier) instead.",
"2.7.0"
)
def addGlobalStore[K, V](storeBuilder: StoreBuilder[_ <: StateStore],
topic: String,
consumed: Consumed[K, V],
stateUpdateSupplier: ProcessorSupplier[K, V]): StreamsBuilderJ =
inner.addGlobalStore(storeBuilder, topic, consumed, stateUpdateSupplier)
/**
* Adds a global `StateStore` to the topology. Global stores should not be added to `Processor`, `Transformer`,
* or `ValueTransformer` (in contrast to regular stores).
* <p>
* It is not required to connect a global store to `Processor`, `Transformer`, or `ValueTransformer`;
* those have read-only access to all global stores by default.
*
* @see `org.apache.kafka.streams.StreamsBuilder#addGlobalStore`
*/
def addGlobalStore[K, V](
storeBuilder: StoreBuilder[_ <: StateStore],
topic: String,
consumed: Consumed[K, V],
stateUpdateSupplier: org.apache.kafka.streams.processor.api.ProcessorSupplier[K, V, Void, Void]
): StreamsBuilderJ =
inner.addGlobalStore(storeBuilder, topic, consumed, stateUpdateSupplier)
def build(): Topology = inner.build()
/**