Add stream subscription properties to metrics

This commit is contained in:
Arnaud Cogoluègnes 2021-05-18 16:54:41 +02:00
parent f6d4c4f55d
commit 77b99a2efd
No known key found for this signature in database
GPG Key ID: D5C8C4DFAD43AFA8
3 changed files with 47 additions and 14 deletions

View File

@ -20,8 +20,8 @@
%% API
-export([init/0]).
-export([consumer_created/6,
consumer_updated/6,
-export([consumer_created/7,
consumer_updated/7,
consumer_cancelled/3]).
-export([publisher_created/4,
publisher_updated/7,
@ -39,9 +39,13 @@ consumer_created(Connection,
SubscriptionId,
Credits,
MessageCount,
Offset) ->
Offset,
Properties) ->
Values =
[{credits, Credits}, {consumed, MessageCount}, {offset, Offset}],
[{credits, Credits},
{consumed, MessageCount},
{offset, Offset},
{properties, Properties}],
ets:insert(?TABLE_CONSUMER,
{{StreamResource, Connection, SubscriptionId}, Values}),
rabbit_core_metrics:consumer_created(Connection,
@ -52,7 +56,7 @@ consumer_created(Connection,
0,
true,
up,
[]),
rabbit_misc:to_amqp_table(Properties)),
ok.
consumer_tag(SubscriptionId) ->
@ -64,9 +68,13 @@ consumer_updated(Connection,
SubscriptionId,
Credits,
MessageCount,
Offset) ->
Offset,
Properties) ->
Values =
[{credits, Credits}, {consumed, MessageCount}, {offset, Offset}],
[{credits, Credits},
{consumed, MessageCount},
{offset, Offset},
{properties, Properties}],
ets:insert(?TABLE_CONSUMER,
{{StreamResource, Connection, SubscriptionId}, Values}),
ok.
@ -77,6 +85,11 @@ consumer_cancelled(Connection, StreamResource, SubscriptionId) ->
rabbit_core_metrics:consumer_deleted(Connection,
consumer_tag(SubscriptionId),
StreamResource),
%% FIXME send consumer_deleted event when this function is called
%% otherwise the consumer is not fully cleaned up and can still show up in the REST API
%% rabbit_event:notify(consumer_deleted, [{consumer_tag, consumer_tag(SubscriptionId)},
%% {channel, self()},
%% {queue, StreamResource}]),
ok.
publisher_created(Connection,

View File

@ -1467,7 +1467,8 @@ handle_frame_post_auth(Transport,
SubscriptionId,
Credit1,
messages_consumed(ConsumerCounters1),
ConsumerOffset),
ConsumerOffset,
Properties),
{Connection1#stream_connection{stream_subscriptions
=
StreamSubscriptions1},
@ -2276,11 +2277,13 @@ emit_stats(#stream_connection{publishers = Publishers} = Connection,
Id,
Credit,
messages_consumed(Counters),
consumer_offset(Counters))
consumer_offset(Counters),
Properties)
|| #consumer{stream = S,
subscription_id = Id,
credit = Credit,
counters = Counters}
counters = Counters,
properties = Properties}
<- maps:values(Consumers)],
[rabbit_stream_metrics:publisher_updated(self(),
stream_r(S, Connection),

View File

@ -53,6 +53,7 @@ import org.assertj.core.api.Condition;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.NullSource;
import org.junit.jupiter.params.provider.ValueSource;
@ -224,7 +225,9 @@ public class HttpTest {
assertThat(consumers).hasSize(2);
consumers.forEach(
c -> {
assertThat(c).containsKeys("subscription_id", "credits", "connection_details", "queue");
assertThat(c)
.containsKeys(
"subscription_id", "credits", "connection_details", "queue", "properties");
assertThat(c)
.extractingByKey("connection_details", as(MAP))
.containsValue(connectionName(c1));
@ -434,8 +437,20 @@ public class HttpTest {
waitUntil(() -> request.call().size() == initialCount);
}
@Test
void consumers() throws Exception {
static Stream<Map<String, String>> subscriptionProperties() {
return Stream.of(Collections.emptyMap(), map());
}
static Map<String, String> map() {
Map<String, String> map = new LinkedHashMap<>();
map.put("key1", "value1");
map.put("key2", "value2");
return map;
}
@ParameterizedTest
@MethodSource("subscriptionProperties")
void consumers(Map<String, String> subscriptionProperties) throws Exception {
Callable<List<Map<String, Object>>> request = () -> toMaps(get("/stream/consumers"));
int initialCount = request.call().size();
String connectionProvidedName = UUID.randomUUID().toString();
@ -449,7 +464,7 @@ public class HttpTest {
client1.credit(subscriptionId, 1))
.shutdownListener(shutdownContext -> closed.set(true)));
client.subscribe((byte) 0, stream, OffsetSpecification.first(), 10);
client.subscribe((byte) 0, stream, OffsetSpecification.first(), 10, subscriptionProperties);
waitUntil(() -> request.call().size() == initialCount + 1);
waitUntil(() -> entities(request.call(), client).size() == 1);
@ -458,6 +473,8 @@ public class HttpTest {
assertThat(((Number) consumer.get("consumed")).intValue()).isEqualTo(0);
assertThat(((Number) consumer.get("offset")).intValue()).isEqualTo(0);
assertThat(((Number) consumer.get("subscription_id")).intValue()).isEqualTo(0);
assertThat(consumer.get("properties")).isNotNull().isEqualTo(subscriptionProperties);
assertThat(connectionDetails(consumer))
.containsEntry("name", connectionName(client))
.containsEntry("user", "guest")