diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl index 4dbe0ec026..764e921f4a 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl @@ -74,8 +74,7 @@ init(Ref) -> {ok, ConnStr} -> ConnName = rabbit_data_coercion:to_binary(ConnStr), rabbit_log_connection:debug("MQTT accepting TCP connection ~tp (~ts)", [self(), ConnName]), - rabbit_alarm:register( - self(), {?MODULE, conserve_resources, []}), + rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), LoginTimeout = application:get_env(?APP_NAME, login_timeout, 10_000), erlang:send_after(LoginTimeout, self(), login_timeout), ProcessorState = rabbit_mqtt_processor:initial_state(RealSocket, ConnName), diff --git a/deps/rabbitmq_mqtt/test/reader_SUITE.erl b/deps/rabbitmq_mqtt/test/reader_SUITE.erl index 85adc75591..17822b8d41 100644 --- a/deps/rabbitmq_mqtt/test/reader_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/reader_SUITE.erl @@ -110,15 +110,15 @@ block(Config) -> % %% Let it block timer:sleep(100), - %% Blocked, but still will publish + %% Blocked, but still will publish when unblocked puback_timeout = publish_qos1_timeout(C, <<"Topic1">>, <<"Now blocked">>, 1000), puback_timeout = publish_qos1_timeout(C, <<"Topic1">>, <<"Still blocked">>, 1000), %% Unblock rpc(Config, vm_memory_monitor, set_vm_memory_high_watermark, [0.4]), ok = expect_publishes(<<"Topic1">>, [<<"Not blocked yet">>, - <<"Now blocked">>, - <<"Still blocked">>]), + <<"Now blocked">>, + <<"Still blocked">>]), ok = emqtt:disconnect(C). block_connack_timeout(Config) -> diff --git a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl index 1531d7bba0..8901256174 100644 --- a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl +++ b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl @@ -18,7 +18,9 @@ websocket_info/2, terminate/3 ]). --export([close_connection/2]). + +-export([conserve_resources/3, + close_connection/2]). %% cowboy_sub_protocol -export([upgrade/4, @@ -97,6 +99,7 @@ websocket_init({State0 = #state{socket = Sock}, PeerAddr}) -> {ok, ConnStr} -> ConnName = rabbit_data_coercion:to_binary(ConnStr), rabbit_log_connection:info("Accepting Web MQTT connection ~s", [ConnName]), + rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), PState = rabbit_mqtt_processor:initial_state( rabbit_net:unwrap_socket(Sock), ConnName, @@ -118,6 +121,13 @@ close_connection(Pid, Reason) -> sys:terminate(Pid, Reason), ok. +-spec conserve_resources(pid(), + rabbit_alarm:resource_alarm_source(), + rabbit_alarm:resource_alert()) -> ok. +conserve_resources(Pid, _, {_, Conserve, _}) -> + Pid ! {conserve_resources, Conserve}, + ok. + -spec websocket_handle(ping | pong | {text | binary | ping | pong, binary()}, State) -> {cowboy_websocket:commands(), State} | {cowboy_websocket:commands(), State, hibernate}. @@ -140,8 +150,7 @@ websocket_handle(Frame, State) -> {cowboy_websocket:commands(), State} | {cowboy_websocket:commands(), State, hibernate}. websocket_info({conserve_resources, Conserve}, State) -> - NewState = State#state{conserve = Conserve}, - handle_credits(NewState); + handle_credits(State#state{conserve = Conserve}); websocket_info({bump_credit, Msg}, State) -> credit_flow:handle_bump_msg(Msg), handle_credits(State); @@ -305,12 +314,12 @@ stop(State, CloseCode, Error0) -> {[{close, CloseCode, Error}], State}. handle_credits(State0) -> - case control_throttle(State0) of - State = #state{connection_state = running} -> - {[{active, true}], State, hibernate}; - State -> - {[], State, hibernate} - end. + State = #state{connection_state = CS} = control_throttle(State0), + Active = case CS of + running -> true; + blocked -> false + end, + {[{active, Active}], State, hibernate}. control_throttle(State = #state{connection_state = CS, conserve = Conserve, diff --git a/deps/rabbitmq_web_mqtt/test/system_SUITE.erl b/deps/rabbitmq_web_mqtt/test/system_SUITE.erl index 1224fc2bcc..573891623a 100644 --- a/deps/rabbitmq_web_mqtt/test/system_SUITE.erl +++ b/deps/rabbitmq_web_mqtt/test/system_SUITE.erl @@ -33,7 +33,7 @@ all() -> groups() -> [ {non_parallel_tests, [], - [connection + [block , pubsub_shared_connection , pubsub_separate_connections , last_will_enabled_disconnect @@ -99,8 +99,30 @@ maybe_stop_inets(Testcase) -> ok end. -connection(Config) -> - C = ws_connect(?FUNCTION_NAME, Config), +%% ------------------------------------------------------------------- +%% Testsuite cases +%% ------------------------------------------------------------------- + +block(Config) -> + Topic = ClientId = atom_to_binary(?FUNCTION_NAME), + C = ws_connect(ClientId, Config), + + {ok, _, _} = emqtt:subscribe(C, Topic), + {ok, _} = emqtt:publish(C, Topic, <<"Not blocked yet">>, [{qos, 1}]), + + ok = rpc(Config, 0, vm_memory_monitor, set_vm_memory_high_watermark, [0.00000001]), + %% Let it block + timer:sleep(100), + + %% Blocked, but still will publish when unblocked + puback_timeout = publish_qos1_timeout(C, Topic, <<"Now blocked">>, 1000), + puback_timeout = publish_qos1_timeout(C, Topic, <<"Still blocked">>, 1000), + + %% Unblock + rpc(Config, 0, vm_memory_monitor, set_vm_memory_high_watermark, [0.4]), + ok = expect_publishes(C, Topic, [<<"Not blocked yet">>, + <<"Now blocked">>, + <<"Still blocked">>]), ok = emqtt:disconnect(C). pubsub_shared_connection(Config) -> @@ -332,3 +354,22 @@ expect_publishes(ClientPid, Topic, [Payload|Rest]) -> after 1000 -> {publish_not_received, Payload} end. + +publish_qos1_timeout(Client, Topic, Payload, Timeout) -> + Mref = erlang:monitor(process, Client), + ok = emqtt:publish_async(Client, Topic, #{}, Payload, [{qos, 1}], infinity, + {fun ?MODULE:sync_publish_result/3, [self(), Mref]}), + receive + {Mref, Reply} -> + erlang:demonitor(Mref, [flush]), + Reply; + {'DOWN', Mref, process, Client, Reason} -> + ct:fail("client is down: ~tp", [Reason]) + after + Timeout -> + erlang:demonitor(Mref, [flush]), + puback_timeout + end. + +sync_publish_result(Caller, Mref, Result) -> + erlang:send(Caller, {Mref, Result}).