diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 4265004a10f..2c56751fabb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -12,16 +12,6 @@ */ package org.apache.kafka.clients; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.Set; - import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.network.NetworkReceive; @@ -40,6 +30,16 @@ import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; + /** * A network client for asynchronous request/response network i/o. This is an internal class used to implement the * user-facing producer and consumer clients. 
@@ -58,6 +58,7 @@ public class NetworkClient implements KafkaClient { /* a list of nodes we've connected to in the past */ private final List nodesEverSeen; private final Map nodesEverSeenById; + /* random offset into nodesEverSeen list */ private final Random randOffset; @@ -233,16 +234,6 @@ public class NetworkClient implements KafkaClient { return connectionStates.isConnected(node) && selector.isChannelReady(node) && inFlightRequests.canSendMore(node); } - /** - * Return the state of the connection to the given node - * - * @param node The node to check - * @return The connection state - */ - public ConnectionState connectionState(String node) { - return connectionStates.connectionState(node); - } - /** * Queue up the given request for sending. Requests can only be sent out to ready nodes. * @@ -275,7 +266,6 @@ public class NetworkClient implements KafkaClient { @Override public List poll(long timeout, long now) { long metadataTimeout = metadataUpdater.maybeUpdate(now); - long updatedNow = now; try { this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs)); } catch (IOException e) { @@ -283,7 +273,7 @@ public class NetworkClient implements KafkaClient { } // process completed actions - updatedNow = this.time.milliseconds(); + long updatedNow = this.time.milliseconds(); List responses = new ArrayList<>(); handleCompletedSends(responses, updatedNow); handleCompletedReceives(responses, updatedNow); @@ -612,9 +602,8 @@ public class NetworkClient implements KafkaClient { * @param nodes Current alive nodes */ private void updateNodesEverSeen(List nodes) { - Node existing = null; for (Node n : nodes) { - existing = nodesEverSeenById.get(n.id()); + Node existing = nodesEverSeenById.get(n.id()); if (existing == null) { nodesEverSeenById.put(n.id(), n); log.debug("Adding node {} to nodes ever seen", n.id()); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 4dce5861836..549c8de51a8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -477,6 +477,8 @@ public abstract class AbstractCoordinator { groupMetadataResponse.node().host(), groupMetadataResponse.node().port()); + client.tryConnect(coordinator); + // start sending heartbeats only if we have a valid generation if (generation > 0) heartbeatTask.reset(); @@ -488,11 +490,19 @@ public abstract class AbstractCoordinator { } /** - * Check if we know who the coordinator is. + * Check if we know who the coordinator is and we have an active connection * @return true if the coordinator is unknown */ public boolean coordinatorUnknown() { - return this.coordinator == null; + if (coordinator == null) + return true; + + if (client.connectionFailed(coordinator)) { + coordinatorDead(); + return true; + } + + return false; } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java index fbfe54aff02..e3a25149f0d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java @@ -48,7 +48,7 @@ public class ConsumerNetworkClient implements Closeable { private final KafkaClient client; private final AtomicBoolean wakeup = new AtomicBoolean(false); private final DelayedTaskQueue delayedTasks = new DelayedTaskQueue(); - private final Map> unsent = new HashMap>(); + private final Map> unsent = new HashMap<>(); private final Metadata metadata; private final Time time; private final long retryBackoffMs; @@ -106,7 +106,7 @@ public class ConsumerNetworkClient 
implements Closeable { private void put(Node node, ClientRequest request) { List nodeUnsent = unsent.get(node); if (nodeUnsent == null) { - nodeUnsent = new ArrayList(); + nodeUnsent = new ArrayList<>(); unsent.put(node, nodeUnsent); } nodeUnsent.add(request); @@ -183,25 +183,28 @@ public class ConsumerNetworkClient implements Closeable { private void poll(long timeout, long now) { // send all the requests we can send now - pollUnsentRequests(now); - now = time.milliseconds(); - + trySend(now); + // ensure we don't poll any longer than the deadline for // the next scheduled task timeout = Math.min(timeout, delayedTasks.nextTimeout(now)); clientPoll(timeout, now); + now = time.milliseconds(); + + // handle any disconnects by failing the active requests. note that disconnects must + // be checked immediately following poll since any subsequent call to client.ready() + // will reset the disconnect status + checkDisconnects(now); // execute scheduled tasks - now = time.milliseconds(); delayedTasks.poll(now); // try again to send requests since buffer space may have been // cleared or a connect finished in the poll - pollUnsentRequests(now); + trySend(now); // fail all requests that couldn't be sent - clearUnsentRequests(); - + failUnsentRequests(); } /** @@ -237,14 +240,27 @@ public class ConsumerNetworkClient implements Closeable { return total + client.inFlightRequestCount(); } - private void pollUnsentRequests(long now) { - while (trySend(now)) { - clientPoll(0, now); - now = time.milliseconds(); + private void checkDisconnects(long now) { + // any disconnects affecting requests that have already been transmitted will be handled + // by NetworkClient, so we just need to check whether connections for any of the unsent + // requests have been disconnected; if they have, then we complete the corresponding future + // and set the disconnect flag in the ClientResponse + Iterator>> iterator = unsent.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry>
requestEntry = iterator.next(); + Node node = requestEntry.getKey(); + if (client.connectionFailed(node)) { + for (ClientRequest request : requestEntry.getValue()) { + RequestFutureCompletionHandler handler = + (RequestFutureCompletionHandler) request.callback(); + handler.complete(new ClientResponse(request, now, true, null)); + } + iterator.remove(); + } } } - private void clearUnsentRequests() { + private void failUnsentRequests() { // clear all unsent requests and fail their corresponding futures for (Map.Entry> requestEntry: unsent.entrySet()) { Iterator iterator = requestEntry.getValue().iterator(); @@ -271,11 +287,6 @@ public class ConsumerNetworkClient implements Closeable { client.send(request, now); iterator.remove(); requestsSent = true; - } else if (client.connectionFailed(node)) { - RequestFutureCompletionHandler handler = - (RequestFutureCompletionHandler) request.callback(); - handler.onComplete(new ClientResponse(request, now, true, null)); - iterator.remove(); } } } @@ -285,7 +296,7 @@ public class ConsumerNetworkClient implements Closeable { private void clientPoll(long timeout, long now) { client.poll(timeout, now); if (wakeup.get()) { - clearUnsentRequests(); + failUnsentRequests(); wakeup.set(false); throw new ConsumerWakeupException(); } @@ -296,6 +307,25 @@ public class ConsumerNetworkClient implements Closeable { client.close(); } + /** + * Find whether a previous connection has failed. Note that the failure state will persist until either + * {@link #tryConnect(Node)} or {@link #send(Node, ApiKeys, AbstractRequest)} has been called. + * @param node Node to connect to if possible + */ + public boolean connectionFailed(Node node) { + return client.connectionFailed(node); + } + + /** + * Initiate a connection if currently possible. This is only really useful for resetting the failed + * status of a socket. If there is an actual request to send, then {@link #send(Node, ApiKeys, AbstractRequest)} + * should be used. 
+ * @param node The node to connect to + */ + public void tryConnect(Node node) { + client.ready(node, time.milliseconds()); + } + public static class RequestFutureCompletionHandler extends RequestFuture implements RequestCompletionHandler { diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index 67d894d4e6a..2726e874251 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -57,10 +57,10 @@ public class MockClient implements KafkaClient { private final Time time; private int correlation = 0; private Node node = null; - private final Set ready = new HashSet(); - private final Queue requests = new ArrayDeque(); - private final Queue responses = new ArrayDeque(); - private final Queue futureResponses = new ArrayDeque(); + private final Set ready = new HashSet<>(); + private final Queue requests = new ArrayDeque<>(); + private final Queue responses = new ArrayDeque<>(); + private final Queue futureResponses = new ArrayDeque<>(); public MockClient(Time time) { this.time = time; @@ -88,11 +88,12 @@ public class MockClient implements KafkaClient { } public void disconnect(String node) { + long now = time.milliseconds(); Iterator iter = requests.iterator(); while (iter.hasNext()) { ClientRequest request = iter.next(); if (request.request().destination() == node) { - responses.add(new ClientResponse(request, time.milliseconds(), true, null)); + responses.add(new ClientResponse(request, now, true, null)); iter.remove(); } } @@ -146,7 +147,7 @@ public class MockClient implements KafkaClient { /** * Prepare a response for a request matching the provided matcher. 
If the matcher does not - * match, {@link #send(ClientRequest)} will throw IllegalStateException + * match, {@link #send(ClientRequest, long)} will throw IllegalStateException * @param matcher The matcher to apply * @param body The response body */ @@ -160,7 +161,7 @@ public class MockClient implements KafkaClient { /** * Prepare a response for a request matching the provided matcher. If the matcher does not - * match, {@link #send(ClientRequest)} will throw IllegalStateException + * match, {@link #send(ClientRequest, long)} will throw IllegalStateException * @param matcher The matcher to apply * @param body The response body * @param disconnected Whether the request was disconnected