Instrument @JmsListener session for response messages
Prior to this commit, the observation instrumentation for `@JmsListener` annotated methods (implemented in `AbstractMessageListenerContainer` would not instrument the JMS session using the Micrometer JMS support. This means that response messages returned from the listener method would be sent but no observation would be recorded. As a result, tracing message properties would be also missing. This commit ensures that the session provided to the listener method is instrumented beforehand, if Micrometer is on the classpath and an observation registry has been configured. Fixes gh-33221
This commit is contained in:
parent
e8630f3409
commit
0bb309f433
|
@ -162,6 +162,8 @@ include-code::./JmsTemplatePublish[]
|
||||||
|
|
||||||
It uses the `io.micrometer.jakarta9.instrument.jms.DefaultJmsPublishObservationConvention` by default, backed by the `io.micrometer.jakarta9.instrument.jms.JmsPublishObservationContext`.
|
It uses the `io.micrometer.jakarta9.instrument.jms.DefaultJmsPublishObservationConvention` by default, backed by the `io.micrometer.jakarta9.instrument.jms.JmsPublishObservationContext`.
|
||||||
|
|
||||||
|
Similar observations are recorded with `@JmsListener` annotated methods when response messages are returned from the listener method.
|
||||||
|
|
||||||
[[observability.jms.process]]
|
[[observability.jms.process]]
|
||||||
=== JMS message Processing instrumentation
|
=== JMS message Processing instrumentation
|
||||||
|
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
package org.springframework.jms.listener;
|
package org.springframework.jms.listener;
|
||||||
|
|
||||||
import io.micrometer.jakarta9.instrument.jms.DefaultJmsProcessObservationConvention;
|
import io.micrometer.jakarta9.instrument.jms.DefaultJmsProcessObservationConvention;
|
||||||
|
import io.micrometer.jakarta9.instrument.jms.JmsInstrumentation;
|
||||||
import io.micrometer.jakarta9.instrument.jms.JmsObservationDocumentation;
|
import io.micrometer.jakarta9.instrument.jms.JmsObservationDocumentation;
|
||||||
import io.micrometer.jakarta9.instrument.jms.JmsProcessObservationContext;
|
import io.micrometer.jakarta9.instrument.jms.JmsProcessObservationContext;
|
||||||
import io.micrometer.jakarta9.instrument.jms.JmsProcessObservationConvention;
|
import io.micrometer.jakarta9.instrument.jms.JmsProcessObservationConvention;
|
||||||
|
@ -772,6 +773,9 @@ public abstract class AbstractMessageListenerContainer extends AbstractJmsListen
|
||||||
Observation observation = createObservation(message);
|
Observation observation = createObservation(message);
|
||||||
try {
|
try {
|
||||||
Session sessionToUse = session;
|
Session sessionToUse = session;
|
||||||
|
if (micrometerJakartaPresent && this.observationRegistry != null) {
|
||||||
|
sessionToUse = MicrometerInstrumentation.instrumentSession(sessionToUse, this.observationRegistry);
|
||||||
|
}
|
||||||
if (!isExposeListenerSession()) {
|
if (!isExposeListenerSession()) {
|
||||||
// We need to expose a separate Session.
|
// We need to expose a separate Session.
|
||||||
conToClose = createConnection();
|
conToClose = createConnection();
|
||||||
|
@ -992,6 +996,14 @@ public abstract class AbstractMessageListenerContainer extends AbstractJmsListen
|
||||||
private static class MessageRejectedWhileStoppingException extends RuntimeException {
|
private static class MessageRejectedWhileStoppingException extends RuntimeException {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private abstract static class MicrometerInstrumentation {
|
||||||
|
|
||||||
|
static Session instrumentSession(Session session, ObservationRegistry registry) {
|
||||||
|
return JmsInstrumentation.instrumentSession(session, registry);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
private abstract static class ObservationFactory {
|
private abstract static class ObservationFactory {
|
||||||
|
|
||||||
private static final JmsProcessObservationConvention DEFAULT_CONVENTION = new DefaultJmsProcessObservationConvention();
|
private static final JmsProcessObservationConvention DEFAULT_CONVENTION = new DefaultJmsProcessObservationConvention();
|
||||||
|
|
|
@ -23,7 +23,9 @@ import java.util.stream.Stream;
|
||||||
|
|
||||||
import io.micrometer.observation.Observation;
|
import io.micrometer.observation.Observation;
|
||||||
import io.micrometer.observation.tck.TestObservationRegistry;
|
import io.micrometer.observation.tck.TestObservationRegistry;
|
||||||
|
import jakarta.jms.Message;
|
||||||
import jakarta.jms.MessageListener;
|
import jakarta.jms.MessageListener;
|
||||||
|
import jakarta.jms.TextMessage;
|
||||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||||
import org.apache.activemq.artemis.junit.EmbeddedActiveMQExtension;
|
import org.apache.activemq.artemis.junit.EmbeddedActiveMQExtension;
|
||||||
import org.assertj.core.api.Assertions;
|
import org.assertj.core.api.Assertions;
|
||||||
|
@ -81,6 +83,33 @@ class MessageListenerContainerObservationTests {
|
||||||
listenerContainer.shutdown();
|
listenerContainer.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest(name = "[{index}] {0}")
|
||||||
|
@MethodSource("listenerContainers")
|
||||||
|
void shouldRecordJmsPublishObservations(AbstractMessageListenerContainer listenerContainer) throws Exception {
|
||||||
|
CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
listenerContainer.setConnectionFactory(connectionFactory);
|
||||||
|
listenerContainer.setObservationRegistry(registry);
|
||||||
|
listenerContainer.setDestinationName("spring.test.observation");
|
||||||
|
listenerContainer.setMessageListener((SessionAwareMessageListener) (message, session) -> {
|
||||||
|
Message response = session.createTextMessage("test response");
|
||||||
|
session.createProducer(message.getJMSReplyTo()).send(response);
|
||||||
|
});
|
||||||
|
listenerContainer.afterPropertiesSet();
|
||||||
|
listenerContainer.start();
|
||||||
|
JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
|
||||||
|
TextMessage response = (TextMessage) jmsTemplate.sendAndReceive("spring.test.observation",
|
||||||
|
session -> session.createTextMessage("test request"));
|
||||||
|
|
||||||
|
// request received by listener and response received by template
|
||||||
|
assertThat(registry).hasNumberOfObservationsWithNameEqualTo("jms.message.process", 2);
|
||||||
|
// response sent to the template
|
||||||
|
assertThat(registry).hasNumberOfObservationsWithNameEqualTo("jms.message.publish", 1);
|
||||||
|
|
||||||
|
Assertions.assertThat(response.getText()).isEqualTo("test response");
|
||||||
|
listenerContainer.stop();
|
||||||
|
listenerContainer.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
@ParameterizedTest(name = "[{index}] {0}")
|
@ParameterizedTest(name = "[{index}] {0}")
|
||||||
@MethodSource("listenerContainers")
|
@MethodSource("listenerContainers")
|
||||||
void shouldHaveObservationScopeInErrorHandler(AbstractMessageListenerContainer listenerContainer) throws Exception {
|
void shouldHaveObservationScopeInErrorHandler(AbstractMessageListenerContainer listenerContainer) throws Exception {
|
||||||
|
|
Loading…
Reference in New Issue