2012-06-27 00:57:24 +08:00
|
|
|
%% The contents of this file are subject to the Mozilla Public License
|
|
|
|
|
%% Version 1.1 (the "License"); you may not use this file except in
|
|
|
|
|
%% compliance with the License. You may obtain a copy of the License
|
|
|
|
|
%% at http://www.mozilla.org/MPL/
|
|
|
|
|
%%
|
|
|
|
|
%% Software distributed under the License is distributed on an "AS IS"
|
|
|
|
|
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
|
|
|
|
|
%% the License for the specific language governing rights and
|
|
|
|
|
%% limitations under the License.
|
|
|
|
|
%%
|
|
|
|
|
%% The Original Code is RabbitMQ.
|
|
|
|
|
%%
|
|
|
|
|
%% The Initial Developer of the Original Code is VMware, Inc.
|
|
|
|
|
%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
|
|
|
|
|
%%
|
|
|
|
|
|
|
|
|
|
-module(rabbit_mqtt_processor).
|
|
|
|
|
|
2012-08-21 00:30:13 +08:00
|
|
|
-export([info/2, initial_state/1,
|
|
|
|
|
process_frame/2, amqp_pub/2, amqp_callback/2, send_will/1,
|
|
|
|
|
close_connection/1]).
|
2012-06-27 00:57:24 +08:00
|
|
|
|
|
|
|
|
-include_lib("amqp_client/include/amqp_client.hrl").
|
2012-09-12 21:34:41 +08:00
|
|
|
-include("rabbit_mqtt_frame.hrl").
|
|
|
|
|
-include("rabbit_mqtt.hrl").
|
2012-07-13 23:32:13 +08:00
|
|
|
|
|
|
|
|
-define(FRAME_TYPE(Frame, Type),
|
2012-08-21 00:30:13 +08:00
|
|
|
Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}).
|
|
|
|
|
|
|
|
|
|
initial_state(Socket) ->
|
|
|
|
|
#proc_state{ unacked_pubs = gb_trees:empty(),
|
|
|
|
|
awaiting_ack = gb_trees:empty(),
|
|
|
|
|
message_id = 1,
|
|
|
|
|
subscriptions = dict:new(),
|
|
|
|
|
consumer_tags = {undefined, undefined},
|
|
|
|
|
channels = {undefined, undefined},
|
|
|
|
|
exchange = rabbit_mqtt_util:env(exchange),
|
|
|
|
|
socket = Socket }.
|
|
|
|
|
|
|
|
|
|
info(client_id, #proc_state{ client_id = ClientId }) -> ClientId.
|
2012-07-13 23:32:13 +08:00
|
|
|
|
2012-08-06 06:52:54 +08:00
|
|
|
process_frame(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }},
|
2012-08-21 00:30:13 +08:00
|
|
|
PState = #proc_state{ connection = undefined } )
|
2012-08-06 06:52:54 +08:00
|
|
|
when Type =/= ?CONNECT ->
|
2012-08-21 00:30:13 +08:00
|
|
|
{err, connect_expected, PState};
|
2012-07-13 23:32:13 +08:00
|
|
|
process_frame(Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }},
|
2012-08-21 00:30:13 +08:00
|
|
|
PState ) ->
|
2012-08-06 06:52:54 +08:00
|
|
|
%rabbit_log:info("MQTT received frame ~p ~n", [Frame]),
|
2012-08-21 00:30:13 +08:00
|
|
|
process_request(Type, Frame, PState).
|
2012-07-13 23:32:13 +08:00
|
|
|
|
|
|
|
|
process_request(?CONNECT,
|
2012-08-21 00:30:13 +08:00
|
|
|
#mqtt_frame{ variable = #mqtt_frame_connect{
|
2012-07-16 21:57:31 +08:00
|
|
|
username = Username,
|
|
|
|
|
password = Password,
|
|
|
|
|
proto_ver = ProtoVersion,
|
|
|
|
|
clean_sess = CleanSess,
|
2012-08-21 00:30:13 +08:00
|
|
|
client_id = ClientId } = Var}, PState) ->
|
|
|
|
|
{ReturnCode, PState1} =
|
2012-07-13 23:32:13 +08:00
|
|
|
case {ProtoVersion =:= ?MQTT_PROTO_MAJOR,
|
|
|
|
|
rabbit_mqtt_util:valid_client_id(ClientId)} of
|
2012-07-04 00:35:18 +08:00
|
|
|
{false, _} ->
|
2012-08-21 00:30:13 +08:00
|
|
|
{?CONNACK_PROTO_VER, PState};
|
2012-07-04 00:35:18 +08:00
|
|
|
{_, false} ->
|
2012-08-21 00:30:13 +08:00
|
|
|
{?CONNACK_INVALID_ID, PState};
|
2012-07-04 00:35:18 +08:00
|
|
|
_ ->
|
2012-07-16 21:57:31 +08:00
|
|
|
case creds(Username, Password) of
|
|
|
|
|
nocreds ->
|
|
|
|
|
rabbit_log:error("MQTT login failed - no credentials~n"),
|
2012-08-21 00:30:13 +08:00
|
|
|
{?CONNACK_CREDENTIALS, PState};
|
2012-09-19 22:34:42 +08:00
|
|
|
{UserBin, PassBin} ->
|
|
|
|
|
case process_login(UserBin, PassBin, PState) of
|
2012-07-16 21:57:31 +08:00
|
|
|
{?CONNACK_ACCEPT, Conn} ->
|
|
|
|
|
link(Conn),
|
|
|
|
|
maybe_clean_sess(CleanSess, Conn, ClientId),
|
|
|
|
|
{ok, Ch} = amqp_connection:open_channel(Conn),
|
2012-08-08 20:37:59 +08:00
|
|
|
ok = rabbit_mqtt_collector:register(
|
|
|
|
|
ClientId, self()),
|
2012-08-09 20:52:19 +08:00
|
|
|
Prefetch = rabbit_mqtt_util:env(prefetch),
|
|
|
|
|
#'basic.qos_ok'{} = amqp_channel:call(
|
|
|
|
|
Ch, #'basic.qos'{prefetch_count = Prefetch}),
|
2012-07-16 21:57:31 +08:00
|
|
|
{?CONNACK_ACCEPT,
|
2012-08-21 00:30:13 +08:00
|
|
|
PState #proc_state{ will_msg = make_will_msg(Var),
|
|
|
|
|
clean_sess = CleanSess,
|
|
|
|
|
channels = {Ch, undefined},
|
|
|
|
|
connection = Conn,
|
|
|
|
|
client_id = ClientId }};
|
|
|
|
|
ConnAck ->
|
|
|
|
|
{ConnAck, PState}
|
2012-07-16 21:57:31 +08:00
|
|
|
end
|
2012-07-04 00:35:18 +08:00
|
|
|
end
|
|
|
|
|
end,
|
2012-08-06 06:52:54 +08:00
|
|
|
send_client(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?CONNACK},
|
|
|
|
|
variable = #mqtt_frame_connack{
|
2012-08-21 00:30:13 +08:00
|
|
|
return_code = ReturnCode }}, PState1),
|
|
|
|
|
{ok, PState1};
|
2012-07-04 00:35:18 +08:00
|
|
|
|
2012-08-06 06:52:54 +08:00
|
|
|
process_request(?PUBACK,
|
2012-08-21 00:30:13 +08:00
|
|
|
#mqtt_frame{
|
2012-08-06 06:52:54 +08:00
|
|
|
variable = #mqtt_frame_publish{ message_id = MessageId }},
|
2012-08-21 00:30:13 +08:00
|
|
|
#proc_state{ channels = {Channel, _},
|
|
|
|
|
awaiting_ack = Awaiting } = PState) ->
|
2012-08-06 06:52:54 +08:00
|
|
|
Tag = gb_trees:get(MessageId, Awaiting),
|
|
|
|
|
amqp_channel:cast(
|
|
|
|
|
Channel, #'basic.ack'{ delivery_tag = Tag }),
|
2012-08-21 00:30:13 +08:00
|
|
|
{ok, PState #proc_state{ awaiting_ack = gb_trees:delete( MessageId, Awaiting)}};
|
2012-08-06 06:52:54 +08:00
|
|
|
|
|
|
|
|
process_request(?PUBLISH,
|
|
|
|
|
#mqtt_frame{
|
2012-08-21 00:30:13 +08:00
|
|
|
fixed = #mqtt_frame_fixed{ qos = ?QOS_2 }}, PState) ->
|
|
|
|
|
{err, qos2_not_supported, PState};
|
2012-07-04 23:31:30 +08:00
|
|
|
process_request(?PUBLISH,
|
2012-08-21 00:30:13 +08:00
|
|
|
#mqtt_frame{
|
2012-08-06 06:52:54 +08:00
|
|
|
fixed = #mqtt_frame_fixed{ qos = Qos,
|
|
|
|
|
retain = Retain,
|
|
|
|
|
dup = Dup },
|
|
|
|
|
variable = #mqtt_frame_publish{ topic_name = Topic,
|
|
|
|
|
message_id = MessageId },
|
2012-08-21 00:30:13 +08:00
|
|
|
payload = Payload }, PState) ->
|
2012-08-06 06:52:54 +08:00
|
|
|
{ok, amqp_pub(#mqtt_msg{ retain = Retain,
|
|
|
|
|
qos = Qos,
|
|
|
|
|
topic = Topic,
|
|
|
|
|
dup = Dup,
|
|
|
|
|
message_id = MessageId,
|
2012-08-21 00:30:13 +08:00
|
|
|
payload = Payload }, PState)};
|
2012-07-04 23:31:30 +08:00
|
|
|
|
|
|
|
|
process_request(?SUBSCRIBE,
|
2012-08-21 00:30:13 +08:00
|
|
|
#mqtt_frame{
|
|
|
|
|
variable = #mqtt_frame_subscribe{ message_id = MessageId,
|
|
|
|
|
topic_table = Topics },
|
2012-08-06 06:52:54 +08:00
|
|
|
payload = undefined },
|
2012-08-21 00:30:13 +08:00
|
|
|
#proc_state{ channels = {Channel, _},
|
|
|
|
|
exchange = Exchange} = PState0) ->
|
|
|
|
|
{QosResponse, PState1} =
|
|
|
|
|
lists:foldl(fun (#mqtt_topic{ name = TopicName,
|
|
|
|
|
qos = Qos }, {QosList, PState}) ->
|
2012-08-06 06:52:54 +08:00
|
|
|
SupportedQos = supported_subs_qos(Qos),
|
2012-08-21 00:30:13 +08:00
|
|
|
{Queue, #proc_state{ subscriptions = Subs } = PState1} =
|
|
|
|
|
ensure_queue(SupportedQos, PState),
|
2012-08-06 06:52:54 +08:00
|
|
|
Binding = #'queue.bind'{
|
|
|
|
|
queue = Queue,
|
|
|
|
|
exchange = Exchange,
|
2012-08-17 01:01:45 +08:00
|
|
|
routing_key = rabbit_mqtt_util:mqtt2amqp(
|
2012-08-06 06:52:54 +08:00
|
|
|
TopicName)},
|
|
|
|
|
#'queue.bind_ok'{} = amqp_channel:call(Channel, Binding),
|
|
|
|
|
{[SupportedQos | QosList],
|
2012-08-21 00:30:13 +08:00
|
|
|
PState1 #proc_state{ subscriptions =
|
|
|
|
|
dict:append(TopicName, SupportedQos, Subs) }}
|
|
|
|
|
end, {[], PState0}, Topics),
|
|
|
|
|
send_client(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?SUBACK },
|
|
|
|
|
variable = #mqtt_frame_suback{
|
2012-07-04 23:31:30 +08:00
|
|
|
message_id = MessageId,
|
2012-08-21 00:30:13 +08:00
|
|
|
qos_table = QosResponse }}, PState1),
|
2012-08-06 06:52:54 +08:00
|
|
|
|
2012-08-21 00:30:13 +08:00
|
|
|
{ok, PState1};
|
2012-07-05 00:47:07 +08:00
|
|
|
|
|
|
|
|
process_request(?UNSUBSCRIBE,
|
2012-08-21 00:30:13 +08:00
|
|
|
#mqtt_frame{
|
|
|
|
|
variable = #mqtt_frame_subscribe{ message_id = MessageId,
|
|
|
|
|
topic_table = Topics },
|
|
|
|
|
payload = undefined }, #proc_state{ channels = {Channel, _},
|
|
|
|
|
exchange = Exchange,
|
|
|
|
|
client_id = ClientId,
|
|
|
|
|
subscriptions = Subs0} = PState) ->
|
2012-08-06 06:52:54 +08:00
|
|
|
Queues = rabbit_mqtt_util:subcription_queue_name(ClientId),
|
|
|
|
|
Subs1 =
|
|
|
|
|
lists:foldl(
|
2012-08-21 00:30:13 +08:00
|
|
|
fun (#mqtt_topic{ name = TopicName }, Subs) ->
|
2012-08-06 06:52:54 +08:00
|
|
|
QosSubs = case dict:find(TopicName, Subs) of
|
|
|
|
|
{ok, Val} when is_list(Val) -> lists:usort(Val);
|
|
|
|
|
error -> []
|
|
|
|
|
end,
|
|
|
|
|
lists:foreach(
|
|
|
|
|
fun (QosSub) ->
|
|
|
|
|
Queue = element(QosSub + 1, Queues),
|
|
|
|
|
Binding = #'queue.unbind'{
|
|
|
|
|
queue = Queue,
|
|
|
|
|
exchange = Exchange,
|
|
|
|
|
routing_key =
|
2012-08-17 01:01:45 +08:00
|
|
|
rabbit_mqtt_util:mqtt2amqp(TopicName)},
|
2012-08-06 06:52:54 +08:00
|
|
|
#'queue.unbind_ok'{} = amqp_channel:call(Channel, Binding)
|
|
|
|
|
end, QosSubs),
|
|
|
|
|
dict:erase(TopicName, Subs)
|
|
|
|
|
end, Subs0, Topics),
|
2012-08-21 00:30:13 +08:00
|
|
|
send_client(#mqtt_frame{ fixed = #mqtt_frame_fixed { type = ?UNSUBACK },
|
|
|
|
|
variable = #mqtt_frame_suback{ message_id = MessageId }},
|
|
|
|
|
PState),
|
|
|
|
|
{ok, PState #proc_state{ subscriptions = Subs1 }};
|
2012-07-03 16:55:31 +08:00
|
|
|
|
2012-08-21 00:30:13 +08:00
|
|
|
process_request(?PINGREQ, #mqtt_frame{}, PState) ->
|
|
|
|
|
send_client(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PINGRESP }},
|
|
|
|
|
PState),
|
|
|
|
|
{ok, PState};
|
2012-07-04 23:31:30 +08:00
|
|
|
|
2012-08-21 00:30:13 +08:00
|
|
|
process_request(?DISCONNECT, #mqtt_frame{}, PState) ->
|
|
|
|
|
{stop, PState}.
|
2012-07-04 00:35:18 +08:00
|
|
|
|
2012-08-06 06:52:54 +08:00
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
amqp_callback({#'basic.deliver'{ consumer_tag = ConsumerTag,
|
|
|
|
|
delivery_tag = DeliveryTag,
|
|
|
|
|
routing_key = RoutingKey },
|
|
|
|
|
#amqp_msg{ props = #'P_basic'{ headers = Headers },
|
|
|
|
|
payload = Payload }} = Delivery,
|
2012-08-21 00:30:13 +08:00
|
|
|
#proc_state{ channels = {Channel, _},
|
|
|
|
|
awaiting_ack = Awaiting,
|
|
|
|
|
message_id = MsgId } = PState) ->
|
|
|
|
|
case {delivery_dup(Delivery), delivery_qos(ConsumerTag, Headers, PState)} of
|
2012-08-17 22:05:14 +08:00
|
|
|
{true, {?QOS_0, ?QOS_1}} ->
|
2012-08-17 01:09:21 +08:00
|
|
|
amqp_channel:cast(
|
|
|
|
|
Channel, #'basic.ack'{ delivery_tag = DeliveryTag }),
|
2012-08-21 00:30:13 +08:00
|
|
|
{ok, PState};
|
2012-08-17 22:05:14 +08:00
|
|
|
{true, {?QOS_0, ?QOS_0}} ->
|
2012-08-21 00:30:13 +08:00
|
|
|
{ok, PState};
|
2012-08-06 06:52:54 +08:00
|
|
|
{Dup, {DeliveryQos, _SubQos} = Qos} ->
|
|
|
|
|
send_client(
|
|
|
|
|
#mqtt_frame{ fixed = #mqtt_frame_fixed{
|
|
|
|
|
type = ?PUBLISH,
|
|
|
|
|
qos = DeliveryQos,
|
|
|
|
|
dup = Dup },
|
2012-08-21 00:30:13 +08:00
|
|
|
variable = #mqtt_frame_publish{
|
2012-08-06 06:52:54 +08:00
|
|
|
message_id =
|
|
|
|
|
case DeliveryQos of
|
|
|
|
|
?QOS_0 -> undefined;
|
|
|
|
|
?QOS_1 -> MsgId
|
|
|
|
|
end,
|
|
|
|
|
topic_name =
|
2012-08-17 01:01:45 +08:00
|
|
|
rabbit_mqtt_util:amqp2mqtt(
|
2012-08-06 06:52:54 +08:00
|
|
|
RoutingKey) },
|
2012-08-21 00:30:13 +08:00
|
|
|
payload = Payload}, PState),
|
2012-08-06 06:52:54 +08:00
|
|
|
case Qos of
|
|
|
|
|
{?QOS_0, ?QOS_0} ->
|
2012-08-21 00:30:13 +08:00
|
|
|
{ok, PState};
|
2012-08-06 06:52:54 +08:00
|
|
|
{?QOS_1, ?QOS_1} ->
|
2012-08-21 00:30:13 +08:00
|
|
|
{ok,
|
2012-08-06 06:52:54 +08:00
|
|
|
next_msg_id(
|
2012-08-21 00:30:13 +08:00
|
|
|
PState #proc_state{
|
2012-08-06 06:52:54 +08:00
|
|
|
awaiting_ack =
|
2012-08-21 00:30:13 +08:00
|
|
|
gb_trees:insert(MsgId, DeliveryTag, Awaiting)})};
|
2012-08-06 06:52:54 +08:00
|
|
|
{?QOS_0, ?QOS_1} ->
|
|
|
|
|
amqp_channel:cast(
|
2012-08-17 01:09:21 +08:00
|
|
|
Channel, #'basic.ack'{ delivery_tag = DeliveryTag }),
|
2012-08-21 00:30:13 +08:00
|
|
|
{ok, PState}
|
2012-08-06 06:52:54 +08:00
|
|
|
end
|
|
|
|
|
end;
|
|
|
|
|
|
|
|
|
|
amqp_callback(#'basic.ack'{ multiple = true, delivery_tag = Tag } = Ack,
|
2012-08-21 00:30:13 +08:00
|
|
|
PState = #proc_state{ unacked_pubs = UnackedPubs }) ->
|
2012-08-06 06:52:54 +08:00
|
|
|
case gb_trees:take_smallest(UnackedPubs) of
|
|
|
|
|
{TagSmall, MsgId, UnackedPubs1} when TagSmall =< Tag ->
|
|
|
|
|
send_client(
|
|
|
|
|
#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PUBACK },
|
|
|
|
|
variable = #mqtt_frame_publish{ message_id = MsgId }},
|
2012-08-21 00:30:13 +08:00
|
|
|
PState),
|
|
|
|
|
amqp_callback(Ack, PState #proc_state{ unacked_pubs = UnackedPubs1 });
|
2012-08-06 06:52:54 +08:00
|
|
|
_ ->
|
2012-08-21 00:30:13 +08:00
|
|
|
{ok, PState}
|
2012-08-06 06:52:54 +08:00
|
|
|
end;
|
|
|
|
|
|
|
|
|
|
amqp_callback(#'basic.ack'{ multiple = false, delivery_tag = Tag },
|
2012-08-21 00:30:13 +08:00
|
|
|
PState = #proc_state{ unacked_pubs = UnackedPubs }) ->
|
2012-08-06 06:52:54 +08:00
|
|
|
send_client(
|
|
|
|
|
#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PUBACK },
|
|
|
|
|
variable = #mqtt_frame_publish{
|
|
|
|
|
message_id = gb_trees:get(
|
2012-08-21 00:30:13 +08:00
|
|
|
Tag, UnackedPubs) }}, PState),
|
|
|
|
|
{ok, PState #proc_state{ unacked_pubs = gb_trees:delete(Tag, UnackedPubs) }}.
|
2012-08-06 06:52:54 +08:00
|
|
|
|
|
|
|
|
delivery_dup({#'basic.deliver'{ redelivered = Redelivered },
|
|
|
|
|
#amqp_msg{ props = #'P_basic'{ headers = Headers }}}) ->
|
2012-11-06 04:30:45 +08:00
|
|
|
case rabbit_mqtt_util:table_lookup(Headers, "x-mqtt-dup") of
|
2012-08-06 06:52:54 +08:00
|
|
|
undefined -> Redelivered;
|
|
|
|
|
{bool, Dup} -> Redelivered orelse Dup
|
|
|
|
|
end.
|
|
|
|
|
|
2012-08-21 00:30:13 +08:00
|
|
|
next_msg_id(PState = #proc_state{ message_id = 16#ffff }) ->
|
|
|
|
|
PState #proc_state{ message_id = 1 };
|
|
|
|
|
next_msg_id(PState = #proc_state{ message_id = MsgId }) ->
|
|
|
|
|
PState #proc_state{ message_id = MsgId + 1 }.
|
2012-08-06 06:52:54 +08:00
|
|
|
|
|
|
|
|
%% decide at which qos level to deliver based on subscription
|
2012-11-06 04:30:45 +08:00
|
|
|
%% and the message publish qos level. non-MQTT publishes are
|
|
|
|
|
%% assumed to be qos 1, regardless of delivery_mode.
|
|
|
|
|
delivery_qos(Tag, _Headers, #proc_state{ consumer_tags = {Tag, _} }) ->
|
2012-08-06 06:52:54 +08:00
|
|
|
{?QOS_0, ?QOS_0};
|
2012-11-06 04:30:45 +08:00
|
|
|
delivery_qos(Tag, Headers, #proc_state{ consumer_tags = {_, Tag} }) ->
|
|
|
|
|
case rabbit_mqtt_util:table_lookup(Headers, "x-mqtt-publish-qos") of
|
|
|
|
|
{byte, Qos} -> {lists:min([Qos, ?QOS_1]), ?QOS_1};
|
|
|
|
|
undefined -> {?QOS_1, ?QOS_1}
|
|
|
|
|
end.
|
2012-08-06 06:52:54 +08:00
|
|
|
|
2012-07-16 21:57:31 +08:00
|
|
|
maybe_clean_sess(false, _Conn, _ClientId) ->
|
2012-08-06 06:52:54 +08:00
|
|
|
% todo: establish subscription to deliver old unacknowledged messages
|
2012-07-16 21:57:31 +08:00
|
|
|
ok;
|
|
|
|
|
maybe_clean_sess(true, Conn, ClientId) ->
|
2012-08-06 06:52:54 +08:00
|
|
|
{_, Queue} = rabbit_mqtt_util:subcription_queue_name(ClientId),
|
2012-07-16 21:57:31 +08:00
|
|
|
{ok, Channel} = amqp_connection:open_channel(Conn),
|
2012-08-06 06:52:54 +08:00
|
|
|
try amqp_channel:call(Channel, #'queue.delete'{ queue = Queue }) of
|
|
|
|
|
#'queue.delete_ok'{} -> ok = amqp_channel:close(Channel)
|
2012-07-16 21:57:31 +08:00
|
|
|
catch
|
2012-07-17 05:52:53 +08:00
|
|
|
exit:_Error -> ok
|
2012-07-16 21:57:31 +08:00
|
|
|
end.
|
|
|
|
|
|
2012-08-06 06:52:54 +08:00
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
|
2012-08-21 00:30:13 +08:00
|
|
|
make_will_msg(#mqtt_frame_connect{ will_flag = false }) ->
|
2012-08-06 06:52:54 +08:00
|
|
|
undefined;
|
|
|
|
|
make_will_msg(#mqtt_frame_connect{ will_retain = Retain,
|
|
|
|
|
will_qos = Qos,
|
|
|
|
|
will_topic = Topic,
|
|
|
|
|
will_msg = Msg }) ->
|
|
|
|
|
#mqtt_msg{ retain = Retain,
|
|
|
|
|
qos = Qos,
|
|
|
|
|
topic = Topic,
|
|
|
|
|
dup = false,
|
|
|
|
|
payload = Msg }.
|
2012-07-04 00:35:18 +08:00
|
|
|
|
2012-09-19 22:34:42 +08:00
|
|
|
process_login(UserBin, PassBin, #proc_state{ channels = {undefined, undefined},
|
|
|
|
|
socket = Sock }) ->
|
|
|
|
|
VHost = rabbit_mqtt_util:env(vhost),
|
|
|
|
|
case amqp_connection:start(#amqp_params_direct{
|
|
|
|
|
username = UserBin,
|
|
|
|
|
password = PassBin,
|
|
|
|
|
virtual_host = VHost,
|
|
|
|
|
adapter_info = adapter_info(Sock)}) of
|
|
|
|
|
{ok, Connection} -> {?CONNACK_ACCEPT, Connection};
|
|
|
|
|
{error, auth_failure} -> rabbit_log:error(
|
|
|
|
|
"MQTT login failed for ~p auth_failure~n",
|
|
|
|
|
[binary_to_list(UserBin)]),
|
|
|
|
|
?CONNACK_CREDENTIALS;
|
|
|
|
|
{error, access_refused} -> rabbit_log:warning(
|
|
|
|
|
"MQTT login failed for ~p access_refused "
|
|
|
|
|
"(vhost access not allowed)~n",
|
|
|
|
|
[binary_to_list(UserBin)]),
|
|
|
|
|
?CONNACK_AUTH
|
|
|
|
|
end.
|
2012-07-16 21:57:31 +08:00
|
|
|
|
|
|
|
|
creds(User, Pass) ->
|
|
|
|
|
DefaultUser = rabbit_mqtt_util:env(default_user),
|
|
|
|
|
DefaultPass = rabbit_mqtt_util:env(default_pass),
|
|
|
|
|
Anon = rabbit_mqtt_util:env(allow_anonymous),
|
|
|
|
|
U = case {User =/= undefined, is_binary(DefaultUser), Anon =:= true} of
|
|
|
|
|
{true, _, _ } -> list_to_binary(User);
|
|
|
|
|
{false, true, true} -> DefaultUser;
|
|
|
|
|
_ -> nocreds
|
2012-07-04 00:35:18 +08:00
|
|
|
end,
|
2012-07-16 21:57:31 +08:00
|
|
|
case U of
|
|
|
|
|
nocreds ->
|
|
|
|
|
nocreds;
|
|
|
|
|
_ ->
|
|
|
|
|
case {Pass =/= undefined, is_binary(DefaultPass), Anon =:= true} of
|
2012-09-19 22:34:42 +08:00
|
|
|
{true, _, _ } -> {U, list_to_binary(Pass)};
|
|
|
|
|
{false, true, true} -> {U, DefaultPass};
|
|
|
|
|
_ -> {U, none}
|
2012-07-16 21:57:31 +08:00
|
|
|
end
|
|
|
|
|
end.
|
2012-07-04 00:35:18 +08:00
|
|
|
|
2012-08-06 06:52:54 +08:00
|
|
|
supported_subs_qos(?QOS_0) -> ?QOS_0;
|
|
|
|
|
supported_subs_qos(?QOS_1) -> ?QOS_1;
|
|
|
|
|
supported_subs_qos(?QOS_2) -> ?QOS_1.
|
|
|
|
|
|
|
|
|
|
%% different qos subscriptions are received in different queues
|
|
|
|
|
%% with appropriate durability and timeout arguments
|
|
|
|
|
%% this will lead to duplicate messages for overlapping subscriptions
|
|
|
|
|
%% with different qos values - todo: prevent duplicates
|
2012-08-21 00:30:13 +08:00
|
|
|
ensure_queue(Qos, #proc_state{ channels = {Channel, _},
|
|
|
|
|
client_id = ClientId,
|
|
|
|
|
clean_sess = CleanSess,
|
|
|
|
|
consumer_tags = {TagQ0, TagQ1} = Tags} = PState) ->
|
2012-08-06 06:52:54 +08:00
|
|
|
{QueueQ0, QueueQ1} = rabbit_mqtt_util:subcription_queue_name(ClientId),
|
|
|
|
|
Qos1Args = case {rabbit_mqtt_util:env(subscription_ttl), CleanSess} of
|
|
|
|
|
{undefined, _} -> [];
|
|
|
|
|
{Ms, false} when is_integer(Ms) -> [{"x-expires", long, Ms}];
|
|
|
|
|
_ -> []
|
|
|
|
|
end,
|
|
|
|
|
QueueSetup =
|
|
|
|
|
case {TagQ0, TagQ1, Qos} of
|
|
|
|
|
{undefined, _, ?QOS_0} ->
|
|
|
|
|
{QueueQ0,
|
|
|
|
|
#'queue.declare'{ queue = QueueQ0,
|
|
|
|
|
durable = false,
|
|
|
|
|
auto_delete = true },
|
|
|
|
|
#'basic.consume'{ queue = QueueQ0,
|
|
|
|
|
no_ack = true }};
|
|
|
|
|
{_, undefined, ?QOS_1} ->
|
|
|
|
|
{QueueQ1,
|
|
|
|
|
#'queue.declare'{ queue = QueueQ1,
|
|
|
|
|
durable = true,
|
|
|
|
|
auto_delete = CleanSess,
|
|
|
|
|
arguments = Qos1Args },
|
|
|
|
|
#'basic.consume'{ queue = QueueQ1,
|
|
|
|
|
no_ack = false }};
|
|
|
|
|
{_, _, ?QOS_0} ->
|
|
|
|
|
{exists, QueueQ0};
|
|
|
|
|
{_, _, ?QOS_1} ->
|
|
|
|
|
{exists, QueueQ1}
|
|
|
|
|
end,
|
|
|
|
|
case QueueSetup of
|
|
|
|
|
{Queue, Declare, Consume} ->
|
|
|
|
|
#'queue.declare_ok'{} = amqp_channel:call(Channel, Declare),
|
|
|
|
|
#'basic.consume_ok'{ consumer_tag = Tag } =
|
|
|
|
|
amqp_channel:call(Channel, Consume),
|
2012-08-21 00:30:13 +08:00
|
|
|
{Queue, PState #proc_state{ consumer_tags = setelement(Qos+1, Tags, Tag) }};
|
2012-08-06 06:52:54 +08:00
|
|
|
{exists, Q} ->
|
2012-08-21 00:30:13 +08:00
|
|
|
{Q, PState}
|
2012-08-06 06:52:54 +08:00
|
|
|
end.
|
|
|
|
|
|
2012-08-21 00:30:13 +08:00
|
|
|
send_will(PState = #proc_state{ will_msg = WillMsg }) ->
|
|
|
|
|
amqp_pub(WillMsg, PState).
|
|
|
|
|
|
|
|
|
|
amqp_pub(undefined, PState) ->
|
|
|
|
|
PState;
|
2012-08-06 06:52:54 +08:00
|
|
|
|
|
|
|
|
%% set up a qos1 publishing channel if necessary
|
|
|
|
|
%% this channel will only be used for publishing, not consuming
|
|
|
|
|
amqp_pub(Msg = #mqtt_msg{ qos = ?QOS_1 },
|
2012-08-21 00:30:13 +08:00
|
|
|
PState = #proc_state{ channels = {ChQos0, undefined},
|
|
|
|
|
awaiting_seqno = undefined,
|
|
|
|
|
connection = Conn }) ->
|
2012-08-06 06:52:54 +08:00
|
|
|
{ok, Channel} = amqp_connection:open_channel(Conn),
|
|
|
|
|
#'confirm.select_ok'{} = amqp_channel:call(Channel, #'confirm.select'{}),
|
|
|
|
|
amqp_channel:register_confirm_handler(Channel, self()),
|
2012-08-21 00:30:13 +08:00
|
|
|
amqp_pub(Msg, PState #proc_state{ channels = {ChQos0, Channel},
|
|
|
|
|
awaiting_seqno = 1 });
|
2012-08-06 06:52:54 +08:00
|
|
|
|
2012-08-17 22:05:14 +08:00
|
|
|
amqp_pub(#mqtt_msg{ qos = Qos,
|
2012-08-06 06:52:54 +08:00
|
|
|
topic = Topic,
|
|
|
|
|
dup = Dup,
|
|
|
|
|
message_id = MessageId,
|
|
|
|
|
payload = Payload },
|
2012-08-21 00:30:13 +08:00
|
|
|
PState = #proc_state{ channels = {ChQos0, ChQos1},
|
|
|
|
|
exchange = Exchange,
|
|
|
|
|
unacked_pubs = UnackedPubs,
|
|
|
|
|
awaiting_seqno = SeqNo }) ->
|
2012-08-06 06:52:54 +08:00
|
|
|
Method = #'basic.publish'{ exchange = Exchange,
|
|
|
|
|
routing_key =
|
2012-08-17 01:01:45 +08:00
|
|
|
rabbit_mqtt_util:mqtt2amqp(Topic)},
|
2012-11-06 04:30:45 +08:00
|
|
|
Headers = [{"x-mqtt-publish-qos", byte, Qos}, {"x-mqtt-dup", bool, Dup}],
|
2012-08-06 06:52:54 +08:00
|
|
|
Msg = #amqp_msg{ props = #'P_basic'{ headers = Headers },
|
|
|
|
|
payload = Payload },
|
|
|
|
|
{UnackedPubs1, Ch, SeqNo1} =
|
|
|
|
|
case Qos =:= ?QOS_1 andalso MessageId =/= undefined of
|
|
|
|
|
true -> {gb_trees:enter(SeqNo, MessageId, UnackedPubs), ChQos1,
|
|
|
|
|
SeqNo + 1};
|
|
|
|
|
false -> {UnackedPubs, ChQos0, SeqNo}
|
|
|
|
|
end,
|
|
|
|
|
amqp_channel:cast_flow(Ch, Method, Msg),
|
2012-08-21 00:30:13 +08:00
|
|
|
PState #proc_state{ unacked_pubs = UnackedPubs1,
|
|
|
|
|
awaiting_seqno = SeqNo1 }.
|
2012-08-06 06:52:54 +08:00
|
|
|
|
2012-08-17 00:56:50 +08:00
|
|
|
adapter_info(Sock) ->
|
|
|
|
|
{Addr, Port} = case rabbit_net:sockname(Sock) of
|
|
|
|
|
{ok, Res} -> Res;
|
|
|
|
|
_ -> {unknown, unknown}
|
|
|
|
|
end,
|
|
|
|
|
{PeerAddr, PeerPort} = case rabbit_net:peername(Sock) of
|
|
|
|
|
{ok, Res2} -> Res2;
|
|
|
|
|
_ -> {unknown, unknown}
|
|
|
|
|
end,
|
|
|
|
|
Name = case rabbit_net:connection_string(Sock, inbound) of
|
|
|
|
|
{ok, Res3} -> Res3;
|
|
|
|
|
_ -> unknown
|
|
|
|
|
end,
|
2012-08-17 22:05:14 +08:00
|
|
|
#amqp_adapter_info{ protocol = {'MQTT', {?MQTT_PROTO_MAJOR,
|
|
|
|
|
?MQTT_PROTO_MINOR}},
|
|
|
|
|
name = list_to_binary(Name),
|
|
|
|
|
address = Addr,
|
|
|
|
|
port = Port,
|
|
|
|
|
peer_address = PeerAddr,
|
|
|
|
|
peer_port = PeerPort}.
|
2012-08-17 00:56:50 +08:00
|
|
|
|
2012-08-21 00:30:13 +08:00
|
|
|
send_client(Frame, #proc_state{ socket = Sock }) ->
|
2012-08-06 06:52:54 +08:00
|
|
|
%rabbit_log:info("MQTT sending frame ~p ~n", [Frame]),
|
|
|
|
|
rabbit_net:port_command(Sock, rabbit_mqtt_frame:serialise(Frame)).
|
2012-08-21 00:30:13 +08:00
|
|
|
|
|
|
|
|
close_connection(PState = #proc_state{ connection = undefined }) ->
|
|
|
|
|
PState;
|
|
|
|
|
close_connection(PState = #proc_state{ connection = Connection }) ->
|
|
|
|
|
%% ignore noproc or other exceptions to avoid debris
|
|
|
|
|
catch amqp_connection:close(Connection),
|
|
|
|
|
PState #proc_state{ channels = {undefined, undefined},
|
|
|
|
|
connection = undefined }.
|
|
|
|
|
|