Add topic authorisation for consumption
Part of rabbitmq/rabbitmq-server#1085
This commit is contained in:
parent
a110419d36
commit
96aaf35900
|
|
@ -804,7 +804,7 @@ check_subscribe([#mqtt_topic{name = TopicName} | Topics], Fn, PState) ->
|
||||||
_ -> {err, unauthorized, PState}
|
_ -> {err, unauthorized, PState}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
check_topic_access(TopicName, write = Access,
|
check_topic_access(TopicName, Access,
|
||||||
#proc_state{
|
#proc_state{
|
||||||
auth_state = #auth_state{user = User,
|
auth_state = #auth_state{user = User,
|
||||||
vhost = VHost},
|
vhost = VHost},
|
||||||
|
|
@ -823,24 +823,7 @@ check_topic_access(TopicName, write = Access,
|
||||||
_:Error ->
|
_:Error ->
|
||||||
rabbit_log:error("~p~n", [Error]),
|
rabbit_log:error("~p~n", [Error]),
|
||||||
{error, access_refused}
|
{error, access_refused}
|
||||||
end;
|
end.
|
||||||
check_topic_access(TopicName, read = Access,
|
|
||||||
#proc_state{
|
|
||||||
auth_state = #auth_state{user = User,
|
|
||||||
vhost = VHost}}) ->
|
|
||||||
Resource = #resource{virtual_host = VHost,
|
|
||||||
kind = topic,
|
|
||||||
name = TopicName},
|
|
||||||
try rabbit_access_control:check_resource_access(User, Resource, Access) of
|
|
||||||
R -> R
|
|
||||||
catch
|
|
||||||
_:{amqp_error, access_refused, Msg, _} ->
|
|
||||||
rabbit_log:error("operation resulted in an error (access_refused): ~p~n", [Msg]),
|
|
||||||
{error, access_refused};
|
|
||||||
_:Error ->
|
|
||||||
rabbit_log:error("~p~n", [Error]),
|
|
||||||
{error, access_refused}
|
|
||||||
end.
|
|
||||||
|
|
||||||
info(consumer_tags, #proc_state{consumer_tags = Val}) -> Val;
|
info(consumer_tags, #proc_state{consumer_tags = Val}) -> Val;
|
||||||
info(unacked_pubs, #proc_state{unacked_pubs = Val}) -> Val;
|
info(unacked_pubs, #proc_state{unacked_pubs = Val}) -> Val;
|
||||||
|
|
|
||||||
|
|
@ -81,7 +81,10 @@ init_per_testcase(Testcase, Config) ->
|
||||||
User = "O=client,CN=" ++ Hostname,
|
User = "O=client,CN=" ++ Hostname,
|
||||||
{ok,_} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["add_user", User, ""]),
|
{ok,_} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["add_user", User, ""]),
|
||||||
{ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["set_permissions", "-p", "/", User, ".*", ".*", ".*"]),
|
{ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["set_permissions", "-p", "/", User, ".*", ".*", ".*"]),
|
||||||
{ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["set_topic_permissions", "-p", "/", "guest", "amq.topic", "test-topic|test-retained-topic|.*mid.*|.*topic.*"]),
|
{ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0,
|
||||||
|
["set_topic_permissions", "-p", "/", "guest", "amq.topic",
|
||||||
|
"test-topic|test-retained-topic|.*mid.*|.*topic.*",
|
||||||
|
"test-topic|test-retained-topic|.*mid.*|.*topic.*"]),
|
||||||
rabbit_ct_helpers:testcase_started(Config, Testcase).
|
rabbit_ct_helpers:testcase_started(Config, Testcase).
|
||||||
|
|
||||||
end_per_testcase(Testcase, Config) ->
|
end_per_testcase(Testcase, Config) ->
|
||||||
|
|
|
||||||
|
|
@ -665,7 +665,7 @@ public class MqttTest implements MqttCallback {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test public void topicAuthorisation() throws Exception {
|
@Test public void topicAuthorisationPublish() throws Exception {
|
||||||
client.connect(conOpt);
|
client.connect(conOpt);
|
||||||
client.setCallback(this);
|
client.setCallback(this);
|
||||||
client.subscribe("some/topic");
|
client.subscribe("some/topic");
|
||||||
|
|
@ -681,6 +681,20 @@ public class MqttTest implements MqttCallback {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test public void topicAuthorisationSubscribe() throws Exception {
|
||||||
|
client.connect(conOpt);
|
||||||
|
client.setCallback(this);
|
||||||
|
client.subscribe("some/topic");
|
||||||
|
try {
|
||||||
|
client.subscribe("forbidden");
|
||||||
|
fail("Subscribing to a forbidden topic, an exception should have been thrown");
|
||||||
|
client.disconnect();
|
||||||
|
} catch(Exception e) {
|
||||||
|
// OK
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test public void interopM2A() throws MqttException, IOException, InterruptedException, TimeoutException {
|
@Test public void interopM2A() throws MqttException, IOException, InterruptedException, TimeoutException {
|
||||||
setUpAmqp();
|
setUpAmqp();
|
||||||
String queue = ch.queueDeclare().getQueue();
|
String queue = ch.queueDeclare().getQueue();
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue