Merge branch '5.2.x'

Rossen Stoyanchev 2020-06-19 22:09:43 +01:00
commit 78d1591e2d
6 changed files with 77 additions and 36 deletions

AbstractSingleValueEncoder.java

@@ -1,5 +1,5 @@
 /*
- * Copyright 2002-2019 the original author or authors.
+ * Copyright 2002-2020 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -24,6 +24,7 @@ import reactor.core.publisher.Flux;
 import org.springframework.core.ResolvableType;
 import org.springframework.core.io.buffer.DataBuffer;
 import org.springframework.core.io.buffer.DataBufferFactory;
+import org.springframework.core.io.buffer.DataBufferUtils;
 import org.springframework.core.io.buffer.PooledDataBuffer;
 import org.springframework.lang.Nullable;
 import org.springframework.util.MimeType;
@@ -51,7 +52,7 @@ public abstract class AbstractSingleValueEncoder<T> extends AbstractEncoder<T> {
     return Flux.from(inputStream)
             .take(1)
             .concatMap(value -> encode(value, bufferFactory, elementType, mimeType, hints))
-            .doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release);
+            .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
 }

 /**
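
The only behavioral change in this encoder is the doOnDiscard callback, which now releases dropped buffers through DataBufferUtils::release rather than calling PooledDataBuffer::release directly. A minimal, self-contained sketch of the same hook outside the encoder; the Netty-backed factory, the filter pipeline, and the class name are illustrative assumptions, not code from this commit:

import java.nio.charset.StandardCharsets;

import io.netty.buffer.PooledByteBufAllocator;
import reactor.core.publisher.Flux;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.core.io.buffer.PooledDataBuffer;

public class DiscardHookSketch {

    public static void main(String[] args) {
        NettyDataBufferFactory factory = new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT);

        Flux<DataBuffer> pipeline = Flux.range(0, 3)
                .map(i -> {
                    DataBuffer buffer = factory.allocateBuffer(16);
                    buffer.write(("chunk-" + i).getBytes(StandardCharsets.UTF_8));
                    return buffer;
                })
                // Drop everything but the first chunk; the rejected buffers are
                // handed to the discard hook below instead of leaking.
                .filter(buffer -> buffer.toString(StandardCharsets.UTF_8).endsWith("0"))
                .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);

        pipeline.doOnNext(DataBufferUtils::release)  // release the buffer that did get through
                .blockLast();
    }
}

doOnDiscard applies to the operators upstream of it, so the buffers rejected by filter are routed through the release hook instead of leaking pooled memory.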

DataBufferUtils.java

@@ -1,5 +1,5 @@
 /*
- * Copyright 2002-2019 the original author or authors.
+ * Copyright 2002-2020 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -39,6 +39,9 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;

+import io.netty.util.IllegalReferenceCountException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.reactivestreams.Publisher;
 import org.reactivestreams.Subscription;
 import reactor.core.publisher.BaseSubscriber;
@@ -60,6 +63,8 @@ import org.springframework.util.Assert;
  */
 public abstract class DataBufferUtils {

+    private final static Log logger = LogFactory.getLog(DataBufferUtils.class);
+
     private static final Consumer<DataBuffer> RELEASE_CONSUMER = DataBufferUtils::release;
@@ -494,7 +499,15 @@ public abstract class DataBufferUtils {
         if (dataBuffer instanceof PooledDataBuffer) {
             PooledDataBuffer pooledDataBuffer = (PooledDataBuffer) dataBuffer;
             if (pooledDataBuffer.isAllocated()) {
-                return pooledDataBuffer.release();
+                try {
+                    return pooledDataBuffer.release();
+                }
+                catch (IllegalReferenceCountException ex) {
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("RefCount already at 0", ex);
+                    }
+                    return false;
+                }
             }
         }
         return false;
@@ -523,7 +536,6 @@ public abstract class DataBufferUtils {
      * @return a buffer that is composed from the {@code dataBuffers} argument
      * @since 5.0.3
      */
-    @SuppressWarnings("unchecked")
     public static Mono<DataBuffer> join(Publisher<? extends DataBuffer> dataBuffers) {
         return join(dataBuffers, -1);
     }
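
release() now guards the underlying Netty release with a try/catch, so a buffer whose reference count has already reached zero (for example when a discard hook and a consumer race to release the same buffer) produces a debug log entry and a false return value instead of an IllegalReferenceCountException propagated to the caller. A small sketch of the observable behavior, assuming a Netty-pooled buffer; the class name and the allocator choice are illustrative:

import io.netty.buffer.PooledByteBufAllocator;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.NettyDataBufferFactory;

public class DoubleReleaseSketch {

    public static void main(String[] args) {
        NettyDataBufferFactory factory = new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT);
        DataBuffer buffer = factory.allocateBuffer(16);

        // First release drops the reference count to zero and returns true.
        System.out.println(DataBufferUtils.release(buffer));  // true

        // A second release is a no-op and returns false. The new try/catch additionally
        // turns Netty's IllegalReferenceCountException (e.g. a concurrent release racing
        // the isAllocated() check) into a debug log entry plus a false return value
        // instead of an exception surfaced to the caller.
        System.out.println(DataBufferUtils.release(buffer));  // false
    }
}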

ReactorClientHttpConnector.java

@@ -1,5 +1,5 @@
 /*
- * Copyright 2002-2018 the original author or authors.
+ * Copyright 2002-2020 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -17,15 +17,13 @@
 package org.springframework.http.client.reactive;

 import java.net.URI;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;

-import io.netty.buffer.ByteBufAllocator;
 import reactor.core.publisher.Mono;
-import reactor.netty.NettyInbound;
 import reactor.netty.NettyOutbound;
 import reactor.netty.http.client.HttpClient;
 import reactor.netty.http.client.HttpClientRequest;
-import reactor.netty.http.client.HttpClientResponse;
 import reactor.netty.resources.ConnectionProvider;
 import reactor.netty.resources.LoopResources;
@@ -104,12 +102,23 @@ public class ReactorClientHttpConnector implements ClientHttpConnector {
             return Mono.error(new IllegalArgumentException("URI is not absolute: " + uri));
         }

+        AtomicReference<ReactorClientHttpResponse> responseRef = new AtomicReference<>();
+
         return this.httpClient
                 .request(io.netty.handler.codec.http.HttpMethod.valueOf(method.name()))
                 .uri(uri.toString())
                 .send((request, outbound) -> requestCallback.apply(adaptRequest(method, uri, request, outbound)))
-                .responseConnection((res, con) -> Mono.just(adaptResponse(res, con.inbound(), con.outbound().alloc())))
-                .next();
+                .responseConnection((response, connection) -> {
+                    responseRef.set(new ReactorClientHttpResponse(response, connection));
+                    return Mono.just((ClientHttpResponse) responseRef.get());
+                })
+                .next()
+                .doOnCancel(() -> {
+                    ReactorClientHttpResponse response = responseRef.get();
+                    if (response != null && response.bodyNotSubscribed()) {
+                        response.getConnection().dispose();
+                    }
+                });
     }

     private ReactorClientHttpRequest adaptRequest(HttpMethod method, URI uri, HttpClientRequest request,
@@ -118,10 +127,4 @@ public class ReactorClientHttpConnector implements ClientHttpConnector {
         return new ReactorClientHttpRequest(method, uri, request, nettyOutbound);
     }

-    private ClientHttpResponse adaptResponse(HttpClientResponse response, NettyInbound nettyInbound,
-            ByteBufAllocator allocator) {
-
-        return new ReactorClientHttpResponse(response, nettyInbound, allocator);
-    }
 }
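
connect() now remembers the adapted response in an AtomicReference so that, if the returned Mono is cancelled before anyone subscribes to the response body, the underlying Reactor Netty connection is disposed rather than left open. A rough caller-side sketch of the scenario this protects against; the WebClient setup, the URL, and the timeout are illustrative assumptions, not part of this diff:

import java.time.Duration;

import reactor.core.publisher.Mono;

import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;

public class ConnectorCancelSketch {

    public static void main(String[] args) {
        WebClient client = WebClient.builder()
                .clientConnector(new ReactorClientHttpConnector())
                .build();

        // If the timeout fires before the response is consumed, the resulting cancel
        // reaches the connector's doOnCancel hook; since the body was never subscribed,
        // the connection is disposed instead of being leaked.
        client.get()
                .uri("https://example.org/slow")
                .retrieve()
                .bodyToMono(String.class)
                .timeout(Duration.ofSeconds(2))
                .onErrorResume(ex -> Mono.empty())
                .block();
    }
}

When the timeout cancels the pending exchange, the connector's doOnCancel hook sees bodyNotSubscribed() == true and closes the connection.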

ReactorClientHttpResponse.java

@@ -17,10 +17,11 @@
 package org.springframework.http.client.reactive;

 import java.util.Collection;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;

-import io.netty.buffer.ByteBufAllocator;
 import reactor.core.publisher.Flux;
+import reactor.netty.Connection;
 import reactor.netty.NettyInbound;
 import reactor.netty.http.client.HttpClientResponse;
@@ -48,16 +49,24 @@ class ReactorClientHttpResponse implements ClientHttpResponse {
     private final NettyDataBufferFactory bufferFactory;

+    private final Connection connection;
+
     private final HttpHeaders headers;

-    private final AtomicBoolean rejectSubscribers = new AtomicBoolean();
+    // 0 - not subscribed, 1 - subscribed, 2 - cancelled
+    private final AtomicInteger state = new AtomicInteger(0);

-    public ReactorClientHttpResponse(HttpClientResponse response, NettyInbound inbound, ByteBufAllocator alloc) {
+    /**
+     * Constructor that matches the inputs from
+     * {@link reactor.netty.http.client.HttpClient.ResponseReceiver#responseConnection(BiFunction)}.
+     * @since 5.3
+     */
+    public ReactorClientHttpResponse(HttpClientResponse response, Connection connection) {
         this.response = response;
-        this.inbound = inbound;
-        this.bufferFactory = new NettyDataBufferFactory(alloc);
+        this.inbound = connection.inbound();
+        this.bufferFactory = new NettyDataBufferFactory(connection.outbound().alloc());
+        this.connection = connection;
         MultiValueMap<String, String> adapter = new NettyHeadersAdapter(response.responseHeaders());
         this.headers = HttpHeaders.readOnlyHttpHeaders(adapter);
     }
@@ -67,17 +76,17 @@ class ReactorClientHttpResponse implements ClientHttpResponse {
     public Flux<DataBuffer> getBody() {
         return this.inbound.receive()
                 .doOnSubscribe(s -> {
-                    if (this.rejectSubscribers.get()) {
-                        throw new IllegalStateException("The client response body can only be consumed once.");
+                    if (!this.state.compareAndSet(0, 1)) {
+                        // https://github.com/reactor/reactor-netty/issues/503
+                        // FluxReceive rejects multiple subscribers, but not after a cancel().
+                        // Subsequent subscribers after cancel() will not be rejected, but will hang instead.
+                        // So we need to reject once in cancelled state.
+                        if (this.state.get() == 2) {
+                            throw new IllegalStateException("The client response body can only be consumed once.");
+                        }
                     }
                 })
-                .doOnCancel(() ->
-                        // https://github.com/reactor/reactor-netty/issues/503
-                        // FluxReceive rejects multiple subscribers, but not after a cancel().
-                        // Subsequent subscribers after cancel() will not be rejected, but will hang instead.
-                        // So we need to intercept and reject them in that case.
-                        this.rejectSubscribers.set(true)
-                )
+                .doOnCancel(() -> this.state.compareAndSet(1, 2))
                 .map(byteBuf -> {
                     byteBuf.retain();
                     return this.bufferFactory.wrap(byteBuf);
@@ -114,6 +123,20 @@ class ReactorClientHttpResponse implements ClientHttpResponse {
         return CollectionUtils.unmodifiableMultiValueMap(result);
     }

+    /**
+     * For use by {@link ReactorClientHttpConnector}.
+     */
+    boolean bodyNotSubscribed() {
+        return this.state.get() == 0;
+    }
+
+    /**
+     * For use by {@link ReactorClientHttpConnector}.
+     */
+    Connection getConnection() {
+        return this.connection;
+    }
+
     @Override
     public String toString() {
         return "ReactorClientHttpResponse{" +
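
The former AtomicBoolean becomes a three-state AtomicInteger (0 = not subscribed, 1 = subscribed, 2 = cancelled): getBody() claims the 0 -> 1 transition on subscribe, a cancelled body moves 1 -> 2, and the connector uses state 0 to detect a body that was never consumed. A stripped-down illustration of those compareAndSet transitions without the Netty plumbing; the class and method names are made up for the sketch:

import java.util.concurrent.atomic.AtomicInteger;

public class BodyStateSketch {

    // 0 - not subscribed, 1 - subscribed, 2 - cancelled (mirrors the field above)
    private final AtomicInteger state = new AtomicInteger(0);

    void onBodySubscribe() {
        if (!this.state.compareAndSet(0, 1)) {
            // Only the cancelled state is rejected here; a second subscriber after a
            // normal subscription is already rejected by FluxReceive itself.
            if (this.state.get() == 2) {
                throw new IllegalStateException("The client response body can only be consumed once.");
            }
        }
    }

    void onBodyCancel() {
        // Marks a cancelled body subscription (1 -> 2) so later subscribers are
        // rejected rather than left hanging by FluxReceive.
        this.state.compareAndSet(1, 2);
    }

    boolean bodyNotSubscribed() {
        // Used by the connector: if its Mono is cancelled while this is still true,
        // the connection can be disposed.
        return this.state.get() == 0;
    }

    public static void main(String[] args) {
        BodyStateSketch response = new BodyStateSketch();
        System.out.println(response.bodyNotSubscribed());  // true: nothing subscribed yet
        response.onBodySubscribe();                         // 0 -> 1
        response.onBodyCancel();                            // 1 -> 2
        try {
            response.onBodySubscribe();                     // rejected: already cancelled
        }
        catch (IllegalStateException ex) {
            System.out.println(ex.getMessage());
        }
    }
}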

EncoderHttpMessageWriter.java

@@ -29,6 +29,7 @@ import org.springframework.core.codec.AbstractEncoder;
 import org.springframework.core.codec.Encoder;
 import org.springframework.core.codec.Hints;
 import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.core.io.buffer.DataBufferUtils;
 import org.springframework.core.io.buffer.PooledDataBuffer;
 import org.springframework.http.HttpLogging;
 import org.springframework.http.MediaType;
@@ -126,13 +127,13 @@ public class EncoderHttpMessageWriter<T> implements HttpMessageWriter<T> {
                 .flatMap(buffer -> {
                     message.getHeaders().setContentLength(buffer.readableByteCount());
                     return message.writeWith(Mono.just(buffer)
-                            .doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release));
+                            .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release));
                 });
     }

         if (isStreamingMediaType(contentType)) {
             return message.writeAndFlushWith(body.map(buffer ->
-                    Mono.just(buffer).doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release)));
+                    Mono.just(buffer).doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)));
         }

         return message.writeWith(body);

MultipartHttpMessageWriter.java

@@ -38,6 +38,7 @@ import org.springframework.core.codec.Hints;
 import org.springframework.core.io.Resource;
 import org.springframework.core.io.buffer.DataBuffer;
 import org.springframework.core.io.buffer.DataBufferFactory;
+import org.springframework.core.io.buffer.DataBufferUtils;
 import org.springframework.core.io.buffer.PooledDataBuffer;
 import org.springframework.core.log.LogFormatUtils;
 import org.springframework.http.HttpEntity;
@@ -206,7 +207,7 @@ public class MultipartHttpMessageWriter extends MultipartWriterSupport
         Flux<DataBuffer> body = Flux.fromIterable(map.entrySet())
                 .concatMap(entry -> encodePartValues(boundary, entry.getKey(), entry.getValue(), bufferFactory))
                 .concatWith(generateLastLine(boundary, bufferFactory))
-                .doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release);
+                .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);

         return outputMessage.writeWith(body);
     }