Polish "Add support for RabbitStreamTemplate"
See gh-28060
This commit is contained in:
parent
3952046132
commit
6b6da22f2c
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2021 the original author or authors.
|
||||
* Copyright 2012-2022 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2021 the original author or authors.
|
||||
* Copyright 2012-2022 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -69,21 +69,6 @@ class RabbitStreamConfiguration {
|
|||
return configure(Environment.builder(), properties).build();
|
||||
}
|
||||
|
||||
static EnvironmentBuilder configure(EnvironmentBuilder builder, RabbitProperties properties) {
|
||||
builder.lazyInitialization(true);
|
||||
RabbitProperties.Stream stream = properties.getStream();
|
||||
PropertyMapper mapper = PropertyMapper.get();
|
||||
mapper.from(stream.getHost()).to(builder::host);
|
||||
mapper.from(stream.getPort()).to(builder::port);
|
||||
mapper.from(stream.getUsername()).as(withFallback(properties::getUsername)).whenNonNull().to(builder::username);
|
||||
mapper.from(stream.getPassword()).as(withFallback(properties::getPassword)).whenNonNull().to(builder::password);
|
||||
return builder;
|
||||
}
|
||||
|
||||
private static Function<String, String> withFallback(Supplier<String> fallback) {
|
||||
return (value) -> (value != null) ? value : fallback.get();
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean
|
||||
RabbitStreamTemplateConfigurer rabbitStreamTemplateConfigurer(RabbitProperties properties,
|
||||
|
@ -108,4 +93,19 @@ class RabbitStreamConfiguration {
|
|||
return template;
|
||||
}
|
||||
|
||||
static EnvironmentBuilder configure(EnvironmentBuilder builder, RabbitProperties properties) {
|
||||
builder.lazyInitialization(true);
|
||||
RabbitProperties.Stream stream = properties.getStream();
|
||||
PropertyMapper mapper = PropertyMapper.get();
|
||||
mapper.from(stream.getHost()).to(builder::host);
|
||||
mapper.from(stream.getPort()).to(builder::port);
|
||||
mapper.from(stream.getUsername()).as(withFallback(properties::getUsername)).whenNonNull().to(builder::username);
|
||||
mapper.from(stream.getPassword()).as(withFallback(properties::getPassword)).whenNonNull().to(builder::password);
|
||||
return builder;
|
||||
}
|
||||
|
||||
private static Function<String, String> withFallback(Supplier<String> fallback) {
|
||||
return (value) -> (value != null) ? value : fallback.get();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2021 the original author or authors.
|
||||
* Copyright 2012-2022 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -17,7 +17,6 @@
|
|||
package org.springframework.boot.autoconfigure.amqp;
|
||||
|
||||
import org.springframework.amqp.support.converter.MessageConverter;
|
||||
import org.springframework.boot.context.properties.PropertyMapper;
|
||||
import org.springframework.rabbit.stream.producer.ProducerCustomizer;
|
||||
import org.springframework.rabbit.stream.producer.RabbitStreamTemplate;
|
||||
import org.springframework.rabbit.stream.support.converter.StreamMessageConverter;
|
||||
|
@ -45,10 +44,19 @@ public class RabbitStreamTemplateConfigurer {
|
|||
this.messageConverter = messageConverter;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the {@link StreamMessageConverter} to use or {@code null} if the out-of-the-box
|
||||
* stream message converter should be used.
|
||||
* @param streamMessageConverter the {@link StreamMessageConverter}
|
||||
*/
|
||||
public void setStreamMessageConverter(StreamMessageConverter streamMessageConverter) {
|
||||
this.streamMessageConverter = streamMessageConverter;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the {@link ProducerCustomizer} instances to use.
|
||||
* @param producerCustomizer the producer customizer
|
||||
*/
|
||||
public void setProducerCustomizer(ProducerCustomizer producerCustomizer) {
|
||||
this.producerCustomizer = producerCustomizer;
|
||||
}
|
||||
|
@ -59,7 +67,6 @@ public class RabbitStreamTemplateConfigurer {
|
|||
* @param template the {@link RabbitStreamTemplate} instance to configure
|
||||
*/
|
||||
public void configure(RabbitStreamTemplate template) {
|
||||
PropertyMapper map = PropertyMapper.get();
|
||||
if (this.messageConverter != null) {
|
||||
template.setMessageConverter(this.messageConverter);
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2012-2021 the original author or authors.
|
||||
* Copyright 2012-2022 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -31,13 +31,13 @@ import org.springframework.boot.autoconfigure.AutoConfigurations;
|
|||
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Primary;
|
||||
import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory;
|
||||
import org.springframework.rabbit.stream.listener.ConsumerCustomizer;
|
||||
import org.springframework.rabbit.stream.listener.StreamListenerContainer;
|
||||
import org.springframework.rabbit.stream.producer.ProducerCustomizer;
|
||||
import org.springframework.rabbit.stream.producer.RabbitStreamTemplate;
|
||||
import org.springframework.rabbit.stream.support.converter.StreamMessageConverter;
|
||||
import org.springframework.test.util.ReflectionTestUtils;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
@ -160,10 +160,9 @@ class RabbitStreamConfigurationTests {
|
|||
this.contextRunner
|
||||
.withPropertyValues("spring.rabbitmq.listener.type:stream", "spring.rabbitmq.stream.name:stream-test")
|
||||
.run((context) -> {
|
||||
RabbitStreamTemplate streamTemplate = context.getBean(RabbitStreamTemplate.class);
|
||||
String streamName = (String) ReflectionTestUtils.getField(streamTemplate, "streamName");
|
||||
assertThat(context).hasSingleBean(RabbitStreamTemplate.class);
|
||||
assertThat(streamName).isEqualTo("stream-test");
|
||||
assertThat(context.getBean(RabbitStreamTemplate.class)).hasFieldOrPropertyWithValue("streamName",
|
||||
"stream-test");
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -175,43 +174,39 @@ class RabbitStreamConfigurationTests {
|
|||
|
||||
@Test
|
||||
void testRabbitStreamTemplateConfigurationWithCustomMessageConverter() {
|
||||
this.contextRunner.withUserConfiguration(RabbitAutoConfigurationTests.MessageConvertersConfiguration.class)
|
||||
this.contextRunner.withUserConfiguration(MessageConvertersConfiguration.class)
|
||||
.withPropertyValues("spring.rabbitmq.listener.type:stream", "spring.rabbitmq.stream.name:stream-test")
|
||||
.run((context) -> {
|
||||
assertThat(context).hasSingleBean(RabbitStreamTemplate.class);
|
||||
RabbitStreamTemplate streamTemplate = context.getBean(RabbitStreamTemplate.class);
|
||||
MessageConverter messageConverter = (MessageConverter) ReflectionTestUtils.getField(streamTemplate,
|
||||
"messageConverter");
|
||||
String streamName = (String) ReflectionTestUtils.getField(streamTemplate, "streamName");
|
||||
assertThat(messageConverter).isSameAs(context.getBean(MessageConverter.class));
|
||||
assertThat(streamName).isEqualTo("stream-test");
|
||||
assertThat(streamTemplate).hasFieldOrPropertyWithValue("streamName", "stream-test");
|
||||
assertThat(streamTemplate).extracting("messageConverter")
|
||||
.isSameAs(context.getBean(MessageConverter.class));
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void testRabbitStreamTemplateConfigurationWithCustomStreamMessageConverter() {
|
||||
this.contextRunner.withUserConfiguration(StreamMessageConverterConfiguration.class)
|
||||
this.contextRunner
|
||||
.withBean("myStreamMessageConverter", StreamMessageConverter.class,
|
||||
() -> mock(StreamMessageConverter.class))
|
||||
.withPropertyValues("spring.rabbitmq.listener.type:stream", "spring.rabbitmq.stream.name:stream-test")
|
||||
.run((context) -> {
|
||||
RabbitStreamTemplate streamTemplate = context.getBean(RabbitStreamTemplate.class);
|
||||
StreamMessageConverter messageConverter = (StreamMessageConverter) ReflectionTestUtils
|
||||
.getField(streamTemplate, "streamConverter");
|
||||
String streamName = (String) ReflectionTestUtils.getField(streamTemplate, "streamName");
|
||||
assertThat(messageConverter).isSameAs(context.getBean(StreamMessageConverter.class));
|
||||
assertThat(streamName).isEqualTo("stream-test");
|
||||
assertThat(context).hasSingleBean(RabbitStreamTemplate.class);
|
||||
assertThat(context.getBean(RabbitStreamTemplate.class)).extracting("messageConverter")
|
||||
.isSameAs(context.getBean("myStreamMessageConverter"));
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void testRabbitStreamTemplateConfigurationWithCustomProducerCustomizer() {
|
||||
this.contextRunner.withUserConfiguration(ProducerCustomizerConfiguration.class)
|
||||
this.contextRunner
|
||||
.withBean("myProducerCustomizer", ProducerCustomizer.class, () -> mock(ProducerCustomizer.class))
|
||||
.withPropertyValues("spring.rabbitmq.listener.type:stream", "spring.rabbitmq.stream.name:stream-test")
|
||||
.run((context) -> {
|
||||
RabbitStreamTemplate streamTemplate = context.getBean(RabbitStreamTemplate.class);
|
||||
ProducerCustomizer producerCustomizer = (ProducerCustomizer) ReflectionTestUtils
|
||||
.getField(streamTemplate, "producerCustomizer");
|
||||
String streamName = (String) ReflectionTestUtils.getField(streamTemplate, "streamName");
|
||||
assertThat(producerCustomizer).isSameAs(context.getBean(ProducerCustomizer.class));
|
||||
assertThat(streamName).isEqualTo("stream-test");
|
||||
assertThat(context).hasSingleBean(RabbitStreamTemplate.class);
|
||||
assertThat(context.getBean(RabbitStreamTemplate.class)).extracting("producerCustomizer")
|
||||
.isSameAs(context.getBean("myProducerCustomizer"));
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -263,21 +258,17 @@ class RabbitStreamConfigurationTests {
|
|||
}
|
||||
|
||||
@Configuration(proxyBeanMethods = false)
|
||||
static class StreamMessageConverterConfiguration {
|
||||
static class MessageConvertersConfiguration {
|
||||
|
||||
@Bean
|
||||
StreamMessageConverter myStreamMessageConverter() {
|
||||
return mock(StreamMessageConverter.class);
|
||||
@Primary
|
||||
MessageConverter myMessageConverter() {
|
||||
return mock(MessageConverter.class);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Configuration(proxyBeanMethods = false)
|
||||
static class ProducerCustomizerConfiguration {
|
||||
|
||||
@Bean
|
||||
ProducerCustomizer myProducerCustomizer() {
|
||||
return mock(ProducerCustomizer.class);
|
||||
MessageConverter anotherMessageConverter() {
|
||||
return mock(MessageConverter.class);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -78,6 +78,24 @@ If you need to create more `RabbitTemplate` instances or if you want to override
|
|||
|
||||
|
||||
|
||||
[[messaging.amqp.sending-stream]]
|
||||
=== Sending a Message To A Stream
|
||||
To send a message to a particular stream, specify the name of the stream, as shown in the following example:
|
||||
|
||||
[source,yaml,indent=0,subs="verbatim",configprops,configblocks]
|
||||
----
|
||||
spring:
|
||||
rabbitmq:
|
||||
stream:
|
||||
name: "my-stream"
|
||||
----
|
||||
|
||||
If a `MessageConverter`, `StreamMessageConverter`, or `ProducerCustomizer` bean is defined, it is associated automatically to the auto-configured `RabbitStreamTemplate`.
|
||||
|
||||
If you need to create more `RabbitStreamTemplate` instances or if you want to override the default, Spring Boot provides a `RabbitStreamTemplateConfigurer` bean that you can use to initialize a `RabbitStreamTemplate` with the same settings as the factories used by the auto-configuration.
|
||||
|
||||
|
||||
|
||||
[[messaging.amqp.receiving]]
|
||||
=== Receiving a Message
|
||||
When the Rabbit infrastructure is present, any bean can be annotated with `@RabbitListener` to create a listener endpoint.
|
||||
|
|
Loading…
Reference in New Issue