diff --git a/build.gradle b/build.gradle index b3d4eeb59d..3b296f64a5 100644 --- a/build.gradle +++ b/build.gradle @@ -40,7 +40,7 @@ ext { log4jVersion = "2.11.2" nettyVersion = "4.1.34.Final" quartzVersion = "2.3.1" - reactorVersion = "Californium-SR6" + reactorVersion = "Californium-BUILD-SNAPSHOT" rxjavaVersion = "1.3.8" rxjavaAdapterVersion = "1.2.1" rxjava2Version = "2.2.8" @@ -149,6 +149,7 @@ configure(allprojects) { project -> repositories { maven { url "https://repo.spring.io/libs-release" } + maven { url "https://repo.spring.io/snapshot" } // Reactor maven { url "https://oss.jfrog.org/artifactory/libs-snapshot" } // RSocket mavenLocal() } diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index cc7a70916f..b12f9434b2 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -57,12 +57,6 @@ public abstract class DataBufferUtils { private static final Consumer RELEASE_CONSUMER = DataBufferUtils::release; - /** - * Workaround to disable use of pooled buffers: - * https://github.com/reactor/reactor-core/issues/1634. - */ - private static final DataBufferFactory defaultDataBufferFactory = new DefaultDataBufferFactory(); - //--------------------------------------------------------------------- // Reading @@ -141,14 +135,12 @@ public abstract class DataBufferUtils { Assert.isTrue(position >= 0, "'position' must be >= 0"); Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0"); - DataBufferFactory bufferFactoryToUse = defaultDataBufferFactory; - Flux flux = Flux.using(channelSupplier, channel -> Flux.create(sink -> { ReadCompletionHandler handler = - new ReadCompletionHandler(channel, sink, position, bufferFactoryToUse, bufferSize); + new ReadCompletionHandler(channel, sink, position, bufferFactory, bufferSize); sink.onDispose(handler::dispose); - DataBuffer dataBuffer = bufferFactoryToUse.allocateBuffer(bufferSize); + DataBuffer dataBuffer = bufferFactory.allocateBuffer(bufferSize); ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, bufferSize); channel.read(byteBuffer, position, dataBuffer, handler); }), diff --git a/spring-messaging/spring-messaging.gradle b/spring-messaging/spring-messaging.gradle index a98d5f3b9d..e9fa9fecc5 100644 --- a/spring-messaging/spring-messaging.gradle +++ b/spring-messaging/spring-messaging.gradle @@ -7,7 +7,7 @@ dependencyManagement { } } -def rsocketVersion = "0.12.1-RC3" +def rsocketVersion = "0.12.1-RC4-SNAPSHOT" dependencies { compile(project(":spring-beans")) diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketBufferLeakTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketBufferLeakTests.java index 0614e4c2ca..70fd9d29a7 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketBufferLeakTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketBufferLeakTests.java @@ -238,9 +238,10 @@ public class RSocketBufferLeakTests { /** - * Similar {@link org.springframework.core.io.buffer.LeakAwareDataBufferFactory} - * but extends {@link NettyDataBufferFactory} rather than rely on - * decoration, since {@link PayloadUtils} does instanceof checks. + * Unlike {@link org.springframework.core.io.buffer.LeakAwareDataBufferFactory} + * this one is an instance of {@link NettyDataBufferFactory} which is necessary + * since {@link PayloadUtils} does instanceof checks, and that also allows + * intercepting {@link NettyDataBufferFactory#wrap(ByteBuf)}. */ private static class LeakAwareNettyDataBufferFactory extends NettyDataBufferFactory { @@ -277,32 +278,33 @@ public class RSocketBufferLeakTests { @Override public NettyDataBuffer allocateBuffer() { - return (NettyDataBuffer) record(super.allocateBuffer()); + return (NettyDataBuffer) recordHint(super.allocateBuffer()); } @Override public NettyDataBuffer allocateBuffer(int initialCapacity) { - return (NettyDataBuffer) record(super.allocateBuffer(initialCapacity)); + return (NettyDataBuffer) recordHint(super.allocateBuffer(initialCapacity)); } @Override public NettyDataBuffer wrap(ByteBuf byteBuf) { NettyDataBuffer dataBuffer = super.wrap(byteBuf); if (byteBuf != Unpooled.EMPTY_BUFFER) { - record(dataBuffer); + recordHint(dataBuffer); } return dataBuffer; } @Override public DataBuffer join(List dataBuffers) { - return record(super.join(dataBuffers)); + return recordHint(super.join(dataBuffers)); } - private DataBuffer record(DataBuffer buffer) { - this.created.add(new DataBufferLeakInfo(buffer, new AssertionError(String.format( + private DataBuffer recordHint(DataBuffer buffer) { + AssertionError error = new AssertionError(String.format( "DataBuffer leak: {%s} {%s} not released.%nStacktrace at buffer creation: ", buffer, - ObjectUtils.getIdentityHexString(((NettyDataBuffer) buffer).getNativeBuffer()))))); + ObjectUtils.getIdentityHexString(((NettyDataBuffer) buffer).getNativeBuffer()))); + this.created.add(new DataBufferLeakInfo(buffer, error)); return buffer; } }