Add ContextPropagatingTaskDecorator
Prior to this commit, `@Async` and `@EventListener` annotated methods would lose the the logging and observation contexts whenever their execution was scheduled on a different Thread. The Context Propagation library supports this use case and can propagate context values in ThreadLocals, Reactor Context and more. This commit introduces a new `TaskDecorator` implementation that leverages the Context Propagation library. When configured on a `TaskExecutor`, this allows to properly propagate context value through the execution of the task. This implementation is completely optional and requires the "io.micrometer:context-propagation" library on the classpath. Enabling this feature must be done consciously and sometimes selectively, as context propagation introduces some overhead. Closes gh-31130
This commit is contained in:
parent
d47c7f9552
commit
f5a356c9c6
|
@ -478,20 +478,21 @@ Kotlin::
|
|||
----
|
||||
======
|
||||
|
||||
Notice that `ApplicationListener` is generically parameterized with the type of your
|
||||
custom event (`BlockedListEvent` in the preceding example). This means that the
|
||||
`onApplicationEvent()` method can remain type-safe, avoiding any need for downcasting.
|
||||
You can register as many event listeners as you wish, but note that, by default, event
|
||||
listeners receive events synchronously. This means that the `publishEvent()` method
|
||||
blocks until all listeners have finished processing the event. One advantage of this
|
||||
synchronous and single-threaded approach is that, when a listener receives an event,
|
||||
it operates inside the transaction context of the publisher if a transaction context
|
||||
is available. If another strategy for event publication becomes necessary, e.g.
|
||||
asynchronous event processing by default, see the javadoc for Spring's
|
||||
{api-spring-framework}/context/event/ApplicationEventMulticaster.html[`ApplicationEventMulticaster`] interface
|
||||
and {api-spring-framework}/context/event/SimpleApplicationEventMulticaster.html[`SimpleApplicationEventMulticaster`]
|
||||
implementation for configuration options which can be applied to a custom
|
||||
"applicationEventMulticaster" bean definition.
|
||||
Notice that `ApplicationListener` is generically parameterized with the type of your custom event (`BlockedListEvent` in the preceding example).
|
||||
This means that the `onApplicationEvent()` method can remain type-safe, avoiding any need for downcasting.
|
||||
You can register as many event listeners as you wish, but note that, by default, event listeners receive events synchronously.
|
||||
This means that the `publishEvent()` method blocks until all listeners have finished processing the event.
|
||||
One advantage of this synchronous and single-threaded approach is that, when a listener receives an event,
|
||||
it operates inside the transaction context of the publisher if a transaction context is available.
|
||||
If another strategy for event publication becomes necessary, e.g. asynchronous event processing by default,
|
||||
see the javadoc for Spring's {api-spring-framework}/context/event/ApplicationEventMulticaster.html[`ApplicationEventMulticaster`] interface
|
||||
and {api-spring-framework}/context/event/SimpleApplicationEventMulticaster.html[`SimpleApplicationEventMulticaster`] implementation
|
||||
for configuration options which can be applied to a custom "applicationEventMulticaster" bean definition.
|
||||
In these cases, ThreadLocals and logging context are not propagated for the event processing.
|
||||
See xref:integration/observability.adoc#observability.application-events[the `@EventListener` Observability section]
|
||||
for more information on Observability concerns.
|
||||
|
||||
|
||||
|
||||
The following example shows the bean definitions used to register and configure each of
|
||||
the classes above:
|
||||
|
@ -747,6 +748,9 @@ Be aware of the following limitations when using asynchronous events:
|
|||
value. If you need to publish another event as the result of the processing, inject an
|
||||
{api-spring-framework}/context/ApplicationEventPublisher.html[`ApplicationEventPublisher`]
|
||||
to publish the event manually.
|
||||
* ThreadLocals and logging context are not propagated by default for the event processing.
|
||||
See xref:integration/observability.adoc#observability.application-events[the `@EventListener` Observability section]
|
||||
for more information on Observability concerns.
|
||||
|
||||
|
||||
[[context-functionality-events-order]]
|
||||
|
|
|
@ -349,3 +349,28 @@ Instrumentation uses the `org.springframework.web.reactive.function.client.Clien
|
|||
|===
|
||||
|
||||
|
||||
[[observability.application-events]]
|
||||
== Application Events and `@EventListener`
|
||||
|
||||
Spring Framework does not contribute Observations for xref:core/beans/context-introduction.adoc#context-functionality-events-annotation[`@EventListener` calls],
|
||||
as they don't have the right semantics for such instrumentation.
|
||||
By default, event publication and processing is done synchronously and on the same Thread.
|
||||
This means that during the execution of that task, the ThreadLocals and logging context will be the same as the event publisher.
|
||||
|
||||
If the application configures globally a custom `ApplicationEventMulticaster` with a strategy that schedules event processing on different threads, this is no longer true.
|
||||
All `@EventListener` methods will be processed on a different thread, outstide of the main event publication thread.
|
||||
In these cases, the https://micrometer.io/docs/contextPropagation[Micrometer Context Propagation library] can help propagating such values and better correlate the processing of the events.
|
||||
The application can configure the chosen `TaskExecutor` to use a `ContextPropagatingTaskDecorator` that decorates tasks and propagates context.
|
||||
For this to work, the `io.micrometer:context-propagation` library must be present on the classpath:
|
||||
|
||||
include-code::./ApplicationEventsConfiguration[]
|
||||
|
||||
Similarly, if that asynchronous choice is made locally for each `@EventListener` annotated method, by adding an `@Async` method to it,
|
||||
you can choose a `TaskExecutor` that propagates context by referring to it by its qualifier.
|
||||
Given the following `TaskExecutor` bean definition, configured with the dedicated task decorator:
|
||||
|
||||
include-code::./EventAsyncExecutionConfiguration[]
|
||||
|
||||
Annotating event listeners with `@Async` and the relevant qualifier will achieve similar context propagation results:
|
||||
|
||||
include-code::./EmailNotificationListener[]
|
||||
|
|
|
@ -0,0 +1,38 @@
|
|||
/*
|
||||
* Copyright 2002-2023 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.docs.integration.observability.applicationevents;
|
||||
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.event.SimpleApplicationEventMulticaster;
|
||||
import org.springframework.core.task.SimpleAsyncTaskExecutor;
|
||||
import org.springframework.core.task.support.ContextPropagatingTaskDecorator;
|
||||
|
||||
@Configuration
|
||||
public class ApplicationEventsConfiguration {
|
||||
|
||||
@Bean(name = "applicationEventMulticaster")
|
||||
public SimpleApplicationEventMulticaster simpleApplicationEventMulticaster() {
|
||||
SimpleApplicationEventMulticaster eventMulticaster = new SimpleApplicationEventMulticaster();
|
||||
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
|
||||
// decorate task execution with a decorator that supports context propagation
|
||||
taskExecutor.setTaskDecorator(new ContextPropagatingTaskDecorator());
|
||||
eventMulticaster.setTaskExecutor(taskExecutor);
|
||||
return eventMulticaster;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,39 @@
|
|||
/*
|
||||
* Copyright 2002-2023 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.docs.integration.observability.applicationevents;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
public class EmailNotificationListener {
|
||||
|
||||
private final Log logger = LogFactory.getLog(EmailNotificationListener.class);
|
||||
|
||||
@EventListener(EmailReceivedEvent.class)
|
||||
@Async("propagatingContextExecutor")
|
||||
public void emailReceived(EmailReceivedEvent event) {
|
||||
// asynchronously process the received event
|
||||
// this logging statement will contain the expected MDC entries from the propagated context
|
||||
logger.info("email has been received");
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,28 @@
|
|||
/*
|
||||
* Copyright 2002-2023 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.docs.integration.observability.applicationevents;
|
||||
|
||||
import org.springframework.context.ApplicationEvent;
|
||||
|
||||
@SuppressWarnings("serial")
|
||||
public class EmailReceivedEvent extends ApplicationEvent {
|
||||
|
||||
public EmailReceivedEvent(Object source) {
|
||||
super(source);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,36 @@
|
|||
/*
|
||||
* Copyright 2002-2023 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.docs.integration.observability.applicationevents;
|
||||
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.core.task.SimpleAsyncTaskExecutor;
|
||||
import org.springframework.core.task.TaskExecutor;
|
||||
import org.springframework.core.task.support.ContextPropagatingTaskDecorator;
|
||||
|
||||
@Configuration
|
||||
public class EventAsyncExecutionConfiguration {
|
||||
|
||||
@Bean(name = "propagatingContextExecutor")
|
||||
public TaskExecutor propagatingContextExecutor() {
|
||||
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
|
||||
// decorate task execution with a decorator that supports context propagation
|
||||
taskExecutor.setTaskDecorator(new ContextPropagatingTaskDecorator());
|
||||
return taskExecutor;
|
||||
}
|
||||
|
||||
}
|
|
@ -73,6 +73,7 @@ dependencies {
|
|||
api(project(":spring-jcl"))
|
||||
compileOnly("io.projectreactor.tools:blockhound")
|
||||
compileOnly("org.graalvm.sdk:graal-sdk")
|
||||
optional("io.micrometer:context-propagation")
|
||||
optional("io.netty:netty-buffer")
|
||||
optional("io.netty:netty5-buffer")
|
||||
optional("io.projectreactor:reactor-core")
|
||||
|
|
|
@ -0,0 +1,62 @@
|
|||
/*
|
||||
* Copyright 2002-2023 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.core.task.support;
|
||||
|
||||
import io.micrometer.context.ContextSnapshot;
|
||||
import io.micrometer.context.ContextSnapshotFactory;
|
||||
|
||||
import org.springframework.core.task.TaskDecorator;
|
||||
|
||||
/**
|
||||
* {@link TaskDecorator} that {@link ContextSnapshot#wrap(Runnable) wrap the execution} of
|
||||
* tasks, assisting with context propagation.
|
||||
* <p>This operation is only useful when the task execution is scheduled on a different
|
||||
* thread than the original call stack; this depends on the choice of
|
||||
* {@link org.springframework.core.task.TaskExecutor}. This is particularly useful for
|
||||
* restoring a logging context or an observation context for the task execution. Note that
|
||||
* this decorator will cause some overhead for task execution and is not recommended for
|
||||
* applications that run lots of very small tasks.
|
||||
*
|
||||
* @author Brian Clozel
|
||||
* @since 6.1
|
||||
* @see CompositeTaskDecorator
|
||||
*/
|
||||
public class ContextPropagatingTaskDecorator implements TaskDecorator {
|
||||
|
||||
private final ContextSnapshotFactory factory;
|
||||
|
||||
/**
|
||||
* Create a new decorator that uses a default instance of the {@link ContextSnapshotFactory}.
|
||||
*/
|
||||
public ContextPropagatingTaskDecorator() {
|
||||
this(ContextSnapshotFactory.builder().build());
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new decorator using the given {@link ContextSnapshotFactory}.
|
||||
* @param factory the context snapshot factory to use.
|
||||
*/
|
||||
public ContextPropagatingTaskDecorator(ContextSnapshotFactory factory) {
|
||||
this.factory = factory;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Runnable decorate(Runnable runnable) {
|
||||
return this.factory.captureAll().wrap(runnable);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,100 @@
|
|||
/*
|
||||
* Copyright 2002-2023 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.core.task.support;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import io.micrometer.context.ContextRegistry;
|
||||
import io.micrometer.context.ContextSnapshotFactory;
|
||||
import io.micrometer.context.ThreadLocalAccessor;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
/**
|
||||
* Tests for {@link ContextPropagatingTaskDecorator}.
|
||||
* @author Brian Clozel
|
||||
*/
|
||||
class ContextPropagatingTaskDecoratorTests {
|
||||
|
||||
@Test
|
||||
void shouldPropagateContextInTaskExecution() throws Exception {
|
||||
AtomicReference<String> actual = new AtomicReference<>("");
|
||||
ContextRegistry registry = new ContextRegistry();
|
||||
registry.registerThreadLocalAccessor(new TestThreadLocalAccessor());
|
||||
ContextSnapshotFactory snapshotFactory = ContextSnapshotFactory.builder().contextRegistry(registry).build();
|
||||
|
||||
Runnable task = () -> actual.set(TestThreadLocalHolder.getValue());
|
||||
TestThreadLocalHolder.setValue("expected");
|
||||
|
||||
Thread execution = new Thread(new ContextPropagatingTaskDecorator(snapshotFactory).decorate(task));
|
||||
execution.start();
|
||||
execution.join();
|
||||
assertThat(actual.get()).isEqualTo("expected");
|
||||
TestThreadLocalHolder.reset();
|
||||
}
|
||||
|
||||
static class TestThreadLocalHolder {
|
||||
|
||||
private static final ThreadLocal<String> holder = new ThreadLocal<>();
|
||||
|
||||
public static void setValue(String value) {
|
||||
holder.set(value);
|
||||
}
|
||||
|
||||
public static String getValue() {
|
||||
return holder.get();
|
||||
}
|
||||
|
||||
public static void reset() {
|
||||
holder.remove();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
static class TestThreadLocalAccessor implements ThreadLocalAccessor<String> {
|
||||
|
||||
public static final String KEY = "test.threadlocal";
|
||||
|
||||
@Override
|
||||
public Object key() {
|
||||
return KEY;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getValue() {
|
||||
return TestThreadLocalHolder.getValue();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setValue(String value) {
|
||||
TestThreadLocalHolder.setValue(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setValue() {
|
||||
TestThreadLocalHolder.reset();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void restore(String previousValue) {
|
||||
setValue(previousValue);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue