Add test AMQP 0.9.1 to MQTT with QoS 0

This commit is contained in:
David Ansari 2023-01-05 15:29:42 +00:00
parent fb93a3c17d
commit 9283b4f4f6
1 changed files with 19 additions and 5 deletions

View File

@ -101,6 +101,7 @@ tests() ->
,subscribe_multiple
,large_message_mqtt_to_mqtt
,large_message_amqp_to_mqtt
,amqp_to_mqtt_qos0
,keepalive
,keepalive_turned_off
,duplicate_client_id
@ -279,7 +280,7 @@ will_without_disconnect(Config) ->
ok = emqtt:disconnect(Sub).
quorum_queue_rejects(Config) ->
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
Name = atom_to_binary(?FUNCTION_NAME),
ok = rabbit_ct_broker_helpers:set_policy(
@ -314,7 +315,7 @@ publish_to_all_queue_types_qos1(Config) ->
publish_to_all_queue_types(Config, qos1).
publish_to_all_queue_types(Config, QoS) ->
Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
Ch = rabbit_ct_client_helpers:open_channel(Config),
CQ = <<"classic-queue">>,
CMQ = <<"classic-mirrored-queue">>,
@ -390,7 +391,7 @@ flow(Config, {App, Par, Val}, QueueType)
Result = rpc_all(Config, application, set_env, [App, Par, Val]),
?assert(lists:all(fun(R) -> R =:= ok end, Result)),
Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
Ch = rabbit_ct_client_helpers:open_channel(Config),
QueueName = Topic = atom_to_binary(?FUNCTION_NAME),
declare_queue(Ch, QueueName, [{<<"x-queue-type">>, longstr, QueueType}]),
bind(Ch, QueueName, Topic),
@ -696,7 +697,7 @@ consuming_classic_queue_down(Config) ->
ok.
delete_create_queue(Config) ->
Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
Ch = rabbit_ct_client_helpers:open_channel(Config),
CQ1 = <<"classic-queue-1-delete-create">>,
CQ2 = <<"classic-queue-2-delete-create">>,
QQ = <<"quorum-queue-delete-create">>,
@ -857,7 +858,7 @@ large_message_amqp_to_mqtt(Config) ->
C = connect(ClientId, Config),
{ok, _, [1]} = emqtt:subscribe(C, {Topic, qos1}),
Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
Ch = rabbit_ct_client_helpers:open_channel(Config),
Payload0 = binary:copy(<<"x">>, 8_000_000),
Payload = <<Payload0/binary, "y">>,
amqp_channel:call(Ch,
@ -867,6 +868,19 @@ large_message_amqp_to_mqtt(Config) ->
ok = expect_publishes(C, Topic, [Payload]),
ok = emqtt:disconnect(C).
amqp_to_mqtt_qos0(Config) ->
Topic = ClientId = Payload = atom_to_binary(?FUNCTION_NAME),
C = connect(ClientId, Config),
{ok, _, [0]} = emqtt:subscribe(C, {Topic, qos0}),
Ch = rabbit_ct_client_helpers:open_channel(Config),
amqp_channel:call(Ch,
#'basic.publish'{exchange = <<"amq.topic">>,
routing_key = Topic},
#amqp_msg{payload = Payload}),
ok = expect_publishes(C, Topic, [Payload]),
ok = emqtt:disconnect(C).
%% Packet identifier is a non zero two byte integer.
%% Test that the server wraps around the packet identifier.
many_qos1_messages(Config) ->