diff --git a/framework-docs/framework-docs.gradle b/framework-docs/framework-docs.gradle index 89250dcf59..0f27003582 100644 --- a/framework-docs/framework-docs.gradle +++ b/framework-docs/framework-docs.gradle @@ -65,6 +65,7 @@ dependencies { implementation("com.github.ben-manes.caffeine:caffeine") implementation("com.mchange:c3p0:0.9.5.5") implementation("com.oracle.database.jdbc:ojdbc11") + implementation("io.micrometer:context-propagation") implementation("io.projectreactor.netty:reactor-netty-http") implementation("jakarta.jms:jakarta.jms-api") implementation("jakarta.servlet:jakarta.servlet-api") @@ -78,6 +79,8 @@ dependencies { implementation("org.assertj:assertj-core") implementation("org.eclipse.jetty.websocket:jetty-websocket-jetty-api") implementation("org.jetbrains.kotlin:kotlin-stdlib") + implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core") + implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor") implementation("org.junit.jupiter:junit-jupiter-api") implementation("tools.jackson.core:jackson-databind") implementation("tools.jackson.dataformat:jackson-dataformat-xml") diff --git a/framework-docs/modules/ROOT/pages/languages/kotlin/coroutines.adoc b/framework-docs/modules/ROOT/pages/languages/kotlin/coroutines.adoc index 7d8dd95ae7..5b59519730 100644 --- a/framework-docs/modules/ROOT/pages/languages/kotlin/coroutines.adoc +++ b/framework-docs/modules/ROOT/pages/languages/kotlin/coroutines.adoc @@ -250,3 +250,36 @@ For Kotlin `Flow`, a `Flow.transactional` extension is provided. } } ---- + +[[coroutines.propagation]] +== Context Propagation + +Spring applications are xref:integration/observability.adoc[instrumented with Micrometer for Observability support]. +For tracing support, the current observation is propagated through a `ThreadLocal` for blocking code, +or the Reactor `Context` for reactive pipelines. But the current observation also needs to be made available +in the execution context of a suspended function. Without that, the current "traceId" will not be automatically prepended +to logged statements from coroutines. + +The `org.springframework.core.PropagationContextElement` operator generally ensures that the +{micrometer-context-propagation-docs}/[Micrometer Context Propagation library] works with Kotlin Coroutines. + +The `PropagationContextElement` requires the following dependencies: + +`build.gradle.kts` +[source,kotlin,indent=0] +---- +dependencies { + implementation("io.micrometer:context-propagation:${contextPropagationVersion}") + implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:${coroutinesVersion}") + implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:${coroutinesVersion}") +} +---- + +Applications can then use the `PropagationContextElement` operator to connect the `currentCoroutineContext()` +with the context propagation mechanism: + +include-code::./ContextPropagationSample[tag=context,indent=0] + +Here, assuming that Micrometer Tracing is configured, the resulting logging statement +will show the current "traceId" and unlock better observability for your application. + diff --git a/framework-docs/src/main/kotlin/org/springframework/docs/languages/kotlin/coroutines/propagation/ContextPropagationSample.kt b/framework-docs/src/main/kotlin/org/springframework/docs/languages/kotlin/coroutines/propagation/ContextPropagationSample.kt new file mode 100644 index 0000000000..e8f0721511 --- /dev/null +++ b/framework-docs/src/main/kotlin/org/springframework/docs/languages/kotlin/coroutines/propagation/ContextPropagationSample.kt @@ -0,0 +1,40 @@ +/* + * Copyright 2002-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.docs.languages.kotlin.coroutines.propagation + +import kotlinx.coroutines.currentCoroutineContext +import kotlinx.coroutines.withContext +import org.apache.commons.logging.Log +import org.apache.commons.logging.LogFactory +import org.springframework.core.PropagationContextElement + +class ContextPropagationSample { + + companion object { + private val logger: Log = LogFactory.getLog( + ContextPropagationSample::class.java + ) + } + + // tag::context[] + suspend fun suspendingFunction() { + return withContext(PropagationContextElement(currentCoroutineContext())) { + logger.info("Suspending function with traceId") + } + } + // end::context[] +} \ No newline at end of file diff --git a/spring-core/spring-core.gradle b/spring-core/spring-core.gradle index a6445f5438..cd88a0ee6d 100644 --- a/spring-core/spring-core.gradle +++ b/spring-core/spring-core.gradle @@ -104,6 +104,8 @@ dependencies { testImplementation("jakarta.xml.bind:jakarta.xml.bind-api") testImplementation("org.jetbrains.kotlinx:kotlinx-serialization-json") testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor") + testImplementation("io.micrometer:context-propagation") + testImplementation("io.micrometer:micrometer-observation-test") testImplementation("org.mockito:mockito-core") testImplementation("com.networknt:json-schema-validator"); testImplementation("org.skyscreamer:jsonassert") diff --git a/spring-core/src/main/kotlin/org/springframework/core/PropagationContextElement.kt b/spring-core/src/main/kotlin/org/springframework/core/PropagationContextElement.kt new file mode 100644 index 0000000000..c55d66fd41 --- /dev/null +++ b/spring-core/src/main/kotlin/org/springframework/core/PropagationContextElement.kt @@ -0,0 +1,75 @@ +/* + * Copyright 2002-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core + +import io.micrometer.context.ContextRegistry +import io.micrometer.context.ContextSnapshot +import io.micrometer.context.ContextSnapshotFactory +import kotlinx.coroutines.ThreadContextElement +import kotlinx.coroutines.reactor.ReactorContext +import reactor.util.context.ContextView +import kotlin.coroutines.AbstractCoroutineContextElement +import kotlin.coroutines.CoroutineContext + + +/** + * [ThreadContextElement] that restores `ThreadLocals` from the Reactor [ContextSnapshot] + * every time the coroutine with this element in the context is resumed on a thread. + * + * This effectively ensures that Kotlin Coroutines, Reactor and Micrometer Context Propagation + * work together in an application, typically for observability purposes. + * + * Applications need to have both `"io.micrometer:context-propagation"` and + * `"org.jetbrains.kotlinx:kotlinx-coroutines-reactor"` on the classpath to use this context element. + * + * The `PropagationContextElement` can be used like this: + * + * ```kotlin + * suspend fun suspendable() { + * withContext(PropagationContextElement(coroutineContext)) { + * logger.info("Log statement with traceId") + * } + * } + * ``` + * + * @author Brian Clozel + * @since 7.0 + */ +class PropagationContextElement(private val context: CoroutineContext) : ThreadContextElement, + AbstractCoroutineContextElement(Key) { + + companion object Key : CoroutineContext.Key + + val contextSnapshot: ContextSnapshot + get() { + val contextView: ContextView? = context[ReactorContext]?.context + val contextSnapshotFactory = + ContextSnapshotFactory.builder().contextRegistry(ContextRegistry.getInstance()).build() + if (contextView != null) { + return contextSnapshotFactory.captureFrom(contextView) + } + return contextSnapshotFactory.captureAll() + } + + override fun restoreThreadContext(context: CoroutineContext, oldState: ContextSnapshot.Scope) { + oldState.close() + } + + override fun updateThreadContext(context: CoroutineContext): ContextSnapshot.Scope { + return contextSnapshot.setThreadLocals() + } +} \ No newline at end of file diff --git a/spring-core/src/test/kotlin/org/springframework/core/PropagationContextElementTests.kt b/spring-core/src/test/kotlin/org/springframework/core/PropagationContextElementTests.kt new file mode 100644 index 0000000000..d7a7bdf571 --- /dev/null +++ b/spring-core/src/test/kotlin/org/springframework/core/PropagationContextElementTests.kt @@ -0,0 +1,93 @@ +/* + * Copyright 2002-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core + +import io.micrometer.observation.Observation +import io.micrometer.observation.tck.TestObservationRegistry +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.currentCoroutineContext +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withContext +import org.assertj.core.api.Assertions +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.AfterAll +import org.junit.jupiter.api.BeforeAll +import org.junit.jupiter.api.Test +import org.reactivestreams.Publisher +import reactor.core.publisher.Hooks +import reactor.core.publisher.Mono +import reactor.core.scheduler.Schedulers +import kotlin.coroutines.Continuation + + +/** + * Kotlin tests for [PropagationContextElement]. + * + * @author Brian Clozel + */ +class PropagationContextElementTests { + + private val observationRegistry = TestObservationRegistry.create() + + companion object { + + @BeforeAll + @JvmStatic + fun init() { + Hooks.enableAutomaticContextPropagation() + } + + @AfterAll + @JvmStatic + fun cleanup() { + Hooks.disableAutomaticContextPropagation() + } + + } + + @Test + fun restoresFromThreadLocal() { + val observation = Observation.createNotStarted("coroutine", observationRegistry) + observation.observe { + val result = runBlocking(Dispatchers.Unconfined) { + suspendingFunction("test") + } + Assertions.assertThat(result).isEqualTo("coroutine") + } + } + + @Test + @Suppress("UNCHECKED_CAST") + fun restoresFromReactorContext() { + val method = PropagationContextElementTests::class.java.getDeclaredMethod("suspendingFunction", String::class.java, Continuation::class.java) + val publisher = CoroutinesUtils.invokeSuspendingFunction(method, this, "test", null) as Publisher + val observation = Observation.createNotStarted("coroutine", observationRegistry) + observation.observe { + val result = Mono.from(publisher).publishOn(Schedulers.boundedElastic()).block() + assertThat(result).isEqualTo("coroutine") + } + } + + suspend fun suspendingFunction(value: String): String? { + return withContext(PropagationContextElement(currentCoroutineContext())) { + val currentObservation = observationRegistry.currentObservation + assertThat(currentObservation).isNotNull + currentObservation?.context?.name + } + } + +}