diff --git a/framework-docs/modules/ROOT/pages/integration/observability.adoc b/framework-docs/modules/ROOT/pages/integration/observability.adoc index e4208a28d8..64e5f230dd 100644 --- a/framework-docs/modules/ROOT/pages/integration/observability.adoc +++ b/framework-docs/modules/ROOT/pages/integration/observability.adoc @@ -162,6 +162,8 @@ include-code::./JmsTemplatePublish[] It uses the `io.micrometer.jakarta9.instrument.jms.DefaultJmsPublishObservationConvention` by default, backed by the `io.micrometer.jakarta9.instrument.jms.JmsPublishObservationContext`. +Similar observations are recorded with `@JmsListener` annotated methods when response messages are returned from the listener method. + [[observability.jms.process]] === JMS message Processing instrumentation diff --git a/spring-jms/src/main/java/org/springframework/jms/listener/AbstractMessageListenerContainer.java b/spring-jms/src/main/java/org/springframework/jms/listener/AbstractMessageListenerContainer.java index 6c0f78b743..6362a4ba82 100644 --- a/spring-jms/src/main/java/org/springframework/jms/listener/AbstractMessageListenerContainer.java +++ b/spring-jms/src/main/java/org/springframework/jms/listener/AbstractMessageListenerContainer.java @@ -17,6 +17,7 @@ package org.springframework.jms.listener; import io.micrometer.jakarta9.instrument.jms.DefaultJmsProcessObservationConvention; +import io.micrometer.jakarta9.instrument.jms.JmsInstrumentation; import io.micrometer.jakarta9.instrument.jms.JmsObservationDocumentation; import io.micrometer.jakarta9.instrument.jms.JmsProcessObservationContext; import io.micrometer.jakarta9.instrument.jms.JmsProcessObservationConvention; @@ -772,6 +773,9 @@ public abstract class AbstractMessageListenerContainer extends AbstractJmsListen Observation observation = createObservation(message); try { Session sessionToUse = session; + if (micrometerJakartaPresent && this.observationRegistry != null) { + sessionToUse = MicrometerInstrumentation.instrumentSession(sessionToUse, this.observationRegistry); + } if (!isExposeListenerSession()) { // We need to expose a separate Session. conToClose = createConnection(); @@ -992,6 +996,14 @@ public abstract class AbstractMessageListenerContainer extends AbstractJmsListen private static class MessageRejectedWhileStoppingException extends RuntimeException { } + private abstract static class MicrometerInstrumentation { + + static Session instrumentSession(Session session, ObservationRegistry registry) { + return JmsInstrumentation.instrumentSession(session, registry); + } + + } + private abstract static class ObservationFactory { private static final JmsProcessObservationConvention DEFAULT_CONVENTION = new DefaultJmsProcessObservationConvention(); diff --git a/spring-jms/src/test/java/org/springframework/jms/listener/MessageListenerContainerObservationTests.java b/spring-jms/src/test/java/org/springframework/jms/listener/MessageListenerContainerObservationTests.java index 5a732071fd..b6db6fbc17 100644 --- a/spring-jms/src/test/java/org/springframework/jms/listener/MessageListenerContainerObservationTests.java +++ b/spring-jms/src/test/java/org/springframework/jms/listener/MessageListenerContainerObservationTests.java @@ -23,9 +23,12 @@ import java.util.stream.Stream; import io.micrometer.observation.Observation; import io.micrometer.observation.tck.TestObservationRegistry; +import jakarta.jms.Message; import jakarta.jms.MessageListener; +import jakarta.jms.TextMessage; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.junit.EmbeddedActiveMQExtension; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.RegisterExtension; @@ -81,6 +84,33 @@ class MessageListenerContainerObservationTests { listenerContainer.shutdown(); } + @ParameterizedTest(name = "[{index}] {0}") + @MethodSource("listenerContainers") + void shouldRecordJmsPublishObservations(AbstractMessageListenerContainer listenerContainer) throws Exception { + CountDownLatch latch = new CountDownLatch(1); + listenerContainer.setConnectionFactory(connectionFactory); + listenerContainer.setObservationRegistry(registry); + listenerContainer.setDestinationName("spring.test.observation"); + listenerContainer.setMessageListener((SessionAwareMessageListener) (message, session) -> { + Message response = session.createTextMessage("test response"); + session.createProducer(message.getJMSReplyTo()).send(response); + }); + listenerContainer.afterPropertiesSet(); + listenerContainer.start(); + JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory); + TextMessage response = (TextMessage) jmsTemplate.sendAndReceive("spring.test.observation", + session -> session.createTextMessage("test request")); + + // request received by listener and response received by template + assertThat(registry).hasNumberOfObservationsWithNameEqualTo("jms.message.process", 2); + // response sent to the template + assertThat(registry).hasNumberOfObservationsWithNameEqualTo("jms.message.publish", 1); + + Assertions.assertThat(response.getText()).isEqualTo("test response"); + listenerContainer.stop(); + listenerContainer.shutdown(); + } + @ParameterizedTest(name = "[{index}] {0}") @MethodSource("listenerContainers") void shouldHaveObservationScopeInErrorHandler(AbstractMessageListenerContainer listenerContainer) throws Exception {