mirror of https://github.com/apache/kafka.git
KAFKA-12738: send LeaveGroup request when thread dies to optimize replacement time (#11801)
Quick followup to #11787 to optimize the impact of the task backoff by reducing the time to replace a thread. When a thread is going through a dirty close, ie shutting down from an uncaught exception, we should be sending a LeaveGroup request to make sure the broker acknowledges the thread has died and won't wait up to the `session.timeout` for it to join the group if the user opts to `REPLACE_THREAD` in the handler Reviewers: Walker Carlson <wcarlson@confluent.io>, John Roesler <vvcephei@apache.org>
This commit is contained in:
parent
15ebad54b4
commit
c2ee1411c8
|
|
@ -546,7 +546,8 @@ public class StreamThread extends Thread {
|
||||||
cleanRun = runLoop();
|
cleanRun = runLoop();
|
||||||
} catch (final Throwable e) {
|
} catch (final Throwable e) {
|
||||||
failedStreamThreadSensor.record();
|
failedStreamThreadSensor.record();
|
||||||
this.streamsUncaughtExceptionHandler.accept(e, false);
|
requestLeaveGroupDuringShutdown();
|
||||||
|
streamsUncaughtExceptionHandler.accept(e, false);
|
||||||
} finally {
|
} finally {
|
||||||
completeShutdown(cleanRun);
|
completeShutdown(cleanRun);
|
||||||
}
|
}
|
||||||
|
|
@ -1249,7 +1250,7 @@ public class StreamThread extends Thread {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void requestLeaveGroupDuringShutdown() {
|
public void requestLeaveGroupDuringShutdown() {
|
||||||
this.leaveGroupRequested.set(true);
|
leaveGroupRequested.set(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Map<MetricName, Metric> producerMetrics() {
|
public Map<MetricName, Metric> producerMetrics() {
|
||||||
|
|
|
||||||
|
|
@ -16,7 +16,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.kafka.streams.integration;
|
package org.apache.kafka.streams.integration;
|
||||||
|
|
||||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
|
||||||
import org.apache.kafka.common.serialization.IntegerDeserializer;
|
import org.apache.kafka.common.serialization.IntegerDeserializer;
|
||||||
import org.apache.kafka.common.serialization.IntegerSerializer;
|
import org.apache.kafka.common.serialization.IntegerSerializer;
|
||||||
import org.apache.kafka.common.serialization.Serdes;
|
import org.apache.kafka.common.serialization.Serdes;
|
||||||
|
|
@ -96,8 +95,7 @@ public class ErrorHandlingIntegrationTest {
|
||||||
mkEntry(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0),
|
mkEntry(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0),
|
||||||
mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 15000L),
|
mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 15000L),
|
||||||
mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class),
|
mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class),
|
||||||
mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),
|
mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class))
|
||||||
mkEntry(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000))
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue