Merge pull request #2989 from Ayanda-D/channel-interceptor-amqp-errors
Allow AMQP error responses in channel interceptors
This commit is contained in:
		
						commit
						b5ed4e7ca1
					
				| 
						 | 
					@ -29,7 +29,7 @@
 | 
				
			||||||
-callback init(rabbit_channel:channel()) -> interceptor_state().
 | 
					-callback init(rabbit_channel:channel()) -> interceptor_state().
 | 
				
			||||||
-callback intercept(original_method(), original_content(),
 | 
					-callback intercept(original_method(), original_content(),
 | 
				
			||||||
                    interceptor_state()) ->
 | 
					                    interceptor_state()) ->
 | 
				
			||||||
    {processed_method(), processed_content()} |
 | 
					    {processed_method(), processed_content()} | rabbit_types:amqp_error() |
 | 
				
			||||||
    rabbit_misc:channel_or_connection_exit().
 | 
					    rabbit_misc:channel_or_connection_exit().
 | 
				
			||||||
-callback applies_to() -> list(method_name()).
 | 
					-callback applies_to() -> list(method_name()).
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -88,7 +88,9 @@ validate_response(Mod, M1, C1, R = {M2, C2}) ->
 | 
				
			||||||
                                "content iff content is provided but "
 | 
					                                "content iff content is provided but "
 | 
				
			||||||
                                "content in = ~p; content out = ~p",
 | 
					                                "content in = ~p; content out = ~p",
 | 
				
			||||||
                           [Mod, C1, C2])
 | 
					                           [Mod, C1, C2])
 | 
				
			||||||
    end.
 | 
					    end;
 | 
				
			||||||
 | 
					validate_response(_Mod, _M1, _C1, AMQPError = #amqp_error{}) ->
 | 
				
			||||||
 | 
					    internal_error(AMQPError).
 | 
				
			||||||
 | 
					
 | 
				
			||||||
validate_method(M, M2) ->
 | 
					validate_method(M, M2) ->
 | 
				
			||||||
    rabbit_misc:method_record_type(M) =:= rabbit_misc:method_record_type(M2).
 | 
					    rabbit_misc:method_record_type(M) =:= rabbit_misc:method_record_type(M2).
 | 
				
			||||||
| 
						 | 
					@ -98,6 +100,12 @@ validate_content(#content{}, #content{}) -> true;
 | 
				
			||||||
validate_content(_, _) -> false.
 | 
					validate_content(_, _) -> false.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
%% keep dialyzer happy
 | 
					%% keep dialyzer happy
 | 
				
			||||||
-spec internal_error(string(), [any()]) -> no_return().
 | 
					-spec internal_error(rabbit_types:amqp_error()) ->
 | 
				
			||||||
 | 
					  rabbit_misc:channel_or_connection_exit().
 | 
				
			||||||
 | 
					internal_error(AMQPError = #amqp_error{}) ->
 | 
				
			||||||
 | 
					    rabbit_misc:protocol_error(AMQPError).
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					-spec internal_error(string(), [any()]) ->
 | 
				
			||||||
 | 
					  rabbit_misc:channel_or_connection_exit().
 | 
				
			||||||
internal_error(Format, Args) ->
 | 
					internal_error(Format, Args) ->
 | 
				
			||||||
    rabbit_misc:protocol_error(internal_error, Format, Args).
 | 
					    rabbit_misc:protocol_error(internal_error, Format, Args).
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -9,6 +9,7 @@
 | 
				
			||||||
 | 
					
 | 
				
			||||||
-include_lib("common_test/include/ct.hrl").
 | 
					-include_lib("common_test/include/ct.hrl").
 | 
				
			||||||
-include_lib("amqp_client/include/amqp_client.hrl").
 | 
					-include_lib("amqp_client/include/amqp_client.hrl").
 | 
				
			||||||
 | 
					-include_lib("eunit/include/eunit.hrl").
 | 
				
			||||||
 | 
					
 | 
				
			||||||
-compile(export_all).
 | 
					-compile(export_all).
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -21,6 +22,7 @@ groups() ->
 | 
				
			||||||
    [
 | 
					    [
 | 
				
			||||||
      {non_parallel_tests, [], [
 | 
					      {non_parallel_tests, [], [
 | 
				
			||||||
          register_interceptor,
 | 
					          register_interceptor,
 | 
				
			||||||
 | 
					          register_interceptor_failing_with_amqp_error,
 | 
				
			||||||
          register_failing_interceptors
 | 
					          register_failing_interceptors
 | 
				
			||||||
        ]}
 | 
					        ]}
 | 
				
			||||||
    ].
 | 
					    ].
 | 
				
			||||||
| 
						 | 
					@ -94,6 +96,55 @@ register_interceptor1(Config, Interceptor) ->
 | 
				
			||||||
    check_send_receive(Ch1, QName, <<"bar">>, <<"bar">>),
 | 
					    check_send_receive(Ch1, QName, <<"bar">>, <<"bar">>),
 | 
				
			||||||
    passed.
 | 
					    passed.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					register_interceptor_failing_with_amqp_error(Config) ->
 | 
				
			||||||
 | 
					    passed = rabbit_ct_broker_helpers:rpc(Config, 0,
 | 
				
			||||||
 | 
					      ?MODULE, register_interceptor_failing_with_amqp_error1,
 | 
				
			||||||
 | 
					      [Config, dummy_interceptor]).
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					register_interceptor_failing_with_amqp_error1(Config, Interceptor) ->
 | 
				
			||||||
 | 
					    PredefinedChannels = rabbit_channel:list(),
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    Ch1 = rabbit_ct_client_helpers:open_channel(Config, 0),
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    [ChannelProc] = rabbit_channel:list() -- PredefinedChannels,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    [{interceptors, []}] = rabbit_channel:info(ChannelProc, [interceptors]),
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    ok = rabbit_registry:register(channel_interceptor,
 | 
				
			||||||
 | 
					                                  <<"dummy interceptor">>,
 | 
				
			||||||
 | 
					                                  Interceptor),
 | 
				
			||||||
 | 
					    [{interceptors, [{Interceptor, undefined}]}] =
 | 
				
			||||||
 | 
					      rabbit_channel:info(ChannelProc, [interceptors]),
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    Q1 = <<"succeeding-q">>,
 | 
				
			||||||
 | 
					    #'queue.declare_ok'{} =
 | 
				
			||||||
 | 
					        amqp_channel:call(Ch1, #'queue.declare'{queue = Q1}),
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    Q2 = <<"failing-q">>,
 | 
				
			||||||
 | 
					    try
 | 
				
			||||||
 | 
					        amqp_channel:call(Ch1, #'queue.declare'{queue = Q2})
 | 
				
			||||||
 | 
					    catch
 | 
				
			||||||
 | 
					      _:Reason ->
 | 
				
			||||||
 | 
					          ?assertMatch(
 | 
				
			||||||
 | 
					              {{shutdown, {_, _, <<"PRECONDITION_FAILED - operation not allowed">>}}, _},
 | 
				
			||||||
 | 
					              Reason)
 | 
				
			||||||
 | 
					    end,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    Ch2 = rabbit_ct_client_helpers:open_channel(Config, 0),
 | 
				
			||||||
 | 
					    [ChannelProc1] = rabbit_channel:list() -- PredefinedChannels,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    ok = rabbit_registry:unregister(channel_interceptor,
 | 
				
			||||||
 | 
					                                  <<"dummy interceptor">>),
 | 
				
			||||||
 | 
					    [{interceptors, []}] = rabbit_channel:info(ChannelProc1, [interceptors]),
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    #'queue.declare_ok'{} =
 | 
				
			||||||
 | 
					        amqp_channel:call(Ch2, #'queue.declare'{queue = Q2}),
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    #'queue.delete_ok'{} = amqp_channel:call(Ch2, #'queue.delete' {queue = Q1}),
 | 
				
			||||||
 | 
					    #'queue.delete_ok'{} = amqp_channel:call(Ch2, #'queue.delete' {queue = Q2}),
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    passed.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
register_failing_interceptors(Config) ->
 | 
					register_failing_interceptors(Config) ->
 | 
				
			||||||
    passed = rabbit_ct_broker_helpers:rpc(Config, 0,
 | 
					    passed = rabbit_ct_broker_helpers:rpc(Config, 0,
 | 
				
			||||||
      ?MODULE, register_interceptor1, [Config, failing_dummy_interceptor]).
 | 
					      ?MODULE, register_interceptor1, [Config, failing_dummy_interceptor]).
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -19,8 +19,14 @@ intercept(#'basic.publish'{} = Method, Content, _IState) ->
 | 
				
			||||||
    Content2 = Content#content{payload_fragments_rev = []},
 | 
					    Content2 = Content#content{payload_fragments_rev = []},
 | 
				
			||||||
    {Method, Content2};
 | 
					    {Method, Content2};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					%% Use 'queue.declare' to test #amqp_error{} handling
 | 
				
			||||||
 | 
					intercept(#'queue.declare'{queue = <<"failing-q">>}, _Content, _IState) ->
 | 
				
			||||||
 | 
					    rabbit_misc:amqp_error(
 | 
				
			||||||
 | 
					        'precondition_failed', "operation not allowed", [],
 | 
				
			||||||
 | 
					        'queue.declare');
 | 
				
			||||||
 | 
					
 | 
				
			||||||
intercept(Method, Content, _VHost) ->
 | 
					intercept(Method, Content, _VHost) ->
 | 
				
			||||||
    {Method, Content}.
 | 
					    {Method, Content}.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
applies_to() ->
 | 
					applies_to() ->
 | 
				
			||||||
    ['basic.publish'].
 | 
					    ['basic.publish', 'queue.declare'].
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue