diff --git a/deps/rabbit/src/rabbit_core_ff.erl b/deps/rabbit/src/rabbit_core_ff.erl index eed253e8c9..e8817e1751 100644 --- a/deps/rabbit/src/rabbit_core_ff.erl +++ b/deps/rabbit/src/rabbit_core_ff.erl @@ -205,3 +205,10 @@ stability => stable, depends_on => [message_containers] }}). + +-rabbit_feature_flag( + {'rabbitmq_4.1.0', + #{desc => "Allows rolling upgrades to 4.1.x", + stability => stable, + depends_on => ['rabbitmq_4.0.0'] + }}). diff --git a/deps/rabbitmq_mqtt/Makefile b/deps/rabbitmq_mqtt/Makefile index feb46e65b5..928c34c43c 100644 --- a/deps/rabbitmq_mqtt/Makefile +++ b/deps/rabbitmq_mqtt/Makefile @@ -43,7 +43,7 @@ export BUILD_WITHOUT_QUIC LOCAL_DEPS = ssl DEPS = ranch rabbit amqp10_common -TEST_DEPS = emqtt ct_helper rabbitmq_ct_helpers rabbitmq_ct_client_helpers rabbitmq_management amqp_client rabbitmq_consistent_hash_exchange rabbitmq_amqp_client rabbitmq_stomp rabbitmq_stream +TEST_DEPS = emqtt ct_helper rabbitmq_ct_helpers rabbitmq_ct_client_helpers rabbitmq_management amqp_client rabbitmq_consistent_hash_exchange rabbitmq_amqp_client rabbitmq_stomp rabbitmq_stream rabbitmq_federation PLT_APPS += rabbitmqctl elixir @@ -94,7 +94,7 @@ define ct_master.erl halt(0) endef -PARALLEL_CT_SET_1_A = auth retainer +PARALLEL_CT_SET_1_A = auth retainer federation feature_flag PARALLEL_CT_SET_1_B = cluster command config config_schema mc_mqtt packet_prop \ processor protocol_interop proxy_protocol rabbit_mqtt_confirms reader util PARALLEL_CT_SET_1_C = java v5 diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index 97e5edf831..b14decb189 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -88,8 +88,13 @@ send_fun :: send_fun(), %% Maximum MQTT packet size in bytes for packets sent from server to client. max_packet_size_outbound :: max_packet_size(), - topic_alias_maximum_outbound :: non_neg_integer() - }). + topic_alias_maximum_outbound :: non_neg_integer(), + %% https://github.com/rabbitmq/rabbitmq-server/issues/13040 + %% The database stores the MQTT subscription options in the binding arguments for: + %% * v1 as Erlang record #mqtt_subscription_opts{} + %% * v2 as AMQP 0.9.1 table + binding_args_v2 :: boolean() + }). -record(state, {cfg :: #cfg{}, @@ -207,6 +212,9 @@ process_connect( {TraceState, ConnName} = init_trace(VHost, ConnName0), ok = rabbit_mqtt_keepalive:start(KeepaliveSecs, Socket), Exchange = rabbit_misc:r(VHost, exchange, persistent_term:get(?PERSISTENT_TERM_EXCHANGE)), + %% To simplify logic, we decide at connection establishment time to stick + %% with either binding args v1 or v2 for the lifetime of the connection. + BindingArgsV2 = rabbit_feature_flags:is_enabled('rabbitmq_4.1.0'), S = #state{ cfg = #cfg{socket = Socket, proto_ver = proto_integer_to_atom(ProtoVer), @@ -229,7 +237,8 @@ process_connect( user_prop = maps:get('User-Property', ConnectProps, []), will_msg = WillMsg, max_packet_size_outbound = MaxPacketSize, - topic_alias_maximum_outbound = TopicAliasMaxOutbound}, + topic_alias_maximum_outbound = TopicAliasMaxOutbound, + binding_args_v2 = BindingArgsV2}, auth_state = #auth_state{ user = User, authz_ctx = AuthzCtx}}, @@ -432,7 +441,8 @@ process_request(?SUBSCRIBE, packet_id = SubscribePktId, subscriptions = Subscriptions}, payload = undefined}, - #state{cfg = #cfg{proto_ver = ProtoVer}} = State0) -> + State0 = #state{cfg = #cfg{proto_ver = ProtoVer, + binding_args_v2 = BindingArgsV2}}) -> ?LOG_DEBUG("Received a SUBSCRIBE with subscription(s) ~p", [Subscriptions]), {ResultRev, RetainedRev, State1} = lists:foldl( @@ -460,7 +470,7 @@ process_request(?SUBSCRIBE, maybe {ok, Q} ?= ensure_queue(QoS, S0), QName = amqqueue:get_name(Q), - BindingArgs = binding_args_for_proto_ver(ProtoVer, TopicFilter, Opts), + BindingArgs = binding_args_for_proto_ver(ProtoVer, TopicFilter, Opts, BindingArgsV2), ok ?= add_subscription(TopicFilter, BindingArgs, QName, S0), ok ?= maybe_delete_old_subscription(TopicFilter, Opts, S0), Subs = maps:put(TopicFilter, Opts, S0#state.subscriptions), @@ -508,10 +518,11 @@ process_request(?UNSUBSCRIBE, {ReasonCodes, State} = lists:foldl( fun(TopicFilter, {L, #state{subscriptions = Subs0, - cfg = #cfg{proto_ver = ProtoVer}} = S0}) -> + cfg = #cfg{proto_ver = ProtoVer, + binding_args_v2 = BindingArgsV2}} = S0}) -> case maps:take(TopicFilter, Subs0) of {Opts, Subs} -> - BindingArgs = binding_args_for_proto_ver(ProtoVer, TopicFilter, Opts), + BindingArgs = binding_args_for_proto_ver(ProtoVer, TopicFilter, Opts, BindingArgsV2), case delete_subscription( TopicFilter, BindingArgs, Opts#mqtt_subscription_opts.qos, S0) of ok -> @@ -872,14 +883,19 @@ init_subscriptions(_SessionPresent = _SubscriptionsPresent = true, init_subscriptions(_, State) -> {ok, State}. +%% We suppress a warning because rabbit_misc:table_lookup/2 declares the correct spec and +%% we must handle binding args v1 where binding arguments are not a valid AMQP 0.9.1 table. +-dialyzer({no_match, init_subscriptions0/2}). + -spec init_subscriptions0(qos(), state()) -> {ok, subscriptions()} | {error, reason_code()}. -init_subscriptions0(QoS, State0 = #state{cfg = #cfg{proto_ver = ProtoVer, - exchange = Exchange}}) -> +init_subscriptions0(QoS, State = #state{cfg = #cfg{proto_ver = ProtoVer, + exchange = Exchange, + binding_args_v2 = BindingArgsV2}}) -> Bindings = rabbit_binding:list_for_source_and_destination( Exchange, - queue_name(QoS, State0), + queue_name(QoS, State), %% Querying table rabbit_route is catastrophic for CPU usage. %% Querying table rabbit_reverse_route is acceptable because %% the source exchange is always the same in the MQTT plugin whereas @@ -887,37 +903,56 @@ init_subscriptions0(QoS, State0 = #state{cfg = #cfg{proto_ver = ProtoVer, %% rabbit_reverse_route is sorted by destination queue. _Reverse = true), try - Subs = lists:foldl( + Subs = lists:map( fun(#binding{key = Key, - args = Args = []}, - Acc) -> + args = Args = []}) -> Opts = #mqtt_subscription_opts{qos = QoS}, TopicFilter = amqp_to_mqtt(Key), case ProtoVer of ?MQTT_PROTO_V5 -> %% session upgrade - NewBindingArgs = binding_args_for_proto_ver(ProtoVer, TopicFilter, Opts), - ok = recreate_subscription(TopicFilter, Args, NewBindingArgs, QoS, State0); + NewBindingArgs = binding_args_for_proto_ver(ProtoVer, TopicFilter, Opts, BindingArgsV2), + ok = recreate_subscription(TopicFilter, Args, NewBindingArgs, QoS, State); _ -> ok end, - maps:put(TopicFilter, Opts, Acc); + {TopicFilter, Opts}; (#binding{key = Key, - args = Args}, - Acc) -> - Opts0 = #mqtt_subscription_opts{} = lists:keyfind(mqtt_subscription_opts, 1, Args), + args = Args}) -> TopicFilter = amqp_to_mqtt(Key), Opts = case ProtoVer of ?MQTT_PROTO_V5 -> - Opts0; + case rabbit_misc:table_lookup(Args, <<"x-mqtt-subscription-opts">>) of + {table, Table} -> + %% binding args v2 + subscription_opts_from_table(Table); + undefined -> + %% binding args v1 + Opts0 = #mqtt_subscription_opts{} = lists:keyfind( + mqtt_subscription_opts, 1, Args), + case BindingArgsV2 of + true -> + %% Migrate v1 to v2. + %% Note that this migration must be in place even for some versions + %% (jump upgrade) after feature flag 'rabbitmq_4.1.0' has become + %% required since enabling the feature flag doesn't migrate binding + %% args for existing connections. + NewArgs = binding_args_for_proto_ver( + ProtoVer, TopicFilter, Opts0, BindingArgsV2), + ok = recreate_subscription(TopicFilter, Args, NewArgs, QoS, State); + false -> + ok + end, + Opts0 + end; _ -> %% session downgrade - ok = recreate_subscription(TopicFilter, Args, [], QoS, State0), + ok = recreate_subscription(TopicFilter, Args, [], QoS, State), #mqtt_subscription_opts{qos = QoS} end, - maps:put(TopicFilter, Opts, Acc) - end, #{}, Bindings), - {ok, Subs} + {TopicFilter, Opts} + end, Bindings), + {ok, maps:from_list(Subs)} catch throw:{error, Reason} -> Rc = case Reason of access_refused -> ?RC_NOT_AUTHORIZED; @@ -1482,14 +1517,52 @@ consume(Q, QoS, #state{ Err end. -binding_args_for_proto_ver(?MQTT_PROTO_V3, _, _) -> +binding_args_for_proto_ver(?MQTT_PROTO_V3, _, _, _) -> []; -binding_args_for_proto_ver(?MQTT_PROTO_V4, _, _) -> +binding_args_for_proto_ver(?MQTT_PROTO_V4, _, _, _) -> []; -binding_args_for_proto_ver(?MQTT_PROTO_V5, TopicFilter, SubOpts) -> +binding_args_for_proto_ver(?MQTT_PROTO_V5, TopicFilter, SubOpts0, V2) -> + SubOpts = case V2 of + true -> + Table = subscription_opts_to_table(SubOpts0), + {<<"x-mqtt-subscription-opts">>, table, Table}; + false -> + SubOpts0 + end, BindingKey = mqtt_to_amqp(TopicFilter), [SubOpts, {<<"x-binding-key">>, longstr, BindingKey}]. +subscription_opts_to_table(#mqtt_subscription_opts{ + qos = Qos, + no_local = NoLocal, + retain_as_published = RetainAsPublished, + retain_handling = RetainHandling, + id = Id}) -> + Table0 = [{<<"qos">>, unsignedbyte, Qos}, + {<<"no-local">>, bool, NoLocal}, + {<<"retain-as-published">>, bool, RetainAsPublished}, + {<<"retain-handling">>, unsignedbyte, RetainHandling}], + Table = case Id of + undefined -> + Table0; + _ -> + [{<<"id">>, unsignedint, Id} | Table0] + end, + rabbit_misc:sort_field_table(Table). + +subscription_opts_from_table(Table) -> + #{<<"qos">> := Qos, + <<"no-local">> := NoLocal, + <<"retain-as-published">> := RetainAsPublished, + <<"retain-handling">> := RetainHandling + } = Map = rabbit_misc:amqp_table(Table), + #mqtt_subscription_opts{ + qos = Qos, + no_local = NoLocal, + retain_as_published = RetainAsPublished, + retain_handling = RetainHandling, + id = maps:get(<<"id">>, Map, undefined)}. + add_subscription(TopicFilter, BindingArgs, Qos, State) when is_integer(Qos) -> add_subscription(TopicFilter, BindingArgs, queue_name(Qos, State), State); @@ -1506,12 +1579,13 @@ delete_subscription(TopicFilter, BindingArgs, Qos, State) -> %% Subscription will be identical to that in the previous Subscription, although its %% Subscription Options could be different." [v5 3.8.4] maybe_delete_old_subscription(TopicFilter, Opts, State = #state{subscriptions = Subs, - cfg = #cfg{proto_ver = ProtoVer}}) -> + cfg = #cfg{proto_ver = ProtoVer, + binding_args_v2 = BindingArgsV2}}) -> case Subs of #{TopicFilter := OldOpts} when OldOpts =/= Opts -> delete_subscription(TopicFilter, - binding_args_for_proto_ver(ProtoVer, TopicFilter, OldOpts), + binding_args_for_proto_ver(ProtoVer, TopicFilter, OldOpts, BindingArgsV2), OldOpts#mqtt_subscription_opts.qos, State); _ -> diff --git a/deps/rabbitmq_mqtt/test/feature_flag_SUITE.erl b/deps/rabbitmq_mqtt/test/feature_flag_SUITE.erl new file mode 100644 index 0000000000..e4e9e1ebcc --- /dev/null +++ b/deps/rabbitmq_mqtt/test/feature_flag_SUITE.erl @@ -0,0 +1,111 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + +%% This suite should be deleted when feature flag 'rabbitmq_4.1.0' becomes required. +-module(feature_flag_SUITE). +-compile([export_all, + nowarn_export_all]). + +-include_lib("eunit/include/eunit.hrl"). + +-import(util, + [connect/2, + connect/3, + non_clean_sess_opts/0 + ]). + +-define(RC_SESSION_TAKEN_OVER, 16#8E). + +all() -> + [migrate_binding_args]. + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + Config1 = rabbit_ct_helpers:set_config( + Config, + [{mqtt_version, v5}, + {rmq_nodename_suffix, ?MODULE}]), + Config2 = rabbit_ct_helpers:merge_app_env( + Config1, + {rabbit, [{forced_feature_flags_on_init, []}]}), + rabbit_ct_helpers:run_setup_steps( + Config2, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps( + Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +migrate_binding_args(Config) -> + %% Feature flag rabbitmq_4.1.0 enables binding arguments v2. + FeatureFlag = 'rabbitmq_4.1.0', + ?assertNot(rabbit_ct_broker_helpers:is_feature_flag_enabled(Config, FeatureFlag)), + + Sub1a = connect(<<"sub 1">>, Config, non_clean_sess_opts()), + {ok, _, [0]} = emqtt:subscribe(Sub1a, <<"x/+">>, qos0), + ok = emqtt:disconnect(Sub1a), + + Sub2a = connect(<<"sub 2">>, Config,non_clean_sess_opts()), + {ok, _, [0, 1]} = emqtt:subscribe( + Sub2a, + #{'Subscription-Identifier' => 9}, + [{<<"x/y">>, [{nl, false}, {rap, false}, {qos, qos0}]}, + {<<"z">>, [{nl, true}, {rap, true}, {qos, qos1}]}]), + + Pub = connect(<<"pub">>, Config), + {ok, _} = emqtt:publish(Pub, <<"x/y">>, <<"m1">>, [{retain, true}, {qos, 1}]), + receive {publish, #{client_pid := Sub2a, + qos := 0, + topic := <<"x/y">>, + payload := <<"m1">>, + retain := false}} -> ok + after 10_000 -> ct:fail({missing_publish, ?LINE}) + end, + + ?assertEqual(ok, rabbit_ct_broker_helpers:enable_feature_flag(Config, FeatureFlag)), + + %% Connecting causes binding args to be migrated from v1 to v2. + Sub1b = connect(<<"sub 1">>, Config, [{clean_start, false}]), + receive {publish, #{client_pid := Sub1b, + qos := 0, + topic := <<"x/y">>, + payload := <<"m1">>}} -> ok + after 10_000 -> ct:fail({missing_publish, ?LINE}) + end, + + unlink(Sub2a), + %% Connecting causes binding args to be migrated from v1 to v2. + Sub2b = connect(<<"sub 2">>, Config, [{clean_start, false}]), + receive {disconnected, ?RC_SESSION_TAKEN_OVER, #{}} -> ok + after 10_000 -> ct:fail({missing_disconnected, ?LINE}) + end, + + {ok, _} = emqtt:publish(Sub2b, <<"z">>, <<"m2">>, qos1), + %% We should not receive m2 since it's a local publish. + {ok, _} = emqtt:publish(Pub, <<"z">>, <<"m3">>, [{retain, true}, {qos, qos1}]), + receive {publish, Publish} -> + ?assertMatch(#{client_pid := Sub2b, + qos := 1, + topic := <<"z">>, + payload := <<"m3">>, + properties := #{'Subscription-Identifier' := 9}, + retain := true}, + Publish) + after 10_000 -> ct:fail({missing_publish, ?LINE}) + end, + + ok = emqtt:disconnect(Sub1b), + ok = emqtt:disconnect(Sub2b), + ok = emqtt:disconnect(Pub). diff --git a/deps/rabbitmq_mqtt/test/federation_SUITE.erl b/deps/rabbitmq_mqtt/test/federation_SUITE.erl new file mode 100644 index 0000000000..a87cb3cf73 --- /dev/null +++ b/deps/rabbitmq_mqtt/test/federation_SUITE.erl @@ -0,0 +1,102 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + +-module(federation_SUITE). +-compile([export_all, + nowarn_export_all]). + +-include_lib("eunit/include/eunit.hrl"). + +-import(rabbit_ct_helpers, + [eventually/3]). + +all() -> + [exchange_federation]. + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + Config1 = rabbit_ct_helpers:set_config( + Config, + [{rmq_nodename_suffix, ?MODULE}, + {rmq_nodes_count, 2}, + {rmq_nodes_clustered, false}]), + rabbit_ct_helpers:run_setup_steps( + Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps( + Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_testcase(Testcase, Config) -> + case rabbit_ct_broker_helpers:enable_feature_flag(Config, 'rabbitmq_4.1.0') of + ok -> + rabbit_ct_helpers:testcase_started(Config, Testcase); + Skip = {skip, _Reason} -> + Skip + end. + +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +%% Test that exchange federation works for MQTT clients. +%% https://github.com/rabbitmq/rabbitmq-server/issues/13040 +exchange_federation(Config) -> + Upstream = 0, + Downstream = 1, + ok = rabbit_ct_broker_helpers:set_parameter( + Config, Downstream, <<"federation-upstream">>, <<"origin">>, + [ + {<<"uri">>, rabbit_ct_broker_helpers:node_uri(Config, Upstream)} + ]), + ok = rabbit_ct_broker_helpers:set_policy( + Config, Downstream, <<"my policy">>, <<"^amq\.topic$">>, <<"exchanges">>, + [ + {<<"federation-upstream-set">>, <<"all">>} + ]), + + %% Subscribe on the downstream. + SubV4 = util:connect(<<"v4 client">>, Config, Downstream, [{proto_ver, v4}]), + SubV5 = util:connect(<<"v5 client">>, Config, Downstream, [{proto_ver, v5}]), + {ok, _, [1]} = emqtt:subscribe(SubV4, <<"vsn/4">>, qos1), + {ok, _, [1]} = emqtt:subscribe(SubV5, #{'Subscription-Identifier' => 500}, <<"vsn/5">>, qos1), + + %% "The bindings are replicated with the upstream asynchronously so the effect of + %% adding or removing a binding is only guaranteed to be seen eventually." + %% https://www.rabbitmq.com/docs/federated-exchanges#details + eventually( + ?_assertMatch( + [_V4, _V5], + rabbit_ct_broker_helpers:rpc( + Config, Upstream, rabbit_binding, list_for_source, + [rabbit_misc:r(<<"/">>, exchange, <<"amq.topic">>)])), + 1000, 10), + + %% Publish on the upstream. + Pub = util:connect(<<"v3 client">>, Config, Upstream, [{proto_ver, v3}]), + {ok, #{reason_code_name := success}} = emqtt:publish(Pub, <<"vsn/4">>, <<"m1">>, qos1), + {ok, #{reason_code_name := success}} = emqtt:publish(Pub, <<"vsn/5">>, <<"m2">>, qos1), + + receive {publish, #{client_pid := SubV4, + qos := 1, + topic := <<"vsn/4">>, + payload := <<"m1">>}} -> ok + after 10_000 -> ct:fail({missing_publish, ?LINE}) + end, + receive {publish, #{client_pid := SubV5, + qos := 1, + topic := <<"vsn/5">>, + payload := <<"m2">>, + properties := #{'Subscription-Identifier' := 500}}} -> ok + after 10_000 -> ct:fail({missing_publish, ?LINE}) + end, + + ok = emqtt:disconnect(SubV4), + ok = emqtt:disconnect(SubV5), + ok = emqtt:disconnect(Pub).