KAFKA-14834: [3/N] Timestamped lookups for stream-table joins (#13509)

This PR updates the stream-table join processors, including both KStream-KTable and KStream-GlobalKTable joins, to perform timestamped lookups when the (global) table is versioned, as specified in KIP-914.

Reviewers: Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Victoria Xia 2023-04-12 19:54:15 -04:00 committed by GitHub
parent 750a389308
commit 88e2d6b8c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 212 additions and 44 deletions

View File

@ -24,6 +24,7 @@ import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -83,7 +84,10 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut> extends ContextualProcess
}
droppedRecordsSensor.record();
} else {
final V2 value2 = getValueOrNull(valueGetter.get(mappedKey));
final ValueAndTimestamp<V2> valueAndTimestamp2 = valueGetter.isVersioned()
? valueGetter.get(mappedKey, record.timestamp())
: valueGetter.get(mappedKey);
final V2 value2 = getValueOrNull(valueAndTimestamp2);
if (leftJoin || value2 != null) {
context().forward(record.withValue(joiner.apply(record.key(), record.value(), value2)));
}

View File

@ -16,10 +16,13 @@
*/
package org.apache.kafka.streams.integration;
import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.IntegrationTest;
import org.junit.Before;
@ -40,6 +43,9 @@ import java.util.List;
@Category({IntegrationTest.class})
@RunWith(value = Parameterized.class)
public class StreamTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
private static final String STORE_NAME = "table-store";
@Rule
public Timeout globalTimeout = Timeout.seconds(600);
private KStream<Long, String> leftStream;
@ -56,14 +62,16 @@ public class StreamTableJoinIntegrationTest extends AbstractJoinIntegrationTest
appID = "stream-table-join-integration-test";
builder = new StreamsBuilder();
rightTable = builder.table(INPUT_TOPIC_RIGHT);
leftStream = builder.stream(INPUT_TOPIC_LEFT);
}
@Test
public void testInner() {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner");
leftStream = builder.stream(INPUT_TOPIC_LEFT);
rightTable = builder.table(INPUT_TOPIC_RIGHT);
leftStream.join(rightTable, valueJoiner).to(OUTPUT_TOPIC);
final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList(
null,
null,
@ -86,7 +94,6 @@ public class StreamTableJoinIntegrationTest extends AbstractJoinIntegrationTest
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null, 8L))
);
leftStream.join(rightTable, valueJoiner).to(OUTPUT_TOPIC);
runTestWithDriver(expectedResult);
}
@ -94,6 +101,10 @@ public class StreamTableJoinIntegrationTest extends AbstractJoinIntegrationTest
public void testLeft() {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-left");
leftStream = builder.stream(INPUT_TOPIC_LEFT);
rightTable = builder.table(INPUT_TOPIC_RIGHT);
leftStream.leftJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC);
final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList(
null,
null,
@ -116,8 +127,74 @@ public class StreamTableJoinIntegrationTest extends AbstractJoinIntegrationTest
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null, 8L))
);
runTestWithDriver(expectedResult);
}
@Test
public void testInnerWithVersionedStore() {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner");
leftStream = builder.stream(INPUT_TOPIC_LEFT);
rightTable = builder.table(INPUT_TOPIC_RIGHT, Materialized.as(
Stores.persistentVersionedKeyValueStore(STORE_NAME, Duration.ofMinutes(5))));
leftStream.join(rightTable, valueJoiner).to(OUTPUT_TOPIC);
final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList(
null,
null,
null,
null,
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)),
null,
null,
null,
null,
null,
null,
null,
null,
null,
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-a", null, 4L)),
null,
null,
null
);
runTestWithDriver(expectedResult);
}
@Test
public void testLeftWithVersionedStore() {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-left");
leftStream = builder.stream(INPUT_TOPIC_LEFT);
rightTable = builder.table(INPUT_TOPIC_RIGHT, Materialized.as(
Stores.persistentVersionedKeyValueStore(STORE_NAME, Duration.ofMinutes(5))));
leftStream.leftJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC);
final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList(
null,
null,
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-null", null, 3L)),
null,
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)),
null,
null,
null,
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null", null, 9L)),
null,
null,
null,
null,
null,
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-a", null, 4L)),
null,
null,
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-null", null, 8L))
);
runTestWithDriver(expectedResult);
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.kstream.internals;
import java.util.Optional;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
@ -28,6 +29,8 @@ import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.MockApiProcessor;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
@ -49,6 +52,8 @@ public class KStreamGlobalKTableJoinTest {
private final String streamTopic = "streamTopic";
private final String globalTableTopic = "globalTableTopic";
private TestInputTopic<Integer, String> inputStreamTopic;
private TestInputTopic<String, String> inputTableTopic;
private final int[] expectedKeys = {0, 1, 2, 3};
private TopologyTestDriver driver;
@ -57,7 +62,15 @@ public class KStreamGlobalKTableJoinTest {
@Before
public void setUp() {
// use un-versioned store by default
init(Optional.empty());
}
private void initWithVersionedStore(final long historyRetentionMs) {
init(Optional.of(historyRetentionMs));
}
private void init(final Optional<Long> versionedStoreHistoryRetentionMs) {
builder = new StreamsBuilder();
final KStream<Integer, String> stream;
final GlobalKTable<String, String> table; // value of stream optionally contains key of table
@ -67,7 +80,12 @@ public class KStreamGlobalKTableJoinTest {
final Consumed<Integer, String> streamConsumed = Consumed.with(Serdes.Integer(), Serdes.String());
final Consumed<String, String> tableConsumed = Consumed.with(Serdes.String(), Serdes.String());
stream = builder.stream(streamTopic, streamConsumed);
table = builder.globalTable(globalTableTopic, tableConsumed);
if (versionedStoreHistoryRetentionMs.isPresent()) {
table = builder.globalTable(globalTableTopic, tableConsumed, Materialized.as(
Stores.persistentVersionedKeyValueStore("table", Duration.ofMillis(versionedStoreHistoryRetentionMs.get()))));
} else {
table = builder.globalTable(globalTableTopic, tableConsumed);
}
keyMapper = (key, value) -> {
final String[] tokens = value.split(",");
// Value is comma delimited. If second token is present, it's the key to the global ktable.
@ -80,6 +98,10 @@ public class KStreamGlobalKTableJoinTest {
driver = new TopologyTestDriver(builder.build(), props);
processor = supplier.theCapturedProcessor();
// auto-advance stream timestamps by default, but not global table timestamps
inputStreamTopic = driver.createInputTopic(streamTopic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ofMillis(1L));
inputTableTopic = driver.createInputTopic(globalTableTopic, new StringSerializer(), new StringSerializer());
}
@After
@ -88,8 +110,6 @@ public class KStreamGlobalKTableJoinTest {
}
private void pushToStream(final int messageCount, final String valuePrefix, final boolean includeForeignKey, final boolean includeNullKey) {
final TestInputTopic<Integer, String> inputTopic =
driver.createInputTopic(streamTopic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ofMillis(1L));
for (int i = 0; i < messageCount; i++) {
String value = valuePrefix + expectedKeys[i];
if (includeForeignKey) {
@ -99,23 +119,19 @@ public class KStreamGlobalKTableJoinTest {
if (includeNullKey && i == 0) {
key = null;
}
inputTopic.pipeInput(key, value);
inputStreamTopic.pipeInput(key, value);
}
}
private void pushToGlobalTable(final int messageCount, final String valuePrefix) {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(globalTableTopic, new StringSerializer(), new StringSerializer());
for (int i = 0; i < messageCount; i++) {
inputTopic.pipeInput("FKey" + expectedKeys[i], valuePrefix + expectedKeys[i]);
inputTableTopic.pipeInput("FKey" + expectedKeys[i], valuePrefix + expectedKeys[i]);
}
}
private void pushNullValueToGlobalTable(final int messageCount) {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(globalTableTopic, new StringSerializer(), new StringSerializer());
for (int i = 0; i < messageCount; i++) {
inputTopic.pipeInput("FKey" + expectedKeys[i], (String) null);
inputTableTopic.pipeInput("FKey" + expectedKeys[i], (String) null);
}
}
@ -152,8 +168,8 @@ public class KStreamGlobalKTableJoinTest {
// push all four items to the primary stream. this should produce two items.
pushToStream(4, "X", true, false);
processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0,FKey0+Y0", 0),
new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1));
processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0,FKey0+Y0", 2),
new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 3));
// push all items to the globalTable. this should not produce any item
@ -163,10 +179,10 @@ public class KStreamGlobalKTableJoinTest {
// push all four items to the primary stream. this should produce four items.
pushToStream(4, "X", true, false);
processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0,FKey0+YY0", 0),
new KeyValueTimestamp<>(1, "X1,FKey1+YY1", 1),
new KeyValueTimestamp<>(2, "X2,FKey2+YY2", 2),
new KeyValueTimestamp<>(3, "X3,FKey3+YY3", 3));
processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0,FKey0+YY0", 6),
new KeyValueTimestamp<>(1, "X1,FKey1+YY1", 7),
new KeyValueTimestamp<>(2, "X2,FKey2+YY2", 8),
new KeyValueTimestamp<>(3, "X3,FKey3+YY3", 9));
// push all items to the globalTable. this should not produce any item
@ -214,8 +230,8 @@ public class KStreamGlobalKTableJoinTest {
// push all four items to the primary stream. this should produce two items.
pushToStream(4, "XX", true, false);
processor.checkAndClearProcessResult(new KeyValueTimestamp<>(2, "XX2,FKey2+Y2", 2),
new KeyValueTimestamp<>(3, "XX3,FKey3+Y3", 3));
processor.checkAndClearProcessResult(new KeyValueTimestamp<>(2, "XX2,FKey2+Y2", 6),
new KeyValueTimestamp<>(3, "XX3,FKey3+Y3", 7));
}
@Test
@ -246,4 +262,30 @@ public class KStreamGlobalKTableJoinTest {
processor.checkAndClearProcessResult(new KeyValueTimestamp<>(null, "X0,FKey0+Y0", 0),
new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1));
}
@Test
public void shouldPerformTimestampedGet() {
initWithVersionedStore(1000);
// do not auto-advance stream timestamps for this test
inputStreamTopic = driver.createInputTopic(streamTopic, new IntegerSerializer(), new StringSerializer());
// produce out-of-order records including nulls to table
inputTableTopic.pipeInput("FKey1", "ValueT10", 10);
inputTableTopic.pipeInput("FKey1", "ValueT5", 5);
inputTableTopic.pipeInput("FKey1", null, 7);
inputTableTopic.pipeInput("FKey1", "ValueT12", 12);
// produce records to stream side
inputStreamTopic.pipeInput(1, "ValueS8,FKey1", 8);
inputStreamTopic.pipeInput(2, "ValueS12,FKey1", 12);
inputStreamTopic.pipeInput(3, "ValueS6,FKey1", 6);
inputStreamTopic.pipeInput(4, "ValueS10,FKey1", 10);
inputStreamTopic.pipeInput(5, "ValueS2,FKey1", 2);
processor.checkAndClearProcessResult(
new KeyValueTimestamp<>(2, "ValueS12,FKey1+ValueT12", 12),
new KeyValueTimestamp<>(3, "ValueS6,FKey1+ValueT5", 6),
new KeyValueTimestamp<>(4, "ValueS10,FKey1+ValueT10", 10));
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.kstream.internals;
import java.util.Optional;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
@ -28,6 +29,8 @@ import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.MockApiProcessor;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
@ -49,6 +52,8 @@ public class KStreamGlobalKTableLeftJoinTest {
private final String streamTopic = "streamTopic";
private final String globalTableTopic = "globalTableTopic";
private TestInputTopic<Integer, String> inputStreamTopic;
private TestInputTopic<String, String> inputTableTopic;
private final int[] expectedKeys = {0, 1, 2, 3};
private MockApiProcessor<Integer, String, Void, Void> processor;
@ -57,7 +62,15 @@ public class KStreamGlobalKTableLeftJoinTest {
@Before
public void setUp() {
// use un-versioned store by default
init(Optional.empty());
}
private void initWithVersionedStore(final long historyRetentionMs) {
init(Optional.of(historyRetentionMs));
}
private void init(final Optional<Long> versionedStoreHistoryRetentionMs) {
builder = new StreamsBuilder();
final KStream<Integer, String> stream;
final GlobalKTable<String, String> table; // value of stream optionally contains key of table
@ -67,7 +80,12 @@ public class KStreamGlobalKTableLeftJoinTest {
final Consumed<Integer, String> streamConsumed = Consumed.with(Serdes.Integer(), Serdes.String());
final Consumed<String, String> tableConsumed = Consumed.with(Serdes.String(), Serdes.String());
stream = builder.stream(streamTopic, streamConsumed);
table = builder.globalTable(globalTableTopic, tableConsumed);
if (versionedStoreHistoryRetentionMs.isPresent()) {
table = builder.globalTable(globalTableTopic, tableConsumed, Materialized.as(
Stores.persistentVersionedKeyValueStore("table", Duration.ofMillis(versionedStoreHistoryRetentionMs.get()))));
} else {
table = builder.globalTable(globalTableTopic, tableConsumed);
}
keyMapper = (key, value) -> {
final String[] tokens = value.split(",");
// Value is comma delimited. If second token is present, it's the key to the global ktable.
@ -80,6 +98,10 @@ public class KStreamGlobalKTableLeftJoinTest {
driver = new TopologyTestDriver(builder.build(), props);
processor = supplier.theCapturedProcessor();
// auto-advance timestamps by default
inputStreamTopic = driver.createInputTopic(streamTopic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ofMillis(1L));
inputTableTopic = driver.createInputTopic(globalTableTopic, new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ofMillis(1L));
}
@After
@ -88,8 +110,6 @@ public class KStreamGlobalKTableLeftJoinTest {
}
private void pushToStream(final int messageCount, final String valuePrefix, final boolean includeForeignKey, final boolean includeNullKey) {
final TestInputTopic<Integer, String> inputTopic =
driver.createInputTopic(streamTopic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ofMillis(1L));
for (int i = 0; i < messageCount; i++) {
String value = valuePrefix + expectedKeys[i];
if (includeForeignKey) {
@ -99,23 +119,19 @@ public class KStreamGlobalKTableLeftJoinTest {
if (includeNullKey && i == 0) {
key = null;
}
inputTopic.pipeInput(key, value);
inputStreamTopic.pipeInput(key, value);
}
}
private void pushToGlobalTable(final int messageCount, final String valuePrefix) {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(globalTableTopic, new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ofMillis(1L));
for (int i = 0; i < messageCount; i++) {
inputTopic.pipeInput("FKey" + expectedKeys[i], valuePrefix + expectedKeys[i]);
inputTableTopic.pipeInput("FKey" + expectedKeys[i], valuePrefix + expectedKeys[i]);
}
}
private void pushNullValueToGlobalTable(final int messageCount) {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(globalTableTopic, new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ofMillis(1L));
for (int i = 0; i < messageCount; i++) {
inputTopic.pipeInput("FKey" + expectedKeys[i], (String) null);
inputTableTopic.pipeInput("FKey" + expectedKeys[i], (String) null);
}
}
@ -154,10 +170,10 @@ public class KStreamGlobalKTableLeftJoinTest {
// push all four items to the primary stream. this should produce four items.
pushToStream(4, "X", true, false);
processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0,FKey0+Y0", 0),
new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1),
new KeyValueTimestamp<>(2, "X2,FKey2+null", 2),
new KeyValueTimestamp<>(3, "X3,FKey3+null", 3));
processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0,FKey0+Y0", 2),
new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 3),
new KeyValueTimestamp<>(2, "X2,FKey2+null", 4),
new KeyValueTimestamp<>(3, "X3,FKey3+null", 5));
// push all items to the globalTable. this should not produce any item
@ -167,10 +183,10 @@ public class KStreamGlobalKTableLeftJoinTest {
// push all four items to the primary stream. this should produce four items.
pushToStream(4, "X", true, false);
processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0,FKey0+YY0", 0),
new KeyValueTimestamp<>(1, "X1,FKey1+YY1", 1),
new KeyValueTimestamp<>(2, "X2,FKey2+YY2", 2),
new KeyValueTimestamp<>(3, "X3,FKey3+YY3", 3));
processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0,FKey0+YY0", 6),
new KeyValueTimestamp<>(1, "X1,FKey1+YY1", 7),
new KeyValueTimestamp<>(2, "X2,FKey2+YY2", 8),
new KeyValueTimestamp<>(3, "X3,FKey3+YY3", 9));
// push all items to the globalTable. this should not produce any item
@ -220,10 +236,10 @@ public class KStreamGlobalKTableLeftJoinTest {
// push all four items to the primary stream. this should produce four items.
pushToStream(4, "XX", true, false);
processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "XX0,FKey0+null", 0),
new KeyValueTimestamp<>(1, "XX1,FKey1+null", 1),
new KeyValueTimestamp<>(2, "XX2,FKey2+Y2", 2),
new KeyValueTimestamp<>(3, "XX3,FKey3+Y3", 3));
processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "XX0,FKey0+null", 4),
new KeyValueTimestamp<>(1, "XX1,FKey1+null", 5),
new KeyValueTimestamp<>(2, "XX2,FKey2+Y2", 6),
new KeyValueTimestamp<>(3, "XX3,FKey3+Y3", 7));
}
@Test
@ -256,4 +272,33 @@ public class KStreamGlobalKTableLeftJoinTest {
new KeyValueTimestamp<>(2, "X2,FKey2+Y2", 2),
new KeyValueTimestamp<>(3, "X3,FKey3+Y3", 3));
}
@Test
public void shouldPerformTimestampedGet() {
initWithVersionedStore(1000);
// do not auto-advance timestamps for this test
inputStreamTopic = driver.createInputTopic(streamTopic, new IntegerSerializer(), new StringSerializer());
inputTableTopic = driver.createInputTopic(globalTableTopic, new StringSerializer(), new StringSerializer());
// produce out-of-order records including nulls to table
inputTableTopic.pipeInput("FKey1", "ValueT10", 10);
inputTableTopic.pipeInput("FKey1", "ValueT5", 5);
inputTableTopic.pipeInput("FKey1", null, 7);
inputTableTopic.pipeInput("FKey1", "ValueT12", 12);
// produce records to stream side
inputStreamTopic.pipeInput(1, "ValueS8,FKey1", 8);
inputStreamTopic.pipeInput(2, "ValueS12,FKey1", 12);
inputStreamTopic.pipeInput(3, "ValueS6,FKey1", 6);
inputStreamTopic.pipeInput(4, "ValueS10,FKey1", 10);
inputStreamTopic.pipeInput(5, "ValueS2,FKey1", 2);
processor.checkAndClearProcessResult(
new KeyValueTimestamp<>(1, "ValueS8,FKey1+null", 8),
new KeyValueTimestamp<>(2, "ValueS12,FKey1+ValueT12", 12),
new KeyValueTimestamp<>(3, "ValueS6,FKey1+ValueT5", 6),
new KeyValueTimestamp<>(4, "ValueS10,FKey1+ValueT10", 10),
new KeyValueTimestamp<>(5, "ValueS2,FKey1+null", 2));
}
}