KAFKA-15662: Add support for clientInstanceIds in Kafka Stream (#14936)

Part of KIP-714.

Add support to collect client instance id of the restore consumer.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
Matthias J. Sax 2023-12-12 08:54:45 -08:00 committed by GitHub
parent be531c681c
commit 083aa61a96
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 243 additions and 17 deletions

View File

@ -16,8 +16,12 @@
*/ */
package org.apache.kafka.streams.processor.internals; package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.MetricName; import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel; import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
@ -79,6 +83,9 @@ public class DefaultStateUpdater implements StateUpdater {
private long totalCheckpointLatency = 0L; private long totalCheckpointLatency = 0L;
private volatile long fetchDeadlineClientInstanceId = -1L;
private volatile KafkaFutureImpl<Uuid> clientInstanceIdFuture = new KafkaFutureImpl<>();
public StateUpdaterThread(final String name, public StateUpdaterThread(final String name,
final Metrics metrics, final Metrics metrics,
final ChangelogReader changelogReader) { final ChangelogReader changelogReader) {
@ -165,6 +172,8 @@ public class DefaultStateUpdater implements StateUpdater {
pauseTasks(); pauseTasks();
restoreTasks(totalStartTimeMs); restoreTasks(totalStartTimeMs);
maybeGetClientInstanceIds();
final long checkpointStartTimeMs = time.milliseconds(); final long checkpointStartTimeMs = time.milliseconds();
maybeCheckpointTasks(checkpointStartTimeMs); maybeCheckpointTasks(checkpointStartTimeMs);
@ -231,6 +240,69 @@ public class DefaultStateUpdater implements StateUpdater {
} }
} }
private void maybeGetClientInstanceIds() {
if (fetchDeadlineClientInstanceId != -1) {
if (!clientInstanceIdFuture.isDone()) {
if (fetchDeadlineClientInstanceId >= time.milliseconds()) {
try {
// if the state-updated thread has active work:
// we pass in a timeout of zero into each `clientInstanceId()` call
// to just trigger the "get instance id" background RPC;
// we don't want to block the state updater thread that can do useful work in the meantime
// otherwise, we pass in 100ms to avoid busy waiting
clientInstanceIdFuture.complete(
restoreConsumer.clientInstanceId(
allWorkDone() ? Duration.ofMillis(100L) : Duration.ZERO
)
);
fetchDeadlineClientInstanceId = -1L;
} catch (final IllegalStateException disabledError) {
// if telemetry is disabled on a client, we swallow the error,
// to allow returning a partial result for all other clients
clientInstanceIdFuture.complete(null);
fetchDeadlineClientInstanceId = -1L;
} catch (final TimeoutException swallow) {
// swallow
} catch (final Exception error) {
clientInstanceIdFuture.completeExceptionally(error);
fetchDeadlineClientInstanceId = -1L;
}
} else {
clientInstanceIdFuture.completeExceptionally(
new TimeoutException("Could not retrieve restore consumer client instance id.")
);
fetchDeadlineClientInstanceId = -1L;
}
}
}
}
private KafkaFutureImpl<Uuid> restoreConsumerInstanceId(final Duration timeout) {
boolean setDeadline = false;
if (clientInstanceIdFuture.isDone()) {
if (clientInstanceIdFuture.isCompletedExceptionally()) {
clientInstanceIdFuture = new KafkaFutureImpl<>();
setDeadline = true;
}
} else {
setDeadline = true;
}
if (setDeadline) {
fetchDeadlineClientInstanceId = time.milliseconds() + timeout.toMillis();
tasksAndActionsLock.lock();
try {
tasksAndActionsCondition.signalAll();
} finally {
tasksAndActionsLock.unlock();
}
}
return clientInstanceIdFuture;
}
private void handleRuntimeException(final RuntimeException runtimeException) { private void handleRuntimeException(final RuntimeException runtimeException) {
log.error("An unexpected error occurred within the state updater thread: " + runtimeException); log.error("An unexpected error occurred within the state updater thread: " + runtimeException);
addToExceptionsAndFailedTasksThenClearUpdatingTasks(new ExceptionAndTasks(new HashSet<>(updatingTasks.values()), runtimeException)); addToExceptionsAndFailedTasksThenClearUpdatingTasks(new ExceptionAndTasks(new HashSet<>(updatingTasks.values()), runtimeException));
@ -306,13 +378,8 @@ public class DefaultStateUpdater implements StateUpdater {
private void waitIfAllChangelogsCompletelyRead() { private void waitIfAllChangelogsCompletelyRead() {
tasksAndActionsLock.lock(); tasksAndActionsLock.lock();
final boolean noTasksToUpdate = changelogReader.allChangelogsCompleted() || updatingTasks.isEmpty();
try { try {
while (isRunning.get() && while (allWorkDone() && fetchDeadlineClientInstanceId == -1L) {
noTasksToUpdate &&
tasksAndActions.isEmpty() &&
!isTopologyResumed.get()) {
isIdle.set(true); isIdle.set(true);
tasksAndActionsCondition.await(); tasksAndActionsCondition.await();
} }
@ -325,6 +392,15 @@ public class DefaultStateUpdater implements StateUpdater {
} }
} }
private boolean allWorkDone() {
final boolean noTasksToUpdate = changelogReader.allChangelogsCompleted() || updatingTasks.isEmpty();
return isRunning.get() &&
noTasksToUpdate &&
tasksAndActions.isEmpty() &&
!isTopologyResumed.get();
}
private void removeUpdatingAndPausedTasks() { private void removeUpdatingAndPausedTasks() {
changelogReader.clear(); changelogReader.clear();
measureCheckpointLatency(() -> updatingTasks.forEach((id, task) -> { measureCheckpointLatency(() -> updatingTasks.forEach((id, task) -> {
@ -525,6 +601,7 @@ public class DefaultStateUpdater implements StateUpdater {
private final Logger log; private final Logger log;
private final String name; private final String name;
private final Metrics metrics; private final Metrics metrics;
private final Consumer<byte[], byte[]> restoreConsumer;
private final ChangelogReader changelogReader; private final ChangelogReader changelogReader;
private final TopologyMetadata topologyMetadata; private final TopologyMetadata topologyMetadata;
private final Queue<TaskAndAction> tasksAndActions = new LinkedList<>(); private final Queue<TaskAndAction> tasksAndActions = new LinkedList<>();
@ -546,12 +623,14 @@ public class DefaultStateUpdater implements StateUpdater {
public DefaultStateUpdater(final String name, public DefaultStateUpdater(final String name,
final Metrics metrics, final Metrics metrics,
final StreamsConfig config, final StreamsConfig config,
final Consumer<byte[], byte[]> restoreConsumer,
final ChangelogReader changelogReader, final ChangelogReader changelogReader,
final TopologyMetadata topologyMetadata, final TopologyMetadata topologyMetadata,
final Time time) { final Time time) {
this.time = time; this.time = time;
this.name = name; this.name = name;
this.metrics = metrics; this.metrics = metrics;
this.restoreConsumer = restoreConsumer;
this.changelogReader = changelogReader; this.changelogReader = changelogReader;
this.topologyMetadata = topologyMetadata; this.topologyMetadata = topologyMetadata;
this.commitIntervalMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG); this.commitIntervalMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
@ -561,6 +640,7 @@ public class DefaultStateUpdater implements StateUpdater {
this.log = logContext.logger(DefaultStateUpdater.class); this.log = logContext.logger(DefaultStateUpdater.class);
} }
@Override
public void start() { public void start() {
if (stateUpdaterThread == null) { if (stateUpdaterThread == null) {
stateUpdaterThread = new StateUpdaterThread(name, metrics, changelogReader); stateUpdaterThread = new StateUpdaterThread(name, metrics, changelogReader);
@ -772,6 +852,11 @@ public class DefaultStateUpdater implements StateUpdater {
); );
} }
@Override
public KafkaFutureImpl<Uuid> restoreConsumerInstanceId(final Duration timeout) {
return stateUpdaterThread.restoreConsumerInstanceId(timeout);
}
// used for testing // used for testing
boolean isIdle() { boolean isIdle() {
if (stateUpdaterThread != null) { if (stateUpdaterThread != null) {

View File

@ -16,6 +16,8 @@
*/ */
package org.apache.kafka.streams.processor.internals; package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TaskId;
import java.time.Duration; import java.time.Duration;
@ -208,4 +210,9 @@ public interface StateUpdater {
* @return set of all tasks managed by the state updater * @return set of all tasks managed by the state updater
*/ */
Set<StandbyTask> getStandbyTasks(); Set<StandbyTask> getStandbyTasks();
/**
* Get the restore consumer instance id for telemetry, and complete the given future to return it.
*/
KafkaFutureImpl<Uuid> restoreConsumerInstanceId(final Duration timeout);
} }

View File

@ -286,6 +286,7 @@ public class StreamThread extends Thread implements ProcessingThread {
private final int maxPollTimeMs; private final int maxPollTimeMs;
private final String originalReset; private final String originalReset;
private final TaskManager taskManager; private final TaskManager taskManager;
private final StateUpdater stateUpdater;
private final StreamsMetricsImpl streamsMetrics; private final StreamsMetricsImpl streamsMetrics;
private final Sensor commitSensor; private final Sensor commitSensor;
@ -346,6 +347,7 @@ public class StreamThread extends Thread implements ProcessingThread {
private volatile long fetchDeadlineClientInstanceId = -1; private volatile long fetchDeadlineClientInstanceId = -1;
private volatile KafkaFutureImpl<Uuid> mainConsumerInstanceIdFuture = new KafkaFutureImpl<>(); private volatile KafkaFutureImpl<Uuid> mainConsumerInstanceIdFuture = new KafkaFutureImpl<>();
private volatile KafkaFutureImpl<Uuid> restoreConsumerInstanceIdFuture = new KafkaFutureImpl<>();
private volatile KafkaFutureImpl<Map<String, KafkaFuture<Uuid>>> producerInstanceIdFuture = new KafkaFutureImpl<>(); private volatile KafkaFutureImpl<Map<String, KafkaFuture<Uuid>>> producerInstanceIdFuture = new KafkaFutureImpl<>();
private volatile KafkaFutureImpl<Uuid> threadProducerInstanceIdFuture = new KafkaFutureImpl<>(); private volatile KafkaFutureImpl<Uuid> threadProducerInstanceIdFuture = new KafkaFutureImpl<>();
@ -427,7 +429,17 @@ public class StreamThread extends Thread implements ProcessingThread {
final DefaultTaskManager schedulingTaskManager = final DefaultTaskManager schedulingTaskManager =
maybeCreateSchedulingTaskManager(processingThreadsEnabled, stateUpdaterEnabled, topologyMetadata, time, threadId, tasks); maybeCreateSchedulingTaskManager(processingThreadsEnabled, stateUpdaterEnabled, topologyMetadata, time, threadId, tasks);
final StateUpdater stateUpdater = final StateUpdater stateUpdater =
maybeCreateAndStartStateUpdater(stateUpdaterEnabled, streamsMetrics, config, changelogReader, topologyMetadata, time, clientId, threadIdx); maybeCreateAndStartStateUpdater(
stateUpdaterEnabled,
streamsMetrics,
config,
restoreConsumer,
changelogReader,
topologyMetadata,
time,
clientId,
threadIdx
);
final TaskManager taskManager = new TaskManager( final TaskManager taskManager = new TaskManager(
time, time,
@ -469,6 +481,7 @@ public class StreamThread extends Thread implements ProcessingThread {
changelogReader, changelogReader,
originalReset, originalReset,
taskManager, taskManager,
stateUpdater,
streamsMetrics, streamsMetrics,
topologyMetadata, topologyMetadata,
threadId, threadId,
@ -512,6 +525,7 @@ public class StreamThread extends Thread implements ProcessingThread {
private static StateUpdater maybeCreateAndStartStateUpdater(final boolean stateUpdaterEnabled, private static StateUpdater maybeCreateAndStartStateUpdater(final boolean stateUpdaterEnabled,
final StreamsMetricsImpl streamsMetrics, final StreamsMetricsImpl streamsMetrics,
final StreamsConfig streamsConfig, final StreamsConfig streamsConfig,
final Consumer<byte[], byte[]> restoreConsumer,
final ChangelogReader changelogReader, final ChangelogReader changelogReader,
final TopologyMetadata topologyMetadata, final TopologyMetadata topologyMetadata,
final Time time, final Time time,
@ -519,7 +533,15 @@ public class StreamThread extends Thread implements ProcessingThread {
final int threadIdx) { final int threadIdx) {
if (stateUpdaterEnabled) { if (stateUpdaterEnabled) {
final String name = clientId + "-StateUpdater-" + threadIdx; final String name = clientId + "-StateUpdater-" + threadIdx;
final StateUpdater stateUpdater = new DefaultStateUpdater(name, streamsMetrics.metricsRegistry(), streamsConfig, changelogReader, topologyMetadata, time); final StateUpdater stateUpdater = new DefaultStateUpdater(
name,
streamsMetrics.metricsRegistry(),
streamsConfig,
restoreConsumer,
changelogReader,
topologyMetadata,
time
);
stateUpdater.start(); stateUpdater.start();
return stateUpdater; return stateUpdater;
} else { } else {
@ -536,6 +558,7 @@ public class StreamThread extends Thread implements ProcessingThread {
final ChangelogReader changelogReader, final ChangelogReader changelogReader,
final String originalReset, final String originalReset,
final TaskManager taskManager, final TaskManager taskManager,
final StateUpdater stateUpdater,
final StreamsMetricsImpl streamsMetrics, final StreamsMetricsImpl streamsMetrics,
final TopologyMetadata topologyMetadata, final TopologyMetadata topologyMetadata,
final String threadId, final String threadId,
@ -598,6 +621,7 @@ public class StreamThread extends Thread implements ProcessingThread {
this.log = logContext.logger(StreamThread.class); this.log = logContext.logger(StreamThread.class);
this.rebalanceListener = new StreamsRebalanceListener(time, taskManager, this, this.log, this.assignmentErrorCode); this.rebalanceListener = new StreamsRebalanceListener(time, taskManager, this, this.log, this.assignmentErrorCode);
this.taskManager = taskManager; this.taskManager = taskManager;
this.stateUpdater = stateUpdater;
this.restoreConsumer = restoreConsumer; this.restoreConsumer = restoreConsumer;
this.mainConsumer = mainConsumer; this.mainConsumer = mainConsumer;
this.changelogReader = changelogReader; this.changelogReader = changelogReader;
@ -759,6 +783,27 @@ public class StreamThread extends Thread implements ProcessingThread {
} }
} }
if (!stateUpdaterEnabled && !restoreConsumerInstanceIdFuture.isDone()) {
if (fetchDeadlineClientInstanceId >= time.milliseconds()) {
try {
restoreConsumerInstanceIdFuture.complete(restoreConsumer.clientInstanceId(Duration.ZERO));
} catch (final IllegalStateException disabledError) {
// if telemetry is disabled on a client, we swallow the error,
// to allow returning a partial result for all other clients
restoreConsumerInstanceIdFuture.complete(null);
} catch (final TimeoutException swallow) {
// swallow
} catch (final Exception error) {
restoreConsumerInstanceIdFuture.completeExceptionally(error);
}
} else {
restoreConsumerInstanceIdFuture.completeExceptionally(
new TimeoutException("Could not retrieve restore consumer client instance id.")
);
}
}
if (!processingMode.equals(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA) && if (!processingMode.equals(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA) &&
!threadProducerInstanceIdFuture.isDone()) { !threadProducerInstanceIdFuture.isDone()) {
@ -788,7 +833,8 @@ public class StreamThread extends Thread implements ProcessingThread {
} }
private void maybeResetFetchDeadline() { private void maybeResetFetchDeadline() {
boolean reset = mainConsumerInstanceIdFuture.isDone(); boolean reset = mainConsumerInstanceIdFuture.isDone()
&& (!stateUpdaterEnabled && restoreConsumerInstanceIdFuture.isDone());
if (processingMode.equals(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA)) { if (processingMode.equals(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA)) {
throw new UnsupportedOperationException("not implemented yet"); throw new UnsupportedOperationException("not implemented yet");
@ -1583,6 +1629,20 @@ public class StreamThread extends Thread implements ProcessingThread {
} }
result.put(getName() + "-consumer", mainConsumerInstanceIdFuture); result.put(getName() + "-consumer", mainConsumerInstanceIdFuture);
if (stateUpdaterEnabled) {
restoreConsumerInstanceIdFuture = stateUpdater.restoreConsumerInstanceId(timeout);
} else {
if (restoreConsumerInstanceIdFuture.isDone()) {
if (restoreConsumerInstanceIdFuture.isCompletedExceptionally()) {
restoreConsumerInstanceIdFuture = new KafkaFutureImpl<>();
setDeadline = true;
}
} else {
setDeadline = true;
}
}
result.put(getName() + "-restore-consumer", restoreConsumerInstanceIdFuture);
if (setDeadline) { if (setDeadline) {
fetchDeadlineClientInstanceId = time.milliseconds() + timeout.toMillis(); fetchDeadlineClientInstanceId = time.milliseconds() + timeout.toMillis();
} }

View File

@ -104,7 +104,7 @@ class DefaultStateUpdaterTest {
private final ChangelogReader changelogReader = mock(ChangelogReader.class); private final ChangelogReader changelogReader = mock(ChangelogReader.class);
private final TopologyMetadata topologyMetadata = unnamedTopology().build(); private final TopologyMetadata topologyMetadata = unnamedTopology().build();
private DefaultStateUpdater stateUpdater = private DefaultStateUpdater stateUpdater =
new DefaultStateUpdater("test-state-updater", metrics, config, changelogReader, topologyMetadata, time); new DefaultStateUpdater("test-state-updater", metrics, config, null, changelogReader, topologyMetadata, time);
@AfterEach @AfterEach
public void tearDown() { public void tearDown() {
@ -162,7 +162,7 @@ class DefaultStateUpdaterTest {
@Test @Test
public void shouldRemoveUpdatingTasksOnShutdown() throws Exception { public void shouldRemoveUpdatingTasksOnShutdown() throws Exception {
stateUpdater.shutdown(Duration.ofMillis(Long.MAX_VALUE)); stateUpdater.shutdown(Duration.ofMillis(Long.MAX_VALUE));
stateUpdater = new DefaultStateUpdater("test-state-updater", metrics, new StreamsConfig(configProps(Integer.MAX_VALUE)), changelogReader, topologyMetadata, time); stateUpdater = new DefaultStateUpdater("test-state-updater", metrics, new StreamsConfig(configProps(Integer.MAX_VALUE)), null, changelogReader, topologyMetadata, time);
final StreamTask activeTask = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); final StreamTask activeTask = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StandbyTask standbyTask = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build(); final StandbyTask standbyTask = standbyTask(TASK_0_2, mkSet(TOPIC_PARTITION_C_0)).inState(State.RUNNING).build();
when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet()); when(changelogReader.completedChangelogs()).thenReturn(Collections.emptySet());
@ -1397,7 +1397,7 @@ class DefaultStateUpdaterTest {
public void shouldNotAutoCheckpointTasksIfIntervalNotElapsed() { public void shouldNotAutoCheckpointTasksIfIntervalNotElapsed() {
// we need to use a non auto-ticking timer here to control how much time elapsed exactly // we need to use a non auto-ticking timer here to control how much time elapsed exactly
final Time time = new MockTime(); final Time time = new MockTime();
final DefaultStateUpdater stateUpdater = new DefaultStateUpdater("test-state-updater", metrics, config, changelogReader, topologyMetadata, time); final DefaultStateUpdater stateUpdater = new DefaultStateUpdater("test-state-updater", metrics, config, null, changelogReader, topologyMetadata, time);
try { try {
final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build(); final StreamTask task1 = statefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StreamTask task2 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build(); final StreamTask task2 = statefulTask(TASK_0_2, mkSet(TOPIC_PARTITION_B_0)).inState(State.RESTORING).build();

View File

@ -1494,6 +1494,7 @@ public class StreamThreadTest {
changelogReader, changelogReader,
null, null,
taskManager, taskManager,
null,
streamsMetrics, streamsMetrics,
new TopologyMetadata(internalTopologyBuilder, config), new TopologyMetadata(internalTopologyBuilder, config),
CLIENT_ID, CLIENT_ID,
@ -2686,6 +2687,7 @@ public class StreamThreadTest {
changelogReader, changelogReader,
null, null,
taskManager, taskManager,
null,
streamsMetrics, streamsMetrics,
topologyMetadata, topologyMetadata,
CLIENT_ID, CLIENT_ID,
@ -2742,6 +2744,7 @@ public class StreamThreadTest {
changelogReader, changelogReader,
null, null,
taskManager, taskManager,
null,
streamsMetrics, streamsMetrics,
topologyMetadata, topologyMetadata,
CLIENT_ID, CLIENT_ID,
@ -2807,6 +2810,7 @@ public class StreamThreadTest {
changelogReader, changelogReader,
null, null,
taskManager, taskManager,
null,
streamsMetrics, streamsMetrics,
topologyMetadata, topologyMetadata,
CLIENT_ID, CLIENT_ID,
@ -2868,6 +2872,7 @@ public class StreamThreadTest {
changelogReader, changelogReader,
null, null,
taskManager, taskManager,
null,
streamsMetrics, streamsMetrics,
topologyMetadata, topologyMetadata,
CLIENT_ID, CLIENT_ID,
@ -2926,6 +2931,7 @@ public class StreamThreadTest {
changelogReader, changelogReader,
null, null,
taskManager, taskManager,
null,
streamsMetrics, streamsMetrics,
topologyMetadata, topologyMetadata,
CLIENT_ID, CLIENT_ID,
@ -3130,6 +3136,7 @@ public class StreamThreadTest {
changelogReader, changelogReader,
null, null,
taskManager, taskManager,
null,
streamsMetrics, streamsMetrics,
topologyMetadata, topologyMetadata,
CLIENT_ID, CLIENT_ID,
@ -3183,6 +3190,7 @@ public class StreamThreadTest {
changelogReader, changelogReader,
null, null,
taskManager, taskManager,
null,
streamsMetrics, streamsMetrics,
topologyMetadata, topologyMetadata,
CLIENT_ID, CLIENT_ID,
@ -3309,21 +3317,26 @@ public class StreamThreadTest {
} }
@Test @Test
public void shouldGetMainConsumerInstanceId() throws Exception { public void shouldGetMainAndRestoreConsumerInstanceId() throws Exception {
getMainConsumerInstanceId(false); getMainAndRestoreConsumerInstanceId(false);
} }
@Test @Test
public void shouldGetMainConsumerInstanceIdWithInternalTimeout() throws Exception { public void shouldGetMainAndRestoreConsumerInstanceIdWithInternalTimeout() throws Exception {
getMainConsumerInstanceId(true); getMainAndRestoreConsumerInstanceId(true);
} }
private void getMainConsumerInstanceId(final boolean injectTimeException) throws Exception { private void getMainAndRestoreConsumerInstanceId(final boolean injectTimeException) throws Exception {
final Uuid consumerInstanceId = Uuid.randomUuid(); final Uuid consumerInstanceId = Uuid.randomUuid();
clientSupplier.consumer.setClientInstanceId(consumerInstanceId); clientSupplier.consumer.setClientInstanceId(consumerInstanceId);
if (injectTimeException) { if (injectTimeException) {
clientSupplier.consumer.injectTimeoutException(1); clientSupplier.consumer.injectTimeoutException(1);
} }
final Uuid restoreInstanceId = Uuid.randomUuid();
clientSupplier.restoreConsumer.setClientInstanceId(restoreInstanceId);
if (injectTimeException) {
clientSupplier.restoreConsumer.injectTimeoutException(1);
}
thread = createStreamThread("clientId"); thread = createStreamThread("clientId");
thread.setState(State.STARTING); thread.setState(State.STARTING);
@ -3336,6 +3349,10 @@ public class StreamThreadTest {
final KafkaFuture<Uuid> mainConsumerFuture = consumerInstanceIdFutures.get("clientId-StreamThread-1-consumer"); final KafkaFuture<Uuid> mainConsumerFuture = consumerInstanceIdFutures.get("clientId-StreamThread-1-consumer");
final Uuid mainConsumerUuid = mainConsumerFuture.get(); final Uuid mainConsumerUuid = mainConsumerFuture.get();
assertThat(mainConsumerUuid, equalTo(consumerInstanceId)); assertThat(mainConsumerUuid, equalTo(consumerInstanceId));
final KafkaFuture<Uuid> restoreConsumerFuture = consumerInstanceIdFutures.get("clientId-StreamThread-1-restore-consumer");
final Uuid restoreConsumerUuid = restoreConsumerFuture.get();
assertThat(restoreConsumerUuid, equalTo(restoreInstanceId));
} }
@Test @Test
@ -3386,6 +3403,21 @@ public class StreamThreadTest {
assertThat(error.getCause().getMessage(), equalTo("clientInstanceId not set")); assertThat(error.getCause().getMessage(), equalTo("clientInstanceId not set"));
} }
@Test
public void shouldReturnErrorIfRestoreConsumerInstanceIdNotInitialized() {
thread = createStreamThread("clientId");
thread.setState(State.STARTING);
final Map<String, KafkaFuture<Uuid>> consumerFutures = thread.consumerClientInstanceIds(Duration.ZERO);
thread.maybeGetClientInstanceIds();
final KafkaFuture<Uuid> future = consumerFutures.get("clientId-StreamThread-1-restore-consumer");
final ExecutionException error = assertThrows(ExecutionException.class, future::get);
assertThat(error.getCause(), instanceOf(UnsupportedOperationException.class));
assertThat(error.getCause().getMessage(), equalTo("clientInstanceId not set"));
}
@Test @Test
public void shouldReturnErrorIfProducerInstanceIdNotInitialized() throws Exception { public void shouldReturnErrorIfProducerInstanceIdNotInitialized() throws Exception {
thread = createStreamThread("clientId"); thread = createStreamThread("clientId");
@ -3416,6 +3448,22 @@ public class StreamThreadTest {
assertThat(clientInstanceId, equalTo(null)); assertThat(clientInstanceId, equalTo(null));
} }
@Test
public void shouldReturnNullIfRestoreConsumerTelemetryDisabled() throws Exception {
clientSupplier.restoreConsumer.disableTelemetry();
thread = createStreamThread("clientId");
thread.setState(State.STARTING);
final Map<String, KafkaFuture<Uuid>> consumerFutures = thread.consumerClientInstanceIds(Duration.ZERO);
thread.maybeGetClientInstanceIds();
final KafkaFuture<Uuid> future = consumerFutures.get("clientId-StreamThread-1-restore-consumer");
final Uuid clientInstanceId = future.get();
assertThat(clientInstanceId, equalTo(null));
}
@Test @Test
public void shouldReturnNullIfProducerTelemetryDisabled() throws Exception { public void shouldReturnNullIfProducerTelemetryDisabled() throws Exception {
final MockProducer<byte[], byte[]> producer = new MockProducer<>(); final MockProducer<byte[], byte[]> producer = new MockProducer<>();
@ -3456,6 +3504,30 @@ public class StreamThreadTest {
); );
} }
@Test
public void shouldTimeOutOnRestoreConsumerInstanceId() {
clientSupplier.restoreConsumer.setClientInstanceId(Uuid.randomUuid());
clientSupplier.restoreConsumer.injectTimeoutException(-1);
thread = createStreamThread("clientId");
thread.setState(State.STARTING);
final Map<String, KafkaFuture<Uuid>> consumerFutures = thread.consumerClientInstanceIds(Duration.ZERO);
mockTime.sleep(1L);
thread.maybeGetClientInstanceIds();
final KafkaFuture<Uuid> future = consumerFutures.get("clientId-StreamThread-1-restore-consumer");
final ExecutionException error = assertThrows(ExecutionException.class, future::get);
assertThat(error.getCause(), instanceOf(TimeoutException.class));
assertThat(
error.getCause().getMessage(),
equalTo("Could not retrieve restore consumer client instance id.")
);
}
@Test @Test
public void shouldTimeOutOnProducerInstanceId() throws Exception { public void shouldTimeOutOnProducerInstanceId() throws Exception {
final MockProducer<byte[], byte[]> producer = new MockProducer<>(); final MockProducer<byte[], byte[]> producer = new MockProducer<>();
@ -3500,6 +3572,7 @@ public class StreamThreadTest {
changelogReader, changelogReader,
"", "",
taskManager, taskManager,
null,
new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime), new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime),
topologyMetadata, topologyMetadata,
"thread-id", "thread-id",
@ -3620,6 +3693,7 @@ public class StreamThreadTest {
changelogReader, changelogReader,
null, null,
taskManager, taskManager,
null,
streamsMetrics, streamsMetrics,
topologyMetadata, topologyMetadata,
CLIENT_ID, CLIENT_ID,