diff --git a/deps/rabbitmq_mqtt/include/rabbit_mqtt_frame.hrl b/deps/rabbitmq_mqtt/include/rabbit_mqtt_frame.hrl index 87f24d5d77..123029c094 100644 --- a/deps/rabbitmq_mqtt/include/rabbit_mqtt_frame.hrl +++ b/deps/rabbitmq_mqtt/include/rabbit_mqtt_frame.hrl @@ -48,6 +48,22 @@ -define(QOS_1, 1). -define(QOS_2, 2). +-ifdef(use_specs). + +%% TODO +-type(message_id :: any()). + +-type(mqtt_msg() :: #mqtt_msg { + retain :: boolean(), + qos :: QOS_0 | QOS_1 | QOS_2, + topic :: string(), + dup :: boolean(), + message_id :: message_id(), + payload :: binary() +}). + +-endif. + -record(mqtt_frame, {fixed, variable, payload}). diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index 6d2218f5b1..e56ba70401 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -77,7 +77,11 @@ process_request(?CONNECT, {UserBin, PassBin} -> case process_login(UserBin, PassBin, ProtoVersion, PState) of {?CONNACK_ACCEPT, Conn, VHost} -> - {ok, RetainerPid} = start_retainer(VHost), + RetainerPid = + case start_retainer(VHost) of + {ok, Pid} -> Pid; + {error, {already_started, Pid}} -> Pid + end, link(Conn), {ok, Ch} = amqp_connection:open_channel(Conn), link(Ch), @@ -136,21 +140,26 @@ process_request(?PUBLISH, message_id = MessageId, payload = Payload}, Result = amqp_pub(Msg, PState), - rabbit_mqtt_retainer:retain(RPid, Topic, Msg), + case Retain of + false -> ok; + true -> hand_off_to_retainer(RPid, Topic, Msg) + end, {ok, Result}; process_request(?SUBSCRIBE, #mqtt_frame{ - variable = #mqtt_frame_subscribe{ message_id = MessageId, - topic_table = Topics }, - payload = undefined }, - #proc_state{ channels = {Channel, _}, - exchange = Exchange} = PState0) -> + variable = #mqtt_frame_subscribe{ + message_id = MessageId, + topic_table = Topics}, + payload = undefined}, + #proc_state{channels = {Channel, _}, + exchange = Exchange, + retainer_pid = RPid} = PState0) -> {QosResponse, PState1} = - lists:foldl(fun (#mqtt_topic{ name = TopicName, - qos = Qos }, {QosList, PState}) -> + lists:foldl(fun (#mqtt_topic{name = TopicName, + qos = Qos}, {QosList, PState}) -> SupportedQos = supported_subs_qos(Qos), - {Queue, #proc_state{ subscriptions = Subs } = PState1} = + {Queue, #proc_state{subscriptions = Subs} = PState1} = ensure_queue(SupportedQos, PState), Binding = #'queue.bind'{ queue = Queue, @@ -159,15 +168,23 @@ process_request(?SUBSCRIBE, TopicName)}, #'queue.bind_ok'{} = amqp_channel:call(Channel, Binding), {[SupportedQos | QosList], - PState1 #proc_state{ subscriptions = - dict:append(TopicName, SupportedQos, Subs) }} + PState1 #proc_state{subscriptions = + dict:append(TopicName, SupportedQos, Subs)}} end, {[], PState0}, Topics), - send_client(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?SUBACK }, - variable = #mqtt_frame_suback{ - message_id = MessageId, - qos_table = QosResponse }}, PState1), - - {ok, PState1}; + send_client(#mqtt_frame{fixed = #mqtt_frame_fixed{type = ?SUBACK}, + variable = #mqtt_frame_suback{ + message_id = MessageId, + qos_table = QosResponse}}, PState1), + %% we may need to send up to length(Topics) messages. + %% if QoS is > 0 then we need to generate a message id, + %% and increment the counter. + N = lists:foldl(fun (Topic, Acc) -> + case maybe_send_retained_message(RPid, Topic, Acc, PState1) of + {true, X} -> Acc + X; + false -> Acc + end + end, MessageId, Topics), + {ok, PState1#proc_state{message_id = N}}; process_request(?UNSUBSCRIBE, #mqtt_frame{ @@ -216,6 +233,36 @@ start_retainer(VHost) when is_binary(VHost) -> Mod = rabbit_mqtt_retainer:store_module(), rabbit_mqtt_retainer_sup:start_child(Mod, VHost). +hand_off_to_retainer(RetainerPid, Topic, #mqtt_msg{payload = <<"">>}) -> + rabbit_mqtt_retainer:clear(RetainerPid, Topic), + ok; +hand_off_to_retainer(RetainerPid, Topic, Msg) -> + rabbit_mqtt_retainer:retain(RetainerPid, Topic, Msg), + ok. + +maybe_send_retained_message(RPid, #mqtt_topic{name = S, qos = Qos}, MsgId, PState) -> + Id = case Qos of + ?QOS_0 -> undefined; + ?QOS_1 -> MsgId + end, + case rabbit_mqtt_retainer:fetch(RPid, S) of + undefined -> false; + Msg -> send_client(#mqtt_frame{fixed = #mqtt_frame_fixed{ + type = ?PUBLISH, + qos = Qos, + dup = false, + retain = Msg#mqtt_msg.retain + }, variable = #mqtt_frame_publish{ + message_id = Id, + topic_name = S + }, + payload = Msg#mqtt_msg.payload}, PState), + case Qos of + ?QOS_0 -> false; + ?QOS_1 -> {true, 1} + end + end. + amqp_callback({#'basic.deliver'{ consumer_tag = ConsumerTag, delivery_tag = DeliveryTag, routing_key = RoutingKey }, diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_ets.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_ets.erl index 31d53f5083..d53f63334e 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_ets.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_ets.erl @@ -20,6 +20,7 @@ -include("rabbit_mqtt.hrl"). -export([new/2, recover/2, insert/3, lookup/2, delete/2, terminate/1]). +-export([path_for/2]). -record(store_state, { %% ETS table ID @@ -39,14 +40,13 @@ new(Dir, VHost) -> recover(Dir, VHost) -> Path = path_for(Dir, VHost), case ets:file2tab(Path) of - {ok, Tid} -> file:delete(Path), - {ok, #store_state{table = Tid, filename = Path}}; - Error -> Error + {ok, Tid} -> file:delete(Path), + {ok, #store_state{table = Tid, filename = Path}}; + {error, _} -> {error, uninitialized} end. insert(Topic, Msg, #store_state{table = T}) -> - true = ets:insert_new(T, #retained_message{topic = Topic, - mqtt_msg = Msg}), + true = ets:insert(T, #retained_message{topic = Topic, mqtt_msg = Msg}), ok. lookup(Topic, #store_state{table = T}) -> @@ -69,4 +69,4 @@ path_for(Dir, VHost) -> rabbit_mqtt_util:vhost_name_to_dir_name(VHost)). table_name_for(VHost) -> - "mqtt_retained" ++ rabbit_mqtt_util:vhost_name_to_dir_name(VHost). + list_to_atom(rabbit_mqtt_util:vhost_name_to_dir_name(VHost)). diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retainer.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retainer.erl index e99b566dd4..5ea8507bbf 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retainer.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retainer.erl @@ -18,6 +18,7 @@ -behaviour(gen_server2). -include("rabbit_mqtt.hrl"). +-include("rabbit_mqtt_frame.hrl"). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, start_link/2]). @@ -27,15 +28,27 @@ -define(SERVER, ?MODULE). -record(retainer_state, {store_mod, - store}). + store}). + +-ifdef(use_specs). + +-spec(retain/3 :: (pid(), string(), mqtt_msg()) -> + {noreply, NewState :: term()} | + {noreply, NewState :: term(), timeout() | hibernate} | + {stop, Reason :: term(), NewState :: term()}). + +-endif. %%---------------------------------------------------------------------------- start_link(RetainStoreMod, VHost) -> gen_server2:start_link(?MODULE, [RetainStoreMod, VHost], []). -retain(Pid, Topic, Msg) -> - gen_server2:cast(Pid, {retain, Topic, Msg}). +retain(Pid, Topic, Msg = #mqtt_msg{retain = true}) -> + gen_server2:cast(Pid, {retain, Topic, Msg}); + +retain(_Pid, _Topic, Msg = #mqtt_msg{retain = false}) -> + throw({error, {retain_is_false, Msg}}). fetch(Pid, Topic) -> gen_server2:call(Pid, {fetch, Topic}). @@ -45,9 +58,14 @@ clear(Pid, Topic) -> %%---------------------------------------------------------------------------- -init([RetainStoreMod, VHost]) -> - {ok, #retainer_state{store = RetainStoreMod:new(store_dir(), VHost), - store_mod = RetainStoreMod}}. +init([StoreMod, VHost]) -> + State = case StoreMod:recover(store_dir(), VHost) of + {ok, Store} -> #retainer_state{store = Store, + store_mod = StoreMod}; + {error, _} -> #retainer_state{store = StoreMod:new(store_dir(), VHost), + store_mod = StoreMod} + end, + {ok, State}. store_module() -> case application:get_env(rabbitmq_mqtt, retained_message_store) of @@ -68,8 +86,11 @@ handle_cast({clear, Topic}, handle_call({fetch, Topic}, _From, State = #retainer_state{store = Store, store_mod = Mod}) -> - #retained_message{mqtt_msg = Msg} = Mod:lookup(Topic, Store), - {reply, Msg, State}. + Reply = case Mod:lookup(Topic, Store) of + #retained_message{mqtt_msg = Msg} -> Msg; + not_found -> undefined + end, + {reply, Reply, State}. handle_info(stop, State) -> {stop, normal, State}; @@ -80,8 +101,8 @@ handle_info(Info, State) -> store_dir() -> rabbit_mnesia:dir(). -terminate(_Reason, _State) -> - %% TODO: notify the store +terminate(_Reason, #retainer_state{store = Store, store_mod = Mod}) -> + Mod:terminate(Store), ok. code_change(_OldVsn, State, _Extra) -> diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retainer_sup.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retainer_sup.erl index 4e41b38362..17724aa13c 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retainer_sup.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retainer_sup.erl @@ -29,10 +29,10 @@ start_link(SupName) -> supervisor2:start_link(SupName, ?MODULE, []). start_child(RetainStoreMod, VHost) -> - supervisor2:start_child({local, ?MODULE}, - {ok, {binary_to_atom(VHost, ?ENCODING), + supervisor2:start_child(?MODULE, + {binary_to_atom(VHost, ?ENCODING), {rabbit_mqtt_retainer, start_link, [RetainStoreMod, VHost]}, - {one_for_one, 5, 5}}}). + intrinsic, 10, worker, [rabbit_mqtt_retainer]}). init([]) -> rabbit_log:info("MQTT retained message store: ~p~n",