diff --git a/deps/rabbitmq_mqtt/include/rabbit_mqtt_packet.hrl b/deps/rabbitmq_mqtt/include/rabbit_mqtt_packet.hrl index 28e87350a7..2fe9721b8b 100644 --- a/deps/rabbitmq_mqtt/include/rabbit_mqtt_packet.hrl +++ b/deps/rabbitmq_mqtt/include/rabbit_mqtt_packet.hrl @@ -195,7 +195,9 @@ id :: option(subscription_identifier()) }). --record(mqtt_subscription, {topic_filter :: binary(), +-type topic_filter() :: binary(). + +-record(mqtt_subscription, {topic_filter :: topic_filter(), options :: #mqtt_subscription_opts{} }). @@ -211,7 +213,7 @@ -record(mqtt_packet_unsubscribe, {packet_id :: packet_id(), props :: properties(), - topic_filters :: [binary(), ...] + topic_filters :: [topic_filter(), ...] }). -record(mqtt_packet_unsuback, {packet_id :: packet_id(), diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index 8fa3939328..3f714577dc 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -41,7 +41,7 @@ -type send_fun() :: fun((iodata()) -> ok). -type session_expiry_interval() :: non_neg_integer() | infinity. --type subscriptions() :: #{TopicFilter :: binary() => #mqtt_subscription_opts{}}. +-type subscriptions() :: #{topic_filter() => #mqtt_subscription_opts{}}. -record(auth_state, {user :: #user{}, @@ -404,58 +404,70 @@ process_request(?SUBSCRIBE, packet_id = SubscribePktId, subscriptions = Subscriptions}, payload = undefined}, - #state{cfg = #cfg{retainer_pid = RPid, - proto_ver = ProtoVer}} = State0) -> + #state{cfg = #cfg{proto_ver = ProtoVer}} = State0) -> ?LOG_DEBUG("Received a SUBSCRIBE with subscription(s) ~p", [Subscriptions]), - {Result, State1} = + {ResultRev, RetainedRev, State1} = lists:foldl( - fun(_Subscription, {[{error, _} = E | _] = L, S}) -> + fun(_Subscription, {[{error, _} = E | _] = L, R, S}) -> %% Once a subscription failed, mark all following subscriptions %% as failed instead of creating bindings because we are going %% to close the client connection anyway. - {[E | L], S}; - (#mqtt_subscription{topic_filter = Topic, - options = Opts0}, - {L, S0}) -> - QoS = maybe_downgrade_qos(Opts0#mqtt_subscription_opts.qos), + {[E | L], R, S}; + (#mqtt_subscription{topic_filter = TopicFilter, + options = Opts0 = #mqtt_subscription_opts{ + qos = Qos0, + retain_handling = Rh}}, + {L0, R0, S0}) -> + QoS = maybe_downgrade_qos(Qos0), Opts = Opts0#mqtt_subscription_opts{qos = QoS}, - maybe - {ok, Q} ?= ensure_queue(QoS, S0), - QName = amqqueue:get_name(Q), - BindingArgs = binding_args_for_proto_ver(ProtoVer, Opts), - ok ?= add_subscription(Topic, BindingArgs, QName, S0), - ok ?= maybe_delete_old_subscription(Topic, Opts, S0), - Subs = maps:put(Topic, Opts, S0#state.subscriptions), - S1 = S0#state{subscriptions = Subs}, - maybe_increment_consumer(S0, S1), - case self_consumes(Q) of - false -> - case consume(Q, QoS, S1) of - {ok, S2} -> - {[QoS | L], S2}; - {error, _} = E1 -> - {[E1 | L], S1} - end; - true -> - {[QoS | L], S1} - end - else - {error, _} = E2 -> {[E2 | L], S0} + L = [QoS | L0], + R1 = [{TopicFilter, QoS} | R0], + case S0#state.subscriptions of + #{TopicFilter := Opts} -> + R = if Rh =:= 0 -> R1; + Rh > 0 -> R0 + end, + {L, R, S0}; + _ -> + maybe + {ok, Q} ?= ensure_queue(QoS, S0), + QName = amqqueue:get_name(Q), + BindingArgs = binding_args_for_proto_ver(ProtoVer, Opts), + ok ?= add_subscription(TopicFilter, BindingArgs, QName, S0), + ok ?= maybe_delete_old_subscription(TopicFilter, Opts, S0), + Subs = maps:put(TopicFilter, Opts, S0#state.subscriptions), + S1 = S0#state{subscriptions = Subs}, + maybe_increment_consumer(S0, S1), + R = if Rh < 2 -> R1; + Rh =:= 2 -> R0 + end, + case self_consumes(Q) of + false -> + case consume(Q, QoS, S1) of + {ok, S2} -> + {L, R, S2}; + {error, _} = E1 -> + {[E1 | L0], R, S1} + end; + true -> + {L, R, S1} + end + else + {error, _} = E2 -> {[E2 | L0], R0, S0} + end end - end, {[], State0}, Subscriptions), - ReasonCodes = subscribe_result_to_reason_codes(Result, ProtoVer), + end, {[], [], State0}, Subscriptions), + ReasonCodesRev = subscribe_result_to_reason_codes(ResultRev, ProtoVer), Reply = #mqtt_packet{fixed = #mqtt_packet_fixed{type = ?SUBACK}, variable = #mqtt_packet_suback{ packet_id = SubscribePktId, - reason_codes = lists:reverse(ReasonCodes)}}, + reason_codes = lists:reverse(ReasonCodesRev)}}, _ = send(Reply, State1), - case Result of - [{error, _} | _] -> + case hd(ResultRev) of + {error, _} -> {error, subscribe_error, State1}; _ -> - State = lists:foldl(fun(Sub, S) -> - maybe_send_retained_message(RPid, Sub, S) - end, State1, Subscriptions), + State = send_retained_messages(lists:reverse(RetainedRev), State1), {ok, State} end; @@ -894,14 +906,18 @@ hand_off_to_retainer(RetainerPid, Topic0, Msg = #mqtt_msg{payload = Payload}) -> rabbit_mqtt_retainer:retain(RetainerPid, Topic, Msg) end. -maybe_send_retained_message( - RPid, - #mqtt_subscription{topic_filter = Topic0, - options = #mqtt_subscription_opts{ - qos = SubscribeQos}}, - State0 = #state{packet_id = PacketId0}) -> - Topic = amqp_to_mqtt(Topic0), - case rabbit_mqtt_retainer:fetch(RPid, Topic) of +-spec send_retained_messages([{topic_filter(), qos()}], state()) -> state(). +send_retained_messages(Subscriptions, State) -> + lists:foldl(fun({TopicFilter, Qos}, S) -> + send_retained_message(TopicFilter, Qos, S) + end, State, Subscriptions). + +-spec send_retained_message(topic_filter(), qos(), state()) -> state(). +send_retained_message(TopicFilter0, SubscribeQos, + State0 = #state{packet_id = PacketId0, + cfg = #cfg{retainer_pid = RPid}}) -> + TopicFilter = amqp_to_mqtt(TopicFilter0), + case rabbit_mqtt_retainer:fetch(RPid, TopicFilter) of undefined -> State0; #mqtt_msg{qos = MsgQos, @@ -913,20 +929,24 @@ maybe_send_retained_message( ?QOS_0 -> {undefined, State0}; ?QOS_1 -> - {PacketId0, State0#state{packet_id = increment_packet_id(PacketId0)}} + {PacketId0, + State0#state{packet_id = increment_packet_id(PacketId0)}} end, - Packet = #mqtt_packet{fixed = #mqtt_packet_fixed{ - type = ?PUBLISH, - qos = Qos, - dup = false, - retain = Retain - }, - variable = #mqtt_packet_publish{ - packet_id = PacketId, - topic_name = Topic, - props = Props - }, - payload = Payload}, + Packet = #mqtt_packet{ + fixed = #mqtt_packet_fixed{ + type = ?PUBLISH, + qos = Qos, + dup = false, + retain = Retain + }, + variable = #mqtt_packet_publish{ + packet_id = PacketId, + %% Wildcards are currently not supported when fetching retained + %% messages. Therefore, TopicFilter must must be a topic name. + topic_name = TopicFilter, + props = Props + }, + payload = Payload}, _ = send(Packet, State), State end. diff --git a/deps/rabbitmq_mqtt/test/v5_SUITE.erl b/deps/rabbitmq_mqtt/test/v5_SUITE.erl index 71f68bd289..0627136c5a 100644 --- a/deps/rabbitmq_mqtt/test/v5_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/v5_SUITE.erl @@ -79,6 +79,7 @@ cluster_size_1_tests() -> subscription_option_no_local_wildcards, subscription_option_retain_as_published, subscription_option_retain_as_published_wildcards, + subscription_option_retain_handling, subscription_identifier, subscription_identifier_amqp091, subscription_identifier_at_most_once_dead_letter, @@ -659,6 +660,49 @@ subscription_option_retain_as_published_wildcards(Config) -> ok = emqtt:publish(C, <<"t/2">>, <<"">>, [{retain, true}]), ok = emqtt:disconnect(C). +subscription_option_retain_handling(Config) -> + ClientId = ?FUNCTION_NAME, + C1 = connect(ClientId, Config, non_clean_sess_opts()), + {ok, _} = emqtt:publish(C1, <<"t/1">>, <<"m1">>, [{retain, true}, {qos, 1}]), + {ok, _} = emqtt:publish(C1, <<"t/2">>, <<"m2">>, [{retain, true}, {qos, 1}]), + {ok, _} = emqtt:publish(C1, <<"t/3">>, <<"m3">>, [{retain, true}, {qos, 1}]), + {ok, _, [1, 1, 1]} = emqtt:subscribe(C1, [{<<"t/1">>, [{rh, 0}, {qos, 1}]}, + %% Subscription does not exist. + {<<"t/2">>, [{rh, 1}, {qos, 1}]}, + {<<"t/3">>, [{rh, 2}, {qos, 1}]}]), + ok = expect_publishes(C1, <<"t/1">>, [<<"m1">>]), + ok = expect_publishes(C1, <<"t/2">>, [<<"m2">>]), + assert_nothing_received(), + + {ok, _, [1, 1, 1]} = emqtt:subscribe(C1, [{<<"t/1">>, [{rh, 0}, {qos, 1}]}, + %% Subscription exists. + {<<"t/2">>, [{rh, 1}, {qos, 1}]}, + {<<"t/3">>, [{rh, 2}, {qos, 1}]}]), + ok = expect_publishes(C1, <<"t/1">>, [<<"m1">>]), + assert_nothing_received(), + + {ok, _, [0, 0, 0]} = emqtt:subscribe(C1, [{<<"t/1">>, [{rh, 0}, {qos, 0}]}, + %% That specific subscription does not exist. + {<<"t/2">>, [{rh, 1}, {qos, 0}]}, + {<<"t/3">>, [{rh, 2}, {qos, 0}]}]), + ok = expect_publishes(C1, <<"t/1">>, [<<"m1">>]), + ok = expect_publishes(C1, <<"t/2">>, [<<"m2">>]), + assert_nothing_received(), + + ok = emqtt:disconnect(C1), + C2 = connect(ClientId, Config, [{clean_start, false}]), + {ok, _, [0, 0, 0]} = emqtt:subscribe(C2, [{<<"t/1">>, [{rh, 0}, {qos, 0}]}, + %% Subscription exists. + {<<"t/2">>, [{rh, 1}, {qos, 0}]}, + {<<"t/3">>, [{rh, 2}, {qos, 0}]}]), + ok = expect_publishes(C2, <<"t/1">>, [<<"m1">>]), + assert_nothing_received(), + + ok = emqtt:publish(C2, <<"t/1">>, <<"">>, [{retain, true}]), + ok = emqtt:publish(C2, <<"t/2">>, <<"">>, [{retain, true}]), + ok = emqtt:publish(C2, <<"t/3">>, <<"">>, [{retain, true}]), + ok = emqtt:disconnect(C2). + subscription_identifier(Config) -> C1 = connect(<<"c1">>, Config), C2 = connect(<<"c2">>, Config), @@ -944,7 +988,7 @@ subscription_options_modify_qos(Qos, Config) -> assert_received_no_duplicates(); 1 -> ExpectedPayloads = [integer_to_binary(I) || I <- lists:seq(2, NumSent - 1)], - ok = util:expect_publishes(Sub, Topic, ExpectedPayloads) + ok = expect_publishes(Sub, Topic, ExpectedPayloads) end, ok = emqtt:disconnect(Pub), ok = emqtt:disconnect(Sub). @@ -990,7 +1034,7 @@ session_upgrade_v3_v5_qos(Qos, Config) -> assert_received_no_duplicates(); 1 -> ExpectedPayloads = [integer_to_binary(I) || I <- lists:seq(2, NumSent - 1)], - ok = util:expect_publishes(Subv5, Topic, ExpectedPayloads) + ok = expect_publishes(Subv5, Topic, ExpectedPayloads) end, ok = emqtt:disconnect(Pub), ok = emqtt:disconnect(Subv5).