Merge pull request #14240 from rabbitmq/stream-fix-test-flake-2
Trigger a 4.2.x alpha release build / trigger_alpha_build (push) Waiting to run Details
Test (make) / Build and Xref (1.18, 26) (push) Waiting to run Details
Test (make) / Build and Xref (1.18, 27) (push) Waiting to run Details
Test (make) / Build and Xref (1.18, 28) (push) Waiting to run Details
Test (make) / Test (1.18, 28, khepri) (push) Waiting to run Details
Test (make) / Test (1.18, 28, mnesia) (push) Waiting to run Details
Test (make) / Test mixed clusters (1.18, 28, khepri) (push) Waiting to run Details
Test (make) / Test mixed clusters (1.18, 28, mnesia) (push) Waiting to run Details
Test (make) / Type check (1.18, 28) (push) Waiting to run Details

Increase timeouts and improve error logging in stream test
This commit is contained in:
Arnaud Cogoluègnes 2025-07-16 11:34:28 +00:00 committed by GitHub
commit f8eac1dc92
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 136 additions and 78 deletions

View File

@ -21,10 +21,17 @@ connect(Config, Node) ->
connect(StreamPort).
connect(StreamPort) ->
do_connect(StreamPort, #{}).
connect_pp(StreamPort, PeerProperties) ->
do_connect(StreamPort, PeerProperties).
do_connect(StreamPort, PeerProperties) ->
{ok, Sock} = gen_tcp:connect("localhost", StreamPort, [{active, false}, {mode, binary}]),
C0 = rabbit_stream_core:init(0),
PeerPropertiesFrame = rabbit_stream_core:frame({request, 1, {peer_properties, #{}}}),
PeerPropertiesFrame = rabbit_stream_core:frame({request, 1, {peer_properties,
PeerProperties}}),
ok = gen_tcp:send(Sock, PeerPropertiesFrame),
{{response, 1, {peer_properties, _, _}}, C1} = receive_stream_commands(Sock, C0),
@ -78,8 +85,12 @@ delete_publisher(Sock, C0, PublisherId) ->
subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit) ->
subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit, #{}).
subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit, Props) ->
Cmd = {subscribe, SubscriptionId, Stream, _OffsetSpec = first,
subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit, Props, first).
subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit, Props, OffsetSpec) ->
Cmd = {subscribe, SubscriptionId, Stream, OffsetSpec,
InitialCredit, Props},
SubscribeFrame = rabbit_stream_core:frame({request, 1, Cmd}),
ok = gen_tcp:send(Sock, SubscribeFrame),

View File

@ -2344,8 +2344,8 @@ handle_frame_post_auth(Transport,
case {is_binary(Host), is_integer(Port)} of
{true, true} -> Acc#{Node => {Host, Port}};
_ ->
rabbit_log:warning("Error when retrieving broker metadata: ~tp ~tp",
[Host, Port]),
rabbit_log:warning("Error when retrieving broker '~tp' metadata: ~tp ~tp",
[Node, Host, Port]),
Acc
end
end,

View File

@ -819,89 +819,86 @@ store_offset_requires_read_access(Config) ->
offset_lag_calculation(Config) ->
FunctionName = atom_to_binary(?FUNCTION_NAME, utf8),
T = gen_tcp,
Port = get_port(T, Config),
Opts = get_opts(T),
{ok, S} = T:connect("localhost", Port, Opts),
C = rabbit_stream_core:init(0),
Port = get_port(gen_tcp, Config),
ConnectionName = FunctionName,
test_peer_properties(T, S, #{<<"connection_name">> => ConnectionName}, C),
test_authenticate(T, S, C),
{ok, S, C0} = stream_test_utils:connect_pp(Port,
#{<<"connection_name">> => ConnectionName}),
Stream = FunctionName,
test_create_stream(T, S, Stream, C),
St = FunctionName,
{ok, C1} = stream_test_utils:create_stream(S, C0, St),
SubId = 1,
TheFuture = os:system_time(millisecond) + 60 * 60 * 1_000,
lists:foreach(fun(OffsetSpec) ->
test_subscribe(T, S, SubId, Stream,
OffsetSpec, 10, #{},
?RESPONSE_CODE_OK, C),
ConsumerInfo = consumer_offset_info(Config, ConnectionName),
?assertEqual({0, 0}, ConsumerInfo),
test_unsubscribe(T, S, SubId, C)
end, [first, last, next, 0, 1_000, {timestamp, TheFuture}]),
C2 = lists:foldl(
fun(OffsetSpec, C00) ->
{ok, C01} = stream_test_utils:subscribe(S, C00, St, SubId,
10, #{}, OffsetSpec),
ConsumerInfo = consumer_offset_info(Config, ConnectionName),
?assertEqual({0, 0}, ConsumerInfo),
{ok, C02} = stream_test_utils:unsubscribe(S, C01, SubId),
C02
end, C1, [first, last, next, 0, 1_000, {timestamp, TheFuture}]),
PublisherId = 1,
test_declare_publisher(T, S, PublisherId, Stream, C),
PubId = 1,
{ok, C3} = stream_test_utils:declare_publisher(S, C2, St, PubId),
MessageCount = 10,
Body = <<"hello">>,
lists:foreach(fun(_) ->
test_publish_confirm(T, S, PublisherId, Body, C)
end, lists:seq(1, MessageCount - 1)),
{ok, C4} = stream_test_utils:publish(S, C3, PubId, 1,
lists:duplicate(MessageCount - 1, Body)),
%% to make sure to have 2 chunks
timer:sleep(200),
test_publish_confirm(T, S, PublisherId, Body, C),
test_delete_publisher(T, S, PublisherId, C),
{ok, C5} = stream_test_utils:publish(S, C4, PubId, 1, [Body]),
{ok, C6} = stream_test_utils:delete_publisher(S, C5, PubId),
NextOffset = MessageCount,
lists:foreach(fun({OffsetSpec, ReceiveDeliver, CheckFun}) ->
test_subscribe(T, S, SubId, Stream,
OffsetSpec, 1, #{},
?RESPONSE_CODE_OK, C),
case ReceiveDeliver of
true ->
{{deliver, SubId, _}, _} = receive_commands(T, S, C);
_ ->
ok
end,
{Offset, Lag} = consumer_offset_info(Config, ConnectionName),
CheckFun(Offset, Lag),
test_unsubscribe(T, S, SubId, C)
end, [{first, true,
fun(Offset, Lag) ->
?assert(Offset >= 0, "first, at least one chunk consumed"),
?assert(Lag > 0, "first, not all messages consumed")
end},
{last, true,
fun(Offset, _Lag) ->
?assert(Offset > 0, "offset expected for last")
end},
{next, false,
fun(Offset, Lag) ->
?assertEqual(NextOffset, Offset, "next, offset should be at the end of the stream"),
?assert(Lag =:= 0, "next, offset lag should be 0")
end},
{0, true,
fun(Offset, Lag) ->
?assert(Offset >= 0, "offset spec = 0, at least one chunk consumed"),
?assert(Lag > 0, "offset spec = 0, not all messages consumed")
end},
{1_000, false,
fun(Offset, Lag) ->
?assertEqual(NextOffset, Offset, "offset spec = 1000, offset should be at the end of the stream"),
?assert(Lag =:= 0, "offset spec = 1000, offset lag should be 0")
end},
{{timestamp, TheFuture}, false,
fun(Offset, Lag) ->
?assertEqual(NextOffset, Offset, "offset spec in future, offset should be at the end of the stream"),
?assert(Lag =:= 0, "offset spec in future , offset lag should be 0")
end}]),
C7 = lists:foldl(
fun({OffsetSpec, ReceiveDeliver, CheckFun}, C00) ->
{ok, C01} = stream_test_utils:subscribe(S, C00, St, SubId,
1, #{}, OffsetSpec),
test_delete_stream(T, S, Stream, C, false),
test_close(T, S, C),
C03 = case ReceiveDeliver of
true ->
{{deliver, SubId, _}, C02} = receive_commands(S, C01),
C02;
_ ->
C01
end,
{Offset, Lag} = consumer_offset_info(Config, ConnectionName),
CheckFun(Offset, Lag),
{ok, C04} = stream_test_utils:unsubscribe(S, C03, SubId),
C04
end, C6, [{first, true,
fun(Offset, Lag) ->
?assert(Offset >= 0, "first, at least one chunk consumed"),
?assert(Lag > 0, "first, not all messages consumed")
end},
{last, true,
fun(Offset, _Lag) ->
?assert(Offset > 0, "offset expected for last")
end},
{next, false,
fun(Offset, Lag) ->
?assertEqual(NextOffset, Offset, "next, offset should be at the end of the stream"),
?assert(Lag =:= 0, "next, offset lag should be 0")
end},
{0, true,
fun(Offset, Lag) ->
?assert(Offset >= 0, "offset spec = 0, at least one chunk consumed"),
?assert(Lag > 0, "offset spec = 0, not all messages consumed")
end},
{1_000, false,
fun(Offset, Lag) ->
?assertEqual(NextOffset, Offset, "offset spec = 1000, offset should be at the end of the stream"),
?assert(Lag =:= 0, "offset spec = 1000, offset lag should be 0")
end},
{{timestamp, TheFuture}, false,
fun(Offset, Lag) ->
?assertEqual(NextOffset, Offset, "offset spec in future, offset should be at the end of the stream"),
?assert(Lag =:= 0, "offset spec in future , offset lag should be 0")
end}]),
{ok, C8} = stream_test_utils:delete_stream(S, C7, St),
{ok, _} = stream_test_utils:close(S, C8),
ok.
authentication_error_should_close_with_delay(Config) ->

View File

@ -34,8 +34,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.ToLongFunction;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -45,6 +48,7 @@ public class FailureTest {
private static final Logger LOGGER = LoggerFactory.getLogger(FailureTest.class);
static String testMethod;
TestUtils.ClientFactory cf;
String stream;
ExecutorService executorService;
@ -57,6 +61,11 @@ public class FailureTest {
}
}
@BeforeEach
void init(TestInfo info) {
testMethod = info.getTestMethod().get().getName();
}
@AfterEach
void tearDown() {
if (executorService != null) {
@ -142,9 +151,9 @@ public class FailureTest {
waitAtMost(
Duration.ofSeconds(10),
() -> {
LOGGER.info("Getting metadata for {}", stream);
log("Getting metadata for {}", stream);
Client.StreamMetadata m = publisher.metadata(stream).get(stream);
LOGGER.info("Metadata for {} (expecting 2 replicas): {}", stream, m);
log("Metadata for {} (expecting 2 replicas): {}", stream, m);
return m.getReplicas().size() == 2;
});
@ -195,6 +204,7 @@ public class FailureTest {
Map<Long, Message> published = new ConcurrentHashMap<>();
Set<Message> confirmed = ConcurrentHashMap.newKeySet();
// match confirmed messages to published messages
Client.PublishConfirmListener publishConfirmListener =
(publisherId, publishingId) -> {
Message confirmedMessage;
@ -212,18 +222,22 @@ public class FailureTest {
AtomicReference<Client> publisher = new AtomicReference<>();
CountDownLatch reconnectionLatch = new CountDownLatch(1);
AtomicReference<Client.ShutdownListener> shutdownListenerReference = new AtomicReference<>();
// shutdown listener reconnects to node 2 to locate the node the stream leader is on
// it then re-creates a publisher connected to this node
Client.ShutdownListener shutdownListener =
shutdownContext -> {
if (shutdownContext.getShutdownReason()
== Client.ShutdownContext.ShutdownReason.UNKNOWN) {
log("Connection got closed, reconnecting");
// avoid long-running task in the IO thread
executorService.submit(
() -> {
connected.set(false);
AtomicReference<Client> locator = new AtomicReference<>();
try {
log("Reconnecting to node 2");
waitAtMost(
Duration.ofSeconds(5),
Duration.ofSeconds(20),
() -> {
try {
locator.set(
@ -233,14 +247,35 @@ public class FailureTest {
return false;
}
});
log("Reconnected to node 2, looking up new stream leader");
waitAtMost(
Duration.ofSeconds(5),
Duration.ofSeconds(20),
() -> {
Client.StreamMetadata m = locator.get().metadata(stream).get(stream);
return m.getLeader() != null
&& m.getLeader().getPort() != streamPortNode1();
});
log("New stream leader is on another node than node 1");
} catch (Throwable e) {
log("Error while trying to connect to new stream leader");
if (locator.get() == null) {
log("Could not reconnect");
} else {
try {
Client.StreamMetadata m = locator.get().metadata(stream).get(stream);
if (m.getLeader() == null) {
log("The stream has no leader");
} else {
log(
"The stream is on node with port {} (node 1 = {}, node 2 = {})",
m.getLeader().getPort(),
streamPortNode1(),
streamPortNode2());
}
} catch (Exception ex) {
log("Error while checking failure: {}", ex.getMessage());
}
}
reconnectionLatch.countDown();
return;
}
@ -278,6 +313,9 @@ public class FailureTest {
AtomicBoolean keepPublishing = new AtomicBoolean(true);
AtomicLong publishSequence = new AtomicLong(0);
ToLongFunction<Object> publishSequenceFunction = value -> publishSequence.getAndIncrement();
executorService.submit(
() -> {
while (keepPublishing.get()) {
@ -295,7 +333,11 @@ public class FailureTest {
.build();
try {
long publishingId =
publisher.get().publish((byte) 1, Collections.singletonList(message)).get(0);
publisher
.get()
.publish(
(byte) 1, Collections.singletonList(message), publishSequenceFunction)
.get(0);
published.put(publishingId, message);
} catch (Exception e) {
// keep going
@ -314,6 +356,7 @@ public class FailureTest {
int confirmedCount = confirmed.size();
try {
// stop the first node (this is where the stream leader is)
Host.rabbitmqctl("stop_app");
assertThat(reconnectionLatch.await(10, TimeUnit.SECONDS)).isTrue();
@ -324,6 +367,7 @@ public class FailureTest {
} finally {
Host.rabbitmqctl("start_app");
}
// making sure we published a few messages and got the confirmations
assertThat(confirmed).hasSizeGreaterThan(confirmedCount);
confirmedCount = confirmed.size();
@ -339,6 +383,7 @@ public class FailureTest {
// let's publish for a bit of time
Thread.sleep(2000);
// making sure we published messages and got the confirmations
assertThat(confirmed).hasSizeGreaterThan(confirmedCount);
keepPublishing.set(false);
@ -640,4 +685,8 @@ public class FailureTest {
Host.killStreamLeaderProcess(stream);
waitUntil(() -> metadataNotifications.get() == 2);
}
private static void log(String format, Object... args) {
LOGGER.info("[" + testMethod + "] " + format, args);
}
}

View File

@ -6,6 +6,7 @@
</appender>
<logger name="com.rabbitmq.stream" level="info" />
<logger name="com.rabbitmq.stream.impl.Client" level="warn" />
<root level="info">
<appender-ref ref="STDOUT" />