KAFKA-3896: Fix KStream-KStream leftJoin in RepartitionIntegrationTest

The issue of transiently having duplicates is due to the bad design of the left-join test itself: in order to ignore partial join results such as `A:null`, it lets the producer potentially send twice to source stream one, and it relies on all of the following conditions being true for the test to pass:

1. `receiveMessages` happens to have fetched all the produced results and to have committed the offsets.
2. The streams app happens to have finished sending all the result data.
3. The consumer used in `receiveMessages` happens to receive all messages in a single `poll()`.

If any of the above is not true, the test fails.

Fixed this test by adding a filter right after the left join that drops these partial join results (sketched below), plus minor cleanup of the integration test utils.
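
The resulting pattern looks roughly like the sketch below, written against the 0.10.x-era `KStream` API that this test uses. The stream and topic names are placeholders, and the `endsWith(":null")` check stands in for the test's exact predicate:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueJoiner;

// streamOne: KStream<Integer, Integer>, streamTwo: KStream<Integer, String>,
// outputTopic and WINDOW_SIZE as in the test; all names here are placeholders.
streamOne.leftJoin(streamTwo,
    new ValueJoiner<Integer, String, String>() {
        @Override
        public String apply(final Integer value1, final String value2) {
            // emits partial results such as "1:null" while the right side is missing
            return value1 + ":" + value2;
        }
    },
    JoinWindows.of(WINDOW_SIZE),
    Serdes.Integer(), Serdes.Integer(), Serdes.String())
    .filterNot(new Predicate<Integer, String>() {
        @Override
        public boolean test(final Integer key, final String value) {
            return value.endsWith(":null"); // drop left-only join results
        }
    })
    .to(Serdes.Integer(), Serdes.String(), outputTopic);
```

Because the partial results never reach the output topic, the test no longer needs a second produce round or a lenient verification path, which is what made it flaky.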

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Damian Guy, Ewen Cheslack-Postava

Closes #2485 from guozhangwang/K3896-duplicate-join-results
Guozhang Wang 2017-02-02 10:33:33 -08:00
parent 35cd008e5a
commit 7436d28a24
2 changed files with 24 additions and 29 deletions

KStreamRepartitionJoinTest.java

@@ -32,6 +32,7 @@ import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.MockValueJoiner;
@@ -143,7 +144,7 @@ public class KStreamRepartitionJoinTest {
         verifyCorrectOutput(flatMapJoin);
         verifyCorrectOutput(mapRhs);
         verifyCorrectOutput(mapJoinJoin);
-        verifyLeftJoin(leftJoin);
+        verifyCorrectOutput(leftJoin);
     }
 
     private ExpectedOutputOnTopic mapStreamOneAndJoin() throws InterruptedException {
@@ -232,6 +233,13 @@ public class KStreamRepartitionJoinTest {
                Serdes.Integer(),
                Serdes.Integer(),
                Serdes.String())
+            .filterNot(new Predicate<Integer, String>() {
+                @Override
+                public boolean test(Integer key, String value) {
+                    // filter out left-only join results such as "A:null"
+                    return value.substring(2).equals("null");
+                }
+            })
             .to(Serdes.Integer(), Serdes.String(), outputTopic);
 
         return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
@@ -268,7 +276,7 @@
     }
 
     private JoinWindows getJoinWindow() {
-        return (JoinWindows) JoinWindows.of(WINDOW_SIZE).until(3 * WINDOW_SIZE);
+        return JoinWindows.of(WINDOW_SIZE).until(3 * WINDOW_SIZE);
     }
@@ -282,7 +290,6 @@
         }
     }
 
-
     private void verifyCorrectOutput(final ExpectedOutputOnTopic expectedOutputOnTopic)
         throws InterruptedException {
         assertThat(receiveMessages(new StringDeserializer(),
@@ -291,16 +298,6 @@ public class KStreamRepartitionJoinTest {
                    is(expectedOutputOnTopic.expectedOutput));
     }
 
-    private void verifyLeftJoin(final ExpectedOutputOnTopic expectedOutputOnTopic)
-        throws InterruptedException, ExecutionException {
-        final List<String> received = receiveMessages(new StringDeserializer(), expectedOutputOnTopic
-            .expectedOutput.size(), expectedOutputOnTopic.outputTopic);
-        if (!received.equals(expectedOutputOnTopic.expectedOutput)) {
-            produceToStreamOne();
-            verifyCorrectOutput(expectedOutputOnTopic.expectedOutput, expectedOutputOnTopic.outputTopic);
-        }
-    }
-
     private void produceMessages()
         throws ExecutionException, InterruptedException {
         produceToStreamOne();
@@ -380,13 +377,8 @@ public class KStreamRepartitionJoinTest {
             numMessages,
             60 * 1000);
         Collections.sort(received);
-        return received;
-    }
-
-    private void verifyCorrectOutput(final List<String> expectedMessages,
-                                     final String topic) throws InterruptedException {
-        assertThat(receiveMessages(new StringDeserializer(), expectedMessages.size(), topic),
-                   is(expectedMessages));
+        return received;
     }
 
     private void doJoin(final KStream<Integer, Integer> lhs,

IntegrationTestUtils.java

@@ -61,12 +61,13 @@ public class IntegrationTestUtils {
      *
      * @param topic          Kafka topic to read messages from
      * @param consumerConfig Kafka consumer configuration
+     * @param waitTime       Maximum wait time in milliseconds
      * @param maxMessages    Maximum number of messages to read via the consumer.
      * @return The values retrieved via the consumer.
      */
-    public static <V> List<V> readValues(final String topic, final Properties consumerConfig, final int maxMessages) {
+    public static <V> List<V> readValues(final String topic, final Properties consumerConfig, final long waitTime, final int maxMessages) {
         final List<V> returnList = new ArrayList<>();
-        final List<KeyValue<Object, V>> kvs = readKeyValues(topic, consumerConfig, maxMessages);
+        final List<KeyValue<Object, V>> kvs = readKeyValues(topic, consumerConfig, waitTime, maxMessages);
         for (final KeyValue<?, V> kv : kvs) {
             returnList.add(kv.value);
         }
@@ -79,10 +80,11 @@ public class IntegrationTestUtils {
      *
      * @param topic          Kafka topic to read messages from
      * @param consumerConfig Kafka consumer configuration
+     * @param waitTime       Maximum wait time in milliseconds
      * @return The KeyValue elements retrieved via the consumer.
      */
-    public static <K, V> List<KeyValue<K, V>> readKeyValues(final String topic, final Properties consumerConfig) {
-        return readKeyValues(topic, consumerConfig, UNLIMITED_MESSAGES);
+    public static <K, V> List<KeyValue<K, V>> readKeyValues(final String topic, final Properties consumerConfig, final long waitTime) {
+        return readKeyValues(topic, consumerConfig, waitTime, UNLIMITED_MESSAGES);
     }
 
     /**
@@ -91,17 +93,17 @@
      *
      * @param topic          Kafka topic to read messages from
      * @param consumerConfig Kafka consumer configuration
+     * @param waitTime       Maximum wait time in milliseconds
      * @param maxMessages    Maximum number of messages to read via the consumer
      * @return The KeyValue elements retrieved via the consumer
      */
-    public static <K, V> List<KeyValue<K, V>> readKeyValues(final String topic, final Properties consumerConfig, final int maxMessages) {
+    public static <K, V> List<KeyValue<K, V>> readKeyValues(final String topic, final Properties consumerConfig, final long waitTime, final int maxMessages) {
         final KafkaConsumer<K, V> consumer = new KafkaConsumer<>(consumerConfig);
         consumer.subscribe(Collections.singletonList(topic));
         final int pollIntervalMs = 100;
-        final int maxTotalPollTimeMs = 2000;
-        int totalPollTimeMs = 0;
         final List<KeyValue<K, V>> consumedValues = new ArrayList<>();
-        while (totalPollTimeMs < maxTotalPollTimeMs && continueConsuming(consumedValues.size(), maxMessages)) {
+        int totalPollTimeMs = 0;
+        while (totalPollTimeMs < waitTime && continueConsuming(consumedValues.size(), maxMessages)) {
             totalPollTimeMs += pollIntervalMs;
             final ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
             for (final ConsumerRecord<K, V> record : records) {
@@ -208,7 +210,7 @@
         final TestCondition valuesRead = new TestCondition() {
             @Override
             public boolean conditionMet() {
-                final List<KeyValue<K, V>> readData = readKeyValues(topic, consumerConfig);
+                final List<KeyValue<K, V>> readData = readKeyValues(topic, consumerConfig, waitTime);
                 accumData.addAll(readData);
                 return accumData.size() >= expectedNumRecords;
             }
@@ -248,8 +250,9 @@
         final TestCondition valuesRead = new TestCondition() {
             @Override
             public boolean conditionMet() {
-                final List<V> readData = readValues(topic, consumerConfig, expectedNumRecords);
+                final List<V> readData = readValues(topic, consumerConfig, waitTime, expectedNumRecords);
                 accumData.addAll(readData);
                 return accumData.size() >= expectedNumRecords;
             }
         };
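
Usage note: with this change, callers of `readKeyValues` and `readValues` bound the poll loop themselves via `waitTime` instead of relying on the previously hard-coded 2000 ms cap. A minimal sketch of calling the new three-argument overload; the broker address, group id, and topic name are hypothetical:

```java
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;

// Hypothetical consumer setup for reading a test's output topic.
final Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "readKeyValues-example");
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

// Poll "output-topic" for up to 60 seconds, with no cap on the number of records.
final List<KeyValue<Integer, String>> records =
    IntegrationTestUtils.readKeyValues("output-topic", consumerConfig, 60 * 1000L);
```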