Propagate Reactor Context when using FluxSink

This commit makes sure that the Reactor context from a given mono or
flux is propagated to the Flux returned by a FluxSink. This change
affects both DataBufferUtils::write and internal classes used by the
DefaultPartHttpMessageReader.

Closes gh-27517
This commit is contained in:
Arjen Poutsma 2021-10-05 16:20:30 +02:00
parent 7b9848a352
commit c99210c01f
4 changed files with 74 additions and 0 deletions

View File

@ -47,6 +47,7 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink; import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink; import reactor.core.publisher.SynchronousSink;
import reactor.util.context.Context;
import org.springframework.core.io.Resource; import org.springframework.core.io.Resource;
import org.springframework.lang.Nullable; import org.springframework.lang.Nullable;
@ -1057,6 +1058,12 @@ public abstract class DataBufferUtils {
protected void hookOnComplete() { protected void hookOnComplete() {
this.sink.complete(); this.sink.complete();
} }
@Override
public Context currentContext() {
return this.sink.currentContext();
}
} }
@ -1148,6 +1155,12 @@ public abstract class DataBufferUtils {
this.sink.next(dataBuffer); this.sink.next(dataBuffer);
this.dataBuffer.set(null); this.dataBuffer.set(null);
} }
@Override
public Context currentContext() {
return this.sink.currentContext();
}
} }
} }

View File

@ -24,6 +24,7 @@ import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler; import java.nio.channels.CompletionHandler;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel; import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.channels.WritableByteChannel; import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.nio.file.Files; import java.nio.file.Files;
@ -43,6 +44,7 @@ import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.test.StepVerifier; import reactor.test.StepVerifier;
import reactor.util.context.Context;
import org.springframework.core.io.ByteArrayResource; import org.springframework.core.io.ByteArrayResource;
import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.ClassPathResource;
@ -940,6 +942,53 @@ class DataBufferUtilsTests extends AbstractDataBufferAllocatingTests {
release(foo); release(foo);
} }
@ParameterizedDataBufferAllocatingTest
void propagateContextByteChannel(String displayName, DataBufferFactory bufferFactory) throws IOException {
Path path = Paths.get(this.resource.getURI());
try (SeekableByteChannel out = Files.newByteChannel(this.tempFile, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
Flux<DataBuffer> result = DataBufferUtils.read(path, bufferFactory, 1024, StandardOpenOption.READ)
.transformDeferredContextual((f, ctx) -> {
assertThat(ctx.getOrDefault("key", "EMPTY")).isEqualTo("TEST");
return f;
})
.transform(f -> DataBufferUtils.write(f, out))
.transformDeferredContextual((f, ctx) -> {
assertThat(ctx.getOrDefault("key", "EMPTY")).isEqualTo("TEST");
return f;
})
.contextWrite(Context.of("key", "TEST"));
StepVerifier.create(result)
.consumeNextWith(DataBufferUtils::release)
.verifyComplete();
}
}
@ParameterizedDataBufferAllocatingTest
void propagateContextAsynchronousFileChannel(String displayName, DataBufferFactory bufferFactory) throws IOException {
Path path = Paths.get(this.resource.getURI());
try (AsynchronousFileChannel out = AsynchronousFileChannel.open(this.tempFile, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
Flux<DataBuffer> result = DataBufferUtils.read(path, bufferFactory, 1024, StandardOpenOption.READ)
.transformDeferredContextual((f, ctx) -> {
assertThat(ctx.getOrDefault("key", "EMPTY")).isEqualTo("TEST");
return f;
})
.transform(f -> DataBufferUtils.write(f, out))
.transformDeferredContextual((f, ctx) -> {
assertThat(ctx.getOrDefault("key", "EMPTY")).isEqualTo("TEST");
return f;
})
.contextWrite(Context.of("key", "TEST"));
StepVerifier.create(result)
.consumeNextWith(DataBufferUtils::release)
.verifyComplete();
}
}
private static class ZeroDemandSubscriber extends BaseSubscriber<DataBuffer> { private static class ZeroDemandSubscriber extends BaseSubscriber<DataBuffer> {

View File

@ -29,6 +29,7 @@ import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber; import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink; import reactor.core.publisher.FluxSink;
import reactor.util.context.Context;
import org.springframework.core.codec.DecodingException; import org.springframework.core.codec.DecodingException;
import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBuffer;
@ -98,6 +99,11 @@ final class MultipartParser extends BaseSubscriber<DataBuffer> {
}); });
} }
@Override
public Context currentContext() {
return this.sink.currentContext();
}
@Override @Override
protected void hookOnSubscribe(Subscription subscription) { protected void hookOnSubscribe(Subscription subscription) {
requestBuffer(); requestBuffer();

View File

@ -42,6 +42,7 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink; import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Scheduler;
import reactor.util.context.Context;
import org.springframework.core.codec.DecodingException; import org.springframework.core.codec.DecodingException;
import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBuffer;
@ -113,6 +114,11 @@ final class PartGenerator extends BaseSubscriber<MultipartParser.Token> {
}); });
} }
@Override
public Context currentContext() {
return this.sink.currentContext();
}
@Override @Override
protected void hookOnSubscribe(Subscription subscription) { protected void hookOnSubscribe(Subscription subscription) {
requestToken(); requestToken();