Potential fix for flaky STOMP integration tests

When ReactorNetty2StompBrokerRelayIntegrationTests fail, typically there
are multiple exceptions "Connection refused: /127.0.0.1:61613" that
appear after we've conneted, sent CONNECT, and expecting CONNECTED, but
that does not come within the 10 second timeout.

61613 is the default port for STOMP. However, in all integration tests
we start ActiveMQ with port 0 which results in a random port. Moreover,
the stacktrace is for Netty 4 (not 5), and the eventloop thread id's
are different than the one where the connection to the correct, random
port was established.

The suspicion is that these are log messages from
MessageBrokerConfigurationTests which focuses on testing configuration
but nevertheless as a bean starts and attempts to connect to the default
port and fails. Perhaps those attempts to connect on the default port
somehow affect the ActiveMQ server, and it stops responding.

This change adds a no-op TcpClient in MessageBrokerConfigurationTests
to avoid unnecessary attempts to connect that are not needed.

See gh-29287
This commit is contained in:
rstoyanchev 2023-05-10 11:42:45 +01:00
parent e737980033
commit bd66c70b2b
2 changed files with 30 additions and 3 deletions

View File

@ -20,6 +20,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.junit.jupiter.api.Test;
@ -69,6 +70,9 @@ import org.springframework.messaging.support.AbstractSubscribableChannel;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.ExecutorSubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.tcp.ReconnectStrategy;
import org.springframework.messaging.tcp.TcpConnectionHandler;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Controller;
import org.springframework.util.AntPathMatcher;
@ -622,7 +626,9 @@ public class MessageBrokerConfigurationTests {
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableStompBrokerRelay("/topic", "/queue").setAutoStartup(true)
registry.enableStompBrokerRelay("/topic", "/queue")
.setAutoStartup(true)
.setTcpClient(new NoOpTcpClient())
.setUserDestinationBroadcast("/topic/unresolved-user-destination")
.setUserRegistryBroadcast("/topic/simp-user-registry");
}
@ -787,4 +793,24 @@ public class MessageBrokerConfigurationTests {
private static class CustomThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
}
private static class NoOpTcpClient implements TcpOperations<byte[]> {
@Override
public CompletableFuture<Void> connectAsync(TcpConnectionHandler<byte[]> handler) {
return CompletableFuture.completedFuture(null);
}
@Override
public CompletableFuture<Void> connectAsync(TcpConnectionHandler<byte[]> handler, ReconnectStrategy strategy) {
return CompletableFuture.completedFuture(null);
}
@Override
public CompletableFuture<Void> shutdownAsync() {
return CompletableFuture.completedFuture(null);
}
}
}

View File

@ -228,13 +228,14 @@ public abstract class AbstractStompBrokerRelayIntegrationTests {
this.relay.handleMessage(subscribe.message);
this.responseHandler.expectMessages(subscribe);
MessageExchange error = MessageExchangeBuilder.error(sess1).build();
stopActiveMqBrokerAndAwait();
this.responseHandler.expectMessages(error);
MessageExchange error = MessageExchangeBuilder.error(sess1).build();
this.responseHandler.expectMessages(error);
this.eventPublisher.expectBrokerAvailabilityEvent(false);
startActiveMQBroker();
this.eventPublisher.expectBrokerAvailabilityEvent(true);
}