Add test case for binding args Khepri regression
This commit adds a test case for a regression/bug that occurs in Khepri. ``` make -C deps/rabbit ct-bindings t=cluster:binding_args RABBITMQ_METADATA_STORE=mnesia ``` succeeds, but ``` make -C deps/rabbit ct-bindings t=cluster:binding_args RABBITMQ_METADATA_STORE=khepri ``` fails. The problem is that ETS table `rabbit_khepri_index_route` cannot differentiate between two bindings with different binding arguments, and therefore deletes entries too early, leading to wrong routing decisions. The solution to this bug is to include the binding arguments in the `rabbit_khepri_index_route` projection, similar to how the binding args are also included in the `rabbit_index_route` Mnesia table. This bug/regression is an edge case and exists if the source exchange type is `direct` or `fanout` and if different bindings arguments are used by client apps. Note that such binding arguments are entirely ignored when RabbitMQ performs routing decisions for the `direct` or `fanout` exchange. However, there might be client apps that use binding arguments to add some metadata to the binding, for example `app-id` or `user` or `purpose` and might use this metadata as a form of reference counting in deciding when to delete `auto-delete` exchanges or just for informational/operational purposes.
This commit is contained in:
parent
e0bbd50322
commit
fe8b9df5e1
|
@ -37,6 +37,7 @@ groups() ->
|
||||||
all_tests() ->
|
all_tests() ->
|
||||||
[
|
[
|
||||||
%% Queue bindings
|
%% Queue bindings
|
||||||
|
binding_args,
|
||||||
bind_and_unbind,
|
bind_and_unbind,
|
||||||
bind_and_delete,
|
bind_and_delete,
|
||||||
bind_and_delete_source_exchange,
|
bind_and_delete_source_exchange,
|
||||||
|
@ -116,6 +117,56 @@ end_per_testcase(Testcase, Config) ->
|
||||||
%% -------------------------------------------------------------------
|
%% -------------------------------------------------------------------
|
||||||
%% Testcases.
|
%% Testcases.
|
||||||
%% -------------------------------------------------------------------
|
%% -------------------------------------------------------------------
|
||||||
|
|
||||||
|
binding_args(Config) ->
|
||||||
|
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
|
||||||
|
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
|
||||||
|
Q = ?config(queue_name, Config),
|
||||||
|
?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [])),
|
||||||
|
|
||||||
|
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
|
||||||
|
amqp_channel:register_confirm_handler(Ch, self()),
|
||||||
|
|
||||||
|
%% Create two bindings that differ only in their binding arguments.
|
||||||
|
Exchange = <<"amq.direct">>,
|
||||||
|
RoutingKey = <<"some-key">>,
|
||||||
|
BindingArgs1 = [{<<"app">>, longstr, <<"app-1">>}],
|
||||||
|
BindingArgs2 = [{<<"app">>, longstr, <<"app-2">>}],
|
||||||
|
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Exchange,
|
||||||
|
routing_key = RoutingKey,
|
||||||
|
queue = Q,
|
||||||
|
arguments = BindingArgs1}),
|
||||||
|
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = Exchange,
|
||||||
|
routing_key = RoutingKey,
|
||||||
|
queue = Q,
|
||||||
|
arguments = BindingArgs2}),
|
||||||
|
ok = amqp_channel:cast(Ch,
|
||||||
|
#'basic.publish'{exchange = Exchange,
|
||||||
|
routing_key = RoutingKey},
|
||||||
|
#amqp_msg{payload = <<"m1">>}),
|
||||||
|
receive #'basic.ack'{} -> ok
|
||||||
|
after 9000 -> ct:fail(confirm_timeout)
|
||||||
|
end,
|
||||||
|
|
||||||
|
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m1">>}},
|
||||||
|
amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = true})),
|
||||||
|
|
||||||
|
%% If we delete the 1st binding, we expect RabbitMQ to still route via the 2nd binding.
|
||||||
|
#'queue.unbind_ok'{} = amqp_channel:call(Ch, #'queue.unbind'{exchange = Exchange,
|
||||||
|
routing_key = RoutingKey,
|
||||||
|
queue = Q,
|
||||||
|
arguments = BindingArgs1}),
|
||||||
|
ok = amqp_channel:cast(Ch,
|
||||||
|
#'basic.publish'{exchange = Exchange,
|
||||||
|
routing_key = RoutingKey},
|
||||||
|
#amqp_msg{payload = <<"m2">>}),
|
||||||
|
receive #'basic.ack'{} -> ok
|
||||||
|
after 9000 -> ct:fail(confirm_timeout)
|
||||||
|
end,
|
||||||
|
|
||||||
|
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>}},
|
||||||
|
amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = true})).
|
||||||
|
|
||||||
bind_and_unbind(Config) ->
|
bind_and_unbind(Config) ->
|
||||||
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
|
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue