Polishing
This commit is contained in:
parent
45c20e34e4
commit
cf75a09011
|
@ -330,7 +330,7 @@ public class BeanPropertyRowMapper<T> implements Function<Readable, T> {
|
|||
bw.setPropertyValue(pd.getName(), value);
|
||||
}
|
||||
catch (TypeMismatchException ex) {
|
||||
if (value == null && this.primitivesDefaultedForNullValue) {
|
||||
if (value == null && isPrimitivesDefaultedForNullValue()) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
String propertyType = ClassUtils.getQualifiedName(pd.getPropertyType());
|
||||
//here too, we miss the rowNumber information
|
||||
|
|
|
@ -51,25 +51,6 @@ class JdkClientHttpRequest extends AbstractStreamingClientHttpRequest {
|
|||
|
||||
private static final Set<String> DISALLOWED_HEADERS = disallowedHeaders();
|
||||
|
||||
/**
|
||||
* By default, {@link HttpRequest} does not allow {@code Connection},
|
||||
* {@code Content-Length}, {@code Expect}, {@code Host}, or {@code Upgrade}
|
||||
* headers to be set, but this can be overriden with the
|
||||
* {@code jdk.httpclient.allowRestrictedHeaders} system property.
|
||||
* @see jdk.internal.net.http.common.Utils#getDisallowedHeaders()
|
||||
*/
|
||||
private static Set<String> disallowedHeaders() {
|
||||
TreeSet<String> headers = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
|
||||
headers.addAll(Set.of("connection", "content-length", "expect", "host", "upgrade"));
|
||||
|
||||
String headersToAllow = System.getProperty("jdk.httpclient.allowRestrictedHeaders");
|
||||
if (headersToAllow != null) {
|
||||
Set<String> toAllow = StringUtils.commaDelimitedListToSet(headersToAllow);
|
||||
headers.removeAll(toAllow);
|
||||
}
|
||||
return Collections.unmodifiableSet(headers);
|
||||
}
|
||||
|
||||
|
||||
private final HttpClient httpClient;
|
||||
|
||||
|
@ -85,6 +66,7 @@ class JdkClientHttpRequest extends AbstractStreamingClientHttpRequest {
|
|||
|
||||
public JdkClientHttpRequest(HttpClient httpClient, URI uri, HttpMethod method, Executor executor,
|
||||
@Nullable Duration readTimeout) {
|
||||
|
||||
this.httpClient = httpClient;
|
||||
this.uri = uri;
|
||||
this.method = method;
|
||||
|
@ -92,6 +74,7 @@ class JdkClientHttpRequest extends AbstractStreamingClientHttpRequest {
|
|||
this.timeout = readTimeout;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public HttpMethod getMethod() {
|
||||
return this.method;
|
||||
|
@ -107,7 +90,8 @@ class JdkClientHttpRequest extends AbstractStreamingClientHttpRequest {
|
|||
protected ClientHttpResponse executeInternal(HttpHeaders headers, @Nullable Body body) throws IOException {
|
||||
try {
|
||||
HttpRequest request = buildRequest(headers, body);
|
||||
HttpResponse<InputStream> response = this.httpClient.send(request, HttpResponse.BodyHandlers.ofInputStream());
|
||||
HttpResponse<InputStream> response =
|
||||
this.httpClient.send(request, HttpResponse.BodyHandlers.ofInputStream());
|
||||
return new JdkClientHttpResponse(response);
|
||||
}
|
||||
catch (UncheckedIOException ex) {
|
||||
|
@ -121,9 +105,7 @@ class JdkClientHttpRequest extends AbstractStreamingClientHttpRequest {
|
|||
|
||||
|
||||
private HttpRequest buildRequest(HttpHeaders headers, @Nullable Body body) {
|
||||
HttpRequest.Builder builder = HttpRequest.newBuilder()
|
||||
.uri(this.uri);
|
||||
|
||||
HttpRequest.Builder builder = HttpRequest.newBuilder().uri(this.uri);
|
||||
if (this.timeout != null) {
|
||||
builder.timeout(this.timeout);
|
||||
}
|
||||
|
@ -144,8 +126,7 @@ class JdkClientHttpRequest extends AbstractStreamingClientHttpRequest {
|
|||
if (body != null) {
|
||||
Flow.Publisher<ByteBuffer> outputStreamPublisher = OutputStreamPublisher.create(
|
||||
outputStream -> body.writeTo(StreamUtils.nonClosing(outputStream)),
|
||||
BYTE_MAPPER,
|
||||
this.executor);
|
||||
BYTE_MAPPER, this.executor);
|
||||
|
||||
long contentLength = headers.getContentLength();
|
||||
if (contentLength != -1) {
|
||||
|
@ -160,6 +141,25 @@ class JdkClientHttpRequest extends AbstractStreamingClientHttpRequest {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* By default, {@link HttpRequest} does not allow {@code Connection},
|
||||
* {@code Content-Length}, {@code Expect}, {@code Host}, or {@code Upgrade}
|
||||
* headers to be set, but this can be overriden with the
|
||||
* {@code jdk.httpclient.allowRestrictedHeaders} system property.
|
||||
* @see jdk.internal.net.http.common.Utils#getDisallowedHeaders()
|
||||
*/
|
||||
private static Set<String> disallowedHeaders() {
|
||||
TreeSet<String> headers = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
|
||||
headers.addAll(Set.of("connection", "content-length", "expect", "host", "upgrade"));
|
||||
|
||||
String headersToAllow = System.getProperty("jdk.httpclient.allowRestrictedHeaders");
|
||||
if (headersToAllow != null) {
|
||||
Set<String> toAllow = StringUtils.commaDelimitedListToSet(headersToAllow);
|
||||
headers.removeAll(toAllow);
|
||||
}
|
||||
return Collections.unmodifiableSet(headers);
|
||||
}
|
||||
|
||||
|
||||
private static final class ByteBufferMapper implements OutputStreamPublisher.ByteMapper<ByteBuffer> {
|
||||
|
||||
|
@ -178,7 +178,6 @@ class JdkClientHttpRequest extends AbstractStreamingClientHttpRequest {
|
|||
byteBuffer.flip();
|
||||
return byteBuffer;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -27,7 +27,6 @@ import org.springframework.http.HttpMethod;
|
|||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
|
||||
/**
|
||||
* {@link ClientHttpRequestFactory} implementation based on the Java
|
||||
* {@link HttpClient}.
|
||||
|
|
|
@ -52,7 +52,7 @@ class JdkClientHttpResponse implements ClientHttpResponse {
|
|||
this.response = response;
|
||||
this.headers = adaptHeaders(response);
|
||||
InputStream inputStream = response.body();
|
||||
this.body = (inputStream != null) ? inputStream : InputStream.nullInputStream();
|
||||
this.body = (inputStream != null ? inputStream : InputStream.nullInputStream());
|
||||
}
|
||||
|
||||
private static HttpHeaders adaptHeaders(HttpResponse<?> response) {
|
||||
|
@ -103,4 +103,5 @@ class JdkClientHttpResponse implements ClientHttpResponse {
|
|||
catch (IOException ignored) {
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -141,12 +141,10 @@ final class ReactorNettyClientRequest extends AbstractStreamingClientHttpRequest
|
|||
|
||||
private final ByteBufAllocator allocator;
|
||||
|
||||
|
||||
public ByteBufMapper(ByteBufAllocator allocator) {
|
||||
this.allocator = allocator;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public ByteBuf map(int b) {
|
||||
ByteBuf byteBuf = this.allocator.buffer(1);
|
||||
|
@ -161,4 +159,5 @@ final class ReactorNettyClientRequest extends AbstractStreamingClientHttpRequest
|
|||
return byteBuf;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -36,13 +36,11 @@ public class ReactorNettyClientRequestFactory implements ClientHttpRequestFactor
|
|||
|
||||
private final HttpClient httpClient;
|
||||
|
||||
|
||||
private Duration exchangeTimeout = Duration.ofSeconds(5);
|
||||
|
||||
private Duration readTimeout = Duration.ofSeconds(10);
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Create a new instance of the {@code ReactorNettyClientRequestFactory}
|
||||
* with a default {@link HttpClient} that has compression enabled.
|
||||
|
@ -61,6 +59,7 @@ public class ReactorNettyClientRequestFactory implements ClientHttpRequestFactor
|
|||
this.httpClient = httpClient;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Set the underlying connect timeout in milliseconds.
|
||||
* A value of 0 specifies an infinite timeout.
|
||||
|
@ -125,9 +124,9 @@ public class ReactorNettyClientRequestFactory implements ClientHttpRequestFactor
|
|||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod) throws IOException {
|
||||
return new ReactorNettyClientRequest(this.httpClient, uri, httpMethod, this.exchangeTimeout, this.readTimeout);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -48,7 +48,6 @@ final class ReactorNettyClientResponse implements ClientHttpResponse {
|
|||
private volatile InputStream body;
|
||||
|
||||
|
||||
|
||||
public ReactorNettyClientResponse(HttpClientResponse response, Connection connection, Duration readTimeout) {
|
||||
this.response = response;
|
||||
this.connection = connection;
|
||||
|
@ -56,6 +55,7 @@ final class ReactorNettyClientResponse implements ClientHttpResponse {
|
|||
this.headers = HttpHeaders.readOnlyHttpHeaders(new Netty4HeadersAdapter(response.responseHeaders()));
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public HttpStatusCode getStatusCode() {
|
||||
return HttpStatusCode.valueOf(this.response.status().code());
|
||||
|
@ -73,21 +73,23 @@ final class ReactorNettyClientResponse implements ClientHttpResponse {
|
|||
|
||||
@Override
|
||||
public InputStream getBody() throws IOException {
|
||||
if (this.body == null) {
|
||||
InputStream body = this.connection.inbound().receive()
|
||||
.aggregate().asInputStream().block(this.readTimeout);
|
||||
if (body != null) {
|
||||
this.body = body;
|
||||
}
|
||||
else {
|
||||
throw new IOException("Could not receive body");
|
||||
}
|
||||
InputStream body = this.body;
|
||||
if (body != null) {
|
||||
return body;
|
||||
}
|
||||
return this.body;
|
||||
|
||||
body = this.connection.inbound().receive()
|
||||
.aggregate().asInputStream().block(this.readTimeout);
|
||||
if (body == null) {
|
||||
throw new IOException("Could not receive body");
|
||||
}
|
||||
this.body = body;
|
||||
return body;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
this.connection.dispose();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue