KAFKA-14834: [2/N] Test coverage for out-of-order data in joins (#13497)

In preparation for updating DSL join processors to have updated semantics when versioned stores are used (cf KIP-914), this PR adds test coverage for out-of-order data in joins to the existing integration tests for stream-table joins and primary-key table-table joins. Follow-up PRs will build on top of this change by adding new tests for versioned stores, and the out-of-order data will produce different results in those settings.

Reviewers: Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Victoria Xia 2023-04-11 23:42:55 -04:00 committed by GitHub
parent cb7d0833ee
commit 17b4569d70
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 454 additions and 58 deletions

View File

@ -224,7 +224,7 @@
files="(RecordCollectorTest|StreamsPartitionAssignorTest|StreamThreadTest|StreamTaskTest|TaskManagerTest|TopologyTestDriverTest).java"/> files="(RecordCollectorTest|StreamsPartitionAssignorTest|StreamThreadTest|StreamTaskTest|TaskManagerTest|TopologyTestDriverTest).java"/>
<suppress checks="MethodLength" <suppress checks="MethodLength"
files="(EosIntegrationTest|EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|RocksDBWindowStoreTest).java"/> files="(EosIntegrationTest|EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|RocksDBWindowStoreTest|StreamStreamJoinIntegrationTest).java"/>
<suppress checks="ClassDataAbstractionCoupling" <suppress checks="ClassDataAbstractionCoupling"
files=".*[/\\]streams[/\\].*test[/\\].*.java"/> files=".*[/\\]streams[/\\].*test[/\\].*.java"/>

View File

@ -88,31 +88,35 @@ public abstract class AbstractJoinIntegrationTest {
StreamsBuilder builder; StreamsBuilder builder;
private final List<Input<String>> input = Arrays.asList( private final List<Input<String>> input = Arrays.asList(
new Input<>(INPUT_TOPIC_LEFT, null), new Input<>(INPUT_TOPIC_LEFT, null, 1),
new Input<>(INPUT_TOPIC_RIGHT, null), new Input<>(INPUT_TOPIC_RIGHT, null, 2),
new Input<>(INPUT_TOPIC_LEFT, "A"), new Input<>(INPUT_TOPIC_LEFT, "A", 3),
new Input<>(INPUT_TOPIC_RIGHT, "a"), new Input<>(INPUT_TOPIC_RIGHT, "a", 4),
new Input<>(INPUT_TOPIC_LEFT, "B"), new Input<>(INPUT_TOPIC_LEFT, "B", 5),
new Input<>(INPUT_TOPIC_RIGHT, "b"), new Input<>(INPUT_TOPIC_RIGHT, "b", 6),
new Input<>(INPUT_TOPIC_LEFT, null), new Input<>(INPUT_TOPIC_LEFT, null, 7),
new Input<>(INPUT_TOPIC_RIGHT, null), new Input<>(INPUT_TOPIC_RIGHT, null, 8),
new Input<>(INPUT_TOPIC_LEFT, "C"), new Input<>(INPUT_TOPIC_LEFT, "C", 9),
new Input<>(INPUT_TOPIC_RIGHT, "c"), new Input<>(INPUT_TOPIC_RIGHT, "c", 10),
new Input<>(INPUT_TOPIC_RIGHT, null), new Input<>(INPUT_TOPIC_RIGHT, null, 11),
new Input<>(INPUT_TOPIC_LEFT, null), new Input<>(INPUT_TOPIC_LEFT, null, 12),
new Input<>(INPUT_TOPIC_RIGHT, null), new Input<>(INPUT_TOPIC_RIGHT, null, 13),
new Input<>(INPUT_TOPIC_RIGHT, "d"), new Input<>(INPUT_TOPIC_RIGHT, "d", 14),
new Input<>(INPUT_TOPIC_LEFT, "D") new Input<>(INPUT_TOPIC_LEFT, "D", 15),
new Input<>(INPUT_TOPIC_LEFT, "E", 4), // out-of-order data
new Input<>(INPUT_TOPIC_RIGHT, "e", 3),
new Input<>(INPUT_TOPIC_RIGHT, "f", 7),
new Input<>(INPUT_TOPIC_LEFT, "F", 8)
); );
private final List<Input<String>> leftInput = Arrays.asList( private final List<Input<String>> leftInput = Arrays.asList(
new Input<>(INPUT_TOPIC_LEFT, null), new Input<>(INPUT_TOPIC_LEFT, null, 1),
new Input<>(INPUT_TOPIC_LEFT, "A"), new Input<>(INPUT_TOPIC_LEFT, "A", 2),
new Input<>(INPUT_TOPIC_LEFT, "B"), new Input<>(INPUT_TOPIC_LEFT, "B", 3),
new Input<>(INPUT_TOPIC_LEFT, null), new Input<>(INPUT_TOPIC_LEFT, null, 4),
new Input<>(INPUT_TOPIC_LEFT, "C"), new Input<>(INPUT_TOPIC_LEFT, "C", 5),
new Input<>(INPUT_TOPIC_LEFT, null), new Input<>(INPUT_TOPIC_LEFT, null, 6),
new Input<>(INPUT_TOPIC_LEFT, "D") new Input<>(INPUT_TOPIC_LEFT, "D", 7)
); );
@ -156,17 +160,16 @@ public abstract class AbstractJoinIntegrationTest {
TestRecord<Long, String> expectedFinalResult = null; TestRecord<Long, String> expectedFinalResult = null;
final long firstTimestamp = time.milliseconds(); final long baseTimestamp = time.milliseconds();
long eventTimestamp = firstTimestamp;
final Iterator<List<TestRecord<Long, String>>> resultIterator = expectedResult.iterator(); final Iterator<List<TestRecord<Long, String>>> resultIterator = expectedResult.iterator();
for (final Input<String> singleInputRecord : input) { for (final Input<String> singleInputRecord : input) {
testInputTopicMap.get(singleInputRecord.topic).pipeInput(singleInputRecord.record.key, singleInputRecord.record.value, ++eventTimestamp); testInputTopicMap.get(singleInputRecord.topic).pipeInput(singleInputRecord.record.key, singleInputRecord.record.value, baseTimestamp + singleInputRecord.timestamp);
final List<TestRecord<Long, String>> expected = resultIterator.next(); final List<TestRecord<Long, String>> expected = resultIterator.next();
if (expected != null) { if (expected != null) {
final List<TestRecord<Long, String>> updatedExpected = new LinkedList<>(); final List<TestRecord<Long, String>> updatedExpected = new LinkedList<>();
for (final TestRecord<Long, String> record : expected) { for (final TestRecord<Long, String> record : expected) {
updatedExpected.add(new TestRecord<>(record.key(), record.value(), null, firstTimestamp + record.timestamp())); updatedExpected.add(new TestRecord<>(record.key(), record.value(), null, baseTimestamp + record.timestamp()));
} }
final List<TestRecord<Long, String>> output = outputTopic.readRecordsToList(); final List<TestRecord<Long, String>> output = outputTopic.readRecordsToList();
@ -191,11 +194,10 @@ public abstract class AbstractJoinIntegrationTest {
testInputTopicMap.put(INPUT_TOPIC_RIGHT, right); testInputTopicMap.put(INPUT_TOPIC_RIGHT, right);
testInputTopicMap.put(INPUT_TOPIC_LEFT, left); testInputTopicMap.put(INPUT_TOPIC_LEFT, left);
final long firstTimestamp = time.milliseconds(); final long baseTimestamp = time.milliseconds();
long eventTimestamp = firstTimestamp;
for (final Input<String> singleInputRecord : input) { for (final Input<String> singleInputRecord : input) {
testInputTopicMap.get(singleInputRecord.topic).pipeInput(singleInputRecord.record.key, singleInputRecord.record.value, ++eventTimestamp); testInputTopicMap.get(singleInputRecord.topic).pipeInput(singleInputRecord.record.key, singleInputRecord.record.value, baseTimestamp + singleInputRecord.timestamp);
} }
final TestRecord<Long, String> updatedExpectedFinalResult = final TestRecord<Long, String> updatedExpectedFinalResult =
@ -203,7 +205,7 @@ public abstract class AbstractJoinIntegrationTest {
expectedFinalResult.key(), expectedFinalResult.key(),
expectedFinalResult.value(), expectedFinalResult.value(),
null, null,
firstTimestamp + expectedFinalResult.timestamp()); baseTimestamp + expectedFinalResult.timestamp());
final List<TestRecord<Long, String>> output = outputTopic.readRecordsToList(); final List<TestRecord<Long, String>> output = outputTopic.readRecordsToList();
@ -259,10 +261,12 @@ public abstract class AbstractJoinIntegrationTest {
private static final class Input<V> { private static final class Input<V> {
String topic; String topic;
KeyValue<Long, V> record; KeyValue<Long, V> record;
long timestamp;
Input(final String topic, final V value) { Input(final String topic, final V value, final long timestamp) {
this.topic = topic; this.topic = topic;
record = KeyValue.pair(ANY_UNIQUE_KEY, value); record = KeyValue.pair(ANY_UNIQUE_KEY, value);
this.timestamp = timestamp;
} }
} }
} }

View File

@ -137,7 +137,31 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)) new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "E-a", null, 4L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-b", null, 6L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-c", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-d", null, 14L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "A-e", null, 3L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 4L),
new TestRecord<>(ANY_UNIQUE_KEY, "B-e", null, 5L),
new TestRecord<>(ANY_UNIQUE_KEY, "C-e", null, 9L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-e", null, 15L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "A-f", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-f", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "B-f", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "C-f", null, 9L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-f", null, 15L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "F-e", null, 8L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-a", null, 8L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-b", null, 8L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null, 8L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-c", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-d", null, 14L))
); );
leftStream.join( leftStream.join(
@ -182,7 +206,31 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)) new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "E-a", null, 4L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-b", null, 6L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-c", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-d", null, 14L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "A-e", null, 3L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 4L),
new TestRecord<>(ANY_UNIQUE_KEY, "B-e", null, 5L),
new TestRecord<>(ANY_UNIQUE_KEY, "C-e", null, 9L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-e", null, 15L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "A-f", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-f", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "B-f", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "C-f", null, 9L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-f", null, 15L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "F-e", null, 8L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-a", null, 8L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-b", null, 8L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null, 8L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-c", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-d", null, 14L))
); );
leftStream.map(MockMapper.noOpKeyValueMapper()) leftStream.map(MockMapper.noOpKeyValueMapper())
@ -229,7 +277,31 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)) new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "E-a", null, 4L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-b", null, 6L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-c", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-d", null, 14L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "A-e", null, 3L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 4L),
new TestRecord<>(ANY_UNIQUE_KEY, "B-e", null, 5L),
new TestRecord<>(ANY_UNIQUE_KEY, "C-e", null, 9L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-e", null, 15L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "A-f", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-f", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "B-f", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "C-f", null, 9L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-f", null, 15L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "F-e", null, 8L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-a", null, 8L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-b", null, 8L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null, 8L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-c", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-d", null, 14L))
); );
leftStream.leftJoin( leftStream.leftJoin(
@ -274,7 +346,31 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)) new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "E-a", null, 4L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-b", null, 6L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-c", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-d", null, 14L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "A-e", null, 3L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 4L),
new TestRecord<>(ANY_UNIQUE_KEY, "B-e", null, 5L),
new TestRecord<>(ANY_UNIQUE_KEY, "C-e", null, 9L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-e", null, 15L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "A-f", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-f", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "B-f", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "C-f", null, 9L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-f", null, 15L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "F-e", null, 8L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-a", null, 8L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-b", null, 8L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null, 8L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-c", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-d", null, 14L))
); );
leftStream.map(MockMapper.noOpKeyValueMapper()) leftStream.map(MockMapper.noOpKeyValueMapper())
@ -321,7 +417,31 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)) new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "E-a", null, 4L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-b", null, 6L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-c", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-d", null, 14L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "A-e", null, 3L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 4L),
new TestRecord<>(ANY_UNIQUE_KEY, "B-e", null, 5L),
new TestRecord<>(ANY_UNIQUE_KEY, "C-e", null, 9L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-e", null, 15L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "A-f", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-f", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "B-f", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "C-f", null, 9L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-f", null, 15L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "F-e", null, 8L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-a", null, 8L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-b", null, 8L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null, 8L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-c", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-d", null, 14L))
); );
leftStream.outerJoin( leftStream.outerJoin(
@ -366,7 +486,31 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)) new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "E-a", null, 4L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-b", null, 6L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-c", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-d", null, 14L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "A-e", null, 3L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 4L),
new TestRecord<>(ANY_UNIQUE_KEY, "B-e", null, 5L),
new TestRecord<>(ANY_UNIQUE_KEY, "C-e", null, 9L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-e", null, 15L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "A-f", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-f", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "B-f", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "C-f", null, 9L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-f", null, 15L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "F-e", null, 8L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-a", null, 8L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-b", null, 8L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null, 8L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-c", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-d", null, 14L))
); );
leftStream.map(MockMapper.noOpKeyValueMapper()) leftStream.map(MockMapper.noOpKeyValueMapper())
@ -404,7 +548,7 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
new TestRecord<>(ANY_UNIQUE_KEY, "C-a-b", null, 9L), new TestRecord<>(ANY_UNIQUE_KEY, "C-a-b", null, 9L),
new TestRecord<>(ANY_UNIQUE_KEY, "C-b-a", null, 9L), new TestRecord<>(ANY_UNIQUE_KEY, "C-b-a", null, 9L),
new TestRecord<>(ANY_UNIQUE_KEY, "C-b-b", null, 9L)), new TestRecord<>(ANY_UNIQUE_KEY, "C-b-b", null, 9L)),
Arrays.asList( Arrays.<TestRecord<Long, String>>asList(
new TestRecord<>(ANY_UNIQUE_KEY, "A-c-a", null, 10L), new TestRecord<>(ANY_UNIQUE_KEY, "A-c-a", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "A-c-b", null, 10L), new TestRecord<>(ANY_UNIQUE_KEY, "A-c-b", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "B-c-a", null, 10L), new TestRecord<>(ANY_UNIQUE_KEY, "B-c-a", null, 10L),
@ -423,7 +567,7 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
null, null,
null, null,
null, null,
Arrays.asList( Arrays.<TestRecord<Long, String>>asList(
new TestRecord<>(ANY_UNIQUE_KEY, "A-d-a", null, 14L), new TestRecord<>(ANY_UNIQUE_KEY, "A-d-a", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "A-d-b", null, 14L), new TestRecord<>(ANY_UNIQUE_KEY, "A-d-b", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "A-d-c", null, 14L), new TestRecord<>(ANY_UNIQUE_KEY, "A-d-c", null, 14L),
@ -445,7 +589,7 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
new TestRecord<>(ANY_UNIQUE_KEY, "A-d-d", null, 14L), new TestRecord<>(ANY_UNIQUE_KEY, "A-d-d", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "B-d-d", null, 14L), new TestRecord<>(ANY_UNIQUE_KEY, "B-d-d", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "C-d-d", null, 14L)), new TestRecord<>(ANY_UNIQUE_KEY, "C-d-d", null, 14L)),
Arrays.asList( Arrays.<TestRecord<Long, String>>asList(
new TestRecord<>(ANY_UNIQUE_KEY, "D-a-a", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-a-a", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-a-b", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-a-b", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-a-c", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-a-c", null, 15L),
@ -461,7 +605,163 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
new TestRecord<>(ANY_UNIQUE_KEY, "D-d-a", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-d-a", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-d-b", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-d-b", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-d-c", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-d-c", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)),
Arrays.<TestRecord<Long, String>>asList(
new TestRecord<>(ANY_UNIQUE_KEY, "E-a-a", null, 4L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-a-b", null, 6L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-a-c", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-a-d", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-b-a", null, 6L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-b-b", null, 6L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-b-c", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-b-d", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-c-a", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-c-b", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-c-c", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-c-d", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-d-a", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-d-b", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-d-c", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-d-d", null, 14L)),
Arrays.<TestRecord<Long, String>>asList(
new TestRecord<>(ANY_UNIQUE_KEY, "A-e-a", null, 4L),
new TestRecord<>(ANY_UNIQUE_KEY, "A-e-b", null, 6L),
new TestRecord<>(ANY_UNIQUE_KEY, "A-e-c", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "A-e-d", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-e-a", null, 4L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-e-b", null, 6L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-e-c", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-e-d", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "B-e-a", null, 5L),
new TestRecord<>(ANY_UNIQUE_KEY, "B-e-b", null, 6L),
new TestRecord<>(ANY_UNIQUE_KEY, "B-e-c", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "B-e-d", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "C-e-a", null, 9L),
new TestRecord<>(ANY_UNIQUE_KEY, "C-e-b", null, 9L),
new TestRecord<>(ANY_UNIQUE_KEY, "C-e-c", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "C-e-d", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-e-a", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-e-b", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-e-c", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-e-d", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "A-e-e", null, 3L),
new TestRecord<>(ANY_UNIQUE_KEY, "A-a-e", null, 4L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-a-e", null, 4L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null, 4L),
new TestRecord<>(ANY_UNIQUE_KEY, "B-a-e", null, 5L),
new TestRecord<>(ANY_UNIQUE_KEY, "B-e-e", null, 5L),
new TestRecord<>(ANY_UNIQUE_KEY, "A-b-e", null, 6L),
new TestRecord<>(ANY_UNIQUE_KEY, "B-b-e", null, 6L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-b-e", null, 6L),
new TestRecord<>(ANY_UNIQUE_KEY, "C-a-e", null, 9L),
new TestRecord<>(ANY_UNIQUE_KEY, "C-b-e", null, 9L),
new TestRecord<>(ANY_UNIQUE_KEY, "C-e-e", null, 9L),
new TestRecord<>(ANY_UNIQUE_KEY, "A-c-e", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "B-c-e", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "C-c-e", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-c-e", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "A-d-e", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "B-d-e", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "C-d-e", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-d-e", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-a-e", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-b-e", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-c-e", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-d-e", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-e-e", null, 15L)),
Arrays.<TestRecord<Long, String>>asList(
new TestRecord<>(ANY_UNIQUE_KEY, "A-f-e", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "A-f-a", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "A-f-b", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "A-f-c", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "A-f-d", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-f-e", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-f-a", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-f-b", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-f-c", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-f-d", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "B-f-e", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "B-f-a", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "B-f-b", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "B-f-c", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "B-f-d", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "C-f-e", null, 9L),
new TestRecord<>(ANY_UNIQUE_KEY, "C-f-a", null, 9L),
new TestRecord<>(ANY_UNIQUE_KEY, "C-f-b", null, 9L),
new TestRecord<>(ANY_UNIQUE_KEY, "C-f-c", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "C-f-d", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-f-e", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-f-a", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-f-b", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-f-c", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-f-d", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "A-e-f", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "A-a-f", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-a-f", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-e-f", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "B-a-f", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "B-e-f", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "A-b-f", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "B-b-f", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-b-f", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "A-f-f", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "B-f-f", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "C-a-f", null, 9L),
new TestRecord<>(ANY_UNIQUE_KEY, "C-b-f", null, 9L),
new TestRecord<>(ANY_UNIQUE_KEY, "C-e-f", null, 9L),
new TestRecord<>(ANY_UNIQUE_KEY, "C-f-f", null, 9L),
new TestRecord<>(ANY_UNIQUE_KEY, "A-c-f", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "B-c-f", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "C-c-f", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-c-f", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "A-d-f", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "B-d-f", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "C-d-f", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-d-f", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-a-f", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-b-f", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-c-f", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-d-f", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-e-f", null, 15L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-f-f", null, 15L)),
Arrays.<TestRecord<Long, String>>asList(
new TestRecord<>(ANY_UNIQUE_KEY, "F-e-e", null, 8L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-e-a", null, 8L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-e-b", null, 8L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-e-f", null, 8L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-e-c", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-e-d", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-a-e", null, 8L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-a-a", null, 8L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-a-b", null, 8L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-a-f", null, 8L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-a-c", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-a-d", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-b-e", null, 8L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-b-a", null, 8L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-b-b", null, 8L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-b-f", null, 8L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-b-c", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-b-d", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-f-e", null, 8L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-f-a", null, 8L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-f-b", null, 8L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-f-f", null, 8L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-f-c", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-f-d", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-c-e", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-c-a", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-c-b", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-c-f", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-c-c", null, 10L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-c-d", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-d-e", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-d-a", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-d-b", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-d-f", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-d-c", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "F-d-d", null, 14L))
); );
leftStream.join( leftStream.join(

View File

@ -79,7 +79,11 @@ public class StreamTableJoinIntegrationTest extends AbstractJoinIntegrationTest
null, null,
null, null,
null, null,
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)) Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d", null, 4L)),
null,
null,
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null, 8L))
); );
leftStream.join(rightTable, valueJoiner).to(OUTPUT_TOPIC); leftStream.join(rightTable, valueJoiner).to(OUTPUT_TOPIC);
@ -105,7 +109,11 @@ public class StreamTableJoinIntegrationTest extends AbstractJoinIntegrationTest
null, null,
null, null,
null, null,
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)) Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d", null, 4L)),
null,
null,
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null, 8L))
); );
leftStream.leftJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC); leftStream.leftJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC);

View File

@ -63,8 +63,8 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
rightTable = builder.table(INPUT_TOPIC_RIGHT, Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as("right").withLoggingDisabled()); rightTable = builder.table(INPUT_TOPIC_RIGHT, Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as("right").withLoggingDisabled());
} }
private final TestRecord<Long, String> expectedFinalJoinResult = new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L); private final TestRecord<Long, String> expectedFinalJoinResult = new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null, 8L);
private final TestRecord<Long, String> expectedFinalMultiJoinResult = new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L); private final TestRecord<Long, String> expectedFinalMultiJoinResult = new TestRecord<>(ANY_UNIQUE_KEY, "F-f-f", null, 8L);
private final String storeName = appID + "-store"; private final String storeName = appID + "-store";
private final Materialized<Long, String, KeyValueStore<Bytes, byte[]>> materialized = Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as(storeName) private final Materialized<Long, String, KeyValueStore<Bytes, byte[]>> materialized = Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as(storeName)
@ -97,7 +97,11 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
null, null,
null, null,
null, null,
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)) Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d", null, 14L)),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 4L)),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-f", null, 7L)),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null, 8L))
); );
runTestWithDriver(expectedResult, storeName); runTestWithDriver(expectedResult, storeName);
@ -128,7 +132,11 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 12L)), Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 12L)),
null, null,
null, null,
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)) Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d", null, 14L)),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 4L)),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-f", null, 7L)),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null, 8L))
); );
runTestWithDriver(expectedResult, storeName); runTestWithDriver(expectedResult, storeName);
@ -159,7 +167,11 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 12L)), Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 12L)),
null, null,
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-d", null, 14L)), Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-d", null, 14L)),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)) Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d", null, 14L)),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 4L)),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-f", null, 7L)),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null, 8L))
); );
runTestWithDriver(expectedResult, storeName); runTestWithDriver(expectedResult, storeName);
@ -202,7 +214,15 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
null, null,
null, null,
null, null,
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d-d", null, 14L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null, 4L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null, 4L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null, 7L)),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f-f", null, 8L))
); );
runTestWithDriver(expectedResult, storeName); runTestWithDriver(expectedResult, storeName);
@ -242,7 +262,15 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
null, null,
null, null,
null, null,
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d-d", null, 14L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null, 4L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null, 4L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null, 7L)),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f-f", null, 8L))
); );
runTestWithDriver(expectedResult, storeName); runTestWithDriver(expectedResult, storeName);
@ -287,7 +315,15 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
Arrays.asList( Arrays.asList(
// incorrect result `null-d` is caused by self-join of `rightTable` // incorrect result `null-d` is caused by self-join of `rightTable`
new TestRecord<>(ANY_UNIQUE_KEY, "null-d", null, 14L), new TestRecord<>(ANY_UNIQUE_KEY, "null-d", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d-d", null, 14L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null, 4L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null, 4L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null, 7L)),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f-f", null, 8L))
); );
runTestWithDriver(expectedResult, storeName); runTestWithDriver(expectedResult, storeName);
@ -327,7 +363,15 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
null, null,
null, null,
null, null,
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d-d", null, 14L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null, 4L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null, 4L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null, 7L)),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f-f", null, 8L))
); );
runTestWithDriver(expectedResult, storeName); runTestWithDriver(expectedResult, storeName);
@ -371,7 +415,15 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 12L)), Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 12L)),
null, null,
null, null,
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d-d", null, 14L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null, 4L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null, 4L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null, 7L)),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f-f", null, 8L))
); );
runTestWithDriver(expectedResult, storeName); runTestWithDriver(expectedResult, storeName);
@ -417,7 +469,15 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
null, null,
Arrays.asList( Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "null-d", null, 14L), new TestRecord<>(ANY_UNIQUE_KEY, "null-d", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d-d", null, 14L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null, 4L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null, 4L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null, 7L)),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f-f", null, 8L))
); );
runTestWithDriver(expectedResult, storeName); runTestWithDriver(expectedResult, storeName);
@ -459,7 +519,15 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
Arrays.asList( Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L), new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L)), new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L)),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d-d", null, 14L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null, 4L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null, 4L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null, 7L)),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f-f", null, 8L))
); );
runTestWithDriver(expectedResult, storeName); runTestWithDriver(expectedResult, storeName);
@ -505,7 +573,15 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
Arrays.asList( Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L), new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L)), new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L)),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d-d", null, 14L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null, 4L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null, 4L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null, 7L)),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f-f", null, 8L))
); );
runTestWithDriver(expectedResult, storeName); runTestWithDriver(expectedResult, storeName);
@ -553,7 +629,15 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
Arrays.asList( Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L), new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L), new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L),
new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d-d", null, 14L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null, 4L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null, 4L)),
Arrays.asList(
new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null, 7L),
new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null, 7L)),
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f-f", null, 8L))
); );
runTestWithDriver(expectedResult, storeName); runTestWithDriver(expectedResult, storeName);
} }