Update AsyncRestTemplate interception
AsyncRequestExecution now properly supports decoration of the request (URI, HTTP method, and headers). Removed a no-op IdentityListenableFutureAdapter. Use Spring Framework coding style. Issue: SPR-12538
This commit is contained in:
parent
12969f6268
commit
258cc7b27f
|
|
@ -22,21 +22,28 @@ import org.springframework.util.concurrent.ListenableFuture;
|
|||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* The execution context of asynchronous client http request.
|
||||
* Represents the context of a client-side HTTP request execution.
|
||||
*
|
||||
* <p>Used to invoke the next interceptor in the interceptor chain, or - if the
|
||||
* calling interceptor is last - execute the request itself.
|
||||
*
|
||||
* @author Jakub Narloch
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 4.3
|
||||
* @see AsyncClientHttpRequestInterceptor
|
||||
*/
|
||||
public interface AsyncClientHttpRequestExecution {
|
||||
|
||||
/**
|
||||
* Resumes the request execution by invoking next interceptor in the chain or executing the
|
||||
* request to the remote service.
|
||||
* Resume the request execution by invoking next interceptor in the chain
|
||||
* or executing the request to the remote service.
|
||||
*
|
||||
* @param request the http request, containing the http method and headers
|
||||
* @param body the body of the request
|
||||
* @return the future
|
||||
* @throws IOException in case of I/O errors
|
||||
*/
|
||||
ListenableFuture<ClientHttpResponse> executeAsync(HttpRequest request, byte[] body) throws IOException;
|
||||
}
|
||||
ListenableFuture<ClientHttpResponse> executeAsync(HttpRequest request, byte[] body)
|
||||
throws IOException;
|
||||
|
||||
}
|
||||
|
|
@ -23,23 +23,52 @@ import org.springframework.util.concurrent.ListenableFuture;
|
|||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* The asynchronous HTTP request interceptor.
|
||||
* Intercepts client-side HTTP requests. Implementations of this interface can be
|
||||
* {@linkplain org.springframework.web.client.AsyncRestTemplate#setInterceptors(java.util.List)
|
||||
* registered} with the {@link org.springframework.web.client.AsyncRestTemplate
|
||||
* AsyncRestTemplate} as to modify the outgoing {@link HttpRequest} and/or
|
||||
* register to modify the incoming {@link ClientHttpResponse} with help of a
|
||||
* {@link org.springframework.util.concurrent.ListenableFutureAdapter
|
||||
* ListenableFutureAdapter}.
|
||||
*
|
||||
* <p>The main entry point for interceptors is {@link #intercept}.
|
||||
*
|
||||
* @author Jakub Narloch
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 4.3
|
||||
* @see org.springframework.web.client.AsyncRestTemplate
|
||||
* @see InterceptingAsyncHttpAccessor
|
||||
*/
|
||||
public interface AsyncClientHttpRequestInterceptor {
|
||||
|
||||
/**
|
||||
* Intercepts the outgoing client HTTP request.
|
||||
* Intercept the given request, and return a response future. The given
|
||||
* {@link AsyncClientHttpRequestExecution} allows the interceptor to pass on
|
||||
* the request to the next entity in the chain.
|
||||
*
|
||||
* @param request the request
|
||||
* @param body the request's body
|
||||
* @param execution the request execution context
|
||||
* @return the future
|
||||
* <p>An implementation might follow this pattern:
|
||||
* <ol>
|
||||
* <li>Examine the {@linkplain HttpRequest request} and body</li>
|
||||
* <li>Optionally {@linkplain org.springframework.http.client.support.HttpRequestWrapper
|
||||
* wrap} the request to filter HTTP attributes.</li>
|
||||
* <li>Optionally modify the body of the request.</li>
|
||||
* <li>One of the following:
|
||||
* <ul>
|
||||
* <li>execute the request through {@link ClientHttpRequestExecution}</li>
|
||||
* <li>don't execute the request to block the execution altogether</li>
|
||||
* </ul>
|
||||
* <li>Optionally adapt the response to filter HTTP attributes with the help of
|
||||
* {@link org.springframework.util.concurrent.ListenableFutureAdapter
|
||||
* ListenableFutureAdapter}.</li>
|
||||
* </ol>
|
||||
*
|
||||
* @param request the request, containing method, URI, and headers
|
||||
* @param body the body of the request
|
||||
* @param execution the request execution
|
||||
* @return the response future
|
||||
* @throws IOException in case of I/O errors
|
||||
*/
|
||||
ListenableFuture<ClientHttpResponse> interceptRequest(
|
||||
HttpRequest request, byte[] body, AsyncClientHttpRequestExecution execution) throws IOException;
|
||||
ListenableFuture<ClientHttpResponse> intercept(HttpRequest request, byte[] body,
|
||||
AsyncClientHttpRequestExecution execution) throws IOException;
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,24 +16,23 @@
|
|||
|
||||
package org.springframework.http.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpMethod;
|
||||
import org.springframework.http.HttpRequest;
|
||||
import org.springframework.util.StreamUtils;
|
||||
import org.springframework.util.concurrent.ListenableFuture;
|
||||
import org.springframework.util.concurrent.ListenableFutureAdapter;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
/**
|
||||
* A {@link AsyncClientHttpRequest} wrapper that enriches it proceeds the actual request execution with calling
|
||||
* the registered interceptors.
|
||||
* An {@link AsyncClientHttpRequest} wrapper that enriches it proceeds the actual
|
||||
* request execution with calling the registered interceptors.
|
||||
*
|
||||
* @author Jakub Narloch
|
||||
* @author Rossen Stoyanchev
|
||||
* @see InterceptingAsyncClientHttpRequestFactory
|
||||
*/
|
||||
class InterceptingAsyncClientHttpRequest extends AbstractBufferingAsyncClientHttpRequest {
|
||||
|
|
@ -46,6 +45,7 @@ class InterceptingAsyncClientHttpRequest extends AbstractBufferingAsyncClientHtt
|
|||
|
||||
private HttpMethod httpMethod;
|
||||
|
||||
|
||||
/**
|
||||
* Creates new instance of {@link InterceptingAsyncClientHttpRequest}.
|
||||
*
|
||||
|
|
@ -55,8 +55,7 @@ class InterceptingAsyncClientHttpRequest extends AbstractBufferingAsyncClientHtt
|
|||
* @param httpMethod the HTTP method
|
||||
*/
|
||||
public InterceptingAsyncClientHttpRequest(AsyncClientHttpRequestFactory requestFactory,
|
||||
List<AsyncClientHttpRequestInterceptor> interceptors, URI uri,
|
||||
HttpMethod httpMethod) {
|
||||
List<AsyncClientHttpRequestInterceptor> interceptors, URI uri, HttpMethod httpMethod) {
|
||||
|
||||
this.requestFactory = requestFactory;
|
||||
this.interceptors = interceptors;
|
||||
|
|
@ -64,8 +63,11 @@ class InterceptingAsyncClientHttpRequest extends AbstractBufferingAsyncClientHtt
|
|||
this.httpMethod = httpMethod;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected ListenableFuture<ClientHttpResponse> executeInternal(HttpHeaders headers, byte[] body) throws IOException {
|
||||
protected ListenableFuture<ClientHttpResponse> executeInternal(HttpHeaders headers, byte[] body)
|
||||
throws IOException {
|
||||
|
||||
return new AsyncRequestExecution().executeAsync(this, body);
|
||||
}
|
||||
|
||||
|
|
@ -79,37 +81,37 @@ class InterceptingAsyncClientHttpRequest extends AbstractBufferingAsyncClientHtt
|
|||
return uri;
|
||||
}
|
||||
|
||||
|
||||
private class AsyncRequestExecution implements AsyncClientHttpRequestExecution {
|
||||
|
||||
private Iterator<AsyncClientHttpRequestInterceptor> nextInterceptor = interceptors.iterator();
|
||||
private Iterator<AsyncClientHttpRequestInterceptor> iterator;
|
||||
|
||||
public AsyncRequestExecution() {
|
||||
this.iterator = interceptors.iterator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<ClientHttpResponse> executeAsync(HttpRequest request, byte[] body) throws IOException {
|
||||
if (nextInterceptor.hasNext()) {
|
||||
AsyncClientHttpRequestInterceptor interceptor = nextInterceptor.next();
|
||||
ListenableFuture<ClientHttpResponse> future = interceptor.interceptRequest(request, body, this);
|
||||
return new IdentityListenableFutureAdapter<ClientHttpResponse>(future);
|
||||
public ListenableFuture<ClientHttpResponse> executeAsync(HttpRequest request, byte[] body)
|
||||
throws IOException {
|
||||
|
||||
if (this.iterator.hasNext()) {
|
||||
AsyncClientHttpRequestInterceptor interceptor = this.iterator.next();
|
||||
return interceptor.intercept(request, body, this);
|
||||
}
|
||||
else {
|
||||
AsyncClientHttpRequest req = requestFactory.createAsyncRequest(uri, httpMethod);
|
||||
req.getHeaders().putAll(getHeaders());
|
||||
URI theUri = request.getURI();
|
||||
HttpMethod theMethod = request.getMethod();
|
||||
HttpHeaders theHeaders = request.getHeaders();
|
||||
|
||||
AsyncClientHttpRequest delegate = requestFactory.createAsyncRequest(theUri, theMethod);
|
||||
delegate.getHeaders().putAll(theHeaders);
|
||||
if (body.length > 0) {
|
||||
StreamUtils.copy(body, req.getBody());
|
||||
StreamUtils.copy(body, delegate.getBody());
|
||||
}
|
||||
return req.executeAsync();
|
||||
|
||||
return delegate.executeAsync();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class IdentityListenableFutureAdapter<T> extends ListenableFutureAdapter<T, T> {
|
||||
|
||||
protected IdentityListenableFutureAdapter(ListenableFuture<T> adaptee) {
|
||||
super(adaptee);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected T adapt(T adapteeResult) throws ExecutionException {
|
||||
return adapteeResult;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,17 +16,18 @@
|
|||
|
||||
package org.springframework.http.client;
|
||||
|
||||
import org.springframework.http.HttpMethod;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.springframework.http.HttpMethod;
|
||||
|
||||
/**
|
||||
* The intercepting request factory.
|
||||
* Wrapper for a {@link AsyncClientHttpRequestFactory} that has support for
|
||||
* {@link AsyncClientHttpRequestInterceptor}s.
|
||||
*
|
||||
* @author Jakub Narloch
|
||||
* @since 4.3
|
||||
* @see InterceptingAsyncClientHttpRequest
|
||||
*/
|
||||
public class InterceptingAsyncClientHttpRequestFactory implements AsyncClientHttpRequestFactory {
|
||||
|
|
@ -35,22 +36,25 @@ public class InterceptingAsyncClientHttpRequestFactory implements AsyncClientHtt
|
|||
|
||||
private List<AsyncClientHttpRequestInterceptor> interceptors;
|
||||
|
||||
|
||||
/**
|
||||
* Creates new instance of {@link InterceptingAsyncClientHttpRequestFactory} with delegated request factory and
|
||||
* list of interceptors.
|
||||
* Create new instance of {@link InterceptingAsyncClientHttpRequestFactory}
|
||||
* with delegated request factory and list of interceptors.
|
||||
*
|
||||
* @param delegate the delegated request factory
|
||||
* @param interceptors the list of interceptors.
|
||||
* @param delegate the request factory to delegate to
|
||||
* @param interceptors the list of interceptors to use
|
||||
*/
|
||||
public InterceptingAsyncClientHttpRequestFactory(AsyncClientHttpRequestFactory delegate, List<AsyncClientHttpRequestInterceptor> interceptors) {
|
||||
public InterceptingAsyncClientHttpRequestFactory(AsyncClientHttpRequestFactory delegate,
|
||||
List<AsyncClientHttpRequestInterceptor> interceptors) {
|
||||
|
||||
this.delegate = delegate;
|
||||
this.interceptors = interceptors != null ? interceptors : Collections.<AsyncClientHttpRequestInterceptor>emptyList();
|
||||
this.interceptors = (interceptors != null ? interceptors :
|
||||
Collections.<AsyncClientHttpRequestInterceptor>emptyList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public AsyncClientHttpRequest createAsyncRequest(URI uri, HttpMethod httpMethod) throws IOException {
|
||||
|
||||
return new InterceptingAsyncClientHttpRequest(delegate, interceptors, uri, httpMethod);
|
||||
public AsyncClientHttpRequest createAsyncRequest(URI uri, HttpMethod method) {
|
||||
return new InterceptingAsyncClientHttpRequest(this.delegate, this.interceptors, uri, method);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,30 +19,27 @@ package org.springframework.http.client.support;
|
|||
import org.springframework.http.client.AsyncClientHttpRequestFactory;
|
||||
import org.springframework.http.client.AsyncClientHttpRequestInterceptor;
|
||||
import org.springframework.http.client.InterceptingAsyncClientHttpRequestFactory;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* The HTTP accessor that extends the base {@link AsyncHttpAccessor} with request intercepting functionality.
|
||||
* The HTTP accessor that extends the base {@link AsyncHttpAccessor} with request
|
||||
* intercepting functionality.
|
||||
*
|
||||
* @author Jakub Narloch
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 4.3
|
||||
*/
|
||||
public abstract class InterceptingAsyncHttpAccessor extends AsyncHttpAccessor {
|
||||
|
||||
private List<AsyncClientHttpRequestInterceptor> interceptors = new ArrayList<AsyncClientHttpRequestInterceptor>();
|
||||
private List<AsyncClientHttpRequestInterceptor> interceptors =
|
||||
new ArrayList<AsyncClientHttpRequestInterceptor>();
|
||||
|
||||
|
||||
/**
|
||||
* Retrieves the list of interceptors.
|
||||
*
|
||||
* @return the list of interceptors
|
||||
*/
|
||||
public List<AsyncClientHttpRequestInterceptor> getInterceptors() {
|
||||
return interceptors;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the list of interceptors.
|
||||
* Sets the request interceptors that this accessor should use.
|
||||
*
|
||||
* @param interceptors the list of interceptors
|
||||
*/
|
||||
|
|
@ -50,12 +47,22 @@ public abstract class InterceptingAsyncHttpAccessor extends AsyncHttpAccessor {
|
|||
this.interceptors = interceptors;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the request interceptor that this accessor uses.
|
||||
*/
|
||||
public List<AsyncClientHttpRequestInterceptor> getInterceptors() {
|
||||
return this.interceptors;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AsyncClientHttpRequestFactory getAsyncRequestFactory() {
|
||||
AsyncClientHttpRequestFactory asyncRequestFactory = super.getAsyncRequestFactory();
|
||||
if(interceptors.isEmpty()) {
|
||||
return asyncRequestFactory;
|
||||
AsyncClientHttpRequestFactory delegate = super.getAsyncRequestFactory();
|
||||
if (!CollectionUtils.isEmpty(getInterceptors())) {
|
||||
return new InterceptingAsyncClientHttpRequestFactory(delegate, getInterceptors());
|
||||
}
|
||||
else {
|
||||
return delegate;
|
||||
}
|
||||
return new InterceptingAsyncClientHttpRequestFactory(asyncRequestFactory, getInterceptors());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,9 +18,10 @@ package org.springframework.web.client;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
|
@ -43,12 +44,20 @@ import org.springframework.http.client.AsyncClientHttpRequestExecution;
|
|||
import org.springframework.http.client.AsyncClientHttpRequestInterceptor;
|
||||
import org.springframework.http.client.ClientHttpResponse;
|
||||
import org.springframework.http.client.HttpComponentsAsyncClientHttpRequestFactory;
|
||||
import org.springframework.http.client.support.HttpRequestWrapper;
|
||||
import org.springframework.util.LinkedMultiValueMap;
|
||||
import org.springframework.util.MultiValueMap;
|
||||
import org.springframework.util.concurrent.ListenableFuture;
|
||||
import org.springframework.util.concurrent.ListenableFutureCallback;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertSame;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
/**
|
||||
* @author Arjen Poutsma
|
||||
|
|
@ -610,8 +619,8 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa
|
|||
@Test
|
||||
public void getAndInterceptResponse() throws Exception {
|
||||
RequestInterceptor interceptor = new RequestInterceptor();
|
||||
template.setInterceptors(Arrays.<AsyncClientHttpRequestInterceptor>asList(interceptor));
|
||||
ListenableFuture<ResponseEntity<String>> future = template.getForEntity(baseUrl + "/get", String.class);
|
||||
template.setInterceptors(Collections.singletonList(interceptor));
|
||||
ListenableFuture<ResponseEntity<String>> future = template.getForEntity("/get", String.class);
|
||||
|
||||
ResponseEntity<String> response = future.get();
|
||||
assertNotNull(interceptor.response);
|
||||
|
|
@ -623,28 +632,44 @@ public class AsyncRestTemplateIntegrationTests extends AbstractJettyServerTestCa
|
|||
@Test
|
||||
public void getAndInterceptError() throws Exception {
|
||||
RequestInterceptor interceptor = new RequestInterceptor();
|
||||
template.setInterceptors(Arrays.<AsyncClientHttpRequestInterceptor>asList(interceptor));
|
||||
ListenableFuture<ResponseEntity<String>> future = template.getForEntity(baseUrl + "/status/notfound", String.class);
|
||||
template.setInterceptors(Collections.singletonList(interceptor));
|
||||
ListenableFuture<ResponseEntity<String>> future = template.getForEntity("/status/notfound", String.class);
|
||||
|
||||
try {
|
||||
future.get();
|
||||
fail("No exception thrown");
|
||||
} catch (ExecutionException ex) {
|
||||
// expected
|
||||
}
|
||||
assertNotNull(interceptor.response);
|
||||
assertEquals(HttpStatus.NOT_FOUND, interceptor.response.getStatusCode());
|
||||
assertNull(interceptor.exception);
|
||||
}
|
||||
|
||||
public static class RequestInterceptor implements AsyncClientHttpRequestInterceptor {
|
||||
|
||||
private ClientHttpResponse response;
|
||||
private static class RequestInterceptor implements AsyncClientHttpRequestInterceptor {
|
||||
|
||||
private Throwable exception;
|
||||
private volatile ClientHttpResponse response;
|
||||
|
||||
private volatile Throwable exception;
|
||||
|
||||
@Override
|
||||
public ListenableFuture<ClientHttpResponse> interceptRequest(HttpRequest request, byte[] body,
|
||||
AsyncClientHttpRequestExecution execution) throws IOException {
|
||||
public ListenableFuture<ClientHttpResponse> intercept(HttpRequest request, byte[] body,
|
||||
AsyncClientHttpRequestExecution execution) throws IOException {
|
||||
|
||||
request = new HttpRequestWrapper(request) {
|
||||
|
||||
@Override
|
||||
public URI getURI() {
|
||||
try {
|
||||
return new URI(baseUrl + super.getURI().toString());
|
||||
}
|
||||
catch (URISyntaxException ex) {
|
||||
throw new IllegalStateException(ex);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
ListenableFuture<ClientHttpResponse> future = execution.executeAsync(request, body);
|
||||
future.addCallback(resp -> response = resp, ex -> exception = ex);
|
||||
return future;
|
||||
|
|
|
|||
Loading…
Reference in New Issue