Polish "Add auto-configuration for spring-rabbit-stream"
See gh-27480
This commit is contained in:
parent
9784838229
commit
7a0fe0f95f
|
@ -897,8 +897,8 @@ public class RabbitProperties {
|
||||||
public static class StreamContainer extends BaseContainer {
|
public static class StreamContainer extends BaseContainer {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* When true, the container factory will create containers that support listeners
|
* Whether the container will support listeners that consume native stream
|
||||||
* that consume native stream messages instead of spring-amqp {@code Message}s.
|
* messages instead of Spring AMQP messages.
|
||||||
*/
|
*/
|
||||||
boolean nativeListener;
|
boolean nativeListener;
|
||||||
|
|
||||||
|
@ -1172,24 +1172,24 @@ public class RabbitProperties {
|
||||||
public static final class Stream {
|
public static final class Stream {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Host of a RabbitMQ instance with the Stream Plugin Enabled
|
* Host of a RabbitMQ instance with the Stream plugin enabled.
|
||||||
*/
|
*/
|
||||||
private String host = "localhost";
|
private String host = "localhost";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Stream port of a RabbitMQ instance with the Stream Plugin Enabled
|
* Stream port of a RabbitMQ instance with the Stream plugin enabled.
|
||||||
*/
|
*/
|
||||||
private int port = DEFAULT_STREAM_PORT;
|
private int port = DEFAULT_STREAM_PORT;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Login user to authenticate to the broker. If not set
|
* Login user to authenticate to the broker. When not set,
|
||||||
* {@code spring.rabbitmq.username} will be used.
|
* spring.rabbitmq.username is used.
|
||||||
*/
|
*/
|
||||||
private String username;
|
private String username;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Login password to authenticate to the broker. If not set
|
* Login password to authenticate to the broker. When not set
|
||||||
* {@code spring.rabbitmq.password} will be used.
|
* spring.rabbitmq.password is used.
|
||||||
*/
|
*/
|
||||||
private String password;
|
private String password;
|
||||||
|
|
||||||
|
|
|
@ -16,15 +16,18 @@
|
||||||
|
|
||||||
package org.springframework.boot.autoconfigure.amqp;
|
package org.springframework.boot.autoconfigure.amqp;
|
||||||
|
|
||||||
|
import java.util.function.Function;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
import com.rabbitmq.stream.Environment;
|
import com.rabbitmq.stream.Environment;
|
||||||
import com.rabbitmq.stream.EnvironmentBuilder;
|
import com.rabbitmq.stream.EnvironmentBuilder;
|
||||||
|
|
||||||
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
|
|
||||||
import org.springframework.amqp.rabbit.config.ContainerCustomizer;
|
import org.springframework.amqp.rabbit.config.ContainerCustomizer;
|
||||||
import org.springframework.beans.factory.ObjectProvider;
|
import org.springframework.beans.factory.ObjectProvider;
|
||||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||||
|
import org.springframework.boot.context.properties.PropertyMapper;
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory;
|
import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory;
|
||||||
|
@ -32,7 +35,7 @@ import org.springframework.rabbit.stream.listener.ConsumerCustomizer;
|
||||||
import org.springframework.rabbit.stream.listener.StreamListenerContainer;
|
import org.springframework.rabbit.stream.listener.StreamListenerContainer;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Configuration for Spring RabbitMQ Stream Plugin support.
|
* Configuration for Spring RabbitMQ Stream plugin support.
|
||||||
*
|
*
|
||||||
* @author Gary Russell
|
* @author Gary Russell
|
||||||
*/
|
*/
|
||||||
|
@ -46,7 +49,6 @@ class RabbitStreamConfiguration {
|
||||||
StreamRabbitListenerContainerFactory streamRabbitListenerContainerFactory(Environment rabbitStreamEnvironment,
|
StreamRabbitListenerContainerFactory streamRabbitListenerContainerFactory(Environment rabbitStreamEnvironment,
|
||||||
RabbitProperties properties, ObjectProvider<ConsumerCustomizer> consumerCustomizer,
|
RabbitProperties properties, ObjectProvider<ConsumerCustomizer> consumerCustomizer,
|
||||||
ObjectProvider<ContainerCustomizer<StreamListenerContainer>> containerCustomizer) {
|
ObjectProvider<ContainerCustomizer<StreamListenerContainer>> containerCustomizer) {
|
||||||
|
|
||||||
StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(
|
StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(
|
||||||
rabbitStreamEnvironment);
|
rabbitStreamEnvironment);
|
||||||
factory.setNativeListener(properties.getListener().getStream().isNativeListener());
|
factory.setNativeListener(properties.getListener().getStream().isNativeListener());
|
||||||
|
@ -58,24 +60,22 @@ class RabbitStreamConfiguration {
|
||||||
@Bean(name = "rabbitStreamEnvironment")
|
@Bean(name = "rabbitStreamEnvironment")
|
||||||
@ConditionalOnMissingBean(name = "rabbitStreamEnvironment")
|
@ConditionalOnMissingBean(name = "rabbitStreamEnvironment")
|
||||||
Environment rabbitStreamEnvironment(RabbitProperties properties) {
|
Environment rabbitStreamEnvironment(RabbitProperties properties) {
|
||||||
|
return configure(Environment.builder(), properties).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
static EnvironmentBuilder configure(EnvironmentBuilder builder, RabbitProperties properties) {
|
||||||
|
builder.lazyInitialization(true);
|
||||||
RabbitProperties.Stream stream = properties.getStream();
|
RabbitProperties.Stream stream = properties.getStream();
|
||||||
String username = stream.getUsername();
|
PropertyMapper mapper = PropertyMapper.get();
|
||||||
if (username == null) {
|
mapper.from(stream.getHost()).to(builder::host);
|
||||||
username = properties.getUsername();
|
mapper.from(stream.getPort()).to(builder::port);
|
||||||
|
mapper.from(stream.getUsername()).as(withFallback(properties::getUsername)).whenNonNull().to(builder::username);
|
||||||
|
mapper.from(stream.getPassword()).as(withFallback(properties::getPassword)).whenNonNull().to(builder::password);
|
||||||
|
return builder;
|
||||||
}
|
}
|
||||||
String password = stream.getPassword();
|
|
||||||
if (password == null) {
|
private static Function<String, String> withFallback(Supplier<String> fallback) {
|
||||||
password = properties.getPassword();
|
return (value) -> (value != null) ? value : fallback.get();
|
||||||
}
|
|
||||||
EnvironmentBuilder builder = Environment.builder().lazyInitialization(true).host(stream.getHost())
|
|
||||||
.port(stream.getPort());
|
|
||||||
if (username != null) {
|
|
||||||
builder.username(username);
|
|
||||||
}
|
|
||||||
if (password != null) {
|
|
||||||
builder.password(password);
|
|
||||||
}
|
|
||||||
return builder.build();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,12 +16,16 @@
|
||||||
|
|
||||||
package org.springframework.boot.autoconfigure.amqp;
|
package org.springframework.boot.autoconfigure.amqp;
|
||||||
|
|
||||||
|
import com.rabbitmq.stream.Environment;
|
||||||
|
import com.rabbitmq.stream.EnvironmentBuilder;
|
||||||
|
import org.assertj.core.api.InstanceOfAssertFactories;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
|
|
||||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||||
import org.springframework.amqp.rabbit.config.ContainerCustomizer;
|
import org.springframework.amqp.rabbit.config.ContainerCustomizer;
|
||||||
|
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
|
||||||
|
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
|
||||||
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
|
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
|
||||||
import org.springframework.beans.DirectFieldAccessor;
|
|
||||||
import org.springframework.boot.autoconfigure.AutoConfigurations;
|
import org.springframework.boot.autoconfigure.AutoConfigurations;
|
||||||
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
|
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
|
@ -31,11 +35,15 @@ import org.springframework.rabbit.stream.listener.ConsumerCustomizer;
|
||||||
import org.springframework.rabbit.stream.listener.StreamListenerContainer;
|
import org.springframework.rabbit.stream.listener.StreamListenerContainer;
|
||||||
|
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests for {@link RabbitStreamConfiguration}.
|
* Tests for {@link RabbitStreamConfiguration}.
|
||||||
*
|
*
|
||||||
* @author Gary Russell
|
* @author Gary Russell
|
||||||
|
* @author Andy Wilkinson
|
||||||
*/
|
*/
|
||||||
class RabbitStreamConfigurationTests {
|
class RabbitStreamConfigurationTests {
|
||||||
|
|
||||||
|
@ -43,26 +51,106 @@ class RabbitStreamConfigurationTests {
|
||||||
.withConfiguration(AutoConfigurations.of(RabbitAutoConfiguration.class));
|
.withConfiguration(AutoConfigurations.of(RabbitAutoConfiguration.class));
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void testContainerType() {
|
@SuppressWarnings("unchecked")
|
||||||
|
void whenListenerTypeIsStreamThenStreamListenerContainerAndEnvironmentAreAutoConfigured() {
|
||||||
this.contextRunner.withUserConfiguration(TestConfiguration.class)
|
this.contextRunner.withUserConfiguration(TestConfiguration.class)
|
||||||
.withPropertyValues("spring.rabbitmq.listener.type:stream",
|
.withPropertyValues("spring.rabbitmq.listener.type:stream").run((context) -> {
|
||||||
"spring.rabbitmq.listener.stream.native-listener:true")
|
|
||||||
.run((context) -> {
|
|
||||||
RabbitListenerEndpointRegistry registry = context.getBean(RabbitListenerEndpointRegistry.class);
|
RabbitListenerEndpointRegistry registry = context.getBean(RabbitListenerEndpointRegistry.class);
|
||||||
assertThat(registry.getListenerContainer("test")).isInstanceOf(StreamListenerContainer.class);
|
MessageListenerContainer listenerContainer = registry.getListenerContainer("test");
|
||||||
assertThat(new DirectFieldAccessor(registry.getListenerContainer("test"))
|
assertThat(listenerContainer).isInstanceOf(StreamListenerContainer.class);
|
||||||
.getPropertyValue("consumerCustomizer")).isNotNull();
|
assertThat(listenerContainer).extracting("consumerCustomizer").isNotNull();
|
||||||
assertThat(new DirectFieldAccessor(context.getBean(StreamRabbitListenerContainerFactory.class))
|
assertThat(context.getBean(StreamRabbitListenerContainerFactory.class))
|
||||||
.getPropertyValue("nativeListener")).isEqualTo(Boolean.TRUE);
|
.extracting("nativeListener", InstanceOfAssertFactories.BOOLEAN).isFalse();
|
||||||
assertThat(context.getBean(TestConfiguration.class).containerCustomizerCalled).isTrue();
|
verify(context.getBean(ContainerCustomizer.class)).configure(listenerContainer);
|
||||||
|
assertThat(context).hasSingleBean(Environment.class);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@Configuration(proxyBeanMethods = false)
|
@Test
|
||||||
@EnableRabbit
|
void whenNativeListenerIsEnabledThenContainerFactoryIsConfiguredToUseNativeListeners() {
|
||||||
static class TestConfiguration {
|
this.contextRunner
|
||||||
|
.withPropertyValues("spring.rabbitmq.listener.type:stream",
|
||||||
|
"spring.rabbitmq.listener.stream.native-listener:true")
|
||||||
|
.run((context) -> assertThat(context.getBean(StreamRabbitListenerContainerFactory.class))
|
||||||
|
.extracting("nativeListener", InstanceOfAssertFactories.BOOLEAN).isTrue());
|
||||||
|
}
|
||||||
|
|
||||||
boolean containerCustomizerCalled;
|
@Test
|
||||||
|
void whenCustomEnvironmenIsDefinedThenAutoConfiguredEnvironmentBacksOff() {
|
||||||
|
this.contextRunner.withUserConfiguration(CustomEnvironmentConfiguration.class).run((context) -> {
|
||||||
|
assertThat(context).hasSingleBean(Environment.class);
|
||||||
|
assertThat(context.getBean(Environment.class))
|
||||||
|
.isSameAs(context.getBean(CustomEnvironmentConfiguration.class).environment);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void whenCustomMessageListenerContainerIsDefinedThenAutoConfiguredContainerBacksOff() {
|
||||||
|
this.contextRunner.withUserConfiguration(CustomMessageListenerContainerFactoryConfiguration.class)
|
||||||
|
.run((context) -> {
|
||||||
|
assertThat(context).hasSingleBean(RabbitListenerContainerFactory.class);
|
||||||
|
assertThat(context.getBean(RabbitListenerContainerFactory.class)).isSameAs(context.getBean(
|
||||||
|
CustomMessageListenerContainerFactoryConfiguration.class).listenerContainerFactory);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void environmentUsesPropertyDefaultsByDefault() {
|
||||||
|
EnvironmentBuilder builder = mock(EnvironmentBuilder.class);
|
||||||
|
RabbitProperties properties = new RabbitProperties();
|
||||||
|
RabbitStreamConfiguration.configure(builder, properties);
|
||||||
|
verify(builder).port(5552);
|
||||||
|
verify(builder).host("localhost");
|
||||||
|
verify(builder).lazyInitialization(true);
|
||||||
|
verify(builder).username("guest");
|
||||||
|
verify(builder).password("guest");
|
||||||
|
verifyNoMoreInteractions(builder);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void whenStreamPortIsSetThenEnvironmentUsesCustomPort() {
|
||||||
|
EnvironmentBuilder builder = mock(EnvironmentBuilder.class);
|
||||||
|
RabbitProperties properties = new RabbitProperties();
|
||||||
|
properties.getStream().setPort(5553);
|
||||||
|
RabbitStreamConfiguration.configure(builder, properties);
|
||||||
|
verify(builder).port(5553);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void whenStreamHostIsSetThenEnvironmentUsesCustomHost() {
|
||||||
|
EnvironmentBuilder builder = mock(EnvironmentBuilder.class);
|
||||||
|
RabbitProperties properties = new RabbitProperties();
|
||||||
|
properties.getStream().setHost("stream.rabbit.example.com");
|
||||||
|
RabbitStreamConfiguration.configure(builder, properties);
|
||||||
|
verify(builder).host("stream.rabbit.example.com");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void whenStreamCredentialsAreNotSetThenEnvironmentUsesRabbitCredentials() {
|
||||||
|
EnvironmentBuilder builder = mock(EnvironmentBuilder.class);
|
||||||
|
RabbitProperties properties = new RabbitProperties();
|
||||||
|
properties.setUsername("alice");
|
||||||
|
properties.setPassword("secret");
|
||||||
|
RabbitStreamConfiguration.configure(builder, properties);
|
||||||
|
verify(builder).username("alice");
|
||||||
|
verify(builder).password("secret");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void whenStreamCredentialsAreSetThenEnvironmentUsesStreamCredentials() {
|
||||||
|
EnvironmentBuilder builder = mock(EnvironmentBuilder.class);
|
||||||
|
RabbitProperties properties = new RabbitProperties();
|
||||||
|
properties.setUsername("alice");
|
||||||
|
properties.setPassword("secret");
|
||||||
|
properties.getStream().setUsername("bob");
|
||||||
|
properties.getStream().setPassword("confidential");
|
||||||
|
RabbitStreamConfiguration.configure(builder, properties);
|
||||||
|
verify(builder).username("bob");
|
||||||
|
verify(builder).password("confidential");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Configuration(proxyBeanMethods = false)
|
||||||
|
static class TestConfiguration {
|
||||||
|
|
||||||
@RabbitListener(id = "test", queues = "stream", autoStartup = "false")
|
@RabbitListener(id = "test", queues = "stream", autoStartup = "false")
|
||||||
void listen(String in) {
|
void listen(String in) {
|
||||||
|
@ -70,13 +158,40 @@ class RabbitStreamConfigurationTests {
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
ConsumerCustomizer consumerCustomizer() {
|
ConsumerCustomizer consumerCustomizer() {
|
||||||
return (id, consumer) -> {
|
return mock(ConsumerCustomizer.class);
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
ContainerCustomizer<StreamListenerContainer> containerCustomizer() {
|
ContainerCustomizer<StreamListenerContainer> containerCustomizer() {
|
||||||
return (container) -> this.containerCustomizerCalled = true;
|
return mock(ContainerCustomizer.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Configuration(proxyBeanMethods = false)
|
||||||
|
static class CustomEnvironmentConfiguration {
|
||||||
|
|
||||||
|
private final Environment environment = Environment.builder().lazyInitialization(true).build();
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
Environment rabbitStreamEnvironment() {
|
||||||
|
return this.environment;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Configuration(proxyBeanMethods = false)
|
||||||
|
static class CustomMessageListenerContainerFactoryConfiguration {
|
||||||
|
|
||||||
|
@SuppressWarnings("rawtypes")
|
||||||
|
private final RabbitListenerContainerFactory listenerContainerFactory = mock(
|
||||||
|
RabbitListenerContainerFactory.class);
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
RabbitListenerContainerFactory<MessageListenerContainer> rabbitListenerContainerFactory() {
|
||||||
|
return this.listenerContainerFactory;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue