Format stream and stream management plugins code

This commit is contained in:
Arnaud Cogoluègnes 2021-01-06 15:47:27 +01:00
parent 07b9e68ff5
commit 5f0df3bdb0
No known key found for this signature in database
GPG Key ID: D5C8C4DFAD43AFA8
29 changed files with 3368 additions and 1783 deletions

12
deps/rabbitmq_stream/rebar.config vendored Normal file
View File

@ -0,0 +1,12 @@
{plugins, [rebar3_format]}.
{format, [
{files, ["src/*.erl", "test/*.erl"]},
{formatter, default_formatter},
{options, #{
paper => 80,
ribbon => 70,
inline_attributes => {when_under, 1},
inline_items => {when_under, 4}
}}
]}.

View File

@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
%% Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
-module('Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand').
@ -34,23 +34,32 @@
description/0,
help_section/0]).
formatter() -> 'Elixir.RabbitMQ.CLI.Formatters.Table'.
formatter() ->
'Elixir.RabbitMQ.CLI.Formatters.Table'.
scopes() -> [ctl, diagnostics, streams].
scopes() ->
[ctl, diagnostics, streams].
switches() -> [{verbose, boolean}].
aliases() -> [{'V', verbose}].
switches() ->
[{verbose, boolean}].
description() -> <<"Lists stream connections on the target node">>.
aliases() ->
[{'V', verbose}].
description() ->
<<"Lists stream connections on the target node">>.
help_section() ->
{plugin, stream}.
validate(Args, _) ->
case 'Elixir.RabbitMQ.CLI.Ctl.InfoKeys':validate_info_keys(Args,
?INFO_ITEMS) of
{ok, _} -> ok;
Error -> Error
?INFO_ITEMS)
of
{ok, _} ->
ok;
Error ->
Error
end.
merge_defaults([], Opts) ->
@ -63,33 +72,37 @@ usage() ->
usage_additional() ->
Prefix = <<" must be one of ">>,
InfoItems = 'Elixir.Enum':join(lists:usort(?INFO_ITEMS), <<", ">>),
[
{<<"<column>">>, <<Prefix/binary, InfoItems/binary>>}
].
InfoItems =
'Elixir.Enum':join(
lists:usort(?INFO_ITEMS), <<", ">>),
[{<<"<column>">>, <<Prefix/binary, InfoItems/binary>>}].
usage_doc_guides() ->
[?STREAM_GUIDE_URL].
run(Args, #{node := NodeName,
timeout := Timeout,
verbose := Verbose}) ->
InfoKeys = case Verbose of
true -> ?INFO_ITEMS;
false -> 'Elixir.RabbitMQ.CLI.Ctl.InfoKeys':prepare_info_keys(Args)
end,
run(Args,
#{node := NodeName,
timeout := Timeout,
verbose := Verbose}) ->
InfoKeys =
case Verbose of
true ->
?INFO_ITEMS;
false ->
'Elixir.RabbitMQ.CLI.Ctl.InfoKeys':prepare_info_keys(Args)
end,
Nodes = 'Elixir.RabbitMQ.CLI.Core.Helpers':nodes_in_cluster(NodeName),
'Elixir.RabbitMQ.CLI.Ctl.RpcStream':receive_list_items(
NodeName,
rabbit_stream,
emit_connection_info_all,
[Nodes, InfoKeys],
Timeout,
InfoKeys,
length(Nodes)).
'Elixir.RabbitMQ.CLI.Ctl.RpcStream':receive_list_items(NodeName,
rabbit_stream,
emit_connection_info_all,
[Nodes, InfoKeys],
Timeout,
InfoKeys,
length(Nodes)).
banner(_, _) -> <<"Listing stream connections ...">>.
banner(_, _) ->
<<"Listing stream connections ...">>.
output(Result, _Opts) ->
'Elixir.RabbitMQ.CLI.DefaultOutput':output(Result).

View File

@ -11,17 +11,21 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is Pivotal Software, Inc.
%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
%% Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
%%
-module(rabbit_stream).
-behaviour(application).
-export([start/2, host/0, port/0, kill_connection/1]).
-export([start/2,
host/0,
port/0,
kill_connection/1]).
-export([stop/1]).
-export([emit_connection_info_local/3,
emit_connection_info_all/4,
list/0]).
emit_connection_info_all/4,
list/0]).
-include_lib("rabbit_common/include/rabbit.hrl").
@ -30,7 +34,8 @@ start(_Type, _Args) ->
rabbit_stream_sup:start_link().
host() ->
case application:get_env(rabbitmq_stream, advertised_host, undefined) of
case application:get_env(rabbitmq_stream, advertised_host, undefined)
of
undefined ->
hostname_from_node();
Host ->
@ -38,9 +43,10 @@ host() ->
end.
hostname_from_node() ->
case re:split(rabbit_data_coercion:to_binary(node()),
"@",
[{return, binary}, {parts, 2}]) of
case re:split(
rabbit_data_coercion:to_binary(node()), "@",
[{return, binary}, {parts, 2}])
of
[_, Hostname] ->
Hostname;
[_] ->
@ -49,7 +55,8 @@ hostname_from_node() ->
end.
port() ->
case application:get_env(rabbitmq_stream, advertised_port, undefined) of
case application:get_env(rabbitmq_stream, advertised_port, undefined)
of
undefined ->
port_from_listener();
Port ->
@ -58,11 +65,13 @@ port() ->
port_from_listener() ->
Listeners = rabbit_networking:node_listeners(node()),
Port = lists:foldl(fun(#listener{port = Port, protocol = stream}, _Acc) ->
Port;
(_, Acc) ->
Acc
end, undefined, Listeners),
Port =
lists:foldl(fun (#listener{port = Port, protocol = stream}, _Acc) ->
Port;
(_, Acc) ->
Acc
end,
undefined, Listeners),
Port.
stop(_State) ->
@ -71,35 +80,44 @@ stop(_State) ->
kill_connection(ConnectionName) ->
ConnectionNameBin = rabbit_data_coercion:to_binary(ConnectionName),
lists:foreach(fun(ConnectionPid) ->
ConnectionPid ! {infos, self()},
receive
{ConnectionPid, #{<<"connection_name">> := ConnectionNameBin}} ->
exit(ConnectionPid, kill);
{ConnectionPid, _ClientProperties} ->
ok
after 1000 ->
ok
end
end, pg_local:get_members(rabbit_stream_connections)).
ConnectionPid ! {infos, self()},
receive
{ConnectionPid,
#{<<"connection_name">> := ConnectionNameBin}} ->
exit(ConnectionPid, kill);
{ConnectionPid, _ClientProperties} -> ok
after 1000 -> ok
end
end,
pg_local:get_members(rabbit_stream_connections)).
emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) ->
Pids = [spawn_link(Node, rabbit_stream, emit_connection_info_local,
[Items, Ref, AggregatorPid])
|| Node <- Nodes],
Pids =
[spawn_link(Node,
rabbit_stream,
emit_connection_info_local,
[Items, Ref, AggregatorPid])
|| Node <- Nodes],
rabbit_control_misc:await_emitters_termination(Pids),
ok.
emit_connection_info_local(Items, Ref, AggregatorPid) ->
rabbit_control_misc:emitting_map_with_exit_handler(
AggregatorPid, Ref, fun(Pid) ->
rabbit_stream_reader:info(Pid, Items)
end,
list()).
rabbit_control_misc:emitting_map_with_exit_handler(AggregatorPid,
Ref,
fun(Pid) ->
rabbit_stream_reader:info(Pid,
Items)
end,
list()).
list() ->
[Client
|| {_, ListSupPid, _, _} <- supervisor2:which_children(rabbit_stream_sup),
{_, RanchSup, supervisor, _} <- supervisor2:which_children(ListSupPid),
{ranch_conns_sup, ConnSup, _, _} <- supervisor:which_children(RanchSup),
|| {_, ListSupPid, _, _}
<- supervisor2:which_children(rabbit_stream_sup),
{_, RanchSup, supervisor, _}
<- supervisor2:which_children(ListSupPid),
{ranch_conns_sup, ConnSup, _, _}
<- supervisor:which_children(RanchSup),
{_, CliSup, _, _} <- supervisor:which_children(ConnSup),
{rabbit_stream_reader, Client, _, _} <- supervisor:which_children(CliSup)].
{rabbit_stream_reader, Client, _, _}
<- supervisor:which_children(CliSup)].

View File

@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is Pivotal Software, Inc.
%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
%% Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
%%
-module(rabbit_stream_connection_sup).
@ -21,23 +21,30 @@
-include_lib("rabbit_common/include/rabbit.hrl").
-export([start_link/4, start_keepalive_link/0]).
-export([start_link/4,
start_keepalive_link/0]).
-export([init/1]).
start_link(Ref, _Sock, Transport, Opts) ->
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
{ok, KeepaliveSup} = supervisor2:start_child(
SupPid,
{rabbit_stream_keepalive_sup,
{rabbit_stream_connection_sup, start_keepalive_link, []},
intrinsic, infinity, supervisor, [rabbit_keepalive_sup]}),
{ok, ReaderPid} = supervisor2:start_child(
SupPid,
{rabbit_stream_reader,
{rabbit_stream_reader, start_link, [KeepaliveSup, Transport, Ref, Opts]},
intrinsic, ?WORKER_WAIT, worker, [rabbit_stream_reader]}),
{ok, KeepaliveSup} =
supervisor2:start_child(SupPid,
{rabbit_stream_keepalive_sup,
{rabbit_stream_connection_sup,
start_keepalive_link, []},
intrinsic,
infinity,
supervisor,
[rabbit_keepalive_sup]}),
{ok, ReaderPid} =
supervisor2:start_child(SupPid,
{rabbit_stream_reader,
{rabbit_stream_reader, start_link,
[KeepaliveSup, Transport, Ref, Opts]},
intrinsic,
?WORKER_WAIT,
worker,
[rabbit_stream_reader]}),
{ok, SupPid, ReaderPid}.
start_keepalive_link() ->

View File

@ -11,21 +11,28 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is Pivotal Software, Inc.
%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
%% Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
%%
-module(rabbit_stream_manager).
-behaviour(gen_server).
-include_lib("rabbit_common/include/rabbit.hrl").
%% API
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
-export([start_link/1, create/4, delete/3, lookup_leader/2, lookup_local_member/2, topology/2]).
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2]).
-export([start_link/1,
create/4,
delete/3,
lookup_leader/2,
lookup_local_member/2,
topology/2]).
-record(state, {
configuration
}).
-record(state, {configuration}).
start_link(Conf) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [Conf], []).
@ -34,13 +41,16 @@ init([Conf]) ->
{ok, #state{configuration = Conf}}.
-spec create(binary(), binary(), #{binary() => binary()}, binary()) ->
{ok, map()} | {error, reference_already_exists} | {error, internal_error}
| {error, validation_failed}.
{ok, map()} |
{error, reference_already_exists} |
{error, internal_error} |
{error, validation_failed}.
create(VirtualHost, Reference, Arguments, Username) ->
gen_server:call(?MODULE, {create, VirtualHost, Reference, Arguments, Username}).
gen_server:call(?MODULE,
{create, VirtualHost, Reference, Arguments, Username}).
-spec delete(binary(), binary(), binary()) ->
{ok, deleted} | {error, reference_not_found}.
{ok, deleted} | {error, reference_not_found}.
delete(VirtualHost, Reference, Username) ->
gen_server:call(?MODULE, {delete, VirtualHost, Reference, Username}).
@ -48,59 +58,73 @@ delete(VirtualHost, Reference, Username) ->
lookup_leader(VirtualHost, Stream) ->
gen_server:call(?MODULE, {lookup_leader, VirtualHost, Stream}).
-spec lookup_local_member(binary(), binary()) -> {ok, pid()}
| {error, not_found} | {error, not_available}.
-spec lookup_local_member(binary(), binary()) ->
{ok, pid()} | {error, not_found} |
{error, not_available}.
lookup_local_member(VirtualHost, Stream) ->
gen_server:call(?MODULE, {lookup_local_member, VirtualHost, Stream}).
-spec topology(binary(), binary()) ->
{ok, #{leader_node => undefined | pid(), replica_nodes => [pid()]}}
| {error, stream_not_found} | {error, stream_not_available}.
{ok,
#{leader_node => undefined | pid(),
replica_nodes => [pid()]}} |
{error, stream_not_found} | {error, stream_not_available}.
topology(VirtualHost, Stream) ->
gen_server:call(?MODULE, {topology, VirtualHost, Stream}).
stream_queue_arguments(Arguments) ->
stream_queue_arguments([{<<"x-queue-type">>, longstr, <<"stream">>}], Arguments).
stream_queue_arguments([{<<"x-queue-type">>, longstr, <<"stream">>}],
Arguments).
stream_queue_arguments(ArgumentsAcc, Arguments) when map_size(Arguments) =:= 0 ->
stream_queue_arguments(ArgumentsAcc, Arguments)
when map_size(Arguments) =:= 0 ->
ArgumentsAcc;
stream_queue_arguments(ArgumentsAcc, #{<<"max-length-bytes">> := Value} = Arguments) ->
stream_queue_arguments(
[{<<"x-max-length-bytes">>, long, binary_to_integer(Value)}] ++ ArgumentsAcc,
maps:remove(<<"max-length-bytes">>, Arguments)
);
stream_queue_arguments(ArgumentsAcc, #{<<"max-age">> := Value} = Arguments) ->
stream_queue_arguments(
[{<<"x-max-age">>, longstr, Value}] ++ ArgumentsAcc,
maps:remove(<<"max-age">>, Arguments)
);
stream_queue_arguments(ArgumentsAcc, #{<<"max-segment-size">> := Value} = Arguments) ->
stream_queue_arguments(
[{<<"x-max-segment-size">>, long, binary_to_integer(Value)}] ++ ArgumentsAcc,
maps:remove(<<"max-segment-size">>, Arguments)
);
stream_queue_arguments(ArgumentsAcc, #{<<"initial-cluster-size">> := Value} = Arguments) ->
stream_queue_arguments(
[{<<"x-initial-cluster-size">>, long, binary_to_integer(Value)}] ++ ArgumentsAcc,
maps:remove(<<"initial-cluster-size">>, Arguments)
);
stream_queue_arguments(ArgumentsAcc, #{<<"queue-leader-locator">> := Value} = Arguments) ->
stream_queue_arguments(
[{<<"x-queue-leader-locator">>, longstr, Value}] ++ ArgumentsAcc,
maps:remove(<<"queue-leader-locator">>, Arguments)
);
stream_queue_arguments(ArgumentsAcc,
#{<<"max-length-bytes">> := Value} = Arguments) ->
stream_queue_arguments([{<<"x-max-length-bytes">>, long,
binary_to_integer(Value)}]
++ ArgumentsAcc,
maps:remove(<<"max-length-bytes">>, Arguments));
stream_queue_arguments(ArgumentsAcc,
#{<<"max-age">> := Value} = Arguments) ->
stream_queue_arguments([{<<"x-max-age">>, longstr, Value}]
++ ArgumentsAcc,
maps:remove(<<"max-age">>, Arguments));
stream_queue_arguments(ArgumentsAcc,
#{<<"max-segment-size">> := Value} = Arguments) ->
stream_queue_arguments([{<<"x-max-segment-size">>, long,
binary_to_integer(Value)}]
++ ArgumentsAcc,
maps:remove(<<"max-segment-size">>, Arguments));
stream_queue_arguments(ArgumentsAcc,
#{<<"initial-cluster-size">> := Value} = Arguments) ->
stream_queue_arguments([{<<"x-initial-cluster-size">>, long,
binary_to_integer(Value)}]
++ ArgumentsAcc,
maps:remove(<<"initial-cluster-size">>, Arguments));
stream_queue_arguments(ArgumentsAcc,
#{<<"queue-leader-locator">> := Value} = Arguments) ->
stream_queue_arguments([{<<"x-queue-leader-locator">>, longstr,
Value}]
++ ArgumentsAcc,
maps:remove(<<"queue-leader-locator">>, Arguments));
stream_queue_arguments(ArgumentsAcc, _Arguments) ->
ArgumentsAcc.
validate_stream_queue_arguments([]) ->
ok;
validate_stream_queue_arguments([{<<"x-initial-cluster-size">>, long, ClusterSize} | _]) when ClusterSize =< 0 ->
validate_stream_queue_arguments([{<<"x-initial-cluster-size">>, long,
ClusterSize}
| _])
when ClusterSize =< 0 ->
error;
validate_stream_queue_arguments([{<<"x-queue-leader-locator">>, longstr, Locator} | T]) ->
case lists:member(Locator, [<<"client-local">>,
<<"random">>,
<<"least-leaders">>]) of
true ->
validate_stream_queue_arguments([{<<"x-queue-leader-locator">>,
longstr, Locator}
| T]) ->
case lists:member(Locator,
[<<"client-local">>, <<"random">>, <<"least-leaders">>])
of
true ->
validate_stream_queue_arguments(T);
false ->
error
@ -108,74 +132,105 @@ validate_stream_queue_arguments([{<<"x-queue-leader-locator">>, longstr, Locator
validate_stream_queue_arguments([_ | T]) ->
validate_stream_queue_arguments(T).
handle_call({create, VirtualHost, Reference, Arguments, Username}, _From, State) ->
Name = #resource{virtual_host = VirtualHost, kind = queue, name = Reference},
handle_call({create, VirtualHost, Reference, Arguments, Username},
_From, State) ->
Name =
#resource{virtual_host = VirtualHost,
kind = queue,
name = Reference},
StreamQueueArguments = stream_queue_arguments(Arguments),
case validate_stream_queue_arguments(StreamQueueArguments) of
ok ->
Q0 = amqqueue:new(
Name,
none, true, false, none, StreamQueueArguments,
VirtualHost, #{user => Username}, rabbit_stream_queue
),
case rabbit_amqqueue:with(
Name,
fun(Q) ->
ok = rabbit_amqqueue:assert_equivalence(Q, true, false, StreamQueueArguments, none)
end) of
Q0 = amqqueue:new(Name,
none,
true,
false,
none,
StreamQueueArguments,
VirtualHost,
#{user => Username},
rabbit_stream_queue),
case rabbit_amqqueue:with(Name,
fun(Q) ->
ok =
rabbit_amqqueue:assert_equivalence(Q,
true,
false,
StreamQueueArguments,
none)
end)
of
ok ->
{reply, {error, reference_already_exists}, State};
{error, not_found} ->
try
case rabbit_stream_queue:declare(Q0, node()) of
{new, Q} ->
{reply, {ok, amqqueue:get_type_state(Q)}, State};
{reply, {ok, amqqueue:get_type_state(Q)},
State};
{existing, _} ->
{reply, {error, reference_already_exists}, State};
{reply, {error, reference_already_exists},
State};
{error, Err} ->
rabbit_log:warning("Error while creating ~p stream, ~p~n", [Reference, Err]),
rabbit_log:warning("Error while creating ~p stream, ~p~n",
[Reference, Err]),
{reply, {error, internal_error}, State}
end
catch
exit:Error ->
rabbit_log:info("Error while creating ~p stream, ~p~n", [Reference, Error]),
rabbit_log:info("Error while creating ~p stream, ~p~n",
[Reference, Error]),
{reply, {error, internal_error}, State}
end;
{error, {absent, _, Reason}} ->
rabbit_log:warning("Error while creating ~p stream, ~p~n", [Reference, Reason]),
rabbit_log:warning("Error while creating ~p stream, ~p~n",
[Reference, Reason]),
{reply, {error, internal_error}, State}
end;
error ->
{reply, {error, validation_failed}, State}
end;
handle_call({delete, VirtualHost, Reference, Username}, _From, State) ->
Name = #resource{virtual_host = VirtualHost, kind = queue, name = Reference},
handle_call({delete, VirtualHost, Reference, Username}, _From,
State) ->
Name =
#resource{virtual_host = VirtualHost,
kind = queue,
name = Reference},
rabbit_log:debug("Trying to delete stream ~p~n", [Reference]),
case rabbit_amqqueue:lookup(Name) of
{ok, Q} ->
rabbit_log:debug("Found queue record ~p, checking if it is a stream~n", [Reference]),
rabbit_log:debug("Found queue record ~p, checking if it is a stream~n",
[Reference]),
case is_stream_queue(Q) of
true ->
rabbit_log:debug("Queue record ~p is a stream, trying to delete it~n", [Reference]),
{ok, _} = rabbit_stream_queue:delete(Q, false, false, Username),
rabbit_log:debug("Queue record ~p is a stream, trying to delete "
"it~n",
[Reference]),
{ok, _} =
rabbit_stream_queue:delete(Q, false, false, Username),
rabbit_log:debug("Stream ~p deleted~n", [Reference]),
{reply, {ok, deleted}, State};
_ ->
rabbit_log:debug("Queue record ~p is NOT a stream, returning error~n", [Reference]),
rabbit_log:debug("Queue record ~p is NOT a stream, returning error~n",
[Reference]),
{reply, {error, reference_not_found}, State}
end;
{error, not_found} ->
rabbit_log:debug("Stream ~p not found, cannot delete it~n", [Reference]),
rabbit_log:debug("Stream ~p not found, cannot delete it~n",
[Reference]),
{reply, {error, reference_not_found}, State}
end;
handle_call({lookup_leader, VirtualHost, Stream}, _From, State) ->
Name = #resource{virtual_host = VirtualHost, kind = queue, name = Stream},
Name =
#resource{virtual_host = VirtualHost,
kind = queue,
name = Stream},
Res = case rabbit_amqqueue:lookup(Name) of
{ok, Q} ->
case is_stream_queue(Q) of
true ->
#{leader_pid := LeaderPid} = amqqueue:get_type_state(Q),
#{leader_pid := LeaderPid} =
amqqueue:get_type_state(Q),
% FIXME check if pid is alive in case of stale information
LeaderPid;
_ ->
@ -185,21 +240,28 @@ handle_call({lookup_leader, VirtualHost, Stream}, _From, State) ->
cluster_not_found
end,
{reply, Res, State};
handle_call({lookup_local_member, VirtualHost, Stream}, _From, State) ->
Name = #resource{virtual_host = VirtualHost, kind = queue, name = Stream},
handle_call({lookup_local_member, VirtualHost, Stream}, _From,
State) ->
Name =
#resource{virtual_host = VirtualHost,
kind = queue,
name = Stream},
Res = case rabbit_amqqueue:lookup(Name) of
{ok, Q} ->
case is_stream_queue(Q) of
true ->
#{leader_pid := LeaderPid, replica_pids := ReplicaPids} = amqqueue:get_type_state(Q),
LocalMember = lists:foldl(fun(Pid, Acc) ->
case node(Pid) =:= node() of
true ->
Pid;
false ->
Acc
end
end, undefined, [LeaderPid] ++ ReplicaPids),
#{leader_pid := LeaderPid,
replica_pids := ReplicaPids} =
amqqueue:get_type_state(Q),
LocalMember =
lists:foldl(fun(Pid, Acc) ->
case node(Pid) =:= node() of
true -> Pid;
false -> Acc
end
end,
undefined,
[LeaderPid] ++ ReplicaPids),
% FIXME check if pid is alive in case of stale information
case LocalMember of
undefined ->
@ -220,30 +282,42 @@ handle_call({lookup_local_member, VirtualHost, Stream}, _From, State) ->
end,
{reply, Res, State};
handle_call({topology, VirtualHost, Stream}, _From, State) ->
Name = #resource{virtual_host = VirtualHost, kind = queue, name = Stream},
Name =
#resource{virtual_host = VirtualHost,
kind = queue,
name = Stream},
Res = case rabbit_amqqueue:lookup(Name) of
{ok, Q} ->
case is_stream_queue(Q) of
true ->
QState = amqqueue:get_type_state(Q),
ProcessAliveFun = fun(Pid) ->
rpc:call(node(Pid), erlang, is_process_alive, [Pid], 10000)
end,
LeaderNode = case ProcessAliveFun(maps:get(leader_pid, QState)) of
true ->
maps:get(leader_node, QState);
_ ->
undefined
end,
ReplicaNodes = lists:foldl(fun(Pid, Acc) ->
case ProcessAliveFun(Pid) of
ProcessAliveFun =
fun(Pid) ->
rpc:call(node(Pid),
erlang,
is_process_alive,
[Pid],
10000)
end,
LeaderNode =
case ProcessAliveFun(maps:get(leader_pid, QState))
of
true ->
Acc ++ [node(Pid)];
maps:get(leader_node, QState);
_ ->
Acc
end
end, [], maps:get(replica_pids, QState)),
{ok, #{leader_node => LeaderNode, replica_nodes => ReplicaNodes}};
undefined
end,
ReplicaNodes =
lists:foldl(fun(Pid, Acc) ->
case ProcessAliveFun(Pid) of
true -> Acc ++ [node(Pid)];
_ -> Acc
end
end,
[], maps:get(replica_pids, QState)),
{ok,
#{leader_node => LeaderNode,
replica_nodes => ReplicaNodes}};
_ ->
{error, stream_not_found}
end;
@ -272,4 +346,4 @@ is_stream_queue(Q) ->
true;
_ ->
false
end.
end.

View File

@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is Pivotal Software, Inc.
%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
%% Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
%%
-module(rabbit_stream_metrics).
@ -20,47 +20,82 @@
%% API
-export([init/0]).
-export([consumer_created/6, consumer_updated/6, consumer_cancelled/3]).
-export([publisher_created/4, publisher_updated/7, publisher_deleted/3]).
-export([consumer_created/6,
consumer_updated/6,
consumer_cancelled/3]).
-export([publisher_created/4,
publisher_updated/7,
publisher_deleted/3]).
init() ->
rabbit_core_metrics:create_table({?TABLE_CONSUMER, set}),
rabbit_core_metrics:create_table({?TABLE_PUBLISHER, set}),
ok.
consumer_created(Connection, StreamResource, SubscriptionId, Credits, MessageCount, Offset) ->
Values = [{credits, Credits}, {consumed, MessageCount}, {offset, Offset}],
ets:insert(?TABLE_CONSUMER, {{StreamResource, Connection, SubscriptionId}, Values}),
consumer_created(Connection,
StreamResource,
SubscriptionId,
Credits,
MessageCount,
Offset) ->
Values =
[{credits, Credits}, {consumed, MessageCount}, {offset, Offset}],
ets:insert(?TABLE_CONSUMER,
{{StreamResource, Connection, SubscriptionId}, Values}),
ok.
consumer_updated(Connection, StreamResource, SubscriptionId, Credits, MessageCount, Offset) ->
Values = [{credits, Credits}, {consumed, MessageCount}, {offset, Offset}],
ets:insert(?TABLE_CONSUMER, {{StreamResource, Connection, SubscriptionId}, Values}),
consumer_updated(Connection,
StreamResource,
SubscriptionId,
Credits,
MessageCount,
Offset) ->
Values =
[{credits, Credits}, {consumed, MessageCount}, {offset, Offset}],
ets:insert(?TABLE_CONSUMER,
{{StreamResource, Connection, SubscriptionId}, Values}),
ok.
consumer_cancelled(Connection, StreamResource, SubscriptionId) ->
ets:delete(?TABLE_CONSUMER, {StreamResource, Connection, SubscriptionId}),
ets:delete(?TABLE_CONSUMER,
{StreamResource, Connection, SubscriptionId}),
ok.
publisher_created(Connection, StreamResource, PublisherId, Reference) ->
Values = [
{reference, format_publisher_reference(Reference)},
{published, 0}, {confirmed, 0}, {errored, 0}],
ets:insert(?TABLE_PUBLISHER, {{StreamResource, Connection, PublisherId}, Values}),
ok.
publisher_created(Connection,
StreamResource,
PublisherId,
Reference) ->
Values =
[{reference, format_publisher_reference(Reference)},
{published, 0},
{confirmed, 0},
{errored, 0}],
ets:insert(?TABLE_PUBLISHER,
{{StreamResource, Connection, PublisherId}, Values}),
ok.
publisher_updated(Connection, StreamResource, PublisherId, Reference, Published, Confirmed, Errored) ->
Values = [
{reference, format_publisher_reference(Reference)},
{published, Published}, {confirmed, Confirmed}, {errored, Errored}],
ets:insert(?TABLE_PUBLISHER, {{StreamResource, Connection, PublisherId}, Values}),
publisher_updated(Connection,
StreamResource,
PublisherId,
Reference,
Published,
Confirmed,
Errored) ->
Values =
[{reference, format_publisher_reference(Reference)},
{published, Published},
{confirmed, Confirmed},
{errored, Errored}],
ets:insert(?TABLE_PUBLISHER,
{{StreamResource, Connection, PublisherId}, Values}),
ok.
publisher_deleted(Connection, StreamResource, PublisherId) ->
ets:delete(?TABLE_PUBLISHER, {StreamResource, Connection, PublisherId}),
ok.
ets:delete(?TABLE_PUBLISHER,
{StreamResource, Connection, PublisherId}),
ok.
format_publisher_reference(undefined) ->
<<"">>;
<<"">>;
format_publisher_reference(Ref) when is_binary(Ref) ->
Ref.
Ref.

View File

@ -11,28 +11,30 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is Pivotal Software, Inc.
%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
%% Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
%%
-module(rabbit_stream_metrics_gc).
-include_lib("rabbitmq_stream/include/rabbit_stream_metrics.hrl").
-record(state, {timer,
interval
}).
-record(state, {timer, interval}).
-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-spec start_link() -> rabbit_types:ok_pid_or_error().
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
init(_) ->
Interval = rabbit_misc:get_env(rabbit, core_metrics_gc_interval, 120000),
Interval =
rabbit_misc:get_env(rabbit, core_metrics_gc_interval, 120000),
{ok, start_timer(#state{interval = Interval})}.
handle_call(which_children, _From, State) ->
@ -42,7 +44,9 @@ handle_cast(_Request, State) ->
{noreply, State}.
handle_info(start_gc, State) ->
GbSet = gb_sets:from_list(rabbit_amqqueue:list_names()),
GbSet =
gb_sets:from_list(
rabbit_amqqueue:list_names()),
gc_process_and_entity(?TABLE_CONSUMER, GbSet),
gc_process_and_entity(?TABLE_PUBLISHER, GbSet),
{noreply, start_timer(State)}.
@ -60,15 +64,17 @@ start_timer(#state{interval = Interval} = St) ->
gc_process_and_entity(Table, GbSet) ->
ets:foldl(fun({{Id, Pid, _} = Key, _}, none) ->
gc_process_and_entity(Id, Pid, Table, Key, GbSet)
end, none, Table).
gc_process_and_entity(Id, Pid, Table, Key, GbSet)
end,
none, Table).
gc_process_and_entity(Id, Pid, Table, Key, GbSet) ->
case rabbit_misc:is_process_alive(Pid) andalso gb_sets:is_member(Id, GbSet) of
case rabbit_misc:is_process_alive(Pid)
andalso gb_sets:is_member(Id, GbSet)
of
true ->
none;
false ->
ets:delete(Table, Key),
none
end.

File diff suppressed because it is too large Load Diff

View File

@ -11,10 +11,11 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is Pivotal Software, Inc.
%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
%% Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
%%
-module(rabbit_stream_sup).
-behaviour(supervisor).
-export([start_link/0]).
@ -27,41 +28,59 @@ start_link() ->
init([]) ->
{ok, Listeners} = application:get_env(rabbitmq_stream, tcp_listeners),
NumTcpAcceptors = application:get_env(rabbitmq_stream, num_tcp_acceptors, 10),
{ok, SocketOpts} = application:get_env(rabbitmq_stream, tcp_listen_options),
NumTcpAcceptors =
application:get_env(rabbitmq_stream, num_tcp_acceptors, 10),
{ok, SocketOpts} =
application:get_env(rabbitmq_stream, tcp_listen_options),
Nodes = rabbit_mnesia:cluster_nodes(all),
OsirisConf = #{nodes => Nodes},
ServerConfiguration = #{
initial_credits => application:get_env(rabbitmq_stream, initial_credits, ?DEFAULT_INITIAL_CREDITS),
credits_required_for_unblocking => application:get_env(rabbitmq_stream, credits_required_for_unblocking, ?DEFAULT_CREDITS_REQUIRED_FOR_UNBLOCKING),
frame_max => application:get_env(rabbit_stream, frame_max, ?DEFAULT_FRAME_MAX),
heartbeat => application:get_env(rabbit_stream, heartbeat, ?DEFAULT_HEARTBEAT)
},
ServerConfiguration =
#{initial_credits =>
application:get_env(rabbitmq_stream, initial_credits,
?DEFAULT_INITIAL_CREDITS),
credits_required_for_unblocking =>
application:get_env(rabbitmq_stream,
credits_required_for_unblocking,
?DEFAULT_CREDITS_REQUIRED_FOR_UNBLOCKING),
frame_max =>
application:get_env(rabbit_stream, frame_max, ?DEFAULT_FRAME_MAX),
heartbeat =>
application:get_env(rabbit_stream, heartbeat,
?DEFAULT_HEARTBEAT)},
StreamManager = #{id => rabbit_stream_manager,
type => worker,
start => {rabbit_stream_manager, start_link, [OsirisConf]}},
StreamManager =
#{id => rabbit_stream_manager,
type => worker,
start => {rabbit_stream_manager, start_link, [OsirisConf]}},
MetricsGc = #{
id => rabbit_stream_metrics_gc_sup,
type => worker,
start => {rabbit_stream_metrics_gc, start_link, []}
},
MetricsGc =
#{id => rabbit_stream_metrics_gc_sup,
type => worker,
start => {rabbit_stream_metrics_gc, start_link, []}},
{ok, {{one_for_all, 10, 10},
[StreamManager, MetricsGc] ++
listener_specs(fun tcp_listener_spec/1,
[SocketOpts, ServerConfiguration, NumTcpAcceptors], Listeners)}}.
{ok,
{{one_for_all, 10, 10},
[StreamManager, MetricsGc]
++ listener_specs(fun tcp_listener_spec/1,
[SocketOpts, ServerConfiguration, NumTcpAcceptors],
Listeners)}}.
listener_specs(Fun, Args, Listeners) ->
[Fun([Address | Args]) ||
Listener <- Listeners,
[Fun([Address | Args])
|| Listener <- Listeners,
Address <- rabbit_networking:tcp_listener_addresses(Listener)].
tcp_listener_spec([Address, SocketOpts, Configuration, NumAcceptors]) ->
rabbit_networking:tcp_listener_spec(
rabbit_stream_listener_sup, Address, SocketOpts,
ranch_tcp, rabbit_stream_connection_sup, Configuration,
stream, NumAcceptors, "Stream TCP listener").
tcp_listener_spec([Address,
SocketOpts,
Configuration,
NumAcceptors]) ->
rabbit_networking:tcp_listener_spec(rabbit_stream_listener_sup,
Address,
SocketOpts,
ranch_tcp,
rabbit_stream_connection_sup,
Configuration,
stream,
NumAcceptors,
"Stream TCP listener").

View File

@ -11,124 +11,195 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is Pivotal Software, Inc.
%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
%% Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
%%
-module(rabbit_stream_utils).
%% API
-export([enforce_correct_stream_name/1, write_messages/4, parse_map/2,
auth_mechanisms/1, auth_mechanism_to_module/2,
check_configure_permitted/3, check_write_permitted/3, check_read_permitted/3,
-export([enforce_correct_stream_name/1,
write_messages/4,
parse_map/2,
auth_mechanisms/1,
auth_mechanism_to_module/2,
check_configure_permitted/3,
check_write_permitted/3,
check_read_permitted/3,
extract_stream_list/2]).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
enforce_correct_stream_name(Name) ->
% from rabbit_channel
StrippedName = binary:replace(Name, [<<"\n">>, <<"\r">>], <<"">>, [global]),
case check_name(StrippedName) of
ok ->
{ok, StrippedName};
error ->
error
end.
% from rabbit_channel
StrippedName =
binary:replace(Name, [<<"\n">>, <<"\r">>], <<"">>, [global]),
case check_name(StrippedName) of
ok ->
{ok, StrippedName};
error ->
error
end.
check_name(<<"amq.", _/binary>>) ->
error;
error;
check_name(<<"">>) ->
error;
error;
check_name(_Name) ->
ok.
ok.
write_messages(_ClusterLeader, undefined, _PublisherId, <<>>) ->
ok;
write_messages(ClusterLeader, undefined, PublisherId, <<PublishingId:64, 0:1, MessageSize:31, Message:MessageSize/binary, Rest/binary>>) ->
% FIXME handle write error
ok = osiris:write(ClusterLeader, undefined, {PublisherId, PublishingId}, Message),
write_messages(ClusterLeader, undefined, PublisherId, Rest);
write_messages(ClusterLeader, undefined, PublisherId, <<PublishingId:64, 1:1, CompressionType:3, _Unused:4, MessageCount:16, BatchSize:32, Batch:BatchSize/binary, Rest/binary>>) ->
% FIXME handle write error
ok = osiris:write(ClusterLeader, undefined, {PublisherId, PublishingId}, {batch, MessageCount, CompressionType, Batch}),
write_messages(ClusterLeader, undefined, PublisherId, Rest);
ok;
write_messages(ClusterLeader,
undefined,
PublisherId,
<<PublishingId:64,
0:1,
MessageSize:31,
Message:MessageSize/binary,
Rest/binary>>) ->
% FIXME handle write error
ok =
osiris:write(ClusterLeader,
undefined,
{PublisherId, PublishingId},
Message),
write_messages(ClusterLeader, undefined, PublisherId, Rest);
write_messages(ClusterLeader,
undefined,
PublisherId,
<<PublishingId:64,
1:1,
CompressionType:3,
_Unused:4,
MessageCount:16,
BatchSize:32,
Batch:BatchSize/binary,
Rest/binary>>) ->
% FIXME handle write error
ok =
osiris:write(ClusterLeader,
undefined,
{PublisherId, PublishingId},
{batch, MessageCount, CompressionType, Batch}),
write_messages(ClusterLeader, undefined, PublisherId, Rest);
write_messages(_ClusterLeader, _PublisherRef, _PublisherId, <<>>) ->
ok;
write_messages(ClusterLeader, PublisherRef, PublisherId, <<PublishingId:64, 0:1, MessageSize:31, Message:MessageSize/binary, Rest/binary>>) ->
% FIXME handle write error
ok = osiris:write(ClusterLeader, PublisherRef, PublishingId, Message),
write_messages(ClusterLeader, PublisherRef, PublisherId, Rest);
write_messages(ClusterLeader, PublisherRef, PublisherId, <<PublishingId:64, 1:1, CompressionType:3, _Unused:4, MessageCount:16, BatchSize:32, Batch:BatchSize/binary, Rest/binary>>) ->
% FIXME handle write error
ok = osiris:write(ClusterLeader, PublisherRef, PublishingId, {batch, MessageCount, CompressionType, Batch}),
write_messages(ClusterLeader, PublisherRef, PublisherId, Rest).
ok;
write_messages(ClusterLeader,
PublisherRef,
PublisherId,
<<PublishingId:64,
0:1,
MessageSize:31,
Message:MessageSize/binary,
Rest/binary>>) ->
% FIXME handle write error
ok = osiris:write(ClusterLeader, PublisherRef, PublishingId, Message),
write_messages(ClusterLeader, PublisherRef, PublisherId, Rest);
write_messages(ClusterLeader,
PublisherRef,
PublisherId,
<<PublishingId:64,
1:1,
CompressionType:3,
_Unused:4,
MessageCount:16,
BatchSize:32,
Batch:BatchSize/binary,
Rest/binary>>) ->
% FIXME handle write error
ok =
osiris:write(ClusterLeader,
PublisherRef,
PublishingId,
{batch, MessageCount, CompressionType, Batch}),
write_messages(ClusterLeader, PublisherRef, PublisherId, Rest).
parse_map(<<>>, _Count) ->
{#{}, <<>>};
{#{}, <<>>};
parse_map(Content, 0) ->
{#{}, Content};
{#{}, Content};
parse_map(Arguments, Count) ->
parse_map(#{}, Arguments, Count).
parse_map(#{}, Arguments, Count).
parse_map(Acc, <<>>, _Count) ->
{Acc, <<>>};
{Acc, <<>>};
parse_map(Acc, Content, 0) ->
{Acc, Content};
parse_map(Acc, <<KeySize:16, Key:KeySize/binary, ValueSize:16, Value:ValueSize/binary, Rest/binary>>, Count) ->
parse_map(maps:put(Key, Value, Acc), Rest, Count - 1).
{Acc, Content};
parse_map(Acc,
<<KeySize:16,
Key:KeySize/binary,
ValueSize:16,
Value:ValueSize/binary,
Rest/binary>>,
Count) ->
parse_map(maps:put(Key, Value, Acc), Rest, Count - 1).
auth_mechanisms(Sock) ->
{ok, Configured} = application:get_env(rabbit, auth_mechanisms),
[rabbit_data_coercion:to_binary(Name) || {Name, Module} <- rabbit_registry:lookup_all(auth_mechanism),
Module:should_offer(Sock), lists:member(Name, Configured)].
{ok, Configured} = application:get_env(rabbit, auth_mechanisms),
[rabbit_data_coercion:to_binary(Name)
|| {Name, Module} <- rabbit_registry:lookup_all(auth_mechanism),
Module:should_offer(Sock), lists:member(Name, Configured)].
auth_mechanism_to_module(TypeBin, Sock) ->
case rabbit_registry:binary_to_type(TypeBin) of
{error, not_found} ->
rabbit_log:warning("Unknown authentication mechanism '~p'~n", [TypeBin]),
{error, not_found};
T ->
case {lists:member(TypeBin, rabbit_stream_utils:auth_mechanisms(Sock)),
rabbit_registry:lookup_module(auth_mechanism, T)} of
{true, {ok, Module}} ->
{ok, Module};
_ ->
rabbit_log:warning("Invalid authentication mechanism '~p'~n", [T]),
{error, invalid}
end
end.
case rabbit_registry:binary_to_type(TypeBin) of
{error, not_found} ->
rabbit_log:warning("Unknown authentication mechanism '~p'~n",
[TypeBin]),
{error, not_found};
T ->
case {lists:member(TypeBin,
rabbit_stream_utils:auth_mechanisms(Sock)),
rabbit_registry:lookup_module(auth_mechanism, T)}
of
{true, {ok, Module}} ->
{ok, Module};
_ ->
rabbit_log:warning("Invalid authentication mechanism '~p'~n",
[T]),
{error, invalid}
end
end.
check_resource_access(User, Resource, Perm, Context) ->
V = {Resource, Context, Perm},
V = {Resource, Context, Perm},
Cache = case get(permission_cache) of
undefined -> [];
Other -> Other
end,
case lists:member(V, Cache) of
true -> ok;
false ->
try
rabbit_access_control:check_resource_access(
User, Resource, Perm, Context),
CacheTail = lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE - 1),
put(permission_cache, [V | CacheTail]),
ok
catch
exit:_ ->
error
end
end.
Cache =
case get(permission_cache) of
undefined ->
[];
Other ->
Other
end,
case lists:member(V, Cache) of
true ->
ok;
false ->
try
rabbit_access_control:check_resource_access(User,
Resource,
Perm,
Context),
CacheTail =
lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE - 1),
put(permission_cache, [V | CacheTail]),
ok
catch
exit:_ ->
error
end
end.
check_configure_permitted(Resource, User, Context) ->
check_resource_access(User, Resource, configure, Context).
check_resource_access(User, Resource, configure, Context).
check_write_permitted(Resource, User, Context) ->
check_resource_access(User, Resource, write, Context).
check_resource_access(User, Resource, write, Context).
check_read_permitted(Resource, User, Context) ->
check_resource_access(User, Resource, read, Context).
check_resource_access(User, Resource, read, Context).
extract_stream_list(<<>>, Streams) ->
Streams;
extract_stream_list(<<Length:16, Stream:Length/binary, Rest/binary>>, Streams) ->
extract_stream_list(Rest, [Stream | Streams]).
Streams;
extract_stream_list(<<Length:16, Stream:Length/binary, Rest/binary>>,
Streams) ->
extract_stream_list(Rest, [Stream | Streams]).

View File

@ -2,43 +2,39 @@
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
%% Copyright (c) 2007-2020-2021 VMware, Inc. or its affiliates. All rights reserved.
%%
-module(command_SUITE).
-compile([export_all]).
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include("rabbit_stream.hrl").
-define(COMMAND, 'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand').
-define(COMMAND,
'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand').
all() ->
[
{group, non_parallel_tests}
].
[{group, non_parallel_tests}].
groups() ->
[
{non_parallel_tests, [], [
merge_defaults,
run
]}
].
[{non_parallel_tests, [], [merge_defaults, run]}].
init_per_suite(Config) ->
Config1 = rabbit_ct_helpers:set_config(Config,
[{rmq_nodename_suffix, ?MODULE}]),
Config1 =
rabbit_ct_helpers:set_config(Config,
[{rmq_nodename_suffix, ?MODULE}]),
rabbit_ct_helpers:log_environment(),
rabbit_ct_helpers:run_setup_steps(Config1,
rabbit_ct_broker_helpers:setup_steps()).
rabbit_ct_broker_helpers:setup_steps()).
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config,
rabbit_ct_broker_helpers:teardown_steps()).
rabbit_ct_broker_helpers:teardown_steps()).
init_per_group(_, Config) ->
Config.
@ -62,14 +58,17 @@ merge_defaults(_Config) ->
{[<<"other_key">>], #{verbose := false}} =
?COMMAND:merge_defaults([<<"other_key">>], #{verbose => false}).
run(Config) ->
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Opts = #{node => Node, timeout => 10000, verbose => false},
Opts =
#{node => Node,
timeout => 10000,
verbose => false},
%% No connections
[] = 'Elixir.Enum':to_list(?COMMAND:run([], Opts)),
[] =
'Elixir.Enum':to_list(
?COMMAND:run([], Opts)),
StreamPort = rabbit_stream_SUITE:get_stream_port(Config),
@ -77,31 +76,41 @@ run(Config) ->
ct:sleep(100),
[[{conn_name, _}]] =
'Elixir.Enum':to_list(?COMMAND:run([<<"conn_name">>], Opts)),
'Elixir.Enum':to_list(
?COMMAND:run([<<"conn_name">>], Opts)),
S2 = start_stream_connection(StreamPort),
ct:sleep(100),
[[{conn_name, _}], [{conn_name, _}]] =
'Elixir.Enum':to_list(?COMMAND:run([<<"conn_name">>], Opts)),
'Elixir.Enum':to_list(
?COMMAND:run([<<"conn_name">>], Opts)),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
Port =
rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
start_amqp_connection(network, Node, Port),
%% There are still just two connections
[[{conn_name, _}], [{conn_name, _}]] =
'Elixir.Enum':to_list(?COMMAND:run([<<"conn_name">>], Opts)),
'Elixir.Enum':to_list(
?COMMAND:run([<<"conn_name">>], Opts)),
start_amqp_connection(direct, Node, Port),
%% Still two MQTT connections, one direct AMQP 0-9-1 connection
[[{conn_name, _}], [{conn_name, _}]] =
'Elixir.Enum':to_list(?COMMAND:run([<<"conn_name">>], Opts)),
'Elixir.Enum':to_list(
?COMMAND:run([<<"conn_name">>], Opts)),
%% Verbose returns all keys
Infos = lists:map(fun(El) -> atom_to_binary(El, utf8) end, ?INFO_ITEMS),
AllKeys = 'Elixir.Enum':to_list(?COMMAND:run(Infos, Opts)),
AllKeys = 'Elixir.Enum':to_list(?COMMAND:run([], Opts#{verbose => true})),
Infos =
lists:map(fun(El) -> atom_to_binary(El, utf8) end, ?INFO_ITEMS),
AllKeys =
'Elixir.Enum':to_list(
?COMMAND:run(Infos, Opts)),
AllKeys =
'Elixir.Enum':to_list(
?COMMAND:run([], Opts#{verbose => true})),
%% There are two connections
[First, _Second] = AllKeys,
@ -120,8 +129,8 @@ run(Config) ->
ok.
start_stream_connection(Port) ->
{ok, S} = gen_tcp:connect("localhost", Port, [{active, false},
{mode, binary}]),
{ok, S} =
gen_tcp:connect("localhost", Port, [{active, false}, {mode, binary}]),
rabbit_stream_SUITE:test_peer_properties(S),
rabbit_stream_SUITE:test_authenticate(S),
S.

View File

@ -2,7 +2,7 @@
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
%% Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
%%
-module(config_schema_SUITE).
@ -10,44 +10,47 @@
-compile(export_all).
all() ->
[
run_snippets
].
[run_snippets].
%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------
init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
Config1 = rabbit_ct_helpers:run_setup_steps(Config),
rabbit_ct_config_schema:init_schemas(rabbitmq_stream, Config1).
rabbit_ct_helpers:log_environment(),
Config1 = rabbit_ct_helpers:run_setup_steps(Config),
rabbit_ct_config_schema:init_schemas(rabbitmq_stream, Config1).
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).
rabbit_ct_helpers:run_teardown_steps(Config).
init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase),
Config1 = rabbit_ct_helpers:set_config(Config, [
{rmq_nodename_suffix, Testcase}
]),
rabbit_ct_helpers:run_steps(Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()).
rabbit_ct_helpers:testcase_started(Config, Testcase),
Config1 =
rabbit_ct_helpers:set_config(Config,
[{rmq_nodename_suffix, Testcase}]),
rabbit_ct_helpers:run_steps(Config1,
rabbit_ct_broker_helpers:setup_steps()
++ rabbit_ct_client_helpers:setup_steps()).
end_per_testcase(Testcase, Config) ->
Config1 = rabbit_ct_helpers:run_steps(Config,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()),
rabbit_ct_helpers:testcase_finished(Config1, Testcase).
Config1 =
rabbit_ct_helpers:run_steps(Config,
rabbit_ct_client_helpers:teardown_steps()
++ rabbit_ct_broker_helpers:teardown_steps()),
rabbit_ct_helpers:testcase_finished(Config1, Testcase).
%% -------------------------------------------------------------------
%% Testcases.
%% -------------------------------------------------------------------
run_snippets(Config) ->
ok = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, run_snippets1, [Config]).
ok =
rabbit_ct_broker_helpers:rpc(Config,
0,
?MODULE,
run_snippets1,
[Config]).
run_snippets1(Config) ->
rabbit_ct_config_schema:run_snippets(Config).
rabbit_ct_config_schema:run_snippets(Config).

View File

@ -11,29 +11,26 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is Pivotal Software, Inc.
%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
%% Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
%%
-module(rabbit_stream_SUITE).
-include_lib("common_test/include/ct.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-include("rabbit_stream.hrl").
-include("rabbit_stream_metrics.hrl").
-compile(export_all).
all() ->
[
{group, single_node},
{group, cluster}
].
[{group, single_node}, {group, cluster}].
groups() ->
[
{single_node, [], [test_stream, test_gc_consumers, test_gc_publishers]},
{cluster, [], [test_stream, java]}
].
[{single_node, [],
[test_stream, test_gc_consumers, test_gc_publishers]},
{cluster, [], [test_stream, java]}].
init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
@ -43,25 +40,32 @@ end_per_suite(Config) ->
Config.
init_per_group(single_node, Config) ->
Config1 = rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, false}]),
Config1 =
rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, false}]),
rabbit_ct_helpers:run_setup_steps(Config1,
[fun(StepConfig) ->
rabbit_ct_helpers:merge_app_env(StepConfig,
{rabbit, [{core_metrics_gc_interval, 1000}]})
end] ++
rabbit_ct_broker_helpers:setup_steps());
[fun(StepConfig) ->
rabbit_ct_helpers:merge_app_env(StepConfig,
{rabbit,
[{core_metrics_gc_interval,
1000}]})
end]
++ rabbit_ct_broker_helpers:setup_steps());
init_per_group(cluster = Group, Config) ->
Config1 = rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, true}]),
Config2 = rabbit_ct_helpers:set_config(Config1,
[{rmq_nodes_count, 3},
{rmq_nodename_suffix, Group},
{tcp_ports_base}]),
Config1 =
rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, true}]),
Config2 =
rabbit_ct_helpers:set_config(Config1,
[{rmq_nodes_count, 3},
{rmq_nodename_suffix, Group},
{tcp_ports_base}]),
rabbit_ct_helpers:run_setup_steps(Config2,
[fun(StepConfig) ->
rabbit_ct_helpers:merge_app_env(StepConfig,
{aten, [{poll_interval, 1000}]})
end] ++
rabbit_ct_broker_helpers:setup_steps());
[fun(StepConfig) ->
rabbit_ct_helpers:merge_app_env(StepConfig,
{aten,
[{poll_interval,
1000}]})
end]
++ rabbit_ct_broker_helpers:setup_steps());
init_per_group(_, Config) ->
rabbit_ct_helpers:run_setup_steps(Config).
@ -69,7 +73,7 @@ end_per_group(java, Config) ->
rabbit_ct_helpers:run_teardown_steps(Config);
end_per_group(_, Config) ->
rabbit_ct_helpers:run_steps(Config,
rabbit_ct_broker_helpers:teardown_steps()).
rabbit_ct_broker_helpers:teardown_steps()).
init_per_testcase(_TestCase, Config) ->
Config.
@ -84,40 +88,56 @@ test_stream(Config) ->
test_gc_consumers(Config) ->
Pid = spawn(fun() -> ok end),
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_stream_metrics, consumer_created,
[Pid, #resource{name = <<"test">>, kind = queue, virtual_host = <<"/">>}, 0, 10, 0, 0]
),
rabbit_ct_broker_helpers:rpc(Config,
0,
rabbit_stream_metrics,
consumer_created,
[Pid,
#resource{name = <<"test">>,
kind = queue,
virtual_host = <<"/">>},
0,
10,
0,
0]),
ok = wait_until(fun() -> consumer_count(Config) == 0 end),
ok.
test_gc_publishers(Config) ->
Pid = spawn(fun() -> ok end),
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_stream_metrics, publisher_created,
[Pid, #resource{name = <<"test">>, kind = queue, virtual_host = <<"/">>}, 0, <<"ref">>]
),
rabbit_ct_broker_helpers:rpc(Config,
0,
rabbit_stream_metrics,
publisher_created,
[Pid,
#resource{name = <<"test">>,
kind = queue,
virtual_host = <<"/">>},
0,
<<"ref">>]),
ok = wait_until(fun() -> publisher_count(Config) == 0 end),
ok.
wait_until(Predicate) ->
Fun = fun(Pid, Fun) ->
case Predicate() of
true ->
Pid ! done,
ok;
_ ->
timer:sleep(100),
Fun(Pid, Fun)
end
end,
case Predicate() of
true ->
Pid ! done,
ok;
_ ->
timer:sleep(100),
Fun(Pid, Fun)
end
end,
CurrentPid = self(),
Pid = spawn(fun() -> Fun(CurrentPid, Fun) end),
Result = receive
done ->
ok
after
5000 ->
failed
end,
Result =
receive
done ->
ok
after 5000 ->
failed
end,
exit(Pid, kill),
Result.
@ -138,13 +158,14 @@ java(Config) ->
Node2Name = get_node_name(Config, 1),
RabbitMqCtl = get_rabbitmqctl(Config),
DataDir = rabbit_ct_helpers:get_config(Config, data_dir),
MakeResult = rabbit_ct_helpers:make(Config, DataDir, ["tests",
{"NODE1_STREAM_PORT=~b", [StreamPortNode1]},
{"NODE1_NAME=~p", [Node1Name]},
{"NODE2_NAME=~p", [Node2Name]},
{"NODE2_STREAM_PORT=~b", [StreamPortNode2]},
{"RABBITMQCTL=~p", [RabbitMqCtl]}
]),
MakeResult =
rabbit_ct_helpers:make(Config, DataDir,
["tests",
{"NODE1_STREAM_PORT=~b", [StreamPortNode1]},
{"NODE1_NAME=~p", [Node1Name]},
{"NODE2_NAME=~p", [Node2Name]},
{"NODE2_STREAM_PORT=~b", [StreamPortNode2]},
{"RABBITMQCTL=~p", [RabbitMqCtl]}]),
{ok, _} = MakeResult.
get_rabbitmqctl(Config) ->
@ -154,7 +175,8 @@ get_stream_port(Config) ->
get_stream_port(Config, 0).
get_stream_port(Config, Node) ->
rabbit_ct_broker_helpers:get_node_config(Config, Node, tcp_port_stream).
rabbit_ct_broker_helpers:get_node_config(Config, Node,
tcp_port_stream).
get_node_name(Config) ->
get_node_name(Config, 0).
@ -163,8 +185,8 @@ get_node_name(Config, Node) ->
rabbit_ct_broker_helpers:get_node_config(Config, Node, nodename).
test_server(Port) ->
{ok, S} = gen_tcp:connect("localhost", Port, [{active, false},
{mode, binary}]),
{ok, S} =
gen_tcp:connect("localhost", Port, [{active, false}, {mode, binary}]),
test_peer_properties(S),
test_authenticate(S),
Stream = <<"stream1">>,
@ -183,29 +205,57 @@ test_server(Port) ->
ok.
test_peer_properties(S) ->
PeerPropertiesFrame = <<?COMMAND_PEER_PROPERTIES:16, ?VERSION_0:16, 1:32, 0:32>>,
PeerPropertiesFrame =
<<?COMMAND_PEER_PROPERTIES:16, ?VERSION_0:16, 1:32, 0:32>>,
PeerPropertiesFrameSize = byte_size(PeerPropertiesFrame),
gen_tcp:send(S, <<PeerPropertiesFrameSize:32, PeerPropertiesFrame/binary>>),
{ok, <<_Size:32, ?COMMAND_PEER_PROPERTIES:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16, _Rest/binary>>} = gen_tcp:recv(S, 0, 5000).
gen_tcp:send(S,
<<PeerPropertiesFrameSize:32, PeerPropertiesFrame/binary>>),
{ok,
<<_Size:32,
?COMMAND_PEER_PROPERTIES:16,
?VERSION_0:16,
1:32,
?RESPONSE_CODE_OK:16,
_Rest/binary>>} =
gen_tcp:recv(S, 0, 5000).
test_authenticate(S) ->
SaslHandshakeFrame = <<?COMMAND_SASL_HANDSHAKE:16, ?VERSION_0:16, 1:32>>,
SaslHandshakeFrame =
<<?COMMAND_SASL_HANDSHAKE:16, ?VERSION_0:16, 1:32>>,
SaslHandshakeFrameSize = byte_size(SaslHandshakeFrame),
gen_tcp:send(S, <<SaslHandshakeFrameSize:32, SaslHandshakeFrame/binary>>),
gen_tcp:send(S,
<<SaslHandshakeFrameSize:32, SaslHandshakeFrame/binary>>),
Plain = <<"PLAIN">>,
AmqPlain = <<"AMQPLAIN">>,
{ok, SaslAvailable} = gen_tcp:recv(S, 0, 5000),
%% mechanisms order is not deterministic, so checking both orders
ok = case SaslAvailable of
<<31:32, ?COMMAND_SASL_HANDSHAKE:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16, 2:32,
5:16, Plain:5/binary, 8:16, AmqPlain:8/binary>> ->
ok;
<<31:32, ?COMMAND_SASL_HANDSHAKE:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16, 2:32,
8:16, AmqPlain:8/binary, 5:16, Plain:5/binary>> ->
ok;
_ ->
failed
end,
ok =
case SaslAvailable of
<<31:32,
?COMMAND_SASL_HANDSHAKE:16,
?VERSION_0:16,
1:32,
?RESPONSE_CODE_OK:16,
2:32,
5:16,
Plain:5/binary,
8:16,
AmqPlain:8/binary>> ->
ok;
<<31:32,
?COMMAND_SASL_HANDSHAKE:16,
?VERSION_0:16,
1:32,
?RESPONSE_CODE_OK:16,
2:32,
8:16,
AmqPlain:8/binary,
5:16,
Plain:5/binary>> ->
ok;
_ ->
failed
end,
Username = <<"guest">>,
Password = <<"guest">>,
@ -213,16 +263,36 @@ test_authenticate(S) ->
PlainSasl = <<Null:8, Username/binary, Null:8, Password/binary>>,
PlainSaslSize = byte_size(PlainSasl),
SaslAuthenticateFrame = <<?COMMAND_SASL_AUTHENTICATE:16, ?VERSION_0:16, 2:32,
5:16, Plain/binary, PlainSaslSize:32, PlainSasl/binary>>,
SaslAuthenticateFrame =
<<?COMMAND_SASL_AUTHENTICATE:16,
?VERSION_0:16,
2:32,
5:16,
Plain/binary,
PlainSaslSize:32,
PlainSasl/binary>>,
SaslAuthenticateFrameSize = byte_size(SaslAuthenticateFrame),
gen_tcp:send(S, <<SaslAuthenticateFrameSize:32, SaslAuthenticateFrame/binary>>),
gen_tcp:send(S,
<<SaslAuthenticateFrameSize:32,
SaslAuthenticateFrame/binary>>),
{ok, <<10:32, ?COMMAND_SASL_AUTHENTICATE:16, ?VERSION_0:16, 2:32, ?RESPONSE_CODE_OK:16, RestTune/binary>>} = gen_tcp:recv(S, 0, 5000),
{ok,
<<10:32,
?COMMAND_SASL_AUTHENTICATE:16,
?VERSION_0:16,
2:32,
?RESPONSE_CODE_OK:16,
RestTune/binary>>} =
gen_tcp:recv(S, 0, 5000),
TuneExpected = <<12:32, ?COMMAND_TUNE:16, ?VERSION_0:16, ?DEFAULT_FRAME_MAX:32, ?DEFAULT_HEARTBEAT:32>>,
TuneExpected =
<<12:32,
?COMMAND_TUNE:16,
?VERSION_0:16,
?DEFAULT_FRAME_MAX:32,
?DEFAULT_HEARTBEAT:32>>,
case RestTune of
<<>> ->
{ok, TuneExpected} = gen_tcp:recv(S, 0, 5000);
@ -230,81 +300,189 @@ test_authenticate(S) ->
TuneExpected = TuneReceived
end,
TuneFrame = <<?COMMAND_TUNE:16, ?VERSION_0:16, ?DEFAULT_FRAME_MAX:32, 0:32>>,
TuneFrame =
<<?COMMAND_TUNE:16, ?VERSION_0:16, ?DEFAULT_FRAME_MAX:32, 0:32>>,
TuneFrameSize = byte_size(TuneFrame),
gen_tcp:send(S, <<TuneFrameSize:32, TuneFrame/binary>>),
VirtualHost = <<"/">>,
VirtualHostLength = byte_size(VirtualHost),
OpenFrame = <<?COMMAND_OPEN:16, ?VERSION_0:16, 3:32, VirtualHostLength:16, VirtualHost/binary>>,
OpenFrame =
<<?COMMAND_OPEN:16,
?VERSION_0:16,
3:32,
VirtualHostLength:16,
VirtualHost/binary>>,
OpenFrameSize = byte_size(OpenFrame),
gen_tcp:send(S, <<OpenFrameSize:32, OpenFrame/binary>>),
{ok, <<10:32, ?COMMAND_OPEN:16, ?VERSION_0:16, 3:32, ?RESPONSE_CODE_OK:16>>} = gen_tcp:recv(S, 0, 5000).
{ok,
<<10:32,
?COMMAND_OPEN:16,
?VERSION_0:16,
3:32,
?RESPONSE_CODE_OK:16>>} =
gen_tcp:recv(S, 0, 5000).
test_create_stream(S, Stream) ->
StreamSize = byte_size(Stream),
CreateStreamFrame = <<?COMMAND_CREATE_STREAM:16, ?VERSION_0:16, 1:32, StreamSize:16, Stream:StreamSize/binary, 0:32>>,
CreateStreamFrame =
<<?COMMAND_CREATE_STREAM:16,
?VERSION_0:16,
1:32,
StreamSize:16,
Stream:StreamSize/binary,
0:32>>,
FrameSize = byte_size(CreateStreamFrame),
gen_tcp:send(S, <<FrameSize:32, CreateStreamFrame/binary>>),
{ok, <<_Size:32, ?COMMAND_CREATE_STREAM:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16>>} = gen_tcp:recv(S, 0, 5000).
{ok,
<<_Size:32,
?COMMAND_CREATE_STREAM:16,
?VERSION_0:16,
1:32,
?RESPONSE_CODE_OK:16>>} =
gen_tcp:recv(S, 0, 5000).
test_delete_stream(S, Stream) ->
StreamSize = byte_size(Stream),
DeleteStreamFrame = <<?COMMAND_DELETE_STREAM:16, ?VERSION_0:16, 1:32, StreamSize:16, Stream:StreamSize/binary>>,
DeleteStreamFrame =
<<?COMMAND_DELETE_STREAM:16,
?VERSION_0:16,
1:32,
StreamSize:16,
Stream:StreamSize/binary>>,
FrameSize = byte_size(DeleteStreamFrame),
gen_tcp:send(S, <<FrameSize:32, DeleteStreamFrame/binary>>),
ResponseFrameSize = 10,
{ok, <<ResponseFrameSize:32, ?COMMAND_DELETE_STREAM:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16>>} = gen_tcp:recv(S, 4 + 10, 5000).
{ok,
<<ResponseFrameSize:32,
?COMMAND_DELETE_STREAM:16,
?VERSION_0:16,
1:32,
?RESPONSE_CODE_OK:16>>} =
gen_tcp:recv(S, 4 + 10, 5000).
test_declare_publisher(S, PublisherId, Stream) ->
StreamSize = byte_size(Stream),
DeclarePublisherFrame = <<?COMMAND_DECLARE_PUBLISHER:16, ?VERSION_0:16, 1:32, PublisherId:8,
0:16, %% empty publisher reference
StreamSize:16, Stream:StreamSize/binary>>,
DeclarePublisherFrame =
<<?COMMAND_DECLARE_PUBLISHER:16,
?VERSION_0:16,
1:32,
PublisherId:8,
0:16, %% empty publisher reference
StreamSize:16,
Stream:StreamSize/binary>>,
FrameSize = byte_size(DeclarePublisherFrame),
gen_tcp:send(S, <<FrameSize:32, DeclarePublisherFrame/binary>>),
Res = gen_tcp:recv(S, 0, 5000),
{ok, <<_Size:32, ?COMMAND_DECLARE_PUBLISHER:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16, Rest/binary>>} = Res,
{ok,
<<_Size:32,
?COMMAND_DECLARE_PUBLISHER:16,
?VERSION_0:16,
1:32,
?RESPONSE_CODE_OK:16,
Rest/binary>>} =
Res,
Rest.
test_publish_confirm(S, PublisherId, Body) ->
BodySize = byte_size(Body),
PublishFrame = <<?COMMAND_PUBLISH:16, ?VERSION_0:16,
PublisherId:8, 1:32, 1:64, BodySize:32, Body:BodySize/binary>>,
PublishFrame =
<<?COMMAND_PUBLISH:16,
?VERSION_0:16,
PublisherId:8,
1:32,
1:64,
BodySize:32,
Body:BodySize/binary>>,
FrameSize = byte_size(PublishFrame),
gen_tcp:send(S, <<FrameSize:32, PublishFrame/binary>>),
{ok, <<_Size:32, ?COMMAND_PUBLISH_CONFIRM:16, ?VERSION_0:16, PublisherId:8, 1:32, 1:64>>} = gen_tcp:recv(S, 0, 5000).
{ok,
<<_Size:32,
?COMMAND_PUBLISH_CONFIRM:16,
?VERSION_0:16,
PublisherId:8,
1:32,
1:64>>} =
gen_tcp:recv(S, 0, 5000).
test_subscribe(S, SubscriptionId, Stream) ->
StreamSize = byte_size(Stream),
SubscribeFrame = <<?COMMAND_SUBSCRIBE:16, ?VERSION_0:16, 1:32, SubscriptionId:8, StreamSize:16, Stream:StreamSize/binary,
?OFFSET_TYPE_OFFSET:16, 0:64, 10:16>>,
SubscribeFrame =
<<?COMMAND_SUBSCRIBE:16,
?VERSION_0:16,
1:32,
SubscriptionId:8,
StreamSize:16,
Stream:StreamSize/binary,
?OFFSET_TYPE_OFFSET:16,
0:64,
10:16>>,
FrameSize = byte_size(SubscribeFrame),
gen_tcp:send(S, <<FrameSize:32, SubscribeFrame/binary>>),
Res = gen_tcp:recv(S, 0, 5000),
{ok, <<_Size:32, ?COMMAND_SUBSCRIBE:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16, Rest/binary>>} = Res,
{ok,
<<_Size:32,
?COMMAND_SUBSCRIBE:16,
?VERSION_0:16,
1:32,
?RESPONSE_CODE_OK:16,
Rest/binary>>} =
Res,
Rest.
test_deliver(S, Rest, SubscriptionId, Body) ->
BodySize = byte_size(Body),
Frame = read_frame(S, Rest),
<<58:32, ?COMMAND_DELIVER:16, ?VERSION_0:16, SubscriptionId:8, 5:4/unsigned, 0:4/unsigned, 0:8,
1:16, 1:32,
_Timestamp:64, _Epoch:64, 0:64, _Crc:32, _DataLength:32, _TrailerLength:32,
0:1, BodySize:31/unsigned, Body/binary>> = Frame.
<<58:32,
?COMMAND_DELIVER:16,
?VERSION_0:16,
SubscriptionId:8,
5:4/unsigned,
0:4/unsigned,
0:8,
1:16,
1:32,
_Timestamp:64,
_Epoch:64,
0:64,
_Crc:32,
_DataLength:32,
_TrailerLength:32,
0:1,
BodySize:31/unsigned,
Body/binary>> =
Frame.
test_metadata_update_stream_deleted(S, Stream) ->
StreamSize = byte_size(Stream),
{ok, <<15:32, ?COMMAND_METADATA_UPDATE:16, ?VERSION_0:16, ?RESPONSE_CODE_STREAM_NOT_AVAILABLE:16, StreamSize:16, Stream/binary>>} = gen_tcp:recv(S, 0, 5000).
{ok,
<<15:32,
?COMMAND_METADATA_UPDATE:16,
?VERSION_0:16,
?RESPONSE_CODE_STREAM_NOT_AVAILABLE:16,
StreamSize:16,
Stream/binary>>} =
gen_tcp:recv(S, 0, 5000).
test_close(S) ->
CloseReason = <<"OK">>,
CloseReasonSize = byte_size(CloseReason),
CloseFrame = <<?COMMAND_CLOSE:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16, CloseReasonSize:16, CloseReason/binary>>,
CloseFrame =
<<?COMMAND_CLOSE:16,
?VERSION_0:16,
1:32,
?RESPONSE_CODE_OK:16,
CloseReasonSize:16,
CloseReason/binary>>,
CloseFrameSize = byte_size(CloseFrame),
gen_tcp:send(S, <<CloseFrameSize:32, CloseFrame/binary>>),
{ok, <<10:32, ?COMMAND_CLOSE:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16>>} = gen_tcp:recv(S, 0, 5000).
{ok,
<<10:32,
?COMMAND_CLOSE:16,
?VERSION_0:16,
1:32,
?RESPONSE_CODE_OK:16>>} =
gen_tcp:recv(S, 0, 5000).
wait_for_socket_close(_S, 0) ->
not_closed;
@ -327,8 +505,7 @@ read_frame(S, Buffer) ->
_ ->
read_frame(S, Data)
end
after
1000 ->
inet:setopts(S, [{active, false}]),
Buffer
end.
after 1000 ->
inet:setopts(S, [{active, false}]),
Buffer
end.

View File

@ -11,7 +11,7 @@
// The Original Code is RabbitMQ.
//
// The Initial Developer of the Original Code is Pivotal Software, Inc.
// Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
//
package com.rabbitmq.stream;

View File

@ -11,7 +11,7 @@
// The Original Code is RabbitMQ.
//
// The Initial Developer of the Original Code is Pivotal Software, Inc.
// Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
//
package com.rabbitmq.stream;

View File

@ -11,7 +11,7 @@
// The Original Code is RabbitMQ.
//
// The Initial Developer of the Original Code is Pivotal Software, Inc.
// Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
//
package com.rabbitmq.stream;

View File

@ -11,7 +11,7 @@
// The Original Code is RabbitMQ.
//
// The Initial Developer of the Original Code is Pivotal Software, Inc.
// Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
//
package com.rabbitmq.stream;

View File

@ -11,7 +11,7 @@
// The Original Code is RabbitMQ.
//
// The Initial Developer of the Original Code is Pivotal Software, Inc.
// Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
//
package com.rabbitmq.stream;

View File

@ -0,0 +1,12 @@
{plugins, [rebar3_format]}.
{format, [
{files, ["src/*.erl", "test/*.erl"]},
{formatter, default_formatter},
{options, #{
paper => 80,
ribbon => 70,
inline_attributes => {when_under, 1},
inline_items => {when_under, 4}
}}
]}.

View File

@ -9,48 +9,60 @@
-behaviour(rabbit_mgmt_extension).
-export([dispatcher/0, web_ui/0]).
-export([init/2, to_json/2, content_types_provided/2, is_authorized/2]).
-export([dispatcher/0,
web_ui/0]).
-export([init/2,
to_json/2,
content_types_provided/2,
is_authorized/2]).
-export([resource_exists/2]).
-export([variances/2]).
-include_lib("rabbitmq_management_agent/include/rabbit_mgmt_records.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
dispatcher() -> [{"/stream/connections/:vhost/:connection/consumers", ?MODULE, []}].
dispatcher() ->
[{"/stream/connections/:vhost/:connection/consumers", ?MODULE, []}].
web_ui() -> [].
web_ui() ->
[].
%%--------------------------------------------------------------------
init(Req, _State) ->
{cowboy_rest, rabbit_mgmt_headers:set_common_permission_headers(Req, ?MODULE), #context{}}.
{cowboy_rest,
rabbit_mgmt_headers:set_common_permission_headers(Req, ?MODULE),
#context{}}.
variances(Req, Context) ->
{[<<"accept-encoding">>, <<"origin">>], Req, Context}.
{[<<"accept-encoding">>, <<"origin">>], Req, Context}.
content_types_provided(ReqData, Context) ->
{rabbit_mgmt_util:responder_map(to_json), ReqData, Context}.
{rabbit_mgmt_util:responder_map(to_json), ReqData, Context}.
resource_exists(ReqData, Context) ->
case rabbit_mgmt_wm_connection:conn(ReqData) of
error -> {false, ReqData, Context};
not_found -> {false, ReqData, Context};
_Conn -> {true, ReqData, Context}
end.
case rabbit_mgmt_wm_connection:conn(ReqData) of
error ->
{false, ReqData, Context};
not_found ->
{false, ReqData, Context};
_Conn ->
{true, ReqData, Context}
end.
to_json(ReqData, Context) ->
Pid = proplists:get_value(pid, rabbit_mgmt_wm_connection:conn(ReqData)),
Consumers = rabbit_mgmt_format:strip_pids(rabbit_stream_mgmt_db:get_connection_consumers(Pid)),
rabbit_mgmt_util:reply_list(
Consumers,
ReqData, Context).
Pid = proplists:get_value(pid,
rabbit_mgmt_wm_connection:conn(ReqData)),
Consumers =
rabbit_mgmt_format:strip_pids(
rabbit_stream_mgmt_db:get_connection_consumers(Pid)),
rabbit_mgmt_util:reply_list(Consumers, ReqData, Context).
is_authorized(ReqData, Context) ->
try
rabbit_mgmt_util:is_authorized_user(
ReqData, Context, rabbit_mgmt_wm_connection:conn(ReqData))
catch
{error, invalid_range_parameters, Reason} ->
rabbit_mgmt_util:bad_request(iolist_to_binary(Reason), ReqData, Context)
end.
try
rabbit_mgmt_util:is_authorized_user(ReqData, Context,
rabbit_mgmt_wm_connection:conn(ReqData))
catch
{error, invalid_range_parameters, Reason} ->
rabbit_mgmt_util:bad_request(iolist_to_binary(Reason), ReqData,
Context)
end.

View File

@ -9,116 +9,147 @@
-behaviour(rabbit_mgmt_extension).
-export([dispatcher/0, web_ui/0]).
-export([init/2, resource_exists/2, to_json/2, content_types_provided/2,
is_authorized/2, allowed_methods/2, delete_resource/2]).
-export([dispatcher/0,
web_ui/0]).
-export([init/2,
resource_exists/2,
to_json/2,
content_types_provided/2,
is_authorized/2,
allowed_methods/2,
delete_resource/2]).
-export([variances/2]).
-include_lib("rabbitmq_management_agent/include/rabbit_mgmt_records.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
dispatcher() -> [{"/stream/connections/:vhost/:connection", ?MODULE, []}].
dispatcher() ->
[{"/stream/connections/:vhost/:connection", ?MODULE, []}].
web_ui() -> [].
web_ui() ->
[].
%%--------------------------------------------------------------------
init(Req, _State) ->
{cowboy_rest, rabbit_mgmt_headers:set_common_permission_headers(Req, ?MODULE), #context{}}.
{cowboy_rest,
rabbit_mgmt_headers:set_common_permission_headers(Req, ?MODULE),
#context{}}.
variances(Req, Context) ->
{[<<"accept-encoding">>, <<"origin">>], Req, Context}.
{[<<"accept-encoding">>, <<"origin">>], Req, Context}.
content_types_provided(ReqData, Context) ->
{rabbit_mgmt_util:responder_map(to_json), ReqData, Context}.
{rabbit_mgmt_util:responder_map(to_json), ReqData, Context}.
allowed_methods(ReqData, Context) ->
{[<<"HEAD">>, <<"GET">>, <<"DELETE">>, <<"OPTIONS">>], ReqData, Context}.
{[<<"HEAD">>, <<"GET">>, <<"DELETE">>, <<"OPTIONS">>], ReqData,
Context}.
resource_exists(ReqData, Context) ->
case conn(ReqData) of
not_found -> {false, ReqData, Context};
_Conn -> {true, ReqData, Context}
end.
case conn(ReqData) of
not_found ->
{false, ReqData, Context};
_Conn ->
{true, ReqData, Context}
end.
to_json(ReqData, Context) ->
case rabbit_mgmt_util:disable_stats(ReqData) of
false ->
rabbit_mgmt_util:reply(
maps:from_list(rabbit_mgmt_format:strip_pids(conn_stats(ReqData))), ReqData, Context);
true ->
rabbit_mgmt_util:reply([{name, rabbit_mgmt_util:id(connection, ReqData)}],
ReqData, Context)
end.
case rabbit_mgmt_util:disable_stats(ReqData) of
false ->
rabbit_mgmt_util:reply(
maps:from_list(
rabbit_mgmt_format:strip_pids(conn_stats(ReqData))),
ReqData, Context);
true ->
rabbit_mgmt_util:reply([{name,
rabbit_mgmt_util:id(connection, ReqData)}],
ReqData, Context)
end.
delete_resource(ReqData, Context) ->
case conn(ReqData) of
not_found -> ok;
Conn ->
case proplists:get_value(pid, Conn) of
undefined -> ok;
Pid when is_pid(Pid) ->
force_close_connection(ReqData, Pid)
end
end,
{true, ReqData, Context}.
case conn(ReqData) of
not_found ->
ok;
Conn ->
case proplists:get_value(pid, Conn) of
undefined ->
ok;
Pid when is_pid(Pid) ->
force_close_connection(ReqData, Pid)
end
end,
{true, ReqData, Context}.
is_authorized(ReqData, Context) ->
try
rabbit_mgmt_util:is_authorized_user(ReqData, Context, conn(ReqData))
catch
{error, invalid_range_parameters, Reason} ->
rabbit_mgmt_util:bad_request(iolist_to_binary(Reason), ReqData, Context)
end.
try
rabbit_mgmt_util:is_authorized_user(ReqData, Context, conn(ReqData))
catch
{error, invalid_range_parameters, Reason} ->
rabbit_mgmt_util:bad_request(iolist_to_binary(Reason), ReqData,
Context)
end.
%%--------------------------------------------------------------------
conn(ReqData) ->
case rabbit_mgmt_util:disable_stats(ReqData) of
false ->
conn_stats(ReqData);
true ->
VHost = rabbit_mgmt_util:id(vhost, ReqData),
case rabbit_connection_tracking:lookup(rabbit_mgmt_util:id(connection, ReqData)) of
#tracked_connection{name = Name, pid = Pid,
username = Username, type = Type,
protocol = <<"stream">>, vhost = VHost} ->
[{name, Name}, {pid, Pid}, {user, Username}, {type, Type}];
#tracked_connection{} ->
not_found;
not_found ->
not_found
end
end.
case rabbit_mgmt_util:disable_stats(ReqData) of
false ->
conn_stats(ReqData);
true ->
VHost = rabbit_mgmt_util:id(vhost, ReqData),
case rabbit_connection_tracking:lookup(
rabbit_mgmt_util:id(connection, ReqData))
of
#tracked_connection{name = Name,
pid = Pid,
username = Username,
type = Type,
protocol = <<"stream">>,
vhost = VHost} ->
[{name, Name}, {pid, Pid}, {user, Username}, {type, Type}];
#tracked_connection{} ->
not_found;
not_found ->
not_found
end
end.
conn_stats(ReqData) ->
case rabbit_mgmt_db:get_connection(rabbit_mgmt_util:id(connection, ReqData),
rabbit_mgmt_util:range_ceil(ReqData)) of
not_found ->
not_found;
Connection ->
VHost = rabbit_mgmt_util:id(vhost, ReqData),
case {rabbit_stream_management_utils:is_stream_connection(Connection),
same_vhost(VHost, Connection)} of
{true, true} ->
Connection;
_ ->
not_found
end
end.
case rabbit_mgmt_db:get_connection(
rabbit_mgmt_util:id(connection, ReqData),
rabbit_mgmt_util:range_ceil(ReqData))
of
not_found ->
not_found;
Connection ->
VHost = rabbit_mgmt_util:id(vhost, ReqData),
case
{rabbit_stream_management_utils:is_stream_connection(Connection),
same_vhost(VHost, Connection)}
of
{true, true} ->
Connection;
_ ->
not_found
end
end.
same_vhost(Vhost, Connection) ->
case lists:keyfind(vhost, 1, Connection) of
{vhost, Vhost} ->
true;
_ ->
false
end.
case lists:keyfind(vhost, 1, Connection) of
{vhost, Vhost} ->
true;
_ ->
false
end.
force_close_connection(ReqData, Pid) ->
Reason = case cowboy_req:header(<<"x-reason">>, ReqData) of
undefined -> "Closed via management plugin";
V -> binary_to_list(V)
end,
gen_server:call(Pid, {shutdown, Reason}, infinity),
ok.
Reason =
case cowboy_req:header(<<"x-reason">>, ReqData) of
undefined ->
"Closed via management plugin";
V ->
binary_to_list(V)
end,
gen_server:call(Pid, {shutdown, Reason}, infinity),
ok.

View File

@ -9,48 +9,60 @@
-behaviour(rabbit_mgmt_extension).
-export([dispatcher/0, web_ui/0]).
-export([init/2, to_json/2, content_types_provided/2, is_authorized/2]).
-export([dispatcher/0,
web_ui/0]).
-export([init/2,
to_json/2,
content_types_provided/2,
is_authorized/2]).
-export([resource_exists/2]).
-export([variances/2]).
-include_lib("rabbitmq_management_agent/include/rabbit_mgmt_records.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
dispatcher() -> [{"/stream/connections/:vhost/:connection/publishers", ?MODULE, []}].
dispatcher() ->
[{"/stream/connections/:vhost/:connection/publishers", ?MODULE, []}].
web_ui() -> [].
web_ui() ->
[].
%%--------------------------------------------------------------------
init(Req, _State) ->
{cowboy_rest, rabbit_mgmt_headers:set_common_permission_headers(Req, ?MODULE), #context{}}.
{cowboy_rest,
rabbit_mgmt_headers:set_common_permission_headers(Req, ?MODULE),
#context{}}.
variances(Req, Context) ->
{[<<"accept-encoding">>, <<"origin">>], Req, Context}.
{[<<"accept-encoding">>, <<"origin">>], Req, Context}.
content_types_provided(ReqData, Context) ->
{rabbit_mgmt_util:responder_map(to_json), ReqData, Context}.
{rabbit_mgmt_util:responder_map(to_json), ReqData, Context}.
resource_exists(ReqData, Context) ->
case rabbit_mgmt_wm_connection:conn(ReqData) of
error -> {false, ReqData, Context};
not_found -> {false, ReqData, Context};
_Conn -> {true, ReqData, Context}
end.
case rabbit_mgmt_wm_connection:conn(ReqData) of
error ->
{false, ReqData, Context};
not_found ->
{false, ReqData, Context};
_Conn ->
{true, ReqData, Context}
end.
to_json(ReqData, Context) ->
Pid = proplists:get_value(pid, rabbit_mgmt_wm_connection:conn(ReqData)),
Publishers = rabbit_mgmt_format:strip_pids(rabbit_stream_mgmt_db:get_connection_publishers(Pid)),
rabbit_mgmt_util:reply_list(
Publishers,
ReqData, Context).
Pid = proplists:get_value(pid,
rabbit_mgmt_wm_connection:conn(ReqData)),
Publishers =
rabbit_mgmt_format:strip_pids(
rabbit_stream_mgmt_db:get_connection_publishers(Pid)),
rabbit_mgmt_util:reply_list(Publishers, ReqData, Context).
is_authorized(ReqData, Context) ->
try
rabbit_mgmt_util:is_authorized_user(
ReqData, Context, rabbit_mgmt_wm_connection:conn(ReqData))
catch
{error, invalid_range_parameters, Reason} ->
rabbit_mgmt_util:bad_request(iolist_to_binary(Reason), ReqData, Context)
end.
try
rabbit_mgmt_util:is_authorized_user(ReqData, Context,
rabbit_mgmt_wm_connection:conn(ReqData))
catch
{error, invalid_range_parameters, Reason} ->
rabbit_mgmt_util:bad_request(iolist_to_binary(Reason), ReqData,
Context)
end.

View File

@ -9,47 +9,57 @@
-behaviour(rabbit_mgmt_extension).
-export([dispatcher/0, web_ui/0]).
-export([init/2, to_json/2, content_types_provided/2, is_authorized/2]).
-export([dispatcher/0,
web_ui/0]).
-export([init/2,
to_json/2,
content_types_provided/2,
is_authorized/2]).
-include_lib("rabbitmq_management_agent/include/rabbit_mgmt_records.hrl").
dispatcher() -> [{"/stream/connections", ?MODULE, []}].
dispatcher() ->
[{"/stream/connections", ?MODULE, []}].
web_ui() -> [{javascript, <<"stream.js">>}].
web_ui() ->
[{javascript, <<"stream.js">>}].
%%--------------------------------------------------------------------
init(Req, _Opts) ->
{cowboy_rest, rabbit_mgmt_cors:set_headers(Req, ?MODULE), #context{}}.
{cowboy_rest, rabbit_mgmt_cors:set_headers(Req, ?MODULE), #context{}}.
content_types_provided(ReqData, Context) ->
{[{<<"application/json">>, to_json}], ReqData, Context}.
{[{<<"application/json">>, to_json}], ReqData, Context}.
to_json(ReqData, Context) ->
try
Connections = do_connections_query(ReqData, Context),
rabbit_mgmt_util:reply_list_or_paginate(Connections, ReqData, Context)
catch
{error, invalid_range_parameters, Reason} ->
rabbit_mgmt_util:bad_request(iolist_to_binary(Reason), ReqData, Context)
end.
try
Connections = do_connections_query(ReqData, Context),
rabbit_mgmt_util:reply_list_or_paginate(Connections, ReqData, Context)
catch
{error, invalid_range_parameters, Reason} ->
rabbit_mgmt_util:bad_request(iolist_to_binary(Reason), ReqData,
Context)
end.
is_authorized(ReqData, Context) ->
rabbit_mgmt_util:is_authorized(ReqData, Context).
rabbit_mgmt_util:is_authorized(ReqData, Context).
augmented(ReqData, Context) ->
rabbit_mgmt_util:filter_conn_ch_list(
rabbit_mgmt_db:get_all_connections(
rabbit_mgmt_util:range_ceil(ReqData)), ReqData, Context).
rabbit_mgmt_util:filter_conn_ch_list(
rabbit_mgmt_db:get_all_connections(
rabbit_mgmt_util:range_ceil(ReqData)),
ReqData, Context).
do_connections_query(ReqData, Context) ->
case rabbit_mgmt_util:disable_stats(ReqData) of
false ->
rabbit_stream_management_utils:keep_stream_connections(augmented(ReqData, Context));
true ->
TrackedStreamConnections = rabbit_stream_management_utils:keep_tracked_stream_connections(
rabbit_connection_tracking:list()),
rabbit_mgmt_util:filter_tracked_conn_list(TrackedStreamConnections,
ReqData, Context)
end.
case rabbit_mgmt_util:disable_stats(ReqData) of
false ->
rabbit_stream_management_utils:keep_stream_connections(augmented(ReqData,
Context));
true ->
TrackedStreamConnections =
rabbit_stream_management_utils:keep_tracked_stream_connections(
rabbit_connection_tracking:list()),
rabbit_mgmt_util:filter_tracked_conn_list(TrackedStreamConnections,
ReqData, Context)
end.

View File

@ -9,41 +9,54 @@
-behaviour(rabbit_mgmt_extension).
-export([dispatcher/0, web_ui/0]).
-export([init/2, to_json/2, content_types_provided/2, resource_exists/2, is_authorized/2]).
-export([dispatcher/0,
web_ui/0]).
-export([init/2,
to_json/2,
content_types_provided/2,
resource_exists/2,
is_authorized/2]).
-include_lib("rabbitmq_management_agent/include/rabbit_mgmt_records.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
dispatcher() -> [{"/stream/connections/:vhost", ?MODULE, []}].
dispatcher() ->
[{"/stream/connections/:vhost", ?MODULE, []}].
web_ui() -> [].
web_ui() ->
[].
%%--------------------------------------------------------------------
init(Req, _Opts) ->
{cowboy_rest, rabbit_mgmt_cors:set_headers(Req, ?MODULE), #context{}}.
{cowboy_rest, rabbit_mgmt_cors:set_headers(Req, ?MODULE), #context{}}.
content_types_provided(ReqData, Context) ->
{[{<<"application/json">>, to_json}], ReqData, Context}.
{[{<<"application/json">>, to_json}], ReqData, Context}.
resource_exists(ReqData, Context) ->
{rabbit_vhost:exists(rabbit_mgmt_util:id(vhost, ReqData)), ReqData, Context}.
{rabbit_vhost:exists(
rabbit_mgmt_util:id(vhost, ReqData)),
ReqData, Context}.
to_json(ReqData, Context) ->
try
rabbit_mgmt_util:reply_list(rabbit_stream_management_utils:keep_stream_connections(
augmented(ReqData, Context)), ReqData, Context
)
catch
{error, invalid_range_parameters, Reason} ->
rabbit_mgmt_util:bad_request(iolist_to_binary(Reason), ReqData, Context)
end.
try
rabbit_mgmt_util:reply_list(
rabbit_stream_management_utils:keep_stream_connections(augmented(ReqData,
Context)),
ReqData, Context)
catch
{error, invalid_range_parameters, Reason} ->
rabbit_mgmt_util:bad_request(iolist_to_binary(Reason), ReqData,
Context)
end.
is_authorized(ReqData, Context) ->
rabbit_mgmt_util:is_authorized_vhost_visible_for_monitoring(ReqData, Context).
rabbit_mgmt_util:is_authorized_vhost_visible_for_monitoring(ReqData,
Context).
augmented(ReqData, Context) ->
rabbit_mgmt_util:filter_conn_ch_list(
rabbit_mgmt_db:get_all_connections(
rabbit_mgmt_util:range_ceil(ReqData)), ReqData, Context).
rabbit_mgmt_util:filter_conn_ch_list(
rabbit_mgmt_db:get_all_connections(
rabbit_mgmt_util:range_ceil(ReqData)),
ReqData, Context).

View File

@ -9,54 +9,74 @@
-behaviour(rabbit_mgmt_extension).
-export([dispatcher/0, web_ui/0]).
-export([init/2, resource_exists/2, to_json/2, content_types_provided/2, is_authorized/2]).
-export([dispatcher/0,
web_ui/0]).
-export([init/2,
resource_exists/2,
to_json/2,
content_types_provided/2,
is_authorized/2]).
-include_lib("rabbitmq_management_agent/include/rabbit_mgmt_records.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
dispatcher() -> [
{"/stream/consumers", ?MODULE, []},
{"/stream/consumers/:vhost", ?MODULE, []}
].
dispatcher() ->
[{"/stream/consumers", ?MODULE, []},
{"/stream/consumers/:vhost", ?MODULE, []}].
web_ui() -> [].
web_ui() ->
[].
%%--------------------------------------------------------------------
init(Req, _Opts) ->
{cowboy_rest, rabbit_mgmt_cors:set_headers(Req, ?MODULE), #context{}}.
{cowboy_rest, rabbit_mgmt_cors:set_headers(Req, ?MODULE), #context{}}.
content_types_provided(ReqData, Context) ->
{[{<<"application/json">>, to_json}], ReqData, Context}.
{[{<<"application/json">>, to_json}], ReqData, Context}.
resource_exists(ReqData, Context) ->
{case rabbit_mgmt_util:vhost(ReqData) of
not_found -> false;
none -> true; % none means `all`
_ -> true
end, ReqData, Context}.
{case rabbit_mgmt_util:vhost(ReqData) of
not_found ->
false;
none ->
true; % none means `all`
_ ->
true
end,
ReqData, Context}.
to_json(ReqData, Context = #context{user = User}) ->
case rabbit_mgmt_util:disable_stats(ReqData) of
false ->
Arg = case rabbit_mgmt_util:vhost(ReqData) of
none -> all;
VHost -> VHost
end,
Consumers = rabbit_mgmt_format:strip_pids(rabbit_stream_mgmt_db:get_all_consumers(Arg)),
rabbit_mgmt_util:reply_list(
filter_user(Consumers, User), [], ReqData, Context);
true ->
rabbit_mgmt_util:bad_request(<<"Stats in management UI are disabled on this node">>, ReqData, Context)
end.
case rabbit_mgmt_util:disable_stats(ReqData) of
false ->
Arg = case rabbit_mgmt_util:vhost(ReqData) of
none ->
all;
VHost ->
VHost
end,
Consumers =
rabbit_mgmt_format:strip_pids(
rabbit_stream_mgmt_db:get_all_consumers(Arg)),
rabbit_mgmt_util:reply_list(filter_user(Consumers, User),
[],
ReqData,
Context);
true ->
rabbit_mgmt_util:bad_request(<<"Stats in management UI are disabled on this node">>,
ReqData, Context)
end.
is_authorized(ReqData, Context) ->
rabbit_mgmt_util:is_authorized(ReqData, Context).
rabbit_mgmt_util:is_authorized(ReqData, Context).
filter_user(List, #user{username = Username, tags = Tags}) ->
case rabbit_mgmt_util:is_monitor(Tags) of
true -> List;
false -> [I || I <- List,
rabbit_misc:pget(user, rabbit_misc:pget(connection_details, I)) == Username]
end.
case rabbit_mgmt_util:is_monitor(Tags) of
true ->
List;
false ->
[I
|| I <- List,
rabbit_misc:pget(user, rabbit_misc:pget(connection_details, I))
== Username]
end.

View File

@ -7,26 +7,27 @@
-module(rabbit_stream_management_utils).
-export([keep_stream_connections/1, keep_tracked_stream_connections/1, is_stream_connection/1]).
-export([keep_stream_connections/1,
keep_tracked_stream_connections/1,
is_stream_connection/1]).
-include_lib("rabbit_common/include/rabbit.hrl").
keep_stream_connections(Connections) ->
lists:filter(fun is_stream_connection/1, Connections).
lists:filter(fun is_stream_connection/1, Connections).
is_stream_connection(Connection) ->
case lists:keyfind(protocol, 1, Connection) of
{protocol, <<"stream">>} ->
true;
_ ->
false
end.
case lists:keyfind(protocol, 1, Connection) of
{protocol, <<"stream">>} ->
true;
_ ->
false
end.
keep_tracked_stream_connections(Connections) ->
lists:filter(fun(#tracked_connection{protocol = <<"stream">>}) ->
true;
(_) ->
false
end, Connections).
lists:filter(fun (#tracked_connection{protocol = <<"stream">>}) ->
true;
(_) ->
false
end,
Connections).

View File

@ -13,104 +13,126 @@
-include_lib("rabbitmq_stream/include/rabbit_stream_metrics.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-export([get_all_consumers/1, get_all_publishers/1]).
-export([get_all_consumers/1,
get_all_publishers/1]).
-export([entity_data/4]).
-export([get_connection_consumers/1, get_connection_publishers/1]).
-export([get_connection_consumers/1,
get_connection_publishers/1]).
get_all_consumers(VHosts) ->
rabbit_mgmt_db:submit(fun(_Interval) -> consumers_stats(VHosts) end).
rabbit_mgmt_db:submit(fun(_Interval) -> consumers_stats(VHosts) end).
get_all_publishers(VHosts) ->
rabbit_mgmt_db:submit(fun(_Interval) -> publishers_stats(VHosts) end).
rabbit_mgmt_db:submit(fun(_Interval) -> publishers_stats(VHosts) end).
get_connection_consumers(ConnectionPid) when is_pid(ConnectionPid) ->
rabbit_mgmt_db:submit(fun(_Interval) -> connection_consumers_stats(ConnectionPid) end).
rabbit_mgmt_db:submit(fun(_Interval) ->
connection_consumers_stats(ConnectionPid)
end).
get_connection_publishers(ConnectionPid) when is_pid(ConnectionPid) ->
rabbit_mgmt_db:submit(fun(_Interval) -> connection_publishers_stats(ConnectionPid) end).
rabbit_mgmt_db:submit(fun(_Interval) ->
connection_publishers_stats(ConnectionPid)
end).
consumers_stats(VHost) ->
Data = rabbit_mgmt_db:get_data_from_nodes({rabbit_stream_mgmt_db, entity_data,
[VHost, ?ENTITY_CONSUMER, fun consumers_by_vhost/1]}),
[V || {_, V} <- maps:to_list(Data)].
Data =
rabbit_mgmt_db:get_data_from_nodes({rabbit_stream_mgmt_db,
entity_data,
[VHost, ?ENTITY_CONSUMER,
fun consumers_by_vhost/1]}),
[V || {_, V} <- maps:to_list(Data)].
publishers_stats(VHost) ->
Data = rabbit_mgmt_db:get_data_from_nodes({rabbit_stream_mgmt_db, entity_data,
[VHost, ?ENTITY_PUBLISHER, fun publishers_by_vhost/1]}),
[V || {_, V} <- maps:to_list(Data)].
Data =
rabbit_mgmt_db:get_data_from_nodes({rabbit_stream_mgmt_db,
entity_data,
[VHost, ?ENTITY_PUBLISHER,
fun publishers_by_vhost/1]}),
[V || {_, V} <- maps:to_list(Data)].
connection_consumers_stats(ConnectionPid) ->
Data = rabbit_mgmt_db:get_data_from_nodes({rabbit_stream_mgmt_db, entity_data,
[ConnectionPid, ?ENTITY_CONSUMER, fun consumers_by_connection/1]}),
[V || {_, V} <- maps:to_list(Data)].
Data =
rabbit_mgmt_db:get_data_from_nodes({rabbit_stream_mgmt_db,
entity_data,
[ConnectionPid, ?ENTITY_CONSUMER,
fun consumers_by_connection/1]}),
[V || {_, V} <- maps:to_list(Data)].
connection_publishers_stats(ConnectionPid) ->
Data = rabbit_mgmt_db:get_data_from_nodes({rabbit_stream_mgmt_db, entity_data,
[ConnectionPid, ?ENTITY_PUBLISHER, fun publishers_by_connection/1]}),
[V || {_, V} <- maps:to_list(Data)].
Data =
rabbit_mgmt_db:get_data_from_nodes({rabbit_stream_mgmt_db,
entity_data,
[ConnectionPid, ?ENTITY_PUBLISHER,
fun publishers_by_connection/1]}),
[V || {_, V} <- maps:to_list(Data)].
entity_data(_Pid, Param, EntityType, QueryFun) ->
maps:from_list(
[begin
AugmentedPublisher = augment_entity(EntityType, P),
{P, augment_connection_pid(AugmentedPublisher) ++ AugmentedPublisher}
end
|| P <- QueryFun(Param)]
).
maps:from_list([begin
AugmentedPublisher = augment_entity(EntityType, P),
{P,
augment_connection_pid(AugmentedPublisher)
++ AugmentedPublisher}
end
|| P <- QueryFun(Param)]).
augment_entity(?ENTITY_CONSUMER, {{Q, ConnPid, SubId}, Props}) ->
[{queue, format_resource(Q)},
{connection, ConnPid},
{subscription_id, SubId} | Props];
[{queue, format_resource(Q)}, {connection, ConnPid},
{subscription_id, SubId}
| Props];
augment_entity(?ENTITY_PUBLISHER, {{Q, ConnPid, PubId}, Props}) ->
[{queue, format_resource(Q)},
{connection, ConnPid},
{publisher_id, PubId} | Props].
[{queue, format_resource(Q)}, {connection, ConnPid},
{publisher_id, PubId}
| Props].
consumers_by_vhost(VHost) ->
ets:select(?TABLE_CONSUMER,
[{{{#resource{virtual_host = '$1', _ = '_'}, '_', '_'}, '_'},
[{'orelse', {'==', 'all', VHost}, {'==', VHost, '$1'}}],
['$_']}]).
ets:select(?TABLE_CONSUMER,
[{{{#resource{virtual_host = '$1', _ = '_'}, '_', '_'}, '_'},
[{'orelse', {'==', all, VHost}, {'==', VHost, '$1'}}],
['$_']}]).
publishers_by_vhost(VHost) ->
ets:select(?TABLE_PUBLISHER,
[{{{#resource{virtual_host = '$1', _ = '_'}, '_', '_'}, '_'},
[{'orelse', {'==', 'all', VHost}, {'==', VHost, '$1'}}],
['$_']}]).
ets:select(?TABLE_PUBLISHER,
[{{{#resource{virtual_host = '$1', _ = '_'}, '_', '_'}, '_'},
[{'orelse', {'==', all, VHost}, {'==', VHost, '$1'}}],
['$_']}]).
consumers_by_connection(ConnectionPid) ->
get_entity_stats(?TABLE_CONSUMER, ConnectionPid).
get_entity_stats(?TABLE_CONSUMER, ConnectionPid).
publishers_by_connection(ConnectionPid) ->
get_entity_stats(?TABLE_PUBLISHER, ConnectionPid).
get_entity_stats(?TABLE_PUBLISHER, ConnectionPid).
get_entity_stats(Table, Id) ->
ets:select(Table, match_entity_spec(Id)).
ets:select(Table, match_entity_spec(Id)).
match_entity_spec(ConnectionId) ->
[{{{'_', '$1', '_'}, '_'}, [{'==', ConnectionId, '$1'}], ['$_']}].
[{{{'_', '$1', '_'}, '_'}, [{'==', ConnectionId, '$1'}], ['$_']}].
augment_connection_pid(Consumer) ->
Pid = rabbit_misc:pget(connection, Consumer),
Conn = rabbit_mgmt_data:lookup_element(connection_created_stats, Pid, 3),
ConnDetails = case Conn of
[] -> %% If the connection has just been opened, we might not yet have the data
[];
_ ->
[{name, rabbit_misc:pget(name, Conn)},
{user, rabbit_misc:pget(user, Conn)},
{node, rabbit_misc:pget(node, Conn)},
{peer_port, rabbit_misc:pget(peer_port, Conn)},
{peer_host, rabbit_misc:pget(peer_host, Conn)}]
end,
[{connection_details, ConnDetails}].
Pid = rabbit_misc:pget(connection, Consumer),
Conn =
rabbit_mgmt_data:lookup_element(connection_created_stats, Pid, 3),
ConnDetails =
case Conn of
[] -> %% If the connection has just been opened, we might not yet have the data
[];
_ ->
[{name, rabbit_misc:pget(name, Conn)},
{user, rabbit_misc:pget(user, Conn)},
{node, rabbit_misc:pget(node, Conn)},
{peer_port, rabbit_misc:pget(peer_port, Conn)},
{peer_host, rabbit_misc:pget(peer_host, Conn)}]
end,
[{connection_details, ConnDetails}].
format_resource(unknown) -> unknown;
format_resource(Res) -> format_resource(name, Res).
format_resource(unknown) ->
unknown;
format_resource(Res) ->
format_resource(name, Res).
format_resource(_, unknown) ->
unknown;
format_resource(NameAs, #resource{name = Name, virtual_host = VHost}) ->
[{NameAs, Name}, {vhost, VHost}].
unknown;
format_resource(NameAs,
#resource{name = Name, virtual_host = VHost}) ->
[{NameAs, Name}, {vhost, VHost}].

View File

@ -9,54 +9,74 @@
-behaviour(rabbit_mgmt_extension).
-export([dispatcher/0, web_ui/0]).
-export([init/2, resource_exists/2, to_json/2, content_types_provided/2, is_authorized/2]).
-export([dispatcher/0,
web_ui/0]).
-export([init/2,
resource_exists/2,
to_json/2,
content_types_provided/2,
is_authorized/2]).
-include_lib("rabbitmq_management_agent/include/rabbit_mgmt_records.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
dispatcher() -> [
{"/stream/publishers", ?MODULE, []},
{"/stream/publishers/:vhost", ?MODULE, []}
].
dispatcher() ->
[{"/stream/publishers", ?MODULE, []},
{"/stream/publishers/:vhost", ?MODULE, []}].
web_ui() -> [].
web_ui() ->
[].
%%--------------------------------------------------------------------
init(Req, _Opts) ->
{cowboy_rest, rabbit_mgmt_cors:set_headers(Req, ?MODULE), #context{}}.
{cowboy_rest, rabbit_mgmt_cors:set_headers(Req, ?MODULE), #context{}}.
content_types_provided(ReqData, Context) ->
{[{<<"application/json">>, to_json}], ReqData, Context}.
{[{<<"application/json">>, to_json}], ReqData, Context}.
resource_exists(ReqData, Context) ->
{case rabbit_mgmt_util:vhost(ReqData) of
not_found -> false;
none -> true; % none means `all`
_ -> true
end, ReqData, Context}.
{case rabbit_mgmt_util:vhost(ReqData) of
not_found ->
false;
none ->
true; % none means `all`
_ ->
true
end,
ReqData, Context}.
to_json(ReqData, Context = #context{user = User}) ->
case rabbit_mgmt_util:disable_stats(ReqData) of
false ->
Arg = case rabbit_mgmt_util:vhost(ReqData) of
none -> all;
VHost -> VHost
end,
Publishers = rabbit_mgmt_format:strip_pids(rabbit_stream_mgmt_db:get_all_publishers(Arg)),
rabbit_mgmt_util:reply_list(
filter_user(Publishers, User), [], ReqData, Context);
true ->
rabbit_mgmt_util:bad_request(<<"Stats in management UI are disabled on this node">>, ReqData, Context)
end.
case rabbit_mgmt_util:disable_stats(ReqData) of
false ->
Arg = case rabbit_mgmt_util:vhost(ReqData) of
none ->
all;
VHost ->
VHost
end,
Publishers =
rabbit_mgmt_format:strip_pids(
rabbit_stream_mgmt_db:get_all_publishers(Arg)),
rabbit_mgmt_util:reply_list(filter_user(Publishers, User),
[],
ReqData,
Context);
true ->
rabbit_mgmt_util:bad_request(<<"Stats in management UI are disabled on this node">>,
ReqData, Context)
end.
is_authorized(ReqData, Context) ->
rabbit_mgmt_util:is_authorized(ReqData, Context).
rabbit_mgmt_util:is_authorized(ReqData, Context).
filter_user(List, #user{username = Username, tags = Tags}) ->
case rabbit_mgmt_util:is_monitor(Tags) of
true -> List;
false -> [I || I <- List,
rabbit_misc:pget(user, rabbit_misc:pget(connection_details, I)) == Username]
end.
case rabbit_mgmt_util:is_monitor(Tags) of
true ->
List;
false ->
[I
|| I <- List,
rabbit_misc:pget(user, rabbit_misc:pget(connection_details, I))
== Username]
end.

View File

@ -14,16 +14,10 @@
-compile(export_all).
all() ->
[
{group, non_parallel_tests}
].
[{group, non_parallel_tests}].
groups() ->
[
{non_parallel_tests, [], [
stream_management
]}
].
[{non_parallel_tests, [], [stream_management]}].
%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
@ -31,21 +25,23 @@ groups() ->
init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
Config1 = rabbit_ct_helpers:set_config(Config, [
{rmq_nodename_suffix, ?MODULE}
]),
Config1 =
rabbit_ct_helpers:set_config(Config,
[{rmq_nodename_suffix, ?MODULE}]),
rabbit_ct_helpers:run_setup_steps(Config1,
[fun(StepConfig) ->
rabbit_ct_helpers:merge_app_env(StepConfig,
{rabbit, [{collect_statistics_interval, 500}]})
end] ++
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()).
[fun(StepConfig) ->
rabbit_ct_helpers:merge_app_env(StepConfig,
{rabbit,
[{collect_statistics_interval,
500}]})
end]
++ rabbit_ct_broker_helpers:setup_steps()
++ rabbit_ct_client_helpers:setup_steps()).
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()).
rabbit_ct_client_helpers:teardown_steps()
++ rabbit_ct_broker_helpers:teardown_steps()).
init_per_group(_, Config) ->
Config.
@ -69,28 +65,38 @@ stream_management(Config) ->
Vhost1 = <<"vh1">>,
Vhost2 = <<"vh2">>,
rabbit_ct_broker_helpers:add_user(Config, UserManagement),
rabbit_ct_broker_helpers:set_user_tags(Config, 0, UserManagement, [management]),
rabbit_ct_broker_helpers:set_user_tags(Config,
0,
UserManagement,
[management]),
rabbit_ct_broker_helpers:add_user(Config, UserMonitoring),
rabbit_ct_broker_helpers:set_user_tags(Config, 0, UserMonitoring, [monitoring]),
rabbit_ct_broker_helpers:set_user_tags(Config,
0,
UserMonitoring,
[monitoring]),
rabbit_ct_broker_helpers:add_vhost(Config, Vhost1),
rabbit_ct_broker_helpers:add_vhost(Config, Vhost2),
rabbit_ct_broker_helpers:set_full_permissions(Config, UserManagement, Vhost1),
rabbit_ct_broker_helpers:set_full_permissions(Config, UserMonitoring, Vhost1),
rabbit_ct_broker_helpers:set_full_permissions(Config, <<"guest">>, Vhost1),
rabbit_ct_broker_helpers:set_full_permissions(Config, <<"guest">>, Vhost2),
rabbit_ct_broker_helpers:set_full_permissions(Config, UserManagement,
Vhost1),
rabbit_ct_broker_helpers:set_full_permissions(Config, UserMonitoring,
Vhost1),
rabbit_ct_broker_helpers:set_full_permissions(Config, <<"guest">>,
Vhost1),
rabbit_ct_broker_helpers:set_full_permissions(Config, <<"guest">>,
Vhost2),
StreamPortNode = get_stream_port(Config),
ManagementPortNode = get_management_port(Config),
DataDir = rabbit_ct_helpers:get_config(Config, data_dir),
MakeResult = rabbit_ct_helpers:make(Config, DataDir, ["tests",
{"STREAM_PORT=~b", [StreamPortNode]},
{"MANAGEMENT_PORT=~b", [ManagementPortNode]}
]),
MakeResult =
rabbit_ct_helpers:make(Config, DataDir,
["tests", {"STREAM_PORT=~b", [StreamPortNode]},
{"MANAGEMENT_PORT=~b", [ManagementPortNode]}]),
{ok, _} = MakeResult.
get_stream_port(Config) ->
rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stream).
rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stream).
get_management_port(Config) ->
rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mgmt).
rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mgmt).