diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java index ed4c0d98596..dc8f0f115bc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java @@ -23,6 +23,7 @@ public final class ClientRequest { private final boolean expectResponse; private final RequestSend request; private final RequestCompletionHandler callback; + private final boolean isInitiatedByNetworkClient; /** * @param createdMs The unix timestamp in milliseconds for the time at which this request was created. @@ -30,17 +31,35 @@ public final class ClientRequest { * @param request The request * @param callback A callback to execute when the response has been received (or null if no callback is necessary) */ - public ClientRequest(long createdMs, boolean expectResponse, RequestSend request, RequestCompletionHandler callback) { + public ClientRequest(long createdMs, boolean expectResponse, RequestSend request, + RequestCompletionHandler callback) { + this(createdMs, expectResponse, request, callback, false); + } + + /** + * @param createdMs The unix timestamp in milliseconds for the time at which this request was created. + * @param expectResponse Should we expect a response message or is this request complete once it is sent? 
+ * @param request The request + * @param callback A callback to execute when the response has been received (or null if no callback is necessary) + * @param isInitiatedByNetworkClient Is request initiated by network client, if yes, its + * response will be consumed by network client + */ + public ClientRequest(long createdMs, boolean expectResponse, RequestSend request, + RequestCompletionHandler callback, boolean isInitiatedByNetworkClient) { this.createdMs = createdMs; this.callback = callback; this.request = request; this.expectResponse = expectResponse; + this.isInitiatedByNetworkClient = isInitiatedByNetworkClient; } @Override public String toString() { - return "ClientRequest(expectResponse=" + expectResponse + ", callback=" + callback + ", request=" + request - + ")"; + return "ClientRequest(expectResponse=" + expectResponse + + ", callback=" + callback + + ", request=" + request + + (isInitiatedByNetworkClient ? ", isInitiatedByNetworkClient" : "") + + ")"; } public boolean expectResponse() { @@ -63,4 +82,8 @@ public final class ClientRequest { return createdMs; } + public boolean isInitiatedByNetworkClient() { + return isInitiatedByNetworkClient; + } + } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 48fe7961e22..0e51d7bd461 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -378,7 +378,7 @@ public class NetworkClient implements KafkaClient { short apiKey = req.request().header().apiKey(); Struct body = (Struct) ProtoUtils.currentResponseSchema(apiKey).read(receive.payload()); correlate(req.request().header(), header); - if (apiKey == ApiKeys.METADATA.id) { + if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) { handleMetadataResponse(req.request().header(), body, now); } else { // need to add 
body/header to response here @@ -454,7 +454,7 @@ public class NetworkClient implements KafkaClient { private ClientRequest metadataRequest(long now, String node, Set<String> topics) { MetadataRequest metadata = new MetadataRequest(new ArrayList<String>(topics)); RequestSend send = new RequestSend(node, nextRequestHeader(ApiKeys.METADATA), metadata.toStruct()); - return new ClientRequest(now, true, send, null); + return new ClientRequest(now, true, send, null, true); } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java index 252b759c080..23e410b7d93 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java @@ -113,6 +113,11 @@ public interface Consumer<K, V> extends Closeable { */ public List<PartitionInfo> partitionsFor(String topic); + /** + * @see KafkaConsumer#listTopics() + */ + public Map<String, List<PartitionInfo>> listTopics(); + /** * @see KafkaConsumer#close() */ diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index bea3d737c51..923ff999d1b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1024,6 +1024,22 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { } } + /** + * Get metadata about partitions for all topics. This method will issue a remote call to the + * server. 
+ * + * @return The map of topics and its partitions + */ + @Override + public Map<String, List<PartitionInfo>> listTopics() { + acquire(); + try { + return fetcher.getAllTopics(requestTimeoutMs); + } finally { + release(); + } + } + @Override public void close() { acquire(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index c14eed1e95f..5b22fa0bcb4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -177,6 +177,12 @@ public class MockConsumer<K, V> implements Consumer<K, V> { return parts; } + @Override + public Map<String, List<PartitionInfo>> listTopics() { + ensureNotClosed(); + return partitions; + } + public synchronized void updatePartitions(String topic, List<PartitionInfo> partitions) { ensureNotClosed(); this.partitions.put(topic, partitions); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index d2a0e2be678..9f71451e1b0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -39,6 +39,8 @@ import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.FetchResponse; import org.apache.kafka.common.requests.ListOffsetRequest; import org.apache.kafka.common.requests.ListOffsetResponse; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; @@ -160,6 +162,48 @@ public class Fetcher<K, V> { } } + + + /** + * Get metadata for all topics present in Kafka cluster + * + * @param timeout time for which getting all topics is attempted + * 
@return The map of topics and its partitions + */ + public Map<String, List<PartitionInfo>> getAllTopics(long timeout) { + final HashMap<String, List<PartitionInfo>> topicsPartitionInfos = new HashMap<>(); + long startTime = time.milliseconds(); + + while (time.milliseconds() - startTime < timeout) { + final Node node = client.leastLoadedNode(); + if (node != null) { + MetadataRequest metadataRequest = new MetadataRequest(Collections.<String>emptyList()); + final RequestFuture<ClientResponse> requestFuture = + client.send(node, ApiKeys.METADATA, metadataRequest); + + client.poll(requestFuture); + + if (requestFuture.succeeded()) { + MetadataResponse response = + new MetadataResponse(requestFuture.value().responseBody()); + + for (String topic : response.cluster().topics()) + topicsPartitionInfos.put( + topic, response.cluster().availablePartitionsForTopic(topic)); + + return topicsPartitionInfos; + } + + if (!requestFuture.isRetriable()) + throw requestFuture.exception(); + } + + Utils.sleep(retryBackoffMs); + } + + return topicsPartitionInfos; + } + /** * Reset offsets for the given partition using the offset reset strategy. 
* diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 4002679cbc8..06e29906365 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.Errors; @@ -29,6 +30,7 @@ import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.test.TestUtils; @@ -180,6 +182,17 @@ public class FetcherTest { assertEquals(null, subscriptions.consumed(tp)); } + @Test + public void testGetAllTopics() throws InterruptedException { + // sending response before request, as getAllTopics is a blocking call + client.prepareResponse( + new MetadataResponse(cluster, Collections.<String, Errors>emptyMap()).toStruct()); + + Map<String, List<PartitionInfo>> allTopics = fetcher.getAllTopics(5000L); + + assertEquals(cluster.topics().size(), allTopics.size()); + } + private Struct fetchResponse(ByteBuffer buffer, short error, long hw) { FetchResponse response = new FetchResponse(Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, buffer))); return response.toStruct(); diff --git 
a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala index cca6e94af1b..0c2755f7240 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala @@ -186,6 +186,24 @@ class ConsumerTest extends IntegrationTestHarness with Logging { assertNull(this.consumers(0).partitionsFor("non-exist-topic")) } + def testListTopics() { + val numParts = 2 + val topic1: String = "part-test-topic-1" + val topic2: String = "part-test-topic-2" + val topic3: String = "part-test-topic-3" + TestUtils.createTopic(this.zkClient, topic1, numParts, 1, this.servers) + TestUtils.createTopic(this.zkClient, topic2, numParts, 1, this.servers) + TestUtils.createTopic(this.zkClient, topic3, numParts, 1, this.servers) + + val topics = this.consumers.head.listTopics() + assertNotNull(topics) + assertEquals(5, topics.size()) + assertEquals(5, topics.keySet().size()) + assertEquals(2, topics.get(topic1).length) + assertEquals(2, topics.get(topic2).length) + assertEquals(2, topics.get(topic3).length) + } + def testPartitionReassignmentCallback() { val callback = new TestConsumerReassignmentCallback() this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test