Emit events on stream consume and cancel

This commit is contained in:
Arnaud Cogoluègnes 2025-01-20 16:14:58 +01:00
parent a8c8cf2fd9
commit 31a4d611f1
No known key found for this signature in database
GPG Key ID: D5C8C4DFAD43AFA8
8 changed files with 300 additions and 58 deletions

View File

@ -313,7 +313,8 @@ consume(Q, Spec, #stream_client{} = QState0)
consumer_tag := ConsumerTag,
exclusive_consume := ExclusiveConsume,
args := Args,
ok_msg := OkMsg} = Spec,
ok_msg := OkMsg,
acting_user := ActingUser} = Spec,
QName = amqqueue:get_name(Q),
rabbit_log:debug("~s:~s Local pid resolved ~0p",
[?MODULE, ?FUNCTION_NAME, LocalPid]),
@ -330,6 +331,15 @@ consume(Q, Spec, #stream_client{} = QState0)
rabbit_core_metrics:consumer_created(
ChPid, ConsumerTag, ExclusiveConsume, AckRequired,
QName, ConsumerPrefetchCount, true, up, Args),
rabbit_event:notify(consumer_created,
[{consumer_tag, ConsumerTag},
{exclusive, ExclusiveConsume},
{ack_required, AckRequired},
{channel, ChPid},
{queue, QName},
{prefetch_count, ConsumerPrefetchCount},
{arguments, Args},
{user_who_performed_action, ActingUser}]),
%% reply needs to be sent before the stream
%% begins sending
maybe_send_reply(ChPid, OkMsg),

View File

@ -0,0 +1,59 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 2.0 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at https://www.mozilla.org/en-US/MPL/2.0/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is Pivotal Software, Inc.
%% Copyright (c) 2025 Broadcom. All Rights Reserved.
%% The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(rabbit_list_test_event_handler).
-behaviour(gen_event).
-export([start_link/0, stop/0, get_events/0, clear_events/0]).
%% callbacks
-export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2, code_change/3]).
start_link() ->
gen_event:start_link({local, ?MODULE}).
stop() ->
gen_event:stop(?MODULE).
get_events() ->
gen_event:call(?MODULE, ?MODULE, get_events).
clear_events() ->
gen_event:call(?MODULE, ?MODULE, clear_events).
%% Callbacks
init([]) ->
{ok, []}.
handle_event(Event, State) ->
{ok, [Event | State]}.
handle_call(get_events, State) ->
{ok, lists:reverse(State), State};
handle_call(clear_events, _) ->
{ok, ok, []}.
handle_info(_Info, State) ->
{ok, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

View File

@ -144,6 +144,7 @@ all_tests_3() ->
consume_credit_multiple_ack,
basic_cancel,
consumer_metrics_cleaned_on_connection_close,
consume_cancel_should_create_events,
receive_basic_cancel_on_queue_deletion,
keep_consuming_on_leader_restart,
max_length_bytes,
@ -1195,7 +1196,7 @@ consumer_metrics_cleaned_on_connection_close(Config) ->
Conn = rabbit_ct_client_helpers:open_connection(Config, Server),
{ok, Ch} = amqp_connection:open_channel(Conn),
qos(Ch, 10, false),
CTag = <<"consumer_metrics_cleaned_on_connection_close">>,
CTag = rabbit_data_coercion:to_binary(?FUNCTION_NAME),
subscribe(Ch, Q, false, 0, CTag),
rabbit_ct_helpers:await_condition(
fun() ->
@ -1211,6 +1212,49 @@ consumer_metrics_cleaned_on_connection_close(Config) ->
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
consume_cancel_should_create_events(Config) ->
HandlerMod = rabbit_list_test_event_handler,
rabbit_ct_broker_helpers:add_code_path_to_all_nodes(Config, HandlerMod),
rabbit_ct_broker_helpers:rpc(Config, 0,
gen_event,
add_handler,
[rabbit_event, HandlerMod, []]),
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Q = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Config, Server, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
Conn = rabbit_ct_client_helpers:open_connection(Config, Server),
{ok, Ch} = amqp_connection:open_channel(Conn),
qos(Ch, 10, false),
ok = rabbit_ct_broker_helpers:rpc(Config, 0,
gen_event,
call,
[rabbit_event, HandlerMod, clear_events]),
CTag = rabbit_data_coercion:to_binary(?FUNCTION_NAME),
?assertEqual([], filtered_events(Config, consumer_created, CTag)),
?assertEqual([], filtered_events(Config, consumer_deleted, CTag)),
subscribe(Ch, Q, false, 0, CTag),
?awaitMatch([{event, consumer_created, _, _, _}], filtered_events(Config, consumer_created, CTag), ?WAIT),
?assertEqual([], filtered_events(Config, consumer_deleted, CTag)),
amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}),
?awaitMatch([{event, consumer_deleted, _, _, _}], filtered_events(Config, consumer_deleted, CTag), ?WAIT),
rabbit_ct_broker_helpers:rpc(Config, 0,
gen_event,
delete_handler,
[rabbit_event, HandlerMod, []]),
ok = rabbit_ct_client_helpers:close_connection(Conn),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
receive_basic_cancel_on_queue_deletion(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@ -1395,6 +1439,18 @@ filter_consumers(Config, Server, CTag) ->
end
end, [], CInfo).
filtered_events(Config, EventType, CTag) ->
Events = rabbit_ct_broker_helpers:rpc(Config, 0,
gen_event,
call,
[rabbit_event, rabbit_list_test_event_handler, get_events]),
lists:filter(fun({event, Type, Fields, _, _}) when Type =:= EventType ->
proplists:get_value(consumer_tag, Fields) =:= CTag;
(_) ->
false
end, Events).
consume_and_reject(Config) ->
consume_and_(Config, fun (DT) -> #'basic.reject'{delivery_tag = DT} end).
consume_and_nack(Config) ->

View File

@ -20,9 +20,9 @@
%% API
-export([init/0]).
-export([consumer_created/9,
-export([consumer_created/10,
consumer_updated/9,
consumer_cancelled/4]).
consumer_cancelled/5]).
-export([publisher_created/4,
publisher_updated/7,
publisher_deleted/3]).
@ -42,7 +42,8 @@ consumer_created(Connection,
Offset,
OffsetLag,
Active,
Properties) ->
Properties,
ActingUser) ->
Values =
[{credits, Credits},
{consumed, MessageCount},
@ -55,16 +56,32 @@ consumer_created(Connection,
ets:insert(?TABLE_CONSUMER,
{{StreamResource, Connection, SubscriptionId}, Values}),
rabbit_global_counters:consumer_created(stream),
rabbit_core_metrics:consumer_created(Connection,
consumer_tag(SubscriptionId),
false,
false,
CTag = consumer_tag(SubscriptionId),
ExclusiveConsume = false,
AckRequired = false,
Pid = Connection,
PrefetchCount = 0,
Args = rabbit_misc:to_amqp_table(Properties),
rabbit_core_metrics:consumer_created(Pid,
CTag,
ExclusiveConsume,
AckRequired,
StreamResource,
0,
PrefetchCount,
Active,
rabbit_stream_utils:consumer_activity_status(Active,
Properties),
rabbit_misc:to_amqp_table(Properties)),
Args),
rabbit_event:notify(consumer_created,
[{consumer_tag, CTag},
{exclusive, ExclusiveConsume},
{ack_required, AckRequired},
{channel, Pid},
{queue, StreamResource},
{prefetch_count, PrefetchCount},
{arguments, Args},
{user_who_performed_action, ActingUser}]),
ok.
consumer_tag(SubscriptionId) ->
@ -104,7 +121,7 @@ consumer_updated(Connection,
ok.
consumer_cancelled(Connection, StreamResource, SubscriptionId, Notify) ->
consumer_cancelled(Connection, StreamResource, SubscriptionId, ActingUser, Notify) ->
ets:delete(?TABLE_CONSUMER,
{StreamResource, Connection, SubscriptionId}),
rabbit_global_counters:consumer_deleted(stream),
@ -115,7 +132,8 @@ consumer_cancelled(Connection, StreamResource, SubscriptionId, Notify) ->
true ->
rabbit_event:notify(consumer_deleted,
[{consumer_tag, consumer_tag(SubscriptionId)},
{channel, self()}, {queue, StreamResource}]);
{channel, self()}, {queue, StreamResource},
{user_who_performed_action, ActingUser}]);
_ -> ok
end,
ok.

View File

@ -2924,9 +2924,8 @@ consumer_name(_Properties) ->
maybe_dispatch_on_subscription(Transport,
State,
ConsumerState,
#stream_connection{deliver_version =
DeliverVersion} =
Connection,
#stream_connection{deliver_version = DeliverVersion,
user = #user{username = Username}} = Connection,
Consumers,
Stream,
SubscriptionId,
@ -2970,13 +2969,14 @@ maybe_dispatch_on_subscription(Transport,
ConsumerOffset,
ConsumerOffsetLag,
true,
SubscriptionProperties),
SubscriptionProperties,
Username),
State#stream_connection_state{consumers = Consumers1}
end;
maybe_dispatch_on_subscription(_Transport,
State,
ConsumerState,
Connection,
#stream_connection{user = #user{username = Username}} = Connection,
Consumers,
Stream,
SubscriptionId,
@ -3000,7 +3000,8 @@ maybe_dispatch_on_subscription(_Transport,
Offset,
0, %% offset lag
Active,
SubscriptionProperties),
SubscriptionProperties,
Username),
Consumers1 = Consumers#{SubscriptionId => ConsumerState},
State#stream_connection_state{consumers = Consumers1}.
@ -3205,19 +3206,15 @@ partition_index(VirtualHost, Stream, Properties) ->
-1
end.
notify_connection_closed(#statem_data{connection =
#stream_connection{name = Name,
publishers =
Publishers} =
Connection,
connection_state =
#stream_connection_state{consumers =
Consumers} =
ConnectionState}) ->
notify_connection_closed(#statem_data{
connection = #stream_connection{name = Name,
user = #user{username = Username},
publishers = Publishers} = Connection,
connection_state = #stream_connection_state{consumers = Consumers} = ConnectionState}) ->
rabbit_core_metrics:connection_closed(self()),
[rabbit_stream_metrics:consumer_cancelled(self(),
stream_r(S, Connection),
SubId, false)
SubId, Username, false)
|| #consumer{configuration =
#consumer_configuration{stream = S,
subscription_id = SubId}}
@ -3275,24 +3272,15 @@ clean_state_after_super_stream_deletion(Partitions, Connection, State, Transport
end, {Connection, State}, Partitions).
clean_state_after_stream_deletion_or_failure(MemberPid, Stream,
#stream_connection{virtual_host =
VirtualHost,
stream_subscriptions
=
StreamSubscriptions,
publishers =
Publishers,
publisher_to_ids
=
PublisherToIds,
stream_leaders =
Leaders,
outstanding_requests = Requests0} =
C0,
#stream_connection_state{consumers
=
Consumers} =
S0) ->
#stream_connection{
user = #user{username = Username},
virtual_host = VirtualHost,
stream_subscriptions = StreamSubscriptions,
publishers = Publishers,
publisher_to_ids = PublisherToIds,
stream_leaders = Leaders,
outstanding_requests = Requests0} = C0,
#stream_connection_state{consumers = Consumers} = S0) ->
{SubscriptionsCleaned, C1, S1} =
case stream_has_subscriptions(Stream, C0) of
true ->
@ -3306,6 +3294,7 @@ clean_state_after_stream_deletion_or_failure(MemberPid, Stream,
stream_r(Stream,
C0),
SubId,
Username,
false),
maybe_unregister_consumer(
VirtualHost, Consumer,
@ -3317,6 +3306,7 @@ clean_state_after_stream_deletion_or_failure(MemberPid, Stream,
stream_r(Stream,
C0),
SubId,
Username,
false),
maybe_unregister_consumer(
VirtualHost, Consumer,
@ -3429,11 +3419,11 @@ lookup_leader_from_manager(VirtualHost, Stream) ->
rabbit_stream_manager:lookup_leader(VirtualHost, Stream).
remove_subscription(SubscriptionId,
#stream_connection{virtual_host = VirtualHost,
outstanding_requests = Requests0,
stream_subscriptions =
StreamSubscriptions} =
Connection,
#stream_connection{
user = #user{username = Username},
virtual_host = VirtualHost,
outstanding_requests = Requests0,
stream_subscriptions = StreamSubscriptions} = Connection,
#stream_connection_state{consumers = Consumers} = State,
Notify) ->
#{SubscriptionId := Consumer} = Consumers,
@ -3462,6 +3452,7 @@ remove_subscription(SubscriptionId,
rabbit_stream_metrics:consumer_cancelled(self(),
stream_r(Stream, Connection2),
SubscriptionId,
Username,
Notify),
Requests1 = maybe_unregister_consumer(

View File

@ -0,0 +1,54 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 2.0 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at https://www.mozilla.org/en-US/MPL/2.0/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is Pivotal Software, Inc.
%% Copyright (c) 2025 Broadcom. All Rights Reserved.
%% The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(rabbit_list_test_event_handler).
-behaviour(gen_event).
-export([start_link/0, stop/0, get_events/0]).
%% callbacks
-export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2, code_change/3]).
start_link() ->
gen_event:start_link({local, ?MODULE}).
stop() ->
gen_event:stop(?MODULE).
get_events() ->
gen_event:call(?MODULE, ?MODULE, get_events).
%% Callbacks
init([]) ->
{ok, []}.
handle_event(Event, State) ->
{ok, [Event | State]}.
handle_call(get_events, State) ->
{ok, lists:reverse(State), State}.
handle_info(_Info, State) ->
{ok, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

View File

@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is Pivotal Software, Inc.
%% Copyright (c) 2020-2024 Broadcom. All Rights Reserved.
%% Copyright (c) 2020-2025 Broadcom. All Rights Reserved.
%% The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
@ -66,7 +66,8 @@ groups() ->
unauthorized_vhost_access_should_close_with_delay,
sasl_anonymous,
test_publisher_with_too_long_reference_errors,
test_consumer_with_too_long_reference_errors
test_consumer_with_too_long_reference_errors,
subscribe_unsubscribe_should_create_events
]},
%% Run `test_global_counters` on its own so the global metrics are
%% initialised to 0 for each testcase
@ -489,7 +490,8 @@ test_gc_consumers(Config) ->
0,
0,
true,
#{}]),
#{},
<<"guest">>]),
?awaitMatch(0, consumer_count(Config), ?WAIT),
ok.
@ -1011,6 +1013,57 @@ test_consumer_with_too_long_reference_errors(Config) ->
test_close(T, S, C),
ok.
subscribe_unsubscribe_should_create_events(Config) ->
HandlerMod = rabbit_list_test_event_handler,
rabbit_ct_broker_helpers:add_code_path_to_all_nodes(Config, HandlerMod),
rabbit_ct_broker_helpers:rpc(Config, 0,
gen_event,
add_handler,
[rabbit_event, HandlerMod, []]),
Stream = atom_to_binary(?FUNCTION_NAME, utf8),
Transport = gen_tcp,
Port = get_stream_port(Config),
Opts = get_opts(Transport),
{ok, S} = Transport:connect("localhost", Port, Opts),
C0 = rabbit_stream_core:init(0),
C1 = test_peer_properties(Transport, S, C0),
C2 = test_authenticate(Transport, S, C1),
C3 = test_create_stream(Transport, S, Stream, C2),
?assertEqual([], filtered_events(Config, consumer_created)),
?assertEqual([], filtered_events(Config, consumer_deleted)),
SubscriptionId = 42,
C4 = test_subscribe(Transport, S, SubscriptionId, Stream, C3),
?awaitMatch([{event, consumer_created, _, _, _}], filtered_events(Config, consumer_created), ?WAIT),
?assertEqual([], filtered_events(Config, consumer_deleted)),
C5 = test_unsubscribe(Transport, S, SubscriptionId, C4),
?awaitMatch([{event, consumer_deleted, _, _, _}], filtered_events(Config, consumer_deleted), ?WAIT),
rabbit_ct_broker_helpers:rpc(Config, 0,
gen_event,
delete_handler,
[rabbit_event, HandlerMod, []]),
C6 = test_delete_stream(Transport, S, Stream, C5, false),
_C7 = test_close(Transport, S, C6),
closed = wait_for_socket_close(Transport, S, 10),
ok.
filtered_events(Config, EventType) ->
Events = rabbit_ct_broker_helpers:rpc(Config, 0,
gen_event,
call,
[rabbit_event, rabbit_list_test_event_handler, get_events]),
lists:filter(fun({event, Type, _, _, _}) when Type =:= EventType ->
true;
(_) ->
false
end, Events).
consumer_offset_info(Config, ConnectionName) ->
[[{offset, Offset},
{offset_lag, Lag}]] = rpc(Config, 0, ?MODULE,

View File

@ -9,7 +9,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is Pivotal Software, Inc.
%% Copyright (c) 2024 Broadcom. All Rights Reserved.
%% Copyright (c) 2024-2025 Broadcom. All Rights Reserved.
%% The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
@ -143,7 +143,8 @@ evaluate_state_after_secret_update_test(_) ->
{C1, S1} = Mod:evaluate_state_after_secret_update(ModTransport, #user{},
#stream_connection{publishers = Publishers,
stream_subscriptions = Subscriptions},
stream_subscriptions = Subscriptions,
user = #user{}},
#stream_connection_state{consumers = Consumers}),
meck:validate(ModLog),
@ -176,7 +177,7 @@ evaluate_state_after_secret_update_test(_) ->
Now = os:system_time(second),
meck:expect(rabbit_access_control, expiry_timestamp, fun (_) -> Now + 60 end),
{C2, _} = Mod:evaluate_state_after_secret_update(ModTransport, #user{},
#stream_connection{},
#stream_connection{user = #user{}},
#stream_connection_state{}),
#stream_connection{token_expiry_timer = TRef2} = C2,
Cancel2 = erlang:cancel_timer(TRef2, [{async, false}, {info, true}]),