Fall back to connection details when configuring Rabbit Streams
Closes gh-42489
This commit is contained in:
parent
05b4edfd2c
commit
95665a4b28
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2012-2023 the original author or authors.
|
* Copyright 2012-2024 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
@ -65,9 +65,9 @@ class RabbitStreamConfiguration {
|
||||||
|
|
||||||
@Bean(name = "rabbitStreamEnvironment")
|
@Bean(name = "rabbitStreamEnvironment")
|
||||||
@ConditionalOnMissingBean(name = "rabbitStreamEnvironment")
|
@ConditionalOnMissingBean(name = "rabbitStreamEnvironment")
|
||||||
Environment rabbitStreamEnvironment(RabbitProperties properties,
|
Environment rabbitStreamEnvironment(RabbitProperties properties, RabbitConnectionDetails connectionDetails,
|
||||||
ObjectProvider<EnvironmentBuilderCustomizer> customizers) {
|
ObjectProvider<EnvironmentBuilderCustomizer> customizers) {
|
||||||
EnvironmentBuilder builder = configure(Environment.builder(), properties);
|
EnvironmentBuilder builder = configure(Environment.builder(), properties, connectionDetails);
|
||||||
customizers.orderedStream().forEach((customizer) -> customizer.customize(builder));
|
customizers.orderedStream().forEach((customizer) -> customizer.customize(builder));
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
@ -96,18 +96,29 @@ class RabbitStreamConfiguration {
|
||||||
return template;
|
return template;
|
||||||
}
|
}
|
||||||
|
|
||||||
static EnvironmentBuilder configure(EnvironmentBuilder builder, RabbitProperties properties) {
|
static EnvironmentBuilder configure(EnvironmentBuilder builder, RabbitProperties properties,
|
||||||
|
RabbitConnectionDetails connectionDetails) {
|
||||||
|
return configure(builder, properties.getStream(), connectionDetails);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static EnvironmentBuilder configure(EnvironmentBuilder builder, RabbitProperties.Stream stream,
|
||||||
|
RabbitConnectionDetails connectionDetails) {
|
||||||
builder.lazyInitialization(true);
|
builder.lazyInitialization(true);
|
||||||
RabbitProperties.Stream stream = properties.getStream();
|
|
||||||
PropertyMapper map = PropertyMapper.get();
|
PropertyMapper map = PropertyMapper.get();
|
||||||
map.from(stream.getHost()).to(builder::host);
|
map.from(stream.getHost()).to(builder::host);
|
||||||
map.from(stream.getPort()).to(builder::port);
|
map.from(stream.getPort()).to(builder::port);
|
||||||
map.from(stream.getVirtualHost())
|
map.from(stream.getVirtualHost())
|
||||||
.as(withFallback(properties::getVirtualHost))
|
.as(withFallback(connectionDetails::getVirtualHost))
|
||||||
.whenNonNull()
|
.whenNonNull()
|
||||||
.to(builder::virtualHost);
|
.to(builder::virtualHost);
|
||||||
map.from(stream.getUsername()).as(withFallback(properties::getUsername)).whenNonNull().to(builder::username);
|
map.from(stream.getUsername())
|
||||||
map.from(stream.getPassword()).as(withFallback(properties::getPassword)).whenNonNull().to(builder::password);
|
.as(withFallback(connectionDetails::getUsername))
|
||||||
|
.whenNonNull()
|
||||||
|
.to(builder::username);
|
||||||
|
map.from(stream.getPassword())
|
||||||
|
.as(withFallback(connectionDetails::getPassword))
|
||||||
|
.whenNonNull()
|
||||||
|
.to(builder::password);
|
||||||
return builder;
|
return builder;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2012-2023 the original author or authors.
|
* Copyright 2012-2024 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
@ -17,6 +17,7 @@
|
||||||
package org.springframework.boot.autoconfigure.amqp;
|
package org.springframework.boot.autoconfigure.amqp;
|
||||||
|
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import com.rabbitmq.stream.BackOffDelayPolicy;
|
import com.rabbitmq.stream.BackOffDelayPolicy;
|
||||||
import com.rabbitmq.stream.Codec;
|
import com.rabbitmq.stream.Codec;
|
||||||
|
@ -113,12 +114,14 @@ class RabbitStreamConfigurationTests {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void environmentUsesPropertyDefaultsByDefault() {
|
void environmentUsesConnectionDetailsByDefault() {
|
||||||
EnvironmentBuilder builder = mock(EnvironmentBuilder.class);
|
EnvironmentBuilder builder = mock(EnvironmentBuilder.class);
|
||||||
RabbitProperties properties = new RabbitProperties();
|
RabbitProperties properties = new RabbitProperties();
|
||||||
RabbitStreamConfiguration.configure(builder, properties);
|
RabbitStreamConfiguration.configure(builder, properties,
|
||||||
|
new TestRabbitConnectionDetails("guest", "guest", "vhost"));
|
||||||
then(builder).should().port(5552);
|
then(builder).should().port(5552);
|
||||||
then(builder).should().host("localhost");
|
then(builder).should().host("localhost");
|
||||||
|
then(builder).should().virtualHost("vhost");
|
||||||
then(builder).should().lazyInitialization(true);
|
then(builder).should().lazyInitialization(true);
|
||||||
then(builder).should().username("guest");
|
then(builder).should().username("guest");
|
||||||
then(builder).should().password("guest");
|
then(builder).should().password("guest");
|
||||||
|
@ -130,7 +133,8 @@ class RabbitStreamConfigurationTests {
|
||||||
EnvironmentBuilder builder = mock(EnvironmentBuilder.class);
|
EnvironmentBuilder builder = mock(EnvironmentBuilder.class);
|
||||||
RabbitProperties properties = new RabbitProperties();
|
RabbitProperties properties = new RabbitProperties();
|
||||||
properties.getStream().setPort(5553);
|
properties.getStream().setPort(5553);
|
||||||
RabbitStreamConfiguration.configure(builder, properties);
|
RabbitStreamConfiguration.configure(builder, properties,
|
||||||
|
new TestRabbitConnectionDetails("guest", "guest", "vhost"));
|
||||||
then(builder).should().port(5553);
|
then(builder).should().port(5553);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -139,7 +143,8 @@ class RabbitStreamConfigurationTests {
|
||||||
EnvironmentBuilder builder = mock(EnvironmentBuilder.class);
|
EnvironmentBuilder builder = mock(EnvironmentBuilder.class);
|
||||||
RabbitProperties properties = new RabbitProperties();
|
RabbitProperties properties = new RabbitProperties();
|
||||||
properties.getStream().setHost("stream.rabbit.example.com");
|
properties.getStream().setHost("stream.rabbit.example.com");
|
||||||
RabbitStreamConfiguration.configure(builder, properties);
|
RabbitStreamConfiguration.configure(builder, properties,
|
||||||
|
new TestRabbitConnectionDetails("guest", "guest", "vhost"));
|
||||||
then(builder).should().host("stream.rabbit.example.com");
|
then(builder).should().host("stream.rabbit.example.com");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -148,7 +153,8 @@ class RabbitStreamConfigurationTests {
|
||||||
EnvironmentBuilder builder = mock(EnvironmentBuilder.class);
|
EnvironmentBuilder builder = mock(EnvironmentBuilder.class);
|
||||||
RabbitProperties properties = new RabbitProperties();
|
RabbitProperties properties = new RabbitProperties();
|
||||||
properties.getStream().setVirtualHost("stream-virtual-host");
|
properties.getStream().setVirtualHost("stream-virtual-host");
|
||||||
RabbitStreamConfiguration.configure(builder, properties);
|
RabbitStreamConfiguration.configure(builder, properties,
|
||||||
|
new TestRabbitConnectionDetails("guest", "guest", "vhost"));
|
||||||
then(builder).should().virtualHost("stream-virtual-host");
|
then(builder).should().virtualHost("stream-virtual-host");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -156,20 +162,22 @@ class RabbitStreamConfigurationTests {
|
||||||
void whenStreamVirtualHostIsNotSetButDefaultVirtualHostIsSetThenEnvironmentUsesDefaultVirtualHost() {
|
void whenStreamVirtualHostIsNotSetButDefaultVirtualHostIsSetThenEnvironmentUsesDefaultVirtualHost() {
|
||||||
EnvironmentBuilder builder = mock(EnvironmentBuilder.class);
|
EnvironmentBuilder builder = mock(EnvironmentBuilder.class);
|
||||||
RabbitProperties properties = new RabbitProperties();
|
RabbitProperties properties = new RabbitProperties();
|
||||||
properties.setVirtualHost("default-virtual-host");
|
properties.setVirtualHost("properties-virtual-host");
|
||||||
RabbitStreamConfiguration.configure(builder, properties);
|
RabbitStreamConfiguration.configure(builder, properties,
|
||||||
|
new TestRabbitConnectionDetails("guest", "guest", "default-virtual-host"));
|
||||||
then(builder).should().virtualHost("default-virtual-host");
|
then(builder).should().virtualHost("default-virtual-host");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void whenStreamCredentialsAreNotSetThenEnvironmentUsesRabbitCredentials() {
|
void whenStreamCredentialsAreNotSetThenEnvironmentUsesConnectionDetailsCredentials() {
|
||||||
EnvironmentBuilder builder = mock(EnvironmentBuilder.class);
|
EnvironmentBuilder builder = mock(EnvironmentBuilder.class);
|
||||||
RabbitProperties properties = new RabbitProperties();
|
RabbitProperties properties = new RabbitProperties();
|
||||||
properties.setUsername("alice");
|
properties.setUsername("alice");
|
||||||
properties.setPassword("secret");
|
properties.setPassword("secret");
|
||||||
RabbitStreamConfiguration.configure(builder, properties);
|
RabbitStreamConfiguration.configure(builder, properties,
|
||||||
then(builder).should().username("alice");
|
new TestRabbitConnectionDetails("bob", "password", "vhost"));
|
||||||
then(builder).should().password("secret");
|
then(builder).should().username("bob");
|
||||||
|
then(builder).should().password("password");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -180,7 +188,8 @@ class RabbitStreamConfigurationTests {
|
||||||
properties.setPassword("secret");
|
properties.setPassword("secret");
|
||||||
properties.getStream().setUsername("bob");
|
properties.getStream().setUsername("bob");
|
||||||
properties.getStream().setPassword("confidential");
|
properties.getStream().setPassword("confidential");
|
||||||
RabbitStreamConfiguration.configure(builder, properties);
|
RabbitStreamConfiguration.configure(builder, properties,
|
||||||
|
new TestRabbitConnectionDetails("charlotte", "hidden", "vhost"));
|
||||||
then(builder).should().username("bob");
|
then(builder).should().username("bob");
|
||||||
then(builder).should().password("confidential");
|
then(builder).should().password("confidential");
|
||||||
}
|
}
|
||||||
|
@ -334,4 +343,40 @@ class RabbitStreamConfigurationTests {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static final class TestRabbitConnectionDetails implements RabbitConnectionDetails {
|
||||||
|
|
||||||
|
private final String username;
|
||||||
|
|
||||||
|
private final String password;
|
||||||
|
|
||||||
|
private final String virtualHost;
|
||||||
|
|
||||||
|
private TestRabbitConnectionDetails(String username, String password, String virtualHost) {
|
||||||
|
this.username = username;
|
||||||
|
this.password = password;
|
||||||
|
this.virtualHost = virtualHost;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getUsername() {
|
||||||
|
return this.username;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getPassword() {
|
||||||
|
return this.password;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getVirtualHost() {
|
||||||
|
return this.virtualHost;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<Address> getAddresses() {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue