diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java index d1d3b24afef..41e9160f122 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java @@ -31,7 +31,7 @@ public class ConsumerRecordTest { String key = "key"; String value = "value"; - ConsumerRecord record = new ConsumerRecord(topic, partition, offset, key, value); + ConsumerRecord record = new ConsumerRecord<>(topic, partition, offset, key, value); assertEquals(topic, record.topic()); assertEquals(partition, record.partition()); assertEquals(offset, record.offset()); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java index 8bac3e38363..815b5b4e26f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java @@ -124,30 +124,39 @@ public interface ProcessorContext { void commit(); /** - * Returns the topic name of the current input record + * Returns the topic name of the current input record; could be null if it is not + * available (for example, if this method is invoked from the punctuate call) * * @return the topic name */ String topic(); /** - * Returns the partition id of the current input record + * Returns the partition id of the current input record; could be -1 if it is not + * available (for example, if this method is invoked from the punctuate call) * * @return the partition id */ int partition(); /** - * Returns the offset of the current input record + * Returns the offset of the current input record; could be -1 if it is not + * available (for example, if this method is invoked from the punctuate call) * * @return the offset */ long offset(); /** - * Returns the timestamp of the current input record. The timestamp is extracted from + * Returns the current timestamp. + * + * If it is triggered while processing a record streamed from the source processor, the timestamp is defined as the timestamp of the current input record; the timestamp is extracted from * {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} by {@link TimestampExtractor}. * + * If it is triggered while processing a record not streamed from the source processor (for example, + * if this method is invoked from the punctuate call), the timestamp is defined as the current + * task's stream time, i.e. the smallest timestamp among all of its input stream partitions.
+ * * @return the timestamp */ long timestamp(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java index 3d8f792c7c8..ec89d47e08b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java @@ -135,17 +135,14 @@ public class PartitionGroup { * partition timestamp among all its partitions */ public long timestamp() { - if (queuesByTime.isEmpty()) { - // if there is no data in all partitions, return the smallest of their last known times - long timestamp = Long.MAX_VALUE; - for (RecordQueue queue : partitionQueues.values()) { - if (timestamp > queue.timestamp()) - timestamp = queue.timestamp(); - } - return timestamp; - } else { - return queuesByTime.peek().timestamp(); + // we should always return the smallest timestamp of all partitions + // to avoid the group's partition time going backward + long timestamp = Long.MAX_VALUE; + for (RecordQueue queue : partitionQueues.values()) { + if (timestamp > queue.timestamp()) + timestamp = queue.timestamp(); } + return timestamp; } public int numBuffered(TopicPartition partition) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index 5bda856cc00..1c398ac69b3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -30,6 +30,8 @@ import java.io.File; public class ProcessorContextImpl implements ProcessorContext, RecordCollector.Supplier { + public static final String NONEXIST_TOPIC = "__null_topic__"; + private final TaskId id; private final StreamTask task; private final StreamsMetrics metrics; @@ -118,7 +120,7 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S if (node == null) throw new TopologyBuilderException("Accessing from an unknown node"); - // TODO: restore this once we fix the ValueGetter initialiation issue + // TODO: restore this once we fix the ValueGetter initialization issue //if (!node.stateStores.contains(name)) // throw new TopologyBuilderException("Processor " + node.name() + " has no access to StateStore " + name); @@ -130,7 +132,7 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S if (task.record() == null) throw new IllegalStateException("This should not happen as topic() should only be called while a record is processed"); - return task.record().topic(); + String topic = task.record().topic(); + + if (topic.equals(NONEXIST_TOPIC)) + return null; + else + return topic; } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java index 6db83a11c99..50e3a0b27b3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java @@ -48,7 +48,7 @@ public class ProcessorNode { return name; } - public final Processor processor() { + public final Processor processor() { return processor; } diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java index dc9a50d32c2..758cfb001e0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java @@ -22,7 +22,7 @@ public class PunctuationSchedule extends Stamped { final long interval; public PunctuationSchedule(ProcessorNode node, long interval) { - this(node, System.currentTimeMillis(), interval); + this(node, 0, interval); } public PunctuationSchedule(ProcessorNode node, long time, long interval) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java index d5a9683db80..ea008b827e5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java @@ -108,37 +108,37 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup @Override public StateStore getStateStore(String name) { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("getStateStore() not supported."); } @Override public String topic() { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("topic() not supported."); } @Override public int partition() { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("partition() not supported."); } @Override public long offset() { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("offset() not supported."); } @Override public long timestamp() { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("timestamp() not supported."); } @Override public void forward(K key, V value) { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("forward() not supported."); } @Override public void forward(K key, V value, int childIndex) { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("forward() not supported."); } @Override @@ -148,11 +148,11 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup @Override public void commit() { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("commit() not supported."); } @Override public void schedule(long interval) { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("schedule() not supported."); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index a48498010ba..53d0a8df284 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -43,6 +43,8 @@ public class StreamTask extends AbstractTask implements Punctuator { private static final Logger log = LoggerFactory.getLogger(StreamTask.class); + private static final ConsumerRecord DUMMY_RECORD = new ConsumerRecord<>(ProcessorContextImpl.NONEXIST_TOPIC, -1, -1L, null, null); + private final int 
maxBufferedSize; private final PartitionGroup partitionGroup; @@ -202,11 +204,11 @@ public class StreamTask extends AbstractTask implements Punctuator { /** * Possibly trigger registered punctuation functions if - * current time has reached the defined stamp - * - * @param timestamp + * current partition group timestamp has reached the defined stamp */ - public boolean maybePunctuate(long timestamp) { + public boolean maybePunctuate() { + long timestamp = partitionGroup.timestamp(); + return punctuationQueue.mayPunctuate(timestamp, this); } @@ -216,10 +218,13 @@ public class StreamTask extends AbstractTask implements Punctuator { throw new IllegalStateException("Current node is not null"); currNode = node; + currRecord = new StampedRecord(DUMMY_RECORD, timestamp); + try { node.processor().punctuate(timestamp); } finally { currNode = null; + currRecord = null; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index c2a8e06e9b1..38dc356a672 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -341,8 +341,9 @@ public class StreamThread extends Thread { totalNumBuffered = 0; + // try to process one fetch record from each task via the topology, and also trigger punctuate + // functions if necessary, which may result in more records going through the topology in this loop if (!activeTasks.isEmpty()) { - // try to process one record from each task for (StreamTask task : activeTasks.values()) { long startProcess = time.milliseconds(); @@ -431,7 +432,9 @@ public class StreamThread extends Thread { try { long now = time.milliseconds(); - if (task.maybePunctuate(now)) + // check whether we should punctuate based on the task's partition group timestamp; + // which are essentially based on record timestamp. + if (task.maybePunctuate()) sensors.punctuateTimeSensor.record(time.milliseconds() - now); } catch (KafkaException e) { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java index d24ab15461c..19a941177e0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java @@ -90,7 +90,7 @@ public class KStreamKStreamJoinTest { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - processor.checkAndClearResult(); + processor.checkAndClearProcessResult(); // push two items to the other stream. this should produce two items. // w1 = { 0:X0, 1:X1 } @@ -102,7 +102,7 @@ public class KStreamKStreamJoinTest { driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); } - processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1"); + processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1"); // push all four items to the primary stream. this should produce two items. // w1 = { 0:X0, 1:X1 } @@ -114,7 +114,7 @@ public class KStreamKStreamJoinTest { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1"); + processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1"); // push all items to the other stream. this should produce six items. 
// w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 } @@ -126,7 +126,7 @@ public class KStreamKStreamJoinTest { driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); } - processor.checkAndClearResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); + processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); // push all four items to the primary stream. this should produce six items. // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 } @@ -138,7 +138,7 @@ public class KStreamKStreamJoinTest { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } - processor.checkAndClearResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3"); + processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3"); // push two items to the other stream. this should produce six item. // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 } @@ -150,7 +150,7 @@ public class KStreamKStreamJoinTest { driver.process(topic2, expectedKeys[i], "YYY" + expectedKeys[i]); } - processor.checkAndClearResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1"); + processor.checkAndClearProcessResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1"); } finally { Utils.delete(baseDir); @@ -195,7 +195,7 @@ public class KStreamKStreamJoinTest { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - processor.checkAndClearResult("0:X0+null", "1:X1+null"); + processor.checkAndClearProcessResult("0:X0+null", "1:X1+null"); // push two items to the other stream. this should produce two items. // w1 = { 0:X0, 1:X1 } @@ -207,7 +207,7 @@ public class KStreamKStreamJoinTest { driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); } - processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1"); + processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1"); // push all four items to the primary stream. this should produce four items. // w1 = { 0:X0, 1:X1 } @@ -219,7 +219,7 @@ public class KStreamKStreamJoinTest { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null"); + processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null"); // push all items to the other stream. this should produce six items. // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 } @@ -231,7 +231,7 @@ public class KStreamKStreamJoinTest { driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); } - processor.checkAndClearResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); + processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); // push all four items to the primary stream. this should produce six items. // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 } @@ -243,7 +243,7 @@ public class KStreamKStreamJoinTest { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } - processor.checkAndClearResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3"); + processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3"); // push two items to the other stream. this should produce six item. 
// w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 } @@ -255,7 +255,7 @@ public class KStreamKStreamJoinTest { driver.process(topic2, expectedKeys[i], "YYY" + expectedKeys[i]); } - processor.checkAndClearResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1"); + processor.checkAndClearProcessResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1"); } finally { Utils.delete(baseDir); @@ -302,7 +302,7 @@ public class KStreamKStreamJoinTest { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - processor.checkAndClearResult(); + processor.checkAndClearProcessResult(); // push two items to the other stream. this should produce two items. // w1 = { 0:X0, 1:X1 } @@ -314,7 +314,7 @@ public class KStreamKStreamJoinTest { driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); } - processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1"); + processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1"); // clear logically time = 1000L; @@ -323,7 +323,7 @@ public class KStreamKStreamJoinTest { driver.setTime(time + i); driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - processor.checkAndClearResult(); + processor.checkAndClearProcessResult(); // gradually expires items in w1 // w1 = { 0:X0, 1:X1, 2:X2, 3:X3 } @@ -335,35 +335,35 @@ public class KStreamKStreamJoinTest { driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); } - processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); + processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); driver.setTime(++time); for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); } - processor.checkAndClearResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); + processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); driver.setTime(++time); for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); } - processor.checkAndClearResult("2:X2+YY2", "3:X3+YY3"); + processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3"); driver.setTime(++time); for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); } - processor.checkAndClearResult("3:X3+YY3"); + processor.checkAndClearProcessResult("3:X3+YY3"); driver.setTime(++time); for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); } - processor.checkAndClearResult(); + processor.checkAndClearProcessResult(); // go back to the time before expiration @@ -373,35 +373,35 @@ public class KStreamKStreamJoinTest { driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); } - processor.checkAndClearResult(); + processor.checkAndClearProcessResult(); driver.setTime(++time); for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); } - processor.checkAndClearResult("0:X0+YY0"); + processor.checkAndClearProcessResult("0:X0+YY0"); driver.setTime(++time); for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); } - processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1"); + processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1"); driver.setTime(++time); for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); } - processor.checkAndClearResult("0:X0+YY0", 
"1:X1+YY1", "2:X2+YY2"); + processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2"); driver.setTime(++time); for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); } - processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); + processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); // clear (logically) time = 2000L; @@ -411,7 +411,7 @@ public class KStreamKStreamJoinTest { driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); } - processor.checkAndClearResult(); + processor.checkAndClearProcessResult(); // gradually expires items in w2 // w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 } @@ -422,35 +422,35 @@ public class KStreamKStreamJoinTest { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } - processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3"); + processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3"); driver.setTime(++time); for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } - processor.checkAndClearResult("1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3"); + processor.checkAndClearProcessResult("1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3"); driver.setTime(++time); for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } - processor.checkAndClearResult("2:XX2+Y2", "3:XX3+Y3"); + processor.checkAndClearProcessResult("2:XX2+Y2", "3:XX3+Y3"); driver.setTime(++time); for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } - processor.checkAndClearResult("3:XX3+Y3"); + processor.checkAndClearProcessResult("3:XX3+Y3"); driver.setTime(++time); for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } - processor.checkAndClearResult(); + processor.checkAndClearProcessResult(); // go back to the time before expiration @@ -460,35 +460,35 @@ public class KStreamKStreamJoinTest { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } - processor.checkAndClearResult(); + processor.checkAndClearProcessResult(); driver.setTime(++time); for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } - processor.checkAndClearResult("0:XX0+Y0"); + processor.checkAndClearProcessResult("0:XX0+Y0"); driver.setTime(++time); for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } - processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1"); + processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1"); driver.setTime(++time); for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } - processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2"); + processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2"); driver.setTime(++time); for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } - processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3"); + processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3"); } finally { Utils.delete(baseDir); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java index 166e8ba0385..65226d338f2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java @@ -88,7 +88,7 @@ public class KStreamKStreamLeftJoinTest { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - processor.checkAndClearResult("0:X0+null", "1:X1+null"); + processor.checkAndClearProcessResult("0:X0+null", "1:X1+null"); // push two items to the other stream. this should produce two items. // w {} @@ -98,7 +98,7 @@ public class KStreamKStreamLeftJoinTest { driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); } - processor.checkAndClearResult(); + processor.checkAndClearProcessResult(); // push all four items to the primary stream. this should produce four items. // w = { 0:Y0, 1:Y1 } @@ -108,7 +108,7 @@ public class KStreamKStreamLeftJoinTest { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null"); + processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null"); // push all items to the other stream. this should produce no items. // w = { 0:Y0, 1:Y1 } @@ -118,7 +118,7 @@ public class KStreamKStreamLeftJoinTest { driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); } - processor.checkAndClearResult(); + processor.checkAndClearProcessResult(); // push all four items to the primary stream. this should produce four items. // w = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 @@ -128,7 +128,7 @@ public class KStreamKStreamLeftJoinTest { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } - processor.checkAndClearResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3"); + processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3"); } finally { Utils.delete(baseDir); @@ -173,7 +173,7 @@ public class KStreamKStreamLeftJoinTest { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - processor.checkAndClearResult("0:X0+null", "1:X1+null"); + processor.checkAndClearProcessResult("0:X0+null", "1:X1+null"); // push two items to the other stream. this should produce no items. // w = {} @@ -183,7 +183,7 @@ public class KStreamKStreamLeftJoinTest { driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); } - processor.checkAndClearResult(); + processor.checkAndClearProcessResult(); // clear logically time = 1000L; @@ -196,7 +196,7 @@ public class KStreamKStreamLeftJoinTest { driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); } - processor.checkAndClearResult(); + processor.checkAndClearProcessResult(); // gradually expire items in window. 
// w = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 } @@ -207,35 +207,35 @@ public class KStreamKStreamLeftJoinTest { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } - processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3"); + processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3"); driver.setTime(++time); for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } - processor.checkAndClearResult("0:XX0+null", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3"); + processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3"); driver.setTime(++time); for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } - processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+Y2", "3:XX3+Y3"); + processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+Y2", "3:XX3+Y3"); driver.setTime(++time); for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } - processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+Y3"); + processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+Y3"); driver.setTime(++time); for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } - processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null"); + processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null"); // go back to the time before expiration @@ -245,35 +245,35 @@ public class KStreamKStreamLeftJoinTest { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } - processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null"); + processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null"); driver.setTime(++time); for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } - processor.checkAndClearResult("0:XX0+Y0", "1:XX1+null", "2:XX2+null", "3:XX3+null"); + processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+null", "2:XX2+null", "3:XX3+null"); driver.setTime(++time); for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } - processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+null", "3:XX3+null"); + processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+null", "3:XX3+null"); driver.setTime(++time); for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } - processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+null"); + processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+null"); driver.setTime(++time); for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } - processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3"); + processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3"); } finally { Utils.delete(baseDir); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java index 8e672a2311d..3acb59a4a0c 100644 --- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java @@ -95,7 +95,7 @@ public class KStreamKTableLeftJoinTest { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - processor.checkAndClearResult("0:X0+null", "1:X1+null"); + processor.checkAndClearProcessResult("0:X0+null", "1:X1+null"); // push two items to the other stream. this should not produce any item. @@ -103,7 +103,7 @@ public class KStreamKTableLeftJoinTest { driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); } - processor.checkAndClearResult(); + processor.checkAndClearProcessResult(); // push all four items to the primary stream. this should produce four items. @@ -111,14 +111,14 @@ public class KStreamKTableLeftJoinTest { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null"); + processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null"); // push all items to the other stream. this should not produce any item for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); } - processor.checkAndClearResult(); + processor.checkAndClearProcessResult(); // push all four items to the primary stream. this should produce four items. @@ -126,7 +126,7 @@ public class KStreamKTableLeftJoinTest { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); + processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); // push two items with null to the other stream as deletes. this should not produce any item. @@ -134,7 +134,7 @@ public class KStreamKTableLeftJoinTest { driver.process(topic2, expectedKeys[i], null); } - processor.checkAndClearResult(); + processor.checkAndClearProcessResult(); // push all four items to the primary stream. this should produce four items. 
@@ -142,7 +142,7 @@ public class KStreamKTableLeftJoinTest { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } - processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3"); + processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3"); } finally { Utils.delete(baseDir); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java index e19510f6793..3c7a1bdc604 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java @@ -182,15 +182,15 @@ public class KStreamWindowAggregateTest { driver.setTime(4L); driver.process(topic1, "A", "1"); - proc1.checkAndClearResult( + proc1.checkAndClearProcessResult( "[A@0]:0+1", "[B@0]:0+2", "[C@0]:0+3", "[D@0]:0+4", "[A@0]:0+1+1" ); - proc2.checkAndClearResult(); - proc3.checkAndClearResult( + proc2.checkAndClearProcessResult(); + proc3.checkAndClearProcessResult( "[A@0]:null", "[B@0]:null", "[C@0]:null", @@ -209,15 +209,15 @@ public class KStreamWindowAggregateTest { driver.setTime(9L); driver.process(topic1, "C", "3"); - proc1.checkAndClearResult( + proc1.checkAndClearProcessResult( "[A@0]:0+1+1+1", "[A@5]:0+1", "[B@0]:0+2+2", "[B@5]:0+2", "[D@0]:0+4+4", "[D@5]:0+4", "[B@0]:0+2+2+2", "[B@5]:0+2+2", "[C@0]:0+3+3", "[C@5]:0+3" ); - proc2.checkAndClearResult(); - proc3.checkAndClearResult( + proc2.checkAndClearProcessResult(); + proc3.checkAndClearProcessResult( "[A@0]:null", "[A@5]:null", "[B@0]:null", "[B@5]:null", "[D@0]:null", "[D@5]:null", @@ -236,15 +236,15 @@ public class KStreamWindowAggregateTest { driver.setTime(4L); driver.process(topic2, "A", "a"); - proc1.checkAndClearResult(); - proc2.checkAndClearResult( + proc1.checkAndClearProcessResult(); + proc2.checkAndClearProcessResult( "[A@0]:0+a", "[B@0]:0+b", "[C@0]:0+c", "[D@0]:0+d", "[A@0]:0+a+a" ); - proc3.checkAndClearResult( + proc3.checkAndClearProcessResult( "[A@0]:0+1+1+1%0+a", "[B@0]:0+2+2+2%0+b", "[C@0]:0+3+3%0+c", @@ -262,15 +262,15 @@ public class KStreamWindowAggregateTest { driver.setTime(9L); driver.process(topic2, "C", "c"); - proc1.checkAndClearResult(); - proc2.checkAndClearResult( + proc1.checkAndClearProcessResult(); + proc2.checkAndClearProcessResult( "[A@0]:0+a+a+a", "[A@5]:0+a", "[B@0]:0+b+b", "[B@5]:0+b", "[D@0]:0+d+d", "[D@5]:0+d", "[B@0]:0+b+b+b", "[B@5]:0+b+b", "[C@0]:0+c+c", "[C@5]:0+c" ); - proc3.checkAndClearResult( + proc3.checkAndClearProcessResult( "[A@0]:0+1+1+1%0+a+a+a", "[A@5]:0+1%0+a", "[B@0]:0+2+2+2%0+b+b", "[B@5]:0+2+2%0+b", "[D@0]:0+4+4%0+d+d", "[D@5]:0+4%0+d", diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java index 78d274eb695..ee26058193c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java @@ -74,8 +74,8 @@ public class KTableFilterTest { driver.process(topic1, "A", null); driver.process(topic1, "B", null); - proc2.checkAndClearResult("A:null", "B:2", "C:null", "D:4", "A:null", "B:null"); - proc3.checkAndClearResult("A:1", "B:null", "C:3", "D:null", "A:null", "B:null"); + proc2.checkAndClearProcessResult("A:null", 
"B:2", "C:null", "D:4", "A:null", "B:null"); + proc3.checkAndClearProcessResult("A:1", "B:null", "C:3", "D:null", "A:null", "B:null"); } @Test @@ -193,25 +193,25 @@ public class KTableFilterTest { driver.process(topic1, "B", 1); driver.process(topic1, "C", 1); - proc1.checkAndClearResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)"); - proc2.checkAndClearResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)"); + proc1.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)"); + proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)"); driver.process(topic1, "A", 2); driver.process(topic1, "B", 2); - proc1.checkAndClearResult("A:(2<-null)", "B:(2<-null)"); - proc2.checkAndClearResult("A:(2<-null)", "B:(2<-null)"); + proc1.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)"); + proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)"); driver.process(topic1, "A", 3); - proc1.checkAndClearResult("A:(3<-null)"); - proc2.checkAndClearResult("A:(null<-null)"); + proc1.checkAndClearProcessResult("A:(3<-null)"); + proc2.checkAndClearProcessResult("A:(null<-null)"); driver.process(topic1, "A", null); driver.process(topic1, "B", null); - proc1.checkAndClearResult("A:(null<-null)", "B:(null<-null)"); - proc2.checkAndClearResult("A:(null<-null)", "B:(null<-null)"); + proc1.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)"); + proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)"); } finally { Utils.delete(stateDir); @@ -250,25 +250,25 @@ public class KTableFilterTest { driver.process(topic1, "B", 1); driver.process(topic1, "C", 1); - proc1.checkAndClearResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)"); - proc2.checkAndClearResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)"); + proc1.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)"); + proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)"); driver.process(topic1, "A", 2); driver.process(topic1, "B", 2); - proc1.checkAndClearResult("A:(2<-1)", "B:(2<-1)"); - proc2.checkAndClearResult("A:(2<-null)", "B:(2<-null)"); + proc1.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)"); + proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)"); driver.process(topic1, "A", 3); - proc1.checkAndClearResult("A:(3<-2)"); - proc2.checkAndClearResult("A:(null<-2)"); + proc1.checkAndClearProcessResult("A:(3<-2)"); + proc2.checkAndClearProcessResult("A:(null<-2)"); driver.process(topic1, "A", null); driver.process(topic1, "B", null); - proc1.checkAndClearResult("A:(null<-3)", "B:(null<-2)"); - proc2.checkAndClearResult("A:(null<-null)", "B:(null<-2)"); + proc1.checkAndClearProcessResult("A:(null<-3)", "B:(null<-2)"); + proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-2)"); } finally { Utils.delete(stateDir); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java index 5f30574cd5a..f6ebbe1a083 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java @@ -100,7 +100,7 @@ public class KTableKTableJoinTest { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - processor.checkAndClearResult("0:null", "1:null"); + processor.checkAndClearProcessResult("0:null", "1:null"); checkJoinedValues(getter, kv(0, null), kv(1, null), 
kv(2, null), kv(3, null)); // push two items to the other stream. this should produce two items. @@ -109,7 +109,7 @@ public class KTableKTableJoinTest { driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); } - processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1"); + processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1"); checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null)); // push all four items to the primary stream. this should produce four items. @@ -118,7 +118,7 @@ public class KTableKTableJoinTest { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1", "2:null", "3:null"); + processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:null", "3:null"); checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null)); // push all items to the other stream. this should produce four items. @@ -126,7 +126,7 @@ public class KTableKTableJoinTest { driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); } - processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); + processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3")); // push all four items to the primary stream. this should produce four items. @@ -135,7 +135,7 @@ public class KTableKTableJoinTest { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); + processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3")); // push two items with null to the other stream as deletes. this should produce two item. @@ -144,7 +144,7 @@ public class KTableKTableJoinTest { driver.process(topic2, expectedKeys[i], null); } - processor.checkAndClearResult("0:null", "1:null"); + processor.checkAndClearProcessResult("0:null", "1:null"); checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, "X2+YY2"), kv(3, "X3+YY3")); // push all four items to the primary stream. this should produce four items. @@ -153,7 +153,7 @@ public class KTableKTableJoinTest { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } - processor.checkAndClearResult("0:null", "1:null", "2:XX2+YY2", "3:XX3+YY3"); + processor.checkAndClearProcessResult("0:null", "1:null", "2:XX2+YY2", "3:XX3+YY3"); checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, "XX2+YY2"), kv(3, "XX3+YY3")); } finally { @@ -195,7 +195,7 @@ public class KTableKTableJoinTest { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - proc.checkAndClearResult("0:(null<-null)", "1:(null<-null)"); + proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)"); // push two items to the other stream. this should produce two items. @@ -203,7 +203,7 @@ public class KTableKTableJoinTest { driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); } - proc.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)"); + proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)"); // push all four items to the primary stream. this should produce four items. 
@@ -211,14 +211,14 @@ public class KTableKTableJoinTest { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - proc.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(null<-null)", "3:(null<-null)"); + proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(null<-null)", "3:(null<-null)"); // push all items to the other stream. this should produce four items. for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); } - proc.checkAndClearResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)"); + proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)"); // push all four items to the primary stream. this should produce four items. @@ -226,7 +226,7 @@ public class KTableKTableJoinTest { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - proc.checkAndClearResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)"); + proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)"); // push two items with null to the other stream as deletes. this should produce two item. @@ -234,7 +234,7 @@ public class KTableKTableJoinTest { driver.process(topic2, expectedKeys[i], null); } - proc.checkAndClearResult("0:(null<-null)", "1:(null<-null)"); + proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)"); // push all four items to the primary stream. this should produce four items. @@ -242,7 +242,7 @@ public class KTableKTableJoinTest { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } - proc.checkAndClearResult("0:(null<-null)", "1:(null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)"); + proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)"); } finally { Utils.delete(baseDir); @@ -285,7 +285,7 @@ public class KTableKTableJoinTest { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - proc.checkAndClearResult("0:(null<-null)", "1:(null<-null)"); + proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)"); // push two items to the other stream. this should produce two items. @@ -293,7 +293,7 @@ public class KTableKTableJoinTest { driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); } - proc.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)"); + proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)"); // push all four items to the primary stream. this should produce four items. @@ -301,14 +301,14 @@ public class KTableKTableJoinTest { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - proc.checkAndClearResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(null<-null)", "3:(null<-null)"); + proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(null<-null)", "3:(null<-null)"); // push all items to the other stream. this should produce four items. for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); } - proc.checkAndClearResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)"); + proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)"); // push all four items to the primary stream. this should produce four items. 
@@ -316,7 +316,7 @@ public class KTableKTableJoinTest { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - proc.checkAndClearResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)"); + proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)"); // push two items with null to the other stream as deletes. this should produce two item. @@ -324,7 +324,7 @@ public class KTableKTableJoinTest { driver.process(topic2, expectedKeys[i], null); } - proc.checkAndClearResult("0:(null<-X0+YY0)", "1:(null<-X1+YY1)"); + proc.checkAndClearProcessResult("0:(null<-X0+YY0)", "1:(null<-X1+YY1)"); // push all four items to the primary stream. this should produce four items. @@ -332,7 +332,7 @@ public class KTableKTableJoinTest { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } - proc.checkAndClearResult("0:(null<-null)", "1:(null<-null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)"); + proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)"); } finally { Utils.delete(baseDir); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java index f92c5ca8141..449ea0501f4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java @@ -105,7 +105,7 @@ public class KTableKTableLeftJoinTest { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - processor.checkAndClearResult("0:X0+null", "1:X1+null"); + processor.checkAndClearProcessResult("0:X0+null", "1:X1+null"); checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, null), kv(3, null)); // push two items to the other stream. this should produce two items. @@ -114,7 +114,7 @@ public class KTableKTableLeftJoinTest { driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); } - processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1"); + processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1"); checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null)); // push all four items to the primary stream. this should produce four items. @@ -123,7 +123,7 @@ public class KTableKTableLeftJoinTest { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null"); + processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null"); checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, "X2+null"), kv(3, "X3+null")); // push all items to the other stream. this should produce four items. @@ -131,7 +131,7 @@ public class KTableKTableLeftJoinTest { driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); } - processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); + processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3")); // push all four items to the primary stream. this should produce four items. 
@@ -140,7 +140,7 @@ public class KTableKTableLeftJoinTest { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); + processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3")); // push two items with null to the other stream as deletes. this should produce two item. @@ -149,7 +149,7 @@ public class KTableKTableLeftJoinTest { driver.process(topic2, expectedKeys[i], null); } - processor.checkAndClearResult("0:X0+null", "1:X1+null"); + processor.checkAndClearProcessResult("0:X0+null", "1:X1+null"); checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, "X2+YY2"), kv(3, "X3+YY3")); // push all four items to the primary stream. this should produce four items. @@ -158,7 +158,7 @@ public class KTableKTableLeftJoinTest { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } - processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3"); + processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3"); checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, "XX1+null"), kv(2, "XX2+YY2"), kv(3, "XX3+YY3")); } finally { @@ -200,7 +200,7 @@ public class KTableKTableLeftJoinTest { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - proc.checkAndClearResult("0:(X0+null<-null)", "1:(X1+null<-null)"); + proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)"); // push two items to the other stream. this should produce two items. @@ -208,7 +208,7 @@ public class KTableKTableLeftJoinTest { driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); } - proc.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)"); + proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)"); // push all four items to the primary stream. this should produce four items. @@ -216,14 +216,14 @@ public class KTableKTableLeftJoinTest { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - proc.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(X2+null<-null)", "3:(X3+null<-null)"); + proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(X2+null<-null)", "3:(X3+null<-null)"); // push all items to the other stream. this should produce four items. for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); } - proc.checkAndClearResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)"); + proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)"); // push all four items to the primary stream. this should produce four items. @@ -231,7 +231,7 @@ public class KTableKTableLeftJoinTest { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - proc.checkAndClearResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)"); + proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)"); // push two items with null to the other stream as deletes. this should produce two item. 
@@ -239,7 +239,7 @@ public class KTableKTableLeftJoinTest { driver.process(topic2, expectedKeys[i], null); } - proc.checkAndClearResult("0:(X0+null<-null)", "1:(X1+null<-null)"); + proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)"); // push all four items to the primary stream. this should produce four items. @@ -247,7 +247,7 @@ public class KTableKTableLeftJoinTest { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } - proc.checkAndClearResult("0:(XX0+null<-null)", "1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)"); + proc.checkAndClearProcessResult("0:(XX0+null<-null)", "1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)"); } finally { Utils.delete(baseDir); @@ -290,7 +290,7 @@ public class KTableKTableLeftJoinTest { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - proc.checkAndClearResult("0:(X0+null<-null)", "1:(X1+null<-null)"); + proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)"); // push two items to the other stream. this should produce two items. @@ -298,7 +298,7 @@ public class KTableKTableLeftJoinTest { driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); } - proc.checkAndClearResult("0:(X0+Y0<-X0+null)", "1:(X1+Y1<-X1+null)"); + proc.checkAndClearProcessResult("0:(X0+Y0<-X0+null)", "1:(X1+Y1<-X1+null)"); // push all four items to the primary stream. this should produce four items. @@ -306,14 +306,14 @@ public class KTableKTableLeftJoinTest { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - proc.checkAndClearResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)"); + proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)"); // push all items to the other stream. this should produce four items. for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); } - proc.checkAndClearResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)"); + proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)"); // push all four items to the primary stream. this should produce four items. @@ -321,7 +321,7 @@ public class KTableKTableLeftJoinTest { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - proc.checkAndClearResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)"); + proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)"); // push two items with null to the other stream as deletes. this should produce two item. @@ -329,7 +329,7 @@ public class KTableKTableLeftJoinTest { driver.process(topic2, expectedKeys[i], null); } - proc.checkAndClearResult("0:(X0+null<-X0+YY0)", "1:(X1+null<-X1+YY1)"); + proc.checkAndClearProcessResult("0:(X0+null<-X0+YY0)", "1:(X1+null<-X1+YY1)"); // push all four items to the primary stream. this should produce four items. 
@@ -337,7 +337,7 @@ public class KTableKTableLeftJoinTest { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } - proc.checkAndClearResult("0:(XX0+null<-X0+null)", "1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)"); + proc.checkAndClearProcessResult("0:(XX0+null<-X0+null)", "1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)"); } finally { Utils.delete(baseDir); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java index 6cc77e0b1b0..ea7476ae2e1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java @@ -100,7 +100,7 @@ public class KTableKTableOuterJoinTest { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - processor.checkAndClearResult("0:X0+null", "1:X1+null"); + processor.checkAndClearProcessResult("0:X0+null", "1:X1+null"); checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, null), kv(3, null)); // push two items to the other stream. this should produce two items. @@ -109,7 +109,7 @@ public class KTableKTableOuterJoinTest { driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); } - processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1"); + processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1"); checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null)); // push all four items to the primary stream. this should produce four items. @@ -118,7 +118,7 @@ public class KTableKTableOuterJoinTest { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null"); + processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null"); checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, "X2+null"), kv(3, "X3+null")); // push all items to the other stream. this should produce four items. @@ -126,7 +126,7 @@ public class KTableKTableOuterJoinTest { driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); } - processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); + processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3")); // push all four items to the primary stream. this should produce four items. @@ -135,7 +135,7 @@ public class KTableKTableOuterJoinTest { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); + processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3")); // push two items with null to the other stream as deletes. this should produce two item. @@ -144,7 +144,7 @@ public class KTableKTableOuterJoinTest { driver.process(topic2, expectedKeys[i], null); } - processor.checkAndClearResult("0:X0+null", "1:X1+null"); + processor.checkAndClearProcessResult("0:X0+null", "1:X1+null"); checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, "X2+YY2"), kv(3, "X3+YY3")); // push all four items to the primary stream. this should produce four items. 
@@ -153,7 +153,7 @@ public class KTableKTableOuterJoinTest { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } - processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3"); + processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3"); checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, "XX1+null"), kv(2, "XX2+YY2"), kv(3, "XX3+YY3")); // push middle two items to the primary stream with null. this should produce two items. @@ -162,7 +162,7 @@ public class KTableKTableOuterJoinTest { driver.process(topic1, expectedKeys[i], null); } - processor.checkAndClearResult("1:null", "2:null+YY2"); + processor.checkAndClearProcessResult("1:null", "2:null+YY2"); checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, null), kv(2, "null+YY2"), kv(3, "XX3+YY3")); } finally { @@ -204,7 +204,7 @@ public class KTableKTableOuterJoinTest { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - proc.checkAndClearResult("0:(X0+null<-null)", "1:(X1+null<-null)"); + proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)"); // push two items to the other stream. this should produce two items. @@ -212,7 +212,7 @@ public class KTableKTableOuterJoinTest { driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); } - proc.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)"); + proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)"); // push all four items to the primary stream. this should produce four items. @@ -220,14 +220,14 @@ public class KTableKTableOuterJoinTest { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - proc.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(X2+null<-null)", "3:(X3+null<-null)"); + proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(X2+null<-null)", "3:(X3+null<-null)"); // push all items to the other stream. this should produce four items. for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); } - proc.checkAndClearResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)"); + proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)"); // push all four items to the primary stream. this should produce four items. @@ -235,7 +235,7 @@ public class KTableKTableOuterJoinTest { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - proc.checkAndClearResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)"); + proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)"); // push two items with null to the other stream as deletes. this should produce two item. @@ -243,7 +243,7 @@ public class KTableKTableOuterJoinTest { driver.process(topic2, expectedKeys[i], null); } - proc.checkAndClearResult("0:(X0+null<-null)", "1:(X1+null<-null)"); + proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)"); // push all four items to the primary stream. this should produce four items. 
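The expected values above follow from the joiner being invoked even when one side of the outer join has no value: the missing side shows up as the literal string "null", and deleting the only remaining side yields a plain null result (for example "1:null"). A small illustration of that string behaviour; the concatenating joiner here is an assumption standing in for whatever joiner the tests actually configure.

import java.util.function.BiFunction;

public class OuterJoinNullExample {

    public static void main(String[] args) {
        // Assumed joiner: concatenate both sides with "+", as the expected strings suggest.
        BiFunction<String, String, String> joiner = (left, right) -> left + "+" + right;

        System.out.println(joiner.apply("X0", null));   // X0+null   (right side not present yet)
        System.out.println(joiner.apply(null, "YY2"));  // null+YY2  (left side was deleted)
        System.out.println(joiner.apply("X2", "YY2"));  // X2+YY2    (both sides present)
    }
}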
@@ -251,7 +251,7 @@ public class KTableKTableOuterJoinTest { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } - proc.checkAndClearResult("0:(XX0+null<-null)", "1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)"); + proc.checkAndClearProcessResult("0:(XX0+null<-null)", "1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)"); // push middle two items to the primary stream with null. this should produce two items. @@ -259,7 +259,7 @@ public class KTableKTableOuterJoinTest { driver.process(topic1, expectedKeys[i], null); } - proc.checkAndClearResult("1:(null<-null)", "2:(null+YY2<-null)"); + proc.checkAndClearProcessResult("1:(null<-null)", "2:(null+YY2<-null)"); } finally { Utils.delete(baseDir); @@ -302,7 +302,7 @@ public class KTableKTableOuterJoinTest { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - proc.checkAndClearResult("0:(X0+null<-null)", "1:(X1+null<-null)"); + proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)"); // push two items to the other stream. this should produce two items. @@ -310,7 +310,7 @@ public class KTableKTableOuterJoinTest { driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); } - proc.checkAndClearResult("0:(X0+Y0<-X0+null)", "1:(X1+Y1<-X1+null)"); + proc.checkAndClearProcessResult("0:(X0+Y0<-X0+null)", "1:(X1+Y1<-X1+null)"); // push all four items to the primary stream. this should produce four items. @@ -318,14 +318,14 @@ public class KTableKTableOuterJoinTest { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - proc.checkAndClearResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)"); + proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)"); // push all items to the other stream. this should produce four items. for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); } - proc.checkAndClearResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)"); + proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)"); // push all four items to the primary stream. this should produce four items. @@ -333,7 +333,7 @@ public class KTableKTableOuterJoinTest { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } - proc.checkAndClearResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)"); + proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)"); // push two items with null to the other stream as deletes. this should produce two item. @@ -341,7 +341,7 @@ public class KTableKTableOuterJoinTest { driver.process(topic2, expectedKeys[i], null); } - proc.checkAndClearResult("0:(X0+null<-X0+YY0)", "1:(X1+null<-X1+YY1)"); + proc.checkAndClearProcessResult("0:(X0+null<-X0+YY0)", "1:(X1+null<-X1+YY1)"); // push all four items to the primary stream. this should produce four items. @@ -349,7 +349,7 @@ public class KTableKTableOuterJoinTest { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } - proc.checkAndClearResult("0:(XX0+null<-X0+null)", "1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)"); + proc.checkAndClearProcessResult("0:(XX0+null<-X0+null)", "1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)"); // push middle two items to the primary stream with null. 
this should produce two items. @@ -357,7 +357,7 @@ public class KTableKTableOuterJoinTest { driver.process(topic1, expectedKeys[i], null); } - proc.checkAndClearResult("1:(null<-XX1+null)", "2:(null+YY2<-XX2+YY2)"); + proc.checkAndClearProcessResult("1:(null<-XX1+null)", "2:(null+YY2<-XX2+YY2)"); } finally { Utils.delete(baseDir); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java index 9ec12583b5b..9cafe8b3bb0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java @@ -223,20 +223,20 @@ public class KTableMapValuesTest { driver.process(topic1, "B", "01"); driver.process(topic1, "C", "01"); - proc.checkAndClearResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)"); + proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)"); driver.process(topic1, "A", "02"); driver.process(topic1, "B", "02"); - proc.checkAndClearResult("A:(2<-null)", "B:(2<-null)"); + proc.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)"); driver.process(topic1, "A", "03"); - proc.checkAndClearResult("A:(3<-null)"); + proc.checkAndClearProcessResult("A:(3<-null)"); driver.process(topic1, "A", null); - proc.checkAndClearResult("A:(null<-null)"); + proc.checkAndClearProcessResult("A:(null<-null)"); } finally { Utils.delete(stateDir); @@ -276,20 +276,20 @@ public class KTableMapValuesTest { driver.process(topic1, "B", "01"); driver.process(topic1, "C", "01"); - proc.checkAndClearResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)"); + proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)"); driver.process(topic1, "A", "02"); driver.process(topic1, "B", "02"); - proc.checkAndClearResult("A:(2<-1)", "B:(2<-1)"); + proc.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)"); driver.process(topic1, "A", "03"); - proc.checkAndClearResult("A:(3<-2)"); + proc.checkAndClearProcessResult("A:(3<-2)"); driver.process(topic1, "A", null); - proc.checkAndClearResult("A:(null<-3)"); + proc.checkAndClearProcessResult("A:(null<-3)"); } finally { Utils.delete(stateDir); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java index 51276f32282..7c158e2bb6b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java @@ -131,21 +131,21 @@ public class KTableSourceTest { driver.process(topic1, "B", "01"); driver.process(topic1, "C", "01"); - proc1.checkAndClearResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)"); + proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)"); driver.process(topic1, "A", "02"); driver.process(topic1, "B", "02"); - proc1.checkAndClearResult("A:(02<-null)", "B:(02<-null)"); + proc1.checkAndClearProcessResult("A:(02<-null)", "B:(02<-null)"); driver.process(topic1, "A", "03"); - proc1.checkAndClearResult("A:(03<-null)"); + proc1.checkAndClearProcessResult("A:(03<-null)"); driver.process(topic1, "A", null); driver.process(topic1, "B", null); - proc1.checkAndClearResult("A:(null<-null)", "B:(null<-null)"); + proc1.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)"); } finally { Utils.delete(stateDir); @@ 
-176,21 +176,21 @@ public class KTableSourceTest {
         driver.process(topic1, "B", "01");
         driver.process(topic1, "C", "01");

-        proc1.checkAndClearResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");
+        proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");

         driver.process(topic1, "A", "02");
         driver.process(topic1, "B", "02");

-        proc1.checkAndClearResult("A:(02<-01)", "B:(02<-01)");
+        proc1.checkAndClearProcessResult("A:(02<-01)", "B:(02<-01)");

         driver.process(topic1, "A", "03");

-        proc1.checkAndClearResult("A:(03<-02)");
+        proc1.checkAndClearProcessResult("A:(03<-02)");

         driver.process(topic1, "A", null);
         driver.process(topic1, "B", null);

-        proc1.checkAndClearResult("A:(null<-03)", "B:(null<-02)");
+        proc1.checkAndClearProcessResult("A:(null<-03)", "B:(null<-02)");

     } finally {
         Utils.delete(stateDir);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index 5bf1b5e0419..a1c07af7d3d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
@@ -60,17 +59,17 @@ public class PartitionGroupTest {
         // add three records with timestamps 1, 3, 5 to partition-1
         List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
-            new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
+            new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 3L, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue));

         group.addRawRecords(partition1, list1);

         // add three records with timestamps 2, 4, 6 to partition-2
         List<ConsumerRecord<byte[], byte[]>> list2 = Arrays.asList(
-            new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 6, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
+            new ConsumerRecord<>("topic", 2, 2L, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 2, 4L, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 2, 6L, recordKey, recordValue));

         group.addRawRecords(partition2, list2);

@@ -82,7 +81,7 @@ public class PartitionGroupTest {
         StampedRecord record;
         PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo();

-        // get one record
+        // get one record, now the time should be advanced
         record = group.nextRecord(info);
         assertEquals(partition1, info.partition());
         assertEquals(1L, record.timestamp);
@@ -99,5 +98,72 @@ public class PartitionGroupTest {
         assertEquals(2, group.numBuffered(partition1));
         assertEquals(2, group.numBuffered(partition2));
         assertEquals(3L, group.timestamp());
+
+        // add two more records with timestamps 2 and 4 to partition-1
+        List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList(
+            new ConsumerRecord<>("topic", 1, 2L, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 4L, recordKey, recordValue));
+
+        group.addRawRecords(partition1, list3);
+
+        assertEquals(6, group.numBuffered());
+        assertEquals(4, group.numBuffered(partition1));
+        assertEquals(2, group.numBuffered(partition2));
+        assertEquals(3L, group.timestamp());
+
+        // get one record, time should not be advanced
+        record = group.nextRecord(info);
+        assertEquals(partition1, info.partition());
+        assertEquals(3L, record.timestamp);
+        assertEquals(5, group.numBuffered());
+        assertEquals(3, group.numBuffered(partition1));
+        assertEquals(2, group.numBuffered(partition2));
+        assertEquals(3L, group.timestamp());
+
+        // get one more record, now time should be advanced
+        record = group.nextRecord(info);
+        assertEquals(partition1, info.partition());
+        assertEquals(5L, record.timestamp);
+        assertEquals(4, group.numBuffered());
+        assertEquals(2, group.numBuffered(partition1));
+        assertEquals(2, group.numBuffered(partition2));
+        assertEquals(3L, group.timestamp());
+
+        // get one more record, time should not be advanced
+        record = group.nextRecord(info);
+        assertEquals(partition1, info.partition());
+        assertEquals(2L, record.timestamp);
+        assertEquals(3, group.numBuffered());
+        assertEquals(1, group.numBuffered(partition1));
+        assertEquals(2, group.numBuffered(partition2));
+        assertEquals(4L, group.timestamp());
+
+        // get one more record, now time should be advanced
+        record = group.nextRecord(info);
+        assertEquals(partition2, info.partition());
+        assertEquals(4L, record.timestamp);
+        assertEquals(2, group.numBuffered());
+        assertEquals(1, group.numBuffered(partition1));
+        assertEquals(1, group.numBuffered(partition2));
+        assertEquals(4L, group.timestamp());
+
+        // get one more record, time should not be advanced
+        record = group.nextRecord(info);
+        assertEquals(partition1, info.partition());
+        assertEquals(4L, record.timestamp);
+        assertEquals(1, group.numBuffered());
+        assertEquals(0, group.numBuffered(partition1));
+        assertEquals(1, group.numBuffered(partition2));
+        assertEquals(4L, group.timestamp());
+
+        // get one more record, time should not be advanced
+        record = group.nextRecord(info);
+        assertEquals(partition2, info.partition());
+        assertEquals(6L, record.timestamp);
+        assertEquals(0, group.numBuffered());
+        assertEquals(0, group.numBuffered(partition1));
+        assertEquals(0, group.numBuffered(partition2));
+        assertEquals(4L, group.timestamp());
+
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 33fa5c4a091..dd489474401 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.test.MockProcessorNode;
 import org.apache.kafka.test.MockSourceNode;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.junit.Test;
@@ -46,6 +47,7 @@ import java.util.Properties;
 import java.util.Set;

 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;

 public
class StreamTaskTest { @@ -58,10 +60,12 @@ public class StreamTaskTest { private final TopicPartition partition2 = new TopicPartition("topic2", 1); private final Set partitions = Utils.mkSet(partition1, partition2); - private final MockSourceNode source1 = new MockSourceNode<>(intDeserializer, intDeserializer); - private final MockSourceNode source2 = new MockSourceNode<>(intDeserializer, intDeserializer); + private final MockSourceNode source1 = new MockSourceNode<>(intDeserializer, intDeserializer); + private final MockSourceNode source2 = new MockSourceNode<>(intDeserializer, intDeserializer); + private final MockProcessorNode processor = new MockProcessorNode<>(10L); + private final ProcessorTopology topology = new ProcessorTopology( - Arrays.asList((ProcessorNode) source1, (ProcessorNode) source2), + Arrays.asList((ProcessorNode) source1, (ProcessorNode) source2, (ProcessorNode) processor), new HashMap() { { put("topic1", source1); @@ -94,6 +98,8 @@ public class StreamTaskTest { @Before public void setup() { consumer.assign(Arrays.asList(partition1, partition2)); + source1.addChild(processor); + source2.addChild(processor); } @SuppressWarnings("unchecked") @@ -211,6 +217,73 @@ public class StreamTaskTest { } } + @SuppressWarnings("unchecked") + @Test + public void testMaybePunctuate() throws Exception { + File baseDir = Files.createTempDirectory("test").toFile(); + try { + StreamsConfig config = createConfig(baseDir); + StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null); + + task.addRecords(partition1, records( + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue) + )); + + task.addRecords(partition2, records( + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 15, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 25, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), + new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue) + )); + + assertTrue(task.maybePunctuate()); + + assertEquals(5, task.process()); + assertEquals(1, source1.numReceived); + assertEquals(0, source2.numReceived); + + assertFalse(task.maybePunctuate()); + + assertEquals(4, task.process()); + assertEquals(1, source1.numReceived); + assertEquals(1, source2.numReceived); + + assertTrue(task.maybePunctuate()); + + assertEquals(3, task.process()); + assertEquals(2, source1.numReceived); + assertEquals(1, source2.numReceived); + + assertFalse(task.maybePunctuate()); + + assertEquals(2, task.process()); + assertEquals(2, source1.numReceived); + assertEquals(2, source2.numReceived); + + assertTrue(task.maybePunctuate()); + + assertEquals(1, task.process()); + assertEquals(3, source1.numReceived); + assertEquals(2, source2.numReceived); + + assertFalse(task.maybePunctuate()); + + assertEquals(0, task.process()); + assertEquals(3, source1.numReceived); + assertEquals(3, source2.numReceived); + + assertFalse(task.maybePunctuate()); + + processor.supplier.checkAndClearPunctuateResult(10L, 20L, 
30L); + + task.close(); + + } finally { + Utils.delete(baseDir); + } + } + private Iterable> records(ConsumerRecord... recs) { return Arrays.asList(recs); } diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java index d3b808177a7..287af5a61b1 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java @@ -143,7 +143,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S @Override public void schedule(long interval) { - throw new UnsupportedOperationException("schedule() not supported"); + throw new UnsupportedOperationException("schedule() not supported."); } @Override diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java new file mode 100644 index 00000000000..cf8a5260070 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.processor.internals.ProcessorNode;
+
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class MockProcessorNode<K, V> extends ProcessorNode<K, V> {
+
+    public static final String NAME = "MOCK-PROCESS-";
+    public static final AtomicInteger INDEX = new AtomicInteger(1);
+
+    public int numReceived = 0;
+
+    public final MockProcessorSupplier<K, V> supplier;
+
+    public MockProcessorNode(long scheduleInterval) {
+        this(new MockProcessorSupplier<K, V>(scheduleInterval));
+    }
+
+    private MockProcessorNode(MockProcessorSupplier<K, V> supplier) {
+        super(NAME + INDEX.getAndIncrement(), supplier.get(), Collections.<String>emptySet());
+
+        this.supplier = supplier;
+    }
+
+    @Override
+    public void process(K key, V value) {
+        this.numReceived++;
+        processor().process(key, value);
+    }
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
index b402525beaa..921c365cad0 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.test;

+import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -30,16 +31,28 @@ public class MockProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
     public final ArrayList<String> processed = new ArrayList<>();
     public final ArrayList<Long> punctuated = new ArrayList<>();

+    private final long scheduleInterval;
+
+    public MockProcessorSupplier() {
+        this(-1L);
+    }
+
+    public MockProcessorSupplier(long scheduleInterval) {
+        this.scheduleInterval = scheduleInterval;
+    }
+
     @Override
     public Processor<K, V> get() {
         return new MockProcessor();
     }

-    public class MockProcessor implements Processor<K, V> {
+    public class MockProcessor extends AbstractProcessor<K, V> {

         @Override
         public void init(ProcessorContext context) {
-            // do nothing
+            super.init(context);
+            if (scheduleInterval > 0L)
+                context.schedule(scheduleInterval);
         }

         @Override
@@ -49,17 +62,16 @@ public class MockProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {

         @Override
         public void punctuate(long streamTime) {
+            assertEquals(streamTime, context().timestamp());
+            assertEquals(null, context().topic());
+            assertEquals(-1, context().partition());
+            assertEquals(-1L, context().offset());
+
             punctuated.add(streamTime);
         }
-
-        @Override
-        public void close() {
-            // do nothing
-        }
-
     }

-    public void checkAndClearResult(String... expected) {
+    public void checkAndClearProcessResult(String... expected) {
         assertEquals("the number of outputs:", expected.length, processed.size());

         for (int i = 0; i < expected.length; i++) {
@@ -69,4 +81,14 @@ public class MockProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {

         processed.clear();
     }

+    public void checkAndClearPunctuateResult(long... expected) {
+        assertEquals("the number of outputs:", expected.length, punctuated.size());
+
+        for (int i = 0; i < expected.length; i++) {
+            assertEquals("output[" + i + "]:", expected[i], (long) punctuated.get(i));
+        }
+
+        punctuated.clear();
+    }
+
 }
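The assertions added to MockProcessor.punctuate() pin down the contract the ProcessorContext exposes while punctuating: there is no current record, so topic() is null, partition() and offset() are -1, and timestamp() equals the punctuation's stream time. A minimal sketch of a user processor written against that contract follows; the class name and the 1000 ms interval are illustrative, not part of this patch.

import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class PunctuateAwareProcessor extends AbstractProcessor<String, String> {

    @Override
    public void init(ProcessorContext context) {
        super.init(context);
        // Ask the task to call punctuate() roughly every 1000 ms of stream time.
        context.schedule(1000L);
    }

    @Override
    public void process(String key, String value) {
        // Per-record path: topic(), partition() and offset() describe the current input record here.
        context().forward(key, context().topic() + ":" + value);
    }

    @Override
    public void punctuate(long streamTime) {
        // Punctuation path: there is no current record, so do not rely on record metadata.
        // context().topic() is null, partition() and offset() are -1, and
        // context().timestamp() equals streamTime.
        context().forward("punctuate", Long.toString(streamTime));
    }
}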