Potential fix for flaky STOMP integration tests
When ReactorNetty2StompBrokerRelayIntegrationTests fail, typically there are multiple exceptions "Connection refused: /127.0.0.1:61613" that appear after we've conneted, sent CONNECT, and expecting CONNECTED, but that does not come within the 10 second timeout. 61613 is the default port for STOMP. However, in all integration tests we start ActiveMQ with port 0 which results in a random port. Moreover, the stacktrace is for Netty 4 (not 5), and the eventloop thread id's are different than the one where the connection to the correct, random port was established. The suspicion is that these are log messages from MessageBrokerConfigurationTests which focuses on testing configuration but nevertheless as a bean starts and attempts to connect to the default port and fails. Perhaps those attempts to connect on the default port somehow affect the ActiveMQ server, and it stops responding. This change adds a no-op TcpClient in MessageBrokerConfigurationTests to avoid unnecessary attempts to connect that are not needed. See gh-29287
This commit is contained in:
parent
e737980033
commit
bd66c70b2b
|
@ -20,6 +20,7 @@ import java.util.ArrayList;
|
|||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
@ -69,6 +70,9 @@ import org.springframework.messaging.support.AbstractSubscribableChannel;
|
|||
import org.springframework.messaging.support.ChannelInterceptor;
|
||||
import org.springframework.messaging.support.ExecutorSubscribableChannel;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.messaging.tcp.ReconnectStrategy;
|
||||
import org.springframework.messaging.tcp.TcpConnectionHandler;
|
||||
import org.springframework.messaging.tcp.TcpOperations;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
import org.springframework.stereotype.Controller;
|
||||
import org.springframework.util.AntPathMatcher;
|
||||
|
@ -622,7 +626,9 @@ public class MessageBrokerConfigurationTests {
|
|||
|
||||
@Override
|
||||
public void configureMessageBroker(MessageBrokerRegistry registry) {
|
||||
registry.enableStompBrokerRelay("/topic", "/queue").setAutoStartup(true)
|
||||
registry.enableStompBrokerRelay("/topic", "/queue")
|
||||
.setAutoStartup(true)
|
||||
.setTcpClient(new NoOpTcpClient())
|
||||
.setUserDestinationBroadcast("/topic/unresolved-user-destination")
|
||||
.setUserRegistryBroadcast("/topic/simp-user-registry");
|
||||
}
|
||||
|
@ -787,4 +793,24 @@ public class MessageBrokerConfigurationTests {
|
|||
private static class CustomThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
|
||||
}
|
||||
|
||||
|
||||
private static class NoOpTcpClient implements TcpOperations<byte[]> {
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> connectAsync(TcpConnectionHandler<byte[]> handler) {
|
||||
return CompletableFuture.completedFuture(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> connectAsync(TcpConnectionHandler<byte[]> handler, ReconnectStrategy strategy) {
|
||||
return CompletableFuture.completedFuture(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> shutdownAsync() {
|
||||
return CompletableFuture.completedFuture(null);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -228,13 +228,14 @@ public abstract class AbstractStompBrokerRelayIntegrationTests {
|
|||
this.relay.handleMessage(subscribe.message);
|
||||
this.responseHandler.expectMessages(subscribe);
|
||||
|
||||
MessageExchange error = MessageExchangeBuilder.error(sess1).build();
|
||||
stopActiveMqBrokerAndAwait();
|
||||
this.responseHandler.expectMessages(error);
|
||||
|
||||
MessageExchange error = MessageExchangeBuilder.error(sess1).build();
|
||||
this.responseHandler.expectMessages(error);
|
||||
this.eventPublisher.expectBrokerAvailabilityEvent(false);
|
||||
|
||||
startActiveMQBroker();
|
||||
|
||||
this.eventPublisher.expectBrokerAvailabilityEvent(true);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue