mirror of https://github.com/apache/kafka.git
KAFKA-15417: flip joinSpuriousLookBackTimeMs and emit non-joined items (#14426)
Kafka Streams support asymmetric join windows. Depending on the window configuration we need to compute window close time etc differently. This PR flips `joinSpuriousLookBackTimeMs`, because they were not correct, and introduced the `windowsAfterIntervalMs`-field that is used to find if emitting records can be skipped. Reviewers: Hao Li <hli@confluent.io>, Guozhang Wang <guozhang.wang.us@gmail.com>, Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
parent
554fa57af8
commit
e81379d3fe
|
@ -51,7 +51,8 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
|
|||
private final long joinAfterMs;
|
||||
private final long joinGraceMs;
|
||||
private final boolean enableSpuriousResultFix;
|
||||
private final long joinSpuriousLookBackTimeMs;
|
||||
private final long windowsBeforeMs;
|
||||
private final long windowsAfterMs;
|
||||
|
||||
private final boolean outer;
|
||||
private final boolean isLeftSide;
|
||||
|
@ -72,12 +73,12 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
|
|||
if (isLeftSide) {
|
||||
this.joinBeforeMs = windows.beforeMs;
|
||||
this.joinAfterMs = windows.afterMs;
|
||||
this.joinSpuriousLookBackTimeMs = windows.beforeMs;
|
||||
} else {
|
||||
this.joinBeforeMs = windows.afterMs;
|
||||
this.joinAfterMs = windows.beforeMs;
|
||||
this.joinSpuriousLookBackTimeMs = windows.afterMs;
|
||||
}
|
||||
this.windowsAfterMs = windows.afterMs;
|
||||
this.windowsBeforeMs = windows.beforeMs;
|
||||
this.joinGraceMs = windows.gracePeriodMs();
|
||||
this.enableSpuriousResultFix = windows.spuriousResultFixEnabled();
|
||||
this.joiner = joiner;
|
||||
|
@ -136,11 +137,12 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
|
|||
return;
|
||||
}
|
||||
|
||||
boolean needOuterJoin = outer;
|
||||
// Emit all non-joined records which window has closed
|
||||
if (inputRecordTimestamp == sharedTimeTracker.streamTime) {
|
||||
outerJoinStore.ifPresent(store -> emitNonJoinedOuterRecords(store, record));
|
||||
}
|
||||
|
||||
boolean needOuterJoin = outer;
|
||||
try (final WindowStoreIterator<V2> iter = otherWindowStore.fetch(record.key(), timeFrom, timeTo)) {
|
||||
while (iter.hasNext()) {
|
||||
needOuterJoin = false;
|
||||
|
@ -200,7 +202,7 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
|
|||
// to reduce runtime cost, we try to avoid paying those cost
|
||||
|
||||
// only try to emit left/outer join results if there _might_ be any result records
|
||||
if (sharedTimeTracker.minTime >= sharedTimeTracker.streamTime - joinSpuriousLookBackTimeMs - joinGraceMs) {
|
||||
if (sharedTimeTracker.minTime + joinAfterMs + joinGraceMs >= sharedTimeTracker.streamTime) {
|
||||
return;
|
||||
}
|
||||
// throttle the emit frequency to a (configurable) interval;
|
||||
|
@ -222,6 +224,8 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
|
|||
TimestampedKeyAndJoinSide<K> prevKey = null;
|
||||
|
||||
while (it.hasNext()) {
|
||||
boolean outerJoinLeftBreak = false;
|
||||
boolean outerJoinRightBreak = false;
|
||||
final KeyValue<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> next = it.next();
|
||||
final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide = next.key;
|
||||
final LeftOrRightValue<V1, V2> value = next.value;
|
||||
|
@ -230,8 +234,19 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
|
|||
sharedTimeTracker.minTime = timestamp;
|
||||
|
||||
// Skip next records if window has not closed
|
||||
if (timestamp + joinAfterMs + joinGraceMs >= sharedTimeTracker.streamTime) {
|
||||
break;
|
||||
final long outerJoinLookBackTimeMs = getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide);
|
||||
if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs >= sharedTimeTracker.streamTime) {
|
||||
if (timestampedKeyAndJoinSide.isLeftSide()) {
|
||||
outerJoinLeftBreak = true; // there are no more candidates to emit on left-outerJoin-side
|
||||
} else {
|
||||
outerJoinRightBreak = true; // there are no more candidates to emit on right-outerJoin-side
|
||||
}
|
||||
if (outerJoinLeftBreak && outerJoinRightBreak) {
|
||||
break; // there are no more candidates to emit on left-outerJoin-side and
|
||||
// right-outerJoin-side
|
||||
} else {
|
||||
continue; // there are possibly candidates left on the other outerJoin-side
|
||||
}
|
||||
}
|
||||
|
||||
final VOut nullJoinedValue;
|
||||
|
@ -268,6 +283,15 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
|
|||
}
|
||||
}
|
||||
|
||||
private long getOuterJoinLookBackTimeMs(final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide) {
|
||||
// depending on the JoinSide we fill in the outerJoinLookBackTimeMs
|
||||
if (timestampedKeyAndJoinSide.isLeftSide()) {
|
||||
return windowsAfterMs; // On the left-JoinSide we look back in time
|
||||
} else {
|
||||
return windowsBeforeMs; // On the right-JoinSide we look forward in time
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
sharedTimeTrackerSupplier.remove(context().taskId());
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.kafka.streams.KafkaStreams;
|
|||
import org.apache.kafka.streams.KeyValue;
|
||||
import org.apache.kafka.streams.StreamsBuilder;
|
||||
import org.apache.kafka.streams.StreamsConfig;
|
||||
import org.apache.kafka.streams.StreamsConfig.InternalConfig;
|
||||
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
|
||||
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
|
||||
import org.apache.kafka.streams.kstream.JoinWindows;
|
||||
|
@ -99,6 +100,7 @@ public class KStreamKStreamIntegrationTest {
|
|||
final String safeTestName = safeUniqueTestName(testInfo);
|
||||
streamsConfig = getStreamsConfig(safeTestName);
|
||||
streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath);
|
||||
streamsConfig.put(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, 0L);
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
|
|
|
@ -436,6 +436,184 @@ public class KStreamKStreamLeftJoinTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLeftJoinedRecordsWithZeroAfterAreEmitted() {
|
||||
final StreamsBuilder builder = new StreamsBuilder();
|
||||
|
||||
final int[] expectedKeys = new int[] {0, 1, 2, 3};
|
||||
|
||||
final KStream<Integer, String> stream1;
|
||||
final KStream<Integer, String> stream2;
|
||||
final KStream<Integer, String> joined;
|
||||
final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
|
||||
stream1 = builder.stream(topic1, consumed);
|
||||
stream2 = builder.stream(topic2, consumed);
|
||||
|
||||
joined = stream1.leftJoin(
|
||||
stream2,
|
||||
MockValueJoiner.TOSTRING_JOINER,
|
||||
JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).after(ZERO),
|
||||
StreamJoined.with(Serdes.Integer(),
|
||||
Serdes.String(),
|
||||
Serdes.String())
|
||||
);
|
||||
joined.process(supplier);
|
||||
|
||||
final Collection<Set<String>> copartitionGroups =
|
||||
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
|
||||
|
||||
assertEquals(1, copartitionGroups.size());
|
||||
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
|
||||
|
||||
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
|
||||
final TestInputTopic<Integer, String> inputTopic1 =
|
||||
driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
|
||||
final TestInputTopic<Integer, String> inputTopic2 =
|
||||
driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
|
||||
final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
|
||||
|
||||
processor.init(null);
|
||||
|
||||
// push four items with increasing timestamps to the primary stream; the other window is empty;
|
||||
// this should emit the first three left-joined items;
|
||||
// A3 is not triggered yet
|
||||
// w1 = {}
|
||||
// w2 = {}
|
||||
// --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
|
||||
// w2 = {}
|
||||
long time = 1000L;
|
||||
for (int i = 0; i < expectedKeys.length; i++) {
|
||||
inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i], time + i);
|
||||
}
|
||||
processor.checkAndClearProcessResult(
|
||||
new KeyValueTimestamp<>(0, "A0+null", 1000L),
|
||||
new KeyValueTimestamp<>(1, "A1+null", 1001L),
|
||||
new KeyValueTimestamp<>(2, "A2+null", 1002L)
|
||||
);
|
||||
|
||||
// push four items smaller timestamps (out of window) to the secondary stream;
|
||||
// this should produce four joined items
|
||||
// w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
|
||||
// w2 = {}
|
||||
// --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
|
||||
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999) }
|
||||
time = 1000L - 1L;
|
||||
for (final int expectedKey : expectedKeys) {
|
||||
inputTopic2.pipeInput(expectedKey, "a" + expectedKey, time);
|
||||
}
|
||||
processor.checkAndClearProcessResult(
|
||||
new KeyValueTimestamp<>(0, "A0+a0", 1000L),
|
||||
new KeyValueTimestamp<>(1, "A1+a1", 1001L),
|
||||
new KeyValueTimestamp<>(2, "A2+a2", 1002L),
|
||||
new KeyValueTimestamp<>(3, "A3+a3", 1003L)
|
||||
);
|
||||
|
||||
// push four items with increased timestamps to the secondary stream;
|
||||
// this should produce four joined item
|
||||
// w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
|
||||
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999) }
|
||||
// --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
|
||||
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
|
||||
// 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000) }
|
||||
time += 1L;
|
||||
for (final int expectedKey : expectedKeys) {
|
||||
inputTopic2.pipeInput(expectedKey, "b" + expectedKey, time);
|
||||
}
|
||||
processor.checkAndClearProcessResult(
|
||||
new KeyValueTimestamp<>(0, "A0+b0", 1000L),
|
||||
new KeyValueTimestamp<>(1, "A1+b1", 1001L),
|
||||
new KeyValueTimestamp<>(2, "A2+b2", 1002L),
|
||||
new KeyValueTimestamp<>(3, "A3+b3", 1003L)
|
||||
);
|
||||
|
||||
// push four items with increased timestamps to the secondary stream;
|
||||
// this should produce only three joined items;
|
||||
// c0 arrives too late to be joined with A0
|
||||
// w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
|
||||
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
|
||||
// 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000) }
|
||||
// --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
|
||||
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
|
||||
// 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000),
|
||||
// 0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001) }
|
||||
time += 1L;
|
||||
for (final int expectedKey : expectedKeys) {
|
||||
inputTopic2.pipeInput(expectedKey, "c" + expectedKey, time);
|
||||
}
|
||||
processor.checkAndClearProcessResult(
|
||||
new KeyValueTimestamp<>(1, "A1+c1", 1001L),
|
||||
new KeyValueTimestamp<>(2, "A2+c2", 1002L),
|
||||
new KeyValueTimestamp<>(3, "A3+c3", 1003L)
|
||||
);
|
||||
|
||||
// push four items with increased timestamps to the secondary stream;
|
||||
// this should produce only two joined items;
|
||||
// d0 and d1 arrive too late to be joined with A0 and A1
|
||||
// w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
|
||||
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
|
||||
// 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000),
|
||||
// 0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001) }
|
||||
// --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
|
||||
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
|
||||
// 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000),
|
||||
// 0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001),
|
||||
// 0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 3:d3 (ts: 1002) }
|
||||
time += 1L;
|
||||
for (final int expectedKey : expectedKeys) {
|
||||
inputTopic2.pipeInput(expectedKey, "d" + expectedKey, time);
|
||||
}
|
||||
processor.checkAndClearProcessResult(
|
||||
new KeyValueTimestamp<>(2, "A2+d2", 1002L),
|
||||
new KeyValueTimestamp<>(3, "A3+d3", 1003L)
|
||||
);
|
||||
|
||||
// push four items with increased timestamps to the secondary stream;
|
||||
// this should produce one joined item;
|
||||
// only e3 can be joined with A3;
|
||||
// e0, e1 and e2 arrive too late to be joined with A0, A1 and A2
|
||||
// w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
|
||||
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
|
||||
// 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000),
|
||||
// 0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001),
|
||||
// 0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 3:d3 (ts: 1002) }
|
||||
// --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
|
||||
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
|
||||
// 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000),
|
||||
// 0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001),
|
||||
// 0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 3:d3 (ts: 1002),
|
||||
// 0:e0 (ts: 1003), 1:e1 (ts: 1003), 2:e2 (ts: 1003), 3:e3 (ts: 1003) }
|
||||
time += 1L;
|
||||
for (final int expectedKey : expectedKeys) {
|
||||
inputTopic2.pipeInput(expectedKey, "e" + expectedKey, time);
|
||||
}
|
||||
processor.checkAndClearProcessResult(
|
||||
new KeyValueTimestamp<>(3, "A3+e3", 1003L)
|
||||
);
|
||||
|
||||
// push four items with larger timestamps to the secondary stream;
|
||||
// no (non-)joined items can be produced
|
||||
//
|
||||
// w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
|
||||
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
|
||||
// 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000),
|
||||
// 0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001),
|
||||
// 0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 3:d3 (ts: 1002),
|
||||
// 0:e0 (ts: 1003), 1:e1 (ts: 1003), 2:e2 (ts: 1003), 3:e3 (ts: 1003) }
|
||||
// --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) }
|
||||
// w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999),
|
||||
// 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000),
|
||||
// 0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001),
|
||||
// 0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 3:d3 (ts: 1002),
|
||||
// 0:e0 (ts: 1003), 1:e1 (ts: 1003), 2:e2 (ts: 1003), 3:e3 (ts: 1003),
|
||||
// 0:f0 (ts: 1100), 1:f1 (ts: 1100), 2:f2 (ts: 1100), 3:f3 (ts: 1100) }
|
||||
time = 1000 + 100L;
|
||||
for (final int expectedKey : expectedKeys) {
|
||||
inputTopic2.pipeInput(expectedKey, "f" + expectedKey, time);
|
||||
}
|
||||
processor.checkAndClearProcessResult();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLeftJoinWithInMemoryCustomSuppliers() {
|
||||
final JoinWindows joinWindows = JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofMillis(0L));
|
||||
|
@ -609,8 +787,9 @@ public class KStreamKStreamLeftJoinTest {
|
|||
inputTopic1.pipeInput(1, "A1", 100L);
|
||||
processor.checkAndClearProcessResult();
|
||||
|
||||
// push one item to the other window that has a join; this should produce non-joined records with a closed window first, then
|
||||
// the joined records
|
||||
// push one item to the other window that has a join;
|
||||
// this should produce the joined record first;
|
||||
// then non-joined record with a closed window
|
||||
// by the time they were produced before
|
||||
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
|
||||
// w2 = { }
|
||||
|
|
|
@ -108,11 +108,11 @@ public class KStreamKStreamOuterJoinTest {
|
|||
inputTopic2.pipeInput(1, "b1", 0L);
|
||||
|
||||
processor.checkAndClearProcessResult(
|
||||
new KeyValueTimestamp<>(0, "A0+null", 0L),
|
||||
new KeyValueTimestamp<>(0, "A0-0+null", 0L),
|
||||
new KeyValueTimestamp<>(0, "A0+a0", 0L),
|
||||
new KeyValueTimestamp<>(0, "A0-0+a0", 0L),
|
||||
new KeyValueTimestamp<>(1, "null+b1", 0L)
|
||||
new KeyValueTimestamp<>(0, "A0+null", 0L),
|
||||
new KeyValueTimestamp<>(0, "A0-0+null", 0L),
|
||||
new KeyValueTimestamp<>(0, "A0+a0", 0L),
|
||||
new KeyValueTimestamp<>(0, "A0-0+a0", 0L),
|
||||
new KeyValueTimestamp<>(1, "null+b1", 0L)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -438,13 +438,13 @@ public class KStreamKStreamOuterJoinTest {
|
|||
inputTopic1.pipeInput(1, "A1", 100L);
|
||||
processor.checkAndClearProcessResult();
|
||||
|
||||
// push one item to the other window that has a join; this should produce non-joined records with a closed window first, then
|
||||
// the joined records
|
||||
// by the time they were produced before
|
||||
// push one item to the other window that has a join;
|
||||
// this should produce the not-joined record first;
|
||||
// then the joined record
|
||||
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
|
||||
// w2 = { }
|
||||
// --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) }
|
||||
// --> w2 = { 0:a0 (ts: 110) }
|
||||
// --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) }
|
||||
// --> w2 = { 1:a1 (ts: 110) }
|
||||
inputTopic2.pipeInput(1, "a1", 110L);
|
||||
processor.checkAndClearProcessResult(
|
||||
new KeyValueTimestamp<>(0, "A0+null", 0L),
|
||||
|
@ -788,7 +788,7 @@ public class KStreamKStreamOuterJoinTest {
|
|||
new KeyValueTimestamp<>(1, "A1+null", 1L)
|
||||
);
|
||||
|
||||
// push one item to the other stream; this should not produce any items
|
||||
// push one item to the other stream; this should produce one right-join item
|
||||
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
|
||||
// w2 = { 0:a0 (ts: 100), 1:a1 (ts: 102) }
|
||||
// --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
|
||||
|
@ -841,7 +841,8 @@ public class KStreamKStreamOuterJoinTest {
|
|||
final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
|
||||
long time = 0L;
|
||||
|
||||
// push two items to the primary stream; the other window is empty; this should not produce any item
|
||||
// push two items to the primary stream; the other window is empty;
|
||||
// this should produce one left-joined item
|
||||
// w1 = {}
|
||||
// w2 = {}
|
||||
// --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
|
||||
|
@ -849,7 +850,9 @@ public class KStreamKStreamOuterJoinTest {
|
|||
for (int i = 0; i < 2; i++) {
|
||||
inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i], time + i);
|
||||
}
|
||||
processor.checkAndClearProcessResult();
|
||||
processor.checkAndClearProcessResult(
|
||||
new KeyValueTimestamp<>(0, "A0+null", 0L)
|
||||
);
|
||||
|
||||
// push one item to the other stream; this should produce one full-join item
|
||||
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
|
||||
|
@ -863,7 +866,8 @@ public class KStreamKStreamOuterJoinTest {
|
|||
new KeyValueTimestamp<>(1, "A1+a1", 1L)
|
||||
);
|
||||
|
||||
// push one item to the other stream; this should produce one left-join item
|
||||
// push one item to the other stream;
|
||||
// this should not produce any item
|
||||
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
|
||||
// w2 = { 1:a1 (ts: 1) }
|
||||
// --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
|
||||
|
@ -871,9 +875,7 @@ public class KStreamKStreamOuterJoinTest {
|
|||
time += 100;
|
||||
inputTopic2.pipeInput(expectedKeys[2], "a" + expectedKeys[2], time);
|
||||
|
||||
processor.checkAndClearProcessResult(
|
||||
new KeyValueTimestamp<>(0, "A0+null", 0L)
|
||||
);
|
||||
processor.checkAndClearProcessResult();
|
||||
|
||||
// push one item to the other stream; this should not produce any item
|
||||
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
|
||||
|
@ -884,11 +886,12 @@ public class KStreamKStreamOuterJoinTest {
|
|||
|
||||
processor.checkAndClearProcessResult();
|
||||
|
||||
// push one item to the first stream; this should produce one full-join item
|
||||
// push one item to the first stream;
|
||||
// this should produce one inner-join item;
|
||||
// w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) }
|
||||
// w2 = { 1:a1 (ts: 1), 2:a2 (ts: 101), 3:a3 (ts: 101) }
|
||||
// --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1), 2:A2 (ts: 201) }
|
||||
// --> w2 = { 1:a1 (ts: 1), 2:a2 (ts: 101), 3:a3 (ts: 101 }
|
||||
// --> w2 = { 1:a1 (ts: 1), 2:a2 (ts: 101), 3:a3 (ts: 101) }
|
||||
time += 100;
|
||||
inputTopic1.pipeInput(expectedKeys[2], "A" + expectedKeys[2], time);
|
||||
|
||||
|
|
Loading…
Reference in New Issue