KAFKA-2677: ensure consumer sees coordinator disconnects

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ismael Juma, Guozhang Wang

Closes #349 from hachikuji/KAFKA-2677
Jason Gustafson 2015-10-27 16:24:10 -07:00 committed by Guozhang Wang
parent e6b343302f
commit 0b05d3b939
4 changed files with 83 additions and 53 deletions

clients/src/main/java/org/apache/kafka/clients/NetworkClient.java

@@ -12,16 +12,6 @@
*/
package org.apache.kafka.clients;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.network.NetworkReceive;
@@ -40,6 +30,16 @@ import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
/**
* A network client for asynchronous request/response network i/o. This is an internal class used to implement the
* user-facing producer and consumer clients.
@@ -58,6 +58,7 @@ public class NetworkClient implements KafkaClient {
/* a list of nodes we've connected to in the past */
private final List<Integer> nodesEverSeen;
private final Map<Integer, Node> nodesEverSeenById;
/* random offset into nodesEverSeen list */
private final Random randOffset;
@@ -233,16 +234,6 @@ public class NetworkClient implements KafkaClient {
return connectionStates.isConnected(node) && selector.isChannelReady(node) && inFlightRequests.canSendMore(node);
}
/**
* Return the state of the connection to the given node
*
* @param node The node to check
* @return The connection state
*/
public ConnectionState connectionState(String node) {
return connectionStates.connectionState(node);
}
/**
* Queue up the given request for sending. Requests can only be sent out to ready nodes.
*
@@ -275,7 +266,6 @@
@Override
public List<ClientResponse> poll(long timeout, long now) {
long metadataTimeout = metadataUpdater.maybeUpdate(now);
long updatedNow = now;
try {
this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
} catch (IOException e) {
@@ -283,7 +273,7 @@
}
// process completed actions
updatedNow = this.time.milliseconds();
long updatedNow = this.time.milliseconds();
List<ClientResponse> responses = new ArrayList<>();
handleCompletedSends(responses, updatedNow);
handleCompletedReceives(responses, updatedNow);
@@ -612,9 +602,8 @@
* @param nodes Current alive nodes
*/
private void updateNodesEverSeen(List<Node> nodes) {
Node existing = null;
for (Node n : nodes) {
existing = nodesEverSeenById.get(n.id());
Node existing = nodesEverSeenById.get(n.id());
if (existing == null) {
nodesEverSeenById.put(n.id(), n);
log.debug("Adding node {} to nodes ever seen", n.id());

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java

@@ -477,6 +477,8 @@ public abstract class AbstractCoordinator {
groupMetadataResponse.node().host(),
groupMetadataResponse.node().port());
client.tryConnect(coordinator);
// start sending heartbeats only if we have a valid generation
if (generation > 0)
heartbeatTask.reset();
@@ -488,11 +490,19 @@
}
/**
* Check if we know who the coordinator is.
* Check if we know who the coordinator is and we have an active connection
* @return true if the coordinator is unknown
*/
public boolean coordinatorUnknown() {
return this.coordinator == null;
if (coordinator == null)
return true;
if (client.connectionFailed(coordinator)) {
coordinatorDead();
return true;
}
return false;
}
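The net effect is that any loop gated on coordinatorUnknown() now reacts to a dropped connection, not only to a coordinator that was never found. Below is a minimal, self-contained sketch of that caller pattern; everything except the coordinatorUnknown() idea itself is an illustrative stand-in, not actual Kafka code (the Predicate stands in for ConsumerNetworkClient.connectionFailed(Node)).

// Illustrative sketch only: a toy tracker mirroring the new coordinatorUnknown() check.
import java.util.HashSet;
import java.util.Set;
import java.util.function.Predicate;

class CoordinatorState {
    private final Predicate<String> connectionFailed;
    private String coordinator; // null means unknown

    CoordinatorState(Predicate<String> connectionFailed) {
        this.connectionFailed = connectionFailed;
    }

    void setCoordinator(String coordinator) {
        this.coordinator = coordinator;
    }

    boolean coordinatorUnknown() {
        if (coordinator == null)
            return true;
        if (connectionFailed.test(coordinator)) {
            coordinator = null; // analogous to coordinatorDead()
            return true;
        }
        return false;
    }
}

public class CoordinatorUnknownDemo {
    public static void main(String[] args) {
        Set<String> failedConnections = new HashSet<>();
        CoordinatorState state = new CoordinatorState(failedConnections::contains);

        state.setCoordinator("broker-0");
        System.out.println(state.coordinatorUnknown()); // false: coordinator known and connected

        failedConnections.add("broker-0");              // the connection to the coordinator drops
        System.out.println(state.coordinatorUnknown()); // true: the disconnect is now visible
    }
}

In the real code the subsequent reconnection goes through ConsumerNetworkClient.tryConnect(Node), which is added further down in this commit.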

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java

@@ -48,7 +48,7 @@ public class ConsumerNetworkClient implements Closeable {
private final KafkaClient client;
private final AtomicBoolean wakeup = new AtomicBoolean(false);
private final DelayedTaskQueue delayedTasks = new DelayedTaskQueue();
private final Map<Node, List<ClientRequest>> unsent = new HashMap<Node, List<ClientRequest>>();
private final Map<Node, List<ClientRequest>> unsent = new HashMap<>();
private final Metadata metadata;
private final Time time;
private final long retryBackoffMs;
@@ -106,7 +106,7 @@ public class ConsumerNetworkClient implements Closeable {
private void put(Node node, ClientRequest request) {
List<ClientRequest> nodeUnsent = unsent.get(node);
if (nodeUnsent == null) {
nodeUnsent = new ArrayList<ClientRequest>();
nodeUnsent = new ArrayList<>();
unsent.put(node, nodeUnsent);
}
nodeUnsent.add(request);
@@ -183,25 +183,28 @@
private void poll(long timeout, long now) {
// send all the requests we can send now
pollUnsentRequests(now);
now = time.milliseconds();
trySend(now);
// ensure we don't poll any longer than the deadline for
// the next scheduled task
timeout = Math.min(timeout, delayedTasks.nextTimeout(now));
clientPoll(timeout, now);
now = time.milliseconds();
// handle any disconnects by failing the active requests. note that disconnects must
// be checked immediately following poll since any subsequent call to client.ready()
// will reset the disconnect status
checkDisconnects(now);
// execute scheduled tasks
now = time.milliseconds();
delayedTasks.poll(now);
// try again to send requests since buffer space may have been
// cleared or a connect finished in the poll
pollUnsentRequests(now);
trySend(now);
// fail all requests that couldn't be sent
clearUnsentRequests();
failUnsentRequests();
}
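The comment about checking disconnects "immediately following poll" is the crux of the change: a later call to client.ready() starts a fresh connection attempt and thereby clears the node's failed status, hiding the disconnect. The following self-contained toy only mimics that bit of bookkeeping; ToyClient is not the Kafka NetworkClient.

import java.util.HashMap;
import java.util.Map;

// Illustrative only: a failed-connection flag that is cleared by a new connection attempt,
// which is why checkDisconnects() must run right after poll() and before trySend().
class ToyClient {
    private final Map<String, Boolean> failed = new HashMap<>();

    void disconnect(String node) { failed.put(node, true); }

    boolean connectionFailed(String node) { return failed.getOrDefault(node, false); }

    // Starting a new connection attempt resets the failed flag, as NetworkClient.ready() does.
    void ready(String node) { failed.put(node, false); }
}

public class DisconnectOrderingDemo {
    public static void main(String[] args) {
        ToyClient client = new ToyClient();

        client.disconnect("coordinator");
        client.ready("coordinator");                                   // e.g. a send attempt running first
        System.out.println(client.connectionFailed("coordinator"));    // false: the disconnect was hidden

        client.disconnect("coordinator");
        System.out.println(client.connectionFailed("coordinator"));    // true: checked before ready(), as in the patch
        client.ready("coordinator");
    }
}

This is why the patched poll() calls checkDisconnects(now) before delayedTasks.poll(now) and the second trySend(now).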
/**
@@ -237,14 +240,27 @@
return total + client.inFlightRequestCount();
}
private void pollUnsentRequests(long now) {
while (trySend(now)) {
clientPoll(0, now);
now = time.milliseconds();
}
}
private void checkDisconnects(long now) {
// any disconnects affecting requests that have already been transmitted will be handled
// by NetworkClient, so we just need to check whether connections for any of the unsent
// requests have been disconnected; if they have, then we complete the corresponding future
// and set the disconnect flag in the ClientResponse
Iterator<Map.Entry<Node, List<ClientRequest>>> iterator = unsent.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<Node, List<ClientRequest>> requestEntry = iterator.next();
Node node = requestEntry.getKey();
if (client.connectionFailed(node)) {
for (ClientRequest request : requestEntry.getValue()) {
RequestFutureCompletionHandler handler =
(RequestFutureCompletionHandler) request.callback();
handler.complete(new ClientResponse(request, now, true, null));
}
iterator.remove();
}
}
}
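Completing the pending future with a disconnect-flagged response is what lets the caller react immediately instead of blocking until the request timeout. A rough sketch of the caller's side of that contract, using plain JDK types: ToyResponse stands in for ClientResponse, and the retry message is only an assumption about typical handling, not the actual consumer code.

import java.util.concurrent.CompletableFuture;

// Illustrative only: what completing the waiting future with the disconnect flag buys the caller.
public class DisconnectResponseDemo {

    static class ToyResponse {
        final boolean disconnected;
        final String body;
        ToyResponse(boolean disconnected, String body) {
            this.disconnected = disconnected;
            this.body = body;
        }
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<ToyResponse> future = new CompletableFuture<>();

        // What checkDisconnects() does for an unsent request on a failed connection:
        // complete the waiting future right away instead of letting it hang until a timeout.
        future.complete(new ToyResponse(true, null));

        ToyResponse response = future.get();
        if (response.disconnected)
            System.out.println("coordinator connection failed; mark it dead and retry after backoff");
        else
            System.out.println("handle response: " + response.body);
    }
}

As the comment above notes, the already-transmitted case is handled by NetworkClient itself; this method only has to cover the unsent requests.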
private void clearUnsentRequests() {
private void failUnsentRequests() {
// clear all unsent requests and fail their corresponding futures
for (Map.Entry<Node, List<ClientRequest>> requestEntry: unsent.entrySet()) {
Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
@@ -271,11 +287,6 @@
client.send(request, now);
iterator.remove();
requestsSent = true;
} else if (client.connectionFailed(node)) {
RequestFutureCompletionHandler handler =
(RequestFutureCompletionHandler) request.callback();
handler.onComplete(new ClientResponse(request, now, true, null));
iterator.remove();
}
}
}
@@ -285,7 +296,7 @@
private void clientPoll(long timeout, long now) {
client.poll(timeout, now);
if (wakeup.get()) {
clearUnsentRequests();
failUnsentRequests();
wakeup.set(false);
throw new ConsumerWakeupException();
}
@@ -296,6 +307,25 @@
client.close();
}
/**
* Find whether a previous connection has failed. Note that the failure state will persist until either
* {@link #tryConnect(Node)} or {@link #send(Node, ApiKeys, AbstractRequest)} has been called.
* @param node Node to connect to if possible
*/
public boolean connectionFailed(Node node) {
return client.connectionFailed(node);
}
/**
* Initiate a connection if currently possible. This is only really useful for resetting the failed
* status of a socket. If there is an actual request to send, then {@link #send(Node, ApiKeys, AbstractRequest)}
* should be used.
* @param node The node to connect to
*/
public void tryConnect(Node node) {
client.ready(node, time.milliseconds());
}
public static class RequestFutureCompletionHandler
extends RequestFuture<ClientResponse>
implements RequestCompletionHandler {

clients/src/test/java/org/apache/kafka/clients/MockClient.java

@@ -57,10 +57,10 @@ public class MockClient implements KafkaClient {
private final Time time;
private int correlation = 0;
private Node node = null;
private final Set<Integer> ready = new HashSet<Integer>();
private final Queue<ClientRequest> requests = new ArrayDeque<ClientRequest>();
private final Queue<ClientResponse> responses = new ArrayDeque<ClientResponse>();
private final Queue<FutureResponse> futureResponses = new ArrayDeque<FutureResponse>();
private final Set<Integer> ready = new HashSet<>();
private final Queue<ClientRequest> requests = new ArrayDeque<>();
private final Queue<ClientResponse> responses = new ArrayDeque<>();
private final Queue<FutureResponse> futureResponses = new ArrayDeque<>();
public MockClient(Time time) {
this.time = time;
@@ -88,11 +88,12 @@
}
public void disconnect(String node) {
long now = time.milliseconds();
Iterator<ClientRequest> iter = requests.iterator();
while (iter.hasNext()) {
ClientRequest request = iter.next();
if (request.request().destination() == node) {
responses.add(new ClientResponse(request, time.milliseconds(), true, null));
responses.add(new ClientResponse(request, now, true, null));
iter.remove();
}
}
@@ -146,7 +147,7 @@
/**
* Prepare a response for a request matching the provided matcher. If the matcher does not
* match, {@link #send(ClientRequest)} will throw IllegalStateException
* match, {@link #send(ClientRequest, long)} will throw IllegalStateException
* @param matcher The matcher to apply
* @param body The response body
*/
@@ -160,7 +161,7 @@
/**
* Prepare a response for a request matching the provided matcher. If the matcher does not
* match, {@link #send(ClientRequest)} will throw IllegalStateException
* match, {@link #send(ClientRequest, long)} will throw IllegalStateException
* @param matcher The matcher to apply
* @param body The response body
* @param disconnected Whether the request was disconnected