Add support for Coroutines transactions

This commit adds Coroutines extensions for
TransactionalOperator.transactional that accept suspending lambda or
Kotlin Flow parameters.

@Transactional on suspending functions is not supported yet, gh-23575
has been created for that purpose.

Closes gh-22915
This commit is contained in:
Sebastien Deleuze 2019-09-03 23:37:48 +02:00
parent aeb857c3ba
commit 2b4eb610a7
5 changed files with 161 additions and 0 deletions

View File

@ -1,10 +1,13 @@
description = "Spring Transaction"
apply plugin: "kotlin"
dependencies {
compile(project(":spring-beans"))
compile(project(":spring-core"))
optional(project(":spring-aop"))
optional(project(":spring-context")) // for JCA, @EnableTransactionManagement
optional(project(":kotlin-coroutines"))
optional("javax.ejb:javax.ejb-api")
optional("javax.interceptor:javax.interceptor-api")
optional("javax.resource:javax.resource-api")
@ -12,6 +15,10 @@ dependencies {
optional("com.ibm.websphere:uow")
optional("io.projectreactor:reactor-core")
optional("io.vavr:vavr")
optional("org.jetbrains.kotlin:kotlin-reflect")
optional("org.jetbrains.kotlin:kotlin-stdlib")
optional("org.jetbrains.kotlinx:kotlinx-coroutines-core")
optional("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
testCompile("org.aspectj:aspectjweaver")
testCompile("org.codehaus.groovy:groovy")
testCompile("org.eclipse.persistence:javax.persistence")

View File

@ -21,6 +21,8 @@ import java.util.Properties;
import java.util.concurrent.ConcurrentMap;
import io.vavr.control.Try;
import kotlin.reflect.KFunction;
import kotlin.reflect.jvm.ReflectJvmMapping;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Flux;
@ -30,6 +32,7 @@ import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.BeanFactoryAnnotationUtils;
import org.springframework.core.KotlinDetector;
import org.springframework.core.NamedThreadLocal;
import org.springframework.core.ReactiveAdapter;
import org.springframework.core.ReactiveAdapterRegistry;
@ -41,6 +44,7 @@ import org.springframework.transaction.ReactiveTransactionManager;
import org.springframework.transaction.TransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.TransactionSystemException;
import org.springframework.transaction.TransactionUsageException;
import org.springframework.transaction.reactive.TransactionContextManager;
import org.springframework.transaction.support.CallbackPreferringPlatformTransactionManager;
import org.springframework.util.Assert;
@ -322,6 +326,10 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
final InvocationCallback invocation) throws Throwable {
if (this.reactiveAdapterRegistry != null) {
if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {
throw new TransactionUsageException("Annotated transactions on suspending functions are not supported," +
" use TransactionalOperator.transactional extensions instead.");
}
ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
if (adapter != null) {
return new ReactiveTransactionSupport(adapter).invokeWithinTransaction(method, targetClass, invocation);
@ -809,6 +817,17 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init
}
}
/**
* Inner class to avoid a hard dependency on Kotlin at runtime.
*/
private static class KotlinDelegate {
static private boolean isSuspend(Method method) {
KFunction<?> function = ReflectJvmMapping.getKotlinFunction(method);
return function != null && function.isSuspend();
}
}
/**
* Delegate for Reactor-based management of transactional methods with a

View File

@ -0,0 +1,29 @@
package org.springframework.transaction.reactive
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactor.asFlux
import kotlinx.coroutines.reactor.mono
/**
* Coroutines variant of [TransactionalOperator.transactional] with a [Flow] parameter.
*
* @author Sebastien Deleuze
* @since 5.2
*/
@ExperimentalCoroutinesApi
fun <T : Any> TransactionalOperator.transactional(flow: Flow<T>): Flow<T> =
transactional(flow.asFlux()).asFlow()
/**
* Coroutines variant of [TransactionalOperator.transactional] with a suspending lambda
* parameter.
*
* @author Sebastien Deleuze
* @since 5.2
*/
suspend fun <T : Any> TransactionalOperator.transactional(f: suspend () -> T?): T? =
transactional(mono(Dispatchers.Unconfined) { f() }).awaitFirstOrNull()

View File

@ -0,0 +1,100 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.transaction.reactive
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.fail
import org.springframework.transaction.support.DefaultTransactionDefinition
class TransactionalOperatorExtensionsTests {
private val tm = ReactiveTestTransactionManager(false, true)
@Test
fun commitWithSuspendingFunction() {
val operator = TransactionalOperator.create(tm, DefaultTransactionDefinition())
runBlocking {
operator.transactional {
delay(1)
true
}
}
assertThat(tm.commit).isTrue()
assertThat(tm.rollback).isFalse()
}
@Test
fun rollbackWithSuspendingFunction() {
val operator = TransactionalOperator.create(tm, DefaultTransactionDefinition())
runBlocking {
try {
operator.transactional {
delay(1)
throw IllegalStateException()
}
} catch (ex: IllegalStateException) {
assertThat(tm.commit).isFalse()
assertThat(tm.rollback).isTrue()
return@runBlocking
}
fail("")
}
}
@Test
@ExperimentalCoroutinesApi
fun commitWithFlow() {
val operator = TransactionalOperator.create(tm, DefaultTransactionDefinition())
val flow = flow {
emit(1)
emit(2)
emit(3)
emit(4)
}
runBlocking {
val list = operator.transactional(flow).toList()
assertThat(list).hasSize(4)
}
assertThat(tm.commit).isTrue()
assertThat(tm.rollback).isFalse()
}
@Test
@ExperimentalCoroutinesApi
fun rollbackWithFlow() {
val operator = TransactionalOperator.create(tm, DefaultTransactionDefinition())
val flow = flow<Int> {
delay(1)
throw IllegalStateException()
}
runBlocking {
try {
operator.transactional(flow).toList()
} catch (ex: IllegalStateException) {
assertThat(tm.commit).isFalse()
assertThat(tm.rollback).isTrue()
return@runBlocking
}
}
}
}

View File

@ -574,6 +574,12 @@ class UserHandler(builder: WebClient.Builder) {
}
----
=== Transactions
Transactions on Coroutines are supported via the programmatic variant of the Reactive
transaction management provided as of Spring Framework 5.2. `TransactionalOperator.transactional`
extensions with suspending lambda and Kotlin `Flow` parameter are provided for that purpose.
[[kotlin-spring-projects-in-kotlin]]
== Spring Projects in Kotlin