From e4b3a3cdeb295fdd4c4434ec1a7ee77b66553ae0 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Thu, 27 May 2021 06:25:00 -0700 Subject: [PATCH] MINOR: Adjust parameter ordering of `waitForCondition` and `retryOnExceptionWithTimeout` (#10759) New parameters in overloaded methods should appear later apart from lambdas that should always be last. --- .../clients/admin/KafkaAdminClientTest.java | 2 +- .../java/org/apache/kafka/test/TestUtils.java | 16 ++++++++-------- .../controller/QuorumControllerTestEnv.java | 2 +- .../kafka/metalog/LocalLogManagerTest.java | 2 +- .../kafka/metalog/LocalLogManagerTestEnv.java | 2 +- .../OptimizedKTableIntegrationTest.java | 4 ++-- 6 files changed, 14 insertions(+), 14 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 4cad255ba02..8c987a12d02 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -5773,7 +5773,7 @@ public class KafkaAdminClientTest { TestUtils.waitForCondition(() -> { time.sleep(1); return disconnectFuture.isDone(); - }, 1, 5000, () -> "Timed out waiting for expected disconnect"); + }, 5000, 1, () -> "Timed out waiting for expected disconnect"); assertFalse(disconnectFuture.isCompletedExceptionally()); assertFalse(result.future.isDone()); TestUtils.waitForCondition(env.kafkaClient()::hasInFlightRequests, diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java index 5e34ae8c5b3..3c819befa5f 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java @@ -298,7 +298,7 @@ public class TestUtils { * avoid transient failures due to slow or overloaded machines. */ public static void waitForCondition(final TestCondition testCondition, final long maxWaitMs, Supplier conditionDetailsSupplier) throws InterruptedException { - waitForCondition(testCondition, DEFAULT_POLL_INTERVAL_MS, maxWaitMs, conditionDetailsSupplier); + waitForCondition(testCondition, maxWaitMs, DEFAULT_POLL_INTERVAL_MS, conditionDetailsSupplier); } /** @@ -310,11 +310,11 @@ public class TestUtils { */ public static void waitForCondition( final TestCondition testCondition, - final long pollIntervalMs, final long maxWaitMs, + final long pollIntervalMs, Supplier conditionDetailsSupplier ) throws InterruptedException { - retryOnExceptionWithTimeout(pollIntervalMs, maxWaitMs, () -> { + retryOnExceptionWithTimeout(maxWaitMs, pollIntervalMs, () -> { String conditionDetailsSupplied = conditionDetailsSupplier != null ? conditionDetailsSupplier.get() : null; String conditionDetails = conditionDetailsSupplied != null ? conditionDetailsSupplied : ""; assertTrue(testCondition.conditionMet(), @@ -333,7 +333,7 @@ public class TestUtils { */ public static void retryOnExceptionWithTimeout(final long timeoutMs, final ValuelessCallable runnable) throws InterruptedException { - retryOnExceptionWithTimeout(DEFAULT_POLL_INTERVAL_MS, timeoutMs, runnable); + retryOnExceptionWithTimeout(timeoutMs, DEFAULT_POLL_INTERVAL_MS, runnable); } /** @@ -345,7 +345,7 @@ public class TestUtils { * @throws InterruptedException if the current thread is interrupted while waiting for {@code runnable} to complete successfully. */ public static void retryOnExceptionWithTimeout(final ValuelessCallable runnable) throws InterruptedException { - retryOnExceptionWithTimeout(DEFAULT_POLL_INTERVAL_MS, DEFAULT_MAX_WAIT_MS, runnable); + retryOnExceptionWithTimeout(DEFAULT_MAX_WAIT_MS, DEFAULT_POLL_INTERVAL_MS, runnable); } /** @@ -353,13 +353,13 @@ public class TestUtils { * {@link AssertionError}s, or for the given timeout to expire. If the timeout expires then the * last exception or assertion failure will be thrown thus providing context for the failure. * - * @param pollIntervalMs the interval in milliseconds to wait between invoking {@code runnable}. * @param timeoutMs the total time in milliseconds to wait for {@code runnable} to complete successfully. + * @param pollIntervalMs the interval in milliseconds to wait between invoking {@code runnable}. * @param runnable the code to attempt to execute successfully. * @throws InterruptedException if the current thread is interrupted while waiting for {@code runnable} to complete successfully. */ - public static void retryOnExceptionWithTimeout(final long pollIntervalMs, - final long timeoutMs, + public static void retryOnExceptionWithTimeout(final long timeoutMs, + final long pollIntervalMs, final ValuelessCallable runnable) throws InterruptedException { final long expectedEnd = System.currentTimeMillis() + timeoutMs; diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java index db3acfba2a7..da226997402 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java @@ -53,7 +53,7 @@ public class QuorumControllerTestEnv implements AutoCloseable { QuorumController activeController() throws InterruptedException { AtomicReference value = new AtomicReference<>(null); - TestUtils.retryOnExceptionWithTimeout(3, 20000, () -> { + TestUtils.retryOnExceptionWithTimeout(20000, 3, () -> { QuorumController activeController = null; for (QuorumController controller : controllers) { if (controller.isActive()) { diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java index 4d4e5101da9..7b5e26d79f6 100644 --- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java @@ -96,7 +96,7 @@ public class LocalLogManagerTest { private static void waitForLastCommittedOffset(long targetOffset, LocalLogManager logManager) throws InterruptedException { - TestUtils.retryOnExceptionWithTimeout(3, 20000, () -> { + TestUtils.retryOnExceptionWithTimeout(20000, 3, () -> { MockMetaLogManagerListener listener = (MockMetaLogManagerListener) logManager.listeners().get(0); long highestOffset = -1; diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java index 9282f42237d..4ff350e41a8 100644 --- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java +++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java @@ -108,7 +108,7 @@ public class LocalLogManagerTestEnv implements AutoCloseable { LeaderAndEpoch waitForLeader() throws InterruptedException { AtomicReference value = new AtomicReference<>(null); - TestUtils.retryOnExceptionWithTimeout(3, 20000, () -> { + TestUtils.retryOnExceptionWithTimeout(20000, 3, () -> { LeaderAndEpoch result = null; for (LocalLogManager logManager : logManagers) { LeaderAndEpoch leader = logManager.leaderAndEpoch(); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java index 7f7eabb1e1d..44744cd3e2b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java @@ -147,7 +147,7 @@ public class OptimizedKTableIntegrationTest { } final ReadOnlyKeyValueStore newActiveStore = kafkaStreams1WasFirstActive ? store2 : store1; - TestUtils.retryOnExceptionWithTimeout(100, 60 * 1000, () -> { + TestUtils.retryOnExceptionWithTimeout(60 * 1000, 100, () -> { // Assert that after failover we have recovered to the last store write assertThat(newActiveStore.get(key), is(equalTo(batch1NumMessages - 1))); }); @@ -159,7 +159,7 @@ public class OptimizedKTableIntegrationTest { // Assert that all messages in the second batch were processed in a timely manner assertThat(semaphore.tryAcquire(batch2NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true))); - TestUtils.retryOnExceptionWithTimeout(100, 60 * 1000, () -> { + TestUtils.retryOnExceptionWithTimeout(60 * 1000, 100, () -> { // Assert that the current value in store reflects all messages being processed assertThat(newActiveStore.get(key), is(equalTo(totalNumMessages - 1))); });