allow propagation of protocol exceptions in channel interceptors to parent/executing channels
This commit is contained in:
parent
8ce9db6850
commit
c817bed018
|
@ -72,7 +72,9 @@ call_module(Mod, St, M, C) ->
|
|||
% this little dance is because Mod might be unloaded at any point
|
||||
case (catch {ok, Mod:intercept(M, C, St)}) of
|
||||
{ok, R} -> validate_response(Mod, M, C, R);
|
||||
{'EXIT', {undef, [{Mod, intercept, _, _} | _]}} -> {M, C}
|
||||
{'EXIT', {undef, [{Mod, intercept, _, _} | _]}} -> {M, C};
|
||||
{'EXIT', {amqp_error, _Type, _ErrMsg, _} = AMQPError} ->
|
||||
rabbit_misc:protocol_error(AMQPError)
|
||||
end.
|
||||
|
||||
validate_response(Mod, M1, C1, R = {M2, C2}) ->
|
||||
|
|
|
@ -23,6 +23,7 @@ groups() ->
|
|||
{non_parallel_tests, [], [
|
||||
register_interceptor,
|
||||
register_interceptor_failing_with_amqp_error,
|
||||
register_interceptor_crashing_with_amqp_error_exception,
|
||||
register_failing_interceptors
|
||||
]}
|
||||
].
|
||||
|
@ -120,7 +121,7 @@ register_interceptor_failing_with_amqp_error1(Config, Interceptor) ->
|
|||
#'queue.declare_ok'{} =
|
||||
amqp_channel:call(Ch1, #'queue.declare'{queue = Q1}),
|
||||
|
||||
Q2 = <<"failing-q">>,
|
||||
Q2 = <<"failing-with-amqp-error-q">>,
|
||||
try
|
||||
amqp_channel:call(Ch1, #'queue.declare'{queue = Q2})
|
||||
catch
|
||||
|
@ -145,6 +146,55 @@ register_interceptor_failing_with_amqp_error1(Config, Interceptor) ->
|
|||
|
||||
passed.
|
||||
|
||||
register_interceptor_crashing_with_amqp_error_exception(Config) ->
|
||||
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
|
||||
?MODULE, register_interceptor_crashing_with_amqp_error_exception1,
|
||||
[Config, dummy_interceptor]).
|
||||
|
||||
register_interceptor_crashing_with_amqp_error_exception1(Config, Interceptor) ->
|
||||
PredefinedChannels = rabbit_channel:list(),
|
||||
|
||||
Ch1 = rabbit_ct_client_helpers:open_channel(Config, 0),
|
||||
|
||||
[ChannelProc] = rabbit_channel:list() -- PredefinedChannels,
|
||||
|
||||
[{interceptors, []}] = rabbit_channel:info(ChannelProc, [interceptors]),
|
||||
|
||||
ok = rabbit_registry:register(channel_interceptor,
|
||||
<<"dummy interceptor">>,
|
||||
Interceptor),
|
||||
[{interceptors, [{Interceptor, undefined}]}] =
|
||||
rabbit_channel:info(ChannelProc, [interceptors]),
|
||||
|
||||
Q1 = <<"succeeding-q">>,
|
||||
#'queue.declare_ok'{} =
|
||||
amqp_channel:call(Ch1, #'queue.declare'{queue = Q1}),
|
||||
|
||||
Q2 = <<"crashing-with-amqp-exception-q">>,
|
||||
try
|
||||
amqp_channel:call(Ch1, #'queue.declare'{queue = Q2})
|
||||
catch
|
||||
_:Reason ->
|
||||
?assertMatch(
|
||||
{{shutdown, {_, _, <<"PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'crashing-with-amqp-exception-q' in vhost '/': received 'false' but current is 'true'">>}}, _},
|
||||
Reason)
|
||||
end,
|
||||
|
||||
Ch2 = rabbit_ct_client_helpers:open_channel(Config, 0),
|
||||
[ChannelProc1] = rabbit_channel:list() -- PredefinedChannels,
|
||||
|
||||
ok = rabbit_registry:unregister(channel_interceptor,
|
||||
<<"dummy interceptor">>),
|
||||
[{interceptors, []}] = rabbit_channel:info(ChannelProc1, [interceptors]),
|
||||
|
||||
#'queue.declare_ok'{} =
|
||||
amqp_channel:call(Ch2, #'queue.declare'{queue = Q2}),
|
||||
|
||||
#'queue.delete_ok'{} = amqp_channel:call(Ch2, #'queue.delete' {queue = Q1}),
|
||||
#'queue.delete_ok'{} = amqp_channel:call(Ch2, #'queue.delete' {queue = Q2}),
|
||||
|
||||
passed.
|
||||
|
||||
register_failing_interceptors(Config) ->
|
||||
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
|
||||
?MODULE, register_interceptor1, [Config, failing_dummy_interceptor]).
|
||||
|
|
|
@ -20,11 +20,15 @@ intercept(#'basic.publish'{} = Method, Content, _IState) ->
|
|||
{Method, Content2};
|
||||
|
||||
%% Use 'queue.declare' to test #amqp_error{} handling
|
||||
intercept(#'queue.declare'{queue = <<"failing-q">>}, _Content, _IState) ->
|
||||
intercept(#'queue.declare'{queue = <<"failing-with-amqp-error-q">>}, _Content, _IState) ->
|
||||
rabbit_misc:amqp_error(
|
||||
'precondition_failed', "operation not allowed", [],
|
||||
'queue.declare');
|
||||
|
||||
intercept(#'queue.declare'{queue = QName = <<"crashing-with-amqp-exception-q">>}, _Content, _IState) ->
|
||||
QRes = rabbit_misc:r(<<"/">>, queue, QName),
|
||||
rabbit_misc:assert_field_equivalence(true, false, QRes, durable);
|
||||
|
||||
intercept(Method, Content, _VHost) ->
|
||||
{Method, Content}.
|
||||
|
||||
|
|
Loading…
Reference in New Issue