mirror of https://github.com/apache/kafka.git
MINOR: Catch InvocationTargetException explicitly and propagate underlying cause (#12230)
Catch InvocationTargetException explicitly and propagate underlying cause Reviewers: Ismael Juma <mlists@juma.me.uk>, Matthew de Detrich <mdedetrich@gmail.com>, Kvicii, Luke Chen <showuon@gmail.com>
This commit is contained in:
parent
add4ca6c7f
commit
9aef992118
|
|
@ -21,6 +21,7 @@ import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.lang.reflect.Constructor;
|
import java.lang.reflect.Constructor;
|
||||||
import java.lang.reflect.InvocationHandler;
|
import java.lang.reflect.InvocationHandler;
|
||||||
|
import java.lang.reflect.InvocationTargetException;
|
||||||
import java.lang.reflect.Method;
|
import java.lang.reflect.Method;
|
||||||
import java.lang.reflect.Proxy;
|
import java.lang.reflect.Proxy;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
|
@ -75,8 +76,12 @@ public class LoggingSignalHandler {
|
||||||
private Object createSignalHandler(final Map<String, Object> jvmSignalHandlers) {
|
private Object createSignalHandler(final Map<String, Object> jvmSignalHandlers) {
|
||||||
InvocationHandler invocationHandler = new InvocationHandler() {
|
InvocationHandler invocationHandler = new InvocationHandler() {
|
||||||
|
|
||||||
private String getName(Object signal) throws ReflectiveOperationException {
|
private String getName(Object signal) throws Throwable {
|
||||||
return (String) signalGetNameMethod.invoke(signal);
|
try {
|
||||||
|
return (String) signalGetNameMethod.invoke(signal);
|
||||||
|
} catch (InvocationTargetException e) {
|
||||||
|
throw e.getCause();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void handle(Object signalHandler, Object signal) throws ReflectiveOperationException {
|
private void handle(Object signalHandler, Object signal) throws ReflectiveOperationException {
|
||||||
|
|
|
||||||
|
|
@ -463,8 +463,7 @@ public final class Utils {
|
||||||
throw new ClassNotFoundException(String.format("Unable to access " +
|
throw new ClassNotFoundException(String.format("Unable to access " +
|
||||||
"constructor of %s", className), e);
|
"constructor of %s", className), e);
|
||||||
} catch (InvocationTargetException e) {
|
} catch (InvocationTargetException e) {
|
||||||
throw new ClassNotFoundException(String.format("Unable to invoke " +
|
throw new KafkaException(String.format("The constructor of %s threw an exception", className), e.getCause());
|
||||||
"constructor of %s", className), e);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -122,6 +122,14 @@ public class KafkaFutureTest {
|
||||||
assertEquals(CancellationException.class, cancellationException.getClass());
|
assertEquals(CancellationException.class, cancellationException.getClass());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Object invokeOrThrow(final Method method, final Object obj, final Object... args) throws Throwable {
|
||||||
|
try {
|
||||||
|
return method.invoke(obj, args);
|
||||||
|
} catch (InvocationTargetException e) {
|
||||||
|
throw e.getCause();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCompleteFutures() throws Exception {
|
public void testCompleteFutures() throws Exception {
|
||||||
KafkaFutureImpl<Integer> future123 = new KafkaFutureImpl<>();
|
KafkaFutureImpl<Integer> future123 = new KafkaFutureImpl<>();
|
||||||
|
|
@ -591,7 +599,7 @@ public class KafkaFutureTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public void testLeakCompletableFuture() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
|
public void testLeakCompletableFuture() throws Throwable {
|
||||||
final KafkaFutureImpl<String> kfut = new KafkaFutureImpl<>();
|
final KafkaFutureImpl<String> kfut = new KafkaFutureImpl<>();
|
||||||
CompletableFuture<String> comfut = kfut.toCompletionStage().toCompletableFuture();
|
CompletableFuture<String> comfut = kfut.toCompletionStage().toCompletableFuture();
|
||||||
assertThrows(UnsupportedOperationException.class, () -> comfut.complete(""));
|
assertThrows(UnsupportedOperationException.class, () -> comfut.complete(""));
|
||||||
|
|
@ -600,44 +608,20 @@ public class KafkaFutureTest {
|
||||||
// so test reflectively
|
// so test reflectively
|
||||||
if (Java.IS_JAVA9_COMPATIBLE) {
|
if (Java.IS_JAVA9_COMPATIBLE) {
|
||||||
Method completeOnTimeout = CompletableFuture.class.getDeclaredMethod("completeOnTimeout", Object.class, Long.TYPE, TimeUnit.class);
|
Method completeOnTimeout = CompletableFuture.class.getDeclaredMethod("completeOnTimeout", Object.class, Long.TYPE, TimeUnit.class);
|
||||||
assertThrows(UnsupportedOperationException.class, () -> {
|
assertThrows(UnsupportedOperationException.class, () -> invokeOrThrow(completeOnTimeout, comfut, "", 1L, TimeUnit.MILLISECONDS));
|
||||||
try {
|
|
||||||
completeOnTimeout.invoke(comfut, "", 1L, TimeUnit.MILLISECONDS);
|
|
||||||
} catch (InvocationTargetException e) {
|
|
||||||
throw e.getCause();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
Method completeAsync = CompletableFuture.class.getDeclaredMethod("completeAsync", Supplier.class);
|
Method completeAsync = CompletableFuture.class.getDeclaredMethod("completeAsync", Supplier.class);
|
||||||
assertThrows(UnsupportedOperationException.class, () -> {
|
assertThrows(UnsupportedOperationException.class, () -> invokeOrThrow(completeAsync, comfut, (Supplier<String>) () -> ""));
|
||||||
try {
|
|
||||||
completeAsync.invoke(comfut, (Supplier<String>) () -> "");
|
|
||||||
} catch (InvocationTargetException e) {
|
|
||||||
throw e.getCause();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
Method obtrudeValue = CompletableFuture.class.getDeclaredMethod("obtrudeValue", Object.class);
|
Method obtrudeValue = CompletableFuture.class.getDeclaredMethod("obtrudeValue", Object.class);
|
||||||
assertThrows(UnsupportedOperationException.class, () -> {
|
assertThrows(UnsupportedOperationException.class, () -> invokeOrThrow(obtrudeValue, comfut, ""));
|
||||||
try {
|
|
||||||
obtrudeValue.invoke(comfut, "");
|
|
||||||
} catch (InvocationTargetException e) {
|
|
||||||
throw e.getCause();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
Method obtrudeException = CompletableFuture.class.getDeclaredMethod("obtrudeException", Throwable.class);
|
Method obtrudeException = CompletableFuture.class.getDeclaredMethod("obtrudeException", Throwable.class);
|
||||||
assertThrows(UnsupportedOperationException.class, () -> {
|
assertThrows(UnsupportedOperationException.class, () -> invokeOrThrow(obtrudeException, comfut, new RuntimeException()));
|
||||||
try {
|
|
||||||
obtrudeException.invoke(comfut, new RuntimeException());
|
|
||||||
} catch (InvocationTargetException e) {
|
|
||||||
throw e.getCause();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// Check the CF from a minimal CompletionStage doesn't cause completion of the original KafkaFuture
|
// Check the CF from a minimal CompletionStage doesn't cause completion of the original KafkaFuture
|
||||||
Method minimal = CompletableFuture.class.getDeclaredMethod("minimalCompletionStage");
|
Method minimal = CompletableFuture.class.getDeclaredMethod("minimalCompletionStage");
|
||||||
CompletionStage<String> cs = (CompletionStage<String>) minimal.invoke(comfut);
|
CompletionStage<String> cs = (CompletionStage<String>) invokeOrThrow(minimal, comfut);
|
||||||
cs.toCompletableFuture().complete("");
|
cs.toCompletableFuture().complete("");
|
||||||
|
|
||||||
assertFalse(kfut.isDone());
|
assertFalse(kfut.isDone());
|
||||||
|
|
|
||||||
|
|
@ -78,7 +78,7 @@ public class RecordTestUtils {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (InvocationTargetException e) {
|
} catch (InvocationTargetException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e.getCause());
|
||||||
} catch (IllegalAccessException e) {
|
} catch (IllegalAccessException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue