rabbitmq-server/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl

961 lines
43 KiB
Erlang
Raw Normal View History

%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at https://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved.
%%
-module(rabbit_mqtt_processor).
-export([info/2, initial_state/2, initial_state/5,
process_frame/2, amqp_pub/2, amqp_callback/2, send_will/1,
close_connection/1]).
2012-06-27 00:57:24 +08:00
%% for testing purposes
-export([get_vhost_username/1, get_vhost/3, get_vhost_from_user_mapping/2,
add_client_id_to_adapter_info/2]).
2012-06-27 00:57:24 +08:00
-include_lib("amqp_client/include/amqp_client.hrl").
2012-09-12 21:34:41 +08:00
-include("rabbit_mqtt_frame.hrl").
-include("rabbit_mqtt.hrl").
-define(APP, rabbitmq_mqtt).
-define(FRAME_TYPE(Frame, Type),
Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}).
2016-01-08 07:25:26 +08:00
%% Builds the initial processor state for a socket. The socket is
%% unwrapped first and the peer address is read from the unwrapped
%% socket; the adapter info is derived from the socket as passed in.
%% Frames are sent to the client with serialise_and_send_to_client/2.
initial_state(Socket, SSLLoginName) ->
    Unwrapped = rabbit_net:unwrap_socket(Socket),
    {ok, {Addr, _Port}} = rabbit_net:peername(Unwrapped),
    Info = adapter_info(Socket, 'MQTT'),
    Send = fun serialise_and_send_to_client/2,
    initial_state(Unwrapped, SSLLoginName, Info, Send, Addr).
%% Builds the initial processor state from its explicit parts: the
%% socket, the TLS login name, the adapter info, the fun used to send
%% frames to the client, and the peer address.
initial_state(Socket, SSLLoginName,
              AdapterInfo0 = #amqp_adapter_info{additional_info = Extra},
              SendFun, PeerAddr) ->
    %% MQTT connections use exactly one channel. The frame max is not
    %% applicable and there is no way to know what client is used.
    MqttInfo = [{channels, 1},
                {channel_max, 1},
                {frame_max, 0},
                {client_properties,
                 [{<<"product">>, longstr, <<"MQTT client">>}]}],
    AdapterInfo = AdapterInfo0#amqp_adapter_info{
                    additional_info = MqttInfo ++ Extra},
    #proc_state{unacked_pubs   = gb_trees:empty(),
                awaiting_ack   = gb_trees:empty(),
                message_id     = 1,
                subscriptions  = #{},
                consumer_tags  = {undefined, undefined},
                channels       = {undefined, undefined},
                exchange       = rabbit_mqtt_util:env(exchange),
                socket         = Socket,
                adapter_info   = AdapterInfo,
                ssl_login_name = SSLLoginName,
                send_fun       = SendFun,
                peer_addr      = PeerAddr}.
2012-08-06 06:52:54 +08:00
%% Entry point for a parsed MQTT frame.
%%
%% While no connection is set in the state, anything other than a
%% CONNECT frame is rejected with {error, connect_expected, State}.
%% Otherwise the frame is dispatched by type; an {ok, State} result is
%% extended with the state's connection, any other result is returned
%% unchanged.
process_frame(#mqtt_frame{fixed = #mqtt_frame_fixed{type = FrameType}},
              #proc_state{connection = undefined} = State)
  when FrameType =/= ?CONNECT ->
    {error, connect_expected, State};
process_frame(#mqtt_frame{fixed = #mqtt_frame_fixed{type = FrameType}} = Frame,
              State) ->
    case process_request(FrameType, Frame, State) of
        {ok, NewState} ->
            {ok, NewState, NewState#proc_state.connection};
        Other ->
            Other
    end.
%% Records the client id in the adapter's additional info in two places:
%% a `variable_map' entry is prepended, and a `client_id' item is
%% appended to the `client_properties' entry (which is created when the
%% list has none).
add_client_id_to_adapter_info(ClientId,
                              #amqp_adapter_info{additional_info = Info0} = AdapterInfo) ->
    Info1 = [{variable_map, #{<<"client_id">> => ClientId}} | Info0],
    Props = proplists:get_value(client_properties, Info1, [])
            ++ [{client_id, longstr, ClientId}],
    Entry = {client_properties, Props},
    Info2 = case lists:keymember(client_properties, 1, Info1) of
                true  -> lists:keyreplace(client_properties, 1, Info1, Entry);
                false -> [Entry | Info1]
            end,
    AdapterInfo#amqp_adapter_info{additional_info = Info2}.
%% CONNECT: checks the protocol version and client id, resolves the
%% credentials to use, logs in and, on success, opens the consuming
%% channel. The outcome is sent to the client in a CONNACK frame and
%% {ok, State} is returned in every non-crashing case.
process_request(?CONNECT,
                #mqtt_frame{variable = #mqtt_frame_connect{
                                          username   = Username,
                                          password   = Password,
                                          proto_ver  = ProtoVersion,
                                          clean_sess = CleanSess,
                                          client_id  = ClientId0,
                                          keep_alive = Keepalive} = Var},
                PState0 = #proc_state{ssl_login_name = SSLLoginName,
                                      send_fun       = SendFun,
                                      adapter_info   = AdapterInfo}) ->
    %% An empty client id is replaced by a generated one.
    ClientId = case ClientId0 of
                   []    -> rabbit_mqtt_util:gen_client_id();
                   [_|_] -> ClientId0
               end,
    rabbit_log_connection:debug("Received a CONNECT, client ID: ~p (expanded to ~p), username: ~p, "
                                "clean session: ~p, protocol version: ~p, keepalive: ~p",
                                [ClientId0, ClientId, Username, CleanSess, ProtoVersion, Keepalive]),
    AdapterInfo1 = add_client_id_to_adapter_info(rabbit_data_coercion:to_binary(ClientId), AdapterInfo),
    PState1 = PState0#proc_state{adapter_info = AdapterInfo1},
    {Return, PState5} =
        case {lists:member(ProtoVersion, proplists:get_keys(?PROTOCOL_NAMES)),
              ClientId0 =:= [] andalso CleanSess =:= false} of
            {false, _} ->
                %% unknown protocol version
                {?CONNACK_PROTO_VER, PState1};
            {_, true} ->
                %% an empty client id is only accepted with a clean session
                {?CONNACK_INVALID_ID, PState1};
            _ ->
                case creds(Username, Password, SSLLoginName) of
                    nocreds ->
                        rabbit_log_connection:error("MQTT login failed: no credentials provided~n"),
                        {?CONNACK_CREDENTIALS, PState1};
                    {invalid_creds, {undefined, Pass}} when is_list(Pass) ->
                        rabbit_log_connection:error("MQTT login failed: no username is provided"),
                        {?CONNACK_CREDENTIALS, PState1};
                    {invalid_creds, {User, undefined}} when is_list(User) ->
                        rabbit_log_connection:error("MQTT login failed for ~p: no password provided", [User]),
                        {?CONNACK_CREDENTIALS, PState1};
                    {UserBin, PassBin} ->
                        case process_login(UserBin, PassBin, ProtoVersion, PState1) of
                            connack_dup_auth ->
                                %% already logged in: only redo the
                                %% session handling
                                {SessionPresent0, PState2} = maybe_clean_sess(PState1),
                                {{?CONNACK_ACCEPT, SessionPresent0}, PState2};
                            {?CONNACK_ACCEPT, Conn, VHost, AState} ->
                                RetainerPid = rabbit_mqtt_retainer_sup:child_for_vhost(VHost),
                                link(Conn),
                                {ok, Ch} = amqp_connection:open_channel(Conn),
                                link(Ch),
                                amqp_channel:enable_delivery_flow_control(Ch),
                                ok = rabbit_mqtt_collector:register(ClientId, self()),
                                Prefetch = rabbit_mqtt_util:env(prefetch),
                                #'basic.qos_ok'{} = amqp_channel:call(
                                                      Ch, #'basic.qos'{prefetch_count = Prefetch}),
                                rabbit_mqtt_reader:start_keepalive(self(), Keepalive),
                                PState3 = PState1#proc_state{
                                            will_msg     = make_will_msg(Var),
                                            clean_sess   = CleanSess,
                                            channels     = {Ch, undefined},
                                            connection   = Conn,
                                            client_id    = ClientId,
                                            retainer_pid = RetainerPid,
                                            auth_state   = AState},
                                {SessionPresent1, PState4} = maybe_clean_sess(PState3),
                                {{?CONNACK_ACCEPT, SessionPresent1}, PState4};
                            ConnAck ->
                                {ConnAck, PState1}
                        end
                end
        end,
    %% Only an accepted connection carries a session-present flag; every
    %% other return code is reported with session_present = false.
    {ReturnCode, SessionPresent} = case Return of
                                       {?CONNACK_ACCEPT, _} ->
                                           Return;
                                       Return ->
                                           {Return, false}
                                   end,
    SendFun(#mqtt_frame{fixed    = #mqtt_frame_fixed{type = ?CONNACK},
                        variable = #mqtt_frame_connack{
                                      session_present = SessionPresent,
                                      return_code     = ReturnCode}},
            PState5),
    {ok, PState5};
2012-07-04 00:35:18 +08:00
2012-08-06 06:52:54 +08:00
%% PUBACK: acknowledges the delivery that was sent to the client under
%% this message id, if one is being tracked, and stops tracking it.
process_request(?PUBACK,
                #mqtt_frame{
                   variable = #mqtt_frame_publish{message_id = MessageId}},
                #proc_state{channels     = {Channel, _},
                            awaiting_ack = Awaiting} = PState) ->
    %% tag can be missing because of bogus clients and QoS downgrades
    case gb_trees:lookup(MessageId, Awaiting) of
        none ->
            {ok, PState};
        {value, Tag} ->
            amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}),
            Remaining = gb_trees:delete(MessageId, Awaiting),
            {ok, PState#proc_state{awaiting_ack = Remaining}}
    end;
2012-08-06 06:52:54 +08:00
%% PUBLISH at QoS 2: downgraded to QoS 1 and processed again as such.
process_request(?PUBLISH,
                #mqtt_frame{
                   fixed = #mqtt_frame_fixed{qos = ?QOS_2} = Fixed0} = Frame0,
                PState) ->
    Fixed = Fixed0#mqtt_frame_fixed{qos = ?QOS_1},
    process_request(?PUBLISH, Frame0#mqtt_frame{fixed = Fixed}, PState);
%% PUBLISH: hands the actual publishing to check_publish/3 as a fun.
%% The fun publishes the message over AMQP and, for messages with the
%% retain flag set, also passes it on to the retainer.
process_request(?PUBLISH,
                #mqtt_frame{
                   fixed    = #mqtt_frame_fixed{qos    = Qos,
                                                retain = Retain,
                                                dup    = Dup},
                   variable = #mqtt_frame_publish{topic_name = Topic,
                                                  message_id = MessageId},
                   payload  = Payload},
                PState = #proc_state{retainer_pid = RPid}) ->
    Publish =
        fun () ->
                Msg = #mqtt_msg{retain     = Retain,
                                qos        = Qos,
                                topic      = Topic,
                                dup        = Dup,
                                message_id = MessageId,
                                payload    = Payload},
                Result = amqp_pub(Msg, PState),
                case Retain of
                    false -> ok;
                    true  -> hand_off_to_retainer(RPid, Topic, Msg)
                end,
                {ok, Result}
        end,
    check_publish(Topic, Publish, PState);
%% SUBSCRIBE: hands the work to check_subscribe/3 as a fun. The fun
%% binds a queue for every requested topic, answers with a SUBACK that
%% carries the granted QoS values, and then sends any retained messages
%% for the requested topics.
process_request(?SUBSCRIBE,
                #mqtt_frame{
                   variable = #mqtt_frame_subscribe{
                                 message_id  = SubscribeMsgId,
                                 topic_table = Topics},
                   payload = undefined},
                #proc_state{channels     = {Channel, _},
                            exchange     = Exchange,
                            retainer_pid = RPid,
                            send_fun     = SendFun,
                            message_id   = StateMsgId} = PState0) ->
    rabbit_log_connection:debug("Received a SUBSCRIBE for topic(s) ~p", [Topics]),
    %% Binds one topic and records the granted QoS both in the response
    %% accumulator (by prepending) and in the subscriptions map.
    BindTopic =
        fun (#mqtt_topic{name = TopicName, qos = Qos}, {QosAcc, StateAcc}) ->
                GrantedQos = supported_subs_qos(Qos),
                {Queue, #proc_state{subscriptions = Subs} = StateWithQueue} =
                    ensure_queue(GrantedQos, StateAcc),
                Binding = #'queue.bind'{
                             queue       = Queue,
                             exchange    = Exchange,
                             routing_key = rabbit_mqtt_util:mqtt2amqp(TopicName)},
                #'queue.bind_ok'{} = amqp_channel:call(Channel, Binding),
                TopicQosList = case maps:find(TopicName, Subs) of
                                   {ok, Existing} -> [GrantedQos | Existing];
                                   error          -> [GrantedQos]
                               end,
                {[GrantedQos | QosAcc],
                 StateWithQueue#proc_state{
                   subscriptions = maps:put(TopicName, TopicQosList, Subs)}}
        end,
    Subscribe =
        fun () ->
                {QosResponse, PState1} =
                    lists:foldl(BindTopic, {[], PState0}, Topics),
                SendFun(#mqtt_frame{fixed    = #mqtt_frame_fixed{type = ?SUBACK},
                                    variable = #mqtt_frame_suback{
                                                  message_id = SubscribeMsgId,
                                                  qos_table  = QosResponse}},
                        PState1),
                %% we may need to send up to length(Topics) messages.
                %% if QoS is > 0 then we need to generate a message id,
                %% and increment the counter.
                StartMsgId = safe_max_id(SubscribeMsgId, StateMsgId),
                SendRetained =
                    fun (Topic, NextId) ->
                            case maybe_send_retained_message(RPid, Topic, NextId, PState1) of
                                {true, Used} -> NextId + Used;
                                false        -> NextId
                            end
                    end,
                FinalMsgId = lists:foldl(SendRetained, StartMsgId, Topics),
                {ok, PState1#proc_state{message_id = FinalMsgId}}
        end,
    check_subscribe(Topics, Subscribe, PState0);
2012-07-05 00:47:07 +08:00
%% UNSUBSCRIBE: for every topic, removes the binding from each queue it
%% was subscribed on (one per distinct QoS recorded for the topic),
%% forgets the topic, and answers with an UNSUBACK.
process_request(?UNSUBSCRIBE,
                #mqtt_frame{
                   variable = #mqtt_frame_subscribe{message_id  = MessageId,
                                                    topic_table = Topics},
                   payload  = undefined},
                #proc_state{channels      = {Channel, _},
                            exchange      = Exchange,
                            client_id     = ClientId,
                            subscriptions = Subs0,
                            send_fun      = SendFun} = PState) ->
    rabbit_log_connection:debug("Received an UNSUBSCRIBE for topic(s) ~p", [Topics]),
    Queues = rabbit_mqtt_util:subcription_queue_name(ClientId),
    %% The queue for a QoS level sits at tuple position QoS + 1.
    Unbind =
        fun (TopicName, Qos) ->
                Queue = element(Qos + 1, Queues),
                Binding = #'queue.unbind'{
                             queue       = Queue,
                             exchange    = Exchange,
                             routing_key = rabbit_mqtt_util:mqtt2amqp(TopicName)},
                #'queue.unbind_ok'{} = amqp_channel:call(Channel, Binding)
        end,
    DropTopic =
        fun (#mqtt_topic{name = TopicName}, Subs) ->
                QosLevels = case maps:find(TopicName, Subs) of
                                {ok, Val} when is_list(Val) -> lists:usort(Val);
                                error                       -> []
                            end,
                lists:foreach(fun (Qos) -> Unbind(TopicName, Qos) end,
                              QosLevels),
                maps:remove(TopicName, Subs)
        end,
    Subs1 = lists:foldl(DropTopic, Subs0, Topics),
    SendFun(#mqtt_frame{fixed    = #mqtt_frame_fixed{type = ?UNSUBACK},
                        variable = #mqtt_frame_suback{message_id = MessageId}},
            PState),
    {ok, PState#proc_state{subscriptions = Subs1}};
%% PINGREQ: answered with a PINGRESP; the state is left unchanged.
process_request(?PINGREQ, #mqtt_frame{}, #proc_state{send_fun = SendFun} = PState) ->
    rabbit_log_connection:debug("Received a PINGREQ"),
    PingResp = #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?PINGRESP}},
    SendFun(PingResp, PState),
    rabbit_log_connection:debug("Sent a PINGRESP"),
    {ok, PState};
%% DISCONNECT: asks the caller to stop by returning {stop, State}.
process_request(?DISCONNECT, #mqtt_frame{}, PState) ->
    rabbit_log_connection:debug("Received a DISCONNECT"),
    {stop, PState}.
2012-07-04 00:35:18 +08:00
%% Passes a message on to the retainer process. A message with an empty
%% payload clears whatever is retained for the topic; any other message
%% is retained. Always returns ok.
hand_off_to_retainer(RetainerPid, Topic, #mqtt_msg{payload = <<>>}) ->
    _ = rabbit_mqtt_retainer:clear(RetainerPid, Topic),
    ok;
hand_off_to_retainer(RetainerPid, Topic, Msg) ->
    _ = rabbit_mqtt_retainer:retain(RetainerPid, Topic, Msg),
    ok.
%% Sends the message retained for a topic, if there is one, to the
%% client.
%%
%% Returns false when nothing is retained or when the message went out
%% at QoS 0 (no message id was consumed), and {true, 1} when it went out
%% at QoS 1 using MsgId.
maybe_send_retained_message(RPid, #mqtt_topic{name = S, qos = SubscribeQos}, MsgId,
                            #proc_state{send_fun = SendFun} = PState) ->
    case rabbit_mqtt_retainer:fetch(RPid, S) of
        undefined -> false;
        Msg ->
            %% calculate effective QoS as the lower value of SUBSCRIBE frame QoS
            %% and retained message QoS. The spec isn't super clear on this, we
            %% do what Mosquitto does, per user feedback.
            %%
            %% Both inputs can be QoS 2: the SUBSCRIBE frame's QoS is passed in
            %% as received, and a retained will message keeps the QoS from the
            %% CONNECT frame. The result is therefore clamped to a level this
            %% module delivers at, instead of crashing on an unmatched QoS 2.
            Qos = supported_subs_qos(
                    erlang:min(SubscribeQos, Msg#mqtt_msg.qos)),
            {Id, Result} = case Qos of
                               ?QOS_0 -> {undefined, false};
                               ?QOS_1 -> {MsgId, {true, 1}}
                           end,
            SendFun(#mqtt_frame{fixed = #mqtt_frame_fixed{
                                           type   = ?PUBLISH,
                                           qos    = Qos,
                                           dup    = false,
                                           retain = Msg#mqtt_msg.retain},
                                variable = #mqtt_frame_publish{
                                              message_id = Id,
                                              topic_name = rabbit_mqtt_util:amqp2mqtt(S)},
                                payload = Msg#mqtt_msg.payload},
                    PState),
            Result
    end.
2012-08-06 06:52:54 +08:00
%% Handles events coming from the AMQP side.
%%
%% Clause 1: a delivery from one of the subscription queues. It is
%%           forwarded to the client as a PUBLISH unless it is a
%%           duplicate that would go out at QoS 0.
%% Clause 2: a publisher confirm with multiple = true. A PUBACK is sent
%%           for every unacked publish whose tag is =< the confirmed tag.
%% Clause 3: a publisher confirm for a single tag.
amqp_callback({#'basic.deliver'{consumer_tag = ConsumerTag,
                                delivery_tag = DeliveryTag,
                                routing_key  = RoutingKey},
               #amqp_msg{props   = #'P_basic'{headers = Headers},
                         payload = Payload},
               DeliveryCtx} = Delivery,
              #proc_state{channels     = {Channel, _},
                          awaiting_ack = Awaiting,
                          message_id   = MsgId,
                          send_fun     = SendFun} = PState) ->
    amqp_channel:notify_received(DeliveryCtx),
    AmqpAck = #'basic.ack'{delivery_tag = DeliveryTag},
    case {delivery_dup(Delivery), delivery_qos(ConsumerTag, Headers, PState)} of
        {true, {?QOS_0, ?QOS_1}} ->
            %% duplicate at QoS 0 from the acked queue: ack it, send nothing
            amqp_channel:cast(Channel, AmqpAck),
            {ok, PState};
        {true, {?QOS_0, ?QOS_0}} ->
            {ok, PState};
        {Dup, {DeliveryQos, _SubQos} = Qos} ->
            PacketId = case DeliveryQos of
                           ?QOS_0 -> undefined;
                           ?QOS_1 -> MsgId
                       end,
            Publish = #mqtt_frame{
                         fixed    = #mqtt_frame_fixed{type = ?PUBLISH,
                                                      qos  = DeliveryQos,
                                                      dup  = Dup},
                         variable = #mqtt_frame_publish{
                                       message_id = PacketId,
                                       topic_name =
                                           rabbit_mqtt_util:amqp2mqtt(RoutingKey)},
                         payload  = Payload},
            SendFun(Publish, PState),
            case Qos of
                {?QOS_0, ?QOS_0} ->
                    {ok, PState};
                {?QOS_1, ?QOS_1} ->
                    %% remember the delivery tag until the client's PUBACK
                    Awaiting1 = gb_trees:insert(MsgId, DeliveryTag, Awaiting),
                    {ok, next_msg_id(PState#proc_state{awaiting_ack = Awaiting1})};
                {?QOS_0, ?QOS_1} ->
                    amqp_channel:cast(Channel, AmqpAck),
                    {ok, PState}
            end
    end;
amqp_callback(#'basic.ack'{multiple = true, delivery_tag = Tag} = Ack,
              PState = #proc_state{unacked_pubs = UnackedPubs,
                                   send_fun     = SendFun}) ->
    case gb_trees:is_empty(UnackedPubs) orelse
        gb_trees:take_smallest(UnackedPubs) of
        {TagSmall, MsgId, UnackedPubs1} when TagSmall =< Tag ->
            PubAck = #mqtt_frame{
                        fixed    = #mqtt_frame_fixed{type = ?PUBACK},
                        variable = #mqtt_frame_publish{message_id = MsgId}},
            SendFun(PubAck, PState),
            amqp_callback(Ack, PState#proc_state{unacked_pubs = UnackedPubs1});
        _ ->
            {ok, PState}
    end;
amqp_callback(#'basic.ack'{multiple = false, delivery_tag = Tag},
              PState = #proc_state{unacked_pubs = UnackedPubs,
                                   send_fun     = SendFun}) ->
    MsgId = gb_trees:get(Tag, UnackedPubs),
    PubAck = #mqtt_frame{
                fixed    = #mqtt_frame_fixed{type = ?PUBACK},
                variable = #mqtt_frame_publish{message_id = MsgId}},
    SendFun(PubAck, PState),
    {ok, PState#proc_state{unacked_pubs = gb_trees:delete(Tag, UnackedPubs)}}.
2012-08-06 06:52:54 +08:00
%% Tells whether a delivery counts as a duplicate: either it is marked
%% as redelivered, or its x-mqtt-dup header is set.
delivery_dup({#'basic.deliver'{redelivered = Redelivered},
              #amqp_msg{props = #'P_basic'{headers = Headers}},
              _DeliveryCtx}) ->
    DupHeader = rabbit_mqtt_util:table_lookup(Headers, <<"x-mqtt-dup">>),
    case DupHeader of
        {bool, Flag} -> Redelivered orelse Flag;
        undefined    -> Redelivered
    end.
%% Wraps a message id back to 1 once it reaches 16#ffff (16#ffff itself
%% is wrapped too); smaller ids are returned unchanged.
ensure_valid_mqtt_message_id(Id) ->
    if
        Id >= 16#ffff -> 1;
        true          -> Id
    end.
%% Returns the larger of two message ids, wrapped into range by
%% ensure_valid_mqtt_message_id/1.
safe_max_id(Id0, Id1) ->
    Largest = erlang:max(Id0, Id1),
    ensure_valid_mqtt_message_id(Largest).
%% Advances the state's message id by one, wrapping it through
%% ensure_valid_mqtt_message_id/1.
next_msg_id(#proc_state{message_id = Current} = PState) ->
    PState#proc_state{
      message_id = ensure_valid_mqtt_message_id(Current + 1)}.
2012-08-06 06:52:54 +08:00
%% Decides at which QoS level to deliver, based on the subscription
%% (identified by the consumer tag the delivery arrived on) and the QoS
%% the message was published with. Non-MQTT publishes carry no
%% x-mqtt-publish-qos header and are assumed to be QoS 1, regardless of
%% delivery_mode.
%%
%% Returns {DeliveryQos, SubscriptionQos}.
delivery_qos(Tag, _Headers, #proc_state{consumer_tags = {Tag, _}}) ->
    {?QOS_0, ?QOS_0};
delivery_qos(Tag, Headers, #proc_state{consumer_tags = {_, Tag}}) ->
    PublishQos = rabbit_mqtt_util:table_lookup(Headers, <<"x-mqtt-publish-qos">>),
    case PublishQos of
        undefined   -> {?QOS_1, ?QOS_1};
        {byte, Qos} -> {erlang:min(Qos, ?QOS_1), ?QOS_1}
    end.
2012-08-06 06:52:54 +08:00
2016-04-22 02:02:28 +08:00
%% Applies the clean-session flag and returns {SessionPresent, State}.
%%
%% clean_sess = false: reports whether the QoS 1 queue already exists
%%                     and makes sure it is set up.
%% clean_sess = true:  deletes the QoS 1 queue on a temporary channel,
%%                     ignoring exits from the delete call, and reports
%%                     no session.
maybe_clean_sess(#proc_state{clean_sess = false,
                             connection = Conn,
                             client_id  = ClientId} = PState) ->
    SessionPresent = session_present(Conn, ClientId),
    {_Queue, PState1} = ensure_queue(?QOS_1, PState),
    {SessionPresent, PState1};
maybe_clean_sess(#proc_state{clean_sess = true,
                             connection = Conn,
                             client_id  = ClientId} = PState) ->
    {_, Qos1Queue} = rabbit_mqtt_util:subcription_queue_name(ClientId),
    {ok, TmpChannel} = amqp_connection:open_channel(Conn),
    Delete = #'queue.delete'{queue = Qos1Queue},
    ok = try amqp_channel:call(TmpChannel, Delete) of
             #'queue.delete_ok'{} -> ok
         catch
             exit:_Error -> ok
         after
             amqp_channel:close(TmpChannel)
         end,
    {false, PState}.
%% Checks whether the client's QoS 1 queue exists by declaring it
%% passively on a temporary channel. Returns true when the declare goes
%% through, and false when the call exits because the server closed the
%% channel with NOT_FOUND.
session_present(Conn, ClientId) ->
    {_, Qos1Queue} = rabbit_mqtt_util:subcription_queue_name(ClientId),
    PassiveDeclare = #'queue.declare'{queue   = Qos1Queue,
                                      passive = true},
    {ok, TmpChannel} = amqp_connection:open_channel(Conn),
    try
        amqp_channel:call(TmpChannel, PassiveDeclare),
        amqp_channel:close(TmpChannel),
        true
    catch
        exit:{{shutdown, {server_initiated_close, ?NOT_FOUND, _Text}}, _} ->
            false
    end.
%% Builds the will message from a CONNECT frame, or returns undefined
%% when the frame's will flag is false.
make_will_msg(#mqtt_frame_connect{will_flag = false}) ->
    undefined;
make_will_msg(#mqtt_frame_connect{will_topic  = WillTopic,
                                  will_msg    = WillPayload,
                                  will_qos    = WillQos,
                                  will_retain = WillRetain}) ->
    #mqtt_msg{topic   = WillTopic,
              payload = WillPayload,
              qos     = WillQos,
              retain  = WillRetain,
              dup     = false}.
2012-07-04 00:35:18 +08:00
%% Logs a client in.
%%
%% Clause 1: a channel is already open, i.e. this is a repeated CONNECT
%%           on an authenticated session; returns connack_dup_auth.
%% Clause 2: no channels yet. Picks the vhost, starts a direct AMQP
%%           connection and checks the loopback restriction. Returns
%%           {?CONNACK_ACCEPT, Connection, VHost, AuthState} on success
%%           or a CONNACK return code on failure.
process_login(_UserBin, _PassBin, _ProtoVersion,
              #proc_state{channels   = {Channel, _},
                          auth_state = #auth_state{username = Username,
                                                   vhost    = VHost}})
  when is_pid(Channel) ->
    rabbit_log_connection:warning("MQTT detected duplicate connect/login attempt for user ~p, vhost ~p",
                                  [rabbit_data_coercion:to_list(Username),
                                   rabbit_data_coercion:to_list(VHost)]),
    connack_dup_auth;
process_login(UserBin, PassBin, ProtoVersion,
              #proc_state{channels       = {undefined, undefined},
                          socket         = Sock,
                          adapter_info   = AdapterInfo,
                          ssl_login_name = SslLoginName,
                          peer_addr      = Addr}) ->
    {ok, {_, _, _, ToPort}} = rabbit_net:socket_ends(Sock, inbound),
    {VHostPickedUsing, {VHost, UsernameBin}} = get_vhost(UserBin, SslLoginName, ToPort),
    rabbit_log_connection:info(
      "MQTT vhost picked using ~s~n",
      [human_readable_vhost_lookup_strategy(VHostPickedUsing)]),
    case rabbit_vhost:exists(VHost) of
        false ->
            rabbit_log_connection:error("MQTT login failed for ~p auth_failure: vhost ~s does not exist~n",
                                        [binary_to_list(UserBin), VHost]),
            ?CONNACK_CREDENTIALS;
        true ->
            Params = #amqp_params_direct{
                        username     = UsernameBin,
                        password     = PassBin,
                        virtual_host = VHost,
                        adapter_info = set_proto_version(AdapterInfo, ProtoVersion)},
            case amqp_connection:start(Params) of
                {ok, Connection} ->
                    case rabbit_access_control:check_user_loopback(UsernameBin, Addr) of
                        ok ->
                            [{internal_user, InternalUser}] =
                                amqp_connection:info(Connection, [internal_user]),
                            {?CONNACK_ACCEPT, Connection, VHost,
                             #auth_state{user     = InternalUser,
                                         username = UsernameBin,
                                         vhost    = VHost}};
                        not_allowed ->
                            amqp_connection:close(Connection),
                            rabbit_log_connection:warning(
                              "MQTT login failed for ~p access_refused "
                              "(access must be from localhost)~n",
                              [binary_to_list(UsernameBin)]),
                            ?CONNACK_AUTH
                    end;
                {error, {auth_failure, Explanation}} ->
                    rabbit_log_connection:error("MQTT login failed for ~p auth_failure: ~s~n",
                                                [binary_to_list(UserBin), Explanation]),
                    ?CONNACK_CREDENTIALS;
                {error, Refusal} when Refusal =:= access_refused;
                                      Refusal =:= not_allowed ->
                    %% not_allowed: when vhost allowed for TLS connection
                    rabbit_log_connection:warning("MQTT login failed for ~p access_refused "
                                                  "(vhost access not allowed)~n",
                                                  [binary_to_list(UserBin)]),
                    ?CONNACK_AUTH
            end
    end.
%% Picks the vhost for a login. Without a TLS login name (none or
%% undefined) the non-TLS lookup is used, otherwise the TLS one.
%% Returns {Strategy, {VHost, Username}}.
get_vhost(UserBin, SslLogin, Port) when SslLogin =:= none;
                                        SslLogin =:= undefined ->
    get_vhost_no_ssl(UserBin, Port);
get_vhost(UserBin, SslLogin, Port) ->
    get_vhost_ssl(UserBin, SslLogin, Port).
%% Vhost lookup for logins without a TLS login name. Order of
%% precedence: vhost embedded in the username, then the port-to-vhost
%% mapping, then the plugin's configured vhost.
get_vhost_no_ssl(UserBin, Port) ->
    case vhost_in_username(UserBin) of
        true ->
            {vhost_in_username_or_default, get_vhost_username(UserBin)};
        false ->
            PortMapping = rabbit_runtime_parameters:value_global(
                            mqtt_port_to_vhost_mapping),
            case get_vhost_from_port_mapping(Port, PortMapping) of
                undefined ->
                    {default_vhost, {rabbit_mqtt_util:env(vhost), UserBin}};
                MappedVHost ->
                    {port_to_vhost_mapping, {MappedVHost, UserBin}}
            end
    end.
%% Vhost lookup for logins with a TLS login name. Order of precedence:
%% the login-name-to-vhost mapping, then the port-to-vhost mapping, then
%% the vhost embedded in the username (or the default).
get_vhost_ssl(UserBin, SslLoginName, Port) ->
    CertMapping = rabbit_runtime_parameters:value_global(mqtt_default_vhosts),
    case get_vhost_from_user_mapping(SslLoginName, CertMapping) of
        undefined ->
            PortMapping = rabbit_runtime_parameters:value_global(
                            mqtt_port_to_vhost_mapping),
            case get_vhost_from_port_mapping(Port, PortMapping) of
                undefined ->
                    {vhost_in_username_or_default, get_vhost_username(UserBin)};
                PortVHost ->
                    {port_to_vhost_mapping, {PortVHost, UserBin}}
            end;
        CertVHost ->
            {cert_to_vhost_mapping, {CertVHost, UserBin}}
    end.
%% Tells whether the username carries a vhost prefix ("vhost:user").
%% Always false when ignore_colons_in_username is set to true.
vhost_in_username(UserBin) ->
    IgnoreColons = application:get_env(?APP, ignore_colons_in_username),
    case IgnoreColons of
        {ok, true} ->
            false;
        _ ->
            %% split at the last colon, disallowing colons in username
            Parts = re:split(UserBin, ":(?!.*?:)"),
            case Parts of
                [UserBin] -> false;
                [_, _]    -> true
            end
    end.
2013-11-18 20:00:47 +08:00
%% Splits a "vhost:user" username into {VHost, User}. When there is no
%% colon, or when ignore_colons_in_username is set to true, the
%% configured vhost is paired with the username as given.
get_vhost_username(UserBin) ->
    Fallback = {rabbit_mqtt_util:env(vhost), UserBin},
    IgnoreColons = application:get_env(?APP, ignore_colons_in_username),
    case IgnoreColons of
        {ok, true} ->
            Fallback;
        _ ->
            %% split at the last colon, disallowing colons in username
            Parts = re:split(UserBin, ":(?!.*?:)"),
            case Parts of
                [UserBin]         -> Fallback;
                [Vhost, UserName] -> {Vhost, UserName}
            end
    end.
%% Looks a user up in the user-to-vhost mapping. Returns undefined when
%% the mapping is not_found or has no entry for the user.
get_vhost_from_user_mapping(_User, not_found) ->
    undefined;
get_vhost_from_user_mapping(User, Mapping) ->
    Proplist = rabbit_data_coercion:to_proplist(Mapping),
    rabbit_misc:pget(User, Proplist).
%% Looks a port up in the port-to-vhost mapping; the port is converted
%% to a binary for the lookup. Returns undefined when the mapping is
%% not_found or has no entry for the port.
get_vhost_from_port_mapping(_Port, not_found) ->
    undefined;
get_vhost_from_port_mapping(Port, Mapping) ->
    Proplist = rabbit_data_coercion:to_proplist(Mapping),
    PortKey = rabbit_data_coercion:to_binary(Port),
    rabbit_misc:pget(PortKey, Proplist).
2016-12-20 05:42:06 +08:00
%% Turns a vhost lookup strategy atom into text for log messages.
%% Strategies without a dedicated description are rendered with
%% atom_to_list/1.
human_readable_vhost_lookup_strategy(Strategy) ->
    case Strategy of
        vhost_in_username_or_default -> "vhost in username or default";
        port_to_vhost_mapping        -> "MQTT port to vhost mapping";
        cert_to_vhost_mapping        -> "client certificate to vhost mapping";
        default_vhost                -> "plugin configuration or default";
        _                            -> atom_to_list(Strategy)
    end.
%% Works out which credentials to log in with.
%%
%% Returns one of:
%%   {UserBin, PassBin}             - username and password from the client
%%   {invalid_creds, {User, Pass}}  - only one of the two was usable
%%   {SSLLoginName, none}           - TLS login name (ssl_cert_login)
%%   {DefaultUser, DefaultPass}     - anonymous login with the defaults
%%   nocreds                        - nothing to log in with
creds(User, Pass, SSLLoginName) ->
    DefaultUser = rabbit_mqtt_util:env(default_user),
    DefaultPass = rabbit_mqtt_util:env(default_pass),
    {ok, Anon} = application:get_env(?APP, allow_anonymous),
    {ok, TLSAuth} = application:get_env(?APP, ssl_cert_login),
    HaveDefaults = Anon =:= true
        andalso is_binary(DefaultUser)
        andalso is_binary(DefaultPass),
    Provided = User =/= undefined orelse Pass =/= undefined,
    WellFormed = is_list(User) andalso is_list(Pass),
    UseTlsName = TLSAuth =:= true andalso SSLLoginName =/= none,
    %% WellFormed implies Provided (a list is never `undefined'), so from
    %% the third branch on neither of them holds.
    if
        %% Username and password take priority
        Provided, WellFormed ->
            {list_to_binary(User), list_to_binary(Pass)};
        %% Either username or password is provided
        Provided ->
            {invalid_creds, {User, Pass}};
        %% rabbitmq_mqtt.ssl_cert_login is true. SSL user name provided.
        %% Authenticating using username only.
        UseTlsName ->
            {SSLLoginName, none};
        %% Anonymous connection uses default credentials
        HaveDefaults ->
            {DefaultUser, DefaultPass};
        true ->
            nocreds
    end.
2012-07-04 00:35:18 +08:00
2012-08-06 06:52:54 +08:00
%% Maps a requested subscription QoS to the one that is granted:
%% QoS 0 stays QoS 0, QoS 1 and QoS 2 are both granted as QoS 1.
supported_subs_qos(Qos) when Qos =:= ?QOS_1; Qos =:= ?QOS_2 ->
    ?QOS_1;
supported_subs_qos(?QOS_0) ->
    ?QOS_0.
2014-01-30 23:33:59 +08:00
%% Maps an MQTT QoS level to an AMQP delivery mode: 1 for QoS 0, 2 for
%% QoS 1 and above.
%%
%% QoS 2 gets the same delivery mode as QoS 1, in line with
%% supported_subs_qos/1 and the PUBLISH downgrade. Without this clause a
%% QoS 2 value would fail with function_clause.
%% NOTE(review): will messages are built with the QoS taken as-is from
%% the CONNECT frame, so QoS 2 presumably reaches this function through
%% the will publishing path - confirm against amqp_pub/2.
delivery_mode(?QOS_0) -> 1;
delivery_mode(?QOS_1) -> 2;
delivery_mode(?QOS_2) -> 2.
2012-08-06 06:52:54 +08:00
%% Makes sure the subscription queue for the given QoS exists and is
%% being consumed from, and returns {QueueName, State}.
%%
%% different qos subscriptions are received in different queues
%% with appropriate durability and timeout arguments
%% this will lead to duplicate messages for overlapping subscriptions
%% with different qos values - todo: prevent duplicates
%%
%% A queue is only declared and consumed from when no consumer tag is
%% recorded for its QoS yet; the new tag is then stored in the state.
ensure_queue(Qos, #proc_state{channels      = {Channel, _},
                              client_id     = ClientId,
                              clean_sess    = CleanSess,
                              consumer_tags = {TagQ0, TagQ1} = Tags} = PState) ->
    {QueueQ0, QueueQ1} = rabbit_mqtt_util:subcription_queue_name(ClientId),
    %% The QoS 1 queue only gets an expiry when a TTL is configured as an
    %% integer and the session is not a clean one.
    Qos1Args = case {rabbit_mqtt_util:env(subscription_ttl), CleanSess} of
                   {Ms, false} when is_integer(Ms) ->
                       [{<<"x-expires">>, long, Ms}];
                   _ ->
                       []
               end,
    Plan =
        case {TagQ0, TagQ1, Qos} of
            {undefined, _, ?QOS_0} ->
                {QueueQ0,
                 #'queue.declare'{queue       = QueueQ0,
                                  durable     = false,
                                  auto_delete = true},
                 #'basic.consume'{queue  = QueueQ0,
                                  no_ack = true}};
            {_, undefined, ?QOS_1} ->
                {QueueQ1,
                 #'queue.declare'{queue       = QueueQ1,
                                  durable     = true,
                                  %% Clean session means a transient connection,
                                  %% translating into auto-delete.
                                  %%
                                  %% see rabbitmq/rabbitmq-mqtt#37
                                  auto_delete = CleanSess,
                                  arguments   = Qos1Args},
                 #'basic.consume'{queue  = QueueQ1,
                                  no_ack = false}};
            {_, _, ?QOS_0} ->
                {exists, QueueQ0};
            {_, _, ?QOS_1} ->
                {exists, QueueQ1}
        end,
    case Plan of
        {exists, Existing} ->
            {Existing, PState};
        {Queue, Declare, Consume} ->
            #'queue.declare_ok'{} = amqp_channel:call(Channel, Declare),
            #'basic.consume_ok'{consumer_tag = NewTag} =
                amqp_channel:call(Channel, Consume),
            %% the tag for a QoS level sits at tuple position QoS + 1
            NewTags = setelement(Qos + 1, Tags, NewTag),
            {Queue, PState#proc_state{consumer_tags = NewTags}}
    end.
%% Publishes the client's last-will message, if one is set, and then
%% closes the publishing channels.
%%
%% With no will message the state is returned untouched. Otherwise the
%% will is published only when the user has write access to its topic
%% (a warning is logged if not), handed to the retainer when its `retain`
%% flag is set, and the state is returned with both channels reset to
%% `undefined`.
send_will(PState = #proc_state{will_msg = undefined}) ->
    PState;
send_will(PState = #proc_state{will_msg     = WillMsg = #mqtt_msg{retain = Retain,
                                                                   topic  = Topic},
                               retainer_pid = RPid,
                               channels     = {_, _}}) ->
    %% amqp_pub/2 opens the QoS 1 publishing channel on demand, so the
    %% channels to close have to be read from the state it returns rather
    %% than from the state this function was called with; otherwise a
    %% channel opened just for the will would never be closed.
    #proc_state{channels = {ChQos0, ChQos1}} =
        case check_topic_access(Topic, write, PState) of
            ok ->
                Published = amqp_pub(WillMsg, PState),
                case Retain of
                    false -> ok;
                    true  -> hand_off_to_retainer(RPid, Topic, WillMsg)
                end,
                Published;
            Error ->
                rabbit_log:warning(
                  "Could not send last will: ~p~n",
                  [Error]),
                PState
        end,
    case ChQos1 of
        undefined -> ok;
        _         -> amqp_channel:close(ChQos1)
    end,
    case ChQos0 of
        undefined -> ok;
        _         -> amqp_channel:close(ChQos0)
    end,
    %% Apart from the channels, the returned state is deliberately the one
    %% passed in (as before): publishing bookkeeping is irrelevant once the
    %% channels are gone.
    PState #proc_state{ channels = {undefined, undefined} }.
%% Publishes an MQTT message to the configured exchange and returns the
%% updated processor state. Publishing `undefined` is a no-op.
amqp_pub(undefined, PState) ->
    PState;

%% First QoS 1 publish: no QoS 1 channel exists yet, so open one, put it
%% in confirm mode with this process as the confirm handler, start the
%% sequence numbering at 1 and retry the publish with the new state.
%% This channel is only used for publishing, never for consuming.
amqp_pub(Msg = #mqtt_msg{ qos = ?QOS_1 },
         PState = #proc_state{ channels       = {ChQos0, undefined},
                               awaiting_seqno = undefined,
                               connection     = Conn }) ->
    {ok, ConfirmCh} = amqp_connection:open_channel(Conn),
    #'confirm.select_ok'{} = amqp_channel:call(ConfirmCh, #'confirm.select'{}),
    amqp_channel:register_confirm_handler(ConfirmCh, self()),
    WithQos1Channel = PState #proc_state{ channels       = {ChQos0, ConfirmCh},
                                          awaiting_seqno = 1 },
    amqp_pub(Msg, WithQos1Channel);

%% Regular publish. QoS 1 messages that carry a message id go out on the
%% QoS 1 channel and are remembered under the current sequence number so
%% they can be matched to a confirm later; everything else goes out on
%% the QoS 0 channel and leaves the bookkeeping untouched.
amqp_pub(#mqtt_msg{ qos        = Qos,
                    topic      = Topic,
                    dup        = Dup,
                    message_id = MessageId,
                    payload    = Payload },
         PState = #proc_state{ channels       = {ChQos0, ChQos1},
                               exchange       = Exchange,
                               unacked_pubs   = UnackedPubs,
                               awaiting_seqno = SeqNo }) ->
    RoutingKey = rabbit_mqtt_util:mqtt2amqp(Topic),
    Publish = #'basic.publish'{ exchange    = Exchange,
                                routing_key = RoutingKey },
    Props = #'P_basic'{ headers       = [{<<"x-mqtt-publish-qos">>, byte, Qos},
                                         {<<"x-mqtt-dup">>, bool, Dup}],
                        delivery_mode = delivery_mode(Qos) },
    Content = #amqp_msg{ props   = Props,
                         payload = Payload },
    {Pending, Ch, NextSeqNo} =
        case {Qos, MessageId} of
            {?QOS_1, Id} when Id =/= undefined ->
                {gb_trees:enter(SeqNo, Id, UnackedPubs), ChQos1, SeqNo + 1};
            _ ->
                {UnackedPubs, ChQos0, SeqNo}
        end,
    amqp_channel:cast_flow(Ch, Publish, Content),
    PState #proc_state{ unacked_pubs   = Pending,
                        awaiting_seqno = NextSeqNo }.
2012-08-06 06:52:54 +08:00
%% Builds the adapter info for the given socket. The protocol is
%% recorded as {ProtoName, "N/A"}; see set_proto_version/2 for filling in
%% the version part afterwards.
adapter_info(Sock, ProtoName) ->
    Protocol = {ProtoName, "N/A"},
    amqp_connection:socket_adapter_info(Sock, Protocol).
2016-03-04 18:34:53 +08:00
%% Replaces the version part of the adapter info's {Protocol, Version}
%% pair with the human-readable form of the MQTT protocol level `Vsn`.
set_proto_version(AdapterInfo = #amqp_adapter_info{protocol = {Proto, _}}, Vsn) ->
    Readable = human_readable_mqtt_version(Vsn),
    AdapterInfo#amqp_adapter_info{protocol = {Proto, Readable}}.
%% Maps an MQTT protocol level to a printable version string:
%% 3 -> "3.1.0", 4 -> "3.1.1", anything else -> "N/A".
human_readable_mqtt_version(Vsn) ->
    case Vsn of
        3 -> "3.1.0";
        4 -> "3.1.1";
        _ -> "N/A"
    end.
2012-08-17 00:56:50 +08:00
%% Serialises an MQTT frame and writes it to the client socket.
%%
%% Returns whatever rabbit_net:port_command/2 returns. Any exception
%% raised while serialising or writing is caught and logged (an error
%% line plus a debug line with the details) instead of being propagated;
%% in that case the result of the debug log call is returned.
serialise_and_send_to_client(Frame, #proc_state{ socket = Sock }) ->
    try
        rabbit_net:port_command(Sock, rabbit_mqtt_frame:serialise(Frame))
    catch
        _:Reason ->
            rabbit_log_connection:error("MQTT: a socket write failed, the socket might already be closed"),
            rabbit_log_connection:debug("Failed to write to socket ~p, error: ~p, frame: ~p",
                                        [Sock, Reason, Frame])
    end.
%% Closes the AMQP connection held in the state, if there is one.
%%
%% The client id (when set) is unregistered from the collector first.
%% Returns the state with the connection and both channels reset to
%% `undefined`; a state without a connection is returned unchanged.
close_connection(PState = #proc_state{ connection = undefined }) ->
    PState;
close_connection(PState = #proc_state{ connection = Connection,
                                       client_id  = ClientId }) ->
    % todo: maybe clean session
    case ClientId =/= undefined of
        true  -> ok = rabbit_mqtt_collector:unregister(ClientId, self());
        false -> ok
    end,
    %% Deliberately best-effort: ignore noproc or other exceptions to
    %% avoid debris.
    try
        amqp_connection:close(Connection)
    catch
        _:_ -> ok
    end,
    PState #proc_state{ channels   = {undefined, undefined},
                        connection = undefined }.
% NB: check_*: MQTT spec says we should ack normally, ie pretend there
% was no auth error, but here we are closing the connection with an error. This
% is what happens anyway if there is an authorization failure at the AMQP level.
%% Runs Fn() only if the user may write to TopicName; otherwise returns
%% {err, unauthorized, PState} without calling Fn.
check_publish(TopicName, Fn, PState) ->
    Verdict = check_topic_access(TopicName, write, PState),
    if
        Verdict =:= ok -> Fn();
        true           -> {err, unauthorized, PState}
    end.
%% Runs Fn() only if the user may read every topic in the list. Topics
%% are checked in order and checking stops at the first refusal, in which
%% case {err, unauthorized, PState} is returned and Fn is not called.
check_subscribe([], Fn, _) ->
    Fn();
check_subscribe([#mqtt_topic{name = TopicName} | Rest], Fn, PState) ->
    Allowed = (ok =:= check_topic_access(TopicName, read, PState)),
    case Allowed of
        true  -> check_subscribe(Rest, Fn, PState);
        false -> {err, unauthorized, PState}
    end.
%% Asks rabbit_access_control whether the connected user has `Access`
%% to the MQTT topic on the configured exchange.
%%
%% The topic is converted to an AMQP routing key, and the username, vhost
%% and client id are passed along in the context's variable map.
%% Returns the access-control result as is; any exception raised by the
%% check is logged and turned into {error, access_refused}.
check_topic_access(TopicName, Access,
                   #proc_state{
                      auth_state = #auth_state{user  = User = #user{username = Username},
                                               vhost = VHost},
                      exchange   = Exchange,
                      client_id  = ClientId}) ->
    Resource = #resource{virtual_host = VHost,
                         kind         = topic,
                         name         = Exchange},
    RoutingKey = rabbit_mqtt_util:mqtt2amqp(TopicName),
    Variables = #{<<"username">>  => Username,
                  <<"vhost">>     => VHost,
                  <<"client_id">> => rabbit_data_coercion:to_binary(ClientId)},
    Context = #{routing_key  => RoutingKey,
                variable_map => Variables},
    try
        rabbit_access_control:check_topic_access(User, Resource, Access, Context)
    catch
        _:{amqp_error, access_refused, Msg, _} ->
            rabbit_log:error("operation resulted in an error (access_refused): ~p~n", [Msg]),
            {error, access_refused};
        _:Error ->
            rabbit_log:error("~p~n", [Error]),
            {error, access_refused}
    end.
2016-12-05 22:58:19 +08:00
%% Returns a single info item for the processor state.
%%
%% Items are read from three places:
%%   * fields of the #proc_state{} record itself,
%%   * fields of the nested #auth_state{} and #amqp_adapter_info{} records,
%%   * the adapter info's `additional_info` list (via additional_info/2).
%%
%% `client_id` and the version part of `protocol` are coerced to binaries.
%% An unknown item throws {bad_argument, Item}.
%%
%% Note: `channels` is answered from the `channels` field of the state.
%% A second, later `channels` clause that delegated to additional_info/2
%% could never be reached for a #proc_state{} and has been removed.
info(consumer_tags, #proc_state{consumer_tags = Val}) -> Val;
info(unacked_pubs, #proc_state{unacked_pubs = Val}) -> Val;
info(awaiting_ack, #proc_state{awaiting_ack = Val}) -> Val;
info(awaiting_seqno, #proc_state{awaiting_seqno = Val}) -> Val;
info(message_id, #proc_state{message_id = Val}) -> Val;
info(client_id, #proc_state{client_id = Val}) ->
    rabbit_data_coercion:to_binary(Val);
info(clean_sess, #proc_state{clean_sess = Val}) -> Val;
info(will_msg, #proc_state{will_msg = Val}) -> Val;
info(channels, #proc_state{channels = Val}) -> Val;
info(exchange, #proc_state{exchange = Val}) -> Val;
info(adapter_info, #proc_state{adapter_info = Val}) -> Val;
info(ssl_login_name, #proc_state{ssl_login_name = Val}) -> Val;
info(retainer_pid, #proc_state{retainer_pid = Val}) -> Val;
info(user, #proc_state{auth_state = #auth_state{username = Val}}) -> Val;
info(vhost, #proc_state{auth_state = #auth_state{vhost = Val}}) -> Val;
info(host, #proc_state{adapter_info = #amqp_adapter_info{host = Val}}) -> Val;
info(port, #proc_state{adapter_info = #amqp_adapter_info{port = Val}}) -> Val;
info(peer_host, #proc_state{adapter_info = #amqp_adapter_info{peer_host = Val}}) -> Val;
info(peer_port, #proc_state{adapter_info = #amqp_adapter_info{peer_port = Val}}) -> Val;
info(protocol, #proc_state{adapter_info = #amqp_adapter_info{protocol = Val}}) ->
    case Val of
        {Proto, Version} -> {Proto, rabbit_data_coercion:to_binary(Version)};
        Other -> Other
    end;
%% Items below live in the adapter info's additional_info list.
info(channel_max, PState) -> additional_info(channel_max, PState);
info(frame_max, PState) -> additional_info(frame_max, PState);
info(client_properties, PState) -> additional_info(client_properties, PState);
info(ssl, PState) -> additional_info(ssl, PState);
info(ssl_protocol, PState) -> additional_info(ssl_protocol, PState);
info(ssl_key_exchange, PState) -> additional_info(ssl_key_exchange, PState);
info(ssl_cipher, PState) -> additional_info(ssl_cipher, PState);
info(ssl_hash, PState) -> additional_info(ssl_hash, PState);
info(Other, _) -> throw({bad_argument, Other}).
%% Looks up Key in the `additional_info` list of the state's adapter info.
%% Returns the associated value, or `undefined` when the key is absent.
additional_info(Key,
                #proc_state{adapter_info =
                                #amqp_adapter_info{additional_info = Props}}) ->
    proplists:get_value(Key, Props).