MINOR: Update clientInstanceIds from EOS_V2 refactor (#17664)

Updates KafkaStreams.clientInstanceIds method to correctly populate the client-id -> clientInstanceId map that was altered in a previous refactoring.

Added a test that confirms ClientInstanceIds correctly stores both consumer and producer instance ids.

Reviewers: Matthias Sax <mjsax@apache.org>
This commit is contained in:
Bill Bejeck 2024-11-02 10:37:46 -04:00 committed by GitHub
parent dafa126223
commit 908dfa30d7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 42 additions and 4 deletions

View File

@ -92,6 +92,7 @@ import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@ -1929,10 +1930,18 @@ public class KafkaStreams implements AutoCloseable {
// could be `null` if telemetry is disabled on the consumer itself
if (instanceId != null) {
    final String clientFutureKey = clientFuture.getKey();
    // Route the id by client type: producer client-ids end with "-producer";
    // every other entry in this map belongs to a consumer.
    // Locale.ROOT keeps the ASCII suffix check stable under locale-specific
    // case mappings (e.g. the Turkish dotless-i would break "producer").
    if (clientFutureKey.toLowerCase(Locale.ROOT).endsWith("-producer")) {
        clientInstanceIds.addProducerInstanceId(
            clientFutureKey,
            instanceId
        );
    } else {
        clientInstanceIds.addConsumerInstanceId(
            clientFutureKey,
            instanceId
        );
    }
} else {
    // Parameterized logging: the message is only formatted when DEBUG is enabled,
    // unlike the eager String.format call it replaces.
    log.debug("Telemetry is disabled for {}.", clientFuture.getKey());
}

View File

@ -21,6 +21,7 @@ import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.TimeoutException;
@ -1668,6 +1669,34 @@ public class KafkaStreamsTest {
}
}
@Test
public void shouldReturnProducerAndConsumerInstanceIds() {
    prepareStreams();
    prepareStreamThread(streamThreadOne, 1);
    props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);

    // Fresh instance ids for each client type participating in the lookup.
    final Uuid consumerUuid = Uuid.randomUuid();
    final Uuid producerUuid = Uuid.randomUuid();
    final Uuid adminUuid = Uuid.randomUuid();

    // Pre-completed futures simulate the stream thread's telemetry responses.
    final KafkaFutureImpl<Uuid> consumerIdFuture = new KafkaFutureImpl<>();
    consumerIdFuture.complete(consumerUuid);
    final KafkaFutureImpl<Uuid> producerIdFuture = new KafkaFutureImpl<>();
    producerIdFuture.complete(producerUuid);

    adminClient.setClientInstanceId(adminUuid);
    final Map<String, KafkaFuture<Uuid>> threadClientIds =
        Map.of("main-consumer", consumerIdFuture, "some-thread-producer", producerIdFuture);
    when(streamThreadOne.clientInstanceIds(any())).thenReturn(threadClientIds);

    try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
        streams.start();

        final ClientInstanceIds ids = streams.clientInstanceIds(Duration.ZERO);

        // The "-producer" suffix must route the id into the producer map,
        // everything else into the consumer map; admin id is reported separately.
        assertThat(ids.consumerInstanceIds().size(), equalTo(1));
        assertThat(ids.consumerInstanceIds().get("main-consumer"), equalTo(consumerUuid));
        assertThat(ids.producerInstanceIds().size(), equalTo(1));
        assertThat(ids.producerInstanceIds().get("some-thread-producer"), equalTo(producerUuid));
        assertThat(ids.adminInstanceId(), equalTo(adminUuid));
    }
}
@Test
public void shouldThrowTimeoutExceptionWhenAnyClientFutureDoesNotComplete() {
prepareStreams();