mirror of https://github.com/apache/kafka.git
KAFKA-10437: Implement new PAPI support for test-utils (#9396)
Implements KIP-478 for the test-utils module: * adds mocks of the new ProcessorContext and StateStoreContext * adds tests that all stores and store builders are usable with the new mock * adds tests that the new Processor api is usable with the new mock * updates the demonstration Processor to the new api Reviewers: Guozhang Wang <guozhang@apache.org>
This commit is contained in:
parent
a72f0c1eac
commit
27b0e35e7a
|
@ -194,13 +194,13 @@
|
|||
files=".*[/\\]streams[/\\].*test[/\\].*.java"/>
|
||||
|
||||
<suppress checks="CyclomaticComplexity"
|
||||
files="(EosBetaUpgradeIntegrationTest|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest|RelationalSmokeTest).java"/>
|
||||
files="(EosBetaUpgradeIntegrationTest|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest|RelationalSmokeTest|MockProcessorContextStateStoreTest).java"/>
|
||||
|
||||
<suppress checks="JavaNCSS"
|
||||
files="(EosBetaUpgradeIntegrationTest|KStreamKStreamJoinTest|TaskManagerTest).java"/>
|
||||
|
||||
<suppress checks="NPathComplexity"
|
||||
files="(EosBetaUpgradeIntegrationTest|EosTestDriver|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RelationalSmokeTest).java"/>
|
||||
files="(EosBetaUpgradeIntegrationTest|EosTestDriver|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RelationalSmokeTest|MockProcessorContextStateStoreTest).java"/>
|
||||
|
||||
<suppress checks="(FinalLocalVariable|WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)"
|
||||
files="Murmur3Test.java"/>
|
||||
|
|
|
@ -20,6 +20,8 @@ import org.apache.kafka.common.header.Headers;
|
|||
import org.apache.kafka.common.header.internals.RecordHeaders;
|
||||
import org.apache.kafka.streams.errors.StreamsException;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* A data class representing an incoming record for processing in a {@link Processor}
|
||||
* or a record to forward to downstream processors via {@link ProcessorContext}.
|
||||
|
@ -162,4 +164,30 @@ public class Record<K, V> {
|
|||
public Record<K, V> withHeaders(final Headers headers) {
|
||||
return new Record<>(key, value, timestamp, headers);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Record{" +
|
||||
"key=" + key +
|
||||
", value=" + value +
|
||||
", timestamp=" + timestamp +
|
||||
", headers=" + headers +
|
||||
'}';
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(final Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
final Record<?, ?> record = (Record<?, ?>) o;
|
||||
return timestamp == record.timestamp &&
|
||||
Objects.equals(key, record.key) &&
|
||||
Objects.equals(value, record.value) &&
|
||||
Objects.equals(headers, record.headers);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(key, value, timestamp, headers);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -75,18 +75,25 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
|
|||
@Deprecated
|
||||
@Override
|
||||
public void init(final ProcessorContext context, final StateStore root) {
|
||||
this.context = (InternalProcessorContext) context;
|
||||
|
||||
final StreamsMetricsImpl metrics = this.context.metrics();
|
||||
final String threadId = Thread.currentThread().getName();
|
||||
final String taskName = context.taskId().toString();
|
||||
expiredRecordSensor = TaskMetrics.droppedRecordsSensorOrExpiredWindowRecordDropSensor(
|
||||
threadId,
|
||||
taskName,
|
||||
metricScope,
|
||||
name,
|
||||
metrics
|
||||
);
|
||||
|
||||
// The provided context is not required to implement InternalProcessorContext,
|
||||
// If it doesn't, we can't record this metric.
|
||||
if (context instanceof InternalProcessorContext) {
|
||||
this.context = (InternalProcessorContext) context;
|
||||
final StreamsMetricsImpl metrics = this.context.metrics();
|
||||
expiredRecordSensor = TaskMetrics.droppedRecordsSensorOrExpiredWindowRecordDropSensor(
|
||||
threadId,
|
||||
taskName,
|
||||
metricScope,
|
||||
name,
|
||||
metrics
|
||||
);
|
||||
} else {
|
||||
this.context = null;
|
||||
expiredRecordSensor = null;
|
||||
}
|
||||
|
||||
if (root != null) {
|
||||
context.register(root, (key, value) -> put(SessionKeySchema.from(Bytes.wrap(key)), value));
|
||||
|
@ -102,7 +109,11 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
|
|||
observedStreamTime = Math.max(observedStreamTime, windowEndTimestamp);
|
||||
|
||||
if (windowEndTimestamp <= observedStreamTime - retentionPeriod) {
|
||||
expiredRecordSensor.record(1.0d, context.currentSystemTimeMs());
|
||||
// The provided context is not required to implement InternalProcessorContext,
|
||||
// If it doesn't, we can't record this metric (in fact, we wouldn't have even initialized it).
|
||||
if (expiredRecordSensor != null && context != null) {
|
||||
expiredRecordSensor.record(1.0d, context.currentSystemTimeMs());
|
||||
}
|
||||
LOG.warn("Skipping record for expired segment.");
|
||||
} else {
|
||||
if (aggregate != null) {
|
||||
|
|
|
@ -0,0 +1,494 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.streams.processor.api;
|
||||
|
||||
import org.apache.kafka.common.metrics.MetricConfig;
|
||||
import org.apache.kafka.common.metrics.Metrics;
|
||||
import org.apache.kafka.common.metrics.Sensor;
|
||||
import org.apache.kafka.common.serialization.Serde;
|
||||
import org.apache.kafka.common.utils.Time;
|
||||
import org.apache.kafka.streams.StreamsConfig;
|
||||
import org.apache.kafka.streams.StreamsMetrics;
|
||||
import org.apache.kafka.streams.Topology;
|
||||
import org.apache.kafka.streams.TopologyTestDriver;
|
||||
import org.apache.kafka.streams.kstream.Transformer;
|
||||
import org.apache.kafka.streams.kstream.ValueTransformer;
|
||||
import org.apache.kafka.streams.processor.Cancellable;
|
||||
import org.apache.kafka.streams.processor.PunctuationType;
|
||||
import org.apache.kafka.streams.processor.Punctuator;
|
||||
import org.apache.kafka.streams.processor.StateRestoreCallback;
|
||||
import org.apache.kafka.streams.processor.StateStore;
|
||||
import org.apache.kafka.streams.processor.StateStoreContext;
|
||||
import org.apache.kafka.streams.processor.TaskId;
|
||||
import org.apache.kafka.streams.processor.internals.ClientUtils;
|
||||
import org.apache.kafka.streams.processor.internals.RecordCollector;
|
||||
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
|
||||
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
|
||||
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
|
||||
|
||||
import java.io.File;
|
||||
import java.time.Duration;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.apache.kafka.common.utils.Utils.mkEntry;
|
||||
import static org.apache.kafka.common.utils.Utils.mkMap;
|
||||
import static org.apache.kafka.common.utils.Utils.mkProperties;
|
||||
|
||||
/**
|
||||
* {@link MockProcessorContext} is a mock of {@link ProcessorContext} for users to test their {@link Processor},
|
||||
* {@link Transformer}, and {@link ValueTransformer} implementations.
|
||||
* <p>
|
||||
* The tests for this class (org.apache.kafka.streams.MockProcessorContextTest) include several behavioral
|
||||
* tests that serve as example usage.
|
||||
* <p>
|
||||
* Note that this class does not take any automated actions (such as firing scheduled punctuators).
|
||||
* It simply captures any data it witnesses.
|
||||
* If you require more automated tests, we recommend wrapping your {@link Processor} in a minimal source-processor-sink
|
||||
* {@link Topology} and using the {@link TopologyTestDriver}.
|
||||
*/
|
||||
public class MockProcessorContext<KForward, VForward> implements ProcessorContext<KForward, VForward>, RecordCollector.Supplier {
|
||||
// Immutable fields ================================================
|
||||
private final StreamsMetricsImpl metrics;
|
||||
private final TaskId taskId;
|
||||
private final StreamsConfig config;
|
||||
private final File stateDir;
|
||||
|
||||
// settable record metadata ================================================
|
||||
private MockRecordMetadata recordMetadata;
|
||||
|
||||
// mocks ================================================
|
||||
private final Map<String, StateStore> stateStores = new HashMap<>();
|
||||
private final List<CapturedPunctuator> punctuators = new LinkedList<>();
|
||||
private final List<CapturedForward<? extends KForward, ? extends VForward>> capturedForwards = new LinkedList<>();
|
||||
private boolean committed = false;
|
||||
|
||||
private static final class MockRecordMetadata implements RecordMetadata {
|
||||
private final String topic;
|
||||
private final int partition;
|
||||
private final long offset;
|
||||
|
||||
private MockRecordMetadata(final String topic, final int partition, final long offset) {
|
||||
this.topic = topic;
|
||||
this.partition = partition;
|
||||
this.offset = offset;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String topic() {
|
||||
return topic;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int partition() {
|
||||
return partition;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long offset() {
|
||||
return offset;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@link CapturedPunctuator} holds captured punctuators, along with their scheduling information.
|
||||
*/
|
||||
public static final class CapturedPunctuator {
|
||||
private final Duration interval;
|
||||
private final PunctuationType type;
|
||||
private final Punctuator punctuator;
|
||||
private boolean cancelled = false;
|
||||
|
||||
private CapturedPunctuator(final Duration interval, final PunctuationType type, final Punctuator punctuator) {
|
||||
this.interval = interval;
|
||||
this.type = type;
|
||||
this.punctuator = punctuator;
|
||||
}
|
||||
|
||||
public Duration getInterval() {
|
||||
return interval;
|
||||
}
|
||||
|
||||
public PunctuationType getType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
public Punctuator getPunctuator() {
|
||||
return punctuator;
|
||||
}
|
||||
|
||||
public void cancel() {
|
||||
cancelled = true;
|
||||
}
|
||||
|
||||
public boolean cancelled() {
|
||||
return cancelled;
|
||||
}
|
||||
}
|
||||
|
||||
public static final class CapturedForward<K, V> {
|
||||
|
||||
private final Record<K, V> record;
|
||||
private final Optional<String> childName;
|
||||
|
||||
public CapturedForward(final Record<K, V> record) {
|
||||
this(record, Optional.empty());
|
||||
}
|
||||
|
||||
public CapturedForward(final Record<K, V> record, final Optional<String> childName) {
|
||||
this.record = Objects.requireNonNull(record);
|
||||
this.childName = Objects.requireNonNull(childName);
|
||||
}
|
||||
|
||||
/**
|
||||
* The child this data was forwarded to.
|
||||
*
|
||||
* @return If present, the child name the record was forwarded to.
|
||||
* If empty, the forward was a broadcast.
|
||||
*/
|
||||
public Optional<String> childName() {
|
||||
return childName;
|
||||
}
|
||||
|
||||
/**
|
||||
* The record that was forwarded.
|
||||
*
|
||||
* @return The forwarded record. Not null.
|
||||
*/
|
||||
public Record<K, V> record() {
|
||||
return record;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "CapturedForward{" +
|
||||
"record=" + record +
|
||||
", childName=" + childName +
|
||||
'}';
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(final Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
final CapturedForward<?, ?> that = (CapturedForward<?, ?>) o;
|
||||
return Objects.equals(record, that.record) &&
|
||||
Objects.equals(childName, that.childName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(record, childName);
|
||||
}
|
||||
}
|
||||
|
||||
// constructors ================================================
|
||||
|
||||
/**
|
||||
* Create a {@link MockProcessorContext} with dummy {@code config} and {@code taskId} and {@code null} {@code stateDir}.
|
||||
* Most unit tests using this mock won't need to know the taskId,
|
||||
* and most unit tests should be able to get by with the
|
||||
* {@link InMemoryKeyValueStore}, so the stateDir won't matter.
|
||||
*/
|
||||
public MockProcessorContext() {
|
||||
this(
|
||||
mkProperties(mkMap(
|
||||
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, ""),
|
||||
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "")
|
||||
)),
|
||||
new TaskId(0, 0),
|
||||
null
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a {@link MockProcessorContext} with dummy {@code taskId} and {@code null} {@code stateDir}.
|
||||
* Most unit tests using this mock won't need to know the taskId,
|
||||
* and most unit tests should be able to get by with the
|
||||
* {@link InMemoryKeyValueStore}, so the stateDir won't matter.
|
||||
*
|
||||
* @param config a Properties object, used to configure the context and the processor.
|
||||
*/
|
||||
public MockProcessorContext(final Properties config) {
|
||||
this(config, new TaskId(0, 0), null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a {@link MockProcessorContext} with a specified taskId and null stateDir.
|
||||
*
|
||||
* @param config a {@link Properties} object, used to configure the context and the processor.
|
||||
* @param taskId a {@link TaskId}, which the context makes available via {@link MockProcessorContext#taskId()}.
|
||||
* @param stateDir a {@link File}, which the context makes available viw {@link MockProcessorContext#stateDir()}.
|
||||
*/
|
||||
public MockProcessorContext(final Properties config, final TaskId taskId, final File stateDir) {
|
||||
final StreamsConfig streamsConfig = new ClientUtils.QuietStreamsConfig(config);
|
||||
this.taskId = taskId;
|
||||
this.config = streamsConfig;
|
||||
this.stateDir = stateDir;
|
||||
final MetricConfig metricConfig = new MetricConfig();
|
||||
metricConfig.recordLevel(Sensor.RecordingLevel.DEBUG);
|
||||
final String threadId = Thread.currentThread().getName();
|
||||
metrics = new StreamsMetricsImpl(
|
||||
new Metrics(metricConfig),
|
||||
threadId,
|
||||
streamsConfig.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
|
||||
Time.SYSTEM
|
||||
);
|
||||
TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(threadId, taskId.toString(), metrics);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String applicationId() {
|
||||
return config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TaskId taskId() {
|
||||
return taskId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> appConfigs() {
|
||||
final Map<String, Object> combined = new HashMap<>();
|
||||
combined.putAll(config.originals());
|
||||
combined.putAll(config.values());
|
||||
return combined;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> appConfigsWithPrefix(final String prefix) {
|
||||
return config.originalsWithPrefix(prefix);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Serde<?> keySerde() {
|
||||
return config.defaultKeySerde();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Serde<?> valueSerde() {
|
||||
return config.defaultValueSerde();
|
||||
}
|
||||
|
||||
@Override
|
||||
public File stateDir() {
|
||||
return Objects.requireNonNull(
|
||||
stateDir,
|
||||
"The stateDir constructor argument was needed (probably for a state store) but not supplied. " +
|
||||
"You can either reconfigure your test so that it doesn't need access to the disk " +
|
||||
"(such as using an in-memory store), or use the full MockProcessorContext constructor to supply " +
|
||||
"a non-null stateDir argument."
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public StreamsMetrics metrics() {
|
||||
return metrics;
|
||||
}
|
||||
|
||||
// settable record metadata ================================================
|
||||
|
||||
/**
|
||||
* The context exposes these metadata for use in the processor. Normally, they are set by the Kafka Streams framework,
|
||||
* but for the purpose of driving unit tests, you can set them directly.
|
||||
*
|
||||
* @param topic A topic name
|
||||
* @param partition A partition number
|
||||
* @param offset A record offset
|
||||
*/
|
||||
public void setRecordMetadata(final String topic,
|
||||
final int partition,
|
||||
final long offset) {
|
||||
recordMetadata = new MockRecordMetadata(topic, partition, offset);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<RecordMetadata> recordMetadata() {
|
||||
return Optional.ofNullable(recordMetadata);
|
||||
}
|
||||
|
||||
// mocks ================================================
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public <S extends StateStore> S getStateStore(final String name) {
|
||||
return (S) stateStores.get(name);
|
||||
}
|
||||
|
||||
public <S extends StateStore> void addStateStore(final S stateStore) {
|
||||
stateStores.put(stateStore.name(), stateStore);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cancellable schedule(final Duration interval,
|
||||
final PunctuationType type,
|
||||
final Punctuator callback) {
|
||||
final CapturedPunctuator capturedPunctuator = new CapturedPunctuator(interval, type, callback);
|
||||
|
||||
punctuators.add(capturedPunctuator);
|
||||
|
||||
return capturedPunctuator::cancel;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the punctuators scheduled so far. The returned list is not affected by subsequent calls to {@code schedule(...)}.
|
||||
*
|
||||
* @return A list of captured punctuators.
|
||||
*/
|
||||
public List<CapturedPunctuator> scheduledPunctuators() {
|
||||
return new LinkedList<>(punctuators);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <K extends KForward, V extends VForward> void forward(final Record<K, V> record) {
|
||||
forward(record, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <K extends KForward, V extends VForward> void forward(final Record<K, V> record, final String childName) {
|
||||
capturedForwards.add(new CapturedForward<>(record, Optional.ofNullable(childName)));
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all the forwarded data this context has observed. The returned list will not be
|
||||
* affected by subsequent interactions with the context. The data in the list is in the same order as the calls to
|
||||
* {@code forward(...)}.
|
||||
*
|
||||
* @return A list of records that were previously passed to the context.
|
||||
*/
|
||||
public List<CapturedForward<? extends KForward, ? extends VForward>> forwarded() {
|
||||
return new LinkedList<>(capturedForwards);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all the forwarded data this context has observed for a specific child by name.
|
||||
* The returned list will not be affected by subsequent interactions with the context.
|
||||
* The data in the list is in the same order as the calls to {@code forward(...)}.
|
||||
*
|
||||
* @param childName The child name to retrieve forwards for
|
||||
* @return A list of records that were previously passed to the context.
|
||||
*/
|
||||
public List<CapturedForward<? extends KForward, ? extends VForward>> forwarded(final String childName) {
|
||||
final LinkedList<CapturedForward<? extends KForward, ? extends VForward>> result = new LinkedList<>();
|
||||
for (final CapturedForward<? extends KForward, ? extends VForward> capture : capturedForwards) {
|
||||
if (!capture.childName().isPresent() || capture.childName().equals(Optional.of(childName))) {
|
||||
result.add(capture);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear the captured forwarded data.
|
||||
*/
|
||||
public void resetForwards() {
|
||||
capturedForwards.clear();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void commit() {
|
||||
committed = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether {@link ProcessorContext#commit()} has been called in this context.
|
||||
*
|
||||
* @return {@code true} iff {@link ProcessorContext#commit()} has been called in this context since construction or reset.
|
||||
*/
|
||||
public boolean committed() {
|
||||
return committed;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reset the commit capture to {@code false} (whether or not it was previously {@code true}).
|
||||
*/
|
||||
public void resetCommit() {
|
||||
committed = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RecordCollector recordCollector() {
|
||||
// This interface is assumed by state stores that add change-logging.
|
||||
// Rather than risk a mysterious ClassCastException during unit tests, throw an explanatory exception.
|
||||
|
||||
throw new UnsupportedOperationException(
|
||||
"MockProcessorContext does not provide record collection. " +
|
||||
"For processor unit tests, use an in-memory state store with change-logging disabled. " +
|
||||
"Alternatively, use the TopologyTestDriver for testing processor/store/topology integration."
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Used to get a {@link StateStoreContext} for use with
|
||||
* {@link StateStore#init(StateStoreContext, StateStore)}
|
||||
* if you need to initialize a store for your tests.
|
||||
* @return a {@link StateStoreContext} that delegates to this ProcessorContext.
|
||||
*/
|
||||
public StateStoreContext getStateStoreContext() {
|
||||
return new StateStoreContext() {
|
||||
@Override
|
||||
public String applicationId() {
|
||||
return MockProcessorContext.this.applicationId();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TaskId taskId() {
|
||||
return MockProcessorContext.this.taskId();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Serde<?> keySerde() {
|
||||
return MockProcessorContext.this.keySerde();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Serde<?> valueSerde() {
|
||||
return MockProcessorContext.this.valueSerde();
|
||||
}
|
||||
|
||||
@Override
|
||||
public File stateDir() {
|
||||
return MockProcessorContext.this.stateDir();
|
||||
}
|
||||
|
||||
@Override
|
||||
public StreamsMetrics metrics() {
|
||||
return MockProcessorContext.this.metrics();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void register(final StateStore store, final StateRestoreCallback stateRestoreCallback) {
|
||||
stateStores.put(store.name(), store);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> appConfigs() {
|
||||
return MockProcessorContext.this.appConfigs();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> appConfigsWithPrefix(final String prefix) {
|
||||
return MockProcessorContext.this.appConfigsWithPrefix(prefix);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
|
@ -43,6 +43,7 @@ import static org.junit.Assert.assertNull;
|
|||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
@SuppressWarnings("deprecation") // this is a test of a deprecated API
|
||||
public class MockProcessorContextTest {
|
||||
@Test
|
||||
public void shouldCaptureOutputRecords() {
|
||||
|
@ -160,7 +161,6 @@ public class MockProcessorContextTest {
|
|||
@Test
|
||||
public void shouldThrowIfForwardedWithDeprecatedChildIndex() {
|
||||
final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
|
||||
@SuppressWarnings("deprecation")
|
||||
@Override
|
||||
public void process(final String key, final Long value) {
|
||||
context().forward(key, value, 0);
|
||||
|
@ -182,7 +182,6 @@ public class MockProcessorContextTest {
|
|||
@Test
|
||||
public void shouldThrowIfForwardedWithDeprecatedChildName() {
|
||||
final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
|
||||
@SuppressWarnings("deprecation")
|
||||
@Override
|
||||
public void process(final String key, final Long value) {
|
||||
context().forward(key, value, "child1");
|
||||
|
@ -231,14 +230,13 @@ public class MockProcessorContextTest {
|
|||
|
||||
assertFalse(context.committed());
|
||||
}
|
||||
@SuppressWarnings({"deprecation", "unchecked"}) // TODO deprecation will be fixed in KAFKA-10437
|
||||
|
||||
@Test
|
||||
public void shouldStoreAndReturnStateStores() {
|
||||
final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
|
||||
@Override
|
||||
public void process(final String key, final Long value) {
|
||||
@SuppressWarnings("unchecked")
|
||||
final KeyValueStore<String, Long> stateStore = (KeyValueStore<String, Long>) context().getStateStore("my-state");
|
||||
final KeyValueStore<String, Long> stateStore = context().getStateStore("my-state");
|
||||
stateStore.put(key, (stateStore.get(key) == null ? 0 : stateStore.get(key)) + value);
|
||||
stateStore.put("all", (stateStore.get("all") == null ? 0 : stateStore.get("all")) + value);
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|||
import org.apache.kafka.streams.KeyValue;
|
||||
import org.apache.kafka.streams.processor.ProcessorContext;
|
||||
import org.apache.kafka.streams.processor.StateStore;
|
||||
import org.apache.kafka.streams.processor.StateStoreContext;
|
||||
import org.apache.kafka.streams.state.KeyValueIterator;
|
||||
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
|
||||
import org.apache.kafka.streams.state.ValueAndTimestamp;
|
||||
|
@ -53,10 +54,22 @@ public class KeyValueStoreFacadeTest {
|
|||
keyValueStoreFacade = new KeyValueStoreFacade<>(mockedKeyValueTimestampStore);
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437
|
||||
@SuppressWarnings("deprecation") // test of deprecated method
|
||||
@Test
|
||||
public void shouldForwardDeprecatedInit() {
|
||||
final ProcessorContext context = mock(ProcessorContext.class);
|
||||
final StateStore store = mock(StateStore.class);
|
||||
mockedKeyValueTimestampStore.init(context, store);
|
||||
expectLastCall();
|
||||
replay(mockedKeyValueTimestampStore);
|
||||
|
||||
keyValueStoreFacade.init(context, store);
|
||||
verify(mockedKeyValueTimestampStore);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldForwardInit() {
|
||||
final ProcessorContext context = mock(ProcessorContext.class);
|
||||
final StateStoreContext context = mock(StateStoreContext.class);
|
||||
final StateStore store = mock(StateStore.class);
|
||||
mockedKeyValueTimestampStore.init(context, store);
|
||||
expectLastCall();
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.kafka.streams.internals;
|
|||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.streams.processor.ProcessorContext;
|
||||
import org.apache.kafka.streams.processor.StateStore;
|
||||
import org.apache.kafka.streams.processor.StateStoreContext;
|
||||
import org.apache.kafka.streams.state.TimestampedWindowStore;
|
||||
import org.apache.kafka.streams.state.ValueAndTimestamp;
|
||||
import org.easymock.EasyMockRunner;
|
||||
|
@ -47,10 +48,22 @@ public class WindowStoreFacadeTest {
|
|||
windowStoreFacade = new WindowStoreFacade<>(mockedWindowTimestampStore);
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437
|
||||
@SuppressWarnings("deprecation") // test of deprecated method
|
||||
@Test
|
||||
public void shouldForwardDeprecatedInit() {
|
||||
final ProcessorContext context = mock(ProcessorContext.class);
|
||||
final StateStore store = mock(StateStore.class);
|
||||
mockedWindowTimestampStore.init(context, store);
|
||||
expectLastCall();
|
||||
replay(mockedWindowTimestampStore);
|
||||
|
||||
windowStoreFacade.init(context, store);
|
||||
verify(mockedWindowTimestampStore);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldForwardInit() {
|
||||
final ProcessorContext context = mock(ProcessorContext.class);
|
||||
final StateStoreContext context = mock(StateStoreContext.class);
|
||||
final StateStore store = mock(StateStore.class);
|
||||
mockedWindowTimestampStore.init(context, store);
|
||||
expectLastCall();
|
||||
|
|
|
@ -0,0 +1,353 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.streams.test;
|
||||
|
||||
import org.apache.kafka.common.serialization.Serdes;
|
||||
import org.apache.kafka.streams.StreamsConfig;
|
||||
import org.apache.kafka.streams.processor.PunctuationType;
|
||||
import org.apache.kafka.streams.processor.Punctuator;
|
||||
import org.apache.kafka.streams.processor.TaskId;
|
||||
import org.apache.kafka.streams.processor.api.MockProcessorContext;
|
||||
import org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForward;
|
||||
import org.apache.kafka.streams.processor.api.Processor;
|
||||
import org.apache.kafka.streams.processor.api.ProcessorContext;
|
||||
import org.apache.kafka.streams.processor.api.Record;
|
||||
import org.apache.kafka.streams.processor.api.RecordMetadata;
|
||||
import org.apache.kafka.streams.state.KeyValueStore;
|
||||
import org.apache.kafka.streams.state.StoreBuilder;
|
||||
import org.apache.kafka.streams.state.Stores;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Properties;
|
||||
|
||||
import static java.util.Arrays.asList;
|
||||
import static java.util.Collections.singletonList;
|
||||
import static org.apache.kafka.common.utils.Utils.mkEntry;
|
||||
import static org.apache.kafka.common.utils.Utils.mkMap;
|
||||
import static org.apache.kafka.common.utils.Utils.mkProperties;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
public class MockProcessorContextAPITest {
|
||||
@Test
|
||||
public void shouldCaptureOutputRecords() {
|
||||
final Processor<String, Long, String, Long> processor = new Processor<String, Long, String, Long>() {
|
||||
private ProcessorContext<String, Long> context;
|
||||
|
||||
@Override
|
||||
public void init(final ProcessorContext<String, Long> context) {
|
||||
this.context = context;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(final Record<String, Long> record) {
|
||||
final String key = record.key();
|
||||
final Long value = record.value();
|
||||
context.forward(record.withKey(key + value).withValue(key.length() + value));
|
||||
}
|
||||
};
|
||||
|
||||
final MockProcessorContext<String, Long> context = new MockProcessorContext<>();
|
||||
processor.init(context);
|
||||
|
||||
processor.process(new Record<>("foo", 5L, 0L));
|
||||
processor.process(new Record<>("barbaz", 50L, 0L));
|
||||
|
||||
final List<CapturedForward<? extends String, ? extends Long>> actual = context.forwarded();
|
||||
final List<CapturedForward<String, Long>> expected = asList(
|
||||
new CapturedForward<>(new Record<>("foo5", 8L, 0L)),
|
||||
new CapturedForward<>(new Record<>("barbaz50", 56L, 0L))
|
||||
);
|
||||
assertThat(actual, is(expected));
|
||||
|
||||
context.resetForwards();
|
||||
|
||||
assertThat(context.forwarded(), empty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldCaptureRecordsOutputToChildByName() {
|
||||
final Processor<String, Long, String, Long> processor = new Processor<String, Long, String, Long>() {
|
||||
private ProcessorContext<String, Long> context;
|
||||
|
||||
@Override
|
||||
public void process(final Record<String, Long> record) {
|
||||
final String key = record.key();
|
||||
final Long value = record.value();
|
||||
if (count == 0) {
|
||||
context.forward(new Record<>("start", -1L, 0L)); // broadcast
|
||||
}
|
||||
final String toChild = count % 2 == 0 ? "george" : "pete";
|
||||
context.forward(new Record<>(key + value, key.length() + value, 0L), toChild);
|
||||
count++;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(final ProcessorContext<String, Long> context) {
|
||||
this.context = context;
|
||||
}
|
||||
|
||||
private int count = 0;
|
||||
|
||||
};
|
||||
|
||||
final MockProcessorContext<String, Long> context = new MockProcessorContext<>();
|
||||
|
||||
processor.init(context);
|
||||
|
||||
processor.process(new Record<>("foo", 5L, 0L));
|
||||
processor.process(new Record<>("barbaz", 50L, 0L));
|
||||
|
||||
{
|
||||
final List<CapturedForward<? extends String, ? extends Long>> forwarded = context.forwarded();
|
||||
final List<CapturedForward<? extends String, ? extends Long>> expected = asList(
|
||||
new CapturedForward<>(new Record<>("start", -1L, 0L), Optional.empty()),
|
||||
new CapturedForward<>(new Record<>("foo5", 8L, 0L), Optional.of("george")),
|
||||
new CapturedForward<>(new Record<>("barbaz50", 56L, 0L), Optional.of("pete"))
|
||||
);
|
||||
|
||||
assertThat(forwarded, is(expected));
|
||||
}
|
||||
{
|
||||
final List<CapturedForward<? extends String, ? extends Long>> forwarded = context.forwarded("george");
|
||||
final List<CapturedForward<? extends String, ? extends Long>> expected = asList(
|
||||
new CapturedForward<>(new Record<>("start", -1L, 0L), Optional.empty()),
|
||||
new CapturedForward<>(new Record<>("foo5", 8L, 0L), Optional.of("george"))
|
||||
);
|
||||
|
||||
assertThat(forwarded, is(expected));
|
||||
}
|
||||
{
|
||||
final List<CapturedForward<? extends String, ? extends Long>> forwarded = context.forwarded("pete");
|
||||
final List<CapturedForward<? extends String, ? extends Long>> expected = asList(
|
||||
new CapturedForward<>(new Record<>("start", -1L, 0L), Optional.empty()),
|
||||
new CapturedForward<>(new Record<>("barbaz50", 56L, 0L), Optional.of("pete"))
|
||||
);
|
||||
|
||||
assertThat(forwarded, is(expected));
|
||||
}
|
||||
{
|
||||
final List<CapturedForward<? extends String, ? extends Long>> forwarded = context.forwarded("steve");
|
||||
final List<CapturedForward<? extends String, ? extends Long>> expected = singletonList(
|
||||
new CapturedForward<>(new Record<>("start", -1L, 0L))
|
||||
);
|
||||
|
||||
assertThat(forwarded, is(expected));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldCaptureCommitsAndAllowReset() {
|
||||
final Processor<String, Long, Void, Void> processor = new Processor<String, Long, Void, Void>() {
|
||||
private ProcessorContext<Void, Void> context;
|
||||
private int count = 0;
|
||||
|
||||
@Override
|
||||
public void init(final ProcessorContext<Void, Void> context) {
|
||||
this.context = context;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(final Record<String, Long> record) {
|
||||
if (++count > 2) {
|
||||
context.commit();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
final MockProcessorContext<Void, Void> context = new MockProcessorContext<>();
|
||||
|
||||
processor.init(context);
|
||||
|
||||
processor.process(new Record<>("foo", 5L, 0L));
|
||||
processor.process(new Record<>("barbaz", 50L, 0L));
|
||||
|
||||
assertThat(context.committed(), is(false));
|
||||
|
||||
processor.process(new Record<>("foobar", 500L, 0L));
|
||||
|
||||
assertThat(context.committed(), is(true));
|
||||
|
||||
context.resetCommit();
|
||||
|
||||
assertThat(context.committed(), is(false));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldStoreAndReturnStateStores() {
|
||||
final Processor<String, Long, Void, Void> processor = new Processor<String, Long, Void, Void>() {
|
||||
private ProcessorContext<Void, Void> context;
|
||||
|
||||
@Override
|
||||
public void init(final ProcessorContext<Void, Void> context) {
|
||||
this.context = context;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(final Record<String, Long> record) {
|
||||
final String key = record.key();
|
||||
final Long value = record.value();
|
||||
final KeyValueStore<String, Long> stateStore = context.getStateStore("my-state");
|
||||
|
||||
stateStore.put(key, (stateStore.get(key) == null ? 0 : stateStore.get(key)) + value);
|
||||
stateStore.put("all", (stateStore.get("all") == null ? 0 : stateStore.get("all")) + value);
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
final MockProcessorContext<Void, Void> context = new MockProcessorContext<>();
|
||||
|
||||
final StoreBuilder<KeyValueStore<String, Long>> storeBuilder = Stores.keyValueStoreBuilder(
|
||||
Stores.inMemoryKeyValueStore("my-state"),
|
||||
Serdes.String(),
|
||||
Serdes.Long()).withLoggingDisabled();
|
||||
|
||||
final KeyValueStore<String, Long> store = storeBuilder.build();
|
||||
|
||||
store.init(context.getStateStoreContext(), store);
|
||||
|
||||
processor.init(context);
|
||||
|
||||
processor.process(new Record<>("foo", 5L, 0L));
|
||||
processor.process(new Record<>("bar", 50L, 0L));
|
||||
|
||||
assertThat(store.get("foo"), is(5L));
|
||||
assertThat(store.get("bar"), is(50L));
|
||||
assertThat(store.get("all"), is(55L));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void shouldCaptureApplicationAndRecordMetadata() {
|
||||
final Properties config = mkProperties(
|
||||
mkMap(
|
||||
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "testMetadata"),
|
||||
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "")
|
||||
)
|
||||
);
|
||||
|
||||
final Processor<String, Object, String, Object> processor = new Processor<String, Object, String, Object>() {
|
||||
private ProcessorContext<String, Object> context;
|
||||
|
||||
@Override
|
||||
public void init(final ProcessorContext<String, Object> context) {
|
||||
this.context = context;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(final Record<String, Object> record) {
|
||||
context.forward(new Record<String, Object>("appId", context.applicationId(), 0L));
|
||||
context.forward(new Record<String, Object>("taskId", context.taskId(), 0L));
|
||||
|
||||
if (context.recordMetadata().isPresent()) {
|
||||
final RecordMetadata recordMetadata = context.recordMetadata().get();
|
||||
context.forward(new Record<String, Object>("topic", recordMetadata.topic(), 0L));
|
||||
context.forward(new Record<String, Object>("partition", recordMetadata.partition(), 0L));
|
||||
context.forward(new Record<String, Object>("offset", recordMetadata.offset(), 0L));
|
||||
}
|
||||
|
||||
context.forward(new Record<String, Object>("record", record, 0L));
|
||||
}
|
||||
};
|
||||
|
||||
final MockProcessorContext<String, Object> context = new MockProcessorContext<>(config);
|
||||
processor.init(context);
|
||||
|
||||
processor.process(new Record<>("foo", 5L, 0L));
|
||||
{
|
||||
final List<CapturedForward<? extends String, ?>> forwarded = context.forwarded();
|
||||
final List<CapturedForward<? extends String, ?>> expected = asList(
|
||||
new CapturedForward<>(new Record<>("appId", "testMetadata", 0L)),
|
||||
new CapturedForward<>(new Record<>("taskId", new TaskId(0, 0), 0L)),
|
||||
new CapturedForward<>(new Record<>("record", new Record<>("foo", 5L, 0L), 0L))
|
||||
);
|
||||
assertThat(forwarded, is(expected));
|
||||
}
|
||||
context.resetForwards();
|
||||
context.setRecordMetadata("t1", 0, 0L);
|
||||
processor.process(new Record<>("foo", 5L, 0L));
|
||||
{
|
||||
final List<CapturedForward<? extends String, ?>> forwarded = context.forwarded();
|
||||
final List<CapturedForward<? extends String, ?>> expected = asList(
|
||||
new CapturedForward<>(new Record<>("appId", "testMetadata", 0L)),
|
||||
new CapturedForward<>(new Record<>("taskId", new TaskId(0, 0), 0L)),
|
||||
new CapturedForward<>(new Record<>("topic", "t1", 0L)),
|
||||
new CapturedForward<>(new Record<>("partition", 0, 0L)),
|
||||
new CapturedForward<>(new Record<>("offset", 0L, 0L)),
|
||||
new CapturedForward<>(new Record<>("record", new Record<>("foo", 5L, 0L), 0L))
|
||||
);
|
||||
assertThat(forwarded, is(expected));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldCapturePunctuator() {
|
||||
final Processor<String, Long, Void, Void> processor = new Processor<String, Long, Void, Void>() {
|
||||
@Override
|
||||
public void init(final ProcessorContext<Void, Void> context) {
|
||||
context.schedule(
|
||||
Duration.ofSeconds(1L),
|
||||
PunctuationType.WALL_CLOCK_TIME,
|
||||
timestamp -> context.commit()
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(final Record<String, Long> record) {}
|
||||
};
|
||||
|
||||
final MockProcessorContext<Void, Void> context = new MockProcessorContext<>();
|
||||
|
||||
processor.init(context);
|
||||
|
||||
final MockProcessorContext.CapturedPunctuator capturedPunctuator = context.scheduledPunctuators().get(0);
|
||||
assertThat(capturedPunctuator.getInterval(), is(Duration.ofMillis(1000L)));
|
||||
assertThat(capturedPunctuator.getType(), is(PunctuationType.WALL_CLOCK_TIME));
|
||||
assertThat(capturedPunctuator.cancelled(), is(false));
|
||||
|
||||
final Punctuator punctuator = capturedPunctuator.getPunctuator();
|
||||
assertThat(context.committed(), is(false));
|
||||
punctuator.punctuate(1234L);
|
||||
assertThat(context.committed(), is(true));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void fullConstructorShouldSetAllExpectedAttributes() {
|
||||
final Properties config = new Properties();
|
||||
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "testFullConstructor");
|
||||
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
|
||||
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
||||
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
|
||||
|
||||
final File dummyFile = new File("");
|
||||
final MockProcessorContext<Void, Void> context =
|
||||
new MockProcessorContext<>(config, new TaskId(1, 1), dummyFile);
|
||||
|
||||
assertThat(context.applicationId(), is("testFullConstructor"));
|
||||
assertThat(context.taskId(), is(new TaskId(1, 1)));
|
||||
assertThat(context.appConfigs().get(StreamsConfig.APPLICATION_ID_CONFIG), is("testFullConstructor"));
|
||||
assertThat(context.appConfigsWithPrefix("application.").get("id"), is("testFullConstructor"));
|
||||
assertThat(context.keySerde().getClass(), is(Serdes.String().getClass()));
|
||||
assertThat(context.valueSerde().getClass(), is(Serdes.Long().getClass()));
|
||||
assertThat(context.stateDir(), is(dummyFile));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,200 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.streams.test;
|
||||
|
||||
import org.apache.kafka.common.serialization.Serdes;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
import org.apache.kafka.streams.StreamsConfig;
|
||||
import org.apache.kafka.streams.processor.StateStore;
|
||||
import org.apache.kafka.streams.processor.TaskId;
|
||||
import org.apache.kafka.streams.processor.api.MockProcessorContext;
|
||||
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
|
||||
import org.apache.kafka.streams.state.KeyValueStore;
|
||||
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
|
||||
import org.apache.kafka.streams.state.SessionStore;
|
||||
import org.apache.kafka.streams.state.StoreBuilder;
|
||||
import org.apache.kafka.streams.state.Stores;
|
||||
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
|
||||
import org.apache.kafka.streams.state.WindowStore;
|
||||
import org.apache.kafka.test.TestUtils;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import static java.util.Arrays.asList;
|
||||
import static org.apache.kafka.common.utils.Utils.mkEntry;
|
||||
import static org.apache.kafka.common.utils.Utils.mkMap;
|
||||
import static org.apache.kafka.common.utils.Utils.mkProperties;
|
||||
import static org.junit.Assert.assertThrows;
|
||||
|
||||
@RunWith(value = Parameterized.class)
|
||||
public class MockProcessorContextStateStoreTest {
|
||||
private final StoreBuilder<StateStore> builder;
|
||||
private final boolean timestamped;
|
||||
private final boolean caching;
|
||||
private final boolean logging;
|
||||
|
||||
@Parameterized.Parameters(name = "builder = {0}, timestamped = {1}, caching = {2}, logging = {3}")
|
||||
public static Collection<Object[]> data() {
|
||||
final List<Boolean> booleans = asList(true, false);
|
||||
|
||||
final List<Object[]> values = new ArrayList<>();
|
||||
|
||||
for (final Boolean timestamped : booleans) {
|
||||
for (final Boolean caching : booleans) {
|
||||
for (final Boolean logging : booleans) {
|
||||
final List<KeyValueBytesStoreSupplier> keyValueBytesStoreSuppliers = asList(
|
||||
Stores.inMemoryKeyValueStore("kv" + timestamped + caching + logging),
|
||||
Stores.persistentKeyValueStore("kv" + timestamped + caching + logging),
|
||||
Stores.persistentTimestampedKeyValueStore("kv" + timestamped + caching + logging)
|
||||
);
|
||||
for (final KeyValueBytesStoreSupplier supplier : keyValueBytesStoreSuppliers) {
|
||||
final StoreBuilder<? extends KeyValueStore<String, ?>> builder;
|
||||
if (timestamped) {
|
||||
builder = Stores.timestampedKeyValueStoreBuilder(supplier, Serdes.String(), Serdes.Long());
|
||||
} else {
|
||||
builder = Stores.keyValueStoreBuilder(supplier, Serdes.String(), Serdes.Long());
|
||||
}
|
||||
if (caching) {
|
||||
builder.withCachingEnabled();
|
||||
} else {
|
||||
builder.withCachingDisabled();
|
||||
}
|
||||
if (logging) {
|
||||
builder.withLoggingEnabled(Collections.emptyMap());
|
||||
} else {
|
||||
builder.withLoggingDisabled();
|
||||
}
|
||||
|
||||
values.add(new Object[] {builder, timestamped, caching, logging});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (final Boolean timestamped : booleans) {
|
||||
for (final Boolean caching : booleans) {
|
||||
for (final Boolean logging : booleans) {
|
||||
final List<WindowBytesStoreSupplier> windowBytesStoreSuppliers = asList(
|
||||
Stores.inMemoryWindowStore("w" + timestamped + caching + logging, Duration.ofSeconds(1), Duration.ofSeconds(1), false),
|
||||
Stores.persistentWindowStore("w" + timestamped + caching + logging, Duration.ofSeconds(1), Duration.ofSeconds(1), false),
|
||||
Stores.persistentTimestampedWindowStore("w" + timestamped + caching + logging, Duration.ofSeconds(1), Duration.ofSeconds(1), false)
|
||||
);
|
||||
|
||||
for (final WindowBytesStoreSupplier supplier : windowBytesStoreSuppliers) {
|
||||
final StoreBuilder<? extends WindowStore<String, ?>> builder;
|
||||
if (timestamped) {
|
||||
builder = Stores.timestampedWindowStoreBuilder(supplier, Serdes.String(), Serdes.Long());
|
||||
} else {
|
||||
builder = Stores.windowStoreBuilder(supplier, Serdes.String(), Serdes.Long());
|
||||
}
|
||||
if (caching) {
|
||||
builder.withCachingEnabled();
|
||||
} else {
|
||||
builder.withCachingDisabled();
|
||||
}
|
||||
if (logging) {
|
||||
builder.withLoggingEnabled(Collections.emptyMap());
|
||||
} else {
|
||||
builder.withLoggingDisabled();
|
||||
}
|
||||
|
||||
values.add(new Object[] {builder, timestamped, caching, logging});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (final Boolean caching : booleans) {
|
||||
for (final Boolean logging : booleans) {
|
||||
final List<SessionBytesStoreSupplier> sessionBytesStoreSuppliers = asList(
|
||||
Stores.inMemorySessionStore("s" + caching + logging, Duration.ofSeconds(1)),
|
||||
Stores.persistentSessionStore("s" + caching + logging, Duration.ofSeconds(1))
|
||||
);
|
||||
|
||||
for (final SessionBytesStoreSupplier supplier : sessionBytesStoreSuppliers) {
|
||||
final StoreBuilder<? extends SessionStore<String, ?>> builder =
|
||||
Stores.sessionStoreBuilder(supplier, Serdes.String(), Serdes.Long());
|
||||
if (caching) {
|
||||
builder.withCachingEnabled();
|
||||
} else {
|
||||
builder.withCachingDisabled();
|
||||
}
|
||||
if (logging) {
|
||||
builder.withLoggingEnabled(Collections.emptyMap());
|
||||
} else {
|
||||
builder.withLoggingDisabled();
|
||||
}
|
||||
|
||||
values.add(new Object[] {builder, false, caching, logging});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return values;
|
||||
}
|
||||
|
||||
public MockProcessorContextStateStoreTest(final StoreBuilder<StateStore> builder,
|
||||
final boolean timestamped,
|
||||
final boolean caching,
|
||||
final boolean logging) {
|
||||
|
||||
this.builder = builder;
|
||||
this.timestamped = timestamped;
|
||||
this.caching = caching;
|
||||
this.logging = logging;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldEitherInitOrThrow() {
|
||||
final File stateDir = TestUtils.tempDirectory();
|
||||
try {
|
||||
final MockProcessorContext<Void, Void> context = new MockProcessorContext<>(
|
||||
mkProperties(mkMap(
|
||||
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, ""),
|
||||
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "")
|
||||
)),
|
||||
new TaskId(0, 0),
|
||||
stateDir
|
||||
);
|
||||
final StateStore store = builder.build();
|
||||
if (caching || logging) {
|
||||
assertThrows(
|
||||
IllegalArgumentException.class,
|
||||
() -> store.init(context.getStateStoreContext(), store)
|
||||
);
|
||||
} else {
|
||||
store.init(context.getStateStoreContext(), store);
|
||||
store.close();
|
||||
}
|
||||
} finally {
|
||||
try {
|
||||
Utils.delete(stateDir);
|
||||
} catch (final IOException e) {
|
||||
// Failed to clean up the state dir. The JVM hooks will try again later.
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -18,43 +18,41 @@ package org.apache.kafka.streams.test.wordcount;
|
|||
|
||||
import org.apache.kafka.streams.KeyValue;
|
||||
import org.apache.kafka.streams.kstream.Windowed;
|
||||
import org.apache.kafka.streams.processor.Processor;
|
||||
import org.apache.kafka.streams.processor.ProcessorContext;
|
||||
import org.apache.kafka.streams.processor.ProcessorSupplier;
|
||||
import org.apache.kafka.streams.processor.PunctuationType;
|
||||
import org.apache.kafka.streams.processor.api.Processor;
|
||||
import org.apache.kafka.streams.processor.api.ProcessorContext;
|
||||
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
|
||||
import org.apache.kafka.streams.processor.api.Record;
|
||||
import org.apache.kafka.streams.state.KeyValueIterator;
|
||||
import org.apache.kafka.streams.state.WindowStore;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Locale;
|
||||
|
||||
public final class WindowedWordCountProcessorSupplier implements ProcessorSupplier<String, String> {
|
||||
public final class WindowedWordCountProcessorSupplier implements ProcessorSupplier<String, String, String, String> {
|
||||
|
||||
@Override
|
||||
public Processor<String, String> get() {
|
||||
return new Processor<String, String>() {
|
||||
private ProcessorContext context;
|
||||
public Processor<String, String, String, String> get() {
|
||||
return new Processor<String, String, String, String>() {
|
||||
private WindowStore<String, Integer> windowStore;
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public void init(final ProcessorContext context) {
|
||||
this.context = context;
|
||||
this.context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> {
|
||||
public void init(final ProcessorContext<String, String> context) {
|
||||
context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> {
|
||||
try (final KeyValueIterator<Windowed<String>, Integer> iter = windowStore.all()) {
|
||||
while (iter.hasNext()) {
|
||||
final KeyValue<Windowed<String>, Integer> entry = iter.next();
|
||||
context.forward(entry.key.toString(), entry.value.toString());
|
||||
context.forward(new Record<>(entry.key.toString(), entry.value.toString(), timestamp));
|
||||
}
|
||||
}
|
||||
});
|
||||
windowStore = (WindowStore<String, Integer>) context.getStateStore("WindowedCounts");
|
||||
windowStore = context.getStateStore("WindowedCounts");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(final String key, final String value) {
|
||||
final String[] words = value.toLowerCase(Locale.getDefault()).split(" ");
|
||||
final long timestamp = context.timestamp();
|
||||
public void process(final Record<String, String> record) {
|
||||
final String[] words = record.value().toLowerCase(Locale.getDefault()).split(" ");
|
||||
final long timestamp = record.timestamp();
|
||||
|
||||
// calculate the window as every 100 ms
|
||||
// Note this has to be aligned with the configuration for the window store you register separately
|
||||
|
@ -70,9 +68,6 @@ public final class WindowedWordCountProcessorSupplier implements ProcessorSuppli
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,11 +18,12 @@ package org.apache.kafka.streams.test.wordcount;
|
|||
|
||||
import org.apache.kafka.common.serialization.Serdes;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
import org.apache.kafka.streams.KeyValue;
|
||||
import org.apache.kafka.streams.StreamsConfig;
|
||||
import org.apache.kafka.streams.processor.MockProcessorContext;
|
||||
import org.apache.kafka.streams.processor.Processor;
|
||||
import org.apache.kafka.streams.processor.TaskId;
|
||||
import org.apache.kafka.streams.processor.api.MockProcessorContext;
|
||||
import org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForward;
|
||||
import org.apache.kafka.streams.processor.api.Processor;
|
||||
import org.apache.kafka.streams.processor.api.Record;
|
||||
import org.apache.kafka.streams.state.Stores;
|
||||
import org.apache.kafka.streams.state.WindowStore;
|
||||
import org.apache.kafka.test.TestUtils;
|
||||
|
@ -31,19 +32,17 @@ import org.junit.Test;
|
|||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.time.Duration;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
import static java.util.Arrays.asList;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.junit.Assert.assertThrows;
|
||||
|
||||
public class WindowedWordCountProcessorTest {
|
||||
@SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437
|
||||
@Test
|
||||
public void shouldWorkWithInMemoryStore() {
|
||||
final MockProcessorContext context = new MockProcessorContext();
|
||||
final MockProcessorContext<String, String> context = new MockProcessorContext<>();
|
||||
|
||||
// Create, initialize, and register the state store.
|
||||
final WindowStore<String, Integer> store =
|
||||
|
@ -56,20 +55,18 @@ public class WindowedWordCountProcessorTest {
|
|||
.withLoggingDisabled() // Changelog is not supported by MockProcessorContext.
|
||||
.withCachingDisabled() // Caching is not supported by MockProcessorContext.
|
||||
.build();
|
||||
store.init(context, store);
|
||||
context.register(store, null);
|
||||
store.init(context.getStateStoreContext(), store);
|
||||
context.getStateStoreContext().register(store, null);
|
||||
|
||||
// Create and initialize the processor under test
|
||||
final Processor<String, String> processor = new WindowedWordCountProcessorSupplier().get();
|
||||
final Processor<String, String, String, String> processor = new WindowedWordCountProcessorSupplier().get();
|
||||
processor.init(context);
|
||||
|
||||
// send a record to the processor
|
||||
context.setTimestamp(101);
|
||||
processor.process("key", "alpha beta gamma alpha");
|
||||
processor.process(new Record<>("key", "alpha beta gamma alpha", 101L));
|
||||
|
||||
// send a record to the processor in a new window
|
||||
context.setTimestamp(221);
|
||||
processor.process("key", "gamma delta");
|
||||
processor.process(new Record<>("key", "gamma delta", 221L));
|
||||
|
||||
// note that the processor does not forward during process()
|
||||
assertThat(context.forwarded().isEmpty(), is(true));
|
||||
|
@ -78,16 +75,18 @@ public class WindowedWordCountProcessorTest {
|
|||
context.scheduledPunctuators().get(0).getPunctuator().punctuate(1_000L);
|
||||
|
||||
// finally, we can verify the output.
|
||||
final Iterator<MockProcessorContext.CapturedForward> capturedForwards = context.forwarded().iterator();
|
||||
assertThat(capturedForwards.next().keyValue(), is(new KeyValue<>("[alpha@100/200]", "2")));
|
||||
assertThat(capturedForwards.next().keyValue(), is(new KeyValue<>("[beta@100/200]", "1")));
|
||||
assertThat(capturedForwards.next().keyValue(), is(new KeyValue<>("[gamma@100/200]", "1")));
|
||||
assertThat(capturedForwards.next().keyValue(), is(new KeyValue<>("[delta@200/300]", "1")));
|
||||
assertThat(capturedForwards.next().keyValue(), is(new KeyValue<>("[gamma@200/300]", "1")));
|
||||
assertThat(capturedForwards.hasNext(), is(false));
|
||||
final List<CapturedForward<? extends String, ? extends String>> capturedForwards = context.forwarded();
|
||||
final List<CapturedForward<? extends String, ? extends String>> expected = asList(
|
||||
new CapturedForward<>(new Record<>("[alpha@100/200]", "2", 1_000L)),
|
||||
new CapturedForward<>(new Record<>("[beta@100/200]", "1", 1_000L)),
|
||||
new CapturedForward<>(new Record<>("[gamma@100/200]", "1", 1_000L)),
|
||||
new CapturedForward<>(new Record<>("[delta@200/300]", "1", 1_000L)),
|
||||
new CapturedForward<>(new Record<>("[gamma@200/300]", "1", 1_000L))
|
||||
);
|
||||
|
||||
assertThat(capturedForwards, is(expected));
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437
|
||||
@Test
|
||||
public void shouldWorkWithPersistentStore() throws IOException {
|
||||
final Properties properties = new Properties();
|
||||
|
@ -97,7 +96,7 @@ public class WindowedWordCountProcessorTest {
|
|||
final File stateDir = TestUtils.tempDirectory();
|
||||
|
||||
try {
|
||||
final MockProcessorContext context = new MockProcessorContext(
|
||||
final MockProcessorContext<String, String> context = new MockProcessorContext<>(
|
||||
properties,
|
||||
new TaskId(0, 0),
|
||||
stateDir
|
||||
|
@ -114,20 +113,18 @@ public class WindowedWordCountProcessorTest {
|
|||
.withLoggingDisabled() // Changelog is not supported by MockProcessorContext.
|
||||
.withCachingDisabled() // Caching is not supported by MockProcessorContext.
|
||||
.build();
|
||||
store.init(context, store);
|
||||
context.register(store, null);
|
||||
store.init(context.getStateStoreContext(), store);
|
||||
context.getStateStoreContext().register(store, null);
|
||||
|
||||
// Create and initialize the processor under test
|
||||
final Processor<String, String> processor = new WindowedWordCountProcessorSupplier().get();
|
||||
final Processor<String, String, String, String> processor = new WindowedWordCountProcessorSupplier().get();
|
||||
processor.init(context);
|
||||
|
||||
// send a record to the processor
|
||||
context.setTimestamp(101);
|
||||
processor.process("key", "alpha beta gamma alpha");
|
||||
processor.process(new Record<>("key", "alpha beta gamma alpha", 101L));
|
||||
|
||||
// send a record to the processor in a new window
|
||||
context.setTimestamp(221);
|
||||
processor.process("key", "gamma delta");
|
||||
processor.process(new Record<>("key", "gamma delta", 221L));
|
||||
|
||||
// note that the processor does not forward during process()
|
||||
assertThat(context.forwarded().isEmpty(), is(true));
|
||||
|
@ -136,54 +133,18 @@ public class WindowedWordCountProcessorTest {
|
|||
context.scheduledPunctuators().get(0).getPunctuator().punctuate(1_000L);
|
||||
|
||||
// finally, we can verify the output.
|
||||
final Iterator<MockProcessorContext.CapturedForward> capturedForwards = context.forwarded().iterator();
|
||||
assertThat(capturedForwards.next().keyValue(), is(new KeyValue<>("[alpha@100/200]", "2")));
|
||||
assertThat(capturedForwards.next().keyValue(), is(new KeyValue<>("[beta@100/200]", "1")));
|
||||
assertThat(capturedForwards.next().keyValue(), is(new KeyValue<>("[delta@200/300]", "1")));
|
||||
assertThat(capturedForwards.next().keyValue(), is(new KeyValue<>("[gamma@100/200]", "1")));
|
||||
assertThat(capturedForwards.next().keyValue(), is(new KeyValue<>("[gamma@200/300]", "1")));
|
||||
assertThat(capturedForwards.hasNext(), is(false));
|
||||
final List<CapturedForward<? extends String, ? extends String>> capturedForwards = context.forwarded();
|
||||
final List<CapturedForward<? extends String, ? extends String>> expected = asList(
|
||||
new CapturedForward<>(new Record<>("[alpha@100/200]", "2", 1_000L)),
|
||||
new CapturedForward<>(new Record<>("[beta@100/200]", "1", 1_000L)),
|
||||
new CapturedForward<>(new Record<>("[delta@200/300]", "1", 1_000L)),
|
||||
new CapturedForward<>(new Record<>("[gamma@100/200]", "1", 1_000L)),
|
||||
new CapturedForward<>(new Record<>("[gamma@200/300]", "1", 1_000L))
|
||||
);
|
||||
|
||||
assertThat(capturedForwards, is(expected));
|
||||
} finally {
|
||||
Utils.delete(stateDir);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437
|
||||
@Test
|
||||
public void shouldFailWithLogging() {
|
||||
final MockProcessorContext context = new MockProcessorContext();
|
||||
|
||||
// Create, initialize, and register the state store.
|
||||
final WindowStore<String, Integer> store =
|
||||
Stores.windowStoreBuilder(Stores.inMemoryWindowStore("WindowedCounts",
|
||||
Duration.ofDays(24),
|
||||
Duration.ofMillis(100),
|
||||
false),
|
||||
Serdes.String(),
|
||||
Serdes.Integer())
|
||||
.withLoggingEnabled(new HashMap<>()) // Changelog is not supported by MockProcessorContext.
|
||||
.withCachingDisabled() // Caching is not supported by MockProcessorContext.
|
||||
.build();
|
||||
assertThrows(IllegalArgumentException.class, () -> store.init(context, store));
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437
|
||||
@Test
|
||||
public void shouldFailWithCaching() {
|
||||
final MockProcessorContext context = new MockProcessorContext();
|
||||
|
||||
// Create, initialize, and register the state store.
|
||||
final WindowStore<String, Integer> store =
|
||||
Stores.windowStoreBuilder(Stores.inMemoryWindowStore("WindowedCounts",
|
||||
Duration.ofDays(24),
|
||||
Duration.ofMillis(100),
|
||||
false),
|
||||
Serdes.String(),
|
||||
Serdes.Integer())
|
||||
.withLoggingDisabled() // Changelog is not supported by MockProcessorContext.
|
||||
.withCachingEnabled() // Caching is not supported by MockProcessorContext.
|
||||
.build();
|
||||
|
||||
assertThrows(IllegalArgumentException.class, () -> store.init(context, store));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue