diff --git a/deps/amqp10_client/src/amqp10_client_types.erl b/deps/amqp10_client/src/amqp10_client_types.erl index fed585ac97..5758012e93 100644 --- a/deps/amqp10_client/src/amqp10_client_types.erl +++ b/deps/amqp10_client/src/amqp10_client_types.erl @@ -82,8 +82,7 @@ utf8(B) when is_binary(B) -> {utf8, B}. uint(N) -> {uint, N}. make_properties(#{properties := Props}) - when is_map(Props) andalso - map_size(Props) > 0 -> + when map_size(Props) > 0 -> {map, maps:fold(fun(K, V, L) -> [{{symbol, K}, V} | L] end, [], Props)}; diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index 999ada9313..932eb24ca2 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -1086,7 +1086,7 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, mode => Mode, consumer_tag => handle_to_ctag(HandleInt), exclusive_consume => false, - args => source_filters_to_consumer_args(Source), + args => consumer_arguments(Attach), ok_msg => undefined, acting_user => Username}, case rabbit_queue_type:consume(Q, Spec, QStates0) of @@ -2852,19 +2852,36 @@ encode_frames(T, Msg, MaxPayloadSize, Transfers) -> lists:reverse([[T, Msg] | Transfers]) end. -source_filters_to_consumer_args(#'v1_0.source'{filter = {map, KVList}}) -> - source_filters_to_consumer_args( +consumer_arguments(#'v1_0.attach'{ + source = #'v1_0.source'{filter = Filter}, + properties = Properties}) -> + properties_to_consumer_args(Properties) ++ + filter_to_consumer_args(Filter). + +properties_to_consumer_args({map, KVList}) -> + Key = {symbol, <<"rabbitmq:priority">>}, + case proplists:lookup(Key, KVList) of + {Key, Val = {int, _Prio}} -> + [mc_amqpl:to_091(<<"x-priority">>, Val)]; + _ -> + [] + end; +properties_to_consumer_args(_) -> + []. + +filter_to_consumer_args({map, KVList}) -> + filter_to_consumer_args( [<<"rabbitmq:stream-offset-spec">>, <<"rabbitmq:stream-filter">>, <<"rabbitmq:stream-match-unfiltered">>], KVList, []); -source_filters_to_consumer_args(_Source) -> +filter_to_consumer_args(_) -> []. -source_filters_to_consumer_args([], _KVList, Acc) -> +filter_to_consumer_args([], _KVList, Acc) -> Acc; -source_filters_to_consumer_args([<<"rabbitmq:stream-offset-spec">> = H | T], KVList, Acc) -> +filter_to_consumer_args([<<"rabbitmq:stream-offset-spec">> = H | T], KVList, Acc) -> Key = {symbol, H}, Arg = case keyfind_unpack_described(Key, KVList) of {_, {timestamp, Ts}} -> @@ -2876,8 +2893,8 @@ source_filters_to_consumer_args([<<"rabbitmq:stream-offset-spec">> = H | T], KVL _ -> [] end, - source_filters_to_consumer_args(T, KVList, Arg ++ Acc); -source_filters_to_consumer_args([<<"rabbitmq:stream-filter">> = H | T], KVList, Acc) -> + filter_to_consumer_args(T, KVList, Arg ++ Acc); +filter_to_consumer_args([<<"rabbitmq:stream-filter">> = H | T], KVList, Acc) -> Key = {symbol, H}, Arg = case keyfind_unpack_described(Key, KVList) of {_, {list, Filters0}} when is_list(Filters0) -> @@ -2892,8 +2909,8 @@ source_filters_to_consumer_args([<<"rabbitmq:stream-filter">> = H | T], KVList, _ -> [] end, - source_filters_to_consumer_args(T, KVList, Arg ++ Acc); -source_filters_to_consumer_args([<<"rabbitmq:stream-match-unfiltered">> = H | T], KVList, Acc) -> + filter_to_consumer_args(T, KVList, Arg ++ Acc); +filter_to_consumer_args([<<"rabbitmq:stream-match-unfiltered">> = H | T], KVList, Acc) -> Key = {symbol, H}, Arg = case keyfind_unpack_described(Key, KVList) of {_, MU} when is_boolean(MU) -> @@ -2901,9 +2918,9 @@ source_filters_to_consumer_args([<<"rabbitmq:stream-match-unfiltered">> = H | T] _ -> [] end, - source_filters_to_consumer_args(T, KVList, Arg ++ Acc); -source_filters_to_consumer_args([_ | T], KVList, Acc) -> - source_filters_to_consumer_args(T, KVList, Acc). + filter_to_consumer_args(T, KVList, Arg ++ Acc); +filter_to_consumer_args([_ | T], KVList, Acc) -> + filter_to_consumer_args(T, KVList, Acc). keyfind_unpack_described(Key, KvList) -> %% filterset values _should_ be described values diff --git a/deps/rabbit/src/rabbit_fifo.hrl b/deps/rabbit/src/rabbit_fifo.hrl index 076c3c80e0..92e15ef912 100644 --- a/deps/rabbit/src/rabbit_fifo.hrl +++ b/deps/rabbit/src/rabbit_fifo.hrl @@ -110,7 +110,7 @@ %% command: `{consumer_credit, ReceiverDeliveryCount, Credit}' credit_mode :: credit_mode(), % part of snapshot data lifetime = once :: once | auto, - priority = 0 :: non_neg_integer()}). + priority = 0 :: integer()}). -record(consumer, {cfg = #consumer_cfg{}, diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 0ed9c9eb11..f48c6dcc88 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -81,6 +81,8 @@ groups() -> stop_classic_queue, stop_quorum_queue, stop_stream, + consumer_priority_classic_queue, + consumer_priority_quorum_queue, single_active_consumer_classic_queue, single_active_consumer_quorum_queue, detach_requeues_one_session_classic_queue, @@ -1841,6 +1843,95 @@ stop(QType, Config) -> #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), ok = rabbit_ct_client_helpers:close_channel(Ch). +consumer_priority_classic_queue(Config) -> + consumer_priority(<<"classic">>, Config). + +consumer_priority_quorum_queue(Config) -> + consumer_priority(<<"quorum">>, Config). + +consumer_priority(QType, Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + {Connection, Session, LinkPair} = init(Config), + QProps = #{arguments => #{<<"x-queue-type">> => {utf8, QType}}}, + {ok, #{type := QType}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps), + + Address = rabbitmq_amqp_address:queue(QName), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address), + ok = wait_for_credit(Sender), + + %% We test what our RabbitMQ docs state: + %% "Consumers which do not specify a value have priority 0. + %% Larger numbers indicate higher priority, and both positive and negative numbers can be used." + {ok, ReceiverDefaultPrio} = amqp10_client:attach_receiver_link( + Session, + <<"default prio consumer">>, + Address, + unsettled), + {ok, ReceiverHighPrio} = amqp10_client:attach_receiver_link( + Session, + <<"high prio consumer">>, + Address, + unsettled, + none, + #{}, + #{<<"rabbitmq:priority">> => {int, 2_000_000_000}}), + {ok, ReceiverLowPrio} = amqp10_client:attach_receiver_link( + Session, + <<"low prio consumer">>, + Address, + unsettled, + none, + #{}, + #{<<"rabbitmq:priority">> => {int, -2_000_000_000}}), + ok = amqp10_client:flow_link_credit(ReceiverDefaultPrio, 1, never), + ok = amqp10_client:flow_link_credit(ReceiverHighPrio, 2, never), + ok = amqp10_client:flow_link_credit(ReceiverLowPrio, 1, never), + + NumMsgs = 5, + [begin + Bin = integer_to_binary(N), + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(Bin, Bin)) + end || N <- lists:seq(1, NumMsgs)], + ok = wait_for_accepts(NumMsgs), + + receive {amqp10_msg, Rec1, Msg1} -> + ?assertEqual(<<"1">>, amqp10_msg:body_bin(Msg1)), + ?assertEqual(ReceiverHighPrio, Rec1), + ok = amqp10_client:accept_msg(Rec1, Msg1) + after 5000 -> ct:fail({missing_msg, ?LINE}) + end, + receive {amqp10_msg, Rec2, Msg2} -> + ?assertEqual(<<"2">>, amqp10_msg:body_bin(Msg2)), + ?assertEqual(ReceiverHighPrio, Rec2), + ok = amqp10_client:accept_msg(Rec2, Msg2) + after 5000 -> ct:fail({missing_msg, ?LINE}) + end, + receive {amqp10_msg, Rec3, Msg3} -> + ?assertEqual(<<"3">>, amqp10_msg:body_bin(Msg3)), + ?assertEqual(ReceiverDefaultPrio, Rec3), + ok = amqp10_client:accept_msg(Rec3, Msg3) + after 5000 -> ct:fail({missing_msg, ?LINE}) + end, + receive {amqp10_msg, Rec4, Msg4} -> + ?assertEqual(<<"4">>, amqp10_msg:body_bin(Msg4)), + ?assertEqual(ReceiverLowPrio, Rec4), + ok = amqp10_client:accept_msg(Rec4, Msg4) + after 5000 -> ct:fail({missing_msg, ?LINE}) + end, + receive {amqp10_msg, _, _} = Unexpected -> + ct:fail({unexpected_msg, Unexpected, ?LINE}) + after 5 -> ok + end, + + ok = amqp10_client:detach_link(Sender), + ok = amqp10_client:detach_link(ReceiverDefaultPrio), + ok = amqp10_client:detach_link(ReceiverHighPrio), + ok = amqp10_client:detach_link(ReceiverLowPrio), + {ok, #{message_count := 1}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = end_session_sync(Session), + ok = amqp10_client:close_connection(Connection). + single_active_consumer_classic_queue(Config) -> single_active_consumer(<<"classic">>, Config). @@ -4899,7 +4990,6 @@ tcp_back_pressure_rabbitmq_internal_flow(QType, Config) -> ok = end_session_sync(Session), ok = amqp10_client:close_connection(Connection). - %% internal %%