diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index ffa4115674..03811089b8 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -366,7 +366,8 @@ public abstract class DataBufferUtils { sink.onDispose(() -> closeChannel(channel)); write(source, channel).subscribe(DataBufferUtils::release, sink::error, - sink::success); + sink::success, + Context.of(sink.contextView())); } catch (IOException ex) { sink.error(ex); diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java index 94bfb409a9..74516632d7 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java @@ -990,6 +990,27 @@ class DataBufferUtilsTests extends AbstractDataBufferAllocatingTests { } } + @ParameterizedDataBufferAllocatingTest + void propagateContextPath(DataBufferFactory bufferFactory) throws IOException { + Path path = Paths.get(this.resource.getURI()); + Path out = Files.createTempFile("data-buffer-utils-tests", ".tmp"); + + Flux result = DataBufferUtils.read(path, bufferFactory, 1024, StandardOpenOption.READ) + .transformDeferredContextual((f, ctx) -> { + assertThat(ctx.getOrDefault("key", "EMPTY")).isEqualTo("TEST"); + return f; + }) + .transform(f -> DataBufferUtils.write(f, out)) + .transformDeferredContextual((f, ctx) -> { + assertThat(ctx.getOrDefault("key", "EMPTY")).isEqualTo("TEST"); + return f; + }) + .contextWrite(Context.of("key", "TEST")); + + StepVerifier.create(result) + .verifyComplete(); + } + private static class ZeroDemandSubscriber extends BaseSubscriber { @Override