KAFKA-12887 Skip some RuntimeExceptions from exception handler (#11228)

Instead of letting all RuntimeExceptions go through and be processed by the uncaught exception handler, IllegalStateException and IllegalArgumentException are not passed through and fail fast. In this PR when setting the uncaught exception handler we check if the exception is in an "exclude list", if so, we terminate the client, otherwise we continue as usual.

Added test checking this new case. Added integration test checking that user defined exception handler is not used when an IllegalStateException is thrown.

Reviewers: Bruno Cadonna <cadonna@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
Josep Prat 2021-09-01 18:58:36 +02:00 committed by GitHub
parent 25b0857bdb
commit 4835c64f89
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 78 additions and 11 deletions

View File

@ -141,6 +141,9 @@ public class KafkaStreams implements AutoCloseable {
private static final String JMX_PREFIX = "kafka.streams";
private static final Set<Class<? extends Throwable>> EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS =
new HashSet<>(Arrays.asList(IllegalStateException.class, IllegalArgumentException.class));
// processId is expected to be unique across JVMs and to be used
// in userData of the subscription request to allow assignor be aware
// of the co-location of stream thread's consumers. It is for internal
@ -495,9 +498,24 @@ public class KafkaStreams implements AutoCloseable {
}
}
private boolean wrappedExceptionIsIn(final Throwable throwable, final Set<Class<? extends Throwable>> exceptionsOfInterest) {
return throwable.getCause() != null && exceptionsOfInterest.contains(throwable.getCause().getClass());
}
private StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse getActionForThrowable(final Throwable throwable,
final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action;
if (wrappedExceptionIsIn(throwable, EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS)) {
action = SHUTDOWN_CLIENT;
} else {
action = streamsUncaughtExceptionHandler.handle(throwable);
}
return action;
}
private void handleStreamsUncaughtException(final Throwable throwable,
final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action = streamsUncaughtExceptionHandler.handle(throwable);
final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action = getActionForThrowable(throwable, streamsUncaughtExceptionHandler);
if (oldHandler) {
log.warn("Stream's new uncaught exception handler is set as well as the deprecated old handler." +
"The old handler will be ignored as long as a new handler is set.");
@ -509,7 +527,7 @@ public class KafkaStreams implements AutoCloseable {
break;
case SHUTDOWN_CLIENT:
log.error("Encountered the following exception during processing " +
"and the registered exception handler opted to " + action + "." +
"and Kafka Streams opted to " + action + "." +
" The streams client is going to shut down now. ", throwable);
closeToError();
break;

View File

@ -104,7 +104,7 @@ public class EmitOnChangeIntegrationTest {
.toStream()
.map((key, value) -> {
if (shouldThrow.compareAndSet(true, false)) {
throw new IllegalStateException("Kaboom");
throw new RuntimeException("Kaboom");
} else {
return new KeyValue<>(key, value);
}

View File

@ -93,7 +93,9 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
private static Properties properties;
private static List<String> processorValueCollector;
private static String appId = "";
private static AtomicBoolean throwError = new AtomicBoolean(true);
private static final AtomicBoolean THROW_ERROR = new AtomicBoolean(true);
private static final AtomicBoolean THROW_ILLEGAL_STATE_EXCEPTION = new AtomicBoolean(false);
private static final AtomicBoolean THROW_ILLEGAL_ARGUMENT_EXCEPTION = new AtomicBoolean(false);
@Before
public void setup() {
@ -163,6 +165,47 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
}
}
@Test
public void shouldShutdownClientWhenIllegalStateException() throws InterruptedException {
THROW_ILLEGAL_STATE_EXCEPTION.compareAndSet(false, true);
try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> fail("should not hit old handler"));
kafkaStreams.setUncaughtExceptionHandler(exception -> REPLACE_THREAD); // if the user defined uncaught exception handler would be hit we would be replacing the thread
StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
produceMessages(0L, inputTopic, "A");
waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.ERROR, DEFAULT_DURATION);
assertThat(processorValueCollector.size(), equalTo(1));
} finally {
THROW_ILLEGAL_STATE_EXCEPTION.compareAndSet(true, false);
}
}
@Test
public void shouldShutdownClientWhenIllegalArgumentException() throws InterruptedException {
THROW_ILLEGAL_ARGUMENT_EXCEPTION.compareAndSet(false, true);
try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> fail("should not hit old handler"));
kafkaStreams.setUncaughtExceptionHandler(exception -> REPLACE_THREAD); // if the user defined uncaught exception handler would be hit we would be replacing the thread
StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
produceMessages(0L, inputTopic, "A");
waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.ERROR, DEFAULT_DURATION);
assertThat(processorValueCollector.size(), equalTo(1));
} finally {
THROW_ILLEGAL_ARGUMENT_EXCEPTION.compareAndSet(true, false);
}
}
@Test
public void shouldReplaceThreads() throws InterruptedException {
testReplaceThreads(2);
@ -235,10 +278,16 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
@Override
public void process(final String key, final String value) {
valueList.add(value + " " + context.taskId());
if (throwError.get()) {
throw new StreamsException(Thread.currentThread().getName());
if (THROW_ERROR.get()) {
if (THROW_ILLEGAL_STATE_EXCEPTION.get()) {
throw new IllegalStateException("Something unexpected happened in " + Thread.currentThread().getName());
} else if (THROW_ILLEGAL_ARGUMENT_EXCEPTION.get()) {
throw new IllegalArgumentException("Something unexpected happened in " + Thread.currentThread().getName());
} else {
throw new StreamsException(Thread.currentThread().getName());
}
}
throwError.set(true);
THROW_ERROR.set(true);
}
}
@ -272,7 +321,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
final AtomicInteger count = new AtomicInteger();
kafkaStreams.setUncaughtExceptionHandler(exception -> {
if (count.incrementAndGet() == numThreads) {
throwError.set(false);
THROW_ERROR.set(false);
}
return REPLACE_THREAD;
});
@ -280,7 +329,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
produceMessages(0L, inputTopic, "A");
TestUtils.waitForCondition(() -> count.get() == numThreads, "finished replacing threads");
TestUtils.waitForCondition(() -> throwError.get(), "finished replacing threads");
TestUtils.waitForCondition(() -> THROW_ERROR.get(), "finished replacing threads");
kafkaStreams.close();
waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);

View File

@ -2336,9 +2336,9 @@ public class StreamThreadTest {
expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
consumer.subscribe((Collection<String>) anyObject(), anyObject());
EasyMock.expectLastCall().anyTimes();
EasyMock.expectLastCall().atLeastOnce();
consumer.unsubscribe();
EasyMock.expectLastCall().anyTimes();
EasyMock.expectLastCall().atLeastOnce();
EasyMock.replay(consumerGroupMetadata);
final Task task1 = mock(Task.class);
final Task task2 = mock(Task.class);