Set up JMS destination in JUnit extension

This commit is contained in:
Arnaud Cogoluègnes 2025-02-13 08:40:50 +01:00
parent 5bcdc805d1
commit cb59ad877c
No known key found for this signature in database
GPG Key ID: D5C8C4DFAD43AFA8
7 changed files with 303 additions and 252 deletions

View File

@ -122,10 +122,8 @@ jms_temporary_queue(Config) ->
%% Send different message types from JMS client to JMS client. %% Send different message types from JMS client to JMS client.
message_types_jms_to_jms(Config) -> message_types_jms_to_jms(Config) ->
TestName = QName = atom_to_binary(?FUNCTION_NAME), TestName = atom_to_binary(?FUNCTION_NAME),
ok = declare_queue(QName, <<"quorum">>, Config), ok = run_jms_test(TestName, [], Config).
ok = run_jms_test(TestName, [{"-Dqueue=~ts", [rabbitmq_amqp_address:queue(QName)]}], Config),
ok = delete_queue(QName, Config).
%% Send different message types from JMS client to Erlang AMQP 1.0 client. %% Send different message types from JMS client to Erlang AMQP 1.0 client.
message_types_jms_to_amqp(Config) -> message_types_jms_to_amqp(Config) ->
@ -133,10 +131,8 @@ message_types_jms_to_amqp(Config) ->
ok = run_jms_test(TestName, [], Config). ok = run_jms_test(TestName, [], Config).
temporary_queue_rpc(Config) -> temporary_queue_rpc(Config) ->
TestName = QName = atom_to_binary(?FUNCTION_NAME), TestName = atom_to_binary(?FUNCTION_NAME),
ok = declare_queue(QName, <<"classic">>, Config), ok = run_jms_test(TestName, [], Config).
ok = run_jms_test(TestName, [{"-Dqueue=~ts", [rabbitmq_amqp_address:queue(QName)]}], Config),
ok = delete_queue(QName, Config).
temporary_queue_delete(Config) -> temporary_queue_delete(Config) ->
TestName = atom_to_binary(?FUNCTION_NAME), TestName = atom_to_binary(?FUNCTION_NAME),

View File

@ -89,6 +89,26 @@
<style>GOOGLE</style> <style>GOOGLE</style>
</googleJavaFormat> </googleJavaFormat>
</java> </java>
<ratchetFrom>origin/main</ratchetFrom>
<licenseHeader>
<content>// The contents of this file are subject to the Mozilla Public License
// Version 2.0 (the "License"); you may not use this file except in
// compliance with the License. You may obtain a copy of the License
// at https://www.mozilla.org/en-US/MPL/2.0/
//
// Software distributed under the License is distributed on an "AS IS"
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
// the License for the specific language governing rights and
// limitations under the License.
//
// The Original Code is RabbitMQ.
//
// The Initial Developer of the Original Code is Pivotal Software, Inc.
// Copyright (c) $YEAR Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc.
// and/or its subsidiaries. All rights reserved.
//
</content>
</licenseHeader>
</configuration> </configuration>
</plugin> </plugin>

View File

@ -14,7 +14,6 @@
// Copyright (c) 2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. // Copyright (c) 2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc.
// and/or its subsidiaries. All rights reserved. // and/or its subsidiaries. All rights reserved.
// //
package com.rabbitmq.amqp.tests.jms; package com.rabbitmq.amqp.tests.jms;
import static com.rabbitmq.amqp.tests.jms.Cli.startBroker; import static com.rabbitmq.amqp.tests.jms.Cli.startBroker;
@ -41,12 +40,12 @@ import org.junit.jupiter.api.Timeout;
@JmsTestInfrastructure @JmsTestInfrastructure
public class JmsConnectionTest { public class JmsConnectionTest {
String destination; ConnectionFactory factory;
@Test @Test
@Timeout(30) @Timeout(30)
public void testCreateConnection() throws Exception { public void testCreateConnection() throws Exception {
try (Connection connection = connection()) { try (Connection connection = factory.createConnection()) {
assertNotNull(connection); assertNotNull(connection);
} }
} }
@ -54,7 +53,7 @@ public class JmsConnectionTest {
@Test @Test
@Timeout(30) @Timeout(30)
public void testCreateConnectionAndStart() throws Exception { public void testCreateConnectionAndStart() throws Exception {
try (Connection connection = connection()) { try (Connection connection = factory.createConnection()) {
assertNotNull(connection); assertNotNull(connection);
connection.start(); connection.start();
} }
@ -65,7 +64,6 @@ public class JmsConnectionTest {
// Currently not supported by RabbitMQ. // Currently not supported by RabbitMQ.
@Disabled @Disabled
public void testCreateWithDuplicateClientIdFails() throws Exception { public void testCreateWithDuplicateClientIdFails() throws Exception {
JmsConnectionFactory factory = (JmsConnectionFactory) connectionFactory();
JmsConnection connection1 = (JmsConnection) factory.createConnection(); JmsConnection connection1 = (JmsConnection) factory.createConnection();
connection1.setClientID("Test"); connection1.setClientID("Test");
assertNotNull(connection1); assertNotNull(connection1);
@ -89,7 +87,7 @@ public class JmsConnectionTest {
assertThrows( assertThrows(
JMSException.class, JMSException.class,
() -> { () -> {
try (Connection connection = connection()) { try (Connection connection = factory.createConnection()) {
connection.setClientID("Test"); connection.setClientID("Test");
connection.start(); connection.start();
connection.setClientID("NewTest"); connection.setClientID("NewTest");
@ -100,9 +98,10 @@ public class JmsConnectionTest {
@Test @Test
@Timeout(30) @Timeout(30)
public void testCreateConnectionAsSystemAdmin() throws Exception { public void testCreateConnectionAsSystemAdmin() throws Exception {
JmsConnectionFactory factory = (JmsConnectionFactory) connectionFactory(); JmsConnectionFactory f = (JmsConnectionFactory) factory;
factory.setUsername(adminUsername());
factory.setPassword(adminPassword()); f.setUsername(adminUsername());
f.setPassword(adminPassword());
try (Connection connection = factory.createConnection()) { try (Connection connection = factory.createConnection()) {
assertNotNull(connection); assertNotNull(connection);
connection.start(); connection.start();
@ -112,8 +111,7 @@ public class JmsConnectionTest {
@Test @Test
@Timeout(30) @Timeout(30)
public void testCreateConnectionCallSystemAdmin() throws Exception { public void testCreateConnectionCallSystemAdmin() throws Exception {
try (Connection connection = try (Connection connection = factory.createConnection(adminUsername(), adminPassword())) {
connectionFactory().createConnection(adminUsername(), adminPassword())) {
assertNotNull(connection); assertNotNull(connection);
connection.start(); connection.start();
} }
@ -121,13 +119,13 @@ public class JmsConnectionTest {
@Test @Test
@Timeout(30) @Timeout(30)
public void testCreateConnectionAsUnknwonUser() { public void testCreateConnectionAsUnknownUser() {
assertThrows( assertThrows(
JMSSecurityException.class, JMSSecurityException.class,
() -> { () -> {
JmsConnectionFactory factory = (JmsConnectionFactory) connectionFactory(); JmsConnectionFactory f = (JmsConnectionFactory) factory;
factory.setUsername("unknown"); f.setUsername("unknown");
factory.setPassword("unknown"); f.setPassword("unknown");
try (Connection connection = factory.createConnection()) { try (Connection connection = factory.createConnection()) {
assertNotNull(connection); assertNotNull(connection);
connection.start(); connection.start();
@ -137,11 +135,11 @@ public class JmsConnectionTest {
@Test @Test
@Timeout(30) @Timeout(30)
public void testCreateConnectionCallUnknwonUser() { public void testCreateConnectionCallUnknownUser() {
assertThrows( assertThrows(
JMSSecurityException.class, JMSSecurityException.class,
() -> { () -> {
try (Connection connection = connectionFactory().createConnection("unknown", "unknown")) { try (Connection connection = factory.createConnection("unknown", "unknown")) {
assertNotNull(connection); assertNotNull(connection);
connection.start(); connection.start();
} }
@ -150,11 +148,10 @@ public class JmsConnectionTest {
@Test @Test
@Timeout(30) @Timeout(30)
public void testBrokerStopWontHangConnectionClose() throws Exception { public void testBrokerStopWontHangConnectionClose(Queue queue) throws Exception {
Connection connection = connection(); Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = queue(destination);
connection.start(); connection.start();
MessageProducer producer = session.createProducer(queue); MessageProducer producer = session.createProducer(queue);
@ -179,7 +176,7 @@ public class JmsConnectionTest {
@Timeout(60) @Timeout(60)
public void testConnectionExceptionBrokerStop() throws Exception { public void testConnectionExceptionBrokerStop() throws Exception {
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
try (Connection connection = connection()) { try (Connection connection = factory.createConnection()) {
connection.setExceptionListener(exception -> latch.countDown()); connection.setExceptionListener(exception -> latch.countDown());
connection.start(); connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

View File

@ -14,11 +14,9 @@
// Copyright (c) 2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. // Copyright (c) 2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc.
// and/or its subsidiaries. All rights reserved. // and/or its subsidiaries. All rights reserved.
// //
package com.rabbitmq.amqp.tests.jms; package com.rabbitmq.amqp.tests.jms;
import static com.rabbitmq.amqp.tests.jms.TestUtils.brokerUri; import static com.rabbitmq.amqp.tests.jms.TestUtils.brokerUri;
import static com.rabbitmq.amqp.tests.jms.TestUtils.connection;
import static org.junit.jupiter.api.Assertions.*; import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.fail; import static org.junit.jupiter.api.Assertions.fail;
@ -35,13 +33,16 @@ import org.junit.jupiter.api.Timeout;
* Based on * Based on
* https://github.com/apache/qpid-jms/tree/main/qpid-jms-interop-tests/qpid-jms-activemq-tests. * https://github.com/apache/qpid-jms/tree/main/qpid-jms-interop-tests/qpid-jms-activemq-tests.
*/ */
@JmsTestInfrastructure
public class JmsTemporaryQueueTest { public class JmsTemporaryQueueTest {
ConnectionFactory factory;
Connection connection; Connection connection;
@BeforeEach @BeforeEach
void init() throws JMSException { void init() throws JMSException {
connection = connection(); connection = factory.createConnection();
} }
@AfterEach @AfterEach

View File

@ -1,3 +1,19 @@
// The contents of this file are subject to the Mozilla Public License
// Version 2.0 (the "License"); you may not use this file except in
// compliance with the License. You may obtain a copy of the License
// at https://www.mozilla.org/en-US/MPL/2.0/
//
// Software distributed under the License is distributed on an "AS IS"
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
// the License for the specific language governing rights and
// limitations under the License.
//
// The Original Code is RabbitMQ.
//
// The Initial Developer of the Original Code is Pivotal Software, Inc.
// Copyright (c) 2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc.
// and/or its subsidiaries. All rights reserved.
//
package com.rabbitmq.amqp.tests.jms; package com.rabbitmq.amqp.tests.jms;
import static com.rabbitmq.amqp.tests.jms.TestUtils.protonClient; import static com.rabbitmq.amqp.tests.jms.TestUtils.protonClient;
@ -5,209 +21,175 @@ import static com.rabbitmq.amqp.tests.jms.TestUtils.protonConnection;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.*; import static org.junit.jupiter.api.Assertions.*;
import jakarta.jms.*;
import java.util.*;
import java.util.concurrent.TimeUnit;
import javax.naming.Context;
import com.rabbitmq.qpid.protonj2.client.Client; import com.rabbitmq.qpid.protonj2.client.Client;
import com.rabbitmq.qpid.protonj2.client.Delivery; import com.rabbitmq.qpid.protonj2.client.Delivery;
import com.rabbitmq.qpid.protonj2.client.Receiver; import com.rabbitmq.qpid.protonj2.client.Receiver;
import jakarta.jms.*;
import jakarta.jms.Queue; import jakarta.jms.Queue;
import java.util.*;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@JmsTestInfrastructure @JmsTestInfrastructure
public class JmsTest { public class JmsTest {
private javax.naming.Context getContext() throws Exception{ ConnectionFactory factory;
// Configure a JNDI initial context, see
// https://github.com/apache/qpid-jms/blob/main/qpid-jms-docs/Configuration.md#configuring-a-jndi-initialcontext
Hashtable<Object, Object> env = new Hashtable<>();
env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
String uri = System.getProperty("rmq_broker_uri", "amqp://localhost:5672"); // https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#jakarta-messaging-message-types
// For a list of options, see @Test
// https://github.com/apache/qpid-jms/blob/main/qpid-jms-docs/Configuration.md#jms-configuration-options public void message_types_jms_to_jms(Queue queue) throws Exception {
uri = uri + "?jms.clientID=my-client-id"; try (Connection connection = factory.createConnection()) {
env.put("connectionfactory.myConnection", uri); Session session = connection.createSession();
MessageProducer producer = session.createProducer(queue);
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
String queueName = System.getProperty("queue"); // TextMessage
if (queueName != null) { String msg1 = "msg1";
env.put("queue.myQueue", queueName); TextMessage textMessage = session.createTextMessage(msg1);
} producer.send(textMessage);
TextMessage receivedTextMessage = (TextMessage) consumer.receive(5000);
assertEquals(msg1, receivedTextMessage.getText());
javax.naming.Context context = new javax.naming.InitialContext(env); // BytesMessage
return context; String msg2 = "msg2";
BytesMessage bytesMessage = session.createBytesMessage();
bytesMessage.writeUTF(msg2);
producer.send(bytesMessage);
BytesMessage receivedBytesMessage = (BytesMessage) consumer.receive(5000);
assertEquals(msg2, receivedBytesMessage.readUTF());
// MapMessage
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("key1", "value");
mapMessage.setBoolean("key2", true);
mapMessage.setDouble("key3", 1.0);
mapMessage.setLong("key4", 1L);
producer.send(mapMessage);
MapMessage receivedMapMessage = (MapMessage) consumer.receive(5000);
assertEquals("value", receivedMapMessage.getString("key1"));
assertEquals(true, receivedMapMessage.getBoolean("key2"));
assertEquals(1.0, receivedMapMessage.getDouble("key3"));
assertEquals(1L, receivedMapMessage.getLong("key4"));
// StreamMessage
StreamMessage streamMessage = session.createStreamMessage();
streamMessage.writeString("value");
streamMessage.writeBoolean(true);
streamMessage.writeDouble(1.0);
streamMessage.writeLong(1L);
producer.send(streamMessage);
StreamMessage receivedStreamMessage = (StreamMessage) consumer.receive(5000);
assertEquals("value", receivedStreamMessage.readString());
assertEquals(true, receivedStreamMessage.readBoolean());
assertEquals(1.0, receivedStreamMessage.readDouble());
assertEquals(1L, receivedStreamMessage.readLong());
// ObjectMessage
ObjectMessage objectMessage = session.createObjectMessage();
ArrayList<Integer> list = new ArrayList<>(Arrays.asList(1, 2, 3));
objectMessage.setObject(list);
producer.send(objectMessage);
ObjectMessage receivedObjectMessage = (ObjectMessage) consumer.receive(5000);
assertEquals(list, receivedObjectMessage.getObject());
}
}
@Test
public void message_types_jms_to_amqp(Queue queue) throws Exception {
String msg1 = "msg1🥕";
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession();
MessageProducer producer = session.createProducer(queue);
// TextMessage
TextMessage textMessage = session.createTextMessage(msg1);
producer.send(textMessage);
// MapMessage
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("key1", "value");
mapMessage.setBoolean("key2", true);
mapMessage.setDouble("key3", -1.1);
mapMessage.setLong("key4", -1L);
producer.send(mapMessage);
// StreamMessage
StreamMessage streamMessage = session.createStreamMessage();
streamMessage.writeString("value");
streamMessage.writeBoolean(true);
streamMessage.writeDouble(-1.1);
streamMessage.writeLong(-1L);
producer.send(streamMessage);
} }
// https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#jakarta-messaging-message-types try (Client client = protonClient();
@Test com.rabbitmq.qpid.protonj2.client.Connection amqpConnection = protonConnection(client)) {
public void message_types_jms_to_jms() throws Exception { Receiver receiver = amqpConnection.openReceiver(queue.getQueueName());
Context context = getContext(); Delivery delivery = receiver.receive(10, TimeUnit.SECONDS);
ConnectionFactory factory = (ConnectionFactory) context.lookup("myConnection"); assertNotNull(delivery);
assertEquals(msg1, delivery.message().body());
try (Connection connection = factory.createConnection()) { delivery = receiver.receive(10, TimeUnit.SECONDS);
Session session = connection.createSession(); assertNotNull(delivery);
Destination queue = (Destination) context.lookup("myQueue"); com.rabbitmq.qpid.protonj2.client.Message<Map<String, Object>> mapMessage =
MessageProducer producer = session.createProducer(queue); delivery.message();
MessageConsumer consumer = session.createConsumer(queue); assertThat(mapMessage.body())
connection.start(); .containsEntry("key1", "value")
.containsEntry("key2", true)
.containsEntry("key3", -1.1)
.containsEntry("key4", -1L);
// TextMessage delivery = receiver.receive(10, TimeUnit.SECONDS);
String msg1 = "msg1"; assertNotNull(delivery);
TextMessage textMessage = session.createTextMessage(msg1); com.rabbitmq.qpid.protonj2.client.Message<List<Object>> listMessage = delivery.message();
producer.send(textMessage); assertThat(listMessage.body()).containsExactly("value", true, -1.1, -1L);
TextMessage receivedTextMessage = (TextMessage) consumer.receive(5000);
assertEquals(msg1, receivedTextMessage.getText());
// BytesMessage
String msg2 = "msg2";
BytesMessage bytesMessage = session.createBytesMessage();
bytesMessage.writeUTF(msg2);
producer.send(bytesMessage);
BytesMessage receivedBytesMessage = (BytesMessage) consumer.receive(5000);
assertEquals(msg2, receivedBytesMessage.readUTF());
// MapMessage
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("key1", "value");
mapMessage.setBoolean("key2", true);
mapMessage.setDouble("key3", 1.0);
mapMessage.setLong("key4", 1L);
producer.send(mapMessage);
MapMessage receivedMapMessage = (MapMessage) consumer.receive(5000);
assertEquals("value", receivedMapMessage.getString("key1"));
assertEquals(true, receivedMapMessage.getBoolean("key2"));
assertEquals(1.0, receivedMapMessage.getDouble("key3"));
assertEquals(1L, receivedMapMessage.getLong("key4"));
// StreamMessage
StreamMessage streamMessage = session.createStreamMessage();
streamMessage.writeString("value");
streamMessage.writeBoolean(true);
streamMessage.writeDouble(1.0);
streamMessage.writeLong(1L);
producer.send(streamMessage);
StreamMessage receivedStreamMessage = (StreamMessage) consumer.receive(5000);
assertEquals("value", receivedStreamMessage.readString());
assertEquals(true, receivedStreamMessage.readBoolean());
assertEquals(1.0, receivedStreamMessage.readDouble());
assertEquals(1L, receivedStreamMessage.readLong());
// ObjectMessage
ObjectMessage objectMessage = session.createObjectMessage();
ArrayList<Integer> list = new ArrayList<>(Arrays.asList(1, 2, 3));
objectMessage.setObject(list);
producer.send(objectMessage);
ObjectMessage receivedObjectMessage = (ObjectMessage) consumer.receive(5000);
assertEquals(list, receivedObjectMessage.getObject());
}
}
String destination;
@Test
public void message_types_jms_to_amqp() throws Exception {
Context context = getContext();
ConnectionFactory factory = (ConnectionFactory) context.lookup("myConnection");
Queue queue = TestUtils.queue(destination);
String msg1 = "msg1🥕";
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession();
MessageProducer producer = session.createProducer(queue);
// TextMessage
TextMessage textMessage = session.createTextMessage(msg1);
producer.send(textMessage);
// MapMessage
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("key1", "value");
mapMessage.setBoolean("key2", true);
mapMessage.setDouble("key3", -1.1);
mapMessage.setLong("key4", -1L);
producer.send(mapMessage);
// StreamMessage
StreamMessage streamMessage = session.createStreamMessage();
streamMessage.writeString("value");
streamMessage.writeBoolean(true);
streamMessage.writeDouble(-1.1);
streamMessage.writeLong(-1L);
producer.send(streamMessage);
}
try (Client client = protonClient();
com.rabbitmq.qpid.protonj2.client.Connection amqpConnection = protonConnection(client)) {
Receiver receiver = amqpConnection.openReceiver(queue.getQueueName());
Delivery delivery = receiver.receive(10, TimeUnit.SECONDS);
assertNotNull(delivery);
assertEquals(msg1, delivery.message().body());
delivery = receiver.receive(10, TimeUnit.SECONDS);
assertNotNull(delivery);
com.rabbitmq.qpid.protonj2.client.Message<Map<String, Object>> mapMessage = delivery.message();
assertThat(mapMessage.body()).containsEntry("key1", "value")
.containsEntry("key2", true)
.containsEntry("key3", -1.1)
.containsEntry("key4", -1L);
delivery = receiver.receive(10, TimeUnit.SECONDS);
assertNotNull(delivery);
com.rabbitmq.qpid.protonj2.client.Message<List<Object>> listMessage = delivery.message();
assertThat(listMessage.body()).containsExactly("value", true, -1.1, -1L);
} }
} }
// Test that Request/reply pattern using a TemporaryQueue works. // Test that Request/reply pattern using a TemporaryQueue works.
// https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#requestreply-pattern-using-a-temporaryqueue-jakarta-ee // https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#requestreply-pattern-using-a-temporaryqueue-jakarta-ee
@Test @Test
public void temporary_queue_rpc() throws Exception { public void temporary_queue_rpc(Queue requestQueue) throws Exception {
Context context = getContext(); try (JMSContext clientContext = factory.createContext()) {
ConnectionFactory factory = (ConnectionFactory) context.lookup("myConnection"); Destination responseQueue = clientContext.createTemporaryQueue();
JMSConsumer clientConsumer = clientContext.createConsumer(responseQueue);
try (JMSContext clientContext = factory.createContext()) { TextMessage clientRequestMessage = clientContext.createTextMessage("hello");
Destination responseQueue = clientContext.createTemporaryQueue(); clientContext
JMSConsumer clientConsumer = clientContext.createConsumer(responseQueue); .createProducer()
.setJMSReplyTo(responseQueue)
.send(requestQueue, clientRequestMessage);
Destination requestQueue = (Destination) context.lookup("myQueue"); // Let's open a new connection to simulate the RPC server.
TextMessage clientRequestMessage = clientContext.createTextMessage("hello"); try (JMSContext serverContext = factory.createContext()) {
clientContext.createProducer(). JMSConsumer serverConsumer = serverContext.createConsumer(requestQueue);
setJMSReplyTo(responseQueue). TextMessage serverRequestMessage = (TextMessage) serverConsumer.receive(5000);
send(requestQueue, clientRequestMessage);
// Let's open a new connection to simulate the RPC server. TextMessage serverResponseMessage =
try (JMSContext serverContext = factory.createContext()) { serverContext.createTextMessage(serverRequestMessage.getText().toUpperCase());
JMSConsumer serverConsumer = serverContext.createConsumer(requestQueue); serverContext
TextMessage serverRequestMessage = (TextMessage) serverConsumer.receive(5000); .createProducer()
.send(serverRequestMessage.getJMSReplyTo(), serverResponseMessage);
}
TextMessage serverResponseMessage = serverContext.createTextMessage( TextMessage clientResponseMessage = (TextMessage) clientConsumer.receive(5000);
serverRequestMessage.getText().toUpperCase()); assertEquals("HELLO", clientResponseMessage.getText());
serverContext.createProducer().
send(serverRequestMessage.getJMSReplyTo(), serverResponseMessage);
}
TextMessage clientResponseMessage = (TextMessage) clientConsumer.receive(5000);
assertEquals("HELLO", clientResponseMessage.getText());
}
} }
}
// Test that a temporary queue can be deleted. // Test that a temporary queue can be deleted.
@Test @Test
public void temporary_queue_delete() throws Exception { public void temporary_queue_delete() throws Exception {
Context context = getContext(); try (JMSContext clientContext = factory.createContext()) {
ConnectionFactory factory = (ConnectionFactory) context.lookup("myConnection"); TemporaryQueue queue = clientContext.createTemporaryQueue();
queue.delete();
try (JMSContext clientContext = factory.createContext()) { try {
TemporaryQueue queue = clientContext.createTemporaryQueue(); clientContext.createProducer().send(queue, "hello");
queue.delete(); fail("should not be able to create producer for deleted temporary queue");
try { } catch (IllegalStateRuntimeException expectedException) {
clientContext.createProducer().send(queue, "hello"); assertEquals("Temporary destination has been deleted", expectedException.getMessage());
fail("should not be able to create producer for deleted temporary queue"); }
} catch (IllegalStateRuntimeException expectedException) {
assertEquals("Temporary destination has been deleted", expectedException.getMessage());
}
}
} }
}
} }

View File

@ -11,19 +11,29 @@
// The Original Code is RabbitMQ. // The Original Code is RabbitMQ.
// //
// The Initial Developer of the Original Code is Pivotal Software, Inc. // The Initial Developer of the Original Code is Pivotal Software, Inc.
// Copyright (c) 2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. // Copyright (c) 2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc.
// and/or its subsidiaries. All rights reserved.
// //
package com.rabbitmq.amqp.tests.jms; package com.rabbitmq.amqp.tests.jms;
import static java.util.Collections.singletonMap;
import com.rabbitmq.client.amqp.Connection; import com.rabbitmq.client.amqp.Connection;
import com.rabbitmq.client.amqp.Environment; import com.rabbitmq.client.amqp.Environment;
import com.rabbitmq.client.amqp.impl.AmqpEnvironmentBuilder; import com.rabbitmq.client.amqp.impl.AmqpEnvironmentBuilder;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.Queue;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.lang.reflect.Parameter;
import java.util.Collections;
import java.util.Optional;
import java.util.function.Predicate;
import javax.naming.Context;
import javax.naming.NamingException;
import org.junit.jupiter.api.extension.*; import org.junit.jupiter.api.extension.*;
final class JmsTestInfrastructureExtension final class JmsTestInfrastructureExtension
implements BeforeAllCallback, AfterAllCallback, BeforeEachCallback, AfterEachCallback { implements BeforeEachCallback, AfterEachCallback, ParameterResolver {
private static final ExtensionContext.Namespace NAMESPACE = private static final ExtensionContext.Namespace NAMESPACE =
ExtensionContext.Namespace.create(JmsTestInfrastructureExtension.class); ExtensionContext.Namespace.create(JmsTestInfrastructureExtension.class);
@ -32,52 +42,87 @@ final class JmsTestInfrastructureExtension
return extensionContext.getRoot().getStore(NAMESPACE); return extensionContext.getRoot().getStore(NAMESPACE);
} }
private static Field field(Class<?> cls, String name) { private static Optional<Field> field(Class<?> cls, Predicate<Field> predicate) {
Field field = null; for (Field field : cls.getDeclaredFields()) {
while (field == null && cls != null) { if (predicate.test(field)) {
try { return Optional.of(field);
field = cls.getDeclaredField(name);
} catch (NoSuchFieldException e) {
cls = cls.getSuperclass();
} }
} }
return field; return Optional.empty();
} }
@Override private static boolean isQueue(Parameter parameter) {
public void beforeAll(ExtensionContext context) { return Queue.class.isAssignableFrom(parameter.getType());
} }
@Override @Override
public void beforeEach(ExtensionContext context) throws Exception { public void beforeEach(ExtensionContext context) throws Exception {
Field field = field(context.getTestInstance().get().getClass(), "destination"); if (context.getTestMethod().isPresent()) {
if (field != null) { String queueName;
field.setAccessible(true); for (Parameter parameter : context.getTestMethod().get().getParameters()) {
String destination = TestUtils.name(context); if (isQueue(parameter)) {
field.set(context.getTestInstance().get(), destination); queueName = TestUtils.name(context);
try (Environment environment = new AmqpEnvironmentBuilder().build(); String queueAddress = TestUtils.queueAddress(queueName);
Connection connection = environment.connectionBuilder().uri(TestUtils.brokerUri()).build()) { try (Environment environment = new AmqpEnvironmentBuilder().build();
connection.management().queue(destination).declare(); Connection connection =
environment.connectionBuilder().uri(TestUtils.brokerUri()).build()) {
connection.management().queue(queueName).declare();
}
store(context).put("queueName", queueName);
Context jndiContext = TestUtils.context(singletonMap("queue." + queueName, queueAddress));
store(context).put("jndiContext", jndiContext);
}
}
if (context.getTestInstance().isPresent()) {
Optional<Field> connectionFactoryField =
field(
context.getTestInstance().get().getClass(),
field -> ConnectionFactory.class.isAssignableFrom(field.getType()));
if (connectionFactoryField.isPresent()) {
connectionFactoryField.get().setAccessible(true);
Context jndiContext =
store(context)
.getOrComputeIfAbsent(
"jndiContext", k -> TestUtils.context(Collections.emptyMap()), Context.class);
ConnectionFactory connectionFactory =
(ConnectionFactory) jndiContext.lookup("testConnectionFactory");
connectionFactoryField.get().set(context.getTestInstance().get(), connectionFactory);
}
} }
} }
} }
@Override @Override
public void afterEach(ExtensionContext context) throws Exception { public void afterEach(ExtensionContext context) {
Field field = field(context.getTestInstance().get().getClass(), "destination"); String queueName = store(context).remove("queueName", String.class);
if (field != null) { if (queueName != null) {
field.setAccessible(true);
String destination = (String) field.get(context.getTestInstance().get());
try (Environment environment = new AmqpEnvironmentBuilder().build(); try (Environment environment = new AmqpEnvironmentBuilder().build();
Connection connection = environment.connectionBuilder().uri(TestUtils.brokerUri()).build()) { Connection connection =
connection.management().queueDelete(destination); environment.connectionBuilder().uri(TestUtils.brokerUri()).build()) {
connection.management().queueDelete(queueName);
} }
} }
store(context).remove("jndiContext", Context.class);
} }
@Override @Override
public void afterAll(ExtensionContext context) { public boolean supportsParameter(
ParameterContext parameterContext, ExtensionContext extensionContext)
throws ParameterResolutionException {
return isQueue(parameterContext.getParameter());
}
@Override
public Object resolveParameter(
ParameterContext parameterContext, ExtensionContext extensionContext)
throws ParameterResolutionException {
String queueName = store(extensionContext).get("queueName", String.class);
Context jndiContext = store(extensionContext).get("jndiContext", Context.class);
try {
return jndiContext.lookup(queueName);
} catch (NamingException e) {
throw new RuntimeException(e);
}
} }
} }

View File

@ -14,7 +14,6 @@
// Copyright (c) 2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. // Copyright (c) 2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc.
// and/or its subsidiaries. All rights reserved. // and/or its subsidiaries. All rights reserved.
// //
package com.rabbitmq.amqp.tests.jms; package com.rabbitmq.amqp.tests.jms;
import static java.lang.String.format; import static java.lang.String.format;
@ -22,16 +21,14 @@ import static java.lang.String.format;
import com.rabbitmq.qpid.protonj2.client.Client; import com.rabbitmq.qpid.protonj2.client.Client;
import com.rabbitmq.qpid.protonj2.client.ConnectionOptions; import com.rabbitmq.qpid.protonj2.client.ConnectionOptions;
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientException; import com.rabbitmq.qpid.protonj2.client.exceptions.ClientException;
import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSException;
import jakarta.jms.Queue;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.Hashtable;
import java.util.Map;
import java.util.UUID; import java.util.UUID;
import org.apache.qpid.jms.JmsConnectionFactory; import javax.naming.Context;
import org.apache.qpid.jms.JmsQueue; import javax.naming.NamingException;
import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.api.extension.ExtensionContext;
@ -72,17 +69,30 @@ final class TestUtils {
return "guest"; return "guest";
} }
static ConnectionFactory connectionFactory() { static Context context(Map<String, String> extraEnv) {
return new JmsConnectionFactory(brokerUri()); // Configure a JNDI initial context, see
// https://github.com/apache/qpid-jms/blob/main/qpid-jms-docs/Configuration.md#configuring-a-jndi-initialcontext
Hashtable<Object, Object> env = new Hashtable<>();
env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
String uri = brokerUri();
// For a list of options, see
// https://github.com/apache/qpid-jms/blob/main/qpid-jms-docs/Configuration.md#jms-configuration-options
uri = uri + "?jms.clientID=my-client-id";
env.put("connectionfactory.testConnectionFactory", uri);
env.putAll(extraEnv);
try {
return new javax.naming.InitialContext(env);
} catch (NamingException e) {
throw new RuntimeException(e);
}
} }
static Connection connection() throws JMSException { static String queueAddress(String name) {
return connectionFactory().createConnection();
}
static Queue queue(String name) {
// no path encoding, use names with e.g. ASCII characters only // no path encoding, use names with e.g. ASCII characters only
return new JmsQueue("/queues/" + name); return "/queues/" + name;
} }
static Client protonClient() { static Client protonClient() {