Switch to Reactor snapshots and remove workaround

Following the 5.2 M1 release we can switch back to Reactor snapshots
and remove the workaround for a fix coming in Reactor Core 3.2.9.
This commit is contained in:
Rossen Stoyanchev 2019-04-10 16:09:28 -04:00
parent bb28477587
commit 6e7da62085
4 changed files with 17 additions and 22 deletions

View File

@ -40,7 +40,7 @@ ext {
log4jVersion = "2.11.2" log4jVersion = "2.11.2"
nettyVersion = "4.1.34.Final" nettyVersion = "4.1.34.Final"
quartzVersion = "2.3.1" quartzVersion = "2.3.1"
reactorVersion = "Californium-SR6" reactorVersion = "Californium-BUILD-SNAPSHOT"
rxjavaVersion = "1.3.8" rxjavaVersion = "1.3.8"
rxjavaAdapterVersion = "1.2.1" rxjavaAdapterVersion = "1.2.1"
rxjava2Version = "2.2.8" rxjava2Version = "2.2.8"
@ -149,6 +149,7 @@ configure(allprojects) { project ->
repositories { repositories {
maven { url "https://repo.spring.io/libs-release" } maven { url "https://repo.spring.io/libs-release" }
maven { url "https://repo.spring.io/snapshot" } // Reactor
maven { url "https://oss.jfrog.org/artifactory/libs-snapshot" } // RSocket maven { url "https://oss.jfrog.org/artifactory/libs-snapshot" } // RSocket
mavenLocal() mavenLocal()
} }

View File

@ -57,12 +57,6 @@ public abstract class DataBufferUtils {
private static final Consumer<DataBuffer> RELEASE_CONSUMER = DataBufferUtils::release; private static final Consumer<DataBuffer> RELEASE_CONSUMER = DataBufferUtils::release;
/**
* Workaround to disable use of pooled buffers:
* https://github.com/reactor/reactor-core/issues/1634.
*/
private static final DataBufferFactory defaultDataBufferFactory = new DefaultDataBufferFactory();
//--------------------------------------------------------------------- //---------------------------------------------------------------------
// Reading // Reading
@ -141,14 +135,12 @@ public abstract class DataBufferUtils {
Assert.isTrue(position >= 0, "'position' must be >= 0"); Assert.isTrue(position >= 0, "'position' must be >= 0");
Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0"); Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0");
DataBufferFactory bufferFactoryToUse = defaultDataBufferFactory;
Flux<DataBuffer> flux = Flux.using(channelSupplier, Flux<DataBuffer> flux = Flux.using(channelSupplier,
channel -> Flux.create(sink -> { channel -> Flux.create(sink -> {
ReadCompletionHandler handler = ReadCompletionHandler handler =
new ReadCompletionHandler(channel, sink, position, bufferFactoryToUse, bufferSize); new ReadCompletionHandler(channel, sink, position, bufferFactory, bufferSize);
sink.onDispose(handler::dispose); sink.onDispose(handler::dispose);
DataBuffer dataBuffer = bufferFactoryToUse.allocateBuffer(bufferSize); DataBuffer dataBuffer = bufferFactory.allocateBuffer(bufferSize);
ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, bufferSize); ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, bufferSize);
channel.read(byteBuffer, position, dataBuffer, handler); channel.read(byteBuffer, position, dataBuffer, handler);
}), }),

View File

@ -7,7 +7,7 @@ dependencyManagement {
} }
} }
def rsocketVersion = "0.12.1-RC3" def rsocketVersion = "0.12.1-RC4-SNAPSHOT"
dependencies { dependencies {
compile(project(":spring-beans")) compile(project(":spring-beans"))

View File

@ -238,9 +238,10 @@ public class RSocketBufferLeakTests {
/** /**
* Similar {@link org.springframework.core.io.buffer.LeakAwareDataBufferFactory} * Unlike {@link org.springframework.core.io.buffer.LeakAwareDataBufferFactory}
* but extends {@link NettyDataBufferFactory} rather than rely on * this one is an instance of {@link NettyDataBufferFactory} which is necessary
* decoration, since {@link PayloadUtils} does instanceof checks. * since {@link PayloadUtils} does instanceof checks, and that also allows
* intercepting {@link NettyDataBufferFactory#wrap(ByteBuf)}.
*/ */
private static class LeakAwareNettyDataBufferFactory extends NettyDataBufferFactory { private static class LeakAwareNettyDataBufferFactory extends NettyDataBufferFactory {
@ -277,32 +278,33 @@ public class RSocketBufferLeakTests {
@Override @Override
public NettyDataBuffer allocateBuffer() { public NettyDataBuffer allocateBuffer() {
return (NettyDataBuffer) record(super.allocateBuffer()); return (NettyDataBuffer) recordHint(super.allocateBuffer());
} }
@Override @Override
public NettyDataBuffer allocateBuffer(int initialCapacity) { public NettyDataBuffer allocateBuffer(int initialCapacity) {
return (NettyDataBuffer) record(super.allocateBuffer(initialCapacity)); return (NettyDataBuffer) recordHint(super.allocateBuffer(initialCapacity));
} }
@Override @Override
public NettyDataBuffer wrap(ByteBuf byteBuf) { public NettyDataBuffer wrap(ByteBuf byteBuf) {
NettyDataBuffer dataBuffer = super.wrap(byteBuf); NettyDataBuffer dataBuffer = super.wrap(byteBuf);
if (byteBuf != Unpooled.EMPTY_BUFFER) { if (byteBuf != Unpooled.EMPTY_BUFFER) {
record(dataBuffer); recordHint(dataBuffer);
} }
return dataBuffer; return dataBuffer;
} }
@Override @Override
public DataBuffer join(List<? extends DataBuffer> dataBuffers) { public DataBuffer join(List<? extends DataBuffer> dataBuffers) {
return record(super.join(dataBuffers)); return recordHint(super.join(dataBuffers));
} }
private DataBuffer record(DataBuffer buffer) { private DataBuffer recordHint(DataBuffer buffer) {
this.created.add(new DataBufferLeakInfo(buffer, new AssertionError(String.format( AssertionError error = new AssertionError(String.format(
"DataBuffer leak: {%s} {%s} not released.%nStacktrace at buffer creation: ", buffer, "DataBuffer leak: {%s} {%s} not released.%nStacktrace at buffer creation: ", buffer,
ObjectUtils.getIdentityHexString(((NettyDataBuffer) buffer).getNativeBuffer()))))); ObjectUtils.getIdentityHexString(((NettyDataBuffer) buffer).getNativeBuffer())));
this.created.add(new DataBufferLeakInfo(buffer, error));
return buffer; return buffer;
} }
} }