Put rabbit_cli_backend under supervisor

This commit is contained in:
Jean-Sébastien Pédron 2025-06-19 11:48:03 +02:00
parent d7b779967c
commit 56d502d0f5
No known key found for this signature in database
GPG Key ID: 39E99761A5FD94CC
9 changed files with 256 additions and 121 deletions

View File

@ -236,6 +236,12 @@
{enables, routing_ready}]}).
%% CLI-related boot steps.
-rabbit_boot_step({rabbit_cli_backend_sup,
[{description, "RabbitMQ CLI command supervisor"},
{mfa, {rabbit_sup, start_supervisor_child,
[rabbit_cli_backend_sup]}},
{requires, [core_initialized, recovery]},
{enables, routing_ready}]}).
-rabbit_boot_step({rabbit_cli_command_discovery,
[{description, "RabbitMQ CLI command discovery"},
{mfa, {rabbit_cli_commands, discover_commands,

View File

@ -1,5 +1,7 @@
-module(rabbit_cli_backend).
-behaviour(gen_statem).
-include_lib("kernel/include/logger.hrl").
-include_lib("rabbit_common/include/logging.hrl").
@ -7,29 +9,76 @@
-include("src/rabbit_cli_backend.hrl").
-export([run_command/1]).
-export([run_command/2,
start_link/3]).
-export([init/1,
callback_mode/0,
handle_event/4,
terminate/3,
code_change/4]).
%% TODO:
%% * Implémenter "list exchanges" plus proprement
%% * Implémenter "rabbitmqctl list_exchanges" pour la compatibilité
run_command(ContextMap) when is_map(ContextMap) ->
run_command(ContextMap, Caller) when is_map(ContextMap) ->
Context = map_to_context(ContextMap),
run_command(Context);
run_command(#rabbit_cli{} = Context) ->
maybe
%% We can query the argparse definition from the remote node to know
%% the commands it supports and proceed with the execution.
ArgparseDef = final_argparse_def(Context),
Context1 = Context#rabbit_cli{argparse_def = ArgparseDef},
run_command(Context, Caller);
run_command(#rabbit_cli{} = Context, Caller) when is_pid(Caller) ->
GroupLeader = erlang:group_leader(),
rabbit_cli_backend_sup:start_backend(Context, Caller, GroupLeader).
{ok, ArgMap, CmdPath, Command} ?= final_parse(Context1),
Context2 = Context1#rabbit_cli{arg_map = ArgMap,
cmd_path = CmdPath,
command = Command},
map_to_context(ContextMap) ->
#rabbit_cli{progname = maps:get(progname, ContextMap),
args = maps:get(args, ContextMap),
argparse_def = maps:get(argparse_def, ContextMap),
arg_map = maps:get(arg_map, ContextMap),
cmd_path = maps:get(cmd_path, ContextMap),
command = maps:get(command, ContextMap),
do_run_command(Context2)
end.
frontend_priv = undefined}.
start_link(Context, Caller, GroupLeader) ->
Args = #{context => Context,
caller => Caller,
group_leader => GroupLeader},
gen_statem:start_link(?MODULE, Args, []).
init(#{context := Context, caller := Caller, group_leader := GroupLeader}) ->
process_flag(trap_exit, true),
erlang:link(Caller),
erlang:group_leader(GroupLeader, self()),
{ok, standing_by, Context, {next_event, internal, parse_command}}.
callback_mode() ->
handle_event_function.
handle_event(internal, parse_command, standing_by, Context) ->
%% We can query the argparse definition from the remote node to know
%% the commands it supports and proceed with the execution.
ArgparseDef = final_argparse_def(Context),
Context1 = Context#rabbit_cli{argparse_def = ArgparseDef},
case final_parse(Context1) of
{ok, ArgMap, CmdPath, Command} ->
Context2 = Context1#rabbit_cli{arg_map = ArgMap,
cmd_path = CmdPath,
command = Command},
{next_state, command_parsed, Context2,
{next_event, internal, run_command}};
{error, Reason} ->
{stop, {failed_to_parse_command, Reason}}
end;
handle_event(internal, run_command, command_parsed, Context) ->
Ret = do_run_command(Context),
{stop, {shutdown, Ret}, Context}.
terminate(Reason, _State, _Data) ->
?LOG_DEBUG("CLI: backend terminating: ~0p", [Reason]),
ok.
code_change(_Vsn, State, Data, _Extra) ->
{ok, State, Data}.
%% -------------------------------------------------------------------
%% Argparse definition handling.
@ -70,12 +119,3 @@ final_parse(
do_run_command(
#rabbit_cli{command = #{handler := {Module, Function}}} = Context) ->
erlang:apply(Module, Function, [Context]).
map_to_context(ContextMap) ->
#rabbit_cli{scriptname = maps:get(scriptname, ContextMap),
progname = maps:get(progname, ContextMap),
args = maps:get(args, ContextMap),
argparse_def = maps:get(argparse_def, ContextMap),
arg_map = maps:get(arg_map, ContextMap),
cmd_path = maps:get(cmd_path, ContextMap),
command = maps:get(command, ContextMap)}.

View File

@ -1,7 +1,8 @@
-record(rabbit_cli, {scriptname,
progname,
-record(rabbit_cli, {progname,
args,
argparse_def,
arg_map,
cmd_path,
command}).
command,
frontend_priv}).

View File

@ -0,0 +1,20 @@
-module(rabbit_cli_backend_sup).
-behaviour(supervisor).
-export([start_link/0,
start_backend/3]).
-export([init/1]).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, none).
start_backend(Context, Caller, GroupLeader) ->
supervisor:start_child(?MODULE, [Context, Caller, GroupLeader]).
init(_Args) ->
SupFlags = #{strategy => simple_one_for_one},
BackendChild = #{id => rabbit_cli_backend,
start => {rabbit_cli_backend, start_link, []},
restart => temporary},
{ok, {SupFlags, [BackendChild]}}.

View File

@ -8,16 +8,28 @@
-export([discover_commands/0,
discovered_commands/0,
discovered_argparse_def/0]).
-export([cmd_hello/1,
-export([cmd_noop/1,
cmd_hello/1,
cmd_crash/1,
cmd_list_exchanges/1,
cmd_import_definitions/1,
cmd_top/1]).
-rabbitmq_command(
{#{cli => ["noop"]},
#{help => "No-op",
handler => {?MODULE, cmd_noop}}}).
-rabbitmq_command(
{#{cli => ["hello"]},
#{help => "Say hello!",
handler => {?MODULE, cmd_hello}}}).
-rabbitmq_command(
{#{cli => ["crash"]},
#{help => "Crash",
handler => {?MODULE, cmd_crash}}}).
-rabbitmq_command(
{#{cli => ["declare", "exchange"],
http => {put, ["exchanges", vhost, exchange]}},
@ -141,11 +153,17 @@ expand_argparse_def(Defs) when is_list(Defs) ->
%% XXX
%% -------------------------------------------------------------------
cmd_noop(_) ->
ok.
cmd_hello(_) ->
Name = io:get_line("Name: "),
io:format("Hello ~s!~n", [string:trim(Name)]),
ok.
cmd_crash(_) ->
erlang:exit(oops).
cmd_list_exchanges(#{progname := Progname, arg_map := ArgMap}) ->
logger:alert("CLI: running list exchanges"),
InfoKeys = rabbit_exchange:info_keys() -- [user_who_performed_action],

View File

@ -3,17 +3,12 @@
-include_lib("kernel/include/logger.hrl").
-include_lib("stdlib/include/assert.hrl").
-include("src/rabbit_cli_backend.hrl").
-export([main/1,
noop/1]).
-record(?MODULE, {scriptname,
progname,
group_leader,
args,
argparse_def,
arg_map,
cmd_path,
command,
connection}).
main(Args) ->
@ -24,6 +19,7 @@ main(Args) ->
Ret = run_cli(ScriptName, Args),
?LOG_NOTICE("CLI: run_cli() return value: ~p", [Ret]),
%% FIXME: Ensures everything written to stdout/stderr was flushed.
timer:sleep(50),
erlang:halt().
%% -------------------------------------------------------------------
@ -59,48 +55,49 @@ configure_logging() ->
run_cli(ScriptName, Args) ->
ProgName = filename:basename(ScriptName, ".escript"),
GroupLeader = erlang:group_leader(),
Context = #?MODULE{scriptname = ScriptName,
progname = ProgName,
args = Args,
group_leader = GroupLeader},
Priv = #?MODULE{scriptname = ScriptName},
Context = #rabbit_cli{progname = ProgName,
args = Args,
frontend_priv = Priv},
init_local_args(Context).
init_local_args(Context) ->
maybe
LocalArgparseDef = initial_argparse_def(),
Context1 = Context#?MODULE{argparse_def = LocalArgparseDef},
Context1 = Context#rabbit_cli{argparse_def = LocalArgparseDef},
{ok,
PartialArgMap,
PartialCmdPath,
PartialCommand} ?= initial_parse(Context1),
Context2 = Context1#?MODULE{arg_map = PartialArgMap,
cmd_path = PartialCmdPath,
command = PartialCommand},
Context2 = Context1#rabbit_cli{arg_map = PartialArgMap,
cmd_path = PartialCmdPath,
command = PartialCommand},
set_log_level(Context2)
end.
set_log_level(#?MODULE{arg_map = #{verbose := Verbosity}} = Context)
set_log_level(#rabbit_cli{arg_map = #{verbose := Verbosity}} = Context)
when Verbosity >= 3 ->
logger:set_primary_config(level, debug),
connect_to_node(Context);
set_log_level(#?MODULE{} = Context) ->
set_log_level(#rabbit_cli{} = Context) ->
connect_to_node(Context).
connect_to_node(#?MODULE{arg_map = ArgMap} = Context) ->
connect_to_node(
#rabbit_cli{arg_map = ArgMap, frontend_priv = Priv} = Context) ->
Ret = case ArgMap of
#{node := NodenameOrUri} ->
rabbit_cli_transport2:connect(NodenameOrUri);
_ ->
rabbit_cli_transport2:connect()
end,
Context1 = case Ret of
{ok, Connection} ->
Context#?MODULE{connection = Connection};
{error, _Reason} ->
Context#?MODULE{connection = none}
end,
Priv1 = case Ret of
{ok, Connection} ->
Priv#?MODULE{connection = Connection};
{error, _Reason} ->
Priv#?MODULE{connection = none}
end,
Context1 = Context#rabbit_cli{frontend_priv = Priv1},
run_command(Context1).
%% -------------------------------------------------------------------
@ -138,7 +135,7 @@ initial_argparse_def() ->
handler => {?MODULE, noop}}.
initial_parse(
#?MODULE{progname = ProgName, args = Args, argparse_def = ArgparseDef}) ->
#rabbit_cli{progname = ProgName, args = Args, argparse_def = ArgparseDef}) ->
Options = #{progname => ProgName},
case partial_parse(Args, ArgparseDef, Options) of
{ok, ArgMap, CmdPath, Command, _RemainingArgs} ->
@ -163,10 +160,18 @@ partial_parse(Args, ArgparseDef, Options, RemainingArgs) ->
Error
end.
noop(_Context) ->
ok.
%% -------------------------------------------------------------------
%% Command execution.
%% -------------------------------------------------------------------
%% Run command:
%% * start backend (remote if connection, local otherwise); backend starts
%% execution of command
%% * loop to react to signals and messages from backend
%%
%% TODO: Send a list of supported features:
%% * support for some messages, like Erlang I/O protocol, file read/write
%% support
@ -175,26 +180,34 @@ partial_parse(Args, ArgparseDef, Options, RemainingArgs) ->
%% * is plain test or HTTP
%% * evolutions in the communication between the frontend and the backend
run_command(#?MODULE{connection = Connection} = Context)
run_command(
#rabbit_cli{frontend_priv = #?MODULE{connection = Connection}} = Context)
when Connection =/= none ->
ContextMap = context_to_map(Context),
rabbit_cli_transport2:rpc(
Connection, rabbit_cli_backend, run_command, [ContextMap]);
run_command(Context) ->
maybe
process_flag(trap_exit, true),
ContextMap = context_to_map(Context),
{ok, _Backend} ?= rabbit_cli_transport2:run_command(
Connection, ContextMap),
main_loop(Context)
end;
run_command(#rabbit_cli{} = Context) ->
%% TODO: If we can't connect to a node, try to parse args locally and run
%% the command on this CLI node.
%% FIXME: Load applications first, otherwise module attributes are
%% unavailable.
%% FIXME: Do we need to spawn a process?
ContextMap = context_to_map(Context),
rabbit_cli_backend:run_command(ContextMap).
%% FIXME: run_command() relies on rabbit_cli_backend_sup.
maybe
process_flag(trap_exit, true),
ContextMap = context_to_map(Context),
{ok, _Backend} ?= rabbit_cli_backend:run_command(ContextMap),
main_loop(Context)
end.
context_to_map(Context) ->
Fields = [Field || Field <- record_info(fields, ?MODULE),
%% We dont need or want to communicate the connection
%% state or the group leader to the backend.
Field =/= connection orelse
Field =/= group_leader],
Fields = [Field || Field <- record_info(fields, rabbit_cli),
%% We don't need or want to communicate anything that
%% is private to the frontend.
Field =/= frontend_priv],
record_to_map(Fields, Context, 2, #{}).
record_to_map([Field | Rest], Record, Index, Map) ->
@ -204,5 +217,16 @@ record_to_map([Field | Rest], Record, Index, Map) ->
record_to_map([], _Record, _Index, Map) ->
Map.
noop(_Context) ->
main_loop(#rabbit_cli{} = Context) ->
?LOG_DEBUG("CLI: frontend main loop..."),
receive
{'EXIT', _LinkedPid, Reason} ->
terminate(Reason, Context);
Info ->
?LOG_DEBUG("Unknown info: ~0p", [Info]),
main_loop(Context)
end.
terminate(Reason, _Context) ->
?LOG_DEBUG("CLI: frontend terminating: ~0p", [Reason]),
ok.

View File

@ -5,7 +5,9 @@
-include_lib("kernel/include/logger.hrl").
-export([start_link/1, t/0,
run_command/2,
rpc/4,
link/2,
send/3]).
-export([init/1,
callback_mode/0,
@ -28,9 +30,15 @@ t() ->
start_link(Uri) ->
gen_statem:start_link(?MODULE, Uri, []).
run_command(Client, ContextMap) ->
gen_statem:call(Client, {?FUNCTION_NAME, ContextMap}).
rpc(Client, Module, Function, Args) ->
gen_statem:call(Client, {?FUNCTION_NAME, Module, Function, Args}).
link(Client, Pid) ->
gen_statem:call(Client, {?FUNCTION_NAME, Pid}).
send(Client, Dest, Msg) ->
gen_statem:cast(Client, {?FUNCTION_NAME, Dest, Msg}).
@ -103,13 +111,15 @@ handle_event(
stream_ready,
#?MODULE{} = Data) ->
Request = binary_to_term(RequestBin),
?LOG_DEBUG("CLI: received request from server: ~p", [Request]),
?LOG_DEBUG("CLI: received HTTP message from server: ~p", [Request]),
case handle_request(Request, Data) of
{reply, Reply, Data1} ->
send_request(Reply, Data1),
{keep_state, Data1};
{noreply, Data1} ->
{keep_state, Data1}
{keep_state, Data1};
{stop, Reason} ->
{stop, Reason, Data}
end;
handle_event(
info, {io_reply, ProxyRef, Reply},
@ -138,7 +148,8 @@ handle_event(
%% FIXME: Handle pending requests.
{stop, normal, Data}.
terminate(_Reason, _State, _Data) ->
terminate(Reason, _State, _Data) ->
?LOG_DEBUG("CLI: HTTP client terminating: ~0p", [Reason]),
ok.
code_change(_Vsn, State, Data, _Extra) ->
@ -183,4 +194,6 @@ handle_request(
GroupLeader ! ProxyIoRequest,
IoRequests1 = IoRequests#{ProxyRef => {From, ReplyAs}},
Data1 = Data#?MODULE{io_requests = IoRequests1},
{noreply, Data1}.
{noreply, Data1};
handle_request({'EXIT', _Pid, Reason}, _Data) ->
{stop, Reason}.

View File

@ -107,7 +107,6 @@ config_change(_OldVsn, State, _Extra) ->
%% -------------------------------------------------------------------
init(#{method := <<"GET">>} = Req, State) ->
?LOG_DEBUG("CLI: received HTTP request: ~p", [Req]),
UpgradeHeader = cowboy_req:header(<<"upgrade">>, Req),
case UpgradeHeader of
<<"websocket">> ->
@ -124,25 +123,33 @@ init(#{method := <<"GET">>} = Req, State) ->
end
end;
init(Req, State) ->
?LOG_DEBUG("CLI: received HTTP request: ~p", [Req]),
Req1 = reply_with_help(Req, 405),
{ok, Req1, State}.
websocket_init(State) ->
{ok, Server} = rabbit_cli_http_server:start_link(self()),
State1 = State#{server => Server,
reqids => gen_server:reqids_new()},
{ok, State1}.
process_flag(trap_exit, true),
erlang:group_leader(self(), self()),
{ok, State}.
websocket_handle(
{binary, RequestBin},
#{server := Server, reqids := ReqIds} = State) ->
websocket_handle({binary, RequestBin}, State) ->
Request = binary_to_term(RequestBin),
?LOG_DEBUG("CLI: received request from client: ~p", [Request]),
ReqIds1 = rabbit_cli_http_server:send_request(
Server, Request, undefined, ReqIds),
State1 = State#{reqids => ReqIds1},
{ok, State1};
?LOG_DEBUG("CLI: received HTTP message from client: ~p", [Request]),
try
case handle_request(Request) of
{reply, Reply} ->
ReplyBin = term_to_binary(Reply),
Frame1 = {binary, ReplyBin},
{[Frame1], State};
noreply ->
{ok, State}
end
catch
Class:Reason:Stacktrace ->
Exception = {call_exception, Class, Reason, Stacktrace},
ExceptionBin = term_to_binary(Exception),
Frame2 = {binary, ExceptionBin},
{[Frame2], State}
end;
websocket_handle(Frame, State) ->
?LOG_DEBUG("CLI: unhandled Websocket frame: ~p", [Frame]),
{ok, State}.
@ -151,38 +158,13 @@ websocket_info({io_request, _From, _ReplyAs, _Request} = IoRequest, State) ->
IoRequestBin = term_to_binary(IoRequest),
Frame = {binary, IoRequestBin},
{[Frame], State};
websocket_info(Info, #{server := Server, reqids := ReqIds} = State) ->
case gen_server:check_response(Info, ReqIds, true) of
{{reply, Response}, _Label, ReqIds1} ->
State1 = State#{reqids => ReqIds1},
case Response of
{reply, Reply} ->
ReplyBin = term_to_binary(Reply),
Frame = {binary, ReplyBin},
{[Frame], State1};
noreply ->
{ok, State1}
end;
{{error, {Reason, Server}}, _Label, ReqIds1} ->
State1 = State#{reqids => ReqIds1},
?LOG_DEBUG("CLI: error from gen_server request: ~p", [Reason]),
{ok, State1};
NotResponse
when NotResponse =:= no_request orelse NotResponse =:= no_reply ->
?LOG_DEBUG("CLI: unhandled info: ~p", [Info]),
{ok, State}
end.
websocket_info({'EXIT', _Pid, _Reason} = Exit, State) ->
ExitBin = term_to_binary(Exit),
Frame = {binary, ExitBin},
{[Frame, close], State}.
terminate(_Reason, _Req, #{server := Server}) ->
?LOG_ALERT("CLI: terminate: ~p", [_Reason]),
rabbit_cli_http_server:stop(Server),
receive
{'EXIT', Server, _} ->
ok
end,
ok;
terminate(_Reason, _Req, _State) ->
?LOG_ALERT("CLI: terminate: ~p", [_Reason]),
terminate(Reason, _Req, _State) ->
?LOG_DEBUG("CLI: HTTP server terminating: ~0p", [Reason]),
ok.
reply_with_help(Req, Code) ->
@ -197,3 +179,21 @@ reply_with_help(Req, Code) ->
cowboy_req:reply(
Code, #{<<"content-type">> => <<"text/html; charset=utf-8">>}, Body,
Req).
handle_request({call, From, Command}) ->
Ret = handle_command(Command),
Reply = {call_ret, From, Ret},
{reply, Reply};
handle_request({cast, Command}) ->
_ = handle_command(Command),
noreply.
handle_command({run_command, ContextMap}) ->
Caller = self(),
rabbit_cli_backend:run_command(ContextMap, Caller);
handle_command({rpc, Module, Function, Args}) ->
erlang:apply(Module, Function, Args);
handle_command({link, Pid}) ->
erlang:link(Pid);
handle_command({send, Dest, Msg}) ->
erlang:send(Dest, Msg).

View File

@ -3,7 +3,9 @@
-include_lib("kernel/include/logger.hrl").
-export([connect/0, connect/1,
run_command/2,
rpc/4,
link/2,
send/3]).
-record(?MODULE, {type :: erldist | http,
@ -85,12 +87,23 @@ complete_nodename(Nodename) ->
list_to_atom(Nodename)
end.
run_command(#?MODULE{type = erldist, peer = Node}, ContextMap) ->
Caller = self(),
erpc:call(Node, rabbit_cli_backend, run_command, [ContextMap, Caller]);
run_command(#?MODULE{type = http, peer = Client}, ContextMap) ->
rabbit_cli_http_client:run_command(Client, ContextMap).
rpc(#?MODULE{type = erldist, peer = Node}, Module, Function, Args) ->
erpc:call(Node, Module, Function, Args);
rpc(#?MODULE{type = http, peer = Pid}, Module, Function, Args) ->
rabbit_cli_http_client:rpc(Pid, Module, Function, Args).
rpc(#?MODULE{type = http, peer = Client}, Module, Function, Args) ->
rabbit_cli_http_client:rpc(Client, Module, Function, Args).
link(#?MODULE{type = erldist}, Pid) ->
erlang:link(Pid);
link(#?MODULE{type = http, peer = Client}, Pid) ->
rabbit_cli_http_client:link(Client, Pid).
send(#?MODULE{type = erldist}, Dest, Msg) ->
erlang:send(Dest, Msg);
send(#?MODULE{type = http, peer = Pid}, Dest, Msg) ->
rabbit_cli_http_client:send(Pid, Dest, Msg).
send(#?MODULE{type = http, peer = Client}, Dest, Msg) ->
rabbit_cli_http_client:send(Client, Dest, Msg).