From 2b4eb610a7c8059e5b35c0e3807fc1335162c379 Mon Sep 17 00:00:00 2001 From: Sebastien Deleuze Date: Tue, 3 Sep 2019 23:37:48 +0200 Subject: [PATCH] Add support for Coroutines transactions This commit adds Coroutines extensions for TransactionalOperator.transactional that accept suspending lambda or Kotlin Flow parameters. @Transactional on suspending functions is not supported yet, gh-23575 has been created for that purpose. Closes gh-22915 --- spring-tx/spring-tx.gradle | 7 ++ .../interceptor/TransactionAspectSupport.java | 19 ++++ .../TransactionalOperatorExtensions.kt | 29 +++++ .../TransactionalOperatorExtensionsTests.kt | 100 ++++++++++++++++++ src/docs/asciidoc/languages/kotlin.adoc | 6 ++ 5 files changed, 161 insertions(+) create mode 100644 spring-tx/src/main/kotlin/org/springframework/transaction/reactive/TransactionalOperatorExtensions.kt create mode 100644 spring-tx/src/test/kotlin/org/springframework/transaction/reactive/TransactionalOperatorExtensionsTests.kt diff --git a/spring-tx/spring-tx.gradle b/spring-tx/spring-tx.gradle index e0cbd53924f..aa2a6a3fa09 100644 --- a/spring-tx/spring-tx.gradle +++ b/spring-tx/spring-tx.gradle @@ -1,10 +1,13 @@ description = "Spring Transaction" +apply plugin: "kotlin" + dependencies { compile(project(":spring-beans")) compile(project(":spring-core")) optional(project(":spring-aop")) optional(project(":spring-context")) // for JCA, @EnableTransactionManagement + optional(project(":kotlin-coroutines")) optional("javax.ejb:javax.ejb-api") optional("javax.interceptor:javax.interceptor-api") optional("javax.resource:javax.resource-api") @@ -12,6 +15,10 @@ dependencies { optional("com.ibm.websphere:uow") optional("io.projectreactor:reactor-core") optional("io.vavr:vavr") + optional("org.jetbrains.kotlin:kotlin-reflect") + optional("org.jetbrains.kotlin:kotlin-stdlib") + optional("org.jetbrains.kotlinx:kotlinx-coroutines-core") + optional("org.jetbrains.kotlinx:kotlinx-coroutines-reactor") testCompile("org.aspectj:aspectjweaver") testCompile("org.codehaus.groovy:groovy") testCompile("org.eclipse.persistence:javax.persistence") diff --git a/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java b/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java index f387418e144..8f227c54c7d 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java +++ b/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java @@ -21,6 +21,8 @@ import java.util.Properties; import java.util.concurrent.ConcurrentMap; import io.vavr.control.Try; +import kotlin.reflect.KFunction; +import kotlin.reflect.jvm.ReflectJvmMapping; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import reactor.core.publisher.Flux; @@ -30,6 +32,7 @@ import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.BeanFactoryAnnotationUtils; +import org.springframework.core.KotlinDetector; import org.springframework.core.NamedThreadLocal; import org.springframework.core.ReactiveAdapter; import org.springframework.core.ReactiveAdapterRegistry; @@ -41,6 +44,7 @@ import org.springframework.transaction.ReactiveTransactionManager; import org.springframework.transaction.TransactionManager; import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.TransactionSystemException; +import org.springframework.transaction.TransactionUsageException; import org.springframework.transaction.reactive.TransactionContextManager; import org.springframework.transaction.support.CallbackPreferringPlatformTransactionManager; import org.springframework.util.Assert; @@ -322,6 +326,10 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init final InvocationCallback invocation) throws Throwable { if (this.reactiveAdapterRegistry != null) { + if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) { + throw new TransactionUsageException("Annotated transactions on suspending functions are not supported," + + " use TransactionalOperator.transactional extensions instead."); + } ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType()); if (adapter != null) { return new ReactiveTransactionSupport(adapter).invokeWithinTransaction(method, targetClass, invocation); @@ -809,6 +817,17 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init } } + /** + * Inner class to avoid a hard dependency on Kotlin at runtime. + */ + private static class KotlinDelegate { + + static private boolean isSuspend(Method method) { + KFunction function = ReflectJvmMapping.getKotlinFunction(method); + return function != null && function.isSuspend(); + } + } + /** * Delegate for Reactor-based management of transactional methods with a diff --git a/spring-tx/src/main/kotlin/org/springframework/transaction/reactive/TransactionalOperatorExtensions.kt b/spring-tx/src/main/kotlin/org/springframework/transaction/reactive/TransactionalOperatorExtensions.kt new file mode 100644 index 00000000000..3a874b860a7 --- /dev/null +++ b/spring-tx/src/main/kotlin/org/springframework/transaction/reactive/TransactionalOperatorExtensions.kt @@ -0,0 +1,29 @@ +package org.springframework.transaction.reactive + +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.reactive.asFlow +import kotlinx.coroutines.reactive.awaitFirstOrNull +import kotlinx.coroutines.reactor.asFlux +import kotlinx.coroutines.reactor.mono + +/** + * Coroutines variant of [TransactionalOperator.transactional] with a [Flow] parameter. + * + * @author Sebastien Deleuze + * @since 5.2 + */ +@ExperimentalCoroutinesApi +fun TransactionalOperator.transactional(flow: Flow): Flow = + transactional(flow.asFlux()).asFlow() + +/** +* Coroutines variant of [TransactionalOperator.transactional] with a suspending lambda +* parameter. +* +* @author Sebastien Deleuze +* @since 5.2 +*/ +suspend fun TransactionalOperator.transactional(f: suspend () -> T?): T? = + transactional(mono(Dispatchers.Unconfined) { f() }).awaitFirstOrNull() diff --git a/spring-tx/src/test/kotlin/org/springframework/transaction/reactive/TransactionalOperatorExtensionsTests.kt b/spring-tx/src/test/kotlin/org/springframework/transaction/reactive/TransactionalOperatorExtensionsTests.kt new file mode 100644 index 00000000000..ecbe883cb8d --- /dev/null +++ b/spring-tx/src/test/kotlin/org/springframework/transaction/reactive/TransactionalOperatorExtensionsTests.kt @@ -0,0 +1,100 @@ +/* + * Copyright 2002-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.transaction.reactive + +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.runBlocking +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.fail +import org.springframework.transaction.support.DefaultTransactionDefinition + +class TransactionalOperatorExtensionsTests { + + private val tm = ReactiveTestTransactionManager(false, true) + + @Test + fun commitWithSuspendingFunction() { + val operator = TransactionalOperator.create(tm, DefaultTransactionDefinition()) + runBlocking { + operator.transactional { + delay(1) + true + } + } + assertThat(tm.commit).isTrue() + assertThat(tm.rollback).isFalse() + } + + @Test + fun rollbackWithSuspendingFunction() { + val operator = TransactionalOperator.create(tm, DefaultTransactionDefinition()) + runBlocking { + try { + operator.transactional { + delay(1) + throw IllegalStateException() + } + } catch (ex: IllegalStateException) { + assertThat(tm.commit).isFalse() + assertThat(tm.rollback).isTrue() + return@runBlocking + } + fail("") + } + } + + @Test + @ExperimentalCoroutinesApi + fun commitWithFlow() { + val operator = TransactionalOperator.create(tm, DefaultTransactionDefinition()) + val flow = flow { + emit(1) + emit(2) + emit(3) + emit(4) + } + runBlocking { + val list = operator.transactional(flow).toList() + assertThat(list).hasSize(4) + } + assertThat(tm.commit).isTrue() + assertThat(tm.rollback).isFalse() + } + + @Test + @ExperimentalCoroutinesApi + fun rollbackWithFlow() { + val operator = TransactionalOperator.create(tm, DefaultTransactionDefinition()) + val flow = flow { + delay(1) + throw IllegalStateException() + } + runBlocking { + try { + operator.transactional(flow).toList() + } catch (ex: IllegalStateException) { + assertThat(tm.commit).isFalse() + assertThat(tm.rollback).isTrue() + return@runBlocking + } + } + } +} diff --git a/src/docs/asciidoc/languages/kotlin.adoc b/src/docs/asciidoc/languages/kotlin.adoc index 1189bdae35a..7cd8265d1be 100644 --- a/src/docs/asciidoc/languages/kotlin.adoc +++ b/src/docs/asciidoc/languages/kotlin.adoc @@ -574,6 +574,12 @@ class UserHandler(builder: WebClient.Builder) { } ---- +=== Transactions + +Transactions on Coroutines are supported via the programmatic variant of the Reactive +transaction management provided as of Spring Framework 5.2. `TransactionalOperator.transactional` +extensions with suspending lambda and Kotlin `Flow` parameter are provided for that purpose. + [[kotlin-spring-projects-in-kotlin]] == Spring Projects in Kotlin