MINOR: Stabilize flaky smoke system tests before KIP-91

This is a workaround until KIP-91 is merged. We tried increasing the timeout multiple times already but tests are still flaky.

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Bill Bejeck <bill@confluent.io>, Apurva Mehta <apurva@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #4329 from mjsax/hotfix-system-tests
This commit is contained in:
Matthias J. Sax 2017-12-18 17:34:50 -08:00 committed by Guozhang Wang
parent 066bfc314c
commit 22f742cdd2
1 changed files with 44 additions and 11 deletions

View File

@ -27,6 +27,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Exit;
@ -155,6 +156,8 @@ public class SmokeTestDriver extends SmokeTestUtil {
int remaining = data.length; int remaining = data.length;
List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>();
while (remaining > 0) { while (remaining > 0) {
int index = rand.nextInt(remaining); int index = rand.nextInt(remaining);
String key = data[index].key; String key = data[index].key;
@ -168,29 +171,59 @@ public class SmokeTestDriver extends SmokeTestUtil {
ProducerRecord<byte[], byte[]> record = ProducerRecord<byte[], byte[]> record =
new ProducerRecord<>("data", stringSerde.serializer().serialize("", key), intSerde.serializer().serialize("", value)); new ProducerRecord<>("data", stringSerde.serializer().serialize("", key), intSerde.serializer().serialize("", value));
producer.send(record, new Callback() { producer.send(record, new TestCallback(record, needRetry));
@Override
public void onCompletion(final RecordMetadata metadata, final Exception exception) {
if (exception != null) {
exception.printStackTrace();
Exit.exit(1);
}
}
});
numRecordsProduced++; numRecordsProduced++;
allData.get(key).add(value); allData.get(key).add(value);
if (numRecordsProduced % 100 == 0) if (numRecordsProduced % 100 == 0)
System.out.println(numRecordsProduced + " records produced"); System.out.println(numRecordsProduced + " records produced");
Utils.sleep(2); Utils.sleep(2);
} }
} }
producer.flush();
int remainingRetries = 5;
while (!needRetry.isEmpty()) {
final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
for (final ProducerRecord<byte[], byte[]> record : needRetry) {
producer.send(record, new TestCallback(record, needRetry2));
}
producer.flush();
needRetry = needRetry2;
if (--remainingRetries == 0 && !needRetry.isEmpty()) {
System.err.println("Failed to produce all records after multiple retries");
Exit.exit(1);
}
}
producer.close(); producer.close();
return Collections.unmodifiableMap(allData); return Collections.unmodifiableMap(allData);
} }
private static class TestCallback implements Callback {
private final ProducerRecord<byte[], byte[]> originalRecord;
private final List<ProducerRecord<byte[], byte[]>> needRetry;
TestCallback(final ProducerRecord<byte[], byte[]> originalRecord,
final List<ProducerRecord<byte[], byte[]>> needRetry) {
this.originalRecord = originalRecord;
this.needRetry = needRetry;
}
@Override
public void onCompletion(final RecordMetadata metadata, final Exception exception) {
if (exception != null) {
if (exception instanceof TimeoutException) {
needRetry.add(originalRecord);
} else {
exception.printStackTrace();
Exit.exit(1);
}
}
}
}
private static void shuffle(int[] data, int windowSize) { private static void shuffle(int[] data, int windowSize) {
Random rand = new Random(); Random rand = new Random();
for (int i = 0; i < data.length; i++) { for (int i = 0; i < data.length; i++) {