From 2cbb6b860285846d34b9a2add2174e19ab6eebce Mon Sep 17 00:00:00 2001 From: bartdevylder Date: Mon, 9 Oct 2017 17:49:52 -0700 Subject: [PATCH] KAFKA-6026; Fix for indefinite wait in KafkaFutureImpl Author: bartdevylder Author: Bart De Vylder Reviewers: Colin P. Mccabe , Ismael Juma , Jason Gustafson Closes #4044 from bartdevylder/KAFKA-6026 --- .../apache/kafka/common/internals/KafkaFutureImpl.java | 4 ++-- .../java/org/apache/kafka/common/KafkaFutureTest.java | 8 ++++++++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java index 01355c66e7c..b81cbb8cd44 100644 --- a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java +++ b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java @@ -96,7 +96,7 @@ public class KafkaFutureImpl extends KafkaFuture { R await(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { long startMs = System.currentTimeMillis(); - long waitTimeMs = (unit.toMillis(timeout) > 0) ? unit.toMillis(timeout) : 1; + long waitTimeMs = unit.toMillis(timeout); long delta = 0; synchronized (this) { while (true) { @@ -104,7 +104,7 @@ public class KafkaFutureImpl extends KafkaFuture { wrapAndThrow(exception); if (done) return value; - if (delta > waitTimeMs) { + if (delta >= waitTimeMs) { throw new TimeoutException(); } this.wait(waitTimeMs - delta); diff --git a/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java index 7d29bc56a67..71f3c3c984d 100644 --- a/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java +++ b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; @@ -173,4 +174,11 @@ public class KafkaFutureTest { assertFalse(allFuture.isCompletedExceptionally()); allFuture.get(); } + + @Test(expected = TimeoutException.class) + public void testFutureTimeoutWithZeroWait() throws Exception { + final KafkaFutureImpl future = new KafkaFutureImpl<>(); + future.get(0, TimeUnit.MILLISECONDS); + } + }