diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java index e5a6aff47c9..9629a3bccee 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -16,8 +16,12 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.Sensor.RecordingLevel; @@ -79,6 +83,9 @@ public class DefaultStateUpdater implements StateUpdater { private long totalCheckpointLatency = 0L; + private volatile long fetchDeadlineClientInstanceId = -1L; + private volatile KafkaFutureImpl clientInstanceIdFuture = new KafkaFutureImpl<>(); + public StateUpdaterThread(final String name, final Metrics metrics, final ChangelogReader changelogReader) { @@ -165,6 +172,8 @@ public class DefaultStateUpdater implements StateUpdater { pauseTasks(); restoreTasks(totalStartTimeMs); + maybeGetClientInstanceIds(); + final long checkpointStartTimeMs = time.milliseconds(); maybeCheckpointTasks(checkpointStartTimeMs); @@ -231,6 +240,69 @@ public class DefaultStateUpdater implements StateUpdater { } } + private void maybeGetClientInstanceIds() { + if (fetchDeadlineClientInstanceId != -1) { + if (!clientInstanceIdFuture.isDone()) { + if (fetchDeadlineClientInstanceId >= time.milliseconds()) { + try { + // if the state-updated thread has active work: + // we pass in a timeout of zero into each `clientInstanceId()` call + // to just trigger the "get instance id" background RPC; + // we don't want to block the state updater thread that can do useful work in the meantime + // otherwise, we pass in 100ms to avoid busy waiting + clientInstanceIdFuture.complete( + restoreConsumer.clientInstanceId( + allWorkDone() ? Duration.ofMillis(100L) : Duration.ZERO + ) + ); + fetchDeadlineClientInstanceId = -1L; + } catch (final IllegalStateException disabledError) { + // if telemetry is disabled on a client, we swallow the error, + // to allow returning a partial result for all other clients + clientInstanceIdFuture.complete(null); + fetchDeadlineClientInstanceId = -1L; + } catch (final TimeoutException swallow) { + // swallow + } catch (final Exception error) { + clientInstanceIdFuture.completeExceptionally(error); + fetchDeadlineClientInstanceId = -1L; + } + } else { + clientInstanceIdFuture.completeExceptionally( + new TimeoutException("Could not retrieve restore consumer client instance id.") + ); + fetchDeadlineClientInstanceId = -1L; + } + } + } + } + + private KafkaFutureImpl restoreConsumerInstanceId(final Duration timeout) { + boolean setDeadline = false; + + if (clientInstanceIdFuture.isDone()) { + if (clientInstanceIdFuture.isCompletedExceptionally()) { + clientInstanceIdFuture = new KafkaFutureImpl<>(); + setDeadline = true; + } + } else { + setDeadline = true; + } + + if (setDeadline) { + fetchDeadlineClientInstanceId = time.milliseconds() + timeout.toMillis(); + tasksAndActionsLock.lock(); + try { + tasksAndActionsCondition.signalAll(); + } finally { + tasksAndActionsLock.unlock(); + } + } + + return clientInstanceIdFuture; + } + + private void handleRuntimeException(final RuntimeException runtimeException) { log.error("An unexpected error occurred within the state updater thread: " + runtimeException); addToExceptionsAndFailedTasksThenClearUpdatingTasks(new ExceptionAndTasks(new HashSet<>(updatingTasks.values()), runtimeException)); @@ -306,13 +378,8 @@ public class DefaultStateUpdater implements StateUpdater { private void waitIfAllChangelogsCompletelyRead() { tasksAndActionsLock.lock(); - final boolean noTasksToUpdate = changelogReader.allChangelogsCompleted() || updatingTasks.isEmpty(); try { - while (isRunning.get() && - noTasksToUpdate && - tasksAndActions.isEmpty() && - !isTopologyResumed.get()) { - + while (allWorkDone() && fetchDeadlineClientInstanceId == -1L) { isIdle.set(true); tasksAndActionsCondition.await(); } @@ -325,6 +392,15 @@ public class DefaultStateUpdater implements StateUpdater { } } + private boolean allWorkDone() { + final boolean noTasksToUpdate = changelogReader.allChangelogsCompleted() || updatingTasks.isEmpty(); + + return isRunning.get() && + noTasksToUpdate && + tasksAndActions.isEmpty() && + !isTopologyResumed.get(); + } + private void removeUpdatingAndPausedTasks() { changelogReader.clear(); measureCheckpointLatency(() -> updatingTasks.forEach((id, task) -> { @@ -525,6 +601,7 @@ public class DefaultStateUpdater implements StateUpdater { private final Logger log; private final String name; private final Metrics metrics; + private final Consumer restoreConsumer; private final ChangelogReader changelogReader; private final TopologyMetadata topologyMetadata; private final Queue tasksAndActions = new LinkedList<>(); @@ -546,12 +623,14 @@ public class DefaultStateUpdater implements StateUpdater { public DefaultStateUpdater(final String name, final Metrics metrics, final StreamsConfig config, + final Consumer restoreConsumer, final ChangelogReader changelogReader, final TopologyMetadata topologyMetadata, final Time time) { this.time = time; this.name = name; this.metrics = metrics; + this.restoreConsumer = restoreConsumer; this.changelogReader = changelogReader; this.topologyMetadata = topologyMetadata; this.commitIntervalMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG); @@ -561,6 +640,7 @@ public class DefaultStateUpdater implements StateUpdater { this.log = logContext.logger(DefaultStateUpdater.class); } + @Override public void start() { if (stateUpdaterThread == null) { stateUpdaterThread = new StateUpdaterThread(name, metrics, changelogReader); @@ -772,6 +852,11 @@ public class DefaultStateUpdater implements StateUpdater { ); } + @Override + public KafkaFutureImpl restoreConsumerInstanceId(final Duration timeout) { + return stateUpdaterThread.restoreConsumerInstanceId(timeout); + } + // used for testing boolean isIdle() { if (stateUpdaterThread != null) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java index f7297518f2a..445d72aec83 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.streams.processor.TaskId; import java.time.Duration; @@ -208,4 +210,9 @@ public interface StateUpdater { * @return set of all tasks managed by the state updater */ Set getStandbyTasks(); + + /** + * Get the restore consumer instance id for telemetry, and complete the given future to return it. + */ + KafkaFutureImpl restoreConsumerInstanceId(final Duration timeout); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 55d09e8926c..560da2928f7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -286,6 +286,7 @@ public class StreamThread extends Thread implements ProcessingThread { private final int maxPollTimeMs; private final String originalReset; private final TaskManager taskManager; + private final StateUpdater stateUpdater; private final StreamsMetricsImpl streamsMetrics; private final Sensor commitSensor; @@ -346,6 +347,7 @@ public class StreamThread extends Thread implements ProcessingThread { private volatile long fetchDeadlineClientInstanceId = -1; private volatile KafkaFutureImpl mainConsumerInstanceIdFuture = new KafkaFutureImpl<>(); + private volatile KafkaFutureImpl restoreConsumerInstanceIdFuture = new KafkaFutureImpl<>(); private volatile KafkaFutureImpl>> producerInstanceIdFuture = new KafkaFutureImpl<>(); private volatile KafkaFutureImpl threadProducerInstanceIdFuture = new KafkaFutureImpl<>(); @@ -427,7 +429,17 @@ public class StreamThread extends Thread implements ProcessingThread { final DefaultTaskManager schedulingTaskManager = maybeCreateSchedulingTaskManager(processingThreadsEnabled, stateUpdaterEnabled, topologyMetadata, time, threadId, tasks); final StateUpdater stateUpdater = - maybeCreateAndStartStateUpdater(stateUpdaterEnabled, streamsMetrics, config, changelogReader, topologyMetadata, time, clientId, threadIdx); + maybeCreateAndStartStateUpdater( + stateUpdaterEnabled, + streamsMetrics, + config, + restoreConsumer, + changelogReader, + topologyMetadata, + time, + clientId, + threadIdx + ); final TaskManager taskManager = new TaskManager( time, @@ -469,6 +481,7 @@ public class StreamThread extends Thread implements ProcessingThread { changelogReader, originalReset, taskManager, + stateUpdater, streamsMetrics, topologyMetadata, threadId, @@ -512,6 +525,7 @@ public class StreamThread extends Thread implements ProcessingThread { private static StateUpdater maybeCreateAndStartStateUpdater(final boolean stateUpdaterEnabled, final StreamsMetricsImpl streamsMetrics, final StreamsConfig streamsConfig, + final Consumer restoreConsumer, final ChangelogReader changelogReader, final TopologyMetadata topologyMetadata, final Time time, @@ -519,7 +533,15 @@ public class StreamThread extends Thread implements ProcessingThread { final int threadIdx) { if (stateUpdaterEnabled) { final String name = clientId + "-StateUpdater-" + threadIdx; - final StateUpdater stateUpdater = new DefaultStateUpdater(name, streamsMetrics.metricsRegistry(), streamsConfig, changelogReader, topologyMetadata, time); + final StateUpdater stateUpdater = new DefaultStateUpdater( + name, + streamsMetrics.metricsRegistry(), + streamsConfig, + restoreConsumer, + changelogReader, + topologyMetadata, + time + ); stateUpdater.start(); return stateUpdater; } else { @@ -536,6 +558,7 @@ public class StreamThread extends Thread implements ProcessingThread { final ChangelogReader changelogReader, final String originalReset, final TaskManager taskManager, + final StateUpdater stateUpdater, final StreamsMetricsImpl streamsMetrics, final TopologyMetadata topologyMetadata, final String threadId, @@ -598,6 +621,7 @@ public class StreamThread extends Thread implements ProcessingThread { this.log = logContext.logger(StreamThread.class); this.rebalanceListener = new StreamsRebalanceListener(time, taskManager, this, this.log, this.assignmentErrorCode); this.taskManager = taskManager; + this.stateUpdater = stateUpdater; this.restoreConsumer = restoreConsumer; this.mainConsumer = mainConsumer; this.changelogReader = changelogReader; @@ -759,6 +783,27 @@ public class StreamThread extends Thread implements ProcessingThread { } } + + if (!stateUpdaterEnabled && !restoreConsumerInstanceIdFuture.isDone()) { + if (fetchDeadlineClientInstanceId >= time.milliseconds()) { + try { + restoreConsumerInstanceIdFuture.complete(restoreConsumer.clientInstanceId(Duration.ZERO)); + } catch (final IllegalStateException disabledError) { + // if telemetry is disabled on a client, we swallow the error, + // to allow returning a partial result for all other clients + restoreConsumerInstanceIdFuture.complete(null); + } catch (final TimeoutException swallow) { + // swallow + } catch (final Exception error) { + restoreConsumerInstanceIdFuture.completeExceptionally(error); + } + } else { + restoreConsumerInstanceIdFuture.completeExceptionally( + new TimeoutException("Could not retrieve restore consumer client instance id.") + ); + } + } + if (!processingMode.equals(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA) && !threadProducerInstanceIdFuture.isDone()) { @@ -788,7 +833,8 @@ public class StreamThread extends Thread implements ProcessingThread { } private void maybeResetFetchDeadline() { - boolean reset = mainConsumerInstanceIdFuture.isDone(); + boolean reset = mainConsumerInstanceIdFuture.isDone() + && (!stateUpdaterEnabled && restoreConsumerInstanceIdFuture.isDone()); if (processingMode.equals(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA)) { throw new UnsupportedOperationException("not implemented yet"); @@ -1583,6 +1629,20 @@ public class StreamThread extends Thread implements ProcessingThread { } result.put(getName() + "-consumer", mainConsumerInstanceIdFuture); + if (stateUpdaterEnabled) { + restoreConsumerInstanceIdFuture = stateUpdater.restoreConsumerInstanceId(timeout); + } else { + if (restoreConsumerInstanceIdFuture.isDone()) { + if (restoreConsumerInstanceIdFuture.isCompletedExceptionally()) { + restoreConsumerInstanceIdFuture = new KafkaFutureImpl<>(); + setDeadline = true; + } + } else { + setDeadline = true; + } + } + result.put(getName() + "-restore-consumer", restoreConsumerInstanceIdFuture); + if (setDeadline) { fetchDeadlineClientInstanceId = time.milliseconds() + timeout.toMillis(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java index 1c5c4292336..4d4728edab0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java @@ -104,7 +104,7 @@ class DefaultStateUpdaterTest { private final ChangelogReader changelogReader = mock(ChangelogReader.class); private final TopologyMetadata topologyMetadata = unnamedTopology().build(); private DefaultStateUpdater stateUpdater = - new DefaultStateUpdater("test-state-updater", metrics, config, changelogReader, topologyMetadata, time); + new DefaultStateUpdater("test-state-updater", metrics, config, null, changelogReader, topologyMetadata, time); @AfterEach public void tearDown() { @@ -162,7 +162,7 @@ class DefaultStateUpdaterTest { @Test public void shouldRemoveUpdatingTasksOnShutdown() throws Exception { stateUpdater.shutdown(Duration.ofMillis(Long.MAX_VALUE)); - stateUpdater = new DefaultStateUpdater("test-state-updater", metrics, new StreamsConfig(configProps(Integer.MAX_VALUE)), changelogReader, topologyMetadata, time); + stateUpdater = new DefaultStateUpdater("test-state-updater", metrics, new StreamsConfig(configProps(Integer.MAX_VALUE)), null, changelogReader, topologyMetadata, time); final StreamTask activeTask = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); final StandbyTask standbyTask = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build(); when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet()); @@ -1397,7 +1397,7 @@ class DefaultStateUpdaterTest { public void shouldNotAutoCheckpointTasksIfIntervalNotElapsed() { // we need to use a non auto-ticking timer here to control how much time elapsed exactly final Time time = new MockTime(); - final DefaultStateUpdater stateUpdater = new DefaultStateUpdater("test-state-updater", metrics, config, changelogReader, topologyMetadata, time); + final DefaultStateUpdater stateUpdater = new DefaultStateUpdater("test-state-updater", metrics, config, null, changelogReader, topologyMetadata, time); try { final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); final StreamTask task2 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index df2dacd3c65..46cf86008d5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -1494,6 +1494,7 @@ public class StreamThreadTest { changelogReader, null, taskManager, + null, streamsMetrics, new TopologyMetadata(internalTopologyBuilder, config), CLIENT_ID, @@ -2686,6 +2687,7 @@ public class StreamThreadTest { changelogReader, null, taskManager, + null, streamsMetrics, topologyMetadata, CLIENT_ID, @@ -2742,6 +2744,7 @@ public class StreamThreadTest { changelogReader, null, taskManager, + null, streamsMetrics, topologyMetadata, CLIENT_ID, @@ -2807,6 +2810,7 @@ public class StreamThreadTest { changelogReader, null, taskManager, + null, streamsMetrics, topologyMetadata, CLIENT_ID, @@ -2868,6 +2872,7 @@ public class StreamThreadTest { changelogReader, null, taskManager, + null, streamsMetrics, topologyMetadata, CLIENT_ID, @@ -2926,6 +2931,7 @@ public class StreamThreadTest { changelogReader, null, taskManager, + null, streamsMetrics, topologyMetadata, CLIENT_ID, @@ -3130,6 +3136,7 @@ public class StreamThreadTest { changelogReader, null, taskManager, + null, streamsMetrics, topologyMetadata, CLIENT_ID, @@ -3183,6 +3190,7 @@ public class StreamThreadTest { changelogReader, null, taskManager, + null, streamsMetrics, topologyMetadata, CLIENT_ID, @@ -3309,21 +3317,26 @@ public class StreamThreadTest { } @Test - public void shouldGetMainConsumerInstanceId() throws Exception { - getMainConsumerInstanceId(false); + public void shouldGetMainAndRestoreConsumerInstanceId() throws Exception { + getMainAndRestoreConsumerInstanceId(false); } @Test - public void shouldGetMainConsumerInstanceIdWithInternalTimeout() throws Exception { - getMainConsumerInstanceId(true); + public void shouldGetMainAndRestoreConsumerInstanceIdWithInternalTimeout() throws Exception { + getMainAndRestoreConsumerInstanceId(true); } - private void getMainConsumerInstanceId(final boolean injectTimeException) throws Exception { + private void getMainAndRestoreConsumerInstanceId(final boolean injectTimeException) throws Exception { final Uuid consumerInstanceId = Uuid.randomUuid(); clientSupplier.consumer.setClientInstanceId(consumerInstanceId); if (injectTimeException) { clientSupplier.consumer.injectTimeoutException(1); } + final Uuid restoreInstanceId = Uuid.randomUuid(); + clientSupplier.restoreConsumer.setClientInstanceId(restoreInstanceId); + if (injectTimeException) { + clientSupplier.restoreConsumer.injectTimeoutException(1); + } thread = createStreamThread("clientId"); thread.setState(State.STARTING); @@ -3336,6 +3349,10 @@ public class StreamThreadTest { final KafkaFuture mainConsumerFuture = consumerInstanceIdFutures.get("clientId-StreamThread-1-consumer"); final Uuid mainConsumerUuid = mainConsumerFuture.get(); assertThat(mainConsumerUuid, equalTo(consumerInstanceId)); + + final KafkaFuture restoreConsumerFuture = consumerInstanceIdFutures.get("clientId-StreamThread-1-restore-consumer"); + final Uuid restoreConsumerUuid = restoreConsumerFuture.get(); + assertThat(restoreConsumerUuid, equalTo(restoreInstanceId)); } @Test @@ -3386,6 +3403,21 @@ public class StreamThreadTest { assertThat(error.getCause().getMessage(), equalTo("clientInstanceId not set")); } + @Test + public void shouldReturnErrorIfRestoreConsumerInstanceIdNotInitialized() { + thread = createStreamThread("clientId"); + thread.setState(State.STARTING); + + final Map> consumerFutures = thread.consumerClientInstanceIds(Duration.ZERO); + + thread.maybeGetClientInstanceIds(); + + final KafkaFuture future = consumerFutures.get("clientId-StreamThread-1-restore-consumer"); + final ExecutionException error = assertThrows(ExecutionException.class, future::get); + assertThat(error.getCause(), instanceOf(UnsupportedOperationException.class)); + assertThat(error.getCause().getMessage(), equalTo("clientInstanceId not set")); + } + @Test public void shouldReturnErrorIfProducerInstanceIdNotInitialized() throws Exception { thread = createStreamThread("clientId"); @@ -3416,6 +3448,22 @@ public class StreamThreadTest { assertThat(clientInstanceId, equalTo(null)); } + @Test + public void shouldReturnNullIfRestoreConsumerTelemetryDisabled() throws Exception { + clientSupplier.restoreConsumer.disableTelemetry(); + + thread = createStreamThread("clientId"); + thread.setState(State.STARTING); + + final Map> consumerFutures = thread.consumerClientInstanceIds(Duration.ZERO); + + thread.maybeGetClientInstanceIds(); + + final KafkaFuture future = consumerFutures.get("clientId-StreamThread-1-restore-consumer"); + final Uuid clientInstanceId = future.get(); + assertThat(clientInstanceId, equalTo(null)); + } + @Test public void shouldReturnNullIfProducerTelemetryDisabled() throws Exception { final MockProducer producer = new MockProducer<>(); @@ -3456,6 +3504,30 @@ public class StreamThreadTest { ); } + + @Test + public void shouldTimeOutOnRestoreConsumerInstanceId() { + clientSupplier.restoreConsumer.setClientInstanceId(Uuid.randomUuid()); + clientSupplier.restoreConsumer.injectTimeoutException(-1); + thread = createStreamThread("clientId"); + thread.setState(State.STARTING); + + final Map> consumerFutures = thread.consumerClientInstanceIds(Duration.ZERO); + + mockTime.sleep(1L); + + thread.maybeGetClientInstanceIds(); + + final KafkaFuture future = consumerFutures.get("clientId-StreamThread-1-restore-consumer"); + + final ExecutionException error = assertThrows(ExecutionException.class, future::get); + assertThat(error.getCause(), instanceOf(TimeoutException.class)); + assertThat( + error.getCause().getMessage(), + equalTo("Could not retrieve restore consumer client instance id.") + ); + } + @Test public void shouldTimeOutOnProducerInstanceId() throws Exception { final MockProducer producer = new MockProducer<>(); @@ -3500,6 +3572,7 @@ public class StreamThreadTest { changelogReader, "", taskManager, + null, new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime), topologyMetadata, "thread-id", @@ -3620,6 +3693,7 @@ public class StreamThreadTest { changelogReader, null, taskManager, + null, streamsMetrics, topologyMetadata, CLIENT_ID,