KAFKA-3505: Fix punctuate generated record metadata

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Anna Povzner <anna@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1190 from guozhangwang/K3505
This commit is contained in:
Guozhang Wang 2016-04-08 08:59:50 -07:00 committed by Ewen Cheslack-Postava
parent 8b9b07e5d6
commit 3a58407e2e
24 changed files with 459 additions and 228 deletions

View File

@ -31,7 +31,7 @@ public class ConsumerRecordTest {
String key = "key"; String key = "key";
String value = "value"; String value = "value";
ConsumerRecord record = new ConsumerRecord(topic, partition, offset, key, value); ConsumerRecord<String, String> record = new ConsumerRecord<>(topic, partition, offset, key, value);
assertEquals(topic, record.topic()); assertEquals(topic, record.topic());
assertEquals(partition, record.partition()); assertEquals(partition, record.partition());
assertEquals(offset, record.offset()); assertEquals(offset, record.offset());

View File

@ -124,30 +124,39 @@ public interface ProcessorContext {
void commit(); void commit();
/** /**
* Returns the topic name of the current input record * Returns the topic name of the current input record; could be null if it is not
* available (for example, if this method is invoked from the punctuate call)
* *
* @return the topic name * @return the topic name
*/ */
String topic(); String topic();
/** /**
* Returns the partition id of the current input record * Returns the partition id of the current input record; could be -1 if it is not
* available (for example, if this method is invoked from the punctuate call)
* *
* @return the partition id * @return the partition id
*/ */
int partition(); int partition();
/** /**
* Returns the offset of the current input record * Returns the offset of the current input record; could be -1 if it is not
* available (for example, if this method is invoked from the punctuate call)
* *
* @return the offset * @return the offset
*/ */
long offset(); long offset();
/** /**
* Returns the timestamp of the current input record. The timestamp is extracted from * Returns the current timestamp.
*
* If it is triggered while processing a record streamed from the source processor, timestamp is defined as the timestamp of the current input record; the timestamp is extracted from
* {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} by {@link TimestampExtractor}. * {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} by {@link TimestampExtractor}.
* *
* If it is triggered while processing a record generated not from the source processor (for example,
* if this method is invoked from the punctuate call), timestamp is defined as the current
* task's stream time, which is defined as the smallest among all its input stream partition timestamps.
*
* @return the timestamp * @return the timestamp
*/ */
long timestamp(); long timestamp();

View File

@ -135,17 +135,14 @@ public class PartitionGroup {
* partition timestamp among all its partitions * partition timestamp among all its partitions
*/ */
public long timestamp() { public long timestamp() {
if (queuesByTime.isEmpty()) { // we should always return the smallest timestamp of all partitions
// if there is no data in all partitions, return the smallest of their last known times // to avoid group partition time goes backward
long timestamp = Long.MAX_VALUE; long timestamp = Long.MAX_VALUE;
for (RecordQueue queue : partitionQueues.values()) { for (RecordQueue queue : partitionQueues.values()) {
if (timestamp > queue.timestamp()) if (timestamp > queue.timestamp())
timestamp = queue.timestamp(); timestamp = queue.timestamp();
} }
return timestamp; return timestamp;
} else {
return queuesByTime.peek().timestamp();
}
} }
public int numBuffered(TopicPartition partition) { public int numBuffered(TopicPartition partition) {

View File

@ -30,6 +30,8 @@ import java.io.File;
public class ProcessorContextImpl implements ProcessorContext, RecordCollector.Supplier { public class ProcessorContextImpl implements ProcessorContext, RecordCollector.Supplier {
public static final String NONEXIST_TOPIC = "__null_topic__";
private final TaskId id; private final TaskId id;
private final StreamTask task; private final StreamTask task;
private final StreamsMetrics metrics; private final StreamsMetrics metrics;
@ -118,7 +120,7 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
if (node == null) if (node == null)
throw new TopologyBuilderException("Accessing from an unknown node"); throw new TopologyBuilderException("Accessing from an unknown node");
// TODO: restore this once we fix the ValueGetter initialiation issue // TODO: restore this once we fix the ValueGetter initialization issue
//if (!node.stateStores.contains(name)) //if (!node.stateStores.contains(name))
// throw new TopologyBuilderException("Processor " + node.name() + " has no access to StateStore " + name); // throw new TopologyBuilderException("Processor " + node.name() + " has no access to StateStore " + name);
@ -130,7 +132,12 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
if (task.record() == null) if (task.record() == null)
throw new IllegalStateException("This should not happen as topic() should only be called while a record is processed"); throw new IllegalStateException("This should not happen as topic() should only be called while a record is processed");
return task.record().topic(); String topic = task.record().topic();
if (topic.equals(NONEXIST_TOPIC))
return null;
else
return topic;
} }
@Override @Override

View File

@ -48,7 +48,7 @@ public class ProcessorNode<K, V> {
return name; return name;
} }
public final Processor processor() { public final Processor<K, V> processor() {
return processor; return processor;
} }

View File

@ -22,7 +22,7 @@ public class PunctuationSchedule extends Stamped<ProcessorNode> {
final long interval; final long interval;
public PunctuationSchedule(ProcessorNode node, long interval) { public PunctuationSchedule(ProcessorNode node, long interval) {
this(node, System.currentTimeMillis(), interval); this(node, 0, interval);
} }
public PunctuationSchedule(ProcessorNode node, long time, long interval) { public PunctuationSchedule(ProcessorNode node, long time, long interval) {

View File

@ -108,37 +108,37 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
@Override @Override
public StateStore getStateStore(String name) { public StateStore getStateStore(String name) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException("getStateStore() not supported.");
} }
@Override @Override
public String topic() { public String topic() {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException("topic() not supported.");
} }
@Override @Override
public int partition() { public int partition() {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException("partition() not supported.");
} }
@Override @Override
public long offset() { public long offset() {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException("offset() not supported.");
} }
@Override @Override
public long timestamp() { public long timestamp() {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException("timestamp() not supported.");
} }
@Override @Override
public <K, V> void forward(K key, V value) { public <K, V> void forward(K key, V value) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException("forward() not supported.");
} }
@Override @Override
public <K, V> void forward(K key, V value, int childIndex) { public <K, V> void forward(K key, V value, int childIndex) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException("forward() not supported.");
} }
@Override @Override
@ -148,11 +148,11 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
@Override @Override
public void commit() { public void commit() {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException("commit() not supported.");
} }
@Override @Override
public void schedule(long interval) { public void schedule(long interval) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException("schedule() not supported.");
} }
} }

View File

@ -43,6 +43,8 @@ public class StreamTask extends AbstractTask implements Punctuator {
private static final Logger log = LoggerFactory.getLogger(StreamTask.class); private static final Logger log = LoggerFactory.getLogger(StreamTask.class);
private static final ConsumerRecord<Object, Object> DUMMY_RECORD = new ConsumerRecord<>(ProcessorContextImpl.NONEXIST_TOPIC, -1, -1L, null, null);
private final int maxBufferedSize; private final int maxBufferedSize;
private final PartitionGroup partitionGroup; private final PartitionGroup partitionGroup;
@ -202,11 +204,11 @@ public class StreamTask extends AbstractTask implements Punctuator {
/** /**
* Possibly trigger registered punctuation functions if * Possibly trigger registered punctuation functions if
* current time has reached the defined stamp * current partition group timestamp has reached the defined stamp
*
* @param timestamp
*/ */
public boolean maybePunctuate(long timestamp) { public boolean maybePunctuate() {
long timestamp = partitionGroup.timestamp();
return punctuationQueue.mayPunctuate(timestamp, this); return punctuationQueue.mayPunctuate(timestamp, this);
} }
@ -216,10 +218,13 @@ public class StreamTask extends AbstractTask implements Punctuator {
throw new IllegalStateException("Current node is not null"); throw new IllegalStateException("Current node is not null");
currNode = node; currNode = node;
currRecord = new StampedRecord(DUMMY_RECORD, timestamp);
try { try {
node.processor().punctuate(timestamp); node.processor().punctuate(timestamp);
} finally { } finally {
currNode = null; currNode = null;
currRecord = null;
} }
} }

View File

@ -341,8 +341,9 @@ public class StreamThread extends Thread {
totalNumBuffered = 0; totalNumBuffered = 0;
// try to process one fetch record from each task via the topology, and also trigger punctuate
// functions if necessary, which may result in more records going through the topology in this loop
if (!activeTasks.isEmpty()) { if (!activeTasks.isEmpty()) {
// try to process one record from each task
for (StreamTask task : activeTasks.values()) { for (StreamTask task : activeTasks.values()) {
long startProcess = time.milliseconds(); long startProcess = time.milliseconds();
@ -431,7 +432,9 @@ public class StreamThread extends Thread {
try { try {
long now = time.milliseconds(); long now = time.milliseconds();
if (task.maybePunctuate(now)) // check whether we should punctuate based on the task's partition group timestamp;
// which are essentially based on record timestamp.
if (task.maybePunctuate())
sensors.punctuateTimeSensor.record(time.milliseconds() - now); sensors.punctuateTimeSensor.record(time.milliseconds() - now);
} catch (KafkaException e) { } catch (KafkaException e) {

View File

@ -90,7 +90,7 @@ public class KStreamKStreamJoinTest {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
} }
processor.checkAndClearResult(); processor.checkAndClearProcessResult();
// push two items to the other stream. this should produce two items. // push two items to the other stream. this should produce two items.
// w1 = { 0:X0, 1:X1 } // w1 = { 0:X0, 1:X1 }
@ -102,7 +102,7 @@ public class KStreamKStreamJoinTest {
driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1"); processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
// push all four items to the primary stream. this should produce two items. // push all four items to the primary stream. this should produce two items.
// w1 = { 0:X0, 1:X1 } // w1 = { 0:X0, 1:X1 }
@ -114,7 +114,7 @@ public class KStreamKStreamJoinTest {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1"); processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
// push all items to the other stream. this should produce six items. // push all items to the other stream. this should produce six items.
// w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 } // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
@ -126,7 +126,7 @@ public class KStreamKStreamJoinTest {
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
// push all four items to the primary stream. this should produce six items. // push all four items to the primary stream. this should produce six items.
// w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 } // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
@ -138,7 +138,7 @@ public class KStreamKStreamJoinTest {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3"); processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
// push two items to the other stream. this should produce six item. // push two items to the other stream. this should produce six item.
// w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 } // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
@ -150,7 +150,7 @@ public class KStreamKStreamJoinTest {
driver.process(topic2, expectedKeys[i], "YYY" + expectedKeys[i]); driver.process(topic2, expectedKeys[i], "YYY" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1"); processor.checkAndClearProcessResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
} finally { } finally {
Utils.delete(baseDir); Utils.delete(baseDir);
@ -195,7 +195,7 @@ public class KStreamKStreamJoinTest {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:X0+null", "1:X1+null"); processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
// push two items to the other stream. this should produce two items. // push two items to the other stream. this should produce two items.
// w1 = { 0:X0, 1:X1 } // w1 = { 0:X0, 1:X1 }
@ -207,7 +207,7 @@ public class KStreamKStreamJoinTest {
driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1"); processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
// push all four items to the primary stream. this should produce four items. // push all four items to the primary stream. this should produce four items.
// w1 = { 0:X0, 1:X1 } // w1 = { 0:X0, 1:X1 }
@ -219,7 +219,7 @@ public class KStreamKStreamJoinTest {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null"); processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
// push all items to the other stream. this should produce six items. // push all items to the other stream. this should produce six items.
// w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 } // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
@ -231,7 +231,7 @@ public class KStreamKStreamJoinTest {
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
// push all four items to the primary stream. this should produce six items. // push all four items to the primary stream. this should produce six items.
// w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 } // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
@ -243,7 +243,7 @@ public class KStreamKStreamJoinTest {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3"); processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
// push two items to the other stream. this should produce six item. // push two items to the other stream. this should produce six item.
// w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 } // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
@ -255,7 +255,7 @@ public class KStreamKStreamJoinTest {
driver.process(topic2, expectedKeys[i], "YYY" + expectedKeys[i]); driver.process(topic2, expectedKeys[i], "YYY" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1"); processor.checkAndClearProcessResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
} finally { } finally {
Utils.delete(baseDir); Utils.delete(baseDir);
@ -302,7 +302,7 @@ public class KStreamKStreamJoinTest {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
} }
processor.checkAndClearResult(); processor.checkAndClearProcessResult();
// push two items to the other stream. this should produce two items. // push two items to the other stream. this should produce two items.
// w1 = { 0:X0, 1:X1 } // w1 = { 0:X0, 1:X1 }
@ -314,7 +314,7 @@ public class KStreamKStreamJoinTest {
driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1"); processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
// clear logically // clear logically
time = 1000L; time = 1000L;
@ -323,7 +323,7 @@ public class KStreamKStreamJoinTest {
driver.setTime(time + i); driver.setTime(time + i);
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
} }
processor.checkAndClearResult(); processor.checkAndClearProcessResult();
// gradually expires items in w1 // gradually expires items in w1
// w1 = { 0:X0, 1:X1, 2:X2, 3:X3 } // w1 = { 0:X0, 1:X1, 2:X2, 3:X3 }
@ -335,35 +335,35 @@ public class KStreamKStreamJoinTest {
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
driver.setTime(++time); driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) { for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
} }
processor.checkAndClearResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
driver.setTime(++time); driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) { for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
} }
processor.checkAndClearResult("2:X2+YY2", "3:X3+YY3"); processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3");
driver.setTime(++time); driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) { for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
} }
processor.checkAndClearResult("3:X3+YY3"); processor.checkAndClearProcessResult("3:X3+YY3");
driver.setTime(++time); driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) { for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
} }
processor.checkAndClearResult(); processor.checkAndClearProcessResult();
// go back to the time before expiration // go back to the time before expiration
@ -373,35 +373,35 @@ public class KStreamKStreamJoinTest {
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
} }
processor.checkAndClearResult(); processor.checkAndClearProcessResult();
driver.setTime(++time); driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) { for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:X0+YY0"); processor.checkAndClearProcessResult("0:X0+YY0");
driver.setTime(++time); driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) { for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1"); processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1");
driver.setTime(++time); driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) { for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2"); processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");
driver.setTime(++time); driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) { for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
// clear (logically) // clear (logically)
time = 2000L; time = 2000L;
@ -411,7 +411,7 @@ public class KStreamKStreamJoinTest {
driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
} }
processor.checkAndClearResult(); processor.checkAndClearProcessResult();
// gradually expires items in w2 // gradually expires items in w2
// w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 } // w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
@ -422,35 +422,35 @@ public class KStreamKStreamJoinTest {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3"); processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
driver.setTime(++time); driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) { for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
} }
processor.checkAndClearResult("1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3"); processor.checkAndClearProcessResult("1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
driver.setTime(++time); driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) { for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
} }
processor.checkAndClearResult("2:XX2+Y2", "3:XX3+Y3"); processor.checkAndClearProcessResult("2:XX2+Y2", "3:XX3+Y3");
driver.setTime(++time); driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) { for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
} }
processor.checkAndClearResult("3:XX3+Y3"); processor.checkAndClearProcessResult("3:XX3+Y3");
driver.setTime(++time); driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) { for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
} }
processor.checkAndClearResult(); processor.checkAndClearProcessResult();
// go back to the time before expiration // go back to the time before expiration
@ -460,35 +460,35 @@ public class KStreamKStreamJoinTest {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
} }
processor.checkAndClearResult(); processor.checkAndClearProcessResult();
driver.setTime(++time); driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) { for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:XX0+Y0"); processor.checkAndClearProcessResult("0:XX0+Y0");
driver.setTime(++time); driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) { for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1"); processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1");
driver.setTime(++time); driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) { for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2"); processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2");
driver.setTime(++time); driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) { for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3"); processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
} finally { } finally {
Utils.delete(baseDir); Utils.delete(baseDir);

View File

@ -88,7 +88,7 @@ public class KStreamKStreamLeftJoinTest {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:X0+null", "1:X1+null"); processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
// push two items to the other stream. this should produce two items. // push two items to the other stream. this should produce two items.
// w {} // w {}
@ -98,7 +98,7 @@ public class KStreamKStreamLeftJoinTest {
driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
} }
processor.checkAndClearResult(); processor.checkAndClearProcessResult();
// push all four items to the primary stream. this should produce four items. // push all four items to the primary stream. this should produce four items.
// w = { 0:Y0, 1:Y1 } // w = { 0:Y0, 1:Y1 }
@ -108,7 +108,7 @@ public class KStreamKStreamLeftJoinTest {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null"); processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
// push all items to the other stream. this should produce no items. // push all items to the other stream. this should produce no items.
// w = { 0:Y0, 1:Y1 } // w = { 0:Y0, 1:Y1 }
@ -118,7 +118,7 @@ public class KStreamKStreamLeftJoinTest {
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
} }
processor.checkAndClearResult(); processor.checkAndClearProcessResult();
// push all four items to the primary stream. this should produce four items. // push all four items to the primary stream. this should produce four items.
// w = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 // w = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
@ -128,7 +128,7 @@ public class KStreamKStreamLeftJoinTest {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3"); processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
} finally { } finally {
Utils.delete(baseDir); Utils.delete(baseDir);
@ -173,7 +173,7 @@ public class KStreamKStreamLeftJoinTest {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:X0+null", "1:X1+null"); processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
// push two items to the other stream. this should produce no items. // push two items to the other stream. this should produce no items.
// w = {} // w = {}
@ -183,7 +183,7 @@ public class KStreamKStreamLeftJoinTest {
driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
} }
processor.checkAndClearResult(); processor.checkAndClearProcessResult();
// clear logically // clear logically
time = 1000L; time = 1000L;
@ -196,7 +196,7 @@ public class KStreamKStreamLeftJoinTest {
driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
} }
processor.checkAndClearResult(); processor.checkAndClearProcessResult();
// gradually expire items in window. // gradually expire items in window.
// w = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 } // w = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
@ -207,35 +207,35 @@ public class KStreamKStreamLeftJoinTest {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3"); processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
driver.setTime(++time); driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) { for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:XX0+null", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3"); processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
driver.setTime(++time); driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) { for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+Y2", "3:XX3+Y3"); processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+Y2", "3:XX3+Y3");
driver.setTime(++time); driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) { for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+Y3"); processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+Y3");
driver.setTime(++time); driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) { for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null"); processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null");
// go back to the time before expiration // go back to the time before expiration
@ -245,35 +245,35 @@ public class KStreamKStreamLeftJoinTest {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null"); processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null");
driver.setTime(++time); driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) { for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:XX0+Y0", "1:XX1+null", "2:XX2+null", "3:XX3+null"); processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+null", "2:XX2+null", "3:XX3+null");
driver.setTime(++time); driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) { for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+null", "3:XX3+null"); processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+null", "3:XX3+null");
driver.setTime(++time); driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) { for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+null"); processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+null");
driver.setTime(++time); driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) { for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3"); processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
} finally { } finally {
Utils.delete(baseDir); Utils.delete(baseDir);

View File

@ -95,7 +95,7 @@ public class KStreamKTableLeftJoinTest {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:X0+null", "1:X1+null"); processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
// push two items to the other stream. this should not produce any item. // push two items to the other stream. this should not produce any item.
@ -103,7 +103,7 @@ public class KStreamKTableLeftJoinTest {
driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
} }
processor.checkAndClearResult(); processor.checkAndClearProcessResult();
// push all four items to the primary stream. this should produce four items. // push all four items to the primary stream. this should produce four items.
@ -111,14 +111,14 @@ public class KStreamKTableLeftJoinTest {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null"); processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
// push all items to the other stream. this should not produce any item // push all items to the other stream. this should not produce any item
for (int i = 0; i < expectedKeys.length; i++) { for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
} }
processor.checkAndClearResult(); processor.checkAndClearProcessResult();
// push all four items to the primary stream. this should produce four items. // push all four items to the primary stream. this should produce four items.
@ -126,7 +126,7 @@ public class KStreamKTableLeftJoinTest {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
// push two items with null to the other stream as deletes. this should not produce any item. // push two items with null to the other stream as deletes. this should not produce any item.
@ -134,7 +134,7 @@ public class KStreamKTableLeftJoinTest {
driver.process(topic2, expectedKeys[i], null); driver.process(topic2, expectedKeys[i], null);
} }
processor.checkAndClearResult(); processor.checkAndClearProcessResult();
// push all four items to the primary stream. this should produce four items. // push all four items to the primary stream. this should produce four items.
@ -142,7 +142,7 @@ public class KStreamKTableLeftJoinTest {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3"); processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3");
} finally { } finally {
Utils.delete(baseDir); Utils.delete(baseDir);

View File

@ -182,15 +182,15 @@ public class KStreamWindowAggregateTest {
driver.setTime(4L); driver.setTime(4L);
driver.process(topic1, "A", "1"); driver.process(topic1, "A", "1");
proc1.checkAndClearResult( proc1.checkAndClearProcessResult(
"[A@0]:0+1", "[A@0]:0+1",
"[B@0]:0+2", "[B@0]:0+2",
"[C@0]:0+3", "[C@0]:0+3",
"[D@0]:0+4", "[D@0]:0+4",
"[A@0]:0+1+1" "[A@0]:0+1+1"
); );
proc2.checkAndClearResult(); proc2.checkAndClearProcessResult();
proc3.checkAndClearResult( proc3.checkAndClearProcessResult(
"[A@0]:null", "[A@0]:null",
"[B@0]:null", "[B@0]:null",
"[C@0]:null", "[C@0]:null",
@ -209,15 +209,15 @@ public class KStreamWindowAggregateTest {
driver.setTime(9L); driver.setTime(9L);
driver.process(topic1, "C", "3"); driver.process(topic1, "C", "3");
proc1.checkAndClearResult( proc1.checkAndClearProcessResult(
"[A@0]:0+1+1+1", "[A@5]:0+1", "[A@0]:0+1+1+1", "[A@5]:0+1",
"[B@0]:0+2+2", "[B@5]:0+2", "[B@0]:0+2+2", "[B@5]:0+2",
"[D@0]:0+4+4", "[D@5]:0+4", "[D@0]:0+4+4", "[D@5]:0+4",
"[B@0]:0+2+2+2", "[B@5]:0+2+2", "[B@0]:0+2+2+2", "[B@5]:0+2+2",
"[C@0]:0+3+3", "[C@5]:0+3" "[C@0]:0+3+3", "[C@5]:0+3"
); );
proc2.checkAndClearResult(); proc2.checkAndClearProcessResult();
proc3.checkAndClearResult( proc3.checkAndClearProcessResult(
"[A@0]:null", "[A@5]:null", "[A@0]:null", "[A@5]:null",
"[B@0]:null", "[B@5]:null", "[B@0]:null", "[B@5]:null",
"[D@0]:null", "[D@5]:null", "[D@0]:null", "[D@5]:null",
@ -236,15 +236,15 @@ public class KStreamWindowAggregateTest {
driver.setTime(4L); driver.setTime(4L);
driver.process(topic2, "A", "a"); driver.process(topic2, "A", "a");
proc1.checkAndClearResult(); proc1.checkAndClearProcessResult();
proc2.checkAndClearResult( proc2.checkAndClearProcessResult(
"[A@0]:0+a", "[A@0]:0+a",
"[B@0]:0+b", "[B@0]:0+b",
"[C@0]:0+c", "[C@0]:0+c",
"[D@0]:0+d", "[D@0]:0+d",
"[A@0]:0+a+a" "[A@0]:0+a+a"
); );
proc3.checkAndClearResult( proc3.checkAndClearProcessResult(
"[A@0]:0+1+1+1%0+a", "[A@0]:0+1+1+1%0+a",
"[B@0]:0+2+2+2%0+b", "[B@0]:0+2+2+2%0+b",
"[C@0]:0+3+3%0+c", "[C@0]:0+3+3%0+c",
@ -262,15 +262,15 @@ public class KStreamWindowAggregateTest {
driver.setTime(9L); driver.setTime(9L);
driver.process(topic2, "C", "c"); driver.process(topic2, "C", "c");
proc1.checkAndClearResult(); proc1.checkAndClearProcessResult();
proc2.checkAndClearResult( proc2.checkAndClearProcessResult(
"[A@0]:0+a+a+a", "[A@5]:0+a", "[A@0]:0+a+a+a", "[A@5]:0+a",
"[B@0]:0+b+b", "[B@5]:0+b", "[B@0]:0+b+b", "[B@5]:0+b",
"[D@0]:0+d+d", "[D@5]:0+d", "[D@0]:0+d+d", "[D@5]:0+d",
"[B@0]:0+b+b+b", "[B@5]:0+b+b", "[B@0]:0+b+b+b", "[B@5]:0+b+b",
"[C@0]:0+c+c", "[C@5]:0+c" "[C@0]:0+c+c", "[C@5]:0+c"
); );
proc3.checkAndClearResult( proc3.checkAndClearProcessResult(
"[A@0]:0+1+1+1%0+a+a+a", "[A@5]:0+1%0+a", "[A@0]:0+1+1+1%0+a+a+a", "[A@5]:0+1%0+a",
"[B@0]:0+2+2+2%0+b+b", "[B@5]:0+2+2%0+b", "[B@0]:0+2+2+2%0+b+b", "[B@5]:0+2+2%0+b",
"[D@0]:0+4+4%0+d+d", "[D@5]:0+4%0+d", "[D@0]:0+4+4%0+d+d", "[D@5]:0+4%0+d",

View File

@ -74,8 +74,8 @@ public class KTableFilterTest {
driver.process(topic1, "A", null); driver.process(topic1, "A", null);
driver.process(topic1, "B", null); driver.process(topic1, "B", null);
proc2.checkAndClearResult("A:null", "B:2", "C:null", "D:4", "A:null", "B:null"); proc2.checkAndClearProcessResult("A:null", "B:2", "C:null", "D:4", "A:null", "B:null");
proc3.checkAndClearResult("A:1", "B:null", "C:3", "D:null", "A:null", "B:null"); proc3.checkAndClearProcessResult("A:1", "B:null", "C:3", "D:null", "A:null", "B:null");
} }
@Test @Test
@ -193,25 +193,25 @@ public class KTableFilterTest {
driver.process(topic1, "B", 1); driver.process(topic1, "B", 1);
driver.process(topic1, "C", 1); driver.process(topic1, "C", 1);
proc1.checkAndClearResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)"); proc1.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
proc2.checkAndClearResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)"); proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)");
driver.process(topic1, "A", 2); driver.process(topic1, "A", 2);
driver.process(topic1, "B", 2); driver.process(topic1, "B", 2);
proc1.checkAndClearResult("A:(2<-null)", "B:(2<-null)"); proc1.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
proc2.checkAndClearResult("A:(2<-null)", "B:(2<-null)"); proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
driver.process(topic1, "A", 3); driver.process(topic1, "A", 3);
proc1.checkAndClearResult("A:(3<-null)"); proc1.checkAndClearProcessResult("A:(3<-null)");
proc2.checkAndClearResult("A:(null<-null)"); proc2.checkAndClearProcessResult("A:(null<-null)");
driver.process(topic1, "A", null); driver.process(topic1, "A", null);
driver.process(topic1, "B", null); driver.process(topic1, "B", null);
proc1.checkAndClearResult("A:(null<-null)", "B:(null<-null)"); proc1.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
proc2.checkAndClearResult("A:(null<-null)", "B:(null<-null)"); proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
} finally { } finally {
Utils.delete(stateDir); Utils.delete(stateDir);
@ -250,25 +250,25 @@ public class KTableFilterTest {
driver.process(topic1, "B", 1); driver.process(topic1, "B", 1);
driver.process(topic1, "C", 1); driver.process(topic1, "C", 1);
proc1.checkAndClearResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)"); proc1.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
proc2.checkAndClearResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)"); proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)");
driver.process(topic1, "A", 2); driver.process(topic1, "A", 2);
driver.process(topic1, "B", 2); driver.process(topic1, "B", 2);
proc1.checkAndClearResult("A:(2<-1)", "B:(2<-1)"); proc1.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
proc2.checkAndClearResult("A:(2<-null)", "B:(2<-null)"); proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
driver.process(topic1, "A", 3); driver.process(topic1, "A", 3);
proc1.checkAndClearResult("A:(3<-2)"); proc1.checkAndClearProcessResult("A:(3<-2)");
proc2.checkAndClearResult("A:(null<-2)"); proc2.checkAndClearProcessResult("A:(null<-2)");
driver.process(topic1, "A", null); driver.process(topic1, "A", null);
driver.process(topic1, "B", null); driver.process(topic1, "B", null);
proc1.checkAndClearResult("A:(null<-3)", "B:(null<-2)"); proc1.checkAndClearProcessResult("A:(null<-3)", "B:(null<-2)");
proc2.checkAndClearResult("A:(null<-null)", "B:(null<-2)"); proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-2)");
} finally { } finally {
Utils.delete(stateDir); Utils.delete(stateDir);

View File

@ -100,7 +100,7 @@ public class KTableKTableJoinTest {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:null", "1:null"); processor.checkAndClearProcessResult("0:null", "1:null");
checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, null), kv(3, null)); checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, null), kv(3, null));
// push two items to the other stream. this should produce two items. // push two items to the other stream. this should produce two items.
@ -109,7 +109,7 @@ public class KTableKTableJoinTest {
driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1"); processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null)); checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null));
// push all four items to the primary stream. this should produce four items. // push all four items to the primary stream. this should produce four items.
@ -118,7 +118,7 @@ public class KTableKTableJoinTest {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1", "2:null", "3:null"); processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:null", "3:null");
checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null)); checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null));
// push all items to the other stream. this should produce four items. // push all items to the other stream. this should produce four items.
@ -126,7 +126,7 @@ public class KTableKTableJoinTest {
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3")); checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
// push all four items to the primary stream. this should produce four items. // push all four items to the primary stream. this should produce four items.
@ -135,7 +135,7 @@ public class KTableKTableJoinTest {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3")); checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
// push two items with null to the other stream as deletes. this should produce two item. // push two items with null to the other stream as deletes. this should produce two item.
@ -144,7 +144,7 @@ public class KTableKTableJoinTest {
driver.process(topic2, expectedKeys[i], null); driver.process(topic2, expectedKeys[i], null);
} }
processor.checkAndClearResult("0:null", "1:null"); processor.checkAndClearProcessResult("0:null", "1:null");
checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, "X2+YY2"), kv(3, "X3+YY3")); checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
// push all four items to the primary stream. this should produce four items. // push all four items to the primary stream. this should produce four items.
@ -153,7 +153,7 @@ public class KTableKTableJoinTest {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:null", "1:null", "2:XX2+YY2", "3:XX3+YY3"); processor.checkAndClearProcessResult("0:null", "1:null", "2:XX2+YY2", "3:XX3+YY3");
checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, "XX2+YY2"), kv(3, "XX3+YY3")); checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
} finally { } finally {
@ -195,7 +195,7 @@ public class KTableKTableJoinTest {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
} }
proc.checkAndClearResult("0:(null<-null)", "1:(null<-null)"); proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)");
// push two items to the other stream. this should produce two items. // push two items to the other stream. this should produce two items.
@ -203,7 +203,7 @@ public class KTableKTableJoinTest {
driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
} }
proc.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)"); proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
// push all four items to the primary stream. this should produce four items. // push all four items to the primary stream. this should produce four items.
@ -211,14 +211,14 @@ public class KTableKTableJoinTest {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
} }
proc.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(null<-null)", "3:(null<-null)"); proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(null<-null)", "3:(null<-null)");
// push all items to the other stream. this should produce four items. // push all items to the other stream. this should produce four items.
for (int i = 0; i < expectedKeys.length; i++) { for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
} }
proc.checkAndClearResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)"); proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
// push all four items to the primary stream. this should produce four items. // push all four items to the primary stream. this should produce four items.
@ -226,7 +226,7 @@ public class KTableKTableJoinTest {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
} }
proc.checkAndClearResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)"); proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
// push two items with null to the other stream as deletes. this should produce two item. // push two items with null to the other stream as deletes. this should produce two item.
@ -234,7 +234,7 @@ public class KTableKTableJoinTest {
driver.process(topic2, expectedKeys[i], null); driver.process(topic2, expectedKeys[i], null);
} }
proc.checkAndClearResult("0:(null<-null)", "1:(null<-null)"); proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)");
// push all four items to the primary stream. this should produce four items. // push all four items to the primary stream. this should produce four items.
@ -242,7 +242,7 @@ public class KTableKTableJoinTest {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
} }
proc.checkAndClearResult("0:(null<-null)", "1:(null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)"); proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
} finally { } finally {
Utils.delete(baseDir); Utils.delete(baseDir);
@ -285,7 +285,7 @@ public class KTableKTableJoinTest {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
} }
proc.checkAndClearResult("0:(null<-null)", "1:(null<-null)"); proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)");
// push two items to the other stream. this should produce two items. // push two items to the other stream. this should produce two items.
@ -293,7 +293,7 @@ public class KTableKTableJoinTest {
driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
} }
proc.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)"); proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
// push all four items to the primary stream. this should produce four items. // push all four items to the primary stream. this should produce four items.
@ -301,14 +301,14 @@ public class KTableKTableJoinTest {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
} }
proc.checkAndClearResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(null<-null)", "3:(null<-null)"); proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(null<-null)", "3:(null<-null)");
// push all items to the other stream. this should produce four items. // push all items to the other stream. this should produce four items.
for (int i = 0; i < expectedKeys.length; i++) { for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
} }
proc.checkAndClearResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)"); proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
// push all four items to the primary stream. this should produce four items. // push all four items to the primary stream. this should produce four items.
@ -316,7 +316,7 @@ public class KTableKTableJoinTest {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
} }
proc.checkAndClearResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)"); proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
// push two items with null to the other stream as deletes. this should produce two item. // push two items with null to the other stream as deletes. this should produce two item.
@ -324,7 +324,7 @@ public class KTableKTableJoinTest {
driver.process(topic2, expectedKeys[i], null); driver.process(topic2, expectedKeys[i], null);
} }
proc.checkAndClearResult("0:(null<-X0+YY0)", "1:(null<-X1+YY1)"); proc.checkAndClearProcessResult("0:(null<-X0+YY0)", "1:(null<-X1+YY1)");
// push all four items to the primary stream. this should produce four items. // push all four items to the primary stream. this should produce four items.
@ -332,7 +332,7 @@ public class KTableKTableJoinTest {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
} }
proc.checkAndClearResult("0:(null<-null)", "1:(null<-null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)"); proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
} finally { } finally {
Utils.delete(baseDir); Utils.delete(baseDir);

View File

@ -105,7 +105,7 @@ public class KTableKTableLeftJoinTest {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:X0+null", "1:X1+null"); processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, null), kv(3, null)); checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, null), kv(3, null));
// push two items to the other stream. this should produce two items. // push two items to the other stream. this should produce two items.
@ -114,7 +114,7 @@ public class KTableKTableLeftJoinTest {
driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1"); processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null)); checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null));
// push all four items to the primary stream. this should produce four items. // push all four items to the primary stream. this should produce four items.
@ -123,7 +123,7 @@ public class KTableKTableLeftJoinTest {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null"); processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, "X2+null"), kv(3, "X3+null")); checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, "X2+null"), kv(3, "X3+null"));
// push all items to the other stream. this should produce four items. // push all items to the other stream. this should produce four items.
@ -131,7 +131,7 @@ public class KTableKTableLeftJoinTest {
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3")); checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
// push all four items to the primary stream. this should produce four items. // push all four items to the primary stream. this should produce four items.
@ -140,7 +140,7 @@ public class KTableKTableLeftJoinTest {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3")); checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
// push two items with null to the other stream as deletes. this should produce two item. // push two items with null to the other stream as deletes. this should produce two item.
@ -149,7 +149,7 @@ public class KTableKTableLeftJoinTest {
driver.process(topic2, expectedKeys[i], null); driver.process(topic2, expectedKeys[i], null);
} }
processor.checkAndClearResult("0:X0+null", "1:X1+null"); processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, "X2+YY2"), kv(3, "X3+YY3")); checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
// push all four items to the primary stream. this should produce four items. // push all four items to the primary stream. this should produce four items.
@ -158,7 +158,7 @@ public class KTableKTableLeftJoinTest {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3"); processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3");
checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, "XX1+null"), kv(2, "XX2+YY2"), kv(3, "XX3+YY3")); checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, "XX1+null"), kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
} finally { } finally {
@ -200,7 +200,7 @@ public class KTableKTableLeftJoinTest {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
} }
proc.checkAndClearResult("0:(X0+null<-null)", "1:(X1+null<-null)"); proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
// push two items to the other stream. this should produce two items. // push two items to the other stream. this should produce two items.
@ -208,7 +208,7 @@ public class KTableKTableLeftJoinTest {
driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
} }
proc.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)"); proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
// push all four items to the primary stream. this should produce four items. // push all four items to the primary stream. this should produce four items.
@ -216,14 +216,14 @@ public class KTableKTableLeftJoinTest {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
} }
proc.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(X2+null<-null)", "3:(X3+null<-null)"); proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(X2+null<-null)", "3:(X3+null<-null)");
// push all items to the other stream. this should produce four items. // push all items to the other stream. this should produce four items.
for (int i = 0; i < expectedKeys.length; i++) { for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
} }
proc.checkAndClearResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)"); proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
// push all four items to the primary stream. this should produce four items. // push all four items to the primary stream. this should produce four items.
@ -231,7 +231,7 @@ public class KTableKTableLeftJoinTest {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
} }
proc.checkAndClearResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)"); proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
// push two items with null to the other stream as deletes. this should produce two item. // push two items with null to the other stream as deletes. this should produce two item.
@ -239,7 +239,7 @@ public class KTableKTableLeftJoinTest {
driver.process(topic2, expectedKeys[i], null); driver.process(topic2, expectedKeys[i], null);
} }
proc.checkAndClearResult("0:(X0+null<-null)", "1:(X1+null<-null)"); proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
// push all four items to the primary stream. this should produce four items. // push all four items to the primary stream. this should produce four items.
@ -247,7 +247,7 @@ public class KTableKTableLeftJoinTest {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
} }
proc.checkAndClearResult("0:(XX0+null<-null)", "1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)"); proc.checkAndClearProcessResult("0:(XX0+null<-null)", "1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
} finally { } finally {
Utils.delete(baseDir); Utils.delete(baseDir);
@ -290,7 +290,7 @@ public class KTableKTableLeftJoinTest {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
} }
proc.checkAndClearResult("0:(X0+null<-null)", "1:(X1+null<-null)"); proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
// push two items to the other stream. this should produce two items. // push two items to the other stream. this should produce two items.
@ -298,7 +298,7 @@ public class KTableKTableLeftJoinTest {
driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
} }
proc.checkAndClearResult("0:(X0+Y0<-X0+null)", "1:(X1+Y1<-X1+null)"); proc.checkAndClearProcessResult("0:(X0+Y0<-X0+null)", "1:(X1+Y1<-X1+null)");
// push all four items to the primary stream. this should produce four items. // push all four items to the primary stream. this should produce four items.
@ -306,14 +306,14 @@ public class KTableKTableLeftJoinTest {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
} }
proc.checkAndClearResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)"); proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)");
// push all items to the other stream. this should produce four items. // push all items to the other stream. this should produce four items.
for (int i = 0; i < expectedKeys.length; i++) { for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
} }
proc.checkAndClearResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)"); proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)");
// push all four items to the primary stream. this should produce four items. // push all four items to the primary stream. this should produce four items.
@ -321,7 +321,7 @@ public class KTableKTableLeftJoinTest {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
} }
proc.checkAndClearResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)"); proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
// push two items with null to the other stream as deletes. this should produce two item. // push two items with null to the other stream as deletes. this should produce two item.
@ -329,7 +329,7 @@ public class KTableKTableLeftJoinTest {
driver.process(topic2, expectedKeys[i], null); driver.process(topic2, expectedKeys[i], null);
} }
proc.checkAndClearResult("0:(X0+null<-X0+YY0)", "1:(X1+null<-X1+YY1)"); proc.checkAndClearProcessResult("0:(X0+null<-X0+YY0)", "1:(X1+null<-X1+YY1)");
// push all four items to the primary stream. this should produce four items. // push all four items to the primary stream. this should produce four items.
@ -337,7 +337,7 @@ public class KTableKTableLeftJoinTest {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
} }
proc.checkAndClearResult("0:(XX0+null<-X0+null)", "1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)"); proc.checkAndClearProcessResult("0:(XX0+null<-X0+null)", "1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
} finally { } finally {
Utils.delete(baseDir); Utils.delete(baseDir);

View File

@ -100,7 +100,7 @@ public class KTableKTableOuterJoinTest {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:X0+null", "1:X1+null"); processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, null), kv(3, null)); checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, null), kv(3, null));
// push two items to the other stream. this should produce two items. // push two items to the other stream. this should produce two items.
@ -109,7 +109,7 @@ public class KTableKTableOuterJoinTest {
driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1"); processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null)); checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null));
// push all four items to the primary stream. this should produce four items. // push all four items to the primary stream. this should produce four items.
@ -118,7 +118,7 @@ public class KTableKTableOuterJoinTest {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null"); processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, "X2+null"), kv(3, "X3+null")); checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, "X2+null"), kv(3, "X3+null"));
// push all items to the other stream. this should produce four items. // push all items to the other stream. this should produce four items.
@ -126,7 +126,7 @@ public class KTableKTableOuterJoinTest {
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3")); checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
// push all four items to the primary stream. this should produce four items. // push all four items to the primary stream. this should produce four items.
@ -135,7 +135,7 @@ public class KTableKTableOuterJoinTest {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3")); checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
// push two items with null to the other stream as deletes. this should produce two item. // push two items with null to the other stream as deletes. this should produce two item.
@ -144,7 +144,7 @@ public class KTableKTableOuterJoinTest {
driver.process(topic2, expectedKeys[i], null); driver.process(topic2, expectedKeys[i], null);
} }
processor.checkAndClearResult("0:X0+null", "1:X1+null"); processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, "X2+YY2"), kv(3, "X3+YY3")); checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
// push all four items to the primary stream. this should produce four items. // push all four items to the primary stream. this should produce four items.
@ -153,7 +153,7 @@ public class KTableKTableOuterJoinTest {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
} }
processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3"); processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3");
checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, "XX1+null"), kv(2, "XX2+YY2"), kv(3, "XX3+YY3")); checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, "XX1+null"), kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
// push middle two items to the primary stream with null. this should produce two items. // push middle two items to the primary stream with null. this should produce two items.
@ -162,7 +162,7 @@ public class KTableKTableOuterJoinTest {
driver.process(topic1, expectedKeys[i], null); driver.process(topic1, expectedKeys[i], null);
} }
processor.checkAndClearResult("1:null", "2:null+YY2"); processor.checkAndClearProcessResult("1:null", "2:null+YY2");
checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, null), kv(2, "null+YY2"), kv(3, "XX3+YY3")); checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, null), kv(2, "null+YY2"), kv(3, "XX3+YY3"));
} finally { } finally {
@ -204,7 +204,7 @@ public class KTableKTableOuterJoinTest {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
} }
proc.checkAndClearResult("0:(X0+null<-null)", "1:(X1+null<-null)"); proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
// push two items to the other stream. this should produce two items. // push two items to the other stream. this should produce two items.
@ -212,7 +212,7 @@ public class KTableKTableOuterJoinTest {
driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
} }
proc.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)"); proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
// push all four items to the primary stream. this should produce four items. // push all four items to the primary stream. this should produce four items.
@ -220,14 +220,14 @@ public class KTableKTableOuterJoinTest {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
} }
proc.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(X2+null<-null)", "3:(X3+null<-null)"); proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(X2+null<-null)", "3:(X3+null<-null)");
// push all items to the other stream. this should produce four items. // push all items to the other stream. this should produce four items.
for (int i = 0; i < expectedKeys.length; i++) { for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
} }
proc.checkAndClearResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)"); proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
// push all four items to the primary stream. this should produce four items. // push all four items to the primary stream. this should produce four items.
@ -235,7 +235,7 @@ public class KTableKTableOuterJoinTest {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
} }
proc.checkAndClearResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)"); proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
// push two items with null to the other stream as deletes. this should produce two item. // push two items with null to the other stream as deletes. this should produce two item.
@ -243,7 +243,7 @@ public class KTableKTableOuterJoinTest {
driver.process(topic2, expectedKeys[i], null); driver.process(topic2, expectedKeys[i], null);
} }
proc.checkAndClearResult("0:(X0+null<-null)", "1:(X1+null<-null)"); proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
// push all four items to the primary stream. this should produce four items. // push all four items to the primary stream. this should produce four items.
@ -251,7 +251,7 @@ public class KTableKTableOuterJoinTest {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
} }
proc.checkAndClearResult("0:(XX0+null<-null)", "1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)"); proc.checkAndClearProcessResult("0:(XX0+null<-null)", "1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
// push middle two items to the primary stream with null. this should produce two items. // push middle two items to the primary stream with null. this should produce two items.
@ -259,7 +259,7 @@ public class KTableKTableOuterJoinTest {
driver.process(topic1, expectedKeys[i], null); driver.process(topic1, expectedKeys[i], null);
} }
proc.checkAndClearResult("1:(null<-null)", "2:(null+YY2<-null)"); proc.checkAndClearProcessResult("1:(null<-null)", "2:(null+YY2<-null)");
} finally { } finally {
Utils.delete(baseDir); Utils.delete(baseDir);
@ -302,7 +302,7 @@ public class KTableKTableOuterJoinTest {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
} }
proc.checkAndClearResult("0:(X0+null<-null)", "1:(X1+null<-null)"); proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
// push two items to the other stream. this should produce two items. // push two items to the other stream. this should produce two items.
@ -310,7 +310,7 @@ public class KTableKTableOuterJoinTest {
driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
} }
proc.checkAndClearResult("0:(X0+Y0<-X0+null)", "1:(X1+Y1<-X1+null)"); proc.checkAndClearProcessResult("0:(X0+Y0<-X0+null)", "1:(X1+Y1<-X1+null)");
// push all four items to the primary stream. this should produce four items. // push all four items to the primary stream. this should produce four items.
@ -318,14 +318,14 @@ public class KTableKTableOuterJoinTest {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
} }
proc.checkAndClearResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)"); proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)");
// push all items to the other stream. this should produce four items. // push all items to the other stream. this should produce four items.
for (int i = 0; i < expectedKeys.length; i++) { for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
} }
proc.checkAndClearResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)"); proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)");
// push all four items to the primary stream. this should produce four items. // push all four items to the primary stream. this should produce four items.
@ -333,7 +333,7 @@ public class KTableKTableOuterJoinTest {
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
} }
proc.checkAndClearResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)"); proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
// push two items with null to the other stream as deletes. this should produce two item. // push two items with null to the other stream as deletes. this should produce two item.
@ -341,7 +341,7 @@ public class KTableKTableOuterJoinTest {
driver.process(topic2, expectedKeys[i], null); driver.process(topic2, expectedKeys[i], null);
} }
proc.checkAndClearResult("0:(X0+null<-X0+YY0)", "1:(X1+null<-X1+YY1)"); proc.checkAndClearProcessResult("0:(X0+null<-X0+YY0)", "1:(X1+null<-X1+YY1)");
// push all four items to the primary stream. this should produce four items. // push all four items to the primary stream. this should produce four items.
@ -349,7 +349,7 @@ public class KTableKTableOuterJoinTest {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
} }
proc.checkAndClearResult("0:(XX0+null<-X0+null)", "1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)"); proc.checkAndClearProcessResult("0:(XX0+null<-X0+null)", "1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
// push middle two items to the primary stream with null. this should produce two items. // push middle two items to the primary stream with null. this should produce two items.
@ -357,7 +357,7 @@ public class KTableKTableOuterJoinTest {
driver.process(topic1, expectedKeys[i], null); driver.process(topic1, expectedKeys[i], null);
} }
proc.checkAndClearResult("1:(null<-XX1+null)", "2:(null+YY2<-XX2+YY2)"); proc.checkAndClearProcessResult("1:(null<-XX1+null)", "2:(null+YY2<-XX2+YY2)");
} finally { } finally {
Utils.delete(baseDir); Utils.delete(baseDir);

View File

@ -223,20 +223,20 @@ public class KTableMapValuesTest {
driver.process(topic1, "B", "01"); driver.process(topic1, "B", "01");
driver.process(topic1, "C", "01"); driver.process(topic1, "C", "01");
proc.checkAndClearResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)"); proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
driver.process(topic1, "A", "02"); driver.process(topic1, "A", "02");
driver.process(topic1, "B", "02"); driver.process(topic1, "B", "02");
proc.checkAndClearResult("A:(2<-null)", "B:(2<-null)"); proc.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
driver.process(topic1, "A", "03"); driver.process(topic1, "A", "03");
proc.checkAndClearResult("A:(3<-null)"); proc.checkAndClearProcessResult("A:(3<-null)");
driver.process(topic1, "A", null); driver.process(topic1, "A", null);
proc.checkAndClearResult("A:(null<-null)"); proc.checkAndClearProcessResult("A:(null<-null)");
} finally { } finally {
Utils.delete(stateDir); Utils.delete(stateDir);
@ -276,20 +276,20 @@ public class KTableMapValuesTest {
driver.process(topic1, "B", "01"); driver.process(topic1, "B", "01");
driver.process(topic1, "C", "01"); driver.process(topic1, "C", "01");
proc.checkAndClearResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)"); proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
driver.process(topic1, "A", "02"); driver.process(topic1, "A", "02");
driver.process(topic1, "B", "02"); driver.process(topic1, "B", "02");
proc.checkAndClearResult("A:(2<-1)", "B:(2<-1)"); proc.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
driver.process(topic1, "A", "03"); driver.process(topic1, "A", "03");
proc.checkAndClearResult("A:(3<-2)"); proc.checkAndClearProcessResult("A:(3<-2)");
driver.process(topic1, "A", null); driver.process(topic1, "A", null);
proc.checkAndClearResult("A:(null<-3)"); proc.checkAndClearProcessResult("A:(null<-3)");
} finally { } finally {
Utils.delete(stateDir); Utils.delete(stateDir);

View File

@ -131,21 +131,21 @@ public class KTableSourceTest {
driver.process(topic1, "B", "01"); driver.process(topic1, "B", "01");
driver.process(topic1, "C", "01"); driver.process(topic1, "C", "01");
proc1.checkAndClearResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)"); proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");
driver.process(topic1, "A", "02"); driver.process(topic1, "A", "02");
driver.process(topic1, "B", "02"); driver.process(topic1, "B", "02");
proc1.checkAndClearResult("A:(02<-null)", "B:(02<-null)"); proc1.checkAndClearProcessResult("A:(02<-null)", "B:(02<-null)");
driver.process(topic1, "A", "03"); driver.process(topic1, "A", "03");
proc1.checkAndClearResult("A:(03<-null)"); proc1.checkAndClearProcessResult("A:(03<-null)");
driver.process(topic1, "A", null); driver.process(topic1, "A", null);
driver.process(topic1, "B", null); driver.process(topic1, "B", null);
proc1.checkAndClearResult("A:(null<-null)", "B:(null<-null)"); proc1.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
} finally { } finally {
Utils.delete(stateDir); Utils.delete(stateDir);
@ -176,21 +176,21 @@ public class KTableSourceTest {
driver.process(topic1, "B", "01"); driver.process(topic1, "B", "01");
driver.process(topic1, "C", "01"); driver.process(topic1, "C", "01");
proc1.checkAndClearResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)"); proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");
driver.process(topic1, "A", "02"); driver.process(topic1, "A", "02");
driver.process(topic1, "B", "02"); driver.process(topic1, "B", "02");
proc1.checkAndClearResult("A:(02<-01)", "B:(02<-01)"); proc1.checkAndClearProcessResult("A:(02<-01)", "B:(02<-01)");
driver.process(topic1, "A", "03"); driver.process(topic1, "A", "03");
proc1.checkAndClearResult("A:(03<-02)"); proc1.checkAndClearProcessResult("A:(03<-02)");
driver.process(topic1, "A", null); driver.process(topic1, "A", null);
driver.process(topic1, "B", null); driver.process(topic1, "B", null);
proc1.checkAndClearResult("A:(null<-03)", "B:(null<-02)"); proc1.checkAndClearProcessResult("A:(null<-03)", "B:(null<-02)");
} finally { } finally {
Utils.delete(stateDir); Utils.delete(stateDir);

View File

@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.IntegerSerializer;
@ -60,17 +59,17 @@ public class PartitionGroupTest {
// add three 3 records with timestamp 1, 3, 5 to partition-1 // add three 3 records with timestamp 1, 3, 5 to partition-1
List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList( List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue),
new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), new ConsumerRecord<>("topic", 1, 3L, recordKey, recordValue),
new ConsumerRecord<>("topic", 1, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)); new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue));
group.addRawRecords(partition1, list1); group.addRawRecords(partition1, list1);
// add three 3 records with timestamp 2, 4, 6 to partition-2 // add three 3 records with timestamp 2, 4, 6 to partition-2
List<ConsumerRecord<byte[], byte[]>> list2 = Arrays.asList( List<ConsumerRecord<byte[], byte[]>> list2 = Arrays.asList(
new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), new ConsumerRecord<>("topic", 2, 2L, recordKey, recordValue),
new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), new ConsumerRecord<>("topic", 2, 4L, recordKey, recordValue),
new ConsumerRecord<>("topic", 1, 6, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)); new ConsumerRecord<>("topic", 2, 6L, recordKey, recordValue));
group.addRawRecords(partition2, list2); group.addRawRecords(partition2, list2);
@ -82,7 +81,7 @@ public class PartitionGroupTest {
StampedRecord record; StampedRecord record;
PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo(); PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo();
// get one record // get one record, now the time should be advanced
record = group.nextRecord(info); record = group.nextRecord(info);
assertEquals(partition1, info.partition()); assertEquals(partition1, info.partition());
assertEquals(1L, record.timestamp); assertEquals(1L, record.timestamp);
@ -99,5 +98,72 @@ public class PartitionGroupTest {
assertEquals(2, group.numBuffered(partition1)); assertEquals(2, group.numBuffered(partition1));
assertEquals(2, group.numBuffered(partition2)); assertEquals(2, group.numBuffered(partition2));
assertEquals(3L, group.timestamp()); assertEquals(3L, group.timestamp());
// add three 3 records with timestamp 2, 4, 6 to partition-1 again
List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList(
new ConsumerRecord<>("topic", 1, 2L, recordKey, recordValue),
new ConsumerRecord<>("topic", 1, 4L, recordKey, recordValue));
group.addRawRecords(partition1, list3);
assertEquals(6, group.numBuffered());
assertEquals(4, group.numBuffered(partition1));
assertEquals(2, group.numBuffered(partition2));
assertEquals(3L, group.timestamp());
// get one record, time should not be advanced
record = group.nextRecord(info);
assertEquals(partition1, info.partition());
assertEquals(3L, record.timestamp);
assertEquals(5, group.numBuffered());
assertEquals(3, group.numBuffered(partition1));
assertEquals(2, group.numBuffered(partition2));
assertEquals(3L, group.timestamp());
// get one more record, now time should be advanced
record = group.nextRecord(info);
assertEquals(partition1, info.partition());
assertEquals(5L, record.timestamp);
assertEquals(4, group.numBuffered());
assertEquals(2, group.numBuffered(partition1));
assertEquals(2, group.numBuffered(partition2));
assertEquals(3L, group.timestamp());
// get one more record, time should not be advanced
record = group.nextRecord(info);
assertEquals(partition1, info.partition());
assertEquals(2L, record.timestamp);
assertEquals(3, group.numBuffered());
assertEquals(1, group.numBuffered(partition1));
assertEquals(2, group.numBuffered(partition2));
assertEquals(4L, group.timestamp());
// get one more record, now time should be advanced
record = group.nextRecord(info);
assertEquals(partition2, info.partition());
assertEquals(4L, record.timestamp);
assertEquals(2, group.numBuffered());
assertEquals(1, group.numBuffered(partition1));
assertEquals(1, group.numBuffered(partition2));
assertEquals(4L, group.timestamp());
// get one more record, time should not be advanced
record = group.nextRecord(info);
assertEquals(partition1, info.partition());
assertEquals(4L, record.timestamp);
assertEquals(1, group.numBuffered());
assertEquals(0, group.numBuffered(partition1));
assertEquals(1, group.numBuffered(partition2));
assertEquals(4L, group.timestamp());
// get one more record, time should not be advanced
record = group.nextRecord(info);
assertEquals(partition2, info.partition());
assertEquals(6L, record.timestamp);
assertEquals(0, group.numBuffered());
assertEquals(0, group.numBuffered(partition1));
assertEquals(0, group.numBuffered(partition2));
assertEquals(4L, group.timestamp());
} }
} }

View File

@ -32,6 +32,7 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.test.MockProcessorNode;
import org.apache.kafka.test.MockSourceNode; import org.apache.kafka.test.MockSourceNode;
import org.apache.kafka.test.MockTimestampExtractor; import org.apache.kafka.test.MockTimestampExtractor;
import org.junit.Test; import org.junit.Test;
@ -46,6 +47,7 @@ import java.util.Properties;
import java.util.Set; import java.util.Set;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
public class StreamTaskTest { public class StreamTaskTest {
@ -58,10 +60,12 @@ public class StreamTaskTest {
private final TopicPartition partition2 = new TopicPartition("topic2", 1); private final TopicPartition partition2 = new TopicPartition("topic2", 1);
private final Set<TopicPartition> partitions = Utils.mkSet(partition1, partition2); private final Set<TopicPartition> partitions = Utils.mkSet(partition1, partition2);
private final MockSourceNode source1 = new MockSourceNode<>(intDeserializer, intDeserializer); private final MockSourceNode<Integer, Integer> source1 = new MockSourceNode<>(intDeserializer, intDeserializer);
private final MockSourceNode source2 = new MockSourceNode<>(intDeserializer, intDeserializer); private final MockSourceNode<Integer, Integer> source2 = new MockSourceNode<>(intDeserializer, intDeserializer);
private final MockProcessorNode<Integer, Integer> processor = new MockProcessorNode<>(10L);
private final ProcessorTopology topology = new ProcessorTopology( private final ProcessorTopology topology = new ProcessorTopology(
Arrays.asList((ProcessorNode) source1, (ProcessorNode) source2), Arrays.asList((ProcessorNode) source1, (ProcessorNode) source2, (ProcessorNode) processor),
new HashMap<String, SourceNode>() { new HashMap<String, SourceNode>() {
{ {
put("topic1", source1); put("topic1", source1);
@ -94,6 +98,8 @@ public class StreamTaskTest {
@Before @Before
public void setup() { public void setup() {
consumer.assign(Arrays.asList(partition1, partition2)); consumer.assign(Arrays.asList(partition1, partition2));
source1.addChild(processor);
source2.addChild(processor);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -211,6 +217,73 @@ public class StreamTaskTest {
} }
} }
@SuppressWarnings("unchecked")
@Test
public void testMaybePunctuate() throws Exception {
File baseDir = Files.createTempDirectory("test").toFile();
try {
StreamsConfig config = createConfig(baseDir);
StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null);
task.addRecords(partition1, records(
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
));
task.addRecords(partition2, records(
new ConsumerRecord<>(partition2.topic(), partition2.partition(), 15, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
new ConsumerRecord<>(partition2.topic(), partition2.partition(), 25, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
));
assertTrue(task.maybePunctuate());
assertEquals(5, task.process());
assertEquals(1, source1.numReceived);
assertEquals(0, source2.numReceived);
assertFalse(task.maybePunctuate());
assertEquals(4, task.process());
assertEquals(1, source1.numReceived);
assertEquals(1, source2.numReceived);
assertTrue(task.maybePunctuate());
assertEquals(3, task.process());
assertEquals(2, source1.numReceived);
assertEquals(1, source2.numReceived);
assertFalse(task.maybePunctuate());
assertEquals(2, task.process());
assertEquals(2, source1.numReceived);
assertEquals(2, source2.numReceived);
assertTrue(task.maybePunctuate());
assertEquals(1, task.process());
assertEquals(3, source1.numReceived);
assertEquals(2, source2.numReceived);
assertFalse(task.maybePunctuate());
assertEquals(0, task.process());
assertEquals(3, source1.numReceived);
assertEquals(3, source2.numReceived);
assertFalse(task.maybePunctuate());
processor.supplier.checkAndClearPunctuateResult(10L, 20L, 30L);
task.close();
} finally {
Utils.delete(baseDir);
}
}
private Iterable<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]>... recs) { private Iterable<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]>... recs) {
return Arrays.asList(recs); return Arrays.asList(recs);
} }

View File

@ -143,7 +143,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
@Override @Override
public void schedule(long interval) { public void schedule(long interval) {
throw new UnsupportedOperationException("schedule() not supported"); throw new UnsupportedOperationException("schedule() not supported.");
} }
@Override @Override

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.test;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
public class MockProcessorNode<K, V> extends ProcessorNode<K, V> {
public static final String NAME = "MOCK-PROCESS-";
public static final AtomicInteger INDEX = new AtomicInteger(1);
public int numReceived = 0;
public final MockProcessorSupplier<K, V> supplier;
public MockProcessorNode(long scheduleInterval) {
this(new MockProcessorSupplier<K, V>(scheduleInterval));
}
private MockProcessorNode(MockProcessorSupplier<K, V> supplier) {
super(NAME + INDEX.getAndIncrement(), supplier.get(), Collections.<String>emptySet());
this.supplier = supplier;
}
@Override
public void process(K key, V value) {
this.numReceived++;
processor().process(key, value);
}
}

View File

@ -17,6 +17,7 @@
package org.apache.kafka.test; package org.apache.kafka.test;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.ProcessorSupplier;
@ -30,16 +31,28 @@ public class MockProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
public final ArrayList<String> processed = new ArrayList<>(); public final ArrayList<String> processed = new ArrayList<>();
public final ArrayList<Long> punctuated = new ArrayList<>(); public final ArrayList<Long> punctuated = new ArrayList<>();
private final long scheduleInterval;
public MockProcessorSupplier() {
this(-1L);
}
public MockProcessorSupplier(long scheduleInterval) {
this.scheduleInterval = scheduleInterval;
}
@Override @Override
public Processor<K, V> get() { public Processor<K, V> get() {
return new MockProcessor(); return new MockProcessor();
} }
public class MockProcessor implements Processor<K, V> { public class MockProcessor extends AbstractProcessor<K, V> {
@Override @Override
public void init(ProcessorContext context) { public void init(ProcessorContext context) {
// do nothing super.init(context);
if (scheduleInterval > 0L)
context.schedule(scheduleInterval);
} }
@Override @Override
@ -49,17 +62,16 @@ public class MockProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
@Override @Override
public void punctuate(long streamTime) { public void punctuate(long streamTime) {
assertEquals(streamTime, context().timestamp());
assertEquals(null, context().topic());
assertEquals(-1, context().partition());
assertEquals(-1L, context().offset());
punctuated.add(streamTime); punctuated.add(streamTime);
} }
@Override
public void close() {
// do nothing
} }
} public void checkAndClearProcessResult(String... expected) {
public void checkAndClearResult(String... expected) {
assertEquals("the number of outputs:", expected.length, processed.size()); assertEquals("the number of outputs:", expected.length, processed.size());
for (int i = 0; i < expected.length; i++) { for (int i = 0; i < expected.length; i++) {
@ -69,4 +81,14 @@ public class MockProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
processed.clear(); processed.clear();
} }
public void checkAndClearPunctuateResult(long... expected) {
assertEquals("the number of outputs:", expected.length, punctuated.size());
for (int i = 0; i < expected.length; i++) {
assertEquals("output[" + i + "]:", expected[i], (long) punctuated.get(i));
}
processed.clear();
}
} }