KAFKA-8206: KIP-899: Allow client to rebootstrap (#13277)

This commit implements KIP-899: Allow producer and consumer clients to rebootstrap. It introduces the new setting `metadata.recovery.strategy`, applicable to all client types.
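
As a quick illustration (not part of the diff), a minimal sketch of a consumer that opts into the new strategy. The property name and the `none`/`rebootstrap` values come from this commit; the broker addresses are placeholders:

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class RebootstrapExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder addresses; rebootstrapping re-resolves this list when
        // all previously known brokers appear unavailable.
        props.put("bootstrap.servers", "broker1:9092,broker2:9092");
        // New in this commit; the default is "none" (fail as before).
        props.put("metadata.recovery.strategy", "rebootstrap");
        try (KafkaConsumer<String, String> consumer =
                 new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer())) {
            // subscribe and poll as usual
        }
    }
}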

Reviewers: Greg Harris <gharris1727@gmail.com>, Rajini Sivaram <rajinisivaram@googlemail.com>
Ivan Yurchenko 2024-06-12 22:48:32 +03:00 committed by rajinisivaram
parent 0e7134c105
commit 62fb6a3ef1
40 changed files with 666 additions and 72 deletions

View File

@ -245,7 +245,9 @@ public final class ClientUtils {
throttleTimeSensor,
logContext,
hostResolver,
clientTelemetrySender);
clientTelemetrySender,
MetadataRecoveryStrategy.forName(config.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG))
);
} catch (Throwable t) {
closeQuietly(selector, "Selector");
closeQuietly(channelBuilder, "ChannelBuilder");

View File

@ -219,6 +219,19 @@ public class CommonClientConfigs {
public static final String DEFAULT_API_TIMEOUT_MS_DOC = "Specifies the timeout (in milliseconds) for client APIs. " +
"This configuration is used as the default timeout for all client operations that do not specify a <code>timeout</code> parameter.";
public static final String METADATA_RECOVERY_STRATEGY_CONFIG = "metadata.recovery.strategy";
public static final String METADATA_RECOVERY_STRATEGY_DOC = "Controls how the client recovers when none of the brokers known to it is available. " +
"If set to <code>none</code>, the client fails. If set to <code>rebootstrap</code>, " +
"the client repeats the bootstrap process using <code>bootstrap.servers</code>. " +
"Rebootstrapping is useful when a client communicates with brokers so infrequently " +
"that the set of brokers may change entirely before the client refreshes metadata. " +
"Metadata recovery is triggered when all last-known brokers appear unavailable simultaneously. " +
"Brokers appear unavailable when disconnected and no current retry attempt is in-progress. " +
"Consider increasing <code>reconnect.backoff.ms</code> and <code>reconnect.backoff.max.ms</code> and " +
"decreasing <code>socket.connection.setup.timeout.ms</code> and <code>socket.connection.setup.timeout.max.ms</code> " +
"for the client.";
public static final String DEFAULT_METADATA_RECOVERY_STRATEGY = MetadataRecoveryStrategy.NONE.name;
/**
* Postprocess the configuration so that exponential backoff is disabled when reconnect backoff
* is explicitly configured but the maximum reconnect backoff is not explicitly configured.
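
The doc above suggests pairing rebootstrap with tuned backoff and connection-setup timeouts. A hedged sketch: the constant names are real CommonClientConfigs members, while the values mirror the integration-test overrides later in this commit rather than official recommendations:

import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;

final class RebootstrapTuning {
    static Properties clientOverrides() {
        Properties props = new Properties();
        props.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, "rebootstrap");
        // Shorter connection-setup timeouts make unreachable brokers fail fast.
        props.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, "5000");
        props.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, "5000");
        // A longer reconnect backoff widens the window in which a disconnected broker
        // has no retry in progress, which is what lets it count as unavailable.
        props.put(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG, "1000");
        props.put(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG, "1000");
        return props;
    }
}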

View File

@ -130,7 +130,7 @@ public interface KafkaClient extends Closeable {
* @param now The current time in ms
* @return The node with the fewest in-flight requests.
*/
Node leastLoadedNode(long now);
LeastLoadedNode leastLoadedNode(long now);
/**
* The number of currently in-flight requests for which we have not yet returned a response

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients;
import org.apache.kafka.common.Node;
public class LeastLoadedNode {
private final Node node;
private final boolean atLeastOneConnectionReady;
public LeastLoadedNode(Node node, boolean atLeastOneConnectionReady) {
this.node = node;
this.atLeastOneConnectionReady = atLeastOneConnectionReady;
}
public Node node() {
return node;
}
/**
* Indicates whether the least loaded node is available or at least one ready connection exists.
*
* <p>There may be no node available while ready connections to live nodes exist. This may happen when
* the connections are overloaded with in-flight requests. This function takes this into account.
*/
public boolean hasNodeAvailableOrConnectionReady() {
return node != null || atLeastOneConnectionReady;
}
}
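
A hedged sketch of how a caller is expected to consume this type to decide on rebootstrapping; it mirrors the NetworkClient.maybeUpdate() change later in this diff, and the helper name is hypothetical:

import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.LeastLoadedNode;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.common.Node;

final class RecoveryHelper {
    // Hypothetical helper: pick a node for a metadata request, rebootstrapping
    // first if the strategy allows it and no node or ready connection exists.
    static Node pickNodeOrRebootstrap(KafkaClient client, Metadata metadata,
                                      MetadataRecoveryStrategy strategy, long now) {
        LeastLoadedNode candidate = client.leastLoadedNode(now);
        if (strategy == MetadataRecoveryStrategy.REBOOTSTRAP
                && !candidate.hasNodeAvailableOrConnectionReady()) {
            // All last-known brokers look unavailable: fall back to the
            // original bootstrap addresses.
            metadata.rebootstrap();
            candidate = client.leastLoadedNode(now);
        }
        return candidate.node(); // may still be null; callers back off on null
    }
}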

View File

@ -82,6 +82,8 @@ public class Metadata implements Closeable {
private final ClusterResourceListeners clusterResourceListeners;
private boolean isClosed;
private final Map<TopicPartition, Integer> lastSeenLeaderEpochs;
/** Addresses with which the metadata was originally bootstrapped. */
private List<InetSocketAddress> bootstrapAddresses;
/**
* Create a new Metadata instance
@ -304,6 +306,12 @@ public class Metadata implements Closeable {
this.needFullUpdate = true;
this.updateVersion += 1;
this.metadataSnapshot = MetadataSnapshot.bootstrap(addresses);
this.bootstrapAddresses = addresses;
}
public synchronized void rebootstrap() {
log.info("Rebootstrapping with {}", this.bootstrapAddresses);
this.bootstrap(this.bootstrapAddresses);
}
/**

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients;
import java.util.Locale;
/**
* Defines the strategies which clients can follow to deal with the situation when none of the known nodes is available.
*/
public enum MetadataRecoveryStrategy {
NONE("none"),
REBOOTSTRAP("rebootstrap");
public final String name;
MetadataRecoveryStrategy(String name) {
this.name = name;
}
public static MetadataRecoveryStrategy forName(String name) {
if (name == null) {
throw new IllegalArgumentException("Illegal MetadataRecoveryStrategy: null");
}
try {
return MetadataRecoveryStrategy.valueOf(name.toUpperCase(Locale.ROOT));
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Illegal MetadataRecoveryStrategy: " + name);
}
}
}
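
An illustrative fragment of the parsing behavior implied by the code above: lookup is case-insensitive (via Locale.ROOT), and null or unknown names are rejected.

MetadataRecoveryStrategy a = MetadataRecoveryStrategy.forName("rebootstrap");  // REBOOTSTRAP
MetadataRecoveryStrategy b = MetadataRecoveryStrategy.forName("ReBootstrap");  // also REBOOTSTRAP
// MetadataRecoveryStrategy.forName("abc");   // throws IllegalArgumentException
// MetadataRecoveryStrategy.forName(null);    // throws IllegalArgumentException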

View File

@ -114,6 +114,8 @@ public class NetworkClient implements KafkaClient {
/* time in ms to wait before retrying to create connection to a server */
private final long reconnectBackoffMs;
private final MetadataRecoveryStrategy metadataRecoveryStrategy;
private final Time time;
/**
@ -147,7 +149,8 @@ public class NetworkClient implements KafkaClient {
Time time,
boolean discoverBrokerVersions,
ApiVersions apiVersions,
LogContext logContext) {
LogContext logContext,
MetadataRecoveryStrategy metadataRecoveryStrategy) {
this(selector,
metadata,
clientId,
@ -163,7 +166,8 @@ public class NetworkClient implements KafkaClient {
discoverBrokerVersions,
apiVersions,
null,
logContext);
logContext,
metadataRecoveryStrategy);
}
public NetworkClient(Selectable selector,
@ -181,7 +185,8 @@ public class NetworkClient implements KafkaClient {
boolean discoverBrokerVersions,
ApiVersions apiVersions,
Sensor throttleTimeSensor,
LogContext logContext) {
LogContext logContext,
MetadataRecoveryStrategy metadataRecoveryStrategy) {
this(null,
metadata,
selector,
@ -200,7 +205,8 @@ public class NetworkClient implements KafkaClient {
throttleTimeSensor,
logContext,
new DefaultHostResolver(),
null);
null,
metadataRecoveryStrategy);
}
public NetworkClient(Selectable selector,
@ -217,7 +223,8 @@ public class NetworkClient implements KafkaClient {
Time time,
boolean discoverBrokerVersions,
ApiVersions apiVersions,
LogContext logContext) {
LogContext logContext,
MetadataRecoveryStrategy metadataRecoveryStrategy) {
this(metadataUpdater,
null,
selector,
@ -236,7 +243,8 @@ public class NetworkClient implements KafkaClient {
null,
logContext,
new DefaultHostResolver(),
null);
null,
metadataRecoveryStrategy);
}
public NetworkClient(MetadataUpdater metadataUpdater,
@ -257,7 +265,8 @@ public class NetworkClient implements KafkaClient {
Sensor throttleTimeSensor,
LogContext logContext,
HostResolver hostResolver,
ClientTelemetrySender clientTelemetrySender) {
ClientTelemetrySender clientTelemetrySender,
MetadataRecoveryStrategy metadataRecoveryStrategy) {
/* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not
* possible because `DefaultMetadataUpdater` is an inner class and it can only be instantiated after the
* super constructor is invoked.
@ -288,6 +297,7 @@ public class NetworkClient implements KafkaClient {
this.log = logContext.logger(NetworkClient.class);
this.state = new AtomicReference<>(State.ACTIVE);
this.telemetrySender = (clientTelemetrySender != null) ? new TelemetrySender(clientTelemetrySender) : null;
this.metadataRecoveryStrategy = metadataRecoveryStrategy;
}
/**
@ -695,7 +705,7 @@ public class NetworkClient implements KafkaClient {
* @return The node with the fewest in-flight requests.
*/
@Override
public Node leastLoadedNode(long now) {
public LeastLoadedNode leastLoadedNode(long now) {
List<Node> nodes = this.metadataUpdater.fetchNodes();
if (nodes.isEmpty())
throw new IllegalStateException("There are no nodes in the Kafka cluster");
@ -705,16 +715,25 @@ public class NetworkClient implements KafkaClient {
Node foundCanConnect = null;
Node foundReady = null;
boolean atLeastOneConnectionReady = false;
int offset = this.randOffset.nextInt(nodes.size());
for (int i = 0; i < nodes.size(); i++) {
int idx = (offset + i) % nodes.size();
Node node = nodes.get(idx);
if (!atLeastOneConnectionReady
&& connectionStates.isReady(node.idString(), now)
&& selector.isChannelReady(node.idString())) {
atLeastOneConnectionReady = true;
}
if (canSendRequest(node.idString(), now)) {
int currInflight = this.inFlightRequests.count(node.idString());
if (currInflight == 0) {
// if we find an established connection with no in-flight requests we can stop right away
log.trace("Found least loaded node {} connected with no in-flight requests", node);
return node;
return new LeastLoadedNode(node, true);
} else if (currInflight < inflight) {
// otherwise if this is the best we have found so far, record that
inflight = currInflight;
@ -738,16 +757,16 @@ public class NetworkClient implements KafkaClient {
// which are being established before connecting to new nodes.
if (foundReady != null) {
log.trace("Found least loaded node {} with {} inflight requests", foundReady, inflight);
return foundReady;
return new LeastLoadedNode(foundReady, atLeastOneConnectionReady);
} else if (foundConnecting != null) {
log.trace("Found least loaded connecting node {}", foundConnecting);
return foundConnecting;
return new LeastLoadedNode(foundConnecting, atLeastOneConnectionReady);
} else if (foundCanConnect != null) {
log.trace("Found least loaded node {} with no active connection", foundCanConnect);
return foundCanConnect;
return new LeastLoadedNode(foundCanConnect, atLeastOneConnectionReady);
} else {
log.trace("Least loaded node selection failed to find an available node");
return null;
return new LeastLoadedNode(null, atLeastOneConnectionReady);
}
}
@ -1122,13 +1141,22 @@ public class NetworkClient implements KafkaClient {
// Beware that the behavior of this method and the computation of timeouts for poll() are
// highly dependent on the behavior of leastLoadedNode.
Node node = leastLoadedNode(now);
if (node == null) {
LeastLoadedNode leastLoadedNode = leastLoadedNode(now);
// Rebootstrap if needed and configured.
if (metadataRecoveryStrategy == MetadataRecoveryStrategy.REBOOTSTRAP
&& !leastLoadedNode.hasNodeAvailableOrConnectionReady()) {
metadata.rebootstrap();
leastLoadedNode = leastLoadedNode(now);
}
if (leastLoadedNode.node() == null) {
log.debug("Give up sending metadata request since no node is available");
return reconnectBackoffMs;
}
return maybeUpdate(now, node);
return maybeUpdate(now, leastLoadedNode.node());
}
@Override
@ -1266,7 +1294,7 @@ public class NetworkClient implements KafkaClient {
// Per KIP-714, let's continue to re-use the same broker for as long as possible.
if (stickyNode == null) {
stickyNode = leastLoadedNode(now);
stickyNode = leastLoadedNode(now).node();
if (stickyNode == null) {
log.debug("Give up sending telemetry request since no node is available");
return reconnectBackoffMs;

View File

@ -19,6 +19,7 @@ package org.apache.kafka.clients.admin;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
@ -139,6 +140,10 @@ public class AdminClientConfig extends AbstractConfig {
public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG;
public static final String DEFAULT_API_TIMEOUT_MS_CONFIG = CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_CONFIG;
public static final String METADATA_RECOVERY_STRATEGY_CONFIG = CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG;
public static final String METADATA_RECOVERY_STRATEGY_DOC = CommonClientConfigs.METADATA_RECOVERY_STRATEGY_DOC;
public static final String DEFAULT_METADATA_RECOVERY_STRATEGY = CommonClientConfigs.DEFAULT_METADATA_RECOVERY_STRATEGY;
/**
* <code>security.providers</code>
*/
@ -262,7 +267,14 @@ public class AdminClientConfig extends AbstractConfig {
Importance.MEDIUM,
SECURITY_PROTOCOL_DOC)
.withClientSslSupport()
.withClientSaslSupport();
.withClientSaslSupport()
.define(METADATA_RECOVERY_STRATEGY_CONFIG,
Type.STRING,
DEFAULT_METADATA_RECOVERY_STRATEGY,
ConfigDef.CaseInsensitiveValidString
.in(Utils.enumOptions(MetadataRecoveryStrategy.class)),
Importance.LOW,
METADATA_RECOVERY_STRATEGY_DOC);
}
@Override

View File

@ -25,6 +25,8 @@ import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.DefaultHostResolver;
import org.apache.kafka.clients.HostResolver;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.LeastLoadedNode;
import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.StaleMetadataException;
import org.apache.kafka.clients.admin.CreateTopicsResult.TopicMetadataAndConfig;
@ -399,6 +401,7 @@ public class KafkaAdminClient extends AdminClient {
private final long retryBackoffMaxMs;
private final ExponentialBackoff retryBackoff;
private final boolean clientTelemetryEnabled;
private final MetadataRecoveryStrategy metadataRecoveryStrategy;
/**
* The telemetry requests client instance id.
@ -612,6 +615,7 @@ public class KafkaAdminClient extends AdminClient {
retryBackoffMaxMs,
CommonClientConfigs.RETRY_BACKOFF_JITTER);
this.clientTelemetryEnabled = config.getBoolean(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG);
this.metadataRecoveryStrategy = MetadataRecoveryStrategy.forName(config.getString(AdminClientConfig.METADATA_RECOVERY_STRATEGY_CONFIG));
config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
log.debug("Kafka admin client initialized");
@ -698,7 +702,13 @@ public class KafkaAdminClient extends AdminClient {
private class MetadataUpdateNodeIdProvider implements NodeProvider {
@Override
public Node provide() {
return client.leastLoadedNode(time.milliseconds());
LeastLoadedNode leastLoadedNode = client.leastLoadedNode(time.milliseconds());
if (metadataRecoveryStrategy == MetadataRecoveryStrategy.REBOOTSTRAP
&& !leastLoadedNode.hasNodeAvailableOrConnectionReady()) {
metadataManager.rebootstrap(time.milliseconds());
}
return leastLoadedNode.node();
}
@Override
@ -780,7 +790,7 @@ public class KafkaAdminClient extends AdminClient {
if (metadataManager.isReady()) {
// This may return null if all nodes are busy.
// In that case, we will postpone node assignment.
return client.leastLoadedNode(time.milliseconds());
return client.leastLoadedNode(time.milliseconds()).node();
}
metadataManager.requestUpdate();
return null;
@ -835,7 +845,7 @@ public class KafkaAdminClient extends AdminClient {
} else {
// This may return null if all nodes are busy.
// In that case, we will postpone node assignment.
return client.leastLoadedNode(time.milliseconds());
return client.leastLoadedNode(time.milliseconds()).node();
}
}
metadataManager.requestUpdate();

View File

@ -92,6 +92,11 @@ public class AdminMetadataManager {
*/
private ApiException fatalException = null;
/**
* The cluster with which the metadata was bootstrapped.
*/
private Cluster bootstrapCluster;
public class AdminMetadataUpdater implements MetadataUpdater {
@Override
public List<Node> fetchNodes() {
@ -275,6 +280,7 @@ public class AdminMetadataManager {
public void update(Cluster cluster, long now) {
if (cluster.isBootstrapConfigured()) {
log.debug("Setting bootstrap cluster metadata {}.", cluster);
bootstrapCluster = cluster;
} else {
log.debug("Updating cluster metadata to {}", cluster);
this.lastMetadataUpdateMs = now;
@ -287,4 +293,12 @@ public class AdminMetadataManager {
this.cluster = cluster;
}
}
/**
* Rebootstrap metadata with the cluster previously used for bootstrapping.
*/
public void rebootstrap(long now) {
log.info("Rebootstrapping with {}", this.bootstrapCluster);
update(bootstrapCluster, now);
}
}

View File

@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
@ -656,7 +657,14 @@ public class ConsumerConfig extends AbstractConfig {
Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.withClientSslSupport()
.withClientSaslSupport();
.withClientSaslSupport()
.define(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG,
Type.STRING,
CommonClientConfigs.DEFAULT_METADATA_RECOVERY_STRATEGY,
ConfigDef.CaseInsensitiveValidString
.in(Utils.enumOptions(MetadataRecoveryStrategy.class)),
Importance.LOW,
CommonClientConfigs.METADATA_RECOVERY_STRATEGY_DOC);
}
@Override

View File

@ -139,7 +139,7 @@ public class ConsumerNetworkClient implements Closeable {
public Node leastLoadedNode() {
lock.lock();
try {
return client.leastLoadedNode(time.milliseconds());
return client.leastLoadedNode(time.milliseconds()).node();
} finally {
lock.unlock();
}

View File

@ -163,7 +163,7 @@ public class NetworkClientDelegate implements AutoCloseable {
}
boolean doSend(final UnsentRequest r, final long currentTimeMs) {
Node node = r.node.orElse(client.leastLoadedNode(currentTimeMs));
Node node = r.node.orElse(client.leastLoadedNode(currentTimeMs).node());
if (node == null || nodeUnavailable(node)) {
log.debug("No broker available to send the request: {}. Retrying.", r);
return false;
@ -208,7 +208,7 @@ public class NetworkClientDelegate implements AutoCloseable {
}
public Node leastLoadedNode() {
return this.client.leastLoadedNode(time.milliseconds());
return this.client.leastLoadedNode(time.milliseconds()).node();
}
public void wakeup() {

View File

@ -18,6 +18,7 @@ package org.apache.kafka.clients.producer;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.common.compress.GzipCompression;
import org.apache.kafka.common.compress.Lz4Compression;
import org.apache.kafka.common.compress.ZstdCompression;
@ -528,7 +529,14 @@ public class ProducerConfig extends AbstractConfig {
null,
new ConfigDef.NonEmptyString(),
Importance.LOW,
TRANSACTIONAL_ID_DOC);
TRANSACTIONAL_ID_DOC)
.define(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG,
Type.STRING,
CommonClientConfigs.DEFAULT_METADATA_RECOVERY_STRATEGY,
ConfigDef.CaseInsensitiveValidString
.in(Utils.enumOptions(MetadataRecoveryStrategy.class)),
Importance.LOW,
CommonClientConfigs.METADATA_RECOVERY_STRATEGY_DOC);
}
@Override

View File

@ -484,7 +484,7 @@ public class Sender implements Runnable {
FindCoordinatorRequest.CoordinatorType coordinatorType = nextRequestHandler.coordinatorType();
targetNode = coordinatorType != null ?
transactionManager.coordinator(coordinatorType) :
client.leastLoadedNode(time.milliseconds());
client.leastLoadedNode(time.milliseconds()).node();
if (targetNode != null) {
if (!awaitNodeReady(targetNode, coordinatorType)) {
log.trace("Target node {} not ready within request timeout, will retry when node is ready.", targetNode);

View File

@ -319,7 +319,7 @@ public class MockClient implements KafkaClient {
checkTimeoutOfPendingRequests(now);
// We skip metadata updates if all nodes are currently blacked out
if (metadataUpdater.isUpdateNeeded() && leastLoadedNode(now) != null) {
if (metadataUpdater.isUpdateNeeded() && leastLoadedNode(now).node() != null) {
MetadataUpdate metadataUpdate = metadataUpdates.poll();
if (metadataUpdate != null) {
metadataUpdater.update(time, metadataUpdate);
@ -588,13 +588,13 @@ public class MockClient implements KafkaClient {
}
@Override
public Node leastLoadedNode(long now) {
public LeastLoadedNode leastLoadedNode(long now) {
// Consistent with NetworkClient, we do not return nodes awaiting reconnect backoff
for (Node node : metadataUpdater.fetchNodes()) {
if (!connectionState(node.idString()).isBackingOff(now))
return node;
return new LeastLoadedNode(node, true);
}
return null;
return new LeastLoadedNode(null, false);
}
public void setWakeupHook(Runnable wakeupHook) {

View File

@ -128,7 +128,16 @@ public class NetworkClientTest {
private NetworkClient createNetworkClient(long reconnectBackoffMaxMs) {
return new NetworkClient(selector, metadataUpdater, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMs, 64 * 1024, 64 * 1024,
defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, time, true, new ApiVersions(), new LogContext());
defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, time, true, new ApiVersions(), new LogContext(),
MetadataRecoveryStrategy.NONE);
}
private NetworkClient createNetworkClientWithMaxInFlightRequestsPerConnection(
int maxInFlightRequestsPerConnection, long reconnectBackoffMaxMs) {
return new NetworkClient(selector, metadataUpdater, "mock", maxInFlightRequestsPerConnection,
reconnectBackoffMsTest, reconnectBackoffMaxMs, 64 * 1024, 64 * 1024,
defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, time, true, new ApiVersions(), new LogContext(),
MetadataRecoveryStrategy.NONE);
}
private NetworkClient createNetworkClientWithMultipleNodes(long reconnectBackoffMaxMs, long connectionSetupTimeoutMsTest, int nodeNumber) {
@ -136,26 +145,30 @@ public class NetworkClientTest {
TestMetadataUpdater metadataUpdater = new TestMetadataUpdater(nodes);
return new NetworkClient(selector, metadataUpdater, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMs, 64 * 1024, 64 * 1024,
defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, time, true, new ApiVersions(), new LogContext());
defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, time, true, new ApiVersions(), new LogContext(),
MetadataRecoveryStrategy.NONE);
}
private NetworkClient createNetworkClientWithStaticNodes() {
return new NetworkClient(selector, metadataUpdater,
"mock-static", Integer.MAX_VALUE, 0, 0, 64 * 1024, 64 * 1024, defaultRequestTimeoutMs,
connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, time, true, new ApiVersions(), new LogContext());
connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, time, true, new ApiVersions(), new LogContext(),
MetadataRecoveryStrategy.NONE);
}
private NetworkClient createNetworkClientWithNoVersionDiscovery(Metadata metadata) {
return new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, 0, 64 * 1024, 64 * 1024,
defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, time, false, new ApiVersions(), new LogContext());
defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, time, false, new ApiVersions(), new LogContext(),
MetadataRecoveryStrategy.NONE);
}
private NetworkClient createNetworkClientWithNoVersionDiscovery() {
return new NetworkClient(selector, metadataUpdater, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMsTest,
64 * 1024, 64 * 1024, defaultRequestTimeoutMs,
connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, time, false, new ApiVersions(), new LogContext());
connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, time, false, new ApiVersions(), new LogContext(),
MetadataRecoveryStrategy.NONE);
}
@BeforeEach
@ -698,14 +711,18 @@ public class NetworkClientTest {
public void testLeastLoadedNode() {
client.ready(node, time.milliseconds());
assertFalse(client.isReady(node, time.milliseconds()));
assertEquals(node, client.leastLoadedNode(time.milliseconds()));
LeastLoadedNode leastLoadedNode = client.leastLoadedNode(time.milliseconds());
assertEquals(node, leastLoadedNode.node());
assertTrue(leastLoadedNode.hasNodeAvailableOrConnectionReady());
awaitReady(client, node);
client.poll(1, time.milliseconds());
assertTrue(client.isReady(node, time.milliseconds()), "The client should be ready");
// leastloadednode should be our single node
Node leastNode = client.leastLoadedNode(time.milliseconds());
leastLoadedNode = client.leastLoadedNode(time.milliseconds());
assertTrue(leastLoadedNode.hasNodeAvailableOrConnectionReady());
Node leastNode = leastLoadedNode.node();
assertEquals(leastNode.id(), node.id(), "There should be one leastloadednode");
// sleep for longer than reconnect backoff
@ -716,8 +733,29 @@ public class NetworkClientTest {
client.poll(1, time.milliseconds());
assertFalse(client.ready(node, time.milliseconds()), "After we forced the disconnection the client is no longer ready.");
leastNode = client.leastLoadedNode(time.milliseconds());
assertNull(leastNode, "There should be NO leastloadednode");
leastLoadedNode = client.leastLoadedNode(time.milliseconds());
assertFalse(leastLoadedNode.hasNodeAvailableOrConnectionReady());
assertNull(leastLoadedNode.node(), "There should be NO leastloadednode");
}
@Test
public void testHasNodeAvailableOrConnectionReady() {
NetworkClient client = createNetworkClientWithMaxInFlightRequestsPerConnection(1, reconnectBackoffMaxMsTest);
awaitReady(client, node);
long now = time.milliseconds();
LeastLoadedNode leastLoadedNode = client.leastLoadedNode(now);
assertEquals(node, leastLoadedNode.node());
assertTrue(leastLoadedNode.hasNodeAvailableOrConnectionReady());
MetadataRequest.Builder builder = new MetadataRequest.Builder(Collections.emptyList(), true);
ClientRequest request = client.newClientRequest(node.idString(), builder, now, true);
client.send(request, now);
client.poll(defaultRequestTimeoutMs, now);
leastLoadedNode = client.leastLoadedNode(now);
assertNull(leastLoadedNode.node());
assertTrue(leastLoadedNode.hasNodeAvailableOrConnectionReady());
}
@Test
@ -727,7 +765,7 @@ public class NetworkClientTest {
Set<Node> providedNodeIds = new HashSet<>();
for (int i = 0; i < nodeNumber * 10; i++) {
Node node = client.leastLoadedNode(time.milliseconds());
Node node = client.leastLoadedNode(time.milliseconds()).node();
assertNotNull(node, "Should provide a node");
providedNodeIds.add(node);
client.ready(node, time.milliseconds());
@ -800,7 +838,7 @@ public class NetworkClientTest {
client.poll(1, time.milliseconds());
// leastloadednode should return null since the node is throttled
assertNull(client.leastLoadedNode(time.milliseconds()));
assertNull(client.leastLoadedNode(time.milliseconds()).node());
}
@Test
@ -1046,7 +1084,8 @@ public class NetworkClientTest {
NetworkClient client = new NetworkClient(metadataUpdater, null, selector, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024,
defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest,
time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender);
time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender,
MetadataRecoveryStrategy.NONE);
// Connect to one of the initial addresses, then change the addresses and disconnect
client.ready(node, time.milliseconds());
@ -1106,7 +1145,8 @@ public class NetworkClientTest {
NetworkClient client = new NetworkClient(metadataUpdater, null, selector, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024,
defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest,
time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender);
time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender,
MetadataRecoveryStrategy.NONE);
// First connection attempt should fail
client.ready(node, time.milliseconds());
@ -1158,7 +1198,8 @@ public class NetworkClientTest {
NetworkClient client = new NetworkClient(metadataUpdater, null, selector, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024,
defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest,
time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender);
time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender,
MetadataRecoveryStrategy.NONE);
// Connect to one of the initial addresses, then change the addresses and disconnect
client.ready(node, time.milliseconds());
@ -1266,7 +1307,8 @@ public class NetworkClientTest {
NetworkClient client = new NetworkClient(metadataUpdater, null, selector, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024,
defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest,
time, true, new ApiVersions(), null, new LogContext(), new DefaultHostResolver(), mockClientTelemetrySender);
time, true, new ApiVersions(), null, new LogContext(), new DefaultHostResolver(), mockClientTelemetrySender,
MetadataRecoveryStrategy.NONE);
// Send the ApiVersionsRequest
client.ready(node, time.milliseconds());

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.common.config.ConfigException;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class AdminClientConfigTest {
@Test
public void testDefaultMetadataRecoveryStrategy() {
Map<String, Object> configs = new HashMap<>();
final AdminClientConfig adminClientConfig = new AdminClientConfig(configs);
assertEquals(MetadataRecoveryStrategy.NONE.name, adminClientConfig.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG));
}
@Test
public void testInvalidMetadataRecoveryStrategy() {
Map<String, Object> configs = new HashMap<>();
configs.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, "abc");
ConfigException ce = assertThrows(ConfigException.class, () -> new AdminClientConfig(configs));
assertTrue(ce.getMessage().contains(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG));
}
}

View File

@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.security.auth.SecurityProtocol;
@ -191,6 +192,25 @@ public class ConsumerConfigTest {
assertEquals(remoteAssignorName, consumerConfig.getString(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG));
}
@Test
public void testDefaultMetadataRecoveryStrategy() {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass);
final ConsumerConfig consumerConfig = new ConsumerConfig(configs);
assertEquals(MetadataRecoveryStrategy.NONE.name, consumerConfig.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG));
}
@Test
public void testInvalidMetadataRecoveryStrategy() {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass);
configs.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, "abc");
ConfigException ce = assertThrows(ConfigException.class, () -> new ConsumerConfig(configs));
assertTrue(ce.getMessage().contains(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG));
}
@ParameterizedTest
@CsvSource({"consumer, true", "classic, true", "Consumer, true", "Classic, true", "invalid, false"})
public void testProtocolConfigValidation(String protocol, boolean isValid) {

View File

@ -21,6 +21,7 @@ import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.NodeApiVersions;
@ -1909,7 +1910,8 @@ public class FetchRequestManagerTest {
Node node = cluster.nodes().get(0);
NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
1000, 1000, 64 * 1024, 64 * 1024, 1000, 10 * 1000, 127 * 1000,
time, true, new ApiVersions(), metricsManager.throttleTimeSensor(), new LogContext());
time, true, new ApiVersions(), metricsManager.throttleTimeSensor(), new LogContext(),
MetadataRecoveryStrategy.NONE);
ApiVersionsResponse apiVersionsResponse = TestUtils.defaultApiVersionsResponse(
400, ApiMessageType.ListenerType.ZK_BROKER);

View File

@ -21,6 +21,7 @@ import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.FetchSessionHandler;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.NodeApiVersions;
@ -1905,7 +1906,8 @@ public class FetcherTest {
Node node = cluster.nodes().get(0);
NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
1000, 1000, 64 * 1024, 64 * 1024, 1000, 10 * 1000, 127 * 1000,
time, true, new ApiVersions(), metricsManager.throttleTimeSensor(), new LogContext());
time, true, new ApiVersions(), metricsManager.throttleTimeSensor(), new LogContext(),
MetadataRecoveryStrategy.NONE);
ApiVersionsResponse apiVersionsResponse = TestUtils.defaultApiVersionsResponse(
400, ApiMessageType.ListenerType.ZK_BROKER);

View File

@ -18,6 +18,7 @@ package org.apache.kafka.clients.producer;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.LeastLoadedNode;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@ -735,8 +736,8 @@ public class KafkaProducerTest {
// let mockClient#leastLoadedNode return the node directly so that we can isolate Metadata calls from KafkaProducer for idempotent producer
MockClient mockClient = new MockClient(Time.SYSTEM, metadata) {
@Override
public Node leastLoadedNode(long now) {
return NODE;
public LeastLoadedNode leastLoadedNode(long now) {
return new LeastLoadedNode(NODE, true);
}
};

View File

@ -17,6 +17,7 @@
package org.apache.kafka.clients.producer;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArraySerializer;
@ -98,6 +99,25 @@ public class ProducerConfigTest {
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}
@Test
public void testDefaultMetadataRecoveryStrategy() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass);
final ProducerConfig producerConfig = new ProducerConfig(configs);
assertEquals(MetadataRecoveryStrategy.NONE.name, producerConfig.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG));
}
@Test
public void testInvalidMetadataRecoveryStrategy() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass);
configs.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, "abc");
ConfigException ce = assertThrows(ConfigException.class, () -> new ProducerConfig(configs));
assertTrue(ce.getMessage().contains(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG));
}
@Test
public void testCaseInsensitiveSecurityProtocol() {
final String saslSslLowerCase = SecurityProtocol.SASL_SSL.name.toLowerCase(Locale.ROOT);

View File

@ -19,7 +19,9 @@ package org.apache.kafka.clients.producer.internals;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.LeastLoadedNode;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.clients.MetadataSnapshot;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NetworkClient;
@ -299,7 +301,8 @@ public class SenderTest {
Node node = cluster.nodes().get(0);
NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
1000, 1000, 64 * 1024, 64 * 1024, 1000, 10 * 1000, 127 * 1000,
time, true, new ApiVersions(), throttleTimeSensor, logContext);
time, true, new ApiVersions(), throttleTimeSensor, logContext,
MetadataRecoveryStrategy.NONE);
ApiVersionsResponse apiVersionsResponse = TestUtils.defaultApiVersionsResponse(
400, ApiMessageType.ListenerType.ZK_BROKER);
@ -3797,12 +3800,12 @@ public class SenderTest {
client = new MockClient(time, metadata) {
volatile boolean canSendMore = true;
@Override
public Node leastLoadedNode(long now) {
public LeastLoadedNode leastLoadedNode(long now) {
for (Node node : metadata.fetch().nodes()) {
if (isReady(node, now) && canSendMore)
return node;
return new LeastLoadedNode(node, true);
}
return null;
return new LeastLoadedNode(null, false);
}
@Override
@ -3821,7 +3824,7 @@ public class SenderTest {
while (!client.ready(node, time.milliseconds()))
client.poll(0, time.milliseconds());
client.send(request, time.milliseconds());
while (client.leastLoadedNode(time.milliseconds()) != null)
while (client.leastLoadedNode(time.milliseconds()).node() != null)
client.poll(0, time.milliseconds());
}

View File

@ -17,6 +17,7 @@
package org.apache.kafka.connect.runtime.distributed;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
@ -96,6 +97,10 @@ public class DistributedConfig extends WorkerConfig {
public static final String REBALANCE_TIMEOUT_MS_CONFIG = CommonClientConfigs.REBALANCE_TIMEOUT_MS_CONFIG;
private static final String REBALANCE_TIMEOUT_MS_DOC = CommonClientConfigs.REBALANCE_TIMEOUT_MS_DOC;
public static final String METADATA_RECOVERY_STRATEGY_CONFIG = CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG;
private static final String METADATA_RECOVERY_STRATEGY_DOC = CommonClientConfigs.METADATA_RECOVERY_STRATEGY_DOC;
public static final String DEFAULT_METADATA_RECOVERY_STRATEGY = CommonClientConfigs.DEFAULT_METADATA_RECOVERY_STRATEGY;
/**
* <code>worker.sync.timeout.ms</code>
*/
@ -512,7 +517,14 @@ public class DistributedConfig extends WorkerConfig {
(name, value) -> validateVerificationAlgorithms(crypto, name, (List<String>) value),
() -> "A list of one or more MAC algorithms, each supported by the worker JVM"),
ConfigDef.Importance.LOW,
INTER_WORKER_VERIFICATION_ALGORITHMS_DOC);
INTER_WORKER_VERIFICATION_ALGORITHMS_DOC)
.define(METADATA_RECOVERY_STRATEGY_CONFIG,
ConfigDef.Type.STRING,
DEFAULT_METADATA_RECOVERY_STRATEGY,
ConfigDef.CaseInsensitiveValidString
.in(Utils.enumOptions(MetadataRecoveryStrategy.class)),
ConfigDef.Importance.LOW,
METADATA_RECOVERY_STRATEGY_DOC);
}
private final ExactlyOnceSourceSupport exactlyOnceSourceSupport;

View File

@ -20,6 +20,7 @@ import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.GroupRebalanceConfig;
@ -119,7 +120,9 @@ public class WorkerGroupMember {
time,
true,
new ApiVersions(),
logContext);
logContext,
MetadataRecoveryStrategy.forName(config.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG))
);
this.client = new ConsumerNetworkClient(
logContext,
netClient,

View File

@ -18,6 +18,7 @@ package kafka.server;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ManualMetadataUpdater;
import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.common.Reconfigurable;
import org.apache.kafka.common.metrics.Metrics;
@ -84,7 +85,8 @@ public class NetworkUtils {
time,
true,
new ApiVersions(),
logContext
logContext,
MetadataRecoveryStrategy.NONE
);
}
}

View File

@ -26,7 +26,7 @@ import joptsimple.OptionSpec
import kafka.utils.Implicits._
import kafka.utils.Logging
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.clients.{ApiVersions, ClientDnsLookup, ClientResponse, ClientUtils, CommonClientConfigs, Metadata, NetworkClient, NodeApiVersions}
import org.apache.kafka.clients.{ApiVersions, ClientDnsLookup, ClientResponse, ClientUtils, CommonClientConfigs, Metadata, MetadataRecoveryStrategy, NetworkClient, NodeApiVersions}
import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, RequestFuture}
import org.apache.kafka.common.config.ConfigDef.ValidString._
import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
@ -310,7 +310,8 @@ object BrokerApiVersionsCommand {
time,
true,
new ApiVersions,
logContext)
logContext,
MetadataRecoveryStrategy.NONE)
val highLevelClient = new ConsumerNetworkClient(
logContext,

View File

@ -164,7 +164,8 @@ class ControllerChannelManager(controllerEpoch: () => Int,
time,
false,
new ApiVersions,
logContext
logContext,
MetadataRecoveryStrategy.NONE
)
(networkClient, reconfigurableChannelBuilder)
}

View File

@ -96,7 +96,8 @@ object TransactionMarkerChannelManager {
time,
false,
new ApiVersions,
logContext
logContext,
MetadataRecoveryStrategy.NONE
)
new TransactionMarkerChannelManager(config,

View File

@ -30,7 +30,7 @@ import kafka.server.KafkaConfig
import kafka.utils.CoreUtils
import kafka.utils.FileLock
import kafka.utils.Logging
import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient}
import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, MetadataRecoveryStrategy, NetworkClient}
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.Node
import org.apache.kafka.common.TopicPartition
@ -312,7 +312,8 @@ class KafkaRaftManager[T](
time,
discoverBrokerVersions,
apiVersions,
logContext
logContext,
MetadataRecoveryStrategy.NONE
)
(controllerListenerName, networkClient)

View File

@ -96,7 +96,8 @@ class BrokerBlockingSender(sourceBroker: BrokerEndPoint,
time,
false,
new ApiVersions,
logContext
logContext,
MetadataRecoveryStrategy.NONE
)
(networkClient, reconfigurableChannelBuilder)
}

View File

@ -30,7 +30,7 @@ import kafka.raft.KafkaRaftManager
import kafka.server.metadata.{OffsetTrackingListener, ZkConfigRepository, ZkMetadataCache}
import kafka.utils._
import kafka.zk.{AdminZkClient, BrokerInfo, KafkaZkClient}
import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient, NetworkClientUtils}
import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, MetadataRecoveryStrategy, NetworkClient, NetworkClientUtils}
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.ApiMessageType.ListenerType
@ -827,7 +827,8 @@ class KafkaServer(
time,
false,
new ApiVersions,
logContext)
logContext,
MetadataRecoveryStrategy.NONE)
}
var shutdownSucceeded: Boolean = false

View File

@ -191,7 +191,8 @@ class NodeToControllerChannelManagerImpl(
time,
true,
apiVersions,
logContext
logContext,
MetadataRecoveryStrategy.NONE
)
}
val threadName = s"${threadNamePrefix}to-controller-${channelName}-channel-manager"

View File

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.api
import org.junit.jupiter.api.Test
class AdminClientRebootstrapTest extends RebootstrapTest {
@Test
def testRebootstrap(): Unit = {
server1.shutdown()
server1.awaitShutdown()
val adminClient = createAdminClient(configOverrides = clientOverrides)
// Only server 0 is available to the admin client during bootstrap.
adminClient.listTopics().names().get()
server0.shutdown()
server0.awaitShutdown()
server1.startup()
// Server 0, originally cached during bootstrap, is now offline.
// However, server 1 from the bootstrap list is online.
// The client should be able to list topics again.
adminClient.listTopics().names().get()
server1.shutdown()
server1.awaitShutdown()
server0.startup()
// The same situation, but now server 1 is gone and server 0 is back.
adminClient.listTopics().names().get()
}
}

View File

@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.api
import kafka.utils.TestUtils
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.junit.jupiter.api.Test
import java.util.Collections
class ConsumerRebootstrapTest extends RebootstrapTest {
@Test
def testRebootstrap(): Unit = {
sendRecords(10, 0)
TestUtils.waitUntilTrue(
() => server0.logManager.logsByTopic(tp.topic()).head.logEndOffset == server1.logManager.logsByTopic(tp.topic()).head.logEndOffset,
"Timeout waiting for records to be replicated"
)
server1.shutdown()
server1.awaitShutdown()
val consumer = createConsumer(configOverrides = clientOverrides)
// Only server 0 is available to the consumer during bootstrap.
consumer.assign(Collections.singleton(tp))
consumeAndVerifyRecords(consumer, 10, 0)
// Bring back server 1 and shut down server 0.
server1.startup()
TestUtils.waitUntilTrue(
() => server0.logManager.logsByTopic(tp.topic()).head.logEndOffset == server1.logManager.logsByTopic(tp.topic()).head.logEndOffset,
"Timeout waiting for records to be replicated"
)
server0.shutdown()
server0.awaitShutdown()
sendRecords(10, 10)
// Server 0, originally cached during bootstrap, is now offline.
// However, server 1 from the bootstrap list is online.
// The client should be able to consume records.
consumeAndVerifyRecords(consumer, 10, 10, startingKeyAndValueIndex = 10, startingTimestamp = 10)
// Bring back server 0 and shut down server 1.
server0.startup()
TestUtils.waitUntilTrue(
() => server0.logManager.logsByTopic(tp.topic()).head.logEndOffset == server1.logManager.logsByTopic(tp.topic()).head.logEndOffset,
"Timeout waiting for records to be replicated"
)
server1.shutdown()
server1.awaitShutdown()
sendRecords(10, 20)
// The same situation, but now server 1 is gone and server 0 is back.
consumeAndVerifyRecords(consumer, 10, 20, startingKeyAndValueIndex = 20, startingTimestamp = 20)
}
private def sendRecords(numRecords: Int, from: Int): Unit = {
val producer: KafkaProducer[Array[Byte], Array[Byte]] = createProducer()
(from until (numRecords + from)).foreach { i =>
val record = new ProducerRecord(tp.topic(), tp.partition(), i.toLong, s"key $i".getBytes, s"value $i".getBytes)
producer.send(record)
}
producer.flush()
producer.close()
}
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.api
import org.apache.kafka.clients.producer.ProducerRecord
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
class ProducerRebootstrapTest extends RebootstrapTest {
@Test
def testRebootstrap(): Unit = {
server1.shutdown()
server1.awaitShutdown()
val producer = createProducer(configOverrides = clientOverrides)
// Only server 0 is available to the producer during bootstrap.
producer.send(new ProducerRecord(topic, part, "key 0".getBytes, "value 0".getBytes)).get()
server0.shutdown()
server0.awaitShutdown()
server1.startup()
// Server 0, originally cached during bootstrap, is now offline.
// However, server 1 from the bootstrap list is online.
// The client should be able to produce records.
val recordMetadata1 = producer.send(new ProducerRecord(topic, part, "key 1".getBytes, "value 1".getBytes)).get()
assertEquals(0, recordMetadata1.offset())
server1.shutdown()
server1.awaitShutdown()
server0.startup()
// The same situation, but now server 1 is gone and server 0 is back.
val recordMetadata2 = producer.send(new ProducerRecord(topic, part, "key 1".getBytes, "value 1".getBytes)).get()
assertEquals(1, recordMetadata2.offset())
}
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.api
import kafka.server.{KafkaConfig, KafkaServer}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import java.util.Properties
abstract class RebootstrapTest extends AbstractConsumerTest {
override def brokerCount: Int = 2
def server0: KafkaServer = serverForId(0).get
def server1: KafkaServer = serverForId(1).get
override def generateConfigs: Seq[KafkaConfig] = {
val overridingProps = new Properties()
overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, brokerCount.toString)
overridingProps.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "true")
// In this test, fixed ports are necessary because brokers must keep the
// same port after restarting.
FixedPortTestUtils.createBrokerConfigs(brokerCount, zkConnect, enableControlledShutdown = false)
.map(KafkaConfig.fromProps(_, overridingProps))
}
def clientOverrides: Properties = {
val overrides = new Properties()
overrides.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, "5000")
overrides.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, "5000")
overrides.put(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG, "1000")
overrides.put(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG, "1000")
overrides.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, "rebootstrap")
overrides
}
}

View File

@ -23,6 +23,7 @@ import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.ManualMetadataUpdater;
import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.NetworkClientUtils;
import org.apache.kafka.clients.admin.Admin;
@ -706,7 +707,8 @@ public class ReplicaVerificationTool {
time,
false,
new ApiVersions(),
logContext
logContext,
MetadataRecoveryStrategy.forName(consumerConfig.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG))
);
}

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.ManualMetadataUpdater;
import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.NetworkClientUtils;
import org.apache.kafka.clients.admin.Admin;
@ -179,7 +180,8 @@ public class ConnectionStressWorker implements TaskWorker {
TIME,
false,
new ApiVersions(),
logContext)) {
logContext,
MetadataRecoveryStrategy.NONE)) {
NetworkClientUtils.awaitReady(client, targetNode, TIME, 500);
}
}