Add support for RabbitStreamTemplate

See gh-28060
This commit is contained in:
Eddú Meléndez 2021-09-17 17:41:04 -05:00 committed by Stephane Nicoll
parent cd0bb8e68f
commit 3952046132
4 changed files with 204 additions and 0 deletions

View File

@ -42,6 +42,7 @@ import org.springframework.util.StringUtils;
* @author Gary Russell
* @author Artsiom Yudovin
* @author Franjo Zilic
* @author Eddú Meléndez
* @since 1.0.0
*/
@ConfigurationProperties(prefix = "spring.rabbitmq")
@ -1194,6 +1195,11 @@ public class RabbitProperties {
*/
private String password;
/**
* Name of the stream.
*/
private String name;
public String getHost() {
return this.host;
}
@ -1226,6 +1232,14 @@ public class RabbitProperties {
this.password = password;
}
public String getName() {
return this.name;
}
public void setName(String name) {
this.name = name;
}
}
}

View File

@ -23,6 +23,7 @@ import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.EnvironmentBuilder;
import org.springframework.amqp.rabbit.config.ContainerCustomizer;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
@ -33,11 +34,16 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory;
import org.springframework.rabbit.stream.listener.ConsumerCustomizer;
import org.springframework.rabbit.stream.listener.StreamListenerContainer;
import org.springframework.rabbit.stream.producer.ProducerCustomizer;
import org.springframework.rabbit.stream.producer.RabbitStreamOperations;
import org.springframework.rabbit.stream.producer.RabbitStreamTemplate;
import org.springframework.rabbit.stream.support.converter.StreamMessageConverter;
/**
* Configuration for Spring RabbitMQ Stream plugin support.
*
* @author Gary Russell
* @author Eddú Meléndez
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(StreamRabbitListenerContainerFactory.class)
@ -78,4 +84,28 @@ class RabbitStreamConfiguration {
return (value) -> (value != null) ? value : fallback.get();
}
@Bean
@ConditionalOnMissingBean
RabbitStreamTemplateConfigurer rabbitStreamTemplateConfigurer(RabbitProperties properties,
ObjectProvider<MessageConverter> messageConverter,
ObjectProvider<StreamMessageConverter> streamMessageConverter,
ObjectProvider<ProducerCustomizer> producerCustomizer) {
RabbitStreamTemplateConfigurer configurer = new RabbitStreamTemplateConfigurer();
configurer.setMessageConverter(messageConverter.getIfUnique());
configurer.setStreamMessageConverter(streamMessageConverter.getIfUnique());
configurer.setProducerCustomizer(producerCustomizer.getIfUnique());
return configurer;
}
@Bean
@ConditionalOnMissingBean(RabbitStreamOperations.class)
@ConditionalOnProperty(prefix = "spring.rabbitmq.stream", name = "name")
RabbitStreamTemplate rabbitStreamTemplate(Environment rabbitStreamEnvironment, RabbitProperties properties,
RabbitStreamTemplateConfigurer configurer) {
RabbitStreamTemplate template = new RabbitStreamTemplate(rabbitStreamEnvironment,
properties.getStream().getName());
configurer.configure(template);
return template;
}
}

View File

@ -0,0 +1,74 @@
/*
* Copyright 2012-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.amqp;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.rabbit.stream.producer.ProducerCustomizer;
import org.springframework.rabbit.stream.producer.RabbitStreamTemplate;
import org.springframework.rabbit.stream.support.converter.StreamMessageConverter;
/**
* Configure {@link RabbitStreamTemplate} with sensible defaults.
*
* @author Eddú Meléndez
* @since 2.7.0
*/
public class RabbitStreamTemplateConfigurer {
private MessageConverter messageConverter;
private StreamMessageConverter streamMessageConverter;
private ProducerCustomizer producerCustomizer;
/**
* Set the {@link MessageConverter} to use or {@code null} if the out-of-the-box
* converter should be used.
* @param messageConverter the {@link MessageConverter}
*/
public void setMessageConverter(MessageConverter messageConverter) {
this.messageConverter = messageConverter;
}
public void setStreamMessageConverter(StreamMessageConverter streamMessageConverter) {
this.streamMessageConverter = streamMessageConverter;
}
public void setProducerCustomizer(ProducerCustomizer producerCustomizer) {
this.producerCustomizer = producerCustomizer;
}
/**
* Configure the specified {@link RabbitStreamTemplate}. The template can be further
* tuned and default settings can be overridden.
* @param template the {@link RabbitStreamTemplate} instance to configure
*/
public void configure(RabbitStreamTemplate template) {
PropertyMapper map = PropertyMapper.get();
if (this.messageConverter != null) {
template.setMessageConverter(this.messageConverter);
}
if (this.streamMessageConverter != null) {
template.setStreamConverter(this.streamMessageConverter);
}
if (this.producerCustomizer != null) {
template.setProducerCustomizer(this.producerCustomizer);
}
}
}

View File

@ -26,6 +26,7 @@ import org.springframework.amqp.rabbit.config.ContainerCustomizer;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.context.annotation.Bean;
@ -33,6 +34,10 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory;
import org.springframework.rabbit.stream.listener.ConsumerCustomizer;
import org.springframework.rabbit.stream.listener.StreamListenerContainer;
import org.springframework.rabbit.stream.producer.ProducerCustomizer;
import org.springframework.rabbit.stream.producer.RabbitStreamTemplate;
import org.springframework.rabbit.stream.support.converter.StreamMessageConverter;
import org.springframework.test.util.ReflectionTestUtils;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
@ -44,6 +49,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
*
* @author Gary Russell
* @author Andy Wilkinson
* @author Eddú Meléndez
*/
class RabbitStreamConfigurationTests {
@ -149,6 +155,66 @@ class RabbitStreamConfigurationTests {
verify(builder).password("confidential");
}
@Test
void testDefaultRabbitStreamTemplateConfiguration() {
this.contextRunner
.withPropertyValues("spring.rabbitmq.listener.type:stream", "spring.rabbitmq.stream.name:stream-test")
.run((context) -> {
RabbitStreamTemplate streamTemplate = context.getBean(RabbitStreamTemplate.class);
String streamName = (String) ReflectionTestUtils.getField(streamTemplate, "streamName");
assertThat(context).hasSingleBean(RabbitStreamTemplate.class);
assertThat(streamName).isEqualTo("stream-test");
});
}
@Test
void testDefaultRabbitStreamTemplateConfigurationWithoutStreamName() {
this.contextRunner.withPropertyValues("spring.rabbitmq.listener.type:stream")
.run((context) -> assertThat(context).doesNotHaveBean(RabbitStreamTemplate.class));
}
@Test
void testRabbitStreamTemplateConfigurationWithCustomMessageConverter() {
this.contextRunner.withUserConfiguration(RabbitAutoConfigurationTests.MessageConvertersConfiguration.class)
.withPropertyValues("spring.rabbitmq.listener.type:stream", "spring.rabbitmq.stream.name:stream-test")
.run((context) -> {
RabbitStreamTemplate streamTemplate = context.getBean(RabbitStreamTemplate.class);
MessageConverter messageConverter = (MessageConverter) ReflectionTestUtils.getField(streamTemplate,
"messageConverter");
String streamName = (String) ReflectionTestUtils.getField(streamTemplate, "streamName");
assertThat(messageConverter).isSameAs(context.getBean(MessageConverter.class));
assertThat(streamName).isEqualTo("stream-test");
});
}
@Test
void testRabbitStreamTemplateConfigurationWithCustomStreamMessageConverter() {
this.contextRunner.withUserConfiguration(StreamMessageConverterConfiguration.class)
.withPropertyValues("spring.rabbitmq.listener.type:stream", "spring.rabbitmq.stream.name:stream-test")
.run((context) -> {
RabbitStreamTemplate streamTemplate = context.getBean(RabbitStreamTemplate.class);
StreamMessageConverter messageConverter = (StreamMessageConverter) ReflectionTestUtils
.getField(streamTemplate, "streamConverter");
String streamName = (String) ReflectionTestUtils.getField(streamTemplate, "streamName");
assertThat(messageConverter).isSameAs(context.getBean(StreamMessageConverter.class));
assertThat(streamName).isEqualTo("stream-test");
});
}
@Test
void testRabbitStreamTemplateConfigurationWithCustomProducerCustomizer() {
this.contextRunner.withUserConfiguration(ProducerCustomizerConfiguration.class)
.withPropertyValues("spring.rabbitmq.listener.type:stream", "spring.rabbitmq.stream.name:stream-test")
.run((context) -> {
RabbitStreamTemplate streamTemplate = context.getBean(RabbitStreamTemplate.class);
ProducerCustomizer producerCustomizer = (ProducerCustomizer) ReflectionTestUtils
.getField(streamTemplate, "producerCustomizer");
String streamName = (String) ReflectionTestUtils.getField(streamTemplate, "streamName");
assertThat(producerCustomizer).isSameAs(context.getBean(ProducerCustomizer.class));
assertThat(streamName).isEqualTo("stream-test");
});
}
@Configuration(proxyBeanMethods = false)
static class TestConfiguration {
@ -196,4 +262,24 @@ class RabbitStreamConfigurationTests {
}
@Configuration(proxyBeanMethods = false)
static class StreamMessageConverterConfiguration {
@Bean
StreamMessageConverter myStreamMessageConverter() {
return mock(StreamMessageConverter.class);
}
}
@Configuration(proxyBeanMethods = false)
static class ProducerCustomizerConfiguration {
@Bean
ProducerCustomizer myProducerCustomizer() {
return mock(ProducerCustomizer.class);
}
}
}