Require all stable feature flags added up to 3.13.0

Since the feature flag `message_containers`, introduced in 3.13.0, is required in 4.0,
we can also require all other feature flags introduced in or before 3.13.0
and remove their compatibility code for 4.0 (the removed pattern is sketched below):

* restart_streams
* stream_sac_coordinator_unblock_group
* stream_filtering
* stream_update_config_command
David Ansari 2024-07-10 11:40:24 +02:00
parent cabf4dd863
commit 18e8c1d5f8
12 changed files with 78 additions and 287 deletions
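The pattern this commit deletes across the code base is the run-time feature-flag guard.
A minimal before/after sketch, assuming a hypothetical helper do_thing/1; only
rabbit_feature_flags:is_enabled/1 and the flag name are taken from the diff below:

%% Before 4.0: callers had to check the flag at run time and provide a
%% fallback or an error for nodes where it was not yet enabled.
%% do_thing/1 is a hypothetical helper used only for illustration.
maybe_do_thing_pre_4_0(Arg) ->
    case rabbit_feature_flags:is_enabled(stream_filtering) of
        true ->
            do_thing(Arg);
        false ->
            {error, feature_not_enabled}
    end.

%% In 4.0 the flag's stability is `required`, so it is guaranteed to be
%% enabled on every running node and the guard collapses to a direct call.
maybe_do_thing_4_0(Arg) ->
    do_thing(Arg).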


@@ -101,7 +101,7 @@
{restart_streams,
#{desc => "Support for restarting streams with optional preferred next leader argument."
"Used to implement stream leader rebalancing",
stability => stable,
stability => required,
depends_on => [stream_queue]
}}).
@@ -109,14 +109,14 @@
{stream_sac_coordinator_unblock_group,
#{desc => "Bug fix to unblock a group of consumers in a super stream partition",
doc_url => "https://github.com/rabbitmq/rabbitmq-server/issues/7743",
stability => stable,
stability => required,
depends_on => [stream_single_active_consumer]
}}).
-rabbit_feature_flag(
{stream_filtering,
#{desc => "Support for stream filtering.",
stability => stable,
stability => required,
depends_on => [stream_queue]
}}).
@@ -153,7 +153,7 @@
{stream_update_config_command,
#{desc => "A new internal command that is used to update streams as "
"part of a policy.",
stability => stable,
stability => required,
depends_on => [stream_queue]
}}).


@@ -174,19 +174,14 @@ restart_stream(QRes, Options)
restart_stream(Q, Options)
when ?is_amqqueue(Q) andalso
?amqqueue_is_stream(Q) ->
case rabbit_feature_flags:is_enabled(restart_streams) of
true ->
rabbit_log:info("restarting stream ~s in vhost ~s with options ~p",
[maps:get(name, amqqueue:get_type_state(Q)), amqqueue:get_vhost(Q), Options]),
#{name := StreamId} = amqqueue:get_type_state(Q),
case process_command({restart_stream, StreamId, Options}) of
{ok, {ok, LeaderPid}, _} ->
{ok, node(LeaderPid)};
Err ->
Err
end;
false ->
{error, {feature_flag_not_enabled, restart_stream}}
rabbit_log:info("restarting stream ~s in vhost ~s with options ~p",
[maps:get(name, amqqueue:get_type_state(Q)), amqqueue:get_vhost(Q), Options]),
#{name := StreamId} = amqqueue:get_type_state(Q),
case process_command({restart_stream, StreamId, Options}) of
{ok, {ok, LeaderPid}, _} ->
{ok, node(LeaderPid)};
Err ->
Err
end.
delete_stream(Q, ActingUser)
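The simplified restart_stream/2 clause above is exercised by the test changes later in
this commit through rabbit_stream_coordinator:restart_stream/1. A hedged usage sketch;
QName is assumed to be the queue's #resource{} record, and the options map accepted by
restart_stream/2 (e.g. a preferred next leader) is omitted because its exact keys are
not shown in this diff:

%% Restart the stream identified by QName and get back the node that now
%% hosts the stream leader; mirrors the rpc call in the test suite changes.
{ok, NewLeaderNode} = rabbit_stream_coordinator:restart_stream(QName).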
@@ -254,22 +249,17 @@ policy_changed(Q) when ?is_amqqueue(Q) ->
{ok, ok, ra:server_id()} | {error, not_supported | term()}.
update_config(Q, Config)
when ?is_amqqueue(Q) andalso is_map(Config) ->
case rabbit_feature_flags:is_enabled(stream_update_config_command) of
true ->
%% there are the only a few configuration keys that are safe to
%% update
StreamId = maps:get(name, amqqueue:get_type_state(Q)),
case maps:with([filter_size,
retention,
writer_mod,
replica_mod], Config) of
Conf when map_size(Conf) > 0 ->
process_command({update_config, StreamId, Conf});
_ ->
{error, no_updatable_keys}
end;
false ->
{error, feature_not_enabled}
%% there are the only a few configuration keys that are safe to
%% update
StreamId = maps:get(name, amqqueue:get_type_state(Q)),
case maps:with([filter_size,
retention,
writer_mod,
replica_mod], Config) of
Conf when map_size(Conf) > 0 ->
process_command({update_config, StreamId, Conf});
_ ->
{error, no_updatable_keys}
end.
sac_state(#?MODULE{single_active_consumer = SacState}) ->


@@ -96,8 +96,7 @@
soft_limit :: non_neg_integer(),
slow = false :: boolean(),
readers = #{} :: #{rabbit_types:ctag() => #stream{}},
writer_id :: binary(),
filtering_supported :: boolean()
writer_id :: binary()
}).
-import(rabbit_queue_type_util, [args_policy_lookup/3]).
@@ -286,8 +285,7 @@ consume(Q, #{no_ack := true,
consume(Q, #{limiter_active := true}, _State)
when ?amqqueue_is_stream(Q) ->
{error, global_qos_not_supported_for_queue_type};
consume(Q, Spec,
#stream_client{filtering_supported = FilteringSupported} = QState0)
consume(Q, Spec, #stream_client{} = QState0)
when ?amqqueue_is_stream(Q) ->
%% Messages should include the offset as a custom header.
case get_local_pid(QState0) of
@@ -307,26 +305,19 @@ consume(Q, Spec,
{error, _} = Err ->
Err;
{ok, OffsetSpec} ->
FilterSpec = filter_spec(Args),
case {FilterSpec, FilteringSupported} of
{#{filter_spec := _}, false} ->
{protocol_error, precondition_failed,
"Filtering is not supported", []};
_ ->
ConsumerPrefetchCount = case Mode of
{simple_prefetch, C} -> C;
_ -> 0
end,
AckRequired = not NoAck,
rabbit_core_metrics:consumer_created(
ChPid, ConsumerTag, ExclusiveConsume, AckRequired,
QName, ConsumerPrefetchCount, false, up, Args),
%% reply needs to be sent before the stream
%% begins sending
maybe_send_reply(ChPid, OkMsg),
_ = rabbit_stream_coordinator:register_local_member_listener(Q),
begin_stream(QState, ConsumerTag, OffsetSpec, Mode, AckRequired, FilterSpec)
end
ConsumerPrefetchCount = case Mode of
{simple_prefetch, C} -> C;
_ -> 0
end,
AckRequired = not NoAck,
rabbit_core_metrics:consumer_created(
ChPid, ConsumerTag, ExclusiveConsume, AckRequired,
QName, ConsumerPrefetchCount, false, up, Args),
%% reply needs to be sent before the stream
%% begins sending
maybe_send_reply(ChPid, OkMsg),
_ = rabbit_stream_coordinator:register_local_member_listener(Q),
begin_stream(QState, ConsumerTag, OffsetSpec, Mode, AckRequired, filter_spec(Args))
end;
{undefined, _} ->
{protocol_error, precondition_failed,
@@ -510,8 +501,7 @@ deliver(QSs, Msg, Options) ->
lists:foldl(
fun({Q, stateless}, {Qs, Actions}) ->
LeaderPid = amqqueue:get_pid(Q),
ok = osiris:write(LeaderPid,
stream_message(Msg, filtering_supported())),
ok = osiris:write(LeaderPid, stream_message(Msg)),
{Qs, Actions};
({Q, S0}, {Qs, Actions0}) ->
{S, Actions} = deliver0(maps:get(correlation, Options, undefined),
@@ -526,11 +516,9 @@ deliver0(MsgId, Msg,
next_seq = Seq,
correlation = Correlation0,
soft_limit = SftLmt,
slow = Slow0,
filtering_supported = FilteringSupported} = State,
slow = Slow0} = State,
Actions0) ->
ok = osiris:write(LeaderPid, WriterId, Seq,
stream_message(Msg, FilteringSupported)),
ok = osiris:write(LeaderPid, WriterId, Seq, stream_message(Msg)),
Correlation = case MsgId of
undefined ->
Correlation0;
@@ -547,19 +535,14 @@ deliver0(MsgId, Msg,
correlation = Correlation,
slow = Slow}, Actions}.
stream_message(Msg, FilteringSupported) ->
stream_message(Msg) ->
McAmqp = mc:convert(mc_amqp, Msg),
MsgData = mc:protocol_state(McAmqp),
case FilteringSupported of
true ->
case mc:x_header(<<"x-stream-filter-value">>, McAmqp) of
undefined ->
MsgData;
{utf8, Value} ->
{Value, MsgData}
end;
false ->
MsgData
case mc:x_header(<<"x-stream-filter-value">>, McAmqp) of
undefined ->
MsgData;
{utf8, Value} ->
{Value, MsgData}
end.
-spec dequeue(_, _, _, _, client()) -> no_return().
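Because stream_message/1 now always looks for the x-stream-filter-value annotation, a
publisher opts into filtering simply by setting that header. A sketch using the Erlang
AMQP 0-9-1 client; the module name, channel, stream name and filter value are
illustrative placeholders, and only the header name comes from the code above:

-module(stream_filter_publish_example).
-include_lib("amqp_client/include/amqp_client.hrl").
-export([publish_with_filter_value/2]).

publish_with_filter_value(Ch, Stream) ->
    %% The x-stream-filter-value header is exposed as an mc x-header, so
    %% stream_message/1 above hands osiris {<<"emea">>, Payload} and the
    %% chunk gets a filter value attached.
    Props = #'P_basic'{headers = [{<<"x-stream-filter-value">>, longstr, <<"emea">>}]},
    amqp_channel:cast(Ch,
                      #'basic.publish'{routing_key = Stream},
                      #amqp_msg{props = Props, payload = <<"hello">>}).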
@@ -936,8 +919,7 @@ init(Q) when ?is_amqqueue(Q) ->
name = amqqueue:get_name(Q),
leader = Leader,
writer_id = WriterId,
soft_limit = SoftLimit,
filtering_supported = filtering_supported()}};
soft_limit = SoftLimit}};
{ok, stream_not_found, _} ->
{error, stream_not_found};
{error, coordinator_unavailable} = E ->
@@ -1294,8 +1276,7 @@ notify_decorators(Q) when ?is_amqqueue(Q) ->
resend_all(#stream_client{leader = LeaderPid,
writer_id = WriterId,
correlation = Corrs,
filtering_supported = FilteringSupported} = State) ->
correlation = Corrs} = State) ->
Msgs = lists:sort(maps:values(Corrs)),
case Msgs of
[] -> ok;
@@ -1304,8 +1285,7 @@ resend_all(#stream_client{leader = LeaderPid,
[Seq, maps:size(Corrs)])
end,
[begin
ok = osiris:write(LeaderPid, WriterId, Seq,
stream_message(Msg, FilteringSupported))
ok = osiris:write(LeaderPid, WriterId, Seq, stream_message(Msg))
end || {Seq, Msg} <- Msgs],
State.
@@ -1340,9 +1320,6 @@ list_with_minimum_quorum() ->
is_stateful() -> true.
filtering_supported() ->
rabbit_feature_flags:is_enabled(stream_filtering).
get_nodes(Q) when ?is_amqqueue(Q) ->
#{nodes := Nodes} = amqqueue:get_type_state(Q),
Nodes.


@@ -629,32 +629,12 @@ handle_consumer_removal(Group0, Consumer, Stream, ConsumerName) ->
end
end.
message_type() ->
case has_unblock_group_support() of
true ->
map;
false ->
tuple
end.
notify_consumer_effect(Pid, SubId, Stream, Name, Active) ->
notify_consumer_effect(Pid, SubId, Stream, Name, Active, false).
notify_consumer_effect(Pid, SubId, Stream, Name, Active, SteppingDown) ->
notify_consumer_effect(Pid, SubId, Stream, Name, Active, SteppingDown, message_type()).
notify_consumer_effect(Pid, SubId, Stream, Name, Active, SteppingDown, map).
notify_consumer_effect(Pid, SubId, _Stream, _Name, Active, false = _SteppingDown, tuple) ->
mod_call_effect(Pid,
{sac,
{{subscription_id, SubId},
{active, Active},
{extra, []}}});
notify_consumer_effect(Pid, SubId, _Stream, _Name, Active, true = _SteppingDown, tuple) ->
mod_call_effect(Pid,
{sac,
{{subscription_id, SubId},
{active, Active},
{extra, [{stepping_down, true}]}}});
notify_consumer_effect(Pid, SubId, Stream, Name, Active, false = _SteppingDown, map) ->
mod_call_effect(Pid,
{sac, #{subscription_id => SubId,
@@ -776,6 +756,3 @@ mod_call_effect(Pid, Msg) ->
send_message(ConnectionPid, Msg) ->
ConnectionPid ! Msg,
ok.
has_unblock_group_support() ->
rabbit_feature_flags:is_enabled(stream_sac_coordinator_unblock_group).


@@ -3138,7 +3138,6 @@ global_counters(Config) ->
ok = amqp10_client:close_connection(Connection).
stream_filtering(Config) ->
ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, ?FUNCTION_NAME),
Stream = atom_to_binary(?FUNCTION_NAME),
Address = rabbitmq_amqp_address:queue(Stream),
Ch = rabbit_ct_client_helpers:open_channel(Config),


@@ -239,27 +239,10 @@ init_per_group1(Group, Config) ->
{skip, _} ->
Ret;
Config2 ->
EnableFF = rabbit_ct_broker_helpers:enable_feature_flag(
Config2, stream_queue),
case EnableFF of
ok ->
if Clustered ->
rabbit_ct_broker_helpers:enable_feature_flag(
Config2, stream_update_config_command);
true ->
ok
end,
ok = rabbit_ct_broker_helpers:rpc(
Config2, 0, application, set_env,
[rabbit, channel_tick_interval, 100]),
Config2;
{skip, _} = Skip ->
end_per_group(Group, Config2),
Skip;
Other ->
end_per_group(Group, Config2),
{skip, Other}
end
ok = rabbit_ct_broker_helpers:rpc(
Config2, 0, application, set_env,
[rabbit, channel_tick_interval, 100]),
Config2
end.
end_per_group(_, Config) ->
@@ -1437,34 +1420,27 @@ tracking_status(Config) ->
rabbit_ct_broker_helpers:rpc(Config, Server, ?MODULE, delete_testcase_queue, [Q]).
restart_stream(Config) ->
case rabbit_ct_broker_helpers:enable_feature_flag(Config, restart_stream) of
ok ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
Q = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Config, Server, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
Q = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Config, Server, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
publish_confirm(Ch, Q, [<<"msg">>]),
Vhost = ?config(rmq_vhost, Config),
QName = #resource{virtual_host = Vhost,
kind = queue,
name = Q},
%% restart the stream
?assertMatch({ok, _},
rabbit_ct_broker_helpers:rpc(Config, Server,
rabbit_stream_coordinator,
?FUNCTION_NAME, [QName])),
publish_confirm(Ch, Q, [<<"msg">>]),
Vhost = ?config(rmq_vhost, Config),
QName = #resource{virtual_host = Vhost,
kind = queue,
name = Q},
%% restart the stream
?assertMatch({ok, _},
rabbit_ct_broker_helpers:rpc(Config, Server,
rabbit_stream_coordinator,
?FUNCTION_NAME, [QName])),
publish_confirm(Ch, Q, [<<"msg2">>]),
rabbit_ct_broker_helpers:rpc(Config, Server, ?MODULE, delete_testcase_queue, [Q]),
ok;
_ ->
ct:pal("skipping test ~s as feature flag `restart_stream` not supported",
[?FUNCTION_NAME]),
ok
end.
publish_confirm(Ch, Q, [<<"msg2">>]),
rabbit_ct_broker_helpers:rpc(Config, Server, ?MODULE, delete_testcase_queue, [Q]),
ok.
format(Config) ->
%% tests rabbit_stream_queue:format/2
@@ -2817,18 +2793,7 @@ ensure_retention_applied(Config, Server) ->
rabbit_ct_broker_helpers:rpc(Config, Server, gen_server, call, [osiris_retention, test]).
rebalance(Config) ->
case rabbit_ct_broker_helpers:enable_feature_flag(Config, restart_stream) of
ok ->
rebalance0(Config);
_ ->
ct:pal("skipping test ~s as feature flag `restart_stream` not supported",
[?FUNCTION_NAME]),
ok
end.
rebalance0(Config) ->
[Server0 | _] =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
[Server0 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
Q1 = <<"st1">>,


@@ -103,13 +103,6 @@ init_per_testcase0(publish_unauthorized_error, Config) ->
StompPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stomp),
{ok, ClientFoo} = rabbit_stomp_client:connect(Version, "user", "pass", StompPort),
rabbit_ct_helpers:set_config(Config, [{client_foo, ClientFoo}]);
init_per_testcase0(stream_filtering, Config) ->
case rabbit_ct_helpers:is_mixed_versions() of
true ->
{skip, "mixed version clusters are not supported for stream filtering"};
_ ->
Config
end;
init_per_testcase0(_, Config) ->
Config.


@@ -182,8 +182,7 @@ init([KeepaliveSup,
correlation_id_sequence = 0,
outstanding_requests = #{},
request_timeout = RequestTimeout,
deliver_version = DeliverVersion,
filtering_supported = rabbit_stream_utils:filtering_supported()},
deliver_version = DeliverVersion},
State =
#stream_connection_state{consumers = #{},
blocked = false,
@@ -549,9 +548,6 @@ increase_messages_confirmed(Counters, Count) ->
rabbit_global_counters:messages_confirmed(stream, Count),
atomics:add(Counters, 2, Count).
increase_messages_errored(Counters, Count) ->
atomics:add(Counters, 3, Count).
messages_consumed(Counters) ->
atomics:get(Counters, 1).
@@ -714,19 +710,6 @@ open(info, {OK, S, Data},
StatemData#statem_data{connection = Connection1,
connection_state = State2}}
end;
open(info,
{sac, {{subscription_id, SubId},
{active, Active}, {extra, Extra}}},
State) ->
Msg0 = #{subscription_id => SubId,
active => Active},
Msg1 = case Extra of
[{stepping_down, true}] ->
Msg0#{stepping_down => true};
_ ->
Msg0
end,
open(info, {sac, Msg1}, State);
open(info,
{sac, #{subscription_id := SubId,
active := Active} = Msg},
@@ -1783,31 +1766,6 @@ handle_frame_post_auth(Transport,
{publish, PublisherId, MessageCount, Messages}) ->
handle_frame_post_auth(Transport, Connection, State,
{publish, ?VERSION_1, PublisherId, MessageCount, Messages});
handle_frame_post_auth(Transport,
#stream_connection{filtering_supported = false,
publishers = Publishers,
socket = S} = Connection,
State,
{publish_v2, PublisherId, MessageCount, Messages}) ->
case Publishers of
#{PublisherId := #publisher{message_counters = Counters}} ->
increase_messages_received(Counters, MessageCount),
increase_messages_errored(Counters, MessageCount),
ok;
_ ->
ok
end,
rabbit_global_counters:increase_protocol_counter(stream,
?PRECONDITION_FAILED,
1),
PublishingIds = publishing_ids_from_messages(?VERSION_2, Messages),
Command = {publish_error,
PublisherId,
?RESPONSE_CODE_PRECONDITION_FAILED,
PublishingIds},
Frame = rabbit_stream_core:frame(Command),
send(Transport, S, Frame),
{Connection, State};
handle_frame_post_auth(Transport,
Connection,
State,
@@ -1932,29 +1890,6 @@ handle_frame_post_auth(Transport,
1),
{Connection0, State}
end;
handle_frame_post_auth(Transport,
#stream_connection{filtering_supported = false} = Connection,
State,
{request, CorrelationId,
{subscribe,
SubscriptionId, _, _, _, Properties}} = Request) ->
case rabbit_stream_utils:filter_defined(Properties) of
true ->
rabbit_log:warning("Cannot create subcription ~tp, it defines a filter "
"and filtering is not active",
[SubscriptionId]),
response(Transport,
Connection,
subscribe,
CorrelationId,
?RESPONSE_CODE_PRECONDITION_FAILED),
rabbit_global_counters:increase_protocol_counter(stream,
?PRECONDITION_FAILED,
1),
{Connection, State};
false ->
handle_frame_post_auth(Transport, {ok, Connection}, State, Request)
end;
handle_frame_post_auth(Transport, #stream_connection{} = Connection, State,
{request, _,
{subscribe,


@@ -93,7 +93,6 @@
deliver_version :: rabbit_stream_core:command_version(),
request_timeout :: pos_integer(),
outstanding_requests_timer :: undefined | erlang:reference(),
filtering_supported :: boolean(),
%% internal sequence used for publishers
internal_sequence = 0 :: integer(),
token_expiry_timer = undefined :: undefined | erlang:reference()}).


@@ -34,7 +34,6 @@
filter_defined/1,
filter_spec/1,
command_versions/0,
filtering_supported/0,
check_super_stream_management_permitted/4,
offset_lag/4,
consumer_offset/3]).
@@ -298,14 +297,8 @@ filter_spec(Properties) ->
end.
command_versions() ->
PublishMaxVersion = case filtering_supported() of
false ->
?VERSION_1;
true ->
?VERSION_2
end,
[{declare_publisher, ?VERSION_1, ?VERSION_1},
{publish, ?VERSION_1, PublishMaxVersion},
{publish, ?VERSION_1, ?VERSION_2},
{query_publisher_sequence, ?VERSION_1, ?VERSION_1},
{delete_publisher, ?VERSION_1, ?VERSION_1},
{subscribe, ?VERSION_1, ?VERSION_1},
@@ -324,9 +317,6 @@ command_versions() ->
{create_super_stream, ?VERSION_1, ?VERSION_1},
{delete_super_stream, ?VERSION_1, ?VERSION_1}].
filtering_supported() ->
rabbit_feature_flags:is_enabled(stream_filtering).
q(VirtualHost, Name) ->
rabbit_misc:r(VirtualHost, queue, Name).


@@ -37,8 +37,7 @@ all() ->
groups() ->
[{single_node, [],
[filtering_ff, %% must stay at the top, feature flag disabled for this one
test_stream,
[test_stream,
test_stream_tls,
test_publish_v2,
test_super_stream_creation_deletion,
@@ -185,13 +184,6 @@ end_per_testcase(cannot_update_username_after_authenticated = TestCase, Config) ->
ok = rabbit_ct_broker_helpers:delete_user(Config, <<"other">>),
rabbit_ct_helpers:testcase_finished(Config, TestCase);
end_per_testcase(filtering_ff = TestCase, Config) ->
_ = rabbit_ct_broker_helpers:rpc(Config,
0,
rabbit_feature_flags,
enable,
[stream_filtering]),
rabbit_ct_helpers:testcase_finished(Config, TestCase);
end_per_testcase(close_connection_on_consumer_update_timeout = TestCase, Config) ->
ok = rabbit_ct_broker_helpers:rpc(Config,
0,
@@ -212,32 +204,6 @@ end_per_testcase(store_offset_requires_read_access = TestCase, Config) ->
end_per_testcase(TestCase, Config) ->
rabbit_ct_helpers:testcase_finished(Config, TestCase).
filtering_ff(Config) ->
Stream = atom_to_binary(?FUNCTION_NAME, utf8),
Transport = gen_tcp,
Port = get_stream_port(Config),
Opts = [{active, false}, {mode, binary}],
{ok, S} = Transport:connect("localhost", Port, Opts),
C0 = rabbit_stream_core:init(0),
C1 = test_peer_properties(Transport, S, C0),
C2 = test_authenticate(Transport, S, C1),
C3 = test_create_stream(Transport, S, Stream, C2),
PublisherId = 42,
C4 = test_declare_publisher(Transport, S, PublisherId, Stream, C3),
Body = <<"hello">>,
C5 = test_publish_confirm(Transport, S, publish_v2, PublisherId, Body,
publish_error, C4),
SubscriptionId = 42,
C6 = test_subscribe(Transport, S, SubscriptionId, Stream,
#{<<"filter.0">> => <<"foo">>},
?RESPONSE_CODE_PRECONDITION_FAILED,
C5),
C7 = test_delete_stream(Transport, S, Stream, C6),
_C8 = test_close(Transport, S, C7),
closed = wait_for_socket_close(Transport, S, 10),
ok.
test_global_counters(Config) ->
Stream = atom_to_binary(?FUNCTION_NAME, utf8),
test_server(gen_tcp, Stream, Config),


@@ -281,7 +281,7 @@ def rabbitmq_integration_suite(
# required starting from 3.12.0 in rabbitmq_management_agent:
# empty_basic_get_metric, drop_unroutable_metric
# required starting from 4.0 in rabbit:
"message_containers",
"message_containers,stream_update_config_command,stream_filtering,stream_sac_coordinator_unblock_group,restart_streams",
"RABBITMQ_RUN": "$(location :rabbitmq-for-tests-run)",
"RABBITMQCTL": "$TEST_SRCDIR/$TEST_WORKSPACE/{}/broker-for-tests-home/sbin/rabbitmqctl".format(package),
"RABBITMQ_PLUGINS": "$TEST_SRCDIR/$TEST_WORKSPACE/{}/broker-for-tests-home/sbin/rabbitmq-plugins".format(package),