Adapt stream plugin after coordinator refactoring

Do not use the Mnesia record for topology info; use the coordinator instead.
Arnaud Cogoluègnes 2021-03-10 15:23:13 +01:00 committed by kjnilsson
parent 7fa3f6b6e1
commit e80db9d46a
6 changed files with 79 additions and 59 deletions
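
The gist of the change, as a minimal sketch (function names here are illustrative, not from the diff; Q is an amqqueue record): topology data used to be derived from pids stored in the Mnesia-backed queue record, which could be stale, and is now asked of the stream coordinator.

    %% Before: pids came straight from the queue record's type state.
    leader_pid_from_record(Q) ->
        #{leader_pid := LeaderPid} = amqqueue:get_type_state(Q),
        LeaderPid.

    %% After: the coordinator is the source of truth; the record only
    %% supplies the stream name used as the lookup key.
    members_from_coordinator(Q) ->
        #{name := StreamName} = amqqueue:get_type_state(Q),
        rabbit_stream_coordinator:members(StreamName).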


@@ -240,10 +240,8 @@ handle_call({lookup_leader, VirtualHost, Stream}, _From, State) ->
         {ok, Q} ->
             case is_stream_queue(Q) of
                 true ->
-                    #{leader_pid := LeaderPid} =
-                        amqqueue:get_type_state(Q),
                     % FIXME check if pid is alive in case of stale information
-                    LeaderPid;
+                    amqqueue:get_pid(Q);
                 _ ->
                     cluster_not_found
             end;
@@ -261,24 +259,16 @@ handle_call({lookup_local_member, VirtualHost, Stream}, _From,
         {ok, Q} ->
             case is_stream_queue(Q) of
                 true ->
-                    #{leader_pid := LeaderPid,
-                      replica_pids := ReplicaPids} =
-                        amqqueue:get_type_state(Q),
-                    LocalMember =
-                        lists:foldl(fun(Pid, Acc) ->
-                                       case node(Pid) =:= node() of
-                                           true -> Pid;
-                                           false -> Acc
-                                       end
-                                    end,
-                                    undefined,
-                                    [LeaderPid] ++ ReplicaPids),
+                    #{name := StreamName} = amqqueue:get_type_state(Q),
                     % FIXME check if pid is alive in case of stale information
-                    case LocalMember of
-                        undefined ->
+                    case rabbit_stream_coordinator:local_pid(StreamName)
+                    of
+                        {ok, Pid} when is_pid(Pid) ->
+                            {ok, Pid};
+                        {error, timeout} ->
                             {error, not_available};
-                        Pid ->
-                            {ok, Pid}
+                        _ ->
+                            {error, not_available}
                     end;
                 _ ->
                     {error, not_found}
@@ -302,33 +292,36 @@ handle_call({topology, VirtualHost, Stream}, _From, State) ->
             case is_stream_queue(Q) of
                 true ->
                     QState = amqqueue:get_type_state(Q),
-                    ProcessAliveFun =
-                        fun(Pid) ->
-                           rpc:call(node(Pid),
-                                    erlang,
-                                    is_process_alive,
-                                    [Pid],
-                                    10000)
-                        end,
-                    LeaderNode =
-                        case ProcessAliveFun(maps:get(leader_pid, QState))
+                    #{name := StreamName} = QState,
+                    StreamMembers =
+                        case rabbit_stream_coordinator:members(StreamName)
                         of
-                            true ->
-                                maps:get(leader_node, QState);
+                            {ok, Members} ->
+                                maps:fold(fun (_Node, {undefined, _Role},
+                                               Acc) ->
+                                                  Acc;
+                                              (LeaderNode, {_Pid, writer},
+                                               Acc) ->
+                                                  Acc#{leader_node =>
+                                                           LeaderNode};
+                                              (ReplicaNode,
+                                               {_Pid, replica}, Acc) ->
+                                                  #{replica_nodes :=
+                                                        ReplicaNodes} =
+                                                      Acc,
+                                                  Acc#{replica_nodes =>
+                                                           ReplicaNodes
+                                                           ++ [ReplicaNode]};
+                                              (_Node, _, Acc) ->
+                                                  Acc
+                                          end,
+                                          #{leader_node => undefined,
+                                            replica_nodes => []},
+                                          Members);
                             _ ->
-                                undefined
+                                {error, stream_not_found}
                         end,
-                    ReplicaNodes =
-                        lists:foldl(fun(Pid, Acc) ->
-                                       case ProcessAliveFun(Pid) of
-                                           true -> Acc ++ [node(Pid)];
-                                           _ -> Acc
-                                       end
-                                    end,
-                                    [], maps:get(replica_pids, QState)),
-                    {ok,
-                     #{leader_node => LeaderNode,
-                       replica_nodes => ReplicaNodes}};
+                    {ok, StreamMembers};
                 _ ->
                     {error, stream_not_found}
             end;
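
For reference, the fold above reduces the coordinator's members map, shaped as Node => {Pid | undefined, writer | replica}, to the #{leader_node, replica_nodes} map that callers receive. A standalone sketch of the same reduction (module name and sample data are hypothetical):

    -module(topology_fold_sketch).
    -export([to_topology/1]).

    %% Members :: #{node() => {pid() | undefined, writer | replica}}
    to_topology(Members) ->
        maps:fold(fun (_Node, {undefined, _Role}, Acc) ->
                          %% member not (yet) running: skip it
                          Acc;
                      (LeaderNode, {_Pid, writer}, Acc) ->
                          Acc#{leader_node => LeaderNode};
                      (ReplicaNode, {_Pid, replica}, Acc) ->
                          #{replica_nodes := ReplicaNodes} = Acc,
                          Acc#{replica_nodes => ReplicaNodes ++ [ReplicaNode]};
                      (_Node, _, Acc) ->
                          Acc
                  end,
                  #{leader_node => undefined, replica_nodes => []},
                  Members).

    %% Example (node names are illustrative; P1, P2 bound to live pids):
    %% 1> topology_fold_sketch:to_topology(#{'n1@host' => {P1, writer},
    %%                                       'n2@host' => {P2, replica},
    %%                                       'n3@host' => {undefined, replica}}).
    %% #{leader_node => 'n1@host', replica_nodes => ['n2@host']}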


@@ -1814,9 +1814,10 @@ handle_frame_post_auth(Transport,
                                                   Username)
                 of
                     {ok,
-                     #{leader_pid := LeaderPid,
-                       replica_pids := ReturnedReplicas}} ->
-                        rabbit_log:info("Created cluster with leader ~p and replicas ~p",
+                     #{leader_node := LeaderPid,
+                       replica_nodes := ReturnedReplicas}} ->
+                        rabbit_log:info("Created cluster with leader on ~p and replicas "
+                                        "on ~p~n",
                                         [LeaderPid, ReturnedReplicas]),
                         response_ok(Transport,
                                     Connection,
@@ -1942,11 +1943,18 @@ handle_frame_post_auth(Transport,
                        Rest) ->
    Streams = rabbit_stream_utils:extract_stream_list(BinaryStreams, []),

+    Topology =
+        lists:foldl(fun(Stream, Acc) ->
+                       Acc#{Stream =>
+                                rabbit_stream_manager:topology(VirtualHost,
+                                                               Stream)}
+                    end,
+                    #{}, Streams),
+
     %% get the nodes involved in the streams
     NodesMap =
         lists:foldl(fun(Stream, Acc) ->
-                       case rabbit_stream_manager:topology(VirtualHost, Stream)
-                       of
+                       case maps:get(Stream, Topology) of
                            {ok,
                             #{leader_node := undefined,
                               replica_nodes := ReplicaNodes}} ->
@@ -2004,8 +2012,7 @@ handle_frame_post_auth(Transport,
     MetadataBin =
         lists:foldl(fun(Stream, Acc) ->
                        StreamLength = byte_size(Stream),
-                       case rabbit_stream_manager:topology(VirtualHost, Stream)
-                       of
+                       case maps:get(Stream, Topology) of
                            {error, stream_not_found} ->
                                <<Acc/binary,
                                  StreamLength:16,
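
Both hunks above read from the same precomputed map: the reader now resolves each stream's topology once, up front, instead of calling rabbit_stream_manager:topology/2 separately in the node-gathering pass and the frame-encoding pass. The lookup-once pattern, as a small sketch (the function name is illustrative):

    resolve_topologies(VirtualHost, Streams) ->
        %% One manager call per stream; later passes use
        %% maps:get(Stream, Topology) on the result.
        lists:foldl(fun(Stream, Acc) ->
                            Acc#{Stream =>
                                     rabbit_stream_manager:topology(VirtualHost,
                                                                    Stream)}
                    end,
                    #{}, Streams).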


@@ -46,7 +46,8 @@ public class ClusterSizeTest {

   @ParameterizedTest
   @CsvSource({"1,1", "2,2", "3,3", "5,3"})
-  void clusterSizeShouldReflectOnMetadata(String requestedClusterSize, int expectedClusterSize) {
+  void clusterSizeShouldReflectOnMetadata(String requestedClusterSize, int expectedClusterSize)
+      throws InterruptedException {
     Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1()));
     String s = UUID.randomUUID().toString();
     try {
@@ -56,8 +57,13 @@ public class ClusterSizeTest {
       StreamMetadata metadata = client.metadata(s).get(s);
       assertThat(metadata).isNotNull();
       assertThat(metadata.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
-      int actualClusterSize = metadata.getLeader() == null ? 0 : 1 + metadata.getReplicas().size();
-      assertThat(actualClusterSize).isEqualTo(expectedClusterSize);
+      TestUtils.waitUntil(
+          () -> {
+            StreamMetadata m = client.metadata(s).get(s);
+            assertThat(metadata).isNotNull();
+            int actualClusterSize = m.getLeader() == null ? 0 : 1 + m.getReplicas().size();
+            return actualClusterSize == expectedClusterSize;
+          });
     } finally {
       client.delete(s);
     }


@@ -371,6 +371,11 @@ public class FailureTest {

     Client.StreamMetadata streamMetadata = metadata.get(stream);
     assertThat(streamMetadata).isNotNull();

+    TestUtils.waitUntil(
+        () -> metadataClient.metadata(stream).get(stream).getReplicas().size() == 2);
+
+    metadata = metadataClient.metadata(stream);
+    streamMetadata = metadata.get(stream);
     assertThat(streamMetadata.getLeader()).isNotNull();
     assertThat(streamMetadata.getLeader().getPort()).isEqualTo(TestUtils.streamPortNode1());
@@ -516,7 +521,7 @@ public class FailureTest {

     // wait until all the replicas are there
     TestUtils.waitAtMost(
-        Duration.ofSeconds(5),
+        Duration.ofSeconds(10),
         () -> {
           Client.StreamMetadata m = metadataClient.metadata(stream).get(stream);
           return m.getReplicas().size() == 2;


@@ -82,7 +82,10 @@ public class StreamTest {
     Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1()));
     Map<String, Client.StreamMetadata> metadata = client.metadata(stream);
     assertThat(metadata).hasSize(1).containsKey(stream);
-    Client.StreamMetadata streamMetadata = metadata.get(stream);
+
+    TestUtils.waitUntil(() -> client.metadata(stream).get(stream).getReplicas().size() == 2);
+
+    Client.StreamMetadata streamMetadata = client.metadata(stream).get(stream);

     CountDownLatch publishingLatch = new CountDownLatch(messageCount);
     Client publisher =
@@ -129,13 +132,13 @@ public class StreamTest {
   }

   @Test
-  void metadataOnClusterShouldReturnLeaderAndReplicas() {
+  void metadataOnClusterShouldReturnLeaderAndReplicas() throws InterruptedException {
     Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1()));
     Map<String, Client.StreamMetadata> metadata = client.metadata(stream);
     assertThat(metadata).hasSize(1).containsKey(stream);
-    Client.StreamMetadata streamMetadata = metadata.get(stream);
-    assertThat(streamMetadata.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
-    assertThat(streamMetadata.getReplicas()).hasSize(2);
+    assertThat(metadata.get(stream).getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
+
+    TestUtils.waitUntil(() -> client.metadata(stream).get(stream).getReplicas().size() == 2);

     BiConsumer<Client.Broker, Client.Broker> assertNodesAreDifferent =
         (node, anotherNode) -> {
@@ -143,6 +146,8 @@ public class StreamTest {
           assertThat(node.getPort()).isNotEqualTo(anotherNode.getPort());
         };

+    Client.StreamMetadata streamMetadata = client.metadata(stream).get(stream);
+
     streamMetadata
         .getReplicas()
         .forEach(replica -> assertNodesAreDifferent.accept(replica, streamMetadata.getLeader()));


@@ -45,6 +45,10 @@ public class TestUtils {
     return Integer.valueOf(port);
   }

+  static void waitUntil(BooleanSupplier condition) throws InterruptedException {
+    waitAtMost(Duration.ofSeconds(10), condition);
+  }
+
   static void waitAtMost(Duration duration, BooleanSupplier condition) throws InterruptedException {
     if (condition.getAsBoolean()) {
       return;