Set topic alias when publish retained message

- remove topic alias from message props when storing retained msgs
- set topic alias for outbound before sending retained msgs
This commit is contained in:
Chunyi Lyu 2023-06-19 15:46:48 +01:00 committed by Chunyi Lyu
parent 0e00e3479e
commit 08fd9d00e3
2 changed files with 29 additions and 5 deletions

View File

@ -395,7 +395,7 @@ process_request(?PUBLISH,
dup = Dup,
packet_id = PacketId,
payload = Payload,
props = Props},
props = maps:remove('Topic-Alias', Props)},
case EffectiveQos of
?QOS_0 ->
publish_to_queues_with_checks(Msg, State);
@ -963,14 +963,15 @@ send_retained_message(TopicFilter0, SubscribeQos,
#mqtt_msg{qos = MsgQos,
retain = Retain,
payload = Payload,
props = Props} ->
props = Props0} ->
Qos = effective_qos(MsgQos, SubscribeQos),
{Topic, Props, State1} = process_topic_alias_outbound(TopicFilter, Props0, State0),
{PacketId, State} = case Qos of
?QOS_0 ->
{undefined, State0};
{undefined, State1};
?QOS_1 ->
{PacketId0,
State0#state{packet_id = increment_packet_id(PacketId0)}}
State1#state{packet_id = increment_packet_id(PacketId0)}}
end,
Packet = #mqtt_packet{
fixed = #mqtt_packet_fixed{
@ -983,7 +984,7 @@ send_retained_message(TopicFilter0, SubscribeQos,
packet_id = PacketId,
%% Wildcards are currently not supported when fetching retained
%% messages. Therefore, TopicFilter must must be a topic name.
topic_name = TopicFilter,
topic_name = Topic,
props = Props
},
payload = Payload},

View File

@ -125,6 +125,8 @@ cluster_size_1_tests() ->
topic_alias_invalid,
topic_alias_unknown,
topic_alias_disallowed,
topic_alias_in_retained_message,
topic_alias_diallowed_retained_message,
extended_auth
].
@ -1958,6 +1960,26 @@ topic_alias_disallowed(Config) ->
ok = rpc(Config, persistent_term, put, [Key, DefaultMax]).
topic_alias_in_retained_message(Config) ->
topic_alias_in_retained_message_test(Config, 1, 10, #{'Topic-Alias' => 1}).
topic_alias_diallowed_retained_message(Config) ->
topic_alias_in_retained_message_test(Config, 0, 1, #{}).
topic_alias_in_retained_message_test(Config, ClientAliasMax, MsgAlias, ExpectedProps) ->
Topic = ClientId = atom_to_binary(?FUNCTION_NAME),
C = connect(ClientId, Config, [{properties, #{'Topic-Alias-Maximum' => ClientAliasMax}}]),
{ok, _} = emqtt:publish(C, Topic, #{'Topic-Alias' => MsgAlias}, <<"m1">>, [{retain, true}, {qos, 1}]),
{ok, _, [1]} = emqtt:subscribe(C, Topic, [{qos, 1}]),
receive {publish, #{payload := <<"m1">>,
topic := Topic,
retain := true,
properties := ExpectedProps}} -> ok
after 500 -> ct:fail("Did not receive m1")
end,
ok = emqtt:disconnect(C).
extended_auth(Config) ->
{C, Connect} = start_client(?FUNCTION_NAME, Config, 0,
[{properties, #{'Authentication-Method' => <<"OTP">>,
@ -1999,3 +2021,4 @@ assert_nothing_received(Timeout) ->
receive Unexpected -> ct:fail("Received unexpected message: ~p", [Unexpected])
after Timeout -> ok
end.