From 9283b4f4f61b18907bf9a86b7e1493fd2de9c097 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Thu, 5 Jan 2023 15:29:42 +0000 Subject: [PATCH] Add test AMQP 0.9.1 to MQTT with QoS 0 --- deps/rabbitmq_mqtt/test/shared_SUITE.erl | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/deps/rabbitmq_mqtt/test/shared_SUITE.erl b/deps/rabbitmq_mqtt/test/shared_SUITE.erl index b58ed3a6de..baad5ce9cf 100644 --- a/deps/rabbitmq_mqtt/test/shared_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/shared_SUITE.erl @@ -101,6 +101,7 @@ tests() -> ,subscribe_multiple ,large_message_mqtt_to_mqtt ,large_message_amqp_to_mqtt + ,amqp_to_mqtt_qos0 ,keepalive ,keepalive_turned_off ,duplicate_client_id @@ -279,7 +280,7 @@ will_without_disconnect(Config) -> ok = emqtt:disconnect(Sub). quorum_queue_rejects(Config) -> - {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), Name = atom_to_binary(?FUNCTION_NAME), ok = rabbit_ct_broker_helpers:set_policy( @@ -314,7 +315,7 @@ publish_to_all_queue_types_qos1(Config) -> publish_to_all_queue_types(Config, qos1). publish_to_all_queue_types(Config, QoS) -> - Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + Ch = rabbit_ct_client_helpers:open_channel(Config), CQ = <<"classic-queue">>, CMQ = <<"classic-mirrored-queue">>, @@ -390,7 +391,7 @@ flow(Config, {App, Par, Val}, QueueType) Result = rpc_all(Config, application, set_env, [App, Par, Val]), ?assert(lists:all(fun(R) -> R =:= ok end, Result)), - Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + Ch = rabbit_ct_client_helpers:open_channel(Config), QueueName = Topic = atom_to_binary(?FUNCTION_NAME), declare_queue(Ch, QueueName, [{<<"x-queue-type">>, longstr, QueueType}]), bind(Ch, QueueName, Topic), @@ -696,7 +697,7 @@ consuming_classic_queue_down(Config) -> ok. delete_create_queue(Config) -> - Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + Ch = rabbit_ct_client_helpers:open_channel(Config), CQ1 = <<"classic-queue-1-delete-create">>, CQ2 = <<"classic-queue-2-delete-create">>, QQ = <<"quorum-queue-delete-create">>, @@ -857,7 +858,7 @@ large_message_amqp_to_mqtt(Config) -> C = connect(ClientId, Config), {ok, _, [1]} = emqtt:subscribe(C, {Topic, qos1}), - Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + Ch = rabbit_ct_client_helpers:open_channel(Config), Payload0 = binary:copy(<<"x">>, 8_000_000), Payload = <>, amqp_channel:call(Ch, @@ -867,6 +868,19 @@ large_message_amqp_to_mqtt(Config) -> ok = expect_publishes(C, Topic, [Payload]), ok = emqtt:disconnect(C). +amqp_to_mqtt_qos0(Config) -> + Topic = ClientId = Payload = atom_to_binary(?FUNCTION_NAME), + C = connect(ClientId, Config), + {ok, _, [0]} = emqtt:subscribe(C, {Topic, qos0}), + + Ch = rabbit_ct_client_helpers:open_channel(Config), + amqp_channel:call(Ch, + #'basic.publish'{exchange = <<"amq.topic">>, + routing_key = Topic}, + #amqp_msg{payload = Payload}), + ok = expect_publishes(C, Topic, [Payload]), + ok = emqtt:disconnect(C). + %% Packet identifier is a non zero two byte integer. %% Test that the server wraps around the packet identifier. many_qos1_messages(Config) ->