Sync ChannelSendOperator copy in spring-messaging

See gh-22720
This commit is contained in:
Rossen Stoyanchev 2019-04-01 17:22:54 -04:00
parent 20742c74dc
commit a94868158e
1 changed files with 30 additions and 6 deletions

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2018 the original author or authors. * Copyright 2002-2019 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -28,6 +28,8 @@ import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators; import reactor.core.publisher.Operators;
import reactor.util.context.Context; import reactor.util.context.Context;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.lang.Nullable; import org.springframework.lang.Nullable;
import org.springframework.util.Assert; import org.springframework.util.Assert;
@ -287,13 +289,20 @@ class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
} }
private boolean emitCachedSignals() { private boolean emitCachedSignals() {
if (this.item != null) {
requiredWriteSubscriber().onNext(this.item);
}
if (this.error != null) { if (this.error != null) {
try {
requiredWriteSubscriber().onError(this.error); requiredWriteSubscriber().onError(this.error);
}
finally {
releaseCachedItem();
}
return true; return true;
} }
T item = this.item;
this.item = null;
if (item != null) {
requiredWriteSubscriber().onNext(item);
}
if (this.completed) { if (this.completed) {
requiredWriteSubscriber().onComplete(); requiredWriteSubscriber().onComplete();
return true; return true;
@ -306,8 +315,23 @@ class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
Subscription s = this.subscription; Subscription s = this.subscription;
if (s != null) { if (s != null) {
this.subscription = null; this.subscription = null;
try {
s.cancel(); s.cancel();
} }
finally {
releaseCachedItem();
}
}
}
private void releaseCachedItem() {
synchronized (this) {
Object item = this.item;
if (item instanceof DataBuffer) {
DataBufferUtils.release((DataBuffer) item);
}
this.item = null;
}
} }