Cosmetics

This commit is contained in:
Michael Klishin 2019-02-03 19:43:44 +03:00
parent 08f97b9e6f
commit 9caf40afa1
1 changed files with 28 additions and 40 deletions

View File

@ -12,7 +12,7 @@ groups() ->
[
{non_parallel_tests, [], [
coerce_configuration_data,
should_translate_amqp2mqtt_on_publish,
should_translate_amqp2mqtt_on_publish,
should_translate_amqp2mqtt_on_retention,
should_translate_amqp2mqtt_on_retention_search
]}
@ -71,12 +71,7 @@ end_per_testcase(Testcase, Config) ->
coerce_configuration_data(Config) ->
P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt),
{ok, C} = emqttc:start_link([{host, "localhost"},
{port, P},
{client_id, <<"simpleClientRetainer">>},
{proto_ver, 3},
{logger, info},
{puback_timeout, 1}]),
{ok, C} = emqttc:start_link(connection_opts(P)),
emqttc:subscribe(C, <<"TopicA">>, qos0),
emqttc:publish(C, <<"TopicA">>, <<"Payload">>),
@ -86,10 +81,10 @@ coerce_configuration_data(Config) ->
ok.
expect_publishes(_Topic, []) -> ok;
expect_publishes(Topic, [Payload|Rest]) ->
expect_publishes(Topic, [Payload | Rest]) ->
receive
{publish, Topic, Payload} -> expect_publishes(Topic, Rest)
after 500 ->
after 1500 ->
throw({publish_not_delivered, Payload})
end.
@ -100,17 +95,12 @@ expect_publishes(Topic, [Payload|Rest]) ->
%% -------------------------------------------------------------------
should_translate_amqp2mqtt_on_publish(Config) ->
P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt),
{ok, C} = emqttc:start_link([{host, "localhost"},
{port, P},
{client_id, <<"simpleClientRetainer">>},
{proto_ver,3},
{logger, info},
{puback_timeout, 1}]),
emqttc:subscribe(C, <<"TopicA/Device.Field">>, qos1),
emqttc:publish(C,<<"TopicA/Device.Field">>, <<"Payload">>, [{retain,true}]),
expect_publishes(<<"TopicA/Device/Field">>, [<<"Payload">>]),
emqttc:disconnect(C),
ok.
{ok, C} = emqttc:start_link(connection_opts(P)),
%% there's an active consumer
emqttc:subscribe(C, <<"TopicA/Device.Field">>, qos1),
emqttc:publish(C, <<"TopicA/Device.Field">>, <<"Payload">>, [{retain, true}]),
expect_publishes(<<"TopicA/Device/Field">>, [<<"Payload">>]),
emqttc:disconnect(C).
%% -------------------------------------------------------------------
%% If a client is publishes a retained message to TopicA/Device.Field and another
@ -119,33 +109,31 @@ should_translate_amqp2mqtt_on_publish(Config) ->
%% -------------------------------------------------------------------
should_translate_amqp2mqtt_on_retention(Config) ->
P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt),
{ok, C} = emqttc:start_link([{host, "localhost"},
{port, P},
{client_id, <<"simpleClientRetainer">>},
{proto_ver,3},
{logger, info},
{puback_timeout, 1}]),
emqttc:publish(C,<<"TopicA/Device.Field">>, <<"Payload">>, [{retain,true}]),
emqttc:subscribe(C, <<"TopicA/Device.Field">>, qos1),
expect_publishes(<<"TopicA/Device/Field">>, [<<"Payload">>]),
emqttc:disconnect(C),
ok.
{ok, C} = emqttc:start_link(connection_opts(P)),
%% publish with retain = true before a consumer comes around
emqttc:publish(C, <<"TopicA/Device.Field">>, <<"Payload">>, [{retain, true}]),
emqttc:subscribe(C, <<"TopicA/Device.Field">>, qos1),
expect_publishes(<<"TopicA/Device/Field">>, [<<"Payload">>]),
emqttc:disconnect(C).
%% -------------------------------------------------------------------
%% If a client is publishes a retained message to TopicA/Device.Field and another
%% client subscribes to TopicA/Device.Field the client should be
%% client subscribes to TopicA/Device/Field the client should be
%% sent retained message for the translated topic (TopicA/Device/Field)
%% -------------------------------------------------------------------
should_translate_amqp2mqtt_on_retention_search(Config) ->
P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt),
{ok, C} = emqttc:start_link([{host, "localhost"},
{port, P},
{client_id, <<"simpleClientRetainer">>},
{proto_ver,3},
{logger, info},
{puback_timeout, 1}]),
emqttc:publish(C,<<"TopicA/Device.Field">>, <<"Payload">>, [{retain,true}]),
{ok, C} = emqttc:start_link(connection_opts(P)),
emqttc:publish(C, <<"TopicA/Device.Field">>, <<"Payload">>, [{retain, true}]),
emqttc:subscribe(C, <<"TopicA/Device/Field">>, qos1),
expect_publishes(<<"TopicA/Device/Field">>, [<<"Payload">>]),
emqttc:disconnect(C),
ok.
ok.
connection_opts(Port) ->
[{host, "localhost"},
{port, Port},
{client_id, <<"simpleClientRetainer">>},
{proto_ver,3},
{logger, info},
{puback_timeout, 1}].