Polishing
This commit is contained in:
parent
21d069695f
commit
b0cabb29f3
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2019 the original author or authors.
|
||||
* Copyright 2002-2020 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -196,7 +196,8 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
return doBegin(synchronizationManager, transaction, definition).doOnSuccess(ignore ->
|
||||
prepareSynchronization(synchronizationManager, status, definition)).thenReturn(status)
|
||||
.onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR, beginEx ->
|
||||
resumeAfterBeginException(synchronizationManager, transaction, suspendedResourcesHolder, beginEx).then(Mono.error(beginEx)));
|
||||
resumeAfterBeginException(synchronizationManager, transaction, suspendedResourcesHolder, beginEx)
|
||||
.then(Mono.error(beginEx)));
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -281,7 +282,9 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
if (synchronizationManager.isSynchronizationActive()) {
|
||||
Mono<List<TransactionSynchronization>> suspendedSynchronizations = doSuspendSynchronization(synchronizationManager);
|
||||
return suspendedSynchronizations.flatMap(synchronizations -> {
|
||||
Mono<Optional<Object>> suspendedResources = (transaction != null ? doSuspend(synchronizationManager, transaction).map(Optional::of).defaultIfEmpty(Optional.empty()) : Mono.just(Optional.empty()));
|
||||
Mono<Optional<Object>> suspendedResources = (transaction != null ?
|
||||
doSuspend(synchronizationManager, transaction).map(Optional::of).defaultIfEmpty(Optional.empty()) :
|
||||
Mono.just(Optional.empty()));
|
||||
return suspendedResources.map(it -> {
|
||||
String name = synchronizationManager.getCurrentTransactionName();
|
||||
synchronizationManager.setCurrentTransactionName(null);
|
||||
|
@ -293,12 +296,15 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
synchronizationManager.setActualTransactionActive(false);
|
||||
return new SuspendedResourcesHolder(
|
||||
it.orElse(null), synchronizations, name, readOnly, isolationLevel, wasActive);
|
||||
}).onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR, ex -> doResumeSynchronization(synchronizationManager, synchronizations).cast(SuspendedResourcesHolder.class));
|
||||
}).onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR,
|
||||
ex -> doResumeSynchronization(synchronizationManager, synchronizations)
|
||||
.cast(SuspendedResourcesHolder.class));
|
||||
});
|
||||
}
|
||||
else if (transaction != null) {
|
||||
// Transaction active but no synchronization active.
|
||||
Mono<Optional<Object>> suspendedResources = doSuspend(synchronizationManager, transaction).map(Optional::of).defaultIfEmpty(Optional.empty());
|
||||
Mono<Optional<Object>> suspendedResources =
|
||||
doSuspend(synchronizationManager, transaction).map(Optional::of).defaultIfEmpty(Optional.empty());
|
||||
return suspendedResources.map(it -> new SuspendedResourcesHolder(it.orElse(null)));
|
||||
}
|
||||
else {
|
||||
|
@ -445,10 +451,12 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
// Eclipse compiler with regard to inferred generics.
|
||||
Mono<Object> result = propagateException;
|
||||
if (ErrorPredicates.UNEXPECTED_ROLLBACK.test(ex)) {
|
||||
result = triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_ROLLED_BACK).then(propagateException);
|
||||
result = triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_ROLLED_BACK)
|
||||
.then(propagateException);
|
||||
}
|
||||
else if (ErrorPredicates.TRANSACTION_EXCEPTION.test(ex)) {
|
||||
result = triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_UNKNOWN).then(propagateException);
|
||||
result = triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_UNKNOWN)
|
||||
.then(propagateException);
|
||||
}
|
||||
else if (ErrorPredicates.RUNTIME_OR_ERROR.test(ex)) {
|
||||
Mono<Void> mono;
|
||||
|
@ -458,7 +466,8 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
else {
|
||||
mono = Mono.empty();
|
||||
}
|
||||
result = mono.then(doRollbackOnCommitException(synchronizationManager, status, ex)).then(propagateException);
|
||||
result = mono.then(doRollbackOnCommitException(synchronizationManager, status, ex))
|
||||
.then(propagateException);
|
||||
}
|
||||
|
||||
return result;
|
||||
|
@ -573,9 +582,9 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
if (status.isDebug()) {
|
||||
logger.trace("Triggering beforeCommit synchronization");
|
||||
}
|
||||
return TransactionSynchronizationUtils.triggerBeforeCommit(synchronizationManager.getSynchronizations(), status.isReadOnly());
|
||||
return TransactionSynchronizationUtils.triggerBeforeCommit(
|
||||
synchronizationManager.getSynchronizations(), status.isReadOnly());
|
||||
}
|
||||
|
||||
return Mono.empty();
|
||||
}
|
||||
|
||||
|
@ -593,7 +602,6 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
}
|
||||
return TransactionSynchronizationUtils.triggerBeforeCompletion(synchronizationManager.getSynchronizations());
|
||||
}
|
||||
|
||||
return Mono.empty();
|
||||
}
|
||||
|
||||
|
@ -611,7 +619,6 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
}
|
||||
return TransactionSynchronizationUtils.invokeAfterCommit(synchronizationManager.getSynchronizations());
|
||||
}
|
||||
|
||||
return Mono.empty();
|
||||
}
|
||||
|
||||
|
@ -639,10 +646,10 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
// Existing transaction that we participate in, controlled outside
|
||||
// of the scope of this Spring transaction manager -> try to register
|
||||
// an afterCompletion callback with the existing (JTA) transaction.
|
||||
return registerAfterCompletionWithExistingTransaction(synchronizationManager, status.getTransaction(), synchronizations);
|
||||
return registerAfterCompletionWithExistingTransaction(
|
||||
synchronizationManager, status.getTransaction(), synchronizations);
|
||||
}
|
||||
}
|
||||
|
||||
return Mono.empty();
|
||||
}
|
||||
|
||||
|
@ -690,7 +697,8 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
logger.debug("Resuming suspended transaction after completion of inner transaction");
|
||||
}
|
||||
Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
|
||||
return cleanup.then(resume(synchronizationManager, transaction, (SuspendedResourcesHolder) status.getSuspendedResources()));
|
||||
return cleanup.then(resume(synchronizationManager, transaction,
|
||||
(SuspendedResourcesHolder) status.getSuspendedResources()));
|
||||
}
|
||||
return cleanup;
|
||||
});
|
||||
|
@ -716,14 +724,16 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
* returned transaction object.
|
||||
* @param synchronizationManager the synchronization manager bound to the current transaction
|
||||
* @return the current transaction object
|
||||
* @throws org.springframework.transaction.CannotCreateTransactionException if transaction support is not available
|
||||
* @throws org.springframework.transaction.CannotCreateTransactionException
|
||||
* if transaction support is not available
|
||||
* @throws TransactionException in case of lookup or system errors
|
||||
* @see #doBegin
|
||||
* @see #doCommit
|
||||
* @see #doRollback
|
||||
* @see GenericReactiveTransaction#getTransaction
|
||||
*/
|
||||
protected abstract Object doGetTransaction(TransactionSynchronizationManager synchronizationManager) throws TransactionException;
|
||||
protected abstract Object doGetTransaction(TransactionSynchronizationManager synchronizationManager)
|
||||
throws TransactionException;
|
||||
|
||||
/**
|
||||
* Check if the given transaction object indicates an existing transaction
|
||||
|
@ -775,7 +785,8 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
* @param transaction the transaction object returned by {@code doGetTransaction}
|
||||
* @return an object that holds suspended resources
|
||||
* (will be kept unexamined for passing it into doResume)
|
||||
* @throws org.springframework.transaction.TransactionSuspensionNotSupportedException if suspending is not supported by the transaction manager implementation
|
||||
* @throws org.springframework.transaction.TransactionSuspensionNotSupportedException
|
||||
* if suspending is not supported by the transaction manager implementation
|
||||
* @throws TransactionException in case of system errors
|
||||
* @see #doResume
|
||||
*/
|
||||
|
@ -795,7 +806,8 @@ public abstract class AbstractReactiveTransactionManager implements ReactiveTran
|
|||
* @param transaction the transaction object returned by {@code doGetTransaction}
|
||||
* @param suspendedResources the object that holds suspended resources,
|
||||
* as returned by doSuspend
|
||||
* @throws org.springframework.transaction.TransactionSuspensionNotSupportedException if resuming is not supported by the transaction manager implementation
|
||||
* @throws org.springframework.transaction.TransactionSuspensionNotSupportedException
|
||||
* if suspending is not supported by the transaction manager implementation
|
||||
* @throws TransactionException in case of system errors
|
||||
* @see #doSuspend
|
||||
*/
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.springframework.util.Assert;
|
|||
* Reactor-Netty implementation of {@link ClientHttpConnector}.
|
||||
*
|
||||
* @author Brian Clozel
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 5.0
|
||||
* @see reactor.netty.http.client.HttpClient
|
||||
*/
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2002-2019 the original author or authors.
|
||||
* Copyright 2002-2020 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -38,6 +38,7 @@ import org.springframework.http.ZeroCopyHttpOutputMessage;
|
|||
* {@link ClientHttpRequest} implementation for the Reactor-Netty HTTP client.
|
||||
*
|
||||
* @author Brian Clozel
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 5.0
|
||||
* @see reactor.netty.http.client.HttpClient
|
||||
*/
|
||||
|
|
|
@ -43,6 +43,7 @@ import org.springframework.util.MultiValueMap;
|
|||
* {@link ClientHttpResponse} implementation for the Reactor-Netty HTTP client.
|
||||
*
|
||||
* @author Brian Clozel
|
||||
* @author Rossen Stoyanchev
|
||||
* @since 5.0
|
||||
* @see reactor.netty.http.client.HttpClient
|
||||
*/
|
||||
|
@ -66,7 +67,7 @@ class ReactorClientHttpResponse implements ClientHttpResponse {
|
|||
/**
|
||||
* Constructor that matches the inputs from
|
||||
* {@link reactor.netty.http.client.HttpClient.ResponseReceiver#responseConnection(BiFunction)}.
|
||||
* @since 5.3
|
||||
* @since 5.2.8
|
||||
*/
|
||||
public ReactorClientHttpResponse(HttpClientResponse response, Connection connection) {
|
||||
this.response = response;
|
||||
|
@ -77,7 +78,7 @@ class ReactorClientHttpResponse implements ClientHttpResponse {
|
|||
|
||||
/**
|
||||
* Constructor with inputs extracted from a {@link Connection}.
|
||||
* @deprecated as of 5.2.8
|
||||
* @deprecated as of 5.2.8, in favor of {@link #ReactorClientHttpResponse(HttpClientResponse, Connection)}
|
||||
*/
|
||||
@Deprecated
|
||||
public ReactorClientHttpResponse(HttpClientResponse response, NettyInbound inbound, ByteBufAllocator alloc) {
|
||||
|
|
|
@ -38,6 +38,7 @@ import org.springframework.util.Assert;
|
|||
* and is expected typically to be declared as a Spring-managed bean.
|
||||
*
|
||||
* @author Rossen Stoyanchev
|
||||
* @author Brian Clozel
|
||||
* @since 5.1
|
||||
*/
|
||||
public class ReactorResourceFactory implements InitializingBean, DisposableBean {
|
||||
|
@ -97,8 +98,8 @@ public class ReactorResourceFactory implements InitializingBean, DisposableBean
|
|||
*/
|
||||
public void addGlobalResourcesConsumer(Consumer<HttpResources> consumer) {
|
||||
this.useGlobalResources = true;
|
||||
this.globalResourcesConsumer = this.globalResourcesConsumer != null ?
|
||||
this.globalResourcesConsumer.andThen(consumer) : consumer;
|
||||
this.globalResourcesConsumer = (this.globalResourcesConsumer != null ?
|
||||
this.globalResourcesConsumer.andThen(consumer) : consumer);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -221,8 +222,7 @@ public class ReactorResourceFactory implements InitializingBean, DisposableBean
|
|||
@Override
|
||||
public void destroy() {
|
||||
if (this.useGlobalResources) {
|
||||
HttpResources.disposeLoopsAndConnectionsLater(
|
||||
this.shutdownQuietPeriod, this.shutdownTimeout).block();
|
||||
HttpResources.disposeLoopsAndConnectionsLater(this.shutdownQuietPeriod, this.shutdownTimeout).block();
|
||||
}
|
||||
else {
|
||||
try {
|
||||
|
|
Loading…
Reference in New Issue