diff --git a/deps/rabbitmq_mqtt/test/retainer_SUITE.erl b/deps/rabbitmq_mqtt/test/retainer_SUITE.erl index 82056f24d1..e7ed6dfa5a 100644 --- a/deps/rabbitmq_mqtt/test/retainer_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/retainer_SUITE.erl @@ -12,7 +12,7 @@ groups() -> [ {non_parallel_tests, [], [ coerce_configuration_data, - should_translate_amqp2mqtt_on_publish, + should_translate_amqp2mqtt_on_publish, should_translate_amqp2mqtt_on_retention, should_translate_amqp2mqtt_on_retention_search ]} @@ -71,12 +71,7 @@ end_per_testcase(Testcase, Config) -> coerce_configuration_data(Config) -> P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt), - {ok, C} = emqttc:start_link([{host, "localhost"}, - {port, P}, - {client_id, <<"simpleClientRetainer">>}, - {proto_ver, 3}, - {logger, info}, - {puback_timeout, 1}]), + {ok, C} = emqttc:start_link(connection_opts(P)), emqttc:subscribe(C, <<"TopicA">>, qos0), emqttc:publish(C, <<"TopicA">>, <<"Payload">>), @@ -86,10 +81,10 @@ coerce_configuration_data(Config) -> ok. expect_publishes(_Topic, []) -> ok; -expect_publishes(Topic, [Payload|Rest]) -> +expect_publishes(Topic, [Payload | Rest]) -> receive {publish, Topic, Payload} -> expect_publishes(Topic, Rest) - after 500 -> + after 1500 -> throw({publish_not_delivered, Payload}) end. @@ -100,17 +95,12 @@ expect_publishes(Topic, [Payload|Rest]) -> %% ------------------------------------------------------------------- should_translate_amqp2mqtt_on_publish(Config) -> P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt), - {ok, C} = emqttc:start_link([{host, "localhost"}, - {port, P}, - {client_id, <<"simpleClientRetainer">>}, - {proto_ver,3}, - {logger, info}, - {puback_timeout, 1}]), - emqttc:subscribe(C, <<"TopicA/Device.Field">>, qos1), - emqttc:publish(C,<<"TopicA/Device.Field">>, <<"Payload">>, [{retain,true}]), - expect_publishes(<<"TopicA/Device/Field">>, [<<"Payload">>]), - emqttc:disconnect(C), - ok. + {ok, C} = emqttc:start_link(connection_opts(P)), + %% there's an active consumer + emqttc:subscribe(C, <<"TopicA/Device.Field">>, qos1), + emqttc:publish(C, <<"TopicA/Device.Field">>, <<"Payload">>, [{retain, true}]), + expect_publishes(<<"TopicA/Device/Field">>, [<<"Payload">>]), + emqttc:disconnect(C). %% ------------------------------------------------------------------- %% If a client is publishes a retained message to TopicA/Device.Field and another @@ -119,33 +109,31 @@ should_translate_amqp2mqtt_on_publish(Config) -> %% ------------------------------------------------------------------- should_translate_amqp2mqtt_on_retention(Config) -> P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt), - {ok, C} = emqttc:start_link([{host, "localhost"}, - {port, P}, - {client_id, <<"simpleClientRetainer">>}, - {proto_ver,3}, - {logger, info}, - {puback_timeout, 1}]), - emqttc:publish(C,<<"TopicA/Device.Field">>, <<"Payload">>, [{retain,true}]), - emqttc:subscribe(C, <<"TopicA/Device.Field">>, qos1), - expect_publishes(<<"TopicA/Device/Field">>, [<<"Payload">>]), - emqttc:disconnect(C), - ok. + {ok, C} = emqttc:start_link(connection_opts(P)), + %% publish with retain = true before a consumer comes around + emqttc:publish(C, <<"TopicA/Device.Field">>, <<"Payload">>, [{retain, true}]), + emqttc:subscribe(C, <<"TopicA/Device.Field">>, qos1), + expect_publishes(<<"TopicA/Device/Field">>, [<<"Payload">>]), + emqttc:disconnect(C). %% ------------------------------------------------------------------- %% If a client is publishes a retained message to TopicA/Device.Field and another -%% client subscribes to TopicA/Device.Field the client should be +%% client subscribes to TopicA/Device/Field the client should be %% sent retained message for the translated topic (TopicA/Device/Field) %% ------------------------------------------------------------------- should_translate_amqp2mqtt_on_retention_search(Config) -> P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt), - {ok, C} = emqttc:start_link([{host, "localhost"}, - {port, P}, - {client_id, <<"simpleClientRetainer">>}, - {proto_ver,3}, - {logger, info}, - {puback_timeout, 1}]), - emqttc:publish(C,<<"TopicA/Device.Field">>, <<"Payload">>, [{retain,true}]), + {ok, C} = emqttc:start_link(connection_opts(P)), + emqttc:publish(C, <<"TopicA/Device.Field">>, <<"Payload">>, [{retain, true}]), emqttc:subscribe(C, <<"TopicA/Device/Field">>, qos1), expect_publishes(<<"TopicA/Device/Field">>, [<<"Payload">>]), emqttc:disconnect(C), - ok. \ No newline at end of file + ok. + +connection_opts(Port) -> + [{host, "localhost"}, + {port, Port}, + {client_id, <<"simpleClientRetainer">>}, + {proto_ver,3}, + {logger, info}, + {puback_timeout, 1}].