Add client_id as an topic authz expandable variable
References rabbitmq/rabbitmq-server#1229
This commit is contained in:
parent
cd13024d6e
commit
81ab03c133
|
|
@ -79,12 +79,20 @@ process_request(?CONNECT,
|
||||||
clean_sess = CleanSess,
|
clean_sess = CleanSess,
|
||||||
client_id = ClientId0,
|
client_id = ClientId0,
|
||||||
keep_alive = Keepalive} = Var},
|
keep_alive = Keepalive} = Var},
|
||||||
PState = #proc_state{ ssl_login_name = SSLLoginName,
|
PState0 = #proc_state{ ssl_login_name = SSLLoginName,
|
||||||
send_fun = SendFun }) ->
|
send_fun = SendFun,
|
||||||
|
adapter_info = AdapterInfo = #amqp_adapter_info{additional_info = Extra} }) ->
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
ClientId = case ClientId0 of
|
ClientId = case ClientId0 of
|
||||||
[] -> rabbit_mqtt_util:gen_client_id();
|
[] -> rabbit_mqtt_util:gen_client_id();
|
||||||
[_|_] -> ClientId0
|
[_|_] -> ClientId0
|
||||||
end,
|
end,
|
||||||
|
AdapterInfo1 = AdapterInfo#amqp_adapter_info{additional_info = [
|
||||||
|
{variable_map, #{<<"client_id">> => rabbit_data_coercion:to_binary(ClientId)}}
|
||||||
|
| Extra]},
|
||||||
|
PState = PState0#proc_state{adapter_info = AdapterInfo1},
|
||||||
{Return, PState1} =
|
{Return, PState1} =
|
||||||
case {lists:member(ProtoVersion, proplists:get_keys(?PROTOCOL_NAMES)),
|
case {lists:member(ProtoVersion, proplists:get_keys(?PROTOCOL_NAMES)),
|
||||||
ClientId0 =:= [] andalso CleanSess =:= false} of
|
ClientId0 =:= [] andalso CleanSess =:= false} of
|
||||||
|
|
@ -819,24 +827,32 @@ check_subscribe([#mqtt_topic{name = TopicName} | Topics], Fn, PState) ->
|
||||||
|
|
||||||
check_topic_access(TopicName, Access,
|
check_topic_access(TopicName, Access,
|
||||||
#proc_state{
|
#proc_state{
|
||||||
auth_state = #auth_state{user = User,
|
auth_state = #auth_state{user = User = #user{username = Username},
|
||||||
vhost = VHost},
|
vhost = VHost},
|
||||||
exchange = Exchange}) ->
|
exchange = Exchange,
|
||||||
|
client_id = ClientId}) ->
|
||||||
Resource = #resource{virtual_host = VHost,
|
Resource = #resource{virtual_host = VHost,
|
||||||
kind = topic,
|
kind = topic,
|
||||||
name = Exchange},
|
name = Exchange},
|
||||||
Context = #{routing_key => rabbit_mqtt_util:mqtt2amqp(TopicName)},
|
|
||||||
|
|
||||||
try rabbit_access_control:check_topic_access(User, Resource, Access, Context) of
|
Context = #{routing_key => rabbit_mqtt_util:mqtt2amqp(TopicName),
|
||||||
R -> R
|
variable_map => #{
|
||||||
catch
|
<<"username">> => Username,
|
||||||
_:{amqp_error, access_refused, Msg, _} ->
|
<<"vhost">> => VHost,
|
||||||
rabbit_log:error("operation resulted in an error (access_refused): ~p~n", [Msg]),
|
<<"client_id">> => rabbit_data_coercion:to_binary(ClientId)
|
||||||
{error, access_refused};
|
}
|
||||||
_:Error ->
|
},
|
||||||
rabbit_log:error("~p~n", [Error]),
|
|
||||||
{error, access_refused}
|
try rabbit_access_control:check_topic_access(User, Resource, Access, Context) of
|
||||||
end.
|
R -> R
|
||||||
|
catch
|
||||||
|
_:{amqp_error, access_refused, Msg, _} ->
|
||||||
|
rabbit_log:error("operation resulted in an error (access_refused): ~p~n", [Msg]),
|
||||||
|
{error, access_refused};
|
||||||
|
_:Error ->
|
||||||
|
rabbit_log:error("~p~n", [Error]),
|
||||||
|
{error, access_refused}
|
||||||
|
end.
|
||||||
|
|
||||||
info(consumer_tags, #proc_state{consumer_tags = Val}) -> Val;
|
info(consumer_tags, #proc_state{consumer_tags = Val}) -> Val;
|
||||||
info(unacked_pubs, #proc_state{unacked_pubs = Val}) -> Val;
|
info(unacked_pubs, #proc_state{unacked_pubs = Val}) -> Val;
|
||||||
|
|
|
||||||
|
|
@ -83,8 +83,8 @@ init_per_testcase(Testcase, Config) ->
|
||||||
{ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["set_permissions", "-p", "/", User, ".*", ".*", ".*"]),
|
{ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["set_permissions", "-p", "/", User, ".*", ".*", ".*"]),
|
||||||
{ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0,
|
{ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0,
|
||||||
["set_topic_permissions", "-p", "/", "guest", "amq.topic",
|
["set_topic_permissions", "-p", "/", "guest", "amq.topic",
|
||||||
"test-topic|test-retained-topic|.*mid.*|.*topic.*",
|
"test-topic|test-retained-topic|.*mid.*|.*topic.*|{username}.{client_id}.a",
|
||||||
"test-topic|test-retained-topic|.*mid.*|.*topic.*|last-will"]),
|
"test-topic|test-retained-topic|.*mid.*|.*topic.*|last-will|{username}.{client_id}.a"]),
|
||||||
rabbit_ct_helpers:testcase_started(Config, Testcase).
|
rabbit_ct_helpers:testcase_started(Config, Testcase).
|
||||||
|
|
||||||
end_per_testcase(Testcase, Config) ->
|
end_per_testcase(Testcase, Config) ->
|
||||||
|
|
|
||||||
|
|
@ -11,7 +11,7 @@
|
||||||
// The Original Code is RabbitMQ.
|
// The Original Code is RabbitMQ.
|
||||||
//
|
//
|
||||||
// The Initial Developer of the Original Code is GoPivotal, Inc.
|
// The Initial Developer of the Original Code is GoPivotal, Inc.
|
||||||
// Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
|
// Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
|
||||||
//
|
//
|
||||||
|
|
||||||
package com.rabbitmq.mqtt.test;
|
package com.rabbitmq.mqtt.test;
|
||||||
|
|
@ -738,6 +738,23 @@ public class MqttTest implements MqttCallback {
|
||||||
client2.disconnect();
|
client2.disconnect();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test public void topicAuthorisationVariableExpansion() throws Exception {
|
||||||
|
client.connect(conOpt);
|
||||||
|
client.setCallback(this);
|
||||||
|
String topicWithExpandedVariables = "guest/" + clientId + "/a";
|
||||||
|
client.subscribe(topicWithExpandedVariables);
|
||||||
|
publish(client, topicWithExpandedVariables, 1, "content".getBytes());
|
||||||
|
waitAtMost(timeout).until(receivedMessagesSize(),equalTo(1));
|
||||||
|
assertTrue(client.isConnected());
|
||||||
|
try {
|
||||||
|
publish(client, "guest/WrongClientId/a", 1, "content".getBytes());
|
||||||
|
fail("Publishing on a forbidden topic, an exception should have been thrown");
|
||||||
|
client.disconnect();
|
||||||
|
} catch(Exception e) {
|
||||||
|
// OK
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test public void interopM2A() throws MqttException, IOException, InterruptedException, TimeoutException {
|
@Test public void interopM2A() throws MqttException, IOException, InterruptedException, TimeoutException {
|
||||||
setUpAmqp();
|
setUpAmqp();
|
||||||
String queue = ch.queueDeclare().getQueue();
|
String queue = ch.queueDeclare().getQueue();
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue