DISCONNECT v5 clients with Server Shutting Down
When RabbitMQ enters maintenance mode / is being drained, all client connections are closed. This commit sends a DISCONNECT packet to (Web) MQTT 5.0 clients with Reason Code "Server shutting down" before the connection is closed.
This commit is contained in:
		
							parent
							
								
									8c0b0e9338
								
							
						
					
					
						commit
						23837c5270
					
				| 
						 | 
				
			
			@ -71,7 +71,7 @@ emit_connection_info(Items, Ref, AggregatorPid, Pids) ->
 | 
			
		|||
              rabbit_mqtt_reader:info(Pid, Items)
 | 
			
		||||
      end, Pids).
 | 
			
		||||
 | 
			
		||||
-spec close_local_client_connections(string() | binary()) -> {'ok', non_neg_integer()}.
 | 
			
		||||
-spec close_local_client_connections(atom()) -> {'ok', non_neg_integer()}.
 | 
			
		||||
close_local_client_connections(Reason) ->
 | 
			
		||||
    Pids = local_connection_pids(),
 | 
			
		||||
    lists:foreach(fun(Pid) ->
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -27,9 +27,8 @@ handle_event({event, vhost_deleted, Info, _, _}, ?STATE) ->
 | 
			
		|||
    rabbit_mqtt_retainer_sup:delete_child_for_vhost(Name),
 | 
			
		||||
    {ok, ?STATE};
 | 
			
		||||
handle_event({event, maintenance_connections_closed, _Info, _, _}, ?STATE) ->
 | 
			
		||||
    %% we should close our connections
 | 
			
		||||
    {ok, NConnections} = rabbit_mqtt:close_local_client_connections("node is being put into maintenance mode"),
 | 
			
		||||
    rabbit_log:warning("Closed ~b local MQTT client connections", [NConnections]),
 | 
			
		||||
    {ok, NConnections} = rabbit_mqtt:close_local_client_connections(maintenance),
 | 
			
		||||
    rabbit_log:warning("Closed ~b local (Web) MQTT client connections", [NConnections]),
 | 
			
		||||
    {ok, ?STATE};
 | 
			
		||||
handle_event(_Event, ?STATE) ->
 | 
			
		||||
    {ok, ?STATE}.
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -132,7 +132,11 @@ handle_cast({close_connection, Reason},
 | 
			
		|||
            State = #state{conn_name = ConnName, proc_state = PState}) ->
 | 
			
		||||
    ?LOG_WARNING("MQTT disconnecting client ~tp with client ID '~ts', reason: ~ts",
 | 
			
		||||
                 [ConnName, rabbit_mqtt_processor:info(client_id, PState), Reason]),
 | 
			
		||||
    {stop, {shutdown, server_initiated_close}, State};
 | 
			
		||||
    case Reason of
 | 
			
		||||
        maintenance -> rabbit_mqtt_processor:send_disconnect(?RC_SERVER_SHUTTING_DOWN, PState);
 | 
			
		||||
        _ -> ok
 | 
			
		||||
    end,
 | 
			
		||||
    {stop, {shutdown, {disconnect, server_initiated}}, State};
 | 
			
		||||
 | 
			
		||||
handle_cast(QueueEvent = {queue_event, _, _},
 | 
			
		||||
            State = #state{proc_state = PState0}) ->
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -48,6 +48,7 @@
 | 
			
		|||
         http_delete/3]).
 | 
			
		||||
 | 
			
		||||
%% defined in MQTT v5 (not in v4 or v3)
 | 
			
		||||
-define(RC_SERVER_SHUTTING_DOWN, 16#8B).
 | 
			
		||||
-define(RC_KEEP_ALIVE_TIMEOUT, 16#8D).
 | 
			
		||||
-define(RC_SESSION_TAKEN_OVER, 16#8E).
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -1131,6 +1132,7 @@ maintenance(Config) ->
 | 
			
		|||
    C0 = connect(<<"client-0">>, Config, 0, []),
 | 
			
		||||
    C1a = connect(<<"client-1a">>, Config, 1, []),
 | 
			
		||||
    C1b = connect(<<"client-1b">>, Config, 1, []),
 | 
			
		||||
    ClientsNode1 = [C1a, C1b],
 | 
			
		||||
 | 
			
		||||
    timer:sleep(500),
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -1141,12 +1143,14 @@ maintenance(Config) ->
 | 
			
		|||
 | 
			
		||||
    process_flag(trap_exit, true),
 | 
			
		||||
    ok = drain_node(Config, 1),
 | 
			
		||||
    [await_exit(Pid) || Pid <- [C1a, C1b]],
 | 
			
		||||
    [await_exit(Pid) || Pid <- ClientsNode1],
 | 
			
		||||
    [assert_v5_disconnect_reason_code(Config, ?RC_SERVER_SHUTTING_DOWN) || _ <- ClientsNode1],
 | 
			
		||||
    ok = revive_node(Config, 1),
 | 
			
		||||
    ?assert(erlang:is_process_alive(C0)),
 | 
			
		||||
 | 
			
		||||
    ok = drain_node(Config, 0),
 | 
			
		||||
    await_exit(C0),
 | 
			
		||||
    assert_v5_disconnect_reason_code(Config, ?RC_SERVER_SHUTTING_DOWN),
 | 
			
		||||
    ok = revive_node(Config, 0).
 | 
			
		||||
 | 
			
		||||
keepalive(Config) ->
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -36,6 +36,7 @@
 | 
			
		|||
-define(RC_DISCONNECT_WITH_WILL, 16#04).
 | 
			
		||||
-define(RC_NO_SUBSCRIPTION_EXISTED, 16#11).
 | 
			
		||||
-define(RC_UNSPECIFIED_ERROR, 16#80).
 | 
			
		||||
-define(RC_SERVER_SHUTTING_DOWN, 16#8B).
 | 
			
		||||
-define(RC_SESSION_TAKEN_OVER, 16#8E).
 | 
			
		||||
 | 
			
		||||
all() ->
 | 
			
		||||
| 
						 | 
				
			
			@ -1672,21 +1673,24 @@ will_delay_node_restart(Config) ->
 | 
			
		|||
    Topic = <<"my/topic">>,
 | 
			
		||||
    Payload = <<"my-will">>,
 | 
			
		||||
 | 
			
		||||
    Sub1a = connect(<<"sub1">>, Config, 0, [{properties, #{'Session-Expiry-Interval' => 900}}]),
 | 
			
		||||
    {ok, _, [0]} = emqtt:subscribe(Sub1a, Topic),
 | 
			
		||||
    Sub2 = connect(<<"sub2">>, Config, 1, []),
 | 
			
		||||
    {ok, _, [0]} = emqtt:subscribe(Sub2, Topic),
 | 
			
		||||
    Sub0a = connect(<<"sub0">>, Config, 0, [{properties, #{'Session-Expiry-Interval' => 900}}]),
 | 
			
		||||
    {ok, _, [0]} = emqtt:subscribe(Sub0a, Topic),
 | 
			
		||||
    Sub1 = connect(<<"sub1">>, Config, 1, []),
 | 
			
		||||
    {ok, _, [0]} = emqtt:subscribe(Sub1, Topic),
 | 
			
		||||
    WillDelaySecs = 10,
 | 
			
		||||
    Ca = connect(<<"will">>, Config, 0,
 | 
			
		||||
                 [{properties, #{'Session-Expiry-Interval' => 900}},
 | 
			
		||||
                  {will_props, #{'Will-Delay-Interval' => WillDelaySecs}},
 | 
			
		||||
                  {will_topic, Topic},
 | 
			
		||||
                  {will_qos, 0},
 | 
			
		||||
                  {will_payload, Payload}]),
 | 
			
		||||
    unlink(Sub1a),
 | 
			
		||||
    unlink(Ca),
 | 
			
		||||
    C0a = connect(<<"will">>, Config, 0,
 | 
			
		||||
                  [{properties, #{'Session-Expiry-Interval' => 900}},
 | 
			
		||||
                   {will_props, #{'Will-Delay-Interval' => WillDelaySecs}},
 | 
			
		||||
                   {will_topic, Topic},
 | 
			
		||||
                   {will_qos, 0},
 | 
			
		||||
                   {will_payload, Payload}]),
 | 
			
		||||
    ClientsNode0 = [Sub0a, C0a],
 | 
			
		||||
    [unlink(C) || C <- ClientsNode0],
 | 
			
		||||
    T = erlang:monotonic_time(millisecond),
 | 
			
		||||
    ok = rabbit_ct_broker_helpers:drain_node(Config, 0),
 | 
			
		||||
    [receive {disconnected, ?RC_SERVER_SHUTTING_DOWN, #{}} -> ok
 | 
			
		||||
     after 10_000 -> ct:fail("server did not disconnect us")
 | 
			
		||||
     end || _ <- ClientsNode0],
 | 
			
		||||
    ok = rabbit_ct_broker_helpers:stop_node(Config, 0),
 | 
			
		||||
    ElapsedMs = erlang:monotonic_time(millisecond) - T,
 | 
			
		||||
    SleepMs = max(0, timer:seconds(WillDelaySecs) - ElapsedMs),
 | 
			
		||||
| 
						 | 
				
			
			@ -1695,20 +1699,20 @@ will_delay_node_restart(Config) ->
 | 
			
		|||
    assert_nothing_received(),
 | 
			
		||||
    ok = rabbit_ct_broker_helpers:start_node(Config, 0),
 | 
			
		||||
    %% After node 0 restarts, we should receive the Will Message promptly on both nodes 0 and 1.
 | 
			
		||||
    receive {publish, #{client_pid := Sub2,
 | 
			
		||||
    receive {publish, #{client_pid := Sub1,
 | 
			
		||||
                        payload := Payload}} -> ok
 | 
			
		||||
    after 1000 -> ct:fail("did not receive Will Message on node 1")
 | 
			
		||||
    end,
 | 
			
		||||
    Sub1b = connect(<<"sub1">>, Config, 0, [{clean_start, false}]),
 | 
			
		||||
    receive {publish, #{client_pid := Sub1b,
 | 
			
		||||
    Sub0b = connect(<<"sub0">>, Config, 0, [{clean_start, false}]),
 | 
			
		||||
    receive {publish, #{client_pid := Sub0b,
 | 
			
		||||
                        payload := Payload}} -> ok
 | 
			
		||||
    after 1000 -> ct:fail("did not receive Will Message on node 0")
 | 
			
		||||
    end,
 | 
			
		||||
 | 
			
		||||
    ok = emqtt:disconnect(Sub1b),
 | 
			
		||||
    ok = emqtt:disconnect(Sub2),
 | 
			
		||||
    Cb = connect(<<"will">>, Config),
 | 
			
		||||
    ok = emqtt:disconnect(Cb).
 | 
			
		||||
    ok = emqtt:disconnect(Sub0b),
 | 
			
		||||
    ok = emqtt:disconnect(Sub1),
 | 
			
		||||
    C0b = connect(<<"will">>, Config),
 | 
			
		||||
    ok = emqtt:disconnect(C0b).
 | 
			
		||||
 | 
			
		||||
session_migrate_v3_v5(Config) ->
 | 
			
		||||
    session_switch_v3_v5(Config, true).
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -45,6 +45,7 @@
 | 
			
		|||
 | 
			
		||||
%% Close frame status codes as defined in https://www.rfc-editor.org/rfc/rfc6455#section-7.4.1
 | 
			
		||||
-define(CLOSE_NORMAL, 1000).
 | 
			
		||||
-define(CLOSE_SERVER_GOING_DOWN, 1001).
 | 
			
		||||
-define(CLOSE_PROTOCOL_ERROR, 1002).
 | 
			
		||||
-define(CLOSE_UNACCEPTABLE_DATA_TYPE, 1003).
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -163,11 +164,18 @@ websocket_info({'$gen_cast', {duplicate_id, SendWill}},
 | 
			
		|||
    rabbit_mqtt_processor:send_disconnect(?RC_SESSION_TAKEN_OVER, ProcState),
 | 
			
		||||
    defer_close(?CLOSE_NORMAL, SendWill),
 | 
			
		||||
    {[], State};
 | 
			
		||||
websocket_info({'$gen_cast', {close_connection, Reason}}, State = #state{ proc_state = ProcState,
 | 
			
		||||
                                                                          conn_name = ConnName }) ->
 | 
			
		||||
websocket_info({'$gen_cast', {close_connection, Reason}}, State = #state{proc_state = ProcState,
 | 
			
		||||
                                                                         conn_name = ConnName}) ->
 | 
			
		||||
    ?LOG_WARNING("Web MQTT disconnecting client with ID '~s' (~p), reason: ~s",
 | 
			
		||||
                 [rabbit_mqtt_processor:info(client_id, ProcState), ConnName, Reason]),
 | 
			
		||||
    stop(State);
 | 
			
		||||
    case Reason of
 | 
			
		||||
        maintenance ->
 | 
			
		||||
            rabbit_mqtt_processor:send_disconnect(?RC_SERVER_SHUTTING_DOWN, ProcState),
 | 
			
		||||
            defer_close(?CLOSE_SERVER_GOING_DOWN),
 | 
			
		||||
            {[], State};
 | 
			
		||||
        _ ->
 | 
			
		||||
            stop(State)
 | 
			
		||||
    end;
 | 
			
		||||
websocket_info({'$gen_cast', {force_event_refresh, Ref}}, State0) ->
 | 
			
		||||
    Infos = infos(?EVENT_KEYS, State0),
 | 
			
		||||
    rabbit_event:notify(connection_created, Infos, Ref),
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue