mirror of https://github.com/apache/kafka.git
KAFKA-12887 Skip some RuntimeExceptions from exception handler (#11228)
Instead of letting all RuntimeExceptions go through and be processed by the uncaught exception handler, IllegalStateException and IllegalArgumentException are not passed through and fail fast. In this PR when setting the uncaught exception handler we check if the exception is in an "exclude list", if so, we terminate the client, otherwise we continue as usual. Added test checking this new case. Added integration test checking that user defined exception handler is not used when an IllegalStateException is thrown. Reviewers: Bruno Cadonna <cadonna@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
parent
25b0857bdb
commit
4835c64f89
|
@ -141,6 +141,9 @@ public class KafkaStreams implements AutoCloseable {
|
|||
|
||||
private static final String JMX_PREFIX = "kafka.streams";
|
||||
|
||||
private static final Set<Class<? extends Throwable>> EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS =
|
||||
new HashSet<>(Arrays.asList(IllegalStateException.class, IllegalArgumentException.class));
|
||||
|
||||
// processId is expected to be unique across JVMs and to be used
|
||||
// in userData of the subscription request to allow assignor be aware
|
||||
// of the co-location of stream thread's consumers. It is for internal
|
||||
|
@ -495,9 +498,24 @@ public class KafkaStreams implements AutoCloseable {
|
|||
}
|
||||
}
|
||||
|
||||
private boolean wrappedExceptionIsIn(final Throwable throwable, final Set<Class<? extends Throwable>> exceptionsOfInterest) {
|
||||
return throwable.getCause() != null && exceptionsOfInterest.contains(throwable.getCause().getClass());
|
||||
}
|
||||
|
||||
private StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse getActionForThrowable(final Throwable throwable,
|
||||
final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
|
||||
final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action;
|
||||
if (wrappedExceptionIsIn(throwable, EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS)) {
|
||||
action = SHUTDOWN_CLIENT;
|
||||
} else {
|
||||
action = streamsUncaughtExceptionHandler.handle(throwable);
|
||||
}
|
||||
return action;
|
||||
}
|
||||
|
||||
private void handleStreamsUncaughtException(final Throwable throwable,
|
||||
final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
|
||||
final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action = streamsUncaughtExceptionHandler.handle(throwable);
|
||||
final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action = getActionForThrowable(throwable, streamsUncaughtExceptionHandler);
|
||||
if (oldHandler) {
|
||||
log.warn("Stream's new uncaught exception handler is set as well as the deprecated old handler." +
|
||||
"The old handler will be ignored as long as a new handler is set.");
|
||||
|
@ -509,7 +527,7 @@ public class KafkaStreams implements AutoCloseable {
|
|||
break;
|
||||
case SHUTDOWN_CLIENT:
|
||||
log.error("Encountered the following exception during processing " +
|
||||
"and the registered exception handler opted to " + action + "." +
|
||||
"and Kafka Streams opted to " + action + "." +
|
||||
" The streams client is going to shut down now. ", throwable);
|
||||
closeToError();
|
||||
break;
|
||||
|
|
|
@ -104,7 +104,7 @@ public class EmitOnChangeIntegrationTest {
|
|||
.toStream()
|
||||
.map((key, value) -> {
|
||||
if (shouldThrow.compareAndSet(true, false)) {
|
||||
throw new IllegalStateException("Kaboom");
|
||||
throw new RuntimeException("Kaboom");
|
||||
} else {
|
||||
return new KeyValue<>(key, value);
|
||||
}
|
||||
|
|
|
@ -93,7 +93,9 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
|
|||
private static Properties properties;
|
||||
private static List<String> processorValueCollector;
|
||||
private static String appId = "";
|
||||
private static AtomicBoolean throwError = new AtomicBoolean(true);
|
||||
private static final AtomicBoolean THROW_ERROR = new AtomicBoolean(true);
|
||||
private static final AtomicBoolean THROW_ILLEGAL_STATE_EXCEPTION = new AtomicBoolean(false);
|
||||
private static final AtomicBoolean THROW_ILLEGAL_ARGUMENT_EXCEPTION = new AtomicBoolean(false);
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
|
@ -163,6 +165,47 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void shouldShutdownClientWhenIllegalStateException() throws InterruptedException {
|
||||
THROW_ILLEGAL_STATE_EXCEPTION.compareAndSet(false, true);
|
||||
try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
|
||||
kafkaStreams.setUncaughtExceptionHandler((t, e) -> fail("should not hit old handler"));
|
||||
|
||||
kafkaStreams.setUncaughtExceptionHandler(exception -> REPLACE_THREAD); // if the user defined uncaught exception handler would be hit we would be replacing the thread
|
||||
|
||||
StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
|
||||
|
||||
produceMessages(0L, inputTopic, "A");
|
||||
waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.ERROR, DEFAULT_DURATION);
|
||||
|
||||
assertThat(processorValueCollector.size(), equalTo(1));
|
||||
} finally {
|
||||
THROW_ILLEGAL_STATE_EXCEPTION.compareAndSet(true, false);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldShutdownClientWhenIllegalArgumentException() throws InterruptedException {
|
||||
THROW_ILLEGAL_ARGUMENT_EXCEPTION.compareAndSet(false, true);
|
||||
try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
|
||||
kafkaStreams.setUncaughtExceptionHandler((t, e) -> fail("should not hit old handler"));
|
||||
|
||||
kafkaStreams.setUncaughtExceptionHandler(exception -> REPLACE_THREAD); // if the user defined uncaught exception handler would be hit we would be replacing the thread
|
||||
|
||||
StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
|
||||
|
||||
produceMessages(0L, inputTopic, "A");
|
||||
waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.ERROR, DEFAULT_DURATION);
|
||||
|
||||
assertThat(processorValueCollector.size(), equalTo(1));
|
||||
} finally {
|
||||
THROW_ILLEGAL_ARGUMENT_EXCEPTION.compareAndSet(true, false);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldReplaceThreads() throws InterruptedException {
|
||||
testReplaceThreads(2);
|
||||
|
@ -235,10 +278,16 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
|
|||
@Override
|
||||
public void process(final String key, final String value) {
|
||||
valueList.add(value + " " + context.taskId());
|
||||
if (throwError.get()) {
|
||||
throw new StreamsException(Thread.currentThread().getName());
|
||||
if (THROW_ERROR.get()) {
|
||||
if (THROW_ILLEGAL_STATE_EXCEPTION.get()) {
|
||||
throw new IllegalStateException("Something unexpected happened in " + Thread.currentThread().getName());
|
||||
} else if (THROW_ILLEGAL_ARGUMENT_EXCEPTION.get()) {
|
||||
throw new IllegalArgumentException("Something unexpected happened in " + Thread.currentThread().getName());
|
||||
} else {
|
||||
throw new StreamsException(Thread.currentThread().getName());
|
||||
}
|
||||
}
|
||||
throwError.set(true);
|
||||
THROW_ERROR.set(true);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -272,7 +321,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
|
|||
final AtomicInteger count = new AtomicInteger();
|
||||
kafkaStreams.setUncaughtExceptionHandler(exception -> {
|
||||
if (count.incrementAndGet() == numThreads) {
|
||||
throwError.set(false);
|
||||
THROW_ERROR.set(false);
|
||||
}
|
||||
return REPLACE_THREAD;
|
||||
});
|
||||
|
@ -280,7 +329,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
|
|||
|
||||
produceMessages(0L, inputTopic, "A");
|
||||
TestUtils.waitForCondition(() -> count.get() == numThreads, "finished replacing threads");
|
||||
TestUtils.waitForCondition(() -> throwError.get(), "finished replacing threads");
|
||||
TestUtils.waitForCondition(() -> THROW_ERROR.get(), "finished replacing threads");
|
||||
kafkaStreams.close();
|
||||
waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);
|
||||
|
||||
|
|
|
@ -2336,9 +2336,9 @@ public class StreamThreadTest {
|
|||
expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
|
||||
expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
|
||||
consumer.subscribe((Collection<String>) anyObject(), anyObject());
|
||||
EasyMock.expectLastCall().anyTimes();
|
||||
EasyMock.expectLastCall().atLeastOnce();
|
||||
consumer.unsubscribe();
|
||||
EasyMock.expectLastCall().anyTimes();
|
||||
EasyMock.expectLastCall().atLeastOnce();
|
||||
EasyMock.replay(consumerGroupMetadata);
|
||||
final Task task1 = mock(Task.class);
|
||||
final Task task2 = mock(Task.class);
|
||||
|
|
Loading…
Reference in New Issue