Filter records based on user in stream consumer endpoint
This commit is contained in:
parent
48b04e05a4
commit
c0779e6fa4
|
|
@ -54,8 +54,9 @@ to_json(ReqData, Context = #context{user = User}) ->
|
||||||
is_authorized(ReqData, Context) ->
|
is_authorized(ReqData, Context) ->
|
||||||
rabbit_mgmt_util:is_authorized(ReqData, Context).
|
rabbit_mgmt_util:is_authorized(ReqData, Context).
|
||||||
|
|
||||||
filter_user(List, #user{username = _Username, tags = Tags}) ->
|
filter_user(List, #user{username = Username, tags = Tags}) ->
|
||||||
case rabbit_mgmt_util:is_monitor(Tags) of
|
case rabbit_mgmt_util:is_monitor(Tags) of
|
||||||
true -> List;
|
true -> List;
|
||||||
false -> List
|
false -> [I || I <- List,
|
||||||
|
rabbit_misc:pget(user, rabbit_misc:pget(connection_details, I)) == Username]
|
||||||
end.
|
end.
|
||||||
|
|
|
||||||
|
|
@ -101,6 +101,16 @@ public class HttpTest {
|
||||||
return format("127.0.0.1:%d -> 127.0.0.1:%d", localAddress.getPort(), remoteAddress.getPort());
|
return format("127.0.0.1:%d -> 127.0.0.1:%d", localAddress.getPort(), remoteAddress.getPort());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static List<Map<String, Object>> consumers(List<Map<String, Object>> consumers, Client client) {
|
||||||
|
String connectionName = connectionName(client);
|
||||||
|
return consumers.stream()
|
||||||
|
.filter(
|
||||||
|
c ->
|
||||||
|
c.get("connection_details") instanceof Map
|
||||||
|
&& connectionName.equals(((Map) c.get("connection_details")).get("name")))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void connections() throws Exception {
|
void connections() throws Exception {
|
||||||
Callable<List<Map<String, Object>>> request = () -> toMaps(get("/stream/connections"));
|
Callable<List<Map<String, Object>>> request = () -> toMaps(get("/stream/connections"));
|
||||||
|
|
@ -181,16 +191,6 @@ public class HttpTest {
|
||||||
waitUntil(() -> consumers(request.call(), client).isEmpty());
|
waitUntil(() -> consumers(request.call(), client).isEmpty());
|
||||||
}
|
}
|
||||||
|
|
||||||
static List<Map<String, Object>> consumers(List<Map<String, Object>> consumers, Client client) {
|
|
||||||
String connectionName = connectionName(client);
|
|
||||||
return consumers.stream()
|
|
||||||
.filter(
|
|
||||||
c ->
|
|
||||||
c.get("connection_details") instanceof Map
|
|
||||||
&& connectionName.equals(((Map) c.get("connection_details")).get("name")))
|
|
||||||
.collect(Collectors.toList());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void permissions() throws Exception {
|
void permissions() throws Exception {
|
||||||
String[][] vhostsUsers =
|
String[][] vhostsUsers =
|
||||||
|
|
@ -257,75 +257,58 @@ public class HttpTest {
|
||||||
.findFirst()
|
.findFirst()
|
||||||
.get();
|
.get();
|
||||||
|
|
||||||
class TestConfiguration {
|
PermissionsTestConfiguration[] testConfigurations =
|
||||||
final String user;
|
new PermissionsTestConfiguration[] {
|
||||||
final String endpoint;
|
new PermissionsTestConfiguration(
|
||||||
final Map<String, Integer> connectionRequests;
|
|
||||||
final Map<String, Boolean> vhostConnections;
|
|
||||||
|
|
||||||
TestConfiguration(
|
|
||||||
String user, String endpoint, Object[] connectionRequests, Object... vhostConnections) {
|
|
||||||
this.user = user;
|
|
||||||
this.endpoint = endpoint;
|
|
||||||
this.connectionRequests = new LinkedHashMap<>(connectionRequests.length / 2);
|
|
||||||
for (int i = 0; i < connectionRequests.length; i = i + 2) {
|
|
||||||
this.connectionRequests.put(
|
|
||||||
connectionRequests[i].toString(), (Integer) connectionRequests[i + 1]);
|
|
||||||
}
|
|
||||||
this.vhostConnections = new LinkedHashMap<>();
|
|
||||||
for (int i = 0; i < vhostConnections.length; i = i + 2) {
|
|
||||||
this.vhostConnections.put(
|
|
||||||
vhostConnections[i].toString(), (Boolean) vhostConnections[i + 1]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
TestConfiguration[] testConfigurations =
|
|
||||||
new TestConfiguration[] {
|
|
||||||
new TestConfiguration(
|
|
||||||
"guest",
|
"guest",
|
||||||
"/connections",
|
"/connections",
|
||||||
new Object[] {"", 5, "/%2f", 1, "/vh1", 2, "/vh2", 2},
|
requests(r("", 5), r("/%2f", 1), r("/vh1", 2), r("/vh2", 2)),
|
||||||
"vh1/" + vhost1ConnectionName,
|
"vh1/" + vhost1ConnectionName,
|
||||||
true,
|
true,
|
||||||
"vh2/" + vhost2ConnectionName,
|
"vh2/" + vhost2ConnectionName,
|
||||||
true),
|
true),
|
||||||
new TestConfiguration(
|
new PermissionsTestConfiguration(
|
||||||
"user-monitoring",
|
"user-monitoring",
|
||||||
"/connections",
|
"/connections",
|
||||||
new Object[] {"", 5, "/%2f", 1, "/vh1", 2, "/vh2", 2},
|
requests(r("", 5), r("/%2f", 1), r("/vh1", 2), r("/vh2", 2)),
|
||||||
"vh1/" + vhost1ConnectionName,
|
"vh1/" + vhost1ConnectionName,
|
||||||
true,
|
true,
|
||||||
"vh2/" + vhost2ConnectionName,
|
"vh2/" + vhost2ConnectionName,
|
||||||
true),
|
true),
|
||||||
new TestConfiguration(
|
new PermissionsTestConfiguration(
|
||||||
"user-management",
|
"user-management",
|
||||||
"/connections",
|
"/connections",
|
||||||
new Object[] {"", 2, "/%2f", -1, "/vh1", 2, "/vh2", -1},
|
requests(r("", 2), r("/%2f", -1), r("/vh1", 2), r("/vh2", -1)),
|
||||||
"vh1/" + vhost1ConnectionName,
|
"vh1/" + vhost1ConnectionName,
|
||||||
true,
|
true,
|
||||||
"vh2/" + vhost2ConnectionName,
|
"vh2/" + vhost2ConnectionName,
|
||||||
false),
|
false),
|
||||||
new TestConfiguration(
|
new PermissionsTestConfiguration(
|
||||||
"guest",
|
"guest",
|
||||||
"/consumers",
|
"/consumers",
|
||||||
new Object[] {"", vhostsUsers.length * consumersPerConnection}),
|
requests(
|
||||||
new TestConfiguration(
|
r("", vhostsUsers.length * consumersPerConnection),
|
||||||
"guest", "/consumers", new Object[] {"/%2f", consumersPerConnection}),
|
r("/%2f", consumersPerConnection),
|
||||||
new TestConfiguration(
|
r("/vh1", consumersPerConnection * 2))),
|
||||||
"guest", "/consumers", new Object[] {"/vh1", consumersPerConnection * 2}),
|
new PermissionsTestConfiguration(
|
||||||
|
"user-management",
|
||||||
|
"/consumers",
|
||||||
|
requests(
|
||||||
|
r("", consumersPerConnection * 2), // only their connections
|
||||||
|
r("/vh1", consumersPerConnection * 2),
|
||||||
|
r("/vh2", 0)))
|
||||||
};
|
};
|
||||||
|
|
||||||
for (TestConfiguration configuration : testConfigurations) {
|
for (PermissionsTestConfiguration configuration : testConfigurations) {
|
||||||
OkHttpClient client = httpClient(configuration.user);
|
OkHttpClient client = httpClient(configuration.user);
|
||||||
for (Entry<String, Integer> request : configuration.connectionRequests.entrySet()) {
|
for (TestRequest request : configuration.requests) {
|
||||||
if (request.getValue() >= 0) {
|
if (request.expectedCount >= 0) {
|
||||||
assertThat(toMaps(get(client, "/stream" + configuration.endpoint + request.getKey())))
|
assertThat(toMaps(get(client, "/stream" + configuration.endpoint + request.endpoint)))
|
||||||
.hasSize(request.getValue());
|
.hasSize(request.expectedCount);
|
||||||
} else {
|
} else {
|
||||||
assertThatThrownBy(
|
assertThatThrownBy(
|
||||||
() ->
|
() ->
|
||||||
toMaps(get(client, "/stream" + configuration.endpoint + request.getKey())))
|
toMaps(get(client, "/stream" + configuration.endpoint + request.endpoint)))
|
||||||
.hasMessageContaining("401");
|
.hasMessageContaining("401");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -353,4 +336,41 @@ public class HttpTest {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static class PermissionsTestConfiguration {
|
||||||
|
final String user;
|
||||||
|
final String endpoint;
|
||||||
|
final TestRequest[] requests;
|
||||||
|
final Map<String, Boolean> vhostConnections;
|
||||||
|
|
||||||
|
PermissionsTestConfiguration(
|
||||||
|
String user, String endpoint, TestRequest[] requests, Object... vhostConnections) {
|
||||||
|
this.user = user;
|
||||||
|
this.endpoint = endpoint;
|
||||||
|
this.requests = requests;
|
||||||
|
this.vhostConnections = new LinkedHashMap<>();
|
||||||
|
for (int i = 0; i < vhostConnections.length; i = i + 2) {
|
||||||
|
this.vhostConnections.put(
|
||||||
|
vhostConnections[i].toString(), (Boolean) vhostConnections[i + 1]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static TestRequest[] requests(TestRequest... requests) {
|
||||||
|
return requests;
|
||||||
|
}
|
||||||
|
|
||||||
|
static class TestRequest {
|
||||||
|
final String endpoint;
|
||||||
|
final int expectedCount;
|
||||||
|
|
||||||
|
TestRequest(String endpoint, int expectedCount) {
|
||||||
|
this.endpoint = endpoint;
|
||||||
|
this.expectedCount = expectedCount;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static TestRequest r(String endpoint, int expectedCount) {
|
||||||
|
return new TestRequest(endpoint, expectedCount);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue