MINOR: avoid unnecessary UnsupportedOperationException (#15102)

We did no complete KIP-714 with regard to collecting producer clients
instance IDs in Kafka Streams if EOSv1 is enabled. Instead of throwing
an UnsupportedOperationException, we should return an empty map.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
Matthias J. Sax 2024-01-02 07:34:42 -08:00 committed by GitHub
parent 735022a423
commit 31ab73cf1e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 34 additions and 12 deletions

View File

@ -1877,6 +1877,9 @@ public class KafkaStreams implements AutoCloseable {
/**
* Returns the internal clients' assigned {@code client instance ids}.
* <p>
* Note, if {@link StreamsConfig#PROCESSING_GUARANTEE_CONFIG} is set to {@link StreamsConfig#EXACTLY_ONCE},
* the producer client instance ids are not returned yet. This gap will be closed in the next release.
*
* @return The internal clients' assigned instance ids used for metrics collection.
*

View File

@ -1604,7 +1604,9 @@ public class StreamThread extends Thread implements ProcessingThread {
}
if (processingMode.equals(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA)) {
throw new UnsupportedOperationException("not yet implemented");
log.error("EOS v1 enabled: Producer client instance IDs are not collected." +
" Enable producer logging to retrieve the IDs from the producer logs.");
producerInstanceIdFuture.complete(Collections.emptyMap());
} else {
if (threadProducerInstanceIdFuture.isDone()) {
if (threadProducerInstanceIdFuture.isCompletedExceptionally()) {

View File

@ -3340,24 +3340,37 @@ public class StreamThreadTest {
@Test
public void shouldGetProducerInstanceId() throws Exception {
getProducerInstanceId(false);
getProducerInstanceId(false, false);
}
@Test
public void shouldProducerInstanceIdAndInternalTimeout() throws Exception {
getProducerInstanceId(true);
public void shouldGetProducerInstanceIdWithInternalTimeout() throws Exception {
getProducerInstanceId(true, false);
}
private void getProducerInstanceId(final boolean injectTimeException) throws Exception {
@Test
public void shouldNotGetProducerInstanceIdWithEosV1() throws Exception {
getProducerInstanceId(false, true);
}
@SuppressWarnings("deprecation")
private void getProducerInstanceId(final boolean injectTimeException,
final boolean enableEos) throws Exception {
final Uuid producerInstanceId = Uuid.randomUuid();
final MockProducer<byte[], byte[]> producer = new MockProducer<>();
producer.setClientInstanceId(producerInstanceId);
if (injectTimeException) {
producer.injectTimeoutException(1);
if (!enableEos) {
producer.setClientInstanceId(producerInstanceId);
if (injectTimeException) {
producer.injectTimeoutException(1);
}
}
clientSupplier.prepareProducer(producer);
thread = createStreamThread("clientId");
final Properties properties = configProps(enableEos);
if (enableEos) {
properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
}
thread = createStreamThread("clientId", new StreamsConfig(properties));
thread.setState(State.STARTING);
final KafkaFuture<Map<String, KafkaFuture<Uuid>>> producerInstanceIdFutures =
@ -3366,9 +3379,13 @@ public class StreamThreadTest {
thread.maybeGetClientInstanceIds(); // triggers internal timeout; should not crash
thread.maybeGetClientInstanceIds();
final KafkaFuture<Uuid> producerFuture = producerInstanceIdFutures.get().get("clientId-StreamThread-1-producer");
final Uuid producerUuid = producerFuture.get();
assertThat(producerUuid, equalTo(producerInstanceId));
if (enableEos) {
assertThat(producerInstanceIdFutures.get(), equalTo(emptyMap()));
} else {
final KafkaFuture<Uuid> producerFuture = producerInstanceIdFutures.get().get("clientId-StreamThread-1-producer");
final Uuid producerUuid = producerFuture.get();
assertThat(producerUuid, equalTo(producerInstanceId));
}
}
@Test