Replace anonymous impls with ListenableFutureAdapater

Before this change AsyncRestTemplate had two anonymous implementations
of ListenableFuture that were adapting the result. Those have been
replaces with ListenableFutureAdapter.

This commit is preparation for SPR-13785.
This commit is contained in:
Rossen Stoyanchev 2016-01-14 16:47:55 -05:00
parent 93298fc9fa
commit 037f351efd
1 changed files with 31 additions and 110 deletions

View File

@ -24,8 +24,6 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.springframework.core.ParameterizedTypeReference; import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.task.AsyncListenableTaskExecutor; import org.springframework.core.task.AsyncListenableTaskExecutor;
@ -44,11 +42,8 @@ import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.http.client.support.InterceptingAsyncHttpAccessor; import org.springframework.http.client.support.InterceptingAsyncHttpAccessor;
import org.springframework.http.converter.HttpMessageConverter; import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.util.concurrent.FailureCallback;
import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureAdapter; import org.springframework.util.concurrent.ListenableFutureAdapter;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.util.concurrent.SuccessCallback;
import org.springframework.web.util.DefaultUriTemplateHandler; import org.springframework.web.util.DefaultUriTemplateHandler;
import org.springframework.web.util.UriTemplateHandler; import org.springframework.web.util.UriTemplateHandler;
@ -245,75 +240,37 @@ public class AsyncRestTemplate extends InterceptingAsyncHttpAccessor implements
// POST // POST
@Override @Override
public ListenableFuture<URI> postForLocation(String url, HttpEntity<?> request, Object... uriVariables) public ListenableFuture<URI> postForLocation(String url, HttpEntity<?> request, Object... uriVars)
throws RestClientException { throws RestClientException {
AsyncRequestCallback requestCallback = httpEntityCallback(request); AsyncRequestCallback callback = httpEntityCallback(request);
ResponseExtractor<HttpHeaders> headersExtractor = headersExtractor(); ResponseExtractor<HttpHeaders> extractor = headersExtractor();
ListenableFuture<HttpHeaders> headersFuture = ListenableFuture<HttpHeaders> future = execute(url, HttpMethod.POST, callback, extractor, uriVars);
execute(url, HttpMethod.POST, requestCallback, headersExtractor, uriVariables); return adaptToLocationHeader(future);
return extractLocationHeader(headersFuture);
} }
@Override @Override
public ListenableFuture<URI> postForLocation(String url, HttpEntity<?> request, Map<String, ?> uriVariables) public ListenableFuture<URI> postForLocation(String url, HttpEntity<?> request, Map<String, ?> uriVars)
throws RestClientException { throws RestClientException {
AsyncRequestCallback requestCallback = httpEntityCallback(request); AsyncRequestCallback callback = httpEntityCallback(request);
ResponseExtractor<HttpHeaders> headersExtractor = headersExtractor(); ResponseExtractor<HttpHeaders> extractor = headersExtractor();
ListenableFuture<HttpHeaders> headersFuture = ListenableFuture<HttpHeaders> future = execute(url, HttpMethod.POST, callback, extractor, uriVars);
execute(url, HttpMethod.POST, requestCallback, headersExtractor, uriVariables); return adaptToLocationHeader(future);
return extractLocationHeader(headersFuture);
} }
@Override @Override
public ListenableFuture<URI> postForLocation(URI url, HttpEntity<?> request) throws RestClientException { public ListenableFuture<URI> postForLocation(URI url, HttpEntity<?> request) throws RestClientException {
AsyncRequestCallback requestCallback = httpEntityCallback(request); AsyncRequestCallback callback = httpEntityCallback(request);
ResponseExtractor<HttpHeaders> headersExtractor = headersExtractor(); ResponseExtractor<HttpHeaders> extractor = headersExtractor();
ListenableFuture<HttpHeaders> headersFuture = ListenableFuture<HttpHeaders> future = execute(url, HttpMethod.POST, callback, extractor);
execute(url, HttpMethod.POST, requestCallback, headersExtractor); return adaptToLocationHeader(future);
return extractLocationHeader(headersFuture);
} }
private static ListenableFuture<URI> extractLocationHeader(final ListenableFuture<HttpHeaders> headersFuture) { private static ListenableFuture<URI> adaptToLocationHeader(ListenableFuture<HttpHeaders> future) {
return new ListenableFuture<URI>() { return new ListenableFutureAdapter<URI, HttpHeaders>(future) {
@Override @Override
public void addCallback(final ListenableFutureCallback<? super URI> callback) { protected URI adapt(HttpHeaders headers) throws ExecutionException {
addCallback(callback, callback);
}
@Override
public void addCallback(final SuccessCallback<? super URI> successCallback, final FailureCallback failureCallback) {
headersFuture.addCallback(new ListenableFutureCallback<HttpHeaders>() {
@Override
public void onSuccess(HttpHeaders result) {
successCallback.onSuccess(result.getLocation());
}
@Override
public void onFailure(Throwable ex) {
failureCallback.onFailure(ex);
}
});
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return headersFuture.cancel(mayInterruptIfRunning);
}
@Override
public boolean isCancelled() {
return headersFuture.isCancelled();
}
@Override
public boolean isDone() {
return headersFuture.isDone();
}
@Override
public URI get() throws InterruptedException, ExecutionException {
HttpHeaders headers = headersFuture.get();
return headers.getLocation();
}
@Override
public URI get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
HttpHeaders headers = headersFuture.get(timeout, unit);
return headers.getLocation(); return headers.getLocation();
} }
}; };
@ -389,71 +346,35 @@ public class AsyncRestTemplate extends InterceptingAsyncHttpAccessor implements
// OPTIONS // OPTIONS
@Override @Override
public ListenableFuture<Set<HttpMethod>> optionsForAllow(String url, Object... uriVariables) throws RestClientException { public ListenableFuture<Set<HttpMethod>> optionsForAllow(String url, Object... uriVars) throws RestClientException {
ResponseExtractor<HttpHeaders> headersExtractor = headersExtractor(); ResponseExtractor<HttpHeaders> extractor = headersExtractor();
ListenableFuture<HttpHeaders> headersFuture = execute(url, HttpMethod.OPTIONS, null, headersExtractor, uriVariables); ListenableFuture<HttpHeaders> future = execute(url, HttpMethod.OPTIONS, null, extractor, uriVars);
return extractAllowHeader(headersFuture); return adaptToAllowHeader(future);
} }
@Override @Override
public ListenableFuture<Set<HttpMethod>> optionsForAllow(String url, Map<String, ?> uriVariables) throws RestClientException { public ListenableFuture<Set<HttpMethod>> optionsForAllow(String url, Map<String, ?> uriVars) throws RestClientException {
ResponseExtractor<HttpHeaders> headersExtractor = headersExtractor(); ResponseExtractor<HttpHeaders> extractor = headersExtractor();
ListenableFuture<HttpHeaders> headersFuture = execute(url, HttpMethod.OPTIONS, null, headersExtractor, uriVariables); ListenableFuture<HttpHeaders> future = execute(url, HttpMethod.OPTIONS, null, extractor, uriVars);
return extractAllowHeader(headersFuture); return adaptToAllowHeader(future);
} }
@Override @Override
public ListenableFuture<Set<HttpMethod>> optionsForAllow(URI url) throws RestClientException { public ListenableFuture<Set<HttpMethod>> optionsForAllow(URI url) throws RestClientException {
ResponseExtractor<HttpHeaders> headersExtractor = headersExtractor(); ResponseExtractor<HttpHeaders> extractor = headersExtractor();
ListenableFuture<HttpHeaders> headersFuture = execute(url, HttpMethod.OPTIONS, null, headersExtractor); ListenableFuture<HttpHeaders> future = execute(url, HttpMethod.OPTIONS, null, extractor);
return extractAllowHeader(headersFuture); return adaptToAllowHeader(future);
} }
private static ListenableFuture<Set<HttpMethod>> extractAllowHeader(final ListenableFuture<HttpHeaders> headersFuture) { private static ListenableFuture<Set<HttpMethod>> adaptToAllowHeader(ListenableFuture<HttpHeaders> future) {
return new ListenableFuture<Set<HttpMethod>>() { return new ListenableFutureAdapter<Set<HttpMethod>, HttpHeaders>(future) {
@Override @Override
public void addCallback(final ListenableFutureCallback<? super Set<HttpMethod>> callback) { protected Set<HttpMethod> adapt(HttpHeaders headers) throws ExecutionException {
addCallback(callback, callback);
}
@Override
public void addCallback(final SuccessCallback<? super Set<HttpMethod>> successCallback, final FailureCallback failureCallback) {
headersFuture.addCallback(new ListenableFutureCallback<HttpHeaders>() {
@Override
public void onSuccess(HttpHeaders result) {
successCallback.onSuccess(result.getAllow());
}
@Override
public void onFailure(Throwable ex) {
failureCallback.onFailure(ex);
}
});
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return headersFuture.cancel(mayInterruptIfRunning);
}
@Override
public boolean isCancelled() {
return headersFuture.isCancelled();
}
@Override
public boolean isDone() {
return headersFuture.isDone();
}
@Override
public Set<HttpMethod> get() throws InterruptedException, ExecutionException {
HttpHeaders headers = headersFuture.get();
return headers.getAllow();
}
@Override
public Set<HttpMethod> get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
HttpHeaders headers = headersFuture.get(timeout, unit);
return headers.getAllow(); return headers.getAllow();
} }
}; };
} }
// exchange // exchange
@Override @Override