mirror of https://github.com/apache/kafka.git
KAFKA-714: ConsoleConsumer throws SocketTimeoutException when fetching topic metadata; reviewed by Neha Narkhede
This commit is contained in:
parent
311a5d81d8
commit
7833c894a8
|
|
@ -58,10 +58,11 @@ object ClientUtils extends Logging{
|
|||
* @param clientId The client's identifier
|
||||
* @return topic metadata response
|
||||
*/
|
||||
def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], clientId: String): TopicMetadataResponse = {
|
||||
def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], clientId: String, timeoutMs: Int): TopicMetadataResponse = {
|
||||
val props = new Properties()
|
||||
props.put("broker.list", brokers.map(_.getConnectionString()).mkString(","))
|
||||
props.put("client.id", clientId)
|
||||
props.put("request.timeout.ms", timeoutMs.toString)
|
||||
val producerConfig = new ProducerConfig(props)
|
||||
fetchTopicMetadata(topics, brokers, producerConfig, 0)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -79,6 +79,11 @@ object ConsoleConsumer extends Logging {
|
|||
.describedAs("size")
|
||||
.ofType(classOf[java.lang.Integer])
|
||||
.defaultsTo(2 * 1024 * 1024)
|
||||
val socketTimeoutMsOpt = parser.accepts("socket-timeout-ms", "The socket timeout used for the connection to the broker")
|
||||
.withRequiredArg
|
||||
.describedAs("ms")
|
||||
.ofType(classOf[java.lang.Integer])
|
||||
.defaultsTo(ConsumerConfig.SocketTimeout)
|
||||
val consumerTimeoutMsOpt = parser.accepts("consumer-timeout-ms", "consumer throws timeout exception after waiting this much " +
|
||||
"of time without incoming messages")
|
||||
.withRequiredArg
|
||||
|
|
@ -146,6 +151,7 @@ object ConsoleConsumer extends Logging {
|
|||
val props = new Properties()
|
||||
props.put("group.id", options.valueOf(groupIdOpt))
|
||||
props.put("socket.receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString)
|
||||
props.put("socket.timeout.ms", options.valueOf(socketTimeoutMsOpt).toString)
|
||||
props.put("fetch.message.max.bytes", options.valueOf(fetchSizeOpt).toString)
|
||||
props.put("fetch.min.bytes", options.valueOf(minFetchBytesOpt).toString)
|
||||
props.put("fetch.wait.max.ms", options.valueOf(maxWaitMsOpt).toString)
|
||||
|
|
|
|||
|
|
@ -54,7 +54,10 @@ class ConsumerFetcherManager(private val consumerIdString: String,
|
|||
try {
|
||||
trace("Partitions without leader %s".format(noLeaderPartitionSet))
|
||||
val brokers = getAllBrokersInCluster(zkClient)
|
||||
val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet, brokers, config.clientId).topicsMetadata
|
||||
val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet,
|
||||
brokers,
|
||||
config.clientId,
|
||||
config.socketTimeoutMs).topicsMetadata
|
||||
val leaderForPartitionsMap = new HashMap[TopicAndPartition, Broker]
|
||||
topicsMetadata.foreach(
|
||||
tmd => {
|
||||
|
|
|
|||
|
|
@ -400,7 +400,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
|
|||
val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString, zkClient).getConsumerThreadIdsPerTopic
|
||||
val consumersPerTopicMap = getConsumersPerTopic(zkClient, group)
|
||||
val brokers = getAllBrokersInCluster(zkClient)
|
||||
val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet, brokers, config.clientId).topicsMetadata
|
||||
val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet,
|
||||
brokers,
|
||||
config.clientId,
|
||||
config.socketTimeoutMs).topicsMetadata
|
||||
val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
|
||||
topicsMetadata.foreach(m =>{
|
||||
val topic = m.topic
|
||||
|
|
|
|||
|
|
@ -125,7 +125,7 @@ object SimpleConsumerShell extends Logging {
|
|||
// getting topic metadata
|
||||
info("Getting topic metatdata...")
|
||||
val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt))
|
||||
val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId).topicsMetadata
|
||||
val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata
|
||||
if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {
|
||||
System.err.println(("Error: no valid topic metadata for topic: %s, " + "what we get from server is only: %s").format(topic, topicsMetadata))
|
||||
System.exit(1)
|
||||
|
|
|
|||
Loading…
Reference in New Issue