MINOR: Provide better messages when waiting for a condition in test (#7488)

Reviewers: Boyang Chen <boyang@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>
This commit is contained in:
Chris Pettitt 2019-10-15 18:18:58 -06:00 committed by Matthias J. Sax
parent bcdc6336fc
commit 9c8ab5ce10
5 changed files with 128 additions and 59 deletions

View File

@ -1274,6 +1274,7 @@ project(':streams:streams-scala') {
testCompile libs.junit
testCompile libs.scalatest
testCompile libs.easymock
testCompile libs.hamcrest
testRuntime libs.slf4jlog4j
}

View File

@ -62,6 +62,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static java.util.Arrays.asList;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
@ -83,6 +84,7 @@ public class TestUtils {
/* A consistent random number generator to make tests repeatable */
public static final Random SEEDED_RANDOM = new Random(192348092834L);
public static final Random RANDOM = new Random();
public static final long DEFAULT_POLL_INTERVAL_MS = 100;
public static final long DEFAULT_MAX_WAIT_MS = 15000;
public static Cluster singletonCluster() {
@ -361,20 +363,69 @@ public class TestUtils {
* avoid transient failures due to slow or overloaded machines.
*/
public static void waitForCondition(final TestCondition testCondition, final long maxWaitMs, Supplier<String> conditionDetailsSupplier) throws InterruptedException {
final long startTime = System.currentTimeMillis();
boolean testConditionMet;
while (!(testConditionMet = testCondition.conditionMet()) && ((System.currentTimeMillis() - startTime) < maxWaitMs)) {
Thread.sleep(Math.min(maxWaitMs, 100L));
}
// don't re-evaluate testCondition.conditionMet() because this might slow down some tests significantly (this
// could be avoided by making the implementations more robust, but we have a large number of such implementations
// and it's easier to simply avoid the issue altogether)
if (!testConditionMet) {
String conditionDetailsSupplied = conditionDetailsSupplier != null ? conditionDetailsSupplier.get() : null;
String conditionDetails = conditionDetailsSupplied != null ? conditionDetailsSupplied : "";
throw new AssertionError("Condition not met within timeout " + maxWaitMs + ". " + conditionDetails);
retryOnExceptionWithTimeout(maxWaitMs, () -> {
assertThat("Condition not met within timeout " + maxWaitMs + ". " + conditionDetails,
testCondition.conditionMet());
});
}
/**
* Wait for the given runnable to complete successfully, i.e. throw now {@link Exception}s or
* {@link AssertionError}s, or for the given timeout to expire. If the timeout expires then the
* last exception or assertion failure will be thrown thus providing context for the failure.
*
* @param timeoutMs the total time in milliseconds to wait for {@code runnable} to complete successfully.
* @param runnable the code to attempt to execute successfully.
* @throws InterruptedException if the current thread is interrupted while waiting for {@code runnable} to complete successfully.
*/
public static void retryOnExceptionWithTimeout(final long timeoutMs,
final ValuelessCallable runnable) throws InterruptedException {
retryOnExceptionWithTimeout(DEFAULT_POLL_INTERVAL_MS, timeoutMs, runnable);
}
/**
* Wait for the given runnable to complete successfully, i.e. throw now {@link Exception}s or
* {@link AssertionError}s, or for the default timeout to expire. If the timeout expires then the
* last exception or assertion failure will be thrown thus providing context for the failure.
*
* @param runnable the code to attempt to execute successfully.
* @throws InterruptedException if the current thread is interrupted while waiting for {@code runnable} to complete successfully.
*/
public static void retryOnExceptionWithTimeout(final ValuelessCallable runnable) throws InterruptedException {
retryOnExceptionWithTimeout(DEFAULT_POLL_INTERVAL_MS, DEFAULT_MAX_WAIT_MS, runnable);
}
/**
* Wait for the given runnable to complete successfully, i.e. throw now {@link Exception}s or
* {@link AssertionError}s, or for the given timeout to expire. If the timeout expires then the
* last exception or assertion failure will be thrown thus providing context for the failure.
*
* @param pollIntervalMs the interval in milliseconds to wait between invoking {@code runnable}.
* @param timeoutMs the total time in milliseconds to wait for {@code runnable} to complete successfully.
* @param runnable the code to attempt to execute successfully.
* @throws InterruptedException if the current thread is interrupted while waiting for {@code runnable} to complete successfully.
*/
public static void retryOnExceptionWithTimeout(final long pollIntervalMs,
final long timeoutMs,
final ValuelessCallable runnable) throws InterruptedException {
final long expectedEnd = System.currentTimeMillis() + timeoutMs;
while (true) {
try {
runnable.call();
return;
} catch (final AssertionError t) {
if (expectedEnd <= System.currentTimeMillis()) {
throw t;
}
} catch (final Exception e) {
if (expectedEnd <= System.currentTimeMillis()) {
throw new AssertionError(e);
}
}
Thread.sleep(Math.min(pollIntervalMs, timeoutMs));
}
}

View File

@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.test;
/**
* Like a {@link Runnable} that allows exceptions to be thrown or a {@link java.util.concurrent.Callable}
* that does not return a value.
*/
public interface ValuelessCallable {
void call() throws Exception;
}

View File

@ -189,7 +189,7 @@ public class OptimizedKTableIntegrationTest {
final ReadOnlyKeyValueStore<Integer, Integer> newActiveStore =
kafkaStreams1WasFirstActive ? store2 : store1;
retryOnExceptionWithTimeout(100, 60 * 1000, TimeUnit.MILLISECONDS, () -> {
TestUtils.retryOnExceptionWithTimeout(100, 60 * 1000, () -> {
// Assert that after failover we have recovered to the last store write
assertThat(newActiveStore.get(key), is(equalTo(batch1NumMessages - 1)));
});
@ -226,25 +226,6 @@ public class OptimizedKTableIntegrationTest {
mockTime);
}
private void retryOnExceptionWithTimeout(final long pollInterval,
final long timeout,
final TimeUnit timeUnit,
final Runnable runnable) throws InterruptedException {
final long expectedEnd = System.currentTimeMillis() + timeUnit.toMillis(timeout);
while (true) {
try {
runnable.run();
return;
} catch (final Throwable t) {
if (expectedEnd <= System.currentTimeMillis()) {
throw new AssertionError(t);
}
Thread.sleep(timeUnit.toMillis(pollInterval));
}
}
}
private void waitForKafkaStreamssToEnterRunningState(final Collection<KafkaStreams> kafkaStreamss,
final long time,
final TimeUnit timeUnit) throws InterruptedException {

View File

@ -65,8 +65,11 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import static org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
/**
* Utility functions to make integration testing more convenient.
@ -449,15 +452,14 @@ public class IntegrationTestUtils {
final int expectedNumRecords,
final long waitTime) throws InterruptedException {
final List<ConsumerRecord<K, V>> accumData = new ArrayList<>();
final String reason = String.format("Did not receive all %d records from topic %s within %d ms", expectedNumRecords, topic, waitTime);
try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
final TestCondition valuesRead = () -> {
retryOnExceptionWithTimeout(waitTime, () -> {
final List<ConsumerRecord<K, V>> readData =
readRecords(topic, consumer, waitTime, expectedNumRecords);
accumData.addAll(readData);
return accumData.size() >= expectedNumRecords;
};
final String conditionDetails = "Did not receive all " + expectedNumRecords + " records from topic " + topic;
TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
assertThat(reason, accumData.size(), is(greaterThanOrEqualTo(expectedNumRecords)));
});
}
return accumData;
}
@ -495,15 +497,14 @@ public class IntegrationTestUtils {
final int expectedNumRecords,
final long waitTime) throws InterruptedException {
final List<KeyValue<K, V>> accumData = new ArrayList<>();
final String reason = String.format("Did not receive all %d records from topic %s within %d ms", expectedNumRecords, topic, waitTime);
try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
final TestCondition valuesRead = () -> {
retryOnExceptionWithTimeout(waitTime, () -> {
final List<KeyValue<K, V>> readData =
readKeyValues(topic, consumer, waitTime, expectedNumRecords);
accumData.addAll(readData);
return accumData.size() >= expectedNumRecords;
};
final String conditionDetails = "Did not receive all " + expectedNumRecords + " records from topic " + topic;
TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
assertThat(reason, accumData.size(), is(greaterThanOrEqualTo(expectedNumRecords)));
});
}
return accumData;
}
@ -524,15 +525,14 @@ public class IntegrationTestUtils {
final int expectedNumRecords,
final long waitTime) throws InterruptedException {
final List<KeyValueTimestamp<K, V>> accumData = new ArrayList<>();
final String reason = String.format("Did not receive all %d records from topic %s within %d ms", expectedNumRecords, topic, waitTime);
try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
final TestCondition valuesRead = () -> {
retryOnExceptionWithTimeout(waitTime, () -> {
final List<KeyValueTimestamp<K, V>> readData =
readKeyValuesWithTimestamp(topic, consumer, waitTime, expectedNumRecords);
accumData.addAll(readData);
return accumData.size() >= expectedNumRecords;
};
final String conditionDetails = "Did not receive all " + expectedNumRecords + " records from topic " + topic;
TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
assertThat(reason, accumData.size(), is(greaterThanOrEqualTo(expectedNumRecords)));
});
}
return accumData;
}
@ -671,15 +671,14 @@ public class IntegrationTestUtils {
final int expectedNumRecords,
final long waitTime) throws InterruptedException {
final List<V> accumData = new ArrayList<>();
final String reason = String.format("Did not receive all %d records from topic %s within %d ms", expectedNumRecords, topic, waitTime);
try (final Consumer<Object, V> consumer = createConsumer(consumerConfig)) {
final TestCondition valuesRead = () -> {
retryOnExceptionWithTimeout(waitTime, () -> {
final List<V> readData =
readValues(topic, consumer, waitTime, expectedNumRecords);
accumData.addAll(readData);
return accumData.size() >= expectedNumRecords;
};
final String conditionDetails = "Did not receive all " + expectedNumRecords + " records from topic " + topic;
TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
assertThat(reason, accumData.size(), is(greaterThanOrEqualTo(expectedNumRecords)));
});
}
return accumData;
}
@ -702,22 +701,34 @@ public class IntegrationTestUtils {
final String topic,
final int partition,
final long timeout) throws InterruptedException {
TestUtils.waitForCondition(() -> {
final String baseReason = String.format("Metadata for topic=%s partition=%d was not propagated to all brokers within %d ms. ",
topic, partition, timeout);
retryOnExceptionWithTimeout(timeout, () -> {
final List<KafkaServer> emptyPartitionInfos = new ArrayList<>();
final List<KafkaServer> invalidBrokerIds = new ArrayList<>();
for (final KafkaServer server : servers) {
final MetadataCache metadataCache = server.dataPlaneRequestProcessor().metadataCache();
final Option<UpdateMetadataPartitionState> partitionInfo =
metadataCache.getPartitionInfo(topic, partition);
if (partitionInfo.isEmpty()) {
return false;
emptyPartitionInfos.add(server);
continue;
}
final UpdateMetadataPartitionState metadataPartitionState = partitionInfo.get();
if (!Request.isValidBrokerId(metadataPartitionState.leader())) {
return false;
invalidBrokerIds.add(server);
continue;
}
}
return true;
}, timeout, "metadata for topic=" + topic + " partition=" + partition + " not propagated to all brokers");
final String reason = baseReason + ". Brokers without partition info: " + emptyPartitionInfos +
". Brokers with invalid broker id for partition leader: " + invalidBrokerIds;
assertThat(reason, emptyPartitionInfos.isEmpty() && invalidBrokerIds.isEmpty());
});
}
public static <K, V> void verifyKeyValueTimestamps(final Properties consumerConfig,