mirror of https://github.com/apache/kafka.git
KAFKA-6473: Add MockProcessorContext to public test-utils (#4736)
Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <guozhang@confluent.io>, Bill Bejeck <bill@confluent.io>
Parent: 395c7e0f09
Commit: adbf31ab1d
@@ -1002,6 +1002,10 @@ project(':streams:examples') {
    compile project(':streams')
    compile project(':connect:json') // this dependency should be removed after we unify data API
    compile libs.slf4jlog4j

    testCompile project(':clients').sourceSets.test.output // for org.apache.kafka.test.IntegrationTest
    testCompile project(':streams:test-utils')
    testCompile libs.junit
}

javadoc {
@@ -66,6 +66,7 @@
</ul>
</li>
<li><a class="reference internal" href="#writing-streams-back-to-kafka" id="id25">Writing streams back to Kafka</a></li>
<li><a class="reference internal" href="#testing-a-streams-app" id="id26">Testing a Streams application</a></li>
</ul>
</div>
<div class="section" id="overview">
@@ -3154,6 +3155,10 @@ t=5 (blue), which lead to a merge of sessions and an extension of a session, res
retry on delivery failure or to prevent message duplication).</p>
</div>
</div>
<div class="section" id="testing-a-streams-app">
<h2><a class="headerlink" href="#testing-a-streams-app" title="Permalink to this headline">Testing a Streams application</a></h2>
Kafka Streams comes with a <code>test-utils</code> module to help you test your application; see the <a href="testing.html">testing documentation</a> for details.
</div>
</div>
@@ -41,13 +41,16 @@
<p class="topic-title first"><b>Table of Contents</b></p>
<ul class="simple">
<li><a class="reference internal" href="#overview" id="id1">Overview</a></li>
<li><a class="reference internal" href="#defining-a-stream-processor" id="id2">Defining a Stream Processor</a></li>
<li><a class="reference internal" href="#unit-testing-processors" id="id9">Unit Testing Processors</a></li>
<li><a class="reference internal" href="#state-stores" id="id3">State Stores</a>
<ul>
<li><a class="reference internal" href="#defining-and-creating-a-state-store" id="id4">Defining and creating a State Store</a></li>
<li><a class="reference internal" href="#fault-tolerant-state-stores" id="id5">Fault-tolerant State Stores</a></li>
<li><a class="reference internal" href="#enable-or-disable-fault-tolerance-of-state-stores-store-changelogs" id="id6">Enable or Disable Fault Tolerance of State Stores (Store Changelogs)</a></li>
<li><a class="reference internal" href="#implementing-custom-state-stores" id="id7">Implementing Custom State Stores</a></li>
</ul>
</li>
<li><a class="reference internal" href="#connecting-processors-and-state-stores" id="id8">Connecting Processors and State Stores</a></li>
</ul>
@@ -98,11 +101,12 @@
callbacks with different <code class="docutils literal"><span class="pre">PunctuationType</span></code> types within the same processor by calling <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code> multiple
times inside the <code class="docutils literal"><span class="pre">init()</span></code> method.</p>
<div class="admonition attention">
<p class="first admonition-title"><b>Attention</b></p>
<p class="last">Stream-time is only advanced if all input partitions over all input topics have new data (with newer timestamps) available.
If at least one partition does not have any new data available, stream-time will not be advanced and thus <code class="docutils literal"><span class="pre">punctuate()</span></code> will not be triggered if <code class="docutils literal"><span class="pre">PunctuationType.STREAM_TIME</span></code> was specified.
This behavior is independent of the configured timestamp extractor, i.e., using <code class="docutils literal"><span class="pre">WallclockTimestampExtractor</span></code> does not enable wall-clock triggering of <code class="docutils literal"><span class="pre">punctuate()</span></code>.</p>
</div>
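<p>
To make this concrete, here is a minimal sketch (not part of the original text; the callback bodies are illustrative) of registering one callback of each <code>PunctuationType</code> inside <code>init()</code>:
</p>
<pre>
@Override
public void init(final ProcessorContext context) {
    // Fires when stream-time advances past each 1000 ms boundary; requires new input records to advance.
    context.schedule(1000, PunctuationType.STREAM_TIME, timestamp -> {
        // e.g., emit aggregated results downstream
    });
    // Fires based on system time, independent of record timestamps.
    context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
        // e.g., perform periodic housekeeping
    });
}
</pre>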
<p><b>Example</b></p>
<p>The following example <code class="docutils literal"><span class="pre">Processor</span></code> implements a simple word-count algorithm; the following actions are performed:</p>
<ul class="simple">
<li>In the <code class="docutils literal"><span class="pre">init()</span></code> method, schedule the punctuation every 1000 time units (the time unit is normally milliseconds, which in this example would translate to punctuation every 1 second) and retrieve the local state store by its name &#8220;Counts&#8221;.</li>
@@ -159,6 +163,16 @@
arrived records for stateful processing needs like aggregations and joins. For more information, see the <a class="reference internal" href="#streams-developer-guide-state-store"><span class="std std-ref">state stores</span></a> documentation.</p>
</div>
</div>
<div class="section" id="unit-testing-processors">
<h2>
<a class="toc-backref" href="#id9">Unit Testing Processors</a>
<a class="headerlink" href="#unit-testing-processors" title="Permalink to this headline"></a>
</h2>
<p>
Kafka Streams comes with a <code>test-utils</code> module to help you write unit tests for your processors; see <a href="testing.html#unit-testing-processors">Unit Testing Processors</a> for details.
</p>
</div>
<div class="section" id="state-stores">
<span id="streams-developer-guide-state-store"></span><h2><a class="toc-backref" href="#id3">State Stores</a><a class="headerlink" href="#state-stores" title="Permalink to this headline"></a></h2>
<p>To implement a <strong>stateful</strong> <code class="docutils literal"><span class="pre">Processor</span></code> or <code class="docutils literal"><span class="pre">Transformer</span></code>, you must provide one or more state stores to the processor
@@ -18,26 +18,40 @@
<script><!--#include virtual="../../js/templateData.js" --></script>

<script id="content-template" type="text/x-handlebars-template">
<!-- h1>Developer Guide for Kafka Streams</h1 -->
<div class="sub-nav-sticky">
<div class="sticky-top">
<!-- div style="height:35px">
<a href="/{{version}}/documentation/streams/">Introduction</a>
<a class="active-menu-item" href="/{{version}}/documentation/streams/developer-guide">Developer Guide</a>
<a href="/{{version}}/documentation/streams/core-concepts">Concepts</a>
<a href="/{{version}}/documentation/streams/quickstart">Run Demo App</a>
<a href="/{{version}}/documentation/streams/tutorial">Tutorial: Write App</a>
</div -->
</div>
</div>

<div class="section" id="testing">
<span id="streams-developer-guide-testing"></span>
<h1>Testing Kafka Streams<a class="headerlink" href="#testing" title="Permalink to this headline"></a></h1>
<div class="contents local topic" id="table-of-contents">
<p class="topic-title first"><b>Table of Contents</b></p>
<ul class="simple">
<li><a class="reference internal" href="#test-utils-artifact">Importing the test utilities</a></li>
<li><a class="reference internal" href="#testing-topologytestdriver">Testing Streams applications</a></li>
<li><a class="reference internal" href="#unit-testing-processors">Unit testing Processors</a></li>
</ul>
</div>
<div class="section" id="test-utils-artifact">
<h2><a class="toc-backref" href="#test-utils-artifact" title="Permalink to this headline">Importing the test utilities</a></h2>
<p>
To test a Kafka Streams application, Kafka provides a test-utils artifact that can be added as a regular
dependency to your test code base. Example <code>pom.xml</code> snippet when using Maven:
</p>
<pre>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams-test-utils</artifactId>
@@ -45,13 +59,21 @@
    <scope>test</scope>
</dependency>
</pre>
</div>
<div class="section" id="testing-topologytestdriver">
<h2><a class="toc-backref" href="#testing-topologytestdriver" title="Permalink to this headline">Testing a Streams application</a></h2>
<p>
The test-utils package provides a <code>TopologyTestDriver</code> that can be used to pipe data through a <code>Topology</code> that is either assembled manually
using the Processor API or via the DSL using <code>StreamsBuilder</code>.
The test driver simulates the library runtime that continuously fetches records from input topics and processes them by traversing the topology.
You can use the test driver to verify that your specified processor topology computes the correct result with the manually piped-in data records.
The test driver captures the result records and allows you to query its embedded state stores.
</p>
<pre>
// Processor API
Topology topology = new Topology();
topology.addSource("sourceProcessor", "input-topic");
@@ -68,62 +90,66 @@ Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
TopologyTestDriver testDriver = new TopologyTestDriver(topology, config);
</pre>
<p>
The test driver accepts <code>ConsumerRecord</code>s with key and value type <code>byte[]</code>.
Because <code>byte[]</code> types can be problematic, you can use the <code>ConsumerRecordFactory</code> to generate those records
by providing regular Java types for keys and values, along with the corresponding serializers.
</p>
<pre>
ConsumerRecordFactory<String, Long> factory = new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new LongSerializer());
testDriver.pipeInput(factory.create("key", 42L));
</pre>
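<p>
You can also pipe several records at once, each with an explicit timestamp, which is useful for driving event-time punctuations. A hedged sketch (the topic name and values are illustrative, and the list overload of <code>pipeInput</code> is assumed from the test-utils API):
</p>
<pre>
testDriver.pipeInput(Arrays.asList(
    factory.create("input-topic", "key1", 1L, /*timestamp*/ 10L),
    factory.create("input-topic", "key2", 2L, /*timestamp*/ 20L)
));
</pre>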
<p>
To verify the output, the test driver produces <code>ProducerRecord</code>s with key and value type <code>byte[]</code>.
For result verification, you can specify corresponding deserializers when reading the output record from the driver.
</p>
<pre>
ProducerRecord<String, Long> outputRecord = testDriver.readOutput("output-topic", new StringDeserializer(), new LongDeserializer());
</pre>
<p>
For result verification, you can use <code>OutputVerifier</code>.
It offers helper methods to compare only certain parts of the result record:
for example, you might only care about the key and value, but not the timestamp, of the result record.
</p>
<pre>
OutputVerifier.compareKeyValue(outputRecord, "key", 42L); // throws AssertionError if key or value does not match
</pre>
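<p>
If the timestamp matters as well, a timestamp-aware variant can be used. A hedged sketch (assuming the record was piped in with timestamp 10L):
</p>
<pre>
OutputVerifier.compareKeyValueTimestamp(outputRecord, "key", 42L, 10L); // additionally throws AssertionError if the timestamp does not match
</pre>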
<p>
<code>TopologyTestDriver</code> supports punctuations, too.
Event-time punctuations are triggered automatically based on the processed records' timestamps.
Wall-clock-time punctuations can also be triggered by advancing the test driver's wall-clock time (the driver mocks wall-clock time internally to give users control over it).
</p>
<pre>
testDriver.advanceWallClockTime(20L);
</pre>
<p>
Additionally, you can access state stores via the test driver before or after a test.
Accessing stores before a test is useful to pre-populate a store with some initial values.
After data has been processed, expected updates to the store can be verified.
</p>
<pre>
KeyValueStore store = testDriver.getKeyValueStore("store-name");
</pre>
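<p>
For example, a hedged sketch of pre-populating the store and verifying an update afterwards (the store name, keys, and the summing behavior of the topology are illustrative assumptions):
</p>
<pre>
KeyValueStore<String, Long> store = testDriver.getKeyValueStore("store-name");
store.put("a", 21L);                             // pre-populate before piping input
testDriver.pipeInput(factory.create("a", 21L));
assertEquals(Long.valueOf(42L), store.get("a")); // verify the expected update (assumes the topology sums values per key)
</pre>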
<p>
Note that you should always close the test driver at the end to make sure all resources are released properly.
</p>
<pre>
testDriver.close();
</pre>
<h3>Example</h3>
<p>
The following example demonstrates how to use the test driver and helper classes.
The example creates a topology that computes the maximum value per key using a key-value store.
While processing, no output is generated; only the store is updated.
Output is sent downstream only based on event-time and wall-clock punctuations.
</p>
<pre>
private TopologyTestDriver testDriver;
private KeyValueStore<String, Long> store;
@@ -266,31 +292,147 @@ public class CustomMaxAggregator implements Processor<String, Long> {
@Override
public void close() {}
}
</pre>
</div>
<div class="section" id="unit-testing-processors">
<h2>
<a class="headerlink" href="#unit-testing-processors" title="Permalink to this headline">Unit Testing Processors</a>
</h2>
<p>
If you <a href="processor-api.html">write a Processor</a>, you will want to test it.
</p>
<p>
Because the <code>Processor</code> forwards its results to the context rather than returning them,
unit testing requires a mocked context capable of capturing forwarded data for inspection.
For this reason, we provide a <code>MockProcessorContext</code> in <a href="#test-utils-artifact"><code>test-utils</code></a>.
</p>
<b>Construction</b>
<p>
To begin with, instantiate your processor and initialize it with the mock context:
<pre>
final Processor processorUnderTest = ...;
final MockProcessorContext context = new MockProcessorContext();
processorUnderTest.init(context);
</pre>
If you need to pass configuration to your processor or set the default serdes, you can create the mock with config:
<pre>
final Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "unit-test");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
config.put("some.other.config", "some config value");
final MockProcessorContext context = new MockProcessorContext(config);
</pre>
</p>
<b>Captured data</b>
<p>
The mock will capture any values that your processor forwards. You can make assertions on them:
<pre>
processorUnderTest.process("key", "value");

final Iterator<CapturedForward> forwarded = context.forwarded().iterator();
assertEquals(forwarded.next().keyValue(), new KeyValue<>(..., ...));
assertFalse(forwarded.hasNext());

// you can reset forwards to clear the captured data. This may be helpful in constructing longer scenarios.
context.resetForwards();

assertEquals(context.forwarded().size(), 0);
</pre>
If your processor forwards to specific child processors, you can query the context for captured data by child name:
<pre>
final List<CapturedForward> captures = context.forwarded("childProcessorName");
</pre>
The mock also captures whether your processor has called <code>commit()</code> on the context:
<pre>
assertTrue(context.committed());

// commit captures can also be reset.
context.resetCommit();

assertFalse(context.committed());
</pre>
</p>
<b>Setting record metadata</b>
<p>
In case your processor logic depends on the record metadata (topic, partition, offset, or timestamp),
you can set them on the context, either all together or individually:
<pre>
context.setRecordMetadata("topicName", /*partition*/ 0, /*offset*/ 0L, /*timestamp*/ 0L);
context.setTopic("topicName");
context.setPartition(0);
context.setOffset(0L);
context.setTimestamp(0L);
</pre>
Once these are set, the context will continue returning the same values until you set new ones.
</p>
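<p>
A minimal sketch of how this can drive a test (assuming a hypothetical processor that reads the topic name via <code>context().topic()</code>):
</p>
<pre>
context.setRecordMetadata("topicA", /*partition*/ 0, /*offset*/ 0L, /*timestamp*/ 0L);
processorUnderTest.process("key", "value"); // the processor sees topic() == "topicA"
context.setTopic("topicB");
processorUnderTest.process("key", "value"); // the processor now sees topic() == "topicB"
</pre>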
<b>State stores</b>
<p>
In case your punctuator is stateful, the mock context allows you to register state stores.
You're encouraged to use a simple in-memory store of the appropriate type (KeyValue, Windowed, or Session),
since the mock context does <i>not</i> manage changelogs, state directories, etc.
</p>
<pre>
final KeyValueStore<String, Integer> store =
    Stores.keyValueStoreBuilder(
            Stores.inMemoryKeyValueStore("myStore"),
            Serdes.String(),
            Serdes.Integer()
        )
        .withLoggingDisabled() // Changelog is not supported by MockProcessorContext.
        .build();
store.init(context, store);
context.register(store, /*deprecated parameter*/ false, /*parameter unused in mock*/ null);
</pre>
<b>Verifying punctuators</b>
<p>
Processors can schedule punctuators to handle periodic tasks.
The mock context does <i>not</i> automatically execute punctuators, but it does capture them to
allow you to unit test them as well:
<pre>
final MockProcessorContext.CapturedPunctuator capturedPunctuator = context.scheduledPunctuators().get(0);
final long interval = capturedPunctuator.getIntervalMs();
final PunctuationType type = capturedPunctuator.getType();
final boolean cancelled = capturedPunctuator.cancelled();
final Punctuator punctuator = capturedPunctuator.getPunctuator();
punctuator.punctuate(/*timestamp*/ 0L);
</pre>
If you need to write tests involving automatic firing of scheduled punctuators, we recommend creating a
simple topology with your processor and using the <a href="testing.html#testing-topologytestdriver"><code>TopologyTestDriver</code></a>.
</p>
</div>
</div>
<div class="pagination">
<a href="/{{version}}/documentation/streams/developer-guide/datatypes" class="pagination__btn pagination__btn__prev">Previous</a>
<a href="/{{version}}/documentation/streams/developer-guide/interactive-queries" class="pagination__btn pagination__btn__next">Next</a>
</div>
</script>

<!--#include virtual="../../../includes/_header.htm" -->
<!--#include virtual="../../../includes/_top.htm" -->
<div class="content documentation documentation--current">
<!--#include virtual="../../../includes/_nav.htm" -->
<div class="right">
<!--#include virtual="../../../includes/_docs_banner.htm" -->
<ul class="breadcrumbs">
<li><a href="/documentation">Documentation</a></li>
<li><a href="/documentation/streams">Kafka Streams</a></li>
<li><a href="/documentation/streams/developer-guide/">Developer Guide</a></li>
</ul>
<div class="p-content"></div>
</div>
</div>
<!--#include virtual="../../../includes/_footer.htm" -->
<script>
$(function () {
// Show selected style on nav item
$('.b-nav__streams').addClass('selected');
@@ -299,7 +441,7 @@ public class CustomMaxAggregator implements Processor<String, Long> {
y_pos = $navbar.offset().top,
height = $navbar.height();

$(window).scroll(function () {
var scrollTop = $(window).scrollTop();

if (scrollTop > y_pos - height) {
@@ -37,6 +37,7 @@
<ul class="simple">
<li><a class="reference internal" href="#libraries-and-maven-artifacts" id="id1">Libraries and Maven artifacts</a></li>
<li><a class="reference internal" href="#using-kafka-streams-within-your-application-code" id="id2">Using Kafka Streams within your application code</a></li>
<li><a class="reference internal" href="#testing-a-streams-app" id="id3">Testing a Streams application</a></li>
</ul>
<p>Any Java application that makes use of the Kafka Streams library is considered a Kafka Streams application.
The computational logic of a Kafka Streams application is defined as a <a class="reference internal" href="../concepts.html#streams-concepts"><span class="std std-ref">processor topology</span></a>,
@@ -196,6 +197,11 @@
<p>After an application is stopped, Kafka Streams will migrate any tasks that had been running in this instance to available remaining
instances.</p>
</div>

<div class="section" id="testing-a-streams-app">
<h2><a class="headerlink" href="#testing-a-streams-app" title="Permalink to this headline">Testing a Streams application</a></h2>
Kafka Streams comes with a <code>test-utils</code> module to help you test your application; see the <a href="testing.html">testing documentation</a> for details.
</div>
</div>
@@ -49,7 +49,7 @@ import java.util.concurrent.CountDownLatch;
  */
 public class WordCountProcessorDemo {

-    private static class MyProcessorSupplier implements ProcessorSupplier<String, String> {
+    static class MyProcessorSupplier implements ProcessorSupplier<String, String> {

        @Override
        public Processor<String, String> get() {
@@ -0,0 +1,70 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.examples.wordcount;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.junit.Test;

import java.util.Iterator;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

/**
 * Demonstrate the use of {@link MockProcessorContext} for testing the {@link Processor} in the {@link WordCountProcessorDemo}.
 */
public class WordCountProcessorTest {
    @Test
    public void test() {
        final MockProcessorContext context = new MockProcessorContext();

        // Create, initialize, and register the state store.
        final KeyValueStore<String, Integer> store =
            Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("Counts"), Serdes.String(), Serdes.Integer())
                .withLoggingDisabled() // Changelog is not supported by MockProcessorContext.
                .build();
        store.init(context, store);
        context.register(store, false, null);

        // Create and initialize the processor under test.
        final Processor<String, String> processor = new WordCountProcessorDemo.MyProcessorSupplier().get();
        processor.init(context);

        // Send a record to the processor.
        processor.process("key", "alpha beta gamma alpha");

        // Note that the processor commits, but does not forward, during process().
        assertTrue(context.committed());
        assertTrue(context.forwarded().isEmpty());

        // Now trigger the punctuator, which iterates over the state store and forwards the contents.
        context.scheduledPunctuators().get(0).getPunctuator().punctuate(0L);

        // Finally, verify the output.
        final Iterator<MockProcessorContext.CapturedForward> capturedForwards = context.forwarded().iterator();
        assertEquals(new KeyValue<>("alpha", "2"), capturedForwards.next().keyValue());
        assertEquals(new KeyValue<>("beta", "1"), capturedForwards.next().keyValue());
        assertEquals(new KeyValue<>("gamma", "1"), capturedForwards.next().keyValue());
        assertFalse(capturedForwards.hasNext());
    }
}
@@ -26,7 +26,7 @@ import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.TestUtils;
@@ -719,6 +719,6 @@ public class KStreamKStreamJoinTest {
     }

     private void setRecordContext(final long time, final String topic) {
-        ((MockProcessorContext) driver.context()).setRecordContext(new ProcessorRecordContext(time, 0, 0, topic));
+        ((InternalMockProcessorContext) driver.context()).setRecordContext(new ProcessorRecordContext(time, 0, 0, topic));
     }
 }
@@ -26,7 +26,7 @@ import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.TestUtils;
@@ -304,6 +304,6 @@ public class KStreamKStreamLeftJoinTest {
     }

     private void setRecordContext(final long time, final String topic) {
-        ((MockProcessorContext) driver.context()).setRecordContext(new ProcessorRecordContext(time, 0, 0, topic));
+        ((InternalMockProcessorContext) driver.context()).setRecordContext(new ProcessorRecordContext(time, 0, 0, topic));
     }
 }
@@ -31,7 +31,7 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.internals.ThreadCache;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -81,13 +81,13 @@ public class KStreamSessionWindowAggregateProcessorTest {
     private final List<KeyValue> results = new ArrayList<>();
     private Processor<String, String> processor = sessionAggregator.get();
     private SessionStore<String, Long> sessionStore;
-    private MockProcessorContext context;
+    private InternalMockProcessorContext context;


     @Before
     public void initializeStore() {
         final File stateDir = TestUtils.tempDirectory();
-        context = new MockProcessorContext(stateDir,
+        context = new InternalMockProcessorContext(stateDir,
             Serdes.String(), Serdes.String(), new NoOpRecordCollector(), new ThreadCache(new LogContext("testCache "), 100000, new MockStreamsMetrics(new Metrics()))) {
             @Override
             public <K, V> void forward(final K key, final V value) {
@@ -31,7 +31,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockInitializer;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
@@ -143,7 +143,7 @@ public class KStreamWindowAggregateTest {
     }

     private void setRecordContext(final long time, final String topic) {
-        ((MockProcessorContext) driver.context()).setRecordContext(new ProcessorRecordContext(time, 0, 0, topic));
+        ((InternalMockProcessorContext) driver.context()).setRecordContext(new ProcessorRecordContext(time, 0, 0, topic));
     }

     @Test
@@ -32,7 +32,7 @@ import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockRestoreCallback;
 import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.TestUtils;
@@ -194,7 +194,7 @@ public class AbstractTaskTest {
     testFile4.createNewFile();
     assertTrue(testFile4.exists());

-    task.processorContext = new MockProcessorContext(stateDirectory.directoryForTask(task.id), streamsConfig);
+    task.processorContext = new InternalMockProcessorContext(stateDirectory.directoryForTask(task.id), streamsConfig);

     task.stateMgr.register(store1, new MockRestoreCallback());
     task.stateMgr.register(store2, new MockRestoreCallback());
@@ -34,7 +34,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.NoOpReadOnlyStore;
 import org.apache.kafka.test.TestUtils;
@@ -86,7 +86,7 @@ public class GlobalStateManagerImplTest {
     private MockConsumer<byte[], byte[]> consumer;
     private File checkpointFile;
     private ProcessorTopology topology;
-    private MockProcessorContext mockProcessorContext;
+    private InternalMockProcessorContext processorContext;

     @Before
     public void before() throws IOException {
@@ -120,8 +120,8 @@
         stateDirectory,
         stateRestoreListener,
         streamsConfig);
-    mockProcessorContext = new MockProcessorContext(stateDirectory.globalStateDir(), streamsConfig);
-    stateManager.setGlobalProcessorContext(mockProcessorContext);
+    processorContext = new InternalMockProcessorContext(stateDirectory.globalStateDir(), streamsConfig);
+    stateManager.setGlobalProcessorContext(processorContext);
     checkpointFile = new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME);
 }

@@ -631,7 +631,7 @@ public class GlobalStateManagerImplTest {
     assertTrue(testFile4.exists());

     // only delete and recreate store 1 and 3 -- 2 and 4 must be untouched
-    stateManager.reinitializeStateStoresForPartitions(Utils.mkList(t1, t3), mockProcessorContext);
+    stateManager.reinitializeStateStoresForPartitions(Utils.mkList(t1, t3), processorContext);

     assertFalse(testFile1.exists());
     assertTrue(testFile2.exists());
@@ -24,7 +24,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.StateSerdes;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.junit.Test;

 import java.util.Collections;
@@ -110,7 +110,7 @@ public class ProcessorNodeTest {
     final StateSerdes anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class);

     final Metrics metrics = new Metrics();
-    final MockProcessorContext context = new MockProcessorContext(anyStateSerde, new RecordCollectorImpl(null, null, new LogContext("processnode-test "), new DefaultProductionExceptionHandler()), metrics);
+    final InternalMockProcessorContext context = new InternalMockProcessorContext(anyStateSerde, new RecordCollectorImpl(null, null, new LogContext("processnode-test "), new DefaultProductionExceptionHandler()), metrics);
     final ProcessorNode node = new ProcessorNode("name", new NoOpProcessor(), Collections.emptySet());
     node.init(context);
@@ -34,7 +34,7 @@ import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.state.StateSerdes;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockSourceNode;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.junit.After;
@@ -54,7 +54,7 @@ public class RecordQueueTest {
     private final TimestampExtractor timestampExtractor = new MockTimestampExtractor();
     private final String[] topics = {"topic"};

-    final MockProcessorContext context = new MockProcessorContext(StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class),
+    final InternalMockProcessorContext context = new InternalMockProcessorContext(StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class),
         new RecordCollectorImpl(null, null, new LogContext("record-queue-test "), new DefaultProductionExceptionHandler()));
     private final MockSourceNode mockSourceNodeWithMetrics = new MockSourceNode<>(topics, intDeserializer, intDeserializer);
     private final RecordQueue queue = new RecordQueue(
@@ -24,7 +24,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.state.StateSerdes;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -37,7 +37,7 @@ import static org.junit.Assert.fail;
 public class SinkNodeTest {
     private final Serializer anySerializer = Serdes.Bytes().serializer();
     private final StateSerdes anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class);
-    private final MockProcessorContext context = new MockProcessorContext(anyStateSerde,
+    private final InternalMockProcessorContext context = new InternalMockProcessorContext(anyStateSerde,
         new RecordCollectorImpl(new MockProducer<byte[], byte[]>(true, anySerializer, anySerializer), null, new LogContext("sinknode-test "), new DefaultProductionExceptionHandler()));
     private final SinkNode sink = new SinkNode<>("anyNodeName", "any-output-topic", anySerializer, anySerializer, null);
@@ -34,7 +34,7 @@ import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
 import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreTest;
 import org.apache.kafka.streams.state.internals.ThreadCache;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.TestUtils;

@@ -179,7 +179,7 @@ public class KeyValueStoreTestDriver<K, V> {
     private final Set<K> flushedRemovals = new HashSet<>();
     private final List<KeyValue<byte[], byte[]>> restorableEntries = new LinkedList<>();

-    private final MockProcessorContext context;
+    private final InternalMockProcessorContext context;
     private final StateSerdes<K, V> stateSerdes;

     private KeyValueStoreTestDriver(final StateSerdes<K, V> serdes) {
@@ -227,7 +227,7 @@ public class KeyValueStoreTestDriver<K, V> {
     props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, serdes.valueSerde().getClass());
     props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, RocksDBKeyValueStoreTest.TheRocksDbConfigSetter.class);

-    context = new MockProcessorContext(stateDir, serdes.keySerde(), serdes.valueSerde(), recordCollector, null) {
+    context = new InternalMockProcessorContext(stateDir, serdes.keySerde(), serdes.valueSerde(), recordCollector, null) {
         ThreadCache cache = new ThreadCache(new LogContext("testCache "), 1024 * 1024L, metrics());

         @Override
@@ -25,7 +25,7 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.KeyValueStoreTestDriver;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -48,14 +48,14 @@ public abstract class AbstractKeyValueStoreTest {

     protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(final ProcessorContext context);

-    protected MockProcessorContext context;
+    protected InternalMockProcessorContext context;
     protected KeyValueStore<Integer, String> store;
     protected KeyValueStoreTestDriver<Integer, String> driver;

     @Before
     public void before() {
         driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
-        context = (MockProcessorContext) driver.context();
+        context = (InternalMockProcessorContext) driver.context();
         context.setTime(10);
         store = createKeyValueStore(context);
     }
@@ -33,7 +33,7 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -58,7 +58,7 @@ import static org.junit.Assert.fail;
 public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {

     private final int maxCacheSizeBytes = 150;
-    private MockProcessorContext context;
+    private InternalMockProcessorContext context;
     private CachingKeyValueStore<String, String> store;
     private InMemoryKeyValueStore<Bytes, byte[]> underlyingStore;
     private ThreadCache cache;
@@ -73,7 +73,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
     store = new CachingKeyValueStore<>(underlyingStore, Serdes.String(), Serdes.String());
     store.setFlushListener(cacheFlushListener, false);
     cache = new ThreadCache(new LogContext("testCache "), maxCacheSizeBytes, new MockStreamsMetrics(new Metrics()));
-    context = new MockProcessorContext(null, null, null, (RecordCollector) null, cache);
+    context = new InternalMockProcessorContext(null, null, null, (RecordCollector) null, cache);
     topic = "topic";
     context.setRecordContext(new ProcessorRecordContext(10, 0, 0, topic));
     store.init(context, null);
@@ -30,7 +30,7 @@ import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StateSerdes;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -52,7 +52,7 @@ import static org.junit.Assert.assertFalse;
 public class CachingSessionStoreTest {

     private static final int MAX_CACHE_SIZE_BYTES = 600;
-    private MockProcessorContext context;
+    private InternalMockProcessorContext context;
     private RocksDBSegmentedBytesStore underlying;
     private CachingSessionStore<String, String> cachingStore;
     private ThreadCache cache;
@@ -75,7 +75,7 @@ public class CachingSessionStoreTest {
         Segments.segmentInterval(retention, numSegments)
     );
     cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
-    context = new MockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
+    context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
     context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, "topic"));
     cachingStore.init(context, cachingStore);
 }
@@ -30,7 +30,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.WindowStoreIterator;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -56,7 +56,7 @@ public class CachingWindowStoreTest {
     private static final int MAX_CACHE_SIZE_BYTES = 150;
     private static final long DEFAULT_TIMESTAMP = 10L;
     private static final Long WINDOW_SIZE = 10000L;
-    private MockProcessorContext context;
+    private InternalMockProcessorContext context;
     private RocksDBSegmentedBytesStore underlying;
     private CachingWindowStore<String, String> cachingStore;
     private CachingKeyValueStoreTest.CacheFlushListenerStub<Windowed<String>, String> cacheListener;
@@ -80,7 +80,7 @@ public class CachingWindowStoreTest {
     cachingStore.setFlushListener(cacheListener, false);
     cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
     topic = "topic";
-    context = new MockProcessorContext(TestUtils.tempDirectory(), null, null, (RecordCollector) null, cache);
+    context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, (RecordCollector) null, cache);
     context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, topic));
     cachingStore.init(context, cachingStore);
 }
@@ -23,7 +23,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -41,7 +41,7 @@ import static org.hamcrest.MatcherAssert.assertThat;

 public class ChangeLoggingKeyValueBytesStoreTest {

-    private MockProcessorContext context;
+    private InternalMockProcessorContext context;
     private final InMemoryKeyValueStore<Bytes, byte[]> inner = new InMemoryKeyValueStore<>("kv", Serdes.Bytes(), Serdes.ByteArray());
     private final ChangeLoggingKeyValueBytesStore store = new ChangeLoggingKeyValueBytesStore(inner);
     private final Map sent = new HashMap<>();
@@ -64,7 +64,7 @@ public class ChangeLoggingKeyValueBytesStoreTest {
         sent.put(key, value);
     }
 };
-context = new MockProcessorContext(
+context = new InternalMockProcessorContext(
     TestUtils.tempDirectory(),
     Serdes.String(),
     Serdes.Long(),
@@ -27,7 +27,7 @@ import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.easymock.EasyMock;
@@ -44,7 +44,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;

 public class MeteredWindowStoreTest {
-    private MockProcessorContext context;
+    private InternalMockProcessorContext context;
     @SuppressWarnings("unchecked")
     private final WindowStore<Bytes, byte[]> innerStoreMock = EasyMock.createNiceMock(WindowStore.class);
     private final MeteredWindowStore<String, String> store = new MeteredWindowStore<>(innerStoreMock, "scope", new MockTime(), Serdes.String(), new SerdeThatDoesntHandleNull());
@@ -98,7 +98,7 @@ public class MeteredWindowStoreTest {

 };

-context = new MockProcessorContext(
+context = new InternalMockProcessorContext(
     TestUtils.tempDirectory(),
     Serdes.String(),
     Serdes.Long(),
@@ -24,7 +24,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -44,7 +44,7 @@ public class RocksDBKeyValueStoreSupplierTest {

     private static final String STORE_NAME = "name";
     private final ThreadCache cache = new ThreadCache(new LogContext("test "), 1024, new MockStreamsMetrics(new Metrics()));
-    private final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(),
+    private final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(),
         Serdes.String(),
         Serdes.String(),
         new NoOpRecordCollector(),
@@ -73,7 +73,7 @@ public class RocksDBKeyValueStoreSupplierTest {
         logged.add(new ProducerRecord<K, V>(topic, partition, timestamp, key, value));
     }
 };
-final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(),
+final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(),
     Serdes.String(),
     Serdes.String(),
     collector,
@@ -100,7 +100,7 @@ public class RocksDBKeyValueStoreSupplierTest {
         logged.add(new ProducerRecord<>(topic, partition, timestamp, key, value));
     }
 };
-final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(),
+final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(),
     Serdes.String(),
     Serdes.String(),
     collector,
@@ -26,7 +26,7 @@ import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -56,7 +56,7 @@ public class RocksDBSegmentedBytesStoreTest {

     private final long retention = 60000L;
     private final int numSegments = 3;
-    private MockProcessorContext context;
+    private InternalMockProcessorContext context;
     private final String storeName = "bytes-store";
     private RocksDBSegmentedBytesStore bytesStore;
     private File stateDir;
@@ -71,7 +71,7 @@ public class RocksDBSegmentedBytesStoreTest {
         schema);

     stateDir = TestUtils.tempDirectory();
-    context = new MockProcessorContext(
+    context = new InternalMockProcessorContext(
         stateDir,
         Serdes.String(),
         Serdes.Long(),
@@ -26,7 +26,7 @@ import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.SessionStore;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -47,7 +47,7 @@ public class RocksDBSessionStoreSupplierTest {
     private static final String STORE_NAME = "name";
     private final List<ProducerRecord> logged = new ArrayList<>();
     private final ThreadCache cache = new ThreadCache(new LogContext("test "), 1024, new MockStreamsMetrics(new Metrics()));
-    private final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(),
+    private final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(),
         Serdes.String(),
         Serdes.String(),
         new NoOpRecordCollector() {
@@ -25,7 +25,7 @@ import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -45,7 +45,7 @@ import static org.junit.Assert.assertTrue;
 public class RocksDBSessionStoreTest {

     private SessionStore<String, Long> sessionStore;
-    private MockProcessorContext context;
+    private InternalMockProcessorContext context;

     @Before
     public void before() {
@@ -59,7 +59,7 @@ public class RocksDBSessionStoreTest {
             Serdes.String(),
             Serdes.Long());

-        context = new MockProcessorContext(TestUtils.tempDirectory(),
+        context = new InternalMockProcessorContext(TestUtils.tempDirectory(),
             Serdes.String(),
             Serdes.Long(),
             new NoOpRecordCollector(),
@@ -31,7 +31,7 @@ import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.RocksDBConfigSetter;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -62,14 +62,14 @@ public class RocksDBStoreTest {
     private Serializer<String> stringSerializer = new StringSerializer();
     private Deserializer<String> stringDeserializer = new StringDeserializer();
     private RocksDBStore rocksDBStore;
-    private MockProcessorContext context;
+    private InternalMockProcessorContext context;
     private File dir;

     @Before
     public void setUp() {
         rocksDBStore = new RocksDBStore("test");
         dir = TestUtils.tempDirectory();
-        context = new MockProcessorContext(dir,
+        context = new InternalMockProcessorContext(dir,
             Serdes.String(),
             Serdes.String(),
             new NoOpRecordCollector(),
@@ -115,7 +115,7 @@ public class RocksDBStoreTest {
         configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "test-server:9092");
         configs.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class);
         MockRocksDbConfigSetter.called = false;
-        rocksDBStore.openDB(new MockProcessorContext(tempDir, new StreamsConfig(configs)));
+        rocksDBStore.openDB(new InternalMockProcessorContext(tempDir, new StreamsConfig(configs)));

         assertTrue(MockRocksDbConfigSetter.called);
     }
@@ -123,7 +123,7 @@ public class RocksDBStoreTest {
     @Test(expected = ProcessorStateException.class)
     public void shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir() throws IOException {
         final File tmpDir = TestUtils.tempDirectory();
-        MockProcessorContext tmpContext = new MockProcessorContext(tmpDir,
+        InternalMockProcessorContext tmpContext = new InternalMockProcessorContext(tmpDir,
             Serdes.String(),
             Serdes.Long(),
             new NoOpRecordCollector(),
@@ -24,7 +24,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -45,7 +45,7 @@ public class RocksDBWindowStoreSupplierTest {
     private static final String STORE_NAME = "name";
     private WindowStore<String, String> store;
     private final ThreadCache cache = new ThreadCache(new LogContext("test "), 1024, new MockStreamsMetrics(new Metrics()));
-    private final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(),
+    private final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(),
         Serdes.String(),
         Serdes.String(),
         new NoOpRecordCollector(),
@@ -75,7 +75,7 @@ public class RocksDBWindowStoreSupplierTest {
                 logged.add(new ProducerRecord<K, V>(topic, partition, timestamp, key, value));
             }
         };
-        final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(),
+        final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(),
             Serdes.String(),
             Serdes.String(),
             collector,
@@ -102,7 +102,7 @@ public class RocksDBWindowStoreSupplierTest {
                 logged.add(new ProducerRecord<K, V>(topic, partition, timestamp, key, value));
             }
         };
-        final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(),
+        final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(),
             Serdes.String(),
             Serdes.String(),
             collector,
@@ -37,7 +37,7 @@ import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -94,7 +94,7 @@ public class RocksDBWindowStoreTest {
     };

     private final File baseDir = TestUtils.tempDirectory("test");
-    private final MockProcessorContext context = new MockProcessorContext(baseDir, Serdes.ByteArray(), Serdes.ByteArray(), recordCollector, cache);
+    private final InternalMockProcessorContext context = new InternalMockProcessorContext(baseDir, Serdes.ByteArray(), Serdes.ByteArray(), recordCollector, cache);
     private WindowStore<Integer, String> windowStore;

     private WindowStore<Integer, String> createWindowStore(final ProcessorContext context, final boolean retainDuplicates) {
@@ -842,7 +842,7 @@ public class RocksDBWindowStoreTest {
         assertThat(toList(windowStore.fetch(key3, 0, Long.MAX_VALUE)), equalTo(expectedKey3));
     }

-    private void putFirstBatch(final WindowStore<Integer, String> store, final long startTime, final MockProcessorContext context) {
+    private void putFirstBatch(final WindowStore<Integer, String> store, final long startTime, final InternalMockProcessorContext context) {
         context.setRecordContext(createRecordContext(startTime));
         store.put(0, "zero");
         context.setRecordContext(createRecordContext(startTime + 1L));
@@ -855,7 +855,7 @@ public class RocksDBWindowStoreTest {
         store.put(5, "five");
     }

-    private void putSecondBatch(final WindowStore<Integer, String> store, final long startTime, MockProcessorContext context) {
+    private void putSecondBatch(final WindowStore<Integer, String> store, final long startTime, InternalMockProcessorContext context) {
         context.setRecordContext(createRecordContext(startTime + 3L));
         store.put(2, "two+1");
         context.setRecordContext(createRecordContext(startTime + 4L));
@@ -23,7 +23,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -48,12 +48,12 @@ public class SegmentIteratorTest {
         }
     };

-    private MockProcessorContext context;
+    private InternalMockProcessorContext context;
     private SegmentIterator iterator = null;

     @Before
     public void before() {
-        context = new MockProcessorContext(
+        context = new InternalMockProcessorContext(
             TestUtils.tempDirectory(),
             Serdes.String(),
             Serdes.String(),
@@ -20,7 +20,7 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -44,7 +44,7 @@ import static org.junit.Assert.assertTrue;
 public class SegmentsTest {

     private static final int NUM_SEGMENTS = 5;
-    private MockProcessorContext context;
+    private InternalMockProcessorContext context;
     private Segments segments;
     private long segmentInterval;
     private File stateDirectory;
@@ -54,7 +54,7 @@ public class SegmentsTest {
     @Before
     public void createContext() {
         stateDirectory = TestUtils.tempDirectory();
-        context = new MockProcessorContext(stateDirectory,
+        context = new InternalMockProcessorContext(stateDirectory,
             Serdes.String(),
             Serdes.Long(),
             new NoOpRecordCollector(),
@@ -21,7 +21,7 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;

 import java.util.Collections;
@@ -42,7 +42,7 @@ public class StateStoreTestUtils {

         final StateStore stateStore = supplier.get();
         stateStore.init(
-            new MockProcessorContext(
+            new InternalMockProcessorContext(
                 StateSerdes.withBuiltinTypes(
                     ProcessorStateManager.storeChangelogTopic(applicationId, name),
                     keyType,
@@ -23,7 +23,7 @@ import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
 import org.apache.kafka.streams.state.StateSerdes;
-import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.junit.After;
 import org.junit.Test;

@@ -39,7 +39,7 @@ public class StoreChangeLoggerTest {

     private final Map<Integer, String> logged = new HashMap<>();

-    private final MockProcessorContext context = new MockProcessorContext(StateSerdes.withBuiltinTypes(topic, Integer.class, String.class),
+    private final InternalMockProcessorContext context = new InternalMockProcessorContext(StateSerdes.withBuiltinTypes(topic, Integer.class, String.class),
         new RecordCollectorImpl(null, "StoreChangeLoggerTest", new LogContext("StoreChangeLoggerTest "), new DefaultProductionExceptionHandler()) {
             @Override
             public <K1, V1> void send(final String topic,
@@ -48,7 +48,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;

-public class MockProcessorContext extends AbstractProcessorContext implements RecordCollector.Supplier {
+public class InternalMockProcessorContext extends AbstractProcessorContext implements RecordCollector.Supplier {

     private final File stateDir;
     private final Metrics metrics;
@@ -61,19 +61,19 @@ public class MockProcessorContext extends AbstractProcessorContext implements Re
     private Serde<?> valSerde;
     private long timestamp = -1L;

-    public MockProcessorContext(final File stateDir,
-                                final StreamsConfig config) {
+    public InternalMockProcessorContext(final File stateDir,
+                                        final StreamsConfig config) {
         this(stateDir, null, null, new Metrics(), config, null, null);
     }

-    public MockProcessorContext(final StateSerdes<?, ?> serdes,
-                                final RecordCollector collector) {
+    public InternalMockProcessorContext(final StateSerdes<?, ?> serdes,
+                                        final RecordCollector collector) {
         this(null, serdes.keySerde(), serdes.valueSerde(), collector, null);
     }

-    public MockProcessorContext(final StateSerdes<?, ?> serdes,
-                                final RecordCollector collector,
-                                final Metrics metrics) {
+    public InternalMockProcessorContext(final StateSerdes<?, ?> serdes,
+                                        final RecordCollector collector,
+                                        final Metrics metrics) {
         this(null, serdes.keySerde(), serdes.valueSerde(), metrics, new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()), new RecordCollector.Supplier() {
             @Override
             public RecordCollector recordCollector() {
@@ -82,11 +82,11 @@ public class MockProcessorContext extends AbstractProcessorContext implements Re
         }, null);
     }

-    public MockProcessorContext(final File stateDir,
-                                final Serde<?> keySerde,
-                                final Serde<?> valSerde,
-                                final RecordCollector collector,
-                                final ThreadCache cache) {
+    public InternalMockProcessorContext(final File stateDir,
+                                        final Serde<?> keySerde,
+                                        final Serde<?> valSerde,
+                                        final RecordCollector collector,
+                                        final ThreadCache cache) {
         this(stateDir, keySerde, valSerde, new Metrics(), new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()), new RecordCollector.Supplier() {
             @Override
             public RecordCollector recordCollector() {
@@ -95,13 +95,13 @@ public class MockProcessorContext extends AbstractProcessorContext implements Re
         }, cache);
     }

-    private MockProcessorContext(final File stateDir,
-                                 final Serde<?> keySerde,
-                                 final Serde<?> valSerde,
-                                 final Metrics metrics,
-                                 final StreamsConfig config,
-                                 final RecordCollector.Supplier collectorSupplier,
-                                 final ThreadCache cache) {
+    private InternalMockProcessorContext(final File stateDir,
+                                         final Serde<?> keySerde,
+                                         final Serde<?> valSerde,
+                                         final Metrics metrics,
+                                         final StreamsConfig config,
+                                         final RecordCollector.Supplier collectorSupplier,
+                                         final ThreadCache cache) {
         super(new TaskId(0, 0),
             config,
             new MockStreamsMetrics(metrics),
@@ -48,7 +48,7 @@ public class KStreamTestDriver extends ExternalResource {
     private static final long DEFAULT_CACHE_SIZE_BYTES = 1 * 1024 * 1024L;

     private ProcessorTopology topology;
-    private MockProcessorContext context;
+    private InternalMockProcessorContext context;
     private ProcessorTopology globalTopology;
     private final LogContext logContext = new LogContext("testCache ");

@@ -85,7 +85,7 @@ public class KStreamTestDriver extends ExternalResource {
         topology = builder.build(null);
         globalTopology = builder.buildGlobalStateTopology();
         final ThreadCache cache = new ThreadCache(logContext, cacheSize, new MockStreamsMetrics(new Metrics()));
-        context = new MockProcessorContext(stateDir, keySerde, valSerde, new MockRecordCollector(), cache);
+        context = new InternalMockProcessorContext(stateDir, keySerde, valSerde, new MockRecordCollector(), cache);
         context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "topic"));
         // init global topology first as it will add stores to the
         // store map that are required for joins etc.
@@ -126,7 +126,7 @@ public class KStreamTestDriver extends ExternalResource {
         globalTopology = internalTopologyBuilder.buildGlobalStateTopology();

         final ThreadCache cache = new ThreadCache(logContext, cacheSize, new MockStreamsMetrics(new Metrics()));
-        context = new MockProcessorContext(stateDir, keySerde, valSerde, new MockRecordCollector(), cache);
+        context = new InternalMockProcessorContext(stateDir, keySerde, valSerde, new MockRecordCollector(), cache);
         context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "topic"));

         // init global topology first as it will add stores to the
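Taken together, the rename above and the new class below split the old test helper in two: Kafka's own engine tests keep the fully wired InternalMockProcessorContext, while user-facing tests get a lightweight MockProcessorContext configured from plain Properties. A minimal sketch of the two construction styles; the cache variable and the application id are illustrative, taken from the surrounding tests:

    // Engine-internal tests wire serdes and a record collector explicitly
    // (cache is a ThreadCache as declared in the tests above):
    final InternalMockProcessorContext internalContext = new InternalMockProcessorContext(
        TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), new NoOpRecordCollector(), cache);

    // User tests of the new public mock only supply configuration:
    final Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-processor-test");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
    final MockProcessorContext mockContext = new MockProcessorContext(config);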
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.ValueTransformer;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * {@link MockProcessorContext} is a mock of {@link ProcessorContext} for users to test their {@link Processor},
+ * {@link Transformer}, and {@link ValueTransformer} implementations.
+ * <p>
+ * The tests for this class (org.apache.kafka.streams.MockProcessorContextTest) include several behavioral
+ * tests that serve as example usage.
+ * <p>
+ * Note that this class does not take any automated actions (such as firing scheduled punctuators).
+ * It simply captures any data it witnesses.
+ * If you require more automated tests, we recommend wrapping your {@link Processor} in a minimal source-processor-sink
+ * {@link Topology} and using the {@link TopologyTestDriver}.
+ */
+@InterfaceStability.Evolving
+public class MockProcessorContext implements ProcessorContext, RecordCollector.Supplier {
+    // Immutable fields ================================================
+    private final StreamsMetricsImpl metrics;
+    private final TaskId taskId;
+    private final StreamsConfig config;
+    private final File stateDir;
+
+    // settable record metadata ================================================
+    private String topic;
+    private Integer partition;
+    private Long offset;
+    private Long timestamp;
+
+    // mocks ================================================
+    private final Map<String, StateStore> stateStores = new HashMap<>();
+    private final List<CapturedPunctuator> punctuators = new LinkedList<>();
+    private final List<CapturedForward> capturedForwards = new LinkedList<>();
+    private boolean committed = false;
+
+    /**
+     * {@link CapturedPunctuator} holds captured punctuators, along with their scheduling information.
+     */
+    public static class CapturedPunctuator {
+        private final long intervalMs;
+        private final PunctuationType type;
+        private final Punctuator punctuator;
+        private boolean cancelled = false;
+
+        private CapturedPunctuator(final long intervalMs, final PunctuationType type, final Punctuator punctuator) {
+            this.intervalMs = intervalMs;
+            this.type = type;
+            this.punctuator = punctuator;
+        }
+
+        public long getIntervalMs() {
+            return intervalMs;
+        }
+
+        public PunctuationType getType() {
+            return type;
+        }
+
+        public Punctuator getPunctuator() {
+            return punctuator;
+        }
+
+        public void cancel() {
+            this.cancelled = true;
+        }
+
+        public boolean cancelled() {
+            return cancelled;
+        }
+    }
+
+
+    public static class CapturedForward {
+        private final String childName;
+        private final long timestamp;
+        private final KeyValue keyValue;
+
+        private CapturedForward(final To to, final KeyValue keyValue) {
+            if (keyValue == null) throw new IllegalArgumentException();
+
+            this.childName = to.childName;
+            this.timestamp = to.timestamp;
+            this.keyValue = keyValue;
+        }
+
+        /**
+         * The child this data was forwarded to.
+         *
+         * @return The child name, or {@code null} if it was broadcast.
+         */
+        public String childName() {
+            return childName;
+        }
+
+        /**
+         * The timestamp attached to the forwarded record.
+         *
+         * @return A timestamp, or {@code -1} if none was forwarded.
+         */
+        public long timestamp() {
+            return timestamp;
+        }
+
+        /**
+         * The data forwarded.
+         *
+         * @return A key/value pair. Not null.
+         */
+        public KeyValue keyValue() {
+            return keyValue;
+        }
+    }
+
+    // constructors ================================================
+
+    /**
+     * Create a {@link MockProcessorContext} with dummy {@code config} and {@code taskId} and {@code null} {@code stateDir}.
+     * Most unit tests using this mock won't need to know the taskId,
+     * and most unit tests should be able to get by with the
+     * {@link InMemoryKeyValueStore}, so the stateDir won't matter.
+     */
+    public MockProcessorContext() {
+        //noinspection DoubleBraceInitialization
+        this(
+            new Properties() {
+                {
+                    put(StreamsConfig.APPLICATION_ID_CONFIG, "");
+                    put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
+                }
+            },
+            new TaskId(0, 0),
+            null);
+    }
+
+    /**
+     * Create a {@link MockProcessorContext} with dummy {@code taskId} and {@code null} {@code stateDir}.
+     * Most unit tests using this mock won't need to know the taskId,
+     * and most unit tests should be able to get by with the
+     * {@link InMemoryKeyValueStore}, so the stateDir won't matter.
+     *
+     * @param config a Properties object, used to configure the context and the processor.
+     */
+    public MockProcessorContext(final Properties config) {
+        this(config, new TaskId(0, 0), null);
+    }
+
+    /**
+     * Create a {@link MockProcessorContext} with a specified taskId and null stateDir.
+     *
+     * @param config   a {@link Properties} object, used to configure the context and the processor.
+     * @param taskId   a {@link TaskId}, which the context makes available via {@link MockProcessorContext#taskId()}.
+     * @param stateDir a {@link File}, which the context makes available via {@link MockProcessorContext#stateDir()}.
+     */
+    public MockProcessorContext(final Properties config, final TaskId taskId, final File stateDir) {
+        final StreamsConfig streamsConfig = new StreamsConfig(config);
+        this.taskId = taskId;
+        this.config = streamsConfig;
+        this.stateDir = stateDir;
+        this.metrics = new StreamsMetricsImpl(new Metrics(), "mock-processor-context", new HashMap<String, String>());
+    }
+
+    @Override
+    public String applicationId() {
+        return config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
+    }
+
+    @Override
+    public TaskId taskId() {
+        return taskId;
+    }
+
+    @Override
+    public Map<String, Object> appConfigs() {
+        final Map<String, Object> combined = new HashMap<>();
+        combined.putAll(config.originals());
+        combined.putAll(config.values());
+        return combined;
+    }
+
+    @Override
+    public Map<String, Object> appConfigsWithPrefix(final String prefix) {
+        return config.originalsWithPrefix(prefix);
+    }
+
+    @Override
+    public Serde<?> keySerde() {
+        return config.defaultKeySerde();
+    }
+
+    @Override
+    public Serde<?> valueSerde() {
+        return config.defaultValueSerde();
+    }
+
+    @Override
+    public File stateDir() {
+        return stateDir;
+    }
+
+    @Override
+    public StreamsMetrics metrics() {
+        return metrics;
+    }
+
+    // settable record metadata ================================================
+
+    /**
+     * The context exposes these metadata for use in the processor. Normally, they are set by the Kafka Streams framework,
+     * but for the purpose of driving unit tests, you can set them directly.
+     *
+     * @param topic     A topic name
+     * @param partition A partition number
+     * @param offset    A record offset
+     * @param timestamp A record timestamp
+     */
+    public void setRecordMetadata(final String topic, final int partition, final long offset, final long timestamp) {
+        this.topic = topic;
+        this.partition = partition;
+        this.offset = offset;
+        this.timestamp = timestamp;
+    }
+
+    /**
+     * The context exposes this metadata for use in the processor. Normally, it is set by the Kafka Streams framework,
+     * but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others.
+     *
+     * @param topic A topic name
+     */
+    public void setTopic(final String topic) {
+        this.topic = topic;
+    }
+
+    /**
+     * The context exposes this metadata for use in the processor. Normally, it is set by the Kafka Streams framework,
+     * but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others.
+     *
+     * @param partition A partition number
+     */
+    public void setPartition(final int partition) {
+        this.partition = partition;
+    }
+
+    /**
+     * The context exposes this metadata for use in the processor. Normally, it is set by the Kafka Streams framework,
+     * but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others.
+     *
+     * @param offset A record offset
+     */
+    public void setOffset(final long offset) {
+        this.offset = offset;
+    }
+
+    /**
+     * The context exposes this metadata for use in the processor. Normally, it is set by the Kafka Streams framework,
+     * but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others.
+     *
+     * @param timestamp A record timestamp
+     */
+    public void setTimestamp(final long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    @Override
+    public String topic() {
+        if (topic == null) {
+            throw new IllegalStateException("Topic must be set before use via setRecordMetadata() or setTopic().");
+        }
+        return topic;
+    }
+
+    @Override
+    public int partition() {
+        if (partition == null) {
+            throw new IllegalStateException("Partition must be set before use via setRecordMetadata() or setPartition().");
+        }
+        return partition;
+    }
+
+    @Override
+    public long offset() {
+        if (offset == null) {
+            throw new IllegalStateException("Offset must be set before use via setRecordMetadata() or setOffset().");
+        }
+        return offset;
+    }
+
+    @Override
+    public long timestamp() {
+        if (timestamp == null) {
+            throw new IllegalStateException("Timestamp must be set before use via setRecordMetadata() or setTimestamp().");
+        }
+        return timestamp;
+    }
+
+    // mocks ================================================
+
+    @Override
+    public void register(final StateStore store,
+                         final boolean loggingEnabledIsDeprecatedAndIgnored,
+                         final StateRestoreCallback stateRestoreCallbackIsIgnoredInMock) {
+        stateStores.put(store.name(), store);
+    }
+
+    @Override
+    public StateStore getStateStore(final String name) {
+        return stateStores.get(name);
+    }
+
+    @Override
+    public Cancellable schedule(final long intervalMs, final PunctuationType type, final Punctuator callback) {
+        final CapturedPunctuator capturedPunctuator = new CapturedPunctuator(intervalMs, type, callback);
+
+        punctuators.add(capturedPunctuator);
+
+        return new Cancellable() {
+            @Override
+            public void cancel() {
+                capturedPunctuator.cancel();
+            }
+        };
+    }
+
+    @Override
+    public void schedule(final long interval) {
+        throw new UnsupportedOperationException(
+            "schedule() is deprecated and not supported in Mock. " +
+                "Use schedule(final long intervalMs, final PunctuationType type, final Punctuator callback) instead."
+        );
+    }
+
+    /**
+     * Get the punctuators scheduled so far. The returned list is not affected by subsequent calls to {@code schedule(...)}.
+     *
+     * @return A list of captured punctuators.
+     */
+    public List<CapturedPunctuator> scheduledPunctuators() {
+        final LinkedList<CapturedPunctuator> capturedPunctuators = new LinkedList<>();
+        capturedPunctuators.addAll(punctuators);
+        return capturedPunctuators;
+    }
+
+    @Override
+    public <K, V> void forward(final K key, final V value) {
+        //noinspection unchecked
+        capturedForwards.add(new CapturedForward(To.all(), new KeyValue(key, value)));
+    }
+
+    @Override
+    public <K, V> void forward(final K key, final V value, final To to) {
+        //noinspection unchecked
+        capturedForwards.add(new CapturedForward(to, new KeyValue(key, value)));
+    }
+
+    @Override
+    public <K, V> void forward(final K key, final V value, final int childIndex) {
+        throw new UnsupportedOperationException(
+            "Forwarding to a child by index is deprecated. " +
+                "Please transition processors to forward using a 'To' object instead."
+        );
+    }
+
+    @Override
+    public <K, V> void forward(final K key, final V value, final String childName) {
+        throw new UnsupportedOperationException(
+            "Forwarding to a child by name is deprecated. " +
+                "Please transition processors to forward using 'To.child(childName)' instead."
+        );
+    }
+
+    /**
+     * Get all the forwarded data this context has observed. The returned list will not be
+     * affected by subsequent interactions with the context. The data in the list is in the same order as the calls to
+     * {@code forward(...)}.
+     *
+     * @return A list of key/value pairs that were previously passed to the context.
+     */
+    public List<CapturedForward> forwarded() {
+        final LinkedList<CapturedForward> result = new LinkedList<>();
+        result.addAll(capturedForwards);
+        return result;
+    }
+
+    /**
+     * Get all the forwarded data this context has observed for a specific child by name.
+     * The returned list will not be affected by subsequent interactions with the context.
+     * The data in the list is in the same order as the calls to {@code forward(...)}.
+     *
+     * @param childName The child name to retrieve forwards for
+     * @return A list of key/value pairs that were previously passed to the context.
+     */
+    public List<CapturedForward> forwarded(final String childName) {
+        final LinkedList<CapturedForward> result = new LinkedList<>();
+        for (final CapturedForward capture : capturedForwards) {
+            if (capture.childName() == null || capture.childName().equals(childName)) {
+                result.add(capture);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Clear the captured forwarded data.
+     */
+    public void resetForwards() {
+        capturedForwards.clear();
+    }
+
+    @Override
+    public void commit() {
+        committed = true;
+    }
+
+    /**
+     * Whether {@link ProcessorContext#commit()} has been called in this context.
+     *
+     * @return {@code true} iff {@link ProcessorContext#commit()} has been called in this context since construction or reset.
+     */
+    public boolean committed() {
+        return committed;
+    }
+
+    /**
+     * Reset the commit capture to {@code false} (whether or not it was previously {@code true}).
+     */
+    public void resetCommit() {
+        committed = false;
+    }
+
+    @Override
+    public RecordCollector recordCollector() {
+        // This interface is assumed by state stores that add change-logging.
+        // Rather than risk a mysterious ClassCastException during unit tests, throw an explanatory exception.
+
+        throw new UnsupportedOperationException("MockProcessorContext does not provide record collection. " +
+            "For processor unit tests, use an in-memory state store with change-logging disabled. " +
+            "Alternatively, use the TopologyTestDriver for testing processor/store/topology integration.");
+    }
+}
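The javadoc above spells out the intended workflow: construct the mock, hand it to the processor's init(), drive process() by hand, and assert on whatever the context captured (forwards, commits, scheduled punctuators). A minimal sketch distilled from the tests in the next file; the processor body and the assertions are illustrative:

    // A trivial pass-through processor, tested without any running Kafka cluster.
    final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
        @Override
        public void process(final String key, final Long value) {
            context().forward(key, value); // captured by the mock, not sent anywhere
        }
    };

    final MockProcessorContext context = new MockProcessorContext();
    processor.init(context);
    context.setRecordMetadata("t1", 0, 0L, 0L); // otherwise topic()/partition()/offset()/timestamp() throw

    processor.process("foo", 42L);

    assertEquals(1, context.forwarded().size());
    assertFalse(context.committed());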
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.MockProcessorContext;
+import org.apache.kafka.streams.processor.MockProcessorContext.CapturedForward;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Iterator;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class MockProcessorContextTest {
+    @Test
+    public void shouldCaptureOutputRecords() {
+        final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
+            @Override
+            public void process(final String key, final Long value) {
+                context().forward(key + value, key.length() + value);
+            }
+        };
+
+        final MockProcessorContext context = new MockProcessorContext();
+        processor.init(context);
+
+        processor.process("foo", 5L);
+        processor.process("barbaz", 50L);
+
+        final Iterator<CapturedForward> forwarded = context.forwarded().iterator();
+        assertEquals(new KeyValue<>("foo5", 8L), forwarded.next().keyValue());
+        assertEquals(new KeyValue<>("barbaz50", 56L), forwarded.next().keyValue());
+        assertFalse(forwarded.hasNext());
+
+        context.resetForwards();
+
+        assertEquals(0, context.forwarded().size());
+    }
+
+    @Test
+    public void shouldCaptureOutputRecordsUsingTo() {
+        final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
+            @Override
+            public void process(final String key, final Long value) {
+                context().forward(key + value, key.length() + value, To.all());
+            }
+        };
+
+        final MockProcessorContext context = new MockProcessorContext();
+
+        processor.init(context);
+
+        processor.process("foo", 5L);
+        processor.process("barbaz", 50L);
+
+        final Iterator<CapturedForward> forwarded = context.forwarded().iterator();
+        assertEquals(new KeyValue<>("foo5", 8L), forwarded.next().keyValue());
+        assertEquals(new KeyValue<>("barbaz50", 56L), forwarded.next().keyValue());
+        assertFalse(forwarded.hasNext());
+
+        context.resetForwards();
+
+        assertEquals(0, context.forwarded().size());
+    }
+
+    @Test
+    public void shouldCaptureRecordsOutputToChildByName() {
+        final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
+            private int count = 0;
+
+            @Override
+            public void process(final String key, final Long value) {
+                if (count == 0) {
+                    context().forward("start", -1L, To.all()); // broadcast
+                }
+                final To toChild = count % 2 == 0 ? To.child("george") : To.child("pete");
+                context().forward(key + value, key.length() + value, toChild);
+                count++;
+            }
+        };
+
+        final MockProcessorContext context = new MockProcessorContext();
+
+        processor.init(context);
+
+        processor.process("foo", 5L);
+        processor.process("barbaz", 50L);
+
+        {
+            final Iterator<CapturedForward> forwarded = context.forwarded().iterator();
+
+            final CapturedForward forward1 = forwarded.next();
+            assertEquals(new KeyValue<>("start", -1L), forward1.keyValue());
+            assertEquals(null, forward1.childName());
+
+            final CapturedForward forward2 = forwarded.next();
+            assertEquals(new KeyValue<>("foo5", 8L), forward2.keyValue());
+            assertEquals("george", forward2.childName());
+
+            final CapturedForward forward3 = forwarded.next();
+            assertEquals(new KeyValue<>("barbaz50", 56L), forward3.keyValue());
+            assertEquals("pete", forward3.childName());
+
+            assertFalse(forwarded.hasNext());
+        }
+
+        {
+            final Iterator<CapturedForward> forwarded = context.forwarded("george").iterator();
+            assertEquals(new KeyValue<>("start", -1L), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("foo5", 8L), forwarded.next().keyValue());
+            assertFalse(forwarded.hasNext());
+        }
+
+        {
+            final Iterator<CapturedForward> forwarded = context.forwarded("pete").iterator();
+            assertEquals(new KeyValue<>("start", -1L), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("barbaz50", 56L), forwarded.next().keyValue());
+            assertFalse(forwarded.hasNext());
+        }
+
+        {
+            final Iterator<CapturedForward> forwarded = context.forwarded("steve").iterator();
+            assertEquals(new KeyValue<>("start", -1L), forwarded.next().keyValue());
+            assertFalse(forwarded.hasNext());
+        }
+    }
+
+    @Test
+    public void shouldThrowIfForwardedWithDeprecatedChildIndex() {
+        final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
+            @Override
+            public void process(final String key, final Long value) {
+                //noinspection deprecation
+                context().forward(key, value, 0);
+            }
+        };
+
+        final MockProcessorContext context = new MockProcessorContext();
+
+        processor.init(context);
+
+        try {
+            processor.process("foo", 5L);
+            fail("Should have thrown an UnsupportedOperationException.");
+        } catch (final UnsupportedOperationException expected) {
+            // expected
+        }
+    }
+
+    @Test
+    public void shouldThrowIfForwardedWithDeprecatedChildName() {
+        final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
+            @Override
+            public void process(final String key, final Long value) {
+                //noinspection deprecation
+                context().forward(key, value, "child1");
+            }
+        };
+
+        final MockProcessorContext context = new MockProcessorContext();
+
+        processor.init(context);
+
+        try {
+            processor.process("foo", 5L);
+            fail("Should have thrown an UnsupportedOperationException.");
+        } catch (final UnsupportedOperationException expected) {
+            // expected
+        }
+    }
+
+    @Test
+    public void shouldCaptureCommitsAndAllowReset() {
+        final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
+            private int count = 0;
+
+            @Override
+            public void process(final String key, final Long value) {
+                if (++count > 2) context().commit();
+            }
+        };
+
+        final MockProcessorContext context = new MockProcessorContext();
+
+        processor.init(context);
+
+        processor.process("foo", 5L);
+        processor.process("barbaz", 50L);
+
+        assertFalse(context.committed());
+
+        processor.process("foobar", 500L);
+
+        assertTrue(context.committed());
+
+        context.resetCommit();
+
+        assertFalse(context.committed());
+    }
+
+    @Test
+    public void shouldStoreAndReturnStateStores() {
+        final AbstractProcessor<String, Long> processor = new AbstractProcessor<String, Long>() {
+            @Override
+            public void process(final String key, final Long value) {
+                //noinspection unchecked
+                final KeyValueStore<String, Long> stateStore = (KeyValueStore<String, Long>) context().getStateStore("my-state");
+                stateStore.put(key, (stateStore.get(key) == null ? 0 : stateStore.get(key)) + value);
+                stateStore.put("all", (stateStore.get("all") == null ? 0 : stateStore.get("all")) + value);
+            }
+        };
+
+        final MockProcessorContext context = new MockProcessorContext();
+        final KeyValueStore<String, Long> store = new InMemoryKeyValueStore<>("my-state", Serdes.String(), Serdes.Long());
+        context.register(store, false, null);
+
+        store.init(context, store);
+        processor.init(context);
+
+        processor.process("foo", 5L);
+        processor.process("bar", 50L);
+
+        assertEquals(5L, (long) store.get("foo"));
+        assertEquals(50L, (long) store.get("bar"));
+        assertEquals(55L, (long) store.get("all"));
+    }
+
+    @Test
+    public void shouldCaptureApplicationAndRecordMetadata() {
+        final Properties config = new Properties();
+        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "testMetadata");
+        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
+
+        final AbstractProcessor<String, Object> processor = new AbstractProcessor<String, Object>() {
+            @Override
+            public void process(final String key, final Object value) {
+                context().forward("appId", context().applicationId());
+                context().forward("taskId", context().taskId());
+
+                context().forward("topic", context().topic());
+                context().forward("partition", context().partition());
+                context().forward("offset", context().offset());
+                context().forward("timestamp", context().timestamp());
+
+                context().forward("key", key);
+                context().forward("value", value);
+            }
+        };
+
+        final MockProcessorContext context = new MockProcessorContext(config);
+        processor.init(context);
+
+        try {
+            processor.process("foo", 5L);
+            fail("Should have thrown an exception.");
+        } catch (final IllegalStateException expected) {
+            // expected, since the record metadata isn't initialized
+        }
+
+        context.resetForwards();
+        context.setRecordMetadata("t1", 0, 0L, 0L);
+
+        {
+            processor.process("foo", 5L);
+            final Iterator<CapturedForward> forwarded = context.forwarded().iterator();
+            assertEquals(new KeyValue<>("appId", "testMetadata"), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("taskId", new TaskId(0, 0)), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("topic", "t1"), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("partition", 0), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("offset", 0L), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("timestamp", 0L), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("key", "foo"), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("value", 5L), forwarded.next().keyValue());
+        }
+
+        context.resetForwards();
+
+        // record metadata should be "sticky"
+        context.setOffset(1L);
+        context.setTimestamp(10L);
+
+        {
+            processor.process("bar", 50L);
+            final Iterator<CapturedForward> forwarded = context.forwarded().iterator();
+            assertEquals(new KeyValue<>("appId", "testMetadata"), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("taskId", new TaskId(0, 0)), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("topic", "t1"), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("partition", 0), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("offset", 1L), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("timestamp", 10L), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("key", "bar"), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("value", 50L), forwarded.next().keyValue());
+        }
+
+        context.resetForwards();
+        // record metadata should be "sticky"
+        context.setTopic("t2");
+        context.setPartition(30);
+
+        {
+            processor.process("baz", 500L);
+            final Iterator<CapturedForward> forwarded = context.forwarded().iterator();
+            assertEquals(new KeyValue<>("appId", "testMetadata"), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("taskId", new TaskId(0, 0)), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("topic", "t2"), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("partition", 30), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("offset", 1L), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("timestamp", 10L), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("key", "baz"), forwarded.next().keyValue());
+            assertEquals(new KeyValue<>("value", 500L), forwarded.next().keyValue());
+        }
+    }
+
+    @Test
+    public void shouldCapturePunctuator() {
+        final Processor<String, Long> processor = new Processor<String, Long>() {
+            @Override
+            public void init(final ProcessorContext context) {
+                context.schedule(
+                    1000L,
+                    PunctuationType.WALL_CLOCK_TIME,
+                    new Punctuator() {
+                        @Override
+                        public void punctuate(final long timestamp) {
+                            context.commit();
+                        }
+                    }
+                );
+            }
+
+            @Override
+            public void process(final String key, final Long value) {
+            }
+
+            @Override
+            public void punctuate(final long timestamp) {
+            }
+
+            @Override
+            public void close() {
+            }
+        };
+
+        final MockProcessorContext context = new MockProcessorContext();
+
+        processor.init(context);
+
+        final MockProcessorContext.CapturedPunctuator capturedPunctuator = context.scheduledPunctuators().get(0);
+        assertEquals(1000L, capturedPunctuator.getIntervalMs());
+        assertEquals(PunctuationType.WALL_CLOCK_TIME, capturedPunctuator.getType());
+        assertFalse(capturedPunctuator.cancelled());
+
+        final Punctuator punctuator = capturedPunctuator.getPunctuator();
+        assertFalse(context.committed());
+        punctuator.punctuate(1234L);
+        assertTrue(context.committed());
+    }
+
+    @Test
+    public void fullConstructorShouldSetAllExpectedAttributes() {
+        final Properties config = new Properties();
+        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "testFullConstructor");
+        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
+        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+
+        final File dummyFile = new File("");
+        final MockProcessorContext context = new MockProcessorContext(config, new TaskId(1, 1), dummyFile);
+
+        assertEquals("testFullConstructor", context.applicationId());
+        assertEquals(new TaskId(1, 1), context.taskId());
+        assertEquals("testFullConstructor", context.appConfigs().get(StreamsConfig.APPLICATION_ID_CONFIG));
+        assertEquals("testFullConstructor", context.appConfigsWithPrefix("application.").get("id"));
+        assertEquals(Serdes.String().getClass(), context.keySerde().getClass());
+        assertEquals(Serdes.Long().getClass(), context.valueSerde().getClass());
+        assertEquals(dummyFile, context.stateDir());
+    }
+}
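As the MockProcessorContext javadoc notes, the mock takes no automated actions, so punctuators never fire on their own. For that level of automation the recommended route is to wrap the processor in a minimal source-processor-sink Topology and drive it with TopologyTestDriver from the same test-utils module. A rough sketch, assuming the KIP-247 test-utils API; MyProcessor and the topic names are hypothetical:

    // Build a one-processor topology around the processor under test.
    final Topology topology = new Topology();
    topology.addSource("source", "input-topic");
    topology.addProcessor("processor", MyProcessor::new, "source");
    topology.addSink("sink", "output-topic", "processor");

    final Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "topology-test");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

    // The driver runs the topology synchronously, no broker required.
    final TopologyTestDriver driver = new TopologyTestDriver(topology, config);
    final ConsumerRecordFactory<String, String> factory =
        new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new StringSerializer());

    driver.pipeInput(factory.create("input-topic", "key", "value"));
    final ProducerRecord<String, String> output =
        driver.readOutput("output-topic", new StringDeserializer(), new StringDeserializer());
    driver.close();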