Merge cc7dc955ac
into 7e6874ad80
This commit is contained in:
commit
665f20642e
|
@ -0,0 +1,72 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2002-present the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.springframework.core.concurrency;
|
||||||
|
|
||||||
|
import org.jspecify.annotations.Nullable;
|
||||||
|
|
||||||
|
import org.springframework.util.ConcurrencyThrottleSupport;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Template-style API that throttles concurrent executions of user-provided callbacks
|
||||||
|
* according to a configurable concurrency limit.
|
||||||
|
*
|
||||||
|
* <p>Blocking semantics are identical to {@link ConcurrencyThrottleSupport}:
|
||||||
|
* when the configured limit is reached, additional callers will block until a
|
||||||
|
* permit becomes available.
|
||||||
|
*
|
||||||
|
* <p>The default concurrency limit of this template is 1.
|
||||||
|
*
|
||||||
|
* @author Geonhu Park
|
||||||
|
* @since 7.1
|
||||||
|
* @see ConcurrencyThrottleSupport
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("serial")
|
||||||
|
public class ConcurrencyLimitTemplate extends ConcurrencyThrottleSupport {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a default {@code ConcurrencyLimitTemplate}
|
||||||
|
* with concurrency limit 1.
|
||||||
|
*/
|
||||||
|
public ConcurrencyLimitTemplate() {
|
||||||
|
this(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a {@code ConcurrencyThrottleInterceptor}
|
||||||
|
* with the given concurrency limit.
|
||||||
|
*/
|
||||||
|
public ConcurrencyLimitTemplate(int concurrencyLimit) {
|
||||||
|
setConcurrencyLimit(concurrencyLimit);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Execute the supplied callback under the configured concurrency limit.
|
||||||
|
* @param concurrencyLimited the unit of work to run
|
||||||
|
* @param <R> the result type (nullable)
|
||||||
|
* @return the callback's result (possibly {@code null})
|
||||||
|
* @throws Throwable any exception thrown by the callback
|
||||||
|
*/
|
||||||
|
public <R extends @Nullable Object> @Nullable R execute(ConcurrencyLimited<R> concurrencyLimited) throws Throwable {
|
||||||
|
beforeAccess();
|
||||||
|
try {
|
||||||
|
return concurrencyLimited.execute();
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
afterAccess();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,39 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2002-present the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.springframework.core.concurrency;
|
||||||
|
|
||||||
|
import org.jspecify.annotations.Nullable;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Functional callback representing a single unit of work to be executed under
|
||||||
|
* the concurrency throttling of a {@link ConcurrencyLimitTemplate}.
|
||||||
|
*
|
||||||
|
* @author Geonhu Park
|
||||||
|
* @since 7.1
|
||||||
|
* @param <R> the result type (nullable)
|
||||||
|
* @see ConcurrencyLimitTemplate
|
||||||
|
*/
|
||||||
|
@FunctionalInterface
|
||||||
|
public interface ConcurrencyLimited<R extends @Nullable Object> {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Execute the concurrency-limited operation.
|
||||||
|
* @return the result (may be {@code null})
|
||||||
|
* @throws Throwable any error from the underlying operation
|
||||||
|
*/
|
||||||
|
R execute() throws Throwable;
|
||||||
|
}
|
|
@ -0,0 +1,5 @@
|
||||||
|
/**
|
||||||
|
* Concurrency limiting (throttling) support via {@link org.springframework.core.concurrency.ConcurrencyLimitTemplate}
|
||||||
|
* and {@link org.springframework.core.concurrency.ConcurrencyLimited}.
|
||||||
|
*/
|
||||||
|
package org.springframework.core.concurrency;
|
|
@ -44,6 +44,7 @@ import org.apache.commons.logging.LogFactory;
|
||||||
* @see #beforeAccess()
|
* @see #beforeAccess()
|
||||||
* @see #afterAccess()
|
* @see #afterAccess()
|
||||||
* @see org.springframework.aop.interceptor.ConcurrencyThrottleInterceptor
|
* @see org.springframework.aop.interceptor.ConcurrencyThrottleInterceptor
|
||||||
|
* @see org.springframework.core.concurrency.ConcurrencyLimitTemplate
|
||||||
* @see java.io.Serializable
|
* @see java.io.Serializable
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("serial")
|
@SuppressWarnings("serial")
|
||||||
|
|
|
@ -0,0 +1,111 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2002-present the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.springframework.core.concurrency;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.junit.jupiter.params.ParameterizedTest;
|
||||||
|
import org.junit.jupiter.params.provider.ValueSource;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests for {@link ConcurrencyLimitTemplate}.
|
||||||
|
*
|
||||||
|
* @author Geonhu Park
|
||||||
|
*/
|
||||||
|
class ConcurrencyLimitTemplateTests {
|
||||||
|
|
||||||
|
private static final Log logger = LogFactory.getLog(ConcurrencyLimitTemplateTests.class);
|
||||||
|
|
||||||
|
private static final int NR_OF_THREADS = 100;
|
||||||
|
|
||||||
|
private static final int NR_OF_ITERATIONS = 1000;
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@ValueSource(ints = {1, 10})
|
||||||
|
void multipleThreadsWithLimit(int concurrencyLimit) {
|
||||||
|
ConcurrencyLimitTemplate template = new ConcurrencyLimitTemplate(concurrencyLimit);
|
||||||
|
|
||||||
|
Thread[] threads = new Thread[NR_OF_THREADS];
|
||||||
|
for (int i = 0; i < NR_OF_THREADS; i++) {
|
||||||
|
threads[i] = new ConcurrencyThread(template, null);
|
||||||
|
threads[i].start();
|
||||||
|
}
|
||||||
|
for (int i = 0; i < NR_OF_THREADS / 10; i++) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(5);
|
||||||
|
}
|
||||||
|
catch (InterruptedException ex) {
|
||||||
|
ex.printStackTrace();
|
||||||
|
}
|
||||||
|
threads[i] = new ConcurrencyThread(template,
|
||||||
|
(i % 2 == 0 ? new OutOfMemoryError() : new IllegalStateException()));
|
||||||
|
threads[i].start();
|
||||||
|
}
|
||||||
|
for (Thread t : threads) {
|
||||||
|
try {
|
||||||
|
t.join();
|
||||||
|
}
|
||||||
|
catch (InterruptedException ex) {
|
||||||
|
ex.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class ConcurrencyThread extends Thread {
|
||||||
|
|
||||||
|
private final ConcurrencyLimitTemplate template;
|
||||||
|
private final Throwable ex;
|
||||||
|
|
||||||
|
ConcurrencyThread(ConcurrencyLimitTemplate template, Throwable ex) {
|
||||||
|
this.template = template;
|
||||||
|
this.ex = ex;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
if (this.ex != null) {
|
||||||
|
try {
|
||||||
|
this.template.execute(() -> {
|
||||||
|
throw this.ex;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
catch (RuntimeException | Error err) {
|
||||||
|
if (err == this.ex) {
|
||||||
|
logger.info("Expected exception thrown", err);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
ex.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Throwable th) {
|
||||||
|
th.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
for (int i = 0; i < NR_OF_ITERATIONS; i++) {
|
||||||
|
try {
|
||||||
|
this.template.execute(() -> null);
|
||||||
|
}
|
||||||
|
catch (Throwable th) {
|
||||||
|
th.printStackTrace();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue