mirror of https://github.com/apache/kafka.git
KAFKA-12432: AdminClient should time out nodes that are never ready (#10281)
Previously, if we assigned one or more calls to a remote node but the node never became available, AdminClient would block until the calls hit the API timeout. This was particularly unfortunate when the calls could have been sent to a different node in the cluster. This PR fixes that behavior by timing out pending connections to remote nodes if they take longer than the request timeout. There are a few other small cleanups in this PR: it removes the unnecessary Call#aborted field, sets Call#curNode to null after a call has failed to avoid confusion when debugging or logging, and adds a "closing" boolean rather than setting newCalls to null when the client is closed. It also raises the log level of the message indicating that some calls were timed out because AdminClient closed, and simplifies the type of callsInFlight.

Reviewers: Jason Gustafson <jason@confluent.io>
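The core of the fix is a per-node readiness deadline: the first time a node that has calls assigned to it is seen as not ready, the client records a deadline of now + request.timeout.ms; if the node is still not ready by that deadline, the client disconnects it and moves its calls back to the pending queue so they can be reassigned. The sketch below is illustrative only, not code from this patch; the class name NodeReadyDeadlines and its methods are hypothetical, and the real change keeps this state in a Map<Node, Long> inside KafkaAdminClient's service thread (see the diff below).

import java.util.HashMap;
import java.util.Map;

// Minimal standalone sketch of the readiness-deadline bookkeeping
// (hypothetical helper class, not part of KafkaAdminClient).
final class NodeReadyDeadlines {
    private final Map<String, Long> deadlines = new HashMap<>();
    private final long requestTimeoutMs;

    NodeReadyDeadlines(long requestTimeoutMs) {
        this.requestTimeoutMs = requestTimeoutMs;
    }

    // Returns true once the node has stayed "not ready" past the request timeout,
    // which is the point where the caller should disconnect and reassign its calls.
    boolean shouldGiveUp(String nodeId, long nowMs) {
        Long deadline = deadlines.get(nodeId);
        if (deadline == null) {
            // First time we notice the node is not ready: start the clock.
            deadlines.put(nodeId, nowMs + requestTimeoutMs);
            return false;
        }
        return nowMs >= deadline;
    }

    // Forget the deadline once the node becomes ready, is disconnected,
    // or has a call in flight.
    void clear(String nodeId) {
        deadlines.remove(nodeId);
    }
}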
parent 9af81955c4
commit 11f0ea3a5e
@@ -737,7 +737,6 @@ public class KafkaAdminClient extends AdminClient {
         private final long deadlineMs;
         private final NodeProvider nodeProvider;
         private int tries = 0;
-        private boolean aborted = false;
         private Node curNode = null;
         private long nextAllowedTryMs = 0;
 
@@ -756,11 +755,6 @@ public class KafkaAdminClient extends AdminClient {
             return curNode;
         }
 
-        void abortAndFail(TimeoutException timeoutException) {
-            this.aborted = true;
-            fail(time.milliseconds(), timeoutException);
-        }
-
         /**
          * Handle a failure.
         *
@@ -772,12 +766,13 @@ public class KafkaAdminClient extends AdminClient {
         * @param throwable The failure exception.
         */
        final void fail(long now, Throwable throwable) {
-            if (aborted) {
-                // If the call was aborted while in flight due to a timeout, deliver a
-                // TimeoutException. In this case, we do not get any more retries - the call has
-                // failed. We increment tries anyway in order to display an accurate log message.
-                tries++;
-                failWithTimeout(now, throwable);
+            if (curNode != null) {
+                runnable.nodeReadyDeadlines.remove(curNode);
+                curNode = null;
+            }
+            // If the admin client is closing, we can't retry.
+            if (runnable.closing) {
+                handleFailure(throwable);
                return;
            }
            // If this is an UnsupportedVersionException that we can retry, do so. Note that a
@@ -786,7 +781,7 @@ public class KafkaAdminClient extends AdminClient {
            if ((throwable instanceof UnsupportedVersionException) &&
                    handleUnsupportedVersionException((UnsupportedVersionException) throwable)) {
                log.debug("{} attempting protocol downgrade and then retry.", this);
-                runnable.enqueue(this, now);
+                runnable.pendingCalls.add(this);
                return;
            }
            tries++;
@@ -794,7 +789,7 @@ public class KafkaAdminClient extends AdminClient {
 
            // If the call has timed out, fail.
            if (calcTimeoutMsRemainingAsInt(now, deadlineMs) < 0) {
-                failWithTimeout(now, throwable);
+                handleTimeoutFailure(now, throwable);
                return;
            }
            // If the exception is not retriable, fail.
@@ -808,17 +803,17 @@ public class KafkaAdminClient extends AdminClient {
            }
            // If we are out of retries, fail.
            if (tries > maxRetries) {
-                failWithTimeout(now, throwable);
+                handleTimeoutFailure(now, throwable);
                return;
            }
            if (log.isDebugEnabled()) {
                log.debug("{} failed: {}. Beginning retry #{}",
                    this, prettyPrintException(throwable), tries);
            }
-            runnable.enqueue(this, now);
+            runnable.pendingCalls.add(this);
        }
 
-        private void failWithTimeout(long now, Throwable cause) {
+        private void handleTimeoutFailure(long now, Throwable cause) {
            if (log.isDebugEnabled()) {
                log.debug("{} timed out at {} after {} attempt(s)", this, now, tries,
                    new Exception(prettyPrintException(cause)));
@@ -969,7 +964,7 @@ public class KafkaAdminClient extends AdminClient {
         * Maps node ID strings to calls that have been sent.
         * Only accessed from this thread.
         */
-        private final Map<String, List<Call>> callsInFlight = new HashMap<>();
+        private final Map<String, Call> callsInFlight = new HashMap<>();
 
        /**
         * Maps correlation IDs to calls that have been sent.
@@ -979,9 +974,20 @@ public class KafkaAdminClient extends AdminClient {
 
        /**
         * Pending calls. Protected by the object monitor.
-         * This will be null only if the thread has shut down.
         */
-        private List<Call> newCalls = new LinkedList<>();
+        private final List<Call> newCalls = new LinkedList<>();
+
+        /**
+         * Maps node ID strings to their readiness deadlines. A node will appear in this
+         * map if there are callsToSend which are waiting for it to be ready, and there
+         * are no calls in flight using the node.
+         */
+        private final Map<Node, Long> nodeReadyDeadlines = new HashMap<>();
+
+        /**
+         * Whether the admin client is closing.
+         */
+        private volatile boolean closing = false;
 
        /**
         * Time out the elements in the pendingCalls list which are expired.
@@ -1017,10 +1023,21 @@ public class KafkaAdminClient extends AdminClient {
         * users of AdminClient who will also take the lock to add new calls.
         */
        private synchronized void drainNewCalls() {
-            if (!newCalls.isEmpty()) {
-                pendingCalls.addAll(newCalls);
-                newCalls.clear();
+            transitionToPendingAndClearList(newCalls);
+        }
+
+        /**
+         * Add some calls to pendingCalls, and then clear the input list.
+         * Also clears Call#curNode.
+         *
+         * @param calls The calls to add.
+         */
+        private void transitionToPendingAndClearList(List<Call> calls) {
+            for (Call call : calls) {
+                call.curNode = null;
+                pendingCalls.add(call);
            }
+            calls.clear();
        }
 
        /**
@@ -1089,29 +1106,63 @@ public class KafkaAdminClient extends AdminClient {
                    continue;
                }
                Node node = entry.getKey();
+                if (callsInFlight.containsKey(node.idString())) {
+                    log.trace("Still waiting for other calls to finish on node {}.", node);
+                    nodeReadyDeadlines.remove(node);
+                    continue;
+                }
                if (!client.ready(node, now)) {
+                    Long deadline = nodeReadyDeadlines.get(node);
+                    if (deadline != null) {
+                        if (now >= deadline) {
+                            log.info("Disconnecting from {} and revoking {} node assignment(s) " +
+                                "because the node is taking too long to become ready.",
+                                node.idString(), calls.size());
+                            transitionToPendingAndClearList(calls);
+                            client.disconnect(node.idString());
+                            nodeReadyDeadlines.remove(node);
+                            iter.remove();
+                            continue;
+                        }
+                        pollTimeout = Math.min(pollTimeout, deadline - now);
+                    } else {
+                        nodeReadyDeadlines.put(node, now + requestTimeoutMs);
+                    }
                    long nodeTimeout = client.pollDelayMs(node, now);
                    pollTimeout = Math.min(pollTimeout, nodeTimeout);
                    log.trace("Client is not ready to send to {}. Must delay {} ms", node, nodeTimeout);
                    continue;
                }
-                Call call = calls.remove(0);
-                int requestTimeoutMs = Math.min(KafkaAdminClient.this.requestTimeoutMs,
-                    calcTimeoutMsRemainingAsInt(now, call.deadlineMs));
-                AbstractRequest.Builder<?> requestBuilder;
-                try {
-                    requestBuilder = call.createRequest(requestTimeoutMs);
-                } catch (Throwable throwable) {
-                    call.fail(now, new KafkaException(String.format(
-                        "Internal error sending %s to %s.", call.callName, node)));
-                    continue;
+                // Subtract the time we spent waiting for the node to become ready from
+                // the total request time.
+                int remainingRequestTime;
+                Long deadlineMs = nodeReadyDeadlines.remove(node);
+                if (deadlineMs == null) {
+                    remainingRequestTime = requestTimeoutMs;
+                } else {
+                    remainingRequestTime = calcTimeoutMsRemainingAsInt(now, deadlineMs);
+                }
+                while (!calls.isEmpty()) {
+                    Call call = calls.remove(0);
+                    int timeoutMs = Math.min(remainingRequestTime,
+                        calcTimeoutMsRemainingAsInt(now, call.deadlineMs));
+                    AbstractRequest.Builder<?> requestBuilder;
+                    try {
+                        requestBuilder = call.createRequest(timeoutMs);
+                    } catch (Throwable t) {
+                        call.fail(now, new KafkaException(String.format(
+                            "Internal error sending %s to %s.", call.callName, node), t));
+                        continue;
+                    }
+                    ClientRequest clientRequest = client.newClientRequest(node.idString(),
+                        requestBuilder, now, true, timeoutMs, null);
+                    log.debug("Sending {} to {}. correlationId={}, timeoutMs={}",
+                        requestBuilder, node, clientRequest.correlationId(), timeoutMs);
+                    client.send(clientRequest, now);
+                    callsInFlight.put(node.idString(), call);
+                    correlationIdToCalls.put(clientRequest.correlationId(), call);
+                    break;
                }
-                ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now,
-                    true, requestTimeoutMs, null);
-                log.debug("Sending {} to {}. correlationId={}", requestBuilder, node, clientRequest.correlationId());
-                client.send(clientRequest, now);
-                getOrCreateListValue(callsInFlight, node.idString()).add(call);
-                correlationIdToCalls.put(clientRequest.correlationId(), call);
            }
            return pollTimeout;
        }
@@ -1127,26 +1178,16 @@ public class KafkaAdminClient extends AdminClient {
         */
        private void timeoutCallsInFlight(TimeoutProcessor processor) {
            int numTimedOut = 0;
-            for (Map.Entry<String, List<Call>> entry : callsInFlight.entrySet()) {
-                List<Call> contexts = entry.getValue();
-                if (contexts.isEmpty())
-                    continue;
+            for (Map.Entry<String, Call> entry : callsInFlight.entrySet()) {
+                Call call = entry.getValue();
                String nodeId = entry.getKey();
-                // We assume that the first element in the list is the earliest. So it should be the
-                // only one we need to check the timeout for.
-                Call call = contexts.get(0);
                if (processor.callHasExpired(call)) {
-                    if (call.aborted) {
-                        log.warn("Aborted call {} is still in callsInFlight.", call);
-                    } else {
-                        log.debug("Closing connection to {} due to timeout while awaiting {}", nodeId, call);
-                        call.aborted = true;
-                        client.disconnect(nodeId);
-                        numTimedOut++;
-                        // We don't remove anything from the callsInFlight data structure. Because the connection
-                        // has been closed, the calls should be returned by the next client#poll(),
-                        // and handled at that point.
-                    }
+                    log.info("Disconnecting from {} due to timeout while awaiting {}", nodeId, call);
+                    client.disconnect(nodeId);
+                    numTimedOut++;
+                    // We don't remove anything from the callsInFlight data structure. Because the connection
+                    // has been closed, the calls should be returned by the next client#poll(),
+                    // and handled at that point.
                }
            }
            if (numTimedOut > 0)
@@ -1176,8 +1217,7 @@ public class KafkaAdminClient extends AdminClient {
 
                // Stop tracking this call.
                correlationIdToCalls.remove(correlationId);
-                List<Call> calls = callsInFlight.get(response.destination());
-                if ((calls == null) || (!calls.remove(call))) {
+                if (!callsInFlight.remove(response.destination(), call)) {
                    log.error("Internal server error on {}: ignoring call {} in correlationIdToCall " +
                        "that did not exist in callsInFlight", response.destination(), call);
                    continue;
@@ -1226,7 +1266,8 @@ public class KafkaAdminClient extends AdminClient {
                if (awaitingCalls.isEmpty()) {
                    iter.remove();
                } else if (shouldUnassign.test(node)) {
-                    pendingCalls.addAll(awaitingCalls);
+                    nodeReadyDeadlines.remove(node);
+                    transitionToPendingAndClearList(awaitingCalls);
                    iter.remove();
                }
            }
@@ -1271,24 +1312,24 @@ public class KafkaAdminClient extends AdminClient {
 
        @Override
        public void run() {
-            log.trace("Thread starting");
+            log.debug("Thread starting");
            try {
                processRequests();
            } finally {
+                closing = true;
                AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId, metrics);
 
                int numTimedOut = 0;
                TimeoutProcessor timeoutProcessor = new TimeoutProcessor(Long.MAX_VALUE);
                synchronized (this) {
                    numTimedOut += timeoutProcessor.handleTimeouts(newCalls, "The AdminClient thread has exited.");
-                    newCalls = null;
                }
                numTimedOut += timeoutProcessor.handleTimeouts(pendingCalls, "The AdminClient thread has exited.");
                numTimedOut += timeoutCallsToSend(timeoutProcessor);
                numTimedOut += timeoutProcessor.handleTimeouts(correlationIdToCalls.values(),
                    "The AdminClient thread has exited.");
                if (numTimedOut > 0) {
-                    log.debug("Timed out {} remaining operation(s).", numTimedOut);
+                    log.info("Timed out {} remaining operation(s) during close.", numTimedOut);
                }
                closeQuietly(client, "KafkaClient");
                closeQuietly(metrics, "Metrics");
@@ -1342,7 +1383,7 @@ public class KafkaAdminClient extends AdminClient {
 
            // Wait for network responses.
            log.trace("Entering KafkaClient#poll(timeout={})", pollTimeout);
-            List<ClientResponse> responses = client.poll(pollTimeout, now);
+            List<ClientResponse> responses = client.poll(Math.max(0L, pollTimeout), now);
            log.trace("KafkaClient#poll retrieved {} response(s)", responses.size());
 
            // unassign calls to disconnected nodes
@@ -1367,15 +1408,17 @@ public class KafkaAdminClient extends AdminClient {
        void enqueue(Call call, long now) {
            if (call.tries > maxRetries) {
                log.debug("Max retries {} for {} reached", maxRetries, call);
-                call.fail(time.milliseconds(), new TimeoutException());
+                call.handleTimeoutFailure(time.milliseconds(), new TimeoutException(
+                    "Exceeded maxRetries after " + call.tries + " tries."));
                return;
            }
            if (log.isDebugEnabled()) {
-                log.debug("Queueing {} with a timeout {} ms from now.", call, call.deadlineMs - now);
+                log.debug("Queueing {} with a timeout {} ms from now.", call,
+                    Math.min(requestTimeoutMs, call.deadlineMs - now));
            }
            boolean accepted = false;
            synchronized (this) {
-                if (newCalls != null) {
+                if (!closing) {
                    newCalls.add(call);
                    accepted = true;
                }
@@ -1384,7 +1427,8 @@ public class KafkaAdminClient extends AdminClient {
                client.wakeup(); // wake the thread if it is in poll()
            } else {
                log.debug("The AdminClient thread has exited. Timing out {}.", call);
-                call.abortAndFail(new TimeoutException("The AdminClient thread has exited."));
+                call.handleTimeoutFailure(time.milliseconds(),
+                    new TimeoutException("The AdminClient thread has exited."));
            }
        }
 
@@ -1399,7 +1443,8 @@ public class KafkaAdminClient extends AdminClient {
        void call(Call call, long now) {
            if (hardShutdownTimeMs.get() != INVALID_SHUTDOWN_TIME) {
                log.debug("The AdminClient is not accepting new calls. Timing out {}.", call);
-                call.abortAndFail(new TimeoutException("The AdminClient thread is not accepting new calls."));
+                call.handleTimeoutFailure(time.milliseconds(),
+                    new TimeoutException("The AdminClient thread is not accepting new calls."));
            } else {
                enqueue(call, now);
            }
@@ -38,6 +38,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.stream.Collectors;
@@ -84,6 +85,7 @@ public class MockClient implements KafkaClient {
    private volatile NodeApiVersions nodeApiVersions = NodeApiVersions.create();
    private volatile int numBlockingWakeups = 0;
    private volatile boolean active = true;
+    private volatile CompletableFuture<String> disconnectFuture;
 
    public MockClient(Time time) {
        this(time, new NoOpMetadataUpdater());
@@ -169,6 +171,10 @@
        return authenticationErrors.get(node);
    }
 
+    public void setDisconnectFuture(CompletableFuture<String> disconnectFuture) {
+        this.disconnectFuture = disconnectFuture;
+    }
+
    @Override
    public void disconnect(String node) {
        long now = time.milliseconds();
@@ -182,6 +188,10 @@
                iter.remove();
            }
        }
+        CompletableFuture<String> curDisconnectFuture = disconnectFuture;
+        if (curDisconnectFuture != null) {
+            curDisconnectFuture.complete(node);
+        }
        connectionState(node).disconnect();
    }
@@ -198,6 +198,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
@@ -5307,6 +5308,71 @@ public class KafkaAdminClientTest {
        }
    }
 
+    /**
+     * Test that if the client can obtain a node assignment, but can't send to the given
+     * node, it will disconnect and try a different node.
+     */
+    @Test
+    public void testClientSideTimeoutAfterFailureToSend() throws Exception {
+        Cluster cluster = mockCluster(3, 0);
+        CompletableFuture<String> disconnectFuture = new CompletableFuture<>();
+        MockTime time = new MockTime();
+        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
+                newStrMap(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1",
+                    AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "100000",
+                    AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "1"))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            for (Node node : cluster.nodes()) {
+                env.kafkaClient().delayReady(node, 100);
+            }
+            env.kafkaClient().setDisconnectFuture(disconnectFuture);
+            final ListTopicsResult result = env.adminClient().listTopics();
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            log.debug("Advancing clock by 25 ms to trigger client-side disconnect.");
+            time.sleep(25);
+            disconnectFuture.get();
+            log.debug("Enabling nodes to send requests again.");
+            for (Node node : cluster.nodes()) {
+                env.kafkaClient().delayReady(node, 0);
+            }
+            time.sleep(5);
+            log.info("Waiting for result.");
+            assertEquals(0, result.listings().get().size());
+        }
+    }
+
+    /**
+     * Test that if the client can send to a node, but doesn't receive a response, it will
+     * disconnect and try a different node.
+     */
+    @Test
+    public void testClientSideTimeoutAfterFailureToReceiveResponse() throws Exception {
+        Cluster cluster = mockCluster(3, 0);
+        CompletableFuture<String> disconnectFuture = new CompletableFuture<>();
+        MockTime time = new MockTime();
+        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
+                newStrMap(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1",
+                    AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "100000",
+                    AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "1"))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().setDisconnectFuture(disconnectFuture);
+            final ListTopicsResult result = env.adminClient().listTopics();
+            while (true) {
+                time.sleep(1);
+                try {
+                    disconnectFuture.get(1, TimeUnit.MICROSECONDS);
+                    break;
+                } catch (java.util.concurrent.TimeoutException e) {
+                }
+            }
+            assertFalse(result.future.isDone());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            log.debug("Advancing clock by 10 ms to trigger client-side retry.");
+            time.sleep(10);
+            assertEquals(0, result.listings().get().size());
+        }
+    }
+
    private UnregisterBrokerResponse prepareUnregisterBrokerResponse(Errors error, int throttleTimeMs) {
        return new UnregisterBrokerResponse(new UnregisterBrokerResponseData()
            .setErrorCode(error.code())
@@ -948,6 +948,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
  def testCallInFlightTimeouts(): Unit = {
    val config = createConfig
    config.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "100000000")
+    config.put(AdminClientConfig.RETRIES_CONFIG, "0")
    val factory = new KafkaAdminClientTest.FailureInjectingTimeoutProcessorFactory()
    client = KafkaAdminClientTest.createInternal(new AdminClientConfig(config), factory)
    val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1.toShort)).asJava,