From a94868158ece76925342d9b06d75a2d2abe0cfbb Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Mon, 1 Apr 2019 17:22:54 -0400 Subject: [PATCH] Sync ChannelSendOperator copy in spring-messaging See gh-22720 --- .../reactive/ChannelSendOperator.java | 36 +++++++++++++++---- 1 file changed, 30 insertions(+), 6 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/ChannelSendOperator.java b/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/ChannelSendOperator.java index a46796e7456..77371140b48 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/ChannelSendOperator.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/reactive/ChannelSendOperator.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,6 +28,8 @@ import reactor.core.publisher.Mono; import reactor.core.publisher.Operators; import reactor.util.context.Context; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -287,13 +289,20 @@ class ChannelSendOperator extends Mono implements Scannable { } private boolean emitCachedSignals() { - if (this.item != null) { - requiredWriteSubscriber().onNext(this.item); - } if (this.error != null) { - requiredWriteSubscriber().onError(this.error); + try { + requiredWriteSubscriber().onError(this.error); + } + finally { + releaseCachedItem(); + } return true; } + T item = this.item; + this.item = null; + if (item != null) { + requiredWriteSubscriber().onNext(item); + } if (this.completed) { requiredWriteSubscriber().onComplete(); return true; @@ -306,7 +315,22 @@ class ChannelSendOperator extends Mono implements Scannable { Subscription s = this.subscription; if (s != null) { this.subscription = null; - s.cancel(); + try { + s.cancel(); + } + finally { + releaseCachedItem(); + } + } + } + + private void releaseCachedItem() { + synchronized (this) { + Object item = this.item; + if (item instanceof DataBuffer) { + DataBufferUtils.release((DataBuffer) item); + } + this.item = null; } }