diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index 6c582115c0a..a8101dad12b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -115,10 +115,12 @@ final class ClusterConnectionStates {
     /**
      * Enter the disconnected state for the given node
      * @param id The connection we have disconnected
+     * @param now The current time
      */
-    public void disconnected(String id) {
+    public void disconnected(String id, long now) {
         NodeConnectionState nodeState = nodeState(id);
         nodeState.state = ConnectionState.DISCONNECTED;
+        nodeState.lastConnectAttemptMs = now;
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 6f39ac9046e..4265004a10f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -15,8 +15,10 @@ package org.apache.kafka.clients;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 
@@ -33,6 +35,7 @@ import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.requests.RequestSend;
 import org.apache.kafka.common.requests.ResponseHeader;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,7 +54,13 @@ public class NetworkClient implements KafkaClient {
     private final Selectable selector;
 
     private final MetadataUpdater metadataUpdater;
-
+
+    /* a list of nodes we've connected to in the past */
+    private final List<Integer> nodesEverSeen;
+    private final Map<Integer, Node> nodesEverSeenById;
+    /* random offset into nodesEverSeen list */
+    private final Random randOffset;
+
     /* the state of each node's connection */
     private final ClusterConnectionStates connectionStates;
 
@@ -75,6 +84,8 @@ public class NetworkClient implements KafkaClient {
 
     /* max time in ms for the producer to wait for acknowledgement from server*/
     private final int requestTimeoutMs;
+
+    private final Time time;
 
     public NetworkClient(Selectable selector,
                          Metadata metadata,
@@ -83,9 +94,10 @@ public class NetworkClient implements KafkaClient {
                          long reconnectBackoffMs,
                          int socketSendBuffer,
                          int socketReceiveBuffer,
-                         int requestTimeoutMs) {
+                         int requestTimeoutMs,
+                         Time time) {
         this(null, metadata, selector, clientId, maxInFlightRequestsPerConnection,
-             reconnectBackoffMs, socketSendBuffer, socketReceiveBuffer, requestTimeoutMs);
+             reconnectBackoffMs, socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time);
     }
 
     public NetworkClient(Selectable selector,
@@ -95,9 +107,10 @@ public class NetworkClient implements KafkaClient {
                          long reconnectBackoffMs,
                          int socketSendBuffer,
                          int socketReceiveBuffer,
-                         int requestTimeoutMs) {
+                         int requestTimeoutMs,
+                         Time time) {
         this(metadataUpdater, null, selector, clientId, maxInFlightRequestsPerConnection, reconnectBackoffMs,
-             socketSendBuffer, socketReceiveBuffer, requestTimeoutMs);
+             socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time);
     }
 
     private NetworkClient(MetadataUpdater metadataUpdater,
@@ -107,7 +120,9 @@ public class NetworkClient implements KafkaClient {
                           int maxInFlightRequestsPerConnection,
                           long reconnectBackoffMs,
                           int socketSendBuffer,
-                          int socketReceiveBuffer, int requestTimeoutMs) {
+                          int socketReceiveBuffer,
+                          int requestTimeoutMs,
+                          Time time) {
 
         /* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not
          * possible because `DefaultMetadataUpdater` is an inner class and it can only be instantiated after the
@@ -127,8 +142,13 @@ public class NetworkClient implements KafkaClient {
         this.socketSendBuffer = socketSendBuffer;
         this.socketReceiveBuffer = socketReceiveBuffer;
         this.correlation = 0;
-        this.nodeIndexOffset = new Random().nextInt(Integer.MAX_VALUE);
+        this.randOffset = new Random();
+        this.nodeIndexOffset = this.randOffset.nextInt(Integer.MAX_VALUE);
         this.requestTimeoutMs = requestTimeoutMs;
+        this.nodesEverSeen = new ArrayList<>();
+        this.nodesEverSeenById = new HashMap<>();
+
+        this.time = time;
     }
 
     /**
@@ -255,6 +275,7 @@ public class NetworkClient implements KafkaClient {
     @Override
     public List<ClientResponse> poll(long timeout, long now) {
         long metadataTimeout = metadataUpdater.maybeUpdate(now);
+        long updatedNow = now;
         try {
             this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
         } catch (IOException e) {
@@ -262,12 +283,13 @@ public class NetworkClient implements KafkaClient {
         }
 
         // process completed actions
+        updatedNow = this.time.milliseconds();
         List<ClientResponse> responses = new ArrayList<>();
-        handleCompletedSends(responses, now);
-        handleCompletedReceives(responses, now);
-        handleDisconnections(responses, now);
+        handleCompletedSends(responses, updatedNow);
+        handleCompletedReceives(responses, updatedNow);
+        handleDisconnections(responses, updatedNow);
         handleConnections();
-        handleTimedOutRequests(responses, now);
+        handleTimedOutRequests(responses, updatedNow);
 
         // invoke callbacks
         for (ClientResponse response : responses) {
@@ -364,6 +386,20 @@ public class NetworkClient implements KafkaClient {
                 found = node;
             }
         }
+
+        // if we found no node in the current list, try one from the nodes seen before
+        if (found == null && nodesEverSeen.size() > 0) {
+            int offset = randOffset.nextInt(nodesEverSeen.size());
+            for (int i = 0; i < nodesEverSeen.size(); i++) {
+                int idx = Utils.abs((offset + i) % nodesEverSeen.size());
+                Node node = nodesEverSeenById.get(nodesEverSeen.get(idx));
+                log.debug("No node found. Trying previously-seen node with ID {}", node.id());
+                if (!this.connectionStates.isBlackedOut(node.idString(), now)) {
+                    found = node;
+                }
+            }
+        }
+
         return found;
     }
 
@@ -375,7 +411,7 @@ public class NetworkClient implements KafkaClient {
      * @param now The current time
      */
     private void processDisconnection(List<ClientResponse> responses, String nodeId, long now) {
-        connectionStates.disconnected(nodeId);
+        connectionStates.disconnected(nodeId, now);
         for (ClientRequest request : this.inFlightRequests.clearAll(nodeId)) {
             log.trace("Cancelled request {} due to node {} being disconnected", request, nodeId);
             if (!metadataUpdater.maybeHandleDisconnection(request))
@@ -489,7 +525,7 @@ public class NetworkClient implements KafkaClient {
                                  this.socketReceiveBuffer);
         } catch (IOException e) {
             /* attempt failed, we'll try again after the backoff */
-            connectionStates.disconnected(nodeConnectionId);
+            connectionStates.disconnected(nodeConnectionId, now);
             /* maybe the problem is our metadata, update it */
             metadataUpdater.requestUpdate();
             log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
@@ -532,7 +568,7 @@ public class NetworkClient implements KafkaClient {
             // if there is no node available to connect, back off refreshing metadata
             long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
                     waitForMetadataFetch);
-
+
             if (metadataTimeout == 0) {
                 // Beware that the behavior of this method and the computation of timeouts for poll() are
                 // highly dependent on the behavior of leastLoadedNode.
@@ -570,6 +606,29 @@ public class NetworkClient implements KafkaClient {
             this.metadata.requestUpdate();
         }
 
+        /*
+         * Keep track of any nodes we've ever seen. Add current
+         * alive nodes to this tracking list.
+         * @param nodes Current alive nodes
+         */
+        private void updateNodesEverSeen(List<Node> nodes) {
+            Node existing = null;
+            for (Node n : nodes) {
+                existing = nodesEverSeenById.get(n.id());
+                if (existing == null) {
+                    nodesEverSeenById.put(n.id(), n);
+                    log.debug("Adding node {} to nodes ever seen", n.id());
+                    nodesEverSeen.add(n.id());
+                } else {
+                    // check if the nodes are really equal. There could be a case
+                    // where node.id() is the same but the node has moved to a different host
+                    if (!existing.equals(n)) {
+                        nodesEverSeenById.put(n.id(), n);
+                    }
+                }
+            }
+        }
+
         private void handleResponse(RequestHeader header, Struct body, long now) {
             this.metadataFetchInProgress = false;
             MetadataResponse response = new MetadataResponse(body);
@@ -582,6 +641,7 @@ public class NetworkClient implements KafkaClient {
             // created which means we will get errors and no nodes until it exists
             if (cluster.nodes().size() > 0) {
                 this.metadata.update(cluster, now);
+                this.updateNodesEverSeen(cluster.nodes());
             } else {
                 log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
                 this.metadata.failedUpdate(now);
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 24051f2805e..2f7f153fc4d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -527,7 +527,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                     config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
                     config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
-                    config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
+                    config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), time);
             this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs);
             OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase());
             this.subscriptions = new SubscriptionState(offsetResetStrategy);
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 0b611fb1ea2..4153eb39778 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -175,7 +175,8 @@ public class ConsumerNetworkClient implements Closeable {
     private void poll(long timeout, long now) {
         // send all the requests we can send now
         pollUnsentRequests(now);
-
+        now = time.milliseconds();
+
         // ensure we don't poll any longer than the deadline for
         // the next scheduled task
         timeout = Math.min(timeout, delayedTasks.nextTimeout(now));
@@ -190,7 +191,7 @@ public class ConsumerNetworkClient implements Closeable {
         pollUnsentRequests(now);
 
         // fail all requests that couldn't be sent
-        clearUnsentRequests(now);
+        clearUnsentRequests();
     }
 
@@ -228,11 +229,13 @@ public class ConsumerNetworkClient implements Closeable {
     }
 
     private void pollUnsentRequests(long now) {
-        while (trySend(now))
+        while (trySend(now)) {
             clientPoll(0, now);
+            now = time.milliseconds();
+        }
     }
 
-    private void clearUnsentRequests(long now) {
+    private void clearUnsentRequests() {
         // clear all unsent requests and fail their corresponding futures
         for (Map.Entry<Node, List<ClientRequest>> requestEntry: unsent.entrySet()) {
             Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
@@ -273,7 +276,7 @@ public class ConsumerNetworkClient implements Closeable {
     private void clientPoll(long timeout, long now) {
         client.poll(timeout, now);
         if (wakeup.get()) {
-            clearUnsentRequests(now);
+            clearUnsentRequests();
             wakeup.set(false);
             throw new ConsumerWakeupException();
         }
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 44280e06b70..ff3bfe6f45b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -276,7 +276,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                     config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                     config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
-                    this.requestTimeoutMs);
+                    this.requestTimeoutMs, time);
             this.sender = new Sender(client,
                     this.metadata,
                     this.accumulator,
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 237989689e0..12136d80b65 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -52,10 +52,12 @@ public class NetworkClientTest {
     private int nodeId = 1;
     private Cluster cluster = TestUtils.singletonCluster("test", nodeId);
     private Node node = cluster.nodes().get(0);
-    private NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024, requestTimeoutMs);
-
+    private long reconnectBackoffMsTest = 10 * 1000;
+    private NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, reconnectBackoffMsTest,
+            64 * 1024, 64 * 1024, requestTimeoutMs, time);
+
     private NetworkClient clientWithStaticNodes = new NetworkClient(selector, new ManualMetadataUpdater(Arrays.asList(node)),
-            "mock-static", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024, requestTimeoutMs);
+            "mock-static", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024, requestTimeoutMs, time);
 
     @Before
     public void setup() {
@@ -149,6 +151,31 @@ public class NetworkClientTest {
         assertEquals(node.idString(), disconnectedNode);
     }
 
+    @Test
+    public void testLeastLoadedNode() {
+        Node leastNode = null;
+        client.ready(node, time.milliseconds());
+        awaitReady(client, node);
+        client.poll(1, time.milliseconds());
+        assertTrue("The client should be ready", client.isReady(node, time.milliseconds()));
+
+        // leastloadednode should be our single node
+        leastNode = client.leastLoadedNode(time.milliseconds());
+        assertEquals("There should be one leastloadednode", leastNode.id(), node.id());
+
+        // sleep for longer than reconnect backoff
+        time.sleep(reconnectBackoffMsTest);
+
+        // CLOSE node
+        selector.close(node.idString());
+
+        client.poll(1, time.milliseconds());
+        assertFalse("After we forced the disconnection the client is no longer ready.", client.ready(node, time.milliseconds()));
+        leastNode = client.leastLoadedNode(time.milliseconds());
+        assertEquals("There should be NO leastloadednode", leastNode, null);
+
+    }
+
     private static class TestCallbackHandler implements RequestCompletionHandler {
         public boolean executed = false;
         public ClientResponse response;
diff --git a/clients/src/test/java/org/apache/kafka/test/MockSelector.java b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
index 5a5f963ac88..b39ff7e7460 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockSelector.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
@@ -55,6 +55,12 @@ public class MockSelector implements Selectable {
     @Override
     public void close(String id) {
         this.disconnected.add(id);
+        for (int i = 0; i < this.connected.size(); i++) {
+            if (this.connected.get(i).equals(id)) {
+                this.connected.remove(i);
+                break;
+            }
+        }
     }
 
     public void clear() {
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 37568227ab6..d86c8ce3471 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -106,7 +106,8 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
         0,
         Selectable.USE_DEFAULT_BUFFER_SIZE,
         Selectable.USE_DEFAULT_BUFFER_SIZE,
-        config.requestTimeoutMs
+        config.requestTimeoutMs,
+        time
       )
     }
     val threadName = threadNamePrefix match {
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 510957b704b..beea83a8b0f 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -317,7 +317,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
           0,
           Selectable.USE_DEFAULT_BUFFER_SIZE,
           Selectable.USE_DEFAULT_BUFFER_SIZE,
-          config.requestTimeoutMs)
+          config.requestTimeoutMs,
+          kafkaMetricsTime)
       }
 
       var shutdownSucceeded: Boolean = false
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 5993bbbe6a8..4affd894ca7 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -84,7 +84,8 @@ class ReplicaFetcherThread(name: String,
       0,
       Selectable.USE_DEFAULT_BUFFER_SIZE,
       brokerConfig.replicaSocketReceiveBufferBytes,
-      brokerConfig.requestTimeoutMs
+      brokerConfig.requestTimeoutMs,
+      time
    )
 }
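
Note on call sites (not part of the patch): every place that constructs a NetworkClient now has to supply a Time instance as the trailing constructor argument. The sketch below is illustrative only, assuming org.apache.kafka.common.utils.SystemTime as the clock; the helper class name, client id, backoff, buffer sizes and timeout values are made up for the example and do not come from this patch.

import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;

// Hypothetical helper class, shown only to illustrate the new constructor shape.
public class NetworkClientFactoryExample {

    public static NetworkClient create(Selectable selector, Metadata metadata) {
        // Production callers pass the Time instance they already own;
        // SystemTime is the plain wall-clock implementation.
        Time time = new SystemTime();
        return new NetworkClient(
                selector,
                metadata,
                "example-client",     // client id (illustrative)
                Integer.MAX_VALUE,    // max in-flight requests per connection
                10 * 1000L,           // reconnect backoff ms, mirrors reconnectBackoffMsTest above
                64 * 1024,            // socket send buffer bytes
                64 * 1024,            // socket receive buffer bytes
                30 * 1000,            // request timeout ms (illustrative)
                time);                // new trailing Time parameter added by this patch
    }
}

In tests, a MockTime can be passed instead so that time.sleep(...) advances the clock deterministically; testLeastLoadedNode above relies on that to step past the reconnect backoff before forcing the disconnect.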