KAFKA-1261 Make it possible to configure the metadata refresh.

This commit is contained in:
Jay Kreps 2014-02-12 15:30:08 -08:00
parent 84a5803a7e
commit 7e154a36f7
2 changed files with 36 additions and 31 deletions

View File

@ -1,18 +1,14 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.clients.producer;
@ -49,7 +45,6 @@ import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.SystemTime;
/**
* A Kafka client that publishes records to the Kafka cluster.
* <P>
@ -94,7 +89,8 @@ public class KafkaProducer implements Producer {
new SystemTime());
this.partitioner = new Partitioner();
this.metadataFetchTimeoutMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
this.metadata = new Metadata();
this.metadata = new Metadata(config.getLong(ProducerConfig.METADATA_FETCH_BACKOFF_CONFIG),
config.getLong(ProducerConfig.METADATA_EXPIRY_CONFIG));
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
this.totalMemorySize = config.getLong(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG);
this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.MAX_PARTITION_SIZE_CONFIG),
@ -256,6 +252,7 @@ public class KafkaProducer implements Producer {
*/
@Override
public void close() {
this.accumulator.close();
this.sender.initiateClose();
try {
this.ioThread.join();

View File

@ -1,18 +1,14 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.clients.producer;
@ -25,7 +21,6 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Type;
/**
* The producer configuration keys
*/
@ -47,6 +42,17 @@ public class ProducerConfig extends AbstractConfig {
*/
public static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms";
/**
* The minimum amount of time between metadata fetches. This prevents polling for metadata too quickly.
*/
public static final String METADATA_FETCH_BACKOFF_CONFIG = "metadata.fetch.backoff.ms";
/**
* The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any
* leadership changes.
*/
public static final String METADATA_EXPIRY_CONFIG = "metadata.expiry.ms";
/**
* The buffer size allocated for a partition. When records are received which are smaller than this size the
* producer will attempt to optimistically group them together until this size is reached.
@ -125,6 +131,8 @@ public class ProducerConfig extends AbstractConfig {
/* TODO: add docs */
config = new ConfigDef().define(BROKER_LIST_CONFIG, Type.LIST, "blah blah")
.define(METADATA_FETCH_TIMEOUT_CONFIG, Type.LONG, 60 * 1000, atLeast(0), "blah blah")
.define(METADATA_FETCH_BACKOFF_CONFIG, Type.LONG, 50, atLeast(0), "blah blah")
.define(METADATA_EXPIRY_CONFIG, Type.LONG, 5 * 60 * 1000, atLeast(0), "blah blah")
.define(MAX_PARTITION_SIZE_CONFIG, Type.INT, 16384, atLeast(0), "blah blah")
.define(TOTAL_BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), "blah blah")
/* TODO: should be a string to handle acks=in-sync */