KAFKA-15662: Add support for clientInstanceIds in Kafka Stream (#14935)

Part of KIP-714.

Add support to collect client instance id of the global consumer.

Reviewers: Walker Carlson <wcarlson@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
Matthias J. Sax 2023-12-08 09:42:32 -08:00 committed by GitHub
parent e6e7d8c09f
commit fb5d45d26e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 227 additions and 17 deletions

View File

@ -41,6 +41,7 @@ import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.InvalidStateStorePartitionException;
import org.apache.kafka.streams.errors.ProcessorStateException;
@ -1895,9 +1896,7 @@ public class KafkaStreams implements AutoCloseable {
throw new IllegalStateException("KafkaStreams has been stopped (" + state + ").");
}
long remainingTimeMs = timeout.toMillis();
long startTimestampMs = time.milliseconds();
final Timer remainingTime = time.timer(timeout.toMillis());
final ClientInstanceIdsImpl clientInstanceIds = new ClientInstanceIdsImpl();
// (1) fan-out calls to threads
@ -1911,13 +1910,15 @@ public class KafkaStreams implements AutoCloseable {
}
// GlobalThread
KafkaFuture<Uuid> globalThreadFuture = null;
if (globalStreamThread != null) {
globalThreadFuture = globalStreamThread.globalConsumerInstanceId(timeout);
}
// (2) get admin client instance id in a blocking fashion, while Stream/GlobalThreads work in parallel
try {
clientInstanceIds.setAdminInstanceId(adminClient.clientInstanceId(timeout));
final long nowMs = time.milliseconds();
remainingTimeMs -= nowMs - startTimestampMs;
startTimestampMs = nowMs;
remainingTime.update(time.milliseconds());
} catch (final IllegalStateException telemetryDisabledError) {
// swallow
log.debug("Telemetry is disabled on the admin client.");
@ -1933,15 +1934,13 @@ public class KafkaStreams implements AutoCloseable {
for (final Map.Entry<String, KafkaFuture<Uuid>> consumerFuture : consumerFutures.entrySet()) {
final Uuid instanceId = getOrThrowException(
consumerFuture.getValue(),
remainingTimeMs,
remainingTime.remainingMs(),
() -> String.format(
"Could not retrieve consumer instance id for %s.",
consumerFuture.getKey()
)
);
final long nowMs = time.milliseconds();
remainingTimeMs -= nowMs - startTimestampMs;
startTimestampMs = nowMs;
remainingTime.update(time.milliseconds());
// could be `null` if telemetry is disabled on the consumer itself
if (instanceId != null) {
@ -1957,6 +1956,24 @@ public class KafkaStreams implements AutoCloseable {
// (3b) collect producers from StreamsThread
// (3c) collect from GlobalThread
if (globalThreadFuture != null) {
final Uuid instanceId = getOrThrowException(
globalThreadFuture,
remainingTime.remainingMs(),
() -> "Could not retrieve global consumer client instance id."
);
remainingTime.update(time.milliseconds());
// could be `null` if telemetry is disabled on the client itself
if (instanceId != null) {
clientInstanceIds.addConsumerInstanceId(
globalStreamThread.getName(),
instanceId
);
} else {
log.debug("Telemetry is disabled for the global consumer.");
}
}
return clientInstanceIds;
}

View File

@ -20,9 +20,13 @@ import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@ -65,6 +69,8 @@ public class GlobalStreamThread extends Thread {
private final AtomicLong cacheSize;
private volatile StreamsException startupException;
private java.util.function.Consumer<Throwable> streamsUncaughtExceptionHandler;
private volatile long fetchDeadlineClientInstanceId = -1;
private volatile KafkaFutureImpl<Uuid> clientInstanceIdFuture = new KafkaFutureImpl<>();
/**
* The states that the global stream thread can be in
@ -310,6 +316,32 @@ public class GlobalStreamThread extends Thread {
cache.resize(size);
}
stateConsumer.pollAndUpdate();
if (fetchDeadlineClientInstanceId != -1) {
if (fetchDeadlineClientInstanceId >= time.milliseconds()) {
try {
// we pass in a timeout of zero, to just trigger the "get instance id" background RPC,
// we don't want to block the global thread that can do useful work in the meantime
clientInstanceIdFuture.complete(globalConsumer.clientInstanceId(Duration.ZERO));
fetchDeadlineClientInstanceId = -1;
} catch (final IllegalStateException disabledError) {
// if telemetry is disabled on a client, we swallow the error,
// to allow returning a partial result for all other clients
clientInstanceIdFuture.complete(null);
fetchDeadlineClientInstanceId = -1;
} catch (final TimeoutException swallow) {
// swallow
} catch (final Exception error) {
clientInstanceIdFuture.completeExceptionally(error);
fetchDeadlineClientInstanceId = -1;
}
} else {
clientInstanceIdFuture.completeExceptionally(
new TimeoutException("Could not retrieve global consumer client instance id.")
);
fetchDeadlineClientInstanceId = -1;
}
}
}
} catch (final InvalidOffsetException recoverableException) {
wipeStateStore = true;
@ -454,4 +486,24 @@ public class GlobalStreamThread extends Thread {
public Map<MetricName, Metric> consumerMetrics() {
return Collections.unmodifiableMap(globalConsumer.metrics());
}
// this method is NOT thread-safe (we rely on the callee to be `synchronized`)
public KafkaFuture<Uuid> globalConsumerInstanceId(final Duration timeout) {
boolean setDeadline = false;
if (clientInstanceIdFuture.isDone()) {
if (clientInstanceIdFuture.isCompletedExceptionally()) {
clientInstanceIdFuture = new KafkaFutureImpl<>();
setDeadline = true;
}
} else {
setDeadline = true;
}
if (setDeadline) {
fetchDeadlineClientInstanceId = time.milliseconds() + timeout.toMillis();
}
return clientInstanceIdFuture;
}
}

View File

@ -737,15 +737,14 @@ public class StreamThread extends Thread implements ProcessingThread {
if (fetchDeadlineClientInstanceId >= time.milliseconds()) {
try {
mainConsumerInstanceIdFuture.complete(mainConsumer.clientInstanceId(Duration.ZERO));
maybeResetFetchDeadline();
} catch (final IllegalStateException disabledError) {
// if telemetry is disabled on a client, we swallow the error,
// to allow returning a partial result for all other clients
mainConsumerInstanceIdFuture.complete(null);
maybeResetFetchDeadline();
} catch (final TimeoutException swallow) {
// swallow
} catch (final Exception error) {
mainConsumerInstanceIdFuture.completeExceptionally(error);
maybeResetFetchDeadline();
}
} else {
mainConsumerInstanceIdFuture.completeExceptionally(
@ -753,6 +752,7 @@ public class StreamThread extends Thread implements ProcessingThread {
);
}
}
maybeResetFetchDeadline();
}
}

View File

@ -1400,15 +1400,37 @@ public class KafkaStreamsTest {
}
}
@Test
public void shouldThrowTimeoutExceptionWhenGlobalConsumerFutureDoesNotComplete() {
adminClient.setClientInstanceId(Uuid.randomUuid());
final StreamsBuilder builder = getBuilderWithSource();
builder.globalTable("anyTopic");
try (final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time)) {
streams.start();
when(globalStreamThreadMockedConstruction.constructed().get(0).globalConsumerInstanceId(any()))
.thenReturn(new KafkaFutureImpl<>());
final TimeoutException timeoutException = assertThrows(
TimeoutException.class,
() -> streams.clientInstanceIds(Duration.ZERO)
);
assertThat(timeoutException.getCause(), instanceOf(java.util.concurrent.TimeoutException.class));
}
}
@Test
public void shouldCountDownTimeoutAcrossClient() {
adminClient.setClientInstanceId(Uuid.randomUuid());
adminClient.advanceTimeOnClientInstanceId(time, Duration.ofMillis(10L).toMillis());
final Time mockTime = time;
final AtomicLong expectedTimeout = new AtomicLong(20L);
final AtomicLong expectedTimeout = new AtomicLong(50L);
final AtomicBoolean didAssertThreadOne = new AtomicBoolean(false);
final AtomicBoolean didAssertThreadTwo = new AtomicBoolean(false);
final AtomicBoolean didAssertGlobalThread = new AtomicBoolean(false);
when(streamThreadOne.consumerClientInstanceIds(any()))
.thenReturn(Collections.singletonMap("consumer1", new KafkaFutureImpl<Uuid>() {
@Override
@ -1430,14 +1452,29 @@ public class KafkaStreamsTest {
}
}));
final StreamsBuilder builder = getBuilderWithSource();
builder.globalTable("anyTopic");
try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
try (final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time)) {
streams.start();
streams.clientInstanceIds(Duration.ofMillis(30L));
when(globalStreamThreadMockedConstruction.constructed().get(0).globalConsumerInstanceId(any()))
.thenReturn(new KafkaFutureImpl<Uuid>() {
@Override
public Uuid get(final long timeout, final TimeUnit timeUnit) {
didAssertGlobalThread.set(true);
assertThat(timeout, equalTo(expectedTimeout.getAndAdd(-8L)));
mockTime.sleep(8L);
return null;
}
});
streams.clientInstanceIds(Duration.ofMillis(60L));
}
assertThat(didAssertThreadOne.get(), equalTo(true));
assertThat(didAssertThreadTwo.get(), equalTo(true));
assertThat(didAssertGlobalThread.get(), equalTo(true));
}
@Deprecated // testing old PAPI

View File

@ -1506,7 +1506,7 @@ public class StreamsConfigTest {
}
@Test
public void shouldEnableMetricCollectionForAllInternalClients() {
public void shouldEnableMetricCollectionForAllInternalClientsByDefault() {
props.put(StreamsConfig.ENABLE_METRICS_PUSH_CONFIG, true);
final StreamsConfig streamsConfig = new StreamsConfig(props);

View File

@ -19,9 +19,12 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
@ -44,10 +47,12 @@ import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.DEAD;
import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.RUNNING;
@ -58,6 +63,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -296,6 +302,104 @@ public class GlobalStreamThreadTest {
assertFalse(new File(baseDirectoryName + File.separator + "testAppId" + File.separator + "global").exists());
}
@Test
public void shouldGetGlobalConsumerClientInstanceId() throws Exception {
initializeConsumer();
startAndSwallowError();
final Uuid instanceId = Uuid.randomUuid();
mockConsumer.setClientInstanceId(instanceId);
try {
final KafkaFuture<Uuid> future = globalStreamThread.globalConsumerInstanceId(Duration.ZERO);
final Uuid result = future.get();
assertThat(result, equalTo(instanceId));
} finally {
globalStreamThread.shutdown();
globalStreamThread.join();
}
}
@Test
public void shouldGetGlobalConsumerClientInstanceIdWithInternalTimeoutException() throws Exception {
initializeConsumer();
startAndSwallowError();
final Uuid instanceId = Uuid.randomUuid();
mockConsumer.setClientInstanceId(instanceId);
mockConsumer.injectTimeoutException(5);
try {
final KafkaFuture<Uuid> future = globalStreamThread.globalConsumerInstanceId(Duration.ZERO);
final Uuid result = future.get();
assertThat(result, equalTo(instanceId));
} finally {
globalStreamThread.shutdown();
globalStreamThread.join();
}
}
@Test
public void shouldReturnNullIfTelemetryDisabled() throws Exception {
initializeConsumer();
mockConsumer.disableTelemetry();
startAndSwallowError();
try {
final KafkaFuture<Uuid> future = globalStreamThread.globalConsumerInstanceId(Duration.ZERO);
final Uuid result = future.get();
assertThat(result, equalTo(null));
} finally {
globalStreamThread.shutdown();
globalStreamThread.join();
}
}
@Test
public void shouldReturnErrorIfInstanceIdNotInitialized() throws Exception {
initializeConsumer();
startAndSwallowError();
try {
final KafkaFuture<Uuid> future = globalStreamThread.globalConsumerInstanceId(Duration.ZERO);
final ExecutionException error = assertThrows(ExecutionException.class, future::get);
assertThat(error.getCause(), instanceOf(UnsupportedOperationException.class));
assertThat(error.getCause().getMessage(), equalTo("clientInstanceId not set"));
} finally {
globalStreamThread.shutdown();
globalStreamThread.join();
}
}
@Test
public void shouldTimeOutOnGlobalConsumerInstanceId() throws Exception {
initializeConsumer();
startAndSwallowError();
final Uuid instanceId = Uuid.randomUuid();
mockConsumer.setClientInstanceId(instanceId);
mockConsumer.injectTimeoutException(-1);
try {
final KafkaFuture<Uuid> future = globalStreamThread.globalConsumerInstanceId(Duration.ZERO);
time.sleep(1L);
final ExecutionException error = assertThrows(ExecutionException.class, future::get);
assertThat(error.getCause(), instanceOf(TimeoutException.class));
assertThat(
error.getCause().getMessage(),
equalTo("Could not retrieve global consumer client instance id.")
);
} finally {
globalStreamThread.shutdown();
globalStreamThread.join();
}
}
private void initializeConsumer() {
mockConsumer.updatePartitions(
GLOBAL_STORE_TOPIC_NAME,