KAFKA-6026; Fix for indefinite wait in KafkaFutureImpl

Author: bartdevylder <bartdevylder@gmail.com>
Author: Bart De Vylder <bartdevylder@gmail.com>

Reviewers: Colin P. Mccabe <cmccabe@confluent.io>, Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>

Closes #4044 from bartdevylder/KAFKA-6026
This commit is contained in:
bartdevylder 2017-10-09 17:49:52 -07:00 committed by Ismael Juma
parent bb27215cea
commit 2cbb6b8602
2 changed files with 10 additions and 2 deletions

View File

@ -96,7 +96,7 @@ public class KafkaFutureImpl<T> extends KafkaFuture<T> {
R await(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
long startMs = System.currentTimeMillis();
long waitTimeMs = (unit.toMillis(timeout) > 0) ? unit.toMillis(timeout) : 1;
long waitTimeMs = unit.toMillis(timeout);
long delta = 0;
synchronized (this) {
while (true) {
@ -104,7 +104,7 @@ public class KafkaFutureImpl<T> extends KafkaFuture<T> {
wrapAndThrow(exception);
if (done)
return value;
if (delta > waitTimeMs) {
if (delta >= waitTimeMs) {
throw new TimeoutException();
}
this.wait(waitTimeMs - delta);

View File

@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
@ -173,4 +174,11 @@ public class KafkaFutureTest {
assertFalse(allFuture.isCompletedExceptionally());
allFuture.get();
}
@Test(expected = TimeoutException.class)
public void testFutureTimeoutWithZeroWait() throws Exception {
final KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
future.get(0, TimeUnit.MILLISECONDS);
}
}