From 9eb7e1a23d846a3e7feddcf6b734130b688cbb6c Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Sun, 25 Aug 2024 23:53:16 +0200 Subject: [PATCH] KAFKA-17100: GlobalStreamThread#start should not busy-wait (#16914) This PR replaces a busy-wait sleep with a CountDownLatch. Reviewers: Greg Harris , Matthias J. Sax --- .../internals/GlobalStreamThread.java | 27 ++++++++++--------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java index 1c7194b1913..d064603e0ba 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java @@ -29,7 +29,6 @@ import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.StateRestoreListener; @@ -45,9 +44,9 @@ import java.util.Collections; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; -import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.CREATED; import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.DEAD; import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.PENDING_SHUTDOWN; import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.RUNNING; @@ -72,6 +71,7 @@ public class GlobalStreamThread extends Thread { private java.util.function.Consumer streamsUncaughtExceptionHandler; private volatile long fetchDeadlineClientInstanceId = -1; private volatile KafkaFutureImpl clientInstanceIdFuture = new KafkaFutureImpl<>(); + private final CountDownLatch initializationLatch = new CountDownLatch(1); /** * The states that the global stream thread can be in @@ -194,12 +194,6 @@ public class GlobalStreamThread extends Thread { } } - public boolean stillInitializing() { - synchronized (stateLock) { - return state.equals(CREATED); - } - } - public GlobalStreamThread(final ProcessorTopology topology, final StreamsConfig config, final Consumer globalConsumer, @@ -436,6 +430,8 @@ public class GlobalStreamThread extends Thread { } catch (final Exception fatalException) { closeStateConsumer(stateConsumer, false); startupException = new StreamsException("Exception caught during initialization of GlobalStreamThread", fatalException); + } finally { + initializationLatch.countDown(); } return null; } @@ -453,11 +449,15 @@ public class GlobalStreamThread extends Thread { @Override public synchronized void start() { super.start(); - while (stillInitializing()) { - Utils.sleep(1); - if (startupException != null) { - throw startupException; - } + try { + initializationLatch.await(); + } catch (final InterruptedException e) { + currentThread().interrupt(); + throw new IllegalStateException("GlobalStreamThread was interrupted during initialization", e); + } + + if (startupException != null) { + throw startupException; } if (inErrorState()) { @@ -469,6 +469,7 @@ public class GlobalStreamThread extends Thread { // one could call shutdown() multiple times, so ignore subsequent calls // if already shutting down or dead setState(PENDING_SHUTDOWN); + initializationLatch.countDown(); } public Map consumerMetrics() {