Merge branch '6.1.x'

This commit is contained in:
Brian Clozel 2024-07-19 17:21:41 +02:00
commit b701b26b07
3 changed files with 44 additions and 0 deletions

View File

@ -162,6 +162,8 @@ include-code::./JmsTemplatePublish[]
It uses the `io.micrometer.jakarta9.instrument.jms.DefaultJmsPublishObservationConvention` by default, backed by the `io.micrometer.jakarta9.instrument.jms.JmsPublishObservationContext`.
Similar observations are recorded with `@JmsListener` annotated methods when response messages are returned from the listener method.
[[observability.jms.process]]
=== JMS message Processing instrumentation

View File

@ -17,6 +17,7 @@
package org.springframework.jms.listener;
import io.micrometer.jakarta9.instrument.jms.DefaultJmsProcessObservationConvention;
import io.micrometer.jakarta9.instrument.jms.JmsInstrumentation;
import io.micrometer.jakarta9.instrument.jms.JmsObservationDocumentation;
import io.micrometer.jakarta9.instrument.jms.JmsProcessObservationContext;
import io.micrometer.jakarta9.instrument.jms.JmsProcessObservationConvention;
@ -772,6 +773,9 @@ public abstract class AbstractMessageListenerContainer extends AbstractJmsListen
Observation observation = createObservation(message);
try {
Session sessionToUse = session;
if (micrometerJakartaPresent && this.observationRegistry != null) {
sessionToUse = MicrometerInstrumentation.instrumentSession(sessionToUse, this.observationRegistry);
}
if (!isExposeListenerSession()) {
// We need to expose a separate Session.
conToClose = createConnection();
@ -992,6 +996,14 @@ public abstract class AbstractMessageListenerContainer extends AbstractJmsListen
private static class MessageRejectedWhileStoppingException extends RuntimeException {
}
private abstract static class MicrometerInstrumentation {
static Session instrumentSession(Session session, ObservationRegistry registry) {
return JmsInstrumentation.instrumentSession(session, registry);
}
}
private abstract static class ObservationFactory {
private static final JmsProcessObservationConvention DEFAULT_CONVENTION = new DefaultJmsProcessObservationConvention();

View File

@ -23,9 +23,12 @@ import java.util.stream.Stream;
import io.micrometer.observation.Observation;
import io.micrometer.observation.tck.TestObservationRegistry;
import jakarta.jms.Message;
import jakarta.jms.MessageListener;
import jakarta.jms.TextMessage;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.junit.EmbeddedActiveMQExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.RegisterExtension;
@ -81,6 +84,33 @@ class MessageListenerContainerObservationTests {
listenerContainer.shutdown();
}
@ParameterizedTest(name = "[{index}] {0}")
@MethodSource("listenerContainers")
void shouldRecordJmsPublishObservations(AbstractMessageListenerContainer listenerContainer) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
listenerContainer.setConnectionFactory(connectionFactory);
listenerContainer.setObservationRegistry(registry);
listenerContainer.setDestinationName("spring.test.observation");
listenerContainer.setMessageListener((SessionAwareMessageListener) (message, session) -> {
Message response = session.createTextMessage("test response");
session.createProducer(message.getJMSReplyTo()).send(response);
});
listenerContainer.afterPropertiesSet();
listenerContainer.start();
JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
TextMessage response = (TextMessage) jmsTemplate.sendAndReceive("spring.test.observation",
session -> session.createTextMessage("test request"));
// request received by listener and response received by template
assertThat(registry).hasNumberOfObservationsWithNameEqualTo("jms.message.process", 2);
// response sent to the template
assertThat(registry).hasNumberOfObservationsWithNameEqualTo("jms.message.publish", 1);
Assertions.assertThat(response.getText()).isEqualTo("test response");
listenerContainer.stop();
listenerContainer.shutdown();
}
@ParameterizedTest(name = "[{index}] {0}")
@MethodSource("listenerContainers")
void shouldHaveObservationScopeInErrorHandler(AbstractMessageListenerContainer listenerContainer) throws Exception {