mirror of https://github.com/apache/kafka.git
MINOR: Adjust parameter ordering of `waitForCondition` and `retryOnExceptionWithTimeout` (#10759)
New parameters in overloaded methods should appear later apart from lambdas that should always be last.
This commit is contained in:
parent
a02b19cb77
commit
e4b3a3cdeb
|
@ -5773,7 +5773,7 @@ public class KafkaAdminClientTest {
|
||||||
TestUtils.waitForCondition(() -> {
|
TestUtils.waitForCondition(() -> {
|
||||||
time.sleep(1);
|
time.sleep(1);
|
||||||
return disconnectFuture.isDone();
|
return disconnectFuture.isDone();
|
||||||
}, 1, 5000, () -> "Timed out waiting for expected disconnect");
|
}, 5000, 1, () -> "Timed out waiting for expected disconnect");
|
||||||
assertFalse(disconnectFuture.isCompletedExceptionally());
|
assertFalse(disconnectFuture.isCompletedExceptionally());
|
||||||
assertFalse(result.future.isDone());
|
assertFalse(result.future.isDone());
|
||||||
TestUtils.waitForCondition(env.kafkaClient()::hasInFlightRequests,
|
TestUtils.waitForCondition(env.kafkaClient()::hasInFlightRequests,
|
||||||
|
|
|
@ -298,7 +298,7 @@ public class TestUtils {
|
||||||
* avoid transient failures due to slow or overloaded machines.
|
* avoid transient failures due to slow or overloaded machines.
|
||||||
*/
|
*/
|
||||||
public static void waitForCondition(final TestCondition testCondition, final long maxWaitMs, Supplier<String> conditionDetailsSupplier) throws InterruptedException {
|
public static void waitForCondition(final TestCondition testCondition, final long maxWaitMs, Supplier<String> conditionDetailsSupplier) throws InterruptedException {
|
||||||
waitForCondition(testCondition, DEFAULT_POLL_INTERVAL_MS, maxWaitMs, conditionDetailsSupplier);
|
waitForCondition(testCondition, maxWaitMs, DEFAULT_POLL_INTERVAL_MS, conditionDetailsSupplier);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -310,11 +310,11 @@ public class TestUtils {
|
||||||
*/
|
*/
|
||||||
public static void waitForCondition(
|
public static void waitForCondition(
|
||||||
final TestCondition testCondition,
|
final TestCondition testCondition,
|
||||||
final long pollIntervalMs,
|
|
||||||
final long maxWaitMs,
|
final long maxWaitMs,
|
||||||
|
final long pollIntervalMs,
|
||||||
Supplier<String> conditionDetailsSupplier
|
Supplier<String> conditionDetailsSupplier
|
||||||
) throws InterruptedException {
|
) throws InterruptedException {
|
||||||
retryOnExceptionWithTimeout(pollIntervalMs, maxWaitMs, () -> {
|
retryOnExceptionWithTimeout(maxWaitMs, pollIntervalMs, () -> {
|
||||||
String conditionDetailsSupplied = conditionDetailsSupplier != null ? conditionDetailsSupplier.get() : null;
|
String conditionDetailsSupplied = conditionDetailsSupplier != null ? conditionDetailsSupplier.get() : null;
|
||||||
String conditionDetails = conditionDetailsSupplied != null ? conditionDetailsSupplied : "";
|
String conditionDetails = conditionDetailsSupplied != null ? conditionDetailsSupplied : "";
|
||||||
assertTrue(testCondition.conditionMet(),
|
assertTrue(testCondition.conditionMet(),
|
||||||
|
@ -333,7 +333,7 @@ public class TestUtils {
|
||||||
*/
|
*/
|
||||||
public static void retryOnExceptionWithTimeout(final long timeoutMs,
|
public static void retryOnExceptionWithTimeout(final long timeoutMs,
|
||||||
final ValuelessCallable runnable) throws InterruptedException {
|
final ValuelessCallable runnable) throws InterruptedException {
|
||||||
retryOnExceptionWithTimeout(DEFAULT_POLL_INTERVAL_MS, timeoutMs, runnable);
|
retryOnExceptionWithTimeout(timeoutMs, DEFAULT_POLL_INTERVAL_MS, runnable);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -345,7 +345,7 @@ public class TestUtils {
|
||||||
* @throws InterruptedException if the current thread is interrupted while waiting for {@code runnable} to complete successfully.
|
* @throws InterruptedException if the current thread is interrupted while waiting for {@code runnable} to complete successfully.
|
||||||
*/
|
*/
|
||||||
public static void retryOnExceptionWithTimeout(final ValuelessCallable runnable) throws InterruptedException {
|
public static void retryOnExceptionWithTimeout(final ValuelessCallable runnable) throws InterruptedException {
|
||||||
retryOnExceptionWithTimeout(DEFAULT_POLL_INTERVAL_MS, DEFAULT_MAX_WAIT_MS, runnable);
|
retryOnExceptionWithTimeout(DEFAULT_MAX_WAIT_MS, DEFAULT_POLL_INTERVAL_MS, runnable);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -353,13 +353,13 @@ public class TestUtils {
|
||||||
* {@link AssertionError}s, or for the given timeout to expire. If the timeout expires then the
|
* {@link AssertionError}s, or for the given timeout to expire. If the timeout expires then the
|
||||||
* last exception or assertion failure will be thrown thus providing context for the failure.
|
* last exception or assertion failure will be thrown thus providing context for the failure.
|
||||||
*
|
*
|
||||||
* @param pollIntervalMs the interval in milliseconds to wait between invoking {@code runnable}.
|
|
||||||
* @param timeoutMs the total time in milliseconds to wait for {@code runnable} to complete successfully.
|
* @param timeoutMs the total time in milliseconds to wait for {@code runnable} to complete successfully.
|
||||||
|
* @param pollIntervalMs the interval in milliseconds to wait between invoking {@code runnable}.
|
||||||
* @param runnable the code to attempt to execute successfully.
|
* @param runnable the code to attempt to execute successfully.
|
||||||
* @throws InterruptedException if the current thread is interrupted while waiting for {@code runnable} to complete successfully.
|
* @throws InterruptedException if the current thread is interrupted while waiting for {@code runnable} to complete successfully.
|
||||||
*/
|
*/
|
||||||
public static void retryOnExceptionWithTimeout(final long pollIntervalMs,
|
public static void retryOnExceptionWithTimeout(final long timeoutMs,
|
||||||
final long timeoutMs,
|
final long pollIntervalMs,
|
||||||
final ValuelessCallable runnable) throws InterruptedException {
|
final ValuelessCallable runnable) throws InterruptedException {
|
||||||
final long expectedEnd = System.currentTimeMillis() + timeoutMs;
|
final long expectedEnd = System.currentTimeMillis() + timeoutMs;
|
||||||
|
|
||||||
|
|
|
@ -53,7 +53,7 @@ public class QuorumControllerTestEnv implements AutoCloseable {
|
||||||
|
|
||||||
QuorumController activeController() throws InterruptedException {
|
QuorumController activeController() throws InterruptedException {
|
||||||
AtomicReference<QuorumController> value = new AtomicReference<>(null);
|
AtomicReference<QuorumController> value = new AtomicReference<>(null);
|
||||||
TestUtils.retryOnExceptionWithTimeout(3, 20000, () -> {
|
TestUtils.retryOnExceptionWithTimeout(20000, 3, () -> {
|
||||||
QuorumController activeController = null;
|
QuorumController activeController = null;
|
||||||
for (QuorumController controller : controllers) {
|
for (QuorumController controller : controllers) {
|
||||||
if (controller.isActive()) {
|
if (controller.isActive()) {
|
||||||
|
|
|
@ -96,7 +96,7 @@ public class LocalLogManagerTest {
|
||||||
|
|
||||||
private static void waitForLastCommittedOffset(long targetOffset,
|
private static void waitForLastCommittedOffset(long targetOffset,
|
||||||
LocalLogManager logManager) throws InterruptedException {
|
LocalLogManager logManager) throws InterruptedException {
|
||||||
TestUtils.retryOnExceptionWithTimeout(3, 20000, () -> {
|
TestUtils.retryOnExceptionWithTimeout(20000, 3, () -> {
|
||||||
MockMetaLogManagerListener listener =
|
MockMetaLogManagerListener listener =
|
||||||
(MockMetaLogManagerListener) logManager.listeners().get(0);
|
(MockMetaLogManagerListener) logManager.listeners().get(0);
|
||||||
long highestOffset = -1;
|
long highestOffset = -1;
|
||||||
|
|
|
@ -108,7 +108,7 @@ public class LocalLogManagerTestEnv implements AutoCloseable {
|
||||||
|
|
||||||
LeaderAndEpoch waitForLeader() throws InterruptedException {
|
LeaderAndEpoch waitForLeader() throws InterruptedException {
|
||||||
AtomicReference<LeaderAndEpoch> value = new AtomicReference<>(null);
|
AtomicReference<LeaderAndEpoch> value = new AtomicReference<>(null);
|
||||||
TestUtils.retryOnExceptionWithTimeout(3, 20000, () -> {
|
TestUtils.retryOnExceptionWithTimeout(20000, 3, () -> {
|
||||||
LeaderAndEpoch result = null;
|
LeaderAndEpoch result = null;
|
||||||
for (LocalLogManager logManager : logManagers) {
|
for (LocalLogManager logManager : logManagers) {
|
||||||
LeaderAndEpoch leader = logManager.leaderAndEpoch();
|
LeaderAndEpoch leader = logManager.leaderAndEpoch();
|
||||||
|
|
|
@ -147,7 +147,7 @@ public class OptimizedKTableIntegrationTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
final ReadOnlyKeyValueStore<Integer, Integer> newActiveStore = kafkaStreams1WasFirstActive ? store2 : store1;
|
final ReadOnlyKeyValueStore<Integer, Integer> newActiveStore = kafkaStreams1WasFirstActive ? store2 : store1;
|
||||||
TestUtils.retryOnExceptionWithTimeout(100, 60 * 1000, () -> {
|
TestUtils.retryOnExceptionWithTimeout(60 * 1000, 100, () -> {
|
||||||
// Assert that after failover we have recovered to the last store write
|
// Assert that after failover we have recovered to the last store write
|
||||||
assertThat(newActiveStore.get(key), is(equalTo(batch1NumMessages - 1)));
|
assertThat(newActiveStore.get(key), is(equalTo(batch1NumMessages - 1)));
|
||||||
});
|
});
|
||||||
|
@ -159,7 +159,7 @@ public class OptimizedKTableIntegrationTest {
|
||||||
// Assert that all messages in the second batch were processed in a timely manner
|
// Assert that all messages in the second batch were processed in a timely manner
|
||||||
assertThat(semaphore.tryAcquire(batch2NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
|
assertThat(semaphore.tryAcquire(batch2NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
|
||||||
|
|
||||||
TestUtils.retryOnExceptionWithTimeout(100, 60 * 1000, () -> {
|
TestUtils.retryOnExceptionWithTimeout(60 * 1000, 100, () -> {
|
||||||
// Assert that the current value in store reflects all messages being processed
|
// Assert that the current value in store reflects all messages being processed
|
||||||
assertThat(newActiveStore.get(key), is(equalTo(totalNumMessages - 1)));
|
assertThat(newActiveStore.get(key), is(equalTo(totalNumMessages - 1)));
|
||||||
});
|
});
|
||||||
|
|
Loading…
Reference in New Issue