Merge branch '5.3.x'

This commit is contained in:
Arjen Poutsma 2022-08-16 11:24:01 +02:00
commit ef178d24ec
2 changed files with 23 additions and 1 deletions

View File

@ -366,7 +366,8 @@ public abstract class DataBufferUtils {
sink.onDispose(() -> closeChannel(channel));
write(source, channel).subscribe(DataBufferUtils::release,
sink::error,
sink::success);
sink::success,
Context.of(sink.contextView()));
}
catch (IOException ex) {
sink.error(ex);

View File

@ -990,6 +990,27 @@ class DataBufferUtilsTests extends AbstractDataBufferAllocatingTests {
}
}
@ParameterizedDataBufferAllocatingTest
void propagateContextPath(DataBufferFactory bufferFactory) throws IOException {
Path path = Paths.get(this.resource.getURI());
Path out = Files.createTempFile("data-buffer-utils-tests", ".tmp");
Flux<Void> result = DataBufferUtils.read(path, bufferFactory, 1024, StandardOpenOption.READ)
.transformDeferredContextual((f, ctx) -> {
assertThat(ctx.getOrDefault("key", "EMPTY")).isEqualTo("TEST");
return f;
})
.transform(f -> DataBufferUtils.write(f, out))
.transformDeferredContextual((f, ctx) -> {
assertThat(ctx.getOrDefault("key", "EMPTY")).isEqualTo("TEST");
return f;
})
.contextWrite(Context.of("key", "TEST"));
StepVerifier.create(result)
.verifyComplete();
}
private static class ZeroDemandSubscriber extends BaseSubscriber<DataBuffer> {
@Override