Add Message Expiry Interval for retained messages
MQTT v5 spec: "If the current retained message for a Topic expires, it is discarded and there will be no retained message for that topic." This commit also supports Message Expiry Interval for retained messages when a node is restarted. Therefore, the insertion timestamp needs to be stored on disk. Upon recovery, the Erlang timers are re-created.
This commit is contained in:
parent
c39079f657
commit
66fe9630b5
|
|
@ -543,7 +543,7 @@ ensure_ttl_timer(undefined, State) ->
|
||||||
State;
|
State;
|
||||||
ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined,
|
ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined,
|
||||||
args_policy_version = Version}) ->
|
args_policy_version = Version}) ->
|
||||||
After = (case Expiry - os:system_time(micro_seconds) of
|
After = (case Expiry - os:system_time(microsecond) of
|
||||||
V when V > 0 -> V + 999; %% always fire later
|
V when V > 0 -> V + 999; %% always fire later
|
||||||
_ -> 0
|
_ -> 0
|
||||||
end) div 1000,
|
end) div 1000,
|
||||||
|
|
@ -991,7 +991,7 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
|
||||||
{ok, MsgTTL} = rabbit_basic:parse_expiration(Props),
|
{ok, MsgTTL} = rabbit_basic:parse_expiration(Props),
|
||||||
case lists:min([TTL, MsgTTL]) of
|
case lists:min([TTL, MsgTTL]) of
|
||||||
undefined -> undefined;
|
undefined -> undefined;
|
||||||
T -> os:system_time(micro_seconds) + T * 1000
|
T -> os:system_time(microsecond) + T * 1000
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% Logically this function should invoke maybe_send_drained/2.
|
%% Logically this function should invoke maybe_send_drained/2.
|
||||||
|
|
@ -1002,7 +1002,7 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
|
||||||
drop_expired_msgs(State) ->
|
drop_expired_msgs(State) ->
|
||||||
case is_empty(State) of
|
case is_empty(State) of
|
||||||
true -> State;
|
true -> State;
|
||||||
false -> drop_expired_msgs(os:system_time(micro_seconds),
|
false -> drop_expired_msgs(os:system_time(microsecond),
|
||||||
State)
|
State)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
@ -1782,7 +1782,7 @@ handle_pre_hibernate(State = #q{backing_queue = BQ,
|
||||||
State, #q.stats_timer,
|
State, #q.stats_timer,
|
||||||
fun () -> emit_stats(State,
|
fun () -> emit_stats(State,
|
||||||
[{idle_since,
|
[{idle_since,
|
||||||
os:system_time(milli_seconds)}])
|
os:system_time(millisecond)}])
|
||||||
end),
|
end),
|
||||||
State1 = rabbit_event:stop_stats_timer(State#q{backing_queue_state = BQS3},
|
State1 = rabbit_event:stop_stats_timer(State#q{backing_queue_state = BQS3},
|
||||||
#q.stats_timer),
|
#q.stats_timer),
|
||||||
|
|
|
||||||
|
|
@ -840,7 +840,7 @@ handle_pre_hibernate(State0) ->
|
||||||
State, #ch.stats_timer,
|
State, #ch.stats_timer,
|
||||||
fun () -> emit_stats(State,
|
fun () -> emit_stats(State,
|
||||||
[{idle_since,
|
[{idle_since,
|
||||||
os:system_time(milli_seconds)}])
|
os:system_time(millisecond)}])
|
||||||
end),
|
end),
|
||||||
{hibernate, rabbit_event:stop_stats_timer(State, #ch.stats_timer)}.
|
{hibernate, rabbit_event:stop_stats_timer(State, #ch.stats_timer)}.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -44,7 +44,7 @@ make_msg(Msg = #basic_message{content = Content,
|
||||||
_ -> {[RK], fun (H) -> lists:keydelete(<<"CC">>, 1, H) end}
|
_ -> {[RK], fun (H) -> lists:keydelete(<<"CC">>, 1, H) end}
|
||||||
end,
|
end,
|
||||||
ReasonBin = atom_to_binary(Reason),
|
ReasonBin = atom_to_binary(Reason),
|
||||||
TimeSec = os:system_time(seconds),
|
TimeSec = os:system_time(second),
|
||||||
PerMsgTTL = per_msg_ttl_header(Content#content.properties),
|
PerMsgTTL = per_msg_ttl_header(Content#content.properties),
|
||||||
HeadersFun2 =
|
HeadersFun2 =
|
||||||
fun (Headers) ->
|
fun (Headers) ->
|
||||||
|
|
|
||||||
|
|
@ -228,7 +228,8 @@
|
||||||
packet_id :: option(packet_id()) | ?WILL_MSG_QOS_1_CORRELATION,
|
packet_id :: option(packet_id()) | ?WILL_MSG_QOS_1_CORRELATION,
|
||||||
payload :: binary(),
|
payload :: binary(),
|
||||||
%% PUBLISH or Will properties
|
%% PUBLISH or Will properties
|
||||||
props :: properties()
|
props :: properties(),
|
||||||
|
timestamp :: option(integer())
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-type mqtt_msg() :: #mqtt_msg{}.
|
-type mqtt_msg() :: #mqtt_msg{}.
|
||||||
|
|
|
||||||
|
|
@ -67,7 +67,7 @@
|
||||||
port :: inet:port_number(),
|
port :: inet:port_number(),
|
||||||
peer_ip_addr :: inet:ip_address(),
|
peer_ip_addr :: inet:ip_address(),
|
||||||
peer_port :: inet:port_number(),
|
peer_port :: inet:port_number(),
|
||||||
connected_at = os:system_time(milli_seconds) :: pos_integer(),
|
connected_at = os:system_time(millisecond) :: pos_integer(),
|
||||||
send_fun :: send_fun(),
|
send_fun :: send_fun(),
|
||||||
%% Maximum MQTT packet size in bytes for packets sent from server to client.
|
%% Maximum MQTT packet size in bytes for packets sent from server to client.
|
||||||
max_packet_size :: max_packet_size()
|
max_packet_size :: max_packet_size()
|
||||||
|
|
@ -683,7 +683,8 @@ maybe_send_retained_message(RPid, #mqtt_topic{filter = Topic0, qos = SubscribeQo
|
||||||
State0;
|
State0;
|
||||||
#mqtt_msg{qos = MsgQos,
|
#mqtt_msg{qos = MsgQos,
|
||||||
retain = Retain,
|
retain = Retain,
|
||||||
payload = Payload} ->
|
payload = Payload,
|
||||||
|
props = Props} ->
|
||||||
Qos = effective_qos(MsgQos, SubscribeQos),
|
Qos = effective_qos(MsgQos, SubscribeQos),
|
||||||
{PacketId, State} = case Qos of
|
{PacketId, State} = case Qos of
|
||||||
?QOS_0 ->
|
?QOS_0 ->
|
||||||
|
|
@ -699,7 +700,8 @@ maybe_send_retained_message(RPid, #mqtt_topic{filter = Topic0, qos = SubscribeQo
|
||||||
},
|
},
|
||||||
variable = #mqtt_packet_publish{
|
variable = #mqtt_packet_publish{
|
||||||
packet_id = PacketId,
|
packet_id = PacketId,
|
||||||
topic_name = Topic
|
topic_name = Topic,
|
||||||
|
props = Props
|
||||||
},
|
},
|
||||||
payload = Payload},
|
payload = Payload},
|
||||||
_ = send(Packet, State),
|
_ = send(Packet, State),
|
||||||
|
|
@ -1150,7 +1152,7 @@ publish_to_queues(
|
||||||
{Expiration, Timestamp} = case Props of
|
{Expiration, Timestamp} = case Props of
|
||||||
#{'Message-Expiry-Interval' := ExpirySeconds} ->
|
#{'Message-Expiry-Interval' := ExpirySeconds} ->
|
||||||
{integer_to_binary(ExpirySeconds * 1000),
|
{integer_to_binary(ExpirySeconds * 1000),
|
||||||
os:system_time(seconds)};
|
os:system_time(second)};
|
||||||
_ ->
|
_ ->
|
||||||
{undefined, undefined}
|
{undefined, undefined}
|
||||||
end,
|
end,
|
||||||
|
|
@ -1601,7 +1603,7 @@ p_basic_to_publish_properties(#'P_basic'{headers = Headers,
|
||||||
%% "The PUBLISH packet sent to a Client by the Server MUST contain a Message
|
%% "The PUBLISH packet sent to a Client by the Server MUST contain a Message
|
||||||
%% Expiry Interval set to the received value minus the time that the
|
%% Expiry Interval set to the received value minus the time that the
|
||||||
%% Application Message has been waiting in the Server" [MQTT-3.3.2-6]
|
%% Application Message has been waiting in the Server" [MQTT-3.3.2-6]
|
||||||
WaitingSeconds = os:system_time(seconds) - TimestampSeconds,
|
WaitingSeconds = os:system_time(second) - TimestampSeconds,
|
||||||
Expiry = max(0, ExpirationSeconds - WaitingSeconds),
|
Expiry = max(0, ExpirationSeconds - WaitingSeconds),
|
||||||
#{'Message-Expiry-Interval' => Expiry};
|
#{'Message-Expiry-Interval' => Expiry};
|
||||||
false ->
|
false ->
|
||||||
|
|
|
||||||
|
|
@ -22,18 +22,24 @@
|
||||||
-include("rabbit_mqtt_packet.hrl").
|
-include("rabbit_mqtt_packet.hrl").
|
||||||
-include_lib("kernel/include/logger.hrl").
|
-include_lib("kernel/include/logger.hrl").
|
||||||
-export([start/1, insert/3, lookup/2, delete/2, terminate/1]).
|
-export([start/1, insert/3, lookup/2, delete/2, terminate/1]).
|
||||||
-export_type([state/0]).
|
-export([expire/2]).
|
||||||
|
-export_type([state/0, expire/0]).
|
||||||
|
|
||||||
-define(STATE, ?MODULE).
|
-define(STATE, ?MODULE).
|
||||||
-record(?STATE, {store_mod :: module(),
|
-record(?STATE, {store_mod :: module(),
|
||||||
store_state :: term()}).
|
store_state :: term()}).
|
||||||
-opaque state() :: #?STATE{}.
|
-opaque state() :: #?STATE{}.
|
||||||
|
|
||||||
|
-type expire() :: #{Topic :: binary() :=
|
||||||
|
{InsertionTimestamp :: integer(),
|
||||||
|
MessageExpiryInterval :: pos_integer()}}.
|
||||||
|
|
||||||
-callback new(Directory :: file:name_all(), rabbit_types:vhost()) ->
|
-callback new(Directory :: file:name_all(), rabbit_types:vhost()) ->
|
||||||
State :: any().
|
State :: any().
|
||||||
|
|
||||||
-callback recover(Directory :: file:name_all(), rabbit_types:vhost()) ->
|
-callback recover(Directory :: file:name_all(), rabbit_types:vhost()) ->
|
||||||
{ok, State :: any()} | {error, uninitialized}.
|
{ok, State :: any(), expire()} |
|
||||||
|
{error, uninitialized}.
|
||||||
|
|
||||||
-callback insert(Topic :: binary(), mqtt_msg(), State :: any()) ->
|
-callback insert(Topic :: binary(), mqtt_msg(), State :: any()) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
@ -47,25 +53,25 @@
|
||||||
-callback terminate(State :: any()) ->
|
-callback terminate(State :: any()) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
-spec start(rabbit_types:vhost()) -> state().
|
-spec start(rabbit_types:vhost()) -> {state(), expire()}.
|
||||||
start(VHost) ->
|
start(VHost) ->
|
||||||
{ok, Mod} = application:get_env(?APP_NAME, retained_message_store),
|
{ok, Mod} = application:get_env(?APP_NAME, retained_message_store),
|
||||||
Dir = rabbit:data_dir(),
|
Dir = rabbit:data_dir(),
|
||||||
?LOG_INFO("Starting MQTT retained message store ~s for vhost '~ts'",
|
?LOG_INFO("Starting MQTT retained message store ~s for vhost '~ts'",
|
||||||
[Mod, VHost]),
|
[Mod, VHost]),
|
||||||
S = case Mod:recover(Dir, VHost) of
|
{S, Expire} = case Mod:recover(Dir, VHost) of
|
||||||
{ok, StoreState} ->
|
{ok, StoreState, Expire0} ->
|
||||||
?LOG_INFO("Recovered MQTT retained message store ~s for vhost '~ts'",
|
?LOG_INFO("Recovered MQTT retained message store ~s for vhost '~ts'",
|
||||||
[Mod, VHost]),
|
[Mod, VHost]),
|
||||||
StoreState;
|
{StoreState, Expire0};
|
||||||
{error, uninitialized} ->
|
{error, uninitialized} ->
|
||||||
StoreState = Mod:new(Dir, VHost),
|
StoreState = Mod:new(Dir, VHost),
|
||||||
?LOG_INFO("Initialized MQTT retained message store ~s for vhost '~ts'",
|
?LOG_INFO("Initialized MQTT retained message store ~s for vhost '~ts'",
|
||||||
[Mod, VHost]),
|
[Mod, VHost]),
|
||||||
StoreState
|
{StoreState, #{}}
|
||||||
end,
|
end,
|
||||||
#?STATE{store_mod = Mod,
|
{#?STATE{store_mod = Mod,
|
||||||
store_state = S}.
|
store_state = S}, Expire}.
|
||||||
|
|
||||||
-spec insert(Topic :: binary(), mqtt_msg(), state()) -> ok.
|
-spec insert(Topic :: binary(), mqtt_msg(), state()) -> ok.
|
||||||
insert(Topic, Msg, #?STATE{store_mod = Mod,
|
insert(Topic, Msg, #?STATE{store_mod = Mod,
|
||||||
|
|
@ -87,3 +93,22 @@ delete(Topic, #?STATE{store_mod = Mod,
|
||||||
terminate(#?STATE{store_mod = Mod,
|
terminate(#?STATE{store_mod = Mod,
|
||||||
store_state = StoreState}) ->
|
store_state = StoreState}) ->
|
||||||
ok = Mod:terminate(StoreState).
|
ok = Mod:terminate(StoreState).
|
||||||
|
|
||||||
|
-spec expire(ets | dets, ets:tid() | dets:tab_name()) -> expire().
|
||||||
|
expire(Mod, Tab) ->
|
||||||
|
Now = os:system_time(second),
|
||||||
|
Mod:foldl(
|
||||||
|
fun(#retained_message{topic = Topic,
|
||||||
|
mqtt_msg = #mqtt_msg{props = #{'Message-Expiry-Interval' := Expiry},
|
||||||
|
timestamp = Timestamp}}, Acc)
|
||||||
|
when is_integer(Expiry) andalso
|
||||||
|
is_integer(Timestamp) ->
|
||||||
|
if Now - Timestamp >= Expiry ->
|
||||||
|
Mod:delete(Tab, Topic),
|
||||||
|
Acc;
|
||||||
|
true ->
|
||||||
|
maps:put(Topic, {Timestamp, Expiry}, Acc)
|
||||||
|
end;
|
||||||
|
(_, Acc) ->
|
||||||
|
Acc
|
||||||
|
end, #{}, Tab).
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,9 @@
|
||||||
-module(rabbit_mqtt_retained_msg_store_dets).
|
-module(rabbit_mqtt_retained_msg_store_dets).
|
||||||
|
|
||||||
-behaviour(rabbit_mqtt_retained_msg_store).
|
-behaviour(rabbit_mqtt_retained_msg_store).
|
||||||
|
|
||||||
-include("rabbit_mqtt_packet.hrl").
|
-include("rabbit_mqtt_packet.hrl").
|
||||||
|
-include_lib("kernel/include/logger.hrl").
|
||||||
|
|
||||||
-export([new/2, recover/2, insert/3, lookup/2, delete/2, terminate/1]).
|
-export([new/2, recover/2, insert/3, lookup/2, delete/2, terminate/1]).
|
||||||
|
|
||||||
|
|
@ -18,16 +20,22 @@
|
||||||
|
|
||||||
-spec new(file:name_all(), rabbit_types:vhost()) -> store_state().
|
-spec new(file:name_all(), rabbit_types:vhost()) -> store_state().
|
||||||
new(Dir, VHost) ->
|
new(Dir, VHost) ->
|
||||||
Tid = open_table(Dir, VHost),
|
{ok, TabName} = open_table(Dir, VHost),
|
||||||
#store_state{table = Tid}.
|
#store_state{table = TabName}.
|
||||||
|
|
||||||
-spec recover(file:name_all(), rabbit_types:vhost()) ->
|
-spec recover(file:name_all(), rabbit_types:vhost()) ->
|
||||||
{error, uninitialized} | {ok, store_state()}.
|
{ok, store_state(), rabbit_mqtt_retained_msg_store:expire()} |
|
||||||
|
{error, uninitialized}.
|
||||||
recover(Dir, VHost) ->
|
recover(Dir, VHost) ->
|
||||||
case open_table(Dir, VHost) of
|
case open_table(Dir, VHost) of
|
||||||
{error, _} -> {error, uninitialized};
|
{ok, TabName} ->
|
||||||
{ok, Tid} -> {ok, #store_state{table = Tid}}
|
{ok,
|
||||||
end.
|
#store_state{table = TabName},
|
||||||
|
rabbit_mqtt_retained_msg_store:expire(dets, TabName)};
|
||||||
|
{error, Reason} ->
|
||||||
|
?LOG_ERROR("~s failed to open table: ~p", [?MODULE, Reason]),
|
||||||
|
{error, uninitialized}
|
||||||
|
end.
|
||||||
|
|
||||||
-spec insert(binary(), mqtt_msg(), store_state()) -> ok.
|
-spec insert(binary(), mqtt_msg(), store_state()) -> ok.
|
||||||
insert(Topic, Msg, #store_state{table = T}) ->
|
insert(Topic, Msg, #store_state{table = T}) ->
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,7 @@
|
||||||
-module(rabbit_mqtt_retained_msg_store_ets).
|
-module(rabbit_mqtt_retained_msg_store_ets).
|
||||||
|
|
||||||
-behaviour(rabbit_mqtt_retained_msg_store).
|
-behaviour(rabbit_mqtt_retained_msg_store).
|
||||||
|
|
||||||
-include("rabbit_mqtt_packet.hrl").
|
-include("rabbit_mqtt_packet.hrl").
|
||||||
|
|
||||||
-export([new/2, recover/2, insert/3, lookup/2, delete/2, terminate/1]).
|
-export([new/2, recover/2, insert/3, lookup/2, delete/2, terminate/1]).
|
||||||
|
|
@ -28,14 +29,19 @@ new(Dir, VHost) ->
|
||||||
#store_state{table = Tid, filename = Path}.
|
#store_state{table = Tid, filename = Path}.
|
||||||
|
|
||||||
-spec recover(file:name_all(), rabbit_types:vhost()) ->
|
-spec recover(file:name_all(), rabbit_types:vhost()) ->
|
||||||
{error, uninitialized} | {ok, store_state()}.
|
{ok, store_state(), rabbit_mqtt_retained_msg_store:expire()} |
|
||||||
|
{error, uninitialized}.
|
||||||
recover(Dir, VHost) ->
|
recover(Dir, VHost) ->
|
||||||
Path = rabbit_mqtt_util:path_for(Dir, VHost),
|
Path = rabbit_mqtt_util:path_for(Dir, VHost),
|
||||||
case ets:file2tab(Path) of
|
case ets:file2tab(Path) of
|
||||||
{ok, Tid} -> _ = file:delete(Path),
|
{ok, Tid} ->
|
||||||
{ok, #store_state{table = Tid, filename = Path}};
|
_ = file:delete(Path),
|
||||||
{error, _} -> {error, uninitialized}
|
{ok,
|
||||||
end.
|
#store_state{table = Tid, filename = Path},
|
||||||
|
rabbit_mqtt_retained_msg_store:expire(ets, Tid)};
|
||||||
|
{error, _} ->
|
||||||
|
{error, uninitialized}
|
||||||
|
end.
|
||||||
|
|
||||||
-spec insert(binary(), mqtt_msg(), store_state()) -> ok.
|
-spec insert(binary(), mqtt_msg(), store_state()) -> ok.
|
||||||
insert(Topic, Msg, #store_state{table = T}) ->
|
insert(Topic, Msg, #store_state{table = T}) ->
|
||||||
|
|
|
||||||
|
|
@ -15,7 +15,7 @@ new(_Dir, _VHost) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
recover(_Dir, _VHost) ->
|
recover(_Dir, _VHost) ->
|
||||||
{ok, ok}.
|
{ok, ok, #{}}.
|
||||||
|
|
||||||
insert(_Topic, _Msg, _State) ->
|
insert(_Topic, _Msg, _State) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
|
||||||
|
|
@ -18,6 +18,12 @@
|
||||||
|
|
||||||
-define(TIMEOUT, 30_000).
|
-define(TIMEOUT, 30_000).
|
||||||
|
|
||||||
|
-define(STATE, ?MODULE).
|
||||||
|
-record(?STATE, {store_state :: rabbit_mqtt_retained_msg_store:state(),
|
||||||
|
expire :: #{Topic :: binary() := TimerRef :: reference()}
|
||||||
|
}).
|
||||||
|
-type state() :: #?STATE{}.
|
||||||
|
|
||||||
-spec start_link(rabbit_types:vhost()) ->
|
-spec start_link(rabbit_types:vhost()) ->
|
||||||
gen_server:start_ret().
|
gen_server:start_ret().
|
||||||
start_link(VHost) ->
|
start_link(VHost) ->
|
||||||
|
|
@ -37,27 +43,88 @@ clear(Pid, Topic) ->
|
||||||
gen_server:cast(Pid, {clear, Topic}).
|
gen_server:cast(Pid, {clear, Topic}).
|
||||||
|
|
||||||
-spec init(rabbit_types:vhost()) ->
|
-spec init(rabbit_types:vhost()) ->
|
||||||
{ok, rabbit_mqtt_retained_msg_store:state()}.
|
{ok, state()}.
|
||||||
init(VHost) ->
|
init(VHost) ->
|
||||||
process_flag(trap_exit, true),
|
process_flag(trap_exit, true),
|
||||||
State = rabbit_mqtt_retained_msg_store:start(VHost),
|
{StoreState, Expire0} = rabbit_mqtt_retained_msg_store:start(VHost),
|
||||||
{ok, State}.
|
Now = os:system_time(second),
|
||||||
|
Expire = maps:map(fun(Topic, {Timestamp, Expiry}) ->
|
||||||
|
TimerSecs = max(0, Expiry - (Now - Timestamp)),
|
||||||
|
start_timer(TimerSecs, Topic)
|
||||||
|
end, Expire0),
|
||||||
|
{ok, #?STATE{store_state = StoreState,
|
||||||
|
expire = Expire}}.
|
||||||
|
|
||||||
handle_cast({retain, Topic, Msg}, State) ->
|
handle_cast({retain, Topic, Msg0 = #mqtt_msg{props = Props}},
|
||||||
ok = rabbit_mqtt_retained_msg_store:insert(Topic, Msg, State),
|
State = #?STATE{store_state = StoreState,
|
||||||
{noreply, State};
|
expire = Expire0}) ->
|
||||||
handle_cast({clear, Topic}, State) ->
|
Expire2 = case maps:take(Topic, Expire0) of
|
||||||
ok = rabbit_mqtt_retained_msg_store:delete(Topic, State),
|
{OldTimer, Expire1} ->
|
||||||
{noreply, State}.
|
cancel_timer(OldTimer),
|
||||||
|
Expire1;
|
||||||
|
error ->
|
||||||
|
Expire0
|
||||||
|
end,
|
||||||
|
{Msg, Expire} = case maps:find('Message-Expiry-Interval', Props) of
|
||||||
|
{ok, ExpirySeconds} ->
|
||||||
|
Timer = start_timer(ExpirySeconds, Topic),
|
||||||
|
{Msg0#mqtt_msg{timestamp = os:system_time(second)},
|
||||||
|
maps:put(Topic, Timer, Expire2)};
|
||||||
|
error ->
|
||||||
|
{Msg0, Expire2}
|
||||||
|
end,
|
||||||
|
ok = rabbit_mqtt_retained_msg_store:insert(Topic, Msg, StoreState),
|
||||||
|
{noreply, State#?STATE{expire = Expire}};
|
||||||
|
handle_cast({clear, Topic}, State = #?STATE{store_state = StoreState,
|
||||||
|
expire = Expire0}) ->
|
||||||
|
Expire = case maps:take(Topic, Expire0) of
|
||||||
|
{OldTimer, Expire1} ->
|
||||||
|
cancel_timer(OldTimer),
|
||||||
|
Expire1;
|
||||||
|
error ->
|
||||||
|
Expire0
|
||||||
|
end,
|
||||||
|
ok = rabbit_mqtt_retained_msg_store:delete(Topic, StoreState),
|
||||||
|
{noreply, State#?STATE{expire = Expire}}.
|
||||||
|
|
||||||
handle_call({fetch, Topic}, _From, State) ->
|
handle_call({fetch, Topic}, _From, State = #?STATE{store_state = StoreState}) ->
|
||||||
Reply = rabbit_mqtt_retained_msg_store:lookup(Topic, State),
|
Reply = case rabbit_mqtt_retained_msg_store:lookup(Topic, StoreState) of
|
||||||
|
#mqtt_msg{props = #{'Message-Expiry-Interval' := Expiry0} = Props,
|
||||||
|
timestamp = Timestamp} = MqttMsg ->
|
||||||
|
%% “The PUBLISH packet sent to a Client by the Server MUST contain a Message
|
||||||
|
%% Expiry Interval set to the received value minus the time that the
|
||||||
|
%% Application Message has been waiting in the Server [MQTT-3.3.2-6].”
|
||||||
|
Expiry = max(0, Expiry0 - (os:system_time(second) - Timestamp)),
|
||||||
|
MqttMsg#mqtt_msg{props = maps:put('Message-Expiry-Interval', Expiry, Props)};
|
||||||
|
Other ->
|
||||||
|
Other
|
||||||
|
end,
|
||||||
{reply, Reply, State}.
|
{reply, Reply, State}.
|
||||||
|
|
||||||
|
handle_info({timeout, Timer, Topic}, State = #?STATE{store_state = StoreState,
|
||||||
|
expire = Expire0}) ->
|
||||||
|
Expire = case maps:take(Topic, Expire0) of
|
||||||
|
{Timer, Expire1} ->
|
||||||
|
ok = rabbit_mqtt_retained_msg_store:delete(Topic, StoreState),
|
||||||
|
Expire1;
|
||||||
|
_ ->
|
||||||
|
Expire0
|
||||||
|
end,
|
||||||
|
{noreply, State#?STATE{expire = Expire}};
|
||||||
handle_info(stop, State) ->
|
handle_info(stop, State) ->
|
||||||
{stop, normal, State};
|
{stop, normal, State};
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
{stop, {unknown_info, Info}, State}.
|
{stop, {unknown_info, Info}, State}.
|
||||||
|
|
||||||
terminate(_Reason, State) ->
|
terminate(_Reason, #?STATE{store_state = StoreState}) ->
|
||||||
rabbit_mqtt_retained_msg_store:terminate(State).
|
rabbit_mqtt_retained_msg_store:terminate(StoreState).
|
||||||
|
|
||||||
|
-spec start_timer(integer(), binary()) -> reference().
|
||||||
|
start_timer(Seconds, Topic)
|
||||||
|
when is_binary(Topic) ->
|
||||||
|
erlang:start_timer(timer:seconds(Seconds), self(), Topic).
|
||||||
|
|
||||||
|
-spec cancel_timer(reference()) -> ok.
|
||||||
|
cancel_timer(TimerRef) ->
|
||||||
|
ok = erlang:cancel_timer(TimerRef, [{async, true},
|
||||||
|
{info, false}]).
|
||||||
|
|
|
||||||
|
|
@ -8,9 +8,11 @@
|
||||||
-compile([export_all, nowarn_export_all]).
|
-compile([export_all, nowarn_export_all]).
|
||||||
|
|
||||||
-include_lib("common_test/include/ct.hrl").
|
-include_lib("common_test/include/ct.hrl").
|
||||||
-import(util, [expect_publishes/3,
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
connect/2,
|
-import(util, [connect/2, connect/3,
|
||||||
connect/3]).
|
expect_publishes/3,
|
||||||
|
assert_message_expiry_interval/2
|
||||||
|
]).
|
||||||
|
|
||||||
all() ->
|
all() ->
|
||||||
[
|
[
|
||||||
|
|
@ -37,7 +39,8 @@ tests() ->
|
||||||
should_translate_amqp2mqtt_on_publish,
|
should_translate_amqp2mqtt_on_publish,
|
||||||
should_translate_amqp2mqtt_on_retention,
|
should_translate_amqp2mqtt_on_retention,
|
||||||
should_translate_amqp2mqtt_on_retention_search,
|
should_translate_amqp2mqtt_on_retention_search,
|
||||||
recover
|
recover,
|
||||||
|
recover_with_message_expiry_interval
|
||||||
].
|
].
|
||||||
|
|
||||||
suite() ->
|
suite() ->
|
||||||
|
|
@ -91,6 +94,13 @@ end_per_group(_, Config) ->
|
||||||
rabbit_ct_client_helpers:teardown_steps() ++
|
rabbit_ct_client_helpers:teardown_steps() ++
|
||||||
rabbit_ct_broker_helpers:teardown_steps()).
|
rabbit_ct_broker_helpers:teardown_steps()).
|
||||||
|
|
||||||
|
init_per_testcase(recover_with_message_expiry_interval = T, Config) ->
|
||||||
|
case ?config(mqtt_version, Config) of
|
||||||
|
v4 ->
|
||||||
|
{skip, "Message Expiry Interval not supported in MQTT v4"};
|
||||||
|
v5 ->
|
||||||
|
rabbit_ct_helpers:testcase_started(Config, T)
|
||||||
|
end;
|
||||||
init_per_testcase(Testcase, Config) ->
|
init_per_testcase(Testcase, Config) ->
|
||||||
rabbit_ct_helpers:testcase_started(Config, Testcase).
|
rabbit_ct_helpers:testcase_started(Config, Testcase).
|
||||||
|
|
||||||
|
|
@ -172,3 +182,56 @@ recover(Config) ->
|
||||||
{ok, _, _} = emqtt:subscribe(C2, Topic, qos1),
|
{ok, _, _} = emqtt:subscribe(C2, Topic, qos1),
|
||||||
ok = expect_publishes(C2, Topic, [Payload]),
|
ok = expect_publishes(C2, Topic, [Payload]),
|
||||||
ok = emqtt:disconnect(C2).
|
ok = emqtt:disconnect(C2).
|
||||||
|
|
||||||
|
recover_with_message_expiry_interval(Config) ->
|
||||||
|
ClientId = atom_to_binary(?FUNCTION_NAME),
|
||||||
|
C1 = connect(ClientId, Config),
|
||||||
|
Start = os:system_time(second),
|
||||||
|
{ok, _} = emqtt:publish(C1, <<"topic/1">>,
|
||||||
|
<<"m1">>, [{retain, true}, {qos, 1}]),
|
||||||
|
{ok, _} = emqtt:publish(C1, <<"topic/2">>, #{'Message-Expiry-Interval' => 100},
|
||||||
|
<<"m2">>, [{retain, true}, {qos, 1}]),
|
||||||
|
{ok, _} = emqtt:publish(C1, <<"topic/3">>, #{'Message-Expiry-Interval' => 3},
|
||||||
|
<<"m3">>, [{retain, true}, {qos, 1}]),
|
||||||
|
{ok, _} = emqtt:publish(C1, <<"topic/4">>, #{'Message-Expiry-Interval' => 15},
|
||||||
|
<<"m4">>, [{retain, true}, {qos, 1}]),
|
||||||
|
ok = emqtt:disconnect(C1),
|
||||||
|
%% Takes around 9 seconds on Linux.
|
||||||
|
ok = rabbit_ct_broker_helpers:restart_node(Config, 0),
|
||||||
|
C2 = connect(ClientId, Config),
|
||||||
|
|
||||||
|
%% Retained message for topic/3 should have expired during node restart.
|
||||||
|
%% Wait for retained message for topic/4 to expire.
|
||||||
|
ElapsedSeconds1 = os:system_time(second) - Start,
|
||||||
|
SleepMs = max(0, timer:seconds(15 - ElapsedSeconds1 + 1)),
|
||||||
|
ct:pal("Sleeping for ~b ms", [SleepMs]),
|
||||||
|
timer:sleep(SleepMs),
|
||||||
|
|
||||||
|
ElapsedSeconds2 = os:system_time(second) - Start,
|
||||||
|
{ok, _, [1,1,1,1]} = emqtt:subscribe(C2, [{<<"topic/1">>, qos1},
|
||||||
|
{<<"topic/2">>, qos1},
|
||||||
|
{<<"topic/3">>, qos1},
|
||||||
|
{<<"topic/4">>, qos1}]),
|
||||||
|
receive {publish, #{client_pid := C2,
|
||||||
|
retain := true,
|
||||||
|
topic := <<"topic/1">>,
|
||||||
|
payload := <<"m1">>,
|
||||||
|
properties := Props}}
|
||||||
|
when map_size(Props) =:= 0 -> ok
|
||||||
|
after 100 -> ct:fail("did not topic/1")
|
||||||
|
end,
|
||||||
|
|
||||||
|
receive {publish, #{client_pid := C2,
|
||||||
|
retain := true,
|
||||||
|
topic := <<"topic/2">>,
|
||||||
|
payload := <<"m2">>,
|
||||||
|
properties := #{'Message-Expiry-Interval' := MEI}}} ->
|
||||||
|
assert_message_expiry_interval(100 - ElapsedSeconds2, MEI)
|
||||||
|
after 100 -> ct:fail("did not topic/2")
|
||||||
|
end,
|
||||||
|
|
||||||
|
receive Unexpected -> ct:fail("Received unexpectedly: ~p", [Unexpected])
|
||||||
|
after 0 -> ok
|
||||||
|
end,
|
||||||
|
|
||||||
|
ok = emqtt:disconnect(C2).
|
||||||
|
|
|
||||||
|
|
@ -21,6 +21,7 @@
|
||||||
get_events/1,
|
get_events/1,
|
||||||
assert_event_type/2,
|
assert_event_type/2,
|
||||||
assert_event_prop/2,
|
assert_event_prop/2,
|
||||||
|
assert_message_expiry_interval/2,
|
||||||
await_exit/1,
|
await_exit/1,
|
||||||
await_exit/2,
|
await_exit/2,
|
||||||
maybe_skip_v5/1
|
maybe_skip_v5/1
|
||||||
|
|
@ -113,6 +114,12 @@ assert_event_prop(ExpectedProps, Event)
|
||||||
assert_event_prop(P, Event)
|
assert_event_prop(P, Event)
|
||||||
end, ExpectedProps).
|
end, ExpectedProps).
|
||||||
|
|
||||||
|
assert_message_expiry_interval(Expected, Actual)
|
||||||
|
when is_integer(Expected),
|
||||||
|
is_integer(Actual) ->
|
||||||
|
?assert(Expected >= Actual - 1),
|
||||||
|
?assert(Expected =< Actual + 1).
|
||||||
|
|
||||||
await_exit(Pid) ->
|
await_exit(Pid) ->
|
||||||
receive
|
receive
|
||||||
{'EXIT', Pid, _} -> ok
|
{'EXIT', Pid, _} -> ok
|
||||||
|
|
|
||||||
|
|
@ -16,7 +16,8 @@
|
||||||
-import(util,
|
-import(util,
|
||||||
[
|
[
|
||||||
start_client/4,
|
start_client/4,
|
||||||
connect/2, connect/3, connect/4
|
connect/2, connect/3, connect/4,
|
||||||
|
assert_message_expiry_interval/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
all() ->
|
all() ->
|
||||||
|
|
@ -40,6 +41,7 @@ cluster_size_1_tests() ->
|
||||||
client_set_max_packet_size_invalid,
|
client_set_max_packet_size_invalid,
|
||||||
message_expiry_interval,
|
message_expiry_interval,
|
||||||
message_expiry_interval_will_message,
|
message_expiry_interval_will_message,
|
||||||
|
message_expiry_interval_retained_message,
|
||||||
client_publish_qos2
|
client_publish_qos2
|
||||||
].
|
].
|
||||||
|
|
||||||
|
|
@ -166,7 +168,8 @@ message_expiry_interval(Config) ->
|
||||||
%% "The PUBLISH packet sent to a Client by the Server MUST contain a Message
|
%% "The PUBLISH packet sent to a Client by the Server MUST contain a Message
|
||||||
%% Expiry Interval set to the received value minus the time that the
|
%% Expiry Interval set to the received value minus the time that the
|
||||||
%% Application Message has been waiting in the Server" [MQTT-3.3.2-6]
|
%% Application Message has been waiting in the Server" [MQTT-3.3.2-6]
|
||||||
properties := #{'Message-Expiry-Interval' := 10-2}}} -> ok
|
properties := #{'Message-Expiry-Interval' := MEI}}} ->
|
||||||
|
assert_message_expiry_interval(10 - 2, MEI)
|
||||||
after 100 -> ct:fail("did not receive m3")
|
after 100 -> ct:fail("did not receive m3")
|
||||||
end,
|
end,
|
||||||
assert_nothing_received(),
|
assert_nothing_received(),
|
||||||
|
|
@ -206,13 +209,63 @@ message_expiry_interval_will_message(Config) ->
|
||||||
assert_nothing_received(),
|
assert_nothing_received(),
|
||||||
ok = emqtt:disconnect(Sub2).
|
ok = emqtt:disconnect(Sub2).
|
||||||
|
|
||||||
|
message_expiry_interval_retained_message(Config) ->
|
||||||
|
Pub = connect(<<"publisher">>, Config),
|
||||||
|
|
||||||
|
{ok, _} = emqtt:publish(Pub, <<"topic1">>, #{'Message-Expiry-Interval' => 100},
|
||||||
|
<<"m1.1">>, [{retain, true}, {qos, 1}]),
|
||||||
|
{ok, _} = emqtt:publish(Pub, <<"topic2">>, #{'Message-Expiry-Interval' => 2},
|
||||||
|
<<"m2">>, [{retain, true}, {qos, 1}]),
|
||||||
|
{ok, _} = emqtt:publish(Pub, <<"topic3">>, #{'Message-Expiry-Interval' => 100},
|
||||||
|
<<"m3.1">>, [{retain, true}, {qos, 1}]),
|
||||||
|
{ok, _} = emqtt:publish(Pub, <<"topic4">>, #{'Message-Expiry-Interval' => 100},
|
||||||
|
<<"m4">>, [{retain, true}, {qos, 1}]),
|
||||||
|
|
||||||
|
{ok, _} = emqtt:publish(Pub, <<"topic1">>, #{'Message-Expiry-Interval' => 2},
|
||||||
|
<<"m1.2">>, [{retain, true}, {qos, 1}]),
|
||||||
|
{ok, _} = emqtt:publish(Pub, <<"topic2">>, #{'Message-Expiry-Interval' => 2},
|
||||||
|
<<>>, [{retain, true}, {qos, 1}]),
|
||||||
|
{ok, _} = emqtt:publish(Pub, <<"topic3">>, #{},
|
||||||
|
<<"m3.2">>, [{retain, true}, {qos, 1}]),
|
||||||
|
timer:sleep(2001),
|
||||||
|
%% Expectations:
|
||||||
|
%% topic1 expired because 2 seconds elapsed
|
||||||
|
%% topic2 is not retained because it got deleted
|
||||||
|
%% topic3 is retained because its new message does not have an Expiry-Interval set
|
||||||
|
%% topic4 is retained because 100 seconds have not elapsed
|
||||||
|
Sub = connect(<<"subscriber">>, Config),
|
||||||
|
{ok, _, [1,1,1,1]} = emqtt:subscribe(Sub, [{<<"topic1">>, qos1},
|
||||||
|
{<<"topic2">>, qos1},
|
||||||
|
{<<"topic3">>, qos1},
|
||||||
|
{<<"topic4">>, qos1}]),
|
||||||
|
receive {publish, #{client_pid := Sub,
|
||||||
|
retain := true,
|
||||||
|
topic := <<"topic3">>,
|
||||||
|
payload := <<"m3.2">>,
|
||||||
|
properties := Props}}
|
||||||
|
when map_size(Props) =:= 0 -> ok
|
||||||
|
after 100 -> ct:fail("did not topic3")
|
||||||
|
end,
|
||||||
|
|
||||||
|
receive {publish, #{client_pid := Sub,
|
||||||
|
retain := true,
|
||||||
|
topic := <<"topic4">>,
|
||||||
|
payload := <<"m4">>,
|
||||||
|
properties := #{'Message-Expiry-Interval' := MEI}}} ->
|
||||||
|
assert_message_expiry_interval(100 - 2, MEI)
|
||||||
|
after 100 -> ct:fail("did not receive topic4")
|
||||||
|
end,
|
||||||
|
assert_nothing_received(),
|
||||||
|
|
||||||
|
ok = emqtt:disconnect(Pub),
|
||||||
|
ok = emqtt:disconnect(Sub).
|
||||||
|
|
||||||
client_publish_qos2(Config) ->
|
client_publish_qos2(Config) ->
|
||||||
Topic = ClientId = atom_to_binary(?FUNCTION_NAME),
|
Topic = ClientId = atom_to_binary(?FUNCTION_NAME),
|
||||||
{C, Connect} = start_client(ClientId, Config, 0, []),
|
{C, Connect} = start_client(ClientId, Config, 0, []),
|
||||||
{ok, Props} = Connect(C),
|
?assertMatch({ok, #{'Maximum-QoS' := 1}}, Connect(C)),
|
||||||
?assertEqual(1, maps:get('Maximum-QoS', Props)),
|
?assertEqual({error, {disconnected, _RcQosNotSupported = 155, #{}}},
|
||||||
{error, Response} = emqtt:publish(C, Topic, #{}, <<"msg">>, [{qos, 2}]),
|
emqtt:publish(C, Topic, <<"msg">>, [{qos, 2}])).
|
||||||
?assertEqual({disconnected, 155, #{}}, Response).
|
|
||||||
|
|
||||||
satisfy_bazel(_Config) ->
|
satisfy_bazel(_Config) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
|
||||||
|
|
@ -237,7 +237,7 @@ init([KeepaliveSup,
|
||||||
peer_host = PeerHost,
|
peer_host = PeerHost,
|
||||||
port = Port,
|
port = Port,
|
||||||
peer_port = PeerPort,
|
peer_port = PeerPort,
|
||||||
connected_at = os:system_time(milli_seconds),
|
connected_at = os:system_time(millisecond),
|
||||||
auth_mechanism = none,
|
auth_mechanism = none,
|
||||||
helper_sup = KeepaliveSup,
|
helper_sup = KeepaliveSup,
|
||||||
socket = RealSocket,
|
socket = RealSocket,
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue