Merge pull request #28060 from eddumelendez

* pr/28060:
  Polish "Add support for RabbitStreamTemplate"
  Add support for RabbitStreamTemplate

Closes gh-28060
This commit is contained in:
Stephane Nicoll 2022-01-03 14:11:23 +01:00
commit 3d5ea71ed7
5 changed files with 223 additions and 3 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012-2021 the original author or authors.
* Copyright 2012-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -42,6 +42,7 @@ import org.springframework.util.StringUtils;
* @author Gary Russell
* @author Artsiom Yudovin
* @author Franjo Zilic
* @author Eddú Meléndez
* @since 1.0.0
*/
@ConfigurationProperties(prefix = "spring.rabbitmq")
@ -1194,6 +1195,11 @@ public class RabbitProperties {
*/
private String password;
/**
* Name of the stream.
*/
private String name;
public String getHost() {
return this.host;
}
@ -1226,6 +1232,14 @@ public class RabbitProperties {
this.password = password;
}
public String getName() {
return this.name;
}
public void setName(String name) {
this.name = name;
}
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012-2021 the original author or authors.
* Copyright 2012-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -23,6 +23,7 @@ import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.EnvironmentBuilder;
import org.springframework.amqp.rabbit.config.ContainerCustomizer;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
@ -33,11 +34,16 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory;
import org.springframework.rabbit.stream.listener.ConsumerCustomizer;
import org.springframework.rabbit.stream.listener.StreamListenerContainer;
import org.springframework.rabbit.stream.producer.ProducerCustomizer;
import org.springframework.rabbit.stream.producer.RabbitStreamOperations;
import org.springframework.rabbit.stream.producer.RabbitStreamTemplate;
import org.springframework.rabbit.stream.support.converter.StreamMessageConverter;
/**
* Configuration for Spring RabbitMQ Stream plugin support.
*
* @author Gary Russell
* @author Eddú Meléndez
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(StreamRabbitListenerContainerFactory.class)
@ -63,6 +69,30 @@ class RabbitStreamConfiguration {
return configure(Environment.builder(), properties).build();
}
@Bean
@ConditionalOnMissingBean
RabbitStreamTemplateConfigurer rabbitStreamTemplateConfigurer(RabbitProperties properties,
ObjectProvider<MessageConverter> messageConverter,
ObjectProvider<StreamMessageConverter> streamMessageConverter,
ObjectProvider<ProducerCustomizer> producerCustomizer) {
RabbitStreamTemplateConfigurer configurer = new RabbitStreamTemplateConfigurer();
configurer.setMessageConverter(messageConverter.getIfUnique());
configurer.setStreamMessageConverter(streamMessageConverter.getIfUnique());
configurer.setProducerCustomizer(producerCustomizer.getIfUnique());
return configurer;
}
@Bean
@ConditionalOnMissingBean(RabbitStreamOperations.class)
@ConditionalOnProperty(prefix = "spring.rabbitmq.stream", name = "name")
RabbitStreamTemplate rabbitStreamTemplate(Environment rabbitStreamEnvironment, RabbitProperties properties,
RabbitStreamTemplateConfigurer configurer) {
RabbitStreamTemplate template = new RabbitStreamTemplate(rabbitStreamEnvironment,
properties.getStream().getName());
configurer.configure(template);
return template;
}
static EnvironmentBuilder configure(EnvironmentBuilder builder, RabbitProperties properties) {
builder.lazyInitialization(true);
RabbitProperties.Stream stream = properties.getStream();

View File

@ -0,0 +1,81 @@
/*
* Copyright 2012-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.amqp;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.rabbit.stream.producer.ProducerCustomizer;
import org.springframework.rabbit.stream.producer.RabbitStreamTemplate;
import org.springframework.rabbit.stream.support.converter.StreamMessageConverter;
/**
* Configure {@link RabbitStreamTemplate} with sensible defaults.
*
* @author Eddú Meléndez
* @since 2.7.0
*/
public class RabbitStreamTemplateConfigurer {
private MessageConverter messageConverter;
private StreamMessageConverter streamMessageConverter;
private ProducerCustomizer producerCustomizer;
/**
* Set the {@link MessageConverter} to use or {@code null} if the out-of-the-box
* converter should be used.
* @param messageConverter the {@link MessageConverter}
*/
public void setMessageConverter(MessageConverter messageConverter) {
this.messageConverter = messageConverter;
}
/**
* Set the {@link StreamMessageConverter} to use or {@code null} if the out-of-the-box
* stream message converter should be used.
* @param streamMessageConverter the {@link StreamMessageConverter}
*/
public void setStreamMessageConverter(StreamMessageConverter streamMessageConverter) {
this.streamMessageConverter = streamMessageConverter;
}
/**
* Set the {@link ProducerCustomizer} instances to use.
* @param producerCustomizer the producer customizer
*/
public void setProducerCustomizer(ProducerCustomizer producerCustomizer) {
this.producerCustomizer = producerCustomizer;
}
/**
* Configure the specified {@link RabbitStreamTemplate}. The template can be further
* tuned and default settings can be overridden.
* @param template the {@link RabbitStreamTemplate} instance to configure
*/
public void configure(RabbitStreamTemplate template) {
if (this.messageConverter != null) {
template.setMessageConverter(this.messageConverter);
}
if (this.streamMessageConverter != null) {
template.setStreamConverter(this.streamMessageConverter);
}
if (this.producerCustomizer != null) {
template.setProducerCustomizer(this.producerCustomizer);
}
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012-2021 the original author or authors.
* Copyright 2012-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -26,13 +26,18 @@ import org.springframework.amqp.rabbit.config.ContainerCustomizer;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory;
import org.springframework.rabbit.stream.listener.ConsumerCustomizer;
import org.springframework.rabbit.stream.listener.StreamListenerContainer;
import org.springframework.rabbit.stream.producer.ProducerCustomizer;
import org.springframework.rabbit.stream.producer.RabbitStreamTemplate;
import org.springframework.rabbit.stream.support.converter.StreamMessageConverter;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
@ -44,6 +49,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
*
* @author Gary Russell
* @author Andy Wilkinson
* @author Eddú Meléndez
*/
class RabbitStreamConfigurationTests {
@ -149,6 +155,61 @@ class RabbitStreamConfigurationTests {
verify(builder).password("confidential");
}
@Test
void testDefaultRabbitStreamTemplateConfiguration() {
this.contextRunner
.withPropertyValues("spring.rabbitmq.listener.type:stream", "spring.rabbitmq.stream.name:stream-test")
.run((context) -> {
assertThat(context).hasSingleBean(RabbitStreamTemplate.class);
assertThat(context.getBean(RabbitStreamTemplate.class)).hasFieldOrPropertyWithValue("streamName",
"stream-test");
});
}
@Test
void testDefaultRabbitStreamTemplateConfigurationWithoutStreamName() {
this.contextRunner.withPropertyValues("spring.rabbitmq.listener.type:stream")
.run((context) -> assertThat(context).doesNotHaveBean(RabbitStreamTemplate.class));
}
@Test
void testRabbitStreamTemplateConfigurationWithCustomMessageConverter() {
this.contextRunner.withUserConfiguration(MessageConvertersConfiguration.class)
.withPropertyValues("spring.rabbitmq.listener.type:stream", "spring.rabbitmq.stream.name:stream-test")
.run((context) -> {
assertThat(context).hasSingleBean(RabbitStreamTemplate.class);
RabbitStreamTemplate streamTemplate = context.getBean(RabbitStreamTemplate.class);
assertThat(streamTemplate).hasFieldOrPropertyWithValue("streamName", "stream-test");
assertThat(streamTemplate).extracting("messageConverter")
.isSameAs(context.getBean(MessageConverter.class));
});
}
@Test
void testRabbitStreamTemplateConfigurationWithCustomStreamMessageConverter() {
this.contextRunner
.withBean("myStreamMessageConverter", StreamMessageConverter.class,
() -> mock(StreamMessageConverter.class))
.withPropertyValues("spring.rabbitmq.listener.type:stream", "spring.rabbitmq.stream.name:stream-test")
.run((context) -> {
assertThat(context).hasSingleBean(RabbitStreamTemplate.class);
assertThat(context.getBean(RabbitStreamTemplate.class)).extracting("messageConverter")
.isSameAs(context.getBean("myStreamMessageConverter"));
});
}
@Test
void testRabbitStreamTemplateConfigurationWithCustomProducerCustomizer() {
this.contextRunner
.withBean("myProducerCustomizer", ProducerCustomizer.class, () -> mock(ProducerCustomizer.class))
.withPropertyValues("spring.rabbitmq.listener.type:stream", "spring.rabbitmq.stream.name:stream-test")
.run((context) -> {
assertThat(context).hasSingleBean(RabbitStreamTemplate.class);
assertThat(context.getBean(RabbitStreamTemplate.class)).extracting("producerCustomizer")
.isSameAs(context.getBean("myProducerCustomizer"));
});
}
@Configuration(proxyBeanMethods = false)
static class TestConfiguration {
@ -196,4 +257,20 @@ class RabbitStreamConfigurationTests {
}
@Configuration(proxyBeanMethods = false)
static class MessageConvertersConfiguration {
@Bean
@Primary
MessageConverter myMessageConverter() {
return mock(MessageConverter.class);
}
@Bean
MessageConverter anotherMessageConverter() {
return mock(MessageConverter.class);
}
}
}

View File

@ -78,6 +78,24 @@ If you need to create more `RabbitTemplate` instances or if you want to override
[[messaging.amqp.sending-stream]]
=== Sending a Message To A Stream
To send a message to a particular stream, specify the name of the stream, as shown in the following example:
[source,yaml,indent=0,subs="verbatim",configprops,configblocks]
----
spring:
rabbitmq:
stream:
name: "my-stream"
----
If a `MessageConverter`, `StreamMessageConverter`, or `ProducerCustomizer` bean is defined, it is associated automatically to the auto-configured `RabbitStreamTemplate`.
If you need to create more `RabbitStreamTemplate` instances or if you want to override the default, Spring Boot provides a `RabbitStreamTemplateConfigurer` bean that you can use to initialize a `RabbitStreamTemplate` with the same settings as the factories used by the auto-configuration.
[[messaging.amqp.receiving]]
=== Receiving a Message
When the Rabbit infrastructure is present, any bean can be annotated with `@RabbitListener` to create a listener endpoint.