Reject openConnection attempt while closing (#86315)
Today while the `ClusterConnectionManager` is closing it will reject attempts to open _managed_ connections (i.e. using `connectToNode`), but it still permits ad-hoc connections (i.e. using `openConnection`). This commit extends the existing refcounting mechanism to cover both cases, preventing all concurrent connection attempts while shutting down. Closes #86249 Relates #77539
This commit is contained in:
parent
6b8e141272
commit
22136f056c
|
@ -0,0 +1,6 @@
|
||||||
|
pr: 86315
|
||||||
|
summary: Reject `openConnection` attempt while closing
|
||||||
|
area: Network
|
||||||
|
type: bug
|
||||||
|
issues:
|
||||||
|
- 86249
|
|
@ -75,7 +75,20 @@ public class ClusterConnectionManager implements ConnectionManager {
|
||||||
@Override
|
@Override
|
||||||
public void openConnection(DiscoveryNode node, ConnectionProfile connectionProfile, ActionListener<Transport.Connection> listener) {
|
public void openConnection(DiscoveryNode node, ConnectionProfile connectionProfile, ActionListener<Transport.Connection> listener) {
|
||||||
ConnectionProfile resolvedProfile = ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile);
|
ConnectionProfile resolvedProfile = ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile);
|
||||||
internalOpenConnection(node, resolvedProfile, listener);
|
if (connectingRefCounter.tryIncRef()) {
|
||||||
|
var success = false;
|
||||||
|
final var release = new RunOnce(connectingRefCounter::decRef);
|
||||||
|
try {
|
||||||
|
internalOpenConnection(node, resolvedProfile, ActionListener.runBefore(listener, release::run));
|
||||||
|
success = true;
|
||||||
|
} finally {
|
||||||
|
if (success == false) {
|
||||||
|
release.run();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
listener.onFailure(new ConnectTransportException(node, "connection manager is closed"));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -124,7 +137,7 @@ public class ClusterConnectionManager implements ConnectionManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (connectingRefCounter.tryIncRef() == false) {
|
if (connectingRefCounter.tryIncRef() == false) {
|
||||||
listener.onFailure(new IllegalStateException("connection manager is closed"));
|
listener.onFailure(new ConnectTransportException(node, "connection manager is closed"));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -39,6 +39,7 @@ import java.util.concurrent.BrokenBarrierException;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.CyclicBarrier;
|
import java.util.concurrent.CyclicBarrier;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.Semaphore;
|
import java.util.concurrent.Semaphore;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
@ -46,6 +47,7 @@ import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
import static org.elasticsearch.test.ActionListenerUtils.anyActionListener;
|
import static org.elasticsearch.test.ActionListenerUtils.anyActionListener;
|
||||||
|
import static org.hamcrest.Matchers.containsString;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
import static org.hamcrest.Matchers.notNullValue;
|
import static org.hamcrest.Matchers.notNullValue;
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
@ -490,6 +492,27 @@ public class ClusterConnectionManagerTests extends ESTestCase {
|
||||||
assertEquals(0, nodeDisconnectedCount.get());
|
assertEquals(0, nodeDisconnectedCount.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testConnectAfterClose() {
|
||||||
|
connectionManager.close();
|
||||||
|
final var node = new DiscoveryNode("", new TransportAddress(InetAddress.getLoopbackAddress(), 0), Version.CURRENT);
|
||||||
|
|
||||||
|
final var openConnectionFuture = new PlainActionFuture<Transport.Connection>();
|
||||||
|
connectionManager.openConnection(node, connectionProfile, openConnectionFuture);
|
||||||
|
assertTrue(openConnectionFuture.isDone());
|
||||||
|
assertThat(
|
||||||
|
expectThrows(ExecutionException.class, ConnectTransportException.class, openConnectionFuture::get).getMessage(),
|
||||||
|
containsString("connection manager is closed")
|
||||||
|
);
|
||||||
|
|
||||||
|
final var connectToNodeFuture = new PlainActionFuture<Releasable>();
|
||||||
|
connectionManager.connectToNode(node, connectionProfile, (c, p, l) -> fail("should not be called"), connectToNodeFuture);
|
||||||
|
assertTrue(connectToNodeFuture.isDone());
|
||||||
|
assertThat(
|
||||||
|
expectThrows(ExecutionException.class, ConnectTransportException.class, connectToNodeFuture::get).getMessage(),
|
||||||
|
containsString("connection manager is closed")
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
private static class TestConnect extends CloseableConnection {
|
private static class TestConnect extends CloseableConnection {
|
||||||
|
|
||||||
private final DiscoveryNode node;
|
private final DiscoveryNode node;
|
||||||
|
|
Loading…
Reference in New Issue