Merge pull request #14114 from rabbitmq/otp28

Improve Erlang/OTP 28 compatibility, use OTP 28 in the pipelines
This commit is contained in:
Michael Klishin 2025-06-27 21:16:39 +04:00 committed by GitHub
commit f55377575a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
28 changed files with 295 additions and 254 deletions

View File

@ -20,7 +20,7 @@ on:
# a tag of the erlang image, see https://hub.docker.com/_/erlang for available tags
# also used in the setup-beam step (same tag should work for both)
description: OTP version (eg. `26`, `26.2.5.6`)
default: 27
default: 28
build_arm:
description: Build for ARM64 as well?
type: boolean
@ -36,7 +36,7 @@ jobs:
strategy:
matrix:
otp_version:
- ${{ github.event.inputs.otp_version || '27' }}
- ${{ github.event.inputs.otp_version || '28' }}
runs-on: ubuntu-latest
outputs:
# When dependabot, or a user from a fork, creates PRs, secrets are not injected, and the OCI workflow can't push the image
@ -76,7 +76,7 @@ jobs:
fail-fast: false
matrix:
otp_version:
- ${{ github.event.inputs.otp_version || '27' }}
- ${{ github.event.inputs.otp_version || '28' }}
needs: build-package-generic-unix
runs-on: ubuntu-latest
if: ${{ needs.build-package-generic-unix.outputs.authorized }} == 'true'

View File

@ -13,7 +13,7 @@ concurrency:
group: ${{ github.workflow }}-${{ github.ref_name }}
cancel-in-progress: true
env:
OTP_VERSION: "27"
OTP_VERSION: "28"
jobs:
peer-discovery-aws-integration-test:
name: Integration Test
@ -48,10 +48,10 @@ jobs:
polling-seconds: 60
- name: CONFIGURE OTP & ELIXIR
if: steps.authorized.outputs.authorized == 'true'
uses: erlef/setup-beam@v1.17
uses: erlef/setup-beam@v1
with:
otp-version: ${{ env.OTP_VERSION }}
elixir-version: "1.17"
elixir-version: "1.18"
- name: SETUP ecs-cli
if: steps.authorized.outputs.authorized == 'true'
env:

View File

@ -44,7 +44,7 @@ jobs:
uses: actions/checkout@v4
- name: Configure OTP & Elixir
uses: erlef/setup-beam@v1.17
uses: erlef/setup-beam@v1
with:
otp-version: ${{ matrix.erlang_version }}
elixir-version: ${{ matrix.elixir_version }}

View File

@ -33,8 +33,13 @@ jobs:
- name: FETCH TAGS
run: git fetch --tags
- name: EXTRACT ACTIVEMQ VERSION
if: inputs.plugin == 'amqp10_client'
run: |
awk '/^ACTIVEMQ_VERSION/ {print $1 "=" $3}' deps/amqp10_client/Makefile >> $GITHUB_ENV
- name: SETUP OTP & ELIXIR
uses: erlef/setup-beam@v1.17
uses: erlef/setup-beam@v1
with:
otp-version: ${{ inputs.erlang_version }}
elixir-version: ${{ inputs.elixir_version }}
@ -99,12 +104,27 @@ jobs:
docker run -d --network host --name erlang_low_version erlang:${LOW_ERLANG_VERSION} \
erl -sname rabbit_fifo_prop@localhost -setcookie $(cat ~/.erlang.cookie) -noinput
- name: RESTORE ACTIVEMQ FROM CACHE
if: inputs.plugin == 'amqp10_client'
uses: actions/cache/restore@v4
id: cache-activemq-restore
with:
path: deps/amqp10_client/test/system_SUITE_data/apache-activemq-${{ env.ACTIVEMQ_VERSION }}-bin.tar.gz
key: activemq-${{ env.ACTIVEMQ_VERSION }}
- name: RUN TESTS
if: inputs.plugin != 'rabbitmq_cli'
run: |
sudo netstat -ntp
make -C deps/${{ inputs.plugin }} ${{ inputs.make_target }} RABBITMQ_METADATA_STORE=${{ inputs.metadata_store }}
- name: CACHE ACTIVEMQ
uses: actions/cache/save@v4
if: inputs.plugin == 'amqp10_client' && steps.cache-activemq-restore.outputs.cache-hit != 'true'
with:
path: deps/amqp10_client/test/system_SUITE_data/apache-activemq-${{ env.ACTIVEMQ_VERSION }}-bin.tar.gz
key: activemq-${{ env.ACTIVEMQ_VERSION }}
# rabbitmq_cli needs a correct broker version for two of its tests.
# But setting PROJECT_VERSION makes other plugins fail.
- name: RUN TESTS (rabbitmq_cli)

View File

@ -17,7 +17,7 @@ jobs:
plugin:
# These are using plugin-specific test jobs.
- rabbit
- rabbitmq_mqtt
# - rabbitmq_mqtt # disabled due to Elixir 1.18 JSON conficts
- rabbitmq_peer_discovery_aws
# These are from the test-plugin test job.
- amqp10_client
@ -57,14 +57,14 @@ jobs:
- rabbitmq_shovel
- rabbitmq_shovel_management
- rabbitmq_shovel_prometheus
- rabbitmq_stomp
- rabbitmq_stream
# - rabbitmq_stomp # disabled due to Elixir 1.18 JSON conficts
# - rabbitmq_stream # disabled due to Elixir 1.18 JSON conficts
- rabbitmq_stream_common
- rabbitmq_stream_management
- rabbitmq_tracing
- rabbitmq_trust_store
- rabbitmq_web_dispatch
- rabbitmq_web_mqtt
# - rabbitmq_web_mqtt # disabled due to Elixir 1.18 JSON conficts
- rabbitmq_web_stomp
# This one we do not want to run tests so no corresponding test job.
- rabbitmq_ct_helpers

View File

@ -23,8 +23,9 @@ jobs:
erlang_version:
- '26'
- '27'
- '28'
elixir_version:
- '1.17'
- '1.18'
# @todo Add macOS and Windows.
runs-on: ubuntu-latest
timeout-minutes: 60
@ -36,7 +37,7 @@ jobs:
run: git fetch --tags
- name: SETUP OTP & ELIXIR
uses: erlef/setup-beam@v1.17
uses: erlef/setup-beam@v1.19
with:
otp-version: ${{ matrix.erlang_version }}
elixir-version: ${{ matrix.elixir_version }}
@ -62,9 +63,9 @@ jobs:
fail-fast: false
matrix:
erlang_version:
- '27'
- '28'
elixir_version:
- '1.17'
- '1.18'
metadata_store:
- mnesia
- khepri
@ -81,9 +82,9 @@ jobs:
fail-fast: false
matrix:
erlang_version:
- '27'
- '28'
elixir_version:
- '1.17'
- '1.18'
metadata_store:
- mnesia
- khepri
@ -100,9 +101,9 @@ jobs:
fail-fast: false
matrix:
erlang_version: # Latest OTP
- '27'
- '28'
elixir_version: # Latest Elixir
- '1.17'
- '1.18'
uses: ./.github/workflows/test-make-type-check.yaml
with:
erlang_version: ${{ matrix.erlang_version }}

View File

@ -32,7 +32,7 @@ jobs:
uses: actions/checkout@v4
- name: Configure OTP & Elixir
uses: erlef/setup-beam@v1.17
uses: erlef/setup-beam@v1
with:
otp-version: ${{ matrix.erlang_version }}
elixir-version: ${{ matrix.elixir_version }}

View File

@ -36,7 +36,7 @@ jobs:
uses: actions/checkout@v4
- name: Configure OTP & Elixir
uses: erlef/setup-beam@v1.17
uses: erlef/setup-beam@v1
with:
otp-version: ${{ matrix.erlang_version }}
elixir-version: ${{ matrix.elixir_version }}

View File

@ -610,13 +610,13 @@ ranch_handshake(Ref) ->
tune_buffer_size(Sock, dynamic_buffer) ->
case rabbit_net:setopts(Sock, [{buffer, 128}]) of
ok -> ok;
{error, _} -> rabbit_net:fast_close(Sock),
{error, _} -> _ = rabbit_net:fast_close(Sock),
exit(normal)
end;
tune_buffer_size(Sock, static_buffer) ->
case tune_buffer_size_static(Sock) of
ok -> ok;
{error, _} -> rabbit_net:fast_close(Sock),
{error, _} -> _ = rabbit_net:fast_close(Sock),
exit(normal)
end.

View File

@ -275,7 +275,7 @@ socket_op(Sock, Fun) ->
case Fun(Sock) of
{ok, Res} -> Res;
{error, Reason} -> socket_error(Reason),
rabbit_net:fast_close(RealSocket),
_ = rabbit_net:fast_close(RealSocket),
exit(normal)
end.
@ -287,10 +287,10 @@ start_connection(Parent, HelperSups, RanchRef, Deb, Sock) ->
RealSocket = rabbit_net:unwrap_socket(Sock),
Name = case rabbit_net:connection_string(Sock, inbound) of
{ok, Str} -> list_to_binary(Str);
{error, enotconn} -> rabbit_net:fast_close(RealSocket),
{error, enotconn} -> _ = rabbit_net:fast_close(RealSocket),
exit(normal);
{error, Reason} -> socket_error(Reason),
rabbit_net:fast_close(RealSocket),
_ = rabbit_net:fast_close(RealSocket),
exit(normal)
end,
{ok, HandshakeTimeout} = application:get_env(rabbit, handshake_timeout),
@ -364,7 +364,7 @@ start_connection(Parent, HelperSups, RanchRef, Deb, Sock) ->
%% We don't call gen_tcp:close/1 here since it waits for
%% pending output to be sent, which results in unnecessary
%% delays.
rabbit_net:fast_close(RealSocket),
_ = rabbit_net:fast_close(RealSocket),
rabbit_networking:unregister_connection(self()),
rabbit_core_metrics:connection_closed(self()),
ClientProperties = case get(client_properties) of

View File

@ -100,60 +100,64 @@ end_per_testcase(Testcase, Config) ->
%% Test cases.
%% -------------------------------------------------------------------
single_node_list_of_user(Config) ->
Username = proplists:get_value(rmq_username, Config),
Username2 = <<"guest2">>,
Username1 = list_to_binary(atom_to_list(?FUNCTION_NAME) ++ "-1"),
Username2 = list_to_binary(atom_to_list(?FUNCTION_NAME) ++ "-2"),
Vhost = proplists:get_value(rmq_vhost, Config),
rabbit_ct_broker_helpers:add_user(Config, Username2),
rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),
[ begin
rabbit_ct_broker_helpers:add_user(Config, U),
rabbit_ct_broker_helpers:set_full_permissions(Config, U, Vhost)
end || U <- [Username1, Username2]],
?assertEqual(0, count_connections_in(Config, Username)),
?assertEqual(0, count_connections_in(Config, Username1)),
?assertEqual(0, count_connections_in(Config, Username2)),
[Conn1] = open_connections(Config, [0]),
?awaitMatch(1, count_connections_in(Config, Username), ?AWAIT_TIMEOUT),
[#tracked_connection{username = Username}] = connections_in(Config, Username),
[Conn1] = open_connections(Config, [{0, Username1}]),
?awaitMatch(1, count_connections_in(Config, Username1), ?AWAIT_TIMEOUT),
[#tracked_connection{username = Username1}] = connections_in(Config, Username1),
close_connections([Conn1]),
?awaitMatch(0, count_connections_in(Config, Username), ?AWAIT_TIMEOUT),
?awaitMatch(0, count_connections_in(Config, Username1), ?AWAIT_TIMEOUT),
[Conn2] = open_connections(Config, [{0, Username2}]),
?awaitMatch(1, count_connections_in(Config, Username2), ?AWAIT_TIMEOUT),
[#tracked_connection{username = Username2}] = connections_in(Config, Username2),
[Conn3] = open_connections(Config, [0]),
?awaitMatch(1, count_connections_in(Config, Username), ?AWAIT_TIMEOUT),
[#tracked_connection{username = Username}] = connections_in(Config, Username),
[Conn3] = open_connections(Config, [{0, Username1}]),
?awaitMatch(1, count_connections_in(Config, Username1), ?AWAIT_TIMEOUT),
[#tracked_connection{username = Username1}] = connections_in(Config, Username1),
[Conn4] = open_connections(Config, [0]),
[Conn4] = open_connections(Config, [{0, Username1}]),
kill_connections([Conn4]),
?awaitMatch(1, count_connections_in(Config, Username), ?AWAIT_TIMEOUT),
[#tracked_connection{username = Username}] = connections_in(Config, Username),
?awaitMatch(1, count_connections_in(Config, Username1), ?AWAIT_TIMEOUT),
[#tracked_connection{username = Username1}] = connections_in(Config, Username1),
[Conn5] = open_connections(Config, [0]),
?awaitMatch(2, count_connections_in(Config, Username), ?AWAIT_TIMEOUT),
[Username, Username] =
[Conn5] = open_connections(Config, [{0, Username1}]),
?awaitMatch(2, count_connections_in(Config, Username1), ?AWAIT_TIMEOUT),
[Username1, Username1] =
lists:map(fun (#tracked_connection{username = U}) -> U end,
connections_in(Config, Username)),
connections_in(Config, Username1)),
close_connections([Conn2, Conn3, Conn5]),
rabbit_ct_broker_helpers:delete_user(Config, Username2),
?awaitMatch(0, length(all_connections(Config)), ?AWAIT_TIMEOUT).
single_node_user_deletion_forces_connection_closure(Config) ->
Username = proplists:get_value(rmq_username, Config),
Username2 = <<"guest2">>,
Username1 = list_to_binary(atom_to_list(?FUNCTION_NAME) ++ "-1"),
Username2 = list_to_binary(atom_to_list(?FUNCTION_NAME) ++ "-2"),
Vhost = proplists:get_value(rmq_vhost, Config),
rabbit_ct_broker_helpers:add_user(Config, Username2),
rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),
[ begin
rabbit_ct_broker_helpers:add_user(Config, U),
rabbit_ct_broker_helpers:set_full_permissions(Config, U, Vhost)
end || U <- [Username1, Username2]],
?assertEqual(0, count_connections_in(Config, Username)),
?assertEqual(0, count_connections_in(Config, Username1)),
?assertEqual(0, count_connections_in(Config, Username2)),
[Conn1] = open_connections(Config, [0]),
?awaitMatch(1, count_connections_in(Config, Username), ?AWAIT_TIMEOUT),
[Conn1] = open_connections(Config, [{0, Username1}]),
?awaitMatch(1, count_connections_in(Config, Username1), ?AWAIT_TIMEOUT),
[_Conn2] = open_connections(Config, [{0, Username2}]),
?awaitMatch(1, count_connections_in(Config, Username2), ?AWAIT_TIMEOUT),
@ -162,22 +166,24 @@ single_node_user_deletion_forces_connection_closure(Config) ->
?awaitMatch(0, count_connections_in(Config, Username2), ?AWAIT_TIMEOUT),
close_connections([Conn1]),
?awaitMatch(0, count_connections_in(Config, Username), ?AWAIT_TIMEOUT).
?awaitMatch(0, count_connections_in(Config, Username1), ?AWAIT_TIMEOUT).
cluster_user_deletion_forces_connection_closure(Config) ->
Username = proplists:get_value(rmq_username, Config),
Username2 = <<"guest2">>,
Username1 = list_to_binary(atom_to_list(?FUNCTION_NAME) ++ "-1"),
Username2 = list_to_binary(atom_to_list(?FUNCTION_NAME) ++ "-2"),
Vhost = proplists:get_value(rmq_vhost, Config),
rabbit_ct_broker_helpers:add_user(Config, Username2),
rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),
[ begin
rabbit_ct_broker_helpers:add_user(Config, U),
rabbit_ct_broker_helpers:set_full_permissions(Config, U, Vhost)
end || U <- [Username1, Username2]],
?assertEqual(0, count_connections_in(Config, Username)),
?assertEqual(0, count_connections_in(Config, Username1)),
?assertEqual(0, count_connections_in(Config, Username2)),
[Conn1] = open_connections(Config, [{0, Username}]),
?awaitMatch(1, count_connections_in(Config, Username), ?AWAIT_TIMEOUT),
[Conn1] = open_connections(Config, [{0, Username1}]),
?awaitMatch(1, count_connections_in(Config, Username1), ?AWAIT_TIMEOUT),
[_Conn2] = open_connections(Config, [{1, Username2}]),
?awaitMatch(1, count_connections_in(Config, Username2), ?AWAIT_TIMEOUT),
@ -186,7 +192,7 @@ cluster_user_deletion_forces_connection_closure(Config) ->
?awaitMatch(0, count_connections_in(Config, Username2), ?AWAIT_TIMEOUT),
close_connections([Conn1]),
?awaitMatch(0, count_connections_in(Config, Username), ?AWAIT_TIMEOUT).
?awaitMatch(0, count_connections_in(Config, Username1), ?AWAIT_TIMEOUT).
%% -------------------------------------------------------------------
%% Helpers

View File

@ -1285,38 +1285,43 @@ single_active_consumer_priority(Config) ->
ok.
force_shrink_member_to_current_member(Config) ->
[Server0, Server1, Server2] =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
case rabbit_ct_helpers:is_mixed_versions() of
true ->
{skip, "Should not run in mixed version environments"};
_ ->
[Server0, Server1, Server2] =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
QQ = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
QQ = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
RaName = ra_name(QQ),
rabbit_ct_client_helpers:publish(Ch, QQ, 3),
wait_for_messages_ready([Server0], RaName, 3),
RaName = ra_name(QQ),
rabbit_ct_client_helpers:publish(Ch, QQ, 3),
wait_for_messages_ready([Server0], RaName, 3),
{ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [QQ, <<"/">>]),
#{nodes := Nodes0} = amqqueue:get_type_state(Q0),
?assertEqual(3, length(Nodes0)),
{ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [QQ, <<"/">>]),
#{nodes := Nodes0} = amqqueue:get_type_state(Q0),
?assertEqual(3, length(Nodes0)),
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
force_shrink_member_to_current_member, [<<"/">>, QQ]),
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue,
force_shrink_member_to_current_member, [<<"/">>, QQ]),
wait_for_messages_ready([Server0], RaName, 3),
wait_for_messages_ready([Server0], RaName, 3),
{ok, Q1} = rpc:call(Server0, rabbit_amqqueue, lookup, [QQ, <<"/">>]),
#{nodes := Nodes1} = amqqueue:get_type_state(Q1),
?assertEqual(1, length(Nodes1)),
{ok, Q1} = rpc:call(Server0, rabbit_amqqueue, lookup, [QQ, <<"/">>]),
#{nodes := Nodes1} = amqqueue:get_type_state(Q1),
?assertEqual(1, length(Nodes1)),
%% grow queues back to all nodes
[rpc:call(Server0, rabbit_quorum_queue, grow, [S, <<"/">>, <<".*">>, all]) || S <- [Server1, Server2]],
%% grow queues back to all nodes
[rpc:call(Server0, rabbit_quorum_queue, grow, [S, <<"/">>, <<".*">>, all]) || S <- [Server1, Server2]],
wait_for_messages_ready([Server0], RaName, 3),
{ok, Q2} = rpc:call(Server0, rabbit_amqqueue, lookup, [QQ, <<"/">>]),
#{nodes := Nodes2} = amqqueue:get_type_state(Q2),
?assertEqual(3, length(Nodes2)).
wait_for_messages_ready([Server0], RaName, 3),
{ok, Q2} = rpc:call(Server0, rabbit_amqqueue, lookup, [QQ, <<"/">>]),
#{nodes := Nodes2} = amqqueue:get_type_state(Q2),
?assertEqual(3, length(Nodes2))
end.
force_all_queues_shrink_member_to_current_member(Config) ->
[Server0, Server1, Server2] =

View File

@ -1111,32 +1111,43 @@ two_nodes_same_otp_version(Config0) ->
%% Run the log on two Erlang nodes with different OTP versions.
two_nodes_different_otp_version(_Config) ->
Node = 'rabbit_fifo_prop@localhost',
case net_adm:ping(Node) of
pong ->
case is_same_otp_version(Node) of
true ->
ct:fail("expected CT node and 'rabbit_fifo_prop@localhost' "
"to have different OTP versions");
false ->
Prefixes = ["rabbit_fifo", "rabbit_misc", "mc",
"lqueue", "priority_queue", "ra_"],
[begin
Mod = list_to_atom(ModStr),
{Mod, Bin, _File} = code:get_object_code(Mod),
{module, Mod} = erpc:call(Node, code, load_binary, [Mod, ModStr, Bin])
end
|| {ModStr, _FileName, _Loaded} <- code:all_available(),
lists:any(fun(Prefix) -> lists:prefix(Prefix, ModStr) end, Prefixes)],
two_nodes(Node)
end;
pang ->
Reason = {node_down, Node},
case rabbit_ct_helpers:is_ci() of
true ->
ct:fail(Reason);
false ->
{skip, Reason}
case erlang:system_info(otp_release) of
"28" ->
%% Compiling a BEAM file on OTP 28 and loading it on OTP 26 or 27
%% causes a "corrupt atom table" error.
%% https://github.com/erlang/otp/pull/8913#issue-2572291638
{skip, "loading BEAM file compiled on OTP 28 on a lower OTP version is unsupported"};
_ ->
Node = 'rabbit_fifo_prop@localhost',
case net_adm:ping(Node) of
pong ->
case is_same_otp_version(Node) of
true ->
ct:fail("expected CT node and 'rabbit_fifo_prop@localhost' "
"to have different OTP versions");
false ->
Prefixes = ["rabbit_fifo", "rabbit_misc", "mc",
"lqueue", "priority_queue", "ra_"],
[begin
Mod = list_to_atom(ModStr),
{Mod, Bin, _File} = code:get_object_code(Mod),
{module, Mod} = erpc:call(Node, code, load_binary,
[Mod, ModStr, Bin])
end
|| {ModStr, _FileName, _Loaded} <- code:all_available(),
lists:any(fun(Prefix) ->
lists:prefix(Prefix, ModStr)
end, Prefixes)],
two_nodes(Node)
end;
pang ->
Reason = {node_down, Node},
case rabbit_ct_helpers:is_ci() of
true ->
ct:fail(Reason);
false ->
{skip, Reason}
end
end
end.

View File

@ -43,7 +43,7 @@ DEP_EARLY_PLUGINS = $(PROJECT)/mk/rabbitmq-early-plugin.mk
DEP_PLUGINS = $(PROJECT)/mk/rabbitmq-build.mk \
$(PROJECT)/mk/rabbitmq-hexpm.mk
PLT_APPS += mnesia crypto ssl
PLT_APPS += mnesia crypto ssl xmerl
include ../../rabbitmq-components.mk
include ../../erlang.mk

View File

@ -109,11 +109,18 @@ find_by_type(Type, {rdnSequence, RDNs}) ->
%% Formatting functions
%%--------------------------------------------------------------------------
-if (?OTP_RELEASE >= 28).
-define(M, 'PKIXAlgs-2009').
-else.
-define(M, 'OTP-PUB-KEY').
-endif.
sanitize_other_name(Bin) when is_binary(Bin) ->
%% We make a wild assumption about the types here
%% but ASN.1 decoding functions in OTP only offer so much and SAN values
%% are expected to be "string-like" by RabbitMQ
case 'OTP-PUB-KEY':decode('DirectoryString', Bin) of
case ?M:decode('DirectoryString', Bin) of
{ok, {_, Val}} -> Val;
Other -> Other
end.

View File

@ -82,19 +82,10 @@
-define(SSL_CLOSE_TIMEOUT, 5000).
-define(IS_SSL(Sock), is_tuple(Sock)
andalso (tuple_size(Sock) =:= 3)
andalso (element(1, Sock) =:= sslsocket)).
is_ssl(Sock) -> ?IS_SSL(Sock).
%% Seems hackish. Is hackish. But the structure is stable and
%% kept this way for backward compatibility reasons. We need
%% it for two reasons: there are no ssl:getstat(Sock) function,
%% and no ssl:close(Timeout) function. Both of them are being
%% worked on as we speak.
ssl_get_socket(Sock) ->
element(2, element(2, Sock)).
ssl_info(Sock) when ?IS_SSL(Sock) ->
ssl:connection_information(Sock);
ssl_info(_Sock) ->
@ -119,12 +110,12 @@ controlling_process(Sock, Pid) when is_port(Sock) ->
gen_tcp:controlling_process(Sock, Pid).
getstat(Sock, Stats) when ?IS_SSL(Sock) ->
inet:getstat(ssl_get_socket(Sock), Stats);
ssl:getstat(Sock, Stats);
getstat(Sock, Stats) when is_port(Sock) ->
inet:getstat(Sock, Stats);
%% Used by Proxy protocol support in plugins
getstat({rabbit_proxy_socket, Sock, _}, Stats) when ?IS_SSL(Sock) ->
inet:getstat(ssl_get_socket(Sock), Stats);
ssl:getstat(Sock, Stats);
getstat({rabbit_proxy_socket, Sock, _}, Stats) when is_port(Sock) ->
inet:getstat(Sock, Stats).
@ -177,27 +168,7 @@ close(Sock) when ?IS_SSL(Sock) -> ssl:close(Sock);
close(Sock) when is_port(Sock) -> gen_tcp:close(Sock).
fast_close(Sock) when ?IS_SSL(Sock) ->
%% We cannot simply port_close the underlying tcp socket since the
%% TLS protocol is quite insistent that a proper closing handshake
%% should take place (see RFC 5245 s7.2.1). So we call ssl:close
%% instead, but that can block for a very long time, e.g. when
%% there is lots of pending output and there is tcp backpressure,
%% or the ssl_connection process has entered the the
%% workaround_transport_delivery_problems function during
%% termination, which, inexplicably, does a gen_tcp:recv(Socket,
%% 0), which may never return if the client doesn't send a FIN or
%% that gets swallowed by the network. Since there is no timeout
%% variant of ssl:close, we construct our own.
{Pid, MRef} = spawn_monitor(fun () -> ssl:close(Sock) end),
erlang:send_after(?SSL_CLOSE_TIMEOUT, self(), {Pid, ssl_close_timeout}),
receive
{Pid, ssl_close_timeout} ->
erlang:demonitor(MRef, [flush]),
exit(Pid, kill);
{'DOWN', MRef, process, Pid, _Reason} ->
ok
end,
catch port_close(ssl_get_socket(Sock)),
_ = ssl:close(Sock, ?SSL_CLOSE_TIMEOUT),
ok;
fast_close(Sock) when is_port(Sock) ->
catch port_close(Sock), ok.

View File

@ -23,7 +23,9 @@ parse_endpoint(Destination, AllowAnonymousQueue)
parse_endpoint(Destination, AllowAnonymousQueue)
when is_list(Destination) ->
case re:split(Destination, "/", [unicode, {return, list}]) of
[Name] ->
[] -> %% in OTP28+, re:split("", "/") returns []
{ok, {queue, unescape("")}};
[Name] -> %% before OTP28, re:split("", "/") returns [[]]
{ok, {queue, unescape(Name)}};
["", Type | Rest]
when Type =:= "exchange" orelse Type =:= "queue" orelse

View File

@ -21,8 +21,8 @@ TEST_DEPS = amqp amqp_client temp x509 rabbit
dep_amqp = hex 3.3.0
dep_csv = hex 3.2.1
dep_json = hex 1.4.1
dep_temp = hex 0.4.7
dep_x509 = hex 0.8.8
dep_temp = hex 0.4.9
dep_x509 = hex 0.9.0
DEP_EARLY_PLUGINS = rabbit_common/mk/rabbitmq-early-plugin.mk
DEP_PLUGINS = rabbit_common/mk/rabbitmq-build.mk

View File

@ -99,7 +99,7 @@ defmodule SetPermissionsCommandTest do
assert @command.run(
[context[:user], "^#{context[:user]}-.*", ".*", "*"],
context[:opts]
) == {:error, {:invalid_regexp, ~c"*", {~c"nothing to repeat", 0}}}
) == {:error, {:invalid_regexp, ~c"*", {~c"quantifier does not follow a repeatable item", 0}}}
# asserts that the failed command didn't change anything
u = Enum.find(list_permissions(context[:vhost]), fn x -> x[:user] == context[:user] end)

View File

@ -100,7 +100,7 @@ defmodule SetPermissionsGloballyCommandTest do
assert @command.run(
[context[:user], "^#{context[:user]}-.*", ".*", "*"],
context[:opts]
) == {:error, {:invalid_regexp, ~c"*", {~c"nothing to repeat", 0}}}
) == {:error, {:invalid_regexp, ~c"*", {~c"quantifier does not follow a repeatable item", 0}}}
# asserts that the failed command didn't change anything
p4 = Enum.find(list_permissions(@vhost1), fn x -> x[:user] == context[:user] end)

View File

@ -68,6 +68,24 @@ defmodule DisablePluginsCommandTest do
}
end
# Helper functions for order-insensitive assertions
defp normalize_result_map(map) when is_map(map) do
map
|> Map.update(:stopped, [], &Enum.sort/1)
|> Map.update(:disabled, [], &Enum.sort/1)
|> Map.update(:set, [], &Enum.sort/1)
end
defp normalize_stream_result([list, map]) when is_list(list) and is_map(map) do
[Enum.sort(list), normalize_result_map(map)]
end
defp normalize_stream_result(other), do: other
defp assert_lists_equal(expected, actual) do
assert Enum.sort(expected) == Enum.sort(actual)
end
test "validate: specifying both --online and --offline is reported as invalid", context do
assert match?(
{:validation_failure, {:bad_argument, _}},
@ -104,16 +122,18 @@ defmodule DisablePluginsCommandTest do
assert {:stream, test_stream} =
@command.run(["rabbitmq_stomp"], Map.merge(context[:opts], %{node: :nonode}))
assert [
[:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation],
%{mode: :offline, disabled: [:rabbitmq_stomp], set: [:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation]}
] ==
Enum.to_list(test_stream)
result = Enum.to_list(test_stream)
expected = [
[:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation],
%{mode: :offline, disabled: [:rabbitmq_stomp], set: [:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation]}
]
assert normalize_stream_result(expected) == normalize_stream_result(result)
assert {:ok, [[:rabbitmq_federation]]} == :file.consult(context[:opts][:enabled_plugins_file])
assert [:amqp_client, :rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation, :rabbitmq_stomp] ==
Enum.sort(:rabbit_misc.rpc_call(context[:opts][:node], :rabbit_plugins, :active, []))
result = :rabbit_misc.rpc_call(context[:opts][:node], :rabbit_plugins, :active, [])
expected = [:amqp_client, :rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation, :rabbitmq_stomp]
assert_lists_equal(expected, result)
end
test "in offline mode, writes out enabled plugins and reports implicitly enabled plugin list",
@ -124,15 +144,18 @@ defmodule DisablePluginsCommandTest do
Map.merge(context[:opts], %{offline: true, online: false})
)
assert [
[:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation],
%{mode: :offline, disabled: [:rabbitmq_stomp], set: [:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation]}
] == Enum.to_list(test_stream)
result = Enum.to_list(test_stream)
expected = [
[:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation],
%{mode: :offline, disabled: [:rabbitmq_stomp], set: [:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation]}
]
assert normalize_stream_result(expected) == normalize_stream_result(result)
assert {:ok, [[:rabbitmq_federation]]} == :file.consult(context[:opts][:enabled_plugins_file])
assert [:amqp_client, :rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation, :rabbitmq_stomp] ==
Enum.sort(:rabbit_misc.rpc_call(context[:opts][:node], :rabbit_plugins, :active, []))
active_plugins = :rabbit_misc.rpc_call(context[:opts][:node], :rabbit_plugins, :active, [])
expected_active = [:amqp_client, :rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation, :rabbitmq_stomp]
assert_lists_equal(expected_active, active_plugins)
end
test "in offline mode, removes implicitly enabled plugins when the last explicitly enabled one is removed",
@ -143,10 +166,12 @@ defmodule DisablePluginsCommandTest do
Map.merge(context[:opts], %{offline: true, online: false})
)
assert [
[:rabbitmq_stomp],
%{mode: :offline, disabled: [:rabbitmq_federation_common, :rabbitmq_queue_federation, :rabbitmq_exchange_federation, :rabbitmq_federation], set: [:rabbitmq_stomp]}
] == Enum.to_list(test_stream0)
result = Enum.to_list(test_stream0)
expected = [
[:rabbitmq_stomp],
%{mode: :offline, disabled: [:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation], set: [:rabbitmq_stomp]}
]
assert normalize_stream_result(expected) == normalize_stream_result(result)
assert {:ok, [[:rabbitmq_stomp]]} == :file.consult(context[:opts][:enabled_plugins_file])
@ -156,8 +181,9 @@ defmodule DisablePluginsCommandTest do
Map.merge(context[:opts], %{offline: true, online: false})
)
assert [[], %{mode: :offline, disabled: [:rabbitmq_stomp], set: []}] ==
Enum.to_list(test_stream1)
result = Enum.to_list(test_stream1)
expected = [[], %{mode: :offline, disabled: [:rabbitmq_stomp], set: []}]
assert normalize_stream_result(expected) == normalize_stream_result(result)
assert {:ok, [[]]} = :file.consult(context[:opts][:enabled_plugins_file])
end
@ -165,102 +191,90 @@ defmodule DisablePluginsCommandTest do
test "updates plugin list and stops disabled plugins", context do
assert {:stream, test_stream0} = @command.run(["rabbitmq_stomp"], context[:opts])
assert [
[:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation],
%{
mode: :online,
started: [],
stopped: [:rabbitmq_stomp],
disabled: [:rabbitmq_stomp],
set: [:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation]
}
] ==
Enum.to_list(test_stream0)
result = Enum.to_list(test_stream0)
expected = [
[:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation],
%{
mode: :online,
started: [],
stopped: [:rabbitmq_stomp],
disabled: [:rabbitmq_stomp],
set: [:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation]
}
]
assert normalize_stream_result(expected) == normalize_stream_result(result)
assert {:ok, [[:rabbitmq_federation]]} == :file.consult(context[:opts][:enabled_plugins_file])
assert [:amqp_client, :rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation] ==
Enum.sort(:rabbit_misc.rpc_call(context[:opts][:node], :rabbit_plugins, :active, []))
result = :rabbit_misc.rpc_call(context[:opts][:node], :rabbit_plugins, :active, [])
expected = [:amqp_client, :rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation]
assert_lists_equal(expected, result)
assert {:stream, test_stream1} = @command.run(["rabbitmq_federation"], context[:opts])
assert [
[],
%{
mode: :online,
started: [],
stopped: [:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation],
disabled: [:rabbitmq_federation_common, :rabbitmq_queue_federation, :rabbitmq_exchange_federation, :rabbitmq_federation],
set: []
}
] ==
Enum.to_list(test_stream1)
result = Enum.to_list(test_stream1)
expected = [
[],
%{
mode: :online,
started: [],
stopped: [:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation],
disabled: [:rabbitmq_federation_common, :rabbitmq_queue_federation, :rabbitmq_exchange_federation, :rabbitmq_federation],
set: []
}
]
assert normalize_stream_result(expected) == normalize_stream_result(result)
assert {:ok, [[]]} == :file.consult(context[:opts][:enabled_plugins_file])
assert Enum.empty?(
Enum.sort(:rabbit_misc.rpc_call(context[:opts][:node], :rabbit_plugins, :active, []))
)
result = :rabbit_misc.rpc_call(context[:opts][:node], :rabbit_plugins, :active, [])
assert Enum.empty?(result)
end
test "can disable multiple plugins at once", context do
assert {:stream, test_stream} =
@command.run(["rabbitmq_stomp", "rabbitmq_federation"], context[:opts])
[[], m0] = Enum.to_list(test_stream)
m1 =
m0
|> Map.update!(:stopped, &Enum.sort/1)
|> Map.update!(:disabled, &Enum.sort/1)
expected_list = Enum.sort([:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation, :rabbitmq_stomp])
assert [
[],
%{
mode: :online,
started: [],
stopped: expected_list,
disabled: expected_list,
set: []
}
] == [[], m1]
result = Enum.to_list(test_stream)
expected_list = [:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation, :rabbitmq_stomp]
expected = [
[],
%{
mode: :online,
started: [],
stopped: expected_list,
disabled: expected_list,
set: []
}
]
assert normalize_stream_result(expected) == normalize_stream_result(result)
assert {:ok, [[]]} == :file.consult(context[:opts][:enabled_plugins_file])
assert Enum.empty?(
Enum.sort(:rabbit_misc.rpc_call(context[:opts][:node], :rabbit_plugins, :active, []))
)
active_plugins = :rabbit_misc.rpc_call(context[:opts][:node], :rabbit_plugins, :active, [])
assert Enum.empty?(active_plugins)
end
test "disabling a dependency disables all plugins that depend on it", context do
assert {:stream, test_stream} = @command.run(["amqp_client"], context[:opts])
[[], m0] = Enum.to_list(test_stream)
m1 =
m0
|> Map.update!(:stopped, &Enum.sort/1)
|> Map.update!(:disabled, &Enum.sort/1)
expected_list = Enum.sort([:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation, :rabbitmq_stomp])
assert [
[],
%{
mode: :online,
started: [],
stopped: expected_list,
disabled: expected_list,
set: []
}
] == [[], m1]
result = Enum.to_list(test_stream)
expected_list = [:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation, :rabbitmq_stomp]
expected = [
[],
%{
mode: :online,
started: [],
stopped: expected_list,
disabled: expected_list,
set: []
}
]
assert normalize_stream_result(expected) == normalize_stream_result(result)
assert {:ok, [[]]} == :file.consult(context[:opts][:enabled_plugins_file])
assert Enum.empty?(
Enum.sort(:rabbit_misc.rpc_call(context[:opts][:node], :rabbit_plugins, :active, []))
)
result = :rabbit_misc.rpc_call(context[:opts][:node], :rabbit_plugins, :active, [])
assert Enum.empty?(result)
end
test "formats enabled plugins mismatch errors", context do

View File

@ -5,7 +5,7 @@ DEPS = rabbit_common rabbitmq_ct_helpers amqp_client
DEP_PLUGINS = rabbit_common/mk/rabbitmq-build.mk
PLT_APPS += common_test crypto
PLT_APPS += common_test crypto ssl
include ../../rabbitmq-components.mk
include ../../erlang.mk

View File

@ -23,7 +23,7 @@ new(WsUrl, PPid, AuthInfo, Protocols) ->
new(WsUrl, PPid, AuthInfo, Protocols, <<>>).
new(WsUrl, PPid, AuthInfo, Protocols, TcpPreface) ->
_ = crypto:start(),
_ = application:start(crypto),
_ = application:ensure_all_started(ssl),
{Transport, Url} = case WsUrl of
"ws://" ++ Rest -> {gen_tcp, Rest};

View File

@ -73,6 +73,7 @@
-compile(inline).
-compile(inline_list_funcs).
-compile({no_auto_import,[ceil/1]}).
-type value() :: tuple().
-type internal_value() :: tuple() | drop.

View File

@ -33,7 +33,7 @@ endef
DEPS = ranch rabbit_common rabbit amqp_client
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers rabbitmq_management
PLT_APPS += rabbitmq_cli elixir
PLT_APPS += rabbitmq_cli elixir ssl
DEP_EARLY_PLUGINS = rabbit_common/mk/rabbitmq-early-plugin.mk
DEP_PLUGINS = rabbit_common/mk/rabbitmq-plugin.mk

View File

@ -25,7 +25,7 @@ LOCAL_DEPS = ssl
DEPS = rabbit rabbitmq_stream_common osiris ranch
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client amqp10_client
PLT_APPS += rabbitmq_cli elixir
PLT_APPS += rabbitmq_cli elixir ssl
DEP_EARLY_PLUGINS = rabbit_common/mk/rabbitmq-early-plugin.mk
DEP_PLUGINS = rabbit_common/mk/rabbitmq-plugin.mk

View File

@ -21,7 +21,7 @@ LOCAL_DEPS = ssl
DEPS = rabbit cowboy rabbitmq_mqtt
TEST_DEPS = emqtt rabbitmq_ct_helpers rabbitmq_ct_client_helpers rabbitmq_management rabbitmq_stomp rabbitmq_consistent_hash_exchange
PLT_APPS += rabbitmq_cli elixir cowlib
PLT_APPS += rabbitmq_cli elixir cowlib ssl
# FIXME: Add Ranch as a BUILD_DEPS to be sure the correct version is picked.
# See rabbitmq-components.mk.

17
erlang.mk vendored
View File

@ -17,7 +17,7 @@
ERLANG_MK_FILENAME := $(realpath $(lastword $(MAKEFILE_LIST)))
export ERLANG_MK_FILENAME
ERLANG_MK_VERSION = e13b4c7
ERLANG_MK_VERSION = f157f11
ERLANG_MK_WITHOUT =
# Make 3.81 and 3.82 are deprecated.
@ -559,6 +559,14 @@ export ERL_LIBS
export NO_AUTOPATCH
# Elixir.
# Elixir is automatically enabled in all cases except when
# an Erlang project uses an Elixir dependency. In that case
# $(ELIXIR) must be set explicitly.
ELIXIR ?= $(if $(filter elixir,$(BUILD_DEPS) $(DEPS)),dep,$(if $(EX_FILES),system,disable))
export ELIXIR
# Verbosity.
dep_verbose_0 = @echo " DEP $1 ($(call query_version,$1))";
@ -1778,12 +1786,6 @@ endif
# Copyright (c) 2024, Loïc Hoguin <essen@ninenines.eu>
# This file is part of erlang.mk and subject to the terms of the ISC License.
# Elixir is automatically enabled in all cases except when
# an Erlang project uses an Elixir dependency. In that case
# $(ELIXIR) must be set explicitly.
ELIXIR ?= $(if $(filter elixir,$(BUILD_DEPS) $(DEPS)),dep,$(if $(EX_FILES),system,disable))
export ELIXIR
ifeq ($(ELIXIR),system)
# We expect 'elixir' to be on the path.
ELIXIR_BIN ?= $(shell readlink -f `which elixir`)
@ -1964,6 +1966,7 @@ endef
define compile_ex.erl
{ok, _} = application:ensure_all_started(elixir),
{ok, _} = application:ensure_all_started(mix),
$(foreach dep,$(LOCAL_DEPS),_ = application:load($(dep)),)
ModCode = list_to_atom("Elixir.Code"),
ModCode:put_compiler_option(ignore_module_conflict, true),
ModComp = list_to_atom("Elixir.Kernel.ParallelCompiler"),