mirror of https://github.com/apache/kafka.git
KAFKA-13879: Reconnect exponential backoff is ineffective in some cases (#12131)
When a client connects to an SSL listener using the PLAINTEXT security protocol, the client considers the channel setup complete as soon as the TCP connection is set up. In reality, the channel setup is not yet complete. The client then resets the reconnect exponential backoff and issues an API version request. Since the broker expects an SSL handshake, the API version request causes the connection to be disconnected. The client then reconnects without exponential backoff, because the backoff has already been reset. This commit removes the reset of the reconnect exponential backoff when sending the API version request. In the good case, where the channel setup is complete, the reconnect exponential backoff is reset when the node becomes ready, which happens after the API version response is received. Inter-broker clients, which do not send an API version request and go directly to the ready state, continue to reset the backoff before any successful requests. Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
This commit is contained in:
parent
989d3ce07f
commit
eeb1e702eb
|
@ -246,7 +246,6 @@ final class ClusterConnectionStates {
|
|||
/**
 * Moves the connection identified by {@code id} into the
 * {@code CHECKING_API_VERSIONS} state.
 *
 * Besides updating the state field, the visible statements reset the node's
 * connection setup timeout and remove {@code id} from {@code connectingNodes}.
 *
 * @param id the connection id whose state is updated
 */
public void checkingApiVersions(String id) {
|
||||
NodeConnectionState nodeState = nodeState(id);
|
||||
nodeState.state = ConnectionState.CHECKING_API_VERSIONS;
|
||||
// NOTE(review): this hunk is rendered without +/- markers. The hunk header
// (7 lines -> 6 lines) together with the commit description indicates that
// the following call is the line this commit removes, so that reaching
// CHECKING_API_VERSIONS no longer resets the reconnect backoff - confirm
// against the repository before relying on this line being present.
resetReconnectBackoff(nodeState);
|
||||
resetConnectionSetupTimeout(nodeState);
|
||||
connectingNodes.remove(id);
|
||||
}
|
||||
|
|
|
@ -231,20 +231,8 @@ public class ClusterConnectionStatesTest {
|
|||
|
||||
/**
 * Checks that the delay reported by {@code connectionStates.connectionDelay}
 * follows the expected exponential backoff across repeated
 * connect/disconnect cycles.
 *
 * NOTE(review): this hunk is rendered without +/- markers, so the body below
 * shows the inline loop and the two verifyReconnectExponentialBackoff calls
 * side by side. The hunk header (20 lines -> 8 lines) suggests the inline
 * computation and loop are the removed part and only the two helper calls
 * remain after the commit - confirm against the repository.
 */
@Test
|
||||
public void testExponentialReconnectBackoff() {
|
||||
// Exponent at which the backoff reaches reconnectBackoffMax:
// log(max / backoffMs) / log(base); Math.max keeps the divisor at least 1.
double reconnectBackoffMaxExp = Math.log(reconnectBackoffMax / (double) Math.max(reconnectBackoffMs, 1))
|
||||
/ Math.log(reconnectBackoffExpBase);
|
||||
|
||||
// Run through 10 disconnects and check that reconnect backoff value is within expected range for every attempt
|
||||
for (int i = 0; i < 10; i++) {
|
||||
connectionStates.connecting(nodeId1, time.milliseconds(), "localhost");
|
||||
connectionStates.disconnected(nodeId1, time.milliseconds());
|
||||
// Calculate expected backoff value without jitter
|
||||
long expectedBackoff = Math.round(Math.pow(reconnectBackoffExpBase, Math.min(i, reconnectBackoffMaxExp))
|
||||
* reconnectBackoffMs);
|
||||
long currentBackoff = connectionStates.connectionDelay(nodeId1, time.milliseconds());
|
||||
// The third argument is the allowed delta: jitter fraction times the expected value.
assertEquals(expectedBackoff, currentBackoff, reconnectBackoffJitter * expectedBackoff);
|
||||
// Sleep past the currently reported delay (delay + 1) before the next cycle.
time.sleep(connectionStates.connectionDelay(nodeId1, time.milliseconds()) + 1);
|
||||
}
|
||||
// Run the shared check once without and once with the
// checkingApiVersions step between connecting and disconnected.
verifyReconnectExponentialBackoff(false);
|
||||
verifyReconnectExponentialBackoff(true);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -426,4 +414,26 @@ public class ClusterConnectionStatesTest {
|
|||
this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs, reconnectBackoffMax,
|
||||
connectionSetupTimeoutMs, connectionSetupTimeoutMaxMs, new LogContext(), this.multipleIPHostResolver);
|
||||
}
|
||||
|
||||
/**
 * Drives {@code nodeId1} through 10 connect/disconnect cycles and asserts,
 * after each disconnect, that {@code connectionStates.connectionDelay} equals
 * the expected exponential backoff within the jitter tolerance.
 *
 * @param enterCheckingApiVersionState when {@code true},
 *        {@code checkingApiVersions(nodeId1)} is called between
 *        {@code connecting} and {@code disconnected} in every cycle
 */
private void verifyReconnectExponentialBackoff(boolean enterCheckingApiVersionState) {
|
||||
// Exponent at which the backoff reaches reconnectBackoffMax:
// log(max / backoffMs) / log(base); Math.max keeps the divisor at least 1.
double reconnectBackoffMaxExp = Math.log(reconnectBackoffMax / (double) Math.max(reconnectBackoffMs, 1))
|
||||
/ Math.log(reconnectBackoffExpBase);
|
||||
|
||||
// Presumably clears any state recorded for nodeId1 so that each invocation
// starts from the initial backoff - TODO confirm against ClusterConnectionStates.remove.
connectionStates.remove(nodeId1);
|
||||
// Run through 10 disconnects and check that reconnect backoff value is within expected range for every attempt
|
||||
for (int i = 0; i < 10; i++) {
|
||||
connectionStates.connecting(nodeId1, time.milliseconds(), "localhost");
|
||||
if (enterCheckingApiVersionState) {
|
||||
connectionStates.checkingApiVersions(nodeId1);
|
||||
}
|
||||
|
||||
connectionStates.disconnected(nodeId1, time.milliseconds());
|
||||
// Calculate expected backoff value without jitter
|
||||
// The exponent is capped at reconnectBackoffMaxExp via Math.min.
long expectedBackoff = Math.round(Math.pow(reconnectBackoffExpBase, Math.min(i, reconnectBackoffMaxExp))
|
||||
* reconnectBackoffMs);
|
||||
long currentBackoff = connectionStates.connectionDelay(nodeId1, time.milliseconds());
|
||||
// The third argument is the allowed delta: jitter fraction times the expected value.
assertEquals(expectedBackoff, currentBackoff, reconnectBackoffJitter * expectedBackoff);
|
||||
// Sleep past the currently reported delay (delay + 1) before the next cycle.
time.sleep(connectionStates.connectionDelay(nodeId1, time.milliseconds()) + 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -82,6 +82,8 @@ public class NetworkClientTest {
|
|||
// Upper bound passed to createNetworkClient for the default test client below.
protected final long reconnectBackoffMaxMsTest = 10 * 10000;
|
||||
protected final long connectionSetupTimeoutMsTest = 5 * 1000;
|
||||
protected final long connectionSetupTimeoutMaxMsTest = 127 * 1000;
|
||||
// Exponential base and jitter factor taken from ClusterConnectionStates; the
// test uses them to compute the expected backoff and its allowed delta.
private final int reconnectBackoffExpBase = ClusterConnectionStates.RECONNECT_BACKOFF_EXP_BASE;
|
||||
private final double reconnectBackoffJitter = ClusterConnectionStates.RECONNECT_BACKOFF_JITTER;
|
||||
private final TestMetadataUpdater metadataUpdater = new TestMetadataUpdater(Collections.singletonList(node));
|
||||
private final NetworkClient client = createNetworkClient(reconnectBackoffMaxMsTest);
|
||||
// Created with reconnectBackoffMsTest where `client` passes reconnectBackoffMaxMsTest;
// presumably this makes the maximum equal to the base backoff so no exponential
// growth occurs - TODO confirm against createNetworkClient.
private final NetworkClient clientWithNoExponentialBackoff = createNetworkClient(reconnectBackoffMsTest);
|
||||
|
@ -831,6 +833,11 @@ public class NetworkClientTest {
|
|||
|
||||
/**
 * Repeatedly waits for an in-flight ApiVersions request, has the selector
 * report a server-side disconnect, and asserts that
 * {@code client.connectionDelay} matches the expected exponential backoff
 * within the jitter tolerance on every iteration.
 *
 * NOTE(review): this method is shown across two diff hunks; the lines between
 * them are not part of this view, so the body below is not the complete method.
 */
@Test
|
||||
public void testServerDisconnectAfterInternalApiVersionRequest() throws Exception {
|
||||
final long numIterations = 5;
|
||||
// Exponent at which the backoff reaches reconnectBackoffMaxMsTest:
// log(max / backoffMs) / log(base); Math.max keeps the divisor at least 1.
double reconnectBackoffMaxExp = Math.log(reconnectBackoffMaxMsTest / (double) Math.max(reconnectBackoffMsTest, 1))
|
||||
/ Math.log(reconnectBackoffExpBase);
|
||||
for (int i = 0; i < numIterations; i++) {
|
||||
selector.clear();
|
||||
awaitInFlightApiVersionRequest();
|
||||
selector.serverDisconnect(node.idString());
|
||||
|
||||
|
@ -838,6 +845,16 @@ public class NetworkClientTest {
|
|||
List<ClientResponse> responses = client.poll(0, time.milliseconds());
|
||||
assertFalse(client.hasInFlightRequests(node.idString()));
|
||||
assertTrue(responses.isEmpty());
|
||||
|
||||
// Expected backoff without jitter for the i-th consecutive disconnect;
// the exponent is capped at reconnectBackoffMaxExp via Math.min.
long expectedBackoff = Math.round(Math.pow(reconnectBackoffExpBase, Math.min(i, reconnectBackoffMaxExp))
|
||||
* reconnectBackoffMsTest);
|
||||
long delay = client.connectionDelay(node, time.milliseconds());
|
||||
// The third argument is the allowed delta: jitter fraction times the expected value.
assertEquals(expectedBackoff, delay, reconnectBackoffJitter * expectedBackoff);
|
||||
// The last iteration has nothing left to reconnect for, so skip the sleep.
if (i == numIterations - 1) {
|
||||
break;
|
||||
}
|
||||
// Sleep past the reported delay (delay + 1) before the next iteration.
time.sleep(delay + 1);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue