Fix guard against multiple subscriptions
This commit changes the guard against multiple subscriptions, as the previously used doOnSubscribe hook could not function as guard in certain scenarios. Closes gh-32727
This commit is contained in:
parent
f1a1190700
commit
a3afe51c9f
|
|
@ -16,8 +16,12 @@
|
|||
|
||||
package org.springframework.http.client.reactive;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.reactivestreams.Publisher;
|
||||
import org.reactivestreams.Subscriber;
|
||||
import org.reactivestreams.Subscription;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
|
|
@ -55,16 +59,7 @@ public abstract class AbstractClientHttpResponse implements ClientHttpResponse {
|
|||
this.statusCode = statusCode;
|
||||
this.headers = headers;
|
||||
this.cookies = cookies;
|
||||
this.body = singleSubscription(body);
|
||||
}
|
||||
|
||||
private static Flux<DataBuffer> singleSubscription(Flux<DataBuffer> body) {
|
||||
AtomicBoolean subscribed = new AtomicBoolean();
|
||||
return body.doOnSubscribe(s -> {
|
||||
if (!subscribed.compareAndSet(false, true)) {
|
||||
throw new IllegalStateException("The client response body can only be consumed once");
|
||||
}
|
||||
});
|
||||
this.body = Flux.from(new SingleSubscriberPublisher<>(body));
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -87,4 +82,39 @@ public abstract class AbstractClientHttpResponse implements ClientHttpResponse {
|
|||
public Flux<DataBuffer> getBody() {
|
||||
return this.body;
|
||||
}
|
||||
|
||||
|
||||
private static final class SingleSubscriberPublisher<T> implements Publisher<T> {
|
||||
|
||||
private static final Subscription NO_OP_SUBSCRIPTION = new Subscription() {
|
||||
@Override
|
||||
public void request(long l) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel() {
|
||||
}
|
||||
};
|
||||
|
||||
private final Publisher<T> delegate;
|
||||
|
||||
private final AtomicBoolean subscribed = new AtomicBoolean();
|
||||
|
||||
|
||||
public SingleSubscriberPublisher(Publisher<T> delegate) {
|
||||
this.delegate = delegate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void subscribe(Subscriber<? super T> subscriber) {
|
||||
Objects.requireNonNull(subscriber, "Subscriber must not be null");
|
||||
if (this.subscribed.compareAndSet(false, true)) {
|
||||
this.delegate.subscribe(subscriber);
|
||||
}
|
||||
else {
|
||||
subscriber.onSubscribe(NO_OP_SUBSCRIPTION);
|
||||
subscriber.onError(new IllegalStateException("The client response body can only be consumed once"));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue