Support MQTT 5.0 Properties
The following PUBLISH and Will properties are forwarded unaltered by the server: * Payload Format Indicator * Content Type * Response Topic * Correlation Data * User Property Not only must these properties be forwarded unaltered from an MQTT publishing client to an MQTT receiving client, but it would also be nice to allow for protocol interoperability: Think about RPC request-response style patterns where the requester is an MQTT client and the responder is an AMQP 0.9.1 or STOMP client. We reuse the P_basic fields where possible: * content_type (if <= 255 bytes) * correlation_id (if <= 255 bytes) Otherwise, we add custom AMQP 0.9.1 headers. The headers follow the naming pattern "x-mqtt-<property>" where <property> is the MQTT v5 property if that property makes only (mainly) sense in the MQTT world: * x-mqtt-user-property * x-mqtt-payload-format-indicator If the MQTT v5 property makes also sense outside of the MQTT world, we name it more generic: * x-correlation (if > 255 bytes) * x-reply-to-topic (since P_basic.reply_to assumes a queue name) In the future, we can think about adding a header x-reply-to-exchange and have the MQTT plugin set its value to the configured mqtt.exchange such that clients don't have to assume the default topic exchange amq.topic.
This commit is contained in:
parent
fb7af48df6
commit
8c0b0e9338
|
|
@ -37,6 +37,7 @@
|
|||
-define(MAX_PERMISSION_CACHE_SIZE, 12).
|
||||
-define(CONSUMER_TAG, <<"mqtt">>).
|
||||
-define(QUEUE_TTL_KEY, <<"x-expires">>).
|
||||
-define(AMQP_091_SHORT_STR_MAX_SIZE, 255).
|
||||
|
||||
-type send_fun() :: fun((iodata()) -> ok).
|
||||
-type session_expiry_interval() :: non_neg_integer() | infinity.
|
||||
|
|
@ -87,6 +88,9 @@
|
|||
%% (Not to be confused with packet IDs sent from client to server which can be the
|
||||
%% same IDs because client and server assign IDs independently of each other.)
|
||||
packet_id = 1 :: packet_id(),
|
||||
%% "A Session cannot have more than one Non‑shared Subscription with the same Topic Filter,
|
||||
%% so the Topic Filter can be used as a key to identify the subscription within that Session."
|
||||
%% [v5 4.8.1]
|
||||
subscriptions = #{} :: subscriptions(),
|
||||
auth_state = #auth_state{},
|
||||
ra_register_state :: option(registered | {pending, reference()}),
|
||||
|
|
@ -1475,12 +1479,11 @@ publish_to_queues(
|
|||
_ ->
|
||||
{undefined, undefined}
|
||||
end,
|
||||
PBasic = #'P_basic'{
|
||||
headers = [{<<"x-mqtt-publish-qos">>, byte, Qos},
|
||||
{<<"x-mqtt-retain">>, bool, Retain}],
|
||||
delivery_mode = delivery_mode(Qos),
|
||||
expiration = Expiration,
|
||||
timestamp = Timestamp},
|
||||
PBasic0 = mqtt_props_to_amqp_props(Props, Qos, Retain),
|
||||
PBasic = PBasic0#'P_basic'{
|
||||
delivery_mode = delivery_mode(Qos),
|
||||
expiration = Expiration,
|
||||
timestamp = Timestamp},
|
||||
{ClassId, _MethodId} = rabbit_framing_amqp_0_9_1:method_id('basic.publish'),
|
||||
Content0 = #content{
|
||||
class_id = ClassId,
|
||||
|
|
@ -1725,15 +1728,15 @@ maybe_send_will(
|
|||
_ ->
|
||||
{[], undefined}
|
||||
end,
|
||||
PBasic = #'P_basic'{
|
||||
headers = Headers ++ [{<<"x-mqtt-publish-qos">>, byte, Qos},
|
||||
{<<"x-mqtt-retain">>, bool, Retain}],
|
||||
%% Persist message regardless of Will QoS since there is no noticable
|
||||
%% performance benefit if that single message is transient. This ensures that
|
||||
%% delayed Will Messages are not lost after a broker restart.
|
||||
delivery_mode = 2,
|
||||
expiration = integer_to_binary(MsgTTL),
|
||||
timestamp = Timestamp},
|
||||
PBasic0 = mqtt_props_to_amqp_props(Props, Qos, Retain),
|
||||
PBasic = PBasic0#'P_basic'{
|
||||
%% Persist message regardless of Will QoS since there is no noticable
|
||||
%% performance benefit if that single message is transient. This ensures that
|
||||
%% delayed Will Messages are not lost after a broker restart.
|
||||
headers = Headers ++ PBasic0#'P_basic'.headers,
|
||||
delivery_mode = 2,
|
||||
expiration = integer_to_binary(MsgTTL),
|
||||
timestamp = Timestamp},
|
||||
case check_publish_permitted(DefaultX, Topic, State) of
|
||||
ok ->
|
||||
ok = rabbit_basic:publish(DefaultX, QNameBin, PBasic, Payload),
|
||||
|
|
@ -2016,9 +2019,9 @@ maybe_publish_to_client(
|
|||
routing_keys = [RoutingKey | _CcRoutes],
|
||||
content = #content{payload_fragments_rev = FragmentsRev,
|
||||
properties = PBasic = #'P_basic'{headers = Headers}}}},
|
||||
QoS, State0) ->
|
||||
QoS, State0 = #state{cfg = #cfg{proto_ver = ProtoVer}}) ->
|
||||
MatchedTopicFilters = matched_topic_filters_v5(Headers, State0),
|
||||
Props0 = p_basic_to_publish_properties(PBasic),
|
||||
Props0 = amqp_props_to_mqtt_props(PBasic, ProtoVer),
|
||||
Props = maybe_add_subscription_ids(MatchedTopicFilters, Props0, State0),
|
||||
{PacketId, State} = msg_id_to_packet_id(QMsgId, QoS, State0),
|
||||
Packet =
|
||||
|
|
@ -2043,38 +2046,162 @@ maybe_publish_to_client(
|
|||
end,
|
||||
{SettleOp, State}.
|
||||
|
||||
%% Converts AMQP 0.9.1 properties to MQTT v5 properties.
|
||||
%% TODO map more properties such as content_encoding, content_type, correlation_id, headers, etc.
|
||||
-spec p_basic_to_publish_properties(#'P_basic'{}) -> properties().
|
||||
p_basic_to_publish_properties(#'P_basic'{headers = Headers,
|
||||
expiration = Expiration,
|
||||
timestamp = TimestampSeconds
|
||||
})
|
||||
when is_binary(Expiration) andalso
|
||||
is_integer(TimestampSeconds) ->
|
||||
%% Check whether source protocol is MQTT
|
||||
case lists:keymember(<<"x-mqtt-publish-qos">>, 1, Headers) of
|
||||
true ->
|
||||
ExpirationMs = binary_to_integer(Expiration),
|
||||
ExpirationSeconds = ExpirationMs div 1000,
|
||||
%% "The PUBLISH packet sent to a Client by the Server MUST contain a Message
|
||||
%% Expiry Interval set to the received value minus the time that the
|
||||
%% Application Message has been waiting in the Server" [MQTT-3.3.2-6]
|
||||
WaitingSeconds0 = os:system_time(second) - TimestampSeconds,
|
||||
%% For a delayed Will Message, the waiting time starts when the Will Message was published.
|
||||
WaitingSeconds = case rabbit_basic:header(<<"x-mqtt-will-delay-interval">>, Headers) of
|
||||
{<<"x-mqtt-will-delay-interval">>, long, Delay} ->
|
||||
WaitingSeconds0 - Delay;
|
||||
_ ->
|
||||
WaitingSeconds0
|
||||
end,
|
||||
Expiry = max(0, ExpirationSeconds - WaitingSeconds),
|
||||
#{'Message-Expiry-Interval' => Expiry};
|
||||
false ->
|
||||
#{}
|
||||
end;
|
||||
p_basic_to_publish_properties(#'P_basic'{}) ->
|
||||
#{}.
|
||||
%% Convert MQTT v5 PUBLISH or Will properties to AMQP 0.9.1 properties.
|
||||
-spec mqtt_props_to_amqp_props(properties(), qos(), boolean()) ->
|
||||
rabbit_framing:amqp_property_record().
|
||||
mqtt_props_to_amqp_props(Props, Qos, Retain) ->
|
||||
P0 = #'P_basic'{headers = [{<<"x-mqtt-publish-qos">>, byte, Qos},
|
||||
{<<"x-mqtt-retain">>, bool, Retain}]},
|
||||
P1 = case Props of
|
||||
#{'Content-Type' := T}
|
||||
when byte_size(T) =< ?AMQP_091_SHORT_STR_MAX_SIZE ->
|
||||
P0#'P_basic'{content_type = T};
|
||||
_ ->
|
||||
%% TODO if Content-Type is > 255 bytes (which seems unlikely), should we:
|
||||
%% 1. silently ignore (as done right now), or
|
||||
%% 2. close the network connection (i.e. prohibit), or
|
||||
%% 3. add a custom AMQP 0.9.1 header?
|
||||
P0
|
||||
end,
|
||||
P2 = case Props of
|
||||
#{'Payload-Format-Indicator' := 1} ->
|
||||
%% UTF-8 is not a MIME content encoding and therefore cannot be set as #'P_basic'.content_encoding.
|
||||
%% Rather, it would match to #'P_basic'.content_type = <<"text/plain;charset=UTF-8">>.
|
||||
%% However, we cannot set #'P_basic'.content_type because we don't know the subtype (wehther it's
|
||||
%% 'plain') and that field is already set by MQTT 5.0 property Content-Type.
|
||||
%% Therefore, we add a custom header.
|
||||
P1#'P_basic'{headers = [{<<"x-mqtt-payload-format-indicator">>, bool, true} |
|
||||
P1#'P_basic'.headers]};
|
||||
_ ->
|
||||
P1
|
||||
end,
|
||||
P3 = case Props of
|
||||
#{'Response-Topic' := Topic} ->
|
||||
%% Unfortunately, we cannot set #'P_basic'.reply_to because they are expected to hold
|
||||
%% the binary queue name in AMQP 0.9.1: "One of the standard message properties is
|
||||
%% Reply-To, which is designed specifically for carrying the name of reply queues."
|
||||
%% Therefore, we add a custom header.
|
||||
P2#'P_basic'{headers = [{<<"x-reply-to-topic">>, longstr,
|
||||
%% Convert such that an AMQP consumer can respond.
|
||||
mqtt_to_amqp(Topic)} |
|
||||
P2#'P_basic'.headers]};
|
||||
_ ->
|
||||
P2
|
||||
end,
|
||||
P4 = case Props of
|
||||
#{'Correlation-Data' := Corr}
|
||||
when byte_size(Corr) =< ?AMQP_091_SHORT_STR_MAX_SIZE ->
|
||||
P3#'P_basic'{correlation_id = Corr};
|
||||
#{'Correlation-Data' := Corr}
|
||||
when byte_size(Corr) > ?AMQP_091_SHORT_STR_MAX_SIZE ->
|
||||
P3#'P_basic'{headers = [{<<"x-correlation">>, longstr, Corr} | P3#'P_basic'.headers]};
|
||||
_ ->
|
||||
P3
|
||||
end,
|
||||
P = case Props of
|
||||
#{'User-Property' := PropList} ->
|
||||
%% "The same name is allowed to appear more than once."
|
||||
%% "The Server MUST maintain the order of User Properties
|
||||
%% when forwarding the Application Message" [v5 3.3.2.3.7]
|
||||
%% However, in AMQP 0.9.1 Field Tables: "Duplicate fields are illegal."
|
||||
%% To allow duplicate names and to maintain order, we create a 2 element map:
|
||||
%% The 1st element contains all names in order.
|
||||
%% The 2nd element contains all values in order.
|
||||
{Names, Values} = lists:unzip(PropList),
|
||||
Header = {<<"x-mqtt-user-property">>,
|
||||
table,
|
||||
rabbit_misc:to_amqp_table(#{<<"names">> => Names,
|
||||
<<"values">> => Values})},
|
||||
P4#'P_basic'{headers = [Header | P4#'P_basic'.headers]};
|
||||
_ ->
|
||||
P4
|
||||
end,
|
||||
P.
|
||||
|
||||
%% Convert AMQP 0.9.1 properties to MQTT v5 PUBLISH properties.
|
||||
-spec amqp_props_to_mqtt_props(rabbit_framing:amqp_property_record(), protocol_version_atom()) ->
|
||||
properties().
|
||||
%% Do not unnecessarily convert properties.
|
||||
amqp_props_to_mqtt_props(_, ?MQTT_PROTO_V3) ->
|
||||
#{};
|
||||
amqp_props_to_mqtt_props(_, ?MQTT_PROTO_V4) ->
|
||||
#{};
|
||||
amqp_props_to_mqtt_props(
|
||||
#'P_basic'{headers = Headers,
|
||||
expiration = Expiration,
|
||||
timestamp = TimestampSeconds,
|
||||
content_type = ContentType,
|
||||
correlation_id = CorrelationId
|
||||
}, ?MQTT_PROTO_V5) ->
|
||||
SourceProtocolIsMqtt = lists:keymember(<<"x-mqtt-publish-qos">>, 1, Headers),
|
||||
P0 = if is_binary(Expiration) andalso
|
||||
is_integer(TimestampSeconds) andalso
|
||||
%% Only if source protocol is MQTT we know that timestamp was set by the server
|
||||
SourceProtocolIsMqtt ->
|
||||
ExpirationMs = binary_to_integer(Expiration),
|
||||
ExpirationSeconds = ExpirationMs div 1000,
|
||||
%% "The PUBLISH packet sent to a Client by the Server MUST contain a Message
|
||||
%% Expiry Interval set to the received value minus the time that the
|
||||
%% Application Message has been waiting in the Server" [MQTT-3.3.2-6]
|
||||
WaitingSeconds0 = os:system_time(second) - TimestampSeconds,
|
||||
%% For a delayed Will Message, the waiting time starts when the Will Message was published.
|
||||
WaitingSeconds = case rabbit_basic:header(<<"x-mqtt-will-delay-interval">>, Headers) of
|
||||
{<<"x-mqtt-will-delay-interval">>, long, Delay} ->
|
||||
WaitingSeconds0 - Delay;
|
||||
_ ->
|
||||
WaitingSeconds0
|
||||
end,
|
||||
Expiry = max(0, ExpirationSeconds - WaitingSeconds),
|
||||
#{'Message-Expiry-Interval' => Expiry};
|
||||
true ->
|
||||
#{}
|
||||
end,
|
||||
P1 = case ContentType of
|
||||
T when is_binary(T) ->
|
||||
P0#{'Content-Type' => T};
|
||||
_ ->
|
||||
P0
|
||||
end,
|
||||
P2 = case rabbit_basic:header(<<"x-mqtt-payload-format-indicator">>, Headers) of
|
||||
{<<"x-mqtt-payload-format-indicator">>, bool, true} ->
|
||||
P1#{'Payload-Format-Indicator' => 1};
|
||||
_ ->
|
||||
P1
|
||||
end,
|
||||
P3 = case rabbit_basic:header(<<"x-reply-to-topic">>, Headers) of
|
||||
{<<"x-reply-to-topic">>, longstr, Topic}
|
||||
when is_binary(Topic) ->
|
||||
P2#{'Response-Topic' => amqp_to_mqtt(Topic)};
|
||||
_ ->
|
||||
P2
|
||||
end,
|
||||
P4 = case CorrelationId of
|
||||
C when is_binary(C) ->
|
||||
P3#{'Correlation-Data' => C};
|
||||
C when is_list(C) ->
|
||||
P3#{'Correlation-Data' => list_to_binary(C)};
|
||||
_ ->
|
||||
case rabbit_basic:header(<<"x-correlation">>, Headers) of
|
||||
{<<"x-correlation">>, longstr, C}
|
||||
when is_binary(C) ->
|
||||
P3#{'Correlation-Data' => C};
|
||||
_ ->
|
||||
P3
|
||||
end
|
||||
end,
|
||||
P = case rabbit_basic:header(<<"x-mqtt-user-property">>, Headers) of
|
||||
{<<"x-mqtt-user-property">>, table, Table} ->
|
||||
case rabbit_misc:amqp_table(Table) of
|
||||
#{<<"names">> := Names,
|
||||
<<"values">> := Values} ->
|
||||
P4#{'User-Property' => lists:zip(Names, Values)};
|
||||
_ ->
|
||||
P4
|
||||
end;
|
||||
_ ->
|
||||
P4
|
||||
end,
|
||||
P.
|
||||
|
||||
matched_topic_filters_v5(Headers, #state{cfg = #cfg{proto_ver = ?MQTT_PROTO_V5}}) ->
|
||||
case rabbit_mqtt_util:table_lookup(Headers, <<"x-binding-keys">>) of
|
||||
|
|
|
|||
|
|
@ -222,6 +222,8 @@ end_per_testcase(Testcase, Config) ->
|
|||
|
||||
end_per_testcase0(Testcase, Config) ->
|
||||
rabbit_ct_client_helpers:close_channels_and_connection(Config, 0),
|
||||
%% Assert that every testcase cleaned up their MQTT sessions.
|
||||
eventually(?_assertEqual([], rpc(Config, rabbit_amqqueue, list, []))),
|
||||
rabbit_ct_helpers:testcase_finished(Config, Testcase).
|
||||
|
||||
%% -------------------------------------------------------------------
|
||||
|
|
|
|||
|
|
@ -96,6 +96,11 @@ cluster_size_1_tests() ->
|
|||
session_upgrade_v3_v5_unsubscribe,
|
||||
session_upgrade_v4_v5_no_queue_bind_permission,
|
||||
amqp091_cc_header,
|
||||
publish_property_content_type,
|
||||
publish_property_payload_format_indicator,
|
||||
publish_property_response_topic_correlation_data,
|
||||
publish_property_user_property,
|
||||
publish_property_mqtt_to_amqp091,
|
||||
disconnect_with_will,
|
||||
will_qos2,
|
||||
will_delay_greater_than_session_expiry,
|
||||
|
|
@ -106,7 +111,10 @@ cluster_size_1_tests() ->
|
|||
will_delay_reconnect_with_will,
|
||||
will_delay_session_takeover,
|
||||
will_delay_message_expiry,
|
||||
will_delay_message_expiry_publish_properties
|
||||
will_delay_message_expiry_publish_properties,
|
||||
will_delay_properties,
|
||||
will_properties,
|
||||
retain_properties
|
||||
].
|
||||
|
||||
cluster_size_3_tests() ->
|
||||
|
|
@ -1155,6 +1163,153 @@ amqp091_cc_header(Config) ->
|
|||
assert_nothing_received(),
|
||||
ok = emqtt:disconnect(C).
|
||||
|
||||
publish_property_content_type(Config) ->
|
||||
Topic = ClientId = Payload = atom_to_binary(?FUNCTION_NAME),
|
||||
C = connect(ClientId, Config),
|
||||
{ok, _, [1]} = emqtt:subscribe(C, Topic, qos1),
|
||||
%% "The Content Type MUST be a UTF-8 Encoded String" [v5 3.3.2.3.9]
|
||||
{ok, _} = emqtt:publish(C, Topic, #{'Content-Type' => <<"text/plain😎;charset=UTF-8"/utf8>>}, Payload, [{qos, 1}]),
|
||||
receive {publish, #{payload := Payload,
|
||||
properties := #{'Content-Type' := <<"text/plain😎;charset=UTF-8"/utf8>>}}} -> ok
|
||||
after 1000 -> ct:fail("did not receive message")
|
||||
end,
|
||||
ok = emqtt:disconnect(C).
|
||||
|
||||
publish_property_payload_format_indicator(Config) ->
|
||||
Topic = ClientId = atom_to_binary(?FUNCTION_NAME),
|
||||
C = connect(ClientId, Config),
|
||||
{ok, _, [1]} = emqtt:subscribe(C, Topic, qos1),
|
||||
{ok, _} = emqtt:publish(C, Topic, #{'Payload-Format-Indicator' => 0}, <<"m1">>, [{qos, 1}]),
|
||||
{ok, _} = emqtt:publish(C, Topic, #{'Payload-Format-Indicator' => 1}, <<"m2">>, [{qos, 1}]),
|
||||
receive {publish, #{payload := <<"m1">>,
|
||||
properties := Props}} ->
|
||||
?assertEqual(0, maps:size(Props))
|
||||
after 1000 -> ct:fail("did not receive m1")
|
||||
end,
|
||||
receive {publish, #{payload := <<"m2">>,
|
||||
properties := #{'Payload-Format-Indicator' := 1}}} -> ok
|
||||
after 1000 -> ct:fail("did not receive m2")
|
||||
end,
|
||||
ok = emqtt:disconnect(C).
|
||||
|
||||
publish_property_response_topic_correlation_data(Config) ->
|
||||
%% "The Response Topic MUST be a UTF-8 Encoded String" [v5 3.3.2.3.5]
|
||||
Requester = connect(<<"requester">>, Config),
|
||||
FrenchResponder = connect(<<"French responder">>, Config),
|
||||
ItalianResponder = connect(<<"Italian responder">>, Config),
|
||||
ResponseTopic = <<"🗣️/response/for/English/request"/utf8>>,
|
||||
{ok, _, [0]} = emqtt:subscribe(Requester, ResponseTopic),
|
||||
{ok, _, [0]} = emqtt:subscribe(FrenchResponder, <<"greet/French">>),
|
||||
{ok, _, [0]} = emqtt:subscribe(ItalianResponder, <<"greet/Italian">>),
|
||||
CorrelationFrench = <<"French">>,
|
||||
%% "the length of Binary Data is limited to the range of 0 to 65,535 Bytes" [v5 1.5.6]
|
||||
%% Let's also test with large correlation data.
|
||||
CorrelationItalian = <<"Italian", (binary:copy(<<"x">>, 65_500))/binary>>,
|
||||
ok = emqtt:publish(Requester, <<"greet/French">>,
|
||||
#{'Response-Topic' => ResponseTopic,
|
||||
'Correlation-Data' => CorrelationFrench},
|
||||
<<"Harry">>, [{qos, 0}]),
|
||||
ok = emqtt:publish(Requester, <<"greet/Italian">>,
|
||||
#{'Response-Topic' => ResponseTopic,
|
||||
'Correlation-Data' => CorrelationItalian},
|
||||
<<"Harry">>, [{qos, 0}]),
|
||||
receive {publish, #{client_pid := FrenchResponder,
|
||||
payload := <<"Harry">>,
|
||||
properties := #{'Response-Topic' := ResponseTopic,
|
||||
'Correlation-Data' := Corr0}}} ->
|
||||
ok = emqtt:publish(FrenchResponder, ResponseTopic,
|
||||
#{'Correlation-Data' => Corr0},
|
||||
<<"Bonjour Henri">>, [{qos, 0}])
|
||||
after 1000 -> ct:fail("French responder did not receive request")
|
||||
end,
|
||||
receive {publish, #{client_pid := ItalianResponder,
|
||||
payload := <<"Harry">>,
|
||||
properties := #{'Response-Topic' := ResponseTopic,
|
||||
'Correlation-Data' := Corr1}}} ->
|
||||
ok = emqtt:publish(ItalianResponder, ResponseTopic,
|
||||
#{'Correlation-Data' => Corr1},
|
||||
<<"Buongiorno Enrico">>, [{qos, 0}])
|
||||
after 1000 -> ct:fail("Italian responder did not receive request")
|
||||
end,
|
||||
receive {publish, #{client_pid := Requester,
|
||||
properties := #{'Correlation-Data' := CorrelationItalian},
|
||||
payload := Payload0
|
||||
}} ->
|
||||
?assertEqual(<<"Buongiorno Enrico">>, Payload0)
|
||||
after 1000 -> ct:fail("did not receive Italian response")
|
||||
end,
|
||||
receive {publish, #{client_pid := Requester,
|
||||
properties := #{'Correlation-Data' := CorrelationFrench},
|
||||
payload := Payload1
|
||||
}} ->
|
||||
?assertEqual(<<"Bonjour Henri">>, Payload1)
|
||||
after 1000 -> ct:fail("did not receive French response")
|
||||
end,
|
||||
[ok = emqtt:disconnect(C) || C <- [Requester, FrenchResponder, ItalianResponder]].
|
||||
|
||||
publish_property_user_property(Config) ->
|
||||
Payload = Topic = ClientId = atom_to_binary(?FUNCTION_NAME),
|
||||
C = connect(ClientId, Config),
|
||||
{ok, _, [1]} = emqtt:subscribe(C, Topic, qos1),
|
||||
%% Same keys and values are allowed. Order must be maintained.
|
||||
UserProperty = [{<<"k1">>, <<"v2">>},
|
||||
{<<"k1">>, <<"v2">>},
|
||||
{<<"k1">>, <<"v1">>},
|
||||
{<<"k0">>, <<"v0">>},
|
||||
%% "UTF-8 encoded strings can have any length in the range 0 to 65,535 bytes"
|
||||
%% [v5 1.5.4]
|
||||
{<<>>, <<>>},
|
||||
{<<(binary:copy(<<"k">>, 65_000))/binary, "🐇"/utf8>>,
|
||||
<<(binary:copy(<<"v">>, 65_000))/binary, "🐇"/utf8>>}],
|
||||
{ok, _} = emqtt:publish(C, Topic, #{'User-Property' => UserProperty}, Payload, [{qos, 1}]),
|
||||
receive {publish, #{payload := Payload,
|
||||
properties := #{'User-Property' := UserProperty}}} -> ok
|
||||
after 1000 -> ct:fail("did not receive message")
|
||||
end,
|
||||
ok = emqtt:disconnect(C).
|
||||
|
||||
%% Test Properties interoperability between MQTT and AMQP 0.9.1
|
||||
publish_property_mqtt_to_amqp091(Config) ->
|
||||
Q = ClientId = atom_to_binary(?FUNCTION_NAME),
|
||||
Ch = rabbit_ct_client_helpers:open_channel(Config),
|
||||
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = Q}),
|
||||
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = Q,
|
||||
exchange = <<"amq.topic">>,
|
||||
routing_key = <<"my.topic">>}),
|
||||
C = connect(ClientId, Config),
|
||||
MqttResponseTopic = <<"response/topic">>,
|
||||
{ok, _, [1]} = emqtt:subscribe(C, MqttResponseTopic, qos1),
|
||||
Correlation = <<"some correlation ID">>,
|
||||
RequestPayload = <<"my request">>,
|
||||
{ok, _} = emqtt:publish(C, <<"my/topic">>,
|
||||
#{'Content-Type' => <<"text/plain">>,
|
||||
'Correlation-Data' => Correlation,
|
||||
'Response-Topic' => MqttResponseTopic},
|
||||
RequestPayload, [{qos, 1}]),
|
||||
{#'basic.get_ok'{},
|
||||
#amqp_msg{payload = RequestPayload,
|
||||
props = #'P_basic'{content_type = <<"text/plain">>,
|
||||
correlation_id = Correlation,
|
||||
delivery_mode = 2,
|
||||
headers = Headers}}} = amqp_channel:call(Ch, #'basic.get'{queue = Q}),
|
||||
{<<"x-reply-to-topic">>, longstr, AmqpResponseTopic} = rabbit_basic:header(<<"x-reply-to-topic">>, Headers),
|
||||
ReplyPayload = <<"{\"my\" : \"reply\"}">>,
|
||||
amqp_channel:call(Ch, #'basic.publish'{exchange = <<"amq.topic">>,
|
||||
routing_key = AmqpResponseTopic},
|
||||
#amqp_msg{payload = ReplyPayload,
|
||||
props = #'P_basic'{correlation_id = Correlation,
|
||||
content_type = <<"application/json">>}}),
|
||||
receive {publish,
|
||||
#{client_pid := C,
|
||||
topic := MqttResponseTopic,
|
||||
payload := ReplyPayload,
|
||||
properties := #{'Content-Type' := <<"application/json">>,
|
||||
'Correlation-Data' := Correlation}}} -> ok
|
||||
after 500 -> ct:fail("did not receive reply")
|
||||
end,
|
||||
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = Q}),
|
||||
ok = emqtt:disconnect(C).
|
||||
|
||||
disconnect_with_will(Config) ->
|
||||
Topic = Payload = ClientId = atom_to_binary(?FUNCTION_NAME),
|
||||
Sub = connect(<<"subscriber">>, Config),
|
||||
|
|
@ -1406,6 +1561,106 @@ will_delay_message_expiry_publish_properties(Config) ->
|
|||
end,
|
||||
ok = emqtt:disconnect(Sub2).
|
||||
|
||||
%% Test all Will Properties (v5 3.1.3.2) that are forwarded unaltered by the server.
|
||||
will_properties(Config) ->
|
||||
will_properties0(Config, 0).
|
||||
|
||||
%% Test all Will Properties (v5 3.1.3.2) that are forwarded unaltered by the server
|
||||
%% when Will Delay Interval is set.
|
||||
will_delay_properties(Config) ->
|
||||
will_properties0(Config, 1).
|
||||
|
||||
will_properties0(Config, WillDelayInterval) ->
|
||||
Topic = Payload = atom_to_binary(?FUNCTION_NAME),
|
||||
Sub = connect(<<"sub">>, Config),
|
||||
{ok, _, [0]} = emqtt:subscribe(Sub, Topic),
|
||||
UserProperty = [{<<"k1">>, <<"v2">>},
|
||||
{<<>>, <<>>},
|
||||
{<<"k1">>, <<"v1">>}],
|
||||
CorrelationData = binary:copy(<<"x">>, 65_000),
|
||||
C = connect(<<"will">>, Config,
|
||||
[{properties, #{'Session-Expiry-Interval' => 1}},
|
||||
{will_props, #{'Will-Delay-Interval' => WillDelayInterval,
|
||||
'User-Property' => UserProperty,
|
||||
'Content-Type' => <<"text/plain😎;charset=UTF-8"/utf8>>,
|
||||
'Payload-Format-Indicator' => 1,
|
||||
'Response-Topic' => <<"response/topic">>,
|
||||
'Correlation-Data' => CorrelationData}},
|
||||
{will_topic, Topic},
|
||||
{will_qos, 0},
|
||||
{will_payload, Payload}]),
|
||||
ok = emqtt:disconnect(C, ?RC_DISCONNECT_WITH_WILL),
|
||||
if WillDelayInterval > 0 ->
|
||||
receive Unexpected -> ct:fail(Unexpected)
|
||||
after 700 -> ok
|
||||
end;
|
||||
WillDelayInterval =:= 0 ->
|
||||
ok
|
||||
end,
|
||||
receive {publish,
|
||||
#{client_pid := Sub,
|
||||
topic := Topic,
|
||||
payload := Payload,
|
||||
properties := #{'User-Property' := UserProperty,
|
||||
'Content-Type' := <<"text/plain😎;charset=UTF-8"/utf8>>,
|
||||
'Payload-Format-Indicator' := 1,
|
||||
'Response-Topic' := <<"response/topic">>,
|
||||
'Correlation-Data' := CorrelationData} = Props}}
|
||||
when map_size(Props) =:= 5 -> ok
|
||||
after 1500 -> ct:fail("did not receive Will Message")
|
||||
end,
|
||||
ok = emqtt:disconnect(Sub).
|
||||
|
||||
%% "When an Application Message is transported by MQTT it contains payload data,
|
||||
%% a Quality of Service (QoS), a collection of Properties, and a Topic Name" [v5 1.2]
|
||||
%% Since a retained message is an Application Message, it must also include the Properties.
|
||||
%% This test checks that the whole Application Message, especially Properties, are forwarded
|
||||
%% to future subscribers.
|
||||
retain_properties(Config) ->
|
||||
Props = #{'Content-Type' => <<"text/plain;charset=UTF-8">>,
|
||||
'User-Property' => [{<<"k1">>, <<"v2">>},
|
||||
{<<>>, <<>>},
|
||||
{<<"k1">>, <<"v1">>}],
|
||||
'Payload-Format-Indicator' => 1,
|
||||
'Response-Topic' => <<"response/topic">>,
|
||||
'Correlation-Data' => <<"some correlation data">>},
|
||||
%% Let's test both ways to retain messages:
|
||||
%% 1. a Will Message that is retained, and
|
||||
%% 2. a PUBLISH message that is retained
|
||||
Pub = connect(<<"publisher">>, Config,
|
||||
[{will_retain, true},
|
||||
{will_topic, <<"t/1">>},
|
||||
{will_payload, <<"m1">>},
|
||||
{will_qos, 1},
|
||||
{will_props, Props}]),
|
||||
{ok, _} = emqtt:publish(Pub, <<"t/2">>, Props, <<"m2">>, [{retain, true}, {qos, 1}]),
|
||||
ok = emqtt:disconnect(Pub, ?RC_DISCONNECT_WITH_WILL),
|
||||
%% Both messages are now retained.
|
||||
Sub = connect(<<"subscriber">>, Config),
|
||||
{ok, _, [1, 1]} = emqtt:subscribe(Sub, [{<<"t/1">>, qos1},
|
||||
{<<"t/2">>, qos1}]),
|
||||
receive {publish,
|
||||
#{client_pid := Sub,
|
||||
topic := <<"t/1">>,
|
||||
payload := <<"m1">>,
|
||||
retain := true,
|
||||
qos := 1,
|
||||
properties := Props}} -> ok
|
||||
after 500 -> ct:fail("did not receive m1")
|
||||
end,
|
||||
receive {publish,
|
||||
#{client_pid := Sub,
|
||||
topic := <<"t/2">>,
|
||||
payload := <<"m2">>,
|
||||
retain := true,
|
||||
qos := 1,
|
||||
properties := Props}} -> ok
|
||||
after 500 -> ct:fail("did not receive m2")
|
||||
end,
|
||||
ok = emqtt:publish(Sub, <<"t/1">>, <<>>, [{retain, true}]),
|
||||
ok = emqtt:publish(Sub, <<"t/2">>, <<>>, [{retain, true}]),
|
||||
ok = emqtt:disconnect(Sub).
|
||||
|
||||
%% "In the case of a Server shutdown or failure, the Server MAY defer publication of Will Messages
|
||||
%% until a subsequent restart. If this happens, there might be a delay between the time the Server
|
||||
%% experienced failure and when the Will Message is published." [v5 3.1.2.5]
|
||||
|
|
|
|||
Loading…
Reference in New Issue