diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 3af07bbbd64..cecacb41eef 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -546,7 +546,8 @@ public class StreamThread extends Thread { cleanRun = runLoop(); } catch (final Throwable e) { failedStreamThreadSensor.record(); - this.streamsUncaughtExceptionHandler.accept(e, false); + requestLeaveGroupDuringShutdown(); + streamsUncaughtExceptionHandler.accept(e, false); } finally { completeShutdown(cleanRun); } @@ -1249,7 +1250,7 @@ public class StreamThread extends Thread { } public void requestLeaveGroupDuringShutdown() { - this.leaveGroupRequested.set(true); + leaveGroupRequested.set(true); } public Map producerMetrics() { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ErrorHandlingIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ErrorHandlingIntegrationTest.java index 40d2cf98027..f889e375762 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ErrorHandlingIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ErrorHandlingIntegrationTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.integration; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serdes; @@ -96,8 +95,7 @@ public class ErrorHandlingIntegrationTest { mkEntry(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0), mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 15000L), mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class), - mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class), - mkEntry(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000)) + mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class)) ); }