From 8dabb3e6269ab18bb2256266a70d4c285bbb4ff5 Mon Sep 17 00:00:00 2001 From: Juergen Hoeller Date: Thu, 2 May 2019 16:23:37 +0200 Subject: [PATCH] Shorter class names for common reactive transaction API types Introduces TransactionExecution base interface for TransactionStatus as well as ReactiveTransaction. Renames getTransaction method to getReactiveTransaction, allowing for combined implementations of PlatformTransactionManager and ReactiveTransactionManager. See gh-22646 --- .../transaction/ReactiveTransaction.java | 36 +++ .../ReactiveTransactionManager.java | 14 +- ...nStatus.java => TransactionExecution.java} | 22 +- .../transaction/TransactionStatus.java | 38 +-- .../AbstractReactiveTransactionManager.java | 236 +++++++++--------- .../AbstractReactiveTransactionStatus.java | 76 ------ ...s.java => GenericReactiveTransaction.java} | 39 ++- ...a => ReactiveResourceSynchronization.java} | 57 ++--- ...Callback.java => TransactionCallback.java} | 7 +- .../reactive/TransactionContext.java | 6 +- .../reactive/TransactionContextManager.java | 4 +- ...n.java => TransactionSynchronization.java} | 11 +- ...=> TransactionSynchronizationManager.java} | 38 +-- ...a => TransactionSynchronizationUtils.java} | 56 ++--- .../reactive/TransactionalOperator.java | 4 +- ...or.java => TransactionalOperatorImpl.java} | 18 +- .../support/AbstractTransactionStatus.java | 33 +-- .../support/DefaultTransactionStatus.java | 26 +- .../ReactiveTestTransactionManager.java | 10 +- ...a => ReactiveTransactionSupportTests.java} | 40 +-- 20 files changed, 359 insertions(+), 412 deletions(-) create mode 100644 spring-tx/src/main/java/org/springframework/transaction/ReactiveTransaction.java rename spring-tx/src/main/java/org/springframework/transaction/{ReactiveTransactionStatus.java => TransactionExecution.java} (61%) delete mode 100644 spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionStatus.java rename spring-tx/src/main/java/org/springframework/transaction/reactive/{DefaultReactiveTransactionStatus.java => GenericReactiveTransaction.java} (82%) rename spring-tx/src/main/java/org/springframework/transaction/reactive/{ReactiveResourceHolderSynchronization.java => ReactiveResourceSynchronization.java} (71%) rename spring-tx/src/main/java/org/springframework/transaction/reactive/{ReactiveTransactionCallback.java => TransactionCallback.java} (90%) rename spring-tx/src/main/java/org/springframework/transaction/reactive/{ReactiveTransactionSynchronization.java => TransactionSynchronization.java} (94%) rename spring-tx/src/main/java/org/springframework/transaction/reactive/{ReactiveTransactionSynchronizationManager.java => TransactionSynchronizationManager.java} (90%) rename spring-tx/src/main/java/org/springframework/transaction/reactive/{ReactiveTransactionSynchronizationUtils.java => TransactionSynchronizationUtils.java} (59%) rename spring-tx/src/main/java/org/springframework/transaction/reactive/{DefaultTransactionalOperator.java => TransactionalOperatorImpl.java} (82%) rename spring-tx/src/test/java/org/springframework/transaction/reactive/{ReactiveTransactionSupportUnitTests.java => ReactiveTransactionSupportTests.java} (82%) diff --git a/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransaction.java b/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransaction.java new file mode 100644 index 00000000000..11a33e681b5 --- /dev/null +++ b/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransaction.java @@ -0,0 +1,36 @@ +/* + * Copyright 2002-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.transaction; + +/** + * Representation of an ongoing reactive transaction. + * This is currently a marker interface extending {@link TransactionExecution} + * but may acquire further methods in a future revision. + * + *

Transactional code can use this to retrieve status information, + * and to programmatically request a rollback (instead of throwing + * an exception that causes an implicit rollback). + * + * @author Mark Paluch + * @author Juergen Hoeller + * @since 5.2 + * @see #setRollbackOnly() + * @see ReactiveTransactionManager#getReactiveTransaction + */ +public interface ReactiveTransaction extends TransactionExecution { + +} diff --git a/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionManager.java b/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionManager.java index e30a3628314..553e4e329ff 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionManager.java +++ b/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionManager.java @@ -32,7 +32,7 @@ import reactor.core.publisher.Mono; public interface ReactiveTransactionManager { /** - * Emit a currently active transaction or create a new one, according to + * Emit a currently active reactive transaction or create a new one, according to * the specified propagation behavior. *

Note that parameters like isolation level or timeout will only be applied * to new transactions, and thus be ignored when participating in active ones. @@ -54,7 +54,7 @@ public interface ReactiveTransactionManager { * @see TransactionDefinition#getTimeout * @see TransactionDefinition#isReadOnly */ - Mono getTransaction(TransactionDefinition definition) throws TransactionException; + Mono getReactiveTransaction(TransactionDefinition definition) throws TransactionException; /** * Commit the given transaction, with regard to its status. If the transaction @@ -72,7 +72,7 @@ public interface ReactiveTransactionManager { * database right before commit, with the resulting DataAccessException * causing the transaction to fail. The original exception will be * propagated to the caller of this commit method in such a case. - * @param status object returned by the {@code getTransaction} method + * @param transaction object returned by the {@code getTransaction} method * @throws UnexpectedRollbackException in case of an unexpected rollback * that the transaction coordinator initiated * @throws HeuristicCompletionException in case of a transaction failure @@ -81,9 +81,9 @@ public interface ReactiveTransactionManager { * (typically caused by fundamental resource failures) * @throws IllegalTransactionStateException if the given transaction * is already completed (that is, committed or rolled back) - * @see ReactiveTransactionStatus#setRollbackOnly + * @see ReactiveTransaction#setRollbackOnly */ - Mono commit(ReactiveTransactionStatus status) throws TransactionException; + Mono commit(ReactiveTransaction transaction) throws TransactionException; /** * Perform a rollback of the given transaction. @@ -95,12 +95,12 @@ public interface ReactiveTransactionManager { * The transaction will already have been completed and cleaned up when commit * returns, even in case of a commit exception. Consequently, a rollback call * after commit failure will lead to an IllegalTransactionStateException. - * @param status object returned by the {@code getTransaction} method + * @param transaction object returned by the {@code getTransaction} method * @throws TransactionSystemException in case of rollback or system errors * (typically caused by fundamental resource failures) * @throws IllegalTransactionStateException if the given transaction * is already completed (that is, committed or rolled back) */ - Mono rollback(ReactiveTransactionStatus status) throws TransactionException; + Mono rollback(ReactiveTransaction transaction) throws TransactionException; } diff --git a/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionStatus.java b/spring-tx/src/main/java/org/springframework/transaction/TransactionExecution.java similarity index 61% rename from spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionStatus.java rename to spring-tx/src/main/java/org/springframework/transaction/TransactionExecution.java index 01d5f16ab06..d5d0706187e 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/ReactiveTransactionStatus.java +++ b/spring-tx/src/main/java/org/springframework/transaction/TransactionExecution.java @@ -17,20 +17,14 @@ package org.springframework.transaction; /** - * Representation of the status of a transaction exposing a reactive - * interface. + * Common representation of the current state of a transaction. + * Serves as base interface for {@link TransactionStatus} as well as + * {@link ReactiveTransaction}. * - *

Transactional code can use this to retrieve status information, - * and to programmatically request a rollback (instead of throwing - * an exception that causes an implicit rollback). - * - * @author Mark Paluch * @author Juergen Hoeller * @since 5.2 - * @see #setRollbackOnly() - * @see ReactiveTransactionManager#getTransaction */ -public interface ReactiveTransactionStatus { +public interface TransactionExecution { /** * Return whether the present transaction is new; otherwise participating @@ -43,12 +37,6 @@ public interface ReactiveTransactionStatus { * Set the transaction rollback-only. This instructs the transaction manager * that the only possible outcome of the transaction may be a rollback, as * alternative to throwing an exception which would in turn trigger a rollback. - *

This is mainly intended for transactions managed by - * {@link org.springframework.transaction.reactive.TransactionalOperator} or - * {@link org.springframework.transaction.interceptor.TransactionInterceptor}, - * where the actual commit/rollback decision is made by the container. - * @see org.springframework.transaction.reactive.ReactiveTransactionCallback#doInTransaction - * @see org.springframework.transaction.interceptor.TransactionAttribute#rollbackOn */ void setRollbackOnly(); @@ -61,8 +49,6 @@ public interface ReactiveTransactionStatus { /** * Return whether this transaction is completed, that is, * whether it has already been committed or rolled back. - * @see ReactiveTransactionManager#commit - * @see ReactiveTransactionManager#rollback */ boolean isCompleted(); diff --git a/spring-tx/src/main/java/org/springframework/transaction/TransactionStatus.java b/spring-tx/src/main/java/org/springframework/transaction/TransactionStatus.java index 557cd772044..5968c57fd7b 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/TransactionStatus.java +++ b/spring-tx/src/main/java/org/springframework/transaction/TransactionStatus.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,14 +36,7 @@ import java.io.Flushable; * @see org.springframework.transaction.support.TransactionCallback#doInTransaction * @see org.springframework.transaction.interceptor.TransactionInterceptor#currentTransactionStatus() */ -public interface TransactionStatus extends SavepointManager, Flushable { - - /** - * Return whether the present transaction is new; otherwise participating - * in an existing transaction, or potentially not running in an actual - * transaction in the first place. - */ - boolean isNewTransaction(); +public interface TransactionStatus extends TransactionExecution, SavepointManager, Flushable { /** * Return whether this transaction internally carries a savepoint, @@ -58,25 +51,6 @@ public interface TransactionStatus extends SavepointManager, Flushable { */ boolean hasSavepoint(); - /** - * Set the transaction rollback-only. This instructs the transaction manager - * that the only possible outcome of the transaction may be a rollback, as - * alternative to throwing an exception which would in turn trigger a rollback. - *

This is mainly intended for transactions managed by - * {@link org.springframework.transaction.support.TransactionTemplate} or - * {@link org.springframework.transaction.interceptor.TransactionInterceptor}, - * where the actual commit/rollback decision is made by the container. - * @see org.springframework.transaction.support.TransactionCallback#doInTransaction - * @see org.springframework.transaction.interceptor.TransactionAttribute#rollbackOn - */ - void setRollbackOnly(); - - /** - * Return whether the transaction has been marked as rollback-only - * (either by the application or by the transaction infrastructure). - */ - boolean isRollbackOnly(); - /** * Flush the underlying session to the datastore, if applicable: * for example, all affected Hibernate/JPA sessions. @@ -88,12 +62,4 @@ public interface TransactionStatus extends SavepointManager, Flushable { @Override void flush(); - /** - * Return whether this transaction is completed, that is, - * whether it has already been committed or rolled back. - * @see PlatformTransactionManager#commit - * @see PlatformTransactionManager#rollback - */ - boolean isCompleted(); - } diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionManager.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionManager.java index 849c6bced5c..6652f44e675 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionManager.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionManager.java @@ -32,8 +32,8 @@ import reactor.core.publisher.Mono; import org.springframework.lang.Nullable; import org.springframework.transaction.IllegalTransactionStateException; import org.springframework.transaction.InvalidTimeoutException; +import org.springframework.transaction.ReactiveTransaction; import org.springframework.transaction.ReactiveTransactionManager; -import org.springframework.transaction.ReactiveTransactionStatus; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionException; import org.springframework.transaction.TransactionSuspensionNotSupportedException; @@ -74,7 +74,7 @@ import org.springframework.transaction.UnexpectedRollbackException; * @author Mark Paluch * @author Juergen Hoeller * @since 5.2 - * @see ReactiveTransactionSynchronizationManager + * @see TransactionSynchronizationManager */ @SuppressWarnings("serial") public abstract class AbstractReactiveTransactionManager implements ReactiveTransactionManager, Serializable { @@ -95,8 +95,8 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * @see #doBegin */ @Override - public final Mono getTransaction(TransactionDefinition definition) throws TransactionException { - return ReactiveTransactionSynchronizationManager.currentTransaction() + public final Mono getReactiveTransaction(TransactionDefinition definition) throws TransactionException { + return TransactionSynchronizationManager.currentTransaction() .flatMap(synchronizationManager -> { Object transaction = doGetTransaction(synchronizationManager); @@ -124,7 +124,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { return TransactionContextManager.currentContext() - .map(ReactiveTransactionSynchronizationManager::new) + .map(TransactionSynchronizationManager::new) .flatMap(nestedSynchronizationManager -> suspend(nestedSynchronizationManager, null) .map(Optional::of) @@ -134,7 +134,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition); } return Mono.defer(() -> { - DefaultReactiveTransactionStatus status = newTransactionStatus( + GenericReactiveTransaction status = newReactiveTransaction( nestedSynchronizationManager, definition, transaction, true, debugEnabled, suspendedResources.orElse(null)); return doBegin(nestedSynchronizationManager, transaction, definition) @@ -151,15 +151,15 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran logger.warn("Custom isolation level specified but no actual transaction initiated; " + "isolation level will effectively be ignored: " + definition); } - return Mono.just(prepareTransactionStatus(synchronizationManager, definition, null, true, debugEnabled, null)); + return Mono.just(prepareReactiveTransaction(synchronizationManager, definition, null, true, debugEnabled, null)); } }); } /** - * Create a TransactionStatus for an existing transaction. + * Create a ReactiveTransaction for an existing transaction. */ - private Mono handleExistingTransaction(ReactiveTransactionSynchronizationManager synchronizationManager, + private Mono handleExistingTransaction(TransactionSynchronizationManager synchronizationManager, TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException { if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) { @@ -172,11 +172,11 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran logger.debug("Suspending current transaction"); } Mono suspend = suspend(synchronizationManager, transaction); - return suspend.map(suspendedResources -> prepareTransactionStatus(synchronizationManager, + return suspend.map(suspendedResources -> prepareReactiveTransaction(synchronizationManager, definition, null, false, debugEnabled, suspendedResources)) // - .switchIfEmpty(Mono.fromSupplier(() -> prepareTransactionStatus(synchronizationManager, + .switchIfEmpty(Mono.fromSupplier(() -> prepareReactiveTransaction(synchronizationManager, definition, null, false, debugEnabled, null))) - .cast(ReactiveTransactionStatus.class); + .cast(ReactiveTransaction.class); } if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) { @@ -186,7 +186,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran } Mono suspendedResources = suspend(synchronizationManager, transaction); return suspendedResources.flatMap(suspendedResourcesHolder -> { - DefaultReactiveTransactionStatus status = newTransactionStatus(synchronizationManager, + GenericReactiveTransaction status = newReactiveTransaction(synchronizationManager, definition, transaction, true, debugEnabled, suspendedResources); return doBegin(synchronizationManager, transaction, definition).doOnSuccess(ignore -> prepareSynchronization(synchronizationManager, status, definition)).thenReturn(status) @@ -200,7 +200,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran logger.debug("Creating nested transaction with name [" + definition.getName() + "]"); } // Nested transaction through nested begin and commit/rollback calls. - DefaultReactiveTransactionStatus status = newTransactionStatus(synchronizationManager, + GenericReactiveTransaction status = newReactiveTransaction(synchronizationManager, definition, transaction, true, debugEnabled, null); return doBegin(synchronizationManager, transaction, definition).doOnSuccess(ignore -> prepareSynchronization(synchronizationManager, status, definition)).thenReturn(status); @@ -210,33 +210,33 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran if (debugEnabled) { logger.debug("Participating in existing transaction"); } - return Mono.just(prepareTransactionStatus(synchronizationManager, definition, transaction, false, debugEnabled, null)); + return Mono.just(prepareReactiveTransaction(synchronizationManager, definition, transaction, false, debugEnabled, null)); } /** - * Create a new TransactionStatus for the given arguments, + * Create a new ReactiveTransaction for the given arguments, * also initializing transaction synchronization as appropriate. - * @see #newTransactionStatus - * @see #prepareTransactionStatus + * @see #newReactiveTransaction + * @see #prepareReactiveTransaction */ - private DefaultReactiveTransactionStatus prepareTransactionStatus( - ReactiveTransactionSynchronizationManager synchronizationManager, TransactionDefinition definition, + private GenericReactiveTransaction prepareReactiveTransaction( + TransactionSynchronizationManager synchronizationManager, TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction, boolean debug, @Nullable Object suspendedResources) { - DefaultReactiveTransactionStatus status = newTransactionStatus(synchronizationManager, + GenericReactiveTransaction status = newReactiveTransaction(synchronizationManager, definition, transaction, newTransaction, debug, suspendedResources); prepareSynchronization(synchronizationManager, status, definition); return status; } /** - * Create a TransactionStatus instance for the given arguments. + * Create a ReactiveTransaction instance for the given arguments. */ - private DefaultReactiveTransactionStatus newTransactionStatus( - ReactiveTransactionSynchronizationManager synchronizationManager, TransactionDefinition definition, + private GenericReactiveTransaction newReactiveTransaction( + TransactionSynchronizationManager synchronizationManager, TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction, boolean debug, @Nullable Object suspendedResources) { - return new DefaultReactiveTransactionStatus(transaction, newTransaction, + return new GenericReactiveTransaction(transaction, newTransaction, !synchronizationManager.isSynchronizationActive(), definition.isReadOnly(), debug, suspendedResources); } @@ -244,8 +244,8 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran /** * Initialize transaction synchronization as appropriate. */ - private void prepareSynchronization(ReactiveTransactionSynchronizationManager synchronizationManager, - DefaultReactiveTransactionStatus status, TransactionDefinition definition) { + private void prepareSynchronization(TransactionSynchronizationManager synchronizationManager, + GenericReactiveTransaction status, TransactionDefinition definition) { if (status.isNewSynchronization()) { synchronizationManager.setActualTransactionActive(status.hasTransaction()); @@ -270,11 +270,11 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * @see #doSuspend * @see #resume */ - private Mono suspend(ReactiveTransactionSynchronizationManager synchronizationManager, + private Mono suspend(TransactionSynchronizationManager synchronizationManager, @Nullable Object transaction) throws TransactionException { if (synchronizationManager.isSynchronizationActive()) { - Mono> suspendedSynchronizations = doSuspendSynchronization(synchronizationManager); + Mono> suspendedSynchronizations = doSuspendSynchronization(synchronizationManager); return suspendedSynchronizations.flatMap(synchronizations -> { Mono> suspendedResources = (transaction != null ? doSuspend(synchronizationManager, transaction).map(Optional::of).defaultIfEmpty(Optional.empty()) : Mono.just(Optional.empty())); return suspendedResources.map(it -> { @@ -313,7 +313,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * @see #doResume * @see #suspend */ - private Mono resume(ReactiveTransactionSynchronizationManager synchronizationManager, + private Mono resume(TransactionSynchronizationManager synchronizationManager, @Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder) throws TransactionException { @@ -322,7 +322,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran if (suspendedResources != null) { return doResume(synchronizationManager, transaction, suspendedResources); } - List suspendedSynchronizations = resourcesHolder.suspendedSynchronizations; + List suspendedSynchronizations = resourcesHolder.suspendedSynchronizations; if (suspendedSynchronizations != null) { synchronizationManager.setActualTransactionActive(resourcesHolder.wasActive); synchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel); @@ -338,7 +338,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran /** * Resume outer transaction after inner transaction begin failed. */ - private Mono resumeAfterBeginException(ReactiveTransactionSynchronizationManager synchronizationManager, + private Mono resumeAfterBeginException(TransactionSynchronizationManager synchronizationManager, Object transaction, @Nullable SuspendedResourcesHolder suspendedResources, Throwable beginEx) { String exMessage = "Inner transaction begin exception overridden by outer transaction resume exception"; @@ -350,14 +350,14 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * Suspend all current synchronizations and deactivate transaction * synchronization for the current transaction context. * @param synchronizationManager the synchronization manager bound to the current transaction - * @return the List of suspended ReactiveTransactionSynchronization objects + * @return the List of suspended TransactionSynchronization objects */ - private Mono> doSuspendSynchronization( - ReactiveTransactionSynchronizationManager synchronizationManager) { + private Mono> doSuspendSynchronization( + TransactionSynchronizationManager synchronizationManager) { - List suspendedSynchronizations = synchronizationManager.getSynchronizations(); + List suspendedSynchronizations = synchronizationManager.getSynchronizations(); return Flux.fromIterable(suspendedSynchronizations) - .concatMap(ReactiveTransactionSynchronization::suspend) + .concatMap(TransactionSynchronization::suspend) .then(Mono.defer(() -> { synchronizationManager.clearSynchronization(); return Mono.just(suspendedSynchronizations); @@ -368,10 +368,10 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * Reactivate transaction synchronization for the current transaction context * and resume all given synchronizations. * @param synchronizationManager the synchronization manager bound to the current transaction - * @param suspendedSynchronizations a List of ReactiveTransactionSynchronization objects + * @param suspendedSynchronizations a List of TransactionSynchronization objects */ - private Mono doResumeSynchronization(ReactiveTransactionSynchronizationManager synchronizationManager, - List suspendedSynchronizations) { + private Mono doResumeSynchronization(TransactionSynchronizationManager synchronizationManager, + List suspendedSynchronizations) { synchronizationManager.initSynchronization(); return Flux.fromIterable(suspendedSynchronizations) @@ -385,26 +385,26 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * transactions and programmatic rollback requests. * Delegates to {@code isRollbackOnly}, {@code doCommit} * and {@code rollback}. - * @see ReactiveTransactionStatus#isRollbackOnly() + * @see ReactiveTransaction#isRollbackOnly() * @see #doCommit * @see #rollback */ @Override - public final Mono commit(ReactiveTransactionStatus status) throws TransactionException { - if (status.isCompleted()) { + public final Mono commit(ReactiveTransaction transaction) throws TransactionException { + if (transaction.isCompleted()) { return Mono.error(new IllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction")); } - return ReactiveTransactionSynchronizationManager.currentTransaction().flatMap(synchronizationManager -> { - DefaultReactiveTransactionStatus defStatus = (DefaultReactiveTransactionStatus) status; - if (defStatus.isRollbackOnly()) { - if (defStatus.isDebug()) { + return TransactionSynchronizationManager.currentTransaction().flatMap(synchronizationManager -> { + GenericReactiveTransaction reactiveTx = (GenericReactiveTransaction) transaction; + if (reactiveTx.isRollbackOnly()) { + if (reactiveTx.isDebug()) { logger.debug("Transactional code has requested rollback"); } - return processRollback(synchronizationManager, defStatus); + return processRollback(synchronizationManager, reactiveTx); } - return processCommit(synchronizationManager, defStatus); + return processCommit(synchronizationManager, reactiveTx); }); } @@ -415,8 +415,8 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * @param status object representing the transaction * @throws TransactionException in case of commit failure */ - private Mono processCommit(ReactiveTransactionSynchronizationManager synchronizationManager, - DefaultReactiveTransactionStatus status) throws TransactionException { + private Mono processCommit(TransactionSynchronizationManager synchronizationManager, + GenericReactiveTransaction status) throws TransactionException { AtomicBoolean beforeCompletionInvoked = new AtomicBoolean(false); @@ -435,10 +435,10 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran })).then(Mono.empty().onErrorResume(ex -> { Mono propagateException = Mono.error(ex); if (ErrorPredicates.UNEXPECTED_ROLLBACK.test(ex)) { - return triggerAfterCompletion(synchronizationManager, status, ReactiveTransactionSynchronization.STATUS_ROLLED_BACK).then(propagateException); + return triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_ROLLED_BACK).then(propagateException); } if (ErrorPredicates.TRANSACTION_EXCEPTION.test(ex)) { - triggerAfterCompletion(synchronizationManager, status, ReactiveTransactionSynchronization.STATUS_UNKNOWN).then(propagateException); + triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_UNKNOWN).then(propagateException); } if (ErrorPredicates.RUNTIME_OR_ERROR.test(ex)) { Mono mono; @@ -453,8 +453,8 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran return propagateException; })).then(Mono.defer(() -> triggerAfterCommit(synchronizationManager, status).onErrorResume(ex -> - triggerAfterCompletion(synchronizationManager, status, ReactiveTransactionSynchronization.STATUS_COMMITTED).then(Mono.error(ex))) - .then(triggerAfterCompletion(synchronizationManager, status, ReactiveTransactionSynchronization.STATUS_COMMITTED)))); + triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_COMMITTED).then(Mono.error(ex))) + .then(triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_COMMITTED)))); return commit .onErrorResume(ex -> cleanupAfterCompletion(synchronizationManager, status) @@ -468,14 +468,14 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * @see #doSetRollbackOnly */ @Override - public final Mono rollback(ReactiveTransactionStatus status) throws TransactionException { - if (status.isCompleted()) { + public final Mono rollback(ReactiveTransaction transaction) throws TransactionException { + if (transaction.isCompleted()) { return Mono.error(new IllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction")); } - return ReactiveTransactionSynchronizationManager.currentTransaction().flatMap(synchronizationManager -> { - DefaultReactiveTransactionStatus defStatus = (DefaultReactiveTransactionStatus) status; - return processRollback(synchronizationManager, defStatus); + return TransactionSynchronizationManager.currentTransaction().flatMap(synchronizationManager -> { + GenericReactiveTransaction reactiveTx = (GenericReactiveTransaction) transaction; + return processRollback(synchronizationManager, reactiveTx); }); } @@ -486,8 +486,8 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * @param status object representing the transaction * @throws TransactionException in case of rollback failure */ - private Mono processRollback(ReactiveTransactionSynchronizationManager synchronizationManager, - DefaultReactiveTransactionStatus status) { + private Mono processRollback(TransactionSynchronizationManager synchronizationManager, + GenericReactiveTransaction status) { return triggerBeforeCompletion(synchronizationManager, status).then(Mono.defer(() -> { if (status.isNewTransaction()) { @@ -511,9 +511,9 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran return beforeCompletion; } })).onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR, ex -> triggerAfterCompletion( - synchronizationManager, status, ReactiveTransactionSynchronization.STATUS_UNKNOWN) + synchronizationManager, status, TransactionSynchronization.STATUS_UNKNOWN) .then(Mono.error(ex))) - .then(Mono.defer(() -> triggerAfterCompletion(synchronizationManager, status, ReactiveTransactionSynchronization.STATUS_ROLLED_BACK))) + .then(Mono.defer(() -> triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_ROLLED_BACK))) .onErrorResume(ex -> cleanupAfterCompletion(synchronizationManager, status).then(Mono.error(ex))) .then(cleanupAfterCompletion(synchronizationManager, status)); } @@ -526,8 +526,8 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * @throws TransactionException in case of rollback failure * @see #doRollback */ - private Mono doRollbackOnCommitException(ReactiveTransactionSynchronizationManager synchronizationManager, - DefaultReactiveTransactionStatus status, Throwable ex) throws TransactionException { + private Mono doRollbackOnCommitException(TransactionSynchronizationManager synchronizationManager, + GenericReactiveTransaction status, Throwable ex) throws TransactionException { return Mono.defer(() -> { if (status.isNewTransaction()) { @@ -545,9 +545,9 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran return Mono.empty(); }).onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR, rbex -> { logger.error("Commit exception overridden by rollback exception", ex); - return triggerAfterCompletion(synchronizationManager, status, ReactiveTransactionSynchronization.STATUS_UNKNOWN) + return triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_UNKNOWN) .then(Mono.error(rbex)); - }).then(triggerAfterCompletion(synchronizationManager, status, ReactiveTransactionSynchronization.STATUS_ROLLED_BACK)); + }).then(triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_ROLLED_BACK)); } @@ -556,14 +556,14 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * @param synchronizationManager the synchronization manager bound to the current transaction * @param status object representing the transaction */ - private Mono triggerBeforeCommit(ReactiveTransactionSynchronizationManager synchronizationManager, - DefaultReactiveTransactionStatus status) { + private Mono triggerBeforeCommit(TransactionSynchronizationManager synchronizationManager, + GenericReactiveTransaction status) { if (status.isNewSynchronization()) { if (status.isDebug()) { logger.trace("Triggering beforeCommit synchronization"); } - return ReactiveTransactionSynchronizationUtils.triggerBeforeCommit(synchronizationManager.getSynchronizations(), status.isReadOnly()); + return TransactionSynchronizationUtils.triggerBeforeCommit(synchronizationManager.getSynchronizations(), status.isReadOnly()); } return Mono.empty(); @@ -574,14 +574,14 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * @param synchronizationManager the synchronization manager bound to the current transaction * @param status object representing the transaction */ - private Mono triggerBeforeCompletion(ReactiveTransactionSynchronizationManager synchronizationManager, - DefaultReactiveTransactionStatus status) { + private Mono triggerBeforeCompletion(TransactionSynchronizationManager synchronizationManager, + GenericReactiveTransaction status) { if (status.isNewSynchronization()) { if (status.isDebug()) { logger.trace("Triggering beforeCompletion synchronization"); } - return ReactiveTransactionSynchronizationUtils.triggerBeforeCompletion(synchronizationManager.getSynchronizations()); + return TransactionSynchronizationUtils.triggerBeforeCompletion(synchronizationManager.getSynchronizations()); } return Mono.empty(); @@ -592,14 +592,14 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * @param synchronizationManager the synchronization manager bound to the current transaction * @param status object representing the transaction */ - private Mono triggerAfterCommit(ReactiveTransactionSynchronizationManager synchronizationManager, - DefaultReactiveTransactionStatus status) { + private Mono triggerAfterCommit(TransactionSynchronizationManager synchronizationManager, + GenericReactiveTransaction status) { if (status.isNewSynchronization()) { if (status.isDebug()) { logger.trace("Triggering afterCommit synchronization"); } - return ReactiveTransactionSynchronizationUtils.invokeAfterCommit(synchronizationManager.getSynchronizations()); + return TransactionSynchronizationUtils.invokeAfterCommit(synchronizationManager.getSynchronizations()); } return Mono.empty(); @@ -609,13 +609,13 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * Trigger {@code afterCompletion} callbacks. * @param synchronizationManager the synchronization manager bound to the current transaction * @param status object representing the transaction - * @param completionStatus completion status according to ReactiveTransactionSynchronization constants + * @param completionStatus completion status according to TransactionSynchronization constants */ - private Mono triggerAfterCompletion(ReactiveTransactionSynchronizationManager synchronizationManager, - DefaultReactiveTransactionStatus status, int completionStatus) { + private Mono triggerAfterCompletion(TransactionSynchronizationManager synchronizationManager, + GenericReactiveTransaction status, int completionStatus) { if (status.isNewSynchronization()) { - List synchronizations = synchronizationManager.getSynchronizations(); + List synchronizations = synchronizationManager.getSynchronizations(); synchronizationManager.clearSynchronization(); if (!status.hasTransaction() || status.isNewTransaction()) { if (status.isDebug()) { @@ -638,22 +638,22 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran /** * Actually invoke the {@code afterCompletion} methods of the - * given Spring ReactiveTransactionSynchronization objects. + * given TransactionSynchronization objects. *

To be called by this abstract manager itself, or by special implementations * of the {@code registerAfterCompletionWithExistingTransaction} callback. * @param synchronizationManager the synchronization manager bound to the current transaction - * @param synchronizations a List of ReactiveTransactionSynchronization objects + * @param synchronizations a List of TransactionSynchronization objects * @param completionStatus the completion status according to the - * constants in the ReactiveTransactionSynchronization interface - * @see #registerAfterCompletionWithExistingTransaction(ReactiveTransactionSynchronizationManager, Object, List) - * @see ReactiveTransactionSynchronization#STATUS_COMMITTED - * @see ReactiveTransactionSynchronization#STATUS_ROLLED_BACK - * @see ReactiveTransactionSynchronization#STATUS_UNKNOWN + * constants in the TransactionSynchronization interface + * @see #registerAfterCompletionWithExistingTransaction(TransactionSynchronizationManager, Object, List) + * @see TransactionSynchronization#STATUS_COMMITTED + * @see TransactionSynchronization#STATUS_ROLLED_BACK + * @see TransactionSynchronization#STATUS_UNKNOWN */ - private Mono invokeAfterCompletion(ReactiveTransactionSynchronizationManager synchronizationManager, - List synchronizations, int completionStatus) { + private Mono invokeAfterCompletion(TransactionSynchronizationManager synchronizationManager, + List synchronizations, int completionStatus) { - return ReactiveTransactionSynchronizationUtils.invokeAfterCompletion(synchronizations, completionStatus); + return TransactionSynchronizationUtils.invokeAfterCompletion(synchronizations, completionStatus); } /** @@ -663,8 +663,8 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * @param status object representing the transaction * @see #doCleanupAfterCompletion */ - private Mono cleanupAfterCompletion(ReactiveTransactionSynchronizationManager synchronizationManager, - DefaultReactiveTransactionStatus status) { + private Mono cleanupAfterCompletion(TransactionSynchronizationManager synchronizationManager, + GenericReactiveTransaction status) { return Mono.defer(() -> { status.setCompleted(); @@ -710,9 +710,9 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * @see #doBegin * @see #doCommit * @see #doRollback - * @see DefaultReactiveTransactionStatus#getTransaction + * @see GenericReactiveTransaction#getTransaction */ - protected abstract Object doGetTransaction(ReactiveTransactionSynchronizationManager synchronizationManager) throws TransactionException; + protected abstract Object doGetTransaction(TransactionSynchronizationManager synchronizationManager) throws TransactionException; /** * Check if the given transaction object indicates an existing transaction @@ -752,7 +752,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * @throws org.springframework.transaction.NestedTransactionNotSupportedException * if the underlying transaction does not support nesting (e.g. through savepoints) */ - protected abstract Mono doBegin(ReactiveTransactionSynchronizationManager synchronizationManager, + protected abstract Mono doBegin(TransactionSynchronizationManager synchronizationManager, Object transaction, TransactionDefinition definition) throws TransactionException; /** @@ -768,7 +768,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * @throws TransactionException in case of system errors * @see #doResume */ - protected Mono doSuspend(ReactiveTransactionSynchronizationManager synchronizationManager, + protected Mono doSuspend(TransactionSynchronizationManager synchronizationManager, Object transaction) throws TransactionException { throw new TransactionSuspensionNotSupportedException( @@ -788,7 +788,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * @throws TransactionException in case of system errors * @see #doSuspend */ - protected Mono doResume(ReactiveTransactionSynchronizationManager synchronizationManager, + protected Mono doResume(TransactionSynchronizationManager synchronizationManager, @Nullable Object transaction, Object suspendedResources) throws TransactionException { throw new TransactionSuspensionNotSupportedException( @@ -805,8 +805,8 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * @throws RuntimeException in case of errors; will be propagated to the caller * (note: do not throw TransactionException subclasses here!) */ - protected Mono prepareForCommit(ReactiveTransactionSynchronizationManager synchronizationManager, - DefaultReactiveTransactionStatus status) { + protected Mono prepareForCommit(TransactionSynchronizationManager synchronizationManager, + GenericReactiveTransaction status) { return Mono.empty(); } @@ -820,10 +820,10 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * @param synchronizationManager the synchronization manager bound to the current transaction * @param status the status representation of the transaction * @throws TransactionException in case of commit or system errors - * @see DefaultReactiveTransactionStatus#getTransaction + * @see GenericReactiveTransaction#getTransaction */ - protected abstract Mono doCommit(ReactiveTransactionSynchronizationManager synchronizationManager, - DefaultReactiveTransactionStatus status) throws TransactionException; + protected abstract Mono doCommit(TransactionSynchronizationManager synchronizationManager, + GenericReactiveTransaction status) throws TransactionException; /** * Perform an actual rollback of the given transaction. @@ -833,10 +833,10 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * @param synchronizationManager the synchronization manager bound to the current transaction * @param status the status representation of the transaction * @throws TransactionException in case of system errors - * @see DefaultReactiveTransactionStatus#getTransaction + * @see GenericReactiveTransaction#getTransaction */ - protected abstract Mono doRollback(ReactiveTransactionSynchronizationManager synchronizationManager, - DefaultReactiveTransactionStatus status) throws TransactionException; + protected abstract Mono doRollback(TransactionSynchronizationManager synchronizationManager, + GenericReactiveTransaction status) throws TransactionException; /** * Set the given transaction rollback-only. Only called on rollback @@ -848,8 +848,8 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * @param status the status representation of the transaction * @throws TransactionException in case of system errors */ - protected Mono doSetRollbackOnly(ReactiveTransactionSynchronizationManager synchronizationManager, - DefaultReactiveTransactionStatus status) throws TransactionException { + protected Mono doSetRollbackOnly(TransactionSynchronizationManager synchronizationManager, + GenericReactiveTransaction status) throws TransactionException { throw new IllegalTransactionStateException( "Participating in existing transactions is not supported - when 'isExistingTransaction' " + @@ -866,18 +866,18 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * chance to determine the actual outcome of the outer transaction. * @param synchronizationManager the synchronization manager bound to the current transaction * @param transaction transaction object returned by {@code doGetTransaction} - * @param synchronizations a List of ReactiveTransactionSynchronization objects + * @param synchronizations a List of TransactionSynchronization objects * @throws TransactionException in case of system errors - * @see #invokeAfterCompletion(ReactiveTransactionSynchronizationManager, List, int) - * @see ReactiveTransactionSynchronization#afterCompletion(int) - * @see ReactiveTransactionSynchronization#STATUS_UNKNOWN + * @see #invokeAfterCompletion(TransactionSynchronizationManager, List, int) + * @see TransactionSynchronization#afterCompletion(int) + * @see TransactionSynchronization#STATUS_UNKNOWN */ - protected Mono registerAfterCompletionWithExistingTransaction(ReactiveTransactionSynchronizationManager synchronizationManager, - Object transaction, List synchronizations) throws TransactionException { + protected Mono registerAfterCompletionWithExistingTransaction(TransactionSynchronizationManager synchronizationManager, + Object transaction, List synchronizations) throws TransactionException { logger.debug("Cannot register Spring after-completion synchronization with existing transaction - " + "processing Spring after-completion callbacks immediately, with outcome status 'unknown'"); - return invokeAfterCompletion(synchronizationManager, synchronizations, ReactiveTransactionSynchronization.STATUS_UNKNOWN); + return invokeAfterCompletion(synchronizationManager, synchronizations, TransactionSynchronization.STATUS_UNKNOWN); } /** @@ -888,7 +888,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran * @param synchronizationManager the synchronization manager bound to the current transaction * @param transaction transaction object returned by {@code doGetTransaction} */ - protected Mono doCleanupAfterCompletion(ReactiveTransactionSynchronizationManager synchronizationManager, + protected Mono doCleanupAfterCompletion(TransactionSynchronizationManager synchronizationManager, Object transaction) { return Mono.empty(); @@ -918,7 +918,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran private final Object suspendedResources; @Nullable - private List suspendedSynchronizations; + private List suspendedSynchronizations; @Nullable private String name; @@ -935,7 +935,7 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran } private SuspendedResourcesHolder( - @Nullable Object suspendedResources, List suspendedSynchronizations, + @Nullable Object suspendedResources, List suspendedSynchronizations, @Nullable String name, boolean readOnly, @Nullable Integer isolationLevel, boolean wasActive) { this.suspendedResources = suspendedResources; diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionStatus.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionStatus.java deleted file mode 100644 index 6b7552004c1..00000000000 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/AbstractReactiveTransactionStatus.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright 2002-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.transaction.reactive; - -import org.springframework.transaction.ReactiveTransactionStatus; - -/** - * Abstract base implementation of the {@link ReactiveTransactionStatus} interface. - * - *

Pre-implements the handling of local rollback-only and completed flags. - * - *

Does not assume any specific internal transaction handling, such as an - * underlying transaction object, and no transaction synchronization mechanism. - * - * @author Mark Paluch - * @author Juergen Hoeller - * @since 5.2 - * @see #setRollbackOnly() - * @see #isRollbackOnly() - * @see #setCompleted() - * @see #isCompleted() - * @see DefaultReactiveTransactionStatus - */ -public abstract class AbstractReactiveTransactionStatus implements ReactiveTransactionStatus { - - private boolean rollbackOnly = false; - - private boolean completed = false; - - - //--------------------------------------------------------------------- - // Handling of current transaction state - //--------------------------------------------------------------------- - - @Override - public void setRollbackOnly() { - this.rollbackOnly = true; - } - - /** - * Determine the rollback-only flag via checking this ReactiveTransactionStatus. - *

Will only return "true" if the application called {@code setRollbackOnly} - * on this TransactionStatus object. - */ - @Override - public boolean isRollbackOnly() { - return this.rollbackOnly; - } - - /** - * Mark this transaction as completed, that is, committed or rolled back. - */ - public void setCompleted() { - this.completed = true; - } - - @Override - public boolean isCompleted() { - return this.completed; - } - -} diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/DefaultReactiveTransactionStatus.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/GenericReactiveTransaction.java similarity index 82% rename from spring-tx/src/main/java/org/springframework/transaction/reactive/DefaultReactiveTransactionStatus.java rename to spring-tx/src/main/java/org/springframework/transaction/reactive/GenericReactiveTransaction.java index 14dd58782ee..702392368a8 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/DefaultReactiveTransactionStatus.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/GenericReactiveTransaction.java @@ -17,11 +17,11 @@ package org.springframework.transaction.reactive; import org.springframework.lang.Nullable; -import org.springframework.transaction.ReactiveTransactionStatus; +import org.springframework.transaction.ReactiveTransaction; import org.springframework.util.Assert; /** - * Default implementation of the {@link ReactiveTransactionStatus} interface, + * Default implementation of the {@link ReactiveTransaction} interface, * used by {@link AbstractReactiveTransactionManager}. Based on the concept * of an underlying "transaction object". * @@ -38,7 +38,7 @@ import org.springframework.util.Assert; * @see AbstractReactiveTransactionManager * @see #getTransaction */ -public class DefaultReactiveTransactionStatus extends AbstractReactiveTransactionStatus { +public class GenericReactiveTransaction implements ReactiveTransaction { @Nullable private final Object transaction; @@ -54,6 +54,10 @@ public class DefaultReactiveTransactionStatus extends AbstractReactiveTransactio @Nullable private final Object suspendedResources; + private boolean rollbackOnly = false; + + private boolean completed = false; + /** * Create a new {@code DefaultReactiveTransactionStatus} instance. @@ -70,7 +74,7 @@ public class DefaultReactiveTransactionStatus extends AbstractReactiveTransactio * @param suspendedResources a holder for resources that have been suspended * for this transaction, if any */ - public DefaultReactiveTransactionStatus( + public GenericReactiveTransaction( @Nullable Object transaction, boolean newTransaction, boolean newSynchronization, boolean readOnly, boolean debug, @Nullable Object suspendedResources) { @@ -137,4 +141,31 @@ public class DefaultReactiveTransactionStatus extends AbstractReactiveTransactio return this.suspendedResources; } + @Override + public void setRollbackOnly() { + this.rollbackOnly = true; + } + + /** + * Determine the rollback-only flag via checking this ReactiveTransactionStatus. + *

Will only return "true" if the application called {@code setRollbackOnly} + * on this TransactionStatus object. + */ + @Override + public boolean isRollbackOnly() { + return this.rollbackOnly; + } + + /** + * Mark this transaction as completed, that is, committed or rolled back. + */ + public void setCompleted() { + this.completed = true; + } + + @Override + public boolean isCompleted() { + return this.completed; + } + } diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveResourceHolderSynchronization.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveResourceSynchronization.java similarity index 71% rename from spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveResourceHolderSynchronization.java rename to spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveResourceSynchronization.java index 188587626a3..e2c4992660b 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveResourceHolderSynchronization.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveResourceSynchronization.java @@ -18,40 +18,38 @@ package org.springframework.transaction.reactive; import reactor.core.publisher.Mono; -import org.springframework.transaction.support.ResourceHolder; - /** - * {@link ReactiveTransactionSynchronization} implementation that manages a - * {@link ResourceHolder} bound through {@link ReactiveTransactionSynchronizationManager}. + * {@link TransactionSynchronization} implementation that manages a + * resource object bound through {@link TransactionSynchronizationManager}. * * @author Mark Paluch + * @author Juergen Hoeller * @since 5.2 - * @param the resource holder type + * @param the resource holder type * @param the resource key type */ -public abstract class ReactiveResourceHolderSynchronization - implements ReactiveTransactionSynchronization { +public abstract class ReactiveResourceSynchronization implements TransactionSynchronization { - private final H resourceHolder; + private final O resourceObject; private final K resourceKey; - private final ReactiveTransactionSynchronizationManager synchronizationManager; + private final TransactionSynchronizationManager synchronizationManager; private volatile boolean holderActive = true; /** - * Create a new ResourceHolderSynchronization for the given holder. - * @param resourceHolder the ResourceHolder to manage - * @param resourceKey the key to bind the ResourceHolder for + * Create a new ReactiveResourceSynchronization for the given holder. + * @param resourceObject the resource object to manage + * @param resourceKey the key to bind the resource object for * @param synchronizationManager the synchronization manager bound to the current transaction - * @see ReactiveTransactionSynchronizationManager#bindResource + * @see TransactionSynchronizationManager#bindResource */ - public ReactiveResourceHolderSynchronization( - H resourceHolder, K resourceKey, ReactiveTransactionSynchronizationManager synchronizationManager) { + public ReactiveResourceSynchronization( + O resourceObject, K resourceKey, TransactionSynchronizationManager synchronizationManager) { - this.resourceHolder = resourceHolder; + this.resourceObject = resourceObject; this.resourceKey = resourceKey; this.synchronizationManager = synchronizationManager; } @@ -68,7 +66,7 @@ public abstract class ReactiveResourceHolderSynchronization resume() { if (this.holderActive) { - this.synchronizationManager.bindResource(this.resourceKey, this.resourceHolder); + this.synchronizationManager.bindResource(this.resourceKey, this.resourceObject); } return Mono.empty(); } @@ -84,7 +82,7 @@ public abstract class ReactiveResourceHolderSynchronization afterCommit() { if (!shouldReleaseBeforeCompletion()) { - return processResourceAfterCommit(this.resourceHolder); + return processResourceAfterCommit(this.resourceObject); } return Mono.empty(); } @@ -109,21 +107,20 @@ public abstract class ReactiveResourceHolderSynchronization this.resourceHolder.reset()); + return sync; }); } @@ -157,7 +154,7 @@ public abstract class ReactiveResourceHolderSynchronization processResourceAfterCommit(H resourceHolder) { + protected Mono processResourceAfterCommit(O resourceHolder) { return Mono.empty(); } /** * Release the given resource (after it has been unbound from the thread). * @param resourceHolder the resource holder to process - * @param resourceKey the key that the ResourceHolder was bound for + * @param resourceKey the key that the resource object was bound for */ - protected Mono releaseResource(H resourceHolder, K resourceKey) { + protected Mono releaseResource(O resourceHolder, K resourceKey) { return Mono.empty(); } /** * Perform a cleanup on the given resource (which is left bound to the thread). * @param resourceHolder the resource holder to process - * @param resourceKey the key that the ResourceHolder was bound for + * @param resourceKey the key that the resource object was bound for * @param committed whether the transaction has committed ({@code true}) * or rolled back ({@code false}) */ - protected Mono cleanupResource(H resourceHolder, K resourceKey, boolean committed) { + protected Mono cleanupResource(O resourceHolder, K resourceKey, boolean committed) { return Mono.empty(); } diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionCallback.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionCallback.java similarity index 90% rename from spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionCallback.java rename to spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionCallback.java index 4ee74803f11..9ba759f0d55 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionCallback.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionCallback.java @@ -18,7 +18,7 @@ package org.springframework.transaction.reactive; import org.reactivestreams.Publisher; -import org.springframework.transaction.ReactiveTransactionStatus; +import org.springframework.transaction.ReactiveTransaction; /** * Callback interface for reactive transactional code. Used with {@link TransactionalOperator}'s @@ -30,12 +30,13 @@ import org.springframework.transaction.ReactiveTransactionStatus; * Spring's {@link org.springframework.transaction.annotation.Transactional} annotation). * * @author Mark Paluch + * @author Juergen Hoeller * @since 5.2 * @see TransactionalOperator * @param the result type */ @FunctionalInterface -public interface ReactiveTransactionCallback { +public interface TransactionCallback { /** * Gets called by {@link TransactionalOperator} within a transactional context. @@ -46,6 +47,6 @@ public interface ReactiveTransactionCallback { * @return a result publisher * @see TransactionalOperator#transactional */ - Publisher doInTransaction(ReactiveTransactionStatus status); + Publisher doInTransaction(ReactiveTransaction status); } diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContext.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContext.java index 760d65303dd..a13c8caee73 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContext.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContext.java @@ -45,7 +45,7 @@ public class TransactionContext { private final Map resources = new LinkedHashMap<>(); @Nullable - private Set synchronizations; + private Set synchronizations; private volatile @Nullable String currentTransactionName; @@ -85,12 +85,12 @@ public class TransactionContext { return this.resources; } - public void setSynchronizations(@Nullable Set synchronizations) { + public void setSynchronizations(@Nullable Set synchronizations) { this.synchronizations = synchronizations; } @Nullable - public Set getSynchronizations() { + public Set getSynchronizations() { return this.synchronizations; } diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContextManager.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContextManager.java index f35e8830b14..53dd59f23db 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContextManager.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContextManager.java @@ -33,7 +33,7 @@ import org.springframework.transaction.NoTransactionException; * * @author Mark Paluch * @since 5.2 - * @see ReactiveTransactionSynchronization + * @see TransactionSynchronization */ public abstract class TransactionContextManager { @@ -80,7 +80,7 @@ public abstract class TransactionContextManager { /** * Return a {@link Function} to create or associate a new {@link TransactionContext}. * Interaction with transactional resources through - * {@link ReactiveTransactionSynchronizationManager} requires a TransactionContext + * {@link TransactionSynchronizationManager} requires a TransactionContext * to be registered in the subscriber context. * @return functional context registration. */ diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronization.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionSynchronization.java similarity index 94% rename from spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronization.java rename to spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionSynchronization.java index ed10f52530f..ac8309f4c8f 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronization.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionSynchronization.java @@ -22,7 +22,7 @@ import reactor.core.publisher.Mono; * Interface for reactive transaction synchronization callbacks. * Supported by {@link AbstractReactiveTransactionManager}. * - *

ReactiveTransactionSynchronization implementations can implement the + *

TransactionSynchronization implementations can implement the * {@link org.springframework.core.Ordered} interface to influence their execution order. * A synchronization that does not implement the {@link org.springframework.core.Ordered} * interface is appended to the end of the synchronization chain. @@ -31,11 +31,12 @@ import reactor.core.publisher.Mono; * allowing for fine-grained interaction with their execution order (if necessary). * * @author Mark Paluch + * @author Juergen Hoeller * @since 5.2 - * @see ReactiveTransactionSynchronizationManager + * @see TransactionSynchronizationManager * @see AbstractReactiveTransactionManager */ -public interface ReactiveTransactionSynchronization { +public interface TransactionSynchronization { /** Completion status in case of proper commit. */ int STATUS_COMMITTED = 0; @@ -50,7 +51,7 @@ public interface ReactiveTransactionSynchronization { /** * Suspend this synchronization. * Supposed to unbind resources from TransactionSynchronizationManager if managing any. - * @see ReactiveTransactionSynchronizationManager#unbindResource + * @see TransactionSynchronizationManager#unbindResource */ default Mono suspend() { return Mono.empty(); @@ -59,7 +60,7 @@ public interface ReactiveTransactionSynchronization { /** * Resume this synchronization. * Supposed to rebind resources to TransactionSynchronizationManager if managing any. - * @see ReactiveTransactionSynchronizationManager#bindResource + * @see TransactionSynchronizationManager#bindResource */ default Mono resume() { return Mono.empty(); diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronizationManager.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionSynchronizationManager.java similarity index 90% rename from spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronizationManager.java rename to spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionSynchronizationManager.java index d8448e93c9d..8db41c7dd23 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronizationManager.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionSynchronizationManager.java @@ -67,29 +67,29 @@ import org.springframework.util.Assert; * @since 5.2 * @see #isSynchronizationActive * @see #registerSynchronization - * @see ReactiveTransactionSynchronization + * @see TransactionSynchronization */ -public class ReactiveTransactionSynchronizationManager { +public class TransactionSynchronizationManager { - private static final Log logger = LogFactory.getLog(ReactiveTransactionSynchronizationManager.class); + private static final Log logger = LogFactory.getLog(TransactionSynchronizationManager.class); private final TransactionContext transactionContext; - public ReactiveTransactionSynchronizationManager(TransactionContext transactionContext) { + public TransactionSynchronizationManager(TransactionContext transactionContext) { this.transactionContext = transactionContext; } /** - * Return the ReactiveTransactionSynchronizationManager of the current transaction. + * Return the TransactionSynchronizationManager of the current transaction. * Mainly intended for code that wants to bind resources or synchronizations. * rollback-only but not throw an application exception. * @throws NoTransactionException if the transaction info cannot be found, * because the method was invoked outside a managed transaction. */ - public static Mono currentTransaction() { - return TransactionContextManager.currentContext().map(ReactiveTransactionSynchronizationManager::new); + public static Mono currentTransaction() { + return TransactionContextManager.currentContext().map(TransactionSynchronizationManager::new); } /** @@ -98,7 +98,7 @@ public class ReactiveTransactionSynchronizationManager { * @return if there is a value bound to the current thread */ public boolean hasResource(Object key) { - Object actualKey = ReactiveTransactionSynchronizationUtils.unwrapResourceIfNecessary(key); + Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); Object value = doGetResource(actualKey); return (value != null); } @@ -111,7 +111,7 @@ public class ReactiveTransactionSynchronizationManager { */ @Nullable public Object getResource(Object key) { - Object actualKey = ReactiveTransactionSynchronizationUtils.unwrapResourceIfNecessary(key); + Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); Object value = doGetResource(actualKey); if (value != null && logger.isTraceEnabled()) { logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to context [" + @@ -137,7 +137,7 @@ public class ReactiveTransactionSynchronizationManager { * @throws IllegalStateException if there is already a value bound to the context */ public void bindResource(Object key, Object value) throws IllegalStateException { - Object actualKey = ReactiveTransactionSynchronizationUtils.unwrapResourceIfNecessary(key); + Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); Assert.notNull(value, "Value must not be null"); Map map = this.transactionContext.getResources(); Object oldValue = map.put(actualKey, value); @@ -158,7 +158,7 @@ public class ReactiveTransactionSynchronizationManager { * @throws IllegalStateException if there is no value bound to the context */ public Object unbindResource(Object key) throws IllegalStateException { - Object actualKey = ReactiveTransactionSynchronizationUtils.unwrapResourceIfNecessary(key); + Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); Object value = doUnbindResource(actualKey); if (value == null) { throw new IllegalStateException( @@ -174,7 +174,7 @@ public class ReactiveTransactionSynchronizationManager { */ @Nullable public Object unbindResourceIfPossible(Object key) { - Object actualKey = ReactiveTransactionSynchronizationUtils.unwrapResourceIfNecessary(key); + Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); return doUnbindResource(actualKey); } @@ -229,11 +229,11 @@ public class ReactiveTransactionSynchronizationManager { * @throws IllegalStateException if transaction synchronization is not active * @see org.springframework.core.Ordered */ - public void registerSynchronization(ReactiveTransactionSynchronization synchronization) + public void registerSynchronization(TransactionSynchronization synchronization) throws IllegalStateException { Assert.notNull(synchronization, "TransactionSynchronization must not be null"); - Set synchs = this.transactionContext.getSynchronizations(); + Set synchs = this.transactionContext.getSynchronizations(); if (synchs == null) { throw new IllegalStateException("Transaction synchronization is not active"); } @@ -245,10 +245,10 @@ public class ReactiveTransactionSynchronizationManager { * for the current context. * @return unmodifiable List of TransactionSynchronization instances * @throws IllegalStateException if synchronization is not active - * @see ReactiveTransactionSynchronization + * @see TransactionSynchronization */ - public List getSynchronizations() throws IllegalStateException { - Set synchs = this.transactionContext.getSynchronizations(); + public List getSynchronizations() throws IllegalStateException { + Set synchs = this.transactionContext.getSynchronizations(); if (synchs == null) { throw new IllegalStateException("Transaction synchronization is not active"); } @@ -260,7 +260,7 @@ public class ReactiveTransactionSynchronizationManager { } else { // Sort lazily here, not in registerSynchronization. - List sortedSynchs = new ArrayList<>(synchs); + List sortedSynchs = new ArrayList<>(synchs); AnnotationAwareOrderComparator.sort(sortedSynchs); return Collections.unmodifiableList(sortedSynchs); } @@ -325,7 +325,7 @@ public class ReactiveTransactionSynchronizationManager { * to suppress change detection on commit. The present method is meant * to be used for earlier read-only checks. * @see org.springframework.transaction.TransactionDefinition#isReadOnly() - * @see ReactiveTransactionSynchronization#beforeCommit(boolean) + * @see TransactionSynchronization#beforeCommit(boolean) */ public boolean isCurrentTransactionReadOnly() { return this.transactionContext.isCurrentTransactionReadOnly(); diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronizationUtils.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionSynchronizationUtils.java similarity index 59% rename from spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronizationUtils.java rename to spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionSynchronizationUtils.java index 0913bdae360..85064218fb6 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/ReactiveTransactionSynchronizationUtils.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionSynchronizationUtils.java @@ -29,21 +29,21 @@ import org.springframework.util.Assert; import org.springframework.util.ClassUtils; /** - * Utility methods for triggering specific {@link ReactiveTransactionSynchronization} + * Utility methods for triggering specific {@link TransactionSynchronization} * callback methods on all currently registered synchronizations. * * @author Mark Paluch * @author Juergen Hoeller * @since 5.2 - * @see ReactiveTransactionSynchronization - * @see ReactiveTransactionSynchronizationManager#getSynchronizations() + * @see TransactionSynchronization + * @see TransactionSynchronizationManager#getSynchronizations() */ -abstract class ReactiveTransactionSynchronizationUtils { +abstract class TransactionSynchronizationUtils { - private static final Log logger = LogFactory.getLog(ReactiveTransactionSynchronizationUtils.class); + private static final Log logger = LogFactory.getLog(TransactionSynchronizationUtils.class); private static final boolean aopAvailable = ClassUtils.isPresent( - "org.springframework.aop.scope.ScopedObject", ReactiveTransactionSynchronizationUtils.class.getClassLoader()); + "org.springframework.aop.scope.ScopedObject", TransactionSynchronizationUtils.class.getClassLoader()); /** @@ -68,51 +68,51 @@ abstract class ReactiveTransactionSynchronizationUtils { /** * Actually invoke the {@code triggerBeforeCommit} methods of the - * given Spring ReactiveTransactionSynchronization objects. - * @param synchronizations a List of ReactiveTransactionSynchronization objects - * @see ReactiveTransactionSynchronization#beforeCommit(boolean) + * given Spring TransactionSynchronization objects. + * @param synchronizations a List of TransactionSynchronization objects + * @see TransactionSynchronization#beforeCommit(boolean) */ - public static Mono triggerBeforeCommit(Collection synchronizations, boolean readOnly) { + public static Mono triggerBeforeCommit(Collection synchronizations, boolean readOnly) { return Flux.fromIterable(synchronizations).concatMap(it -> it.beforeCommit(readOnly)).then(); } /** * Actually invoke the {@code beforeCompletion} methods of the - * given Spring ReactiveTransactionSynchronization objects. - * @param synchronizations a List of ReactiveTransactionSynchronization objects - * @see ReactiveTransactionSynchronization#beforeCompletion() + * given Spring TransactionSynchronization objects. + * @param synchronizations a List of TransactionSynchronization objects + * @see TransactionSynchronization#beforeCompletion() */ - public static Mono triggerBeforeCompletion(Collection synchronizations) { + public static Mono triggerBeforeCompletion(Collection synchronizations) { return Flux.fromIterable(synchronizations) - .concatMap(ReactiveTransactionSynchronization::beforeCompletion).onErrorContinue((t, o) -> + .concatMap(TransactionSynchronization::beforeCompletion).onErrorContinue((t, o) -> logger.error("TransactionSynchronization.beforeCompletion threw exception", t)).then(); } /** * Actually invoke the {@code afterCommit} methods of the - * given Spring ReactiveTransactionSynchronization objects. - * @param synchronizations a List of ReactiveTransactionSynchronization objects - * @see ReactiveTransactionSynchronization#afterCommit() + * given Spring TransactionSynchronization objects. + * @param synchronizations a List of TransactionSynchronization objects + * @see TransactionSynchronization#afterCommit() */ - public static Mono invokeAfterCommit(Collection synchronizations) { + public static Mono invokeAfterCommit(Collection synchronizations) { return Flux.fromIterable(synchronizations) - .concatMap(ReactiveTransactionSynchronization::afterCommit) + .concatMap(TransactionSynchronization::afterCommit) .then(); } /** * Actually invoke the {@code afterCompletion} methods of the - * given Spring ReactiveTransactionSynchronization objects. - * @param synchronizations a List of ReactiveTransactionSynchronization objects + * given Spring TransactionSynchronization objects. + * @param synchronizations a List of TransactionSynchronization objects * @param completionStatus the completion status according to the - * constants in the ReactiveTransactionSynchronization interface - * @see ReactiveTransactionSynchronization#afterCompletion(int) - * @see ReactiveTransactionSynchronization#STATUS_COMMITTED - * @see ReactiveTransactionSynchronization#STATUS_ROLLED_BACK - * @see ReactiveTransactionSynchronization#STATUS_UNKNOWN + * constants in the TransactionSynchronization interface + * @see TransactionSynchronization#afterCompletion(int) + * @see TransactionSynchronization#STATUS_COMMITTED + * @see TransactionSynchronization#STATUS_ROLLED_BACK + * @see TransactionSynchronization#STATUS_UNKNOWN */ public static Mono invokeAfterCompletion( - Collection synchronizations, int completionStatus) { + Collection synchronizations, int completionStatus) { return Flux.fromIterable(synchronizations).concatMap(it -> it.afterCompletion(completionStatus)) .onErrorContinue((t, o) -> logger.error("TransactionSynchronization.afterCompletion threw exception", t)).then(); diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperator.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperator.java index 34382e2c2db..557d3e90f0c 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperator.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperator.java @@ -56,7 +56,7 @@ public interface TransactionalOperator { static TransactionalOperator create( ReactiveTransactionManager transactionManager, TransactionDefinition transactionDefinition){ - return new DefaultTransactionalOperator(transactionManager, transactionDefinition); + return new TransactionalOperatorImpl(transactionManager, transactionDefinition); } @@ -93,6 +93,6 @@ public interface TransactionalOperator { * @throws TransactionException in case of initialization, rollback, or system errors * @throws RuntimeException if thrown by the TransactionCallback */ - Flux execute(ReactiveTransactionCallback action) throws TransactionException; + Flux execute(TransactionCallback action) throws TransactionException; } diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/DefaultTransactionalOperator.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java similarity index 82% rename from spring-tx/src/main/java/org/springframework/transaction/reactive/DefaultTransactionalOperator.java rename to spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java index c1285fdf94a..ea2ab1d7dbc 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/DefaultTransactionalOperator.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java @@ -21,8 +21,8 @@ import org.apache.commons.logging.LogFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import org.springframework.transaction.ReactiveTransaction; import org.springframework.transaction.ReactiveTransactionManager; -import org.springframework.transaction.ReactiveTransactionStatus; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionException; import org.springframework.transaction.TransactionSystemException; @@ -39,9 +39,9 @@ import org.springframework.util.Assert; * @see ReactiveTransactionManager */ @SuppressWarnings("serial") -final class DefaultTransactionalOperator implements TransactionalOperator { +final class TransactionalOperatorImpl implements TransactionalOperator { - private static final Log logger = LogFactory.getLog(DefaultTransactionalOperator.class); + private static final Log logger = LogFactory.getLog(TransactionalOperatorImpl.class); private final ReactiveTransactionManager transactionManager; @@ -55,7 +55,7 @@ final class DefaultTransactionalOperator implements TransactionalOperator { * @param transactionDefinition the transaction definition to copy the * default settings from. Local properties can still be set to change values. */ - DefaultTransactionalOperator(ReactiveTransactionManager transactionManager, TransactionDefinition transactionDefinition) { + TransactionalOperatorImpl(ReactiveTransactionManager transactionManager, TransactionDefinition transactionDefinition) { Assert.notNull(transactionManager, "ReactiveTransactionManager must not be null"); Assert.notNull(transactionManager, "TransactionDefinition must not be null"); this.transactionManager = transactionManager; @@ -72,9 +72,9 @@ final class DefaultTransactionalOperator implements TransactionalOperator { @Override - public Flux execute(ReactiveTransactionCallback action) throws TransactionException { + public Flux execute(TransactionCallback action) throws TransactionException { return TransactionContextManager.currentContext().flatMapMany(context -> { - Mono status = this.transactionManager.getTransaction(this.transactionDefinition); + Mono status = this.transactionManager.getReactiveTransaction(this.transactionDefinition); return status.flatMapMany(it -> { // This is an around advice: Invoke the next interceptor in the chain. // This will normally result in a target object being invoked. @@ -98,7 +98,7 @@ final class DefaultTransactionalOperator implements TransactionalOperator { * @param ex the thrown application exception or error * @throws TransactionException in case of a rollback error */ - private Mono rollbackOnException(ReactiveTransactionStatus status, Throwable ex) throws TransactionException { + private Mono rollbackOnException(ReactiveTransaction status, Throwable ex) throws TransactionException { logger.debug("Initiating transaction rollback on application exception", ex); return this.transactionManager.rollback(status).onErrorMap(ex2 -> { logger.error("Application exception overridden by rollback exception", ex); @@ -113,8 +113,8 @@ final class DefaultTransactionalOperator implements TransactionalOperator { @Override public boolean equals(Object other) { - return (this == other || (super.equals(other) && (!(other instanceof DefaultTransactionalOperator) || - getTransactionManager() == ((DefaultTransactionalOperator) other).getTransactionManager()))); + return (this == other || (super.equals(other) && (!(other instanceof TransactionalOperatorImpl) || + getTransactionManager() == ((TransactionalOperatorImpl) other).getTransactionManager()))); } @Override diff --git a/spring-tx/src/main/java/org/springframework/transaction/support/AbstractTransactionStatus.java b/spring-tx/src/main/java/org/springframework/transaction/support/AbstractTransactionStatus.java index f82c96a56c4..97d9cd33483 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/support/AbstractTransactionStatus.java +++ b/spring-tx/src/main/java/org/springframework/transaction/support/AbstractTransactionStatus.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -55,7 +55,7 @@ public abstract class AbstractTransactionStatus implements TransactionStatus { //--------------------------------------------------------------------- - // Handling of current transaction state + // Implementation of TransactionExecution //--------------------------------------------------------------------- @Override @@ -93,13 +93,6 @@ public abstract class AbstractTransactionStatus implements TransactionStatus { return false; } - /** - * This implementations is empty, considering flush as a no-op. - */ - @Override - public void flush() { - } - /** * Mark this transaction as completed, that is, committed or rolled back. */ @@ -117,6 +110,11 @@ public abstract class AbstractTransactionStatus implements TransactionStatus { // Handling of current savepoint state //--------------------------------------------------------------------- + @Override + public boolean hasSavepoint() { + return (this.savepoint != null); + } + /** * Set a savepoint for this transaction. Useful for PROPAGATION_NESTED. * @see org.springframework.transaction.TransactionDefinition#PROPAGATION_NESTED @@ -133,11 +131,6 @@ public abstract class AbstractTransactionStatus implements TransactionStatus { return this.savepoint; } - @Override - public boolean hasSavepoint() { - return (this.savepoint != null); - } - /** * Create a savepoint and hold it for the transaction. * @throws org.springframework.transaction.NestedTransactionNotSupportedException @@ -223,4 +216,16 @@ public abstract class AbstractTransactionStatus implements TransactionStatus { throw new NestedTransactionNotSupportedException("This transaction does not support savepoints"); } + + //--------------------------------------------------------------------- + // Flushing support + //--------------------------------------------------------------------- + + /** + * This implementations is empty, considering flush as a no-op. + */ + @Override + public void flush() { + } + } diff --git a/spring-tx/src/main/java/org/springframework/transaction/support/DefaultTransactionStatus.java b/spring-tx/src/main/java/org/springframework/transaction/support/DefaultTransactionStatus.java index 574789a2e49..5ee90ce3c39 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/support/DefaultTransactionStatus.java +++ b/spring-tx/src/main/java/org/springframework/transaction/support/DefaultTransactionStatus.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -165,18 +165,6 @@ public class DefaultTransactionStatus extends AbstractTransactionStatus { ((SmartTransactionObject) this.transaction).isRollbackOnly()); } - /** - * Delegate the flushing to the transaction object, provided that the latter - * implements the {@link SmartTransactionObject} interface. - * @see SmartTransactionObject#flush() - */ - @Override - public void flush() { - if (this.transaction instanceof SmartTransactionObject) { - ((SmartTransactionObject) this.transaction).flush(); - } - } - /** * This implementation exposes the {@link SavepointManager} interface * of the underlying transaction object, if any. @@ -203,4 +191,16 @@ public class DefaultTransactionStatus extends AbstractTransactionStatus { return (this.transaction instanceof SavepointManager); } + /** + * Delegate the flushing to the transaction object, provided that the latter + * implements the {@link SmartTransactionObject} interface. + * @see SmartTransactionObject#flush() + */ + @Override + public void flush() { + if (this.transaction instanceof SmartTransactionObject) { + ((SmartTransactionObject) this.transaction).flush(); + } + } + } diff --git a/spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTestTransactionManager.java b/spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTestTransactionManager.java index b6b4e58b547..d1045e5e2d8 100644 --- a/spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTestTransactionManager.java +++ b/spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTestTransactionManager.java @@ -52,7 +52,7 @@ class ReactiveTestTransactionManager extends AbstractReactiveTransactionManager @Override - protected Object doGetTransaction(ReactiveTransactionSynchronizationManager synchronizationManager) { + protected Object doGetTransaction(TransactionSynchronizationManager synchronizationManager) { return TRANSACTION; } @@ -62,7 +62,7 @@ class ReactiveTestTransactionManager extends AbstractReactiveTransactionManager } @Override - protected Mono doBegin(ReactiveTransactionSynchronizationManager synchronizationManager, Object transaction, TransactionDefinition definition) { + protected Mono doBegin(TransactionSynchronizationManager synchronizationManager, Object transaction, TransactionDefinition definition) { if (!TRANSACTION.equals(transaction)) { return Mono.error(new IllegalArgumentException("Not the same transaction object")); } @@ -73,7 +73,7 @@ class ReactiveTestTransactionManager extends AbstractReactiveTransactionManager } @Override - protected Mono doCommit(ReactiveTransactionSynchronizationManager synchronizationManager, DefaultReactiveTransactionStatus status) { + protected Mono doCommit(TransactionSynchronizationManager synchronizationManager, GenericReactiveTransaction status) { if (!TRANSACTION.equals(status.getTransaction())) { return Mono.error(new IllegalArgumentException("Not the same transaction object")); } @@ -81,7 +81,7 @@ class ReactiveTestTransactionManager extends AbstractReactiveTransactionManager } @Override - protected Mono doRollback(ReactiveTransactionSynchronizationManager synchronizationManager, DefaultReactiveTransactionStatus status) { + protected Mono doRollback(TransactionSynchronizationManager synchronizationManager, GenericReactiveTransaction status) { if (!TRANSACTION.equals(status.getTransaction())) { return Mono.error(new IllegalArgumentException("Not the same transaction object")); } @@ -89,7 +89,7 @@ class ReactiveTestTransactionManager extends AbstractReactiveTransactionManager } @Override - protected Mono doSetRollbackOnly(ReactiveTransactionSynchronizationManager synchronizationManager, DefaultReactiveTransactionStatus status) { + protected Mono doSetRollbackOnly(TransactionSynchronizationManager synchronizationManager, GenericReactiveTransaction status) { if (!TRANSACTION.equals(status.getTransaction())) { return Mono.error(new IllegalArgumentException("Not the same transaction object")); } diff --git a/spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTransactionSupportUnitTests.java b/spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTransactionSupportTests.java similarity index 82% rename from spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTransactionSupportUnitTests.java rename to spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTransactionSupportTests.java index 5a3b2691610..d14d0a4aa81 100644 --- a/spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTransactionSupportUnitTests.java +++ b/spring-tx/src/test/java/org/springframework/transaction/reactive/ReactiveTransactionSupportTests.java @@ -23,7 +23,7 @@ import reactor.test.StepVerifier; import org.springframework.transaction.IllegalTransactionStateException; import org.springframework.transaction.ReactiveTransactionManager; -import org.springframework.transaction.ReactiveTransactionStatus; +import org.springframework.transaction.ReactiveTransaction; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.support.DefaultTransactionDefinition; @@ -34,27 +34,27 @@ import static org.junit.Assert.*; * * @author Mark Paluch */ -public class ReactiveTransactionSupportUnitTests { +public class ReactiveTransactionSupportTests { @Test public void noExistingTransaction() { ReactiveTransactionManager tm = new ReactiveTestTransactionManager(false, true); - tm.getTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_SUPPORTS)) - .subscriberContext(TransactionContextManager.createTransactionContext()).cast(DefaultReactiveTransactionStatus.class) + tm.getReactiveTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_SUPPORTS)) + .subscriberContext(TransactionContextManager.createTransactionContext()).cast(GenericReactiveTransaction.class) .as(StepVerifier::create).consumeNextWith(actual -> { assertFalse(actual.hasTransaction()); }).verifyComplete(); - tm.getTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRED)) - .cast(DefaultReactiveTransactionStatus.class).subscriberContext(TransactionContextManager.createTransactionContext()) + tm.getReactiveTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRED)) + .cast(GenericReactiveTransaction.class).subscriberContext(TransactionContextManager.createTransactionContext()) .as(StepVerifier::create).consumeNextWith(actual -> { assertTrue(actual.hasTransaction()); assertTrue(actual.isNewTransaction()); }).verifyComplete(); - tm.getTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_MANDATORY)) - .subscriberContext(TransactionContextManager.createTransactionContext()).cast(DefaultReactiveTransactionStatus.class) + tm.getReactiveTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_MANDATORY)) + .subscriberContext(TransactionContextManager.createTransactionContext()).cast(GenericReactiveTransaction.class) .as(StepVerifier::create).expectError(IllegalTransactionStateException.class).verify(); } @@ -62,22 +62,22 @@ public class ReactiveTransactionSupportUnitTests { public void existingTransaction() { ReactiveTransactionManager tm = new ReactiveTestTransactionManager(true, true); - tm.getTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_SUPPORTS)) - .subscriberContext(TransactionContextManager.createTransactionContext()).cast(DefaultReactiveTransactionStatus.class) + tm.getReactiveTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_SUPPORTS)) + .subscriberContext(TransactionContextManager.createTransactionContext()).cast(GenericReactiveTransaction.class) .as(StepVerifier::create).consumeNextWith(actual -> { assertNotNull(actual.getTransaction()); assertFalse(actual.isNewTransaction()); }).verifyComplete(); - tm.getTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRED)) - .subscriberContext(TransactionContextManager.createTransactionContext()).cast(DefaultReactiveTransactionStatus.class) + tm.getReactiveTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRED)) + .subscriberContext(TransactionContextManager.createTransactionContext()).cast(GenericReactiveTransaction.class) .as(StepVerifier::create).consumeNextWith(actual -> { assertNotNull(actual.getTransaction()); assertFalse(actual.isNewTransaction()); }).verifyComplete(); - tm.getTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_MANDATORY)) - .subscriberContext(TransactionContextManager.createTransactionContext()).cast(DefaultReactiveTransactionStatus.class) + tm.getReactiveTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_MANDATORY)) + .subscriberContext(TransactionContextManager.createTransactionContext()).cast(GenericReactiveTransaction.class) .as(StepVerifier::create).consumeNextWith(actual -> { assertNotNull(actual.getTransaction()); assertFalse(actual.isNewTransaction()); @@ -87,7 +87,7 @@ public class ReactiveTransactionSupportUnitTests { @Test public void commitWithoutExistingTransaction() { ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true); - tm.getTransaction(new DefaultTransactionDefinition()).flatMap(tm::commit) + tm.getReactiveTransaction(new DefaultTransactionDefinition()).flatMap(tm::commit) .subscriberContext(TransactionContextManager.createTransactionContext()) .as(StepVerifier::create).verifyComplete(); @@ -100,7 +100,7 @@ public class ReactiveTransactionSupportUnitTests { @Test public void rollbackWithoutExistingTransaction() { ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true); - tm.getTransaction(new DefaultTransactionDefinition()).flatMap(tm::rollback) + tm.getReactiveTransaction(new DefaultTransactionDefinition()).flatMap(tm::rollback) .subscriberContext(TransactionContextManager.createTransactionContext()).as(StepVerifier::create) .verifyComplete(); @@ -113,7 +113,7 @@ public class ReactiveTransactionSupportUnitTests { @Test public void rollbackOnlyWithoutExistingTransaction() { ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(false, true); - tm.getTransaction(new DefaultTransactionDefinition()).doOnNext(ReactiveTransactionStatus::setRollbackOnly) + tm.getReactiveTransaction(new DefaultTransactionDefinition()).doOnNext(ReactiveTransaction::setRollbackOnly) .flatMap(tm::commit) .subscriberContext(TransactionContextManager.createTransactionContext()).as(StepVerifier::create) .verifyComplete(); @@ -127,7 +127,7 @@ public class ReactiveTransactionSupportUnitTests { @Test public void commitWithExistingTransaction() { ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(true, true); - tm.getTransaction(new DefaultTransactionDefinition()).flatMap(tm::commit) + tm.getReactiveTransaction(new DefaultTransactionDefinition()).flatMap(tm::commit) .subscriberContext(TransactionContextManager.createTransactionContext()) .as(StepVerifier::create).verifyComplete(); @@ -140,7 +140,7 @@ public class ReactiveTransactionSupportUnitTests { @Test public void rollbackWithExistingTransaction() { ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(true, true); - tm.getTransaction(new DefaultTransactionDefinition()).flatMap(tm::rollback) + tm.getReactiveTransaction(new DefaultTransactionDefinition()).flatMap(tm::rollback) .subscriberContext(TransactionContextManager.createTransactionContext()).as(StepVerifier::create) .verifyComplete(); @@ -153,7 +153,7 @@ public class ReactiveTransactionSupportUnitTests { @Test public void rollbackOnlyWithExistingTransaction() { ReactiveTestTransactionManager tm = new ReactiveTestTransactionManager(true, true); - tm.getTransaction(new DefaultTransactionDefinition()).doOnNext(ReactiveTransactionStatus::setRollbackOnly).flatMap(tm::commit) + tm.getReactiveTransaction(new DefaultTransactionDefinition()).doOnNext(ReactiveTransaction::setRollbackOnly).flatMap(tm::commit) .subscriberContext(TransactionContextManager.createTransactionContext()).as(StepVerifier::create) .verifyComplete();