From 2f76102603b5064ed1f8a38ed388324d0ea1a10e Mon Sep 17 00:00:00 2001 From: David Ansari Date: Mon, 13 Feb 2023 11:52:25 +0000 Subject: [PATCH] Remove compatibility for flag stream_single_active_consumer Remove compatibility code for feature flag `stream_single_active_consumer` because this feature flag is `required` in 3.12. See https://github.com/rabbitmq/rabbitmq-server/pull/7219 --- deps/rabbit/src/rabbit_core_ff.erl | 1 - .../src/rabbit_stream_sac_coordinator.erl | 189 ++++++++---------- .../src/rabbit_stream_reader.erl | 18 +- .../src/rabbit_stream_utils.erl | 4 - 4 files changed, 84 insertions(+), 128 deletions(-) diff --git a/deps/rabbit/src/rabbit_core_ff.erl b/deps/rabbit/src/rabbit_core_ff.erl index 8cd06f24b5..97aa469cbd 100644 --- a/deps/rabbit/src/rabbit_core_ff.erl +++ b/deps/rabbit/src/rabbit_core_ff.erl @@ -59,7 +59,6 @@ {stream_single_active_consumer, #{desc => "Single active consumer for streams", doc_url => "https://www.rabbitmq.com/stream.html", - %%TODO remove compatibility code stability => required, depends_on => [stream_queue] }}). diff --git a/deps/rabbit/src/rabbit_stream_sac_coordinator.erl b/deps/rabbit/src/rabbit_stream_sac_coordinator.erl index 55123e05ce..c761b9fa60 100644 --- a/deps/rabbit/src/rabbit_stream_sac_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_sac_coordinator.erl @@ -39,7 +39,6 @@ handle_connection_down/2, consumer_groups/3, group_consumers/5, - is_ff_enabled/0, overview/1]). %% Single Active Consumer API @@ -50,8 +49,7 @@ pid(), binary(), integer()) -> - {ok, boolean()} | {error, feature_flag_disabled} | - {error, term()}. + {ok, boolean()} | {error, term()}. register_consumer(VirtualHost, Stream, PartitionIndex, @@ -59,70 +57,62 @@ register_consumer(VirtualHost, ConnectionPid, Owner, SubscriptionId) -> - maybe_sac_execute(fun() -> - process_command({sac, - #command_register_consumer{vhost = - VirtualHost, - stream = - Stream, - partition_index - = - PartitionIndex, - consumer_name - = - ConsumerName, - connection_pid - = - ConnectionPid, - owner = - Owner, - subscription_id - = - SubscriptionId}}) - end). + process_command({sac, + #command_register_consumer{vhost = + VirtualHost, + stream = + Stream, + partition_index + = + PartitionIndex, + consumer_name + = + ConsumerName, + connection_pid + = + ConnectionPid, + owner = + Owner, + subscription_id + = + SubscriptionId}}). -spec unregister_consumer(binary(), binary(), binary(), pid(), integer()) -> - ok | {error, feature_flag_disabled} | - {error, term()}. + ok | {error, term()}. unregister_consumer(VirtualHost, Stream, ConsumerName, ConnectionPid, SubscriptionId) -> - maybe_sac_execute(fun() -> - process_command({sac, - #command_unregister_consumer{vhost = - VirtualHost, - stream = - Stream, - consumer_name - = - ConsumerName, - connection_pid - = - ConnectionPid, - subscription_id - = - SubscriptionId}}) - end). + process_command({sac, + #command_unregister_consumer{vhost = + VirtualHost, + stream = + Stream, + consumer_name + = + ConsumerName, + connection_pid + = + ConnectionPid, + subscription_id + = + SubscriptionId}}). --spec activate_consumer(binary(), binary(), binary()) -> - ok | {error, feature_flag_disabled}. +-spec activate_consumer(binary(), binary(), binary()) -> ok. activate_consumer(VirtualHost, Stream, ConsumerName) -> - maybe_sac_execute(fun() -> - process_command({sac, - #command_activate_consumer{vhost = - VirtualHost, - stream = - Stream, - consumer_name - = - ConsumerName}}) - end). + process_command({sac, + #command_activate_consumer{vhost = + VirtualHost, + stream = + Stream, + consumer_name + = + ConsumerName}}). process_command(Cmd) -> case rabbit_stream_coordinator:process_command(Cmd) of @@ -137,62 +127,50 @@ process_command(Cmd) -> %% return the current groups for a given virtual host -spec consumer_groups(binary(), [atom()]) -> {ok, - [term()] | {error, feature_flag_disabled | atom()}}. + [term()] | {error, atom()}}. consumer_groups(VirtualHost, InfoKeys) -> - maybe_sac_execute(fun() -> - case ra:local_query({rabbit_stream_coordinator, - node()}, - fun(State) -> - SacState = - rabbit_stream_coordinator:sac_state(State), - consumer_groups(VirtualHost, - InfoKeys, - SacState) - end) - of - {ok, {_, Result}, _} -> Result; - {error, noproc} -> - %% not started yet, so no groups - {ok, []}; - {error, _} = Err -> Err; - {timeout, _} -> {error, timeout} - end - end). + case ra:local_query({rabbit_stream_coordinator, + node()}, + fun(State) -> + SacState = + rabbit_stream_coordinator:sac_state(State), + consumer_groups(VirtualHost, + InfoKeys, + SacState) + end) + of + {ok, {_, Result}, _} -> Result; + {error, noproc} -> + %% not started yet, so no groups + {ok, []}; + {error, _} = Err -> Err; + {timeout, _} -> {error, timeout} + end. %% get the consumers of a given group in a given virtual host -spec group_consumers(binary(), binary(), binary(), [atom()]) -> {ok, [term()]} | - {error, feature_flag_disabled | atom()}. + {error, atom()}. group_consumers(VirtualHost, Stream, Reference, InfoKeys) -> - maybe_sac_execute(fun() -> - case ra:local_query({rabbit_stream_coordinator, - node()}, - fun(State) -> - SacState = - rabbit_stream_coordinator:sac_state(State), - group_consumers(VirtualHost, - Stream, - Reference, - InfoKeys, - SacState) - end) - of - {ok, {_, {ok, _} = Result}, _} -> Result; - {ok, {_, {error, _} = Err}, _} -> Err; - {error, noproc} -> - %% not started yet, so the group cannot exist - {error, not_found}; - {error, _} = Err -> Err; - {timeout, _} -> {error, timeout} - end - end). - -maybe_sac_execute(Fun) -> - case rabbit_stream_sac_coordinator:is_ff_enabled() of - true -> - Fun(); - false -> - {error, feature_flag_disabled} + case ra:local_query({rabbit_stream_coordinator, + node()}, + fun(State) -> + SacState = + rabbit_stream_coordinator:sac_state(State), + group_consumers(VirtualHost, + Stream, + Reference, + InfoKeys, + SacState) + end) + of + {ok, {_, {ok, _} = Result}, _} -> Result; + {ok, {_, {error, _} = Err}, _} -> Err; + {error, noproc} -> + %% not started yet, so the group cannot exist + {error, not_found}; + {error, _} = Err -> Err; + {timeout, _} -> {error, timeout} end. -spec overview(state()) -> map(). @@ -765,6 +743,3 @@ mod_call_effect(Pid, Msg) -> send_message(ConnectionPid, Msg) -> ConnectionPid ! Msg, ok. - -is_ff_enabled() -> - rabbit_feature_flags:is_enabled(stream_single_active_consumer). diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 5102f91cf3..177a9c4278 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -1887,23 +1887,9 @@ handle_frame_post_auth(Transport, Properties]), Sac = single_active_consumer(Properties), ConsumerName = consumer_name(Properties), - case {Sac, rabbit_stream_utils:is_sac_ff_enabled(), - ConsumerName} + case {Sac, ConsumerName} of - {true, false, _} -> - rabbit_log:warning("Cannot create subcription ~tp, stream single " - "active consumer feature flag is not enabled", - [SubscriptionId]), - response(Transport, - Connection, - subscribe, - CorrelationId, - ?RESPONSE_CODE_PRECONDITION_FAILED), - rabbit_global_counters:increase_protocol_counter(stream, - ?PRECONDITION_FAILED, - 1), - {Connection, State}; - {true, _, undefined} -> + {true, undefined} -> rabbit_log:warning("Cannot create subcription ~tp, a single active " "consumer must have a name", [SubscriptionId]), diff --git a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl index 455fce87e7..5ba9b17e46 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl @@ -28,7 +28,6 @@ extract_stream_list/2, sort_partitions/1, strip_cr_lf/1, - is_sac_ff_enabled/0, consumer_activity_status/2, command_versions/0]). @@ -245,9 +244,6 @@ sort_partitions(Partitions) -> strip_cr_lf(NameBin) -> binary:replace(NameBin, [<<"\n">>, <<"\r">>], <<"">>, [global]). -is_sac_ff_enabled() -> - rabbit_feature_flags:is_enabled(stream_single_active_consumer). - consumer_activity_status(Active, Properties) -> case {rabbit_stream_reader:single_active_consumer(Properties), Active} of