MINOR: Add regression tests for KTable mapValues and filter (#5134)

Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
John Roesler 2018-06-05 13:12:57 -05:00 committed by Guozhang Wang
parent 8a166f8c28
commit ba5fd3c8a4
1 changed files with 144 additions and 1 deletions

View File

@ -17,8 +17,10 @@
package org.apache.kafka.streams;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
@ -27,12 +29,14 @@ import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateStore;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
@ -101,7 +105,7 @@ public class TopologyTest {
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullTopicChooserWhenAddingSink() {
topology.addSink("name", (TopicNameExtractor) null);
topology.addSink("name", (TopicNameExtractor<Object, Object>) null);
}
@Test(expected = NullPointerException.class)
@ -859,6 +863,145 @@ public class TopologyTest {
);
}
@Test
public void kTableNonMaterializedMapValuesShouldPreserveTopologyStructure() {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<Object, Object> table = builder.table("input-topic");
table.mapValues((readOnlyKey, value) -> null);
final TopologyDescription describe = builder.build().describe();
System.out.println(describe);
Assert.assertEquals(
"Topologies:\n" +
" Sub-topology: 0\n" +
" Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n" +
" --> KTABLE-SOURCE-0000000002\n" +
" Processor: KTABLE-SOURCE-0000000002 (stores: [input-topic-STATE-STORE-0000000000])\n" +
" --> KTABLE-MAPVALUES-0000000003\n" +
" <-- KSTREAM-SOURCE-0000000001\n" +
" Processor: KTABLE-MAPVALUES-0000000003 (stores: [])\n" +
" --> none\n" +
" <-- KTABLE-SOURCE-0000000002\n\n",
describe.toString());
}
@Test
public void kTableAnonymousMaterializedMapValuesShouldPreserveTopologyStructure() {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<Object, Object> table = builder.table("input-topic");
table.mapValues(
(readOnlyKey, value) -> null,
Materialized.with(null, null));
final TopologyDescription describe = builder.build().describe();
Assert.assertEquals(
"Topologies:\n" +
" Sub-topology: 0\n" +
" Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n" +
" --> KTABLE-SOURCE-0000000002\n" +
" Processor: KTABLE-SOURCE-0000000002 (stores: [input-topic-STATE-STORE-0000000000])\n" +
" --> KTABLE-MAPVALUES-0000000004\n" +
" <-- KSTREAM-SOURCE-0000000001\n" +
// previously, this was
// Processor: KTABLE-MAPVALUES-0000000004 (stores: [KTABLE-MAPVALUES-STATE-STORE-0000000003]
// but we added a change not to materialize non-queriable stores. This change shouldn't break compatibility.
" Processor: KTABLE-MAPVALUES-0000000004 (stores: [])\n" +
" --> none\n" +
" <-- KTABLE-SOURCE-0000000002\n" +
"\n",
describe.toString());
}
@Test
public void kTableNamedMaterializedMapValuesShouldPreserveTopologyStructure() {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<Object, Object> table = builder.table("input-topic");
table.mapValues(
(readOnlyKey, value) -> null,
Materialized.<Object, Object, KeyValueStore<Bytes, byte[]>>as("store-name").withKeySerde(null).withValueSerde(null));
final TopologyDescription describe = builder.build().describe();
Assert.assertEquals(
"Topologies:\n" +
" Sub-topology: 0\n" +
" Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n" +
" --> KTABLE-SOURCE-0000000002\n" +
" Processor: KTABLE-SOURCE-0000000002 (stores: [input-topic-STATE-STORE-0000000000])\n" +
" --> KTABLE-MAPVALUES-0000000003\n" +
" <-- KSTREAM-SOURCE-0000000001\n" +
" Processor: KTABLE-MAPVALUES-0000000003 (stores: [store-name])\n" +
" --> none\n" +
" <-- KTABLE-SOURCE-0000000002\n" +
"\n",
describe.toString());
}
@Test
public void kTableNonMaterializedFilterShouldPreserveTopologyStructure() {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<Object, Object> table = builder.table("input-topic");
table.filter((key, value) -> false);
final TopologyDescription describe = builder.build().describe();
Assert.assertEquals(
"Topologies:\n" +
" Sub-topology: 0\n" +
" Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n" +
" --> KTABLE-SOURCE-0000000002\n" +
" Processor: KTABLE-SOURCE-0000000002 (stores: [input-topic-STATE-STORE-0000000000])\n" +
" --> KTABLE-FILTER-0000000003\n" +
" <-- KSTREAM-SOURCE-0000000001\n" +
" Processor: KTABLE-FILTER-0000000003 (stores: [])\n" +
" --> none\n" +
" <-- KTABLE-SOURCE-0000000002\n\n",
describe.toString());
}
@Test
public void kTableAnonymousMaterializedFilterShouldPreserveTopologyStructure() {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<Object, Object> table = builder.table("input-topic");
table.filter(
(key, value) -> false,
Materialized.with(null, null));
final TopologyDescription describe = builder.build().describe();
Assert.assertEquals(
"Topologies:\n" +
" Sub-topology: 0\n" +
" Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n" +
" --> KTABLE-SOURCE-0000000002\n" +
" Processor: KTABLE-SOURCE-0000000002 (stores: [input-topic-STATE-STORE-0000000000])\n" +
" --> KTABLE-FILTER-0000000004\n" +
" <-- KSTREAM-SOURCE-0000000001\n" +
// Previously, this was
// Processor: KTABLE-FILTER-0000000004 (stores: [KTABLE-FILTER-STATE-STORE-0000000003]
// but we added a change not to materialize non-queriable stores. This change shouldn't break compatibility.
" Processor: KTABLE-FILTER-0000000004 (stores: [])\n" +
" --> none\n" +
" <-- KTABLE-SOURCE-0000000002\n" +
"\n",
describe.toString());
}
@Test
public void kTableNamedMaterializedFilterShouldPreserveTopologyStructure() {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<Object, Object> table = builder.table("input-topic");
table.filter(
(key, value) -> false,
Materialized.<Object, Object, KeyValueStore<Bytes, byte[]>>as("store-name").withKeySerde(null).withValueSerde(null));
final TopologyDescription describe = builder.build().describe();
Assert.assertEquals(
"Topologies:\n" +
" Sub-topology: 0\n" +
" Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic])\n" +
" --> KTABLE-SOURCE-0000000002\n" +
" Processor: KTABLE-SOURCE-0000000002 (stores: [input-topic-STATE-STORE-0000000000])\n" +
" --> KTABLE-FILTER-0000000003\n" +
" <-- KSTREAM-SOURCE-0000000001\n" +
" Processor: KTABLE-FILTER-0000000003 (stores: [store-name])\n" +
" --> none\n" +
" <-- KTABLE-SOURCE-0000000002\n" +
"\n",
describe.toString());
}
private TopologyDescription.Source addSource(final String sourceName,
final String... sourceTopic) {
topology.addSource(null, sourceName, null, null, null, sourceTopic);