KAFKA-6195: Resolve DNS aliases in bootstrap.servers (KIP-235) (#4485)

Adds the `client.dns.lookup=resolve_canonical_bootstrap_servers_only` option to perform full DNS resolution of bootstrap addresses.

Reviewers: Colin Patrick McCabe <colin@cmccabe.xyz>, Sriharsha Chintalapani <sriharsha@apache.org>, Edoardo Comar <ecomar@uk.ibm.com>, Mickael Maison <mickael.maison@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>, Rajini Sivaram <rajinisivaram@googlemail.com>
jonathanskrzypek 2018-10-13 21:39:35 +01:00 committed by Rajini Sivaram
parent 6a3382c9d3
commit a947fe8da8
25 changed files with 158 additions and 88 deletions
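
Usage note (editorial, not part of the commit): once this change is in, any client can opt into the new mode through its config. A minimal consumer sketch follows; the bootstrap alias and group id are placeholders:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class DnsLookupExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // "kafka.example.com" is a hypothetical DNS alias fronting several brokers
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka.example.com:9092");
        // The option added by this commit: expand each bootstrap entry into canonical hostnames
        props.put(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG, "resolve_canonical_bootstrap_servers_only");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Bootstrap connections now target canonical broker names, which matters for
            // SASL/Kerberos, where principals are tied to canonical hostnames (per KIP-235).
        }
    }
}
```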


@@ -14,14 +14,15 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-package org.apache.kafka.common.config;
+package org.apache.kafka.clients;

import java.util.Locale;

public enum ClientDnsLookup {

    DEFAULT("default"),
-    USE_ALL_DNS_IPS("use_all_dns_ips");
+    USE_ALL_DNS_IPS("use_all_dns_ips"),
+    RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY("resolve_canonical_bootstrap_servers_only");

    private String clientDnsLookup;
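
Editorial note: `forConfig(String)` is not shown in this hunk, but the `Locale` import above and the `IllegalArgumentException` expected in `ClientUtilsTest` below suggest it is a case-insensitive `valueOf` lookup, roughly:

```java
// Assumed sketch, not the committed code: maps "use_all_dns_ips" to USE_ALL_DNS_IPS, etc.;
// an unrecognized value such as "random.value" throws IllegalArgumentException.
public static ClientDnsLookup forConfig(String config) {
    return ClientDnsLookup.valueOf(config.toUpperCase(Locale.ROOT));
}
```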


@@ -17,7 +17,6 @@
package org.apache.kafka.clients;

import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ClientDnsLookup;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.network.ChannelBuilder;
@@ -42,10 +41,12 @@ import static org.apache.kafka.common.utils.Utils.getPort;
public final class ClientUtils {
    private static final Logger log = LoggerFactory.getLogger(ClientUtils.class);

-    private ClientUtils() {}
+    private ClientUtils() {
+    }

-    public static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls) {
+    public static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls, String clientDnsLookup) {
        List<InetSocketAddress> addresses = new ArrayList<>();
+        ClientDnsLookup clientDnsLookupBehaviour = ClientDnsLookup.forConfig(clientDnsLookup);
        for (String url : urls) {
            if (url != null && !url.isEmpty()) {
                try {
@@ -54,15 +55,30 @@ public final class ClientUtils {
                    if (host == null || port == null)
                        throw new ConfigException("Invalid url in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
-                    InetSocketAddress address = new InetSocketAddress(host, port);
-                    if (address.isUnresolved()) {
-                        log.warn("Removing server {} from {} as DNS resolution failed for {}", url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, host);
+                    if (clientDnsLookupBehaviour == ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY) {
+                        InetAddress[] inetAddresses = InetAddress.getAllByName(host);
+                        for (InetAddress inetAddress : inetAddresses) {
+                            String resolvedCanonicalName = inetAddress.getCanonicalHostName();
+                            InetSocketAddress address = new InetSocketAddress(resolvedCanonicalName, port);
+                            if (address.isUnresolved()) {
+                                log.warn("Couldn't resolve server {} from {} as DNS resolution of the canonical hostname {} failed for {}", url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, resolvedCanonicalName, host);
+                            } else {
+                                addresses.add(address);
+                            }
+                        }
                    } else {
-                        addresses.add(address);
+                        InetSocketAddress address = new InetSocketAddress(host, port);
+                        if (address.isUnresolved()) {
+                            log.warn("Couldn't resolve server {} from {} as DNS resolution failed for {}", url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, host);
+                        } else {
+                            addresses.add(address);
+                        }
                    }
                } catch (IllegalArgumentException e) {
                    throw new ConfigException("Invalid port in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
+                } catch (UnknownHostException e) {
+                    throw new ConfigException("Unknown host in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
                }
            }
        }
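
To make the new branch concrete, here is a self-contained sketch (editorial, not part of the commit) of the two JDK calls it builds on; `kafka.example.com` is a placeholder alias:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class CanonicalLookupDemo {
    public static void main(String[] args) throws UnknownHostException {
        // An alias can resolve to several IPs; each IP is reverse-resolved to its
        // canonical hostname, so a single bootstrap entry may expand to many addresses.
        for (InetAddress inetAddress : InetAddress.getAllByName("kafka.example.com")) {
            System.out.println(inetAddress.getHostAddress() + " -> " + inetAddress.getCanonicalHostName());
        }
    }
}
```

This is what makes the mode useful for SASL/Kerberos deployments behind DNS aliases: the client ends up authenticating against the brokers' canonical principal names, the motivation given in KIP-235.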


@@ -18,7 +18,6 @@ package org.apache.kafka.clients;
import java.util.concurrent.ThreadLocalRandom;

-import org.apache.kafka.common.config.ClientDnsLookup;
import org.apache.kafka.common.errors.AuthenticationException;

import java.net.InetAddress;


@@ -41,6 +41,11 @@ public class CommonClientConfigs {
            + "discover the full cluster membership (which may change dynamically), this list need not contain the full set of "
            + "servers (you may want more than one, though, in case a server is down).";

+    public static final String CLIENT_DNS_LOOKUP_CONFIG = "client.dns.lookup";
+    public static final String CLIENT_DNS_LOOKUP_DOC = "<p>Controls how the client uses DNS lookups.</p><p>If set to <code>use_all_dns_ips</code> then, when the lookup returns multiple IP addresses for a hostname,"
+            + " they will all be attempted to connect to before failing the connection. Applies to both bootstrap and advertised servers.</p>"
+            + "<p>If the value is <code>resolve_canonical_bootstrap_servers_only</code> each entry will be resolved and expanded into a list of canonical names.</p>";
+
    public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";
    public static final String METADATA_MAX_AGE_DOC = "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.";
@@ -93,9 +98,6 @@
            + "elapses the client will resend the request if necessary or fail the request if "
            + "retries are exhausted.";

-    public static final String CLIENT_DNS_LOOKUP_CONFIG = "client.dns.lookup";
-    public static final String CLIENT_DNS_LOOKUP_DOC = "<p>Controls how the client uses DNS lookups.</p><p>If set to <code>use_all_dns_ips</code> then, when the lookup returns multiple IP addresses for a hostname,"
-            + " they will all be attempted to connect to before failing the connection. Applies to both bootstrap and advertised servers.</p>";

    /**
     * Postprocess the configuration so that exponential backoff is disabled when reconnect backoff

@@ -19,7 +19,6 @@ package org.apache.kafka.clients;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.ClientDnsLookup;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.metrics.Sensor;


@@ -17,9 +17,9 @@
package org.apache.kafka.clients.admin;

+import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ClientDnsLookup;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
@@ -43,6 +43,12 @@ public class AdminClientConfig extends AbstractConfig {
    public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
    private static final String BOOTSTRAP_SERVERS_DOC = CommonClientConfigs.BOOTSTRAP_SERVERS_DOC;

+    /**
+     * <code>client.dns.lookup</code>
+     */
+    public static final String CLIENT_DNS_LOOKUP_CONFIG = CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG;
+    private static final String CLIENT_DNS_LOOKUP_DOC = CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC;
+
    /**
     * <code>reconnect.backoff.ms</code>
     */
@@ -159,12 +165,14 @@ public class AdminClientConfig extends AbstractConfig {
                        in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString()),
                        Importance.LOW,
                        METRICS_RECORDING_LEVEL_DOC)
-                .define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG,
+                .define(CLIENT_DNS_LOOKUP_CONFIG,
                        Type.STRING,
                        ClientDnsLookup.DEFAULT.toString(),
-                        in(ClientDnsLookup.DEFAULT.toString(), ClientDnsLookup.USE_ALL_DNS_IPS.toString()),
+                        in(ClientDnsLookup.DEFAULT.toString(),
+                           ClientDnsLookup.USE_ALL_DNS_IPS.toString(),
+                           ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()),
                        Importance.MEDIUM,
-                        CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
+                        CLIENT_DNS_LOOKUP_DOC)
                // security support
                .define(SECURITY_PROTOCOL_CONFIG,
                        Type.STRING,


@@ -18,10 +18,10 @@
package org.apache.kafka.clients.admin;

import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.ClientUtils;
-import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.StaleMetadataException;
@@ -46,7 +46,6 @@ import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.common.config.ClientDnsLookup;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.AuthenticationException;
@@ -362,7 +361,7 @@ public class KafkaAdminClient extends AdminClient {
                config.getInt(AdminClientConfig.SEND_BUFFER_CONFIG),
                config.getInt(AdminClientConfig.RECEIVE_BUFFER_CONFIG),
                (int) TimeUnit.HOURS.toMillis(1),
-                ClientDnsLookup.forConfig(config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)),
+                ClientDnsLookup.forConfig(config.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG)),
                time,
                true,
                apiVersions,
@@ -414,7 +413,8 @@ public class KafkaAdminClient extends AdminClient {
        this.time = time;
        this.metadataManager = metadataManager;
        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
-                config.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG));
+                config.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+                config.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG));
        metadataManager.update(Cluster.bootstrap(addresses), time.milliseconds());
        this.metrics = metrics;
        this.client = client;
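
Usage note (editorial, not part of the commit): the admin client now reads the option through its own constant, and invalid values are rejected up front by the `in(...)` validator in `AdminClientConfig`. A minimal sketch with a placeholder bootstrap alias:

```java
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class AdminDnsLookupExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka.example.com:9092"); // placeholder
        props.put(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG, "use_all_dns_ips");
        try (AdminClient admin = AdminClient.create(props)) {
            // Bootstrap addresses were parsed with the configured DNS lookup behaviour.
        }
    }
}
```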


@@ -16,9 +16,9 @@
 */
package org.apache.kafka.clients.consumer;

+import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ClientDnsLookup;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
@@ -90,6 +90,9 @@ public class ConsumerConfig extends AbstractConfig {
     */
    public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;

+    /** <code>client.dns.lookup</code> */
+    public static final String CLIENT_DNS_LOOKUP_CONFIG = CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG;
+
    /**
     * <code>enable.auto.commit</code>
     */
@@ -258,7 +261,7 @@ public class ConsumerConfig extends AbstractConfig {
            " return the LSO";

    public static final String DEFAULT_ISOLATION_LEVEL = IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT);

    static {
        CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
                                        Type.LIST,
@@ -266,6 +269,14 @@ public class ConsumerConfig extends AbstractConfig {
                                        new ConfigDef.NonNullValidator(),
                                        Importance.HIGH,
                                        CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
+                                .define(CLIENT_DNS_LOOKUP_CONFIG,
+                                        Type.STRING,
+                                        ClientDnsLookup.DEFAULT.toString(),
+                                        in(ClientDnsLookup.DEFAULT.toString(),
+                                           ClientDnsLookup.USE_ALL_DNS_IPS.toString(),
+                                           ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()),
+                                        Importance.MEDIUM,
+                                        CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
                                .define(GROUP_ID_CONFIG, Type.STRING, "", Importance.HIGH, GROUP_ID_DOC)
                                .define(SESSION_TIMEOUT_MS_CONFIG,
                                        Type.INT,
@@ -453,12 +464,6 @@ public class ConsumerConfig extends AbstractConfig {
                                        in(IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT), IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT)),
                                        Importance.MEDIUM,
                                        ISOLATION_LEVEL_DOC)
-                                .define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG,
-                                        Type.STRING,
-                                        ClientDnsLookup.DEFAULT.toString(),
-                                        in(ClientDnsLookup.DEFAULT.toString(), ClientDnsLookup.USE_ALL_DNS_IPS.toString()),
-                                        Importance.MEDIUM,
-                                        CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
                                // security support
                                .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
                                        Type.STRING,


@@ -17,8 +17,8 @@
package org.apache.kafka.clients.consumer;

import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientUtils;
-import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
@@ -36,7 +36,6 @@ import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.ClientDnsLookup;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
@@ -710,8 +709,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
            ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keyDeserializer, valueDeserializer, reporters, interceptorList);
            this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG),
                    true, false, clusterResourceListeners);
-            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
-            this.metadata.update(Cluster.bootstrap(addresses), Collections.emptySet(), 0);
+            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
+                    config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
+                    config.getString(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG));
+            this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), 0);
            String metricGrpPrefix = "consumer";
            ConsumerMetrics metricsRegistry = new ConsumerMetrics(metricsTags.keySet(), "consumer");
            ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
@@ -732,7 +733,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                    config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
                    config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
                    config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
-                    ClientDnsLookup.forConfig(config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)),
+                    ClientDnsLookup.forConfig(config.getString(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG)),
                    time,
                    true,
                    new ApiVersions(),


@@ -28,8 +28,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientUtils;
-import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
@@ -49,7 +49,6 @@ import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.ClientDnsLookup;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.AuthenticationException;
@@ -407,7 +406,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                    apiVersions,
                    transactionManager,
                    new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));
-            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
+            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
+                    config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
+                    config.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG));
            if (metadata != null) {
                this.metadata = metadata;
            } else {
@@ -449,7 +450,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                producerConfig.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                producerConfig.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                requestTimeoutMs,
-                ClientDnsLookup.forConfig(producerConfig.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)),
+                ClientDnsLookup.forConfig(producerConfig.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG)),
                time,
                true,
                apiVersions,
@@ -496,7 +497,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
    }

    private static TransactionManager configureTransactionState(ProducerConfig config, LogContext logContext, Logger log) {

        TransactionManager transactionManager = null;

        boolean userConfiguredIdempotence = false;
        if (config.originals().containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG))
            userConfiguredIdempotence = true;


@@ -16,10 +16,10 @@
 */
package org.apache.kafka.clients.producer;

+import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ClientDnsLookup;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
@@ -52,6 +52,9 @@ public class ProducerConfig extends AbstractConfig {
    /** <code>bootstrap.servers</code> */
    public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;

+    /** <code>client.dns.lookup</code> */
+    public static final String CLIENT_DNS_LOOKUP_CONFIG = CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG;
+
    /** <code>metadata.max.age.ms</code> */
    public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG;
    private static final String METADATA_MAX_AGE_DOC = CommonClientConfigs.METADATA_MAX_AGE_DOC;
@@ -239,6 +242,14 @@ public class ProducerConfig extends AbstractConfig {
    static {
        CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Collections.emptyList(), new ConfigDef.NonNullValidator(), Importance.HIGH, CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
+                                .define(CLIENT_DNS_LOOKUP_CONFIG,
+                                        Type.STRING,
+                                        ClientDnsLookup.DEFAULT.toString(),
+                                        in(ClientDnsLookup.DEFAULT.toString(),
+                                           ClientDnsLookup.USE_ALL_DNS_IPS.toString(),
+                                           ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()),
+                                        Importance.MEDIUM,
+                                        CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
                                .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
                                .define(RETRIES_CONFIG, Type.INT, Integer.MAX_VALUE, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC)
                                .define(ACKS_CONFIG,
@@ -347,13 +358,7 @@ public class ProducerConfig extends AbstractConfig {
                                        null,
                                        new ConfigDef.NonEmptyString(),
                                        Importance.LOW,
-                                        TRANSACTIONAL_ID_DOC)
-                                .define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG,
-                                        Type.STRING,
-                                        ClientDnsLookup.DEFAULT.toString(),
-                                        in(ClientDnsLookup.DEFAULT.toString(), ClientDnsLookup.USE_ALL_DNS_IPS.toString()),
-                                        Importance.MEDIUM,
-                                        CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC);
+                                        TRANSACTIONAL_ID_DOC);
    }

    @Override


@@ -16,7 +16,6 @@
 */
package org.apache.kafka.clients;

-import org.apache.kafka.common.config.ClientDnsLookup;
import org.apache.kafka.common.config.ConfigException;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
@@ -31,27 +30,46 @@ import java.util.List;
public class ClientUtilsTest {

    @Test
-    public void testParseAndValidateAddresses() {
-        check("127.0.0.1:8000");
-        check("mydomain.com:8080");
-        check("[::1]:8000");
-        check("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:1234", "mydomain.com:10000");
-        List<InetSocketAddress> validatedAddresses = check("some.invalid.hostname.foo.bar.local:9999", "mydomain.com:10000");
+    public void testParseAndValidateAddresses() throws UnknownHostException {
+        checkWithoutLookup("127.0.0.1:8000");
+        checkWithoutLookup("localhost:8080");
+        checkWithoutLookup("[::1]:8000");
+        checkWithoutLookup("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:1234", "localhost:10000");
+        List<InetSocketAddress> validatedAddresses = checkWithoutLookup("localhost:10000");
        assertEquals(1, validatedAddresses.size());
        InetSocketAddress onlyAddress = validatedAddresses.get(0);
-        assertEquals("mydomain.com", onlyAddress.getHostName());
+        assertEquals("localhost", onlyAddress.getHostName());
        assertEquals(10000, onlyAddress.getPort());
    }

+    @Test
+    public void testParseAndValidateAddressesWithReverseLookup() {
+        checkWithoutLookup("127.0.0.1:8000");
+        checkWithoutLookup("localhost:8080");
+        checkWithoutLookup("[::1]:8000");
+        checkWithoutLookup("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:1234", "localhost:10000");
+        List<InetSocketAddress> validatedAddresses = checkWithLookup(Arrays.asList("example.com:10000"));
+        assertEquals(2, validatedAddresses.size());
+        InetSocketAddress address = validatedAddresses.get(0);
+        assertEquals("93.184.216.34", address.getHostName());
+        assertEquals(10000, address.getPort());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testInvalidConfig() {
+        ClientUtils.parseAndValidateAddresses(Arrays.asList("localhost:10000"), "random.value");
+    }
+
    @Test(expected = ConfigException.class)
    public void testNoPort() {
-        check("127.0.0.1");
+        checkWithoutLookup("127.0.0.1");
    }

    @Test(expected = ConfigException.class)
    public void testOnlyBadHostname() {
-        check("some.invalid.hostname.foo.bar.local:9999");
+        checkWithoutLookup("some.invalid.hostname.foo.bar.local:9999");
    }

    @Test
@@ -87,7 +105,12 @@ public class ClientUtilsTest {
        assertEquals(2, ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.USE_ALL_DNS_IPS).size());
    }

-    private List<InetSocketAddress> check(String... url) {
-        return ClientUtils.parseAndValidateAddresses(Arrays.asList(url));
+    private List<InetSocketAddress> checkWithoutLookup(String... url) {
+        return ClientUtils.parseAndValidateAddresses(Arrays.asList(url), ClientDnsLookup.DEFAULT.toString());
    }
+
+    private List<InetSocketAddress> checkWithLookup(List<String> url) {
+        return ClientUtils.parseAndValidateAddresses(url, ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString());
+    }
}
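
Two editorial observations on these tests: `checkWithoutLookup` pins the pre-existing behaviour of `default` (one `InetSocketAddress` per entry, no expansion), while `testParseAndValidateAddressesWithReverseLookup` depends on live DNS; `93.184.216.34` is simply what `example.com` resolved to at the time, so the test is environment-sensitive. The `default` path boils down to:

```java
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.List;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientUtils;

public class DefaultLookupDemo {
    public static void main(String[] args) {
        // "default" keeps the old behaviour: one unexpanded address per bootstrap entry.
        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
                Arrays.asList("localhost:9092"), ClientDnsLookup.DEFAULT.toString());
        System.out.println(addresses); // a single entry, e.g. localhost/127.0.0.1:9092
    }
}
```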


@@ -27,7 +27,6 @@ import static org.junit.Assert.assertTrue;
import java.net.InetAddress;
import java.net.UnknownHostException;

-import org.apache.kafka.common.config.ClientDnsLookup;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.utils.MockTime;
import org.junit.Before;


@@ -19,7 +19,6 @@ package org.apache.kafka.clients;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.ClientDnsLookup;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.CommonFields;


@@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.FetchSessionHandler;
@@ -36,7 +37,6 @@ import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.ClientDnsLookup;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.SerializationException;
@@ -2735,7 +2735,7 @@ public class FetcherTest {
        String topicName2 = "topic2";
        TopicPartition t2p0 = new TopicPartition(topicName2, 0);
        // Expect a metadata refresh.
-        metadata.update(Cluster.bootstrap(ClientUtils.parseAndValidateAddresses(Collections.singletonList("1.1.1.1:1111"))),
+        metadata.update(Cluster.bootstrap(ClientUtils.parseAndValidateAddresses(Collections.singletonList("1.1.1.1:1111"), ClientDnsLookup.DEFAULT.toString())),
                Collections.<String>emptySet(),
                time.milliseconds());


@@ -32,6 +32,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
@@ -44,7 +45,6 @@ import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.ClientDnsLookup;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;


@@ -16,9 +16,9 @@
 */
package org.apache.kafka.connect.runtime;

+import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ClientDnsLookup;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
@@ -61,6 +61,9 @@ public class WorkerConfig extends AbstractConfig {
            + "than one, though, in case a server is down).";
    public static final String BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092";

+    public static final String CLIENT_DNS_LOOKUP_CONFIG = CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG;
+    public static final String CLIENT_DNS_LOOKUP_DOC = CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC;
+
    public static final String KEY_CONVERTER_CLASS_CONFIG = "key.converter";
    public static final String KEY_CONVERTER_CLASS_DOC =
            "Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka." +
@@ -223,6 +226,14 @@ public class WorkerConfig extends AbstractConfig {
        return new ConfigDef()
                .define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, BOOTSTRAP_SERVERS_DEFAULT,
                        Importance.HIGH, BOOTSTRAP_SERVERS_DOC)
+                .define(CLIENT_DNS_LOOKUP_CONFIG,
+                        Type.STRING,
+                        ClientDnsLookup.DEFAULT.toString(),
+                        in(ClientDnsLookup.DEFAULT.toString(),
+                           ClientDnsLookup.USE_ALL_DNS_IPS.toString(),
+                           ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()),
+                        Importance.MEDIUM,
+                        CLIENT_DNS_LOOKUP_DOC)
                .define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS,
                        Importance.HIGH, KEY_CONVERTER_CLASS_DOC)
                .define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS,
@@ -278,13 +289,7 @@ public class WorkerConfig extends AbstractConfig {
                        Collections.emptyList(),
                        Importance.LOW, CONFIG_PROVIDERS_DOC)
                .define(REST_EXTENSION_CLASSES_CONFIG, Type.LIST, "",
-                        Importance.LOW, REST_EXTENSION_CLASSES_DOC)
-                .define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG,
-                        Type.STRING,
-                        ClientDnsLookup.DEFAULT.toString(),
-                        in(ClientDnsLookup.DEFAULT.toString(), ClientDnsLookup.USE_ALL_DNS_IPS.toString()),
-                        Importance.MEDIUM,
-                        CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC);
+                        Importance.LOW, REST_EXTENSION_CLASSES_DOC);
    }

    private void logInternalConverterDeprecationWarnings(Map<String, String> props) {
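
Editorial note: on the Connect side the option now validates in `WorkerConfig`, so a worker can opt in from its properties file. A hedged example with placeholder values:

```
bootstrap.servers=kafka.example.com:9092
client.dns.lookup=resolve_canonical_bootstrap_servers_only
group.id=connect-cluster
```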


@@ -17,6 +17,7 @@
package org.apache.kafka.connect.runtime.distributed;

import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.Metadata;
@@ -24,7 +25,6 @@ import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.config.ClientDnsLookup;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@@ -96,7 +96,9 @@ public class WorkerGroupMember {
            this.metrics = new Metrics(metricConfig, reporters, time);
            this.retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG);
            this.metadata = new Metadata(retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG), true);
-            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
+            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
+                    config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),
+                    config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG));
            this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), 0);
            String metricGrpPrefix = "connect";
            ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);


@@ -23,6 +23,7 @@ import kafka.coordinator.group.GroupOverview
import kafka.utils.Logging
import org.apache.kafka.clients._
import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, ConsumerProtocol, RequestFuture}
+import org.apache.kafka.common.config.ConfigDef.ValidString._
import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
import org.apache.kafka.common.errors.{AuthenticationException, TimeoutException}
@@ -39,7 +40,6 @@ import org.apache.kafka.common.{Cluster, Node, TopicPartition}
import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}
-import org.apache.kafka.common.config.ClientDnsLookup

/**
 * A Scala administrative client for Kafka which supports managing and inspecting topics, brokers,
@@ -386,6 +386,14 @@ object AdminClient {
      Type.LIST,
      Importance.HIGH,
      CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
+    .define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG,
+      Type.STRING,
+      ClientDnsLookup.DEFAULT.toString,
+      in(ClientDnsLookup.DEFAULT.toString,
+        ClientDnsLookup.USE_ALL_DNS_IPS.toString,
+        ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString),
+      Importance.MEDIUM,
+      CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
    .define(
      CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
      ConfigDef.Type.STRING,
@@ -429,7 +437,8 @@ object AdminClient {
    val retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG)
    val brokerUrls = config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
-    val brokerAddresses = ClientUtils.parseAndValidateAddresses(brokerUrls)
+    val clientDnsLookup = config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)
+    val brokerAddresses = ClientUtils.parseAndValidateAddresses(brokerUrls, clientDnsLookup)
    val bootstrapCluster = Cluster.bootstrap(brokerAddresses)
    metadata.update(bootstrapCluster, Collections.emptySet(), 0)


@@ -39,8 +39,6 @@ import org.apache.kafka.common.{KafkaException, Node, TopicPartition}
import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap
import scala.collection.{Set, mutable}
-import org.apache.kafka.common.config.ClientDnsLookup

object ControllerChannelManager {
  val QueueSizeMetricName = "QueueSize"


@@ -36,7 +36,6 @@ import java.util.concurrent.{BlockingQueue, ConcurrentHashMap, LinkedBlockingQue
import collection.JavaConverters._
import scala.collection.{concurrent, immutable}
-import org.apache.kafka.common.config.ClientDnsLookup

object TransactionMarkerChannelManager {
  def apply(config: KafkaConfig,


@@ -37,7 +37,7 @@ import kafka.security.CredentialProvider
import kafka.security.auth.Authorizer
import kafka.utils._
import kafka.zk.{BrokerInfo, KafkaZkClient}
-import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient, NetworkClientUtils}
+import org.apache.kafka.clients.{ApiVersions, ClientDnsLookup, ManualMetadataUpdater, NetworkClient, NetworkClientUtils}
import org.apache.kafka.common.internals.ClusterResourceListeners
import org.apache.kafka.common.metrics.{JmxReporter, Metrics, _}
import org.apache.kafka.common.network._
@@ -51,7 +51,6 @@ import org.apache.kafka.common.{ClusterResource, Node}
import scala.collection.JavaConverters._
import scala.collection.{Map, Seq, mutable}
-import org.apache.kafka.common.config.ClientDnsLookup

object KafkaServer {
  // Copy the subset of properties that are relevant to Logs


@@ -30,7 +30,6 @@ import org.apache.kafka.common.Node
import org.apache.kafka.common.requests.AbstractRequest.Builder

import scala.collection.JavaConverters._
-import org.apache.kafka.common.config.ClientDnsLookup

trait BlockingSend {


@@ -43,7 +43,6 @@ import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.{Node, TopicPartition}

import scala.collection.JavaConverters._
-import org.apache.kafka.common.config.ClientDnsLookup

/**
 * For verifying the consistency among replicas.


@@ -20,8 +20,8 @@ package org.apache.kafka.trogdor.workload;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientUtils;
-import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.ManualMetadataUpdater;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.NetworkClientUtils;
@@ -30,7 +30,6 @@ import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
-import org.apache.kafka.common.config.ClientDnsLookup;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ChannelBuilder;
@@ -127,7 +126,8 @@ public class ConnectionStressWorker implements TaskWorker {
            WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), spec.commonClientConf());
            AdminClientConfig conf = new AdminClientConfig(props);
            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
-                    conf.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG));
+                    conf.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+                    conf.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG));
            ManualMetadataUpdater updater = new ManualMetadataUpdater(Cluster.bootstrap(addresses).nodes());
            while (true) {
                if (doneFuture.isDone()) {
@@ -182,7 +182,7 @@ public class ConnectionStressWorker implements TaskWorker {
                    4096,
                    4096,
                    1000,
-                    ClientDnsLookup.forConfig(conf.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)),
+                    ClientDnsLookup.forConfig(conf.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG)),
                    Time.SYSTEM,
                    false,
                    new ApiVersions(),