put/get/delete operations for retained message store

Yet to be done: dump and restore to disk on shutdown
and boot.
This commit is contained in:
Michael Klishin 2015-04-21 14:26:46 +03:00
parent 1c935cfb21
commit 0b6e7d5714
5 changed files with 121 additions and 37 deletions

View File

@ -48,6 +48,22 @@
-define(QOS_1, 1).
-define(QOS_2, 2).
-ifdef(use_specs).
%% TODO
-type(message_id :: any()).
-type(mqtt_msg() :: #mqtt_msg {
retain :: boolean(),
qos :: QOS_0 | QOS_1 | QOS_2,
topic :: string(),
dup :: boolean(),
message_id :: message_id(),
payload :: binary()
}).
-endif.
-record(mqtt_frame, {fixed,
variable,
payload}).

View File

@ -77,7 +77,11 @@ process_request(?CONNECT,
{UserBin, PassBin} ->
case process_login(UserBin, PassBin, ProtoVersion, PState) of
{?CONNACK_ACCEPT, Conn, VHost} ->
{ok, RetainerPid} = start_retainer(VHost),
RetainerPid =
case start_retainer(VHost) of
{ok, Pid} -> Pid;
{error, {already_started, Pid}} -> Pid
end,
link(Conn),
{ok, Ch} = amqp_connection:open_channel(Conn),
link(Ch),
@ -136,21 +140,26 @@ process_request(?PUBLISH,
message_id = MessageId,
payload = Payload},
Result = amqp_pub(Msg, PState),
rabbit_mqtt_retainer:retain(RPid, Topic, Msg),
case Retain of
false -> ok;
true -> hand_off_to_retainer(RPid, Topic, Msg)
end,
{ok, Result};
process_request(?SUBSCRIBE,
#mqtt_frame{
variable = #mqtt_frame_subscribe{ message_id = MessageId,
topic_table = Topics },
payload = undefined },
#proc_state{ channels = {Channel, _},
exchange = Exchange} = PState0) ->
variable = #mqtt_frame_subscribe{
message_id = MessageId,
topic_table = Topics},
payload = undefined},
#proc_state{channels = {Channel, _},
exchange = Exchange,
retainer_pid = RPid} = PState0) ->
{QosResponse, PState1} =
lists:foldl(fun (#mqtt_topic{ name = TopicName,
qos = Qos }, {QosList, PState}) ->
lists:foldl(fun (#mqtt_topic{name = TopicName,
qos = Qos}, {QosList, PState}) ->
SupportedQos = supported_subs_qos(Qos),
{Queue, #proc_state{ subscriptions = Subs } = PState1} =
{Queue, #proc_state{subscriptions = Subs} = PState1} =
ensure_queue(SupportedQos, PState),
Binding = #'queue.bind'{
queue = Queue,
@ -159,15 +168,23 @@ process_request(?SUBSCRIBE,
TopicName)},
#'queue.bind_ok'{} = amqp_channel:call(Channel, Binding),
{[SupportedQos | QosList],
PState1 #proc_state{ subscriptions =
dict:append(TopicName, SupportedQos, Subs) }}
PState1 #proc_state{subscriptions =
dict:append(TopicName, SupportedQos, Subs)}}
end, {[], PState0}, Topics),
send_client(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?SUBACK },
variable = #mqtt_frame_suback{
message_id = MessageId,
qos_table = QosResponse }}, PState1),
{ok, PState1};
send_client(#mqtt_frame{fixed = #mqtt_frame_fixed{type = ?SUBACK},
variable = #mqtt_frame_suback{
message_id = MessageId,
qos_table = QosResponse}}, PState1),
%% we may need to send up to length(Topics) messages.
%% if QoS is > 0 then we need to generate a message id,
%% and increment the counter.
N = lists:foldl(fun (Topic, Acc) ->
case maybe_send_retained_message(RPid, Topic, Acc, PState1) of
{true, X} -> Acc + X;
false -> Acc
end
end, MessageId, Topics),
{ok, PState1#proc_state{message_id = N}};
process_request(?UNSUBSCRIBE,
#mqtt_frame{
@ -216,6 +233,36 @@ start_retainer(VHost) when is_binary(VHost) ->
Mod = rabbit_mqtt_retainer:store_module(),
rabbit_mqtt_retainer_sup:start_child(Mod, VHost).
hand_off_to_retainer(RetainerPid, Topic, #mqtt_msg{payload = <<"">>}) ->
rabbit_mqtt_retainer:clear(RetainerPid, Topic),
ok;
hand_off_to_retainer(RetainerPid, Topic, Msg) ->
rabbit_mqtt_retainer:retain(RetainerPid, Topic, Msg),
ok.
maybe_send_retained_message(RPid, #mqtt_topic{name = S, qos = Qos}, MsgId, PState) ->
Id = case Qos of
?QOS_0 -> undefined;
?QOS_1 -> MsgId
end,
case rabbit_mqtt_retainer:fetch(RPid, S) of
undefined -> false;
Msg -> send_client(#mqtt_frame{fixed = #mqtt_frame_fixed{
type = ?PUBLISH,
qos = Qos,
dup = false,
retain = Msg#mqtt_msg.retain
}, variable = #mqtt_frame_publish{
message_id = Id,
topic_name = S
},
payload = Msg#mqtt_msg.payload}, PState),
case Qos of
?QOS_0 -> false;
?QOS_1 -> {true, 1}
end
end.
amqp_callback({#'basic.deliver'{ consumer_tag = ConsumerTag,
delivery_tag = DeliveryTag,
routing_key = RoutingKey },

View File

@ -20,6 +20,7 @@
-include("rabbit_mqtt.hrl").
-export([new/2, recover/2, insert/3, lookup/2, delete/2, terminate/1]).
-export([path_for/2]).
-record(store_state, {
%% ETS table ID
@ -39,14 +40,13 @@ new(Dir, VHost) ->
recover(Dir, VHost) ->
Path = path_for(Dir, VHost),
case ets:file2tab(Path) of
{ok, Tid} -> file:delete(Path),
{ok, #store_state{table = Tid, filename = Path}};
Error -> Error
{ok, Tid} -> file:delete(Path),
{ok, #store_state{table = Tid, filename = Path}};
{error, _} -> {error, uninitialized}
end.
insert(Topic, Msg, #store_state{table = T}) ->
true = ets:insert_new(T, #retained_message{topic = Topic,
mqtt_msg = Msg}),
true = ets:insert(T, #retained_message{topic = Topic, mqtt_msg = Msg}),
ok.
lookup(Topic, #store_state{table = T}) ->
@ -69,4 +69,4 @@ path_for(Dir, VHost) ->
rabbit_mqtt_util:vhost_name_to_dir_name(VHost)).
table_name_for(VHost) ->
"mqtt_retained" ++ rabbit_mqtt_util:vhost_name_to_dir_name(VHost).
list_to_atom(rabbit_mqtt_util:vhost_name_to_dir_name(VHost)).

View File

@ -18,6 +18,7 @@
-behaviour(gen_server2).
-include("rabbit_mqtt.hrl").
-include("rabbit_mqtt_frame.hrl").
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3, start_link/2]).
@ -27,15 +28,27 @@
-define(SERVER, ?MODULE).
-record(retainer_state, {store_mod,
store}).
store}).
-ifdef(use_specs).
-spec(retain/3 :: (pid(), string(), mqtt_msg()) ->
{noreply, NewState :: term()} |
{noreply, NewState :: term(), timeout() | hibernate} |
{stop, Reason :: term(), NewState :: term()}).
-endif.
%%----------------------------------------------------------------------------
start_link(RetainStoreMod, VHost) ->
gen_server2:start_link(?MODULE, [RetainStoreMod, VHost], []).
retain(Pid, Topic, Msg) ->
gen_server2:cast(Pid, {retain, Topic, Msg}).
retain(Pid, Topic, Msg = #mqtt_msg{retain = true}) ->
gen_server2:cast(Pid, {retain, Topic, Msg});
retain(_Pid, _Topic, Msg = #mqtt_msg{retain = false}) ->
throw({error, {retain_is_false, Msg}}).
fetch(Pid, Topic) ->
gen_server2:call(Pid, {fetch, Topic}).
@ -45,9 +58,14 @@ clear(Pid, Topic) ->
%%----------------------------------------------------------------------------
init([RetainStoreMod, VHost]) ->
{ok, #retainer_state{store = RetainStoreMod:new(store_dir(), VHost),
store_mod = RetainStoreMod}}.
init([StoreMod, VHost]) ->
State = case StoreMod:recover(store_dir(), VHost) of
{ok, Store} -> #retainer_state{store = Store,
store_mod = StoreMod};
{error, _} -> #retainer_state{store = StoreMod:new(store_dir(), VHost),
store_mod = StoreMod}
end,
{ok, State}.
store_module() ->
case application:get_env(rabbitmq_mqtt, retained_message_store) of
@ -68,8 +86,11 @@ handle_cast({clear, Topic},
handle_call({fetch, Topic}, _From,
State = #retainer_state{store = Store, store_mod = Mod}) ->
#retained_message{mqtt_msg = Msg} = Mod:lookup(Topic, Store),
{reply, Msg, State}.
Reply = case Mod:lookup(Topic, Store) of
#retained_message{mqtt_msg = Msg} -> Msg;
not_found -> undefined
end,
{reply, Reply, State}.
handle_info(stop, State) ->
{stop, normal, State};
@ -80,8 +101,8 @@ handle_info(Info, State) ->
store_dir() ->
rabbit_mnesia:dir().
terminate(_Reason, _State) ->
%% TODO: notify the store
terminate(_Reason, #retainer_state{store = Store, store_mod = Mod}) ->
Mod:terminate(Store),
ok.
code_change(_OldVsn, State, _Extra) ->

View File

@ -29,10 +29,10 @@ start_link(SupName) ->
supervisor2:start_link(SupName, ?MODULE, []).
start_child(RetainStoreMod, VHost) ->
supervisor2:start_child({local, ?MODULE},
{ok, {binary_to_atom(VHost, ?ENCODING),
supervisor2:start_child(?MODULE,
{binary_to_atom(VHost, ?ENCODING),
{rabbit_mqtt_retainer, start_link, [RetainStoreMod, VHost]},
{one_for_one, 5, 5}}}).
intrinsic, 10, worker, [rabbit_mqtt_retainer]}).
init([]) ->
rabbit_log:info("MQTT retained message store: ~p~n",