Merge branch '5.3.x' into main

This commit is contained in:
Rossen Stoyanchev 2021-10-05 14:52:45 +01:00
commit c1856d24e2
4 changed files with 35 additions and 32 deletions

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2019 the original author or authors. * Copyright 2002-2021 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -68,8 +68,8 @@ public abstract class TransactionContextManager {
* Create a {@link TransactionContext} and register it in the subscriber {@link Context}. * Create a {@link TransactionContext} and register it in the subscriber {@link Context}.
* @return functional context registration. * @return functional context registration.
* @throws IllegalStateException if a transaction context is already associated. * @throws IllegalStateException if a transaction context is already associated.
* @see Mono#subscriberContext(Function) * @see Mono#contextWrite(Function)
* @see Flux#subscriberContext(Function) * @see Flux#contextWrite(Function)
*/ */
public static Function<Context, Context> createTransactionContext() { public static Function<Context, Context> createTransactionContext() {
return context -> context.put(TransactionContext.class, new TransactionContext()); return context -> context.put(TransactionContext.class, new TransactionContext());

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2019 the original author or authors. * Copyright 2002-2021 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -350,12 +350,14 @@ public abstract class AbstractReactiveTransactionAspectTests {
} }
private void checkReactiveTransaction(boolean expected) { private void checkReactiveTransaction(boolean expected) {
Mono.subscriberContext().handle((context, sink) -> { Mono.deferContextual(Mono::just)
.handle((context, sink) -> {
if (context.hasKey(TransactionContext.class) != expected) { if (context.hasKey(TransactionContext.class) != expected) {
fail("Should have thrown NoTransactionException"); fail("Should have thrown NoTransactionException");
} }
sink.complete(); sink.complete();
}).block(); })
.block();
} }

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2019 the original author or authors. * Copyright 2002-2021 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -41,19 +41,19 @@ public class ReactiveTransactionSupportTests {
ReactiveTransactionManager tm = new ReactiveTestTransactionManager(false, true); ReactiveTransactionManager tm = new ReactiveTestTransactionManager(false, true);
tm.getReactiveTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_SUPPORTS)) tm.getReactiveTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_SUPPORTS))
.subscriberContext(TransactionContextManager.createTransactionContext()).cast(GenericReactiveTransaction.class) .contextWrite(TransactionContextManager.createTransactionContext()).cast(GenericReactiveTransaction.class)
.as(StepVerifier::create).consumeNextWith(actual -> assertThat(actual.hasTransaction()).isFalse() .as(StepVerifier::create).consumeNextWith(actual -> assertThat(actual.hasTransaction()).isFalse()
).verifyComplete(); ).verifyComplete();
tm.getReactiveTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRED)) tm.getReactiveTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRED))
.cast(GenericReactiveTransaction.class).subscriberContext(TransactionContextManager.createTransactionContext()) .cast(GenericReactiveTransaction.class).contextWrite(TransactionContextManager.createTransactionContext())
.as(StepVerifier::create).consumeNextWith(actual -> { .as(StepVerifier::create).consumeNextWith(actual -> {
assertThat(actual.hasTransaction()).isTrue(); assertThat(actual.hasTransaction()).isTrue();
assertThat(actual.isNewTransaction()).isTrue(); assertThat(actual.isNewTransaction()).isTrue();
}).verifyComplete(); }).verifyComplete();
tm.getReactiveTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_MANDATORY)) tm.getReactiveTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_MANDATORY))
.subscriberContext(TransactionContextManager.createTransactionContext()).cast(GenericReactiveTransaction.class) .contextWrite(TransactionContextManager.createTransactionContext()).cast(GenericReactiveTransaction.class)
.as(StepVerifier::create).expectError(IllegalTransactionStateException.class).verify(); .as(StepVerifier::create).expectError(IllegalTransactionStateException.class).verify();
} }
@ -62,21 +62,21 @@ public class ReactiveTransactionSupportTests {
ReactiveTransactionManager tm = new ReactiveTestTransactionManager(true, true); ReactiveTransactionManager tm = new ReactiveTestTransactionManager(true, true);
tm.getReactiveTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_SUPPORTS)) tm.getReactiveTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_SUPPORTS))
.subscriberContext(TransactionContextManager.createTransactionContext()).cast(GenericReactiveTransaction.class) .contextWrite(TransactionContextManager.createTransactionContext()).cast(GenericReactiveTransaction.class)
.as(StepVerifier::create).consumeNextWith(actual -> { .as(StepVerifier::create).consumeNextWith(actual -> {
assertThat(actual.getTransaction()).isNotNull(); assertThat(actual.getTransaction()).isNotNull();
assertThat(actual.isNewTransaction()).isFalse(); assertThat(actual.isNewTransaction()).isFalse();
}).verifyComplete(); }).verifyComplete();
tm.getReactiveTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRED)) tm.getReactiveTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRED))
.subscriberContext(TransactionContextManager.createTransactionContext()).cast(GenericReactiveTransaction.class) .contextWrite(TransactionContextManager.createTransactionContext()).cast(GenericReactiveTransaction.class)
.as(StepVerifier::create).consumeNextWith(actual -> { .as(StepVerifier::create).consumeNextWith(actual -> {
assertThat(actual.getTransaction()).isNotNull(); assertThat(actual.getTransaction()).isNotNull();
assertThat(actual.isNewTransaction()).isFalse(); assertThat(actual.isNewTransaction()).isFalse();
}).verifyComplete(); }).verifyComplete();
tm.getReactiveTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_MANDATORY)) tm.getReactiveTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_MANDATORY))
.subscriberContext(TransactionContextManager.createTransactionContext()).cast(GenericReactiveTransaction.class) .contextWrite(TransactionContextManager.createTransactionContext()).cast(GenericReactiveTransaction.class)
.as(StepVerifier::create).consumeNextWith(actual -> { .as(StepVerifier::create).consumeNextWith(actual -> {
assertThat(actual.getTransaction()).isNotNull(); assertThat(actual.getTransaction()).isNotNull();
assertThat(actual.isNewTransaction()).isFalse(); assertThat(actual.isNewTransaction()).isFalse();
@ -87,7 +87,7 @@ public class ReactiveTransactionSupportTests {
public void commitWithoutExistingTransaction() { public void commitWithoutExistingTransaction() {
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true); ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true);
tm.getReactiveTransaction(new DefaultTransactionDefinition()).flatMap(tm::commit) tm.getReactiveTransaction(new DefaultTransactionDefinition()).flatMap(tm::commit)
.subscriberContext(TransactionContextManager.createTransactionContext()) .contextWrite(TransactionContextManager.createTransactionContext())
.as(StepVerifier::create).verifyComplete(); .as(StepVerifier::create).verifyComplete();
assertHasBegan(tm); assertHasBegan(tm);
@ -101,7 +101,7 @@ public class ReactiveTransactionSupportTests {
public void rollbackWithoutExistingTransaction() { public void rollbackWithoutExistingTransaction() {
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true); ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true);
tm.getReactiveTransaction(new DefaultTransactionDefinition()).flatMap(tm::rollback) tm.getReactiveTransaction(new DefaultTransactionDefinition()).flatMap(tm::rollback)
.subscriberContext(TransactionContextManager.createTransactionContext()).as(StepVerifier::create) .contextWrite(TransactionContextManager.createTransactionContext()).as(StepVerifier::create)
.verifyComplete(); .verifyComplete();
assertHasBegan(tm); assertHasBegan(tm);
@ -116,7 +116,7 @@ public class ReactiveTransactionSupportTests {
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true); ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true);
tm.getReactiveTransaction(new DefaultTransactionDefinition()).doOnNext(ReactiveTransaction::setRollbackOnly) tm.getReactiveTransaction(new DefaultTransactionDefinition()).doOnNext(ReactiveTransaction::setRollbackOnly)
.flatMap(tm::commit) .flatMap(tm::commit)
.subscriberContext(TransactionContextManager.createTransactionContext()).as(StepVerifier::create) .contextWrite(TransactionContextManager.createTransactionContext()).as(StepVerifier::create)
.verifyComplete(); .verifyComplete();
assertHasBegan(tm); assertHasBegan(tm);
@ -130,7 +130,7 @@ public class ReactiveTransactionSupportTests {
public void commitWithExistingTransaction() { public void commitWithExistingTransaction() {
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(true, true); ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(true, true);
tm.getReactiveTransaction(new DefaultTransactionDefinition()).flatMap(tm::commit) tm.getReactiveTransaction(new DefaultTransactionDefinition()).flatMap(tm::commit)
.subscriberContext(TransactionContextManager.createTransactionContext()) .contextWrite(TransactionContextManager.createTransactionContext())
.as(StepVerifier::create).verifyComplete(); .as(StepVerifier::create).verifyComplete();
assertHasNotBegan(tm); assertHasNotBegan(tm);
@ -144,7 +144,7 @@ public class ReactiveTransactionSupportTests {
public void rollbackWithExistingTransaction() { public void rollbackWithExistingTransaction() {
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(true, true); ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(true, true);
tm.getReactiveTransaction(new DefaultTransactionDefinition()).flatMap(tm::rollback) tm.getReactiveTransaction(new DefaultTransactionDefinition()).flatMap(tm::rollback)
.subscriberContext(TransactionContextManager.createTransactionContext()).as(StepVerifier::create) .contextWrite(TransactionContextManager.createTransactionContext()).as(StepVerifier::create)
.verifyComplete(); .verifyComplete();
assertHasNotBegan(tm); assertHasNotBegan(tm);
@ -158,7 +158,7 @@ public class ReactiveTransactionSupportTests {
public void rollbackOnlyWithExistingTransaction() { public void rollbackOnlyWithExistingTransaction() {
ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(true, true); ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(true, true);
tm.getReactiveTransaction(new DefaultTransactionDefinition()).doOnNext(ReactiveTransaction::setRollbackOnly).flatMap(tm::commit) tm.getReactiveTransaction(new DefaultTransactionDefinition()).doOnNext(ReactiveTransaction::setRollbackOnly).flatMap(tm::commit)
.subscriberContext(TransactionContextManager.createTransactionContext()).as(StepVerifier::create) .contextWrite(TransactionContextManager.createTransactionContext()).as(StepVerifier::create)
.verifyComplete(); .verifyComplete();
assertHasNotBegan(tm); assertHasNotBegan(tm);

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2020 the original author or authors. * Copyright 2002-2021 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -28,7 +28,7 @@ import org.springframework.transaction.*
import org.springframework.transaction.reactive.TransactionContext import org.springframework.transaction.reactive.TransactionContext
import reactor.core.publisher.Mono import reactor.core.publisher.Mono
import reactor.core.publisher.SynchronousSink import reactor.core.publisher.SynchronousSink
import reactor.util.context.Context import reactor.util.context.ContextView
import java.lang.reflect.Method import java.lang.reflect.Method
import kotlin.coroutines.Continuation import kotlin.coroutines.Continuation
@ -299,14 +299,15 @@ abstract class AbstractCoroutinesTransactionAspectTests {
} }
} }
@Suppress("DEPRECATION")
private fun checkReactiveTransaction(expected: Boolean) { private fun checkReactiveTransaction(expected: Boolean) {
Mono.subscriberContext().handle { context: Context, sink: SynchronousSink<Any?> -> Mono.deferContextual{context -> Mono.just(context)}
.handle { context: ContextView, sink: SynchronousSink<Any?> ->
if (context.hasKey(TransactionContext::class.java) != expected) { if (context.hasKey(TransactionContext::class.java) != expected) {
Fail.fail<Any>("Should have thrown NoTransactionException") Fail.fail<Any>("Should have thrown NoTransactionException")
} }
sink.complete() sink.complete()
}.block() }
.block()
} }
protected open fun advised(target: Any, rtm: ReactiveTransactionManager, tas: Array<TransactionAttributeSource>): Any { protected open fun advised(target: Any, rtm: ReactiveTransactionManager, tas: Array<TransactionAttributeSource>): Any {