KAFKA-2275: Add ListTopics() API to the Java consumer; reviewed by Jason Gustafson, Edward Ribeiro and Guozhang Wang

This commit is contained in:
Ashish Singh 2015-07-28 15:49:22 -07:00 committed by Guozhang Wang
parent 3df46bf4ce
commit 594b963930
8 changed files with 130 additions and 5 deletions

View File

@ -23,6 +23,7 @@ public final class ClientRequest {
private final boolean expectResponse;
private final RequestSend request;
private final RequestCompletionHandler callback;
private final boolean isInitiatedByNetworkClient;
/**
* @param createdMs The unix timestamp in milliseconds for the time at which this request was created.
@ -30,17 +31,35 @@ public final class ClientRequest {
* @param request The request
* @param callback A callback to execute when the response has been received (or null if no callback is necessary)
*/
public ClientRequest(long createdMs, boolean expectResponse, RequestSend request, RequestCompletionHandler callback) {
public ClientRequest(long createdMs, boolean expectResponse, RequestSend request,
RequestCompletionHandler callback) {
this(createdMs, expectResponse, request, callback, false);
}
/**
* @param createdMs The unix timestamp in milliseconds for the time at which this request was created.
* @param expectResponse Should we expect a response message or is this request complete once it is sent?
* @param request The request
* @param callback A callback to execute when the response has been received (or null if no callback is necessary)
* @param isInitiatedByNetworkClient Is request initiated by network client, if yes, its
* response will be consumed by network client
*/
public ClientRequest(long createdMs, boolean expectResponse, RequestSend request,
RequestCompletionHandler callback, boolean isInitiatedByNetworkClient) {
this.createdMs = createdMs;
this.callback = callback;
this.request = request;
this.expectResponse = expectResponse;
this.isInitiatedByNetworkClient = isInitiatedByNetworkClient;
}
@Override
public String toString() {
return "ClientRequest(expectResponse=" + expectResponse + ", callback=" + callback + ", request=" + request
+ ")";
return "ClientRequest(expectResponse=" + expectResponse +
", callback=" + callback +
", request=" + request +
(isInitiatedByNetworkClient ? ", isInitiatedByNetworkClient" : "") +
")";
}
public boolean expectResponse() {
@ -63,4 +82,8 @@ public final class ClientRequest {
return createdMs;
}
public boolean isInitiatedByNetworkClient() {
return isInitiatedByNetworkClient;
}
}

View File

@ -378,7 +378,7 @@ public class NetworkClient implements KafkaClient {
short apiKey = req.request().header().apiKey();
Struct body = (Struct) ProtoUtils.currentResponseSchema(apiKey).read(receive.payload());
correlate(req.request().header(), header);
if (apiKey == ApiKeys.METADATA.id) {
if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) {
handleMetadataResponse(req.request().header(), body, now);
} else {
// need to add body/header to response here
@ -454,7 +454,7 @@ public class NetworkClient implements KafkaClient {
private ClientRequest metadataRequest(long now, String node, Set<String> topics) {
MetadataRequest metadata = new MetadataRequest(new ArrayList<String>(topics));
RequestSend send = new RequestSend(node, nextRequestHeader(ApiKeys.METADATA), metadata.toStruct());
return new ClientRequest(now, true, send, null);
return new ClientRequest(now, true, send, null, true);
}
/**

View File

@ -113,6 +113,11 @@ public interface Consumer<K, V> extends Closeable {
*/
public List<PartitionInfo> partitionsFor(String topic);
/**
* @see KafkaConsumer#listTopics()
*/
public Map<String, List<PartitionInfo>> listTopics();
/**
* @see KafkaConsumer#close()
*/

View File

@ -1024,6 +1024,22 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
}
/**
* Get metadata about partitions for all topics. This method will issue a remote call to the
* server.
*
* @return The map of topics and its partitions
*/
@Override
public Map<String, List<PartitionInfo>> listTopics() {
acquire();
try {
return fetcher.getAllTopics(requestTimeoutMs);
} finally {
release();
}
}
@Override
public void close() {
acquire();

View File

@ -177,6 +177,12 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
return parts;
}
@Override
public Map<String, List<PartitionInfo>> listTopics() {
ensureNotClosed();
return partitions;
}
public synchronized void updatePartitions(String topic, List<PartitionInfo> partitions) {
ensureNotClosed();
this.partitions.put(topic, partitions);

View File

@ -39,6 +39,8 @@ import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.ListOffsetRequest;
import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@ -160,6 +162,48 @@ public class Fetcher<K, V> {
}
}
/**
* Get metadata for all topics present in Kafka cluster
*
* @param timeout time for which getting all topics is attempted
* @return The map of topics and its partitions
*/
public Map<String, List<PartitionInfo>> getAllTopics(long timeout) {
final HashMap<String, List<PartitionInfo>> topicsPartitionInfos = new HashMap<>();
long startTime = time.milliseconds();
while (time.milliseconds() - startTime < timeout) {
final Node node = client.leastLoadedNode();
if (node != null) {
MetadataRequest metadataRequest = new MetadataRequest(Collections.<String>emptyList());
final RequestFuture<ClientResponse> requestFuture =
client.send(node, ApiKeys.METADATA, metadataRequest);
client.poll(requestFuture);
if (requestFuture.succeeded()) {
MetadataResponse response =
new MetadataResponse(requestFuture.value().responseBody());
for (String topic : response.cluster().topics())
topicsPartitionInfos.put(
topic, response.cluster().availablePartitionsForTopic(topic));
return topicsPartitionInfos;
}
if (!requestFuture.isRetriable())
throw requestFuture.exception();
}
Utils.sleep(retryBackoffMs);
}
return topicsPartitionInfos;
}
/**
* Reset offsets for the given partition using the offset reset strategy.
*

View File

@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
@ -29,6 +30,7 @@ import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.TestUtils;
@ -180,6 +182,17 @@ public class FetcherTest {
assertEquals(null, subscriptions.consumed(tp));
}
@Test
public void testGetAllTopics() throws InterruptedException {
// sending response before request, as getAllTopics is a blocking call
client.prepareResponse(
new MetadataResponse(cluster, Collections.<String, Errors>emptyMap()).toStruct());
Map<String, List<PartitionInfo>> allTopics = fetcher.getAllTopics(5000L);
assertEquals(cluster.topics().size(), allTopics.size());
}
private Struct fetchResponse(ByteBuffer buffer, short error, long hw) {
FetchResponse response = new FetchResponse(Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, buffer)));
return response.toStruct();

View File

@ -186,6 +186,24 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
assertNull(this.consumers(0).partitionsFor("non-exist-topic"))
}
def testListTopics() {
val numParts = 2
val topic1: String = "part-test-topic-1"
val topic2: String = "part-test-topic-2"
val topic3: String = "part-test-topic-3"
TestUtils.createTopic(this.zkClient, topic1, numParts, 1, this.servers)
TestUtils.createTopic(this.zkClient, topic2, numParts, 1, this.servers)
TestUtils.createTopic(this.zkClient, topic3, numParts, 1, this.servers)
val topics = this.consumers.head.listTopics()
assertNotNull(topics)
assertEquals(5, topics.size())
assertEquals(5, topics.keySet().size())
assertEquals(2, topics.get(topic1).length)
assertEquals(2, topics.get(topic2).length)
assertEquals(2, topics.get(topic3).length)
}
def testPartitionReassignmentCallback() {
val callback = new TestConsumerReassignmentCallback()
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test