Align InputStreamSubscriber copies

There are legitimate differences, but also some are fixes that
should be on both sides.

See gh-31677
This commit is contained in:
rstoyanchev 2024-10-25 14:02:24 +01:00
parent 37622a7f90
commit a366ea0e15
2 changed files with 31 additions and 23 deletions

View File

@ -1,11 +1,5 @@
package org.springframework.core.io.buffer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.lang.Nullable;
import reactor.core.Exceptions;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
@ -18,6 +12,14 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Exceptions;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
* Bridges between {@link Publisher Publisher<DataBuffer>} and {@link InputStream}.
*
@ -73,6 +75,8 @@ final class InputStreamSubscriber extends InputStream implements Subscriber<Data
@Override
public void onNext(DataBuffer t) {
Assert.notNull(t, "DataBuffer must not be null");
if (this.done) {
discard(t);
return;
@ -351,5 +355,4 @@ final class InputStreamSubscriber extends InputStream implements Subscriber<Data
}
}
}

View File

@ -1,15 +1,5 @@
package org.springframework.http.client;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import reactor.core.Exceptions;
import java.io.IOException;
import java.io.InputStream;
import java.util.ConcurrentModificationException;
@ -24,6 +14,17 @@ import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Exceptions;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
* Bridges between {@link Flow.Publisher Flow.Publisher&lt;T&gt;} and {@link InputStream}.
*
@ -43,10 +44,10 @@ final class InputStreamSubscriber<T> extends InputStream implements Flow.Subscri
final int prefetch;
final int limit;
final Function<T, byte[]> mapper;
final Consumer<T> onDiscardHandler;
final ReentrantLock lock;
final Queue<T> queue;
final Function<T, byte[]> mapper;
final Consumer<T> onDiscardHandler;
final AtomicReference<Object> parkedThread = new AtomicReference<>();
final AtomicInteger workAmount = new AtomicInteger();
@ -248,20 +249,24 @@ final class InputStreamSubscriber<T> extends InputStream implements Flow.Subscri
byte[] bytes = getBytesOrAwait();
if (bytes == DONE) {
this.closed = true;
cleanAndFinalize();
if (this.error == null) {
this.closed = true;
return j == 0 ? -1 : j;
}
else {
throw Exceptions.propagate(error);
if (j == 0) {
this.closed = true;
throw Exceptions.propagate(error);
}
return j;
}
} else if (bytes == CLOSED) {
this.s.cancel();
cleanAndFinalize();
return -1;
}
int i = this.position;
for (; i < bytes.length && j < len; i++, j++) {
b[off + j] = bytes[i];
@ -311,7 +316,7 @@ final class InputStreamSubscriber<T> extends InputStream implements Flow.Subscri
actualWorkAmount = workAmount.addAndGet(-actualWorkAmount);
if (actualWorkAmount == 0) {
await();
await();
}
}
}