diff --git a/deps/rabbitmq_stream_management/src/rabbit_stream_consumers_mgmt.erl b/deps/rabbitmq_stream_management/src/rabbit_stream_consumers_mgmt.erl index 753ebdad54..4fc11f811a 100644 --- a/deps/rabbitmq_stream_management/src/rabbit_stream_consumers_mgmt.erl +++ b/deps/rabbitmq_stream_management/src/rabbit_stream_consumers_mgmt.erl @@ -54,8 +54,9 @@ to_json(ReqData, Context = #context{user = User}) -> is_authorized(ReqData, Context) -> rabbit_mgmt_util:is_authorized(ReqData, Context). -filter_user(List, #user{username = _Username, tags = Tags}) -> +filter_user(List, #user{username = Username, tags = Tags}) -> case rabbit_mgmt_util:is_monitor(Tags) of true -> List; - false -> List + false -> [I || I <- List, + rabbit_misc:pget(user, rabbit_misc:pget(connection_details, I)) == Username] end. diff --git a/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/HttpTest.java b/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/HttpTest.java index ab77b82a60..37fb89eaf3 100644 --- a/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/HttpTest.java +++ b/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/HttpTest.java @@ -101,6 +101,16 @@ public class HttpTest { return format("127.0.0.1:%d -> 127.0.0.1:%d", localAddress.getPort(), remoteAddress.getPort()); } + static List> consumers(List> consumers, Client client) { + String connectionName = connectionName(client); + return consumers.stream() + .filter( + c -> + c.get("connection_details") instanceof Map + && connectionName.equals(((Map) c.get("connection_details")).get("name"))) + .collect(Collectors.toList()); + } + @Test void connections() throws Exception { Callable>> request = () -> toMaps(get("/stream/connections")); @@ -181,16 +191,6 @@ public class HttpTest { waitUntil(() -> consumers(request.call(), client).isEmpty()); } - static List> consumers(List> consumers, Client client) { - String connectionName = connectionName(client); - return consumers.stream() - .filter( - c -> - c.get("connection_details") instanceof Map - && connectionName.equals(((Map) c.get("connection_details")).get("name"))) - .collect(Collectors.toList()); - } - @Test void permissions() throws Exception { String[][] vhostsUsers = @@ -257,75 +257,58 @@ public class HttpTest { .findFirst() .get(); - class TestConfiguration { - final String user; - final String endpoint; - final Map connectionRequests; - final Map vhostConnections; - - TestConfiguration( - String user, String endpoint, Object[] connectionRequests, Object... vhostConnections) { - this.user = user; - this.endpoint = endpoint; - this.connectionRequests = new LinkedHashMap<>(connectionRequests.length / 2); - for (int i = 0; i < connectionRequests.length; i = i + 2) { - this.connectionRequests.put( - connectionRequests[i].toString(), (Integer) connectionRequests[i + 1]); - } - this.vhostConnections = new LinkedHashMap<>(); - for (int i = 0; i < vhostConnections.length; i = i + 2) { - this.vhostConnections.put( - vhostConnections[i].toString(), (Boolean) vhostConnections[i + 1]); - } - } - } - - TestConfiguration[] testConfigurations = - new TestConfiguration[] { - new TestConfiguration( + PermissionsTestConfiguration[] testConfigurations = + new PermissionsTestConfiguration[] { + new PermissionsTestConfiguration( "guest", "/connections", - new Object[] {"", 5, "/%2f", 1, "/vh1", 2, "/vh2", 2}, + requests(r("", 5), r("/%2f", 1), r("/vh1", 2), r("/vh2", 2)), "vh1/" + vhost1ConnectionName, true, "vh2/" + vhost2ConnectionName, true), - new TestConfiguration( + new PermissionsTestConfiguration( "user-monitoring", "/connections", - new Object[] {"", 5, "/%2f", 1, "/vh1", 2, "/vh2", 2}, + requests(r("", 5), r("/%2f", 1), r("/vh1", 2), r("/vh2", 2)), "vh1/" + vhost1ConnectionName, true, "vh2/" + vhost2ConnectionName, true), - new TestConfiguration( + new PermissionsTestConfiguration( "user-management", "/connections", - new Object[] {"", 2, "/%2f", -1, "/vh1", 2, "/vh2", -1}, + requests(r("", 2), r("/%2f", -1), r("/vh1", 2), r("/vh2", -1)), "vh1/" + vhost1ConnectionName, true, "vh2/" + vhost2ConnectionName, false), - new TestConfiguration( + new PermissionsTestConfiguration( "guest", "/consumers", - new Object[] {"", vhostsUsers.length * consumersPerConnection}), - new TestConfiguration( - "guest", "/consumers", new Object[] {"/%2f", consumersPerConnection}), - new TestConfiguration( - "guest", "/consumers", new Object[] {"/vh1", consumersPerConnection * 2}), + requests( + r("", vhostsUsers.length * consumersPerConnection), + r("/%2f", consumersPerConnection), + r("/vh1", consumersPerConnection * 2))), + new PermissionsTestConfiguration( + "user-management", + "/consumers", + requests( + r("", consumersPerConnection * 2), // only their connections + r("/vh1", consumersPerConnection * 2), + r("/vh2", 0))) }; - for (TestConfiguration configuration : testConfigurations) { + for (PermissionsTestConfiguration configuration : testConfigurations) { OkHttpClient client = httpClient(configuration.user); - for (Entry request : configuration.connectionRequests.entrySet()) { - if (request.getValue() >= 0) { - assertThat(toMaps(get(client, "/stream" + configuration.endpoint + request.getKey()))) - .hasSize(request.getValue()); + for (TestRequest request : configuration.requests) { + if (request.expectedCount >= 0) { + assertThat(toMaps(get(client, "/stream" + configuration.endpoint + request.endpoint))) + .hasSize(request.expectedCount); } else { assertThatThrownBy( () -> - toMaps(get(client, "/stream" + configuration.endpoint + request.getKey()))) + toMaps(get(client, "/stream" + configuration.endpoint + request.endpoint))) .hasMessageContaining("401"); } } @@ -353,4 +336,41 @@ public class HttpTest { }); } } + + static class PermissionsTestConfiguration { + final String user; + final String endpoint; + final TestRequest[] requests; + final Map vhostConnections; + + PermissionsTestConfiguration( + String user, String endpoint, TestRequest[] requests, Object... vhostConnections) { + this.user = user; + this.endpoint = endpoint; + this.requests = requests; + this.vhostConnections = new LinkedHashMap<>(); + for (int i = 0; i < vhostConnections.length; i = i + 2) { + this.vhostConnections.put( + vhostConnections[i].toString(), (Boolean) vhostConnections[i + 1]); + } + } + } + + static TestRequest[] requests(TestRequest... requests) { + return requests; + } + + static class TestRequest { + final String endpoint; + final int expectedCount; + + TestRequest(String endpoint, int expectedCount) { + this.endpoint = endpoint; + this.expectedCount = expectedCount; + } + } + + static TestRequest r(String endpoint, int expectedCount) { + return new TestRequest(endpoint, expectedCount); + } }