KAFKA-19054: StreamThread exception handling with SHUTDOWN_APPLICATION may trigger a tight loop with MANY logs (#19394)

Under the `SHUTDOWN_APPLICATION` configuration in Kafka Streams, a tight
loop in the shutdown process can flood logs with repeated messages. This
PR introduces a check to ensure that the shutdown log is emitted only
once every 10 seconds, thereby preventing log flooding.

Reviewers: PoAn Yang <payang@apache.org>, Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Hong-Yi Chen 2025-04-17 11:35:43 +08:00 committed by Matthias J. Sax
parent f98dec9440
commit 8a515da2c8
2 changed files with 21 additions and 5 deletions

View File

@ -23,6 +23,8 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
@ -39,10 +41,12 @@ import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.test.TestUtils;
import org.apache.logging.log4j.Level;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
@ -334,11 +338,14 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
final Topology topology = builder.build();
try (final KafkaStreams kafkaStreams1 = new KafkaStreams(topology, properties);
final KafkaStreams kafkaStreams2 = new KafkaStreams(topology, properties)) {
final MockTime time = new MockTime(0L);
try (final KafkaStreams kafkaStreams1 = new KafkaStreams(topology, properties, time);
final KafkaStreams kafkaStreams2 = new KafkaStreams(topology, properties, time);
final LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister()) {
kafkaStreams1.setUncaughtExceptionHandler(exception -> SHUTDOWN_APPLICATION);
kafkaStreams2.setUncaughtExceptionHandler(exception -> SHUTDOWN_APPLICATION);
logCaptureAppender.setClassLogger(StreamThread.class, Level.WARN);
startApplicationAndWaitUntilRunning(asList(kafkaStreams1, kafkaStreams2));
@ -346,6 +353,8 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
waitForApplicationState(asList(kafkaStreams1, kafkaStreams2), KafkaStreams.State.ERROR, DEFAULT_DURATION);
assertThat(processorValueCollector.size(), equalTo(1));
assertThat("Shutdown warning log message should be exported exactly once",
logCaptureAppender.getMessages("WARN").stream().filter(msg -> msg.contains("Detected that shutdown was requested")).count(), equalTo(1L));
}
}

View File

@ -346,6 +346,7 @@ public class StreamThread extends Thread implements ProcessingThread {
// These are used to signal from outside the stream thread, but the variables themselves are internal to the thread
private final AtomicLong cacheResizeSize = new AtomicLong(-1L);
private final AtomicBoolean leaveGroupRequested = new AtomicBoolean(false);
private final AtomicLong lastShutdownWarningTimestamp = new AtomicLong(0L);
private final boolean eosEnabled;
private final boolean stateUpdaterEnabled;
private final boolean processingThreadsEnabled;
@ -869,8 +870,14 @@ public class StreamThread extends Thread implements ProcessingThread {
public void maybeSendShutdown() {
if (assignmentErrorCode.get() == AssignorError.SHUTDOWN_REQUESTED.code()) {
log.warn("Detected that shutdown was requested. " +
"All clients in this app will now begin to shutdown");
final long now = time.milliseconds();
final long lastLogged = lastShutdownWarningTimestamp.get();
if (now - lastLogged >= 10_000L) {
if (lastShutdownWarningTimestamp.compareAndSet(lastLogged, now)) {
log.warn("Detected that shutdown was requested. " +
"All clients in this app will now begin to shutdown");
}
}
mainConsumer.enforceRebalance("Shutdown requested");
}
}