From c2ee1411c8bb73fcf96c12abeedbfe6fde2c6354 Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Thu, 24 Feb 2022 16:18:13 -0800 Subject: [PATCH] KAFKA-12738: send LeaveGroup request when thread dies to optimize replacement time (#11801) Quick followup to #11787 to optimize the impact of the task backoff by reducing the time to replace a thread. When a thread is going through a dirty close, ie shutting down from an uncaught exception, we should be sending a LeaveGroup request to make sure the broker acknowledges the thread has died and won't wait up to the `session.timeout` for it to join the group if the user opts to `REPLACE_THREAD` in the handler Reviewers: Walker Carlson , John Roesler --- .../kafka/streams/processor/internals/StreamThread.java | 5 +++-- .../streams/integration/ErrorHandlingIntegrationTest.java | 4 +--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 3af07bbbd64..cecacb41eef 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -546,7 +546,8 @@ public class StreamThread extends Thread { cleanRun = runLoop(); } catch (final Throwable e) { failedStreamThreadSensor.record(); - this.streamsUncaughtExceptionHandler.accept(e, false); + requestLeaveGroupDuringShutdown(); + streamsUncaughtExceptionHandler.accept(e, false); } finally { completeShutdown(cleanRun); } @@ -1249,7 +1250,7 @@ public class StreamThread extends Thread { } public void requestLeaveGroupDuringShutdown() { - this.leaveGroupRequested.set(true); + leaveGroupRequested.set(true); } public Map producerMetrics() { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ErrorHandlingIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ErrorHandlingIntegrationTest.java index 40d2cf98027..f889e375762 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ErrorHandlingIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ErrorHandlingIntegrationTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.integration; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serdes; @@ -96,8 +95,7 @@ public class ErrorHandlingIntegrationTest { mkEntry(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0), mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 15000L), mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class), - mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class), - mkEntry(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000)) + mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class)) ); }