Merge branch '5.1.x'

Rossen Stoyanchev 2019-04-16 21:08:54 -04:00
commit de3238dbea
15 changed files with 112 additions and 141 deletions

View File: AbstractDecoderTestCase.java

@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2018 the original author or authors.
+ * Copyright 2002-2019 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
package org.springframework.core.codec;
+import java.time.Duration;
import java.util.Map;
import java.util.function.Consumer;
@@ -32,6 +33,8 @@ import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
+import static org.junit.Assert.*;
/**
 * Abstract base class for {@link Decoder} unit tests. Subclasses need to implement
 * {@link #canDecode()}, {@link #decode()} and {@link #decodeToMono()}, possibly using the wide
@@ -99,6 +102,7 @@ public abstract class AbstractDecoderTestCase<D extends Decoder<?>>
	 */
	protected <T> void testDecodeAll(Publisher<DataBuffer> input, Class<? extends T> outputClass,
			Consumer<StepVerifier.FirstStep<T>> stepConsumer) {
		testDecodeAll(input, ResolvableType.forClass(outputClass), stepConsumer, null, null);
	}
@@ -122,6 +126,7 @@ public abstract class AbstractDecoderTestCase<D extends Decoder<?>>
	protected <T> void testDecodeAll(Publisher<DataBuffer> input, ResolvableType outputType,
			Consumer<StepVerifier.FirstStep<T>> stepConsumer,
			@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
		testDecode(input, outputType, stepConsumer, mimeType, hints);
		testDecodeError(input, outputType, mimeType, hints);
		testDecodeCancel(input, outputType, mimeType, hints);
@@ -151,6 +156,7 @@ public abstract class AbstractDecoderTestCase<D extends Decoder<?>>
	 */
	protected <T> void testDecode(Publisher<DataBuffer> input, Class<? extends T> outputClass,
			Consumer<StepVerifier.FirstStep<T>> stepConsumer) {
		testDecode(input, ResolvableType.forClass(outputClass), stepConsumer, null, null);
	}
@@ -202,16 +208,14 @@ public abstract class AbstractDecoderTestCase<D extends Decoder<?>>
	protected void testDecodeError(Publisher<DataBuffer> input, ResolvableType outputType,
			@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
-		input = Flux.concat(
-				Flux.from(input).take(1),
-				Flux.error(new InputException()));
-		Flux<?> result = this.decoder.decode(input, outputType, mimeType, hints);
-		StepVerifier.create(result)
-				.expectNextCount(1)
-				.expectError(InputException.class)
-				.verify();
+		input = Mono.from(input).concatWith(Flux.error(new InputException()));
+		try {
+			this.decoder.decode(input, outputType, mimeType, hints).blockLast(Duration.ofSeconds(5));
+			fail();
+		}
+		catch (InputException ex) {
+			// expected
+		}
	}
	/**
@@ -229,11 +233,7 @@ public abstract class AbstractDecoderTestCase<D extends Decoder<?>>
			@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
		Flux<?> result = this.decoder.decode(input, outputType, mimeType, hints);
-		StepVerifier.create(result)
-				.expectNextCount(1)
-				.thenCancel()
-				.verify();
+		StepVerifier.create(result).expectNextCount(1).thenCancel().verify();
	}
	/**
@@ -249,9 +249,7 @@ public abstract class AbstractDecoderTestCase<D extends Decoder<?>>
		Flux<DataBuffer> input = Flux.empty();
		Flux<?> result = this.decoder.decode(input, outputType, mimeType, hints);
-		StepVerifier.create(result)
-				.verifyComplete();
+		StepVerifier.create(result).verifyComplete();
	}
	// Mono
@@ -297,6 +295,7 @@ public abstract class AbstractDecoderTestCase<D extends Decoder<?>>
	protected <T> void testDecodeToMonoAll(Publisher<DataBuffer> input, ResolvableType outputType,
			Consumer<StepVerifier.FirstStep<T>> stepConsumer,
			@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
		testDecodeToMono(input, outputType, stepConsumer, mimeType, hints);
		testDecodeToMonoError(input, outputType, mimeType, hints);
		testDecodeToMonoCancel(input, outputType, mimeType, hints);
@@ -326,6 +325,7 @@ public abstract class AbstractDecoderTestCase<D extends Decoder<?>>
	 */
	protected <T> void testDecodeToMono(Publisher<DataBuffer> input,
			Class<? extends T> outputClass, Consumer<StepVerifier.FirstStep<T>> stepConsumer) {
		testDecodeToMono(input, ResolvableType.forClass(outputClass), stepConsumer, null, null);
	}
@@ -377,15 +377,9 @@ public abstract class AbstractDecoderTestCase<D extends Decoder<?>>
	protected void testDecodeToMonoError(Publisher<DataBuffer> input, ResolvableType outputType,
			@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
-		input = Flux.concat(
-				Flux.from(input).take(1),
-				Flux.error(new InputException()));
+		input = Mono.from(input).concatWith(Flux.error(new InputException()));
		Mono<?> result = this.decoder.decodeToMono(input, outputType, mimeType, hints);
-		StepVerifier.create(result)
-				.expectError(InputException.class)
-				.verify();
+		StepVerifier.create(result).expectError(InputException.class).verify();
	}
	/**
@@ -401,10 +395,7 @@ public abstract class AbstractDecoderTestCase<D extends Decoder<?>>
			@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
		Mono<?> result = this.decoder.decodeToMono(input, outputType, mimeType, hints);
-		StepVerifier.create(result)
-				.thenCancel()
-				.verify();
+		StepVerifier.create(result).thenCancel().verify();
	}
	/**
@@ -418,11 +409,8 @@ public abstract class AbstractDecoderTestCase<D extends Decoder<?>>
	protected void testDecodeToMonoEmpty(ResolvableType outputType, @Nullable MimeType mimeType,
			@Nullable Map<String, Object> hints) {
-		Flux<DataBuffer> input = Flux.empty();
-		Mono<?> result = this.decoder.decodeToMono(input, outputType, mimeType, hints);
-		StepVerifier.create(result)
-				.verifyComplete();
+		Mono<?> result = this.decoder.decodeToMono(Flux.empty(), outputType, mimeType, hints);
+		StepVerifier.create(result).verifyComplete();
	}
	/**
@@ -431,10 +419,10 @@ public abstract class AbstractDecoderTestCase<D extends Decoder<?>>
	 * @return the deferred buffer
	 */
	protected Mono<DataBuffer> dataBuffer(byte[] bytes) {
-		return Mono.defer(() -> {
+		return Mono.fromCallable(() -> {
			DataBuffer dataBuffer = this.bufferFactory.allocateBuffer(bytes.length);
			dataBuffer.write(bytes);
-			return Mono.just(dataBuffer);
+			return dataBuffer;
		});
	}
@@ -442,9 +430,6 @@ public abstract class AbstractDecoderTestCase<D extends Decoder<?>>
	 * Exception used in {@link #testDecodeError} and {@link #testDecodeToMonoError}
	 */
	@SuppressWarnings("serial")
-	public static class InputException extends RuntimeException {
-	}
+	public static class InputException extends RuntimeException {}
}
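
Note: for readers unfamiliar with the Reactor operators used above, here is a minimal standalone sketch (element values and exception type are illustrative, not part of this commit) of the error-input shape the revised testDecodeError builds: only the first element of the source is kept, then the error is appended, so a blocking terminal call is expected to throw.

	import reactor.core.publisher.Flux;
	import reactor.core.publisher.Mono;
	import reactor.test.StepVerifier;

	public class FirstElementThenErrorSketch {

		public static void main(String[] args) {
			// Keep only the first element of the source, then append an error;
			// mirrors Mono.from(input).concatWith(Flux.error(new InputException()))
			Flux<String> source = Flux.just("first", "second", "third");
			Flux<String> firstThenError = Mono.from(source)
					.concatWith(Flux.error(new IllegalStateException("boom")));

			StepVerifier.create(firstThenError)
					.expectNext("first")
					.expectError(IllegalStateException.class)
					.verify();
		}
	}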

View File: ResourceDecoderTests.java

@@ -19,25 +19,20 @@ package org.springframework.core.codec;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
-import java.util.Map;
import org.junit.Test;
-import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
-import reactor.test.StepVerifier;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.ByteArrayResource;
import org.springframework.core.io.InputStreamResource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.buffer.DataBuffer;
-import org.springframework.lang.Nullable;
-import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.StreamUtils;
import static org.junit.Assert.*;
-import static org.springframework.core.ResolvableType.forClass;
+import static org.springframework.core.ResolvableType.*;
/**
 * @author Arjen Poutsma
@@ -67,9 +62,7 @@ public class ResourceDecoderTests extends AbstractDecoderTestCase<ResourceDecode
	@Override
	@Test
	public void decode() {
-		Flux<DataBuffer> input = Flux.concat(
-				dataBuffer(this.fooBytes),
-				dataBuffer(this.barBytes));
+		Flux<DataBuffer> input = Flux.concat(dataBuffer(this.fooBytes), dataBuffer(this.barBytes));
		testDecodeAll(input, Resource.class, step -> step
				.consumeNextWith(resource -> {
@@ -85,21 +78,6 @@ public class ResourceDecoderTests extends AbstractDecoderTestCase<ResourceDecode
				.verify());
	}
-	@Override
-	protected void testDecodeError(Publisher<DataBuffer> input, ResolvableType outputType,
-			@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
-		input = Flux.concat(
-				Flux.from(input).take(1),
-				Flux.error(new InputException()));
-		Flux<Resource> result = this.decoder.decode(input, outputType, mimeType, hints);
-		StepVerifier.create(result)
-				.expectError(InputException.class)
-				.verify();
-	}
	@Override
	public void decodeToMono() {
		Flux<DataBuffer> input = Flux.concat(

View File: ResourceRegionEncoderTests.java

@@ -19,7 +19,6 @@ package org.springframework.core.codec;
import java.util.Collections;
import java.util.function.Consumer;
-import io.netty.buffer.PooledByteBufAllocator;
import org.junit.After;
import org.junit.Test;
import org.reactivestreams.Subscription;
@@ -34,7 +33,6 @@ import org.springframework.core.io.Resource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.LeakAwareDataBufferFactory;
-import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.core.io.buffer.support.DataBufferTestUtils;
import org.springframework.core.io.support.ResourceRegion;
import org.springframework.util.MimeType;
@@ -51,8 +49,7 @@ public class ResourceRegionEncoderTests {
	private ResourceRegionEncoder encoder = new ResourceRegionEncoder();
-	private LeakAwareDataBufferFactory bufferFactory =
-			new LeakAwareDataBufferFactory(new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT));
+	private LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory();
	@After

View File: StringDecoderTests.java

@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2018 the original author or authors.
+ * Copyright 2002-2019 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -19,18 +19,16 @@ package org.springframework.core.codec;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import org.junit.Test;
-import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
-import org.springframework.lang.Nullable;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
@@ -61,10 +59,8 @@ public class StringDecoderTests extends AbstractDecoderTestCase<StringDecoder> {
		assertTrue(this.decoder.canDecode(TYPE, MimeTypeUtils.TEXT_HTML));
		assertTrue(this.decoder.canDecode(TYPE, MimeTypeUtils.APPLICATION_JSON));
		assertTrue(this.decoder.canDecode(TYPE, MimeTypeUtils.parseMimeType("text/plain;charset=utf-8")));
-		assertFalse(this.decoder.canDecode(
-				ResolvableType.forClass(Integer.class), MimeTypeUtils.TEXT_PLAIN));
-		assertFalse(this.decoder.canDecode(
-				ResolvableType.forClass(Object.class), MimeTypeUtils.APPLICATION_JSON));
+		assertFalse(this.decoder.canDecode(ResolvableType.forClass(Integer.class), MimeTypeUtils.TEXT_PLAIN));
+		assertFalse(this.decoder.canDecode(ResolvableType.forClass(Object.class), MimeTypeUtils.APPLICATION_JSON));
	}
	@Override
@@ -76,24 +72,7 @@ public class StringDecoderTests extends AbstractDecoderTestCase<StringDecoder> {
		String s = String.format("%s\n%s\n%s", u, e, o);
		Flux<DataBuffer> input = toDataBuffers(s, 1, UTF_8);
-		testDecodeAll(input, ResolvableType.forClass(String.class), step -> step
-				.expectNext(u, e, o)
-				.verifyComplete(), null, null);
-	}
-	@Override
-	protected void testDecodeError(Publisher<DataBuffer> input, ResolvableType outputType,
-			@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
-		input = Flux.concat(
-				Flux.from(input).take(1),
-				Flux.error(new InputException()));
-		Flux<String> result = this.decoder.decode(input, outputType, mimeType, hints);
-		StepVerifier.create(result)
-				.expectError(InputException.class)
-				.verify();
+		testDecodeAll(input, TYPE, step -> step.expectNext(u, e, o).verifyComplete(), null, null);
	}
	@Test
@@ -105,21 +84,21 @@ public class StringDecoderTests extends AbstractDecoderTestCase<StringDecoder> {
		Flux<DataBuffer> source = toDataBuffers(s, 2, UTF_16BE);
		MimeType mimeType = MimeTypeUtils.parseMimeType("text/plain;charset=utf-16be");
-		testDecode(source, TYPE, step -> step
-				.expectNext(u, e, o)
-				.verifyComplete(), mimeType, null);
+		testDecode(source, TYPE, step -> step.expectNext(u, e, o).verifyComplete(), mimeType, null);
	}
	private Flux<DataBuffer> toDataBuffers(String s, int length, Charset charset) {
		byte[] bytes = s.getBytes(charset);
-		List<DataBuffer> dataBuffers = new ArrayList<>();
+		List<byte[]> chunks = new ArrayList<>();
		for (int i = 0; i < bytes.length; i += length) {
-			DataBuffer dataBuffer = this.bufferFactory.allocateBuffer(length);
-			dataBuffer.write(bytes, i, length);
-			dataBuffers.add(dataBuffer);
+			chunks.add(Arrays.copyOfRange(bytes, i, i + length));
		}
-		return Flux.fromIterable(dataBuffers);
+		return Flux.fromIterable(chunks)
+				.map(chunk -> {
+					DataBuffer dataBuffer = this.bufferFactory.allocateBuffer(length);
+					dataBuffer.write(chunk, 0, chunk.length);
+					return dataBuffer;
+				});
	}
	@Test
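
Note: a standalone sketch (input string and chunk size are illustrative, not from this commit) of the pattern the reworked toDataBuffers() above uses: the byte chunks are computed eagerly, but each DataBuffer is only allocated inside map(), i.e. when the element is actually emitted, presumably so that buffers are not pre-allocated for elements a test never consumes.

	import java.nio.charset.StandardCharsets;
	import java.util.ArrayList;
	import java.util.Arrays;
	import java.util.List;

	import reactor.core.publisher.Flux;
	import reactor.test.StepVerifier;

	import org.springframework.core.io.buffer.DataBuffer;
	import org.springframework.core.io.buffer.DefaultDataBufferFactory;

	public class LazyDataBufferSketch {

		public static void main(String[] args) {
			DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
			byte[] bytes = "spring".getBytes(StandardCharsets.UTF_8);
			int length = 2;

			// Split the bytes eagerly...
			List<byte[]> chunks = new ArrayList<>();
			for (int i = 0; i < bytes.length; i += length) {
				chunks.add(Arrays.copyOfRange(bytes, i, i + length));
			}

			// ...but allocate each DataBuffer lazily, when the Flux emits the chunk
			Flux<DataBuffer> buffers = Flux.fromIterable(chunks)
					.map(chunk -> {
						DataBuffer dataBuffer = factory.allocateBuffer(chunk.length);
						dataBuffer.write(chunk, 0, chunk.length);
						return dataBuffer;
					});

			StepVerifier.create(buffers).expectNextCount(3).verifyComplete();
		}
	}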

View File: LeakAwareDataBuffer.java

@@ -63,6 +63,11 @@ class LeakAwareDataBuffer implements PooledDataBuffer {
		return this.leakError;
	}
+	public DataBuffer getDelegate() {
+		return this.delegate;
+	}
	@Override
	public boolean isAllocated() {
		return this.delegate instanceof PooledDataBuffer &&

View File: LeakAwareDataBufferFactory.java

@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
+import io.netty.buffer.PooledByteBufAllocator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jetbrains.annotations.NotNull;
@@ -55,7 +56,7 @@ public class LeakAwareDataBufferFactory implements DataBufferFactory {
	 * {@link DefaultDataBufferFactory}.
	 */
	public LeakAwareDataBufferFactory() {
-		this(new DefaultDataBufferFactory());
+		this(new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT));
	}
	/**
@@ -67,6 +68,7 @@ public class LeakAwareDataBufferFactory implements DataBufferFactory {
		this.delegate = delegate;
	}
	/**
	 * Checks whether all of the data buffers allocated by this factory have also been released.
	 * If not, then an {@link AssertionError} is thrown. Typically used from a JUnit {@link After}
@@ -126,6 +128,10 @@ public class LeakAwareDataBufferFactory implements DataBufferFactory {
	@Override
	public DataBuffer join(List<? extends DataBuffer> dataBuffers) {
+		// Remove LeakAwareDataBuffer wrapper so delegate can find native buffers
+		dataBuffers = dataBuffers.stream()
+				.map(o -> o instanceof LeakAwareDataBuffer ? ((LeakAwareDataBuffer) o).getDelegate() : o)
+				.collect(Collectors.toList());
		return new LeakAwareDataBuffer(this.delegate.join(dataBuffers), this);
	}
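
Note: a minimal usage sketch (test and buffer contents are illustrative) of the leak-checking pattern this factory supports; with the change above, the no-arg constructor delegates to a pooled Netty allocator, presumably so that tests exercise the same pooled buffers used at runtime.

	import org.junit.After;
	import org.junit.Test;

	import org.springframework.core.io.buffer.DataBuffer;
	import org.springframework.core.io.buffer.DataBufferUtils;
	import org.springframework.core.io.buffer.LeakAwareDataBufferFactory;

	public class BufferReleaseSketchTests {

		private final LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory();

		@After
		public void tearDown() {
			// The check described in the factory's javadoc: fails if any buffer
			// allocated by this factory has not been released
			this.bufferFactory.checkForLeaks();
		}

		@Test
		public void writeAndRelease() {
			DataBuffer buffer = this.bufferFactory.allocateBuffer(16);
			buffer.write((byte) 1);
			DataBufferUtils.release(buffer);  // omit this and checkForLeaks() throws AssertionError
		}
	}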

View File: MockServerHttpResponse.java

@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2018 the original author or authors.
+ * Copyright 2002-2019 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -25,8 +25,10 @@ import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.core.publisher.MonoProcessor;
import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.HttpHeaders;
@@ -54,10 +56,17 @@ public class MockServerHttpResponse extends AbstractServerHttpResponse {
	public MockServerHttpResponse() {
-		super(new DefaultDataBufferFactory());
+		this(new DefaultDataBufferFactory());
+	}
+	public MockServerHttpResponse(DataBufferFactory dataBufferFactory) {
+		super(dataBufferFactory);
		this.writeHandler = body -> {
-			this.body = body.cache();
-			return this.body.then();
+			// Avoid .then() which causes data buffers to be released
+			MonoProcessor<Void> completion = MonoProcessor.create();
+			this.body = body.doOnComplete(completion::onComplete).doOnError(completion::onError).cache();
+			this.body.subscribe();
+			return completion;
		};
	}
@@ -125,8 +134,10 @@ public class MockServerHttpResponse extends AbstractServerHttpResponse {
	 * charset or "UTF-8" by default.
	 */
	public Mono<String> getBodyAsString() {
		Charset charset = Optional.ofNullable(getHeaders().getContentType()).map(MimeType::getCharset)
				.orElse(StandardCharsets.UTF_8);
		return getBody()
				.reduce(bufferFactory().allocateBuffer(), (previous, current) -> {
					previous.write(current);
@@ -137,8 +148,10 @@ public class MockServerHttpResponse extends AbstractServerHttpResponse {
	}
	private static String bufferToString(DataBuffer buffer, Charset charset) {
+		Assert.notNull(charset, "'charset' must not be null");
		byte[] bytes = new byte[buffer.readableByteCount()];
		buffer.read(bytes);
+		DataBufferUtils.release(buffer);
		return new String(bytes, charset);
	}
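
Note: a rough usage sketch (body content is illustrative; assumes the org.springframework.mock.http.server.reactive variant of the class) of what the write-handler change above protects: per the inline comment, signalling completion through this.body.then() could release the cached data buffers, so a separate MonoProcessor is used and the body remains readable after the write completes.

	import java.nio.charset.StandardCharsets;

	import reactor.core.publisher.Mono;

	import org.springframework.core.io.buffer.DataBuffer;
	import org.springframework.mock.http.server.reactive.MockServerHttpResponse;

	public class MockResponseWriteSketch {

		public static void main(String[] args) {
			MockServerHttpResponse response = new MockServerHttpResponse();

			Mono<DataBuffer> body = Mono.fromCallable(() -> {
				DataBuffer buffer = response.bufferFactory().allocateBuffer();
				buffer.write("hello".getBytes(StandardCharsets.UTF_8));
				return buffer;
			});

			response.writeWith(body).block();                        // completes via the MonoProcessor
			System.out.println(response.getBodyAsString().block());  // cached body is still readable
		}
	}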

View File: ReactorClientHttpRequest.java

@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2018 the original author or authors.
+ * Copyright 2002-2019 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -81,8 +81,16 @@ class ReactorClientHttpRequest extends AbstractClientHttpRequest implements Zero
	@Override
	public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
		return doCommit(() -> {
+			// Send as Mono if possible as an optimization hint to Reactor Netty
+			if (body instanceof Mono) {
+				Mono<ByteBuf> byteBufMono = Mono.from(body).map(NettyDataBufferFactory::toByteBuf);
+				return this.outbound.send(byteBufMono).then();
+			}
+			else {
				Flux<ByteBuf> byteBufFlux = Flux.from(body).map(NettyDataBufferFactory::toByteBuf);
				return this.outbound.send(byteBufFlux).then();
+			}
		});
	}

View File: EncoderHttpMessageWriter.java

@@ -125,16 +125,14 @@ public class EncoderHttpMessageWriter<T> implements HttpMessageWriter<T> {
				}))
				.flatMap(buffer -> {
					headers.setContentLength(buffer.readableByteCount());
-					return message.writeWith(
-							Mono.fromCallable(() -> buffer)
+					return message.writeWith(Mono.just(buffer)
							.doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release));
				});
		}
		if (isStreamingMediaType(contentType)) {
			return message.writeAndFlushWith(body.map(buffer ->
-					Mono.fromCallable(() -> buffer)
-							.doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release)));
+					Mono.just(buffer).doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release)));
		}
		return message.writeWith(body);

View File: AbstractServerHttpResponse.java

@@ -28,6 +28,8 @@ import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
+import org.springframework.core.io.buffer.DataBufferUtils;
+import org.springframework.core.io.buffer.PooledDataBuffer;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpLogging;
import org.springframework.http.HttpStatus;
@@ -172,9 +174,16 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
	}
	@Override
+	@SuppressWarnings("unchecked")
	public final Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
-		return new ChannelSendOperator<>(body,
-				writePublisher -> doCommit(() -> writeWithInternal(writePublisher)))
+		// Write as Mono if possible as an optimization hint to Reactor Netty
+		// ChannelSendOperator not necessary for Mono
+		if (body instanceof Mono) {
+			return ((Mono<? extends DataBuffer>) body).flatMap(buffer ->
+					doCommit(() -> writeWithInternal(Mono.just(buffer)))
+							.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release));
+		}
+		return new ChannelSendOperator<>(body, inner -> doCommit(() -> writeWithInternal(inner)))
				.doOnError(t -> removeContentLength());
	}

View File: FormHttpMessageWriterTests.java

@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2018 the original author or authors.
+ * Copyright 2002-2019 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -85,8 +85,7 @@ public class FormHttpMessageWriterTests extends AbstractLeakCheckingTestCase {
		String expected = "name+1=value+1&name+2=value+2%2B1&name+2=value+2%2B2&name+3";
		StepVerifier.create(response.getBody())
-				.consumeNextWith(stringConsumer(
-						expected))
+				.consumeNextWith(stringConsumer(expected))
				.expectComplete()
				.verify();
		HttpHeaders headers = response.getHeaders();
@@ -96,8 +95,7 @@ public class FormHttpMessageWriterTests extends AbstractLeakCheckingTestCase {
	private Consumer<DataBuffer> stringConsumer(String expected) {
		return dataBuffer -> {
-			String value =
-					DataBufferTestUtils.dumpString(dataBuffer, StandardCharsets.UTF_8);
+			String value = DataBufferTestUtils.dumpString(dataBuffer, StandardCharsets.UTF_8);
			DataBufferUtils.release(dataBuffer);
			assertEquals(expected, value);
		};

View File: ChannelSendOperatorTests.java

@@ -24,7 +24,6 @@ import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import io.netty.buffer.ByteBufAllocator;
import org.junit.Before;
import org.junit.Test;
import org.reactivestreams.Publisher;
@@ -38,7 +37,6 @@ import reactor.test.StepVerifier;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.LeakAwareDataBufferFactory;
-import org.springframework.core.io.buffer.NettyDataBufferFactory;
import static org.junit.Assert.*;
@@ -135,8 +133,7 @@ public class ChannelSendOperatorTests {
	@Test // gh-22720
	public void cancelWhileItemCached() {
-		NettyDataBufferFactory delegate = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
-		LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory(delegate);
+		LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory();
		ChannelSendOperator<DataBuffer> operator = new ChannelSendOperator<>(
				Mono.fromCallable(() -> {
@@ -164,8 +161,7 @@ public class ChannelSendOperatorTests {
		// 2. writeFunction applied and writeCompletionBarrier subscribed to it
		// 3. Write Publisher fails right after that and before request(n) from server
-		NettyDataBufferFactory delegate = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
-		LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory(delegate);
+		LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory();
		ZeroDemandSubscriber writeSubscriber = new ZeroDemandSubscriber();
		ChannelSendOperator<DataBuffer> operator = new ChannelSendOperator<>(
@@ -200,8 +196,7 @@ public class ChannelSendOperatorTests {
		// 2. writeFunction applied and writeCompletionBarrier subscribed to it
		// 3. writeFunction fails, e.g. to flush status and headers, before request(n) from server
-		NettyDataBufferFactory delegate = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
-		LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory(delegate);
+		LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory();
		ChannelSendOperator<DataBuffer> operator = new ChannelSendOperator<>(
				Flux.create(sink -> {

View File: MockServerHttpResponse.java

@@ -25,6 +25,7 @@ import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.core.publisher.MonoProcessor;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
@@ -61,8 +62,11 @@ public class MockServerHttpResponse extends AbstractServerHttpResponse {
	public MockServerHttpResponse(DataBufferFactory dataBufferFactory) {
		super(dataBufferFactory);
		this.writeHandler = body -> {
-			this.body = body.cache();
-			return this.body.then();
+			// Avoid .then() which causes data buffers to be released
+			MonoProcessor<Void> completion = MonoProcessor.create();
+			this.body = body.doOnComplete(completion::onComplete).doOnError(completion::onError).cache();
+			this.body.subscribe();
+			return completion;
		};
	}

View File: ModelAttributeMethodArgumentResolver.java

@@ -135,8 +135,7 @@ public class ModelAttributeMethodArgumentResolver extends HandlerMethodArgumentR
		BindingResult errors = binder.getBindingResult();
		if (adapter != null) {
			return adapter.fromPublisher(errors.hasErrors() ?
-					Mono.error(new WebExchangeBindException(parameter, errors)) :
-					valueMono);
+					Mono.error(new WebExchangeBindException(parameter, errors)) : valueMono);
		}
		else {
			if (errors.hasErrors() && !hasErrorsArgument(parameter)) {

View File: ZeroDemandResponse.java

@@ -17,7 +17,6 @@ package org.springframework.web.reactive.result.view;
import java.util.function.Supplier;
-import io.netty.buffer.PooledByteBufAllocator;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
@@ -26,7 +25,6 @@ import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.LeakAwareDataBufferFactory;
-import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseCookie;
@@ -47,8 +45,7 @@ public class ZeroDemandResponse implements ServerHttpResponse {
	public ZeroDemandResponse() {
-		NettyDataBufferFactory delegate = new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT);
-		this.bufferFactory = new LeakAwareDataBufferFactory(delegate);
+		this.bufferFactory = new LeakAwareDataBufferFactory();
	}