rabbitmq-server/deps/rabbit/test/unicode_SUITE.erl

114 lines
4.4 KiB
Erlang

-module(unicode_SUITE).
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-compile(export_all).
%% Unicode U+1F407
-define(UNICODE_STRING, "bunny🐇bunny").
all() ->
[
{group, queues}
].
groups() ->
[
{queues, [], [
classic_queue_v2,
quorum_queue,
stream
]}
].
%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------
init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
rabbit_ct_helpers:run_setup_steps(Config).
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).
init_per_group(Group, Config0) ->
PrivDir0 = ?config(priv_dir, Config0),
PrivDir = filename:join(PrivDir0, ?UNICODE_STRING),
ok = file:make_dir(PrivDir),
Config = rabbit_ct_helpers:set_config(Config0, [{priv_dir, PrivDir},
{rmq_nodename_suffix, Group}]),
rabbit_ct_helpers:run_steps(Config,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()
).
end_per_group(_, Config) ->
rabbit_ct_helpers:run_steps(Config,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()).
init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase).
end_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_finished(Config, Testcase).
classic_queue_v2(Config) ->
ok = queue(Config, ?FUNCTION_NAME, []).
quorum_queue(Config) ->
ok = queue(Config, ?FUNCTION_NAME, [{<<"x-queue-type">>, longstr, <<"quorum">>}]).
queue(Config, QName0, Args) ->
QName1 = rabbit_data_coercion:to_binary(QName0),
QName = <<QName1/binary, ?UNICODE_STRING/utf8>>,
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
amqp_channel:call(Ch, #'queue.declare'{queue = QName,
durable = true,
arguments = Args
}),
rabbit_ct_client_helpers:publish(Ch, QName, 1),
{#'basic.get_ok'{}, #amqp_msg{payload = <<"1">>}} =
amqp_channel:call(Ch, #'basic.get'{queue = QName, no_ack = false}),
{'queue.delete_ok', 0} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok.
stream(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
ConsumerTag = QName0 = atom_to_binary(?FUNCTION_NAME),
QName = <<QName0/binary, ?UNICODE_STRING/utf8>>,
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
amqp_channel:call(Ch, #'queue.declare'{queue = QName,
durable = true,
arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]
}),
rabbit_ct_client_helpers:publish(Ch, QName, 1),
?assertMatch(#'basic.qos_ok'{},
amqp_channel:call(Ch, #'basic.qos'{global = false,
prefetch_count = 1})),
amqp_channel:subscribe(Ch,
#'basic.consume'{queue = QName,
no_ack = false,
consumer_tag = ConsumerTag,
arguments = [{<<"x-stream-offset">>, long, 0}]},
self()),
receive
#'basic.consume_ok'{consumer_tag = ConsumerTag} ->
ok
end,
DelTag = receive
{#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
DeliveryTag
after 30_000 ->
ct:fail(timeout)
end,
ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DelTag,
multiple = false}),
amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = ConsumerTag}),
{'queue.delete_ok', 0} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok.