Support MQTT 5.0 Subscription Option Retain Handling

The MQTT v5 spec is a bit vague on Retain Handling 1:
"If Retain Handling is set to 1 then if the subscription did not
already exist, the Server MUST send all retained message matching the
Topic Filter of the subscription to the Client, and if the subscription
did exist the Server MUST NOT send the retained messages.
[MQTT-3.3.1-10]." [v5 3.3.1.3]

Does a subscription with the same topic filter but different
subscription options mean that "the subscription did exist"?

This commit interprets "subscription exists" as both topic filter and
subscription options must be the same.

Therefore, if a client creates a subscription with a topic filter that
is identical to a previous subscription and subscription options that
are different and Retain Handling 1, the server sends the retained
message.
This commit is contained in:
David Ansari 2023-05-09 09:59:10 +00:00 committed by Chunyi Lyu
parent e2b545f270
commit ce573c35fa
3 changed files with 131 additions and 65 deletions

View File

@ -195,7 +195,9 @@
id :: option(subscription_identifier())
}).
-record(mqtt_subscription, {topic_filter :: binary(),
-type topic_filter() :: binary().
-record(mqtt_subscription, {topic_filter :: topic_filter(),
options :: #mqtt_subscription_opts{}
}).
@ -211,7 +213,7 @@
-record(mqtt_packet_unsubscribe, {packet_id :: packet_id(),
props :: properties(),
topic_filters :: [binary(), ...]
topic_filters :: [topic_filter(), ...]
}).
-record(mqtt_packet_unsuback, {packet_id :: packet_id(),

View File

@ -41,7 +41,7 @@
-type send_fun() :: fun((iodata()) -> ok).
-type session_expiry_interval() :: non_neg_integer() | infinity.
-type subscriptions() :: #{TopicFilter :: binary() => #mqtt_subscription_opts{}}.
-type subscriptions() :: #{topic_filter() => #mqtt_subscription_opts{}}.
-record(auth_state,
{user :: #user{},
@ -404,58 +404,70 @@ process_request(?SUBSCRIBE,
packet_id = SubscribePktId,
subscriptions = Subscriptions},
payload = undefined},
#state{cfg = #cfg{retainer_pid = RPid,
proto_ver = ProtoVer}} = State0) ->
#state{cfg = #cfg{proto_ver = ProtoVer}} = State0) ->
?LOG_DEBUG("Received a SUBSCRIBE with subscription(s) ~p", [Subscriptions]),
{Result, State1} =
{ResultRev, RetainedRev, State1} =
lists:foldl(
fun(_Subscription, {[{error, _} = E | _] = L, S}) ->
fun(_Subscription, {[{error, _} = E | _] = L, R, S}) ->
%% Once a subscription failed, mark all following subscriptions
%% as failed instead of creating bindings because we are going
%% to close the client connection anyway.
{[E | L], S};
(#mqtt_subscription{topic_filter = Topic,
options = Opts0},
{L, S0}) ->
QoS = maybe_downgrade_qos(Opts0#mqtt_subscription_opts.qos),
{[E | L], R, S};
(#mqtt_subscription{topic_filter = TopicFilter,
options = Opts0 = #mqtt_subscription_opts{
qos = Qos0,
retain_handling = Rh}},
{L0, R0, S0}) ->
QoS = maybe_downgrade_qos(Qos0),
Opts = Opts0#mqtt_subscription_opts{qos = QoS},
maybe
{ok, Q} ?= ensure_queue(QoS, S0),
QName = amqqueue:get_name(Q),
BindingArgs = binding_args_for_proto_ver(ProtoVer, Opts),
ok ?= add_subscription(Topic, BindingArgs, QName, S0),
ok ?= maybe_delete_old_subscription(Topic, Opts, S0),
Subs = maps:put(Topic, Opts, S0#state.subscriptions),
S1 = S0#state{subscriptions = Subs},
maybe_increment_consumer(S0, S1),
case self_consumes(Q) of
false ->
case consume(Q, QoS, S1) of
{ok, S2} ->
{[QoS | L], S2};
{error, _} = E1 ->
{[E1 | L], S1}
end;
true ->
{[QoS | L], S1}
end
else
{error, _} = E2 -> {[E2 | L], S0}
L = [QoS | L0],
R1 = [{TopicFilter, QoS} | R0],
case S0#state.subscriptions of
#{TopicFilter := Opts} ->
R = if Rh =:= 0 -> R1;
Rh > 0 -> R0
end,
{L, R, S0};
_ ->
maybe
{ok, Q} ?= ensure_queue(QoS, S0),
QName = amqqueue:get_name(Q),
BindingArgs = binding_args_for_proto_ver(ProtoVer, Opts),
ok ?= add_subscription(TopicFilter, BindingArgs, QName, S0),
ok ?= maybe_delete_old_subscription(TopicFilter, Opts, S0),
Subs = maps:put(TopicFilter, Opts, S0#state.subscriptions),
S1 = S0#state{subscriptions = Subs},
maybe_increment_consumer(S0, S1),
R = if Rh < 2 -> R1;
Rh =:= 2 -> R0
end,
case self_consumes(Q) of
false ->
case consume(Q, QoS, S1) of
{ok, S2} ->
{L, R, S2};
{error, _} = E1 ->
{[E1 | L0], R, S1}
end;
true ->
{L, R, S1}
end
else
{error, _} = E2 -> {[E2 | L0], R0, S0}
end
end
end, {[], State0}, Subscriptions),
ReasonCodes = subscribe_result_to_reason_codes(Result, ProtoVer),
end, {[], [], State0}, Subscriptions),
ReasonCodesRev = subscribe_result_to_reason_codes(ResultRev, ProtoVer),
Reply = #mqtt_packet{fixed = #mqtt_packet_fixed{type = ?SUBACK},
variable = #mqtt_packet_suback{
packet_id = SubscribePktId,
reason_codes = lists:reverse(ReasonCodes)}},
reason_codes = lists:reverse(ReasonCodesRev)}},
_ = send(Reply, State1),
case Result of
[{error, _} | _] ->
case hd(ResultRev) of
{error, _} ->
{error, subscribe_error, State1};
_ ->
State = lists:foldl(fun(Sub, S) ->
maybe_send_retained_message(RPid, Sub, S)
end, State1, Subscriptions),
State = send_retained_messages(lists:reverse(RetainedRev), State1),
{ok, State}
end;
@ -894,14 +906,18 @@ hand_off_to_retainer(RetainerPid, Topic0, Msg = #mqtt_msg{payload = Payload}) ->
rabbit_mqtt_retainer:retain(RetainerPid, Topic, Msg)
end.
maybe_send_retained_message(
RPid,
#mqtt_subscription{topic_filter = Topic0,
options = #mqtt_subscription_opts{
qos = SubscribeQos}},
State0 = #state{packet_id = PacketId0}) ->
Topic = amqp_to_mqtt(Topic0),
case rabbit_mqtt_retainer:fetch(RPid, Topic) of
-spec send_retained_messages([{topic_filter(), qos()}], state()) -> state().
send_retained_messages(Subscriptions, State) ->
lists:foldl(fun({TopicFilter, Qos}, S) ->
send_retained_message(TopicFilter, Qos, S)
end, State, Subscriptions).
-spec send_retained_message(topic_filter(), qos(), state()) -> state().
send_retained_message(TopicFilter0, SubscribeQos,
State0 = #state{packet_id = PacketId0,
cfg = #cfg{retainer_pid = RPid}}) ->
TopicFilter = amqp_to_mqtt(TopicFilter0),
case rabbit_mqtt_retainer:fetch(RPid, TopicFilter) of
undefined ->
State0;
#mqtt_msg{qos = MsgQos,
@ -913,20 +929,24 @@ maybe_send_retained_message(
?QOS_0 ->
{undefined, State0};
?QOS_1 ->
{PacketId0, State0#state{packet_id = increment_packet_id(PacketId0)}}
{PacketId0,
State0#state{packet_id = increment_packet_id(PacketId0)}}
end,
Packet = #mqtt_packet{fixed = #mqtt_packet_fixed{
type = ?PUBLISH,
qos = Qos,
dup = false,
retain = Retain
},
variable = #mqtt_packet_publish{
packet_id = PacketId,
topic_name = Topic,
props = Props
},
payload = Payload},
Packet = #mqtt_packet{
fixed = #mqtt_packet_fixed{
type = ?PUBLISH,
qos = Qos,
dup = false,
retain = Retain
},
variable = #mqtt_packet_publish{
packet_id = PacketId,
%% Wildcards are currently not supported when fetching retained
%% messages. Therefore, TopicFilter must must be a topic name.
topic_name = TopicFilter,
props = Props
},
payload = Payload},
_ = send(Packet, State),
State
end.

View File

@ -79,6 +79,7 @@ cluster_size_1_tests() ->
subscription_option_no_local_wildcards,
subscription_option_retain_as_published,
subscription_option_retain_as_published_wildcards,
subscription_option_retain_handling,
subscription_identifier,
subscription_identifier_amqp091,
subscription_identifier_at_most_once_dead_letter,
@ -659,6 +660,49 @@ subscription_option_retain_as_published_wildcards(Config) ->
ok = emqtt:publish(C, <<"t/2">>, <<"">>, [{retain, true}]),
ok = emqtt:disconnect(C).
subscription_option_retain_handling(Config) ->
ClientId = ?FUNCTION_NAME,
C1 = connect(ClientId, Config, non_clean_sess_opts()),
{ok, _} = emqtt:publish(C1, <<"t/1">>, <<"m1">>, [{retain, true}, {qos, 1}]),
{ok, _} = emqtt:publish(C1, <<"t/2">>, <<"m2">>, [{retain, true}, {qos, 1}]),
{ok, _} = emqtt:publish(C1, <<"t/3">>, <<"m3">>, [{retain, true}, {qos, 1}]),
{ok, _, [1, 1, 1]} = emqtt:subscribe(C1, [{<<"t/1">>, [{rh, 0}, {qos, 1}]},
%% Subscription does not exist.
{<<"t/2">>, [{rh, 1}, {qos, 1}]},
{<<"t/3">>, [{rh, 2}, {qos, 1}]}]),
ok = expect_publishes(C1, <<"t/1">>, [<<"m1">>]),
ok = expect_publishes(C1, <<"t/2">>, [<<"m2">>]),
assert_nothing_received(),
{ok, _, [1, 1, 1]} = emqtt:subscribe(C1, [{<<"t/1">>, [{rh, 0}, {qos, 1}]},
%% Subscription exists.
{<<"t/2">>, [{rh, 1}, {qos, 1}]},
{<<"t/3">>, [{rh, 2}, {qos, 1}]}]),
ok = expect_publishes(C1, <<"t/1">>, [<<"m1">>]),
assert_nothing_received(),
{ok, _, [0, 0, 0]} = emqtt:subscribe(C1, [{<<"t/1">>, [{rh, 0}, {qos, 0}]},
%% That specific subscription does not exist.
{<<"t/2">>, [{rh, 1}, {qos, 0}]},
{<<"t/3">>, [{rh, 2}, {qos, 0}]}]),
ok = expect_publishes(C1, <<"t/1">>, [<<"m1">>]),
ok = expect_publishes(C1, <<"t/2">>, [<<"m2">>]),
assert_nothing_received(),
ok = emqtt:disconnect(C1),
C2 = connect(ClientId, Config, [{clean_start, false}]),
{ok, _, [0, 0, 0]} = emqtt:subscribe(C2, [{<<"t/1">>, [{rh, 0}, {qos, 0}]},
%% Subscription exists.
{<<"t/2">>, [{rh, 1}, {qos, 0}]},
{<<"t/3">>, [{rh, 2}, {qos, 0}]}]),
ok = expect_publishes(C2, <<"t/1">>, [<<"m1">>]),
assert_nothing_received(),
ok = emqtt:publish(C2, <<"t/1">>, <<"">>, [{retain, true}]),
ok = emqtt:publish(C2, <<"t/2">>, <<"">>, [{retain, true}]),
ok = emqtt:publish(C2, <<"t/3">>, <<"">>, [{retain, true}]),
ok = emqtt:disconnect(C2).
subscription_identifier(Config) ->
C1 = connect(<<"c1">>, Config),
C2 = connect(<<"c2">>, Config),
@ -944,7 +988,7 @@ subscription_options_modify_qos(Qos, Config) ->
assert_received_no_duplicates();
1 ->
ExpectedPayloads = [integer_to_binary(I) || I <- lists:seq(2, NumSent - 1)],
ok = util:expect_publishes(Sub, Topic, ExpectedPayloads)
ok = expect_publishes(Sub, Topic, ExpectedPayloads)
end,
ok = emqtt:disconnect(Pub),
ok = emqtt:disconnect(Sub).
@ -990,7 +1034,7 @@ session_upgrade_v3_v5_qos(Qos, Config) ->
assert_received_no_duplicates();
1 ->
ExpectedPayloads = [integer_to_binary(I) || I <- lists:seq(2, NumSent - 1)],
ok = util:expect_publishes(Subv5, Topic, ExpectedPayloads)
ok = expect_publishes(Subv5, Topic, ExpectedPayloads)
end,
ok = emqtt:disconnect(Pub),
ok = emqtt:disconnect(Subv5).