From 7436d28a243295ae7c8c3723e97ea43a10f2771c Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Thu, 2 Feb 2017 10:33:33 -0800
Subject: [PATCH] KAFKA-3896: Fix KStream-KStream leftJoin in RepartitionIntegrationTest

The transient duplicates are caused by the design of the left-join test itself: in order to tolerate partial join results such as `A:null`, it lets the producer potentially send to source stream one twice, and it relies on all of the following conditions being true for the test to pass:

1. `receiveMessages` happens to have fetched all the produced results and committed its offsets.
2. The streams app happens to have completed sending all result data.
3. The consumer used in `receiveMessages` happens to get all messages in a single poll().

If any of these does not hold, the test fails.

Fixed the test by adding a filter right after the left join that drops the partial join results, so the duplicate produce and the retry logic are no longer needed; a sketch of the approach follows. Also minor cleanup on the integration test utils: the read helpers now take an explicit maximum wait time instead of a hard-coded 2-second poll budget.
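For illustration, a minimal sketch of the fix (hypothetical helper, not part of the patch; assumes the Kafka Streams API of this era and the test's own lhs/rhs streams, WINDOW_SIZE, and outputTopic). A left-only window result joins a null right value, so the joined string ends with ":null" and can be dropped with filterNot before it reaches the output topic:

    private void joinAndFilter(final KStream<Integer, Integer> lhs,
                               final KStream<Integer, String> rhs,
                               final String outputTopic) {
        lhs.leftJoin(rhs,
                new ValueJoiner<Integer, String, String>() {
                    @Override
                    public String apply(final Integer left, final String right) {
                        // right is null when no matching record arrived in the window
                        return left + ":" + right;
                    }
                },
                JoinWindows.of(WINDOW_SIZE).until(3 * WINDOW_SIZE),
                Serdes.Integer(), Serdes.Integer(), Serdes.String())
            // drop the partial (left-only) results instead of re-producing input
            .filterNot(new Predicate<Integer, String>() {
                @Override
                public boolean test(final Integer key, final String value) {
                    return value.endsWith(":null");
                }
            })
            .to(Serdes.Integer(), Serdes.String(), outputTopic);
    }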
Author: Guozhang Wang

Reviewers: Damian Guy, Ewen Cheslack-Postava

Closes #2485 from guozhangwang/K3896-duplicate-join-results
---
 .../KStreamRepartitionJoinTest.java | 30 +++++++------------
 .../utils/IntegrationTestUtils.java | 23 +++++++-------
 2 files changed, 24 insertions(+), 29 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
index 43e5d879f31..f08bc72f834 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.MockValueJoiner;
@@ -143,7 +144,7 @@ public class KStreamRepartitionJoinTest {
         verifyCorrectOutput(flatMapJoin);
         verifyCorrectOutput(mapRhs);
         verifyCorrectOutput(mapJoinJoin);
-        verifyLeftJoin(leftJoin);
+        verifyCorrectOutput(leftJoin);
     }
 
     private ExpectedOutputOnTopic mapStreamOneAndJoin() throws InterruptedException {
@@ -232,6 +233,13 @@ public class KStreamRepartitionJoinTest {
                 Serdes.Integer(),
                 Serdes.Integer(),
                 Serdes.String())
+                .filterNot(new Predicate<Integer, String>() {
+                    @Override
+                    public boolean test(Integer key, String value) {
+                        // filterNot: drop the partial (left-only) join results such as A:null
+                        return value.substring(2).equals("null");
+                    }
+                })
                 .to(Serdes.Integer(), Serdes.String(), outputTopic);
 
         return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
@@ -268,7 +276,7 @@
     }
 
     private JoinWindows getJoinWindow() {
-        return (JoinWindows) JoinWindows.of(WINDOW_SIZE).until(3 * WINDOW_SIZE);
+        return JoinWindows.of(WINDOW_SIZE).until(3 * WINDOW_SIZE);
     }
 
 
@@ -282,7 +290,6 @@
         }
     }
 
-
     private void verifyCorrectOutput(final ExpectedOutputOnTopic expectedOutputOnTopic)
         throws InterruptedException {
         assertThat(receiveMessages(new StringDeserializer(),
@@ -291,16 +298,6 @@
             is(expectedOutputOnTopic.expectedOutput));
     }
 
-    private void verifyLeftJoin(final ExpectedOutputOnTopic expectedOutputOnTopic)
-        throws InterruptedException, ExecutionException {
-        final List<String> received = receiveMessages(new StringDeserializer(), expectedOutputOnTopic
-            .expectedOutput.size(), expectedOutputOnTopic.outputTopic);
-        if (!received.equals(expectedOutputOnTopic.expectedOutput)) {
-            produceToStreamOne();
-            verifyCorrectOutput(expectedOutputOnTopic.expectedOutput, expectedOutputOnTopic.outputTopic);
-        }
-    }
-
     private void produceMessages()
         throws ExecutionException, InterruptedException {
         produceToStreamOne();
@@ -380,13 +377,8 @@
             numMessages,
             60 * 1000);
         Collections.sort(received);
-        return received;
-    }
 
-    private void verifyCorrectOutput(final List<String> expectedMessages,
-                                     final String topic) throws InterruptedException {
-        assertThat(receiveMessages(new StringDeserializer(), expectedMessages.size(), topic),
-            is(expectedMessages));
+        return received;
     }
 
     private void doJoin(final KStream<Integer, Integer> lhs,
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 08e22cca763..a38781b5b8c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -61,12 +61,13 @@ public class IntegrationTestUtils {
      *
      * @param topic          Kafka topic to read messages from
      * @param consumerConfig Kafka consumer configuration
+     * @param waitTime       Maximum wait time in milliseconds
      * @param maxMessages    Maximum number of messages to read via the consumer.
      * @return The values retrieved via the consumer.
      */
-    public static <V> List<V> readValues(final String topic, final Properties consumerConfig, final int maxMessages) {
+    public static <V> List<V> readValues(final String topic, final Properties consumerConfig, final long waitTime, final int maxMessages) {
         final List<V> returnList = new ArrayList<>();
-        final List<KeyValue<Object, V>> kvs = readKeyValues(topic, consumerConfig, maxMessages);
+        final List<KeyValue<Object, V>> kvs = readKeyValues(topic, consumerConfig, waitTime, maxMessages);
         for (final KeyValue<Object, V> kv : kvs) {
             returnList.add(kv.value);
         }
@@ -79,10 +80,11 @@ public class IntegrationTestUtils {
      *
      * @param topic          Kafka topic to read messages from
      * @param consumerConfig Kafka consumer configuration
+     * @param waitTime       Maximum wait time in milliseconds
      * @return The KeyValue elements retrieved via the consumer.
      */
-    public static <K, V> List<KeyValue<K, V>> readKeyValues(final String topic, final Properties consumerConfig) {
-        return readKeyValues(topic, consumerConfig, UNLIMITED_MESSAGES);
+    public static <K, V> List<KeyValue<K, V>> readKeyValues(final String topic, final Properties consumerConfig, final long waitTime) {
+        return readKeyValues(topic, consumerConfig, waitTime, UNLIMITED_MESSAGES);
     }
 
     /**
@@ -91,17 +93,17 @@ public class IntegrationTestUtils {
      *
      * @param topic          Kafka topic to read messages from
      * @param consumerConfig Kafka consumer configuration
+     * @param waitTime       Maximum wait time in milliseconds
      * @param maxMessages    Maximum number of messages to read via the consumer
      * @return The KeyValue elements retrieved via the consumer
      */
-    public static <K, V> List<KeyValue<K, V>> readKeyValues(final String topic, final Properties consumerConfig, final int maxMessages) {
+    public static <K, V> List<KeyValue<K, V>> readKeyValues(final String topic, final Properties consumerConfig, final long waitTime, final int maxMessages) {
         final KafkaConsumer<K, V> consumer = new KafkaConsumer<>(consumerConfig);
         consumer.subscribe(Collections.singletonList(topic));
         final int pollIntervalMs = 100;
-        final int maxTotalPollTimeMs = 2000;
-        int totalPollTimeMs = 0;
         final List<KeyValue<K, V>> consumedValues = new ArrayList<>();
-        while (totalPollTimeMs < maxTotalPollTimeMs && continueConsuming(consumedValues.size(), maxMessages)) {
+        int totalPollTimeMs = 0;
+        while (totalPollTimeMs < waitTime && continueConsuming(consumedValues.size(), maxMessages)) {
             totalPollTimeMs += pollIntervalMs;
             final ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
             for (final ConsumerRecord<K, V> record : records) {
@@ -208,7 +210,7 @@ final TestCondition valuesRead = new TestCondition() {
             @Override
             public boolean conditionMet() {
-                final List<KeyValue<K, V>> readData = readKeyValues(topic, consumerConfig);
+                final List<KeyValue<K, V>> readData = readKeyValues(topic, consumerConfig, waitTime);
                 accumData.addAll(readData);
                 return accumData.size() >= expectedNumRecords;
             }
         };
@@ -248,8 +250,9 @@
         final TestCondition valuesRead = new TestCondition() {
             @Override
             public boolean conditionMet() {
-                final List<V> readData = readValues(topic, consumerConfig, expectedNumRecords);
+                final List<V> readData = readValues(topic, consumerConfig, waitTime, expectedNumRecords);
                 accumData.addAll(readData);
+
                 return accumData.size() >= expectedNumRecords;
             }
         };
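Usage note (hypothetical snippet, not part of the patch): with the hard-coded 2-second poll budget removed, callers of the read helpers pass the maximum total poll time explicitly. Assuming an embedded cluster named CLUSTER and a topic/count from a test:

    final Properties consumerConfig = new Properties();
    consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "test-reader");
    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
    consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    // was: IntegrationTestUtils.readValues(outputTopic, consumerConfig, expectedCount)
    final List<String> values =
        IntegrationTestUtils.readValues(outputTopic, consumerConfig, 30 * 1000L, expectedCount);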