diff --git a/deps/rabbit/src/rabbit_amqqueue_process.erl b/deps/rabbit/src/rabbit_amqqueue_process.erl index 0c3447e075..2608d4a268 100644 --- a/deps/rabbit/src/rabbit_amqqueue_process.erl +++ b/deps/rabbit/src/rabbit_amqqueue_process.erl @@ -543,7 +543,7 @@ ensure_ttl_timer(undefined, State) -> State; ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined, args_policy_version = Version}) -> - After = (case Expiry - os:system_time(micro_seconds) of + After = (case Expiry - os:system_time(microsecond) of V when V > 0 -> V + 999; %% always fire later _ -> 0 end) div 1000, @@ -991,7 +991,7 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) -> {ok, MsgTTL} = rabbit_basic:parse_expiration(Props), case lists:min([TTL, MsgTTL]) of undefined -> undefined; - T -> os:system_time(micro_seconds) + T * 1000 + T -> os:system_time(microsecond) + T * 1000 end. %% Logically this function should invoke maybe_send_drained/2. @@ -1002,7 +1002,7 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) -> drop_expired_msgs(State) -> case is_empty(State) of true -> State; - false -> drop_expired_msgs(os:system_time(micro_seconds), + false -> drop_expired_msgs(os:system_time(microsecond), State) end. @@ -1782,7 +1782,7 @@ handle_pre_hibernate(State = #q{backing_queue = BQ, State, #q.stats_timer, fun () -> emit_stats(State, [{idle_since, - os:system_time(milli_seconds)}]) + os:system_time(millisecond)}]) end), State1 = rabbit_event:stop_stats_timer(State#q{backing_queue_state = BQS3}, #q.stats_timer), diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 995ccf623e..09b5eb2631 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -840,7 +840,7 @@ handle_pre_hibernate(State0) -> State, #ch.stats_timer, fun () -> emit_stats(State, [{idle_since, - os:system_time(milli_seconds)}]) + os:system_time(millisecond)}]) end), {hibernate, rabbit_event:stop_stats_timer(State, #ch.stats_timer)}. diff --git a/deps/rabbit/src/rabbit_dead_letter.erl b/deps/rabbit/src/rabbit_dead_letter.erl index 7212ed32df..8baa94774f 100644 --- a/deps/rabbit/src/rabbit_dead_letter.erl +++ b/deps/rabbit/src/rabbit_dead_letter.erl @@ -44,7 +44,7 @@ make_msg(Msg = #basic_message{content = Content, _ -> {[RK], fun (H) -> lists:keydelete(<<"CC">>, 1, H) end} end, ReasonBin = atom_to_binary(Reason), - TimeSec = os:system_time(seconds), + TimeSec = os:system_time(second), PerMsgTTL = per_msg_ttl_header(Content#content.properties), HeadersFun2 = fun (Headers) -> diff --git a/deps/rabbitmq_mqtt/include/rabbit_mqtt_packet.hrl b/deps/rabbitmq_mqtt/include/rabbit_mqtt_packet.hrl index f328cf2df9..5322b42908 100644 --- a/deps/rabbitmq_mqtt/include/rabbit_mqtt_packet.hrl +++ b/deps/rabbitmq_mqtt/include/rabbit_mqtt_packet.hrl @@ -228,7 +228,8 @@ packet_id :: option(packet_id()) | ?WILL_MSG_QOS_1_CORRELATION, payload :: binary(), %% PUBLISH or Will properties - props :: properties() + props :: properties(), + timestamp :: option(integer()) }). -type mqtt_msg() :: #mqtt_msg{}. diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index ef13b0e938..050e1111c6 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -67,7 +67,7 @@ port :: inet:port_number(), peer_ip_addr :: inet:ip_address(), peer_port :: inet:port_number(), - connected_at = os:system_time(milli_seconds) :: pos_integer(), + connected_at = os:system_time(millisecond) :: pos_integer(), send_fun :: send_fun(), %% Maximum MQTT packet size in bytes for packets sent from server to client. max_packet_size :: max_packet_size() @@ -683,7 +683,8 @@ maybe_send_retained_message(RPid, #mqtt_topic{filter = Topic0, qos = SubscribeQo State0; #mqtt_msg{qos = MsgQos, retain = Retain, - payload = Payload} -> + payload = Payload, + props = Props} -> Qos = effective_qos(MsgQos, SubscribeQos), {PacketId, State} = case Qos of ?QOS_0 -> @@ -699,7 +700,8 @@ maybe_send_retained_message(RPid, #mqtt_topic{filter = Topic0, qos = SubscribeQo }, variable = #mqtt_packet_publish{ packet_id = PacketId, - topic_name = Topic + topic_name = Topic, + props = Props }, payload = Payload}, _ = send(Packet, State), @@ -1150,7 +1152,7 @@ publish_to_queues( {Expiration, Timestamp} = case Props of #{'Message-Expiry-Interval' := ExpirySeconds} -> {integer_to_binary(ExpirySeconds * 1000), - os:system_time(seconds)}; + os:system_time(second)}; _ -> {undefined, undefined} end, @@ -1601,7 +1603,7 @@ p_basic_to_publish_properties(#'P_basic'{headers = Headers, %% "The PUBLISH packet sent to a Client by the Server MUST contain a Message %% Expiry Interval set to the received value minus the time that the %% Application Message has been waiting in the Server" [MQTT-3.3.2-6] - WaitingSeconds = os:system_time(seconds) - TimestampSeconds, + WaitingSeconds = os:system_time(second) - TimestampSeconds, Expiry = max(0, ExpirationSeconds - WaitingSeconds), #{'Message-Expiry-Interval' => Expiry}; false -> diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store.erl index 6770162d74..fc0127f82b 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store.erl @@ -22,18 +22,24 @@ -include("rabbit_mqtt_packet.hrl"). -include_lib("kernel/include/logger.hrl"). -export([start/1, insert/3, lookup/2, delete/2, terminate/1]). --export_type([state/0]). +-export([expire/2]). +-export_type([state/0, expire/0]). -define(STATE, ?MODULE). -record(?STATE, {store_mod :: module(), store_state :: term()}). -opaque state() :: #?STATE{}. +-type expire() :: #{Topic :: binary() := + {InsertionTimestamp :: integer(), + MessageExpiryInterval :: pos_integer()}}. + -callback new(Directory :: file:name_all(), rabbit_types:vhost()) -> State :: any(). -callback recover(Directory :: file:name_all(), rabbit_types:vhost()) -> - {ok, State :: any()} | {error, uninitialized}. + {ok, State :: any(), expire()} | + {error, uninitialized}. -callback insert(Topic :: binary(), mqtt_msg(), State :: any()) -> ok. @@ -47,25 +53,25 @@ -callback terminate(State :: any()) -> ok. --spec start(rabbit_types:vhost()) -> state(). +-spec start(rabbit_types:vhost()) -> {state(), expire()}. start(VHost) -> {ok, Mod} = application:get_env(?APP_NAME, retained_message_store), Dir = rabbit:data_dir(), ?LOG_INFO("Starting MQTT retained message store ~s for vhost '~ts'", [Mod, VHost]), - S = case Mod:recover(Dir, VHost) of - {ok, StoreState} -> - ?LOG_INFO("Recovered MQTT retained message store ~s for vhost '~ts'", - [Mod, VHost]), - StoreState; - {error, uninitialized} -> - StoreState = Mod:new(Dir, VHost), - ?LOG_INFO("Initialized MQTT retained message store ~s for vhost '~ts'", - [Mod, VHost]), - StoreState - end, - #?STATE{store_mod = Mod, - store_state = S}. + {S, Expire} = case Mod:recover(Dir, VHost) of + {ok, StoreState, Expire0} -> + ?LOG_INFO("Recovered MQTT retained message store ~s for vhost '~ts'", + [Mod, VHost]), + {StoreState, Expire0}; + {error, uninitialized} -> + StoreState = Mod:new(Dir, VHost), + ?LOG_INFO("Initialized MQTT retained message store ~s for vhost '~ts'", + [Mod, VHost]), + {StoreState, #{}} + end, + {#?STATE{store_mod = Mod, + store_state = S}, Expire}. -spec insert(Topic :: binary(), mqtt_msg(), state()) -> ok. insert(Topic, Msg, #?STATE{store_mod = Mod, @@ -87,3 +93,22 @@ delete(Topic, #?STATE{store_mod = Mod, terminate(#?STATE{store_mod = Mod, store_state = StoreState}) -> ok = Mod:terminate(StoreState). + +-spec expire(ets | dets, ets:tid() | dets:tab_name()) -> expire(). +expire(Mod, Tab) -> + Now = os:system_time(second), + Mod:foldl( + fun(#retained_message{topic = Topic, + mqtt_msg = #mqtt_msg{props = #{'Message-Expiry-Interval' := Expiry}, + timestamp = Timestamp}}, Acc) + when is_integer(Expiry) andalso + is_integer(Timestamp) -> + if Now - Timestamp >= Expiry -> + Mod:delete(Tab, Topic), + Acc; + true -> + maps:put(Topic, {Timestamp, Expiry}, Acc) + end; + (_, Acc) -> + Acc + end, #{}, Tab). diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_dets.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_dets.erl index 6c55a6407d..7d7a48e6aa 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_dets.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_dets.erl @@ -8,7 +8,9 @@ -module(rabbit_mqtt_retained_msg_store_dets). -behaviour(rabbit_mqtt_retained_msg_store). + -include("rabbit_mqtt_packet.hrl"). +-include_lib("kernel/include/logger.hrl"). -export([new/2, recover/2, insert/3, lookup/2, delete/2, terminate/1]). @@ -18,16 +20,22 @@ -spec new(file:name_all(), rabbit_types:vhost()) -> store_state(). new(Dir, VHost) -> - Tid = open_table(Dir, VHost), - #store_state{table = Tid}. + {ok, TabName} = open_table(Dir, VHost), + #store_state{table = TabName}. -spec recover(file:name_all(), rabbit_types:vhost()) -> - {error, uninitialized} | {ok, store_state()}. + {ok, store_state(), rabbit_mqtt_retained_msg_store:expire()} | + {error, uninitialized}. recover(Dir, VHost) -> - case open_table(Dir, VHost) of - {error, _} -> {error, uninitialized}; - {ok, Tid} -> {ok, #store_state{table = Tid}} - end. + case open_table(Dir, VHost) of + {ok, TabName} -> + {ok, + #store_state{table = TabName}, + rabbit_mqtt_retained_msg_store:expire(dets, TabName)}; + {error, Reason} -> + ?LOG_ERROR("~s failed to open table: ~p", [?MODULE, Reason]), + {error, uninitialized} + end. -spec insert(binary(), mqtt_msg(), store_state()) -> ok. insert(Topic, Msg, #store_state{table = T}) -> diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_ets.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_ets.erl index afb96501d7..129f870cac 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_ets.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_ets.erl @@ -8,6 +8,7 @@ -module(rabbit_mqtt_retained_msg_store_ets). -behaviour(rabbit_mqtt_retained_msg_store). + -include("rabbit_mqtt_packet.hrl"). -export([new/2, recover/2, insert/3, lookup/2, delete/2, terminate/1]). @@ -28,14 +29,19 @@ new(Dir, VHost) -> #store_state{table = Tid, filename = Path}. -spec recover(file:name_all(), rabbit_types:vhost()) -> - {error, uninitialized} | {ok, store_state()}. + {ok, store_state(), rabbit_mqtt_retained_msg_store:expire()} | + {error, uninitialized}. recover(Dir, VHost) -> - Path = rabbit_mqtt_util:path_for(Dir, VHost), - case ets:file2tab(Path) of - {ok, Tid} -> _ = file:delete(Path), - {ok, #store_state{table = Tid, filename = Path}}; - {error, _} -> {error, uninitialized} - end. + Path = rabbit_mqtt_util:path_for(Dir, VHost), + case ets:file2tab(Path) of + {ok, Tid} -> + _ = file:delete(Path), + {ok, + #store_state{table = Tid, filename = Path}, + rabbit_mqtt_retained_msg_store:expire(ets, Tid)}; + {error, _} -> + {error, uninitialized} + end. -spec insert(binary(), mqtt_msg(), store_state()) -> ok. insert(Topic, Msg, #store_state{table = T}) -> diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_noop.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_noop.erl index a2fb38e360..9ba6db5d6c 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_noop.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_noop.erl @@ -15,7 +15,7 @@ new(_Dir, _VHost) -> ok. recover(_Dir, _VHost) -> - {ok, ok}. + {ok, ok, #{}}. insert(_Topic, _Msg, _State) -> ok. diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retainer.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retainer.erl index f2adcb83ab..6887141e6b 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retainer.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retainer.erl @@ -18,6 +18,12 @@ -define(TIMEOUT, 30_000). +-define(STATE, ?MODULE). +-record(?STATE, {store_state :: rabbit_mqtt_retained_msg_store:state(), + expire :: #{Topic :: binary() := TimerRef :: reference()} + }). +-type state() :: #?STATE{}. + -spec start_link(rabbit_types:vhost()) -> gen_server:start_ret(). start_link(VHost) -> @@ -37,27 +43,88 @@ clear(Pid, Topic) -> gen_server:cast(Pid, {clear, Topic}). -spec init(rabbit_types:vhost()) -> - {ok, rabbit_mqtt_retained_msg_store:state()}. + {ok, state()}. init(VHost) -> process_flag(trap_exit, true), - State = rabbit_mqtt_retained_msg_store:start(VHost), - {ok, State}. + {StoreState, Expire0} = rabbit_mqtt_retained_msg_store:start(VHost), + Now = os:system_time(second), + Expire = maps:map(fun(Topic, {Timestamp, Expiry}) -> + TimerSecs = max(0, Expiry - (Now - Timestamp)), + start_timer(TimerSecs, Topic) + end, Expire0), + {ok, #?STATE{store_state = StoreState, + expire = Expire}}. -handle_cast({retain, Topic, Msg}, State) -> - ok = rabbit_mqtt_retained_msg_store:insert(Topic, Msg, State), - {noreply, State}; -handle_cast({clear, Topic}, State) -> - ok = rabbit_mqtt_retained_msg_store:delete(Topic, State), - {noreply, State}. +handle_cast({retain, Topic, Msg0 = #mqtt_msg{props = Props}}, + State = #?STATE{store_state = StoreState, + expire = Expire0}) -> + Expire2 = case maps:take(Topic, Expire0) of + {OldTimer, Expire1} -> + cancel_timer(OldTimer), + Expire1; + error -> + Expire0 + end, + {Msg, Expire} = case maps:find('Message-Expiry-Interval', Props) of + {ok, ExpirySeconds} -> + Timer = start_timer(ExpirySeconds, Topic), + {Msg0#mqtt_msg{timestamp = os:system_time(second)}, + maps:put(Topic, Timer, Expire2)}; + error -> + {Msg0, Expire2} + end, + ok = rabbit_mqtt_retained_msg_store:insert(Topic, Msg, StoreState), + {noreply, State#?STATE{expire = Expire}}; +handle_cast({clear, Topic}, State = #?STATE{store_state = StoreState, + expire = Expire0}) -> + Expire = case maps:take(Topic, Expire0) of + {OldTimer, Expire1} -> + cancel_timer(OldTimer), + Expire1; + error -> + Expire0 + end, + ok = rabbit_mqtt_retained_msg_store:delete(Topic, StoreState), + {noreply, State#?STATE{expire = Expire}}. -handle_call({fetch, Topic}, _From, State) -> - Reply = rabbit_mqtt_retained_msg_store:lookup(Topic, State), +handle_call({fetch, Topic}, _From, State = #?STATE{store_state = StoreState}) -> + Reply = case rabbit_mqtt_retained_msg_store:lookup(Topic, StoreState) of + #mqtt_msg{props = #{'Message-Expiry-Interval' := Expiry0} = Props, + timestamp = Timestamp} = MqttMsg -> + %% “The PUBLISH packet sent to a Client by the Server MUST contain a Message + %% Expiry Interval set to the received value minus the time that the + %% Application Message has been waiting in the Server [MQTT-3.3.2-6].” + Expiry = max(0, Expiry0 - (os:system_time(second) - Timestamp)), + MqttMsg#mqtt_msg{props = maps:put('Message-Expiry-Interval', Expiry, Props)}; + Other -> + Other + end, {reply, Reply, State}. +handle_info({timeout, Timer, Topic}, State = #?STATE{store_state = StoreState, + expire = Expire0}) -> + Expire = case maps:take(Topic, Expire0) of + {Timer, Expire1} -> + ok = rabbit_mqtt_retained_msg_store:delete(Topic, StoreState), + Expire1; + _ -> + Expire0 + end, + {noreply, State#?STATE{expire = Expire}}; handle_info(stop, State) -> {stop, normal, State}; handle_info(Info, State) -> {stop, {unknown_info, Info}, State}. -terminate(_Reason, State) -> - rabbit_mqtt_retained_msg_store:terminate(State). +terminate(_Reason, #?STATE{store_state = StoreState}) -> + rabbit_mqtt_retained_msg_store:terminate(StoreState). + +-spec start_timer(integer(), binary()) -> reference(). +start_timer(Seconds, Topic) + when is_binary(Topic) -> + erlang:start_timer(timer:seconds(Seconds), self(), Topic). + +-spec cancel_timer(reference()) -> ok. +cancel_timer(TimerRef) -> + ok = erlang:cancel_timer(TimerRef, [{async, true}, + {info, false}]). diff --git a/deps/rabbitmq_mqtt/test/retainer_SUITE.erl b/deps/rabbitmq_mqtt/test/retainer_SUITE.erl index 1637e88ffa..3cd81ae62f 100644 --- a/deps/rabbitmq_mqtt/test/retainer_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/retainer_SUITE.erl @@ -8,9 +8,11 @@ -compile([export_all, nowarn_export_all]). -include_lib("common_test/include/ct.hrl"). --import(util, [expect_publishes/3, - connect/2, - connect/3]). +-include_lib("eunit/include/eunit.hrl"). +-import(util, [connect/2, connect/3, + expect_publishes/3, + assert_message_expiry_interval/2 + ]). all() -> [ @@ -37,7 +39,8 @@ tests() -> should_translate_amqp2mqtt_on_publish, should_translate_amqp2mqtt_on_retention, should_translate_amqp2mqtt_on_retention_search, - recover + recover, + recover_with_message_expiry_interval ]. suite() -> @@ -91,6 +94,13 @@ end_per_group(_, Config) -> rabbit_ct_client_helpers:teardown_steps() ++ rabbit_ct_broker_helpers:teardown_steps()). +init_per_testcase(recover_with_message_expiry_interval = T, Config) -> + case ?config(mqtt_version, Config) of + v4 -> + {skip, "Message Expiry Interval not supported in MQTT v4"}; + v5 -> + rabbit_ct_helpers:testcase_started(Config, T) + end; init_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_started(Config, Testcase). @@ -172,3 +182,56 @@ recover(Config) -> {ok, _, _} = emqtt:subscribe(C2, Topic, qos1), ok = expect_publishes(C2, Topic, [Payload]), ok = emqtt:disconnect(C2). + +recover_with_message_expiry_interval(Config) -> + ClientId = atom_to_binary(?FUNCTION_NAME), + C1 = connect(ClientId, Config), + Start = os:system_time(second), + {ok, _} = emqtt:publish(C1, <<"topic/1">>, + <<"m1">>, [{retain, true}, {qos, 1}]), + {ok, _} = emqtt:publish(C1, <<"topic/2">>, #{'Message-Expiry-Interval' => 100}, + <<"m2">>, [{retain, true}, {qos, 1}]), + {ok, _} = emqtt:publish(C1, <<"topic/3">>, #{'Message-Expiry-Interval' => 3}, + <<"m3">>, [{retain, true}, {qos, 1}]), + {ok, _} = emqtt:publish(C1, <<"topic/4">>, #{'Message-Expiry-Interval' => 15}, + <<"m4">>, [{retain, true}, {qos, 1}]), + ok = emqtt:disconnect(C1), + %% Takes around 9 seconds on Linux. + ok = rabbit_ct_broker_helpers:restart_node(Config, 0), + C2 = connect(ClientId, Config), + + %% Retained message for topic/3 should have expired during node restart. + %% Wait for retained message for topic/4 to expire. + ElapsedSeconds1 = os:system_time(second) - Start, + SleepMs = max(0, timer:seconds(15 - ElapsedSeconds1 + 1)), + ct:pal("Sleeping for ~b ms", [SleepMs]), + timer:sleep(SleepMs), + + ElapsedSeconds2 = os:system_time(second) - Start, + {ok, _, [1,1,1,1]} = emqtt:subscribe(C2, [{<<"topic/1">>, qos1}, + {<<"topic/2">>, qos1}, + {<<"topic/3">>, qos1}, + {<<"topic/4">>, qos1}]), + receive {publish, #{client_pid := C2, + retain := true, + topic := <<"topic/1">>, + payload := <<"m1">>, + properties := Props}} + when map_size(Props) =:= 0 -> ok + after 100 -> ct:fail("did not topic/1") + end, + + receive {publish, #{client_pid := C2, + retain := true, + topic := <<"topic/2">>, + payload := <<"m2">>, + properties := #{'Message-Expiry-Interval' := MEI}}} -> + assert_message_expiry_interval(100 - ElapsedSeconds2, MEI) + after 100 -> ct:fail("did not topic/2") + end, + + receive Unexpected -> ct:fail("Received unexpectedly: ~p", [Unexpected]) + after 0 -> ok + end, + + ok = emqtt:disconnect(C2). diff --git a/deps/rabbitmq_mqtt/test/util.erl b/deps/rabbitmq_mqtt/test/util.erl index 57ea1e6ab1..97d80a2bdb 100644 --- a/deps/rabbitmq_mqtt/test/util.erl +++ b/deps/rabbitmq_mqtt/test/util.erl @@ -21,6 +21,7 @@ get_events/1, assert_event_type/2, assert_event_prop/2, + assert_message_expiry_interval/2, await_exit/1, await_exit/2, maybe_skip_v5/1 @@ -113,6 +114,12 @@ assert_event_prop(ExpectedProps, Event) assert_event_prop(P, Event) end, ExpectedProps). +assert_message_expiry_interval(Expected, Actual) + when is_integer(Expected), + is_integer(Actual) -> + ?assert(Expected >= Actual - 1), + ?assert(Expected =< Actual + 1). + await_exit(Pid) -> receive {'EXIT', Pid, _} -> ok diff --git a/deps/rabbitmq_mqtt/test/v5_SUITE.erl b/deps/rabbitmq_mqtt/test/v5_SUITE.erl index c9034677fc..4b2cf932f2 100644 --- a/deps/rabbitmq_mqtt/test/v5_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/v5_SUITE.erl @@ -16,7 +16,8 @@ -import(util, [ start_client/4, - connect/2, connect/3, connect/4 + connect/2, connect/3, connect/4, + assert_message_expiry_interval/2 ]). all() -> @@ -40,6 +41,7 @@ cluster_size_1_tests() -> client_set_max_packet_size_invalid, message_expiry_interval, message_expiry_interval_will_message, + message_expiry_interval_retained_message, client_publish_qos2 ]. @@ -166,7 +168,8 @@ message_expiry_interval(Config) -> %% "The PUBLISH packet sent to a Client by the Server MUST contain a Message %% Expiry Interval set to the received value minus the time that the %% Application Message has been waiting in the Server" [MQTT-3.3.2-6] - properties := #{'Message-Expiry-Interval' := 10-2}}} -> ok + properties := #{'Message-Expiry-Interval' := MEI}}} -> + assert_message_expiry_interval(10 - 2, MEI) after 100 -> ct:fail("did not receive m3") end, assert_nothing_received(), @@ -206,13 +209,63 @@ message_expiry_interval_will_message(Config) -> assert_nothing_received(), ok = emqtt:disconnect(Sub2). +message_expiry_interval_retained_message(Config) -> + Pub = connect(<<"publisher">>, Config), + + {ok, _} = emqtt:publish(Pub, <<"topic1">>, #{'Message-Expiry-Interval' => 100}, + <<"m1.1">>, [{retain, true}, {qos, 1}]), + {ok, _} = emqtt:publish(Pub, <<"topic2">>, #{'Message-Expiry-Interval' => 2}, + <<"m2">>, [{retain, true}, {qos, 1}]), + {ok, _} = emqtt:publish(Pub, <<"topic3">>, #{'Message-Expiry-Interval' => 100}, + <<"m3.1">>, [{retain, true}, {qos, 1}]), + {ok, _} = emqtt:publish(Pub, <<"topic4">>, #{'Message-Expiry-Interval' => 100}, + <<"m4">>, [{retain, true}, {qos, 1}]), + + {ok, _} = emqtt:publish(Pub, <<"topic1">>, #{'Message-Expiry-Interval' => 2}, + <<"m1.2">>, [{retain, true}, {qos, 1}]), + {ok, _} = emqtt:publish(Pub, <<"topic2">>, #{'Message-Expiry-Interval' => 2}, + <<>>, [{retain, true}, {qos, 1}]), + {ok, _} = emqtt:publish(Pub, <<"topic3">>, #{}, + <<"m3.2">>, [{retain, true}, {qos, 1}]), + timer:sleep(2001), + %% Expectations: + %% topic1 expired because 2 seconds elapsed + %% topic2 is not retained because it got deleted + %% topic3 is retained because its new message does not have an Expiry-Interval set + %% topic4 is retained because 100 seconds have not elapsed + Sub = connect(<<"subscriber">>, Config), + {ok, _, [1,1,1,1]} = emqtt:subscribe(Sub, [{<<"topic1">>, qos1}, + {<<"topic2">>, qos1}, + {<<"topic3">>, qos1}, + {<<"topic4">>, qos1}]), + receive {publish, #{client_pid := Sub, + retain := true, + topic := <<"topic3">>, + payload := <<"m3.2">>, + properties := Props}} + when map_size(Props) =:= 0 -> ok + after 100 -> ct:fail("did not topic3") + end, + + receive {publish, #{client_pid := Sub, + retain := true, + topic := <<"topic4">>, + payload := <<"m4">>, + properties := #{'Message-Expiry-Interval' := MEI}}} -> + assert_message_expiry_interval(100 - 2, MEI) + after 100 -> ct:fail("did not receive topic4") + end, + assert_nothing_received(), + + ok = emqtt:disconnect(Pub), + ok = emqtt:disconnect(Sub). + client_publish_qos2(Config) -> Topic = ClientId = atom_to_binary(?FUNCTION_NAME), {C, Connect} = start_client(ClientId, Config, 0, []), - {ok, Props} = Connect(C), - ?assertEqual(1, maps:get('Maximum-QoS', Props)), - {error, Response} = emqtt:publish(C, Topic, #{}, <<"msg">>, [{qos, 2}]), - ?assertEqual({disconnected, 155, #{}}, Response). + ?assertMatch({ok, #{'Maximum-QoS' := 1}}, Connect(C)), + ?assertEqual({error, {disconnected, _RcQosNotSupported = 155, #{}}}, + emqtt:publish(C, Topic, <<"msg">>, [{qos, 2}])). satisfy_bazel(_Config) -> ok. diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index eb5e35c7b8..e01542da37 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -237,7 +237,7 @@ init([KeepaliveSup, peer_host = PeerHost, port = Port, peer_port = PeerPort, - connected_at = os:system_time(milli_seconds), + connected_at = os:system_time(millisecond), auth_mechanism = none, helper_sup = KeepaliveSup, socket = RealSocket,