parent
1b27a7baab
commit
bf82ed7186
|
|
@ -0,0 +1,175 @@
|
|||
/*
|
||||
* Copyright 2002-2023 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.web.service.invoker;
|
||||
|
||||
import java.time.Duration;
|
||||
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.core.ParameterizedTypeReference;
|
||||
import org.springframework.core.ReactiveAdapterRegistry;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 6.1
|
||||
*/
|
||||
@SuppressWarnings("removal")
|
||||
public abstract class AbstractReactorHttpExchangeAdapter
|
||||
implements ReactorHttpExchangeAdapter, org.springframework.web.service.invoker.HttpClientAdapter {
|
||||
|
||||
private ReactiveAdapterRegistry reactiveAdapterRegistry = ReactiveAdapterRegistry.getSharedInstance();
|
||||
|
||||
@Nullable
|
||||
private Duration blockTimeout;
|
||||
|
||||
|
||||
/**
|
||||
* Protected constructor, for subclasses.
|
||||
*/
|
||||
protected AbstractReactorHttpExchangeAdapter() {
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
* @param reactiveAdapterRegistry
|
||||
*/
|
||||
public void setReactiveAdapterRegistry(ReactiveAdapterRegistry reactiveAdapterRegistry) {
|
||||
this.reactiveAdapterRegistry = reactiveAdapterRegistry;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
@Override
|
||||
public ReactiveAdapterRegistry getReactiveAdapterRegistry() {
|
||||
return this.reactiveAdapterRegistry;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param blockTimeout
|
||||
*/
|
||||
public void setBlockTimeout(@Nullable Duration blockTimeout) {
|
||||
this.blockTimeout = blockTimeout;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
@Override
|
||||
@Nullable
|
||||
public Duration getBlockTimeout() {
|
||||
return this.blockTimeout;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void exchange(HttpRequestValues requestValues) {
|
||||
if (this.blockTimeout != null) {
|
||||
exchangeForMono(requestValues).block(this.blockTimeout);
|
||||
}
|
||||
else {
|
||||
exchangeForMono(requestValues).block();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpHeaders exchangeForHeaders(HttpRequestValues requestValues) {
|
||||
HttpHeaders headers = (this.blockTimeout != null ?
|
||||
exchangeForHeadersMono(requestValues).block(this.blockTimeout) :
|
||||
exchangeForHeadersMono(requestValues).block());
|
||||
Assert.state(headers != null, "Expected HttpHeaders");
|
||||
return headers;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T exchangeForBody(HttpRequestValues requestValues, ParameterizedTypeReference<T> bodyType) {
|
||||
return (this.blockTimeout != null ?
|
||||
exchangeForBodyMono(requestValues, bodyType).block(this.blockTimeout) :
|
||||
exchangeForBodyMono(requestValues, bodyType).block());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResponseEntity<Void> exchangeForBodilessEntity(HttpRequestValues requestValues) {
|
||||
ResponseEntity<Void> entity = (this.blockTimeout != null ?
|
||||
exchangeForBodilessEntityMono(requestValues).block(this.blockTimeout) :
|
||||
exchangeForBodilessEntityMono(requestValues).block());
|
||||
Assert.state(entity != null, "Expected ResponseEntity");
|
||||
return entity;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> ResponseEntity<T> exchangeForEntity(HttpRequestValues requestValues, ParameterizedTypeReference<T> bodyType) {
|
||||
ResponseEntity<T> entity = (this.blockTimeout != null ?
|
||||
exchangeForEntityMono(requestValues, bodyType).block(this.blockTimeout) :
|
||||
exchangeForEntityMono(requestValues, bodyType).block());
|
||||
Assert.state(entity != null, "Expected ResponseEntity");
|
||||
return entity;
|
||||
}
|
||||
|
||||
|
||||
// HttpClientAdapter implementation
|
||||
|
||||
@Override
|
||||
public Mono<Void> requestToVoid(HttpRequestValues requestValues) {
|
||||
return exchangeForMono(requestValues);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<HttpHeaders> requestToHeaders(HttpRequestValues requestValues) {
|
||||
return exchangeForHeadersMono(requestValues);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Mono<T> requestToBody(HttpRequestValues requestValues, ParameterizedTypeReference<T> bodyType) {
|
||||
return exchangeForBodyMono(requestValues, bodyType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Flux<T> requestToBodyFlux(HttpRequestValues requestValues, ParameterizedTypeReference<T> bodyType) {
|
||||
return exchangeForBodyFlux(requestValues, bodyType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<ResponseEntity<Void>> requestToBodilessEntity(HttpRequestValues requestValues) {
|
||||
return exchangeForBodilessEntityMono(requestValues);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Mono<ResponseEntity<T>> requestToEntity(
|
||||
HttpRequestValues requestValues, ParameterizedTypeReference<T> bodyType) {
|
||||
|
||||
return exchangeForEntityMono(requestValues, bodyType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Mono<ResponseEntity<Flux<T>>> requestToEntityFlux(
|
||||
HttpRequestValues requestValues, ParameterizedTypeReference<T> bodyType) {
|
||||
|
||||
return exchangeForEntityFlux(requestValues, bodyType);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2022 the original author or authors.
|
||||
* Copyright 2002-2023 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -29,7 +29,9 @@ import org.springframework.http.ResponseEntity;
|
|||
*
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 6.0
|
||||
* @deprecated in favor of {@link ReactorHttpExchangeAdapter}
|
||||
*/
|
||||
@Deprecated(since = "6.1", forRemoval = true)
|
||||
public interface HttpClientAdapter {
|
||||
|
||||
/**
|
||||
|
|
@ -86,4 +88,55 @@ public interface HttpClientAdapter {
|
|||
*/
|
||||
<T> Mono<ResponseEntity<Flux<T>>> requestToEntityFlux(HttpRequestValues requestValues, ParameterizedTypeReference<T> bodyType);
|
||||
|
||||
|
||||
/**
|
||||
* Adapt this {@link HttpClientAdapter} to {@link ReactorHttpExchangeAdapter}.
|
||||
* @return
|
||||
* @since 6.1
|
||||
*/
|
||||
default ReactorHttpExchangeAdapter asHttpExchangeAdapter() {
|
||||
|
||||
return new AbstractReactorHttpExchangeAdapter() {
|
||||
|
||||
@Override
|
||||
public Mono<Void> exchangeForMono(HttpRequestValues requestValues) {
|
||||
return requestToVoid(requestValues);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<HttpHeaders> exchangeForHeadersMono(HttpRequestValues requestValues) {
|
||||
return requestToHeaders(requestValues);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Mono<T> exchangeForBodyMono(HttpRequestValues requestValues, ParameterizedTypeReference<T> bodyType) {
|
||||
return requestToBody(requestValues, bodyType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Flux<T> exchangeForBodyFlux(HttpRequestValues requestValues, ParameterizedTypeReference<T> bodyType) {
|
||||
return requestToBodyFlux(requestValues, bodyType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<ResponseEntity<Void>> exchangeForBodilessEntityMono(HttpRequestValues requestValues) {
|
||||
return requestToBodilessEntity(requestValues);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Mono<ResponseEntity<T>> exchangeForEntityMono(
|
||||
HttpRequestValues requestValues, ParameterizedTypeReference<T> bodyType) {
|
||||
|
||||
return requestToEntity(requestValues, bodyType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Mono<ResponseEntity<Flux<T>>> exchangeForEntityFlux(
|
||||
HttpRequestValues requestValues, ParameterizedTypeReference<T> bodyType) {
|
||||
|
||||
return requestToEntityFlux(requestValues, bodyType);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,69 @@
|
|||
/*
|
||||
* Copyright 2002-2023 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.web.service.invoker;
|
||||
|
||||
import org.springframework.core.ParameterizedTypeReference;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.lang.Nullable;
|
||||
|
||||
/**
|
||||
* Contract to abstract an underlying HTTP client and decouple it from the
|
||||
* {@linkplain HttpServiceProxyFactory#createClient(Class) HTTP service proxy}.
|
||||
*
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 6.1
|
||||
*/
|
||||
public interface HttpExchangeAdapter {
|
||||
|
||||
/**
|
||||
* Perform the given request, and release the response content, if any.
|
||||
* @param requestValues the request to perform
|
||||
*/
|
||||
void exchange(HttpRequestValues requestValues);
|
||||
|
||||
/**
|
||||
* Perform the given request, release the response content, and return the
|
||||
* response headers.
|
||||
* @param requestValues the request to perform
|
||||
* @return the response headers
|
||||
*/
|
||||
HttpHeaders exchangeForHeaders(HttpRequestValues requestValues);
|
||||
|
||||
/**
|
||||
* Perform the given request and decode the response content to the given type.
|
||||
* @param requestValues the request to perform
|
||||
* @param bodyType the target type to decode to
|
||||
* @return the decoded response.
|
||||
* @param <T> the type the response is decoded to
|
||||
*/
|
||||
@Nullable
|
||||
<T> T exchangeForBody(HttpRequestValues requestValues, ParameterizedTypeReference<T> bodyType);
|
||||
|
||||
/**
|
||||
* Variant of {@link #exchange(HttpRequestValues)} with additional
|
||||
* access to the response status and headers.
|
||||
*/
|
||||
ResponseEntity<Void> exchangeForBodilessEntity(HttpRequestValues requestValues);
|
||||
|
||||
/**
|
||||
* Variant of {@link #exchangeForBody(HttpRequestValues, ParameterizedTypeReference)}
|
||||
* with additional access to the response status and headers.
|
||||
*/
|
||||
<T> ResponseEntity<T> exchangeForEntity(HttpRequestValues requestValues, ParameterizedTypeReference<T> bodyType);
|
||||
|
||||
}
|
||||
|
|
@ -32,7 +32,6 @@ import org.springframework.core.KotlinDetector;
|
|||
import org.springframework.core.MethodParameter;
|
||||
import org.springframework.core.ParameterizedTypeReference;
|
||||
import org.springframework.core.ReactiveAdapter;
|
||||
import org.springframework.core.ReactiveAdapterRegistry;
|
||||
import org.springframework.core.annotation.AnnotatedElementUtils;
|
||||
import org.springframework.core.annotation.SynthesizingMethodParameter;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
|
|
@ -41,6 +40,7 @@ import org.springframework.http.MediaType;
|
|||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.ClassUtils;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
import org.springframework.util.StringValueResolver;
|
||||
|
|
@ -49,7 +49,7 @@ import org.springframework.web.service.annotation.HttpExchange;
|
|||
/**
|
||||
* Implements the invocation of an {@link HttpExchange @HttpExchange}-annotated,
|
||||
* {@link HttpServiceProxyFactory#createClient(Class) HTTP service proxy} method
|
||||
* by delegating to an {@link HttpClientAdapter} to perform actual requests.
|
||||
* by delegating to an {@link HttpExchangeAdapter} to perform actual requests.
|
||||
*
|
||||
* @author Rossen Stoyanchev
|
||||
* @author Sebastien Deleuze
|
||||
|
|
@ -57,6 +57,10 @@ import org.springframework.web.service.annotation.HttpExchange;
|
|||
*/
|
||||
final class HttpServiceMethod {
|
||||
|
||||
private static final boolean REACTOR_PRESENT =
|
||||
ClassUtils.isPresent("reactor.core.publisher.Mono", HttpServiceMethod.class.getClassLoader());
|
||||
|
||||
|
||||
private final Method method;
|
||||
|
||||
private final MethodParameter[] parameters;
|
||||
|
|
@ -70,14 +74,16 @@ final class HttpServiceMethod {
|
|||
|
||||
HttpServiceMethod(
|
||||
Method method, Class<?> containingClass, List<HttpServiceArgumentResolver> argumentResolvers,
|
||||
HttpClientAdapter client, @Nullable StringValueResolver embeddedValueResolver,
|
||||
ReactiveAdapterRegistry reactiveRegistry, @Nullable Duration blockTimeout) {
|
||||
HttpExchangeAdapter adapter, @Nullable StringValueResolver embeddedValueResolver) {
|
||||
|
||||
this.method = method;
|
||||
this.parameters = initMethodParameters(method);
|
||||
this.argumentResolvers = argumentResolvers;
|
||||
this.requestValuesInitializer = HttpRequestValuesInitializer.create(method, containingClass, embeddedValueResolver);
|
||||
this.responseFunction = ResponseFunction.create(client, method, reactiveRegistry, blockTimeout);
|
||||
this.responseFunction =
|
||||
(REACTOR_PRESENT && adapter instanceof ReactorHttpExchangeAdapter reactorAdapter ?
|
||||
ReactorExchangeResponseFunction.create(reactorAdapter, method) :
|
||||
ExchangeResponseFunction.create(adapter, method));
|
||||
}
|
||||
|
||||
private static MethodParameter[] initMethodParameters(Method method) {
|
||||
|
|
@ -267,13 +273,37 @@ final class HttpServiceMethod {
|
|||
|
||||
|
||||
/**
|
||||
* Function to execute a request, obtain a response, and adapt to the expected
|
||||
* return type, blocking if necessary.
|
||||
* Execute a request, obtain a response, and adapt to the expected return type.
|
||||
*/
|
||||
private record ResponseFunction(
|
||||
private interface ResponseFunction {
|
||||
|
||||
@Nullable
|
||||
Object execute(HttpRequestValues requestValues);
|
||||
|
||||
}
|
||||
|
||||
|
||||
private record ExchangeResponseFunction(
|
||||
Function<HttpRequestValues, Object> responseFunction) implements ResponseFunction {
|
||||
|
||||
@Override
|
||||
public Object execute(HttpRequestValues requestValues) {
|
||||
return null;
|
||||
}
|
||||
|
||||
public static ResponseFunction create(HttpExchangeAdapter client, Method method) {
|
||||
return new ExchangeResponseFunction(httpRequestValues -> null);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* {@link ResponseFunction} for {@link ReactorHttpExchangeAdapter}.
|
||||
*/
|
||||
private record ReactorExchangeResponseFunction(
|
||||
Function<HttpRequestValues, Publisher<?>> responseFunction,
|
||||
@Nullable ReactiveAdapter returnTypeAdapter,
|
||||
boolean blockForOptional, @Nullable Duration blockTimeout) {
|
||||
boolean blockForOptional, @Nullable Duration blockTimeout) implements ResponseFunction {
|
||||
|
||||
@Nullable
|
||||
public Object execute(HttpRequestValues requestValues) {
|
||||
|
|
@ -300,10 +330,7 @@ final class HttpServiceMethod {
|
|||
/**
|
||||
* Create the {@code ResponseFunction} that matches the method's return type.
|
||||
*/
|
||||
public static ResponseFunction create(
|
||||
HttpClientAdapter client, Method method, ReactiveAdapterRegistry reactiveRegistry,
|
||||
@Nullable Duration blockTimeout) {
|
||||
|
||||
public static ResponseFunction create(ReactorHttpExchangeAdapter client, Method method) {
|
||||
MethodParameter returnParam = new MethodParameter(method, -1);
|
||||
Class<?> returnType = returnParam.getParameterType();
|
||||
boolean isSuspending = KotlinDetector.isSuspendingFunction(method);
|
||||
|
|
@ -311,29 +338,29 @@ final class HttpServiceMethod {
|
|||
returnType = Mono.class;
|
||||
}
|
||||
|
||||
ReactiveAdapter reactiveAdapter = reactiveRegistry.getAdapter(returnType);
|
||||
ReactiveAdapter reactiveAdapter = client.getReactiveAdapterRegistry().getAdapter(returnType);
|
||||
|
||||
MethodParameter actualParam = (reactiveAdapter != null ? returnParam.nested() : returnParam.nestedIfOptional());
|
||||
Class<?> actualType = isSuspending ? actualParam.getParameterType() : actualParam.getNestedParameterType();
|
||||
|
||||
Function<HttpRequestValues, Publisher<?>> responseFunction;
|
||||
if (actualType.equals(void.class) || actualType.equals(Void.class)) {
|
||||
responseFunction = client::requestToVoid;
|
||||
responseFunction = client::exchangeForMono;
|
||||
}
|
||||
else if (reactiveAdapter != null && reactiveAdapter.isNoValue()) {
|
||||
responseFunction = client::requestToVoid;
|
||||
responseFunction = client::exchangeForMono;
|
||||
}
|
||||
else if (actualType.equals(HttpHeaders.class)) {
|
||||
responseFunction = client::requestToHeaders;
|
||||
responseFunction = client::exchangeForHeadersMono;
|
||||
}
|
||||
else if (actualType.equals(ResponseEntity.class)) {
|
||||
MethodParameter bodyParam = isSuspending ? actualParam : actualParam.nested();
|
||||
Class<?> bodyType = bodyParam.getNestedParameterType();
|
||||
if (bodyType.equals(Void.class)) {
|
||||
responseFunction = client::requestToBodilessEntity;
|
||||
responseFunction = client::exchangeForBodilessEntityMono;
|
||||
}
|
||||
else {
|
||||
ReactiveAdapter bodyAdapter = reactiveRegistry.getAdapter(bodyType);
|
||||
ReactiveAdapter bodyAdapter = client.getReactiveAdapterRegistry().getAdapter(bodyType);
|
||||
responseFunction = initResponseEntityFunction(client, bodyParam, bodyAdapter, isSuspending);
|
||||
}
|
||||
}
|
||||
|
|
@ -341,16 +368,17 @@ final class HttpServiceMethod {
|
|||
responseFunction = initBodyFunction(client, actualParam, reactiveAdapter, isSuspending);
|
||||
}
|
||||
|
||||
boolean blockForOptional = returnType.equals(Optional.class);
|
||||
return new ResponseFunction(responseFunction, reactiveAdapter, blockForOptional, blockTimeout);
|
||||
return new ReactorExchangeResponseFunction(
|
||||
responseFunction, reactiveAdapter, returnType.equals(Optional.class), client.getBlockTimeout());
|
||||
}
|
||||
|
||||
@SuppressWarnings("ConstantConditions")
|
||||
private static Function<HttpRequestValues, Publisher<?>> initResponseEntityFunction(HttpClientAdapter client,
|
||||
MethodParameter methodParam, @Nullable ReactiveAdapter reactiveAdapter, boolean isSuspending) {
|
||||
private static Function<HttpRequestValues, Publisher<?>> initResponseEntityFunction(
|
||||
ReactorHttpExchangeAdapter client, MethodParameter methodParam,
|
||||
@Nullable ReactiveAdapter reactiveAdapter, boolean isSuspending) {
|
||||
|
||||
if (reactiveAdapter == null) {
|
||||
return request -> client.requestToEntity(
|
||||
return request -> client.exchangeForEntityMono(
|
||||
request, ParameterizedTypeReference.forType(methodParam.getNestedGenericParameterType()));
|
||||
}
|
||||
|
||||
|
|
@ -363,28 +391,28 @@ final class HttpServiceMethod {
|
|||
|
||||
// Shortcut for Flux
|
||||
if (reactiveAdapter.getReactiveType().equals(Flux.class)) {
|
||||
return request -> client.requestToEntityFlux(request, bodyType);
|
||||
return request -> client.exchangeForEntityFlux(request, bodyType);
|
||||
}
|
||||
|
||||
return request -> client.requestToEntityFlux(request, bodyType)
|
||||
return request -> client.exchangeForEntityFlux(request, bodyType)
|
||||
.map(entity -> {
|
||||
Object body = reactiveAdapter.fromPublisher(entity.getBody());
|
||||
return new ResponseEntity<>(body, entity.getHeaders(), entity.getStatusCode());
|
||||
});
|
||||
}
|
||||
|
||||
private static Function<HttpRequestValues, Publisher<?>> initBodyFunction(HttpClientAdapter client,
|
||||
MethodParameter methodParam, @Nullable ReactiveAdapter reactiveAdapter, boolean isSuspending) {
|
||||
private static Function<HttpRequestValues, Publisher<?>> initBodyFunction(
|
||||
ReactorHttpExchangeAdapter client, MethodParameter methodParam,
|
||||
@Nullable ReactiveAdapter reactiveAdapter, boolean isSuspending) {
|
||||
|
||||
ParameterizedTypeReference<?> bodyType =
|
||||
ParameterizedTypeReference.forType(isSuspending ? methodParam.getGenericParameterType() :
|
||||
methodParam.getNestedGenericParameterType());
|
||||
|
||||
return (reactiveAdapter != null && reactiveAdapter.isMultiValue() ?
|
||||
request -> client.requestToBodyFlux(request, bodyType) :
|
||||
request -> client.requestToBody(request, bodyType));
|
||||
request -> client.exchangeForBodyFlux(request, bodyType) :
|
||||
request -> client.exchangeForBodyMono(request, bodyType));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -57,29 +57,21 @@ import org.springframework.web.service.annotation.HttpExchange;
|
|||
*/
|
||||
public final class HttpServiceProxyFactory {
|
||||
|
||||
private final HttpClientAdapter clientAdapter;
|
||||
private final HttpExchangeAdapter exchangeAdapter;
|
||||
|
||||
private final List<HttpServiceArgumentResolver> argumentResolvers;
|
||||
|
||||
@Nullable
|
||||
private final StringValueResolver embeddedValueResolver;
|
||||
|
||||
private final ReactiveAdapterRegistry reactiveAdapterRegistry;
|
||||
|
||||
@Nullable
|
||||
private final Duration blockTimeout;
|
||||
|
||||
|
||||
private HttpServiceProxyFactory(
|
||||
HttpClientAdapter clientAdapter, List<HttpServiceArgumentResolver> argumentResolvers,
|
||||
@Nullable StringValueResolver embeddedValueResolver,
|
||||
ReactiveAdapterRegistry reactiveAdapterRegistry, @Nullable Duration blockTimeout) {
|
||||
HttpExchangeAdapter exchangeAdapter, List<HttpServiceArgumentResolver> argumentResolvers,
|
||||
@Nullable StringValueResolver embeddedValueResolver) {
|
||||
|
||||
this.clientAdapter = clientAdapter;
|
||||
this.exchangeAdapter = exchangeAdapter;
|
||||
this.argumentResolvers = argumentResolvers;
|
||||
this.embeddedValueResolver = embeddedValueResolver;
|
||||
this.reactiveAdapterRegistry = reactiveAdapterRegistry;
|
||||
this.blockTimeout = blockTimeout;
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -109,16 +101,25 @@ public final class HttpServiceProxyFactory {
|
|||
"No argument resolvers: afterPropertiesSet was not called");
|
||||
|
||||
return new HttpServiceMethod(
|
||||
method, serviceType, this.argumentResolvers, this.clientAdapter,
|
||||
this.embeddedValueResolver, this.reactiveAdapterRegistry, this.blockTimeout);
|
||||
method, serviceType, this.argumentResolvers, this.exchangeAdapter, this.embeddedValueResolver);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Return a builder that's initialized with the given client.
|
||||
*/
|
||||
public static Builder builderFor(HttpExchangeAdapter exchangeAdapter) {
|
||||
return new Builder().exchangeAdapter(exchangeAdapter);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a builder that's initialized with the given client.
|
||||
* @deprecated in favor of {@link #builderFor(HttpExchangeAdapter)}
|
||||
*/
|
||||
@SuppressWarnings("removal")
|
||||
@Deprecated(since = "6.1", forRemoval = true)
|
||||
public static Builder builder(HttpClientAdapter clientAdapter) {
|
||||
return new Builder().clientAdapter(clientAdapter);
|
||||
return new Builder().exchangeAdapter(clientAdapter.asHttpExchangeAdapter());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -135,7 +136,7 @@ public final class HttpServiceProxyFactory {
|
|||
public static final class Builder {
|
||||
|
||||
@Nullable
|
||||
private HttpClientAdapter clientAdapter;
|
||||
private HttpExchangeAdapter exchangeAdapter;
|
||||
|
||||
private final List<HttpServiceArgumentResolver> customArgumentResolvers = new ArrayList<>();
|
||||
|
||||
|
|
@ -145,21 +146,30 @@ public final class HttpServiceProxyFactory {
|
|||
@Nullable
|
||||
private StringValueResolver embeddedValueResolver;
|
||||
|
||||
private ReactiveAdapterRegistry reactiveAdapterRegistry = ReactiveAdapterRegistry.getSharedInstance();
|
||||
|
||||
@Nullable
|
||||
private Duration blockTimeout;
|
||||
|
||||
private Builder() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Provide the HTTP client to perform requests through.
|
||||
* @param adapter a client adapted to {@link HttpExchangeAdapter}
|
||||
* @return this same builder instance
|
||||
* @since 6.1
|
||||
*/
|
||||
public Builder exchangeAdapter(HttpExchangeAdapter adapter) {
|
||||
this.exchangeAdapter = adapter;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Provide the HTTP client to perform requests through.
|
||||
* @param clientAdapter a client adapted to {@link HttpClientAdapter}
|
||||
* @return this same builder instance
|
||||
* @deprecated in favor of {@link #exchangeAdapter(HttpExchangeAdapter)}
|
||||
*/
|
||||
@SuppressWarnings("removal")
|
||||
@Deprecated(since = "6.1", forRemoval = true)
|
||||
public Builder clientAdapter(HttpClientAdapter clientAdapter) {
|
||||
this.clientAdapter = clientAdapter;
|
||||
this.exchangeAdapter = clientAdapter.asHttpExchangeAdapter();
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
@ -200,9 +210,13 @@ public final class HttpServiceProxyFactory {
|
|||
* asynchronous types for HTTP service method return values.
|
||||
* <p>By default this is {@link ReactiveAdapterRegistry#getSharedInstance()}.
|
||||
* @return this same builder instance
|
||||
* @deprecated in favor of setting the same directly on the {@link HttpExchangeAdapter}
|
||||
*/
|
||||
@Deprecated(since = "6.1", forRemoval = true)
|
||||
public Builder reactiveAdapterRegistry(ReactiveAdapterRegistry registry) {
|
||||
this.reactiveAdapterRegistry = registry;
|
||||
if (this.exchangeAdapter instanceof AbstractReactorHttpExchangeAdapter settable) {
|
||||
settable.setReactiveAdapterRegistry(registry);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
@ -215,9 +229,13 @@ public final class HttpServiceProxyFactory {
|
|||
* client, which provides more control over such settings.
|
||||
* @param blockTimeout the timeout value
|
||||
* @return this same builder instance
|
||||
* @deprecated in favor of setting the same directly on the {@link HttpExchangeAdapter}
|
||||
*/
|
||||
@Deprecated(since = "6.1", forRemoval = true)
|
||||
public Builder blockTimeout(@Nullable Duration blockTimeout) {
|
||||
this.blockTimeout = blockTimeout;
|
||||
if (this.exchangeAdapter instanceof AbstractReactorHttpExchangeAdapter settable) {
|
||||
settable.setBlockTimeout(blockTimeout);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
@ -225,11 +243,10 @@ public final class HttpServiceProxyFactory {
|
|||
* Build the {@link HttpServiceProxyFactory} instance.
|
||||
*/
|
||||
public HttpServiceProxyFactory build() {
|
||||
Assert.notNull(this.clientAdapter, "HttpClientAdapter is required");
|
||||
Assert.notNull(this.exchangeAdapter, "HttpClientAdapter is required");
|
||||
|
||||
return new HttpServiceProxyFactory(
|
||||
this.clientAdapter, initArgumentResolvers(),
|
||||
this.embeddedValueResolver, this.reactiveAdapterRegistry, this.blockTimeout);
|
||||
this.exchangeAdapter, initArgumentResolvers(), this.embeddedValueResolver);
|
||||
}
|
||||
|
||||
private List<HttpServiceArgumentResolver> initArgumentResolvers() {
|
||||
|
|
@ -242,10 +259,10 @@ public final class HttpServiceProxyFactory {
|
|||
|
||||
// Annotation-based
|
||||
resolvers.add(new RequestHeaderArgumentResolver(service));
|
||||
resolvers.add(new RequestBodyArgumentResolver(this.reactiveAdapterRegistry));
|
||||
resolvers.add(new RequestBodyArgumentResolver());
|
||||
resolvers.add(new PathVariableArgumentResolver(service));
|
||||
resolvers.add(new RequestParamArgumentResolver(service));
|
||||
resolvers.add(new RequestPartArgumentResolver(this.reactiveAdapterRegistry));
|
||||
resolvers.add(new RequestPartArgumentResolver());
|
||||
resolvers.add(new CookieValueArgumentResolver(service));
|
||||
resolvers.add(new RequestAttributeArgumentResolver());
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,106 @@
|
|||
/*
|
||||
* Copyright 2002-2023 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.web.service.invoker;
|
||||
|
||||
import java.time.Duration;
|
||||
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.core.ParameterizedTypeReference;
|
||||
import org.springframework.core.ReactiveAdapterRegistry;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
|
||||
/**
|
||||
* Contract to abstract a Project Reactor, HTTP client to decouple it from the
|
||||
* {@linkplain HttpServiceProxyFactory#createClient(Class) HTTP service proxy}.
|
||||
*
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 6.1
|
||||
*/
|
||||
public interface ReactorHttpExchangeAdapter extends HttpExchangeAdapter {
|
||||
|
||||
/**
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
ReactiveAdapterRegistry getReactiveAdapterRegistry();
|
||||
|
||||
/**
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
Duration getBlockTimeout();
|
||||
|
||||
/**
|
||||
* Perform the given request, and release the response content, if any.
|
||||
* @param requestValues the request to perform
|
||||
* @return {@code Mono} that completes when the request is fully executed
|
||||
* and the response content is released.
|
||||
*/
|
||||
Mono<Void> exchangeForMono(HttpRequestValues requestValues);
|
||||
|
||||
/**
|
||||
* Perform the given request, release the response content, and return the
|
||||
* response headers.
|
||||
* @param requestValues the request to perform
|
||||
* @return {@code Mono} that returns the response headers the request is
|
||||
* fully executed and the response content released.
|
||||
*/
|
||||
Mono<HttpHeaders> exchangeForHeadersMono(HttpRequestValues requestValues);
|
||||
|
||||
/**
|
||||
* Perform the given request and decode the response content to the given type.
|
||||
* @param requestValues the request to perform
|
||||
* @param bodyType the target type to decode to
|
||||
* @return {@code Mono} that returns the decoded response.
|
||||
* @param <T> the type the response is decoded to
|
||||
*/
|
||||
<T> Mono<T> exchangeForBodyMono(HttpRequestValues requestValues, ParameterizedTypeReference<T> bodyType);
|
||||
|
||||
/**
|
||||
* Perform the given request and decode the response content to a stream with
|
||||
* elements of the given type.
|
||||
* @param requestValues the request to perform
|
||||
* @param bodyType the target stream element type to decode to
|
||||
* @return {@code Flux} with decoded stream elements.
|
||||
* @param <T> the type the response is decoded to
|
||||
*/
|
||||
<T> Flux<T> exchangeForBodyFlux(HttpRequestValues requestValues, ParameterizedTypeReference<T> bodyType);
|
||||
|
||||
/**
|
||||
* Variant of {@link #exchangeForMono(HttpRequestValues)} with additional
|
||||
* access to the response status and headers.
|
||||
*/
|
||||
Mono<ResponseEntity<Void>> exchangeForBodilessEntityMono(HttpRequestValues requestValues);
|
||||
|
||||
/**
|
||||
* Variant of {@link #exchangeForBodyMono(HttpRequestValues, ParameterizedTypeReference)}
|
||||
* with additional access to the response status and headers.
|
||||
*/
|
||||
<T> Mono<ResponseEntity<T>> exchangeForEntityMono(
|
||||
HttpRequestValues requestValues, ParameterizedTypeReference<T> bodyType);
|
||||
|
||||
/**
|
||||
* Variant of {@link #exchangeForBodyFlux(HttpRequestValues, ParameterizedTypeReference)}
|
||||
* with additional access to the response status and headers.
|
||||
*/
|
||||
<T> Mono<ResponseEntity<Flux<T>>> exchangeForEntityFlux(
|
||||
HttpRequestValues requestValues, ParameterizedTypeReference<T> bodyType);
|
||||
|
||||
}
|
||||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2022 the original author or authors.
|
||||
* Copyright 2002-2023 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -36,6 +36,17 @@ public class RequestBodyArgumentResolver implements HttpServiceArgumentResolver
|
|||
private final ReactiveAdapterRegistry reactiveAdapterRegistry;
|
||||
|
||||
|
||||
/**
|
||||
* Default constructor that uses {@link ReactiveAdapterRegistry#getSharedInstance()}.
|
||||
* @since 6.1
|
||||
*/
|
||||
public RequestBodyArgumentResolver() {
|
||||
this(ReactiveAdapterRegistry.getSharedInstance());
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor with a {@link ReactiveAdapterRegistry}.
|
||||
*/
|
||||
public RequestBodyArgumentResolver(ReactiveAdapterRegistry reactiveAdapterRegistry) {
|
||||
Assert.notNull(reactiveAdapterRegistry, "ReactiveAdapterRegistry is required");
|
||||
this.reactiveAdapterRegistry = reactiveAdapterRegistry;
|
||||
|
|
|
|||
|
|
@ -50,6 +50,17 @@ public class RequestPartArgumentResolver extends AbstractNamedValueArgumentResol
|
|||
private final ReactiveAdapterRegistry reactiveAdapterRegistry;
|
||||
|
||||
|
||||
/**
|
||||
* Default constructor that uses {@link ReactiveAdapterRegistry#getSharedInstance()}.
|
||||
* @since 6.1
|
||||
*/
|
||||
public RequestPartArgumentResolver() {
|
||||
this(ReactiveAdapterRegistry.getSharedInstance());
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor with a {@link ReactiveAdapterRegistry}.
|
||||
*/
|
||||
public RequestPartArgumentResolver(ReactiveAdapterRegistry reactiveAdapterRegistry) {
|
||||
this.reactiveAdapterRegistry = reactiveAdapterRegistry;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -25,13 +25,14 @@ import org.springframework.http.HttpMethod;
|
|||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.web.reactive.function.client.WebClient;
|
||||
import org.springframework.web.service.invoker.HttpClientAdapter;
|
||||
import org.springframework.web.service.invoker.HttpRequestValues;
|
||||
import org.springframework.web.service.invoker.HttpServiceProxyFactory;
|
||||
import org.springframework.web.service.invoker.ReactorHttpExchangeAdapter;
|
||||
import org.springframework.web.service.invoker.AbstractReactorHttpExchangeAdapter;
|
||||
|
||||
/**
|
||||
* {@link HttpClientAdapter} that enables an {@link HttpServiceProxyFactory} to
|
||||
* use {@link WebClient} for request execution.
|
||||
* {@link ReactorHttpExchangeAdapter} that enables an {@link HttpServiceProxyFactory}
|
||||
* to use {@link WebClient} for request execution.
|
||||
*
|
||||
* <p>Use static factory methods in this class to create an
|
||||
* {@code HttpServiceProxyFactory} configured with a given {@code WebClient}.
|
||||
|
|
@ -39,7 +40,7 @@ import org.springframework.web.service.invoker.HttpServiceProxyFactory;
|
|||
* @author Rossen Stoyanchev
|
||||
* @since 6.0
|
||||
*/
|
||||
public final class WebClientAdapter implements HttpClientAdapter {
|
||||
public final class WebClientAdapter extends AbstractReactorHttpExchangeAdapter {
|
||||
|
||||
private final WebClient webClient;
|
||||
|
||||
|
|
@ -53,37 +54,37 @@ public final class WebClientAdapter implements HttpClientAdapter {
|
|||
|
||||
|
||||
@Override
|
||||
public Mono<Void> requestToVoid(HttpRequestValues requestValues) {
|
||||
public Mono<Void> exchangeForMono(HttpRequestValues requestValues) {
|
||||
return newRequest(requestValues).retrieve().toBodilessEntity().then();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<HttpHeaders> requestToHeaders(HttpRequestValues requestValues) {
|
||||
public Mono<HttpHeaders> exchangeForHeadersMono(HttpRequestValues requestValues) {
|
||||
return newRequest(requestValues).retrieve().toBodilessEntity().map(ResponseEntity::getHeaders);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Mono<T> requestToBody(HttpRequestValues requestValues, ParameterizedTypeReference<T> bodyType) {
|
||||
public <T> Mono<T> exchangeForBodyMono(HttpRequestValues requestValues, ParameterizedTypeReference<T> bodyType) {
|
||||
return newRequest(requestValues).retrieve().bodyToMono(bodyType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Flux<T> requestToBodyFlux(HttpRequestValues requestValues, ParameterizedTypeReference<T> bodyType) {
|
||||
public <T> Flux<T> exchangeForBodyFlux(HttpRequestValues requestValues, ParameterizedTypeReference<T> bodyType) {
|
||||
return newRequest(requestValues).retrieve().bodyToFlux(bodyType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<ResponseEntity<Void>> requestToBodilessEntity(HttpRequestValues requestValues) {
|
||||
public Mono<ResponseEntity<Void>> exchangeForBodilessEntityMono(HttpRequestValues requestValues) {
|
||||
return newRequest(requestValues).retrieve().toBodilessEntity();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Mono<ResponseEntity<T>> requestToEntity(HttpRequestValues requestValues, ParameterizedTypeReference<T> bodyType) {
|
||||
public <T> Mono<ResponseEntity<T>> exchangeForEntityMono(HttpRequestValues requestValues, ParameterizedTypeReference<T> bodyType) {
|
||||
return newRequest(requestValues).retrieve().toEntity(bodyType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Mono<ResponseEntity<Flux<T>>> requestToEntityFlux(HttpRequestValues requestValues, ParameterizedTypeReference<T> bodyType) {
|
||||
public <T> Mono<ResponseEntity<Flux<T>>> exchangeForEntityFlux(HttpRequestValues requestValues, ParameterizedTypeReference<T> bodyType) {
|
||||
return newRequest(requestValues).retrieve().toEntityFlux(bodyType);
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue