commit
5857bcccc6
|
@ -28,7 +28,6 @@ import org.springframework.context.annotation.Configuration;
|
||||||
import org.springframework.data.elasticsearch.client.ClientConfiguration;
|
import org.springframework.data.elasticsearch.client.ClientConfiguration;
|
||||||
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
|
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
|
||||||
import org.springframework.data.elasticsearch.client.reactive.ReactiveRestClients;
|
import org.springframework.data.elasticsearch.client.reactive.ReactiveRestClients;
|
||||||
import org.springframework.http.HttpHeaders;
|
|
||||||
import org.springframework.util.unit.DataSize;
|
import org.springframework.util.unit.DataSize;
|
||||||
import org.springframework.web.reactive.function.client.ExchangeStrategies;
|
import org.springframework.web.reactive.function.client.ExchangeStrategies;
|
||||||
import org.springframework.web.reactive.function.client.WebClient;
|
import org.springframework.web.reactive.function.client.WebClient;
|
||||||
|
@ -50,39 +49,28 @@ public class ReactiveElasticsearchRestClientAutoConfiguration {
|
||||||
public ClientConfiguration clientConfiguration(ReactiveElasticsearchRestClientProperties properties) {
|
public ClientConfiguration clientConfiguration(ReactiveElasticsearchRestClientProperties properties) {
|
||||||
ClientConfiguration.MaybeSecureClientConfigurationBuilder builder = ClientConfiguration.builder()
|
ClientConfiguration.MaybeSecureClientConfigurationBuilder builder = ClientConfiguration.builder()
|
||||||
.connectedTo(properties.getEndpoints().toArray(new String[0]));
|
.connectedTo(properties.getEndpoints().toArray(new String[0]));
|
||||||
if (properties.isUseSsl()) {
|
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
|
||||||
builder.usingSsl();
|
map.from(properties.isUseSsl()).whenTrue().toCall(builder::usingSsl);
|
||||||
}
|
map.from(properties.getUsername()).whenHasText()
|
||||||
configureTimeouts(builder, properties);
|
.to((username) -> builder.withBasicAuth(username, properties.getPassword()));
|
||||||
configureExchangeStrategies(builder, properties);
|
map.from(properties.getConnectionTimeout()).to(builder::withConnectTimeout);
|
||||||
|
map.from(properties.getSocketTimeout()).to(builder::withSocketTimeout);
|
||||||
|
configureExchangeStrategies(map, builder, properties);
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void configureTimeouts(ClientConfiguration.TerminalClientConfigurationBuilder builder,
|
private void configureExchangeStrategies(PropertyMapper map,
|
||||||
|
ClientConfiguration.TerminalClientConfigurationBuilder builder,
|
||||||
ReactiveElasticsearchRestClientProperties properties) {
|
ReactiveElasticsearchRestClientProperties properties) {
|
||||||
PropertyMapper map = PropertyMapper.get();
|
map.from(properties.getMaxInMemorySize()).asInt(DataSize::toBytes).to((maxInMemorySize) -> {
|
||||||
map.from(properties.getConnectionTimeout()).whenNonNull().to(builder::withConnectTimeout);
|
builder.withWebClientConfigurer((webClient) -> {
|
||||||
map.from(properties.getSocketTimeout()).whenNonNull().to(builder::withSocketTimeout);
|
ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
|
||||||
map.from(properties.getUsername()).whenHasText().to((username) -> {
|
.codecs((configurer) -> configurer.defaultCodecs().maxInMemorySize(maxInMemorySize)).build();
|
||||||
HttpHeaders headers = new HttpHeaders();
|
return webClient.mutate().exchangeStrategies(exchangeStrategies).build();
|
||||||
headers.setBasicAuth(username, properties.getPassword());
|
});
|
||||||
builder.withDefaultHeaders(headers);
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private void configureExchangeStrategies(ClientConfiguration.TerminalClientConfigurationBuilder builder,
|
|
||||||
ReactiveElasticsearchRestClientProperties properties) {
|
|
||||||
PropertyMapper map = PropertyMapper.get();
|
|
||||||
builder.withClientConfigurer(ReactiveRestClients.WebClientConfigurationCallback.from((webClient) -> {
|
|
||||||
ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
|
|
||||||
.codecs((configurer) -> map.from(properties.getMaxInMemorySize()).whenNonNull()
|
|
||||||
.asInt(DataSize::toBytes)
|
|
||||||
.to((maxInMemorySize) -> configurer.defaultCodecs().maxInMemorySize(maxInMemorySize)))
|
|
||||||
.build();
|
|
||||||
return webClient.mutate().exchangeStrategies(exchangeStrategies).build();
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
@ConditionalOnMissingBean
|
@ConditionalOnMissingBean
|
||||||
public ReactiveElasticsearchClient reactiveElasticsearchClient(ClientConfiguration clientConfiguration) {
|
public ReactiveElasticsearchClient reactiveElasticsearchClient(ClientConfiguration clientConfiguration) {
|
||||||
|
|
Loading…
Reference in New Issue