rabbitmq-server/deps/rabbitmq_stomp/test/topic_SUITE.erl

177 lines
6.9 KiB
Erlang

%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(topic_SUITE).
-compile(export_all).
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include("rabbit_stomp.hrl").
-include("rabbit_stomp_frame.hrl").
all() ->
[{group, list_to_atom("version_" ++ V)} || V <- ?SUPPORTED_VERSIONS].
groups() ->
Tests = [
publish_topic_authorisation,
subscribe_topic_authorisation,
change_default_topic_exchange
],
[{list_to_atom("version_" ++ V), [sequence], Tests}
|| V <- ?SUPPORTED_VERSIONS].
init_per_suite(Config) ->
Config1 = rabbit_ct_helpers:set_config(Config,
[{rmq_nodename_suffix, ?MODULE}]),
rabbit_ct_helpers:log_environment(),
rabbit_ct_helpers:run_setup_steps(Config1,
rabbit_ct_broker_helpers:setup_steps()).
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config,
rabbit_ct_broker_helpers:teardown_steps()).
init_per_group(Group, Config) ->
Version = string:sub_string(atom_to_list(Group), 9),
rabbit_ct_helpers:set_config(Config, [{version, Version}]).
end_per_group(_Group, Config) -> Config.
init_per_testcase(_TestCase, Config) ->
Version = ?config(version, Config),
StompPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stomp),
{ok, Connection} = amqp_connection:start(#amqp_params_direct{
node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename)
}),
{ok, Channel} = amqp_connection:open_channel(Connection),
{ok, Client} = rabbit_stomp_client:connect(Version, StompPort),
Config1 = rabbit_ct_helpers:set_config(Config, [
{amqp_connection, Connection},
{amqp_channel, Channel},
{stomp_client, Client}
]),
init_per_testcase0(Config1).
end_per_testcase(_TestCase, Config) ->
Connection = ?config(amqp_connection, Config),
Channel = ?config(amqp_channel, Config),
Client = ?config(stomp_client, Config),
rabbit_stomp_client:disconnect(Client),
amqp_channel:close(Channel),
amqp_connection:close(Connection),
end_per_testcase0(Config).
init_per_testcase0(Config) ->
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_auth_backend_internal, add_user,
[<<"user">>, <<"pass">>, <<"acting-user">>]),
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_auth_backend_internal, set_permissions, [
<<"user">>, <<"/">>, <<".*">>, <<".*">>, <<".*">>, <<"acting-user">>]),
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_auth_backend_internal, set_topic_permissions, [
<<"user">>, <<"/">>, <<"amq.topic">>, <<"^{username}.Authorised">>, <<"^{username}.Authorised">>, <<"acting-user">>]),
Version = ?config(version, Config),
StompPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stomp),
{ok, ClientFoo} = rabbit_stomp_client:connect(Version, "user", "pass", StompPort),
rabbit_ct_helpers:set_config(Config, [{client_foo, ClientFoo}]).
end_per_testcase0(Config) ->
ClientFoo = ?config(client_foo, Config),
rabbit_stomp_client:disconnect(ClientFoo),
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_auth_backend_internal, delete_user,
[<<"user">>, <<"acting-user">>]),
Config.
publish_topic_authorisation(Config) ->
ClientFoo = ?config(client_foo, Config),
AuthorisedTopic = "/topic/user.AuthorisedTopic",
RestrictedTopic = "/topic/user.RestrictedTopic",
%% send on authorised topic
rabbit_stomp_client:send(
ClientFoo, "SUBSCRIBE", [{"destination", AuthorisedTopic}]),
rabbit_stomp_client:send(
ClientFoo, "SEND", [{"destination", AuthorisedTopic}], ["authorised hello"]),
{ok, _Client1, _, Body} = stomp_receive(ClientFoo, "MESSAGE"),
[<<"authorised hello">>] = Body,
%% send on restricted topic
rabbit_stomp_client:send(
ClientFoo, "SEND", [{"destination", RestrictedTopic}], ["hello"]),
{ok, _Client2, Hdrs2, _} = stomp_receive(ClientFoo, "ERROR"),
"access_refused" = proplists:get_value("message", Hdrs2),
ok.
subscribe_topic_authorisation(Config) ->
ClientFoo = ?config(client_foo, Config),
AuthorisedTopic = "/topic/user.AuthorisedTopic",
RestrictedTopic = "/topic/user.RestrictedTopic",
%% subscribe to authorised topic
rabbit_stomp_client:send(
ClientFoo, "SUBSCRIBE", [{"destination", AuthorisedTopic}]),
rabbit_stomp_client:send(
ClientFoo, "SEND", [{"destination", AuthorisedTopic}], ["authorised hello"]),
{ok, _Client1, _, Body} = stomp_receive(ClientFoo, "MESSAGE"),
[<<"authorised hello">>] = Body,
%% subscribe to restricted topic
rabbit_stomp_client:send(
ClientFoo, "SUBSCRIBE", [{"destination", RestrictedTopic}]),
{ok, _Client2, Hdrs2, _} = stomp_receive(ClientFoo, "ERROR"),
"access_refused" = proplists:get_value("message", Hdrs2),
ok.
change_default_topic_exchange(Config) ->
Channel = ?config(amqp_channel, Config),
Version = ?config(version, Config),
StompPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stomp),
Ex = <<"my-topic-exchange">>,
ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env, [rabbitmq_stomp, default_topic_exchange, Ex]),
{ok, ClientFoo} = rabbit_stomp_client:connect(Version, StompPort),
AuthorisedTopic = "/topic/user.AuthorisedTopic",
Declare = #'exchange.declare'{exchange = Ex, type = <<"topic">>},
#'exchange.declare_ok'{} = amqp_channel:call(Channel, Declare),
0 = length(rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_binding, list_for_source, [#resource{virtual_host= <<"/">>, kind = exchange, name = Ex}])),
rabbit_stomp_client:send(
ClientFoo, "SUBSCRIBE", [{"destination", AuthorisedTopic}]),
timer:sleep(500),
1 = length(rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_binding, list_for_source, [#resource{virtual_host= <<"/">>, kind = exchange, name = Ex}])),
rabbit_stomp_client:send(
ClientFoo, "SEND", [{"destination", AuthorisedTopic}], ["ohai there"]),
{ok, _Client1, _, Body} = stomp_receive(ClientFoo, "MESSAGE"),
[<<"ohai there">>] = Body,
Delete = #'exchange.delete'{exchange = Ex},
#'exchange.delete_ok'{} = amqp_channel:call(Channel, Delete),
ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, unset_env, [rabbitmq_stomp, default_topic_exchange]),
rabbit_stomp_client:disconnect(ClientFoo),
ok.
stomp_receive(Client, Command) ->
{#stomp_frame{command = Command,
headers = Hdrs,
body_iolist = Body}, Client1} =
rabbit_stomp_client:recv(Client),
{ok, Client1, Hdrs, Body}.