Add ClientHttpConnector test suite

This commit introduces a test suite for ClientHttpConnector
implementations, as well as fixes that resolve issues identified by
these tests.

Closes gh-24941
This commit is contained in:
Arjen Poutsma 2020-04-17 14:19:06 +02:00
parent 6b1170b19a
commit 7bd524e9d7
6 changed files with 270 additions and 33 deletions

View File

@ -27,6 +27,7 @@ import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.HttpStreamResetException;
import org.apache.hc.core5.http.Message;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.reactive.ReactiveResponseConsumer;
@ -43,6 +44,7 @@ import org.springframework.util.Assert;
* {@link ClientHttpConnector} implementation for the Apache HttpComponents HttpClient 5.x.
*
* @author Martin Tarjányi
* @author Arjen Poutsma
* @since 5.3
* @see <a href="https://hc.apache.org/index.html">Apache HttpComponents</a>
*/
@ -148,7 +150,12 @@ public class HttpComponentsClientHttpConnector implements ClientHttpConnector {
@Override
public void failed(Exception ex) {
this.sink.error(ex);
Throwable t = ex;
if (t instanceof HttpStreamResetException) {
HttpStreamResetException httpStreamResetException = (HttpStreamResetException) ex;
t = httpStreamResetException.getCause();
}
this.sink.error(t);
}
@Override

View File

@ -20,6 +20,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.function.Function;
import org.apache.hc.client5.http.cookie.CookieStore;
import org.apache.hc.client5.http.impl.cookie.BasicClientCookie;
@ -39,13 +40,14 @@ import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import static org.springframework.http.MediaType.ALL_VALUE;
/**
* {@link ClientHttpRequest} implementation for the Apache HttpComponents HttpClient 5.x.
*
* @author Martin Tarjányi
* @author Arjen Poutsma
* @since 5.3
* @see <a href="https://hc.apache.org/index.html">Apache HttpComponents</a>
*/
@ -72,7 +74,9 @@ class HttpComponentsClientHttpRequest extends AbstractClientHttpRequest {
@Override
public HttpMethod getMethod() {
return HttpMethod.resolve(this.httpRequest.getMethod());
HttpMethod method = HttpMethod.resolve(this.httpRequest.getMethod());
Assert.state(method != null, "Method must not be null");
return method;
}
@Override
@ -100,7 +104,7 @@ class HttpComponentsClientHttpRequest extends AbstractClientHttpRequest {
@Override
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
return writeWith(Flux.from(body).flatMap(p -> p));
return writeWith(Flux.from(body).flatMap(Function.identity()));
}
@Override

View File

@ -39,6 +39,7 @@ import org.springframework.util.MultiValueMap;
* {@link ClientHttpResponse} implementation for the Apache HttpComponents HttpClient 5.x.
*
* @author Martin Tarjányi
* @author Arjen Poutsma
* @since 5.3
* @see <a href="https://hc.apache.org/index.html">Apache HttpComponents</a>
*/

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -91,12 +91,13 @@ public class JettyClientHttpConnector implements ClientHttpConnector {
}
}
/**
* Set the buffer factory to be used.
*/
public void setBufferFactory(DataBufferFactory bufferFactory) {
this.bufferFactory = bufferFactory;
}
@Override
public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri,
Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {
@ -125,16 +126,7 @@ public class JettyClientHttpConnector implements ClientHttpConnector {
}
private DataBuffer toDataBuffer(ContentChunk chunk) {
// We must copy until this is resolved:
// https://github.com/eclipse/jetty.project/issues/2429
// Use copy instead of buffer wrapping because Callback#succeeded() is
// used not only to release the buffer but also to request more data
// which is a problem for codecs that buffer data.
DataBuffer buffer = this.bufferFactory.allocateBuffer(chunk.buffer.capacity());
buffer.write(chunk.buffer);
DataBuffer buffer = this.bufferFactory.wrap(chunk.buffer);
chunk.callback.succeeded();
return buffer;
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -26,14 +26,13 @@ import org.eclipse.jetty.reactive.client.ContentChunk;
import org.eclipse.jetty.reactive.client.ReactiveRequest;
import org.eclipse.jetty.util.Callback;
import org.reactivestreams.Publisher;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.PooledDataBuffer;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
@ -87,21 +86,18 @@ class JettyClientHttpRequest extends AbstractClientHttpRequest {
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
Flux<ContentChunk> chunks = Flux.from(body).map(this::toContentChunk);
ReactiveRequest.Content content = ReactiveRequest.Content.fromPublisher(chunks, getContentType());
this.reactiveRequest = ReactiveRequest.newBuilder(this.jettyRequest).content(content).build();
return doCommit(this::completes);
return Mono.<Void>create(sink -> {
Flux<ContentChunk> chunks = Flux.from(body).map(buffer -> toContentChunk(buffer, sink));
ReactiveRequest.Content content = ReactiveRequest.Content.fromPublisher(chunks, getContentType());
this.reactiveRequest = ReactiveRequest.newBuilder(this.jettyRequest).content(content).build();
sink.success();
})
.then(doCommit(this::completes));
}
@Override
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
Flux<ContentChunk> chunks = Flux.from(body)
.flatMap(Function.identity())
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)
.map(this::toContentChunk);
ReactiveRequest.Content content = ReactiveRequest.Content.fromPublisher(chunks, getContentType());
this.reactiveRequest = ReactiveRequest.newBuilder(this.jettyRequest).content(content).build();
return doCommit(this::completes);
return writeWith(Flux.from(body).flatMap(Function.identity()));
}
private String getContentType() {
@ -113,7 +109,7 @@ class JettyClientHttpRequest extends AbstractClientHttpRequest {
return Mono.empty();
}
private ContentChunk toContentChunk(DataBuffer buffer) {
private ContentChunk toContentChunk(DataBuffer buffer, MonoSink<Void> sink) {
return new ContentChunk(buffer.asByteBuffer(), new Callback() {
@Override
public void succeeded() {
@ -123,7 +119,7 @@ class JettyClientHttpRequest extends AbstractClientHttpRequest {
@Override
public void failed(Throwable x) {
DataBufferUtils.release(buffer);
throw Exceptions.propagate(x);
sink.error(x);
}
});
}

View File

@ -0,0 +1,237 @@
/*
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.client.reactive;
import java.io.IOException;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
import okio.Buffer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.ReactiveHttpOutputMessage;
import org.springframework.lang.NonNull;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
/**
* @author Arjen Poutsma
*/
public class ClientHttpConnectorTests {
private static final int BUF_SIZE = 1024;
private static final EnumSet<HttpMethod> METHODS_WITH_BODY =
EnumSet.of(HttpMethod.PUT, HttpMethod.POST, HttpMethod.PATCH);
private final MockWebServer server = new MockWebServer();
private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
@BeforeEach
void startServer() throws IOException {
server.start();
}
@AfterEach
void stopServer() throws IOException {
server.shutdown();
}
@ParameterizedTest
@MethodSource("org.springframework.http.client.reactive.ClientHttpConnectorTests#methodsWithConnectors")
void basic(ClientHttpConnector connector, HttpMethod method) throws Exception {
URI uri = this.server.url("/").uri();
String responseBody = "bar\r\n";
prepareResponse(response -> {
response.setResponseCode(200);
response.addHeader("Baz", "Qux");
response.setBody(responseBody);
});
String requestBody = "foo\r\n";
boolean requestHasBody = METHODS_WITH_BODY.contains(method);
Mono<ClientHttpResponse> futureResponse = connector.connect(method, uri, request -> {
assertThat(request.getMethod()).isEqualTo(method);
assertThat(request.getURI()).isEqualTo(uri);
request.getHeaders().add("Foo", "Bar");
if (requestHasBody) {
Mono<DataBuffer> body = Mono.fromCallable(() -> {
byte[] bytes = requestBody.getBytes(StandardCharsets.UTF_8);
return this.bufferFactory.wrap(bytes);
});
return request.writeWith(body);
}
else {
return request.setComplete();
}
});
CountDownLatch latch = new CountDownLatch(1);
StepVerifier.create(futureResponse)
.assertNext(response -> {
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(response.getHeaders().getFirst("Baz")).isEqualTo("Qux");
DataBufferUtils.join(response.getBody())
.map(buffer -> {
String s = buffer.toString(StandardCharsets.UTF_8);
DataBufferUtils.release(buffer);
return s;
}).subscribe(
s -> assertThat(s).isEqualTo(responseBody),
throwable -> {
latch.countDown();
fail(throwable.getMessage(), throwable);
},
latch::countDown);
})
.verifyComplete();
latch.await();
expectRequest(request -> {
assertThat(request.getMethod()).isEqualTo(method.name());
assertThat(request.getHeader("Foo")).isEqualTo("Bar");
if (requestHasBody) {
assertThat(request.getBody().readUtf8()).isEqualTo(requestBody);
}
});
}
@ParameterizedConnectorTest
void errorInRequestBody(ClientHttpConnector connector) {
Exception error = new RuntimeException();
Flux<DataBuffer> body = Flux.concat(
stringBuffer("foo"),
Mono.error(error)
);
prepareResponse(response -> response.setResponseCode(200));
Mono<ClientHttpResponse> futureResponse =
connector.connect(HttpMethod.POST, this.server.url("/").uri(), request -> request.writeWith(body));
StepVerifier.create(futureResponse)
.expectErrorSatisfies(throwable -> assertThat(throwable).isSameAs(error))
.verify();
}
@ParameterizedConnectorTest
void cancelResponseBody(ClientHttpConnector connector) {
Buffer responseBody = randomBody(100);
prepareResponse(response -> response.setBody(responseBody));
ClientHttpResponse response = connector.connect(HttpMethod.POST, this.server.url("/").uri(),
ReactiveHttpOutputMessage::setComplete).block();
assertThat(response).isNotNull();
StepVerifier.create(response.getBody(), 1)
.expectNextCount(1)
.thenRequest(1)
.thenCancel()
.verify();
}
@NonNull
private Buffer randomBody(int size) {
Buffer responseBody = new Buffer();
Random rnd = new Random();
for (int i = 0; i < size; i++) {
byte[] bytes = new byte[BUF_SIZE];
rnd.nextBytes(bytes);
responseBody.write(bytes);
}
return responseBody;
}
private void prepareResponse(Consumer<MockResponse> consumer) {
MockResponse response = new MockResponse();
consumer.accept(response);
this.server.enqueue(response);
}
private void expectRequest(Consumer<RecordedRequest> consumer) {
try {
consumer.accept(this.server.takeRequest());
}
catch (InterruptedException ex) {
throw new IllegalStateException(ex);
}
}
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@ParameterizedTest
@MethodSource("org.springframework.http.client.reactive.ClientHttpConnectorTests#connectors")
public @interface ParameterizedConnectorTest {
}
static List<ClientHttpConnector> connectors() {
return Arrays.asList(
new ReactorClientHttpConnector(),
new JettyClientHttpConnector(),
new HttpComponentsClientHttpConnector()
);
}
static List<Arguments> methodsWithConnectors() {
List<Arguments> result = new ArrayList<>();
for (ClientHttpConnector connector : connectors()) {
for (HttpMethod method : HttpMethod.values()) {
result.add(Arguments.of(connector, method));
}
}
return result;
}
private Mono<DataBuffer> stringBuffer(String value) {
return Mono.fromCallable(() -> {
byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
DataBuffer buffer = this.bufferFactory.allocateBuffer(bytes.length);
buffer.write(bytes);
return buffer;
});
}
}