Merge pull request #17587 from dreis2211
* pr/17587: Polish "Adjust to changes in Spring AMQP 2.2 snapshots" Adjust to changes in Spring AMQP 2.2 snapshots Closes gh-17587
This commit is contained in:
commit
afe3ab7517
|
@ -24,6 +24,7 @@ import java.util.List;
|
|||
import org.springframework.amqp.core.AcknowledgeMode;
|
||||
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory.CacheMode;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.boot.context.properties.DeprecatedConfigurationProperty;
|
||||
import org.springframework.boot.convert.DurationUnit;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
@ -662,10 +663,10 @@ public class RabbitProperties {
|
|||
private Integer maxConcurrency;
|
||||
|
||||
/**
|
||||
* Number of messages to be processed between acks when the acknowledge mode is
|
||||
* AUTO. If larger than prefetch, prefetch will be increased to this value.
|
||||
* Batch size, expressed as the number of physical messages, to be used by the
|
||||
* container.
|
||||
*/
|
||||
private Integer transactionSize;
|
||||
private Integer batchSize;
|
||||
|
||||
/**
|
||||
* Whether to fail if the queues declared by the container are not available on
|
||||
|
@ -690,12 +691,34 @@ public class RabbitProperties {
|
|||
this.maxConcurrency = maxConcurrency;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the number of messages processed in one transaction.
|
||||
* @return the number of messages
|
||||
* @deprecated since 2.2.0 in favor of {@link SimpleContainer#getBatchSize()}
|
||||
*/
|
||||
@DeprecatedConfigurationProperty(replacement = "spring.rabbitmq.listener.simple.batch-size")
|
||||
@Deprecated
|
||||
public Integer getTransactionSize() {
|
||||
return this.transactionSize;
|
||||
return getBatchSize();
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the number of messages processed in one transaction.
|
||||
* @param transactionSize the number of messages
|
||||
* @deprecated since 2.2.0 in favor of
|
||||
* {@link SimpleContainer#setBatchSize(Integer)}
|
||||
*/
|
||||
@Deprecated
|
||||
public void setTransactionSize(Integer transactionSize) {
|
||||
this.transactionSize = transactionSize;
|
||||
setBatchSize(transactionSize);
|
||||
}
|
||||
|
||||
public Integer getBatchSize() {
|
||||
return this.batchSize;
|
||||
}
|
||||
|
||||
public void setBatchSize(Integer batchSize) {
|
||||
this.batchSize = batchSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -38,7 +38,7 @@ public final class SimpleRabbitListenerContainerFactoryConfigurer
|
|||
configure(factory, connectionFactory, config);
|
||||
map.from(config::getConcurrency).whenNonNull().to(factory::setConcurrentConsumers);
|
||||
map.from(config::getMaxConcurrency).whenNonNull().to(factory::setMaxConcurrentConsumers);
|
||||
map.from(config::getTransactionSize).whenNonNull().to(factory::setTxSize);
|
||||
map.from(config::getBatchSize).whenNonNull().to(factory::setBatchSize);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
package org.springframework.boot.autoconfigure.amqp;
|
||||
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import javax.net.ssl.SSLContext;
|
||||
|
@ -125,6 +126,7 @@ class RabbitAutoConfigurationTests {
|
|||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
void testConnectionFactoryWithOverrides() {
|
||||
this.contextRunner.withUserConfiguration(TestConfiguration.class)
|
||||
.withPropertyValues("spring.rabbitmq.host:remote-server", "spring.rabbitmq.port:9000",
|
||||
|
@ -137,15 +139,16 @@ class RabbitAutoConfigurationTests {
|
|||
assertThat(connectionFactory.getVirtualHost()).isEqualTo("/vhost");
|
||||
com.rabbitmq.client.ConnectionFactory rcf = connectionFactory.getRabbitConnectionFactory();
|
||||
assertThat(rcf.getConnectionTimeout()).isEqualTo(123);
|
||||
assertThat((Address[]) ReflectionTestUtils.getField(connectionFactory, "addresses")).hasSize(1);
|
||||
assertThat((List<Address>) ReflectionTestUtils.getField(connectionFactory, "addresses")).hasSize(1);
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
void testConnectionFactoryWithCustomConnectionNameStrategy() {
|
||||
this.contextRunner.withUserConfiguration(ConnectionNameStrategyConfiguration.class).run((context) -> {
|
||||
CachingConnectionFactory connectionFactory = context.getBean(CachingConnectionFactory.class);
|
||||
Address[] addresses = (Address[]) ReflectionTestUtils.getField(connectionFactory, "addresses");
|
||||
List<Address> addresses = (List<Address>) ReflectionTestUtils.getField(connectionFactory, "addresses");
|
||||
assertThat(addresses).hasSize(1);
|
||||
com.rabbitmq.client.ConnectionFactory rcf = mock(com.rabbitmq.client.ConnectionFactory.class);
|
||||
given(rcf.newConnection(isNull(), eq(addresses), anyString())).willReturn(mock(Connection.class));
|
||||
|
@ -363,8 +366,8 @@ class RabbitAutoConfigurationTests {
|
|||
this.contextRunner.withUserConfiguration(TestConfiguration5.class).run((context) -> {
|
||||
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = context
|
||||
.getBean("rabbitListenerContainerFactory", SimpleRabbitListenerContainerFactory.class);
|
||||
rabbitListenerContainerFactory.setTxSize(10);
|
||||
verify(rabbitListenerContainerFactory).setTxSize(10);
|
||||
rabbitListenerContainerFactory.setBatchSize(10);
|
||||
verify(rabbitListenerContainerFactory).setBatchSize(10);
|
||||
assertThat(rabbitListenerContainerFactory.getAdviceChain()).isNull();
|
||||
});
|
||||
}
|
||||
|
@ -385,7 +388,7 @@ class RabbitAutoConfigurationTests {
|
|||
"spring.rabbitmq.listener.simple.prefetch:40",
|
||||
"spring.rabbitmq.listener.simple.defaultRequeueRejected:false",
|
||||
"spring.rabbitmq.listener.simple.idleEventInterval:5",
|
||||
"spring.rabbitmq.listener.simple.transactionSize:20",
|
||||
"spring.rabbitmq.listener.simple.batchSize:20",
|
||||
"spring.rabbitmq.listener.simple.missingQueuesFatal:false")
|
||||
.run((context) -> {
|
||||
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = context
|
||||
|
@ -393,12 +396,24 @@ class RabbitAutoConfigurationTests {
|
|||
assertThat(rabbitListenerContainerFactory).hasFieldOrPropertyWithValue("concurrentConsumers", 5);
|
||||
assertThat(rabbitListenerContainerFactory).hasFieldOrPropertyWithValue("maxConcurrentConsumers",
|
||||
10);
|
||||
assertThat(rabbitListenerContainerFactory).hasFieldOrPropertyWithValue("txSize", 20);
|
||||
assertThat(rabbitListenerContainerFactory).hasFieldOrPropertyWithValue("batchSize", 20);
|
||||
assertThat(rabbitListenerContainerFactory).hasFieldOrPropertyWithValue("missingQueuesFatal", false);
|
||||
checkCommonProps(context, rabbitListenerContainerFactory);
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
@Deprecated
|
||||
void testRabbitListenerContainerFactoryWithDeprecatedTransactionSizeStillWorks() {
|
||||
this.contextRunner
|
||||
.withUserConfiguration(MessageConvertersConfiguration.class, MessageRecoverersConfiguration.class)
|
||||
.withPropertyValues("spring.rabbitmq.listener.simple.transactionSize:20").run((context) -> {
|
||||
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = context
|
||||
.getBean("rabbitListenerContainerFactory", SimpleRabbitListenerContainerFactory.class);
|
||||
assertThat(rabbitListenerContainerFactory).hasFieldOrPropertyWithValue("batchSize", 20);
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void testDirectRabbitListenerContainerFactoryWithCustomSettings() {
|
||||
this.contextRunner
|
||||
|
|
Loading…
Reference in New Issue