KAFKA-12827 Remove Deprecated method KafkaStreams#setUncaughtExceptionHandler (#16988)

Reviewers: Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Abhishek Giri 2024-11-06 07:08:32 +01:00 committed by GitHub
parent 069667b6b2
commit c903bdf496
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 21 additions and 91 deletions

View File

@ -40,6 +40,7 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsConfig.InternalConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
@ -418,7 +419,6 @@ public class EosIntegrationTest {
uncommittedRecords,
dataBeforeFailure,
"The uncommitted records before failure do not match what expected");
errorInjected.set(true);
writeInputData(dataAfterFailure);
@ -1104,7 +1104,7 @@ public class EosIntegrationTest {
final KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.setUncaughtExceptionHandler((t, e) -> {
streams.setUncaughtExceptionHandler(e -> {
if (uncaughtException != null ||
!(e instanceof StreamsException) ||
!e.getCause().getMessage().equals("Injected test exception.")) {
@ -1112,8 +1112,8 @@ public class EosIntegrationTest {
hasUnexpectedError = true;
}
uncaughtException = e;
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});
return streams;
}

View File

@ -103,6 +103,7 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.streams.StoreQueryParameters.fromNameAndType;
import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getRunningStreams;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore;
@ -1032,14 +1033,14 @@ public class QueryableStateIntegrationTest {
final KeyValue<String, String> hello = KeyValue.pair("hello", "hello");
IntegrationTestUtils.produceKeyValuesSynchronously(
streamThree,
Arrays.asList(hello, hello, hello, hello, hello, hello, hello, hello),
TestUtils.producerConfig(
CLUSTER.bootstrapServers(),
StringSerializer.class,
StringSerializer.class,
new Properties()),
mockTime);
streamThree,
Arrays.asList(hello, hello, hello, hello, hello, hello, hello, hello),
TestUtils.producerConfig(
CLUSTER.bootstrapServers(),
StringSerializer.class,
StringSerializer.class,
new Properties()),
mockTime);
final int maxWaitMs = 30000;
@ -1073,8 +1074,8 @@ public class QueryableStateIntegrationTest {
}
@Test
@Deprecated //A single thread should no longer die
public void shouldAllowToQueryAfterThreadDied() throws Exception {
final AtomicBoolean beforeFailure = new AtomicBoolean(true);
final AtomicBoolean failed = new AtomicBoolean(false);
@ -1097,7 +1098,10 @@ public class QueryableStateIntegrationTest {
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
kafkaStreams.setUncaughtExceptionHandler((t, e) -> failed.set(true));
kafkaStreams.setUncaughtExceptionHandler(exception -> {
failed.set(true);
return REPLACE_THREAD;
});
startApplicationAndWaitUntilRunning(kafkaStreams);

View File

@ -77,7 +77,6 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.fail;
@SuppressWarnings("deprecation") //Need to call the old handler, will remove those calls when the old handler is removed
@Tag("integration")
@Timeout(600)
public class StreamsUncaughtExceptionHandlerIntegrationTest {
@ -141,30 +140,9 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
purgeLocalStreamsState(properties);
}
@Test
public void shouldShutdownThreadUsingOldHandler() throws Exception {
try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
final AtomicInteger counter = new AtomicInteger(0);
kafkaStreams.setUncaughtExceptionHandler((t, e) -> counter.incrementAndGet());
startApplicationAndWaitUntilRunning(kafkaStreams);
produceMessages(NOW, inputTopic, "A");
// should call the UncaughtExceptionHandler in current thread
TestUtils.waitForCondition(() -> counter.get() == 1, "Handler was called 1st time");
// should call the UncaughtExceptionHandler after rebalancing to another thread
TestUtils.waitForCondition(() -> counter.get() == 2, DEFAULT_DURATION.toMillis(), "Handler was called 2nd time");
// there is no threads running but the client is still in running
waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);
assertThat(processorValueCollector.size(), equalTo(2));
}
}
@Test
public void shouldShutdownClient() throws Exception {
try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> fail("should not hit old handler"));
kafkaStreams.setUncaughtExceptionHandler(exception -> SHUTDOWN_CLIENT);
@ -249,7 +227,6 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 0);
try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> fail("should not hit old handler"));
kafkaStreams.setUncaughtExceptionHandler(exception -> REPLACE_THREAD);
startApplicationAndWaitUntilRunning(kafkaStreams);
@ -360,8 +337,6 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
try (final KafkaStreams kafkaStreams1 = new KafkaStreams(topology, properties);
final KafkaStreams kafkaStreams2 = new KafkaStreams(topology, properties)) {
kafkaStreams1.setUncaughtExceptionHandler((t, e) -> fail("should not hit old handler"));
kafkaStreams2.setUncaughtExceptionHandler((t, e) -> fail("should not hit old handler"));
kafkaStreams1.setUncaughtExceptionHandler(exception -> SHUTDOWN_APPLICATION);
kafkaStreams2.setUncaughtExceptionHandler(exception -> SHUTDOWN_APPLICATION);
@ -377,8 +352,6 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
private void testReplaceThreads(final int numThreads) throws Exception {
properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> fail("should not hit old handler"));
final AtomicInteger count = new AtomicInteger();
kafkaStreams.setUncaughtExceptionHandler(exception -> {
if (count.incrementAndGet() == numThreads) {

View File

@ -188,7 +188,6 @@ public class KafkaStreams implements AutoCloseable {
GlobalStreamThread globalStreamThread;
protected StateDirectory stateDirectory = null;
private KafkaStreams.StateListener stateListener;
private boolean oldHandler;
private BiConsumer<Throwable, Boolean> streamsUncaughtExceptionHandler;
private final Object changeThreadCount = new Object();
@ -433,32 +432,6 @@ public class KafkaStreams implements AutoCloseable {
}
}
/**
* Set the handler invoked when an internal {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG stream thread} abruptly
* terminates due to an uncaught exception.
*
* @param uncaughtExceptionHandler the uncaught exception handler for all internal threads; {@code null} deletes the current handler
* @throws IllegalStateException if this {@code KafkaStreams} instance has already been started.
*
* @deprecated Since 2.8.0. Use {@link KafkaStreams#setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler)} instead.
*
*/
@Deprecated
public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
synchronized (stateLock) {
if (state.hasNotStarted()) {
oldHandler = true;
processStreamThread(thread -> thread.setUncaughtExceptionHandler(uncaughtExceptionHandler));
if (globalStreamThread != null) {
globalStreamThread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
}
} else {
throw new IllegalStateException("Can only set UncaughtExceptionHandler before calling start(). " +
"Current state is: " + state);
}
}
}
/**
* Set the handler invoked when an internal {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG stream thread}
@ -502,21 +475,6 @@ public class KafkaStreams implements AutoCloseable {
}
}
private void defaultStreamsUncaughtExceptionHandler(final Throwable throwable, final boolean skipThreadReplacement) {
if (oldHandler) {
threads.remove(Thread.currentThread());
if (throwable instanceof RuntimeException) {
throw (RuntimeException) throwable;
} else if (throwable instanceof Error) {
throw (Error) throwable;
} else {
throw new RuntimeException("Unexpected checked exception caught in the uncaught exception handler", throwable);
}
} else {
handleStreamsUncaughtException(throwable, t -> SHUTDOWN_CLIENT, skipThreadReplacement);
}
}
private void replaceStreamThread(final Throwable throwable) {
if (globalStreamThread != null && Thread.currentThread().getName().equals(globalStreamThread.getName())) {
log.warn("The global thread cannot be replaced. Reverting to shutting down the client.");
@ -540,10 +498,7 @@ public class KafkaStreams implements AutoCloseable {
final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler,
final boolean skipThreadReplacement) {
final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action = streamsUncaughtExceptionHandler.handle(throwable);
if (oldHandler) {
log.warn("Stream's new uncaught exception handler is set as well as the deprecated old handler." +
"The old handler will be ignored as long as a new handler is set.");
}
switch (action) {
case REPLACE_THREAD:
if (!skipThreadReplacement) {
@ -1039,9 +994,7 @@ public class KafkaStreams implements AutoCloseable {
parseHostInfo(applicationConfigs.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)),
logContext
);
oldHandler = false;
streamsUncaughtExceptionHandler = this::defaultStreamsUncaughtExceptionHandler;
streamsUncaughtExceptionHandler = (throwable, skipThreadReplacement) -> handleStreamsUncaughtException(throwable, t -> SHUTDOWN_CLIENT, skipThreadReplacement);
delegatingStateRestoreListener = new DelegatingStateRestoreListener();
delegatingStandbyUpdateListener = new DelegatingStandbyUpdateListener();
@ -1062,7 +1015,7 @@ public class KafkaStreams implements AutoCloseable {
time,
globalThreadId,
delegatingStateRestoreListener,
exception -> defaultStreamsUncaughtExceptionHandler(exception, false)
exception -> handleStreamsUncaughtException(exception, t -> SHUTDOWN_CLIENT, false)
);
globalThreadState = globalStreamThread.state();
}
@ -1407,7 +1360,7 @@ public class KafkaStreams implements AutoCloseable {
* However, if you have global stores in your topology, this method blocks until all global stores are restored.
* As a consequence, any fatal exception that happens during processing is by default only logged.
* If you want to be notified about dying threads, you can
* {@link #setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler) register an uncaught exception handler}
* {@link #setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler) register an uncaught exception handler}
* before starting the {@code KafkaStreams} instance.
* <p>
* Note, for brokers with version {@code 0.9.x} or lower, the broker version cannot be checked.