KAFKA-2459: Mark last committed timestamp to fix connection backoff

This fix applies to three JIRAs, since they are all connected.

KAFKA-2459: Connection backoff/blackout period should start when a connection is disconnected, not when the connection attempt was initiated
Backoff when connection is disconnected

KAFKA-2615: Poll() method is broken wrt time
Passed a Time instance through the NetworkClient API so that poll() can re-read the current time after the selector returns. Minimal change.

KAFKA-1843: Metadata fetch/refresh in new producer should handle all node connection states gracefully
I’ve partially addressed this for a specific failure case in the JIRA.

Author: Eno Thereska <eno.thereska@gmail.com>

Reviewers: Ewen Cheslack-Postava, Jason Gustafson, Ismael Juma, Guozhang Wang

Closes #290 from enothereska/trunk
This commit is contained in:
Eno Thereska 2015-10-21 10:04:49 -07:00 committed by Guozhang Wang
parent 44f6c4b946
commit 0785feeb0f
10 changed files with 129 additions and 28 deletions

View File

@ -115,10 +115,12 @@ final class ClusterConnectionStates {
/**
* Enter the disconnected state for the given node
* @param id The connection we have disconnected
* @param now The current time
*/
public void disconnected(String id) {
public void disconnected(String id, long now) {
NodeConnectionState nodeState = nodeState(id);
nodeState.state = ConnectionState.DISCONNECTED;
nodeState.lastConnectAttemptMs = now;
}
/**

View File

@ -15,8 +15,10 @@ package org.apache.kafka.clients;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
@ -33,6 +35,7 @@ import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.RequestSend;
import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -52,6 +55,12 @@ public class NetworkClient implements KafkaClient {
private final MetadataUpdater metadataUpdater;
/* a list of nodes we've connected to in the past */
private final List<Integer> nodesEverSeen;
private final Map<Integer, Node> nodesEverSeenById;
/* random offset into nodesEverSeen list */
private final Random randOffset;
/* the state of each node's connection */
private final ClusterConnectionStates connectionStates;
@ -76,6 +85,8 @@ public class NetworkClient implements KafkaClient {
/* max time in ms for the producer to wait for acknowledgement from server*/
private final int requestTimeoutMs;
private final Time time;
public NetworkClient(Selectable selector,
Metadata metadata,
String clientId,
@ -83,9 +94,10 @@ public class NetworkClient implements KafkaClient {
long reconnectBackoffMs,
int socketSendBuffer,
int socketReceiveBuffer,
int requestTimeoutMs) {
int requestTimeoutMs,
Time time) {
this(null, metadata, selector, clientId, maxInFlightRequestsPerConnection,
reconnectBackoffMs, socketSendBuffer, socketReceiveBuffer, requestTimeoutMs);
reconnectBackoffMs, socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time);
}
public NetworkClient(Selectable selector,
@ -95,9 +107,10 @@ public class NetworkClient implements KafkaClient {
long reconnectBackoffMs,
int socketSendBuffer,
int socketReceiveBuffer,
int requestTimeoutMs) {
int requestTimeoutMs,
Time time) {
this(metadataUpdater, null, selector, clientId, maxInFlightRequestsPerConnection, reconnectBackoffMs,
socketSendBuffer, socketReceiveBuffer, requestTimeoutMs);
socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time);
}
private NetworkClient(MetadataUpdater metadataUpdater,
@ -107,7 +120,9 @@ public class NetworkClient implements KafkaClient {
int maxInFlightRequestsPerConnection,
long reconnectBackoffMs,
int socketSendBuffer,
int socketReceiveBuffer, int requestTimeoutMs) {
int socketReceiveBuffer,
int requestTimeoutMs,
Time time) {
/* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not
* possible because `DefaultMetadataUpdater` is an inner class and it can only be instantiated after the
@ -127,8 +142,13 @@ public class NetworkClient implements KafkaClient {
this.socketSendBuffer = socketSendBuffer;
this.socketReceiveBuffer = socketReceiveBuffer;
this.correlation = 0;
this.nodeIndexOffset = new Random().nextInt(Integer.MAX_VALUE);
this.randOffset = new Random();
this.nodeIndexOffset = this.randOffset.nextInt(Integer.MAX_VALUE);
this.requestTimeoutMs = requestTimeoutMs;
this.nodesEverSeen = new ArrayList<>();
this.nodesEverSeenById = new HashMap<>();
this.time = time;
}
/**
@ -255,6 +275,7 @@ public class NetworkClient implements KafkaClient {
@Override
public List<ClientResponse> poll(long timeout, long now) {
long metadataTimeout = metadataUpdater.maybeUpdate(now);
long updatedNow = now;
try {
this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
} catch (IOException e) {
@ -262,12 +283,13 @@ public class NetworkClient implements KafkaClient {
}
// process completed actions
updatedNow = this.time.milliseconds();
List<ClientResponse> responses = new ArrayList<>();
handleCompletedSends(responses, now);
handleCompletedReceives(responses, now);
handleDisconnections(responses, now);
handleCompletedSends(responses, updatedNow);
handleCompletedReceives(responses, updatedNow);
handleDisconnections(responses, updatedNow);
handleConnections();
handleTimedOutRequests(responses, now);
handleTimedOutRequests(responses, updatedNow);
// invoke callbacks
for (ClientResponse response : responses) {
@ -364,6 +386,20 @@ public class NetworkClient implements KafkaClient {
found = node;
}
}
// if we found no node in the current list, try one from the nodes seen before
if (found == null && nodesEverSeen.size() > 0) {
int offset = randOffset.nextInt(nodesEverSeen.size());
for (int i = 0; i < nodesEverSeen.size(); i++) {
int idx = Utils.abs((offset + i) % nodesEverSeen.size());
Node node = nodesEverSeenById.get(nodesEverSeen.get(idx));
log.debug("No node found. Trying previously-seen node with ID {}", node.id());
if (!this.connectionStates.isBlackedOut(node.idString(), now)) {
found = node;
}
}
}
return found;
}
@ -375,7 +411,7 @@ public class NetworkClient implements KafkaClient {
* @param now The current time
*/
private void processDisconnection(List<ClientResponse> responses, String nodeId, long now) {
connectionStates.disconnected(nodeId);
connectionStates.disconnected(nodeId, now);
for (ClientRequest request : this.inFlightRequests.clearAll(nodeId)) {
log.trace("Cancelled request {} due to node {} being disconnected", request, nodeId);
if (!metadataUpdater.maybeHandleDisconnection(request))
@ -489,7 +525,7 @@ public class NetworkClient implements KafkaClient {
this.socketReceiveBuffer);
} catch (IOException e) {
/* attempt failed, we'll try again after the backoff */
connectionStates.disconnected(nodeConnectionId);
connectionStates.disconnected(nodeConnectionId, now);
/* maybe the problem is our metadata, update it */
metadataUpdater.requestUpdate();
log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
@ -570,6 +606,29 @@ public class NetworkClient implements KafkaClient {
this.metadata.requestUpdate();
}
/**
 * Remember every node that has appeared in a metadata response.
 * Nodes not seen before are appended to the tracking list; for an id that is
 * already tracked, the stored description is replaced when it no longer
 * equals the one just received.
 *
 * @param nodes Current alive nodes
 */
private void updateNodesEverSeen(List<Node> nodes) {
    for (Node candidate : nodes) {
        Node known = nodesEverSeenById.get(candidate.id());
        if (known != null) {
            // Same id, but the node may have moved to a different host:
            // keep the most recent description.
            if (!known.equals(candidate)) {
                nodesEverSeenById.put(candidate.id(), candidate);
            }
            continue;
        }
        // First time this id shows up: index it and add it to the ordered list.
        nodesEverSeenById.put(candidate.id(), candidate);
        log.debug("Adding node {} to nodes ever seen", candidate.id());
        nodesEverSeen.add(candidate.id());
    }
}
private void handleResponse(RequestHeader header, Struct body, long now) {
this.metadataFetchInProgress = false;
MetadataResponse response = new MetadataResponse(body);
@ -582,6 +641,7 @@ public class NetworkClient implements KafkaClient {
// created which means we will get errors and no nodes until it exists
if (cluster.nodes().size() > 0) {
this.metadata.update(cluster, now);
this.updateNodesEverSeen(cluster.nodes());
} else {
log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
this.metadata.failedUpdate(now);

View File

@ -527,7 +527,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), time);
this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs);
OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase());
this.subscriptions = new SubscriptionState(offsetResetStrategy);

View File

@ -175,6 +175,7 @@ public class ConsumerNetworkClient implements Closeable {
private void poll(long timeout, long now) {
// send all the requests we can send now
pollUnsentRequests(now);
now = time.milliseconds();
// ensure we don't poll any longer than the deadline for
// the next scheduled task
@ -190,7 +191,7 @@ public class ConsumerNetworkClient implements Closeable {
pollUnsentRequests(now);
// fail all requests that couldn't be sent
clearUnsentRequests(now);
clearUnsentRequests();
}
@ -228,11 +229,13 @@ public class ConsumerNetworkClient implements Closeable {
}
private void pollUnsentRequests(long now) {
while (trySend(now))
while (trySend(now)) {
clientPoll(0, now);
now = time.milliseconds();
}
}
private void clearUnsentRequests(long now) {
private void clearUnsentRequests() {
// clear all unsent requests and fail their corresponding futures
for (Map.Entry<Node, List<ClientRequest>> requestEntry: unsent.entrySet()) {
Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
@ -273,7 +276,7 @@ public class ConsumerNetworkClient implements Closeable {
private void clientPoll(long timeout, long now) {
client.poll(timeout, now);
if (wakeup.get()) {
clearUnsentRequests(now);
clearUnsentRequests();
wakeup.set(false);
throw new ConsumerWakeupException();
}

View File

@ -276,7 +276,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
this.requestTimeoutMs);
this.requestTimeoutMs, time);
this.sender = new Sender(client,
this.metadata,
this.accumulator,

View File

@ -52,10 +52,12 @@ public class NetworkClientTest {
private int nodeId = 1;
private Cluster cluster = TestUtils.singletonCluster("test", nodeId);
private Node node = cluster.nodes().get(0);
private NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024, requestTimeoutMs);
private long reconnectBackoffMsTest = 10 * 1000;
private NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, reconnectBackoffMsTest,
64 * 1024, 64 * 1024, requestTimeoutMs, time);
private NetworkClient clientWithStaticNodes = new NetworkClient(selector, new ManualMetadataUpdater(Arrays.asList(node)),
"mock-static", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024, requestTimeoutMs);
"mock-static", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024, requestTimeoutMs, time);
@Before
public void setup() {
@ -149,6 +151,31 @@ public class NetworkClientTest {
assertEquals(node.idString(), disconnectedNode);
}
/**
 * Checks that leastLoadedNode() returns the single known node while it is
 * connected, and returns null immediately after that node is disconnected.
 * Per the commit description, the reconnect backoff is expected to start at
 * the disconnect rather than at the original connection attempt.
 */
@Test
public void testLeastLoadedNode() {
    client.ready(node, time.milliseconds());
    awaitReady(client, node);
    client.poll(1, time.milliseconds());
    assertTrue("The client should be ready", client.isReady(node, time.milliseconds()));

    // leastloadednode should be our single node
    Node leastNode = client.leastLoadedNode(time.milliseconds());
    // expected value first, actual second, so a failure message reads correctly
    assertEquals("There should be one leastloadednode", node.id(), leastNode.id());

    // Advance time by the full reconnect backoff. A backoff counted from the
    // initial connection attempt would presumably have elapsed by now, so the
    // final assertion only holds if the backoff restarts at the disconnect.
    time.sleep(reconnectBackoffMsTest);

    // CLOSE node
    selector.close(node.idString());
    client.poll(1, time.milliseconds());
    assertFalse("After we forced the disconnection the client is no longer ready.", client.ready(node, time.milliseconds()));
    leastNode = client.leastLoadedNode(time.milliseconds());
    assertEquals("There should be NO leastloadednode", null, leastNode);
}
private static class TestCallbackHandler implements RequestCompletionHandler {
public boolean executed = false;
public ClientResponse response;

View File

@ -55,6 +55,12 @@ public class MockSelector implements Selectable {
/**
 * Record the given connection id as disconnected and drop its first
 * matching entry, if any, from the list of connected ids.
 *
 * @param id The id of the connection to close
 */
@Override
public void close(String id) {
    this.disconnected.add(id);
    // Scan forward to the first entry equal to the id (or to the end of the list).
    int position = 0;
    while (position < this.connected.size() && !this.connected.get(position).equals(id)) {
        position++;
    }
    // Only the first match is removed; later duplicates are left untouched.
    if (position < this.connected.size()) {
        this.connected.remove(position);
    }
}
public void clear() {

View File

@ -106,7 +106,8 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
0,
Selectable.USE_DEFAULT_BUFFER_SIZE,
Selectable.USE_DEFAULT_BUFFER_SIZE,
config.requestTimeoutMs
config.requestTimeoutMs,
time
)
}
val threadName = threadNamePrefix match {

View File

@ -317,7 +317,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
0,
Selectable.USE_DEFAULT_BUFFER_SIZE,
Selectable.USE_DEFAULT_BUFFER_SIZE,
config.requestTimeoutMs)
config.requestTimeoutMs,
kafkaMetricsTime)
}
var shutdownSucceeded: Boolean = false

View File

@ -84,7 +84,8 @@ class ReplicaFetcherThread(name: String,
0,
Selectable.USE_DEFAULT_BUFFER_SIZE,
brokerConfig.replicaSocketReceiveBufferBytes,
brokerConfig.requestTimeoutMs
brokerConfig.requestTimeoutMs,
time
)
}