From b62e09806bba2e9e83b82c009fa4bac76d7c316f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20G=C3=B6m=C3=B6ri?= Date: Mon, 3 Feb 2025 18:29:08 +0100 Subject: [PATCH 01/15] Show consumer count column on Mgmt UI Channels page Consumer count is already returned by the /channels API endpoint. Now the consumer count column can be shown in the channels table but it is hidden by default. --- deps/rabbitmq_management/priv/www/js/global.js | 1 + .../priv/www/js/tmpl/channels-list.ejs | 9 +++++++++ 2 files changed, 10 insertions(+) diff --git a/deps/rabbitmq_management/priv/www/js/global.js b/deps/rabbitmq_management/priv/www/js/global.js index a35821ebd7..0e3f59025d 100644 --- a/deps/rabbitmq_management/priv/www/js/global.js +++ b/deps/rabbitmq_management/priv/www/js/global.js @@ -96,6 +96,7 @@ var ALL_COLUMNS = ['mode', 'Mode', true], ['state', 'State', true]], 'Details': [['msgs-unconfirmed', 'Unconfirmed', true], + ['consumer-count', 'Consumer count', false], ['prefetch', 'Prefetch', true], ['msgs-unacked', 'Unacked', true]], 'Transactions': [['msgs-uncommitted', 'Msgs uncommitted', false], diff --git a/deps/rabbitmq_management/priv/www/js/tmpl/channels-list.ejs b/deps/rabbitmq_management/priv/www/js/tmpl/channels-list.ejs index ef6c543bba..09a3435435 100644 --- a/deps/rabbitmq_management/priv/www/js/tmpl/channels-list.ejs +++ b/deps/rabbitmq_management/priv/www/js/tmpl/channels-list.ejs @@ -35,6 +35,9 @@ <% if (show_column('channels', 'msgs-unconfirmed')) { %> <%= fmt_sort('Unconfirmed', 'messages_unconfirmed') %> <% } %> +<% if (show_column('channels', 'consumer-count')) { %> + <%= fmt_sort('Consumer count', 'consumer_count') %> +<% } %> <% if (show_column('channels', 'prefetch')) { %> Prefetch <% } %> @@ -85,6 +88,9 @@ <% if (show_column('channels', 'msgs-unconfirmed')) { %> Unconfirmed <% } %> +<% if (show_column('channels', 'consumer-count')) { %> + Consumer count +<% } %> <% if (show_column('channels', 'prefetch')) { %> Prefetch <% } %> @@ -152,6 +158,9 @@ <% if (show_column('channels', 'msgs-unconfirmed')) { %> <%= channel.messages_unconfirmed %> <% } %> +<% if (show_column('channels', 'consumer-count')) { %> + <%= channel.consumer_count %> +<% } %> <% if (show_column('channels', 'prefetch')) { %> <% if (channel.prefetch_count != 0) { %> From 703ee8529e02fab6ae7995132ee401b41afad680 Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Fri, 7 Feb 2025 15:51:40 +0100 Subject: [PATCH 02/15] Add rabbitmq_endpoint label to rabbitmq_identity_info --- ...etheus_rabbitmq_core_metrics_collector.erl | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl index 9ea8fcfa2d..1f4534495e 100644 --- a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl +++ b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl @@ -304,22 +304,25 @@ collect_mf('detailed', Callback) -> collect(true, ?DETAILED_METRIC_NAME_PREFIX, vhosts_filter_from_pdict(), enabled_mfs_from_pdict(?METRICS_RAW), Callback), collect(true, ?CLUSTER_METRIC_NAME_PREFIX, vhosts_filter_from_pdict(), enabled_mfs_from_pdict(?METRICS_CLUSTER), Callback), %% identity is here to enable filtering on a cluster name (as already happens in existing dashboards) - emit_identity_info(Callback), + emit_identity_info(<<"detailed">>, Callback), ok; collect_mf('per-object', Callback) -> collect(true, ?METRIC_NAME_PREFIX, false, ?METRICS_RAW, Callback), totals(Callback), - emit_identity_info(Callback), + emit_identity_info(<<"per-object">>, Callback), ok; collect_mf('memory-breakdown', Callback) -> collect(false, ?METRIC_NAME_PREFIX, false, ?METRICS_MEMORY_BREAKDOWN, Callback), - emit_identity_info(Callback), + emit_identity_info(<<"memory-breakdown">>, Callback), ok; collect_mf(_Registry, Callback) -> PerObjectMetrics = application:get_env(rabbitmq_prometheus, return_per_object_metrics, false), collect(PerObjectMetrics, ?METRIC_NAME_PREFIX, false, ?METRICS_RAW, Callback), totals(Callback), - emit_identity_info(Callback), + case PerObjectMetrics of + true -> emit_identity_info(<<"per-object">>, Callback); + false -> emit_identity_info(<<"aggregated">>, Callback) + end, ok. collect(PerObjectMetrics, Prefix, VHostsFilter, IncludedMFs, Callback) -> @@ -336,9 +339,9 @@ totals(Callback) -> end || {Table, Name, Type, Help} <- ?TOTALS], ok. -emit_identity_info(Callback) -> +emit_identity_info(Endpoint, Callback) -> add_metric_family(build_info(), Callback), - add_metric_family(identity_info(), Callback), + add_metric_family(identity_info(Endpoint), Callback), ok. %% Aggregated `auth``_attempt_detailed_metrics` and @@ -387,7 +390,7 @@ build_info() -> }] }. -identity_info() -> +identity_info(Endpoint) -> { identity_info, untyped, @@ -396,7 +399,8 @@ identity_info() -> [ {rabbitmq_node, node()}, {rabbitmq_cluster, rabbit_nodes:cluster_name()}, - {rabbitmq_cluster_permanent_id, rabbit_nodes:persistent_cluster_id()} + {rabbitmq_cluster_permanent_id, rabbit_nodes:persistent_cluster_id()}, + {rabbitmq_endpoint, Endpoint} ], 1 }] From 5cbda4c838591373b254d091f9775f1cf6e6ba40 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Tue, 11 Feb 2025 14:50:54 +0100 Subject: [PATCH 03/15] rabbit_stream_queue_SUITE: Swap uses of node 2 and 3 in `format` [Why] We hit some transient errors with the previous order when doing mixed-version testing. Swapping the nodes seems to fix the problem. --- deps/rabbit/test/rabbit_stream_queue_SUITE.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl index 03acbe3efe..f22bba0985 100644 --- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl @@ -1565,13 +1565,13 @@ format(Config) -> case length(Nodes) of 3 -> [_, Server2, Server3] = Nodes, - ok = rabbit_control_helper:command(stop_app, Server2), ok = rabbit_control_helper:command(stop_app, Server3), + ok = rabbit_control_helper:command(stop_app, Server2), Fmt2 = rabbit_ct_broker_helpers:rpc(Config, Server, rabbit_stream_queue, ?FUNCTION_NAME, [QRecord, #{}]), - ok = rabbit_control_helper:command(start_app, Server2), ok = rabbit_control_helper:command(start_app, Server3), + ok = rabbit_control_helper:command(start_app, Server2), ?assertEqual(stream, proplists:get_value(type, Fmt2)), ?assertEqual(minority, proplists:get_value(state, Fmt2)), ?assertEqual(Server, proplists:get_value(leader, Fmt2)), From 38cba9d63de2420322967d6307c21928a97e5f42 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Tue, 11 Feb 2025 18:06:01 +0100 Subject: [PATCH 04/15] Fix MQTT test flake in Khepri mixed version mode The following test flaked in CI under Khepri in mixed version mode: ``` make -C deps/rabbitmq_mqtt ct-v5 t=cluster_size_3:will_delay_node_restart RABBITMQ_METADATA_STORE=khepri SECONDARY_DIST=rabbitmq_server-4.0.5 FULL=1 ``` The first node took exactly 30 seconds for draining: ``` 2025-02-10 15:00:09.550824+00:00 [debug] <0.1449.0> MQTT accepting TCP connection <0.1449.0> (127.0.0.1:33376 -> 127.0.0.1:27005) 2025-02-10 15:00:09.550992+00:00 [debug] <0.1449.0> Received a CONNECT, client ID: sub0, username: undefined, clean start: true, protocol version: 5, keepalive: 60, property names: ['Session-Expiry-Interval'] 2025-02-10 15:00:09.551134+00:00 [debug] <0.1449.0> MQTT connection 127.0.0.1:33376 -> 127.0.0.1:27005 picked vhost using plugin_configuration_or_default_vhost 2025-02-10 15:00:09.551219+00:00 [debug] <0.1449.0> User 'guest' authenticated successfully by backend rabbit_auth_backend_internal 2025-02-10 15:00:09.551530+00:00 [info] <0.1449.0> Accepted MQTT connection 127.0.0.1:33376 -> 127.0.0.1:27005 for client ID sub0 2025-02-10 15:00:09.551651+00:00 [debug] <0.1449.0> Received a SUBSCRIBE with subscription(s) [{mqtt_subscription,<<"my/topic">>, 2025-02-10 15:00:09.551651+00:00 [debug] <0.1449.0> {mqtt_subscription_opts,0,false, 2025-02-10 15:00:09.551651+00:00 [debug] <0.1449.0> false,0,undefined}}] 2025-02-10 15:00:09.556233+00:00 [debug] <0.896.0> RabbitMQ metadata store: follower leader cast - redirecting to {rabbitmq_metadata,'rmq-ct-mqtt-cluster_size_3-2-27054@localhost'} 2025-02-10 15:00:09.561518+00:00 [debug] <0.1456.0> MQTT accepting TCP connection <0.1456.0> (127.0.0.1:33390 -> 127.0.0.1:27005) 2025-02-10 15:00:09.561634+00:00 [debug] <0.1456.0> Received a CONNECT, client ID: will, username: undefined, clean start: true, protocol version: 5, keepalive: 60, property names: ['Session-Expiry-Interval'] 2025-02-10 15:00:09.561715+00:00 [debug] <0.1456.0> MQTT connection 127.0.0.1:33390 -> 127.0.0.1:27005 picked vhost using plugin_configuration_or_default_vhost 2025-02-10 15:00:09.561828+00:00 [debug] <0.1456.0> User 'guest' authenticated successfully by backend rabbit_auth_backend_internal 2025-02-10 15:00:09.562596+00:00 [info] <0.1456.0> Accepted MQTT connection 127.0.0.1:33390 -> 127.0.0.1:27005 for client ID will 2025-02-10 15:00:09.565743+00:00 [warning] <0.1460.0> This node is being put into maintenance (drain) mode 2025-02-10 15:00:09.565833+00:00 [debug] <0.1460.0> Marking the node as undergoing maintenance 2025-02-10 15:00:09.570772+00:00 [info] <0.1460.0> Marked this node as undergoing maintenance 2025-02-10 15:00:09.570904+00:00 [info] <0.1460.0> Asked to suspend 9 client connection listeners. No new client connections will be accepted until these listeners are resumed! 2025-02-10 15:00:09.572268+00:00 [warning] <0.1460.0> Suspended all listeners and will no longer accept client connections 2025-02-10 15:00:09.572317+00:00 [warning] <0.1460.0> Closed 0 local client connections 2025-02-10 15:00:09.572418+00:00 [warning] <0.1449.0> MQTT disconnecting client <<"127.0.0.1:33376 -> 127.0.0.1:27005">> with client ID 'sub0', reason: maintenance 2025-02-10 15:00:09.572414+00:00 [warning] <0.1000.0> Closed 2 local (Web) MQTT client connections 2025-02-10 15:00:09.572499+00:00 [warning] <0.1456.0> MQTT disconnecting client <<"127.0.0.1:33390 -> 127.0.0.1:27005">> with client ID 'will', reason: maintenance 2025-02-10 15:00:09.572866+00:00 [alert] <0.1000.0> Closed 0 local STOMP client connections 2025-02-10 15:00:09.577432+00:00 [debug] <0.1456.0> scheduled delayed Will Message to topic my/topic for MQTT client ID will to be sent in 10000 ms 2025-02-10 15:00:12.991328+00:00 [debug] <0.1469.0> Will reconcile virtual host processes on all cluster members... 2025-02-10 15:00:12.991443+00:00 [debug] <0.1469.0> Will make sure that processes of 1 virtual hosts are running on all reachable cluster nodes 2025-02-10 15:00:12.992497+00:00 [debug] <0.1469.0> Done with virtual host processes reconciliation (run 3) 2025-02-10 15:00:16.511733+00:00 [debug] <0.1476.0> Will reconcile virtual host processes on all cluster members... 2025-02-10 15:00:16.511864+00:00 [debug] <0.1476.0> Will make sure that processes of 1 virtual hosts are running on all reachable cluster nodes 2025-02-10 15:00:16.514293+00:00 [debug] <0.1476.0> Done with virtual host processes reconciliation (run 4) 2025-02-10 15:00:24.897477+00:00 [debug] <0.1479.0> Will reconcile virtual host processes on all cluster members... 2025-02-10 15:00:24.897607+00:00 [debug] <0.1479.0> Will make sure that processes of 1 virtual hosts are running on all reachable cluster nodes 2025-02-10 15:00:24.898483+00:00 [debug] <0.1479.0> Done with virtual host processes reconciliation (run 5) 2025-02-10 15:00:24.898527+00:00 [debug] <0.1479.0> Will reschedule virtual host process reconciliation after 30 seconds 2025-02-10 15:00:32.994347+00:00 [debug] <0.1484.0> Will reconcile virtual host processes on all cluster members... 2025-02-10 15:00:32.994474+00:00 [debug] <0.1484.0> Will make sure that processes of 1 virtual hosts are running on all reachable cluster nodes 2025-02-10 15:00:32.996539+00:00 [debug] <0.1484.0> Done with virtual host processes reconciliation (run 6) 2025-02-10 15:00:32.996585+00:00 [debug] <0.1484.0> Will reschedule virtual host process reconciliation after 30 seconds 2025-02-10 15:00:39.576325+00:00 [info] <0.1460.0> Will transfer leadership of 0 quorum queues with current leader on this node 2025-02-10 15:00:39.576456+00:00 [info] <0.1460.0> Leadership transfer for quorum queues hosted on this node has been initiated 2025-02-10 15:00:39.576948+00:00 [info] <0.1460.0> Will stop local follower replicas of 0 quorum queues on this node 2025-02-10 15:00:39.576990+00:00 [info] <0.1460.0> Stopped all local replicas of quorum queues hosted on this node 2025-02-10 15:00:39.577120+00:00 [info] <0.1460.0> Will transfer leadership of metadata store with current leader on this node 2025-02-10 15:00:39.577282+00:00 [info] <0.1460.0> Khepri clustering: transferring leadership to node 'rmq-ct-mqtt-cluster_size_3-2-27054@localhost' 2025-02-10 15:00:39.577424+00:00 [info] <0.1460.0> Khepri clustering: skipping leadership transfer, leader is already in node 'rmq-ct-mqtt-cluster_size_3-2-27054@localhost' 2025-02-10 15:00:39.577547+00:00 [info] <0.1460.0> Leadership transfer for metadata store on this node has been done. The new leader is 'rmq-ct-mqtt-cluster_size_3-2-27054@localhost' 2025-02-10 15:00:39.577674+00:00 [info] <0.1460.0> Node is ready to be shut down for maintenance or upgrade 2025-02-10 15:00:39.595638+00:00 [notice] <0.64.0> SIGTERM received - shutting down 2025-02-10 15:00:39.595638+00:00 [notice] <0.64.0> 2025-02-10 15:00:39.595758+00:00 [debug] <0.44.0> Running rabbit_prelaunch:shutdown_func() as part of `kernel` shutdown ``` Running the same test locally revealed that [rabbit_maintenance:status_consistent_read/1](https://github.com/rabbitmq/rabbitmq-server/blob/55ae91809433d9e6edfcc98563bcb2f0736ee79e/deps/rabbit/src/rabbit_maintenance.erl#L131) takes exactly 30 seconds to complete. The test case assumes a Will Delay higher than the time it takes to drain and shut down the node. Hence, this commit increases the Will Delay time from 10 seconds to 40 seconds. --- deps/rabbitmq_mqtt/test/v5_SUITE.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/deps/rabbitmq_mqtt/test/v5_SUITE.erl b/deps/rabbitmq_mqtt/test/v5_SUITE.erl index 043addb9a0..3021785731 100644 --- a/deps/rabbitmq_mqtt/test/v5_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/v5_SUITE.erl @@ -1665,7 +1665,8 @@ will_delay_node_restart(Config) -> {ok, _, [0]} = emqtt:subscribe(Sub0a, Topic), Sub1 = connect(<<"sub1">>, Config, 1, []), {ok, _, [0]} = emqtt:subscribe(Sub1, Topic), - WillDelaySecs = 10, + %% In mixed version mode with Khepri, draining the node can take 30 seconds. + WillDelaySecs = 40, C0a = connect(<<"will">>, Config, 0, [{properties, #{'Session-Expiry-Interval' => 900}}, {will_props, #{'Will-Delay-Interval' => WillDelaySecs}}, From a92a04cfb1584b93f9b9a24f173d1d3d68519029 Mon Sep 17 00:00:00 2001 From: Iliia Khaprov - VMware by Broadcom Date: Wed, 12 Feb 2025 14:39:16 +0100 Subject: [PATCH 05/15] Fix Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteShovelCommand rabbit_shovel_dyn_worker_sup_sup doesn't export stop_and_delete_child It exports stop_child which in turn calls stop_and_delete_child. --- ...Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteShovelCommand.erl | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/deps/rabbitmq_shovel/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteShovelCommand.erl b/deps/rabbitmq_shovel/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteShovelCommand.erl index 1058541578..0529e6a207 100644 --- a/deps/rabbitmq_shovel/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteShovelCommand.erl +++ b/deps/rabbitmq_shovel/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteShovelCommand.erl @@ -84,10 +84,6 @@ run([Name], #{node := Node, vhost := VHost}) -> {badrpc, _} = Error -> Error; {error, not_found} -> - ErrMsg = rabbit_misc:format("Shovel with the given name was not found " - "on the target node '~ts' and/or virtual host '~ts'. " - "It may be failing to connect and report its state, will delete its runtime parameter...", - [Node, VHost]), try_force_removing(HostingNode, VHost, Name, ActingUser), {error, rabbit_data_coercion:to_binary(ErrMsg)}; ok -> @@ -117,4 +113,4 @@ try_clearing_runtime_parameter(Node, VHost, ShovelName, ActingUser) -> _ = rabbit_misc:rpc_call(Node, rabbit_runtime_parameters, clear, [VHost, <<"shovel">>, ShovelName, ActingUser]). try_stopping_child_process(Node, VHost, ShovelName) -> - _ = rabbit_misc:rpc_call(Node, rabbit_shovel_dyn_worker_sup_sup, stop_and_delete_child, [{VHost, ShovelName}]). + _ = rabbit_misc:rpc_call(Node, rabbit_shovel_dyn_worker_sup_sup, stop_child, [{VHost, ShovelName}]). From b0a9f145e139924b4c94777d50775797901191a3 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Wed, 12 Feb 2025 16:55:31 +0100 Subject: [PATCH 06/15] Add clear cache command --- deps/rabbitmq_auth_backend_cache/Makefile | 2 + ...CLI.Ctl.Commands.AuthClearCacheCommand.erl | 83 +++++++++++++++++++ 2 files changed, 85 insertions(+) create mode 100644 deps/rabbitmq_auth_backend_cache/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AuthClearCacheCommand.erl diff --git a/deps/rabbitmq_auth_backend_cache/Makefile b/deps/rabbitmq_auth_backend_cache/Makefile index 4a91fb69bb..6a16429ed5 100644 --- a/deps/rabbitmq_auth_backend_cache/Makefile +++ b/deps/rabbitmq_auth_backend_cache/Makefile @@ -19,6 +19,8 @@ endef DEPS = rabbit_common rabbit TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers +PLT_APPS += rabbitmqctl + DEP_EARLY_PLUGINS = rabbit_common/mk/rabbitmq-early-plugin.mk DEP_PLUGINS = rabbit_common/mk/rabbitmq-plugin.mk diff --git a/deps/rabbitmq_auth_backend_cache/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AuthClearCacheCommand.erl b/deps/rabbitmq_auth_backend_cache/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AuthClearCacheCommand.erl new file mode 100644 index 0000000000..00888b8486 --- /dev/null +++ b/deps/rabbitmq_auth_backend_cache/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AuthClearCacheCommand.erl @@ -0,0 +1,83 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module('Elixir.RabbitMQ.CLI.Ctl.Commands.AuthClearCacheCommand'). + +-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour'). + +-export([ + usage/0, + usage_additional/0, + usage_doc_guides/0, + flags/0, + validate/2, + merge_defaults/2, + banner/2, + run/2, + switches/0, + aliases/0, + output/2, + scopes/0, + formatter/0, + help_section/0, + description/0 + ]). + + +%%---------------------------------------------------------------------------- +%% Callbacks +%%---------------------------------------------------------------------------- +scopes() -> + [vmware, ctl]. + +switches() -> + []. + +usage() -> + <<"auth_clear_cache">>. + +usage_additional() -> + []. + +usage_doc_guides() -> + []. + +help_section() -> + {plugin, rabbitmq_auth_backend_cache}. + +description() -> + <<"Clear cache of authorization decisions">>. + +flags() -> + []. + +validate(_, _) -> + ok. + +formatter() -> + 'Elixir.RabbitMQ.CLI.Formatters.Table'. + +merge_defaults(A, O) -> + {A, O}. + +banner(_, _) -> + erlang:iolist_to_binary([<<"Will delete all cached authorization decisions">>]). + +run(_Args, #{node := Node}) -> + case rabbit_misc:rpc_call(Node, rabbit_auth_backend_cache, clear_cache_cluster_wide, []) of + {badrpc, _} = Error -> + Error; + Deleted -> + Deleted + end. + +aliases() -> + []. + +output(Value, _Opts) -> + 'Elixir.RabbitMQ.CLI.DefaultOutput':output(Value). + \ No newline at end of file From 2ab890f3446760a3b97f31294e98a20916d06035 Mon Sep 17 00:00:00 2001 From: Marcial Rosales Date: Wed, 12 Feb 2025 17:15:51 +0100 Subject: [PATCH 07/15] Fix flake on rabbitmq_mqtt auth_SUITE (#13180) * Separate invalid client test from the valid one * Apply same changes from pr #13197 * Deal with stalereferences caused by timing issues looking up objects in the DOM * Unlink before assertion --- deps/rabbitmq_mqtt/test/auth_SUITE.erl | 13 ++-- selenium/package.json | 2 +- selenium/test/pageobjects/BasePage.js | 88 ++++++++++++++++++-------- 3 files changed, 69 insertions(+), 34 deletions(-) diff --git a/deps/rabbitmq_mqtt/test/auth_SUITE.erl b/deps/rabbitmq_mqtt/test/auth_SUITE.erl index 51bfe7b291..a7a4ea78f1 100644 --- a/deps/rabbitmq_mqtt/test/auth_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/auth_SUITE.erl @@ -68,9 +68,11 @@ sub_groups() -> ssl_user_vhost_parameter_mapping_vhost_does_not_exist, ssl_user_cert_vhost_mapping_takes_precedence_over_port_vhost_mapping ]}, + {ssl_user_with_invalid_client_id_in_cert_san_dns, [], + [invalid_client_id_from_cert_san_dns + ]}, {ssl_user_with_client_id_in_cert_san_dns, [], - [client_id_from_cert_san_dns, - invalid_client_id_from_cert_san_dns + [client_id_from_cert_san_dns ]}, {ssl_user_with_client_id_in_cert_san_dns_1, [], [client_id_from_cert_san_dns_1 @@ -207,7 +209,8 @@ mqtt_config(no_ssl_user) -> mqtt_config(client_id_propagation) -> {rabbitmq_mqtt, [{ssl_cert_login, true}, {allow_anonymous, true}]}; -mqtt_config(ssl_user_with_client_id_in_cert_san_dns) -> +mqtt_config(T) when T == ssl_user_with_client_id_in_cert_san_dns; + T == ssl_user_with_invalid_client_id_in_cert_san_dns -> {rabbitmq_mqtt, [{ssl_cert_login, true}, {allow_anonymous, false}, {ssl_cert_client_id_from, subject_alternative_name}, @@ -588,8 +591,8 @@ client_id_from_cert_dn(Config) -> invalid_client_id_from_cert_san_dns(Config) -> MqttClientId = <<"other_client_id">>, {ok, C} = connect_ssl(MqttClientId, Config), - ?assertMatch({error, _}, emqtt:connect(C)), - unlink(C). + unlink(C), + {error, {client_identifier_not_valid, _}} = emqtt:connect(C). ssl_user_vhost_parameter_mapping_success(Config) -> expect_successful_connection(fun connect_ssl/1, Config). diff --git a/selenium/package.json b/selenium/package.json index 251a751f09..a0dca54d43 100644 --- a/selenium/package.json +++ b/selenium/package.json @@ -12,7 +12,7 @@ "author": "", "license": "ISC", "dependencies": { - "chromedriver": "^130.0.4", + "chromedriver": "^132.0", "ejs": "^3.1.8", "express": "^4.18.2", "geckodriver": "^3.0.2", diff --git a/selenium/test/pageobjects/BasePage.js b/selenium/test/pageobjects/BasePage.js index b543115208..dd6ff22302 100644 --- a/selenium/test/pageobjects/BasePage.js +++ b/selenium/test/pageobjects/BasePage.js @@ -45,6 +45,7 @@ module.exports = class BasePage { return this.selectOption(SELECT_REFRESH, option) } async waitForOverviewTab() { + await this.driver.sleep(250) return this.waitForDisplayed(OVERVIEW_TAB) } @@ -56,6 +57,7 @@ module.exports = class BasePage { return this.click(CONNECTIONS_TAB) } async waitForConnectionsTab() { + await this.driver.sleep(250) return this.waitForDisplayed(CONNECTIONS_TAB) } @@ -63,6 +65,7 @@ module.exports = class BasePage { return this.click(ADMIN_TAB) } async waitForAdminTab() { + await this.driver.sleep(250) return this.waitForDisplayed(ADMIN_TAB) } @@ -70,6 +73,7 @@ module.exports = class BasePage { return this.click(CHANNELS_TAB) } async waitForChannelsTab() { + await this.driver.sleep(250) return this.waitForDisplayed(CHANNELS_TAB) } @@ -77,6 +81,7 @@ module.exports = class BasePage { return this.click(EXCHANGES_TAB) } async waitForExchangesTab() { + await this.driver.sleep(250) return this.waitForDisplayed(EXCHANGES_TAB) } @@ -180,42 +185,69 @@ module.exports = class BasePage { } async waitForLocated (locator) { - try { - return this.driver.wait(until.elementLocated(locator), this.timeout, - 'Timed out after [timeout=' + this.timeout + ';polling=' + this.polling + '] seconds locating ' + locator, - this.polling) - }catch(error) { - if (!error.name.includes("NoSuchSessionError")) { - console.error("Failed waitForLocated " + locator + " due to " + error) - } - throw error - } + let attempts = 3 + let retry = false + let rethrowError = null + do { + try { + return this.driver.wait(until.elementLocated(locator), this.timeout, + 'Timed out after [timeout=' + this.timeout + ';polling=' + this.polling + '] seconds locating ' + locator, + this.polling) + }catch(error) { + if (error.name.includes("StaleElementReferenceError")) { + retry = true + }else if (!error.name.includes("NoSuchSessionError")) { + console.error("Failed waitForLocated " + locator + " due to " + error) + retry = false + } + rethrowError = error + } + } while (retry && --attempts > 0) + throw rethrowError } async waitForVisible (element) { - try { - return this.driver.wait(until.elementIsVisible(element), this.timeout, - 'Timed out after [timeout=' + this.timeout + ';polling=' + this.polling + '] awaiting till visible ' + element, - this.polling) - }catch(error) { - if (!error.name.includes("NoSuchSessionError")) { - console.error("Failed to find visible element " + element + " due to " + error) + let attempts = 3 + let retry = false + let rethrowError = null + do { + try { + return this.driver.wait(until.elementIsVisible(element), this.timeout, + 'Timed out after [timeout=' + this.timeout + ';polling=' + this.polling + '] awaiting till visible ' + element, + this.polling) + }catch(error) { + if (error.name.includes("StaleElementReferenceError")) { + retry = true + }else if (!error.name.includes("NoSuchSessionError")) { + console.error("Failed to find visible element " + element + " due to " + error) + retry = false + } + rethrowError = error } - throw error - } + } while (retry && --attempts > 0) + throw rethrowError } async waitForDisplayed (locator) { - if (this.interactionDelay && this.interactionDelay > 0) await this.driver.sleep(this.interactionDelay) - try { - return this.waitForVisible(await this.waitForLocated(locator)) - }catch(error) { - if (!error.name.includes("NoSuchSessionError")) { - console.error("Failed to waitForDisplayed " + locator + " due to " + error) - } - throw error - } + let attempts = 3 + let retry = false + let rethrowError = null + do { + if (this.interactionDelay && this.interactionDelay > 0) await this.driver.sleep(this.interactionDelay) + try { + return this.waitForVisible(await this.waitForLocated(locator)) + }catch(error) { + if (error.name.includes("StaleElementReferenceError")) { + retry = true + }else if (!error.name.includes("NoSuchSessionError")) { + retry = false + console.error("Failed to waitForDisplayed " + locator + " due to " + error) + } + rethrowError = error + } + } while (retry && --attempts > 0 ) + throw rethrowError } async getText (locator) { From 06ec8f0342ae120a7a6b48a90392df052555d4e8 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Sun, 9 Feb 2025 13:28:41 +0100 Subject: [PATCH 08/15] Orderly shutdown of sessions Make AMQP 1.0 connection shut down its sessions before sending the close frame to the client similar to how the AMQP 0.9.1 connection shuts down its channels before closing the connection. This commit avoids concurrent deletion of exclusive queues by the session process and the classic queue process. This commit should also fix https://github.com/rabbitmq/rabbitmq-server/issues/2596 --- deps/rabbit/include/rabbit_amqp_reader.hrl | 2 ++ deps/rabbit/src/rabbit_amqp_reader.erl | 36 ++++++++++++++++++++-- deps/rabbit/src/rabbit_amqp_session.erl | 4 ++- 3 files changed, 39 insertions(+), 3 deletions(-) diff --git a/deps/rabbit/include/rabbit_amqp_reader.hrl b/deps/rabbit/include/rabbit_amqp_reader.hrl index 0077a9c9c2..732bc9f043 100644 --- a/deps/rabbit/include/rabbit_amqp_reader.hrl +++ b/deps/rabbit/include/rabbit_amqp_reader.hrl @@ -3,6 +3,8 @@ -define(CLOSING_TIMEOUT, 30_000). -define(SILENT_CLOSE_DELAY, 3_000). +-define(SHUTDOWN_SESSIONS_TIMEOUT, 10_000). + %% Allow for potentially large sets of tokens during the SASL exchange. %% https://docs.oasis-open.org/amqp/amqp-cbs/v1.0/csd01/amqp-cbs-v1.0-csd01.html#_Toc67999915 -define(INITIAL_MAX_FRAME_SIZE, 8192). diff --git a/deps/rabbit/src/rabbit_amqp_reader.erl b/deps/rabbit/src/rabbit_amqp_reader.erl index 423aa84ed8..f18387fb0a 100644 --- a/deps/rabbit/src/rabbit_amqp_reader.erl +++ b/deps/rabbit/src/rabbit_amqp_reader.erl @@ -220,10 +220,17 @@ terminate(_, _) -> %%-------------------------------------------------------------------------- %% error handling / termination -close(Error, State = #v1{connection = #v1_connection{timeout = Timeout}}) -> +close(Error, State0 = #v1{connection = #v1_connection{timeout = Timeout}}) -> %% Client properties will be emitted in the connection_closed event by rabbit_reader. - ClientProperties = i(client_properties, State), + ClientProperties = i(client_properties, State0), put(client_properties, ClientProperties), + + %% "It is illegal to send any more frames (or bytes of any other kind) + %% after sending a close frame." [2.7.9] + %% Sessions might send frames via the writer proc. + %% Therefore, let's first try to orderly shutdown our sessions. + State = shutdown_sessions(State0), + Time = case Timeout > 0 andalso Timeout < ?CLOSING_TIMEOUT of true -> Timeout; @@ -233,6 +240,31 @@ close(Error, State = #v1{connection = #v1_connection{timeout = Timeout}}) -> ok = send_on_channel0(State, #'v1_0.close'{error = Error}, amqp10_framing), State#v1{connection_state = closed}. +shutdown_sessions(#v1{tracked_channels = Channels} = State) -> + maps:foreach(fun(_ChannelNum, Pid) -> + gen_server:cast(Pid, shutdown) + end, Channels), + TimerRef = erlang:send_after(?SHUTDOWN_SESSIONS_TIMEOUT, + self(), + shutdown_sessions_timeout), + wait_for_shutdown_sessions(TimerRef, State). + +wait_for_shutdown_sessions(TimerRef, #v1{tracked_channels = Channels} = State) + when map_size(Channels) =:= 0 -> + ok = erlang:cancel_timer(TimerRef, [{async, false}, + {info, false}]), + State; +wait_for_shutdown_sessions(TimerRef, #v1{tracked_channels = Channels} = State0) -> + receive + {{'DOWN', ChannelNum}, _MRef, process, SessionPid, _Reason} -> + State = untrack_channel(ChannelNum, SessionPid, State0), + wait_for_shutdown_sessions(TimerRef, State); + shutdown_sessions_timeout -> + ?LOG_INFO("sessions not shut down after ~b ms: ~p", + [?SHUTDOWN_SESSIONS_TIMEOUT, Channels]), + State0 + end. + handle_session_exit(ChannelNum, SessionPid, Reason, State0) -> State = untrack_channel(ChannelNum, SessionPid, State0), S = case terminated_normally(Reason) of diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index b23c492d3b..2ecc5728b5 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -602,7 +602,9 @@ handle_cast({reset_authz, User}, #state{cfg = Cfg} = State0) -> noreply(State) catch exit:#'v1_0.error'{} = Error -> log_error_and_close_session(Error, State1) - end. + end; +handle_cast(shutdown, State) -> + {stop, normal, State}. log_error_and_close_session( Error, State = #state{cfg = #cfg{reader_pid = ReaderPid, From 9062476a180ee1e167a9ecd27025eaffe6f84186 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Tue, 4 Feb 2025 18:45:24 +0100 Subject: [PATCH 09/15] Support dynamic creation of queues MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What? Support the `dynamic` field of sources and targets. ## Why? 1. This allows AMQP clients to dynamically create exclusive queues, which can be useful for RPC workloads. 2. Support creation of JMS temporary queues over AMQP using the Qpid JMS client. Exclusive queues map very nicely to JMS temporary queues because: > Although sessions are used to create temporary destinations, this is only for convenience. Their scope is actually the entire connection. Their lifetime is that of their connection and any of the connection’s sessions are allowed to create a consumer for them. https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#creating-temporary-destinations ## How? If the terminus contains the capability `temporary-queue` as defined in [amqp-bindmap-jms-v1.0-wd10](https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=67638) [5.2] and as sent by Qpid JMS client, RabbitMQ will create an exclusive queue. (This allows a future commit to take other actions if capability `temporary-topic` will be used, such as the additional creation of bindings.) No matter what the desired node properties are, RabbitMQ will set the lifetime policy delete-on-close deleting the exclusive queue when the link which caused its creation ceases to exist. This means the exclusive queue will be deleted if either: * the link gets detached, or * the session ends, or * the connection closes Although the AMQP JMS Mapping and Qpid JMS create only a **sending** link with `dynamic=true`, this commit also supports **receiving** links with `dynamic=true` for non-JMS AMQP clients. RabbitMQ is free to choose the generated queue name. As suggested by the AMQP spec, the generated queue name will contain the container-id and link name unless they are very long. Co-authored-by: Arnaud Cogoluègnes --- .../src/amqp10_client_session.erl | 36 +- deps/rabbit/src/rabbit_amqp_reader.erl | 6 +- deps/rabbit/src/rabbit_amqp_session.erl | 313 +++++++++++++----- deps/rabbit/test/amqp_auth_SUITE.erl | 91 +++++ deps/rabbit/test/amqp_client_SUITE.erl | 228 +++++++++++++ deps/rabbit/test/amqp_jms_SUITE.erl | 50 ++- .../java/com/rabbitmq/amqp/tests/jms/Cli.java | 163 +++++++++ .../amqp/tests/jms/JmsConnectionTest.java | 199 +++++++++++ .../amqp/tests/jms/JmsTemporaryQueueTest.java | 135 ++++++++ .../com/rabbitmq/amqp/tests/jms/JmsTest.java | 57 +++- .../rabbitmq/amqp/tests/jms/TestUtils.java | 66 ++++ 11 files changed, 1243 insertions(+), 101 deletions(-) create mode 100644 deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/Cli.java create mode 100644 deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsConnectionTest.java create mode 100644 deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTemporaryQueueTest.java create mode 100644 deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/TestUtils.java diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index 7b74180587..435cce8aed 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -698,23 +698,39 @@ build_frames(Channel, Trf, Payload, MaxPayloadSize, Acc) -> make_source(#{role := {sender, _}}) -> #'v1_0.source'{}; -make_source(#{role := {receiver, #{address := Address} = Source, _Pid}, filter := Filter}) -> +make_source(#{role := {receiver, Source, _Pid}, + filter := Filter}) -> Durable = translate_terminus_durability(maps:get(durable, Source, none)), + Dynamic = maps:get(dynamic, Source, false), TranslatedFilter = translate_filters(Filter), - #'v1_0.source'{address = {utf8, Address}, + #'v1_0.source'{address = make_address(Source), durable = {uint, Durable}, - filter = TranslatedFilter}. + dynamic = Dynamic, + filter = TranslatedFilter, + capabilities = make_capabilities(Source)}. make_target(#{role := {receiver, _Source, _Pid}}) -> #'v1_0.target'{}; -make_target(#{role := {sender, #{address := Address} = Target}}) -> +make_target(#{role := {sender, Target}}) -> Durable = translate_terminus_durability(maps:get(durable, Target, none)), - TargetAddr = case is_binary(Address) of - true -> {utf8, Address}; - false -> Address - end, - #'v1_0.target'{address = TargetAddr, - durable = {uint, Durable}}. + Dynamic = maps:get(dynamic, Target, false), + #'v1_0.target'{address = make_address(Target), + durable = {uint, Durable}, + dynamic = Dynamic, + capabilities = make_capabilities(Target)}. + +make_address(#{address := Addr}) -> + if is_binary(Addr) -> + {utf8, Addr}; + is_atom(Addr) -> + Addr + end. + +make_capabilities(#{capabilities := Caps0}) -> + Caps = [{symbol, C} || C <- Caps0], + {array, symbol, Caps}; +make_capabilities(_) -> + undefined. max_message_size(#{max_message_size := Size}) when is_integer(Size) andalso diff --git a/deps/rabbit/src/rabbit_amqp_reader.erl b/deps/rabbit/src/rabbit_amqp_reader.erl index f18387fb0a..3e5d5cc08d 100644 --- a/deps/rabbit/src/rabbit_amqp_reader.erl +++ b/deps/rabbit/src/rabbit_amqp_reader.erl @@ -260,8 +260,8 @@ wait_for_shutdown_sessions(TimerRef, #v1{tracked_channels = Channels} = State0) State = untrack_channel(ChannelNum, SessionPid, State0), wait_for_shutdown_sessions(TimerRef, State); shutdown_sessions_timeout -> - ?LOG_INFO("sessions not shut down after ~b ms: ~p", - [?SHUTDOWN_SESSIONS_TIMEOUT, Channels]), + ?LOG_INFO("sessions running ~b ms after requested to be shut down: ~p", + [?SHUTDOWN_SESSIONS_TIMEOUT, maps:values(Channels)]), State0 end. @@ -792,6 +792,7 @@ send_to_new_session( connection = #v1_connection{outgoing_max_frame_size = MaxFrame, vhost = Vhost, user = User, + container_id = ContainerId, name = ConnName}, writer = WriterPid} = State) -> %% Subtract fixed frame header size. @@ -804,6 +805,7 @@ send_to_new_session( OutgoingMaxFrameSize, User, Vhost, + ContainerId, ConnName, BeginFrame], case rabbit_amqp_session_sup:start_session(SessionSup, ChildArgs) of diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index 2ecc5728b5..4ad681707a 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -85,8 +85,10 @@ -define(MAX_PERMISSION_CACHE_SIZE, 12). -define(HIBERNATE_AFTER, 6_000). -define(CREDIT_REPLY_TIMEOUT, 30_000). +%% Capability defined in amqp-bindmap-jms-v1.0-wd10 [5.2] and sent by Qpid JMS client. +-define(CAP_TEMPORARY_QUEUE, <<"temporary-queue">>). --export([start_link/8, +-export([start_link/9, process_frame/2, list_local/0, conserve_resources/3, @@ -163,6 +165,7 @@ routing_key :: rabbit_types:routing_key() | to | subject, %% queue_name_bin is only set if the link target address refers to a queue. queue_name_bin :: undefined | rabbit_misc:resource_name(), + dynamic :: boolean(), max_message_size :: pos_integer(), delivery_count :: sequence_no(), credit :: rabbit_queue_type:credit(), @@ -206,6 +209,7 @@ %% or a topic filter, an outgoing link will always consume from a queue. queue_name :: rabbit_amqqueue:name(), queue_type :: rabbit_queue_type:queue_type(), + dynamic :: boolean(), send_settled :: boolean(), max_message_size :: unlimited | pos_integer(), @@ -260,6 +264,7 @@ -record(cfg, { outgoing_max_frame_size :: unlimited | pos_integer(), + container_id :: binary(), reader_pid :: rabbit_types:connection(), writer_pid :: pid(), user :: rabbit_types:user(), @@ -382,15 +387,17 @@ -type state() :: #state{}. -start_link(ReaderPid, WriterPid, ChannelNum, FrameMax, User, Vhost, ConnName, BeginFrame) -> - Args = {ReaderPid, WriterPid, ChannelNum, FrameMax, User, Vhost, ConnName, BeginFrame}, +start_link(ReaderPid, WriterPid, ChannelNum, FrameMax, + User, Vhost, ContainerId, ConnName, BeginFrame) -> + Args = {ReaderPid, WriterPid, ChannelNum, FrameMax, + User, Vhost, ContainerId, ConnName, BeginFrame}, Opts = [{hibernate_after, ?HIBERNATE_AFTER}], gen_server:start_link(?MODULE, Args, Opts). process_frame(Pid, FrameBody) -> gen_server:cast(Pid, {frame_body, FrameBody}). -init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName, +init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ContainerId, ConnName, #'v1_0.begin'{ %% "If a session is locally initiated, the remote-channel MUST NOT be set." [2.7.2] remote_channel = undefined, @@ -401,6 +408,7 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName, process_flag(trap_exit, true), rabbit_process_flag:adjust_for_message_handling_proc(), logger:update_process_metadata(#{channel_number => ChannelNum, + amqp_container => ContainerId, connection => ConnName, vhost => Vhost, user => User#user.username}), @@ -453,7 +461,8 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName, remote_incoming_window = RemoteIncomingWindow, remote_outgoing_window = RemoteOutgoingWindow, outgoing_delivery_id = ?INITIAL_OUTGOING_DELIVERY_ID, - cfg = #cfg{reader_pid = ReaderPid, + cfg = #cfg{container_id = ContainerId, + reader_pid = ReaderPid, writer_pid = WriterPid, outgoing_max_frame_size = MaxFrameSize, user = User, @@ -470,14 +479,17 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName, terminate(_Reason, #state{incoming_links = IncomingLinks, outgoing_links = OutgoingLinks, - queue_states = QStates}) -> + queue_states = QStates, + cfg = Cfg}) -> maps:foreach( - fun (_, _) -> - rabbit_global_counters:publisher_deleted(?PROTOCOL) + fun (_, Link) -> + rabbit_global_counters:publisher_deleted(?PROTOCOL), + maybe_delete_dynamic_queue(Link, Cfg) end, IncomingLinks), maps:foreach( - fun (_, _) -> - rabbit_global_counters:consumer_deleted(?PROTOCOL) + fun (_, Link) -> + rabbit_global_counters:consumer_deleted(?PROTOCOL), + maybe_delete_dynamic_queue(Link, Cfg) end, OutgoingLinks), ok = rabbit_queue_type:close(QStates). @@ -1094,39 +1106,52 @@ handle_frame(#'v1_0.attach'{handle = ?UINT(Handle)} = Attach, end; handle_frame(Detach = #'v1_0.detach'{handle = ?UINT(HandleInt)}, - State0 = #state{incoming_links = IncomingLinks, + State0 = #state{incoming_links = IncomingLinks0, outgoing_links = OutgoingLinks0, outgoing_unsettled_map = Unsettled0, outgoing_pending = Pending0, queue_states = QStates0, - cfg = #cfg{user = #user{username = Username}}}) -> + cfg = Cfg = #cfg{user = #user{username = Username}}}) -> {OutgoingLinks, Unsettled, Pending, QStates} = case maps:take(HandleInt, OutgoingLinks0) of - {#outgoing_link{queue_name = QName}, OutgoingLinks1} -> + {#outgoing_link{queue_name = QName, + dynamic = Dynamic}, OutgoingLinks1} -> Ctag = handle_to_ctag(HandleInt), {Unsettled1, Pending1} = remove_outgoing_link(Ctag, Unsettled0, Pending0), - case rabbit_amqqueue:lookup(QName) of - {ok, Q} -> - Spec = #{consumer_tag => Ctag, - reason => remove, - user => Username}, - case rabbit_queue_type:cancel(Q, Spec, QStates0) of - {ok, QStates1} -> - {OutgoingLinks1, Unsettled1, Pending1, QStates1}; - {error, Reason} -> - protocol_error( - ?V_1_0_AMQP_ERROR_INTERNAL_ERROR, - "Failed to remove consumer from ~s: ~tp", - [rabbit_misc:rs(amqqueue:get_name(Q)), Reason]) - end; - {error, not_found} -> - {OutgoingLinks1, Unsettled1, Pending1, QStates0} + case Dynamic of + true -> + delete_dynamic_queue(QName, Cfg), + {OutgoingLinks1, Unsettled1, Pending1, QStates0}; + false -> + case rabbit_amqqueue:lookup(QName) of + {ok, Q} -> + Spec = #{consumer_tag => Ctag, + reason => remove, + user => Username}, + case rabbit_queue_type:cancel(Q, Spec, QStates0) of + {ok, QStates1} -> + {OutgoingLinks1, Unsettled1, Pending1, QStates1}; + {error, Reason} -> + protocol_error( + ?V_1_0_AMQP_ERROR_INTERNAL_ERROR, + "Failed to remove consumer from ~s: ~tp", + [rabbit_misc:rs(amqqueue:get_name(Q)), Reason]) + end; + {error, not_found} -> + {OutgoingLinks1, Unsettled1, Pending1, QStates0} + end end; error -> {OutgoingLinks0, Unsettled0, Pending0, QStates0} end, - - State1 = State0#state{incoming_links = maps:remove(HandleInt, IncomingLinks), + IncomingLinks = case maps:take(HandleInt, IncomingLinks0) of + {IncomingLink, IncomingLinks1} -> + maybe_delete_dynamic_queue(IncomingLink, Cfg), + IncomingLinks1; + error -> + IncomingLinks0 + end, + State1 = State0#state{incoming_links = IncomingLinks, outgoing_links = OutgoingLinks, outgoing_unsettled_map = Unsettled, outgoing_pending = Pending, @@ -1271,29 +1296,33 @@ handle_attach(#'v1_0.attach'{ reply_frames([Reply], State); handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER, - name = LinkName = {utf8, LinkName0}, + name = LinkName = {utf8, LinkNameBin}, handle = Handle = ?UINT(HandleInt), source = Source, snd_settle_mode = MaybeSndSettleMode, - target = Target = #'v1_0.target'{address = TargetAddress}, + target = Target0, initial_delivery_count = DeliveryCount = ?UINT(DeliveryCountInt) }, State0 = #state{incoming_links = IncomingLinks0, permission_cache = PermCache0, - cfg = #cfg{max_link_credit = MaxLinkCredit, + cfg = #cfg{container_id = ContainerId, + reader_pid = ReaderPid, + max_link_credit = MaxLinkCredit, vhost = Vhost, user = User}}) -> - case ensure_target(Target, Vhost, User, PermCache0) of - {ok, Exchange, RoutingKey, QNameBin, PermCache} -> + case ensure_target(Target0, LinkNameBin, Vhost, User, + ContainerId, ReaderPid, PermCache0) of + {ok, Exchange, RoutingKey, QNameBin, Target, PermCache} -> SndSettleMode = snd_settle_mode(MaybeSndSettleMode), MaxMessageSize = persistent_term:get(max_message_size), IncomingLink = #incoming_link{ - name = LinkName0, + name = LinkNameBin, snd_settle_mode = SndSettleMode, - target_address = address(TargetAddress), + target_address = address(Target#'v1_0.target'.address), exchange = Exchange, routing_key = RoutingKey, queue_name_bin = QNameBin, + dynamic = default(Target#'v1_0.target'.dynamic, false), max_message_size = MaxMessageSize, delivery_count = DeliveryCountInt, credit = MaxLinkCredit}, @@ -1327,10 +1356,9 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER, end; handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, - name = LinkName = {utf8, LinkName0}, + name = LinkName = {utf8, LinkNameBin}, handle = Handle = ?UINT(HandleInt), - source = Source = #'v1_0.source'{address = SourceAddress, - filter = DesiredFilter}, + source = Source0 = #'v1_0.source'{filter = DesiredFilter}, snd_settle_mode = SndSettleMode, rcv_settle_mode = RcvSettleMode, max_message_size = MaybeMaxMessageSize, @@ -1341,6 +1369,7 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, topic_permission_cache = TopicPermCache0, cfg = #cfg{vhost = Vhost, user = User = #user{username = Username}, + container_id = ContainerId, reader_pid = ReaderPid}}) -> {SndSettled, EffectiveSndSettleMode} = case SndSettleMode of @@ -1352,10 +1381,11 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, %% client only for durable messages. {false, ?V_1_0_SENDER_SETTLE_MODE_UNSETTLED} end, - case ensure_source(Source, Vhost, User, PermCache0, TopicPermCache0) of + case ensure_source(Source0, LinkNameBin, Vhost, User, ContainerId, + ReaderPid, PermCache0, TopicPermCache0) of {error, Reason} -> protocol_error(?V_1_0_AMQP_ERROR_INVALID_FIELD, "Attach rejected: ~tp", [Reason]); - {ok, QName = #resource{name = QNameBin}, PermCache1, TopicPermCache} -> + {ok, QName = #resource{name = QNameBin}, Source, PermCache1, TopicPermCache} -> PermCache = check_resource_access(QName, read, User, PermCache1), case rabbit_amqqueue:with( QName, @@ -1441,12 +1471,14 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, %% Echo back that we will respect the client's requested max-message-size. max_message_size = MaybeMaxMessageSize, offered_capabilities = OfferedCaps}, + {utf8, SourceAddress} = Source#'v1_0.source'.address, MaxMessageSize = max_message_size(MaybeMaxMessageSize), Link = #outgoing_link{ - name = LinkName0, - source_address = address(SourceAddress), + name = LinkNameBin, + source_address = SourceAddress, queue_name = queue_resource(Vhost, QNameBin), queue_type = QType, + dynamic = default(Source#'v1_0.source'.dynamic, false), send_settled = SndSettled, max_message_size = MaxMessageSize, credit_api_version = CreditApiVsn, @@ -2616,17 +2648,53 @@ maybe_grant_mgmt_link_credit(Credit, _, _) -> {Credit, []}. -spec ensure_source(#'v1_0.source'{}, + binary(), rabbit_types:vhost(), rabbit_types:user(), + binary(), + rabbit_types:connection(), permission_cache(), topic_permission_cache()) -> - {ok, rabbit_amqqueue:name(), permission_cache(), topic_permission_cache()} | + {ok, + rabbit_amqqueue:name(), + #'v1_0.source'{}, + permission_cache(), + topic_permission_cache()} | {error, term()}. -ensure_source(#'v1_0.source'{dynamic = true}, _, _, _, _) -> - exit_not_implemented("Dynamic sources not supported"); -ensure_source(#'v1_0.source'{address = Address, - durable = Durable}, - Vhost, User, PermCache, TopicPermCache) -> +ensure_source(#'v1_0.source'{ + address = undefined, + dynamic = true, + %% We will reply with the actual node properties. + dynamic_node_properties = _IgnoreDesiredProperties, + capabilities = {array, symbol, Caps} + } = Source0, + LinkName, Vhost, User, ContainerId, + ConnPid, PermCache0, TopicPermCache) -> + case lists:member({symbol, ?CAP_TEMPORARY_QUEUE}, Caps) of + true -> + {QNameBin, Address, Props, PermCache} = + declare_dynamic_queue(ContainerId, LinkName, Vhost, User, ConnPid, PermCache0), + Source = Source0#'v1_0.source'{ + address = {utf8, Address}, + %% While Khepri stores queue records durably, the terminus + %% - i.e. the existence of this receiver - is not stored durably. + durable = ?V_1_0_TERMINUS_DURABILITY_NONE, + expiry_policy = ?V_1_0_TERMINUS_EXPIRY_POLICY_LINK_DETACH, + timeout = {uint, 0}, + dynamic_node_properties = Props, + distribution_mode = ?V_1_0_STD_DIST_MODE_MOVE, + capabilities = rabbit_amqp_util:capabilities([?CAP_TEMPORARY_QUEUE]) + }, + QName = queue_resource(Vhost, QNameBin), + {ok, QName, Source, PermCache, TopicPermCache}; + false -> + exit_not_implemented("Dynamic source not supported: ~p", [Source0]) + end; +ensure_source(Source = #'v1_0.source'{dynamic = true}, _, _, _, _, _, _, _) -> + exit_not_implemented("Dynamic source not supported: ~p", [Source]); +ensure_source(Source = #'v1_0.source'{address = Address, + durable = Durable}, + _LinkName, Vhost, User, _ContainerId, _ConnPid, PermCache, TopicPermCache) -> case Address of {utf8, <<"/queues/", QNameBinQuoted/binary>>} -> %% The only possible v2 source address format is: @@ -2635,15 +2703,20 @@ ensure_source(#'v1_0.source'{address = Address, QNameBin -> QName = queue_resource(Vhost, QNameBin), ok = exit_if_absent(QName), - {ok, QName, PermCache, TopicPermCache} + {ok, QName, Source, PermCache, TopicPermCache} catch error:_ -> {error, {bad_address, Address}} end; {utf8, SourceAddr} -> case address_v1_permitted() of true -> - ensure_source_v1(SourceAddr, Vhost, User, Durable, - PermCache, TopicPermCache); + case ensure_source_v1(SourceAddr, Vhost, User, Durable, + PermCache, TopicPermCache) of + {ok, QName, PermCache1, TopicPermCache1} -> + {ok, QName, Source, PermCache1, TopicPermCache1}; + Err -> + Err + end; false -> {error, {amqp_address_v1_not_permitted, Address}} end; @@ -2689,42 +2762,71 @@ ensure_source_v1(Address, Err end. -address(undefined) -> - null; -address({utf8, String}) -> - String. - -spec ensure_target(#'v1_0.target'{}, + binary(), rabbit_types:vhost(), rabbit_types:user(), + binary(), + rabbit_types:connection(), permission_cache()) -> {ok, rabbit_types:exchange() | rabbit_exchange:name() | to, rabbit_types:routing_key() | to | subject, rabbit_misc:resource_name() | undefined, + #'v1_0.target'{}, permission_cache()} | {error, term()}. -ensure_target(#'v1_0.target'{dynamic = true}, _, _, _) -> - exit_not_implemented("Dynamic targets not supported"); -ensure_target(#'v1_0.target'{address = Address, - durable = Durable}, - Vhost, User, PermCache) -> +ensure_target(#'v1_0.target'{ + address = undefined, + dynamic = true, + %% We will reply with the actual node properties. + dynamic_node_properties = _IgnoreDesiredProperties, + capabilities = {array, symbol, Caps} + } = Target0, + LinkName, Vhost, User, ContainerId, ConnPid, PermCache0) -> + case lists:member({symbol, ?CAP_TEMPORARY_QUEUE}, Caps) of + true -> + {QNameBin, Address, Props, PermCache1} = + declare_dynamic_queue(ContainerId, LinkName, Vhost, User, ConnPid, PermCache0), + {ok, Exchange, PermCache} = check_exchange(?DEFAULT_EXCHANGE_NAME, User, Vhost, PermCache1), + Target = #'v1_0.target'{ + address = {utf8, Address}, + %% While Khepri stores queue records durably, + %% the terminus - i.e. the existence of this producer - is not stored durably. + durable = ?V_1_0_TERMINUS_DURABILITY_NONE, + expiry_policy = ?V_1_0_TERMINUS_EXPIRY_POLICY_LINK_DETACH, + timeout = {uint, 0}, + dynamic = true, + dynamic_node_properties = Props, + capabilities = rabbit_amqp_util:capabilities([?CAP_TEMPORARY_QUEUE]) + }, + {ok, Exchange, QNameBin, QNameBin, Target, PermCache}; + false -> + exit_not_implemented("Dynamic target not supported: ~p", [Target0]) + end; +ensure_target(Target = #'v1_0.target'{dynamic = true}, _, _, _, _, _, _) -> + exit_not_implemented("Dynamic target not supported: ~p", [Target]); +ensure_target(Target = #'v1_0.target'{address = Address, + durable = Durable}, + _LinkName, Vhost, User, _ContainerId, _ConnPid, PermCache0) -> case target_address_version(Address) of 2 -> case ensure_target_v2(Address, Vhost) of {ok, to, RKey, QNameBin} -> - {ok, to, RKey, QNameBin, PermCache}; + {ok, to, RKey, QNameBin, Target, PermCache0}; {ok, XNameBin, RKey, QNameBin} -> - check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache); + {ok, Exchange, PermCache} = check_exchange(XNameBin, User, Vhost, PermCache0), + {ok, Exchange, RKey, QNameBin, Target, PermCache}; {error, _} = Err -> Err end; 1 -> case address_v1_permitted() of true -> - case ensure_target_v1(Address, Vhost, User, Durable, PermCache) of + case ensure_target_v1(Address, Vhost, User, Durable, PermCache0) of {ok, XNameBin, RKey, QNameBin, PermCache1} -> - check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache1); + {ok, Exchange, PermCache} = check_exchange(XNameBin, User, Vhost, PermCache1), + {ok, Exchange, RKey, QNameBin, Target, PermCache}; {error, _} = Err -> Err end; @@ -2733,7 +2835,7 @@ ensure_target(#'v1_0.target'{address = Address, end end. -check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache0) -> +check_exchange(XNameBin, User, Vhost, PermCache0) -> XName = exchange_resource(Vhost, XNameBin), PermCache = check_resource_access(XName, write, User, PermCache0), case rabbit_exchange:lookup(XName) of @@ -2747,7 +2849,7 @@ check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache0) -> <<"amq.", _/binary>> -> X; _ -> XName end, - {ok, Exchange, RKey, QNameBin, PermCache}; + {ok, Exchange, PermCache}; {error, not_found} -> exit_not_found(XName) end. @@ -3035,7 +3137,10 @@ credit_reply_timeout(QType, QName) -> protocol_error(?V_1_0_AMQP_ERROR_INTERNAL_ERROR, Fmt, Args). default(undefined, Default) -> Default; -default(Thing, _Default) -> Thing. +default(Thing, _Default) -> Thing. + +address(undefined) -> null; +address({utf8, String}) -> String. snd_settle_mode({ubyte, Val}) -> case Val of @@ -3249,20 +3354,20 @@ ensure_terminus(Type, {exchange, {XNameList, _RoutingKey}}, Vhost, User, Durabil ok = exit_if_absent(exchange, Vhost, XNameList), case Type of target -> {undefined, PermCache}; - source -> declare_queue(generate_queue_name(), Vhost, User, Durability, PermCache) + source -> declare_queue_v1(generate_queue_name_v1(), Vhost, User, Durability, PermCache) end; ensure_terminus(target, {topic, _bindingkey}, _, _, _, PermCache) -> %% exchange amq.topic exists {undefined, PermCache}; ensure_terminus(source, {topic, _BindingKey}, Vhost, User, Durability, PermCache) -> %% exchange amq.topic exists - declare_queue(generate_queue_name(), Vhost, User, Durability, PermCache); + declare_queue_v1(generate_queue_name_v1(), Vhost, User, Durability, PermCache); ensure_terminus(target, {queue, undefined}, _, _, _, PermCache) -> %% Target "/queue" means publish to default exchange with message subject as routing key. %% Default exchange exists. {undefined, PermCache}; ensure_terminus(_, {queue, QNameList}, Vhost, User, Durability, PermCache) -> - declare_queue(unicode:characters_to_binary(QNameList), Vhost, User, Durability, PermCache); + declare_queue_v1(unicode:characters_to_binary(QNameList), Vhost, User, Durability, PermCache); ensure_terminus(_, {amqqueue, QNameList}, Vhost, _, _, PermCache) -> %% Target "/amq/queue/" is handled specially due to AMQP legacy: %% "Queue names starting with "amq." are reserved for pre-declared and @@ -3287,22 +3392,39 @@ exit_if_absent(ResourceName = #resource{kind = Kind}) -> false -> exit_not_found(ResourceName) end. -generate_queue_name() -> +generate_queue_name_v1() -> rabbit_guid:binary(rabbit_guid:gen_secure(), "amq.gen"). +%% "The generated name of the address SHOULD include the link name and the +%% container-id of the remote container to allow for ease of identification." [3.5.4] +%% Let's include container-id and link name if they are not very long +%% because the generated address might be sent in every message. +generate_queue_name_dynamic(ContainerId, LinkName) + when byte_size(ContainerId) + byte_size(LinkName) < 150 -> + Prefix = <<"amq.dyn-", ContainerId/binary, "-", LinkName/binary>>, + rabbit_guid:binary(rabbit_guid:gen_secure(), Prefix); +generate_queue_name_dynamic(_, _) -> + rabbit_guid:binary(rabbit_guid:gen_secure(), "amq.dyn.gen"). + +declare_queue_v1(QNameBin, Vhost, User, TerminusDurability, PermCache0) -> + Durable = queue_is_durable(TerminusDurability), + {ok, PermCache} = declare_queue(QNameBin, Vhost, User, Durable, none, PermCache0), + {QNameBin, PermCache}. + declare_queue(QNameBin, Vhost, User = #user{username = Username}, - TerminusDurability, + Durable, + QOwner, PermCache0) -> QName = queue_resource(Vhost, QNameBin), PermCache = check_resource_access(QName, configure, User, PermCache0), rabbit_core_metrics:queue_declared(QName), Q0 = amqqueue:new(QName, _Pid = none, - queue_is_durable(TerminusDurability), + Durable, _AutoDelete = false, - _QOwner = none, + QOwner, _QArgs = [], Vhost, #{user => Username}, @@ -3322,7 +3444,40 @@ declare_queue(QNameBin, "Failed to declare ~s: ~p", [rabbit_misc:rs(QName), Other]) end, - {QNameBin, PermCache}. + {ok, PermCache}. + +declare_dynamic_queue(ContainerId, LinkName, Vhost, User, ConnPid, PermCache0) -> + QNameBin = generate_queue_name_dynamic(ContainerId, LinkName), + {ok, PermCache} = declare_queue(QNameBin, Vhost, User, true, ConnPid, PermCache0), + QNameBinQuoted = uri_string:quote(QNameBin), + Address = <<"/queues/", QNameBinQuoted/binary>>, + Props = {map, [{{symbol, <<"lifetime-policy">>}, + {described, ?V_1_0_SYMBOL_DELETE_ON_CLOSE, {list, []}}}, + {{symbol, <<"supported-dist-modes">>}, + {array, symbol, [?V_1_0_STD_DIST_MODE_MOVE]}}]}, + {QNameBin, Address, Props, PermCache}. + +maybe_delete_dynamic_queue(#incoming_link{dynamic = true, + queue_name_bin = QNameBin}, + Cfg = #cfg{vhost = Vhost}) -> + QName = queue_resource(Vhost, QNameBin), + delete_dynamic_queue(QName, Cfg); +maybe_delete_dynamic_queue(#outgoing_link{dynamic = true, + queue_name = QName}, + Cfg) -> + delete_dynamic_queue(QName, Cfg); +maybe_delete_dynamic_queue(_, _) -> + ok. + +delete_dynamic_queue(QName, #cfg{user = #user{username = Username}}) -> + %% No real need to check for 'configure' access again since this queue is owned by + %% this connection and the user had 'configure' access when the queue got declared. + _ = rabbit_amqqueue:with( + QName, + fun(Q) -> + rabbit_queue_type:delete(Q, false, false, Username) + end), + ok. outcomes(#'v1_0.source'{outcomes = undefined}) -> {array, symbol, ?OUTCOMES}; diff --git a/deps/rabbit/test/amqp_auth_SUITE.erl b/deps/rabbit/test/amqp_auth_SUITE.erl index 581351c462..5889cbdd50 100644 --- a/deps/rabbit/test/amqp_auth_SUITE.erl +++ b/deps/rabbit/test/amqp_auth_SUITE.erl @@ -55,9 +55,12 @@ groups() -> [ %% authz attach_source_queue, + attach_source_queue_dynamic, attach_target_exchange, attach_target_topic_exchange, attach_target_queue, + attach_target_queue_dynamic_exchange_write, + attach_target_queue_dynamic_queue_configure, target_per_message_exchange, target_per_message_internal_exchange, target_per_message_topic, @@ -437,6 +440,39 @@ attach_source_queue(Config) -> end, ok = close_connection_sync(Conn). +attach_source_queue_dynamic(Config) -> + OpnConf = connection_config(Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + + %% missing configure permission to queue + ok = set_permissions(Config, <<>>, <<".*">>, <<".*">>), + + Source = #{address => undefined, + dynamic => true, + capabilities => [<<"temporary-queue">>], + durable => none}, + AttachArgs = #{name => <<"my link">>, + role => {receiver, Source, self()}, + snd_settle_mode => unsettled, + rcv_settle_mode => first, + filter => #{}}, + {ok, _Recv} = amqp10_client:attach_link(Session, AttachArgs), + receive {amqp10_event, + {session, Session, + {ended, Error}}} -> + #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, + description = {utf8, Description}} = Error, + ?assertEqual( + match, + re:run(Description, + <<"^configure access to queue 'amq\.dyn-.*' in vhost " + "'test vhost' refused for user 'test user'$">>, + [{capture, none}])) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) + end, + ok = close_connection_sync(Connection). + attach_target_exchange(Config) -> XName = <<"amq.fanout">>, Address1 = rabbitmq_amqp_address:exchange(XName), @@ -485,6 +521,61 @@ attach_target_queue(Config) -> end, ok = amqp10_client:close_connection(Conn). +attach_target_queue_dynamic_exchange_write(Config) -> + OpnConf = connection_config(Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + + %% missing write permission to default exchange + ok = set_permissions(Config, <<".*">>, <<>>, <<".*">>), + + Target = #{address => undefined, + dynamic => true, + capabilities => [<<"temporary-queue">>]}, + AttachArgs = #{name => <<"my link">>, + role => {sender, Target}, + snd_settle_mode => mixed, + rcv_settle_mode => first}, + {ok, _Recv} = amqp10_client:attach_link(Session, AttachArgs), + ExpectedErr = error_unauthorized( + <<"write access to exchange 'amq.default' ", + "in vhost 'test vhost' refused for user 'test user'">>), + receive {amqp10_event, {session, Session, {ended, ExpectedErr}}} -> ok + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) + end, + ok = close_connection_sync(Connection). + +attach_target_queue_dynamic_queue_configure(Config) -> + OpnConf = connection_config(Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + + %% missing configure permission to queue + ok = set_permissions(Config, <<>>, <<".*">>, <<".*">>), + + Target = #{address => undefined, + dynamic => true, + capabilities => [<<"temporary-queue">>]}, + AttachArgs = #{name => <<"my link">>, + role => {sender, Target}, + snd_settle_mode => mixed, + rcv_settle_mode => first}, + {ok, _Recv} = amqp10_client:attach_link(Session, AttachArgs), + receive {amqp10_event, + {session, Session, + {ended, Error}}} -> + #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, + description = {utf8, Description}} = Error, + ?assertEqual( + match, + re:run(Description, + <<"^configure access to queue 'amq\.dyn-.*' in vhost " + "'test vhost' refused for user 'test user'$">>, + [{capture, none}])) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) + end, + ok = close_connection_sync(Connection). + target_per_message_exchange(Config) -> TargetAddress = null, To1 = rabbitmq_amqp_address:exchange(<<"amq.fanout">>), diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 17d997a78a..3c3f47574d 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -130,6 +130,10 @@ groups() -> handshake_timeout, credential_expires, attach_to_exclusive_queue, + dynamic_target_short_link_name, + dynamic_target_long_link_name, + dynamic_source_rpc, + dynamic_terminus_delete, modified_classic_queue, modified_quorum_queue, modified_dead_letter_headers_exchange, @@ -4762,6 +4766,230 @@ attach_to_exclusive_queue(Config) -> #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), ok = rabbit_ct_client_helpers:close_channel(Ch). +dynamic_target_short_link_name(Config) -> + OpnConf0 = connection_config(Config), + OpnConf = OpnConf0#{container_id := <<"my-container">>, + notify_with_performative => true}, + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + + %% "The address of the target MUST NOT be set" [3.5.4] + Target = #{address => undefined, + dynamic => true, + capabilities => [<<"temporary-queue">>]}, + ShortLinkName = <<"my/sender">>, + AttachArgs = #{name => ShortLinkName, + role => {sender, Target}, + snd_settle_mode => mixed, + rcv_settle_mode => first}, + {ok, Sender} = amqp10_client:attach_link(Session, AttachArgs), + Addr = receive {amqp10_event, {link, Sender, {attached, Attach}}} -> + #'v1_0.attach'{ + target = #'v1_0.target'{ + address = {utf8, Address}, + dynamic = true}} = Attach, + Address + after 30000 -> ct:fail({missing_event, ?LINE}) + end, + %% The client doesn't really care what the address looks like. + %% However let's do whitebox testing here and check the address format. + %% We expect the address to contain both container ID and link name since they are short. + ?assertMatch(<<"/queues/amq.dyn-my-container-my%2Fsender-", _GUID/binary>>, Addr), + ok = wait_for_credit(Sender), + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t1">>, <<"m1">>)), + ok = wait_for_accepted(<<"t1">>), + + {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"my-receiver">>, Addr, unsettled), + {ok, Msg} = amqp10_client:get_msg(Receiver), + ?assertEqual(<<"m1">>, amqp10_msg:body_bin(Msg)), + ok = amqp10_client:accept_msg(Receiver, Msg), + + %% The exclusive queue should be deleted when we close our connection. + ?assertMatch([_ExclusiveQueue], rpc(Config, rabbit_amqqueue, list, [])), + ok = close_connection_sync(Connection), + eventually(?_assertEqual([], rpc(Config, rabbit_amqqueue, list, []))), + ok. + +dynamic_target_long_link_name(Config) -> + OpnConf0 = connection_config(Config), + OpnConf = OpnConf0#{container_id := <<"my-container">>, + notify_with_performative => true}, + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + + %% "The address of the target MUST NOT be set" [3.5.4] + Target = #{address => undefined, + dynamic => true, + capabilities => [<<"temporary-queue">>]}, + LongLinkName = binary:copy(<<"z">>, 200), + AttachArgs = #{name => LongLinkName, + role => {sender, Target}, + snd_settle_mode => mixed, + rcv_settle_mode => first}, + {ok, Sender} = amqp10_client:attach_link(Session, AttachArgs), + Addr = receive {amqp10_event, {link, Sender, {attached, Attach}}} -> + #'v1_0.attach'{ + target = #'v1_0.target'{ + address = {utf8, Address}, + dynamic = true}} = Attach, + Address + after 30000 -> ct:fail({missing_event, ?LINE}) + end, + %% The client doesn't really care what the address looks like. + %% However let's do whitebox testing here and check the address format. + %% We expect the address to not contain the long link name. + ?assertMatch(<<"/queues/amq.dyn.gen-", _GUID/binary>>, Addr), + ok = wait_for_credit(Sender), + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t1">>, <<"m1">>)), + ok = wait_for_accepted(<<"t1">>), + + {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"my-receiver">>, Addr, unsettled), + {ok, Msg} = amqp10_client:get_msg(Receiver), + ?assertEqual(<<"m1">>, amqp10_msg:body_bin(Msg)), + ok = amqp10_client:accept_msg(Receiver, Msg), + flush(accepted), + + %% Since RabbitMQ uses the delete-on-close lifetime policy, the exclusive queue should be + %% "deleted at the point that the link which caused its creation ceases to exist" [3.5.10] + ok = amqp10_client:detach_link(Sender), + receive {amqp10_event, {link, Receiver, {detached, Detach}}} -> + ?assertMatch( + #'v1_0.detach'{error = #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_RESOURCE_DELETED}}, + Detach) + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + ok = close_connection_sync(Connection). + +%% Test the following RPC workflow: +%% RPC client -> queue -> RPC server +%% RPC server -> dynamic queue -> RPC client +dynamic_source_rpc(Config) -> + OpnConf0 = connection_config(Config), + OpnConf = OpnConf0#{container_id := <<"rpc-client">>, + notify_with_performative => true}, + {ok, ConnectionClient} = amqp10_client:open_connection(OpnConf), + {ok, SessionClient} = amqp10_client:begin_session_sync(ConnectionClient), + + %% "The address of the source MUST NOT be set" [3.5.3] + Source = #{address => undefined, + dynamic => true, + capabilities => [<<"temporary-queue">>], + durable => none}, + AttachArgs = #{name => <<"rpc-client-receiver🥕"/utf8>>, + role => {receiver, Source, self()}, + snd_settle_mode => unsettled, + rcv_settle_mode => first, + filter => #{}}, + {ok, ReceiverClient} = amqp10_client:attach_link(SessionClient, AttachArgs), + RespAddr = receive {amqp10_event, {link, ReceiverClient, {attached, Attach}}} -> + #'v1_0.attach'{ + source = #'v1_0.source'{ + address = {utf8, Address}, + dynamic = true}} = Attach, + Address + after 30000 -> ct:fail({missing_event, ?LINE}) + end, + %% The client doesn't really care what the address looks like. + %% However let's do whitebox testing here and check the address format. + %% We expect the address to contain both container ID and link name since they are short. + ?assertMatch(<<"/queues/amq.dyn-rpc-client-rpc-client-receiver", _CarrotAndGUID/binary>>, + RespAddr), + + %% Let's use a separate connection for the RPC server. + {_, SessionServer, LinkPair} = RpcServer = init(Config), + ReqQName = atom_to_binary(?FUNCTION_NAME), + ReqAddr = rabbitmq_amqp_address:queue(ReqQName), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, ReqQName, #{}), + {ok, ReceiverServer} = amqp10_client:attach_receiver_link(SessionServer, <<"rpc-server-receiver">>, ReqAddr, unsettled), + {ok, SenderServer} = amqp10_client:attach_sender_link(SessionServer, <<"rpc-server-sender">>, null), + ok = wait_for_credit(SenderServer), + + {ok, SenderClient} = amqp10_client:attach_sender_link(SessionClient, <<"rpc-client-sender">>, ReqAddr), + wait_for_credit(SenderClient), + flush(attached), + + ok = amqp10_client:send_msg( + SenderClient, + amqp10_msg:set_properties( + #{reply_to => RespAddr}, + amqp10_msg:new(<<"t1">>, <<"hello">>))), + ok = wait_for_accepted(<<"t1">>), + + {ok, ReqMsg} = amqp10_client:get_msg(ReceiverServer), + ReqBody = amqp10_msg:body_bin(ReqMsg), + RespBody = string:uppercase(ReqBody), + #{reply_to := ReplyTo} = amqp10_msg:properties(ReqMsg), + ok = amqp10_client:send_msg( + SenderServer, + amqp10_msg:set_properties( + #{to => ReplyTo}, + amqp10_msg:new(<<"t2">>, RespBody))), + ok = wait_for_accepted(<<"t2">>), + ok = amqp10_client:accept_msg(ReceiverServer, ReqMsg), + + {ok, RespMsg} = amqp10_client:get_msg(ReceiverClient), + ?assertEqual(<<"HELLO">>, amqp10_msg:body_bin(RespMsg)), + ok = amqp10_client:accept_msg(ReceiverClient, RespMsg), + + ok = detach_link_sync(ReceiverServer), + ok = detach_link_sync(SenderClient), + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, ReqQName), + ok = detach_link_sync(SenderServer), + ok = close(RpcServer), + ok = close_connection_sync(ConnectionClient). + +dynamic_terminus_delete(Config) -> + OpnConf = connection_config(Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session1} = amqp10_client:begin_session_sync(Connection), + {ok, Session2} = amqp10_client:begin_session_sync(Connection), + + Terminus = #{address => undefined, + dynamic => true, + capabilities => [<<"temporary-queue">>], + durable => none}, + RcvAttachArgs = #{role => {receiver, Terminus, self()}, + snd_settle_mode => unsettled, + rcv_settle_mode => first, + filter => #{}}, + SndAttachArgs = #{role => {sender, Terminus}, + snd_settle_mode => mixed, + rcv_settle_mode => first}, + RcvAttachArgs1 = RcvAttachArgs#{name => <<"receiver 1">>}, + RcvAttachArgs2 = RcvAttachArgs#{name => <<"receiver 2">>}, + RcvAttachArgs3 = RcvAttachArgs#{name => <<"receiver 3">>}, + SndAttachArgs1 = SndAttachArgs#{name => <<"sender 1">>}, + SndAttachArgs2 = SndAttachArgs#{name => <<"sender 2">>}, + SndAttachArgs3 = SndAttachArgs#{name => <<"sender 3">>}, + {ok, _R1} = amqp10_client:attach_link(Session1, RcvAttachArgs1), + {ok, _R2} = amqp10_client:attach_link(Session2, RcvAttachArgs2), + {ok, R3} = amqp10_client:attach_link(Session2, RcvAttachArgs3), + {ok, _S1} = amqp10_client:attach_link(Session1, SndAttachArgs1), + {ok, _S2} = amqp10_client:attach_link(Session2, SndAttachArgs2), + {ok, S3} = amqp10_client:attach_link(Session2, SndAttachArgs3), + [receive {amqp10_event, {link, _LinkRef, attached}} -> ok + after 30000 -> ct:fail({missing_event, ?LINE}) + end + || _ <- lists:seq(1, 6)], + + %% We should now have 6 exclusive queues. + ?assertEqual(6, rpc(Config, rabbit_amqqueue, count, [])), + + %% Since RabbitMQ uses the delete-on-close lifetime policy, the exclusive queue should be + %% "deleted at the point that the link which caused its creation ceases to exist" [3.5.10] + ok = detach_link_sync(R3), + ok = detach_link_sync(S3), + ?assertEqual(4, rpc(Config, rabbit_amqqueue, count, [])), + + %% When a session is ended, the sessions's links cease to exist. + ok = end_session_sync(Session2), + eventually(?_assertEqual(2, rpc(Config, rabbit_amqqueue, count, []))), + + %% When a connection is closed, the connection's links cease to exist. + ok = close_connection_sync(Connection), + eventually(?_assertEqual(0, rpc(Config, rabbit_amqqueue, count, []))), + ok. + priority_classic_queue(Config) -> QArgs = #{<<"x-queue-type">> => {utf8, <<"classic">>}, <<"x-max-priority">> => {ulong, 10}}, diff --git a/deps/rabbit/test/amqp_jms_SUITE.erl b/deps/rabbit/test/amqp_jms_SUITE.erl index a97bd5d68b..baad72b014 100644 --- a/deps/rabbit/test/amqp_jms_SUITE.erl +++ b/deps/rabbit/test/amqp_jms_SUITE.erl @@ -14,6 +14,10 @@ -compile(nowarn_export_all). -compile(export_all). +-import(rabbit_ct_broker_helpers, + [rpc/4]). +-import(rabbit_ct_helpers, + [eventually/3]). -import(amqp_utils, [init/1, close/1, @@ -30,8 +34,15 @@ all() -> groups() -> [{cluster_size_1, [shuffle], [ + %% CT test case per Java class + jms_connection, + jms_temporary_queue, + + %% CT test case per test in Java class JmsTest message_types_jms_to_jms, - message_types_jms_to_amqp + message_types_jms_to_amqp, + temporary_queue_rpc, + temporary_queue_delete ] }]. @@ -54,7 +65,9 @@ end_per_suite(Config) -> init_per_group(cluster_size_1, Config) -> Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"), - Config1 = rabbit_ct_helpers:set_config(Config, {rmq_nodename_suffix, Suffix}), + Config1 = rabbit_ct_helpers:set_config( + Config, + {rmq_nodename_suffix, Suffix}), Config2 = rabbit_ct_helpers:merge_app_env( Config1, {rabbit, @@ -82,6 +95,9 @@ init_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_started(Config, Testcase). end_per_testcase(Testcase, Config) -> + %% Assert that every testcase cleaned up. + eventually(?_assertEqual([], rpc(Config, rabbit_amqqueue, list, [])), 1000, 5), + eventually(?_assertEqual([], rpc(Config, rabbit_amqp_session, list_local, [])), 1000, 5), rabbit_ct_helpers:testcase_finished(Config, Testcase). build_maven_test_project(Config) -> @@ -98,11 +114,17 @@ build_maven_test_project(Config) -> %% Testcases. %% ------------------------------------------------------------------- +jms_connection(Config) -> + ok = run(?FUNCTION_NAME, [{"-Dtest=~s", [<<"JmsConnectionTest">>]}], Config). + +jms_temporary_queue(Config) -> + ok = run(?FUNCTION_NAME, [{"-Dtest=~s", [<<"JmsTemporaryQueueTest">>]}], Config). + %% Send different message types from JMS client to JMS client. message_types_jms_to_jms(Config) -> TestName = QName = atom_to_binary(?FUNCTION_NAME), ok = declare_queue(QName, <<"quorum">>, Config), - ok = run(TestName, [{"-Dqueue=~ts", [rabbitmq_amqp_address:queue(QName)]}], Config), + ok = run_jms_test(TestName, [{"-Dqueue=~ts", [rabbitmq_amqp_address:queue(QName)]}], Config), ok = delete_queue(QName, Config). %% Send different message types from JMS client to Erlang AMQP 1.0 client. @@ -112,7 +134,7 @@ message_types_jms_to_amqp(Config) -> Address = rabbitmq_amqp_address:queue(QName), %% The JMS client sends messaegs. - ok = run(TestName, [{"-Dqueue=~ts", [Address]}], Config), + ok = run_jms_test(TestName, [{"-Dqueue=~ts", [Address]}], Config), %% The Erlang AMQP 1.0 client receives messages. OpnConf = connection_config(Config), @@ -120,6 +142,7 @@ message_types_jms_to_amqp(Config) -> {ok, Session} = amqp10_client:begin_session_sync(Connection), {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, settled), {ok, Msg1} = amqp10_client:get_msg(Receiver), + ?assertEqual( #'v1_0.amqp_value'{content = {utf8, <<"msg1🥕"/utf8>>}}, amqp10_msg:body(Msg1)), @@ -149,16 +172,31 @@ message_types_jms_to_amqp(Config) -> ok = close_connection_sync(Connection), ok = delete_queue(QName, Config). +temporary_queue_rpc(Config) -> + TestName = QName = atom_to_binary(?FUNCTION_NAME), + ok = declare_queue(QName, <<"classic">>, Config), + ok = run_jms_test(TestName, [{"-Dqueue=~ts", [rabbitmq_amqp_address:queue(QName)]}], Config), + ok = delete_queue(QName, Config). + +temporary_queue_delete(Config) -> + TestName = atom_to_binary(?FUNCTION_NAME), + ok = run_jms_test(TestName, [], Config). + %% ------------------------------------------------------------------- %% Helpers %% ------------------------------------------------------------------- +run_jms_test(TestName, JavaProps, Config) -> + run(TestName, [{"-Dtest=JmsTest#~ts", [TestName]} | JavaProps], Config). + run(TestName, JavaProps, Config) -> TestProjectDir = ?config(data_dir, Config), + Cmd = [filename:join([TestProjectDir, "mvnw"]), "test", - {"-Dtest=JmsTest#~ts", [TestName]}, - {"-Drmq_broker_uri=~ts", [rabbit_ct_broker_helpers:node_uri(Config, 0)]} + {"-Drmq_broker_uri=~ts", [rabbit_ct_broker_helpers:node_uri(Config, 0)]}, + {"-Dnodename=~ts", [rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename)]}, + {"-Drabbitmqctl.bin=~ts", [rabbit_ct_helpers:get_config(Config, rabbitmqctl_cmd)]} ] ++ JavaProps, case rabbit_ct_helpers:exec(Cmd, [{cd, TestProjectDir}]) of {ok, _Stdout_} -> diff --git a/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/Cli.java b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/Cli.java new file mode 100644 index 0000000000..2dc08413ea --- /dev/null +++ b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/Cli.java @@ -0,0 +1,163 @@ +// The contents of this file are subject to the Mozilla Public License +// Version 2.0 (the "License"); you may not use this file except in +// compliance with the License. You may obtain a copy of the License +// at https://www.mozilla.org/en-US/MPL/2.0/ +// +// Software distributed under the License is distributed on an "AS IS" +// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +// the License for the specific language governing rights and +// limitations under the License. +// +// The Original Code is RabbitMQ. +// +// The Initial Developer of the Original Code is Pivotal Software, Inc. +// Copyright (c) 2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +// +package com.rabbitmq.amqp.tests.jms; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.InetAddress; +import java.net.UnknownHostException; + +final class Cli { + + private Cli() {} + + static void startBroker() { + rabbitmqctl("start_app"); + } + + static void stopBroker() { + rabbitmqctl("stop_app"); + } + + private static ProcessState rabbitmqctl(String command) { + return rabbitmqctl(command, nodename()); + } + + private static ProcessState rabbitmqctl(String command, String nodename) { + return executeCommand(rabbitmqctlCommand() + " -n '" + nodename + "'" + " " + command); + } + + private static String rabbitmqctlCommand() { + return System.getProperty("rabbitmqctl.bin"); + } + + public static String nodename() { + return System.getProperty("nodename", "rabbit@" + hostname()); + } + + public static String hostname() { + try { + return InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + try { + return executeCommand("hostname").output(); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + } + + private static ProcessState executeCommand(String command) { + return executeCommand(command, false); + } + + private static ProcessState executeCommand(String command, boolean ignoreError) { + Process pr = executeCommandProcess(command); + InputStreamPumpState inputState = new InputStreamPumpState(pr.getInputStream()); + InputStreamPumpState errorState = new InputStreamPumpState(pr.getErrorStream()); + + int ev = waitForExitValue(pr, inputState, errorState); + inputState.pump(); + errorState.pump(); + if (ev != 0 && !ignoreError) { + throw new RuntimeException( + "unexpected command exit value: " + + ev + + "\ncommand: " + + command + + "\n" + + "\nstdout:\n" + + inputState.buffer.toString() + + "\nstderr:\n" + + errorState.buffer.toString() + + "\n"); + } + return new ProcessState(inputState); + } + + private static int waitForExitValue( + Process pr, InputStreamPumpState inputState, InputStreamPumpState errorState) { + while (true) { + try { + inputState.pump(); + errorState.pump(); + pr.waitFor(); + break; + } catch (InterruptedException ignored) { + } + } + return pr.exitValue(); + } + + private static Process executeCommandProcess(String command) { + String[] finalCommand; + if (System.getProperty("os.name").toLowerCase().contains("windows")) { + finalCommand = new String[4]; + finalCommand[0] = "C:\\winnt\\system32\\cmd.exe"; + finalCommand[1] = "/y"; + finalCommand[2] = "/c"; + finalCommand[3] = command; + } else { + finalCommand = new String[3]; + finalCommand[0] = "/bin/sh"; + finalCommand[1] = "-c"; + finalCommand[2] = command; + } + try { + return Runtime.getRuntime().exec(finalCommand); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + static class ProcessState { + + private final InputStreamPumpState inputState; + + ProcessState(InputStreamPumpState inputState) { + this.inputState = inputState; + } + + String output() { + return inputState.buffer.toString(); + } + } + + private static class InputStreamPumpState { + + private final BufferedReader reader; + private final StringBuilder buffer; + + private InputStreamPumpState(InputStream in) { + this.reader = new BufferedReader(new InputStreamReader(in)); + this.buffer = new StringBuilder(); + } + + void pump() { + String line; + while (true) { + try { + if ((line = reader.readLine()) == null) break; + } catch (IOException e) { + throw new RuntimeException(e); + } + buffer.append(line).append("\n"); + } + } + } +} diff --git a/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsConnectionTest.java b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsConnectionTest.java new file mode 100644 index 0000000000..210f28c043 --- /dev/null +++ b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsConnectionTest.java @@ -0,0 +1,199 @@ +// The contents of this file are subject to the Mozilla Public License +// Version 2.0 (the "License"); you may not use this file except in +// compliance with the License. You may obtain a copy of the License +// at https://www.mozilla.org/en-US/MPL/2.0/ +// +// Software distributed under the License is distributed on an "AS IS" +// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +// the License for the specific language governing rights and +// limitations under the License. +// +// The Original Code is RabbitMQ. +// +// The Initial Developer of the Original Code is Pivotal Software, Inc. +// Copyright (c) 2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +// + +package com.rabbitmq.amqp.tests.jms; + +import static com.rabbitmq.amqp.tests.jms.Cli.startBroker; +import static com.rabbitmq.amqp.tests.jms.Cli.stopBroker; +import static com.rabbitmq.amqp.tests.jms.TestUtils.*; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import jakarta.jms.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.qpid.jms.JmsConnection; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.Timeout; + +/** + * Based on https://github.com/apache/qpid-jms/tree/main/qpid-jms-interop-tests/qpid-jms-activemq-tests. + */ +public class JmsConnectionTest { + + @Test + @Timeout(30) + public void testCreateConnection() throws Exception { + JmsConnectionFactory factory = new JmsConnectionFactory(brokerUri()); + try (Connection connection = factory.createConnection()) { + assertNotNull(connection); + } + } + + @Test + @Timeout(30) + public void testCreateConnectionAndStart() throws Exception { + JmsConnectionFactory factory = new JmsConnectionFactory(brokerUri()); + try (Connection connection = factory.createConnection()) { + assertNotNull(connection); + connection.start(); + } + } + + @Test + @Timeout(30) + // Currently not supported by RabbitMQ. + @Disabled + public void testCreateWithDuplicateClientIdFails() throws Exception { + JmsConnectionFactory factory = new JmsConnectionFactory(brokerUri()); + JmsConnection connection1 = (JmsConnection) factory.createConnection(); + connection1.setClientID("Test"); + assertNotNull(connection1); + connection1.start(); + JmsConnection connection2 = (JmsConnection) factory.createConnection(); + try { + connection2.setClientID("Test"); + fail("should have thrown a JMSException"); + } catch (InvalidClientIDException ex) { + // OK + } catch (Exception unexpected) { + fail("Wrong exception type thrown: " + unexpected); + } + + connection1.close(); + connection2.close(); + } + + @Test + public void testSetClientIdAfterStartedFails() { + assertThrows( + JMSException.class, + () -> { + JmsConnectionFactory factory = new JmsConnectionFactory(brokerUri()); + try (Connection connection = factory.createConnection()) { + connection.setClientID("Test"); + connection.start(); + connection.setClientID("NewTest"); + } + }); + } + + @Test + @Timeout(30) + public void testCreateConnectionAsSystemAdmin() throws Exception { + JmsConnectionFactory factory = new JmsConnectionFactory(brokerUri()); + factory.setUsername(adminUsername()); + factory.setPassword(adminPassword()); + try (Connection connection = factory.createConnection()) { + assertNotNull(connection); + connection.start(); + } + } + + @Test + @Timeout(30) + public void testCreateConnectionCallSystemAdmin() throws Exception { + JmsConnectionFactory factory = new JmsConnectionFactory(brokerUri()); + try (Connection connection = factory.createConnection(adminUsername(), adminPassword())) { + assertNotNull(connection); + connection.start(); + } + } + + @Test + @Timeout(30) + public void testCreateConnectionAsUnknwonUser() { + assertThrows( + JMSSecurityException.class, + () -> { + JmsConnectionFactory factory = new JmsConnectionFactory(TestUtils.brokerUri()); + factory.setUsername("unknown"); + factory.setPassword("unknown"); + try (Connection connection = factory.createConnection()) { + assertNotNull(connection); + connection.start(); + } + }); + } + + @Test + @Timeout(30) + public void testCreateConnectionCallUnknwonUser() { + assertThrows( + JMSSecurityException.class, + () -> { + JmsConnectionFactory factory = new JmsConnectionFactory(brokerUri()); + try (Connection connection = factory.createConnection("unknown", "unknown")) { + assertNotNull(connection); + connection.start(); + } + }); + } + + @Test + @Timeout(30) + public void testBrokerStopWontHangConnectionClose(TestInfo info) throws Exception { + Connection connection = new JmsConnectionFactory(brokerUri()).createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // TODO use a "regular" queue + TemporaryQueue queue = session.createTemporaryQueue(); + // String destinationName = name(info); + // Queue queue = session.createQueue("/queues/" + destinationName); + connection.start(); + + MessageProducer producer = session.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + + Message m = session.createTextMessage("Sample text"); + producer.send(m); + + try { + stopBroker(); + try { + connection.close(); + } catch (Exception ex) { + fail("Should not have thrown an exception."); + } + } finally { + startBroker(); + } + } + + @Test + @Timeout(60) + public void testConnectionExceptionBrokerStop() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + try (Connection connection = new JmsConnectionFactory(brokerUri()).createConnection()) { + connection.setExceptionListener(exception -> latch.countDown()); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(session); + + try { + stopBroker(); + assertTrue(latch.await(10, TimeUnit.SECONDS)); + } finally { + startBroker(); + } + } + } +} diff --git a/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTemporaryQueueTest.java b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTemporaryQueueTest.java new file mode 100644 index 0000000000..3da83a9066 --- /dev/null +++ b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTemporaryQueueTest.java @@ -0,0 +1,135 @@ +// The contents of this file are subject to the Mozilla Public License +// Version 2.0 (the "License"); you may not use this file except in +// compliance with the License. You may obtain a copy of the License +// at https://www.mozilla.org/en-US/MPL/2.0/ +// +// Software distributed under the License is distributed on an "AS IS" +// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +// the License for the specific language governing rights and +// limitations under the License. +// +// The Original Code is RabbitMQ. +// +// The Initial Developer of the Original Code is Pivotal Software, Inc. +// Copyright (c) 2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +// + +package com.rabbitmq.amqp.tests.jms; + +import static com.rabbitmq.amqp.tests.jms.TestUtils.brokerUri; +import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.fail; + +import jakarta.jms.*; +import jakarta.jms.IllegalStateException; +import java.util.UUID; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +/** + * Based on https://github.com/apache/qpid-jms/tree/main/qpid-jms-interop-tests/qpid-jms-activemq-tests. + */ +public class JmsTemporaryQueueTest { + + Connection connection; + + @AfterEach + void tearDown() throws JMSException { + connection.close(); + } + + @Test + @Timeout(60) + public void testCreatePublishConsumeTemporaryQueue() throws Exception { + connection = new JmsConnectionFactory(brokerUri()).createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(session); + TemporaryQueue queue = session.createTemporaryQueue(); + MessageConsumer consumer = session.createConsumer(queue); + + MessageProducer producer = session.createProducer(queue); + String body = UUID.randomUUID().toString(); + producer.send(session.createTextMessage(body)); + assertEquals(body, consumer.receive(60_000).getBody(String.class)); + } + + @Test + @Timeout(60) + public void testCantConsumeFromTemporaryQueueCreatedOnAnotherConnection() throws Exception { + connection = new JmsConnectionFactory(brokerUri()).createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + TemporaryQueue tempQueue = session.createTemporaryQueue(); + session.createConsumer(tempQueue); + + Connection connection2 = new JmsConnectionFactory(brokerUri()).createConnection(); + try { + Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + try { + session2.createConsumer(tempQueue); + fail("should not be able to consumer from temporary queue from another connection"); + } catch (InvalidDestinationException ide) { + // expected + } + } finally { + connection2.close(); + } + } + + @Test + @Timeout(60) + public void testCantSendToTemporaryQueueFromClosedConnection() throws Exception { + connection = new JmsConnectionFactory(brokerUri()).createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + TemporaryQueue tempQueue = session.createTemporaryQueue(); + + Connection connection2 = new JmsConnectionFactory(brokerUri()).createConnection(); + try { + Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + Message msg = session2.createMessage(); + MessageProducer producer = session2.createProducer(tempQueue); + + // Close the original connection + connection.close(); + + try { + producer.send(msg); + fail("should not be able to send to temporary queue from closed connection"); + } catch (jakarta.jms.IllegalStateException ide) { + // expected + } + } finally { + connection2.close(); + } + } + + @Test + @Timeout(60) + public void testCantDeleteTemporaryQueueWithConsumers() throws Exception { + connection = new JmsConnectionFactory(brokerUri()).createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + TemporaryQueue tempQueue = session.createTemporaryQueue(); + MessageConsumer consumer = session.createConsumer(tempQueue); + + try { + tempQueue.delete(); + fail("should not be able to delete temporary queue with active consumers"); + } catch (IllegalStateException ide) { + // expected + } + + consumer.close(); + + // Now it should be allowed + tempQueue.delete(); + } +} diff --git a/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTest.java b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTest.java index f5c5bffba2..23b66512fa 100644 --- a/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTest.java +++ b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTest.java @@ -1,7 +1,6 @@ package com.rabbitmq.amqp.tests.jms; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.*; import jakarta.jms.*; import java.util.*; @@ -104,8 +103,6 @@ public class JmsTest { Session session = connection.createSession(); Destination queue = (Destination) context.lookup("myQueue"); MessageProducer producer = session.createProducer(queue); - MessageConsumer consumer = session.createConsumer(queue); - connection.start(); // TextMessage String msg1 = "msg1🥕"; @@ -128,5 +125,57 @@ public class JmsTest { streamMessage.writeLong(-1L); producer.send(streamMessage); } + + } + + // Test that Request/reply pattern using a TemporaryQueue works. + // https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#requestreply-pattern-using-a-temporaryqueue-jakarta-ee + @Test + public void temporary_queue_rpc() throws Exception { + Context context = getContext(); + ConnectionFactory factory = (ConnectionFactory) context.lookup("myConnection"); + + try (JMSContext clientContext = factory.createContext()) { + Destination responseQueue = clientContext.createTemporaryQueue(); + JMSConsumer clientConsumer = clientContext.createConsumer(responseQueue); + + Destination requestQueue = (Destination) context.lookup("myQueue"); + TextMessage clientRequestMessage = clientContext.createTextMessage("hello"); + clientContext.createProducer(). + setJMSReplyTo(responseQueue). + send(requestQueue, clientRequestMessage); + + // Let's open a new connection to simulate the RPC server. + try (JMSContext serverContext = factory.createContext()) { + JMSConsumer serverConsumer = serverContext.createConsumer(requestQueue); + TextMessage serverRequestMessage = (TextMessage) serverConsumer.receive(5000); + + TextMessage serverResponseMessage = serverContext.createTextMessage( + serverRequestMessage.getText().toUpperCase()); + serverContext.createProducer(). + send(serverRequestMessage.getJMSReplyTo(), serverResponseMessage); + } + + TextMessage clientResponseMessage = (TextMessage) clientConsumer.receive(5000); + assertEquals("HELLO", clientResponseMessage.getText()); + } + } + + // Test that a temporary queue can be deleted. + @Test + public void temporary_queue_delete() throws Exception { + Context context = getContext(); + ConnectionFactory factory = (ConnectionFactory) context.lookup("myConnection"); + + try (JMSContext clientContext = factory.createContext()) { + TemporaryQueue queue = clientContext.createTemporaryQueue(); + queue.delete(); + try { + clientContext.createProducer().send(queue, "hello"); + fail("should not be able to create producer for deleted temporary queue"); + } catch (IllegalStateRuntimeException expectedException) { + assertEquals("Temporary destination has been deleted", expectedException.getMessage()); + } + } } } diff --git a/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/TestUtils.java b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/TestUtils.java new file mode 100644 index 0000000000..d53a6bd26f --- /dev/null +++ b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/TestUtils.java @@ -0,0 +1,66 @@ +// The contents of this file are subject to the Mozilla Public License +// Version 2.0 (the "License"); you may not use this file except in +// compliance with the License. You may obtain a copy of the License +// at https://www.mozilla.org/en-US/MPL/2.0/ +// +// Software distributed under the License is distributed on an "AS IS" +// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +// the License for the specific language governing rights and +// limitations under the License. +// +// The Original Code is RabbitMQ. +// +// The Initial Developer of the Original Code is Pivotal Software, Inc. +// Copyright (c) 2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +// + +package com.rabbitmq.amqp.tests.jms; + +import static java.lang.String.format; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import java.lang.reflect.Method; +import java.util.UUID; + +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.condition.EnabledIfSystemProperty; + +final class TestUtils { + + private static final String DEFAULT_BROKER_URI = "amqp://localhost:5672"; + + private TestUtils() { } + + static String brokerUri() { + String uri = System.getProperty("rmq_broker_uri", "amqp://localhost:5672"); + return uri == null || uri.isEmpty() ? DEFAULT_BROKER_URI : uri; + } + + static String adminUsername() { + return "guest"; + } + + static String adminPassword() { + return "guest"; + } + + static String name(TestInfo info) { + return name(info.getTestClass().get(), info.getTestMethod().get()); + } + + + private static String name(Class testClass, Method testMethod) { + return name(testClass, testMethod.getName()); + } + + private static String name(Class testClass, String testMethod) { + String uuid = UUID.randomUUID().toString(); + return format( + "%s_%s%s", testClass.getSimpleName(), testMethod, uuid.substring(uuid.length() / 2)); + } + +} From fd350386a9b298866e9e336a86853d8eb3d2654c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Mon, 10 Feb 2025 11:45:58 +0100 Subject: [PATCH 10/15] Add helpers for JMS tests --- deps/rabbit/test/amqp_jms_SUITE_data/pom.xml | 20 ++++- .../amqp/tests/jms/JmsConnectionTest.java | 43 +++++----- .../amqp/tests/jms/JmsTemporaryQueueTest.java | 17 ++-- .../amqp/tests/jms/JmsTestInfrastructure.java | 26 ++++++ .../jms/JmsTestInfrastructureExtension.java | 83 +++++++++++++++++++ .../rabbitmq/amqp/tests/jms/TestUtils.java | 36 +++++--- 6 files changed, 185 insertions(+), 40 deletions(-) create mode 100644 deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTestInfrastructure.java create mode 100644 deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTestInfrastructureExtension.java diff --git a/deps/rabbit/test/amqp_jms_SUITE_data/pom.xml b/deps/rabbit/test/amqp_jms_SUITE_data/pom.xml index cce3ecb58f..ff312c90a8 100644 --- a/deps/rabbit/test/amqp_jms_SUITE_data/pom.xml +++ b/deps/rabbit/test/amqp_jms_SUITE_data/pom.xml @@ -10,6 +10,7 @@ 5.10.2 2.6.1 + [0.5.0-SNAPSHOT,) 1.2.13 2.43.0 1.25.2 @@ -30,13 +31,18 @@ ${qpid-jms-client.version} test - ch.qos.logback logback-classic ${logback.version} test + + com.rabbitmq.client + amqp-client + ${amqp-client.version} + test + @@ -81,4 +87,16 @@ + + + + + ossrh + https://oss.sonatype.org/content/repositories/snapshots + true + false + + + + diff --git a/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsConnectionTest.java b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsConnectionTest.java index 210f28c043..d526cbbee4 100644 --- a/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsConnectionTest.java +++ b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsConnectionTest.java @@ -11,7 +11,8 @@ // The Original Code is RabbitMQ. // // The Initial Developer of the Original Code is Pivotal Software, Inc. -// Copyright (c) 2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +// Copyright (c) 2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. +// and/or its subsidiaries. All rights reserved. // package com.rabbitmq.amqp.tests.jms; @@ -31,19 +32,21 @@ import org.apache.qpid.jms.JmsConnection; import org.apache.qpid.jms.JmsConnectionFactory; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; /** - * Based on https://github.com/apache/qpid-jms/tree/main/qpid-jms-interop-tests/qpid-jms-activemq-tests. + * Based on + * https://github.com/apache/qpid-jms/tree/main/qpid-jms-interop-tests/qpid-jms-activemq-tests. */ +@JmsTestInfrastructure public class JmsConnectionTest { + String destination; + @Test @Timeout(30) public void testCreateConnection() throws Exception { - JmsConnectionFactory factory = new JmsConnectionFactory(brokerUri()); - try (Connection connection = factory.createConnection()) { + try (Connection connection = connection()) { assertNotNull(connection); } } @@ -51,8 +54,7 @@ public class JmsConnectionTest { @Test @Timeout(30) public void testCreateConnectionAndStart() throws Exception { - JmsConnectionFactory factory = new JmsConnectionFactory(brokerUri()); - try (Connection connection = factory.createConnection()) { + try (Connection connection = connection()) { assertNotNull(connection); connection.start(); } @@ -63,7 +65,7 @@ public class JmsConnectionTest { // Currently not supported by RabbitMQ. @Disabled public void testCreateWithDuplicateClientIdFails() throws Exception { - JmsConnectionFactory factory = new JmsConnectionFactory(brokerUri()); + JmsConnectionFactory factory = (JmsConnectionFactory) connectionFactory(); JmsConnection connection1 = (JmsConnection) factory.createConnection(); connection1.setClientID("Test"); assertNotNull(connection1); @@ -87,8 +89,7 @@ public class JmsConnectionTest { assertThrows( JMSException.class, () -> { - JmsConnectionFactory factory = new JmsConnectionFactory(brokerUri()); - try (Connection connection = factory.createConnection()) { + try (Connection connection = connection()) { connection.setClientID("Test"); connection.start(); connection.setClientID("NewTest"); @@ -99,7 +100,7 @@ public class JmsConnectionTest { @Test @Timeout(30) public void testCreateConnectionAsSystemAdmin() throws Exception { - JmsConnectionFactory factory = new JmsConnectionFactory(brokerUri()); + JmsConnectionFactory factory = (JmsConnectionFactory) connectionFactory(); factory.setUsername(adminUsername()); factory.setPassword(adminPassword()); try (Connection connection = factory.createConnection()) { @@ -111,8 +112,8 @@ public class JmsConnectionTest { @Test @Timeout(30) public void testCreateConnectionCallSystemAdmin() throws Exception { - JmsConnectionFactory factory = new JmsConnectionFactory(brokerUri()); - try (Connection connection = factory.createConnection(adminUsername(), adminPassword())) { + try (Connection connection = + connectionFactory().createConnection(adminUsername(), adminPassword())) { assertNotNull(connection); connection.start(); } @@ -124,7 +125,7 @@ public class JmsConnectionTest { assertThrows( JMSSecurityException.class, () -> { - JmsConnectionFactory factory = new JmsConnectionFactory(TestUtils.brokerUri()); + JmsConnectionFactory factory = (JmsConnectionFactory) connectionFactory(); factory.setUsername("unknown"); factory.setPassword("unknown"); try (Connection connection = factory.createConnection()) { @@ -140,8 +141,7 @@ public class JmsConnectionTest { assertThrows( JMSSecurityException.class, () -> { - JmsConnectionFactory factory = new JmsConnectionFactory(brokerUri()); - try (Connection connection = factory.createConnection("unknown", "unknown")) { + try (Connection connection = connectionFactory().createConnection("unknown", "unknown")) { assertNotNull(connection); connection.start(); } @@ -150,14 +150,11 @@ public class JmsConnectionTest { @Test @Timeout(30) - public void testBrokerStopWontHangConnectionClose(TestInfo info) throws Exception { - Connection connection = new JmsConnectionFactory(brokerUri()).createConnection(); + public void testBrokerStopWontHangConnectionClose() throws Exception { + Connection connection = connection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - // TODO use a "regular" queue - TemporaryQueue queue = session.createTemporaryQueue(); - // String destinationName = name(info); - // Queue queue = session.createQueue("/queues/" + destinationName); + Queue queue = queue(destination); connection.start(); MessageProducer producer = session.createProducer(queue); @@ -182,7 +179,7 @@ public class JmsConnectionTest { @Timeout(60) public void testConnectionExceptionBrokerStop() throws Exception { final CountDownLatch latch = new CountDownLatch(1); - try (Connection connection = new JmsConnectionFactory(brokerUri()).createConnection()) { + try (Connection connection = connection()) { connection.setExceptionListener(exception -> latch.countDown()); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); diff --git a/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTemporaryQueueTest.java b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTemporaryQueueTest.java index 3da83a9066..ae60fa4b8a 100644 --- a/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTemporaryQueueTest.java +++ b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTemporaryQueueTest.java @@ -11,12 +11,14 @@ // The Original Code is RabbitMQ. // // The Initial Developer of the Original Code is Pivotal Software, Inc. -// Copyright (c) 2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +// Copyright (c) 2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. +// and/or its subsidiaries. All rights reserved. // package com.rabbitmq.amqp.tests.jms; import static com.rabbitmq.amqp.tests.jms.TestUtils.brokerUri; +import static com.rabbitmq.amqp.tests.jms.TestUtils.connection; import static org.junit.jupiter.api.Assertions.*; import static org.junit.jupiter.api.Assertions.fail; @@ -25,16 +27,23 @@ import jakarta.jms.IllegalStateException; import java.util.UUID; import org.apache.qpid.jms.JmsConnectionFactory; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; /** - * Based on https://github.com/apache/qpid-jms/tree/main/qpid-jms-interop-tests/qpid-jms-activemq-tests. + * Based on + * https://github.com/apache/qpid-jms/tree/main/qpid-jms-interop-tests/qpid-jms-activemq-tests. */ public class JmsTemporaryQueueTest { Connection connection; + @BeforeEach + void init() throws JMSException { + connection = connection(); + } + @AfterEach void tearDown() throws JMSException { connection.close(); @@ -43,7 +52,6 @@ public class JmsTemporaryQueueTest { @Test @Timeout(60) public void testCreatePublishConsumeTemporaryQueue() throws Exception { - connection = new JmsConnectionFactory(brokerUri()).createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -60,7 +68,6 @@ public class JmsTemporaryQueueTest { @Test @Timeout(60) public void testCantConsumeFromTemporaryQueueCreatedOnAnotherConnection() throws Exception { - connection = new JmsConnectionFactory(brokerUri()).createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -84,7 +91,6 @@ public class JmsTemporaryQueueTest { @Test @Timeout(60) public void testCantSendToTemporaryQueueFromClosedConnection() throws Exception { - connection = new JmsConnectionFactory(brokerUri()).createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -113,7 +119,6 @@ public class JmsTemporaryQueueTest { @Test @Timeout(60) public void testCantDeleteTemporaryQueueWithConsumers() throws Exception { - connection = new JmsConnectionFactory(brokerUri()).createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); diff --git a/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTestInfrastructure.java b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTestInfrastructure.java new file mode 100644 index 0000000000..0fbb689eb8 --- /dev/null +++ b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTestInfrastructure.java @@ -0,0 +1,26 @@ +// The contents of this file are subject to the Mozilla Public License +// Version 2.0 (the "License"); you may not use this file except in +// compliance with the License. You may obtain a copy of the License +// at https://www.mozilla.org/en-US/MPL/2.0/ +// +// Software distributed under the License is distributed on an "AS IS" +// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +// the License for the specific language governing rights and +// limitations under the License. +// +// The Original Code is RabbitMQ. +// +// The Initial Developer of the Original Code is Pivotal Software, Inc. +// Copyright (c) 2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. +// and/or its subsidiaries. All rights reserved. +// +package com.rabbitmq.amqp.tests.jms; + +import java.lang.annotation.*; +import org.junit.jupiter.api.extension.ExtendWith; + +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +@Documented +@ExtendWith(JmsTestInfrastructureExtension.class) +public @interface JmsTestInfrastructure {} diff --git a/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTestInfrastructureExtension.java b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTestInfrastructureExtension.java new file mode 100644 index 0000000000..2254b00ab2 --- /dev/null +++ b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTestInfrastructureExtension.java @@ -0,0 +1,83 @@ +// The contents of this file are subject to the Mozilla Public License +// Version 2.0 (the "License"); you may not use this file except in +// compliance with the License. You may obtain a copy of the License +// at https://www.mozilla.org/en-US/MPL/2.0/ +// +// Software distributed under the License is distributed on an "AS IS" +// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +// the License for the specific language governing rights and +// limitations under the License. +// +// The Original Code is RabbitMQ. +// +// The Initial Developer of the Original Code is Pivotal Software, Inc. +// Copyright (c) 2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +// +package com.rabbitmq.amqp.tests.jms; + + +import com.rabbitmq.client.amqp.Connection; +import com.rabbitmq.client.amqp.Environment; +import com.rabbitmq.client.amqp.impl.AmqpEnvironmentBuilder; +import java.lang.reflect.Field; +import org.junit.jupiter.api.extension.*; + +final class JmsTestInfrastructureExtension + implements BeforeAllCallback, AfterAllCallback, BeforeEachCallback, AfterEachCallback { + + private static final ExtensionContext.Namespace NAMESPACE = + ExtensionContext.Namespace.create(JmsTestInfrastructureExtension.class); + + private static ExtensionContext.Store store(ExtensionContext extensionContext) { + return extensionContext.getRoot().getStore(NAMESPACE); + } + + private static Field field(Class cls, String name) { + Field field = null; + while (field == null && cls != null) { + try { + field = cls.getDeclaredField(name); + } catch (NoSuchFieldException e) { + cls = cls.getSuperclass(); + } + } + return field; + } + + @Override + public void beforeAll(ExtensionContext context) { + + } + + @Override + public void beforeEach(ExtensionContext context) throws Exception { + Field field = field(context.getTestInstance().get().getClass(), "destination"); + if (field != null) { + field.setAccessible(true); + String destination = TestUtils.name(context); + field.set(context.getTestInstance().get(), destination); + try (Environment environment = new AmqpEnvironmentBuilder().build(); + Connection connection = environment.connectionBuilder().uri(TestUtils.brokerUri()).build()) { + connection.management().queue(destination).declare(); + } + } + } + + @Override + public void afterEach(ExtensionContext context) throws Exception { + Field field = field(context.getTestInstance().get().getClass(), "destination"); + if (field != null) { + field.setAccessible(true); + String destination = (String) field.get(context.getTestInstance().get()); + try (Environment environment = new AmqpEnvironmentBuilder().build(); + Connection connection = environment.connectionBuilder().uri(TestUtils.brokerUri()).build()) { + connection.management().queueDelete(destination); + } + } + } + + @Override + public void afterAll(ExtensionContext context) { + + } +} diff --git a/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/TestUtils.java b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/TestUtils.java index d53a6bd26f..192babb84d 100644 --- a/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/TestUtils.java +++ b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/TestUtils.java @@ -11,29 +11,30 @@ // The Original Code is RabbitMQ. // // The Initial Developer of the Original Code is Pivotal Software, Inc. -// Copyright (c) 2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +// Copyright (c) 2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. +// and/or its subsidiaries. All rights reserved. // package com.rabbitmq.amqp.tests.jms; import static java.lang.String.format; -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; +import jakarta.jms.Connection; +import jakarta.jms.ConnectionFactory; +import jakarta.jms.JMSException; +import jakarta.jms.Queue; import java.lang.reflect.Method; import java.util.UUID; - -import org.junit.jupiter.api.Tag; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.apache.qpid.jms.JmsQueue; import org.junit.jupiter.api.TestInfo; -import org.junit.jupiter.api.condition.EnabledIfSystemProperty; +import org.junit.jupiter.api.extension.ExtensionContext; final class TestUtils { private static final String DEFAULT_BROKER_URI = "amqp://localhost:5672"; - private TestUtils() { } + private TestUtils() {} static String brokerUri() { String uri = System.getProperty("rmq_broker_uri", "amqp://localhost:5672"); @@ -48,10 +49,26 @@ final class TestUtils { return "guest"; } + static ConnectionFactory connectionFactory() { + return new JmsConnectionFactory(brokerUri()); + } + + static Connection connection() throws JMSException { + return connectionFactory().createConnection(); + } + + static Queue queue(String name) { + // no path encoding, use names with e.g. ASCII characters only + return new JmsQueue("/queues/" + name); + } + static String name(TestInfo info) { return name(info.getTestClass().get(), info.getTestMethod().get()); } + static String name(ExtensionContext context) { + return name(context.getTestInstance().get().getClass(), context.getTestMethod().get()); + } private static String name(Class testClass, Method testMethod) { return name(testClass, testMethod.getName()); @@ -62,5 +79,4 @@ final class TestUtils { return format( "%s_%s%s", testClass.getSimpleName(), testMethod, uuid.substring(uuid.length() / 2)); } - } From 4ec2b755eec918e599373be13cd73956298bea5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Tue, 11 Feb 2025 15:47:01 +0100 Subject: [PATCH 11/15] Use ProtonJ2 in JMS-to-AMQP interop test --- deps/rabbit/test/amqp_jms_SUITE.erl | 44 +----------------- deps/rabbit/test/amqp_jms_SUITE_data/pom.xml | 7 +++ .../com/rabbitmq/amqp/tests/jms/JmsTest.java | 46 ++++++++++++++++--- .../rabbitmq/amqp/tests/jms/TestUtils.java | 37 +++++++++++++++ 4 files changed, 85 insertions(+), 49 deletions(-) diff --git a/deps/rabbit/test/amqp_jms_SUITE.erl b/deps/rabbit/test/amqp_jms_SUITE.erl index baad72b014..7a5462eda3 100644 --- a/deps/rabbit/test/amqp_jms_SUITE.erl +++ b/deps/rabbit/test/amqp_jms_SUITE.erl @@ -129,48 +129,8 @@ message_types_jms_to_jms(Config) -> %% Send different message types from JMS client to Erlang AMQP 1.0 client. message_types_jms_to_amqp(Config) -> - TestName = QName = atom_to_binary(?FUNCTION_NAME), - ok = declare_queue(QName, <<"quorum">>, Config), - Address = rabbitmq_amqp_address:queue(QName), - - %% The JMS client sends messaegs. - ok = run_jms_test(TestName, [{"-Dqueue=~ts", [Address]}], Config), - - %% The Erlang AMQP 1.0 client receives messages. - OpnConf = connection_config(Config), - {ok, Connection} = amqp10_client:open_connection(OpnConf), - {ok, Session} = amqp10_client:begin_session_sync(Connection), - {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, settled), - {ok, Msg1} = amqp10_client:get_msg(Receiver), - - ?assertEqual( - #'v1_0.amqp_value'{content = {utf8, <<"msg1🥕"/utf8>>}}, - amqp10_msg:body(Msg1)), - {ok, Msg2} = amqp10_client:get_msg(Receiver), - ?assertEqual( - #'v1_0.amqp_value'{ - content = {map, [ - {{utf8, <<"key1">>}, {utf8, <<"value">>}}, - {{utf8, <<"key2">>}, true}, - {{utf8, <<"key3">>}, {double, -1.1}}, - {{utf8, <<"key4">>}, {long, -1}} - ]}}, - amqp10_msg:body(Msg2)), - {ok, Msg3} = amqp10_client:get_msg(Receiver), - ?assertEqual( - [ - #'v1_0.amqp_sequence'{ - content = [{utf8, <<"value">>}, - true, - {double, -1.1}, - {long, -1}]} - ], - amqp10_msg:body(Msg3)), - - ok = detach_link_sync(Receiver), - ok = end_session_sync(Session), - ok = close_connection_sync(Connection), - ok = delete_queue(QName, Config). + TestName = atom_to_binary(?FUNCTION_NAME), + ok = run_jms_test(TestName, [], Config). temporary_queue_rpc(Config) -> TestName = QName = atom_to_binary(?FUNCTION_NAME), diff --git a/deps/rabbit/test/amqp_jms_SUITE_data/pom.xml b/deps/rabbit/test/amqp_jms_SUITE_data/pom.xml index ff312c90a8..8b06c85521 100644 --- a/deps/rabbit/test/amqp_jms_SUITE_data/pom.xml +++ b/deps/rabbit/test/amqp_jms_SUITE_data/pom.xml @@ -9,6 +9,7 @@ https://www.rabbitmq.com 5.10.2 + 3.27.3 2.6.1 [0.5.0-SNAPSHOT,) 1.2.13 @@ -43,6 +44,12 @@ ${amqp-client.version} test + + org.assertj + assertj-core + ${assertj.version} + test + diff --git a/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTest.java b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTest.java index 23b66512fa..71e736a4e0 100644 --- a/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTest.java +++ b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTest.java @@ -1,12 +1,22 @@ package com.rabbitmq.amqp.tests.jms; +import static com.rabbitmq.amqp.tests.jms.TestUtils.protonClient; +import static com.rabbitmq.amqp.tests.jms.TestUtils.protonConnection; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.*; import jakarta.jms.*; import java.util.*; +import java.util.concurrent.TimeUnit; import javax.naming.Context; + +import com.rabbitmq.qpid.protonj2.client.Client; +import com.rabbitmq.qpid.protonj2.client.Delivery; +import com.rabbitmq.qpid.protonj2.client.Receiver; +import jakarta.jms.Queue; import org.junit.jupiter.api.Test; +@JmsTestInfrastructure public class JmsTest { private javax.naming.Context getContext() throws Exception{ @@ -94,18 +104,20 @@ public class JmsTest { } } + String destination; + @Test public void message_types_jms_to_amqp() throws Exception { Context context = getContext(); ConnectionFactory factory = (ConnectionFactory) context.lookup("myConnection"); + Queue queue = TestUtils.queue(destination); + String msg1 = "msg1🥕"; try (Connection connection = factory.createConnection()) { Session session = connection.createSession(); - Destination queue = (Destination) context.lookup("myQueue"); MessageProducer producer = session.createProducer(queue); // TextMessage - String msg1 = "msg1🥕"; TextMessage textMessage = session.createTextMessage(msg1); producer.send(textMessage); @@ -126,12 +138,32 @@ public class JmsTest { producer.send(streamMessage); } - } + try (Client client = protonClient(); + com.rabbitmq.qpid.protonj2.client.Connection amqpConnection = protonConnection(client)) { + Receiver receiver = amqpConnection.openReceiver(queue.getQueueName()); + Delivery delivery = receiver.receive(10, TimeUnit.SECONDS); + assertNotNull(delivery); + assertEquals(msg1, delivery.message().body()); - // Test that Request/reply pattern using a TemporaryQueue works. - // https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#requestreply-pattern-using-a-temporaryqueue-jakarta-ee - @Test - public void temporary_queue_rpc() throws Exception { + delivery = receiver.receive(10, TimeUnit.SECONDS); + assertNotNull(delivery); + com.rabbitmq.qpid.protonj2.client.Message> mapMessage = delivery.message(); + assertThat(mapMessage.body()).containsEntry("key1", "value") + .containsEntry("key2", true) + .containsEntry("key3", -1.1) + .containsEntry("key4", -1L); + + delivery = receiver.receive(10, TimeUnit.SECONDS); + assertNotNull(delivery); + com.rabbitmq.qpid.protonj2.client.Message> listMessage = delivery.message(); + assertThat(listMessage.body()).containsExactly("value", true, -1.1, -1L); + } + } + + // Test that Request/reply pattern using a TemporaryQueue works. + // https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#requestreply-pattern-using-a-temporaryqueue-jakarta-ee + @Test + public void temporary_queue_rpc() throws Exception { Context context = getContext(); ConnectionFactory factory = (ConnectionFactory) context.lookup("myConnection"); diff --git a/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/TestUtils.java b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/TestUtils.java index 192babb84d..8cb972cbbb 100644 --- a/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/TestUtils.java +++ b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/TestUtils.java @@ -19,11 +19,16 @@ package com.rabbitmq.amqp.tests.jms; import static java.lang.String.format; +import com.rabbitmq.qpid.protonj2.client.Client; +import com.rabbitmq.qpid.protonj2.client.ConnectionOptions; +import com.rabbitmq.qpid.protonj2.client.exceptions.ClientException; import jakarta.jms.Connection; import jakarta.jms.ConnectionFactory; import jakarta.jms.JMSException; import jakarta.jms.Queue; import java.lang.reflect.Method; +import java.net.URI; +import java.net.URISyntaxException; import java.util.UUID; import org.apache.qpid.jms.JmsConnectionFactory; import org.apache.qpid.jms.JmsQueue; @@ -41,6 +46,24 @@ final class TestUtils { return uri == null || uri.isEmpty() ? DEFAULT_BROKER_URI : uri; } + static String brokerHost() { + try { + URI uri = new URI(brokerUri()); + return uri.getHost(); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + + static int brokerPort() { + try { + URI uri = new URI(brokerUri()); + return uri.getPort(); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + static String adminUsername() { return "guest"; } @@ -62,6 +85,20 @@ final class TestUtils { return new JmsQueue("/queues/" + name); } + static Client protonClient() { + return Client.create(); + } + + static com.rabbitmq.qpid.protonj2.client.Connection protonConnection(Client client) { + ConnectionOptions connectionOptions = new ConnectionOptions().virtualHost("vhost:/"); + connectionOptions.saslOptions().addAllowedMechanism("ANONYMOUS"); + try { + return client.connect(brokerHost(), brokerPort(), connectionOptions); + } catch (ClientException e) { + throw new RuntimeException(e); + } + } + static String name(TestInfo info) { return name(info.getTestClass().get(), info.getTestMethod().get()); } From c5867a7bd373d01587547973105e855ca80d4912 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Tue, 11 Feb 2025 16:15:35 +0100 Subject: [PATCH 12/15] Add 4.1.0 release notes --- deps/rabbit/src/rabbit_reader.erl | 2 +- release-notes/4.1.0.md | 20 +++++++++++++------- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/deps/rabbit/src/rabbit_reader.erl b/deps/rabbit/src/rabbit_reader.erl index 498e333bc8..723ca4b5df 100644 --- a/deps/rabbit/src/rabbit_reader.erl +++ b/deps/rabbit/src/rabbit_reader.erl @@ -202,7 +202,7 @@ conserve_resources(Pid, Source, {_, Conserve, _}) -> server_properties(Protocol) -> {ok, Product} = application:get_key(rabbit, description), - {ok, Version} = application:get_key(rabbit, vsn), + Version = rabbit_misc:version(), %% Get any configuration-specified server properties {ok, RawConfigServerProps} = application:get_env(rabbit, diff --git a/release-notes/4.1.0.md b/release-notes/4.1.0.md index d61c8d9ee4..3a82c3bed0 100644 --- a/release-notes/4.1.0.md +++ b/release-notes/4.1.0.md @@ -124,13 +124,6 @@ This section can be incomplete and will be expanded as 4.1 approaches its releas GitHub issue: [#12599](https://github.com/rabbitmq/rabbitmq-server/pull/12599) - * Nodes will now fall back to system CA certificate list (if available) when no CA certificate - is explicitly configured. - - Contributed by @LoisSotoLopez. - - GitHub issue: [#10519](https://github.com/rabbitmq/rabbitmq-server/issues/10519), [#12564](https://github.com/rabbitmq/rabbitmq-server/pull/12564) - * AMQP 1.0 filters now have capped complexity: filtering on more than 16 properties won't be possible. This is a protection mechanism recommended in the AMQP 1.0 spec. @@ -145,6 +138,19 @@ This section can be incomplete and will be expanded as 4.1 approaches its releas GitHub issue: [#12559](https://github.com/rabbitmq/rabbitmq-server/pull/12559) + * Support field `dynamic` of AMQP 1.0 [source](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-source) and [target](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-target). + + This allows AMQP clients to dynamically create [exclusive queues](https://www.rabbitmq.com/docs/queues#exclusive-queues), which can be useful for RPC workloads. + + GitHub issue: [#13231](https://github.com/rabbitmq/rabbitmq-server/pull/13231) + + * Nodes will now fall back to system CA certificate list (if available) when no CA certificate + is explicitly configured. + + Contributed by @LoisSotoLopez. + + GitHub issue: [#10519](https://github.com/rabbitmq/rabbitmq-server/issues/10519), [#12564](https://github.com/rabbitmq/rabbitmq-server/pull/12564) + * Peer discovery resilience improvements. GitHub issues: [#12801](https://github.com/rabbitmq/rabbitmq-server/pull/12801), [#12809](https://github.com/rabbitmq/rabbitmq-server/pull/12809) From c4debab35516e7ac0ad914822ff1e2749034d29c Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Wed, 12 Feb 2025 12:35:40 -0500 Subject: [PATCH 13/15] Update 4.0.1 release notes to mention one more major pain point that Khepri addresses --- release-notes/4.0.1.md | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/release-notes/4.0.1.md b/release-notes/4.0.1.md index a7587586e0..eb98cb64ea 100644 --- a/release-notes/4.0.1.md +++ b/release-notes/4.0.1.md @@ -245,6 +245,30 @@ This section is incomplete and will be expanded as 4.0 approaches its release ca ### Core Server +#### Bug Fixes + + * A whole category of issues with binding inconsistency are addressed with the stabilization + of [Khepri](https://github.com/rabbitmq/khepri), a new [metadata store](https://www.rabbitmq.com/docs/metadata-store) that uses a tree of nested objects instead of multiple tables. + + With Mnesia, the original metadata store, bindings are stored in two tables, one for durable + bindings (between durable exchanges and durable queues or streams) and another for semi-durable + and transient ones (where either the queue is transient or both the queue and the exchange are). + + When a node was stopped or failed, all non-replicated transient queues on that node were deleted + by the remaining cluster peers. Due to high lock contention around these tables with Mnesia, this + could take a while. In the case where the restarted (or failed) node came online before all bindings + were removed, and/or clients could begin to create new bindings concurrently, the bindings table + rows could end up being inconsistent, resulting in obscure "binding not found" errors. + + Khepri avoids this problem entirely by only supporting durable entities and using a very different + [tree-based data model](https://github.com/rabbitmq/rabbitmq-server/pull/11225) that makes bindings removal much more efficient and lock contention-free. + + Mnesia users can work around this problem by using [quorum queues](https://www.rabbitmq.com/docs/quorum-queues) or durable classic queues + and durable exchanges. Their durable bindings will not be removed when a node stops. + + GitHub issues (discussions): [#11952](https://github.com/rabbitmq/rabbitmq-server/discussions/11952), [#13030](https://github.com/rabbitmq/rabbitmq-server/discussions/13030), [#12927](https://github.com/rabbitmq/rabbitmq-server/discussions/12927), [#12783](https://github.com/rabbitmq/rabbitmq-server/discussions/12783) + + #### Enhancements * Efficient sub-linear quorum queue recovery on node startup using checkpoints. From 71178170762d21c3b3c151cdeb53a7a29c4cea2d Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Wed, 12 Feb 2025 12:47:55 -0500 Subject: [PATCH 14/15] A minor 4.0.1 release notes update --- release-notes/4.0.1.md | 1 + 1 file changed, 1 insertion(+) diff --git a/release-notes/4.0.1.md b/release-notes/4.0.1.md index eb98cb64ea..25f90e10a8 100644 --- a/release-notes/4.0.1.md +++ b/release-notes/4.0.1.md @@ -265,6 +265,7 @@ This section is incomplete and will be expanded as 4.0 approaches its release ca Mnesia users can work around this problem by using [quorum queues](https://www.rabbitmq.com/docs/quorum-queues) or durable classic queues and durable exchanges. Their durable bindings will not be removed when a node stops. + Queues that are transient in nature can be declared as durable classic ones with a [TTL](https://www.rabbitmq.com/docs/ttl) of a few hours. GitHub issues (discussions): [#11952](https://github.com/rabbitmq/rabbitmq-server/discussions/11952), [#13030](https://github.com/rabbitmq/rabbitmq-server/discussions/13030), [#12927](https://github.com/rabbitmq/rabbitmq-server/discussions/12927), [#12783](https://github.com/rabbitmq/rabbitmq-server/discussions/12783) From dae4967bf1e44a263c3534a5620d904e56831b61 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Wed, 12 Feb 2025 14:26:30 -0500 Subject: [PATCH 15/15] 'ctl auth_clear_cache' => 'ctl clear_auth_backend_cache' --- ...tMQ.CLI.Ctl.Commands.ClearAuthBackendCacheCommand.erl} | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) rename deps/rabbitmq_auth_backend_cache/src/{Elixir.RabbitMQ.CLI.Ctl.Commands.AuthClearCacheCommand.erl => Elixir.RabbitMQ.CLI.Ctl.Commands.ClearAuthBackendCacheCommand.erl} (85%) diff --git a/deps/rabbitmq_auth_backend_cache/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AuthClearCacheCommand.erl b/deps/rabbitmq_auth_backend_cache/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ClearAuthBackendCacheCommand.erl similarity index 85% rename from deps/rabbitmq_auth_backend_cache/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AuthClearCacheCommand.erl rename to deps/rabbitmq_auth_backend_cache/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ClearAuthBackendCacheCommand.erl index 00888b8486..2cfe0106e2 100644 --- a/deps/rabbitmq_auth_backend_cache/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AuthClearCacheCommand.erl +++ b/deps/rabbitmq_auth_backend_cache/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ClearAuthBackendCacheCommand.erl @@ -5,7 +5,7 @@ %% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. %% --module('Elixir.RabbitMQ.CLI.Ctl.Commands.AuthClearCacheCommand'). +-module('Elixir.RabbitMQ.CLI.Ctl.Commands.ClearAuthBackendCacheCommand'). -behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour'). @@ -38,7 +38,7 @@ switches() -> []. usage() -> - <<"auth_clear_cache">>. + <<"clear_auth_backend_cache">>. usage_additional() -> []. @@ -50,7 +50,7 @@ help_section() -> {plugin, rabbitmq_auth_backend_cache}. description() -> - <<"Clear cache of authorization decisions">>. + <<"Clears rabbitmq_auth_backend_cache plugin's cache on the target node">>. flags() -> []. @@ -65,7 +65,7 @@ merge_defaults(A, O) -> {A, O}. banner(_, _) -> - erlang:iolist_to_binary([<<"Will delete all cached authorization decisions">>]). + <<"Will clear rabbitmq_auth_backend_cache plugin's cache on the target node...">>. run(_Args, #{node := Node}) -> case rabbit_misc:rpc_call(Node, rabbit_auth_backend_cache, clear_cache_cluster_wide, []) of