Fix all dialyzer warnings in `amqp_client`
And enable `warnings_as_errors`. A pair of `behaviour_info/1` needed converting to `-callback` directives.
This commit is contained in:
parent
4840ca9f2f
commit
60093b0689
|
|
@ -8,6 +8,7 @@ load(
|
|||
"rabbitmq_app",
|
||||
"rabbitmq_integration_suite",
|
||||
"rabbitmq_suite",
|
||||
"RABBITMQ_DIALYZER_OPTS",
|
||||
)
|
||||
|
||||
APP_ENV = """[
|
||||
|
|
@ -38,6 +39,8 @@ APP_EXTRA_KEYS = """%% Hex.pm package informations.
|
|||
|
||||
EXTRA_APPS = [
|
||||
"xmerl",
|
||||
"ssl",
|
||||
"public_key",
|
||||
]
|
||||
|
||||
DEPS = [
|
||||
|
|
@ -68,7 +71,7 @@ plt(
|
|||
|
||||
dialyze(
|
||||
plt = ":base_plt",
|
||||
warnings_as_errors = False,
|
||||
dialyzer_opts = RABBITMQ_DIALYZER_OPTS,
|
||||
)
|
||||
|
||||
rabbitmq_home(
|
||||
|
|
|
|||
|
|
@ -489,11 +489,11 @@ handle_info({send_command, Method, Content}, State) ->
|
|||
handle_info({send_command_and_notify, QPid, ChPid,
|
||||
Method = #'basic.deliver'{}, Content},
|
||||
State = #state{delivery_flow_control = MFC}) ->
|
||||
case MFC of
|
||||
false -> handle_method_from_server(Method, Content, State),
|
||||
rabbit_amqqueue_common:notify_sent(QPid, ChPid);
|
||||
true -> handle_method_from_server(Method, Content,
|
||||
{self(), QPid, ChPid}, State)
|
||||
_ = case MFC of
|
||||
false -> _ = handle_method_from_server(Method, Content, State),
|
||||
_ = rabbit_amqqueue_common:notify_sent(QPid, ChPid);
|
||||
true -> _ = handle_method_from_server(Method, Content,
|
||||
{self(), QPid, ChPid}, State)
|
||||
end,
|
||||
{noreply, State};
|
||||
%% This comes from the writer or rabbit_channel
|
||||
|
|
@ -763,8 +763,8 @@ handle_method_from_server1(#'basic.deliver'{} = Deliver, AmqpMsg, State) ->
|
|||
{noreply, State};
|
||||
handle_method_from_server1(#'channel.flow'{active = Active} = Flow, none,
|
||||
State = #state{flow_handler = FlowHandler}) ->
|
||||
case FlowHandler of none -> ok;
|
||||
{Pid, _Ref} -> Pid ! Flow
|
||||
_ = case FlowHandler of none -> ok;
|
||||
{Pid, _Ref} -> Pid ! Flow
|
||||
end,
|
||||
%% Putting the flow_ok in the queue so that the RPC queue can be
|
||||
%% flushed beforehand. Methods that made it to the queue are not
|
||||
|
|
@ -774,7 +774,7 @@ handle_method_from_server1(#'channel.flow'{active = Active} = Flow, none,
|
|||
handle_method_from_server1(
|
||||
#'basic.return'{} = BasicReturn, AmqpMsg,
|
||||
State = #state{return_handler = ReturnHandler}) ->
|
||||
case ReturnHandler of
|
||||
_ = case ReturnHandler of
|
||||
none -> ?LOG_WARN("Channel (~tp): received {~tp, ~tp} but there is "
|
||||
"no return handler registered",
|
||||
[self(), BasicReturn, AmqpMsg]);
|
||||
|
|
@ -1004,7 +1004,9 @@ call_to_consumer(Method, Args, DeliveryCtx, #state{consumer = Consumer}) ->
|
|||
amqp_gen_consumer:call_consumer(Consumer, Method, Args, DeliveryCtx).
|
||||
|
||||
safe_cancel_timer(undefined) -> ok;
|
||||
safe_cancel_timer(TRef) -> erlang:cancel_timer(TRef).
|
||||
safe_cancel_timer(TRef) ->
|
||||
_ = erlang:cancel_timer(TRef),
|
||||
ok.
|
||||
|
||||
second_to_millisecond(Timeout) ->
|
||||
Timeout * 1000.
|
||||
|
|
|
|||
|
|
@ -190,7 +190,7 @@ check_all_channels_terminated(State = #state{closing = true,
|
|||
|
||||
handle_connection_closing(ChannelCloseType, Reason,
|
||||
State = #state{connection = Connection}) ->
|
||||
case internal_is_empty(State) of
|
||||
_ = case internal_is_empty(State) of
|
||||
true -> amqp_gen_connection:channels_terminated(Connection);
|
||||
false -> signal_channels_connection_closing(ChannelCloseType, Reason,
|
||||
State)
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ start() ->
|
|||
%%
|
||||
%% * https://github.com/rabbitmq/rabbitmq-erlang-client/issues/72
|
||||
%% * https://github.com/rabbitmq/rabbitmq-common/pull/149
|
||||
application:ensure_all_started(rabbit_common),
|
||||
{ok, _} = application:ensure_all_started(rabbit_common),
|
||||
{ok, _} = application:ensure_all_started(amqp_client),
|
||||
ok.
|
||||
|
||||
|
|
|
|||
|
|
@ -196,7 +196,8 @@ set_connection_name(ConnName,
|
|||
%% application which is making this call.
|
||||
ensure_started() ->
|
||||
[ensure_started(App) || App <- [syntax_tools, compiler, xmerl,
|
||||
rabbit_common, amqp_client, credentials_obfuscation]].
|
||||
rabbit_common, amqp_client, credentials_obfuscation]],
|
||||
ok.
|
||||
|
||||
ensure_started(App) ->
|
||||
case is_pid(application_controller:get_master(App)) andalso amqp_sup:is_ready() of
|
||||
|
|
|
|||
|
|
@ -16,7 +16,6 @@
|
|||
channel_internal_error/3, server_misbehaved/2, channels_terminated/1,
|
||||
close/3, server_close/2, info/2, info_keys/0, info_keys/1,
|
||||
register_blocked_handler/2, update_secret/2]).
|
||||
-export([behaviour_info/1]).
|
||||
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
|
||||
handle_info/2]).
|
||||
|
||||
|
|
@ -38,6 +37,36 @@
|
|||
close,
|
||||
from = none}).
|
||||
|
||||
-type connect_params() :: {ServerProperties :: term(),
|
||||
ChannelMax :: term(),
|
||||
ChMgr :: term(),
|
||||
NewState :: term()}.
|
||||
-callback init() -> {ok, InitialState :: term()}.
|
||||
-callback terminate(Reason :: term(), FinalState :: term()) -> Ignored :: term().
|
||||
-callback connect(AmqpParams :: #amqp_params_network{} | #amqp_params_direct{},
|
||||
SIF :: term(),
|
||||
TypeSup :: term(),
|
||||
State :: term()) ->
|
||||
{ok, connect_params() } |
|
||||
{closing, connect_params(), AmqpError :: term(), Reply :: term()} |
|
||||
{error, Error :: term()}.
|
||||
|
||||
-callback do(Method :: term(), State :: term()) -> Ignored :: term().
|
||||
-callback open_channel_args(State :: term()) -> OpenChannelArgs :: term().
|
||||
-callback i(InfoItem :: atom(), State :: term()) -> Info :: term().
|
||||
-callback info_keys() -> [InfoItem :: atom()].
|
||||
|
||||
-callback handle_message(Message :: term(), State :: term()) ->
|
||||
{ok, NewState :: term() } |
|
||||
{stop, Reason :: term(), FinalState :: term()}.
|
||||
-callback closing(flush|abrupt, Reason :: term(), State :: term()) ->
|
||||
{ok, NewState :: term() } |
|
||||
{stop, Reason :: term(), FinalState :: term()}.
|
||||
-callback channels_terminated(State :: term()) ->
|
||||
{ok, NewState :: term() } |
|
||||
{stop, Reason :: term(), FinalState :: term()}.
|
||||
|
||||
|
||||
%%---------------------------------------------------------------------------
|
||||
%% Interface
|
||||
%%---------------------------------------------------------------------------
|
||||
|
|
@ -93,47 +122,6 @@ info_keys(Pid) ->
|
|||
%% Behaviour
|
||||
%%---------------------------------------------------------------------------
|
||||
|
||||
behaviour_info(callbacks) ->
|
||||
[
|
||||
%% init() -> {ok, InitialState}
|
||||
{init, 0},
|
||||
|
||||
%% terminate(Reason, FinalState) -> Ignored
|
||||
{terminate, 2},
|
||||
|
||||
%% connect(AmqpParams, SIF, TypeSup, State) ->
|
||||
%% {ok, ConnectParams} | {closing, ConnectParams, AmqpError, Reply} |
|
||||
%% {error, Error}
|
||||
%% where
|
||||
%% ConnectParams = {ServerProperties, ChannelMax, ChMgr, NewState}
|
||||
{connect, 4},
|
||||
|
||||
%% do(Method, State) -> Ignored
|
||||
{do, 2},
|
||||
|
||||
%% open_channel_args(State) -> OpenChannelArgs
|
||||
{open_channel_args, 1},
|
||||
|
||||
%% i(InfoItem, State) -> Info
|
||||
{i, 2},
|
||||
|
||||
%% info_keys() -> [InfoItem]
|
||||
{info_keys, 0},
|
||||
|
||||
%% CallbackReply = {ok, NewState} | {stop, Reason, FinalState}
|
||||
|
||||
%% handle_message(Message, State) -> CallbackReply
|
||||
{handle_message, 2},
|
||||
|
||||
%% closing(flush|abrupt, Reason, State) -> CallbackReply
|
||||
{closing, 3},
|
||||
|
||||
%% channels_terminated(State) -> CallbackReply
|
||||
{channels_terminated, 1}
|
||||
];
|
||||
behaviour_info(_Other) ->
|
||||
undefined.
|
||||
|
||||
callback(Function, Params, State = #state{module = Mod,
|
||||
module_state = MState}) ->
|
||||
case erlang:apply(Mod, Function, Params ++ [MState]) of
|
||||
|
|
@ -287,12 +275,12 @@ handle_method(#'connection.close_ok'{}, State = #state{closing = Closing}) ->
|
|||
end,
|
||||
{stop, {shutdown, closing_to_reason(Closing)}, State};
|
||||
handle_method(#'connection.blocked'{} = Blocked, State = #state{block_handler = BlockHandler}) ->
|
||||
case BlockHandler of none -> ok;
|
||||
_ = case BlockHandler of none -> ok;
|
||||
{Pid, _Ref} -> Pid ! Blocked
|
||||
end,
|
||||
{noreply, State};
|
||||
handle_method(#'connection.unblocked'{} = Unblocked, State = #state{block_handler = BlockHandler}) ->
|
||||
case BlockHandler of none -> ok;
|
||||
_ = case BlockHandler of none -> ok;
|
||||
{Pid, _Ref} -> Pid ! Unblocked
|
||||
end,
|
||||
{noreply, State};
|
||||
|
|
@ -310,7 +298,7 @@ handle_method(Other, State) ->
|
|||
%%---------------------------------------------------------------------------
|
||||
|
||||
app_initiated_close(Close, From, Timeout, State) ->
|
||||
case Timeout of
|
||||
_ = case Timeout of
|
||||
infinity -> ok;
|
||||
_ -> erlang:send_after(Timeout, self(), closing_timeout)
|
||||
end,
|
||||
|
|
|
|||
|
|
@ -23,13 +23,69 @@
|
|||
-behaviour(gen_server2).
|
||||
|
||||
-export([start_link/3, call_consumer/2, call_consumer/3, call_consumer/4]).
|
||||
-export([behaviour_info/1]).
|
||||
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
|
||||
handle_info/2, prioritise_info/3]).
|
||||
|
||||
-record(state, {module,
|
||||
module_state}).
|
||||
|
||||
|
||||
-type ok_error() :: {ok, State :: term()} | {error, Reason:: term(), State :: term()}.
|
||||
|
||||
%% This callback is invoked by the channel, when it starts
|
||||
%% up. Use it to initialize the state of the consumer. In case of
|
||||
-callback init(Args :: term()) -> {ok, State :: term()} | {stop, Reason :: term()} | ignore.
|
||||
|
||||
%% This callback is invoked by the channel before a basic.consume
|
||||
-callback handle_consume(#'basic.consume'{}, Sender :: pid(), State :: term()) -> ok_error().
|
||||
|
||||
%% This callback is invoked by the channel every time a
|
||||
%% basic.consume_ok is received from the server. Consume is the original
|
||||
%% method sent out to the server - it can be used to associate the
|
||||
%% call with the response.
|
||||
-callback handle_consume_ok(#'basic.consume_ok'{}, #'basic.consume'{}, State :: term()) -> ok_error().
|
||||
|
||||
%% This callback is invoked by the channel every time a basic.cancel
|
||||
%% is sent to the server.
|
||||
-callback handle_cancel(#'basic.cancel'{}, State :: term()) -> ok_error().
|
||||
|
||||
%% This callback is invoked by the channel every time a basic.cancel_ok
|
||||
%% is received from the server.
|
||||
-callback handle_cancel_ok(CancelOk :: #'basic.cancel_ok'{}, #'basic.cancel'{}, State :: term()) -> ok_error().
|
||||
|
||||
%% This callback is invoked by the channel every time a basic.cancel
|
||||
%% is received from the server.
|
||||
-callback handle_server_cancel(#'basic.cancel'{}, State :: term()) -> ok_error().
|
||||
|
||||
%% This callback is invoked by the channel every time a basic.deliver
|
||||
%% is received from the server.
|
||||
-callback handle_deliver(#'basic.deliver'{}, #amqp_msg{}, State :: term()) -> ok_error().
|
||||
|
||||
%% This callback is invoked by the channel every time a basic.deliver
|
||||
%% is received from the server. Only relevant for channels that use
|
||||
%% direct client connection and manual flow control.
|
||||
-callback handle_deliver(#'basic.deliver'{}, #amqp_msg{}, DeliveryCtx :: {pid(), pid(), pid()}, State :: term()) -> ok_error().
|
||||
|
||||
%% This callback is invoked the consumer process receives a
|
||||
%% message.
|
||||
-callback handle_info(Info :: term(), State :: term()) -> ok_error().
|
||||
|
||||
%% This callback is invoked by the channel when calling
|
||||
%% amqp_channel:call_consumer/2. Reply is the term that
|
||||
%% amqp_channel:call_consumer/2 will return. If the callback
|
||||
%% returns {noreply, _}, then the caller to
|
||||
%% amqp_channel:call_consumer/2 and the channel remain blocked
|
||||
%% until gen_server2:reply/2 is used with the provided From as
|
||||
%% the first argument.
|
||||
-callback handle_call(Msg :: term(), From :: term(), State :: term()) ->
|
||||
{reply, Reply :: term(), NewState :: term()} |
|
||||
{noreply, NewState :: term()} |
|
||||
{error, Reason :: term(), NewState :: term()}.
|
||||
|
||||
%% This callback is invoked by the channel after it has shut down and
|
||||
%% just before its process exits.
|
||||
-callback terminate(Reason :: term(), State :: term()) -> any().
|
||||
|
||||
%%---------------------------------------------------------------------------
|
||||
%% Interface
|
||||
%%---------------------------------------------------------------------------
|
||||
|
|
@ -65,137 +121,6 @@ call_consumer(Pid, Method, Args) ->
|
|||
call_consumer(Pid, Method, Args, DeliveryCtx) ->
|
||||
gen_server2:call(Pid, {consumer_call, Method, Args, DeliveryCtx}, amqp_util:call_timeout()).
|
||||
|
||||
%%---------------------------------------------------------------------------
|
||||
%% Behaviour
|
||||
%%---------------------------------------------------------------------------
|
||||
|
||||
%% @private
|
||||
behaviour_info(callbacks) ->
|
||||
[
|
||||
%% init(Args) -> {ok, InitialState} | {stop, Reason} | ignore
|
||||
%% where
|
||||
%% Args = [any()]
|
||||
%% InitialState = state()
|
||||
%% Reason = term()
|
||||
%%
|
||||
%% This callback is invoked by the channel, when it starts
|
||||
%% up. Use it to initialize the state of the consumer. In case of
|
||||
%% an error, return {stop, Reason} or ignore.
|
||||
{init, 1},
|
||||
|
||||
%% handle_consume(Consume, Sender, State) -> ok_error()
|
||||
%% where
|
||||
%% Consume = #'basic.consume'{}
|
||||
%% Sender = pid()
|
||||
%% State = state()
|
||||
%%
|
||||
%% This callback is invoked by the channel before a basic.consume
|
||||
%% is sent to the server.
|
||||
{handle_consume, 3},
|
||||
|
||||
%% handle_consume_ok(ConsumeOk, Consume, State) -> ok_error()
|
||||
%% where
|
||||
%% ConsumeOk = #'basic.consume_ok'{}
|
||||
%% Consume = #'basic.consume'{}
|
||||
%% State = state()
|
||||
%%
|
||||
%% This callback is invoked by the channel every time a
|
||||
%% basic.consume_ok is received from the server. Consume is the original
|
||||
%% method sent out to the server - it can be used to associate the
|
||||
%% call with the response.
|
||||
{handle_consume_ok, 3},
|
||||
|
||||
%% handle_cancel(Cancel, State) -> ok_error()
|
||||
%% where
|
||||
%% Cancel = #'basic.cancel'{}
|
||||
%% State = state()
|
||||
%%
|
||||
%% This callback is invoked by the channel every time a basic.cancel
|
||||
%% is sent to the server.
|
||||
{handle_cancel, 2},
|
||||
|
||||
%% handle_cancel_ok(CancelOk, Cancel, State) -> ok_error()
|
||||
%% where
|
||||
%% CancelOk = #'basic.cancel_ok'{}
|
||||
%% Cancel = #'basic.cancel'{}
|
||||
%% State = state()
|
||||
%%
|
||||
%% This callback is invoked by the channel every time a basic.cancel_ok
|
||||
%% is received from the server.
|
||||
{handle_cancel_ok, 3},
|
||||
|
||||
%% handle_server_cancel(Cancel, State) -> ok_error()
|
||||
%% where
|
||||
%% Cancel = #'basic.cancel'{}
|
||||
%% State = state()
|
||||
%%
|
||||
%% This callback is invoked by the channel every time a basic.cancel
|
||||
%% is received from the server.
|
||||
{handle_server_cancel, 2},
|
||||
|
||||
%% handle_deliver(Deliver, Message, State) -> ok_error()
|
||||
%% where
|
||||
%% Deliver = #'basic.deliver'{}
|
||||
%% Message = #amqp_msg{}
|
||||
%% State = state()
|
||||
%%
|
||||
%% This callback is invoked by the channel every time a basic.deliver
|
||||
%% is received from the server.
|
||||
{handle_deliver, 3},
|
||||
|
||||
%% handle_deliver(Deliver, Message,
|
||||
%% DeliveryCtx, State) -> ok_error()
|
||||
%% where
|
||||
%% Deliver = #'basic.deliver'{}
|
||||
%% Message = #amqp_msg{}
|
||||
%% DeliveryCtx = {pid(), pid(), pid()}
|
||||
%% State = state()
|
||||
%%
|
||||
%% This callback is invoked by the channel every time a basic.deliver
|
||||
%% is received from the server. Only relevant for channels that use
|
||||
%% direct client connection and manual flow control.
|
||||
{handle_deliver, 4},
|
||||
|
||||
%% handle_info(Info, State) -> ok_error()
|
||||
%% where
|
||||
%% Info = any()
|
||||
%% State = state()
|
||||
%%
|
||||
%% This callback is invoked the consumer process receives a
|
||||
%% message.
|
||||
{handle_info, 2},
|
||||
|
||||
%% handle_call(Msg, From, State) -> {reply, Reply, NewState} |
|
||||
%% {noreply, NewState} |
|
||||
%% {error, Reason, NewState}
|
||||
%% where
|
||||
%% Msg = any()
|
||||
%% From = any()
|
||||
%% Reply = any()
|
||||
%% State = state()
|
||||
%% NewState = state()
|
||||
%%
|
||||
%% This callback is invoked by the channel when calling
|
||||
%% amqp_channel:call_consumer/2. Reply is the term that
|
||||
%% amqp_channel:call_consumer/2 will return. If the callback
|
||||
%% returns {noreply, _}, then the caller to
|
||||
%% amqp_channel:call_consumer/2 and the channel remain blocked
|
||||
%% until gen_server2:reply/2 is used with the provided From as
|
||||
%% the first argument.
|
||||
{handle_call, 3},
|
||||
|
||||
%% terminate(Reason, State) -> any()
|
||||
%% where
|
||||
%% Reason = any()
|
||||
%% State = state()
|
||||
%%
|
||||
%% This callback is invoked by the channel after it has shut down and
|
||||
%% just before its process exits.
|
||||
{terminate, 2}
|
||||
];
|
||||
behaviour_info(_Other) ->
|
||||
undefined.
|
||||
|
||||
%%---------------------------------------------------------------------------
|
||||
%% gen_server2 callbacks
|
||||
%%---------------------------------------------------------------------------
|
||||
|
|
|
|||
|
|
@ -259,7 +259,7 @@ tune(#'connection.tune'{channel_max = ServerChannelMax,
|
|||
_ -> lists:min([FrameMax, ?TCP_MAX_PACKET_SIZE])
|
||||
end,
|
||||
NewState = State#state{heartbeat = Heartbeat, frame_max = CappedFrameMax},
|
||||
start_heartbeat(NewState),
|
||||
_ = start_heartbeat(NewState),
|
||||
{#'connection.tune_ok'{channel_max = ChannelMax,
|
||||
frame_max = CappedFrameMax,
|
||||
heartbeat = Heartbeat}, ChannelMax, NewState}.
|
||||
|
|
|
|||
Loading…
Reference in New Issue