KAFKA-10243; ConcurrentModificationException while processing connection setup timeouts (#8990)

This PR fixes a bug introduced in #8683.

While processing connection setup timeouts, we iterate over the set of connecting nodes and disconnect each timed-out node inside the loop. Disconnecting removes the node's entry from the very set that is being iterated over, which raises a ConcurrentModificationException. The existing unit test did not catch this because it used only one node.

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
This commit is contained in:
David Jacot 2020-07-07 17:48:44 +02:00 committed by GitHub
parent 18f2589c1e
commit 47cbbf2752
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 80 additions and 16 deletions

View File

@ -19,6 +19,7 @@ package org.apache.kafka.clients;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.LogContext;
@ -405,7 +406,8 @@ final class ClusterConnectionStates {
/**
* Get the id set of nodes which are in CONNECTING state
*/
public Set<String> connectingNodes() {
// package private for testing only
Set<String> connectingNodes() {
return this.connectingNodes;
}
@ -440,6 +442,16 @@ final class ClusterConnectionStates {
return now - lastConnectAttemptMs(id) > connectionSetupTimeoutMs(id);
}
/**
* Return the Set of nodes whose connection setup has timed out.
* <p>
* The result is collected into a new Set rather than exposing {@code connectingNodes} itself, so a
* caller can disconnect the returned nodes while iterating over the result without modifying the
* set it is iterating over.
* @param now the current time in ms
* @return the ids of the connecting nodes for which {@code isConnectionSetupTimeout(id, now)} is true
*/
public Set<String> nodesWithConnectionSetupTimeout(long now) {
return connectingNodes.stream()
.filter(id -> isConnectionSetupTimeout(id, now))
.collect(Collectors.toSet());
}
/**
* The state of our connection to a node.
*/

View File

@ -818,17 +818,15 @@ public class NetworkClient implements KafkaClient {
* @param now The current time
*/
private void handleTimedOutConnections(List<ClientResponse> responses, long now) {
Set<String> connectingNodes = connectionStates.connectingNodes();
for (String nodeId : connectingNodes) {
if (connectionStates.isConnectionSetupTimeout(nodeId, now)) {
this.selector.close(nodeId);
log.debug(
"Disconnecting from node {} due to socket connection setup timeout. " +
"The timeout value is {} ms.",
nodeId,
connectionStates.connectionSetupTimeoutMs(nodeId));
processDisconnection(responses, nodeId, now, ChannelState.LOCAL_CLOSE);
}
Set<String> nodes = connectionStates.nodesWithConnectionSetupTimeout(now);
for (String nodeId : nodes) {
this.selector.close(nodeId);
log.debug(
"Disconnecting from node {} due to socket connection setup timeout. " +
"The timeout value is {} ms.",
nodeId,
connectionStates.connectionSetupTimeoutMs(nodeId));
processDisconnection(responses, nodeId, now, ChannelState.LOCAL_CLOSE);
}
}

View File

@ -29,6 +29,7 @@ import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Set;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
@ -48,6 +49,7 @@ public class ClusterConnectionStatesTest {
private final double connectionSetupTimeoutJitter = ClusterConnectionStates.CONNECTION_SETUP_TIMEOUT_JITTER;
private final String nodeId1 = "1001";
private final String nodeId2 = "2002";
private final String nodeId3 = "3003";
private final String hostTwoIps = "kafka.apache.org";
private ClusterConnectionStates connectionStates;
@ -365,4 +367,47 @@ public class ClusterConnectionStatesTest {
connectionSetupTimeoutMs * connectionSetupTimeoutJitter);
assertTrue(connectionStates.connectingNodes().contains(nodeId1));
}
/**
* Verifies that nodesWithConnectionSetupTimeout reports only the connecting nodes whose
* connection setup timeout has elapsed, and no longer reports a node once it is disconnected.
*/
@Test
public void testTimedOutConnections() {
// Initiate two connections
connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
connectionStates.connecting(nodeId2, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
// Expect no timed out connections
assertEquals(0, connectionStates.nodesWithConnectionSetupTimeout(time.milliseconds()).size());
// Advance time by half of the connection setup timeout
time.sleep(connectionSetupTimeoutMs / 2);
// Initiate a third connection
connectionStates.connecting(nodeId3, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
// Advance time beyond the connection setup timeout (+ max jitter) for the first two connections
time.sleep((long) (connectionSetupTimeoutMs / 2 + connectionSetupTimeoutMs * connectionSetupTimeoutJitter));
// Expect two timed out connections.
Set<String> timedOutConnections = connectionStates.nodesWithConnectionSetupTimeout(time.milliseconds());
assertEquals(2, timedOutConnections.size());
assertTrue(timedOutConnections.contains(nodeId1));
assertTrue(timedOutConnections.contains(nodeId2));
// Disconnect the first two connections
connectionStates.disconnected(nodeId1, time.milliseconds());
connectionStates.disconnected(nodeId2, time.milliseconds());
// Advance time beyond the connection setup timeout (+ max jitter) for the third connection
time.sleep((long) (connectionSetupTimeoutMs / 2 + connectionSetupTimeoutMs * connectionSetupTimeoutJitter));
// Expect one timed out connection
timedOutConnections = connectionStates.nodesWithConnectionSetupTimeout(time.milliseconds());
assertEquals(1, timedOutConnections.size());
assertTrue(timedOutConnections.contains(nodeId3));
// Disconnect the third connection
connectionStates.disconnected(nodeId3, time.milliseconds());
// Expect no timed out connections
assertEquals(0, connectionStates.nodesWithConnectionSetupTimeout(time.milliseconds()).size());
}
}

View File

@ -469,19 +469,28 @@ public class NetworkClientTest {
@Test
public void testConnectionSetupTimeout() {
client.ready(node, time.milliseconds());
selector.serverConnectionBlocked(node.idString());
// Use two nodes to ensure that the logic iterates over a set of more than one
// element. ConcurrentModificationException is not triggered otherwise.
final Cluster cluster = TestUtils.clusterWith(2);
final Node node0 = cluster.nodeById(0);
final Node node1 = cluster.nodeById(1);
client.ready(node0, time.milliseconds());
selector.serverConnectionBlocked(node0.idString());
client.ready(node1, time.milliseconds());
selector.serverConnectionBlocked(node1.idString());
client.poll(0, time.milliseconds());
assertFalse(
"The connection should not fail before the socket connection setup timeout elapsed",
"The connections should not fail before the socket connection setup timeout elapsed",
client.connectionFailed(node)
);
time.sleep((long) (connectionSetupTimeoutMsTest * 1.2) + 1);
client.poll(0, time.milliseconds());
assertTrue(
"Expected the connection to fail due to the socket connection setup timeout",
"Expected the connections to fail due to the socket connection setup timeout",
client.connectionFailed(node)
);
}