Make send method configurable and export useful functions

Needed for the new Web-MQTT plugin.
This commit is contained in:
Loïc Hoguin 2016-01-07 16:19:01 +01:00 committed by Michael Klishin
parent 9ba1da7434
commit 1f1cb7ab9b
3 changed files with 47 additions and 33 deletions

View File

@ -45,7 +45,8 @@
%% Retained messages handler. See rabbit_mqtt_retainer_sup %% Retained messages handler. See rabbit_mqtt_retainer_sup
%% and rabbit_mqtt_retainer. %% and rabbit_mqtt_retainer.
retainer_pid, retainer_pid,
auth_state}). auth_state,
send_fun}).
-record(auth_state, {username, -record(auth_state, {username,
user, user,

View File

@ -16,7 +16,7 @@
-module(rabbit_mqtt_processor). -module(rabbit_mqtt_processor).
-export([info/2, initial_state/2, -export([info/2, initial_state/2, initial_state/3,
process_frame/2, amqp_pub/2, amqp_callback/2, send_will/1, process_frame/2, amqp_pub/2, amqp_callback/2, send_will/1,
close_connection/1]). close_connection/1]).
@ -32,15 +32,19 @@
Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}). Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}).
initial_state(Socket,SSLLoginName) -> initial_state(Socket,SSLLoginName) ->
#proc_state{ unacked_pubs = gb_trees:empty(), initial_state(Socket, SSLLoginName, fun send_client/2).
awaiting_ack = gb_trees:empty(),
message_id = 1, initial_state(Socket,SSLLoginName,SendFun) ->
subscriptions = dict:new(), #proc_state{ unacked_pubs = gb_trees:empty(),
consumer_tags = {undefined, undefined}, awaiting_ack = gb_trees:empty(),
channels = {undefined, undefined}, message_id = 1,
exchange = rabbit_mqtt_util:env(exchange), subscriptions = dict:new(),
socket = Socket, consumer_tags = {undefined, undefined},
ssl_login_name = SSLLoginName }. channels = {undefined, undefined},
exchange = rabbit_mqtt_util:env(exchange),
socket = Socket,
ssl_login_name = SSLLoginName,
send_fun = SendFun }.
info(client_id, #proc_state{ client_id = ClientId }) -> ClientId. info(client_id, #proc_state{ client_id = ClientId }) -> ClientId.
@ -60,7 +64,8 @@ process_request(?CONNECT,
clean_sess = CleanSess, clean_sess = CleanSess,
client_id = ClientId0, client_id = ClientId0,
keep_alive = Keepalive} = Var}, keep_alive = Keepalive} = Var},
PState = #proc_state{ ssl_login_name = SSLLoginName }) -> PState = #proc_state{ ssl_login_name = SSLLoginName,
send_fun = SendFun }) ->
ClientId = case ClientId0 of ClientId = case ClientId0 of
[] -> rabbit_mqtt_util:gen_client_id(); [] -> rabbit_mqtt_util:gen_client_id();
[_|_] -> ClientId0 [_|_] -> ClientId0
@ -106,9 +111,9 @@ process_request(?CONNECT,
end end
end end
end, end,
send_client(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?CONNACK}, SendFun(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?CONNACK},
variable = #mqtt_frame_connack{ variable = #mqtt_frame_connack{
return_code = ReturnCode }}, PState1), return_code = ReturnCode }}, PState1),
{ok, PState1}; {ok, PState1};
process_request(?PUBACK, process_request(?PUBACK,
@ -162,7 +167,8 @@ process_request(?SUBSCRIBE,
payload = undefined}, payload = undefined},
#proc_state{channels = {Channel, _}, #proc_state{channels = {Channel, _},
exchange = Exchange, exchange = Exchange,
retainer_pid = RPid} = PState0) -> retainer_pid = RPid,
send_fun = SendFun } = PState0) ->
check_subscribe_or_die(Topics, fun() -> check_subscribe_or_die(Topics, fun() ->
{QosResponse, PState1} = {QosResponse, PState1} =
lists:foldl(fun (#mqtt_topic{name = TopicName, lists:foldl(fun (#mqtt_topic{name = TopicName,
@ -180,10 +186,10 @@ process_request(?SUBSCRIBE,
PState1 #proc_state{subscriptions = PState1 #proc_state{subscriptions =
dict:append(TopicName, SupportedQos, Subs)}} dict:append(TopicName, SupportedQos, Subs)}}
end, {[], PState0}, Topics), end, {[], PState0}, Topics),
send_client(#mqtt_frame{fixed = #mqtt_frame_fixed{type = ?SUBACK}, SendFun(#mqtt_frame{fixed = #mqtt_frame_fixed{type = ?SUBACK},
variable = #mqtt_frame_suback{ variable = #mqtt_frame_suback{
message_id = MessageId, message_id = MessageId,
qos_table = QosResponse}}, PState1), qos_table = QosResponse}}, PState1),
%% we may need to send up to length(Topics) messages. %% we may need to send up to length(Topics) messages.
%% if QoS is > 0 then we need to generate a message id, %% if QoS is > 0 then we need to generate a message id,
%% and increment the counter. %% and increment the counter.
@ -203,7 +209,8 @@ process_request(?UNSUBSCRIBE,
payload = undefined }, #proc_state{ channels = {Channel, _}, payload = undefined }, #proc_state{ channels = {Channel, _},
exchange = Exchange, exchange = Exchange,
client_id = ClientId, client_id = ClientId,
subscriptions = Subs0} = PState) -> subscriptions = Subs0,
send_fun = SendFun } = PState) ->
Queues = rabbit_mqtt_util:subcription_queue_name(ClientId), Queues = rabbit_mqtt_util:subcription_queue_name(ClientId),
Subs1 = Subs1 =
lists:foldl( lists:foldl(
@ -224,13 +231,13 @@ process_request(?UNSUBSCRIBE,
end, QosSubs), end, QosSubs),
dict:erase(TopicName, Subs) dict:erase(TopicName, Subs)
end, Subs0, Topics), end, Subs0, Topics),
send_client(#mqtt_frame{ fixed = #mqtt_frame_fixed { type = ?UNSUBACK }, SendFun(#mqtt_frame{ fixed = #mqtt_frame_fixed { type = ?UNSUBACK },
variable = #mqtt_frame_suback{ message_id = MessageId }}, variable = #mqtt_frame_suback{ message_id = MessageId }},
PState), PState),
{ok, PState #proc_state{ subscriptions = Subs1 }}; {ok, PState #proc_state{ subscriptions = Subs1 }};
process_request(?PINGREQ, #mqtt_frame{}, PState) -> process_request(?PINGREQ, #mqtt_frame{}, #proc_state{ send_fun = SendFun } = PState) ->
send_client(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PINGRESP }}, SendFun(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PINGRESP }},
PState), PState),
{ok, PState}; {ok, PState};
@ -246,7 +253,8 @@ hand_off_to_retainer(RetainerPid, Topic, Msg) ->
rabbit_mqtt_retainer:retain(RetainerPid, Topic, Msg), rabbit_mqtt_retainer:retain(RetainerPid, Topic, Msg),
ok. ok.
maybe_send_retained_message(RPid, #mqtt_topic{name = S, qos = SubscribeQos}, MsgId, PState) -> maybe_send_retained_message(RPid, #mqtt_topic{name = S, qos = SubscribeQos}, MsgId,
#proc_state{ send_fun = SendFun } = PState) ->
case rabbit_mqtt_retainer:fetch(RPid, S) of case rabbit_mqtt_retainer:fetch(RPid, S) of
undefined -> false; undefined -> false;
Msg -> Msg ->
@ -258,7 +266,7 @@ maybe_send_retained_message(RPid, #mqtt_topic{name = S, qos = SubscribeQos}, Msg
?QOS_0 -> undefined; ?QOS_0 -> undefined;
?QOS_1 -> MsgId ?QOS_1 -> MsgId
end, end,
send_client(#mqtt_frame{fixed = #mqtt_frame_fixed{ SendFun(#mqtt_frame{fixed = #mqtt_frame_fixed{
type = ?PUBLISH, type = ?PUBLISH,
qos = Qos, qos = Qos,
dup = false, dup = false,
@ -282,7 +290,8 @@ amqp_callback({#'basic.deliver'{ consumer_tag = ConsumerTag,
DeliveryCtx} = Delivery, DeliveryCtx} = Delivery,
#proc_state{ channels = {Channel, _}, #proc_state{ channels = {Channel, _},
awaiting_ack = Awaiting, awaiting_ack = Awaiting,
message_id = MsgId } = PState) -> message_id = MsgId,
send_fun = SendFun } = PState) ->
amqp_channel:notify_received(DeliveryCtx), amqp_channel:notify_received(DeliveryCtx),
case {delivery_dup(Delivery), delivery_qos(ConsumerTag, Headers, PState)} of case {delivery_dup(Delivery), delivery_qos(ConsumerTag, Headers, PState)} of
{true, {?QOS_0, ?QOS_1}} -> {true, {?QOS_0, ?QOS_1}} ->
@ -292,7 +301,7 @@ amqp_callback({#'basic.deliver'{ consumer_tag = ConsumerTag,
{true, {?QOS_0, ?QOS_0}} -> {true, {?QOS_0, ?QOS_0}} ->
{ok, PState}; {ok, PState};
{Dup, {DeliveryQos, _SubQos} = Qos} -> {Dup, {DeliveryQos, _SubQos} = Qos} ->
send_client( SendFun(
#mqtt_frame{ fixed = #mqtt_frame_fixed{ #mqtt_frame{ fixed = #mqtt_frame_fixed{
type = ?PUBLISH, type = ?PUBLISH,
qos = DeliveryQos, qos = DeliveryQos,
@ -324,11 +333,12 @@ amqp_callback({#'basic.deliver'{ consumer_tag = ConsumerTag,
end; end;
amqp_callback(#'basic.ack'{ multiple = true, delivery_tag = Tag } = Ack, amqp_callback(#'basic.ack'{ multiple = true, delivery_tag = Tag } = Ack,
PState = #proc_state{ unacked_pubs = UnackedPubs }) -> PState = #proc_state{ unacked_pubs = UnackedPubs,
send_fun = SendFun }) ->
case gb_trees:size(UnackedPubs) > 0 andalso case gb_trees:size(UnackedPubs) > 0 andalso
gb_trees:take_smallest(UnackedPubs) of gb_trees:take_smallest(UnackedPubs) of
{TagSmall, MsgId, UnackedPubs1} when TagSmall =< Tag -> {TagSmall, MsgId, UnackedPubs1} when TagSmall =< Tag ->
send_client( SendFun(
#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PUBACK }, #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PUBACK },
variable = #mqtt_frame_publish{ message_id = MsgId }}, variable = #mqtt_frame_publish{ message_id = MsgId }},
PState), PState),
@ -338,8 +348,9 @@ amqp_callback(#'basic.ack'{ multiple = true, delivery_tag = Tag } = Ack,
end; end;
amqp_callback(#'basic.ack'{ multiple = false, delivery_tag = Tag }, amqp_callback(#'basic.ack'{ multiple = false, delivery_tag = Tag },
PState = #proc_state{ unacked_pubs = UnackedPubs }) -> PState = #proc_state{ unacked_pubs = UnackedPubs,
send_client( send_fun = SendFun }) ->
SendFun(
#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PUBACK }, #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PUBACK },
variable = #mqtt_frame_publish{ variable = #mqtt_frame_publish{
message_id = gb_trees:get( message_id = gb_trees:get(

View File

@ -23,6 +23,8 @@
-export([conserve_resources/3, start_keepalive/2]). -export([conserve_resources/3, start_keepalive/2]).
-export([ssl_login_name/1]).
-include_lib("amqp_client/include/amqp_client.hrl"). -include_lib("amqp_client/include/amqp_client.hrl").
-include("rabbit_mqtt.hrl"). -include("rabbit_mqtt.hrl").