KAFKA-17100: GlobalStreamThread#start should not busy-wait (#16914)

This PR replaces a busy-wait sleep with a CountDownLatch.

Reviewers: Greg Harris <greg.harris@aiven.io>, Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Ramin Gharib 2024-08-25 23:53:16 +02:00 committed by GitHub
parent e37f1dfb25
commit 9eb7e1a23d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 14 additions and 13 deletions

View File

@ -29,7 +29,6 @@ import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.StateRestoreListener;
@ -45,9 +44,9 @@ import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.CREATED;
import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.DEAD; import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.DEAD;
import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.PENDING_SHUTDOWN; import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.PENDING_SHUTDOWN;
import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.RUNNING; import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.RUNNING;
@ -72,6 +71,7 @@ public class GlobalStreamThread extends Thread {
private java.util.function.Consumer<Throwable> streamsUncaughtExceptionHandler; private java.util.function.Consumer<Throwable> streamsUncaughtExceptionHandler;
private volatile long fetchDeadlineClientInstanceId = -1; private volatile long fetchDeadlineClientInstanceId = -1;
private volatile KafkaFutureImpl<Uuid> clientInstanceIdFuture = new KafkaFutureImpl<>(); private volatile KafkaFutureImpl<Uuid> clientInstanceIdFuture = new KafkaFutureImpl<>();
private final CountDownLatch initializationLatch = new CountDownLatch(1);
/** /**
* The states that the global stream thread can be in * The states that the global stream thread can be in
@ -194,12 +194,6 @@ public class GlobalStreamThread extends Thread {
} }
} }
public boolean stillInitializing() {
synchronized (stateLock) {
return state.equals(CREATED);
}
}
public GlobalStreamThread(final ProcessorTopology topology, public GlobalStreamThread(final ProcessorTopology topology,
final StreamsConfig config, final StreamsConfig config,
final Consumer<byte[], byte[]> globalConsumer, final Consumer<byte[], byte[]> globalConsumer,
@ -436,6 +430,8 @@ public class GlobalStreamThread extends Thread {
} catch (final Exception fatalException) { } catch (final Exception fatalException) {
closeStateConsumer(stateConsumer, false); closeStateConsumer(stateConsumer, false);
startupException = new StreamsException("Exception caught during initialization of GlobalStreamThread", fatalException); startupException = new StreamsException("Exception caught during initialization of GlobalStreamThread", fatalException);
} finally {
initializationLatch.countDown();
} }
return null; return null;
} }
@ -453,12 +449,16 @@ public class GlobalStreamThread extends Thread {
@Override @Override
public synchronized void start() { public synchronized void start() {
super.start(); super.start();
while (stillInitializing()) { try {
Utils.sleep(1); initializationLatch.await();
} catch (final InterruptedException e) {
currentThread().interrupt();
throw new IllegalStateException("GlobalStreamThread was interrupted during initialization", e);
}
if (startupException != null) { if (startupException != null) {
throw startupException; throw startupException;
} }
}
if (inErrorState()) { if (inErrorState()) {
throw new IllegalStateException("Initialization for the global stream thread failed"); throw new IllegalStateException("Initialization for the global stream thread failed");
@ -469,6 +469,7 @@ public class GlobalStreamThread extends Thread {
// one could call shutdown() multiple times, so ignore subsequent calls // one could call shutdown() multiple times, so ignore subsequent calls
// if already shutting down or dead // if already shutting down or dead
setState(PENDING_SHUTDOWN); setState(PENDING_SHUTDOWN);
initializationLatch.countDown();
} }
public Map<MetricName, Metric> consumerMetrics() { public Map<MetricName, Metric> consumerMetrics() {