Merge pull request #14242 from rabbitmq/mergify/bp/v4.1.x/pr-14240
Trigger a 4.1.x alpha release build / trigger_alpha_build (push) Has been cancelled
Test (make) / Build and Xref (1.17, 26) (push) Has been cancelled
Test (make) / Build and Xref (1.17, 27) (push) Has been cancelled
Test (make) / Test (1.17, 27, khepri) (push) Has been cancelled
Test (make) / Test (1.17, 27, mnesia) (push) Has been cancelled
Test (make) / Test mixed clusters (1.17, 27, khepri) (push) Has been cancelled
Test (make) / Test mixed clusters (1.17, 27, mnesia) (push) Has been cancelled
Test (make) / Type check (1.17, 27) (push) Has been cancelled
Increase timeouts and improve error logging in stream test (backport #14240)

commit a763afc0b9
@@ -21,10 +21,17 @@ connect(Config, Node) ->
     connect(StreamPort).
 
 connect(StreamPort) ->
+    do_connect(StreamPort, #{}).
+
+connect_pp(StreamPort, PeerProperties) ->
+    do_connect(StreamPort, PeerProperties).
+
+do_connect(StreamPort, PeerProperties) ->
     {ok, Sock} = gen_tcp:connect("localhost", StreamPort, [{active, false}, {mode, binary}]),
     C0 = rabbit_stream_core:init(0),
-    PeerPropertiesFrame = rabbit_stream_core:frame({request, 1, {peer_properties, #{}}}),
+    PeerPropertiesFrame = rabbit_stream_core:frame({request, 1, {peer_properties,
+                                                                 PeerProperties}}),
     ok = gen_tcp:send(Sock, PeerPropertiesFrame),
     {{response, 1, {peer_properties, _, _}}, C1} = receive_stream_commands(Sock, C0),
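Note: connect/1 keeps its single-argument behaviour by delegating to the new do_connect/2 with an empty map, while the new connect_pp/2 lets a test pass peer properties (for example a connection name) when it opens the connection. A minimal usage sketch; StreamPort and the connection name are illustrative, and the {ok, Socket, State} return shape is the one the callers below rely on:

    %% Open one anonymous connection and one that identifies itself by name,
    %% so the connection can be found later in consumer/connection listings.
    {ok, _Sock1, _C1} = stream_test_utils:connect(StreamPort),
    {ok, _Sock2, _C2} =
        stream_test_utils:connect_pp(StreamPort,
                                     #{<<"connection_name">> => <<"my_test">>}),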
@@ -78,8 +85,12 @@ delete_publisher(Sock, C0, PublisherId) ->
 subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit) ->
     subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit, #{}).
 
 subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit, Props) ->
-    Cmd = {subscribe, SubscriptionId, Stream, _OffsetSpec = first,
+    subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit, Props, first).
+
+subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit, Props, OffsetSpec) ->
+    Cmd = {subscribe, SubscriptionId, Stream, OffsetSpec,
            InitialCredit, Props},
     SubscribeFrame = rabbit_stream_core:frame({request, 1, Cmd}),
     ok = gen_tcp:send(Sock, SubscribeFrame),
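Note: subscribe/6 used to hard-code the offset specification to first; subscribe/7 now threads any OffsetSpec through to the subscribe command, which the reworked offset_lag_calculation test below depends on. A short sketch; Sock, C0 and the stream name are illustrative:

    %% Attach two consumers: one reading from the start of the stream,
    %% one that only sees messages published from this point on.
    {ok, C1} = stream_test_utils:subscribe(Sock, C0, <<"s1">>, 1, 10),  %% still defaults to first
    {ok, C2} = stream_test_utils:subscribe(Sock, C1, <<"s1">>, 2, 10, #{}, next),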
@@ -2344,8 +2344,8 @@ handle_frame_post_auth(Transport,
                     case {is_binary(Host), is_integer(Port)} of
                         {true, true} -> Acc#{Node => {Host, Port}};
                         _ ->
-                            rabbit_log:warning("Error when retrieving broker metadata: ~tp ~tp",
-                                               [Host, Port]),
+                            rabbit_log:warning("Error when retrieving broker '~tp' metadata: ~tp ~tp",
+                                               [Node, Host, Port]),
                             Acc
                     end
             end,
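Note: the reworded warning adds the node whose metadata lookup failed. With made-up values, the format string renders roughly as follows (~tp pretty-prints its argument with Unicode support):

    1> io:format("Error when retrieving broker '~tp' metadata: ~tp ~tp~n",
                 [rabbit@node2, undefined, undefined]).
    Error when retrieving broker 'rabbit@node2' metadata: undefined undefined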
@@ -819,89 +819,86 @@ store_offset_requires_read_access(Config) ->
 
 offset_lag_calculation(Config) ->
     FunctionName = atom_to_binary(?FUNCTION_NAME, utf8),
-    T = gen_tcp,
-    Port = get_port(T, Config),
-    Opts = get_opts(T),
-    {ok, S} = T:connect("localhost", Port, Opts),
-    C = rabbit_stream_core:init(0),
+    Port = get_port(gen_tcp, Config),
     ConnectionName = FunctionName,
-    test_peer_properties(T, S, #{<<"connection_name">> => ConnectionName}, C),
-    test_authenticate(T, S, C),
+    {ok, S, C0} = stream_test_utils:connect_pp(Port,
+                                               #{<<"connection_name">> => ConnectionName}),
 
-    Stream = FunctionName,
-    test_create_stream(T, S, Stream, C),
+    St = FunctionName,
+    {ok, C1} = stream_test_utils:create_stream(S, C0, St),
 
     SubId = 1,
     TheFuture = os:system_time(millisecond) + 60 * 60 * 1_000,
-    lists:foreach(fun(OffsetSpec) ->
-                          test_subscribe(T, S, SubId, Stream,
-                                         OffsetSpec, 10, #{},
-                                         ?RESPONSE_CODE_OK, C),
-                          ConsumerInfo = consumer_offset_info(Config, ConnectionName),
-                          ?assertEqual({0, 0}, ConsumerInfo),
-                          test_unsubscribe(T, S, SubId, C)
-                  end, [first, last, next, 0, 1_000, {timestamp, TheFuture}]),
+    C2 = lists:foldl(
+           fun(OffsetSpec, C00) ->
+                   {ok, C01} = stream_test_utils:subscribe(S, C00, St, SubId,
+                                                           10, #{}, OffsetSpec),
+                   ConsumerInfo = consumer_offset_info(Config, ConnectionName),
+                   ?assertEqual({0, 0}, ConsumerInfo),
+                   {ok, C02} = stream_test_utils:unsubscribe(S, C01, SubId),
+                   C02
+           end, C1, [first, last, next, 0, 1_000, {timestamp, TheFuture}]),
 
-    PublisherId = 1,
-    test_declare_publisher(T, S, PublisherId, Stream, C),
+    PubId = 1,
+    {ok, C3} = stream_test_utils:declare_publisher(S, C2, St, PubId),
     MessageCount = 10,
     Body = <<"hello">>,
-    lists:foreach(fun(_) ->
-                          test_publish_confirm(T, S, PublisherId, Body, C)
-                  end, lists:seq(1, MessageCount - 1)),
+    {ok, C4} = stream_test_utils:publish(S, C3, PubId, 1,
+                                         lists:duplicate(MessageCount - 1, Body)),
     %% to make sure to have 2 chunks
     timer:sleep(200),
-    test_publish_confirm(T, S, PublisherId, Body, C),
-    test_delete_publisher(T, S, PublisherId, C),
+    {ok, C5} = stream_test_utils:publish(S, C4, PubId, 1, [Body]),
+    {ok, C6} = stream_test_utils:delete_publisher(S, C5, PubId),
 
     NextOffset = MessageCount,
-    lists:foreach(fun({OffsetSpec, ReceiveDeliver, CheckFun}) ->
-                          test_subscribe(T, S, SubId, Stream,
-                                         OffsetSpec, 1, #{},
-                                         ?RESPONSE_CODE_OK, C),
-                          case ReceiveDeliver of
-                              true ->
-                                  {{deliver, SubId, _}, _} = receive_commands(T, S, C);
-                              _ ->
-                                  ok
-                          end,
-                          {Offset, Lag} = consumer_offset_info(Config, ConnectionName),
-                          CheckFun(Offset, Lag),
-                          test_unsubscribe(T, S, SubId, C)
-                  end, [{first, true,
-                         fun(Offset, Lag) ->
-                                 ?assert(Offset >= 0, "first, at least one chunk consumed"),
-                                 ?assert(Lag > 0, "first, not all messages consumed")
-                         end},
-                        {last, true,
-                         fun(Offset, _Lag) ->
-                                 ?assert(Offset > 0, "offset expected for last")
-                         end},
-                        {next, false,
-                         fun(Offset, Lag) ->
-                                 ?assertEqual(NextOffset, Offset, "next, offset should be at the end of the stream"),
-                                 ?assert(Lag =:= 0, "next, offset lag should be 0")
-                         end},
-                        {0, true,
-                         fun(Offset, Lag) ->
-                                 ?assert(Offset >= 0, "offset spec = 0, at least one chunk consumed"),
-                                 ?assert(Lag > 0, "offset spec = 0, not all messages consumed")
-                         end},
-                        {1_000, false,
-                         fun(Offset, Lag) ->
-                                 ?assertEqual(NextOffset, Offset, "offset spec = 1000, offset should be at the end of the stream"),
-                                 ?assert(Lag =:= 0, "offset spec = 1000, offset lag should be 0")
-                         end},
-                        {{timestamp, TheFuture}, false,
-                         fun(Offset, Lag) ->
-                                 ?assertEqual(NextOffset, Offset, "offset spec in future, offset should be at the end of the stream"),
-                                 ?assert(Lag =:= 0, "offset spec in future , offset lag should be 0")
-                         end}]),
-
-    test_delete_stream(T, S, Stream, C, false),
-    test_close(T, S, C),
+    C7 = lists:foldl(
+           fun({OffsetSpec, ReceiveDeliver, CheckFun}, C00) ->
+                   {ok, C01} = stream_test_utils:subscribe(S, C00, St, SubId,
+                                                           1, #{}, OffsetSpec),
+                   C03 = case ReceiveDeliver of
+                             true ->
+                                 {{deliver, SubId, _}, C02} = receive_commands(S, C01),
+                                 C02;
+                             _ ->
+                                 C01
+                         end,
+                   {Offset, Lag} = consumer_offset_info(Config, ConnectionName),
+                   CheckFun(Offset, Lag),
+                   {ok, C04} = stream_test_utils:unsubscribe(S, C03, SubId),
+                   C04
+           end, C6, [{first, true,
+                      fun(Offset, Lag) ->
+                              ?assert(Offset >= 0, "first, at least one chunk consumed"),
+                              ?assert(Lag > 0, "first, not all messages consumed")
+                      end},
+                     {last, true,
+                      fun(Offset, _Lag) ->
+                              ?assert(Offset > 0, "offset expected for last")
+                      end},
+                     {next, false,
+                      fun(Offset, Lag) ->
+                              ?assertEqual(NextOffset, Offset, "next, offset should be at the end of the stream"),
+                              ?assert(Lag =:= 0, "next, offset lag should be 0")
+                      end},
+                     {0, true,
+                      fun(Offset, Lag) ->
+                              ?assert(Offset >= 0, "offset spec = 0, at least one chunk consumed"),
+                              ?assert(Lag > 0, "offset spec = 0, not all messages consumed")
+                      end},
+                     {1_000, false,
+                      fun(Offset, Lag) ->
+                              ?assertEqual(NextOffset, Offset, "offset spec = 1000, offset should be at the end of the stream"),
+                              ?assert(Lag =:= 0, "offset spec = 1000, offset lag should be 0")
+                      end},
+                     {{timestamp, TheFuture}, false,
+                      fun(Offset, Lag) ->
+                              ?assertEqual(NextOffset, Offset, "offset spec in future, offset should be at the end of the stream"),
+                              ?assert(Lag =:= 0, "offset spec in future , offset lag should be 0")
+                      end}]),
+
+    {ok, C8} = stream_test_utils:delete_stream(S, C7, St),
+    {ok, _} = stream_test_utils:close(S, C8),
     ok.
 
 authentication_error_should_close_with_delay(Config) ->
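Note: the switch from lists:foreach/2 to lists:foldl/3 is dictated by the stream_test_utils API, which threads the rabbit_stream_core parser state through every call; each helper returns an updated state (C0 through C8 above) that the next call must consume. A condensed sketch of the pattern, with illustrative variable names:

    %% Each call takes the current protocol state and yields the next one,
    %% so the loop folds the state forward instead of discarding it.
    CFinal = lists:foldl(
               fun(OffsetSpec, CIn) ->
                       {ok, CSub} = stream_test_utils:subscribe(S, CIn, Stream, SubId,
                                                                10, #{}, OffsetSpec),
                       {ok, COut} = stream_test_utils:unsubscribe(S, CSub, SubId),
                       COut
               end, C0, [first, last, next]),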
@@ -34,8 +34,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.ToLongFunction;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@@ -45,6 +48,7 @@ public class FailureTest {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(FailureTest.class);
 
+  static String testMethod;
   TestUtils.ClientFactory cf;
   String stream;
   ExecutorService executorService;
@@ -57,6 +61,11 @@ public class FailureTest {
     }
   }
 
+  @BeforeEach
+  void init(TestInfo info) {
+    testMethod = info.getTestMethod().get().getName();
+  }
+
   @AfterEach
   void tearDown() {
     if (executorService != null) {
@@ -142,9 +151,9 @@ public class FailureTest {
     waitAtMost(
         Duration.ofSeconds(10),
         () -> {
-          LOGGER.info("Getting metadata for {}", stream);
+          log("Getting metadata for {}", stream);
           Client.StreamMetadata m = publisher.metadata(stream).get(stream);
-          LOGGER.info("Metadata for {} (expecting 2 replicas): {}", stream, m);
+          log("Metadata for {} (expecting 2 replicas): {}", stream, m);
           return m.getReplicas().size() == 2;
         });
@@ -195,6 +204,7 @@ public class FailureTest {
     Map<Long, Message> published = new ConcurrentHashMap<>();
     Set<Message> confirmed = ConcurrentHashMap.newKeySet();
 
+    // match confirmed messages to published messages
     Client.PublishConfirmListener publishConfirmListener =
         (publisherId, publishingId) -> {
           Message confirmedMessage;
@@ -212,18 +222,22 @@ public class FailureTest {
     AtomicReference<Client> publisher = new AtomicReference<>();
     CountDownLatch reconnectionLatch = new CountDownLatch(1);
     AtomicReference<Client.ShutdownListener> shutdownListenerReference = new AtomicReference<>();
+    // shutdown listener reconnects to node 2 to locate the node the stream leader is on
+    // it then re-creates a publisher connected to this node
     Client.ShutdownListener shutdownListener =
         shutdownContext -> {
          if (shutdownContext.getShutdownReason()
              == Client.ShutdownContext.ShutdownReason.UNKNOWN) {
+            log("Connection got closed, reconnecting");
            // avoid long-running task in the IO thread
            executorService.submit(
                () -> {
                  connected.set(false);
                  AtomicReference<Client> locator = new AtomicReference<>();
                  try {
+                    log("Reconnecting to node 2");
                    waitAtMost(
-                        Duration.ofSeconds(5),
+                        Duration.ofSeconds(20),
                        () -> {
                          try {
                            locator.set(
@@ -233,14 +247,35 @@ public class FailureTest {
                            return false;
                          }
                        });
+                    log("Reconnected to node 2, looking up new stream leader");
                    waitAtMost(
-                        Duration.ofSeconds(5),
+                        Duration.ofSeconds(20),
                        () -> {
                          Client.StreamMetadata m = locator.get().metadata(stream).get(stream);
                          return m.getLeader() != null
                              && m.getLeader().getPort() != streamPortNode1();
                        });
+                    log("New stream leader is on another node than node 1");
                  } catch (Throwable e) {
+                    log("Error while trying to connect to new stream leader");
+                    if (locator.get() == null) {
+                      log("Could not reconnect");
+                    } else {
+                      try {
+                        Client.StreamMetadata m = locator.get().metadata(stream).get(stream);
+                        if (m.getLeader() == null) {
+                          log("The stream has no leader");
+                        } else {
+                          log(
+                              "The stream is on node with port {} (node 1 = {}, node 2 = {})",
+                              m.getLeader().getPort(),
+                              streamPortNode1(),
+                              streamPortNode2());
+                        }
+                      } catch (Exception ex) {
+                        log("Error while checking failure: {}", ex.getMessage());
+                      }
+                    }
                    reconnectionLatch.countDown();
                    return;
                  }
@@ -278,6 +313,9 @@ public class FailureTest {
 
     AtomicBoolean keepPublishing = new AtomicBoolean(true);
 
+    AtomicLong publishSequence = new AtomicLong(0);
+    ToLongFunction<Object> publishSequenceFunction = value -> publishSequence.getAndIncrement();
+
     executorService.submit(
         () -> {
           while (keepPublishing.get()) {
@@ -295,7 +333,11 @@ public class FailureTest {
                     .build();
             try {
               long publishingId =
-                  publisher.get().publish((byte) 1, Collections.singletonList(message)).get(0);
+                  publisher
+                      .get()
+                      .publish(
+                          (byte) 1, Collections.singletonList(message), publishSequenceFunction)
+                      .get(0);
               published.put(publishingId, message);
             } catch (Exception e) {
               // keep going
@@ -314,6 +356,7 @@ public class FailureTest {
     int confirmedCount = confirmed.size();
 
     try {
+      // stop the first node (this is where the stream leader is)
       Host.rabbitmqctl("stop_app");
 
       assertThat(reconnectionLatch.await(10, TimeUnit.SECONDS)).isTrue();
@@ -324,6 +367,7 @@ public class FailureTest {
     } finally {
       Host.rabbitmqctl("start_app");
     }
+    // making sure we published a few messages and got the confirmations
     assertThat(confirmed).hasSizeGreaterThan(confirmedCount);
     confirmedCount = confirmed.size();
 
@@ -339,6 +383,7 @@ public class FailureTest {
     // let's publish for a bit of time
     Thread.sleep(2000);
 
+    // making sure we published messages and got the confirmations
     assertThat(confirmed).hasSizeGreaterThan(confirmedCount);
 
     keepPublishing.set(false);
@@ -640,4 +685,8 @@ public class FailureTest {
     Host.killStreamLeaderProcess(stream);
     waitUntil(() -> metadataNotifications.get() == 2);
   }
+
+  private static void log(String format, Object... args) {
+    LOGGER.info("[" + testMethod + "] " + format, args);
+  }
 }
@@ -6,6 +6,7 @@
   </appender>
 
   <logger name="com.rabbitmq.stream" level="info" />
+  <logger name="com.rabbitmq.stream.impl.Client" level="warn" />
 
   <root level="info">
     <appender-ref ref="STDOUT" />