parent
21de098756
commit
f35903f23d
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2019 the original author or authors.
|
||||
* Copyright 2002-2020 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -24,6 +24,7 @@ import reactor.core.publisher.Flux;
|
|||
import org.springframework.core.ResolvableType;
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||
import org.springframework.core.io.buffer.DataBufferUtils;
|
||||
import org.springframework.core.io.buffer.PooledDataBuffer;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.util.MimeType;
|
||||
|
|
@ -51,7 +52,7 @@ public abstract class AbstractSingleValueEncoder<T> extends AbstractEncoder<T> {
|
|||
return Flux.from(inputStream)
|
||||
.take(1)
|
||||
.concatMap(value -> encode(value, bufferFactory, elementType, mimeType, hints))
|
||||
.doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release);
|
||||
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2019 the original author or authors.
|
||||
* Copyright 2002-2020 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -39,6 +39,9 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import io.netty.util.IllegalReferenceCountException;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.reactivestreams.Publisher;
|
||||
import org.reactivestreams.Subscription;
|
||||
import reactor.core.publisher.BaseSubscriber;
|
||||
|
|
@ -60,6 +63,8 @@ import org.springframework.util.Assert;
|
|||
*/
|
||||
public abstract class DataBufferUtils {
|
||||
|
||||
private final static Log logger = LogFactory.getLog(DataBufferUtils.class);
|
||||
|
||||
private static final Consumer<DataBuffer> RELEASE_CONSUMER = DataBufferUtils::release;
|
||||
|
||||
|
||||
|
|
@ -494,7 +499,15 @@ public abstract class DataBufferUtils {
|
|||
if (dataBuffer instanceof PooledDataBuffer) {
|
||||
PooledDataBuffer pooledDataBuffer = (PooledDataBuffer) dataBuffer;
|
||||
if (pooledDataBuffer.isAllocated()) {
|
||||
return pooledDataBuffer.release();
|
||||
try {
|
||||
return pooledDataBuffer.release();
|
||||
}
|
||||
catch (IllegalReferenceCountException ex) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("RefCount already at 0", ex);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
|
|
@ -523,7 +536,6 @@ public abstract class DataBufferUtils {
|
|||
* @return a buffer that is composed from the {@code dataBuffers} argument
|
||||
* @since 5.0.3
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public static Mono<DataBuffer> join(Publisher<? extends DataBuffer> dataBuffers) {
|
||||
return join(dataBuffers, -1);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -29,6 +29,7 @@ import org.springframework.core.codec.AbstractEncoder;
|
|||
import org.springframework.core.codec.Encoder;
|
||||
import org.springframework.core.codec.Hints;
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.DataBufferUtils;
|
||||
import org.springframework.core.io.buffer.PooledDataBuffer;
|
||||
import org.springframework.http.HttpLogging;
|
||||
import org.springframework.http.MediaType;
|
||||
|
|
@ -126,13 +127,13 @@ public class EncoderHttpMessageWriter<T> implements HttpMessageWriter<T> {
|
|||
.flatMap(buffer -> {
|
||||
message.getHeaders().setContentLength(buffer.readableByteCount());
|
||||
return message.writeWith(Mono.just(buffer)
|
||||
.doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release));
|
||||
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release));
|
||||
});
|
||||
}
|
||||
|
||||
if (isStreamingMediaType(contentType)) {
|
||||
return message.writeAndFlushWith(body.map(buffer ->
|
||||
Mono.just(buffer).doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release)));
|
||||
Mono.just(buffer).doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)));
|
||||
}
|
||||
|
||||
return message.writeWith(body);
|
||||
|
|
|
|||
|
|
@ -40,6 +40,7 @@ import org.springframework.core.codec.Hints;
|
|||
import org.springframework.core.io.Resource;
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||
import org.springframework.core.io.buffer.DataBufferUtils;
|
||||
import org.springframework.core.io.buffer.PooledDataBuffer;
|
||||
import org.springframework.core.log.LogFormatUtils;
|
||||
import org.springframework.http.HttpEntity;
|
||||
|
|
@ -247,7 +248,7 @@ public class MultipartHttpMessageWriter extends LoggingCodecSupport
|
|||
Flux<DataBuffer> body = Flux.fromIterable(map.entrySet())
|
||||
.concatMap(entry -> encodePartValues(boundary, entry.getKey(), entry.getValue(), bufferFactory))
|
||||
.concatWith(generateLastLine(boundary, bufferFactory))
|
||||
.doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release);
|
||||
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
|
||||
|
||||
return outputMessage.writeWith(body);
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue