diff --git a/framework-docs/modules/ROOT/pages/integration/jms/sending.adoc b/framework-docs/modules/ROOT/pages/integration/jms/sending.adoc index 5ad13d40b1..27beb62257 100644 --- a/framework-docs/modules/ROOT/pages/integration/jms/sending.adoc +++ b/framework-docs/modules/ROOT/pages/integration/jms/sending.adoc @@ -9,39 +9,7 @@ that takes no destination argument uses the default destination. The following example uses the `MessageCreator` callback to create a text message from the supplied `Session` object: -[source,java,indent=0,subs="verbatim,quotes"] ----- - import jakarta.jms.ConnectionFactory; - import jakarta.jms.JMSException; - import jakarta.jms.Message; - import jakarta.jms.Queue; - import jakarta.jms.Session; - - import org.springframework.jms.core.MessageCreator; - import org.springframework.jms.core.JmsTemplate; - - public class JmsQueueSender { - - private JmsTemplate jmsTemplate; - private Queue queue; - - public void setConnectionFactory(ConnectionFactory cf) { - this.jmsTemplate = new JmsTemplate(cf); - } - - public void setQueue(Queue queue) { - this.queue = queue; - } - - public void simpleSend() { - this.jmsTemplate.send(this.queue, new MessageCreator() { - public Message createMessage(Session session) throws JMSException { - return session.createTextMessage("hello queue world"); - } - }); - } - } ----- +include-code::./JmsQueueSender[] In the preceding example, the `JmsTemplate` is constructed by passing a reference to a `ConnectionFactory`. As an alternative, a zero-argument constructor and @@ -84,21 +52,7 @@ gives you access to the message after it has been converted but before it is sen following example shows how to modify a message header and a property after a `java.util.Map` is converted to a message: -[source,java,indent=0,subs="verbatim,quotes"] ----- - public void sendWithConversion() { - Map map = new HashMap<>(); - map.put("Name", "Mark"); - map.put("Age", new Integer(47)); - jmsTemplate.convertAndSend("testQueue", map, new MessagePostProcessor() { - public Message postProcessMessage(Message message) throws JMSException { - message.setIntProperty("AccountID", 1234); - message.setJMSCorrelationID("123-00001"); - return message; - } - }); - } ----- +include-code::./JmsSenderWithConversion[] This results in a message of the following form: @@ -126,32 +80,6 @@ to `jakarta.jms.TextMessage`, `jakarta.jms.BytesMessage`, etc. For a contract su generic message payloads, use `org.springframework.messaging.converter.MessageConverter` with `JmsMessagingTemplate` or preferably `JmsClient` as your central delegate instead. - -[[jms-sending-jmsclient]] -== Sending a Message with `JmsClient` - -[source,java,indent=0,subs="verbatim,quotes"] ----- -// Reusable handle, typically created through JmsClient.create(ConnectionFactory) -// For custom conversion, use JmsClient.create(ConnectionFactory, MessageConverter) -private JmsClient jmsClient; - -public void sendWithConversion() { - this.jmsClient.destination("myQueue") - .withTimeToLive(1000) - .send("myPayload"); // optionally with a headers Map next to the payload -} - -public void sendCustomMessage() { - Message message = - MessageBuilder.withPayload("myPayload").build(); // optionally with headers - this.jmsClient.destination("myQueue") - .withTimeToLive(1000) - .send(message); -} ----- - - [[jms-sending-callbacks]] == Using `SessionCallback` and `ProducerCallback` on `JmsTemplate` @@ -160,3 +88,21 @@ want to perform multiple operations on a JMS `Session` or `MessageProducer`. The `SessionCallback` and `ProducerCallback` expose the JMS `Session` and `Session` / `MessageProducer` pair, respectively. The `execute()` methods on `JmsTemplate` run these callback methods. + + +[[jms-sending-jmsclient]] +== Sending a Message with `JmsClient` + +include-code::./JmsClientSample[] + + +[[jms-sending-postprocessor]] +== Post-processing outgoing messages + +Applications often need to intercept messages before they are sent out, for example to add message properties to all outgoing messages. +The `org.springframework.messaging.core.MessagePostProcessor` based on the spring-messaging `Message` can do that, +when configured on the `JmsClient`. It will be used for all outgoing messages sent with the `send` and `sendAndReceive` methods. + +Here is an example of an interceptor adding a "tenantId" property to all outgoing messages. + +include-code::./JmsClientWithPostProcessor[] diff --git a/framework-docs/src/main/java/org/springframework/docs/integration/jms/jmssending/JmsQueueSender.java b/framework-docs/src/main/java/org/springframework/docs/integration/jms/jmssending/JmsQueueSender.java new file mode 100644 index 0000000000..9b8aede1b8 --- /dev/null +++ b/framework-docs/src/main/java/org/springframework/docs/integration/jms/jmssending/JmsQueueSender.java @@ -0,0 +1,48 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.docs.integration.jms.jmssending; + +import jakarta.jms.ConnectionFactory; +import jakarta.jms.JMSException; +import jakarta.jms.Message; +import jakarta.jms.Queue; +import jakarta.jms.Session; + +import org.springframework.jms.core.MessageCreator; +import org.springframework.jms.core.JmsTemplate; + +public class JmsQueueSender { + + private JmsTemplate jmsTemplate; + private Queue queue; + + public void setConnectionFactory(ConnectionFactory cf) { + this.jmsTemplate = new JmsTemplate(cf); + } + + public void setQueue(Queue queue) { + this.queue = queue; + } + + public void simpleSend() { + this.jmsTemplate.send(this.queue, new MessageCreator() { + public Message createMessage(Session session) throws JMSException { + return session.createTextMessage("hello queue world"); + } + }); + } +} diff --git a/framework-docs/src/main/java/org/springframework/docs/integration/jms/jmssendingconversion/JmsSenderWithConversion.java b/framework-docs/src/main/java/org/springframework/docs/integration/jms/jmssendingconversion/JmsSenderWithConversion.java new file mode 100644 index 0000000000..a9f43e2c5f --- /dev/null +++ b/framework-docs/src/main/java/org/springframework/docs/integration/jms/jmssendingconversion/JmsSenderWithConversion.java @@ -0,0 +1,45 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.docs.integration.jms.jmssendingconversion; + +import java.util.HashMap; +import java.util.Map; + +import jakarta.jms.JMSException; +import jakarta.jms.Message; + +import org.springframework.jms.core.JmsTemplate; +import org.springframework.jms.core.MessagePostProcessor; + +public class JmsSenderWithConversion { + + private JmsTemplate jmsTemplate; + + public void sendWithConversion() { + Map map = new HashMap<>(); + map.put("Name", "Mark"); + map.put("Age", 47); + jmsTemplate.convertAndSend("testQueue", map, new MessagePostProcessor() { + public Message postProcessMessage(Message message) throws JMSException { + message.setIntProperty("AccountID", 1234); + message.setJMSCorrelationID("123-00001"); + return message; + } + }); + } + +} diff --git a/framework-docs/src/main/java/org/springframework/docs/integration/jms/jmssendingjmsclient/JmsClientSample.java b/framework-docs/src/main/java/org/springframework/docs/integration/jms/jmssendingjmsclient/JmsClientSample.java new file mode 100644 index 0000000000..3f7468c839 --- /dev/null +++ b/framework-docs/src/main/java/org/springframework/docs/integration/jms/jmssendingjmsclient/JmsClientSample.java @@ -0,0 +1,46 @@ +/* + * Copyright 2002-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.docs.integration.jms.jmssendingjmsclient; + +import jakarta.jms.ConnectionFactory; + +import org.springframework.jms.core.JmsClient; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; + +public class JmsClientSample { + + private final JmsClient jmsClient; + + public JmsClientSample(ConnectionFactory connectionFactory) { + // For custom options, use JmsClient.builder(ConnectionFactory) + this.jmsClient = JmsClient.create(connectionFactory); + } + + public void sendWithConversion() { + this.jmsClient.destination("myQueue") + .withTimeToLive(1000) + .send("myPayload"); // optionally with a headers Map next to the payload + } + + public void sendCustomMessage() { + Message message = MessageBuilder.withPayload("myPayload").build(); // optionally with headers + this.jmsClient.destination("myQueue") + .withTimeToLive(1000) + .send(message); + } +} diff --git a/framework-docs/src/main/java/org/springframework/docs/integration/jms/jmssendingpostprocessor/JmsClientWithPostProcessor.java b/framework-docs/src/main/java/org/springframework/docs/integration/jms/jmssendingpostprocessor/JmsClientWithPostProcessor.java new file mode 100644 index 0000000000..2e5a4a4a63 --- /dev/null +++ b/framework-docs/src/main/java/org/springframework/docs/integration/jms/jmssendingpostprocessor/JmsClientWithPostProcessor.java @@ -0,0 +1,58 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.docs.integration.jms.jmssendingpostprocessor; + + +import jakarta.jms.ConnectionFactory; + +import org.springframework.jms.core.JmsClient; +import org.springframework.messaging.Message; +import org.springframework.messaging.core.MessagePostProcessor; +import org.springframework.messaging.support.MessageBuilder; + +public class JmsClientWithPostProcessor { + + private final JmsClient jmsClient; + + public JmsClientWithPostProcessor(ConnectionFactory connectionFactory) { + this.jmsClient = JmsClient.builder(connectionFactory) + .messagePostProcessor(new TenantIdMessageInterceptor("42")) + .build(); + } + + public void sendWithPostProcessor() { + this.jmsClient.destination("myQueue") + .withTimeToLive(1000) + .send("myPayload"); + } + + static class TenantIdMessageInterceptor implements MessagePostProcessor { + + private final String tenantId; + + public TenantIdMessageInterceptor(String tenantId) { + this.tenantId = tenantId; + } + + @Override + public Message postProcessMessage(Message message) { + return MessageBuilder.fromMessage(message) + .setHeader("tenantId", this.tenantId) + .build(); + } + } +} diff --git a/spring-jms/src/main/java/org/springframework/jms/core/DefaultJmsClient.java b/spring-jms/src/main/java/org/springframework/jms/core/DefaultJmsClient.java index fb9b2d89a6..ac8d13af80 100644 --- a/spring-jms/src/main/java/org/springframework/jms/core/DefaultJmsClient.java +++ b/spring-jms/src/main/java/org/springframework/jms/core/DefaultJmsClient.java @@ -27,6 +27,7 @@ import org.springframework.jms.support.JmsAccessor; import org.springframework.messaging.Message; import org.springframework.messaging.MessagingException; import org.springframework.messaging.converter.MessageConverter; +import org.springframework.messaging.core.MessagePostProcessor; import org.springframework.util.Assert; /** @@ -34,28 +35,38 @@ import org.springframework.util.Assert; * as created by the static factory methods. * * @author Juergen Hoeller + * @author Brian Clozel * @since 7.0 * @see JmsClient#create(ConnectionFactory) - * @see JmsClient#create(ConnectionFactory, MessageConverter) * @see JmsClient#create(JmsOperations) - * @see JmsClient#create(JmsOperations, MessageConverter) */ class DefaultJmsClient implements JmsClient { private final JmsOperations jmsTemplate; - private final @Nullable MessageConverter messageConverter; + private @Nullable MessageConverter messageConverter; + + private @Nullable MessagePostProcessor messagePostProcessor; - public DefaultJmsClient(ConnectionFactory connectionFactory, @Nullable MessageConverter messageConverter) { + public DefaultJmsClient(ConnectionFactory connectionFactory) { + Assert.notNull(connectionFactory, "ConnectionFactory must not be null"); this.jmsTemplate = new JmsTemplate(connectionFactory); + } + + public DefaultJmsClient(JmsOperations jmsTemplate) { + Assert.notNull(jmsTemplate, "JmsTemplate must not be null"); + this.jmsTemplate = jmsTemplate; + } + + void setMessageConverter(MessageConverter messageConverter) { + Assert.notNull(messageConverter, "MessageConverter must not be null"); this.messageConverter = messageConverter; } - public DefaultJmsClient(JmsOperations jmsTemplate, @Nullable MessageConverter messageConverter) { - Assert.notNull(jmsTemplate, "JmsTemplate must not be null"); - this.jmsTemplate = jmsTemplate; - this.messageConverter = messageConverter; + void setMessagePostProcessor(MessagePostProcessor messagePostProcessor) { + Assert.notNull(messagePostProcessor, "MessagePostProcessor must not be null"); + this.messagePostProcessor = messagePostProcessor; } @@ -141,17 +152,18 @@ class DefaultJmsClient implements JmsClient { @Override public void send(Message message) throws MessagingException { + message = postProcessMessage(message); this.delegate.send(message); } @Override public void send(Object payload) throws MessagingException { - this.delegate.convertAndSend(payload); + this.delegate.convertAndSend(payload, DefaultJmsClient.this.messagePostProcessor); } @Override public void send(Object payload, Map headers) throws MessagingException { - this.delegate.convertAndSend(payload, headers); + this.delegate.convertAndSend(payload, headers, DefaultJmsClient.this.messagePostProcessor); } @Override @@ -176,19 +188,27 @@ class DefaultJmsClient implements JmsClient { @Override public Optional> sendAndReceive(Message requestMessage) throws MessagingException { + requestMessage = postProcessMessage(requestMessage); return Optional.ofNullable(this.delegate.sendAndReceive(requestMessage)); } @Override public Optional sendAndReceive(Object request, Class targetClass) throws MessagingException { - return Optional.ofNullable(this.delegate.convertSendAndReceive(request, targetClass)); + return Optional.ofNullable(this.delegate.convertSendAndReceive(request, targetClass, DefaultJmsClient.this.messagePostProcessor)); } @Override public Optional sendAndReceive(Object request, Map headers, Class targetClass) throws MessagingException { - return Optional.ofNullable(this.delegate.convertSendAndReceive(request, headers, targetClass)); + return Optional.ofNullable(this.delegate.convertSendAndReceive(request, headers, targetClass, DefaultJmsClient.this.messagePostProcessor)); + } + + private Message postProcessMessage(Message message) { + if (DefaultJmsClient.this.messagePostProcessor != null) { + return DefaultJmsClient.this.messagePostProcessor.postProcessMessage(message); + } + return message; } } diff --git a/spring-jms/src/main/java/org/springframework/jms/core/DefaultJmsClientBuilder.java b/spring-jms/src/main/java/org/springframework/jms/core/DefaultJmsClientBuilder.java new file mode 100644 index 0000000000..da314c2396 --- /dev/null +++ b/spring-jms/src/main/java/org/springframework/jms/core/DefaultJmsClientBuilder.java @@ -0,0 +1,88 @@ +/* + * Copyright 2002-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.jms.core; + +import java.util.ArrayList; +import java.util.List; + +import jakarta.jms.ConnectionFactory; +import org.jspecify.annotations.Nullable; + +import org.springframework.messaging.converter.CompositeMessageConverter; +import org.springframework.messaging.converter.MessageConverter; +import org.springframework.messaging.core.CompositeMessagePostProcessor; +import org.springframework.messaging.core.MessagePostProcessor; +import org.springframework.util.Assert; + +/** + * Default implementation of {@link JmsClient.Builder}. + * @author Brian Clozel + * @since 7.0 + * @see JmsClient#builder(ConnectionFactory) + * @see JmsClient#builder(JmsOperations) + */ +class DefaultJmsClientBuilder implements JmsClient.Builder { + + private final DefaultJmsClient jmsClient; + + private @Nullable List messageConverters; + + private @Nullable List messagePostProcessors; + + + DefaultJmsClientBuilder(ConnectionFactory connectionFactory) { + Assert.notNull(connectionFactory, "ConnectionFactory must not be null"); + this.jmsClient = new DefaultJmsClient(connectionFactory); + } + + DefaultJmsClientBuilder(JmsOperations jmsTemplate) { + Assert.notNull(jmsTemplate, "JmsOperations must not be null"); + this.jmsClient = new DefaultJmsClient(jmsTemplate); + } + + @Override + public JmsClient.Builder messageConverter(MessageConverter messageConverter) { + Assert.notNull(messageConverter, "MessageConverter must not be null"); + if (this.messageConverters == null) { + this.messageConverters = new ArrayList<>(); + } + this.messageConverters.add(messageConverter); + return this; + } + + @Override + public JmsClient.Builder messagePostProcessor(MessagePostProcessor messagePostProcessor) { + Assert.notNull(messagePostProcessor, "MessagePostProcessor must not be null"); + if (this.messagePostProcessors == null) { + this.messagePostProcessors = new ArrayList<>(); + } + this.messagePostProcessors.add(messagePostProcessor); + return this; + } + + @Override + public JmsClient build() { + if (this.messageConverters != null) { + this.jmsClient.setMessageConverter(new CompositeMessageConverter(this.messageConverters)); + } + if (this.messagePostProcessors != null) { + this.jmsClient.setMessagePostProcessor(new CompositeMessagePostProcessor(this.messagePostProcessors)); + } + return this.jmsClient; + } + +} diff --git a/spring-jms/src/main/java/org/springframework/jms/core/JmsClient.java b/spring-jms/src/main/java/org/springframework/jms/core/JmsClient.java index bbbbc89b43..fb200c5d4e 100644 --- a/spring-jms/src/main/java/org/springframework/jms/core/JmsClient.java +++ b/spring-jms/src/main/java/org/springframework/jms/core/JmsClient.java @@ -25,6 +25,7 @@ import jakarta.jms.Destination; import org.springframework.messaging.Message; import org.springframework.messaging.MessagingException; import org.springframework.messaging.converter.MessageConverter; +import org.springframework.messaging.core.MessagePostProcessor; /** * A fluent {@code JmsClient} with common send and receive operations against a JMS @@ -73,6 +74,7 @@ import org.springframework.messaging.converter.MessageConverter; * * * @author Juergen Hoeller + * @author Brian Clozel * @since 7.0 * @see JmsTemplate * @see JmsMessagingTemplate @@ -103,16 +105,7 @@ public interface JmsClient { * @param connectionFactory the factory to obtain JMS connections from */ static JmsClient create(ConnectionFactory connectionFactory) { - return new DefaultJmsClient(connectionFactory, null); - } - - /** - * Create a new {@code JmsClient} for the given {@link ConnectionFactory}. - * @param connectionFactory the factory to obtain JMS connections from - * @param messageConverter the message converter for payload objects - */ - static JmsClient create(ConnectionFactory connectionFactory, MessageConverter messageConverter) { - return new DefaultJmsClient(connectionFactory, messageConverter); + return new DefaultJmsClient(connectionFactory); } /** @@ -121,17 +114,56 @@ public interface JmsClient { * (can be a custom {@link JmsOperations} implementation as well) */ static JmsClient create(JmsOperations jmsTemplate) { - return new DefaultJmsClient(jmsTemplate, null); + return new DefaultJmsClient(jmsTemplate); } /** - * Create a new {@code JmsClient} for the given {@link JmsOperations}. + * Obtain a {@code JmsClient} builder that will use the given connection + * factory for JMS connections. + * @param connectionFactory the factory to obtain JMS connections from + * @return a {@code JmsClient} builder that uses the given connection factory. + */ + static Builder builder(ConnectionFactory connectionFactory) { + return new DefaultJmsClientBuilder(connectionFactory); + } + + /** + * Obtain a {@code JmsClient} builder based on the configuration of the + * given {@code JmsTemplate}. * @param jmsTemplate the {@link JmsTemplate} to use for performing operations * (can be a custom {@link JmsOperations} implementation as well) - * @param messageConverter the message converter for payload objects + * @return a {@code JmsClient} builder that uses the given JMS template. */ - static JmsClient create(JmsOperations jmsTemplate, MessageConverter messageConverter) { - return new DefaultJmsClient(jmsTemplate, messageConverter); + static Builder builder(JmsOperations jmsTemplate) { + return new DefaultJmsClientBuilder(jmsTemplate); + } + + /** + * A mutable builder for creating a {@link JmsClient}. + */ + interface Builder { + + /** + * Add a {@code MessageConverter} to use for converting payload objects to/from messages. + * Message converters will be considered in order of registration. + * @param messageConverter the message converter for payload objects + * @return this builder + */ + Builder messageConverter(MessageConverter messageConverter); + + /** + * Add a {@link MessagePostProcessor} to use for modifying {@code Message} instances before sending. + * Post-processors will be executed in order of registration. + * @param messagePostProcessor the post-processor to use for outgoing messages + * @return this builder + */ + Builder messagePostProcessor(MessagePostProcessor messagePostProcessor); + + /** + * Build the {@code JmsClient} instance. + */ + JmsClient build(); + } diff --git a/spring-jms/src/main/java/org/springframework/jms/core/MessagePostProcessor.java b/spring-jms/src/main/java/org/springframework/jms/core/MessagePostProcessor.java index aa3d9d9ce7..00b0c94349 100644 --- a/spring-jms/src/main/java/org/springframework/jms/core/MessagePostProcessor.java +++ b/spring-jms/src/main/java/org/springframework/jms/core/MessagePostProcessor.java @@ -20,18 +20,19 @@ import jakarta.jms.JMSException; import jakarta.jms.Message; /** - * To be used with JmsTemplate's send method that converts an object to a message. + * Post-processes a {@link Message}. This is the JMS equivalent of the spring-messaging + * {@link org.springframework.messaging.core.MessagePostProcessor}. * - *

This allows for further modification of the message after it has been processed - * by the converter and is useful for setting JMS headers and properties. - * - *

Often implemented as a lambda expression or as an anonymous inner class. + *

This is involved right before a {@link JmsClient} sends a message over the wire, for setting additional + * JMS properties and headers. With {@link JmsTemplate}, the message post processor is only involved + * in methods accepting it as an argument, to customize the outgoing message produced + * by a {@link org.springframework.jms.support.converter.MessageConverter}. * * @author Mark Pollack * @since 1.1 + * @see JmsClient.OperationSpec#send(org.springframework.messaging.Message) * @see JmsTemplate#convertAndSend(String, Object, MessagePostProcessor) * @see JmsTemplate#convertAndSend(jakarta.jms.Destination, Object, MessagePostProcessor) - * @see org.springframework.jms.support.converter.MessageConverter */ @FunctionalInterface public interface MessagePostProcessor { diff --git a/spring-jms/src/test/java/org/springframework/jms/core/JmsClientTests.java b/spring-jms/src/test/java/org/springframework/jms/core/JmsClientTests.java index a7731d2798..49b71187a4 100644 --- a/spring-jms/src/test/java/org/springframework/jms/core/JmsClientTests.java +++ b/spring-jms/src/test/java/org/springframework/jms/core/JmsClientTests.java @@ -141,6 +141,21 @@ class JmsClientTests { assertTextMessage(this.messageCreator.getValue()); // see createTextMessage } + @Test + void convertAndSendPayloadAndHeadersWithPostProcessor() throws JMSException { + Destination destination = new Destination() {}; + Map headers = new HashMap<>(); + headers.put("foo", "bar"); + + this.jmsClient = JmsClient.builder(this.jmsTemplate) + .messagePostProcessor(msg -> MessageBuilder.fromMessage(msg).setHeader("spring", "framework").build()) + .build(); + this.jmsClient.destination(destination).send("Hello", headers); + verify(this.jmsTemplate).send(eq(destination), this.messageCreator.capture()); + TextMessage jmsMessage = createTextMessage(this.messageCreator.getValue()); + assertThat(jmsMessage.getObjectProperty("spring")).isEqualTo("framework"); + } + @Test void receive() { Destination destination = new Destination() {}; @@ -209,7 +224,7 @@ class JmsClientTests { jakarta.jms.Message jmsMessage = createJmsTextMessage("123"); given(this.jmsTemplate.receive("myQueue")).willReturn(jmsMessage); - this.jmsClient = JmsClient.create(this.jmsTemplate, new GenericMessageConverter()); + this.jmsClient = JmsClient.builder(this.jmsTemplate).messageConverter(new GenericMessageConverter()).build(); Integer payload = this.jmsClient.destination("myQueue").receive(Integer.class).get(); assertThat(payload).isEqualTo(Integer.valueOf(123)); @@ -258,7 +273,7 @@ class JmsClientTests { jakarta.jms.Message jmsMessage = createJmsTextMessage("123"); given(this.jmsTemplate.receiveSelected("myQueue", "selector")).willReturn(jmsMessage); - this.jmsClient = JmsClient.create(this.jmsTemplate, new GenericMessageConverter()); + this.jmsClient = JmsClient.builder(this.jmsTemplate).messageConverter(new GenericMessageConverter()).build(); Integer payload = this.jmsClient.destination("myQueue").receive("selector", Integer.class).get(); assertThat(payload).isEqualTo(Integer.valueOf(123)); @@ -315,6 +330,22 @@ class JmsClientTests { assertThat(reply).isEqualTo("My reply"); } + @Test + void convertSendAndReceivePayloadWithPostProcessor() throws JMSException { + Destination destination = new Destination() {}; + jakarta.jms.Message replyJmsMessage = createJmsTextMessage("My reply"); + given(this.jmsTemplate.sendAndReceive(eq(destination), any())).willReturn(replyJmsMessage); + + this.jmsClient = JmsClient.builder(this.jmsTemplate) + .messagePostProcessor(msg -> MessageBuilder.fromMessage(msg).setHeader("spring", "framework").build()) + .build(); + this.jmsClient.destination(destination).sendAndReceive("my Payload", String.class); + verify(this.jmsTemplate).sendAndReceive(eq(destination), this.messageCreator.capture()); + TextMessage jmsMessage = createTextMessage(this.messageCreator.getValue()); + assertThat(jmsMessage.getObjectProperty("spring")).isEqualTo("framework"); + verify(this.jmsTemplate, times(1)).sendAndReceive(eq(destination), any()); + } + @Test void convertSendAndReceivePayloadName() { jakarta.jms.Message replyJmsMessage = createJmsTextMessage("My reply"); @@ -391,6 +422,32 @@ class JmsClientTests { verify(connection).close(); } + @Test + void sendWithPostProcessor() throws Exception { + ConnectionFactory connectionFactory = mock(); + Connection connection = mock(); + Session session = mock(); + Queue queue = mock(); + MessageProducer messageProducer = mock(); + TextMessage textMessage = mock(); + + given(connectionFactory.createConnection()).willReturn(connection); + given(connection.createSession(false, Session.AUTO_ACKNOWLEDGE)).willReturn(session); + given(session.createProducer(queue)).willReturn(messageProducer); + given(session.createTextMessage("just testing")).willReturn(textMessage); + + JmsClient.builder(connectionFactory) + .messagePostProcessor(msg -> MessageBuilder.fromMessage(msg).setHeader("spring", "framework").build()) + .build() + .destination(queue).send("just testing"); + + verify(textMessage).setObjectProperty("spring", "framework"); + verify(messageProducer).send(textMessage); + verify(messageProducer).close(); + verify(session).close(); + verify(connection).close(); + } + @Test void sendWithCustomSettings() throws Exception { ConnectionFactory connectionFactory = mock(); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/core/CompositeMessagePostProcessor.java b/spring-messaging/src/main/java/org/springframework/messaging/core/CompositeMessagePostProcessor.java new file mode 100644 index 0000000000..9d2dd046b2 --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/core/CompositeMessagePostProcessor.java @@ -0,0 +1,48 @@ +/* + * Copyright 2002-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.core; + +import java.util.List; + +import org.springframework.messaging.Message; + +/** + * Composite {@link MessagePostProcessor} implementation that iterates over + * a given collection of delegate {@link MessagePostProcessor} instances. + * @author Brian Clozel + * @since 7.0 + */ +public class CompositeMessagePostProcessor implements MessagePostProcessor { + + private final List messagePostProcessors; + + /** + * Construct a CompositeMessagePostProcessor from the given delegate MessagePostProcessors. + * @param messagePostProcessors the MessagePostProcessors to delegate to + */ + public CompositeMessagePostProcessor(List messagePostProcessors) { + this.messagePostProcessors = messagePostProcessors; + } + + @Override + public Message postProcessMessage(Message message) { + for (MessagePostProcessor messagePostProcessor : this.messagePostProcessors) { + message = messagePostProcessor.postProcessMessage(message); + } + return message; + } +}