From 1f1cb7ab9bfe9a9b336baaaec21d3e50d504886a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Thu, 7 Jan 2016 16:19:01 +0100 Subject: [PATCH] Make send method configurable and export useful functions Needed for the new Web-MQTT plugin. --- deps/rabbitmq_mqtt/include/rabbit_mqtt.hrl | 3 +- .../src/rabbit_mqtt_processor.erl | 75 +++++++++++-------- deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl | 2 + 3 files changed, 47 insertions(+), 33 deletions(-) diff --git a/deps/rabbitmq_mqtt/include/rabbit_mqtt.hrl b/deps/rabbitmq_mqtt/include/rabbit_mqtt.hrl index 51a637858f..3bfedd3b61 100644 --- a/deps/rabbitmq_mqtt/include/rabbit_mqtt.hrl +++ b/deps/rabbitmq_mqtt/include/rabbit_mqtt.hrl @@ -45,7 +45,8 @@ %% Retained messages handler. See rabbit_mqtt_retainer_sup %% and rabbit_mqtt_retainer. retainer_pid, - auth_state}). + auth_state, + send_fun}). -record(auth_state, {username, user, diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index c158196d91..9f312fa81e 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -16,7 +16,7 @@ -module(rabbit_mqtt_processor). --export([info/2, initial_state/2, +-export([info/2, initial_state/2, initial_state/3, process_frame/2, amqp_pub/2, amqp_callback/2, send_will/1, close_connection/1]). @@ -32,15 +32,19 @@ Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}). initial_state(Socket,SSLLoginName) -> - #proc_state{ unacked_pubs = gb_trees:empty(), - awaiting_ack = gb_trees:empty(), - message_id = 1, - subscriptions = dict:new(), - consumer_tags = {undefined, undefined}, - channels = {undefined, undefined}, - exchange = rabbit_mqtt_util:env(exchange), - socket = Socket, - ssl_login_name = SSLLoginName }. + initial_state(Socket, SSLLoginName, fun send_client/2). + +initial_state(Socket,SSLLoginName,SendFun) -> + #proc_state{ unacked_pubs = gb_trees:empty(), + awaiting_ack = gb_trees:empty(), + message_id = 1, + subscriptions = dict:new(), + consumer_tags = {undefined, undefined}, + channels = {undefined, undefined}, + exchange = rabbit_mqtt_util:env(exchange), + socket = Socket, + ssl_login_name = SSLLoginName, + send_fun = SendFun }. info(client_id, #proc_state{ client_id = ClientId }) -> ClientId. @@ -60,7 +64,8 @@ process_request(?CONNECT, clean_sess = CleanSess, client_id = ClientId0, keep_alive = Keepalive} = Var}, - PState = #proc_state{ ssl_login_name = SSLLoginName }) -> + PState = #proc_state{ ssl_login_name = SSLLoginName, + send_fun = SendFun }) -> ClientId = case ClientId0 of [] -> rabbit_mqtt_util:gen_client_id(); [_|_] -> ClientId0 @@ -106,9 +111,9 @@ process_request(?CONNECT, end end end, - send_client(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?CONNACK}, - variable = #mqtt_frame_connack{ - return_code = ReturnCode }}, PState1), + SendFun(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?CONNACK}, + variable = #mqtt_frame_connack{ + return_code = ReturnCode }}, PState1), {ok, PState1}; process_request(?PUBACK, @@ -162,7 +167,8 @@ process_request(?SUBSCRIBE, payload = undefined}, #proc_state{channels = {Channel, _}, exchange = Exchange, - retainer_pid = RPid} = PState0) -> + retainer_pid = RPid, + send_fun = SendFun } = PState0) -> check_subscribe_or_die(Topics, fun() -> {QosResponse, PState1} = lists:foldl(fun (#mqtt_topic{name = TopicName, @@ -180,10 +186,10 @@ process_request(?SUBSCRIBE, PState1 #proc_state{subscriptions = dict:append(TopicName, SupportedQos, Subs)}} end, {[], PState0}, Topics), - send_client(#mqtt_frame{fixed = #mqtt_frame_fixed{type = ?SUBACK}, - variable = #mqtt_frame_suback{ - message_id = MessageId, - qos_table = QosResponse}}, PState1), + SendFun(#mqtt_frame{fixed = #mqtt_frame_fixed{type = ?SUBACK}, + variable = #mqtt_frame_suback{ + message_id = MessageId, + qos_table = QosResponse}}, PState1), %% we may need to send up to length(Topics) messages. %% if QoS is > 0 then we need to generate a message id, %% and increment the counter. @@ -203,7 +209,8 @@ process_request(?UNSUBSCRIBE, payload = undefined }, #proc_state{ channels = {Channel, _}, exchange = Exchange, client_id = ClientId, - subscriptions = Subs0} = PState) -> + subscriptions = Subs0, + send_fun = SendFun } = PState) -> Queues = rabbit_mqtt_util:subcription_queue_name(ClientId), Subs1 = lists:foldl( @@ -224,13 +231,13 @@ process_request(?UNSUBSCRIBE, end, QosSubs), dict:erase(TopicName, Subs) end, Subs0, Topics), - send_client(#mqtt_frame{ fixed = #mqtt_frame_fixed { type = ?UNSUBACK }, - variable = #mqtt_frame_suback{ message_id = MessageId }}, + SendFun(#mqtt_frame{ fixed = #mqtt_frame_fixed { type = ?UNSUBACK }, + variable = #mqtt_frame_suback{ message_id = MessageId }}, PState), {ok, PState #proc_state{ subscriptions = Subs1 }}; -process_request(?PINGREQ, #mqtt_frame{}, PState) -> - send_client(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PINGRESP }}, +process_request(?PINGREQ, #mqtt_frame{}, #proc_state{ send_fun = SendFun } = PState) -> + SendFun(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PINGRESP }}, PState), {ok, PState}; @@ -246,7 +253,8 @@ hand_off_to_retainer(RetainerPid, Topic, Msg) -> rabbit_mqtt_retainer:retain(RetainerPid, Topic, Msg), ok. -maybe_send_retained_message(RPid, #mqtt_topic{name = S, qos = SubscribeQos}, MsgId, PState) -> +maybe_send_retained_message(RPid, #mqtt_topic{name = S, qos = SubscribeQos}, MsgId, + #proc_state{ send_fun = SendFun } = PState) -> case rabbit_mqtt_retainer:fetch(RPid, S) of undefined -> false; Msg -> @@ -258,7 +266,7 @@ maybe_send_retained_message(RPid, #mqtt_topic{name = S, qos = SubscribeQos}, Msg ?QOS_0 -> undefined; ?QOS_1 -> MsgId end, - send_client(#mqtt_frame{fixed = #mqtt_frame_fixed{ + SendFun(#mqtt_frame{fixed = #mqtt_frame_fixed{ type = ?PUBLISH, qos = Qos, dup = false, @@ -282,7 +290,8 @@ amqp_callback({#'basic.deliver'{ consumer_tag = ConsumerTag, DeliveryCtx} = Delivery, #proc_state{ channels = {Channel, _}, awaiting_ack = Awaiting, - message_id = MsgId } = PState) -> + message_id = MsgId, + send_fun = SendFun } = PState) -> amqp_channel:notify_received(DeliveryCtx), case {delivery_dup(Delivery), delivery_qos(ConsumerTag, Headers, PState)} of {true, {?QOS_0, ?QOS_1}} -> @@ -292,7 +301,7 @@ amqp_callback({#'basic.deliver'{ consumer_tag = ConsumerTag, {true, {?QOS_0, ?QOS_0}} -> {ok, PState}; {Dup, {DeliveryQos, _SubQos} = Qos} -> - send_client( + SendFun( #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PUBLISH, qos = DeliveryQos, @@ -324,11 +333,12 @@ amqp_callback({#'basic.deliver'{ consumer_tag = ConsumerTag, end; amqp_callback(#'basic.ack'{ multiple = true, delivery_tag = Tag } = Ack, - PState = #proc_state{ unacked_pubs = UnackedPubs }) -> + PState = #proc_state{ unacked_pubs = UnackedPubs, + send_fun = SendFun }) -> case gb_trees:size(UnackedPubs) > 0 andalso gb_trees:take_smallest(UnackedPubs) of {TagSmall, MsgId, UnackedPubs1} when TagSmall =< Tag -> - send_client( + SendFun( #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PUBACK }, variable = #mqtt_frame_publish{ message_id = MsgId }}, PState), @@ -338,8 +348,9 @@ amqp_callback(#'basic.ack'{ multiple = true, delivery_tag = Tag } = Ack, end; amqp_callback(#'basic.ack'{ multiple = false, delivery_tag = Tag }, - PState = #proc_state{ unacked_pubs = UnackedPubs }) -> - send_client( + PState = #proc_state{ unacked_pubs = UnackedPubs, + send_fun = SendFun }) -> + SendFun( #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PUBACK }, variable = #mqtt_frame_publish{ message_id = gb_trees:get( diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl index 9a2ef86d10..eca6513328 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl @@ -23,6 +23,8 @@ -export([conserve_resources/3, start_keepalive/2]). +-export([ssl_login_name/1]). + -include_lib("amqp_client/include/amqp_client.hrl"). -include("rabbit_mqtt.hrl").