diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 6962d0a5c4a..4c1cbce0233 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -224,7 +224,7 @@ files="(RecordCollectorTest|StreamsPartitionAssignorTest|StreamThreadTest|StreamTaskTest|TaskManagerTest|TopologyTestDriverTest).java"/> + files="(EosIntegrationTest|EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|RocksDBWindowStoreTest|StreamStreamJoinIntegrationTest).java"/> diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java index 44ca7a186ea..5805ecfeff6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java @@ -88,31 +88,35 @@ public abstract class AbstractJoinIntegrationTest { StreamsBuilder builder; private final List> input = Arrays.asList( - new Input<>(INPUT_TOPIC_LEFT, null), - new Input<>(INPUT_TOPIC_RIGHT, null), - new Input<>(INPUT_TOPIC_LEFT, "A"), - new Input<>(INPUT_TOPIC_RIGHT, "a"), - new Input<>(INPUT_TOPIC_LEFT, "B"), - new Input<>(INPUT_TOPIC_RIGHT, "b"), - new Input<>(INPUT_TOPIC_LEFT, null), - new Input<>(INPUT_TOPIC_RIGHT, null), - new Input<>(INPUT_TOPIC_LEFT, "C"), - new Input<>(INPUT_TOPIC_RIGHT, "c"), - new Input<>(INPUT_TOPIC_RIGHT, null), - new Input<>(INPUT_TOPIC_LEFT, null), - new Input<>(INPUT_TOPIC_RIGHT, null), - new Input<>(INPUT_TOPIC_RIGHT, "d"), - new Input<>(INPUT_TOPIC_LEFT, "D") + new Input<>(INPUT_TOPIC_LEFT, null, 1), + new Input<>(INPUT_TOPIC_RIGHT, null, 2), + new Input<>(INPUT_TOPIC_LEFT, "A", 3), + new Input<>(INPUT_TOPIC_RIGHT, "a", 4), + new Input<>(INPUT_TOPIC_LEFT, "B", 5), + new Input<>(INPUT_TOPIC_RIGHT, "b", 6), + new Input<>(INPUT_TOPIC_LEFT, null, 7), + new Input<>(INPUT_TOPIC_RIGHT, null, 8), + new Input<>(INPUT_TOPIC_LEFT, "C", 9), + new Input<>(INPUT_TOPIC_RIGHT, "c", 10), + new Input<>(INPUT_TOPIC_RIGHT, null, 11), + new Input<>(INPUT_TOPIC_LEFT, null, 12), + new Input<>(INPUT_TOPIC_RIGHT, null, 13), + new Input<>(INPUT_TOPIC_RIGHT, "d", 14), + new Input<>(INPUT_TOPIC_LEFT, "D", 15), + new Input<>(INPUT_TOPIC_LEFT, "E", 4), // out-of-order data + new Input<>(INPUT_TOPIC_RIGHT, "e", 3), + new Input<>(INPUT_TOPIC_RIGHT, "f", 7), + new Input<>(INPUT_TOPIC_LEFT, "F", 8) ); private final List> leftInput = Arrays.asList( - new Input<>(INPUT_TOPIC_LEFT, null), - new Input<>(INPUT_TOPIC_LEFT, "A"), - new Input<>(INPUT_TOPIC_LEFT, "B"), - new Input<>(INPUT_TOPIC_LEFT, null), - new Input<>(INPUT_TOPIC_LEFT, "C"), - new Input<>(INPUT_TOPIC_LEFT, null), - new Input<>(INPUT_TOPIC_LEFT, "D") + new Input<>(INPUT_TOPIC_LEFT, null, 1), + new Input<>(INPUT_TOPIC_LEFT, "A", 2), + new Input<>(INPUT_TOPIC_LEFT, "B", 3), + new Input<>(INPUT_TOPIC_LEFT, null, 4), + new Input<>(INPUT_TOPIC_LEFT, "C", 5), + new Input<>(INPUT_TOPIC_LEFT, null, 6), + new Input<>(INPUT_TOPIC_LEFT, "D", 7) ); @@ -156,17 +160,16 @@ public abstract class AbstractJoinIntegrationTest { TestRecord expectedFinalResult = null; - final long firstTimestamp = time.milliseconds(); - long eventTimestamp = firstTimestamp; + final long baseTimestamp = time.milliseconds(); final Iterator>> resultIterator = expectedResult.iterator(); for (final Input singleInputRecord : input) { - testInputTopicMap.get(singleInputRecord.topic).pipeInput(singleInputRecord.record.key, singleInputRecord.record.value, ++eventTimestamp); + testInputTopicMap.get(singleInputRecord.topic).pipeInput(singleInputRecord.record.key, singleInputRecord.record.value, baseTimestamp + singleInputRecord.timestamp); final List> expected = resultIterator.next(); if (expected != null) { final List> updatedExpected = new LinkedList<>(); for (final TestRecord record : expected) { - updatedExpected.add(new TestRecord<>(record.key(), record.value(), null, firstTimestamp + record.timestamp())); + updatedExpected.add(new TestRecord<>(record.key(), record.value(), null, baseTimestamp + record.timestamp())); } final List> output = outputTopic.readRecordsToList(); @@ -191,11 +194,10 @@ public abstract class AbstractJoinIntegrationTest { testInputTopicMap.put(INPUT_TOPIC_RIGHT, right); testInputTopicMap.put(INPUT_TOPIC_LEFT, left); - final long firstTimestamp = time.milliseconds(); - long eventTimestamp = firstTimestamp; + final long baseTimestamp = time.milliseconds(); for (final Input singleInputRecord : input) { - testInputTopicMap.get(singleInputRecord.topic).pipeInput(singleInputRecord.record.key, singleInputRecord.record.value, ++eventTimestamp); + testInputTopicMap.get(singleInputRecord.topic).pipeInput(singleInputRecord.record.key, singleInputRecord.record.value, baseTimestamp + singleInputRecord.timestamp); } final TestRecord updatedExpectedFinalResult = @@ -203,7 +205,7 @@ public abstract class AbstractJoinIntegrationTest { expectedFinalResult.key(), expectedFinalResult.value(), null, - firstTimestamp + expectedFinalResult.timestamp()); + baseTimestamp + expectedFinalResult.timestamp()); final List> output = outputTopic.readRecordsToList(); @@ -259,10 +261,12 @@ public abstract class AbstractJoinIntegrationTest { private static final class Input { String topic; KeyValue record; + long timestamp; - Input(final String topic, final V value) { + Input(final String topic, final V value, final long timestamp) { this.topic = topic; record = KeyValue.pair(ANY_UNIQUE_KEY, value); + this.timestamp = timestamp; } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java index fc0d2213d2d..725ed7e3f66 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java @@ -137,7 +137,31 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L), - new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)) + new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "E-a", null, 4L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-b", null, 6L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-c", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-d", null, 14L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "A-e", null, 3L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 4L), + new TestRecord<>(ANY_UNIQUE_KEY, "B-e", null, 5L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-e", null, 9L), + new TestRecord<>(ANY_UNIQUE_KEY, "D-e", null, 15L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "A-f", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-f", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "B-f", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-f", null, 9L), + new TestRecord<>(ANY_UNIQUE_KEY, "D-f", null, 15L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "F-e", null, 8L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-a", null, 8L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-b", null, 8L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null, 8L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-c", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-d", null, 14L)) ); leftStream.join( @@ -182,7 +206,31 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L), - new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)) + new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "E-a", null, 4L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-b", null, 6L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-c", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-d", null, 14L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "A-e", null, 3L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 4L), + new TestRecord<>(ANY_UNIQUE_KEY, "B-e", null, 5L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-e", null, 9L), + new TestRecord<>(ANY_UNIQUE_KEY, "D-e", null, 15L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "A-f", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-f", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "B-f", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-f", null, 9L), + new TestRecord<>(ANY_UNIQUE_KEY, "D-f", null, 15L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "F-e", null, 8L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-a", null, 8L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-b", null, 8L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null, 8L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-c", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-d", null, 14L)) ); leftStream.map(MockMapper.noOpKeyValueMapper()) @@ -229,7 +277,31 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L), - new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)) + new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "E-a", null, 4L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-b", null, 6L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-c", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-d", null, 14L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "A-e", null, 3L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 4L), + new TestRecord<>(ANY_UNIQUE_KEY, "B-e", null, 5L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-e", null, 9L), + new TestRecord<>(ANY_UNIQUE_KEY, "D-e", null, 15L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "A-f", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-f", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "B-f", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-f", null, 9L), + new TestRecord<>(ANY_UNIQUE_KEY, "D-f", null, 15L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "F-e", null, 8L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-a", null, 8L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-b", null, 8L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null, 8L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-c", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-d", null, 14L)) ); leftStream.leftJoin( @@ -274,7 +346,31 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L), - new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)) + new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "E-a", null, 4L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-b", null, 6L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-c", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-d", null, 14L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "A-e", null, 3L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 4L), + new TestRecord<>(ANY_UNIQUE_KEY, "B-e", null, 5L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-e", null, 9L), + new TestRecord<>(ANY_UNIQUE_KEY, "D-e", null, 15L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "A-f", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-f", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "B-f", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-f", null, 9L), + new TestRecord<>(ANY_UNIQUE_KEY, "D-f", null, 15L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "F-e", null, 8L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-a", null, 8L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-b", null, 8L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null, 8L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-c", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-d", null, 14L)) ); leftStream.map(MockMapper.noOpKeyValueMapper()) @@ -321,7 +417,31 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L), - new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)) + new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "E-a", null, 4L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-b", null, 6L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-c", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-d", null, 14L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "A-e", null, 3L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 4L), + new TestRecord<>(ANY_UNIQUE_KEY, "B-e", null, 5L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-e", null, 9L), + new TestRecord<>(ANY_UNIQUE_KEY, "D-e", null, 15L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "A-f", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-f", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "B-f", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-f", null, 9L), + new TestRecord<>(ANY_UNIQUE_KEY, "D-f", null, 15L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "F-e", null, 8L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-a", null, 8L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-b", null, 8L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null, 8L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-c", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-d", null, 14L)) ); leftStream.outerJoin( @@ -366,7 +486,31 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L), - new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)) + new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "E-a", null, 4L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-b", null, 6L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-c", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-d", null, 14L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "A-e", null, 3L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 4L), + new TestRecord<>(ANY_UNIQUE_KEY, "B-e", null, 5L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-e", null, 9L), + new TestRecord<>(ANY_UNIQUE_KEY, "D-e", null, 15L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "A-f", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-f", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "B-f", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-f", null, 9L), + new TestRecord<>(ANY_UNIQUE_KEY, "D-f", null, 15L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "F-e", null, 8L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-a", null, 8L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-b", null, 8L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null, 8L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-c", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-d", null, 14L)) ); leftStream.map(MockMapper.noOpKeyValueMapper()) @@ -404,7 +548,7 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest new TestRecord<>(ANY_UNIQUE_KEY, "C-a-b", null, 9L), new TestRecord<>(ANY_UNIQUE_KEY, "C-b-a", null, 9L), new TestRecord<>(ANY_UNIQUE_KEY, "C-b-b", null, 9L)), - Arrays.asList( + Arrays.>asList( new TestRecord<>(ANY_UNIQUE_KEY, "A-c-a", null, 10L), new TestRecord<>(ANY_UNIQUE_KEY, "A-c-b", null, 10L), new TestRecord<>(ANY_UNIQUE_KEY, "B-c-a", null, 10L), @@ -423,7 +567,7 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest null, null, null, - Arrays.asList( + Arrays.>asList( new TestRecord<>(ANY_UNIQUE_KEY, "A-d-a", null, 14L), new TestRecord<>(ANY_UNIQUE_KEY, "A-d-b", null, 14L), new TestRecord<>(ANY_UNIQUE_KEY, "A-d-c", null, 14L), @@ -445,7 +589,7 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest new TestRecord<>(ANY_UNIQUE_KEY, "A-d-d", null, 14L), new TestRecord<>(ANY_UNIQUE_KEY, "B-d-d", null, 14L), new TestRecord<>(ANY_UNIQUE_KEY, "C-d-d", null, 14L)), - Arrays.asList( + Arrays.>asList( new TestRecord<>(ANY_UNIQUE_KEY, "D-a-a", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-a-b", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-a-c", null, 15L), @@ -461,7 +605,163 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest new TestRecord<>(ANY_UNIQUE_KEY, "D-d-a", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-d-b", null, 15L), new TestRecord<>(ANY_UNIQUE_KEY, "D-d-c", null, 15L), - new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) + new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)), + Arrays.>asList( + new TestRecord<>(ANY_UNIQUE_KEY, "E-a-a", null, 4L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-a-b", null, 6L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-a-c", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-a-d", null, 14L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-b-a", null, 6L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-b-b", null, 6L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-b-c", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-b-d", null, 14L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-c-a", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-c-b", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-c-c", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-c-d", null, 14L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-d-a", null, 14L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-d-b", null, 14L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-d-c", null, 14L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-d-d", null, 14L)), + Arrays.>asList( + new TestRecord<>(ANY_UNIQUE_KEY, "A-e-a", null, 4L), + new TestRecord<>(ANY_UNIQUE_KEY, "A-e-b", null, 6L), + new TestRecord<>(ANY_UNIQUE_KEY, "A-e-c", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "A-e-d", null, 14L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-e-a", null, 4L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-e-b", null, 6L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-e-c", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-e-d", null, 14L), + new TestRecord<>(ANY_UNIQUE_KEY, "B-e-a", null, 5L), + new TestRecord<>(ANY_UNIQUE_KEY, "B-e-b", null, 6L), + new TestRecord<>(ANY_UNIQUE_KEY, "B-e-c", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "B-e-d", null, 14L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-e-a", null, 9L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-e-b", null, 9L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-e-c", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-e-d", null, 14L), + new TestRecord<>(ANY_UNIQUE_KEY, "D-e-a", null, 15L), + new TestRecord<>(ANY_UNIQUE_KEY, "D-e-b", null, 15L), + new TestRecord<>(ANY_UNIQUE_KEY, "D-e-c", null, 15L), + new TestRecord<>(ANY_UNIQUE_KEY, "D-e-d", null, 15L), + new TestRecord<>(ANY_UNIQUE_KEY, "A-e-e", null, 3L), + new TestRecord<>(ANY_UNIQUE_KEY, "A-a-e", null, 4L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-a-e", null, 4L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null, 4L), + new TestRecord<>(ANY_UNIQUE_KEY, "B-a-e", null, 5L), + new TestRecord<>(ANY_UNIQUE_KEY, "B-e-e", null, 5L), + new TestRecord<>(ANY_UNIQUE_KEY, "A-b-e", null, 6L), + new TestRecord<>(ANY_UNIQUE_KEY, "B-b-e", null, 6L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-b-e", null, 6L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-a-e", null, 9L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-b-e", null, 9L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-e-e", null, 9L), + new TestRecord<>(ANY_UNIQUE_KEY, "A-c-e", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "B-c-e", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-c-e", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-c-e", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "A-d-e", null, 14L), + new TestRecord<>(ANY_UNIQUE_KEY, "B-d-e", null, 14L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-d-e", null, 14L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-d-e", null, 14L), + new TestRecord<>(ANY_UNIQUE_KEY, "D-a-e", null, 15L), + new TestRecord<>(ANY_UNIQUE_KEY, "D-b-e", null, 15L), + new TestRecord<>(ANY_UNIQUE_KEY, "D-c-e", null, 15L), + new TestRecord<>(ANY_UNIQUE_KEY, "D-d-e", null, 15L), + new TestRecord<>(ANY_UNIQUE_KEY, "D-e-e", null, 15L)), + Arrays.>asList( + new TestRecord<>(ANY_UNIQUE_KEY, "A-f-e", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "A-f-a", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "A-f-b", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "A-f-c", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "A-f-d", null, 14L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-f-e", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-f-a", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-f-b", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-f-c", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-f-d", null, 14L), + new TestRecord<>(ANY_UNIQUE_KEY, "B-f-e", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "B-f-a", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "B-f-b", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "B-f-c", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "B-f-d", null, 14L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-f-e", null, 9L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-f-a", null, 9L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-f-b", null, 9L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-f-c", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-f-d", null, 14L), + new TestRecord<>(ANY_UNIQUE_KEY, "D-f-e", null, 15L), + new TestRecord<>(ANY_UNIQUE_KEY, "D-f-a", null, 15L), + new TestRecord<>(ANY_UNIQUE_KEY, "D-f-b", null, 15L), + new TestRecord<>(ANY_UNIQUE_KEY, "D-f-c", null, 15L), + new TestRecord<>(ANY_UNIQUE_KEY, "D-f-d", null, 15L), + new TestRecord<>(ANY_UNIQUE_KEY, "A-e-f", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "A-a-f", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-a-f", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-e-f", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "B-a-f", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "B-e-f", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "A-b-f", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "B-b-f", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-b-f", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "A-f-f", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "B-f-f", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-a-f", null, 9L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-b-f", null, 9L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-e-f", null, 9L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-f-f", null, 9L), + new TestRecord<>(ANY_UNIQUE_KEY, "A-c-f", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "B-c-f", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-c-f", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-c-f", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "A-d-f", null, 14L), + new TestRecord<>(ANY_UNIQUE_KEY, "B-d-f", null, 14L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-d-f", null, 14L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-d-f", null, 14L), + new TestRecord<>(ANY_UNIQUE_KEY, "D-a-f", null, 15L), + new TestRecord<>(ANY_UNIQUE_KEY, "D-b-f", null, 15L), + new TestRecord<>(ANY_UNIQUE_KEY, "D-c-f", null, 15L), + new TestRecord<>(ANY_UNIQUE_KEY, "D-d-f", null, 15L), + new TestRecord<>(ANY_UNIQUE_KEY, "D-e-f", null, 15L), + new TestRecord<>(ANY_UNIQUE_KEY, "D-f-f", null, 15L)), + Arrays.>asList( + new TestRecord<>(ANY_UNIQUE_KEY, "F-e-e", null, 8L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-e-a", null, 8L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-e-b", null, 8L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-e-f", null, 8L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-e-c", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-e-d", null, 14L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-a-e", null, 8L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-a-a", null, 8L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-a-b", null, 8L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-a-f", null, 8L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-a-c", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-a-d", null, 14L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-b-e", null, 8L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-b-a", null, 8L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-b-b", null, 8L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-b-f", null, 8L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-b-c", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-b-d", null, 14L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-f-e", null, 8L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-f-a", null, 8L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-f-b", null, 8L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-f-f", null, 8L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-f-c", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-f-d", null, 14L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-c-e", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-c-a", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-c-b", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-c-f", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-c-c", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-c-d", null, 14L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-d-e", null, 14L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-d-a", null, 14L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-d-b", null, 14L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-d-f", null, 14L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-d-c", null, 14L), + new TestRecord<>(ANY_UNIQUE_KEY, "F-d-d", null, 14L)) ); leftStream.join( diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java index 37d5fc2a724..144c18a70fb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java @@ -79,7 +79,11 @@ public class StreamTableJoinIntegrationTest extends AbstractJoinIntegrationTest null, null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)) + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d", null, 4L)), + null, + null, + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null, 8L)) ); leftStream.join(rightTable, valueJoiner).to(OUTPUT_TOPIC); @@ -105,7 +109,11 @@ public class StreamTableJoinIntegrationTest extends AbstractJoinIntegrationTest null, null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)) + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d", null, 4L)), + null, + null, + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null, 8L)) ); leftStream.leftJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java index aaa0f462e83..faa4527adcf 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java @@ -63,8 +63,8 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest { rightTable = builder.table(INPUT_TOPIC_RIGHT, Materialized.>as("right").withLoggingDisabled()); } - private final TestRecord expectedFinalJoinResult = new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L); - private final TestRecord expectedFinalMultiJoinResult = new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L); + private final TestRecord expectedFinalJoinResult = new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null, 8L); + private final TestRecord expectedFinalMultiJoinResult = new TestRecord<>(ANY_UNIQUE_KEY, "F-f-f", null, 8L); private final String storeName = appID + "-store"; private final Materialized> materialized = Materialized.>as(storeName) @@ -97,7 +97,11 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest { null, null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)) + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d", null, 14L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 4L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-f", null, 7L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null, 8L)) ); runTestWithDriver(expectedResult, storeName); @@ -128,7 +132,11 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest { Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 12L)), null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)) + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d", null, 14L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 4L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-f", null, 7L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null, 8L)) ); runTestWithDriver(expectedResult, storeName); @@ -159,7 +167,11 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest { Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 12L)), null, Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-d", null, 14L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)) + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d", null, 14L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 4L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-f", null, 7L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null, 8L)) ); runTestWithDriver(expectedResult, storeName); @@ -202,7 +214,15 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest { null, null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d-d", null, 14L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null, 4L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null, 4L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null, 7L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f-f", null, 8L)) ); runTestWithDriver(expectedResult, storeName); @@ -242,7 +262,15 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest { null, null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d-d", null, 14L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null, 4L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null, 4L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null, 7L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f-f", null, 8L)) ); runTestWithDriver(expectedResult, storeName); @@ -287,7 +315,15 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest { Arrays.asList( // incorrect result `null-d` is caused by self-join of `rightTable` new TestRecord<>(ANY_UNIQUE_KEY, "null-d", null, 14L), - new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) + new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d-d", null, 14L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null, 4L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null, 4L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null, 7L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f-f", null, 8L)) ); runTestWithDriver(expectedResult, storeName); @@ -327,7 +363,15 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest { null, null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d-d", null, 14L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null, 4L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null, 4L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null, 7L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f-f", null, 8L)) ); runTestWithDriver(expectedResult, storeName); @@ -371,7 +415,15 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest { Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 12L)), null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d-d", null, 14L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null, 4L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null, 4L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null, 7L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f-f", null, 8L)) ); runTestWithDriver(expectedResult, storeName); @@ -417,7 +469,15 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest { null, Arrays.asList( new TestRecord<>(ANY_UNIQUE_KEY, "null-d", null, 14L), - new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) + new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d-d", null, 14L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null, 4L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null, 4L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null, 7L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f-f", null, 8L)) ); runTestWithDriver(expectedResult, storeName); @@ -459,7 +519,15 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest { Arrays.asList( new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L), new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d-d", null, 14L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null, 4L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null, 4L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null, 7L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f-f", null, 8L)) ); runTestWithDriver(expectedResult, storeName); @@ -505,7 +573,15 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest { Arrays.asList( new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L), new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d-d", null, 14L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null, 4L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null, 4L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null, 7L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f-f", null, 8L)) ); runTestWithDriver(expectedResult, storeName); @@ -553,7 +629,15 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest { Arrays.asList( new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L), new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L), - new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) + new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-d-d", null, 14L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null, 4L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-e-e", null, 4L)), + Arrays.asList( + new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null, 7L), + new TestRecord<>(ANY_UNIQUE_KEY, "E-f-f", null, 7L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f-f", null, 8L)) ); runTestWithDriver(expectedResult, storeName); }