Remove RxJava 1.x variants of WebClient adapters

Issue: SPR-14743
This commit is contained in:
Brian Clozel 2016-10-04 14:47:55 +02:00
parent ee17f56626
commit 11aa920785
5 changed files with 0 additions and 845 deletions

View File

@ -1,169 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive.support;
import java.net.URI;
import reactor.adapter.RxJava1Adapter;
import reactor.core.publisher.Mono;
import rx.Observable;
import rx.Single;
import org.springframework.core.ResolvableType;
import org.springframework.http.HttpCookie;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ClientHttpRequest;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.reactive.ClientWebRequest;
import org.springframework.web.client.reactive.ClientWebRequestBuilder;
import org.springframework.web.client.reactive.ClientWebRequestPostProcessor;
import org.springframework.web.client.reactive.DefaultClientWebRequestBuilder;
/**
* Builds a {@link ClientHttpRequest} using a {@code Observable}
* or {@code Single} as request body.
*
* <p>See static factory methods in {@link RxJava1ClientWebRequestBuilders}
*
* @author Brian Clozel
* @see RxJava1ClientWebRequestBuilders
*/
public class RxJava1ClientWebRequestBuilder implements ClientWebRequestBuilder {
private final DefaultClientWebRequestBuilder delegate;
public RxJava1ClientWebRequestBuilder(HttpMethod httpMethod, String urlTemplate,
Object... urlVariables) throws RestClientException {
this.delegate = new DefaultClientWebRequestBuilder(httpMethod, urlTemplate, urlVariables);
}
public RxJava1ClientWebRequestBuilder(HttpMethod httpMethod, URI url) {
this.delegate = new DefaultClientWebRequestBuilder(httpMethod, url);
}
/**
* Add an HTTP request header
*/
public RxJava1ClientWebRequestBuilder header(String name, String... values) {
this.delegate.header(name, values);
return this;
}
/**
* Add all provided HTTP request headers
*/
public RxJava1ClientWebRequestBuilder headers(HttpHeaders httpHeaders) {
this.delegate.headers(httpHeaders);
return this;
}
/**
* Set the Content-Type request header to the given {@link MediaType}
*/
public RxJava1ClientWebRequestBuilder contentType(MediaType contentType) {
this.delegate.contentType(contentType);
return this;
}
/**
* Set the Content-Type request header to the given media type
*/
public RxJava1ClientWebRequestBuilder contentType(String contentType) {
this.delegate.contentType(contentType);
return this;
}
/**
* Set the Accept request header to the given {@link MediaType}s
*/
public RxJava1ClientWebRequestBuilder accept(MediaType... mediaTypes) {
this.delegate.accept(mediaTypes);
return this;
}
/**
* Set the Accept request header to the given media types
*/
public RxJava1ClientWebRequestBuilder accept(String... mediaTypes) {
this.delegate.accept(mediaTypes);
return this;
}
/**
* Add a Cookie to the HTTP request
*/
public RxJava1ClientWebRequestBuilder cookie(String name, String value) {
this.delegate.cookie(name, value);
return this;
}
/**
* Add a Cookie to the HTTP request
*/
public RxJava1ClientWebRequestBuilder cookie(HttpCookie cookie) {
this.delegate.cookie(cookie);
return this;
}
/**
* Allows performing more complex operations with a strategy. For example, a
* {@link ClientWebRequestPostProcessor} implementation might accept the arguments of username
* and password and set an HTTP Basic authentication header.
*
* @param postProcessor the {@link ClientWebRequestPostProcessor} to use. Cannot be null.
*
* @return this instance for further modifications.
*/
public RxJava1ClientWebRequestBuilder apply(ClientWebRequestPostProcessor postProcessor) {
this.delegate.apply(postProcessor);
return this;
}
/**
* Use the given object as the request body
*/
public RxJava1ClientWebRequestBuilder body(Object content) {
this.delegate.body(Mono.just(content), ResolvableType.forInstance(content));
return this;
}
/**
* Use the given {@link Single} as the request body and use its {@link ResolvableType}
* as type information for the element published by this reactive stream
*/
public RxJava1ClientWebRequestBuilder body(Single<?> content, ResolvableType elementType) {
this.delegate.body(RxJava1Adapter.singleToMono(content), elementType);
return this;
}
/**
* Use the given {@link Observable} as the request body and use its {@link ResolvableType}
* as type information for the elements published by this reactive stream
*/
public RxJava1ClientWebRequestBuilder body(Observable<?> content, ResolvableType elementType) {
this.delegate.body(RxJava1Adapter.observableToFlux(content), elementType);
return this;
}
@Override
public ClientWebRequest build() {
return this.delegate.build();
}
}

View File

@ -1,110 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive.support;
import org.springframework.http.HttpMethod;
/**
* Static factory methods for {@link RxJava1ClientWebRequestBuilder ClientWebRequestBuilders}
* using the {@link rx.Observable} and {@link rx.Single} API.
*
* @author Brian Clozel
*/
public abstract class RxJava1ClientWebRequestBuilders {
/**
* Create a {@link RxJava1ClientWebRequestBuilder} for a GET request.
*
* @param urlTemplate a URL template; the resulting URL will be encoded
* @param urlVariables zero or more URL variables
*/
public static RxJava1ClientWebRequestBuilder get(String urlTemplate, Object... urlVariables) {
return new RxJava1ClientWebRequestBuilder(HttpMethod.GET, urlTemplate, urlVariables);
}
/**
* Create a {@link RxJava1ClientWebRequestBuilder} for a POST request.
*
* @param urlTemplate a URL template; the resulting URL will be encoded
* @param urlVariables zero or more URL variables
*/
public static RxJava1ClientWebRequestBuilder post(String urlTemplate, Object... urlVariables) {
return new RxJava1ClientWebRequestBuilder(HttpMethod.POST, urlTemplate, urlVariables);
}
/**
* Create a {@link RxJava1ClientWebRequestBuilder} for a PUT request.
*
* @param urlTemplate a URL template; the resulting URL will be encoded
* @param urlVariables zero or more URL variables
*/
public static RxJava1ClientWebRequestBuilder put(String urlTemplate, Object... urlVariables) {
return new RxJava1ClientWebRequestBuilder(HttpMethod.PUT, urlTemplate, urlVariables);
}
/**
* Create a {@link RxJava1ClientWebRequestBuilder} for a PATCH request.
*
* @param urlTemplate a URL template; the resulting URL will be encoded
* @param urlVariables zero or more URL variables
*/
public static RxJava1ClientWebRequestBuilder patch(String urlTemplate, Object... urlVariables) {
return new RxJava1ClientWebRequestBuilder(HttpMethod.PATCH, urlTemplate, urlVariables);
}
/**
* Create a {@link RxJava1ClientWebRequestBuilder} for a DELETE request.
*
* @param urlTemplate a URL template; the resulting URL will be encoded
* @param urlVariables zero or more URL variables
*/
public static RxJava1ClientWebRequestBuilder delete(String urlTemplate, Object... urlVariables) {
return new RxJava1ClientWebRequestBuilder(HttpMethod.DELETE, urlTemplate, urlVariables);
}
/**
* Create a {@link RxJava1ClientWebRequestBuilder} for an OPTIONS request.
*
* @param urlTemplate a URL template; the resulting URL will be encoded
* @param urlVariables zero or more URL variables
*/
public static RxJava1ClientWebRequestBuilder options(String urlTemplate, Object... urlVariables) {
return new RxJava1ClientWebRequestBuilder(HttpMethod.OPTIONS, urlTemplate, urlVariables);
}
/**
* Create a {@link RxJava1ClientWebRequestBuilder} for a HEAD request.
*
* @param urlTemplate a URL template; the resulting URL will be encoded
* @param urlVariables zero or more URL variables
*/
public static RxJava1ClientWebRequestBuilder head(String urlTemplate, Object... urlVariables) {
return new RxJava1ClientWebRequestBuilder(HttpMethod.HEAD, urlTemplate, urlVariables);
}
/**
* Create a {@link RxJava1ClientWebRequestBuilder} for a request with the given HTTP method.
*
* @param httpMethod the HTTP method
* @param urlTemplate a URL template; the resulting URL will be encoded
* @param urlVariables zero or more URL variables
*/
public static RxJava1ClientWebRequestBuilder request(HttpMethod httpMethod, String urlTemplate, Object... urlVariables) {
return new RxJava1ClientWebRequestBuilder(httpMethod, urlTemplate, urlVariables);
}
}

View File

@ -1,221 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive.support;
import java.util.Collections;
import java.util.List;
import reactor.adapter.RxJava1Adapter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import rx.Observable;
import rx.Single;
import org.springframework.core.ResolvableType;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.reactive.ClientHttpResponse;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.web.client.reactive.BodyExtractor;
import org.springframework.web.client.reactive.ResponseExtractor;
import org.springframework.web.client.reactive.WebClientException;
/**
* Static factory methods for {@link ResponseExtractor} and {@link BodyExtractor},
* based on the {@link Observable} and {@link Single} APIs.
*
* @author Brian Clozel
* @since 5.0
*/
public class RxJava1ResponseExtractors {
/**
* Extract the response body and decode it, returning it as a {@code Single<T>}.
* @see ResolvableType#forClassWithGenerics(Class, Class[])
*/
@SuppressWarnings("unchecked")
public static <T> ResponseExtractor<Single<T>> body(ResolvableType bodyType) {
return (clientResponse, webClientConfig) -> (Single<T>) RxJava1Adapter
.publisherToSingle(clientResponse
.doOnNext(response -> webClientConfig.getResponseErrorHandler()
.handleError(response, webClientConfig.getMessageReaders()))
.flatMap(resp -> decodeResponseBodyAsMono(resp, bodyType, webClientConfig.getMessageReaders())));
}
/**
* Extract the response body and decode it, returning it as a {@code Single<T>}.
*/
public static <T> ResponseExtractor<Single<T>> body(Class<T> sourceClass) {
ResolvableType bodyType = ResolvableType.forClass(sourceClass);
return body(bodyType);
}
/**
* Extract the response body and decode it, returning it as a {@code Single<T>}.
* @see ResolvableType#forClassWithGenerics(Class, Class[])
*/
@SuppressWarnings("unchecked")
public static <T> BodyExtractor<Single<T>> as(ResolvableType bodyType) {
return (clientResponse, messageConverters) ->
(Single<T>) RxJava1Adapter.publisherToSingle(
decodeResponseBodyAsMono(clientResponse, bodyType, messageConverters));
}
/**
* Extract the response body and decode it, returning it as a {@code Single<T>}
*/
public static <T> BodyExtractor<Single<T>> as(Class<T> sourceClass) {
ResolvableType bodyType = ResolvableType.forClass(sourceClass);
return as(bodyType);
}
/**
* Extract the response body and decode it, returning it as an {@code Observable<T>}
* @see ResolvableType#forClassWithGenerics(Class, Class[])
*/
public static <T> ResponseExtractor<Observable<T>> bodyStream(ResolvableType bodyType) {
return (clientResponse, webClientConfig) -> RxJava1Adapter
.publisherToObservable(clientResponse
.doOnNext(response -> webClientConfig.getResponseErrorHandler()
.handleError(response, webClientConfig.getMessageReaders()))
.flatMap(resp -> decodeResponseBody(resp, bodyType, webClientConfig.getMessageReaders())));
}
/**
* Extract the response body and decode it, returning it as an {@code Observable<T>}.
*/
public static <T> ResponseExtractor<Observable<T>> bodyStream(Class<T> sourceClass) {
ResolvableType bodyType = ResolvableType.forClass(sourceClass);
return bodyStream(bodyType);
}
/**
* Extract the response body and decode it, returning it as a {@code Observable<T>}.
* @see ResolvableType#forClassWithGenerics(Class, Class[])
*/
@SuppressWarnings("unchecked")
public static <T> BodyExtractor<Observable<T>> asStream(ResolvableType bodyType) {
return (clientResponse, messageConverters) ->
(Observable<T>) RxJava1Adapter
.publisherToObservable(decodeResponseBody(clientResponse, bodyType, messageConverters));
}
/**
* Extract the response body and decode it, returning it as a {@code Observable<T>}.
*/
public static <T> BodyExtractor<Observable<T>> asStream(Class<T> sourceClass) {
ResolvableType bodyType = ResolvableType.forClass(sourceClass);
return asStream(bodyType);
}
/**
* Extract the full response body as a {@code ResponseEntity}
* with its body decoded as a single type {@code T}.
* @see ResolvableType#forClassWithGenerics(Class, Class[])
*/
@SuppressWarnings("unchecked")
public static <T> ResponseExtractor<Single<ResponseEntity<T>>> response(ResolvableType bodyType) {
return (clientResponse, webClientConfig) ->
RxJava1Adapter.publisherToSingle(clientResponse
.then(response ->
Mono.when(
decodeResponseBody(response, bodyType, webClientConfig.getMessageReaders()).next(),
Mono.just(response.getHeaders()),
Mono.just(response.getStatusCode())))
.map(tuple ->
new ResponseEntity<>((T) tuple.getT1(), tuple.getT2(), tuple.getT3())));
}
/**
* Extract the full response body as a {@code ResponseEntity}
* with its body decoded as a single type {@code T}.
*/
public static <T> ResponseExtractor<Single<ResponseEntity<T>>> response(Class<T> sourceClass) {
ResolvableType bodyType = ResolvableType.forClass(sourceClass);
return response(bodyType);
}
/**
* Extract the full response body as a {@code ResponseEntity}
* with its body decoded as an {@code Observable<T>}.
*/
public static <T> ResponseExtractor<Single<ResponseEntity<Observable<T>>>> responseStream(Class<T> sourceClass) {
ResolvableType resolvableType = ResolvableType.forClass(sourceClass);
return responseStream(resolvableType);
}
/**
* Extract the full response body as a {@code ResponseEntity}
* with its body decoded as an {@code Observable<T>}
* @see ResolvableType#forClassWithGenerics(Class, Class[])
*/
public static <T> ResponseExtractor<Single<ResponseEntity<Observable<T>>>> responseStream(ResolvableType bodyType) {
return (clientResponse, webClientConfig) -> RxJava1Adapter.publisherToSingle(clientResponse
.map(response -> new ResponseEntity<>(
RxJava1Adapter
.publisherToObservable(
// RxJava1ResponseExtractors.<T> is required for Eclipse JDT.
RxJava1ResponseExtractors.<T> decodeResponseBody(response, bodyType, webClientConfig.getMessageReaders())),
response.getHeaders(),
response.getStatusCode())));
}
/**
* Extract the response headers as an {@code HttpHeaders} instance.
*/
public static ResponseExtractor<Single<HttpHeaders>> headers() {
return (clientResponse, messageConverters) -> RxJava1Adapter
.publisherToSingle(clientResponse.map(resp -> resp.getHeaders()));
}
@SuppressWarnings("unchecked")
protected static <T> Flux<T> decodeResponseBody(ClientHttpResponse response,
ResolvableType responseType, List<HttpMessageReader<?>> messageReaders) {
MediaType contentType = response.getHeaders().getContentType();
HttpMessageReader<?> converter = resolveMessageReader(messageReaders, responseType, contentType);
return (Flux<T>) converter.read(responseType, response, Collections.emptyMap());
}
@SuppressWarnings("unchecked")
protected static <T> Mono<T> decodeResponseBodyAsMono(ClientHttpResponse response,
ResolvableType responseType, List<HttpMessageReader<?>> messageReaders) {
MediaType contentType = response.getHeaders().getContentType();
HttpMessageReader<?> converter = resolveMessageReader(messageReaders, responseType, contentType);
return (Mono<T>) converter.readMono(responseType, response, Collections.emptyMap());
}
protected static HttpMessageReader<?> resolveMessageReader(List<HttpMessageReader<?>> messageReaders,
ResolvableType responseType, MediaType contentType) {
return messageReaders.stream()
.filter(e -> e.canRead(responseType, contentType))
.findFirst()
.orElseThrow(() ->
new WebClientException(
"Could not decode response body of type '" + contentType
+ "' with target type '" + responseType.toString() + "'"));
}
}

View File

@ -1,4 +0,0 @@
/**
* Support classes for the reactive WebClient.
*/
package org.springframework.web.client.reactive.support;

View File

@ -1,341 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.client.reactive;
import static org.junit.Assert.*;
import static org.springframework.web.client.reactive.support.RxJava1ClientWebRequestBuilders.*;
import static org.springframework.web.client.reactive.support.RxJava1ResponseExtractors.*;
import java.util.concurrent.TimeUnit;
import okhttp3.HttpUrl;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import rx.Observable;
import rx.Single;
import rx.observers.TestSubscriber;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.http.codec.Pojo;
/**
* {@link WebClient} integration tests with the {@code Observable} and {@code Single} API.
*
* @author Brian Clozel
*/
public class RxJava1WebClientIntegrationTests {
private MockWebServer server;
private WebClient webClient;
@Before
public void setup() {
this.server = new MockWebServer();
this.webClient = new WebClient(new ReactorClientHttpConnector());
}
@Test
public void shouldGetHeaders() throws Exception {
HttpUrl baseUrl = server.url("/greeting?name=Spring");
this.server.enqueue(new MockResponse().setHeader("Content-Type", "text/plain").setBody("Hello Spring!"));
Single<HttpHeaders> result = this.webClient
.perform(get(baseUrl.toString()))
.extract(headers());
TestSubscriber<HttpHeaders> ts = new TestSubscriber<HttpHeaders>();
result.subscribe(ts);
ts.awaitTerminalEvent(2, TimeUnit.SECONDS);
HttpHeaders httpHeaders = ts.getOnNextEvents().get(0);
assertEquals(MediaType.TEXT_PLAIN, httpHeaders.getContentType());
assertEquals(13L, httpHeaders.getContentLength());
ts.assertValueCount(1);
ts.assertCompleted();
RecordedRequest request = server.takeRequest();
assertEquals(1, server.getRequestCount());
assertEquals("*/*", request.getHeader(HttpHeaders.ACCEPT));
assertEquals("/greeting?name=Spring", request.getPath());
}
@Test
public void shouldGetPlainTextResponseAsObject() throws Exception {
HttpUrl baseUrl = server.url("/greeting?name=Spring");
this.server.enqueue(new MockResponse().setBody("Hello Spring!"));
Single<String> result = this.webClient
.perform(get(baseUrl.toString())
.header("X-Test-Header", "testvalue"))
.extract(body(String.class));
TestSubscriber<String> ts = new TestSubscriber<String>();
result.subscribe(ts);
ts.awaitTerminalEvent(2, TimeUnit.SECONDS);
String response = ts.getOnNextEvents().get(0);
assertEquals("Hello Spring!", response);
ts.assertValueCount(1);
ts.assertCompleted();
RecordedRequest request = server.takeRequest();
assertEquals(1, server.getRequestCount());
assertEquals("testvalue", request.getHeader("X-Test-Header"));
assertEquals("*/*", request.getHeader(HttpHeaders.ACCEPT));
assertEquals("/greeting?name=Spring", request.getPath());
}
@Test
public void shouldGetPlainTextResponse() throws Exception {
HttpUrl baseUrl = server.url("/greeting?name=Spring");
this.server.enqueue(new MockResponse().setHeader("Content-Type", "text/plain").setBody("Hello Spring!"));
Single<ResponseEntity<String>> result = this.webClient
.perform(get(baseUrl.toString())
.accept(MediaType.TEXT_PLAIN))
.extract(response(String.class));
TestSubscriber<ResponseEntity<String>> ts = new TestSubscriber<ResponseEntity<String>>();
result.subscribe(ts);
ts.awaitTerminalEvent(2, TimeUnit.SECONDS);
ResponseEntity<String> response = ts.getOnNextEvents().get(0);
assertEquals(200, response.getStatusCode().value());
assertEquals(MediaType.TEXT_PLAIN, response.getHeaders().getContentType());
assertEquals("Hello Spring!", response.getBody());
ts.assertValueCount(1);
ts.assertCompleted();
RecordedRequest request = server.takeRequest();
assertEquals(1, server.getRequestCount());
assertEquals("/greeting?name=Spring", request.getPath());
assertEquals("text/plain", request.getHeader(HttpHeaders.ACCEPT));
}
@Test
public void shouldGetJsonAsMonoOfString() throws Exception {
HttpUrl baseUrl = server.url("/json");
String content = "{\"bar\":\"barbar\",\"foo\":\"foofoo\"}";
this.server.enqueue(new MockResponse().setHeader("Content-Type", "application/json")
.setBody(content));
Single<String> result = this.webClient
.perform(get(baseUrl.toString())
.accept(MediaType.APPLICATION_JSON))
.extract(body(String.class));
TestSubscriber<String> ts = new TestSubscriber<String>();
result.subscribe(ts);
ts.awaitTerminalEvent(2, TimeUnit.SECONDS);
String response = ts.getOnNextEvents().get(0);
assertEquals(content, response);
ts.assertValueCount(1);
ts.assertCompleted();
RecordedRequest request = server.takeRequest();
assertEquals(1, server.getRequestCount());
assertEquals("/json", request.getPath());
assertEquals("application/json", request.getHeader(HttpHeaders.ACCEPT));
}
@Test
public void shouldGetJsonAsMonoOfPojo() throws Exception {
HttpUrl baseUrl = server.url("/pojo");
this.server.enqueue(new MockResponse().setHeader("Content-Type", "application/json")
.setBody("{\"bar\":\"barbar\",\"foo\":\"foofoo\"}"));
Single<Pojo> result = this.webClient
.perform(get(baseUrl.toString())
.accept(MediaType.APPLICATION_JSON))
.extract(body(Pojo.class));
TestSubscriber<Pojo> ts = new TestSubscriber<Pojo>();
result.subscribe(ts);
ts.awaitTerminalEvent(2, TimeUnit.SECONDS);
Pojo response = ts.getOnNextEvents().get(0);
assertEquals("barbar", response.getBar());
ts.assertValueCount(1);
ts.assertCompleted();
RecordedRequest request = server.takeRequest();
assertEquals(1, server.getRequestCount());
assertEquals("/pojo", request.getPath());
assertEquals("application/json", request.getHeader(HttpHeaders.ACCEPT));
}
@Test
public void shouldGetJsonAsFluxOfPojos() throws Exception {
HttpUrl baseUrl = server.url("/pojos");
this.server.enqueue(new MockResponse().setHeader("Content-Type", "application/json")
.setBody("[{\"bar\":\"bar1\",\"foo\":\"foo1\"},{\"bar\":\"bar2\",\"foo\":\"foo2\"}]"));
Observable<Pojo> result = this.webClient
.perform(get(baseUrl.toString())
.accept(MediaType.APPLICATION_JSON))
.extract(bodyStream(Pojo.class));
TestSubscriber<Pojo> ts = new TestSubscriber<Pojo>();
result.subscribe(ts);
ts.awaitTerminalEvent(2, TimeUnit.SECONDS);
assertThat(ts.getOnNextEvents().get(0).getBar(), Matchers.is("bar1"));
assertThat(ts.getOnNextEvents().get(1).getBar(), Matchers.is("bar2"));
ts.assertValueCount(2);
ts.assertCompleted();
RecordedRequest request = server.takeRequest();
assertEquals(1, server.getRequestCount());
assertEquals("/pojos", request.getPath());
assertEquals("application/json", request.getHeader(HttpHeaders.ACCEPT));
}
@Test
public void shouldGetJsonAsResponseOfPojosStream() throws Exception {
HttpUrl baseUrl = server.url("/pojos");
this.server.enqueue(new MockResponse().setHeader("Content-Type", "application/json")
.setBody("[{\"bar\":\"bar1\",\"foo\":\"foo1\"},{\"bar\":\"bar2\",\"foo\":\"foo2\"}]"));
Single<ResponseEntity<Observable<Pojo>>> result = this.webClient
.perform(get(baseUrl.toString())
.accept(MediaType.APPLICATION_JSON))
.extract(responseStream(Pojo.class));
TestSubscriber<ResponseEntity<Observable<Pojo>>> ts = new TestSubscriber<ResponseEntity<Observable<Pojo>>>();
result.subscribe(ts);
ts.awaitTerminalEvent(2, TimeUnit.SECONDS);
ResponseEntity<Observable<Pojo>> response = ts.getOnNextEvents().get(0);
assertEquals(200, response.getStatusCode().value());
assertEquals(MediaType.APPLICATION_JSON, response.getHeaders().getContentType());
ts.assertValueCount(1);
ts.assertCompleted();
RecordedRequest request = server.takeRequest();
assertEquals(1, server.getRequestCount());
assertEquals("/pojos", request.getPath());
assertEquals("application/json", request.getHeader(HttpHeaders.ACCEPT));
}
@Test
public void shouldPostPojoAsJson() throws Exception {
HttpUrl baseUrl = server.url("/pojo/capitalize");
this.server.enqueue(new MockResponse()
.setHeader("Content-Type", "application/json")
.setBody("{\"bar\":\"BARBAR\",\"foo\":\"FOOFOO\"}"));
Pojo spring = new Pojo("foofoo", "barbar");
Single<Pojo> result = this.webClient
.perform(post(baseUrl.toString())
.body(spring)
.contentType(MediaType.APPLICATION_JSON)
.accept(MediaType.APPLICATION_JSON))
.extract(body(Pojo.class));
TestSubscriber<Pojo> ts = new TestSubscriber<Pojo>();
result.subscribe(ts);
ts.awaitTerminalEvent(2, TimeUnit.SECONDS);
assertThat(ts.getOnNextEvents().get(0).getBar(), Matchers.is("BARBAR"));
ts.assertValueCount(1);
ts.assertCompleted();
RecordedRequest request = server.takeRequest();
assertEquals(1, server.getRequestCount());
assertEquals("/pojo/capitalize", request.getPath());
assertEquals("{\"foo\":\"foofoo\",\"bar\":\"barbar\"}", request.getBody().readUtf8());
assertEquals("chunked", request.getHeader(HttpHeaders.TRANSFER_ENCODING));
assertEquals("application/json", request.getHeader(HttpHeaders.ACCEPT));
assertEquals("application/json", request.getHeader(HttpHeaders.CONTENT_TYPE));
}
@Test
public void shouldSendCookieHeader() throws Exception {
HttpUrl baseUrl = server.url("/test");
this.server.enqueue(new MockResponse()
.setHeader("Content-Type", "text/plain").setBody("test"));
Single<String> result = this.webClient
.perform(get(baseUrl.toString())
.cookie("testkey", "testvalue"))
.extract(body(String.class));
TestSubscriber<String> ts = new TestSubscriber<String>();
result.subscribe(ts);
ts.awaitTerminalEvent(2, TimeUnit.SECONDS);
String response = ts.getOnNextEvents().get(0);
assertEquals("test", response);
ts.assertValueCount(1);
ts.assertCompleted();
RecordedRequest request = server.takeRequest();
assertEquals(1, server.getRequestCount());
assertEquals("/test", request.getPath());
assertEquals("testkey=testvalue", request.getHeader(HttpHeaders.COOKIE));
}
@Test
public void shouldGetErrorWhen404() throws Exception {
HttpUrl baseUrl = server.url("/greeting?name=Spring");
this.server.enqueue(new MockResponse().setResponseCode(404)
.setHeader("Content-Type", "text/plain").setBody("Not Found"));
Single<String> result = this.webClient
.perform(get(baseUrl.toString()))
.extract(body(String.class));
TestSubscriber<String> ts = new TestSubscriber<String>();
result.subscribe(ts);
ts.awaitTerminalEvent(2, TimeUnit.SECONDS);
ts.assertError(WebClientErrorException.class);
WebClientErrorException exc = (WebClientErrorException) ts.getOnErrorEvents().get(0);
assertEquals(404, exc.getStatus().value());
assertEquals(MediaType.TEXT_PLAIN, exc.getResponseHeaders().getContentType());
RecordedRequest request = server.takeRequest();
assertEquals(1, server.getRequestCount());
assertEquals("*/*", request.getHeader(HttpHeaders.ACCEPT));
assertEquals("/greeting?name=Spring", request.getPath());
}
@After
public void tearDown() throws Exception {
this.server.shutdown();
}
}