diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
index 6b6b56059c8..b08bc12950c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
@@ -245,7 +245,9 @@ public final class ClientUtils {
throttleTimeSensor,
logContext,
hostResolver,
- clientTelemetrySender);
+ clientTelemetrySender,
+ MetadataRecoveryStrategy.forName(config.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG))
+ );
} catch (Throwable t) {
closeQuietly(selector, "Selector");
closeQuietly(channelBuilder, "ChannelBuilder");
diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index f5a08da2c93..9803d487f41 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -219,6 +219,19 @@ public class CommonClientConfigs {
public static final String DEFAULT_API_TIMEOUT_MS_DOC = "Specifies the timeout (in milliseconds) for client APIs. " +
"This configuration is used as the default timeout for all client operations that do not specify a timeout
parameter.";
+ public static final String METADATA_RECOVERY_STRATEGY_CONFIG = "metadata.recovery.strategy";
+ public static final String METADATA_RECOVERY_STRATEGY_DOC = "Controls how the client recovers when none of the brokers known to it is available. " +
+ "If set to <code>none</code>, the client fails. If set to <code>rebootstrap</code>, " +
+ "the client repeats the bootstrap process using <code>bootstrap.servers</code>. " +
+ "Rebootstrapping is useful when a client communicates with brokers so infrequently " +
+ "that the set of brokers may change entirely before the client refreshes metadata. " +
+ "Metadata recovery is triggered when all last-known brokers appear unavailable simultaneously. " +
+ "Brokers appear unavailable when disconnected and no current retry attempt is in-progress. " +
+ "Consider increasing <code>reconnect.backoff.ms</code> and <code>reconnect.backoff.max.ms</code> and " +
+ "decreasing <code>socket.connection.setup.timeout.ms</code> and <code>socket.connection.setup.timeout.max.ms</code> " +
+ "for the client.";
+ public static final String DEFAULT_METADATA_RECOVERY_STRATEGY = MetadataRecoveryStrategy.NONE.name;
+
/**
* Postprocess the configuration so that exponential backoff is disabled when reconnect backoff
* is explicitly configured but the maximum reconnect backoff is not explicitly configured.
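The new option is then wired into the producer, consumer, admin, and Connect configs below. As a minimal sketch of how an application opts in (the bootstrap addresses and deserializer choices here are placeholders, not part of this patch):

```java
import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class RebootstrapConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092"); // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Validated case-insensitively against MetadataRecoveryStrategy: "none" or "rebootstrap".
        props.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, "rebootstrap");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // normal consumer usage
        }
    }
}
```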
diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index a03d57b40f8..46b64986064 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -130,7 +130,7 @@ public interface KafkaClient extends Closeable {
* @param now The current time in ms
* @return The node with the fewest in-flight requests.
*/
- Node leastLoadedNode(long now);
+ LeastLoadedNode leastLoadedNode(long now);
/**
* The number of currently in-flight requests for which we have not yet returned a response
diff --git a/clients/src/main/java/org/apache/kafka/clients/LeastLoadedNode.java b/clients/src/main/java/org/apache/kafka/clients/LeastLoadedNode.java
new file mode 100644
index 00000000000..b2b93e6c94b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/LeastLoadedNode.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.Node;
+
+public class LeastLoadedNode {
+ private final Node node;
+ private final boolean atLeastOneConnectionReady;
+
+ public LeastLoadedNode(Node node, boolean atLeastOneConnectionReady) {
+ this.node = node;
+ this.atLeastOneConnectionReady = atLeastOneConnectionReady;
+ }
+
+ public Node node() {
+ return node;
+ }
+
+ /**
+ * Indicates if the least loaded node is available or at least a ready connection exists.
+ *
+ * <p>There may be no node available while ready connections to live nodes exist. This may happen when
+ * the connections are overloaded with in-flight requests. This function takes this into account.
+ */
+ public boolean hasNodeAvailableOrConnectionReady() {
+ return node != null || atLeastOneConnectionReady;
+ }
+}
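The wrapper exists so that callers can distinguish "no node worth picking right now" from "no usable connection at all". A sketch of the intended call pattern (the class and method names below are illustrative, not part of the patch):

```java
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.LeastLoadedNode;
import org.apache.kafka.common.Node;

final class LeastLoadedNodeUsage {
    // Illustrative helper, not part of the patch.
    static Node chooseNodeOrNull(KafkaClient client, long now) {
        LeastLoadedNode result = client.leastLoadedNode(now);
        if (result.node() == null && !result.hasNodeAvailableOrConnectionReady()) {
            // Nothing to pick and no ready connection either: with
            // metadata.recovery.strategy=rebootstrap this is the condition
            // that triggers a rebootstrap (see the NetworkClient changes below).
        }
        // node() may still be null while ready connections exist, e.g. when all
        // connections are saturated with in-flight requests; callers back off
        // and retry instead of treating the cluster as unreachable.
        return result.node();
    }
}
```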
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 30cad44a4bc..9246da01000 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -82,6 +82,8 @@ public class Metadata implements Closeable {
private final ClusterResourceListeners clusterResourceListeners;
private boolean isClosed;
private final Map<TopicPartition, Integer> lastSeenLeaderEpochs;
+ /** Addresses with which the metadata was originally bootstrapped. */
+ private List<InetSocketAddress> bootstrapAddresses;
/**
* Create a new Metadata instance
@@ -304,6 +306,12 @@ public class Metadata implements Closeable {
this.needFullUpdate = true;
this.updateVersion += 1;
this.metadataSnapshot = MetadataSnapshot.bootstrap(addresses);
+ this.bootstrapAddresses = addresses;
+ }
+
+ public synchronized void rebootstrap() {
+ log.info("Rebootstrapping with {}", this.bootstrapAddresses);
+ this.bootstrap(this.bootstrapAddresses);
}
/**
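`bootstrap()` now records the addresses it was called with, so `rebootstrap()` can simply replay them. A sketch of the resulting life cycle, assuming an already-constructed `Metadata` instance (the class name and address are placeholders):

```java
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.List;

import org.apache.kafka.clients.Metadata;

final class RebootstrapLifecycleSketch {
    // Illustrative only, not part of the patch.
    static void bootstrapThenRecover(Metadata metadata) {
        List<InetSocketAddress> addresses =
                Collections.singletonList(new InetSocketAddress("localhost", 9092)); // placeholder
        metadata.bootstrap(addresses);  // snapshot built from bootstrap nodes; addresses remembered
        // ... normal metadata updates happen; eventually every known broker
        // appears unavailable at the same time ...
        metadata.rebootstrap();         // equivalent to calling bootstrap(addresses) again
    }
}
```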
diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataRecoveryStrategy.java b/clients/src/main/java/org/apache/kafka/clients/MetadataRecoveryStrategy.java
new file mode 100644
index 00000000000..a4e0340c241
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/MetadataRecoveryStrategy.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import java.util.Locale;
+
+/**
+ * Defines the strategies which clients can follow to deal with the situation when none of the known nodes is available.
+ */
+public enum MetadataRecoveryStrategy {
+ NONE("none"),
+ REBOOTSTRAP("rebootstrap");
+
+ public final String name;
+
+ MetadataRecoveryStrategy(String name) {
+ this.name = name;
+ }
+
+ public static MetadataRecoveryStrategy forName(String name) {
+ if (name == null) {
+ throw new IllegalArgumentException("Illegal MetadataRecoveryStrategy: null");
+ }
+ try {
+ return MetadataRecoveryStrategy.valueOf(name.toUpperCase(Locale.ROOT));
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("Illegal MetadataRecoveryStrategy: " + name);
+ }
+ }
+}
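A small demonstration of `forName`, which accepts values in any case (matching the `CaseInsensitiveValidString` validators registered in the config classes below) and rejects anything else:

```java
import org.apache.kafka.clients.MetadataRecoveryStrategy;

public class StrategyParsingDemo {
    public static void main(String[] args) {
        // Parsing is case-insensitive.
        System.out.println(MetadataRecoveryStrategy.forName("rebootstrap")); // REBOOTSTRAP
        System.out.println(MetadataRecoveryStrategy.forName("NONE"));        // NONE
        try {
            MetadataRecoveryStrategy.forName("abc");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // Illegal MetadataRecoveryStrategy: abc
        }
    }
}
```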
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 3a7af6617e7..8ac92acd884 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -114,6 +114,8 @@ public class NetworkClient implements KafkaClient {
/* time in ms to wait before retrying to create connection to a server */
private final long reconnectBackoffMs;
+ private final MetadataRecoveryStrategy metadataRecoveryStrategy;
+
private final Time time;
/**
@@ -147,7 +149,8 @@ public class NetworkClient implements KafkaClient {
Time time,
boolean discoverBrokerVersions,
ApiVersions apiVersions,
- LogContext logContext) {
+ LogContext logContext,
+ MetadataRecoveryStrategy metadataRecoveryStrategy) {
this(selector,
metadata,
clientId,
@@ -163,7 +166,8 @@ public class NetworkClient implements KafkaClient {
discoverBrokerVersions,
apiVersions,
null,
- logContext);
+ logContext,
+ metadataRecoveryStrategy);
}
public NetworkClient(Selectable selector,
@@ -181,7 +185,8 @@ public class NetworkClient implements KafkaClient {
boolean discoverBrokerVersions,
ApiVersions apiVersions,
Sensor throttleTimeSensor,
- LogContext logContext) {
+ LogContext logContext,
+ MetadataRecoveryStrategy metadataRecoveryStrategy) {
this(null,
metadata,
selector,
@@ -200,7 +205,8 @@ public class NetworkClient implements KafkaClient {
throttleTimeSensor,
logContext,
new DefaultHostResolver(),
- null);
+ null,
+ metadataRecoveryStrategy);
}
public NetworkClient(Selectable selector,
@@ -217,7 +223,8 @@ public class NetworkClient implements KafkaClient {
Time time,
boolean discoverBrokerVersions,
ApiVersions apiVersions,
- LogContext logContext) {
+ LogContext logContext,
+ MetadataRecoveryStrategy metadataRecoveryStrategy) {
this(metadataUpdater,
null,
selector,
@@ -236,7 +243,8 @@ public class NetworkClient implements KafkaClient {
null,
logContext,
new DefaultHostResolver(),
- null);
+ null,
+ metadataRecoveryStrategy);
}
public NetworkClient(MetadataUpdater metadataUpdater,
@@ -257,7 +265,8 @@ public class NetworkClient implements KafkaClient {
Sensor throttleTimeSensor,
LogContext logContext,
HostResolver hostResolver,
- ClientTelemetrySender clientTelemetrySender) {
+ ClientTelemetrySender clientTelemetrySender,
+ MetadataRecoveryStrategy metadataRecoveryStrategy) {
/* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not
* possible because `DefaultMetadataUpdater` is an inner class and it can only be instantiated after the
* super constructor is invoked.
@@ -288,6 +297,7 @@ public class NetworkClient implements KafkaClient {
this.log = logContext.logger(NetworkClient.class);
this.state = new AtomicReference<>(State.ACTIVE);
this.telemetrySender = (clientTelemetrySender != null) ? new TelemetrySender(clientTelemetrySender) : null;
+ this.metadataRecoveryStrategy = metadataRecoveryStrategy;
}
/**
@@ -695,7 +705,7 @@ public class NetworkClient implements KafkaClient {
* @return The node with the fewest in-flight requests.
*/
@Override
- public Node leastLoadedNode(long now) {
+ public LeastLoadedNode leastLoadedNode(long now) {
List<Node> nodes = this.metadataUpdater.fetchNodes();
if (nodes.isEmpty())
throw new IllegalStateException("There are no nodes in the Kafka cluster");
@@ -705,16 +715,25 @@ public class NetworkClient implements KafkaClient {
Node foundCanConnect = null;
Node foundReady = null;
+ boolean atLeastOneConnectionReady = false;
+
int offset = this.randOffset.nextInt(nodes.size());
for (int i = 0; i < nodes.size(); i++) {
int idx = (offset + i) % nodes.size();
Node node = nodes.get(idx);
+
+ if (!atLeastOneConnectionReady
+ && connectionStates.isReady(node.idString(), now)
+ && selector.isChannelReady(node.idString())) {
+ atLeastOneConnectionReady = true;
+ }
+
if (canSendRequest(node.idString(), now)) {
int currInflight = this.inFlightRequests.count(node.idString());
if (currInflight == 0) {
// if we find an established connection with no in-flight requests we can stop right away
log.trace("Found least loaded node {} connected with no in-flight requests", node);
- return node;
+ return new LeastLoadedNode(node, true);
} else if (currInflight < inflight) {
// otherwise if this is the best we have found so far, record that
inflight = currInflight;
@@ -738,16 +757,16 @@ public class NetworkClient implements KafkaClient {
// which are being established before connecting to new nodes.
if (foundReady != null) {
log.trace("Found least loaded node {} with {} inflight requests", foundReady, inflight);
- return foundReady;
+ return new LeastLoadedNode(foundReady, atLeastOneConnectionReady);
} else if (foundConnecting != null) {
log.trace("Found least loaded connecting node {}", foundConnecting);
- return foundConnecting;
+ return new LeastLoadedNode(foundConnecting, atLeastOneConnectionReady);
} else if (foundCanConnect != null) {
log.trace("Found least loaded node {} with no active connection", foundCanConnect);
- return foundCanConnect;
+ return new LeastLoadedNode(foundCanConnect, atLeastOneConnectionReady);
} else {
log.trace("Least loaded node selection failed to find an available node");
- return null;
+ return new LeastLoadedNode(null, atLeastOneConnectionReady);
}
}
@@ -1122,13 +1141,22 @@ public class NetworkClient implements KafkaClient {
// Beware that the behavior of this method and the computation of timeouts for poll() are
// highly dependent on the behavior of leastLoadedNode.
- Node node = leastLoadedNode(now);
- if (node == null) {
+ LeastLoadedNode leastLoadedNode = leastLoadedNode(now);
+
+ // Rebootstrap if needed and configured.
+ if (metadataRecoveryStrategy == MetadataRecoveryStrategy.REBOOTSTRAP
+ && !leastLoadedNode.hasNodeAvailableOrConnectionReady()) {
+ metadata.rebootstrap();
+
+ leastLoadedNode = leastLoadedNode(now);
+ }
+
+ if (leastLoadedNode.node() == null) {
log.debug("Give up sending metadata request since no node is available");
return reconnectBackoffMs;
}
- return maybeUpdate(now, node);
+ return maybeUpdate(now, leastLoadedNode.node());
}
@Override
@@ -1266,7 +1294,7 @@ public class NetworkClient implements KafkaClient {
// Per KIP-714, let's continue to re-use the same broker for as long as possible.
if (stickyNode == null) {
- stickyNode = leastLoadedNode(now);
+ stickyNode = leastLoadedNode(now).node();
if (stickyNode == null) {
log.debug("Give up sending telemetry request since no node is available");
return reconnectBackoffMs;
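Putting the `NetworkClient` changes together: `maybeUpdate` now consults both the chosen node and the connection-readiness flag before declaring the cluster unreachable. A condensed, illustrative restatement of that flow (the helper name is made up; the real logic is in the hunk above):

```java
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.LeastLoadedNode;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.common.Node;

final class MaybeUpdateSketch {
    // Illustrative helper mirroring NetworkClient#maybeUpdate, not part of the patch.
    static Node pickNodeForMetadataRequest(KafkaClient client, Metadata metadata,
                                           MetadataRecoveryStrategy strategy, long now) {
        LeastLoadedNode lln = client.leastLoadedNode(now);
        if (strategy == MetadataRecoveryStrategy.REBOOTSTRAP
                && !lln.hasNodeAvailableOrConnectionReady()) {
            metadata.rebootstrap();            // reset known nodes to the bootstrap addresses
            lln = client.leastLoadedNode(now); // re-pick among the bootstrap nodes
        }
        return lln.node(); // may still be null: the caller backs off for reconnect.backoff.ms
    }
}
```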
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
index d740283f506..b64338b1d1b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.admin;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
@@ -139,6 +140,10 @@ public class AdminClientConfig extends AbstractConfig {
public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG;
public static final String DEFAULT_API_TIMEOUT_MS_CONFIG = CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_CONFIG;
+ public static final String METADATA_RECOVERY_STRATEGY_CONFIG = CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG;
+ public static final String METADATA_RECOVERY_STRATEGY_DOC = CommonClientConfigs.METADATA_RECOVERY_STRATEGY_DOC;
+ public static final String DEFAULT_METADATA_RECOVERY_STRATEGY = CommonClientConfigs.DEFAULT_METADATA_RECOVERY_STRATEGY;
+
/**
* security.providers
*/
@@ -262,7 +267,14 @@ public class AdminClientConfig extends AbstractConfig {
Importance.MEDIUM,
SECURITY_PROTOCOL_DOC)
.withClientSslSupport()
- .withClientSaslSupport();
+ .withClientSaslSupport()
+ .define(METADATA_RECOVERY_STRATEGY_CONFIG,
+ Type.STRING,
+ DEFAULT_METADATA_RECOVERY_STRATEGY,
+ ConfigDef.CaseInsensitiveValidString
+ .in(Utils.enumOptions(MetadataRecoveryStrategy.class)),
+ Importance.LOW,
+ METADATA_RECOVERY_STRATEGY_DOC);
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 55646e27820..8f693c9965a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -25,6 +25,8 @@ import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.DefaultHostResolver;
import org.apache.kafka.clients.HostResolver;
import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.LeastLoadedNode;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.StaleMetadataException;
import org.apache.kafka.clients.admin.CreateTopicsResult.TopicMetadataAndConfig;
@@ -399,6 +401,7 @@ public class KafkaAdminClient extends AdminClient {
private final long retryBackoffMaxMs;
private final ExponentialBackoff retryBackoff;
private final boolean clientTelemetryEnabled;
+ private final MetadataRecoveryStrategy metadataRecoveryStrategy;
/**
* The telemetry requests client instance id.
@@ -612,6 +615,7 @@ public class KafkaAdminClient extends AdminClient {
retryBackoffMaxMs,
CommonClientConfigs.RETRY_BACKOFF_JITTER);
this.clientTelemetryEnabled = config.getBoolean(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG);
+ this.metadataRecoveryStrategy = MetadataRecoveryStrategy.forName(config.getString(AdminClientConfig.METADATA_RECOVERY_STRATEGY_CONFIG));
config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
log.debug("Kafka admin client initialized");
@@ -698,7 +702,13 @@ public class KafkaAdminClient extends AdminClient {
private class MetadataUpdateNodeIdProvider implements NodeProvider {
@Override
public Node provide() {
- return client.leastLoadedNode(time.milliseconds());
+ LeastLoadedNode leastLoadedNode = client.leastLoadedNode(time.milliseconds());
+ if (metadataRecoveryStrategy == MetadataRecoveryStrategy.REBOOTSTRAP
+ && !leastLoadedNode.hasNodeAvailableOrConnectionReady()) {
+ metadataManager.rebootstrap(time.milliseconds());
+ }
+
+ return leastLoadedNode.node();
}
@Override
@@ -780,7 +790,7 @@ public class KafkaAdminClient extends AdminClient {
if (metadataManager.isReady()) {
// This may return null if all nodes are busy.
// In that case, we will postpone node assignment.
- return client.leastLoadedNode(time.milliseconds());
+ return client.leastLoadedNode(time.milliseconds()).node();
}
metadataManager.requestUpdate();
return null;
@@ -835,7 +845,7 @@ public class KafkaAdminClient extends AdminClient {
} else {
// This may return null if all nodes are busy.
// In that case, we will postpone node assignment.
- return client.leastLoadedNode(time.milliseconds());
+ return client.leastLoadedNode(time.milliseconds()).node();
}
}
metadataManager.requestUpdate();
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
index f8123c40eff..239f6eecef0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
@@ -92,6 +92,11 @@ public class AdminMetadataManager {
*/
private ApiException fatalException = null;
+ /**
+ * The cluster with which the metadata was bootstrapped.
+ */
+ private Cluster bootstrapCluster;
+
public class AdminMetadataUpdater implements MetadataUpdater {
@Override
public List<Node> fetchNodes() {
@@ -275,6 +280,7 @@ public class AdminMetadataManager {
public void update(Cluster cluster, long now) {
if (cluster.isBootstrapConfigured()) {
log.debug("Setting bootstrap cluster metadata {}.", cluster);
+ bootstrapCluster = cluster;
} else {
log.debug("Updating cluster metadata to {}", cluster);
this.lastMetadataUpdateMs = now;
@@ -287,4 +293,12 @@ public class AdminMetadataManager {
this.cluster = cluster;
}
}
+
+ /**
+ * Rebootstrap metadata with the cluster previously used for bootstrapping.
+ */
+ public void rebootstrap(long now) {
+ log.info("Rebootstrapping with {}", this.bootstrapCluster);
+ update(bootstrapCluster, now);
+ }
}
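The admin client keeps its own metadata manager rather than a `Metadata` instance, so here rebootstrapping means re-applying the `Cluster` saved when `update()` first saw the bootstrap-configured cluster. From the user's side the opt-in looks the same as for the other clients; a minimal sketch (the address is a placeholder):

```java
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

final class AdminRebootstrapExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // placeholder
        props.put(AdminClientConfig.METADATA_RECOVERY_STRATEGY_CONFIG, "rebootstrap");
        try (Admin admin = Admin.create(props)) {
            // If every known broker later becomes unavailable, the admin client
            // falls back to the saved bootstrap Cluster instead of failing.
        }
    }
}
```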
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 76bfe7e91a1..c4c10c404b4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
@@ -656,7 +657,14 @@ public class ConsumerConfig extends AbstractConfig {
Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.withClientSslSupport()
- .withClientSaslSupport();
+ .withClientSaslSupport()
+ .define(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG,
+ Type.STRING,
+ CommonClientConfigs.DEFAULT_METADATA_RECOVERY_STRATEGY,
+ ConfigDef.CaseInsensitiveValidString
+ .in(Utils.enumOptions(MetadataRecoveryStrategy.class)),
+ Importance.LOW,
+ CommonClientConfigs.METADATA_RECOVERY_STRATEGY_DOC);
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 4699f00c151..50c7eb5b028 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -139,7 +139,7 @@ public class ConsumerNetworkClient implements Closeable {
public Node leastLoadedNode() {
lock.lock();
try {
- return client.leastLoadedNode(time.milliseconds());
+ return client.leastLoadedNode(time.milliseconds()).node();
} finally {
lock.unlock();
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
index e2e4d529c00..d069a0d1fb6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
@@ -163,7 +163,7 @@ public class NetworkClientDelegate implements AutoCloseable {
}
boolean doSend(final UnsentRequest r, final long currentTimeMs) {
- Node node = r.node.orElse(client.leastLoadedNode(currentTimeMs));
+ Node node = r.node.orElse(client.leastLoadedNode(currentTimeMs).node());
if (node == null || nodeUnavailable(node)) {
log.debug("No broker available to send the request: {}. Retrying.", r);
return false;
@@ -208,7 +208,7 @@ public class NetworkClientDelegate implements AutoCloseable {
}
public Node leastLoadedNode() {
- return this.client.leastLoadedNode(time.milliseconds());
+ return this.client.leastLoadedNode(time.milliseconds()).node();
}
public void wakeup() {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index c67d60a180a..a59ee81b4a9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.producer;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.common.compress.GzipCompression;
import org.apache.kafka.common.compress.Lz4Compression;
import org.apache.kafka.common.compress.ZstdCompression;
@@ -528,7 +529,14 @@ public class ProducerConfig extends AbstractConfig {
null,
new ConfigDef.NonEmptyString(),
Importance.LOW,
- TRANSACTIONAL_ID_DOC);
+ TRANSACTIONAL_ID_DOC)
+ .define(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG,
+ Type.STRING,
+ CommonClientConfigs.DEFAULT_METADATA_RECOVERY_STRATEGY,
+ ConfigDef.CaseInsensitiveValidString
+ .in(Utils.enumOptions(MetadataRecoveryStrategy.class)),
+ Importance.LOW,
+ CommonClientConfigs.METADATA_RECOVERY_STRATEGY_DOC);
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index c4e2b73e8b9..b1a3ab9e293 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -484,7 +484,7 @@ public class Sender implements Runnable {
FindCoordinatorRequest.CoordinatorType coordinatorType = nextRequestHandler.coordinatorType();
targetNode = coordinatorType != null ?
transactionManager.coordinator(coordinatorType) :
- client.leastLoadedNode(time.milliseconds());
+ client.leastLoadedNode(time.milliseconds()).node();
if (targetNode != null) {
if (!awaitNodeReady(targetNode, coordinatorType)) {
log.trace("Target node {} not ready within request timeout, will retry when node is ready.", targetNode);
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index d6cdd14f365..86d1ddf5f41 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -319,7 +319,7 @@ public class MockClient implements KafkaClient {
checkTimeoutOfPendingRequests(now);
// We skip metadata updates if all nodes are currently blacked out
- if (metadataUpdater.isUpdateNeeded() && leastLoadedNode(now) != null) {
+ if (metadataUpdater.isUpdateNeeded() && leastLoadedNode(now).node() != null) {
MetadataUpdate metadataUpdate = metadataUpdates.poll();
if (metadataUpdate != null) {
metadataUpdater.update(time, metadataUpdate);
@@ -588,13 +588,13 @@ public class MockClient implements KafkaClient {
}
@Override
- public Node leastLoadedNode(long now) {
+ public LeastLoadedNode leastLoadedNode(long now) {
// Consistent with NetworkClient, we do not return nodes awaiting reconnect backoff
for (Node node : metadataUpdater.fetchNodes()) {
if (!connectionState(node.idString()).isBackingOff(now))
- return node;
+ return new LeastLoadedNode(node, true);
}
- return null;
+ return new LeastLoadedNode(null, false);
}
public void setWakeupHook(Runnable wakeupHook) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index cd3ec36f385..4369c8404e1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -128,7 +128,16 @@ public class NetworkClientTest {
private NetworkClient createNetworkClient(long reconnectBackoffMaxMs) {
return new NetworkClient(selector, metadataUpdater, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMs, 64 * 1024, 64 * 1024,
- defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, time, true, new ApiVersions(), new LogContext());
+ defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, time, true, new ApiVersions(), new LogContext(),
+ MetadataRecoveryStrategy.NONE);
+ }
+
+ private NetworkClient createNetworkClientWithMaxInFlightRequestsPerConnection(
+ int maxInFlightRequestsPerConnection, long reconnectBackoffMaxMs) {
+ return new NetworkClient(selector, metadataUpdater, "mock", maxInFlightRequestsPerConnection,
+ reconnectBackoffMsTest, reconnectBackoffMaxMs, 64 * 1024, 64 * 1024,
+ defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, time, true, new ApiVersions(), new LogContext(),
+ MetadataRecoveryStrategy.NONE);
}
private NetworkClient createNetworkClientWithMultipleNodes(long reconnectBackoffMaxMs, long connectionSetupTimeoutMsTest, int nodeNumber) {
@@ -136,26 +145,30 @@ public class NetworkClientTest {
TestMetadataUpdater metadataUpdater = new TestMetadataUpdater(nodes);
return new NetworkClient(selector, metadataUpdater, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMs, 64 * 1024, 64 * 1024,
- defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, time, true, new ApiVersions(), new LogContext());
+ defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, time, true, new ApiVersions(), new LogContext(),
+ MetadataRecoveryStrategy.NONE);
}
private NetworkClient createNetworkClientWithStaticNodes() {
return new NetworkClient(selector, metadataUpdater,
"mock-static", Integer.MAX_VALUE, 0, 0, 64 * 1024, 64 * 1024, defaultRequestTimeoutMs,
- connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, time, true, new ApiVersions(), new LogContext());
+ connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, time, true, new ApiVersions(), new LogContext(),
+ MetadataRecoveryStrategy.NONE);
}
private NetworkClient createNetworkClientWithNoVersionDiscovery(Metadata metadata) {
return new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, 0, 64 * 1024, 64 * 1024,
- defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, time, false, new ApiVersions(), new LogContext());
+ defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, time, false, new ApiVersions(), new LogContext(),
+ MetadataRecoveryStrategy.NONE);
}
private NetworkClient createNetworkClientWithNoVersionDiscovery() {
return new NetworkClient(selector, metadataUpdater, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMsTest,
64 * 1024, 64 * 1024, defaultRequestTimeoutMs,
- connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, time, false, new ApiVersions(), new LogContext());
+ connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, time, false, new ApiVersions(), new LogContext(),
+ MetadataRecoveryStrategy.NONE);
}
@BeforeEach
@@ -698,14 +711,18 @@ public class NetworkClientTest {
public void testLeastLoadedNode() {
client.ready(node, time.milliseconds());
assertFalse(client.isReady(node, time.milliseconds()));
- assertEquals(node, client.leastLoadedNode(time.milliseconds()));
+ LeastLoadedNode leastLoadedNode = client.leastLoadedNode(time.milliseconds());
+ assertEquals(node, leastLoadedNode.node());
+ assertTrue(leastLoadedNode.hasNodeAvailableOrConnectionReady());
awaitReady(client, node);
client.poll(1, time.milliseconds());
assertTrue(client.isReady(node, time.milliseconds()), "The client should be ready");
// leastloadednode should be our single node
- Node leastNode = client.leastLoadedNode(time.milliseconds());
+ leastLoadedNode = client.leastLoadedNode(time.milliseconds());
+ assertTrue(leastLoadedNode.hasNodeAvailableOrConnectionReady());
+ Node leastNode = leastLoadedNode.node();
assertEquals(leastNode.id(), node.id(), "There should be one leastloadednode");
// sleep for longer than reconnect backoff
@@ -716,8 +733,29 @@ public class NetworkClientTest {
client.poll(1, time.milliseconds());
assertFalse(client.ready(node, time.milliseconds()), "After we forced the disconnection the client is no longer ready.");
- leastNode = client.leastLoadedNode(time.milliseconds());
- assertNull(leastNode, "There should be NO leastloadednode");
+ leastLoadedNode = client.leastLoadedNode(time.milliseconds());
+ assertFalse(leastLoadedNode.hasNodeAvailableOrConnectionReady());
+ assertNull(leastLoadedNode.node(), "There should be NO leastloadednode");
+ }
+
+ @Test
+ public void testHasNodeAvailableOrConnectionReady() {
+ NetworkClient client = createNetworkClientWithMaxInFlightRequestsPerConnection(1, reconnectBackoffMaxMsTest);
+ awaitReady(client, node);
+
+ long now = time.milliseconds();
+ LeastLoadedNode leastLoadedNode = client.leastLoadedNode(now);
+ assertEquals(node, leastLoadedNode.node());
+ assertTrue(leastLoadedNode.hasNodeAvailableOrConnectionReady());
+
+ MetadataRequest.Builder builder = new MetadataRequest.Builder(Collections.emptyList(), true);
+ ClientRequest request = client.newClientRequest(node.idString(), builder, now, true);
+ client.send(request, now);
+ client.poll(defaultRequestTimeoutMs, now);
+
+ leastLoadedNode = client.leastLoadedNode(now);
+ assertNull(leastLoadedNode.node());
+ assertTrue(leastLoadedNode.hasNodeAvailableOrConnectionReady());
}
@Test
@@ -727,7 +765,7 @@ public class NetworkClientTest {
Set<Node> providedNodeIds = new HashSet<>();
for (int i = 0; i < nodeNumber * 10; i++) {
- Node node = client.leastLoadedNode(time.milliseconds());
+ Node node = client.leastLoadedNode(time.milliseconds()).node();
assertNotNull(node, "Should provide a node");
providedNodeIds.add(node);
client.ready(node, time.milliseconds());
@@ -800,7 +838,7 @@ public class NetworkClientTest {
client.poll(1, time.milliseconds());
// leastloadednode should return null since the node is throttled
- assertNull(client.leastLoadedNode(time.milliseconds()));
+ assertNull(client.leastLoadedNode(time.milliseconds()).node());
}
@Test
@@ -1046,7 +1084,8 @@ public class NetworkClientTest {
NetworkClient client = new NetworkClient(metadataUpdater, null, selector, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024,
defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest,
- time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender);
+ time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender,
+ MetadataRecoveryStrategy.NONE);
// Connect to one of the initial addresses, then change the addresses and disconnect
client.ready(node, time.milliseconds());
@@ -1106,7 +1145,8 @@ public class NetworkClientTest {
NetworkClient client = new NetworkClient(metadataUpdater, null, selector, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024,
defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest,
- time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender);
+ time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender,
+ MetadataRecoveryStrategy.NONE);
// First connection attempt should fail
client.ready(node, time.milliseconds());
@@ -1158,7 +1198,8 @@ public class NetworkClientTest {
NetworkClient client = new NetworkClient(metadataUpdater, null, selector, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024,
defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest,
- time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender);
+ time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender,
+ MetadataRecoveryStrategy.NONE);
// Connect to one of the initial addresses, then change the addresses and disconnect
client.ready(node, time.milliseconds());
@@ -1266,7 +1307,8 @@ public class NetworkClientTest {
NetworkClient client = new NetworkClient(metadataUpdater, null, selector, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024,
defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest,
- time, true, new ApiVersions(), null, new LogContext(), new DefaultHostResolver(), mockClientTelemetrySender);
+ time, true, new ApiVersions(), null, new LogContext(), new DefaultHostResolver(), mockClientTelemetrySender,
+ MetadataRecoveryStrategy.NONE);
// Send the ApiVersionsRequest
client.ready(node, time.milliseconds());
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientConfigTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientConfigTest.java
new file mode 100644
index 00000000000..92dc56fde46
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientConfigTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
+import org.apache.kafka.common.config.ConfigException;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class AdminClientConfigTest {
+ @Test
+ public void testDefaultMetadataRecoveryStrategy() {
+ Map<String, Object> configs = new HashMap<>();
+ final AdminClientConfig adminClientConfig = new AdminClientConfig(configs);
+ assertEquals(MetadataRecoveryStrategy.NONE.name, adminClientConfig.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG));
+ }
+
+ @Test
+ public void testInvalidMetadataRecoveryStrategy() {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, "abc");
+ ConfigException ce = assertThrows(ConfigException.class, () -> new AdminClientConfig(configs));
+ assertTrue(ce.getMessage().contains(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG));
+ }
+}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
index 8e9fa5722fc..0fc8e6ca485 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer;
import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.security.auth.SecurityProtocol;
@@ -191,6 +192,25 @@ public class ConsumerConfigTest {
assertEquals(remoteAssignorName, consumerConfig.getString(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG));
}
+ @Test
+ public void testDefaultMetadataRecoveryStrategy() {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass);
+ configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass);
+ final ConsumerConfig consumerConfig = new ConsumerConfig(configs);
+ assertEquals(MetadataRecoveryStrategy.NONE.name, consumerConfig.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG));
+ }
+
+ @Test
+ public void testInvalidMetadataRecoveryStrategy() {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass);
+ configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass);
+ configs.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, "abc");
+ ConfigException ce = assertThrows(ConfigException.class, () -> new ConsumerConfig(configs));
+ assertTrue(ce.getMessage().contains(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG));
+ }
+
@ParameterizedTest
@CsvSource({"consumer, true", "classic, true", "Consumer, true", "Classic, true", "invalid, false"})
public void testProtocolConfigValidation(String protocol, boolean isValid) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
index eaabcb8f814..71a267b5246 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.NodeApiVersions;
@@ -1909,7 +1910,8 @@ public class FetchRequestManagerTest {
Node node = cluster.nodes().get(0);
NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
1000, 1000, 64 * 1024, 64 * 1024, 1000, 10 * 1000, 127 * 1000,
- time, true, new ApiVersions(), metricsManager.throttleTimeSensor(), new LogContext());
+ time, true, new ApiVersions(), metricsManager.throttleTimeSensor(), new LogContext(),
+ MetadataRecoveryStrategy.NONE);
ApiVersionsResponse apiVersionsResponse = TestUtils.defaultApiVersionsResponse(
400, ApiMessageType.ListenerType.ZK_BROKER);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 1eac8709934..d0167e9b989 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.FetchSessionHandler;
import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.NodeApiVersions;
@@ -1905,7 +1906,8 @@ public class FetcherTest {
Node node = cluster.nodes().get(0);
NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
1000, 1000, 64 * 1024, 64 * 1024, 1000, 10 * 1000, 127 * 1000,
- time, true, new ApiVersions(), metricsManager.throttleTimeSensor(), new LogContext());
+ time, true, new ApiVersions(), metricsManager.throttleTimeSensor(), new LogContext(),
+ MetadataRecoveryStrategy.NONE);
ApiVersionsResponse apiVersionsResponse = TestUtils.defaultApiVersionsResponse(
400, ApiMessageType.ListenerType.ZK_BROKER);
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 7d4aa5e3a85..17119a25290 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.producer;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.LeastLoadedNode;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -735,8 +736,8 @@ public class KafkaProducerTest {
// let mockClient#leastLoadedNode return the node directly so that we can isolate Metadata calls from KafkaProducer for idempotent producer
MockClient mockClient = new MockClient(Time.SYSTEM, metadata) {
@Override
- public Node leastLoadedNode(long now) {
- return NODE;
+ public LeastLoadedNode leastLoadedNode(long now) {
+ return new LeastLoadedNode(NODE, true);
}
};
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
index d7952320e9f..eba5e8a0a7b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.clients.producer;
import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -98,6 +99,25 @@ public class ProducerConfigTest {
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}
+ @Test
+ public void testDefaultMetadataRecoveryStrategy() {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass);
+ configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass);
+ final ProducerConfig producerConfig = new ProducerConfig(configs);
+ assertEquals(MetadataRecoveryStrategy.NONE.name, producerConfig.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG));
+ }
+
+ @Test
+ public void testInvalidMetadataRecoveryStrategy() {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass);
+ configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass);
+ configs.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, "abc");
+ ConfigException ce = assertThrows(ConfigException.class, () -> new ProducerConfig(configs));
+ assertTrue(ce.getMessage().contains(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG));
+ }
+
@Test
public void testCaseInsensitiveSecurityProtocol() {
final String saslSslLowerCase = SecurityProtocol.SASL_SSL.name.toLowerCase(Locale.ROOT);
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 5c1088987eb..cfeefc0ae5f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -19,7 +19,9 @@ package org.apache.kafka.clients.producer.internals;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.LeastLoadedNode;
import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.clients.MetadataSnapshot;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NetworkClient;
@@ -299,7 +301,8 @@ public class SenderTest {
Node node = cluster.nodes().get(0);
NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
1000, 1000, 64 * 1024, 64 * 1024, 1000, 10 * 1000, 127 * 1000,
- time, true, new ApiVersions(), throttleTimeSensor, logContext);
+ time, true, new ApiVersions(), throttleTimeSensor, logContext,
+ MetadataRecoveryStrategy.NONE);
ApiVersionsResponse apiVersionsResponse = TestUtils.defaultApiVersionsResponse(
400, ApiMessageType.ListenerType.ZK_BROKER);
@@ -3797,12 +3800,12 @@ public class SenderTest {
client = new MockClient(time, metadata) {
volatile boolean canSendMore = true;
@Override
- public Node leastLoadedNode(long now) {
+ public LeastLoadedNode leastLoadedNode(long now) {
for (Node node : metadata.fetch().nodes()) {
if (isReady(node, now) && canSendMore)
- return node;
+ return new LeastLoadedNode(node, true);
}
- return null;
+ return new LeastLoadedNode(null, false);
}
@Override
@@ -3821,7 +3824,7 @@ public class SenderTest {
while (!client.ready(node, time.milliseconds()))
client.poll(0, time.milliseconds());
client.send(request, time.milliseconds());
- while (client.leastLoadedNode(time.milliseconds()) != null)
+ while (client.leastLoadedNode(time.milliseconds()).node() != null)
client.poll(0, time.milliseconds());
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
index 14826e982d6..1880fa512df 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
@@ -17,6 +17,7 @@
package org.apache.kafka.connect.runtime.distributed;
import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
@@ -96,6 +97,10 @@ public class DistributedConfig extends WorkerConfig {
public static final String REBALANCE_TIMEOUT_MS_CONFIG = CommonClientConfigs.REBALANCE_TIMEOUT_MS_CONFIG;
private static final String REBALANCE_TIMEOUT_MS_DOC = CommonClientConfigs.REBALANCE_TIMEOUT_MS_DOC;
+ public static final String METADATA_RECOVERY_STRATEGY_CONFIG = CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG;
+ private static final String METADATA_RECOVERY_STRATEGY_DOC = CommonClientConfigs.METADATA_RECOVERY_STRATEGY_DOC;
+ public static final String DEFAULT_METADATA_RECOVERY_STRATEGY = CommonClientConfigs.DEFAULT_METADATA_RECOVERY_STRATEGY;
+
/**
* worker.sync.timeout.ms
*/
@@ -512,7 +517,14 @@ public class DistributedConfig extends WorkerConfig {
(name, value) -> validateVerificationAlgorithms(crypto, name, (List<String>) value),
() -> "A list of one or more MAC algorithms, each supported by the worker JVM"),
ConfigDef.Importance.LOW,
- INTER_WORKER_VERIFICATION_ALGORITHMS_DOC);
+ INTER_WORKER_VERIFICATION_ALGORITHMS_DOC)
+ .define(METADATA_RECOVERY_STRATEGY_CONFIG,
+ ConfigDef.Type.STRING,
+ DEFAULT_METADATA_RECOVERY_STRATEGY,
+ ConfigDef.CaseInsensitiveValidString
+ .in(Utils.enumOptions(MetadataRecoveryStrategy.class)),
+ ConfigDef.Importance.LOW,
+ METADATA_RECOVERY_STRATEGY_DOC);
}
private final ExactlyOnceSourceSupport exactlyOnceSourceSupport;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index 2c3537ea675..2ea83daf048 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.GroupRebalanceConfig;
@@ -119,7 +120,9 @@ public class WorkerGroupMember {
time,
true,
new ApiVersions(),
- logContext);
+ logContext,
+ MetadataRecoveryStrategy.forName(config.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG))
+ );
this.client = new ConsumerNetworkClient(
logContext,
netClient,
diff --git a/core/src/main/java/kafka/server/NetworkUtils.java b/core/src/main/java/kafka/server/NetworkUtils.java
index 5607f2623f9..83093c19e10 100644
--- a/core/src/main/java/kafka/server/NetworkUtils.java
+++ b/core/src/main/java/kafka/server/NetworkUtils.java
@@ -18,6 +18,7 @@ package kafka.server;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ManualMetadataUpdater;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.common.Reconfigurable;
import org.apache.kafka.common.metrics.Metrics;
@@ -84,7 +85,8 @@ public class NetworkUtils {
time,
true,
new ApiVersions(),
- logContext
+ logContext,
+ MetadataRecoveryStrategy.NONE
);
}
}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
index 6cb273f066f..a8b7c8f59b5 100644
--- a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
@@ -26,7 +26,7 @@ import joptsimple.OptionSpec
import kafka.utils.Implicits._
import kafka.utils.Logging
import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.clients.{ApiVersions, ClientDnsLookup, ClientResponse, ClientUtils, CommonClientConfigs, Metadata, NetworkClient, NodeApiVersions}
+import org.apache.kafka.clients.{ApiVersions, ClientDnsLookup, ClientResponse, ClientUtils, CommonClientConfigs, Metadata, MetadataRecoveryStrategy, NetworkClient, NodeApiVersions}
import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, RequestFuture}
import org.apache.kafka.common.config.ConfigDef.ValidString._
import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
@@ -310,7 +310,8 @@ object BrokerApiVersionsCommand {
time,
true,
new ApiVersions,
- logContext)
+ logContext,
+ MetadataRecoveryStrategy.NONE)
val highLevelClient = new ConsumerNetworkClient(
logContext,
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 67d3f3963b9..793b39538e7 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -164,7 +164,8 @@ class ControllerChannelManager(controllerEpoch: () => Int,
time,
false,
new ApiVersions,
- logContext
+ logContext,
+ MetadataRecoveryStrategy.NONE
)
(networkClient, reconfigurableChannelBuilder)
}
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index 068dff4cca6..44176d22763 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -96,7 +96,8 @@ object TransactionMarkerChannelManager {
time,
false,
new ApiVersions,
- logContext
+ logContext,
+ MetadataRecoveryStrategy.NONE
)
new TransactionMarkerChannelManager(config,
diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala
index 48646e0b4d7..65ef855640c 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -30,7 +30,7 @@ import kafka.server.KafkaConfig
import kafka.utils.CoreUtils
import kafka.utils.FileLock
import kafka.utils.Logging
-import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient}
+import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, MetadataRecoveryStrategy, NetworkClient}
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.Node
import org.apache.kafka.common.TopicPartition
@@ -312,7 +312,8 @@ class KafkaRaftManager[T](
time,
discoverBrokerVersions,
apiVersions,
- logContext
+ logContext,
+ MetadataRecoveryStrategy.NONE
)
(controllerListenerName, networkClient)
diff --git a/core/src/main/scala/kafka/server/BrokerBlockingSender.scala b/core/src/main/scala/kafka/server/BrokerBlockingSender.scala
index 7d9fb0512a5..3cb692045b6 100644
--- a/core/src/main/scala/kafka/server/BrokerBlockingSender.scala
+++ b/core/src/main/scala/kafka/server/BrokerBlockingSender.scala
@@ -96,7 +96,8 @@ class BrokerBlockingSender(sourceBroker: BrokerEndPoint,
time,
false,
new ApiVersions,
- logContext
+ logContext,
+ MetadataRecoveryStrategy.NONE
)
(networkClient, reconfigurableChannelBuilder)
}
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 5b6e04e5a0e..dffd2e7f697 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -30,7 +30,7 @@ import kafka.raft.KafkaRaftManager
import kafka.server.metadata.{OffsetTrackingListener, ZkConfigRepository, ZkMetadataCache}
import kafka.utils._
import kafka.zk.{AdminZkClient, BrokerInfo, KafkaZkClient}
-import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient, NetworkClientUtils}
+import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, MetadataRecoveryStrategy, NetworkClient, NetworkClientUtils}
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.ApiMessageType.ListenerType
@@ -827,7 +827,8 @@ class KafkaServer(
time,
false,
new ApiVersions,
- logContext)
+ logContext,
+ MetadataRecoveryStrategy.NONE)
}
var shutdownSucceeded: Boolean = false
diff --git a/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala b/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala
index a0e4bbbc463..6ce6e9e0a48 100644
--- a/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala
@@ -191,7 +191,8 @@ class NodeToControllerChannelManagerImpl(
time,
true,
apiVersions,
- logContext
+ logContext,
+ MetadataRecoveryStrategy.NONE
)
}
val threadName = s"${threadNamePrefix}to-controller-${channelName}-channel-manager"
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientRebootstrapTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientRebootstrapTest.scala
new file mode 100644
index 00000000000..70b514f199b
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/AdminClientRebootstrapTest.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import org.junit.jupiter.api.Test
+
+class AdminClientRebootstrapTest extends RebootstrapTest {
+ @Test
+ def testRebootstrap(): Unit = {
+ server1.shutdown()
+ server1.awaitShutdown()
+
+ val adminClient = createAdminClient(configOverrides = clientOverrides)
+
+ // Only server 0 is available to the admin client during bootstrap.
+ adminClient.listTopics().names().get()
+
+ server0.shutdown()
+ server0.awaitShutdown()
+ server1.startup()
+
+ // Server 0, originally cached during bootstrap, is now offline.
+ // However, server 1 from the bootstrap list is online.
+ // Should be able to list topics again.
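+ // Without rebootstrap, the client would keep retrying only the cached (and offline) server 0.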
+ adminClient.listTopics().names().get()
+
+ server1.shutdown()
+ server1.awaitShutdown()
+ server0.startup()
+
+ // The same situation, but now server 1 is gone and server 0 is back.
+ adminClient.listTopics().names().get()
+ }
+}
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerRebootstrapTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerRebootstrapTest.scala
new file mode 100644
index 00000000000..9979e3eb91d
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/ConsumerRebootstrapTest.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.junit.jupiter.api.Test
+
+import java.util.Collections
+
+class ConsumerRebootstrapTest extends RebootstrapTest {
+ @Test
+ def testRebootstrap(): Unit = {
+ sendRecords(10, 0)
+
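+ // Wait until both replicas hold all the records, so the surviving broker can serve the full log.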
+ TestUtils.waitUntilTrue(
+ () => server0.logManager.logsByTopic(tp.topic()).head.logEndOffset == server1.logManager.logsByTopic(tp.topic()).head.logEndOffset,
+ "Timeout waiting for records to be replicated"
+ )
+
+ server1.shutdown()
+ server1.awaitShutdown()
+
+ val consumer = createConsumer(configOverrides = clientOverrides)
+
+ // Only server 0 is available to the consumer during bootstrap.
+ consumer.assign(Collections.singleton(tp))
+
+ consumeAndVerifyRecords(consumer, 10, 0)
+
+ // Bring back server 1 and shut down server 0.
+ server1.startup()
+
+ TestUtils.waitUntilTrue(
+ () => server0.logManager.logsByTopic(tp.topic()).head.logEndOffset == server1.logManager.logsByTopic(tp.topic()).head.logEndOffset,
+ "Timeout waiting for records to be replicated"
+ )
+
+ server0.shutdown()
+ server0.awaitShutdown()
+ sendRecords(10, 10)
+
+ // Server 0, originally cached during bootstrap, is now offline.
+ // However, server 1 from the bootstrap list is online.
+ // Should be able to consume records.
+ consumeAndVerifyRecords(consumer, 10, 10, startingKeyAndValueIndex = 10, startingTimestamp = 10)
+
+ // Bring back server 0 and shut down server 1.
+ server0.startup()
+
+ TestUtils.waitUntilTrue(
+ () => server0.logManager.logsByTopic(tp.topic()).head.logEndOffset == server1.logManager.logsByTopic(tp.topic()).head.logEndOffset,
+ "Timeout waiting for records to be replicated"
+ )
+
+ server1.shutdown()
+ server1.awaitShutdown()
+ sendRecords(10, 20)
+
+ // The same situation, but now server 1 is gone and server 0 is back.
+ consumeAndVerifyRecords(consumer, 10, 20, startingKeyAndValueIndex = 20, startingTimestamp = 20)
+ }
+
+ private def sendRecords(numRecords: Int, from: Int): Unit = {
+ val producer: KafkaProducer[Array[Byte], Array[Byte]] = createProducer()
+ (from until (numRecords + from)).foreach { i =>
+ val record = new ProducerRecord(tp.topic(), tp.partition(), i.toLong, s"key $i".getBytes, s"value $i".getBytes)
+ producer.send(record)
+ }
+ producer.flush()
+ producer.close()
+ }
+}
diff --git a/core/src/test/scala/integration/kafka/api/ProducerRebootstrapTest.scala b/core/src/test/scala/integration/kafka/api/ProducerRebootstrapTest.scala
new file mode 100644
index 00000000000..3cb40b6a0cf
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/ProducerRebootstrapTest.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+class ProducerRebootstrapTest extends RebootstrapTest {
+ @Test
+ def testRebootstrap(): Unit = {
+ server1.shutdown()
+ server1.awaitShutdown()
+
+ val producer = createProducer(configOverrides = clientOverrides)
+
+ // Only server 0 is available to the producer during bootstrap.
+ producer.send(new ProducerRecord(topic, part, "key 0".getBytes, "value 0".getBytes)).get()
+
+ server0.shutdown()
+ server0.awaitShutdown()
+ server1.startup()
+
+ // Server 0, originally cached during bootstrap, is now offline.
+ // However, server 1 from the bootstrap list is online.
+ // Should be able to produce records.
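+ // Server 1 was down when the first record was produced, so after the unclean
+ // leader election its log is empty and this record is appended at offset 0.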
+ val recordMetadata1 = producer.send(new ProducerRecord(topic, part, "key 1".getBytes, "value 1".getBytes)).get()
+ assertEquals(0, recordMetadata1.offset())
+
+ server1.shutdown()
+ server1.awaitShutdown()
+ server0.startup()
+
+ // The same situation, but now server 1 is gone and server 0 is back.
+ val recordMetadata2 = producer.send(new ProducerRecord(topic, part, "key 2".getBytes, "value 2".getBytes)).get()
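+ // Server 0 still holds the first record at offset 0, so this record is appended at offset 1.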
+ assertEquals(1, recordMetadata2.offset())
+ }
+}
diff --git a/core/src/test/scala/integration/kafka/api/RebootstrapTest.scala b/core/src/test/scala/integration/kafka/api/RebootstrapTest.scala
new file mode 100644
index 00000000000..b3b044ebcdb
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/RebootstrapTest.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import kafka.server.{KafkaConfig, KafkaServer}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.config.TopicConfig
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
+
+import java.util.Properties
+
+abstract class RebootstrapTest extends AbstractConsumerTest {
+ override def brokerCount: Int = 2
+
+ def server0: KafkaServer = serverForId(0).get
+ def server1: KafkaServer = serverForId(1).get
+
+ override def generateConfigs: Seq[KafkaConfig] = {
+ val overridingProps = new Properties()
+ overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, brokerCount.toString)
+ overridingProps.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "true")
+
+ // Fixed ports are necessary in this test, because a restarted broker must come
+ // back on the same host:port that clients cached before the restart.
+ FixedPortTestUtils.createBrokerConfigs(brokerCount, zkConnect, enableControlledShutdown = false)
+ .map(KafkaConfig.fromProps(_, overridingProps))
+ }
+
+ def clientOverrides: Properties = {
+ val overrides = new Properties()
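+ // Rebootstrap triggers only when all last-known brokers appear unavailable at
+ // the same time; equal, fixed backoffs and short connection setup timeouts
+ // make that state reachable quickly within the test timeouts.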
+ overrides.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, "5000")
+ overrides.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, "5000")
+ overrides.put(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG, "1000")
+ overrides.put(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG, "1000")
+ overrides.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, "rebootstrap")
+ overrides
+ }
+}
diff --git a/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java b/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java
index c3e1cc1f7a0..970e695ba92 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.ManualMetadataUpdater;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.NetworkClientUtils;
import org.apache.kafka.clients.admin.Admin;
@@ -706,7 +707,8 @@ public class ReplicaVerificationTool {
time,
false,
new ApiVersions(),
- logContext
+ logContext,
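+ // Unlike broker-internal clients, the tool honors the user-supplied
+ // metadata.recovery.strategy from the consumer config instead of hardcoding NONE.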
+ MetadataRecoveryStrategy.forName(consumerConfig.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG))
);
}
diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
index 6cd367c06d9..5afd9db26c3 100644
--- a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
+++ b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.ManualMetadataUpdater;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.NetworkClientUtils;
import org.apache.kafka.clients.admin.Admin;
@@ -179,7 +180,8 @@ public class ConnectionStressWorker implements TaskWorker {
TIME,
false,
new ApiVersions(),
- logContext)) {
+ logContext,
+ MetadataRecoveryStrategy.NONE)) {
NetworkClientUtils.awaitReady(client, targetNode, TIME, 500);
}
}