Support iodata() when sending message to MQTT client
When the MQTT connection receives an AMQP 0.9.1 message, it will contain
a list of payload fragments.
This commit avoids the expensive operation of turning that list into a binary.
All I/O methods accept iodata():
* erlang:port_command/2
* ssl:send/2
* In Web MQTT, cowboy websockets accept iodata():
0d04cfffa3/src/cow_ws.erl (L58)
This commit is contained in:
parent
15636fdb90
commit
61f6ca7b66
|
|
@ -53,14 +53,16 @@
|
|||
%% Packet identifier is a non zero two byte integer.
|
||||
-type packet_id() :: 1..16#ffff.
|
||||
|
||||
-record(mqtt_packet, {fixed,
|
||||
variable,
|
||||
payload}).
|
||||
|
||||
-record(mqtt_packet_fixed, {type = 0,
|
||||
dup = 0,
|
||||
qos = 0,
|
||||
retain = 0}).
|
||||
retain = 0
|
||||
}).
|
||||
|
||||
-record(mqtt_packet, {fixed :: #mqtt_packet_fixed{},
|
||||
variable :: tuple(),
|
||||
payload :: iodata()
|
||||
}).
|
||||
|
||||
-record(mqtt_packet_connect, {proto_ver,
|
||||
will_retain,
|
||||
|
|
@ -90,8 +92,6 @@
|
|||
-record(mqtt_packet_suback, {packet_id :: packet_id(),
|
||||
qos_table = []}).
|
||||
|
||||
-record(mqtt_packet_other, {other}).
|
||||
|
||||
-record(mqtt_msg, {retain :: boolean(),
|
||||
qos :: qos(),
|
||||
topic :: string(),
|
||||
|
|
|
|||
|
|
@ -24,9 +24,9 @@ parse(<<>>, none) ->
|
|||
{more, fun(Bin) -> parse(Bin, none) end};
|
||||
parse(<<MessageType:4, Dup:1, QoS:2, Retain:1, Rest/binary>>, none) ->
|
||||
parse_remaining_len(Rest, #mqtt_packet_fixed{ type = MessageType,
|
||||
dup = bool(Dup),
|
||||
qos = QoS,
|
||||
retain = bool(Retain) });
|
||||
dup = bool(Dup),
|
||||
qos = QoS,
|
||||
retain = bool(Retain) });
|
||||
parse(Bin, Cont) -> Cont(Bin).
|
||||
|
||||
parse_remaining_len(<<>>, Fixed) ->
|
||||
|
|
@ -45,7 +45,7 @@ parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Fixed, Multiplier, Value) ->
|
|||
parse_packet(Rest, Fixed, Value + Len * Multiplier).
|
||||
|
||||
parse_packet(Bin, #mqtt_packet_fixed{ type = Type,
|
||||
qos = Qos } = Fixed, Length) ->
|
||||
qos = Qos } = Fixed, Length) ->
|
||||
case {Type, Bin} of
|
||||
{?CONNECT, <<PacketBin:Length/binary, Rest/binary>>} ->
|
||||
{ProtoName, Rest1} = parse_utf(PacketBin),
|
||||
|
|
@ -90,7 +90,7 @@ parse_packet(Bin, #mqtt_packet_fixed{ type = Type,
|
|||
{M, R}
|
||||
end,
|
||||
wrap(Fixed, #mqtt_packet_publish { topic_name = TopicName,
|
||||
packet_id = PacketId },
|
||||
packet_id = PacketId },
|
||||
Payload, Rest);
|
||||
{?PUBACK, <<PacketBin:Length/binary, Rest/binary>>} ->
|
||||
<<PacketId:16/big>> = PacketBin,
|
||||
|
|
@ -101,7 +101,7 @@ parse_packet(Bin, #mqtt_packet_fixed{ type = Type,
|
|||
<<PacketId:16/big, Rest1/binary>> = PacketBin,
|
||||
Topics = parse_topics(Subs, Rest1, []),
|
||||
wrap(Fixed, #mqtt_packet_subscribe { packet_id = PacketId,
|
||||
topic_table = Topics }, Rest);
|
||||
topic_table = Topics }, Rest);
|
||||
{Minimal, Rest}
|
||||
when Minimal =:= ?DISCONNECT orelse Minimal =:= ?PINGREQ ->
|
||||
Length = 0,
|
||||
|
|
@ -109,7 +109,7 @@ parse_packet(Bin, #mqtt_packet_fixed{ type = Type,
|
|||
{_, TooShortBin} ->
|
||||
{more, fun(BinMore) ->
|
||||
parse_packet(<<TooShortBin/binary, BinMore/binary>>,
|
||||
Fixed, Length)
|
||||
Fixed, Length)
|
||||
end}
|
||||
end.
|
||||
|
||||
|
|
@ -150,24 +150,29 @@ bool(1) -> true.
|
|||
|
||||
%% serialisation
|
||||
|
||||
-spec serialise(#mqtt_packet{}, ?MQTT_PROTO_V3 | ?MQTT_PROTO_V4) ->
|
||||
iodata().
|
||||
serialise(#mqtt_packet{fixed = Fixed,
|
||||
variable = Variable,
|
||||
payload = Payload}, Vsn) ->
|
||||
variable = Variable,
|
||||
payload = Payload}, Vsn) ->
|
||||
serialise_variable(Fixed, Variable, serialise_payload(Payload), Vsn).
|
||||
|
||||
serialise_payload(undefined) -> <<>>;
|
||||
serialise_payload(B) when is_binary(B) -> B.
|
||||
serialise_payload(undefined) ->
|
||||
<<>>;
|
||||
serialise_payload(P)
|
||||
when is_binary(P) orelse is_list(P) ->
|
||||
P.
|
||||
|
||||
serialise_variable(#mqtt_packet_fixed { type = ?CONNACK } = Fixed,
|
||||
#mqtt_packet_connack { session_present = SessionPresent,
|
||||
return_code = ReturnCode },
|
||||
return_code = ReturnCode },
|
||||
<<>> = PayloadBin, _Vsn) ->
|
||||
VariableBin = <<?RESERVED:7, (opt(SessionPresent)):1, ReturnCode:8>>,
|
||||
serialise_fixed(Fixed, VariableBin, PayloadBin);
|
||||
|
||||
serialise_variable(#mqtt_packet_fixed { type = SubAck } = Fixed,
|
||||
#mqtt_packet_suback { packet_id = PacketId,
|
||||
qos_table = Qos },
|
||||
qos_table = Qos },
|
||||
<<>> = _PayloadBin, Vsn)
|
||||
when SubAck =:= ?SUBACK orelse SubAck =:= ?UNSUBACK ->
|
||||
VariableBin = <<PacketId:16/big>>,
|
||||
|
|
@ -181,16 +186,16 @@ serialise_variable(#mqtt_packet_fixed { type = SubAck } = Fixed,
|
|||
serialise_fixed(Fixed, VariableBin, QosBin);
|
||||
|
||||
serialise_variable(#mqtt_packet_fixed { type = ?PUBLISH,
|
||||
qos = Qos } = Fixed,
|
||||
qos = Qos } = Fixed,
|
||||
#mqtt_packet_publish { topic_name = TopicName,
|
||||
packet_id = PacketId },
|
||||
PayloadBin, _Vsn) ->
|
||||
packet_id = PacketId },
|
||||
Payload, _Vsn) ->
|
||||
TopicBin = serialise_utf(TopicName),
|
||||
PacketIdBin = case Qos of
|
||||
0 -> <<>>;
|
||||
1 -> <<PacketId:16/big>>
|
||||
end,
|
||||
serialise_fixed(Fixed, <<TopicBin/binary, PacketIdBin/binary>>, PayloadBin);
|
||||
serialise_fixed(Fixed, <<TopicBin/binary, PacketIdBin/binary>>, Payload);
|
||||
|
||||
serialise_variable(#mqtt_packet_fixed { type = ?PUBACK } = Fixed,
|
||||
#mqtt_packet_publish { packet_id = PacketId },
|
||||
|
|
@ -204,15 +209,15 @@ serialise_variable(#mqtt_packet_fixed {} = Fixed,
|
|||
serialise_fixed(Fixed, <<>>, <<>>).
|
||||
|
||||
serialise_fixed(#mqtt_packet_fixed{ type = Type,
|
||||
dup = Dup,
|
||||
qos = Qos,
|
||||
retain = Retain }, VariableBin, PayloadBin)
|
||||
dup = Dup,
|
||||
qos = Qos,
|
||||
retain = Retain }, VariableBin, Payload)
|
||||
when is_integer(Type) andalso ?CONNECT =< Type andalso Type =< ?DISCONNECT ->
|
||||
Len = size(VariableBin) + size(PayloadBin),
|
||||
Len = size(VariableBin) + iolist_size(Payload),
|
||||
true = (Len =< ?MAX_LEN),
|
||||
LenBin = serialise_len(Len),
|
||||
<<Type:4, (opt(Dup)):1, (opt(Qos)):2, (opt(Retain)):1,
|
||||
LenBin/binary, VariableBin/binary, PayloadBin/binary>>.
|
||||
[<<Type:4, (opt(Dup)):1, (opt(Qos)):2, (opt(Retain)):1,
|
||||
LenBin/binary, VariableBin/binary>>, Payload].
|
||||
|
||||
serialise_utf(String) ->
|
||||
StringBin = unicode:characters_to_binary(String),
|
||||
|
|
|
|||
|
|
@ -1604,8 +1604,6 @@ maybe_publish_to_client(
|
|||
content = #content{payload_fragments_rev = FragmentsRev}}},
|
||||
QoS, State0 = #state{send_fun = SendFun}) ->
|
||||
{PacketId, State} = queue_packet_id_to_packet_id(QMsgId, QoS, State0),
|
||||
%%TODO support iolists when sending to client
|
||||
Payload = list_to_binary(lists:reverse(FragmentsRev)),
|
||||
Packet =
|
||||
#mqtt_packet{
|
||||
fixed = #mqtt_packet_fixed{
|
||||
|
|
@ -1621,7 +1619,7 @@ maybe_publish_to_client(
|
|||
variable = #mqtt_packet_publish{
|
||||
packet_id = PacketId,
|
||||
topic_name = amqp_to_mqtt(RoutingKey)},
|
||||
payload = Payload},
|
||||
payload = lists:reverse(FragmentsRev)},
|
||||
SendFun(Packet, State),
|
||||
message_delivered(QNameOrType, Redelivered, QoS, State),
|
||||
State.
|
||||
|
|
|
|||
|
|
@ -37,7 +37,7 @@ groups() ->
|
|||
[
|
||||
%% separate RMQ so global counters start from 0
|
||||
{global_counters, [], [global_counters_v3, global_counters_v4]},
|
||||
{common_tests, [], common_tests()}
|
||||
{common_tests, [], tests()}
|
||||
]},
|
||||
{cluster_size_3, [],
|
||||
[queue_down_qos1,
|
||||
|
|
@ -46,11 +46,11 @@ groups() ->
|
|||
flow_classic_mirrored_queue,
|
||||
flow_quorum_queue,
|
||||
flow_stream,
|
||||
rabbit_mqtt_qos0_queue] ++ common_tests()
|
||||
rabbit_mqtt_qos0_queue] ++ tests()
|
||||
}
|
||||
].
|
||||
|
||||
common_tests() ->
|
||||
tests() ->
|
||||
[delete_create_queue
|
||||
,quorum_queue_rejects
|
||||
,publish_to_all_queue_types_qos0
|
||||
|
|
@ -62,6 +62,8 @@ common_tests() ->
|
|||
,subscribe_same_topic_same_qos
|
||||
,subscribe_same_topic_different_qos
|
||||
,subscribe_multiple
|
||||
,large_message_mqtt_to_mqtt
|
||||
,large_message_amqp_to_mqtt
|
||||
].
|
||||
|
||||
suite() ->
|
||||
|
|
@ -157,7 +159,7 @@ publish_to_all_queue_types_qos1(Config) ->
|
|||
publish_to_all_queue_types(Config, qos1).
|
||||
|
||||
publish_to_all_queue_types(Config, QoS) ->
|
||||
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
|
||||
Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
|
||||
|
||||
CQ = <<"classic-queue">>,
|
||||
CMQ = <<"classic-mirrored-queue">>,
|
||||
|
|
@ -214,7 +216,7 @@ publish_to_all_queue_types(Config, QoS) ->
|
|||
ok = emqtt:disconnect(C),
|
||||
?awaitMatch([],
|
||||
all_connection_pids(Config), 10_000, 1000),
|
||||
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
|
||||
ok = rabbit_ct_client_helpers:close_channel(Ch).
|
||||
|
||||
flow_classic_mirrored_queue(Config) ->
|
||||
QueueName = <<"flow">>,
|
||||
|
|
@ -234,7 +236,7 @@ flow(Config, {App, Par, Val}, QueueType)
|
|||
Result = rpc_all(Config, application, set_env, [App, Par, Val]),
|
||||
?assert(lists:all(fun(R) -> R =:= ok end, Result)),
|
||||
|
||||
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
|
||||
Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
|
||||
QueueName = Topic = atom_to_binary(?FUNCTION_NAME),
|
||||
declare_queue(Ch, QueueName, [{<<"x-queue-type">>, longstr, QueueType}]),
|
||||
bind(Ch, QueueName, Topic),
|
||||
|
|
@ -261,7 +263,7 @@ flow(Config, {App, Par, Val}, QueueType)
|
|||
ok = emqtt:disconnect(C),
|
||||
?awaitMatch([],
|
||||
all_connection_pids(Config), 10_000, 1000),
|
||||
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
|
||||
ok = rabbit_ct_client_helpers:close_channel(Ch),
|
||||
Result = rpc_all(Config, application, set_env, [App, Par, DefaultVal]),
|
||||
ok.
|
||||
|
||||
|
|
@ -461,11 +463,11 @@ global_counters(Config, ProtoVer) ->
|
|||
get_global_counters(Config, ProtoVer)).
|
||||
|
||||
queue_down_qos1(Config) ->
|
||||
{Conn1, Ch1} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 1),
|
||||
Ch1 = rabbit_ct_client_helpers:open_channel(Config, 1),
|
||||
CQ = Topic = atom_to_binary(?FUNCTION_NAME),
|
||||
declare_queue(Ch1, CQ, []),
|
||||
bind(Ch1, CQ, Topic),
|
||||
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn1, Ch1),
|
||||
ok = rabbit_ct_client_helpers:close_channel(Ch1),
|
||||
ok = rabbit_ct_broker_helpers:stop_node(Config, 1),
|
||||
|
||||
C = connect(?FUNCTION_NAME, Config, [{retry_interval, 2}]),
|
||||
|
|
@ -479,9 +481,9 @@ queue_down_qos1(Config) ->
|
|||
rabbitmqctl_list(Config, 1, ["list_queues", "messages", "--no-table-headers"])),
|
||||
500, 20),
|
||||
|
||||
{Conn0, Ch0} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
|
||||
Ch0 = rabbit_ct_client_helpers:open_channel(Config, 0),
|
||||
delete_queue(Ch0, CQ),
|
||||
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn0, Ch0),
|
||||
ok = rabbit_ct_client_helpers:close_channel(Ch0),
|
||||
ok = emqtt:disconnect(C).
|
||||
|
||||
%% Even though classic mirrored queues are deprecated, we know that some users have set up
|
||||
|
|
@ -581,7 +583,7 @@ consuming_classic_queue_down(Config) ->
|
|||
ok.
|
||||
|
||||
delete_create_queue(Config) ->
|
||||
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
|
||||
Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
|
||||
CQ1 = <<"classic-queue-1-delete-create">>,
|
||||
CQ2 = <<"classic-queue-2-delete-create">>,
|
||||
QQ = <<"quorum-queue-delete-create">>,
|
||||
|
|
@ -639,7 +641,7 @@ delete_create_queue(Config) ->
|
|||
1000, 10),
|
||||
|
||||
delete_queue(Ch, [CQ1, CQ2, QQ]),
|
||||
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
|
||||
ok = rabbit_ct_client_helpers:close_channel(Ch),
|
||||
ok = emqtt:disconnect(C).
|
||||
|
||||
non_clean_sess_disconnect(Config) ->
|
||||
|
|
@ -727,6 +729,35 @@ subscribe_multiple(Config) ->
|
|||
{<<"topic1">>, qos1}])),
|
||||
ok = emqtt:disconnect(C).
|
||||
|
||||
large_message_mqtt_to_mqtt(Config) ->
|
||||
Topic = ClientId = atom_to_binary(?FUNCTION_NAME),
|
||||
C = connect(ClientId, Config),
|
||||
{ok, _, [1]} = emqtt:subscribe(C, {Topic, qos1}),
|
||||
|
||||
Payload0 = binary:copy(<<"x">>, 1_000_000),
|
||||
Payload = <<Payload0/binary, "y">>,
|
||||
{ok, _} = emqtt:publish(C, Topic, Payload, qos1),
|
||||
ok = expect_publishes(Topic, [Payload]),
|
||||
|
||||
ok = emqtt:disconnect(C).
|
||||
|
||||
large_message_amqp_to_mqtt(Config) ->
|
||||
Topic = ClientId = atom_to_binary(?FUNCTION_NAME),
|
||||
C = connect(ClientId, Config),
|
||||
{ok, _, [1]} = emqtt:subscribe(C, {Topic, qos1}),
|
||||
|
||||
Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
|
||||
Payload0 = binary:copy(<<"x">>, 1_000_000),
|
||||
Payload = <<Payload0/binary, "y">>,
|
||||
amqp_channel:call(Ch,
|
||||
#'basic.publish'{exchange = <<"amq.topic">>,
|
||||
routing_key = Topic},
|
||||
#amqp_msg{payload = Payload}),
|
||||
ok = expect_publishes(Topic, [Payload]),
|
||||
|
||||
ok = rabbit_ct_client_helpers:close_channel(Ch),
|
||||
ok = emqtt:disconnect(C).
|
||||
|
||||
%% This test is mostly interesting in mixed version mode where feature flag
|
||||
%% rabbit_mqtt_qos0_queue is disabled and therefore a classic queue gets created.
|
||||
rabbit_mqtt_qos0_queue(Config) ->
|
||||
|
|
|
|||
Loading…
Reference in New Issue