From a366ea0e15cd0c1e6f21e1ee1561842b5e936b99 Mon Sep 17 00:00:00 2001 From: rstoyanchev Date: Fri, 25 Oct 2024 14:02:24 +0100 Subject: [PATCH] Align InputStreamSubscriber copies There are legitimate differences, but also some are fixes that should be on both sides. See gh-31677 --- .../core/io/buffer/InputStreamSubscriber.java | 17 +++++---- .../http/client/InputStreamSubscriber.java | 37 +++++++++++-------- 2 files changed, 31 insertions(+), 23 deletions(-) diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/InputStreamSubscriber.java b/spring-core/src/main/java/org/springframework/core/io/buffer/InputStreamSubscriber.java index b364927d95..b5a89c9bae 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/InputStreamSubscriber.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/InputStreamSubscriber.java @@ -1,11 +1,5 @@ package org.springframework.core.io.buffer; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; -import org.springframework.lang.Nullable; -import reactor.core.Exceptions; - import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; @@ -18,6 +12,14 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.ReentrantLock; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import reactor.core.Exceptions; + +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; + /** * Bridges between {@link Publisher Publisher<DataBuffer>} and {@link InputStream}. * @@ -73,6 +75,8 @@ final class InputStreamSubscriber extends InputStream implements Subscriber extends InputStream implements Flow.Subscri final int prefetch; final int limit; - final Function mapper; - final Consumer onDiscardHandler; final ReentrantLock lock; final Queue queue; + final Function mapper; + final Consumer onDiscardHandler; final AtomicReference parkedThread = new AtomicReference<>(); final AtomicInteger workAmount = new AtomicInteger(); @@ -248,20 +249,24 @@ final class InputStreamSubscriber extends InputStream implements Flow.Subscri byte[] bytes = getBytesOrAwait(); if (bytes == DONE) { - this.closed = true; cleanAndFinalize(); if (this.error == null) { + this.closed = true; return j == 0 ? -1 : j; } else { - throw Exceptions.propagate(error); + if (j == 0) { + this.closed = true; + throw Exceptions.propagate(error); + } + + return j; } } else if (bytes == CLOSED) { this.s.cancel(); cleanAndFinalize(); return -1; } - int i = this.position; for (; i < bytes.length && j < len; i++, j++) { b[off + j] = bytes[i]; @@ -311,7 +316,7 @@ final class InputStreamSubscriber extends InputStream implements Flow.Subscri actualWorkAmount = workAmount.addAndGet(-actualWorkAmount); if (actualWorkAmount == 0) { - await(); + await(); } } }