From e4f2f6f6e82cafbdea785d53521b96fe062e172d Mon Sep 17 00:00:00 2001 From: Boyang Chen Date: Tue, 25 May 2021 23:59:30 -0700 Subject: [PATCH] KAFKA-12260: Avoid hitting NPE for partitionsFor (#10017) Remove null pointer from the public partitionsFor API. Reviewers: Chia-Ping Tsai --- .../kafka/clients/consumer/KafkaConsumer.java | 6 ++--- .../kafka/clients/consumer/MockConsumer.java | 2 +- .../clients/consumer/KafkaConsumerTest.java | 22 +++++++++++++++++++ .../kafka/connect/util/KafkaBasedLog.java | 4 ++-- .../ConnectorTopicsIntegrationTest.java | 11 +++++----- 5 files changed, 34 insertions(+), 11 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 1f4bc7ce977..992edc6977f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1901,7 +1901,7 @@ public class KafkaConsumer implements Consumer { * * @param topic The topic to get partition metadata for * - * @return The list of partitions + * @return The list of partitions, which will be empty when the given topic is not found * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this * function is called * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while @@ -1924,7 +1924,7 @@ public class KafkaConsumer implements Consumer { * @param topic The topic to get partition metadata for * @param timeout The maximum of time to await topic metadata * - * @return The list of partitions + * @return The list of partitions, which will be empty when the given topic is not found * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this * function is called * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while @@ -1948,7 +1948,7 @@ public class KafkaConsumer implements Consumer { Timer timer = time.timer(timeout); Map> topicMetadata = fetcher.getTopicMetadata( new MetadataRequest.Builder(Collections.singletonList(topic), metadata.allowAutoTopicCreation()), timer); - return topicMetadata.get(topic); + return topicMetadata.getOrDefault(topic, Collections.emptyList()); } finally { release(); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index 345cdec00dd..ed29afe488c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -376,7 +376,7 @@ public class MockConsumer implements Consumer { @Override public synchronized List partitionsFor(String topic) { ensureNotClosed(); - return this.partitions.get(topic); + return this.partitions.getOrDefault(topic, Collections.emptyList()); } @Override diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index b125a91f70c..59f72cd0eef 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -1757,6 +1757,28 @@ public class KafkaConsumerTest { } } + @Test + public void testPartitionsForNonExistingTopic() { + Time time = new MockTime(); + SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); + ConsumerMetadata metadata = createMetadata(subscription); + MockClient client = new MockClient(time, metadata); + + initMetadata(client, Collections.singletonMap(topic, 1)); + Cluster cluster = metadata.fetch(); + + MetadataResponse updateResponse = RequestTestUtils.metadataResponse(cluster.nodes(), + cluster.clusterResource().clusterId(), + cluster.controller().id(), + Collections.emptyList()); + client.prepareResponse(updateResponse); + + ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); + + KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); + assertEquals(Collections.emptyList(), consumer.partitionsFor("non-exist-topic")); + } + @Test public void testPartitionsForAuthenticationFailure() { final KafkaConsumer consumer = consumerWithPendingAuthenticationError(); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java index 6e2350fae08..b1920d59b20 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java @@ -175,12 +175,12 @@ public class KafkaBasedLog { List partitionInfos = consumer.partitionsFor(topic); long started = time.nanoseconds(); long sleepMs = 100; - while (partitionInfos == null && time.nanoseconds() - started < CREATE_TOPIC_TIMEOUT_NS) { + while (partitionInfos.isEmpty() && time.nanoseconds() - started < CREATE_TOPIC_TIMEOUT_NS) { time.sleep(sleepMs); sleepMs = Math.min(2 * sleepMs, MAX_SLEEP_MS); partitionInfos = consumer.partitionsFor(topic); } - if (partitionInfos == null) + if (partitionInfos.isEmpty()) throw new ConnectException("Could not look up partition metadata for offset backing store topic in" + " allotted period. This could indicate a connectivity issue, unavailable topic partitions, or if" + " this is your first use of the topic it may have taken too long to create."); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java index 75374a94819..8c4e1566931 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.connect.integration; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.runtime.distributed.DistributedConfig; import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; @@ -40,7 +41,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.stream.Collectors; @@ -260,10 +260,11 @@ public class ConnectorTopicsIntegrationTest { Consumer verifiableConsumer = connect.kafka().createConsumer( Collections.singletonMap("group.id", "verifiable-consumer-group-0")); - List partitions = - Optional.ofNullable(verifiableConsumer.partitionsFor(statusTopic)) - .orElseThrow(() -> new AssertionError("Unable to retrieve partitions info for status topic")) - .stream() + List partitionInfos = verifiableConsumer.partitionsFor(statusTopic); + if (partitionInfos.isEmpty()) { + throw new AssertionError("Unable to retrieve partitions info for status topic"); + } + List partitions = partitionInfos.stream() .map(info -> new TopicPartition(info.topic(), info.partition())) .collect(Collectors.toList()); verifiableConsumer.assign(partitions);