Merge pull request #8098 from rabbitmq/mqtt-connection-closed-event

Add MQTT client id to connection closed event
This commit is contained in:
Michael Klishin 2023-05-05 00:20:36 +04:00 committed by GitHub
commit e8ffc45cc9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 26 additions and 31 deletions

View File

@ -50,7 +50,8 @@
messages_unacknowledged
]).
-define(CREATION_EVENT_KEYS,
%% Connection opened or closed.
-define(EVENT_KEYS,
?ITEMS ++
[name,
client_properties,

View File

@ -149,7 +149,6 @@ process_connect(
{ok, RaRegisterState} ?= register_client_id(VHost, ClientId),
{TraceState, ConnName} = init_trace(VHost, ConnName0),
ok = rabbit_mqtt_keepalive:start(KeepaliveSecs, Socket),
self() ! connection_created,
{ok,
#state{
cfg = #cfg{socket = Socket,
@ -207,6 +206,7 @@ process_connect(State0) ->
{ok, SessPresent, State2} ?= handle_clean_sess_qos1(QoS0SessPresent, State1),
State = cache_subscriptions(SessPresent, State2),
rabbit_networking:register_non_amqp_connection(self()),
self() ! connection_created,
{ok, SessPresent, State}
else
{error, _} = Error ->
@ -1190,19 +1190,9 @@ send(Packet, ProtoVer, SendFun) ->
Data = rabbit_mqtt_packet:serialise(Packet, ProtoVer),
ok = SendFun(Data).
-spec terminate(boolean(), binary(), atom(), state()) -> ok.
terminate(SendWill, ConnName, ProtoFamily,
State = #state{cfg = #cfg{vhost = VHost},
auth_state = #auth_state{user = #user{username = Username}}
}) ->
-spec terminate(boolean(), binary(), rabbit_event:event_props(), state()) -> ok.
terminate(SendWill, ConnName, Infos, State) ->
maybe_send_will(SendWill, ConnName, State),
Infos = [{name, ConnName},
{node, node()},
{pid, self()},
{disconnected_at, os:system_time(milli_seconds)},
{protocol, {ProtoFamily, proto_version_tuple(State)}},
{vhost, VHost},
{user, Username}],
rabbit_core_metrics:connection_closed(self()),
rabbit_event:notify(connection_closed, Infos),
rabbit_networking:unregister_non_amqp_connection(self()),

View File

@ -27,7 +27,6 @@
-type option(T) :: undefined | T.
-define(HIBERNATE_AFTER, 1000).
-define(PROTO_FAMILY, 'MQTT').
-record(state,
{socket :: rabbit_net:socket(),
@ -140,7 +139,7 @@ handle_cast(QueueEvent = {queue_event, _, _},
end;
handle_cast({force_event_refresh, Ref}, State0) ->
Infos = infos(?CREATION_EVENT_KEYS, State0),
Infos = infos(?EVENT_KEYS, State0),
rabbit_event:notify(connection_created, Infos, Ref),
State = rabbit_event:init_stats_timer(State0, #state.stats_timer),
{noreply, State, ?HIBERNATE_AFTER};
@ -154,7 +153,7 @@ handle_cast(Msg, State) ->
{stop, {mqtt_unexpected_cast, Msg}, State}.
handle_info(connection_created, State) ->
Infos = infos(?CREATION_EVENT_KEYS, State),
Infos = infos(?EVENT_KEYS, State),
rabbit_core_metrics:connection_created(self(), Infos),
rabbit_event:notify(connection_created, Infos),
{noreply, State, ?HIBERNATE_AFTER};
@ -265,7 +264,8 @@ terminate(Reason, {SendWill, State = #state{conn_name = ConnName,
connect_packet_unprocessed ->
ok;
_ ->
rabbit_mqtt_processor:terminate(SendWill, ConnName, ?PROTO_FAMILY, PState)
Infos = infos(?EVENT_KEYS, State),
rabbit_mqtt_processor:terminate(SendWill, ConnName, Infos, PState)
end,
log_terminate(Reason, State).
@ -514,7 +514,7 @@ i(Cert, #state{socket = Sock})
i(timeout, #state{keepalive = KState}) ->
rabbit_mqtt_keepalive:interval_secs(KState);
i(protocol, #state{proc_state = ProcState}) ->
{?PROTO_FAMILY, rabbit_mqtt_processor:proto_version_tuple(ProcState)};
{'MQTT', rabbit_mqtt_processor:proto_version_tuple(ProcState)};
i(Key, #state{proc_state = ProcState}) ->
rabbit_mqtt_processor:info(Key, ProcState).

View File

@ -438,12 +438,13 @@ events(Config) ->
true -> 'Web MQTT';
false -> 'MQTT'
end,
ExpectedConnectionProps = [{protocol, {Proto, {3,1,1}}},
{node, Server},
{vhost, <<"/">>},
{user, <<"guest">>},
{pid, ConnectionPid}],
assert_event_prop(ExpectedConnectionProps, E1),
assert_event_prop([{protocol, {Proto, {3,1,1}}},
{node, Server},
{vhost, <<"/">>},
{user, <<"guest">>},
{client_properties, [{client_id, longstr, ClientId}]},
{pid, ConnectionPid}],
E1),
{ok, _, _} = emqtt:subscribe(C, <<"TopicA">>, qos0),
@ -490,7 +491,10 @@ events(Config) ->
[E6, E7 | E8] = get_events(Server),
assert_event_type(connection_closed, E6),
assert_event_prop(ExpectedConnectionProps, E6),
?assertEqual(E1#event.props, E6#event.props,
"connection_closed event props should match connection_created event props. "
"See https://github.com/rabbitmq/rabbitmq-server/discussions/6331"),
case is_feature_flag_enabled(Config, rabbit_mqtt_qos0_queue) of
true ->
assert_event_type(queue_deleted, E7),

View File

@ -48,7 +48,6 @@
-define(CLOSE_NORMAL, 1000).
-define(CLOSE_PROTOCOL_ERROR, 1002).
-define(CLOSE_UNACCEPTABLE_DATA_TYPE, 1003).
-define(PROTO_FAMILY, 'Web MQTT').
%% cowboy_sub_protcol
upgrade(Req, Env, Handler, HandlerState) ->
@ -164,7 +163,7 @@ websocket_info({'$gen_cast', {close_connection, Reason}}, State = #state{ proc_s
[rabbit_mqtt_processor:info(client_id, ProcState), ConnName, Reason]),
stop(State);
websocket_info({'$gen_cast', {force_event_refresh, Ref}}, State0) ->
Infos = infos(?CREATION_EVENT_KEYS, State0),
Infos = infos(?EVENT_KEYS, State0),
rabbit_event:notify(connection_created, Infos, Ref),
State = rabbit_event:init_stats_timer(State0, #state.stats_timer),
{[], State, hibernate};
@ -209,7 +208,7 @@ websocket_info({shutdown, Reason}, #state{conn_name = ConnName} = State) ->
?LOG_INFO("Web MQTT closing connection ~tp: ~tp", [ConnName, Reason]),
stop(State, ?CLOSE_NORMAL, Reason);
websocket_info(connection_created, State) ->
Infos = infos(?CREATION_EVENT_KEYS, State),
Infos = infos(?EVENT_KEYS, State),
rabbit_core_metrics:connection_created(self(), Infos),
rabbit_event:notify(connection_created, Infos),
{[], State, hibernate};
@ -231,7 +230,8 @@ terminate(_Reason, _Request,
connect_packet_unprocessed ->
ok;
_ ->
rabbit_mqtt_processor:terminate(SendWill, ConnName, ?PROTO_FAMILY, PState)
Infos = infos(?EVENT_KEYS, State),
rabbit_mqtt_processor:terminate(SendWill, ConnName, Infos, PState)
end.
%% Internal.
@ -395,7 +395,7 @@ i(reductions, _) ->
i(garbage_collection, _) ->
rabbit_misc:get_gc_info(self());
i(protocol, #state{proc_state = PState}) ->
{?PROTO_FAMILY, rabbit_mqtt_processor:proto_version_tuple(PState)};
{'Web MQTT', rabbit_mqtt_processor:proto_version_tuple(PState)};
i(SSL, #state{socket = Sock})
when SSL =:= ssl;
SSL =:= ssl_protocol;